This topic provides the DDL syntax that is used to create a MaxCompute result table, describes the parameters in the WITH clause, and provides data type mappings and sample code.

Note The MaxCompute connector can be used to store data of a result table for streaming deployments and batch deployments.

What is MaxCompute?

MaxCompute is a fast and fully managed computing platform for large-scale data warehousing. MaxCompute can process exabytes of data. It provides solutions for storing and computing massive amounts of structured data in data warehouses and provides analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?.

Prerequisites

A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the MaxCompute connector.

Precautions

If you use the MaxCompute connector of a version later than vvr-3.0.4-flink-1.12, a deployment failover may occur. Therefore, we recommend that you use the MaxCompute connector of vvr-3.0.4-flink-1.12 or earlier.

Principles

The MaxCompute sink works in two phases:
  1. Writes data. The MaxCompute sink calls an interface in the MaxCompute SDK to write data to a buffer. Then, the sink uploads the buffered data to temporary files in MaxCompute at the specified interval or when the data size in the buffer exceeds 64 MB (the default value of the batchSize parameter).
  2. Commits sessions. When a task creates checkpoints, the MaxCompute sink calls the Tunnel commit method to commit sessions and moves temporary files to the data directory of the MaxCompute table. Then, the MaxCompute sink modifies the metadata.
    Note The commit method does not provide atomicity. Therefore, the MaxCompute sink supports at-least-once delivery instead of exactly-once delivery.
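
Because sessions are committed only when checkpoints are created, data becomes visible in MaxCompute roughly once per checkpoint. The following is a minimal sketch of how you might shorten the checkpoint interval in a Flink SQL session; depending on your environment, the interval may instead be set in the deployment configuration, and the 60-second value is an illustrative assumption, not a recommendation.

  -- Commit sessions roughly once per minute by checkpointing every 60 seconds.
  SET 'execution.checkpointing.interval' = '60s';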

DDL syntax

Realtime Compute for Apache Flink allows you to use MaxCompute to store output data. The following code shows an example:
create table odps_sink(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****'
);
Note
  • The sequence and data types of the table fields in the DDL statement must be the same as those in the MaxCompute physical table. Otherwise, the data that is queried in the MaxCompute physical table may be null, or an error may be returned.
  • MaxCompute converts all field names to lowercase letters when a table is created. Therefore, all field names that are defined in the DDL statement must be in lowercase letters so that they match the field names of the MaxCompute physical table. If the DDL statement contains a field name with uppercase letters, an error that indicates a failure to identify the field may be returned during syntax verification. For example, the following error message is returned: org.apache.flink.table.api.ValidationException: SQL validation failed. Unknown column MobileCountryCode!.
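
For example, the following minimal sketch reuses the MobileCountryCode column from the preceding error message; the remaining options and placeholder values are illustrative assumptions.

  -- Fails SQL validation: the uppercase name does not exist in the
  -- MaxCompute physical table, whose field names are all lowercase.
  -- CREATE TEMPORARY TABLE odps_sink (
  --   MobileCountryCode VARCHAR
  -- ) WITH (...);

  -- Passes validation: declare the field in lowercase.
  CREATE TEMPORARY TABLE odps_sink (
    mobilecountrycode VARCHAR
  ) WITH (
    'connector' = 'odps',
    'endpoint' = '<yourEndpoint>',
    'tunnelEndpoint' = '<yourTunnelEndpoint>',
    'project' = '<yourProjectName>',
    'tablename' = '<yourTableName>',
    'accessid' = '<yourAccessKeyId>',
    'accesskey' = '<yourAccessKeySecret>'
  );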

Parameters in the WITH clause

Each entry below lists the parameter, its description, whether it is required, and remarks.

• connector
  Description: The type of the result table.
  Required: Yes
  Remarks: Set the value to odps.
• endPoint
  Description: The endpoint of MaxCompute.
  Required: Yes
  Remarks: For more information, see Endpoints.
• tunnelEndpoint
  Description: The endpoint of MaxCompute Tunnel.
  Required: Yes
  Remarks: For more information, see Endpoints.
  Note
  • This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC).
  • The MaxCompute service must reside in the same region as fully managed Flink. If the two services reside in different regions, access to the endpoint of MaxCompute Tunnel may fail even if the MaxCompute service can be connected to fully managed Flink.
• project
  Description: The name of the MaxCompute project.
  Required: Yes
  Remarks: N/A.
• tableName
  Description: The name of the MaxCompute table.
  Required: Yes
  Remarks: N/A.
• accessId
  Description: The AccessKey ID that is used to access MaxCompute.
  Required: Yes
  Remarks: N/A.
• accessKey
  Description: The AccessKey secret that is used to access MaxCompute.
  Required: Yes
  Remarks: N/A.
• partition
  Description: The name of a partition.
  Required: No
  Remarks: This parameter is required if a partitioned table is used. Take note of the following points:
  • Static partitions
    For example, 'partition' = 'ds=20180905' indicates that data is written to the ds=20180905 partition.
  • Dynamic partitions
    If partition values are not explicitly specified, data is written to partitions based on the values of the specified partition key columns. For example, 'partition' = 'ds' indicates that data is written to a partition based on the value of the ds field.
    If you want to create multi-level dynamic partitions, you must make sure that the partition fields are declared in the same order as in the MaxCompute physical table and are separated by commas (,), as shown in the sketch after this entry.
    Note
    • In the CREATE TABLE statement, you must explicitly declare the partition key columns that you use to create dynamic partitions.
    • If a dynamic partition key column is empty, for example, if ds=null or ds='' exists in the data source, the data is written to the partition with ds=NULL.
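
The following is a minimal sketch of a two-level dynamic partition. The ds and hh partition key columns are illustrative assumptions; they must be declared in the same order as the partition fields of the MaxCompute physical table.

  CREATE TEMPORARY TABLE odps_sink (
    id INT,
    content VARCHAR,
    ds VARCHAR, -- First-level dynamic partition key column.
    hh VARCHAR  -- Second-level dynamic partition key column.
  ) WITH (
    'connector' = 'odps',
    'endpoint' = '<yourEndpoint>',
    'tunnelEndpoint' = '<yourTunnelEndpoint>',
    'project' = '<yourProjectName>',
    'tablename' = '<yourTableName>',
    'accessid' = '<yourAccessKeyId>',
    'accesskey' = '<yourAccessKeySecret>',
    'partition' = 'ds,hh' -- Multiple partition fields are separated by commas.
  );
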
• flushIntervalMs
  Description: The flush interval for the buffer of a writer in MaxCompute Tunnel.
    The MaxCompute sink inserts data into the buffer. Then, the sink writes the data in the buffer to the destination MaxCompute table at the interval that is specified by the flushIntervalMs parameter or when the size of the data in the buffer exceeds the value that is specified by the batchSize parameter.
  Required: No
  Remarks: Default value: 30000. Unit: milliseconds.
  Note This parameter can be used together with the batchSize parameter. The flush operation is triggered when either condition is met, as shown in the sketch after the batchSize entry.
• batchSize
  Description: The data size threshold at which the buffer of a writer in MaxCompute Tunnel is flushed.
    The MaxCompute sink inserts data into the buffer. Then, the sink writes the data in the buffer to the destination MaxCompute table when the size of the data in the buffer exceeds the value that is specified by the batchSize parameter.
  Required: No
  Remarks: Default value: 67108864. Unit: bytes.
  Note
  • Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter.
  • This parameter can be used together with the flushIntervalMs parameter. The flush operation is triggered when either condition is met.
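
The following is a minimal sketch that tunes both thresholds so that the buffer is flushed every 10 seconds or when it reaches 32 MB, whichever comes first. The values and the table schema are illustrative assumptions, not recommendations.

  CREATE TEMPORARY TABLE odps_sink (
    id INT,
    content VARCHAR
  ) WITH (
    'connector' = 'odps',
    'endpoint' = '<yourEndpoint>',
    'tunnelEndpoint' = '<yourTunnelEndpoint>',
    'project' = '<yourProjectName>',
    'tablename' = '<yourTableName>',
    'accessid' = '<yourAccessKeyId>',
    'accesskey' = '<yourAccessKeySecret>',
    'partition' = 'ds=20180905',
    'flushIntervalMs' = '10000', -- Flush every 10 seconds, or
    'batchSize' = '33554432'     -- when the buffer reaches 32 MB (33554432 bytes).
  );
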
• numFlushThreads
  Description: The number of threads that are used to flush data in the buffer of a writer in MaxCompute Tunnel.
    Each MaxCompute sink creates the number of flush threads that is specified by the numFlushThreads parameter. If the value of this parameter is greater than 1, the data in different partitions can be flushed at the same time, which improves the efficiency of the flush operation.
  Required: No
  Remarks: Default value: 1.
  Note Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter.
• dynamicPartitionLimit
  Description: The maximum number of dynamic partitions to which data can be written at the same time.
  Required: No
  Remarks: Default value: 100. A map in memory maintains the mappings between the existing partitions to which data is written and TunnelBufferedWriter. If the map size exceeds the value of the dynamicPartitionLimit parameter, the system reports the error message Too many dynamic partitions: 100, which exceeds the size limit: 100.
• compressAlgorithm
  Description: The compression algorithm that is used by MaxCompute Tunnel.
  Required: No
  Remarks: Valid values:
  • RAW (no compression)
  • ZLIB
  • SNAPPY
    Compared with ZLIB, SNAPPY can significantly improve the throughput. In a test scenario, the throughput increased by approximately 50%.
  Note
  • Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.
  • For VVR 4.0.13 and later, the default value of this parameter is ZLIB. For VVR 6.0.1 and later, the default value of this parameter is SNAPPY.
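
For example, on VVR 4.0.13 and later versions earlier than VVR 6.0.1, where the default value is ZLIB, you can opt in to SNAPPY explicitly. The following is a minimal sketch; the table schema and placeholder values are illustrative assumptions.

  CREATE TEMPORARY TABLE odps_sink (
    id INT,
    content VARCHAR
  ) WITH (
    'connector' = 'odps',
    'endpoint' = '<yourEndpoint>',
    'tunnelEndpoint' = '<yourTunnelEndpoint>',
    'project' = '<yourProjectName>',
    'tablename' = '<yourTableName>',
    'accessid' = '<yourAccessKeyId>',
    'accesskey' = '<yourAccessKeySecret>',
    'compressAlgorithm' = 'SNAPPY' -- Higher throughput than the ZLIB default on VVR 4.0.13.
  );
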
• useStreamTunnel
  Description: Specifies whether to use MaxCompute Streaming Tunnel to upload data.
  Required: No
  Remarks: Valid values:
  • true: MaxCompute Streaming Tunnel is used to upload data.
  • false: MaxCompute Batch Tunnel is used to upload data. This is the default value.
    If the execution of checkpoints on a deployment that uses MaxCompute Batch Tunnel is slow or even times out, and the downstream store can accept duplicate data, you can use MaxCompute Streaming Tunnel for the deployment.
  Note Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter. For more information about how to select a MaxCompute tunnel type, see Batch Tunnel and Streaming Tunnel.
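
The following is a minimal sketch that switches a deployment to Streaming Tunnel and flushes partitions concurrently. The table schema and parameter values are illustrative assumptions.

  CREATE TEMPORARY TABLE odps_sink (
    id INT,
    content VARCHAR,
    ds VARCHAR -- Dynamic partition key column.
  ) WITH (
    'connector' = 'odps',
    'endpoint' = '<yourEndpoint>',
    'tunnelEndpoint' = '<yourTunnelEndpoint>',
    'project' = '<yourProjectName>',
    'tablename' = '<yourTableName>',
    'accessid' = '<yourAccessKeyId>',
    'accesskey' = '<yourAccessKeySecret>',
    'partition' = 'ds',
    'useStreamTunnel' = 'true', -- No commit during checkpointing; duplicates possible if the deployment becomes abnormal.
    'numFlushThreads' = '4'     -- Flush data in different partitions at the same time.
  );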

Batch Tunnel and Streaming Tunnel

MaxCompute provides the following types of tunnels: Batch Tunnel and Streaming Tunnel. You can select a tunnel type based on your business requirements for consistency and operation efficiency.
• Consistency
  • Batch Tunnel: In most cases, all data is written to MaxCompute tables without duplicates. The at-least-once semantics is used to ensure that no data is missing. Duplicate data is generated in specific partitions only if an error occurs during checkpointing and the deployment writes data to multiple partitions at the same time.
  • Streaming Tunnel: The at-least-once semantics is used to ensure that no data is missing. If a deployment becomes abnormal, duplicate data may be generated.
• Operation efficiency
  • Batch Tunnel: The overall operation efficiency is lower than that of Streaming Tunnel because data must be committed during checkpointing and files must be created on the server.
  • Streaming Tunnel: Data does not need to be committed during checkpointing. If you set the numFlushThreads parameter to a value greater than 1, upstream data can still be received during the flush process. Therefore, the overall operation efficiency is higher than that of Batch Tunnel.

Data type mappings

Data type of MaxCompute    Data type of Flink
TINYINT                    TINYINT
SMALLINT                   SMALLINT
INT                        INT
BIGINT                     BIGINT
FLOAT                      FLOAT
DOUBLE                     DOUBLE
BOOLEAN                    BOOLEAN
DATETIME                   TIMESTAMP
TIMESTAMP                  TIMESTAMP
VARCHAR                    VARCHAR
STRING                     VARCHAR
BINARY                     VARBINARY
DECIMAL                    DECIMAL
Note Only Realtime Compute for Apache Flink that uses VVR 6.0.1 or later supports the VARBINARY data type.
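
For example, if the MaxCompute physical table contains DATETIME and STRING columns, declare them as TIMESTAMP and VARCHAR in the Flink DDL. The following is a minimal sketch; the table and column names are illustrative assumptions.

  -- MaxCompute physical table, for reference:
  --   CREATE TABLE events (id BIGINT, name STRING, ts DATETIME);
  -- Corresponding Flink result table:
  CREATE TEMPORARY TABLE odps_sink (
    id BIGINT,    -- MaxCompute BIGINT maps to Flink BIGINT.
    name VARCHAR, -- MaxCompute STRING maps to Flink VARCHAR.
    ts TIMESTAMP  -- MaxCompute DATETIME maps to Flink TIMESTAMP.
  ) WITH (
    'connector' = 'odps',
    'endpoint' = '<yourEndpoint>',
    'tunnelEndpoint' = '<yourTunnelEndpoint>',
    'project' = '<yourProjectName>',
    'tablename' = '<yourTableName>',
    'accessid' = '<yourAccessKeyId>',
    'accesskey' = '<yourAccessKeySecret>'
  );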

Sample code

  • Write data to a static partition
    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      len INT,
      content VARCHAR
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE odps_sink (
      id INT,
      len INT,
      content VARCHAR
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'tunnelEndpoint' = '<yourTunnelEndpoint>',
      'project' = '<yourProjectName>',
      'tablename' = '<yourTableName>',
      'accessid' = '<yourAccessKeyId>',
      'accesskey' = '<yourAccessKeySecret>',
      'partition' = 'ds=20180905'
    );
    
    INSERT INTO odps_sink 
    SELECT 
      id, len, content 
    FROM datagen_source;
  • Write data to a dynamic partition
    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      len INT,
      content VARCHAR,
      c TIMESTAMP 
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE odps_sink (
      id  INT,
      len INT,
      content VARCHAR,
      ds VARCHAR -- The partition key column that you use to create dynamic partitions must be explicitly specified. 
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'tunnelEndpoint' = '<yourTunnelEndpoint>',
      'project' = '<yourProjectName>',
      'tablename' = '<yourTableName>',
      'accessid' = '<yourAccessKeyId>',
      'accesskey' = '<yourAccessKeySecret>',
      'partition' = 'ds' -- The partition value is not provided. This means that data is written to a partition that is specified by the ds field. 
    );
    
    INSERT INTO odps_sink 
    SELECT 
       id, 
       len, 
       content,
       DATE_FORMAT(c, 'yyMMdd') as ds
    FROM datagen_source;

FAQ

What do I do if the error message "ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'" appears?