MaxCompute:Use Flink to write data to MaxCompute

Last Updated: Mar 26, 2026

The MaxCompute Flink connector lets you write real-time Flink data to standard tables and Delta tables in MaxCompute. Both Flink SQL and the DataStream API are supported.

Choose a deployment path

Before you start, identify which Flink environment you are using:

| Deployment path | Setup effort | When to use |
| --- | --- | --- |
| Fully managed Flink (Realtime Compute for Apache Flink) | None; the connector is pre-loaded on Ververica Platform (VVP) | You are already running Flink jobs on Alibaba Cloud |
| Self-managed open source Flink | Download the connector JAR and add it to the cluster | You manage your own Flink cluster (versions 1.13, 1.15, 1.16, or 1.17) |

Jump to the section that matches your setup:

  • Write data from a self-managed open source Flink cluster to MaxCompute

  • Write data from fully managed Flink on Alibaba Cloud to MaxCompute

Write modes and idempotent writes

The connector supports two write modes, set with the sink.operation parameter:

| Mode | Supported table types | Behavior |
| --- | --- | --- |
| insert | Standard tables and Delta tables | Append-only. Replayed records after a checkpoint recovery create duplicates. |
| upsert | Delta tables only | Insert-or-update based on the primary key. Replayed records are deduplicated, giving idempotent writes. |

Why upsert mode matters for fault tolerance: When a Flink job recovers from a failure, it replays data from the last successful checkpoint. In insert mode, replayed records create duplicate rows. In upsert mode, the connector uses the primary key to merge replayed records, so the final result is correct regardless of how many times records are replayed.

For any pipeline where correctness after failure matters, use upsert mode.
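The difference can be illustrated with a small simulation in plain Java (an illustration of the replay semantics, independent of the connector): replaying a checkpointed batch into an append-only structure duplicates every row, while replaying it into a structure keyed by primary key is a no-op.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplaySemantics {
    public static void main(String[] args) {
        // One checkpointed batch: (id=1, age=27), (id=2, age=30)
        long[] ids = {1, 2};
        int[] ages = {27, 30};

        // insert mode: append-only, so replaying the batch duplicates every row
        List<Long> insertRows = new ArrayList<>();
        for (int replay = 0; replay < 2; replay++) {
            for (long id : ids) insertRows.add(id);
        }
        System.out.println("insert mode rows: " + insertRows.size()); // 4 (duplicates)

        // upsert mode: merge on the primary key, so the replay is a no-op
        Map<Long, Integer> upsertRows = new HashMap<>();
        for (int replay = 0; replay < 2; replay++) {
            for (int i = 0; i < ids.length; i++) upsertRows.put(ids[i], ages[i]);
        }
        System.out.println("upsert mode rows: " + upsertRows.size()); // 2 (idempotent)
    }
}
```

However many times the batch is replayed, the keyed map converges to the same two rows, which is exactly the idempotence property upsert mode provides.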

Grouping in upsert mode: You can group data by primary key (default) or by partition field. Grouping by partition field is useful when the table has many partitions but may cause data skew.

Checkpoint interval: Set the checkpoint interval to more than 3 minutes for upsert jobs. Shorter intervals increase checkpoint frequency, which lowers write throughput and generates many small files.

For recommended upsert parameter configurations, see Real-time data ingestion into a data warehouse.

Data type mapping

The following table maps data types between Realtime Compute for Apache Flink and MaxCompute.

| Flink data type | MaxCompute data type |
| --- | --- |
| CHAR(p) | CHAR(p) |
| VARCHAR(p) | VARCHAR(p) |
| STRING | STRING |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(p, s) | DECIMAL(p, s) |
| DATE | DATE |
| TIMESTAMP(9) WITHOUT TIME ZONE, TIMESTAMP_LTZ(9) | TIMESTAMP |
| TIMESTAMP(3) WITHOUT TIME ZONE, TIMESTAMP_LTZ(3) | DATETIME |
| BYTES | BINARY |
| ARRAY<T> | ARRAY<T> |
| MAP<K, V> | MAP<K, V> |
| ROW | STRUCT |

The Flink TIMESTAMP type does not carry time zone information, while the MaxCompute TIMESTAMP type does. This difference causes an 8-hour discrepancy. Use TIMESTAMP_LTZ(9) for fields that represent an absolute point in time:
CREATE TEMPORARY TABLE odps_source(
  id BIGINT NOT NULL COMMENT 'id',
  created_time TIMESTAMP NOT NULL COMMENT 'Creation time',
  updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT 'Update time',
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  ...
);
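Where the 8 hours come from can be seen with plain java.time, no Flink required. Asia/Shanghai (UTC+8) is used here as an assumed project time zone for the sketch: the same zone-less wall-clock value denotes instants 8 hours apart depending on which zone it is interpreted in.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class TimestampSkew {
    public static void main(String[] args) {
        // A zone-less timestamp, as the Flink TIMESTAMP type carries it
        LocalDateTime wallClock = LocalDateTime.of(2024, 6, 15, 12, 0, 0);

        // The same wall-clock value pinned to two different zones
        ZonedDateTime asUtc = wallClock.atZone(ZoneId.of("UTC"));
        ZonedDateTime asShanghai = wallClock.atZone(ZoneId.of("Asia/Shanghai")); // UTC+8

        // As absolute instants, the two interpretations differ by 8 hours
        long hours = Duration.between(asShanghai.toInstant(), asUtc.toInstant()).toHours();
        System.out.println("discrepancy: " + hours + " hours"); // 8
    }
}
```

TIMESTAMP_LTZ avoids the problem because it pins the value to an absolute instant rather than a wall-clock reading.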

Write data from a self-managed open source Flink cluster to MaxCompute

Prerequisites

Before you begin, ensure that you have:

  • A running open source Flink cluster (version 1.13, 1.15, 1.16, or 1.17)

  • A MaxCompute project with permission to create tables and write data

  • Your AccessKey ID and AccessKey secret (available on the AccessKey Pair page)

Step 1: Create a MaxCompute table

Create the target Delta table in MaxCompute. The following example creates a non-partitioned table and a partitioned table. For Delta table property details, see Delta table parameters.

-- Non-partitioned Delta table
CREATE TABLE mf_flink_tt (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id)
)
TBLPROPERTIES (
  "transactional" = "true",
  "write.bucket.num" = "64",
  "acid.data.retain.hours" = "12"
);

-- Partitioned Delta table
CREATE TABLE mf_flink_tt_part (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id)
)
PARTITIONED BY (dd STRING, hh STRING)
TBLPROPERTIES (
  "transactional" = "true",
  "write.bucket.num" = "64",
  "acid.data.retain.hours" = "12"
);

Step 2: Install the Flink connector

Download the JAR for your Flink version and add it to the cluster.

| Flink version | Connector JAR |
| --- | --- |
| 1.13 | flink-connector-maxcompute-1.13.jar |
| 1.15 | flink-connector-maxcompute-1.15.jar |
| 1.16 or 1.17 | flink-connector-maxcompute-1.16.jar |

The 1.16 connector is compatible with both Flink 1.16 and 1.17.

Move the downloaded JAR to the lib directory of your Flink installation:

mv flink-connector-maxcompute-1.13.jar $FLINK_HOME/lib/

Step 3: Start Flink

cd $FLINK_HOME/bin
./start-cluster.sh
./sql-client.sh

Step 4: Create a Flink table and write data

Choose Flink SQL or the DataStream API based on your job type.

Use Flink SQL

Register the MaxCompute tables in Flink SQL and write data:

-- Register the non-partitioned table
CREATE TABLE mf_flink (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'          = 'maxcompute',
  'table.name'         = 'mf_flink_tt',
  'sink.operation'     = 'upsert',
  'odps.access.id'     = 'LTAI****************',
  'odps.access.key'    = '********************',
  'odps.end.point'     = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name'  = 'mf_mc_bj'
);

-- Register the partitioned table
CREATE TABLE mf_flink_part (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  dd STRING,
  hh STRING,
  PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (`dd`, `hh`)
WITH (
  'connector'          = 'maxcompute',
  'table.name'         = 'mf_flink_tt_part',
  'sink.operation'     = 'upsert',
  'odps.access.id'     = 'LTAI****************',
  'odps.access.key'    = '********************',
  'odps.end.point'     = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name'  = 'mf_mc_bj'
);

Insert records and verify the results in MaxCompute:

-- Insert into the non-partitioned table
INSERT INTO mf_flink VALUES (1, 'Danny', 27, false);

-- Query MaxCompute to verify
SELECT * FROM mf_flink_tt;
-- Result:
-- +----+-------+-----+--------+
-- | id | name  | age | status |
-- +----+-------+-----+--------+
-- | 1  | Danny |  27 | false  |
-- +----+-------+-----+--------+

-- Update the record via upsert (same primary key, new age value)
INSERT INTO mf_flink VALUES (1, 'Danny', 28, false);

SELECT * FROM mf_flink_tt;
-- Result:
-- +----+-------+-----+--------+
-- | id | name  | age | status |
-- +----+-------+-----+--------+
-- | 1  | Danny |  28 | false  |
-- +----+-------+-----+--------+

-- Insert into the partitioned table
INSERT INTO mf_flink_part VALUES (1, 'Danny', 27, false, '01', '01');

SELECT * FROM mf_flink_tt_part WHERE dd = '01' AND hh = '01';
-- Result:
-- +----+-------+-----+--------+----+----+
-- | id | name  | age | status | dd | hh |
-- +----+-------+-----+--------+----+----+
-- | 1  | Danny |  27 | false  | 01 | 01 |
-- +----+-------+-----+--------+----+----+

Use the DataStream API

Add the connector as a Maven dependency:

<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>flink-connector-maxcompute</artifactId>
  <version>xxx</version>
  <scope>system</scope>
  <systemPath>${project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
</dependency>

Replace xxx with the actual connector version.

The following example shows how to configure the sink using OdpsPipeline:

package com.aliyun.odps.flink.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsPipeline;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;

public class Examples {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(120 * 1000);

        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);

        Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
        DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);

        // Configure sink behavior
        Configuration config = new Configuration();
        config.set(OdpsOptions.SINK_OPERATION, "upsert");
        config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
        config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);

        // Configure MaxCompute credentials and endpoint
        OdpsConf odpsConfig = new OdpsConf(
            "accessid",
            "accesskey",
            "endpoint",
            "project",
            "tunnel endpoint"
        );

        // Build and run the sink pipeline
        OdpsPipeline.Builder builder = OdpsPipeline.builder();
        builder.projectName("sql2_isolation_2a")
               .tableName("user_ledger_portfolio")
               .partition("")
               .configuration(config)
               .odpsConf(odpsConfig)
               .sink(input, false);

        env.execute();
    }
}

Write data from fully managed Flink on Alibaba Cloud to MaxCompute

The Flink connector is pre-loaded on Ververica Platform (VVP). No additional installation is needed.

Prerequisites

Before you begin, ensure that you have:

  • A Realtime Compute for Apache Flink workspace

  • A MaxCompute project with permission to create tables and write data

  • Your AccessKey ID and AccessKey secret (available on the AccessKey Pair page)

Step 1: Create a MaxCompute table

SET odps.sql.type.system.odps2 = true;

DROP TABLE IF EXISTS mf_flink_upsert;

CREATE TABLE mf_flink_upsert (
  c1 INT NOT NULL,
  c2 STRING,
  gt TIMESTAMP,
  PRIMARY KEY (c1)
)
PARTITIONED BY (ds STRING)
TBLPROPERTIES (
  "transactional" = "true",
  "write.bucket.num" = "64",
  "acid.data.retain.hours" = "12"
);

Step 2: Create and deploy a Flink SQL job

On the Flink job development page, create a Flink SQL job. The following example defines a source table that auto-generates random data and a sink table that writes to MaxCompute in upsert mode. For job development details, see Job development map.

-- Source table with auto-generated data
CREATE TEMPORARY TABLE fake_src_table (
  c1 INT,
  c2 VARCHAR,
  gt AS CURRENT_TIMESTAMP
) WITH (
  'connector'               = 'faker',
  'fields.c2.expression'    = '#{superhero.name}',
  'rows-per-second'         = '100',
  'fields.c1.expression'    = '#{number.numberBetween ''0'',''1000''}'
);

-- Sink table pointing to the MaxCompute Delta table
CREATE TEMPORARY TABLE test_c_d_g (
  c1 INT,
  c2 VARCHAR,
  gt TIMESTAMP,
  ds VARCHAR,
  PRIMARY KEY (c1) NOT ENFORCED
) PARTITIONED BY (ds)
WITH (
  'connector'               = 'maxcompute',
  'table.name'              = 'mf_flink_upsert',
  'sink.operation'          = 'upsert',
  'odps.access.id'          = 'LTAI****************',
  'odps.access.key'         = '********************',
  'odps.end.point'          = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name'       = 'mf_mc_bj',
  'upsert.write.bucket.num' = '64'
);

-- Write logic
INSERT INTO test_c_d_g
SELECT
  c1,
  c2,
  gt,
  date_format(gt, 'yyyyMMddHH') AS ds
FROM fake_src_table;

For odps.end.point, use the internal network endpoint for your region. The upsert.write.bucket.num value must match the write.bucket.num property of the MaxCompute table.

After you develop the job, deploy it from the Flink job development page.
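The ds partition value in the write logic above is produced by date_format(gt, 'yyyyMMddHH'). A plain java.time sketch (an illustration only, not connector code; the toDs helper is a hypothetical name) derives the same value:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionValue {
    // Mirrors the SQL expression date_format(gt, 'yyyyMMddHH')
    static String toDs(LocalDateTime gt) {
        return gt.format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
    }

    public static void main(String[] args) {
        LocalDateTime gt = LocalDateTime.of(2023, 6, 15, 17, 59, 41);
        System.out.println(toDs(gt)); // 2023061517
    }
}
```

Every record written in a given hour therefore lands in the same ds partition, which is the value queried in the verification step below.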

Step 3: Verify data in MaxCompute

SELECT * FROM mf_flink_upsert WHERE ds = '2023061517';

-- The Flink source generates random data, so your results will differ.
-- Example output:
-- +-----+--------------------+-------------------------+------------+
-- | c1  | c2                 | gt                      | ds         |
-- +-----+--------------------+-------------------------+------------+
-- | 0   | Skaar              | 2023-06-16 01:59:41.116 | 2023061517 |
-- | 21  | Supah Century      | 2023-06-16 01:59:59.117 | 2023061517 |
-- | 104 | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
-- | 126 | Leader             | 2023-06-16 01:59:39.116 | 2023061517 |
-- +-----+--------------------+-------------------------+------------+

Connector parameters

Basic parameters

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| connector | Yes | | Connector type. Set to maxcompute. |
| odps.project.name | Yes | | MaxCompute project name. |
| odps.access.id | Yes | | AccessKey ID. Get it from the AccessKey Pair page. |
| odps.access.key | Yes | | AccessKey secret. Get it from the AccessKey Pair page. |
| odps.end.point | Yes | | MaxCompute endpoint. See Endpoint for regional values. |
| odps.tunnel.end.point | No | Auto-routed to match the MaxCompute network | Tunnel service endpoint. If set, automatic routing is disabled. See Endpoint for regional values. |
| odps.tunnel.quota.name | No | | Tunnel quota name for accessing MaxCompute. |
| table.name | Yes | | MaxCompute table name. Format: [project.][schema.]table. |
| odps.namespace.schema | No | false | Whether to use the three-layer model (project.schema.table). See Schema operations. |
| sink.operation | Yes | insert | Write mode. Valid values: insert, upsert. Only Delta tables support upsert. |
| sink.parallelism | No | Inherits input parallelism | Sink write parallelism. Set write.bucket.num to an integer multiple of this value for optimal performance and minimum memory use on the sink node. |
| sink.meta.cache.time | No | 400 | Metadata cache size. |
| sink.meta.cache.expire.time | No | 1200 | Metadata cache timeout, in seconds. |
| sink.coordinator.enable | No | true | Whether to enable coordinator mode. |
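The sink.parallelism guidance above (write.bucket.num should be an integer multiple of the parallelism) can be checked mechanically. The isAligned helper below is hypothetical, not a connector API:

```java
public class BucketCheck {
    // Hypothetical sanity check: write.bucket.num should be an integer
    // multiple of sink.parallelism so buckets divide evenly across subtasks.
    static boolean isAligned(int bucketNum, int sinkParallelism) {
        return sinkParallelism > 0 && bucketNum % sinkParallelism == 0;
    }

    public static void main(String[] args) {
        System.out.println(isAligned(64, 16)); // true: each subtask owns 4 buckets
        System.out.println(isAligned(64, 24)); // false: bucket assignment is uneven
    }
}
```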

Partition parameters

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| sink.partition | No | | Target partition name. For dynamic partitioning, this is the parent partition name. |
| sink.partition.default-value | No | __DEFAULT_PARTITION__ | Default partition name for null partition key values in dynamic partitioning. |
| sink.dynamic-partition.limit | No | 100 | Maximum number of partitions written concurrently in a single checkpoint. Do not increase this significantly; writing to too many partitions at the same time can cause out-of-memory (OOM) errors on the sink node. |
| sink.group-partition.enable | No | false | Whether to group data by partition when writing to dynamic partitions. |
| sink.partition.assigner.class | No | | Custom PartitionAssigner implementation class. |

FileCached mode parameters

Use FileCached mode when you have a large number of dynamic partitions. In this mode, data is written to local cache files before being uploaded to MaxCompute.

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| sink.file-cached.enable | No | false | Whether to enable FileCached mode. Set to true when writing to many dynamic partitions. |
| sink.file-cached.tmp.dirs | No | ./local | Local directory for cache files. |
| sink.file-cached.writer.num | No | 16 | Number of concurrent upload threads per task. Do not increase this significantly; high concurrency can cause OOM errors. |
| sink.bucket.check-interval | No | 60000 | Interval at which file size is checked in FileCached mode, in milliseconds. |
| sink.file-cached.rolling.max-size | No | 16 MB | Maximum size of a single cache file. Files that exceed this size are uploaded immediately. |
| sink.file-cached.memory | No | 64 MB | Maximum off-heap memory for file writes in FileCached mode. |
| sink.file-cached.memory.segment-size | No | 128 KB | Buffer size for file writes in FileCached mode. |
| sink.file-cached.flush.always | No | true | Whether to use buffered writes in FileCached mode. |
| sink.file-cached.write.max-retries | No | 3 | Number of upload retries in FileCached mode. |
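The size-based rolling policy can be sketched without the connector: a writer accumulates bytes in the current cache file and rolls (uploads) once the file exceeds sink.file-cached.rolling.max-size. This is an illustration of the policy under assumed write sizes, not the connector's implementation:

```java
import java.util.ArrayList;
import java.util.List;

public class RollingSketch {
    // sink.file-cached.rolling.max-size = 16 MB (default)
    static final long MAX_FILE_SIZE = 16L * 1024 * 1024;

    public static void main(String[] args) {
        List<Long> uploadedFiles = new ArrayList<>();
        long currentFileBytes = 0;

        // Simulate 40 writes of 1 MB each into the local cache
        for (int i = 0; i < 40; i++) {
            currentFileBytes += 1024 * 1024;
            if (currentFileBytes > MAX_FILE_SIZE) {
                uploadedFiles.add(currentFileBytes); // roll: upload, start a new file
                currentFileBytes = 0;
            }
        }
        System.out.println("files uploaded: " + uploadedFiles.size()); // 2
    }
}
```

With this policy, a lower rolling.max-size produces more, smaller uploads; raising it trades upload frequency for local disk usage.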

Upsert write parameters

For recommended parameter configurations, see Recommended parameter configurations for upsert writes.

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| upsert.write.bucket.num | Yes | | Number of buckets for the sink table. Must match the write.bucket.num property of the MaxCompute table. |
| upsert.writer.max-retries | No | 3 | Number of retries if a writer fails to write to a bucket. |
| upsert.writer.buffer-size | No | 64 MB | Per-writer buffer size. When the total buffer across all buckets reaches this threshold, the writer flushes to the server. Increase this to improve throughput; decrease it if writing to many partitions causes OOM errors. |
| upsert.writer.bucket.buffer-size | No | 1 MB | Per-bucket buffer size. Decrease this if the Flink node is low on memory. |
| upsert.write.slot-num | No | 1 | Number of Tunnel slots per session. |
| upsert.commit.max-retries | No | 3 | Number of retries for a session commit. |
| upsert.commit.thread-num | No | 16 | Commit parallelism. Keep this value low; high concurrency increases resource usage and may cause performance issues. |
| upsert.commit.timeout | No | 600 | Timeout for a session commit to complete, in seconds. |
| upsert.major-compact.enable | No | false | Whether to enable major compaction. |
| upsert.major-compact.min-commits | No | 100 | Minimum number of commits required to trigger a major compaction. |
| upsert.flush.concurrent | No | 2 | Maximum number of buckets flushed concurrently in a single partition. Each flush occupies one Tunnel slot. |
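A rough memory check helps when sizing the buffers above. This is back-of-the-envelope arithmetic, not connector internals: with the defaults, 64 buckets at 1 MB each can by themselves reach the 64 MB writer flush threshold.

```java
public class BufferMath {
    public static void main(String[] args) {
        int bucketNum = 64;       // write.bucket.num of the table
        long bucketBufferMb = 1;  // upsert.writer.bucket.buffer-size (default)
        long writerBufferMb = 64; // upsert.writer.buffer-size, the flush threshold

        // Worst case: every bucket buffer is full at the same time
        long worstCaseMb = bucketNum * bucketBufferMb;
        System.out.println(worstCaseMb >= writerBufferMb); // true: a flush triggers
    }
}
```

If you raise write.bucket.num or the per-bucket buffer, revisit upsert.writer.buffer-size so flushes still happen before memory pressure builds on the sink node.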

Insert write parameters

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| insert.commit.thread-num | No | 16 | Commit parallelism for insert sessions. |
| insert.arrow-writer.enable | No | false | Whether to use the Arrow format for writes. |
| insert.arrow-writer.batch-size | No | 512 | Maximum number of rows per Arrow batch. |
| insert.arrow-writer.flush-interval | No | 100000 | Interval at which the Arrow writer flushes data, in milliseconds. |
| insert.writer.buffer-size | No | 64 MB | Buffer size for the insert writer. |

What's next