The MaxCompute Flink connector lets you write real-time Flink data to standard tables and Delta tables in MaxCompute. Both Flink SQL and the DataStream API are supported.
Choose a deployment path
Before you start, identify which Flink environment you are using:
| Deployment path | Setup effort | When to use |
|---|---|---|
| Fully managed Flink (Realtime Compute for Apache Flink) | None — connector is pre-loaded on Ververica Platform (VVP) | You are already running Flink jobs on Alibaba Cloud |
| Self-managed open source Flink | Download connector JAR and add it to the cluster | You manage your own Flink cluster (versions 1.13, 1.15, 1.16, or 1.17) |
Jump to the section that matches your setup:
- Write data from a self-managed open source Flink cluster to MaxCompute
- Write data from fully managed Flink on Alibaba Cloud to MaxCompute
Write modes and idempotent writes
The connector supports two write modes, set with the sink.operation parameter:
| Mode | Supported table types | Behavior |
|---|---|---|
| `insert` | Standard tables and Delta tables | Append-only. Replayed records after a checkpoint recovery create duplicates. |
| `upsert` | Delta tables only | Insert-or-update based on the primary key. Replayed records are deduplicated, giving idempotent writes. |
Why upsert mode matters for fault tolerance: When a Flink job recovers from a failure, it replays data from the last successful checkpoint. In insert mode, replayed records create duplicate rows. In upsert mode, the connector uses the primary key to merge replayed records, so the final result is correct regardless of how many times records are replayed.
For any pipeline where correctness after failure matters, use upsert mode.
Grouping in upsert mode: You can group data by primary key (default) or by partition field. Grouping by partition field is useful when the table has many partitions but may cause data skew.
Checkpoint interval: Set the checkpoint interval to more than 3 minutes for upsert jobs. Shorter intervals increase checkpoint frequency, which lowers write throughput and generates many small files.
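As an illustration, the interval can be raised in a Flink SQL job with the standard checkpointing option (the quoted `SET` syntax shown here applies to the SQL client in Flink 1.14 and later; the 5-minute value is an assumed example, not a mandated setting):

```sql
-- Checkpoint every 5 minutes: above the 3-minute minimum recommended for
-- upsert jobs, trading finer recovery granularity for write throughput
-- and fewer small files.
SET 'execution.checkpointing.interval' = '5min';
```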
For recommended upsert parameter configurations, see Real-time data ingestion into a data warehouse.
Data type mapping
The following table maps data types between Realtime Compute for Apache Flink and MaxCompute.
| Flink data type | MaxCompute data type |
|---|---|
| CHAR(p) | CHAR(p) |
| VARCHAR(p) | VARCHAR(p) |
| STRING | STRING |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(p, s) | DECIMAL(p, s) |
| DATE | DATE |
| TIMESTAMP(9) WITHOUT TIME ZONE, TIMESTAMP_LTZ(9) | TIMESTAMP |
| TIMESTAMP(3) WITHOUT TIME ZONE, TIMESTAMP_LTZ(3) | DATETIME |
| BYTES | BINARY |
| ARRAY\<T\> | LIST\<T\> |
| MAP\<K, V\> | MAP\<K, V\> |
| ROW | STRUCT |
The Flink TIMESTAMP type does not carry time zone information, while the MaxCompute TIMESTAMP type does. This difference causes an 8-hour discrepancy. Use TIMESTAMP_LTZ(9) for fields that represent an absolute point in time:
CREATE TEMPORARY TABLE odps_source (
    id BIGINT NOT NULL COMMENT 'id',
    created_time TIMESTAMP NOT NULL COMMENT 'Creation time',
    updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT 'Update time',
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'maxcompute',
    ...
);
Write data from a self-managed open source Flink cluster to MaxCompute
Prerequisites
Before you begin, ensure that you have:
- A running open source Flink cluster (version 1.13, 1.15, 1.16, or 1.17)
- A MaxCompute project with permission to create tables and write data
- Your AccessKey ID and AccessKey secret (available on the AccessKey Pair page)
Step 1: Create a MaxCompute table
Create the target Delta table in MaxCompute. The following example creates a non-partitioned table and a partitioned table. For Delta table property details, see Delta table parameters.
-- Non-partitioned Delta table
CREATE TABLE mf_flink_tt (
    id BIGINT NOT NULL,
    name STRING,
    age INT,
    status BOOLEAN,
    PRIMARY KEY (id)
)
TBLPROPERTIES (
    "transactional" = "true",
    "write.bucket.num" = "64",
    "acid.data.retain.hours" = "12"
);

-- Partitioned Delta table
CREATE TABLE mf_flink_tt_part (
    id BIGINT NOT NULL,
    name STRING,
    age INT,
    status BOOLEAN,
    PRIMARY KEY (id)
)
PARTITIONED BY (dd STRING, hh STRING)
TBLPROPERTIES (
    "transactional" = "true",
    "write.bucket.num" = "64",
    "acid.data.retain.hours" = "12"
);
Step 2: Install the Flink connector
Download the JAR for your Flink version and add it to the cluster.
| Flink version | Connector JAR |
|---|---|
| 1.13 | flink-connector-maxcompute-1.13.jar |
| 1.15 | flink-connector-maxcompute-1.15.jar |
| 1.16 or 1.17 | flink-connector-maxcompute-1.16.jar |
The 1.16 connector is compatible with both Flink 1.16 and 1.17.
Move the downloaded JAR to the lib directory of your Flink installation:
mv flink-connector-maxcompute-1.13.jar $FLINK_HOME/lib/
Step 3: Start Flink
cd $FLINK_HOME/bin
./start-cluster.sh
./sql-client.sh
Step 4: Create a Flink table and write data
Choose Flink SQL or the DataStream API based on your job type.
Use Flink SQL
Register the MaxCompute tables in Flink SQL and write data:
-- Register the non-partitioned table
CREATE TABLE mf_flink (
    id BIGINT,
    name STRING,
    age INT,
    status BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'maxcompute',
    'table.name' = 'mf_flink_tt',
    'sink.operation' = 'upsert',
    'odps.access.id' = 'LTAI****************',
    'odps.access.key' = '********************',
    'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
    'odps.project.name' = 'mf_mc_bj'
);

-- Register the partitioned table
CREATE TABLE mf_flink_part (
    id BIGINT,
    name STRING,
    age INT,
    status BOOLEAN,
    dd STRING,
    hh STRING,
    PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (`dd`, `hh`)
WITH (
    'connector' = 'maxcompute',
    'table.name' = 'mf_flink_tt_part',
    'sink.operation' = 'upsert',
    'odps.access.id' = 'LTAI****************',
    'odps.access.key' = '********************',
    'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
    'odps.project.name' = 'mf_mc_bj'
);
Insert records and verify the results in MaxCompute:
-- Insert into the non-partitioned table
INSERT INTO mf_flink VALUES (1, 'Danny', 27, false);
-- Query MaxCompute to verify
SELECT * FROM mf_flink_tt;
-- Result:
-- +----+-------+-----+--------+
-- | id | name | age | status |
-- +----+-------+-----+--------+
-- | 1 | Danny | 27 | false |
-- +----+-------+-----+--------+
-- Update the record via upsert (same primary key, new age value)
INSERT INTO mf_flink VALUES (1, 'Danny', 28, false);
SELECT * FROM mf_flink_tt;
-- Result:
-- +----+-------+-----+--------+
-- | id | name | age | status |
-- +----+-------+-----+--------+
-- | 1 | Danny | 28 | false |
-- +----+-------+-----+--------+
-- Insert into the partitioned table
INSERT INTO mf_flink_part VALUES (1, 'Danny', 27, false, '01', '01');
SELECT * FROM mf_flink_tt_part WHERE dd = '01' AND hh = '01';
-- Result:
-- +----+-------+-----+--------+----+----+
-- | id | name | age | status | dd | hh |
-- +----+-------+-----+--------+----+----+
-- | 1 | Danny | 27 | false | 01 | 01 |
-- +----+-------+-----+--------+----+----+
Use the DataStream API
Add the connector as a Maven dependency:
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>flink-connector-maxcompute</artifactId>
    <version>xxx</version>
    <scope>system</scope>
    <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
</dependency>
Replace xxx with the actual connector version.
The following example shows how to configure the sink using OdpsPipeline:
package com.aliyun.odps.flink.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsPipeline;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;

public class Examples {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(120 * 1000);

        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
        Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
        DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);

        // Configure sink behavior
        Configuration config = new Configuration();
        config.set(OdpsOptions.SINK_OPERATION, "upsert");
        config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
        config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);

        // Configure MaxCompute credentials and endpoint
        OdpsConf odpsConfig = new OdpsConf(
                "accessid",
                "accesskey",
                "endpoint",
                "project",
                "tunnel endpoint");

        // Build and run the sink pipeline
        OdpsPipeline.Builder builder = OdpsPipeline.builder();
        builder.projectName("sql2_isolation_2a")
                .tableName("user_ledger_portfolio")
                .partition("")
                .configuration(config)
                .odpsConf(odpsConfig)
                .sink(input, false);
        env.execute();
    }
}
Write data from fully managed Flink on Alibaba Cloud to MaxCompute
The Flink connector is pre-loaded on Ververica Platform (VVP). No additional installation is needed.
Prerequisites
Before you begin, ensure that you have:
- Access to the Realtime Compute for Apache Flink console
- A MaxCompute project with permission to create tables and write data
- Your AccessKey ID and AccessKey secret (available on the AccessKey Pair page)
Step 1: Create a MaxCompute table
SET odps.sql.type.system.odps2 = true;

DROP TABLE IF EXISTS mf_flink_upsert;

CREATE TABLE mf_flink_upsert (
    c1 INT NOT NULL,
    c2 STRING,
    gt TIMESTAMP,
    PRIMARY KEY (c1)
)
PARTITIONED BY (ds STRING)
TBLPROPERTIES (
    "transactional" = "true",
    "write.bucket.num" = "64",
    "acid.data.retain.hours" = "12"
);
Step 2: Create and deploy a Flink SQL job
On the Flink job development page, create a Flink SQL job. The following example defines a source table that auto-generates random data and a sink table that writes to MaxCompute in upsert mode. For job development details, see Job development map.
-- Source table with auto-generated data
CREATE TEMPORARY TABLE fake_src_table (
    c1 INT,
    c2 VARCHAR,
    gt AS CURRENT_TIMESTAMP
) WITH (
    'connector' = 'faker',
    'fields.c2.expression' = '#{superhero.name}',
    'rows-per-second' = '100',
    'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
);

-- Sink table pointing to the MaxCompute Delta table
CREATE TEMPORARY TABLE test_c_d_g (
    c1 INT,
    c2 VARCHAR,
    gt TIMESTAMP,
    ds VARCHAR,
    PRIMARY KEY (c1) NOT ENFORCED
) PARTITIONED BY (ds)
WITH (
    'connector' = 'maxcompute',
    'table.name' = 'mf_flink_upsert',
    'sink.operation' = 'upsert',
    'odps.access.id' = 'LTAI****************',
    'odps.access.key' = '********************',
    'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
    'odps.project.name' = 'mf_mc_bj',
    'upsert.write.bucket.num' = '64'
);

-- Write logic
INSERT INTO test_c_d_g
SELECT
    c1,
    c2,
    gt,
    date_format(gt, 'yyyyMMddHH') AS ds
FROM fake_src_table;
For `odps.end.point`, use the internal network endpoint for your region. The `upsert.write.bucket.num` value must match the `write.bucket.num` property of the MaxCompute table.
After you develop the job, deploy it from the Flink job development page.
Step 3: Verify data in MaxCompute
SELECT * FROM mf_flink_upsert WHERE ds = '2023061517';
-- The Flink source generates random data, so your results will differ.
-- Example output:
-- +-----+--------------------+-------------------------+------------+
-- | c1 | c2 | gt | ds |
-- +-----+--------------------+-------------------------+------------+
-- | 0 | Skaar | 2023-06-16 01:59:41.116 | 2023061517 |
-- | 21 | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 |
-- | 104 | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
-- | 126 | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
-- +-----+--------------------+-------------------------+------------+
Connector parameters
Basic parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| `connector` | Yes | — | Connector type. Set to `maxcompute`. |
| `odps.project.name` | Yes | — | MaxCompute project name. |
| `odps.access.id` | Yes | — | AccessKey ID. Get it from the AccessKey Pair page. |
| `odps.access.key` | Yes | — | AccessKey secret. Get it from the AccessKey Pair page. |
| `odps.end.point` | Yes | — | MaxCompute endpoint. See Endpoint for regional values. |
| `odps.tunnel.end.point` | No | Auto-routed to match the MaxCompute network | Tunnel service endpoint. If set, automatic routing is disabled. See Endpoint for regional values. |
| `odps.tunnel.quota.name` | No | — | Tunnel quota name for accessing MaxCompute. |
| `table.name` | Yes | — | MaxCompute table name. Format: `[project.][schema.]table`. |
| `odps.namespace.schema` | No | false | Whether to use the three-layer model (`project.schema.table`). See Schema operations. |
| `sink.operation` | Yes | insert | Write mode. Valid values: `insert`, `upsert`. Only Delta tables support `upsert`. |
| `sink.parallelism` | No | Inherits input parallelism | Sink write parallelism. Set `write.bucket.num` to an integer multiple of this value for optimal performance and minimum memory use on the sink node. |
| `sink.meta.cache.time` | No | 400 | Metadata cache size. |
| `sink.meta.cache.expire.time` | No | 1200 | Metadata cache timeout, in seconds. |
| `sink.coordinator.enable` | No | true | Whether to enable coordinator mode. |
Partition parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| `sink.partition` | No | — | Target partition name. For dynamic partitioning, this is the parent partition name. |
| `sink.partition.default-value` | No | `__DEFAULT_PARTITION__` | Default partition name for null partition key values in dynamic partitioning. |
| `sink.dynamic-partition.limit` | No | 100 | Maximum number of partitions written concurrently in a single checkpoint. Do not increase this significantly; writing to too many partitions at the same time can cause out-of-memory (OOM) errors on the sink node. |
| `sink.group-partition.enable` | No | false | Whether to group data by partition when writing to dynamic partitions. |
| `sink.partition.assigner.class` | No | — | Custom PartitionAssigner implementation class. |
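As a sketch, these options plug into the sink table's WITH clause alongside the connection settings shown earlier (the table name and option values below are illustrative assumptions, not recommendations):

```sql
CREATE TEMPORARY TABLE mc_dynamic_sink (
    id BIGINT,
    name STRING,
    dd STRING,
    PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (`dd`)
WITH (
    'connector' = 'maxcompute',
    'table.name' = 'mf_flink_tt_part',
    'sink.operation' = 'upsert',
    -- Group records by partition to reduce writer overhead with many partitions
    'sink.group-partition.enable' = 'true',
    -- Keep the per-checkpoint partition count bounded to avoid sink OOM
    'sink.dynamic-partition.limit' = '100',
    ...
);
```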
FileCached mode parameters
Use FileCached mode when you have a large number of dynamic partitions. In this mode, data is written to local cache files before being uploaded to MaxCompute.
| Parameter | Required | Default | Description |
|---|---|---|---|
| `sink.file-cached.enable` | No | false | Whether to enable FileCached mode. Set to true when writing to many dynamic partitions. |
| `sink.file-cached.tmp.dirs` | No | ./local | Local directory for cache files. |
| `sink.file-cached.writer.num` | No | 16 | Number of concurrent upload threads per task. Do not increase this significantly; high concurrency can cause OOM errors. |
| `sink.bucket.check-interval` | No | 60000 | Interval at which file size is checked in FileCached mode, in milliseconds. |
| `sink.file-cached.rolling.max-size` | No | 16 MB | Maximum size of a single cache file. Files that exceed this size are uploaded immediately. |
| `sink.file-cached.memory` | No | 64 MB | Maximum off-heap memory for file writes in FileCached mode. |
| `sink.file-cached.memory.segment-size` | No | 128 KB | Buffer size for file writes in FileCached mode. |
| `sink.file-cached.flush.always` | No | true | Whether to use buffered writes in FileCached mode. |
| `sink.file-cached.write.max-retries` | No | 3 | Number of upload retries in FileCached mode. |
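A minimal sketch of enabling FileCached mode; these options would be appended to the sink table's WITH clause (the cache directory path is an assumed example value):

```sql
WITH (
    'connector' = 'maxcompute',
    ...
    -- Cache rows on local disk before uploading; useful with many dynamic partitions
    'sink.file-cached.enable' = 'true',
    -- Assumed example path; point this at a local disk with enough free space
    'sink.file-cached.tmp.dirs' = '/mnt/flink/odps-cache',
    -- Keep upload concurrency moderate to avoid OOM on the sink node
    'sink.file-cached.writer.num' = '16'
);
```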
Upsert write parameters
For recommended parameter configurations, see Recommended parameter configurations for upsert writes.
| Parameter | Required | Default | Description |
|---|---|---|---|
| `upsert.write.bucket.num` | Yes | — | Number of buckets for the sink table. Must match the `write.bucket.num` property of the MaxCompute table. |
| `upsert.writer.max-retries` | No | 3 | Number of retries if a writer fails to write to a bucket. |
| `upsert.writer.buffer-size` | No | 64 MB | Per-writer buffer size. When the total buffer across all buckets reaches this threshold, the writer flushes to the server. Increase this to improve throughput; decrease it if writing to many partitions causes OOM errors. |
| `upsert.writer.bucket.buffer-size` | No | 1 MB | Per-bucket buffer size. Decrease this if the Flink node is low on memory. |
| `upsert.write.slot-num` | No | 1 | Number of Tunnel slots per session. |
| `upsert.commit.max-retries` | No | 3 | Number of retries for a session commit. |
| `upsert.commit.thread-num` | No | 16 | Commit parallelism. Keep this value low; high concurrency increases resource usage and may cause performance issues. |
| `upsert.commit.timeout` | No | 600 | Timeout for a session commit to complete, in seconds. |
| `upsert.major-compact.enable` | No | false | Whether to enable major compaction. |
| `upsert.major-compact.min-commits` | No | 100 | Minimum number of commits required to trigger a major compaction. |
| `upsert.flush.concurrent` | No | 2 | Maximum number of buckets flushed concurrently in a single partition. Each flush occupies one Tunnel slot. |
Insert write parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| `insert.commit.thread-num` | No | 16 | Commit parallelism for insert sessions. |
| `insert.arrow-writer.enable` | No | false | Whether to use the Arrow format for writes. |
| `insert.arrow-writer.batch-size` | No | 512 | Maximum number of rows per Arrow batch. |
| `insert.arrow-writer.flush-interval` | No | 100000 | Interval at which the Arrow writer flushes data, in milliseconds. |
| `insert.writer.buffer-size` | No | 64 MB | Buffer size for the insert writer. |
What's next
- Real-time data ingestion into a data warehouse: recommended upsert configurations for production pipelines
- Delta table parameters: full list of Delta table properties
- MaxCompute endpoints: endpoint values by region and network type
- Job development map: Flink SQL job development guide for fully managed Flink