Realtime Compute for Apache Flink:MaxCompute connector

Last Updated: Mar 26, 2026

The MaxCompute connector lets you read from and write to MaxCompute (previously ODPS) — Alibaba Cloud's fully managed, exabyte-scale data warehousing platform — directly from Flink SQL and DataStream jobs.

Capabilities

Item Description
Table types Source table, dimension table, and sink table
Running modes Streaming mode and batch mode
API types DataStream API and SQL API
Semantics At-least-once
Data update or deletion in a sink table Batch Tunnel or Streaming Tunnel: insert only. Upsert Tunnel: insert, update, and delete.

Metrics

Table type Metrics
Source numRecordsIn, numRecordsInPerSecond, numBytesIn, numBytesInPerSecond
Sink numRecordsOut, numRecordsOutPerSecond, numBytesOut, numBytesOutPerSecond
Dimension table dim.odps.cacheSize
For more information, see Monitoring metrics.

Prerequisites

Before you begin, make sure that you have:

Limitations

  • The connector supports at-least-once semantics only. Duplicate records may appear in MaxCompute depending on which tunnel you use. For guidance on tunnel selection, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage.

  • By default, a source operates in full mode: it reads only from the partition specified by the partition option. Once all data is read, the job finishes and does not monitor for new partitions. To continuously monitor new partitions, configure an incremental source using startPartition.

  • Each time a dimension table's cache is refreshed, the table checks for and loads the latest partition. A source, by contrast, does not pick up data added to a partition after it has started reading that partition — run a deployment only after the partition contains complete data.

Choose a tunnel

MaxCompute provides three tunnels for writing data from Flink. Choose based on your use case:

Tunnel Default When to use
MaxCompute Batch Tunnel Yes (useStreamTunnel=false, enableUpsert=false) Batch loads; data is available only after checkpointing. Set flushIntervalMs=0 to disable scheduled flushing.
MaxCompute Streaming Tunnel No (useStreamTunnel=true) Near real-time ingestion; flushed data is immediately available in MaxCompute.
MaxCompute Upsert Tunnel No (enableUpsert=true) INSERT, UPDATE, and DELETE operations on a MaxCompute Delta table. Requires VVR 8.0.6+.

For a detailed comparison, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage.
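For example, to target the Streaming Tunnel for near real-time ingestion, set useStreamTunnel in the sink DDL. This is a minimal sketch; the endpoint, project, table, and partition values are placeholders:

```sql
CREATE TEMPORARY TABLE odps_streaming_sink (
  id INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20240101',
  'useStreamTunnel' = 'true' -- Streaming Tunnel: flushed data is immediately visible.
);
```

Leaving useStreamTunnel and enableUpsert unset keeps the default Batch Tunnel, where data becomes visible only after checkpointing.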

SQL

The MaxCompute connector can be used as a source, dimension, or sink table in SQL-based jobs.

Syntax

CREATE TEMPORARY TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

Connector options

General options

Option Required Default Type Description
connector Yes STRING Set to odps.
endpoint Yes STRING MaxCompute endpoint. See Endpoint.
tunnelEndpoint No STRING MaxCompute Tunnel endpoint. If not specified, MaxCompute allocates tunnel connections through Server Load Balancer (SLB).
project Yes STRING MaxCompute project name.
schemaName No STRING Required only when the MaxCompute schema feature is enabled. Set this to the table's schema name. See Schema operations. VVR 8.0.6+.
tableName Yes STRING MaxCompute table name.
accessId Yes STRING AccessKey ID used to access MaxCompute. See How do I view my AccessKey ID and AccessKey secret?
Important

Store the AccessKey ID as a variable. See Manage variables.

accessKey Yes STRING AccessKey secret used to access MaxCompute.
partition No STRING Partition name in the MaxCompute table. Not required for non-partitioned tables or incremental sources. See the "How do I configure the partition option?" section of the FAQ about upstream and downstream storage.
compressAlgorithm No SNAPPY STRING Compression algorithm for MaxCompute Tunnel. Valid values: RAW (no compression), ZLIB, SNAPPY. SNAPPY improves throughput by approximately 50% compared to ZLIB in test scenarios.
quotaName No STRING Quota name for exclusive MaxCompute Tunnel resource groups. VVR 8.0.3+. If specified, remove tunnelEndpoint — otherwise the tunnel specified by tunnelEndpoint takes precedence.

Source options

Option Required Default Type Description
maxPartitionCount No 100 INTEGER Maximum number of partitions to read from. If exceeded, the error "The number of matched partitions exceeds the default limit" appears. Reading from too many partitions can overload MaxCompute and slow job startup — increase this value only when your workload requires it.
useArrow No false BOOLEAN Read data using the Arrow format, which calls the MaxCompute storage API. Batch deployments only. VVR 8.0.8+.
splitSize No 256 MB MEMORYSIZE Amount of data pulled per split when using the Arrow format. Batch deployments only. VVR 8.0.8+.
compressCodec No "" (none) STRING Compression algorithm when reading with the Arrow format. Valid values: "" (none), ZSTD, LZ4_FRAME. Specifying a codec improves throughput over no compression. Batch deployments only. VVR 8.0.8+.
dynamicLoadBalance No false BOOLEAN Enable dynamic shard allocation to improve processing performance and reduce overall read time. Note that this may cause data skew because different operators read inconsistent amounts of data. Batch deployments only. VVR 8.0.8+.
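A hedged sketch of a batch source that reads with the Arrow format (VVR 8.0.8+); the connection values are placeholders, and the splitSize and compressCodec values are illustrative tuning choices, not recommendations:

```sql
CREATE TEMPORARY TABLE odps_batch_source (
  id INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20240101',
  'useArrow' = 'true',        -- Read via the MaxCompute storage API (batch only).
  'splitSize' = '512 MB',     -- Pull larger splits than the 256 MB default.
  'compressCodec' = 'ZSTD'    -- Compress transfers to improve throughput.
);
```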

Incremental source options

The incremental source polls MaxCompute intermittently to discover new partitions. Before reading a new partition, all data writes to that partition must be complete. For details, see the "What do I do if an incremental source detects a new partition while data is still being written?" section of the FAQ about upstream and downstream storage.

Partition ordering: The source reads partitions whose alphabetical order is greater than or equal to the startPartition value. For example, year=2023,month=10 sorts before year=2023,month=9 alphabetically, so zero-pad month values (use year=2023,month=09 instead of year=2023,month=9) to ensure correct ordering.
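The zero-padding pitfall is just plain string comparison, which the following snippet demonstrates (the partition names are the ones from the example above):

```python
# The incremental source orders partitions lexicographically (string order),
# so unpadded numeric values sort incorrectly.
unpadded = ["year=2023,month=9", "year=2023,month=10", "year=2023,month=11"]
padded = ["year=2023,month=09", "year=2023,month=10", "year=2023,month=11"]

# month=10 and month=11 sort BEFORE month=9, because '1' < '9'.
print(sorted(unpadded))

# With zero-padded months, lexicographic order matches chronological order.
print(sorted(padded))
```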

Option Required Default Type Description
startPartition Yes STRING Start partition for incremental reads. When specified, partition is ignored. For multi-level partitioned tables, configure partition column values in descending order by level. See the "How do I configure startPartition?" section of the FAQ about upstream and downstream storage.
subscribeIntervalInSec No 30 INTEGER Polling interval in seconds.
modifiedTableOperation No NONE Enum Action to take when a partition is modified during reading. Download sessions are saved in checkpoints; if data in a partition changes after a session starts, resuming from the checkpoint fails and the deployment restarts repeatedly. Valid values: NONE (default) — no automatic handling; manually update startPartition to skip past the unavailable partition and restart the deployment without state. SKIP — automatically skip the unavailable partition when resuming. VVR 8.0.3+. With either value, data already read from the modified partition is retained and unread data is discarded.
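A minimal incremental-source sketch combining these options; the connection values and the start partition are placeholders:

```sql
CREATE TEMPORARY TABLE odps_incremental_source (
  id INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'ds=20240101',     -- partition is ignored when this is set.
  'subscribeIntervalInSec' = '60',      -- Poll for new partitions every 60 seconds.
  'modifiedTableOperation' = 'SKIP'     -- Skip partitions modified mid-read on restore.
);
```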

Sink options

Option Required Default Type Description
useStreamTunnel No false BOOLEAN Use MaxCompute Streaming Tunnel instead of Batch Tunnel. true: Streaming Tunnel; false: Batch Tunnel. See Choose a tunnel.
flushIntervalMs No 30000 (30 s) LONG Flush interval for the tunnel writer buffer, in milliseconds. For Streaming Tunnel: flushed data is immediately available. For Batch Tunnel: data becomes available only after checkpointing — set to 0 to disable scheduled flushing. Triggered when either flushIntervalMs or batchSize is reached.
batchSize No 67108864 (64 MB) LONG Buffer size in bytes. Data is flushed when the buffer reaches this size. Triggered when either batchSize or flushIntervalMs is reached.
numFlushThreads No 1 INTEGER Number of threads used to flush the tunnel writer buffer. Values greater than 1 allow concurrent flushing across partitions.
slotNum No 0 INTEGER Number of Tunnel slots for receiving data from Flink. See Overview of the data transmission service for slot limits.
dynamicPartitionLimit No 100 INTEGER Maximum number of dynamic partitions written between two checkpoints. If exceeded, the error "Too many dynamic partitions" appears. Writing to many partitions increases load on MaxCompute and slows checkpointing — increase this value only when your workload requires it.
retryTimes No 3 INTEGER Maximum retries for MaxCompute server requests (session creation, submission, or flush failures).
sleepMillis No 1000 INTEGER Retry interval in milliseconds.
enableUpsert No false BOOLEAN Use MaxCompute Upsert Tunnel. true: processes INSERT, UPDATE_AFTER, and DELETE records; false: uses the tunnel specified by useStreamTunnel. VVR 8.0.6+. If the sink encounters errors or long-running faults during session commits in upsert mode, set the sink operator parallelism to 10 or fewer.
upsertAsyncCommit No false BOOLEAN Use asynchronous mode when committing upsert sessions. Async mode reduces commit time, but committed data is not immediately queryable. VVR 8.0.6+.
upsertCommitTimeoutMs No 120000 (120 s) INTEGER Timeout for upsert session commits, in milliseconds. VVR 8.0.6+.
sink.operation No insert STRING Write mode for a Delta table. insert: append mode; upsert: update mode. VVR 8.0.10+.
sink.parallelism No INTEGER Write parallelism for a Delta table. Defaults to upstream parallelism. The write.bucket.num value must be an integral multiple of sink.parallelism for optimal write performance and memory efficiency. VVR 8.0.10+.
sink.file-cached.enable No false BOOLEAN Enable file cache mode when writing to dynamic partitions of a Delta table. Reduces small files written to the server but increases write latency. Enable when the sink has high parallelism. VVR 8.0.10+.
sink.file-cached.writer.num No 16 INTEGER Concurrent upload threads per task in file cache mode. Avoid setting this too high — writing to many partitions simultaneously can cause out-of-memory (OOM) errors. Effective only when sink.file-cached.enable=true. VVR 8.0.10+.
sink.bucket.check-interval No 60000 INTEGER File size check interval in file cache mode, in milliseconds. Effective only when sink.file-cached.enable=true. VVR 8.0.10+.
sink.file-cached.rolling.max-size No 16 MB MEMORYSIZE Maximum size of a single cached file. When exceeded, data is uploaded to the server. Effective only when sink.file-cached.enable=true. VVR 8.0.10+.
sink.file-cached.memory No 64 MB MEMORYSIZE Maximum off-heap memory for file writes in file cache mode. Effective only when sink.file-cached.enable=true. VVR 8.0.10+.
sink.file-cached.memory.segment-size No 128 KB MEMORYSIZE Buffer segment size for file writes in file cache mode. Effective only when sink.file-cached.enable=true. VVR 8.0.10+.
sink.file-cached.flush.always No true BOOLEAN Whether to use the cache when writing files in file cache mode. Effective only when sink.file-cached.enable=true. VVR 8.0.10+.
sink.file-cached.write.max-retries No 3 INTEGER Retry count for data uploads in file cache mode. Effective only when sink.file-cached.enable=true. VVR 8.0.10+.
upsert.writer.max-retries No 3 INTEGER Maximum retries for writing to a bucket in an Upsert Writer session. VVR 8.0.10+.
upsert.writer.buffer-size No 64 MB MEMORYSIZE Total buffer size across all buckets in an Upsert Writer session. Data is flushed when the total reaches this threshold. Increase for better write efficiency; decrease if writing to many partitions causes OOM errors. VVR 8.0.10+.
upsert.writer.bucket.buffer-size No 1 MB MEMORYSIZE Per-bucket buffer size in an Upsert Writer session. Decrease if Flink server memory is insufficient. VVR 8.0.10+.
upsert.write.bucket.num Yes INTEGER Number of buckets for the target Delta table. Must match the write.bucket.num configured on the Delta table. VVR 8.0.10+.
upsert.write.slot-num No 1 INTEGER Tunnel slots per upsert session. VVR 8.0.10+.
upsert.commit.max-retries No 3 INTEGER Maximum retries for upsert session commits. VVR 8.0.10+.
upsert.commit.thread-num No 16 INTEGER Parallelism for upsert session commits. Avoid large values — excessive concurrent commits increase resource consumption and can cause performance issues. VVR 8.0.10+.
upsert.commit.timeout No 600 INTEGER Timeout for upsert session commits, in seconds. VVR 8.0.10+.
upsert.flush.concurrent No 2 INTEGER Maximum concurrent bucket flushes per partition. Each bucket flush occupies a Tunnel slot. VVR 8.0.10+.
insert.commit.thread-num No 16 INTEGER Parallelism for insert session commits. VVR 8.0.10+.
insert.arrow-writer.enable No false BOOLEAN Use the Arrow format for inserts. VVR 8.0.10+.
insert.arrow-writer.batch-size No 512 INTEGER Maximum rows per Arrow-format batch. VVR 8.0.10+.
insert.arrow-writer.flush-interval No 100000 INTEGER Writer flush interval in milliseconds. VVR 8.0.10+.
insert.writer.buffer-size No 64 MB MEMORYSIZE Cache size for the buffered writer. VVR 8.0.10+.
upsert.partial-column.enable No false BOOLEAN Update only specified columns (partial column update). Applies only to Delta table sinks. See Update data in specific columns. When true: if a record with the same primary key exists, specified non-null fields are overwritten; if no matching record exists, a new record is inserted with new values for specified columns and null for all unspecified columns. VVR 8.0.11+.
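A hedged sketch of an upsert sink writing to a Delta table (VVR 8.0.10+). It assumes the target Delta table was created with a primary key and write.bucket.num=16; all connection values are placeholders:

```sql
CREATE TEMPORARY TABLE odps_upsert_sink (
  id INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED       -- Required for upsert writes.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourDeltaTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '16'    -- Must match write.bucket.num on the Delta table.
);
```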

Dimension table options

When a deployment starts, the dimension table loads all data from the partition specified by partition. The partition option supports the max_pt() function. On cache reload, the latest partition is re-read. Set partition to max_two_pt() to load data from two partitions.

Dimension tables require cache=ALL. Increase join node memory to at least four times the size of the remote table data. For large dimension tables, use the SHUFFLE_HASH hint to distribute data evenly. For ultra-large tables that cause frequent Java Virtual Machine (JVM) garbage collections (GCs), convert to a key-value dimension table with an LRU (Least Recently Used) cache policy — for example, an ApsaraDB for HBase dimension table.
Option Required Default Type Description
cache Yes STRING Cache policy. Must be set to ALL and explicitly declared in the DDL statement. All dimension table data is loaded into cache before the deployment runs. Subsequent lookups search the cache only. The cache reloads after entries expire.
cacheSize No 100000 LONG Maximum rows to cache. If exceeded, the error "Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit" appears. Large caches consume significant JVM heap memory and slow startup and cache refresh — increase this value only when your workload requires it.
cacheTTLMs No Long.MAX_VALUE LONG Cache timeout in milliseconds.
cacheReloadTimeBlackList No STRING Time periods during which the cache is not refreshed. Use during peak traffic periods (such as promotional events) to prevent deployment instability from cache refreshes. See the "How do I configure cacheReloadTimeBlackList?" section of the FAQ about upstream and downstream storage.
maxLoadRetries No 10 INTEGER Maximum retries for the initial cache load on deployment startup. If retries are exhausted, the deployment fails.
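For instance, a dimension table that always serves the latest partition can combine max_pt() with a cache TTL; the connection values are placeholders, and the one-hour TTL is an illustrative choice:

```sql
CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'max_pt()',   -- Re-resolve the latest partition on each cache reload.
  'cache' = 'ALL',            -- Required: load the full partition into memory.
  'cacheTTLMs' = '3600000'    -- Reload the cache every hour.
);
```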

Data type mappings

For the full list of MaxCompute data types, see MaxCompute data type system version 2.0.

MaxCompute type Flink type
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INTEGER
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(precision, scale) DECIMAL(precision, scale)
CHAR(n) CHAR(n)
VARCHAR(n) VARCHAR(n)
STRING STRING
BINARY BYTES
DATE DATE
DATETIME TIMESTAMP(3)
TIMESTAMP TIMESTAMP(9)
TIMESTAMP_NTZ TIMESTAMP(9)
ARRAY ARRAY
MAP MAP
STRUCT ROW
JSON STRING
Important

If a MaxCompute physical table contains nested composite type fields (ARRAY, MAP, or STRUCT) or a JSON field, set tblproperties('columnar.nested.type'='true') when creating the table to allow Realtime Compute for Apache Flink to read and write data correctly.
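For example, a table with nested fields could be created as follows. This DDL is executed in MaxCompute (not in Flink SQL), and the table and column names are placeholders:

```sql
-- Run in MaxCompute when creating the physical table.
CREATE TABLE nested_example (
  id    BIGINT,
  tags  ARRAY<STRING>,
  attrs MAP<STRING, STRING>
) tblproperties ('columnar.nested.type' = 'true');
```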

Flink CDC (public preview)

The MaxCompute connector can ingest Change Data Capture (CDC) data as a sink in YAML-based jobs. Requires VVR 11.1+.

Syntax

source:
  type: xxx

sink:
  type: maxcompute
  name: MaxComputeSink
  access-id: ${your_accessId}
  access-key: ${your_accessKey}
  endpoint: ${your_maxcompute_endpoint}
  project: ${your_project}
  buckets-num: 8

Configuration options

Option Required Default Type Description
type Yes String Set to maxcompute.
name No String Sink name.
access-id Yes String AccessKey ID of your Alibaba Cloud account or RAM user. Get it from the Resource Access Management (RAM) console.
access-key Yes String AccessKey secret.
endpoint Yes String MaxCompute endpoint. Configure based on the region and network connection method. See Endpoint.
project Yes String MaxCompute project name. To find it: log on to the MaxCompute console, go to Workspace > Projects, and copy the project name.
tunnel.endpoint No String MaxCompute Tunnel endpoint. Usually inferred automatically. Required in special network environments, such as with a proxy server.
quota.name No String Quota name for an exclusive resource group. If not specified, a shared resource group is used.
sts-token No String Security Token Service (STS) token for RAM role authentication. Required when accessing MaxCompute with a RAM role.
buckets-num No 16 Integer Number of buckets for an auto-created MaxCompute Delta table. See Near real-time data warehouse.
compress.algorithm No zlib String Data compression algorithm. Valid values: raw (no compression), zlib, snappy.
total.buffer-size No 64 MB String In-memory buffer size. For partitioned tables: applies per partition. For non-partitioned tables: applies per table. Buffers for different partitions or tables are independent. Data is flushed when the buffer is full.
bucket.buffer-size No 4 MB String Per-bucket buffer size. Applies only when writing to MaxCompute Delta tables.
commit.thread-num No 16 Integer Maximum partitions or tables committed concurrently during checkpointing.
flush.concurrent-num No 4 Integer Maximum buckets flushed concurrently. Applies only when writing to MaxCompute Delta tables.
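Building on the syntax example above, a sink block with some of the optional tuning options might look like this sketch; all values are illustrative placeholders, not recommendations:

```yaml
sink:
  type: maxcompute
  name: MaxComputeSink
  access-id: ${your_accessId}
  access-key: ${your_accessKey}
  endpoint: ${your_maxcompute_endpoint}
  project: ${your_project}
  buckets-num: 8
  compress.algorithm: snappy   # raw, zlib, or snappy
  total.buffer-size: 128MB     # Per partition (or per table if non-partitioned).
  commit.thread-num: 8         # Partitions/tables committed concurrently per checkpoint.
```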

Table location mappings

When the connector auto-creates tables in MaxCompute, locations are mapped as follows:

Important

If the schema feature is disabled for your MaxCompute project, the connector ignores tableId.namespace. In this case, only a single database (or its logical equivalent) is ingested into MaxCompute — for example, only one MySQL database when ingesting from MySQL.

MySQL location Flink CDC abstract MaxCompute location
N/A Project (from configuration) Project
Database TableId.namespace Schema (ignored if schema is disabled)
Table TableId.tableName Table

Data type mappings

Flink CDC type MaxCompute type
CHAR STRING
VARCHAR STRING
BOOLEAN BOOLEAN
BINARY/VARBINARY BINARY
DECIMAL DECIMAL
TINYINT TINYINT
SMALLINT SMALLINT
INTEGER INTEGER
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
TIME_WITHOUT_TIME_ZONE STRING
DATE DATE
TIMESTAMP_WITHOUT_TIME_ZONE TIMESTAMP_NTZ
TIMESTAMP_WITH_LOCAL_TIME_ZONE (precision > 3) TIMESTAMP
TIMESTAMP_WITH_LOCAL_TIME_ZONE (precision <= 3) DATETIME
TIMESTAMP_WITH_TIME_ZONE (precision > 3) TIMESTAMP
TIMESTAMP_WITH_TIME_ZONE (precision <= 3) DATETIME
ARRAY ARRAY
MAP MAP
ROW STRUCT

Examples

SQL API

Source table

Read all data from a partition

Read all data from the partition specified by partition:

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=201809*'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
   cid,
   COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;

Read incremental data

Read data starting from the partition specified by startPartition and continuously monitor new partitions:

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Start reading from the 20180905 partition.
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;

Sink table

Write to a static partition

Write to the partition specified by partition:

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905' -- Write to partition 20180905.
);

INSERT INTO odps_sink
SELECT
  id, len, content
FROM datagen_source;

Write to dynamic partitions

Write data to partitions determined at runtime by the values in the ds column:

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR,
  c TIMESTAMP
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id  INT,
  len INT,
  content VARCHAR,
  ds VARCHAR -- Dynamic partition column.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds' -- Omit the value; data is routed to partitions based on the ds field.
);

INSERT INTO odps_sink
SELECT
   id,
   len,
   content,
   DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;

Dimension table

Single-value key

Specify a primary key when each key maps to exactly one row:

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED  -- Specify the primary key.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

Multi-value key

Omit the primary key when a key can map to multiple rows:

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR
  -- No primary key needed for multi-value lookups.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream API

Important
  • To use the DataStream API with MaxCompute, configure a DataStream connector. See Integrate DataStream connectors.

  • VVR 6.0.6+ supports on-premises debugging of DataStream programs with the MaxCompute connector for up to 30 minutes. Sessions exceeding 30 minutes are terminated with an error. See Debug connectors locally.

  • Reading from a MaxCompute Delta table (a table created with a primary key and transactional=true) is not supported.

Declare the MaxCompute table using SQL, then access it via the Table API or DataStream API.

Connect to the source table

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source");

Connect to the sink

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")");
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.),
    Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();

Maven dependency

Add the MaxCompute DataStream connector to your project. All versions are available in the Maven central repository.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>

What's next