This topic provides the DDL syntax that is used to create a MaxCompute result table, describes the parameters in the WITH clause, and provides data type mappings and sample code.
What is MaxCompute?
MaxCompute is a fast, fully managed computing platform for large-scale data warehousing. MaxCompute can process exabytes of data. It provides solutions for storing and computing massive amounts of structured data in data warehouses and provides analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?.
Prerequisites
A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.
Limits
Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the MaxCompute connector.
Precautions
A job failover may occur if you use a MaxCompute connector version later than vvr-3.0.4-flink-1.12. Therefore, we recommend that you use the MaxCompute connector of vvr-3.0.4-flink-1.12 or earlier.
Principles
- Write data: The MaxCompute sink calls an interface in the MaxCompute SDK to write data to a buffer. When the specified flush interval elapses or the data size in the buffer exceeds 64 MB, the sink uploads the buffered data to the temporary files of MaxCompute.
- Commit sessions: When a task creates checkpoints, the MaxCompute sink calls the Tunnel commit method to commit sessions and moves the temporary files to the data directory of the MaxCompute table. Then, the MaxCompute sink modifies the metadata.
Note The commit method does not provide atomicity. Therefore, the MaxCompute sink supports at-least-once delivery instead of exactly-once delivery.
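Because sessions are committed only when checkpoints are created, the checkpoint interval determines how often written data becomes visible in the MaxCompute table. As a minimal sketch, you might set the interval in the Flink SQL client as shown below; the 60-second value is an assumption to tune for your latency requirements, and the exact SET syntax can vary by Flink version.

-- Assumed value: sessions are committed, and data becomes visible, at each checkpoint.
SET 'execution.checkpointing.interval' = '60s';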
DDL syntax
CREATE TABLE odps_sink (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****'
);
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
connector | The type of the result table. | Yes | Set the value to odps. |
endPoint | The endpoint of MaxCompute. | Yes | For more information, see Endpoints. |
tunnelEndpoint | The endpoint of MaxCompute Tunnel. | Yes | For more information, see Endpoints. Note This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC). |
project | The name of the MaxCompute project. | Yes | N/A. |
tableName | The name of the table. | Yes | N/A. |
accessId | The AccessKey ID that is used to access MaxCompute. | Yes | N/A. |
accessKey | The AccessKey secret that is used to access MaxCompute. | Yes | N/A. |
partition | The name of a partition. | No | This parameter is required if a partitioned table is used. To write to a static partition, specify the complete partition value, such as 'partition' = 'ds=20180905'. To write to dynamic partitions, specify only the name of the partition key column, such as 'partition' = 'ds'; the partition to which each row is written is determined by the value of that column. |
flushIntervalMs | The flush interval for the buffer of a writer in MaxCompute Tunnel. The MaxCompute sink inserts data into the buffer and then writes the buffered data to the destination MaxCompute table at the interval specified by this parameter, or when the size of the buffered data exceeds the specified threshold. | No | Default value: 30000. Unit: milliseconds. |
dynamicPartitionLimit | The maximum number of partitions to which data can be written at the same time. | No | Default value: 100. A map in memory maintains the mappings between the existing partitions to which data is written and TunnelBufferedWriter. If the map size exceeds the value of the dynamicPartitionLimit parameter, the system reports the error message Too many dynamic partitions: 100, which exceeds the size limit: 100. |
compressAlgorithm | The compression algorithm used by MaxCompute Tunnel. | No | Valid values: |
useStreamTunnel | Specifies whether to use MaxCompute Streaming Tunnel to upload data. | No | Valid values: true and false. Note Only Flink that uses VVR 4.0.13 or later supports this parameter. |
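The optional parameters are set in the same WITH clause as the required ones. The following sketch shows a sink that tunes the flush interval and the dynamic partition limit; the flushIntervalMs and dynamicPartitionLimit values are illustrative assumptions, not recommendations.

CREATE TABLE odps_sink (
  id INT,
  content VARCHAR,
  ds VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds',
  'flushIntervalMs' = '10000',     -- Assumed value: flush the Tunnel buffer every 10 seconds.
  'dynamicPartitionLimit' = '200'  -- Assumed value: allow up to 200 concurrently written partitions.
);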
Data type mappings
Data type of MaxCompute | Data type of Flink |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
STRING | VARCHAR |
DECIMAL | DECIMAL |
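As a hypothetical illustration of these mappings, a MaxCompute table whose columns use the BIGINT, STRING, DATETIME, and DECIMAL types would be declared in Flink DDL as follows.

CREATE TABLE odps_sink (
  id BIGINT,      -- MaxCompute BIGINT maps to Flink BIGINT.
  name VARCHAR,   -- MaxCompute STRING maps to Flink VARCHAR.
  ts TIMESTAMP,   -- MaxCompute DATETIME maps to Flink TIMESTAMP.
  price DECIMAL   -- MaxCompute DECIMAL maps to Flink DECIMAL.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****'
);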
Sample code
- Write data to a static partition
CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=20180905'
);

INSERT INTO odps_sink
SELECT id, len, content
FROM datagen_source;
- Write data to a dynamic partition
CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR,
  c TIMESTAMP
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR,
  ds VARCHAR -- The partition key column that you use to create dynamic partitions must be explicitly declared.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds' -- No partition value is provided, so each row is written to the partition specified by the value of its ds field.
);

INSERT INTO odps_sink
SELECT id, len, content, DATE_FORMAT(c, 'yyMMdd') AS ds
FROM datagen_source;