The MaxCompute connector lets you read from and write to MaxCompute (formerly ODPS), Alibaba Cloud's fully managed, exabyte-scale data warehousing platform, directly from Flink SQL and DataStream jobs.
Capabilities
| Item | Description |
|---|---|
| Table types | Source table, dimension table, and sink table |
| Running modes | Streaming mode and batch mode |
| API types | DataStream API and SQL API |
| Semantics | At-least-once |
| Data update or deletion in a sink table | Batch Tunnel or Streaming Tunnel: insert only. Upsert Tunnel: insert, update, and delete. |
Prerequisites
Before you begin, make sure that you have:
- A MaxCompute table. See Create a table.
Limitations
- The connector supports at-least-once semantics only. Duplicate records may appear in MaxCompute depending on which tunnel you use. For guidance on tunnel selection, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage.
- By default, a source operates in full mode: it reads only from the partition specified by the partition option. Once all data is read, the job finishes and does not monitor for new partitions. To continuously monitor new partitions, configure an incremental source using startPartition.
- Each time a dimension table's cache is refreshed, the table checks for the latest partition. After the source starts, it does not read data newly added to a partition already being read, so run a deployment only after the partition contains complete data.
Choose a tunnel
MaxCompute provides three tunnels for writing data from Flink. Choose based on your use case:
| Tunnel | Default | When to use |
|---|---|---|
| MaxCompute Batch Tunnel | Yes (useStreamTunnel=false, enableUpsert=false) | Batch loads; data is available only after checkpointing. Set flushIntervalMs=0 to disable scheduled flushing. |
| MaxCompute Streaming Tunnel | No (useStreamTunnel=true) | Near real-time ingestion; flushed data is immediately available in MaxCompute. |
| MaxCompute Upsert Tunnel | No (enableUpsert=true) | INSERT, UPDATE, and DELETE operations on a MaxCompute Delta table. Requires VVR 8.0.6+. |
For a detailed comparison, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage.
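To make the choice concrete, the following sketch (endpoint, project, and table names are placeholders) shows a sink DDL for the default Batch Tunnel; the comments note the single option that switches to each alternative tunnel:

```sql
-- Batch Tunnel (the default): insert-only; data becomes visible after checkpointing.
CREATE TEMPORARY TABLE odps_sink (
  id INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905'
  -- Streaming Tunnel instead: add 'useStreamTunnel' = 'true'.
  -- Upsert Tunnel instead: add 'enableUpsert' = 'true' (Delta table, VVR 8.0.6+).
);
```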
SQL
The MaxCompute connector can be used as a source, dimension, or sink table in SQL-based jobs.
Syntax
CREATE TEMPORARY TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);
Connector options
General options
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | Yes | — | STRING | Set to odps. |
| endpoint | Yes | — | STRING | MaxCompute endpoint. See Endpoint. |
| tunnelEndpoint | No | — | STRING | MaxCompute Tunnel endpoint. If not specified, MaxCompute allocates tunnel connections through Server Load Balancer (SLB). |
| project | Yes | — | STRING | MaxCompute project name. |
| schemaName | No | — | STRING | Required only when the MaxCompute schema feature is enabled. Set this to the table's schema name. See Schema operations. VVR 8.0.6+. |
| tableName | Yes | — | STRING | MaxCompute table name. |
| accessId | Yes | — | STRING | AccessKey ID used to access MaxCompute. See How do I view my AccessKey ID and AccessKey secret? Important: Store the AccessKey ID as a variable. See Manage variables. |
| accessKey | Yes | — | STRING | AccessKey secret used to access MaxCompute. |
| partition | No | — | STRING | Partition name in the MaxCompute table. Not required for non-partitioned tables or incremental sources. See the "How do I configure the partition option?" section of the FAQ about upstream and downstream storage. |
| compressAlgorithm | No | SNAPPY | STRING | Compression algorithm for MaxCompute Tunnel. Valid values: RAW (no compression), ZLIB, SNAPPY. SNAPPY improves throughput by approximately 50% compared to ZLIB in test scenarios. |
| quotaName | No | — | STRING | Quota name for exclusive MaxCompute Tunnel resource groups. VVR 8.0.3+. If specified, remove tunnelEndpoint; otherwise the tunnel specified by tunnelEndpoint takes precedence. |
Source options
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| maxPartitionCount | No | 100 | INTEGER | Maximum number of partitions to read from. If exceeded, the error "The number of matched partitions exceeds the default limit" appears. Reading from too many partitions can overload MaxCompute and slow job startup; increase this value only when your workload requires it. |
| useArrow | No | false | BOOLEAN | Read data using the Arrow format, which calls the MaxCompute storage API. Batch deployments only. VVR 8.0.8+. |
| splitSize | No | 256 MB | MEMORYSIZE | Amount of data pulled per split when using the Arrow format. Batch deployments only. VVR 8.0.8+. |
| compressCodec | No | "" (none) | STRING | Compression algorithm when reading with the Arrow format. Valid values: "" (none), ZSTD, LZ4_FRAME. Specifying a codec improves throughput over no compression. Batch deployments only. VVR 8.0.8+. |
| dynamicLoadBalance | No | false | BOOLEAN | Enable dynamic shard allocation to improve processing performance and reduce overall read time. Note that this may cause data skew because different operators read inconsistent amounts of data. Batch deployments only. VVR 8.0.8+. |
Incremental source options
The incremental source polls MaxCompute intermittently to discover new partitions. Before reading a new partition, all data writes to that partition must be complete. For details, see the "What do I do if an incremental source detects a new partition while data is still being written?" section of the FAQ about upstream and downstream storage.
Partition ordering: The source reads partitions whose alphabetical order is greater than or equal to the startPartition value. For example, year=2023,month=10 sorts before year=2023,month=9 alphabetically, so zero-pad month values (use year=2023,month=09 instead of year=2023,month=9) to ensure correct ordering.
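The ordering rule above is plain lexicographic string comparison, which a standalone check (not part of the connector API) illustrates:

```java
public class PartitionOrder {
    public static void main(String[] args) {
        // Lexicographically, "month=10" < "month=9" because '1' < '9',
        // so an unpadded month sorts out of chronological order.
        System.out.println("year=2023,month=10".compareTo("year=2023,month=9") < 0);  // true

        // Zero-padding restores chronological order: "month=09" < "month=10".
        System.out.println("year=2023,month=09".compareTo("year=2023,month=10") < 0); // true
    }
}
```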
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| startPartition | Yes | — | STRING | Start partition for incremental reads. When specified, partition is ignored. For multi-level partitioned tables, configure partition column values in descending order by level. See the "How do I configure startPartition?" section of the FAQ about upstream and downstream storage. |
| subscribeIntervalInSec | No | 30 | INTEGER | Polling interval in seconds. |
| modifiedTableOperation | No | NONE | Enum | Action when a partition is modified during reading. Download sessions are saved in checkpoints; if data in a partition changes after a session starts, resuming from the checkpoint fails and the deployment restarts repeatedly. Valid values: NONE (update startPartition to skip past the unavailable partition and restart without state) and SKIP (automatically skip the unavailable partition when resuming). VVR 8.0.3+. With either value, data already read from the modified partition is retained; unread data is discarded. |
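For example, an incremental source that polls every 60 seconds and skips partitions modified mid-read could be declared as follows (a sketch built from the options above; names and partition values are placeholders):

```sql
CREATE TEMPORARY TABLE odps_incremental_source (
  id INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'ds=20231001',      -- partition is ignored when this is set.
  'subscribeIntervalInSec' = '60',
  'modifiedTableOperation' = 'SKIP'      -- Skip partitions modified after the session started (VVR 8.0.3+).
);
```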
Sink options
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| useStreamTunnel | No | false | BOOLEAN | Use MaxCompute Streaming Tunnel instead of Batch Tunnel. true: Streaming Tunnel; false: Batch Tunnel. See Choose a tunnel. |
| flushIntervalMs | No | 30000 (30 s) | LONG | Flush interval for the tunnel writer buffer, in milliseconds. For Streaming Tunnel, flushed data is immediately available. For Batch Tunnel, data becomes available only after checkpointing; set to 0 to disable scheduled flushing. Triggered when either flushIntervalMs or batchSize is reached. |
| batchSize | No | 67108864 (64 MB) | LONG | Buffer size in bytes. Data is flushed when the buffer reaches this size. Triggered when either batchSize or flushIntervalMs is reached. |
| numFlushThreads | No | 1 | INTEGER | Number of threads used to flush the tunnel writer buffer. Values greater than 1 allow concurrent flushing across partitions. |
| slotNum | No | 0 | INTEGER | Number of Tunnel slots for receiving data from Flink. See Overview of the data transmission service for slot limits. |
| dynamicPartitionLimit | No | 100 | INTEGER | Maximum number of dynamic partitions written between two checkpoints. If exceeded, the error "Too many dynamic partitions" appears. Writing to many partitions increases load on MaxCompute and slows checkpointing; increase this value only when your workload requires it. |
| retryTimes | No | 3 | INTEGER | Maximum retries for MaxCompute server requests (session creation, submission, or flush failures). |
| sleepMillis | No | 1000 | INTEGER | Retry interval in milliseconds. |
| enableUpsert | No | false | BOOLEAN | Use MaxCompute Upsert Tunnel. true: processes INSERT, UPDATE_AFTER, and DELETE records; false: uses the tunnel specified by useStreamTunnel. VVR 8.0.6+. If the sink encounters errors or long-running faults during session commits in upsert mode, set the sink operator parallelism to 10 or fewer. |
| upsertAsyncCommit | No | false | BOOLEAN | Use asynchronous mode when committing upsert sessions. Async mode reduces commit time, but committed data is not immediately queryable. VVR 8.0.6+. |
| upsertCommitTimeoutMs | No | 120000 (120 s) | INTEGER | Timeout for upsert session commits, in milliseconds. VVR 8.0.6+. |
| sink.operation | No | insert | STRING | Write mode for a Delta table. insert: append mode; upsert: update mode. VVR 8.0.10+. |
| sink.parallelism | No | — | INTEGER | Write parallelism for a Delta table. Defaults to upstream parallelism. The write.bucket.num value must be an integral multiple of sink.parallelism for optimal write performance and memory efficiency. VVR 8.0.10+. |
| sink.file-cached.enable | No | false | BOOLEAN | Enable file cache mode when writing to dynamic partitions of a Delta table. Reduces small files written to the server but increases write latency. Enable when the sink has high parallelism. VVR 8.0.10+. |
| sink.file-cached.writer.num | No | 16 | INTEGER | Concurrent upload threads per task in file cache mode. Avoid setting this too high; writing to many partitions simultaneously can cause out-of-memory (OOM) errors. Effective only when sink.file-cached.enable=true. VVR 8.0.10+. |
| sink.bucket.check-interval | No | 60000 | INTEGER | File size check interval in file cache mode, in milliseconds. Effective only when sink.file-cached.enable=true. VVR 8.0.10+. |
| sink.file-cached.rolling.max-size | No | 16 MB | MEMORYSIZE | Maximum size of a single cached file. When exceeded, data is uploaded to the server. Effective only when sink.file-cached.enable=true. VVR 8.0.10+. |
| sink.file-cached.memory | No | 64 MB | MEMORYSIZE | Maximum off-heap memory for file writes in file cache mode. Effective only when sink.file-cached.enable=true. VVR 8.0.10+. |
| sink.file-cached.memory.segment-size | No | 128 KB | MEMORYSIZE | Buffer segment size for file writes in file cache mode. Effective only when sink.file-cached.enable=true. VVR 8.0.10+. |
| sink.file-cached.flush.always | No | true | BOOLEAN | Whether to use the cache when writing files in file cache mode. Effective only when sink.file-cached.enable=true. VVR 8.0.10+. |
| sink.file-cached.write.max-retries | No | 3 | INTEGER | Retry count for data uploads in file cache mode. Effective only when sink.file-cached.enable=true. VVR 8.0.10+. |
| upsert.writer.max-retries | No | 3 | INTEGER | Maximum retries for writing to a bucket in an Upsert Writer session. VVR 8.0.10+. |
| upsert.writer.buffer-size | No | 64 MB | MEMORYSIZE | Total buffer size across all buckets in an Upsert Writer session. Data is flushed when the total reaches this threshold. Increase for better write efficiency; decrease if writing to many partitions causes OOM errors. VVR 8.0.10+. |
| upsert.writer.bucket.buffer-size | No | 1 MB | MEMORYSIZE | Per-bucket buffer size in an Upsert Writer session. Decrease if Flink server memory is insufficient. VVR 8.0.10+. |
| upsert.write.bucket.num | Yes | — | INTEGER | Number of buckets for the target Delta table. Must match the write.bucket.num configured on the Delta table. VVR 8.0.10+. |
| upsert.write.slot-num | No | 1 | INTEGER | Tunnel slots per upsert session. VVR 8.0.10+. |
| upsert.commit.max-retries | No | 3 | INTEGER | Maximum retries for upsert session commits. VVR 8.0.10+. |
| upsert.commit.thread-num | No | 16 | INTEGER | Parallelism for upsert session commits. Avoid large values; excessive concurrent commits increase resource consumption and can cause performance issues. VVR 8.0.10+. |
| upsert.commit.timeout | No | 600 | INTEGER | Timeout for upsert session commits, in seconds. VVR 8.0.10+. |
| upsert.flush.concurrent | No | 2 | INTEGER | Maximum concurrent bucket flushes per partition. Each bucket flush occupies a Tunnel slot. VVR 8.0.10+. |
| insert.commit.thread-num | No | 16 | INTEGER | Parallelism for insert session commits. VVR 8.0.10+. |
| insert.arrow-writer.enable | No | false | BOOLEAN | Use the Arrow format for inserts. VVR 8.0.10+. |
| insert.arrow-writer.batch-size | No | 512 | INTEGER | Maximum rows per Arrow-format batch. VVR 8.0.10+. |
| insert.arrow-writer.flush-interval | No | 100000 | INTEGER | Writer flush interval in milliseconds. VVR 8.0.10+. |
| insert.writer.buffer-size | No | 64 MB | MEMORYSIZE | Cache size for the buffered writer. VVR 8.0.10+. |
| upsert.partial-column.enable | No | false | BOOLEAN | Update only specified columns (partial column update). Applies only to Delta table sinks. See Update data in specific columns. When true: if a record with the same primary key exists, specified non-null fields are overwritten; if no matching record exists, a new record is inserted with new values for specified columns and null for all unspecified columns. VVR 8.0.11+. |
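Pulling several of the Delta table options together, a partial-column upsert sink might look like the following sketch (table names and the bucket count are placeholders; verify upsert.write.bucket.num against the Delta table's actual write.bucket.num):

```sql
CREATE TEMPORARY TABLE odps_delta_sink (
  id INT,
  v1 VARCHAR,
  v2 VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourDeltaTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'sink.operation' = 'upsert',             -- Update mode for a Delta table (VVR 8.0.10+).
  'upsert.write.bucket.num' = '16',        -- Must match write.bucket.num on the Delta table.
  'upsert.partial-column.enable' = 'true'  -- Overwrite only the columns this sink declares (VVR 8.0.11+).
);
```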
Dimension table options
When a deployment starts, the dimension table loads all data from the partition specified by partition. The partition option supports the max_pt() function. On cache reload, the latest partition is re-read. Set partition to max_two_pt() to load data from the two most recent partitions.
Dimension tables require cache=ALL. Increase join node memory to at least four times the size of the remote table data. For large dimension tables, use the SHUFFLE_HASH hint to distribute data evenly. For ultra-large tables that cause frequent Java Virtual Machine (JVM) garbage collections (GCs), convert to a key-value dimension table with an LRU (Least Recently Used) cache policy, for example, an ApsaraDB for HBase dimension table.
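As a hedged sketch (assuming the standard SQL join-hint syntax; table and column names are illustrative), the SHUFFLE_HASH hint is placed in the SELECT clause of the lookup join:

```sql
-- Distribute the ALL-cached dimension table odps_dim across subtasks
-- instead of replicating it to every join subtask.
INSERT INTO blackhole_sink
SELECT /*+ SHUFFLE_HASH(odps_dim) */ s.k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
```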
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| cache | Yes | — | STRING | Cache policy. Must be set to ALL and explicitly declared in the DDL statement. All dimension table data is loaded into cache before the deployment runs. Subsequent lookups search the cache only. The cache reloads after entries expire. |
| cacheSize | No | 100000 | LONG | Maximum rows to cache. If exceeded, the error "Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit" appears. Large caches consume significant JVM heap memory and slow startup and cache refresh; increase this value only when your workload requires it. |
| cacheTTLMs | No | Long.MAX_VALUE | LONG | Cache timeout in milliseconds. |
| cacheReloadTimeBlackList | No | — | STRING | Time periods during which the cache is not refreshed. Use during peak traffic periods (such as promotional events) to prevent deployment instability from cache refreshes. See the "How do I configure cacheReloadTimeBlackList?" section of the FAQ about upstream and downstream storage. |
| maxLoadRetries | No | 10 | INTEGER | Maximum retries for the initial cache load on deployment startup. If retries are exhausted, the deployment fails. |
Data type mappings
For the full list of MaxCompute data types, see MaxCompute data type system version 2.0.
| MaxCompute type | Flink type |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(precision, scale) | DECIMAL(precision, scale) |
| CHAR(n) | CHAR(n) |
| VARCHAR(n) | VARCHAR(n) |
| STRING | STRING |
| BINARY | BYTES |
| DATE | DATE |
| DATETIME | TIMESTAMP(3) |
| TIMESTAMP | TIMESTAMP(9) |
| TIMESTAMP_NTZ | TIMESTAMP(9) |
| ARRAY | ARRAY |
| MAP | MAP |
| STRUCT | ROW |
| JSON | STRING |
If a MaxCompute physical table contains nested composite type fields (ARRAY, MAP, or STRUCT) or a JSON field, set tblproperties('columnar.nested.type'='true') when creating the table to allow Realtime Compute for Apache Flink to read and write data correctly.
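For instance, a MaxCompute-side DDL for such a table might look like the following sketch (run in MaxCompute, not in Flink; the table and column names are illustrative):

```sql
CREATE TABLE nested_demo (
  id BIGINT,
  tags ARRAY<STRING>,
  attrs MAP<STRING, STRING>,
  payload JSON
)
tblproperties('columnar.nested.type'='true'); -- Required for Flink to read and write the nested and JSON fields.
```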
Flink CDC (public preview)
The MaxCompute connector can ingest Change Data Capture (CDC) data as a sink in YAML-based jobs. Requires VVR 11.1+.
Syntax
source:
type: xxx
sink:
type: maxcompute
name: MaxComputeSink
access-id: ${your_accessId}
access-key: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}
buckets-num: 8
Configuration options
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| type | Yes | — | String | Set to maxcompute. |
| name | No | — | String | Sink name. |
| access-id | Yes | — | String | AccessKey ID of your Alibaba Cloud account or RAM user. Get it from the Resource Access Management (RAM) console. |
| access-key | Yes | — | String | AccessKey secret. |
| endpoint | Yes | — | String | MaxCompute endpoint. Configure based on the region and network connection method. See Endpoint. |
| project | Yes | — | String | MaxCompute project name. To find it, log on to the MaxCompute console, go to Workspace > Projects, and copy the project name. |
| tunnel.endpoint | No | — | String | MaxCompute Tunnel endpoint. Usually inferred automatically. Required in special network environments, such as with a proxy server. |
| quota.name | No | — | String | Quota name for an exclusive resource group. If not specified, a shared resource group is used. |
| sts-token | No | — | String | Security Token Service (STS) token for RAM role authentication. Required when accessing MaxCompute with a RAM role. |
| buckets-num | No | 16 | Integer | Number of buckets for an auto-created MaxCompute Delta table. See Near real-time data warehouse. |
| compress.algorithm | No | zlib | String | Data compression algorithm. Valid values: raw (no compression), zlib, snappy. |
| total.buffer-size | No | 64 MB | String | In-memory buffer size. For partitioned tables, applies per partition; for non-partitioned tables, per table. Buffers for different partitions or tables are independent. Data is flushed when the buffer is full. |
| bucket.buffer-size | No | 4 MB | String | Per-bucket buffer size. Applies only when writing to MaxCompute Delta tables. |
| commit.thread-num | No | 16 | Integer | Maximum partitions or tables committed concurrently during checkpointing. |
| flush.concurrent-num | No | 4 | Integer | Maximum buckets flushed concurrently. Applies only when writing to MaxCompute Delta tables. |
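Combining the required options with some of the optional tuning knobs from the table above, a more complete sink block could be sketched as follows (values are illustrative, not recommendations):

```yaml
sink:
  type: maxcompute
  name: MaxComputeSink
  access-id: ${your_accessId}
  access-key: ${your_accessKey}
  endpoint: ${your_maxcompute_endpoint}
  project: ${your_project}
  buckets-num: 8               # Buckets for auto-created Delta tables.
  compress.algorithm: snappy   # raw, zlib (default), or snappy.
  total.buffer-size: 64 MB     # Per-partition (or per-table) in-memory buffer.
  flush.concurrent-num: 4      # Concurrent bucket flushes (Delta tables only).
```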
Table location mappings
When the connector auto-creates tables in MaxCompute, locations are mapped as follows:
If the schema feature is disabled for your MaxCompute project, the connector ignores tableId.namespace. In this case, only a single database (or its logical equivalent) is ingested into MaxCompute — for example, only one MySQL database when ingesting from MySQL.
| MySQL location | Flink CDC abstract | MaxCompute location |
|---|---|---|
| N/A | Project (from configuration) | Project |
| Database | TableId.namespace | Schema (ignored if the schema feature is disabled) |
| Table | TableId.tableName | Table |
Data type mappings
| Flink CDC type | MaxCompute type |
|---|---|
| CHAR | STRING |
| VARCHAR | STRING |
| BOOLEAN | BOOLEAN |
| BINARY/VARBINARY | BINARY |
| DECIMAL | DECIMAL |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| TIME_WITHOUT_TIME_ZONE | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE (precision > 3) | TIMESTAMP |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE (precision <= 3) | DATETIME |
| TIMESTAMP_WITH_TIME_ZONE (precision > 3) | TIMESTAMP |
| TIMESTAMP_WITH_TIME_ZONE (precision <= 3) | DATETIME |
| ARRAY | ARRAY |
| MAP | MAP |
| ROW | STRUCT |
Examples
SQL API
Source table
Read all data from a partition
Read all data from the partition specified by partition:
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=201809*'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid,
COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
Read incremental data
Read data starting from the partition specified by startPartition and continuously monitor new partitions:
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Start reading from the 20180905 partition.
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
Sink table
Write to a static partition
Write to the partition specified by partition:
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905' -- Write to partition 20180905.
);
INSERT INTO odps_sink
SELECT
id, len, content
FROM datagen_source;
Write to dynamic partitions
Write data to partitions determined at runtime by the values in the ds column:
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR,
c TIMESTAMP
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR,
ds VARCHAR -- Dynamic partition column.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds' -- Omit the value; data is routed to partitions based on the ds field.
);
INSERT INTO odps_sink
SELECT
id,
len,
content,
DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;
Dimension table
Single-value key
Specify a primary key when each key maps to exactly one row:
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR,
PRIMARY KEY (k) NOT ENFORCED -- Specify the primary key.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
Multi-value key
Omit the primary key when a key can map to multiple rows:
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR
-- No primary key needed for multi-value lookups.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
DataStream API
- To use the DataStream API with MaxCompute, configure a DataStream connector. See Integrate DataStream connectors.
- VVR 6.0.6+ supports on-premises debugging of DataStream programs with the MaxCompute connector for up to 30 minutes. Sessions exceeding 30 minutes are terminated with an error. See Debug connectors locally.
- Reading from a MaxCompute Delta table (a table created with a primary key and transactional=true) is not supported.
Declare the MaxCompute table using SQL, then access it via the Table API or DataStream API.
Connect to the source table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source");
Connect to the sink
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();
Maven dependency
Add the MaxCompute DataStream connector to your project. All versions are available in the Maven central repository.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>