This topic provides the DDL syntax that is used to create a MaxCompute result table, describes the parameters in the WITH clause, and provides data type mappings and sample code.

Note A MaxCompute connector can be used as the result table of both streaming jobs and batch jobs.

What is MaxCompute?

MaxCompute is a fast, fully managed computing platform for large-scale data warehousing. MaxCompute can process exabytes of data. It provides storage and computing solutions for large amounts of structured data in data warehouses, as well as analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?.

Prerequisites

A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.

Limits

Only Flink deployments that use Ververica Runtime (VVR) 2.0.0 or later support MaxCompute connectors.

Principles

The MaxCompute sink works in two phases:
  1. Writes data. The MaxCompute sink calls an interface in the MaxCompute SDK to write data to the buffer. Then, the sink uploads data to the temporary files of MaxCompute at the specified interval or when the data size in the buffer exceeds 64 MB.
  2. Commits sessions. When a task creates checkpoints, the MaxCompute sink calls the Tunnel commit method to commit sessions and moves temporary files to the data directory of the MaxCompute table. Then, the MaxCompute sink modifies the metadata.
    Note The commit method does not provide atomicity. Therefore, the MaxCompute sink supports at-least-once delivery instead of exactly-once delivery.
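
Because Tunnel sessions are committed only when checkpoints are created, data written by the sink becomes visible in MaxCompute only after a checkpoint completes, so the checkpoint interval bounds the end-to-end latency of the sink. The following is a minimal sketch of how the interval can be set in an SQL session on recent Flink versions; the 60-second value is illustrative, and your deployment may configure checkpointing elsewhere:

-- Illustrative only: complete a checkpoint (and commit Tunnel sessions) about once a minute.
SET 'execution.checkpointing.interval' = '60s';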

DDL syntax

Flink allows you to use MaxCompute to store output data. The following code shows an example:
CREATE TABLE odps_sink (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****'
);
Note
  • The parameters in the WITH clause must be all lowercase.
  • The name, sequence, and type of the fields in the DDL statement must match those in the MaxCompute physical table. Otherwise, the data that is queried in the MaxCompute physical table may be null.
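
For reference, the Flink DDL statement above could pair with a MaxCompute physical table such as the following. This is a hypothetical example: the table and column names are assumptions, and the Flink VARCHAR fields are declared as STRING in MaxCompute, which is consistent with the data type mapping below.

CREATE TABLE IF NOT EXISTS odps_sink (
  id INT,
  user_name STRING,
  content STRING
) PARTITIONED BY (ds STRING);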

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the result table. Yes Set the value to odps.
endpoint The endpoint of MaxCompute. Yes For more information, see Endpoints.
tunnelEndpoint The endpoint of MaxCompute Tunnel. Yes For more information, see Endpoints.
Note This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC).
project The name of a MaxCompute project. Yes N/A.
tablename The name of the table. Yes N/A.
accessid The AccessKey ID that is used to access MaxCompute. Yes N/A.
accesskey The AccessKey secret that is used to access MaxCompute. Yes N/A.
partition The name of a partition. No This parameter is required if a partitioned table is used. Take note of the following points:
  • Static partitions

    For example, 'partition' = 'ds=20180905' indicates that data is written to the ds=20180905 partition.

  • Dynamic partitions

    If partition values are not explicitly specified, data is written to the partitions that correspond to the values of the specified partition key columns. For example, 'partition' = 'ds' indicates that data is written to the partition that corresponds to the value of the ds field.

    If you want to create multi-level dynamic partitions, you must make sure that the fields in the partitions are in the same order as those in the MaxCompute physical table. Multiple fields in the partitions are separated by commas (,).

    Note
    • In the CREATE TABLE statement, you must explicitly specify the partition key column that you use to create dynamic partitions.
    • If the value of the dynamic partition key column is null or an empty string (for example, ds=null or ds='' in the data source), the data is written to the ds=NULL partition.
flushIntervalMs The flush interval for the buffer of a writer in MaxCompute Tunnel. The MaxCompute sink inserts data into the buffer and then writes the buffered data to the destination MaxCompute table when this interval elapses or when the size of the buffered data exceeds the specified threshold. No Default value: 30000. Unit: milliseconds.
dynamicPartitionLimit The maximum number of partitions to which data can be written at the same time. No Default value: 100. An in-memory map maintains the mappings between the existing partitions to which data is written and their TunnelBufferedWriter instances. If the map size exceeds the value of the dynamicPartitionLimit parameter, the system reports the error message Too many dynamic partitions: 100, which exceeds the size limit: 100.
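
The following snippet sketches how the partition, flushIntervalMs, and dynamicPartitionLimit parameters can be combined for a table that uses multi-level dynamic partitions. The ds and hh partition key columns and all option values are illustrative assumptions, not recommendations:

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  content VARCHAR,
  ds VARCHAR, -- first-level dynamic partition key column
  hh VARCHAR  -- second-level dynamic partition key column
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds,hh', -- partition key columns in the same order as in the physical table
  'flushIntervalMs' = '60000', -- flush the Tunnel buffer every 60 seconds
  'dynamicPartitionLimit' = '200' -- allow data to be written to at most 200 partitions
);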

Data type mapping

Data type of MaxCompute Data type of Flink
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
BOOLEAN BOOLEAN
DATETIME TIMESTAMP
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
STRING VARCHAR
DECIMAL DECIMAL
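
As a brief illustration of the mapping, a MaxCompute table that contains BIGINT, DATETIME, and STRING columns would be declared in a Flink DDL statement as follows. The table and column names are hypothetical:

CREATE TEMPORARY TABLE odps_sink_typed (
  id BIGINT, -- maps to a MaxCompute BIGINT column
  created_at TIMESTAMP, -- maps to a MaxCompute DATETIME column
  note VARCHAR -- maps to a MaxCompute STRING column
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>'
);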

Sample code

  • Write data to a static partition
    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      len INT,
      content VARCHAR
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE odps_sink (
      id INT,
      len INT,
      content VARCHAR
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'tunnelEndpoint' = '<yourTunnelEndpoint>',
      'project' = '<yourProjectName>',
      'tablename' = '<yourTableName>',
      'accessid' = '<yourAccessKeyId>',
      'accesskey' = '<yourAccessKeySecret>',
      'partition' = 'ds=20180905'
    );
    
    INSERT INTO odps_sink 
    SELECT 
      id, len, content 
    FROM datagen_source;
  • Write data to a dynamic partition
    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      len INT,
      content VARCHAR,
      c TIMESTAMP 
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE odps_sink (
      id  INT,
      len INT,
      content VARCHAR,
      ds VARCHAR -- The partition key column that is used to create dynamic partitions must be explicitly specified. 
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'tunnelEndpoint' = '<yourTunnelEndpoint>',
      'project' = '<yourProjectName>',
      'tablename' = '<yourTableName>',
      'accessid' = '<yourAccessKeyId>',
      'accesskey' = '<yourAccessKeySecret>',
      'partition' = 'ds' -- The partition value is not explicitly specified. Data is written to the partition that corresponds to the value of the ds field. 
    );
    
    INSERT INTO odps_sink 
    SELECT 
       id, 
       len, 
       content,
       DATE_FORMAT(c, 'yyMMdd') AS ds
    FROM datagen_source;
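
    Note If the value of c is null, DATE_FORMAT returns null and the row is written to the ds=NULL partition, as described in the remarks of the partition parameter.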