This topic provides the DDL syntax that is used to create an incremental MaxCompute source table, describes the parameters in the WITH clause, and provides data type mappings and answers to some frequently asked questions.

What is MaxCompute?

MaxCompute is a fast and fully managed computing platform for large-scale data warehousing. MaxCompute can process exabytes of data. It provides solutions for storing and computing large volumes of structured data in data warehouses, along with analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?

Prerequisites

A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.1.2 or later supports the incremental MaxCompute connector.
  • Incremental MaxCompute source tables cannot be used as dimension tables.
  • Incremental MaxCompute source tables must be partitioned tables.

Precautions

If the MaxCompute connector runs on a Flink compute engine version later than vvr-3.0.4-flink-1.12, a deployment failover may occur. We therefore recommend that you run the MaxCompute connector on vvr-3.0.4-flink-1.12 or earlier.

DDL syntax

CREATE TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'continuous-odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'startpartition' = 'ds=2018****'
);
Note
  • The order and data types of the fields in the DDL statement must match those of the MaxCompute physical table. Otherwise, the data read from the MaxCompute physical table may be null, or an error may be returned.
  • MaxCompute automatically converts all field names to lowercase after a table is created. Therefore, all field names in the DDL statement must be lowercase so that they match the field names of the MaxCompute physical table. If the DDL statement contains a field name with uppercase letters, syntax verification may fail with an error that indicates that the field cannot be identified. For example, the following error message is returned: org.apache.flink.table.api.ValidationException: SQL validation failed. Unknown column MobileCountryCode!.
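
The lowercase rule in the note above can be illustrated with a minimal sketch. The MaxCompute table user_events and its columns are hypothetical and exist only for this example; the WITH options are the ones shown in the DDL syntax.

```sql
-- Hypothetical MaxCompute table, created with mixed-case column names:
--   CREATE TABLE user_events (Id BIGINT, MobileCountryCode STRING)
--   PARTITIONED BY (ds STRING);
-- MaxCompute stores these columns as id and mobilecountrycode.

-- Flink DDL: same field order, lowercase names only.
CREATE TEMPORARY TABLE odps_source (
  id BIGINT,
  mobilecountrycode VARCHAR  -- writing MobileCountryCode here may fail validation
) WITH (
  'connector' = 'continuous-odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'startpartition' = 'ds=2018****'
);
```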

Parameters in the WITH clause

Parameter | Description | Required | Remarks
connector | The type of the source table. | Yes | Set the value to continuous-odps.
endPoint | The endpoint of MaxCompute. | Yes | For more information, see Endpoints.
tunnelEndpoint | The endpoint of MaxCompute Tunnel. | Yes | For more information, see Endpoints. Note: This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC).
project | The name of the project to which a table belongs. | Yes | N/A
tableName | The name of the table in the database. | Yes | N/A
accessId | The AccessKey ID that is used to access MaxCompute. | Yes | N/A
accessKey | The AccessKey secret that is used to access MaxCompute. | Yes | N/A
startPartition | The start partition from which data is read. For details and examples, see the startPartition description after this table. | Yes | Example: ds=20191201
compressAlgorithm | The compression algorithm that is used by MaxCompute Tunnel. | No | Valid values: RAW (not compressed), ZLIB, and SNAPPY. For details, see the compressAlgorithm description after this table.

startPartition:
When the system loads a partitioned table, it compares startPartition with all partitions of the table in alphabetical order and loads data from the partitions that meet the condition. For example, if startPartition is set to ds=20191201, data of all partitions that meet the condition ds >= 20191201 in the incremental MaxCompute partitioned table is loaded.
An incremental MaxCompute source table also continuously listens to the partitioned table. After the source table reads data from the existing partitions, it continues to listen for new partitions. When a new partition is generated, the source table reads data from that partition.
Note
  • An incremental MaxCompute source table must have first-level partitions. Second-level partitions are optional.
  • If you specify a second-level partition, make sure that it is placed after the first-level partition.
  • If the partition specified by the startPartition parameter does not exist, the next partition is used as the start partition.
For example, an incremental MaxCompute partitioned table has the first-level partition key column ds and the second-level partition key column type, and contains the following partitions:
  • ds=20191201,type=a
  • ds=20191201,type=b
  • ds=20191202,type=a
  • ds=20191202,type=b
  • ds=20191202,type=c
The partitions from which data is read depend on the value of startPartition:
  • If startPartition is set to ds=20191202, data is read from the following partitions:
    • ds=20191202,type=a
    • ds=20191202,type=b
    • ds=20191202,type=c
  • If startPartition is set to ds=20191201,type=c, data is read from the following partitions:
    • ds=20191202,type=a
    • ds=20191202,type=b
    • ds=20191202,type=c

compressAlgorithm:
Compared with ZLIB, SNAPPY can significantly improve the throughput. In test scenarios, the throughput is increased by about 50%.
Note
  • Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.
  • For VVR 4.0.13 or later but earlier than VVR 6.0.1, the default value of this parameter is ZLIB. For VVR 6.0.1 and later, the default value of this parameter is SNAPPY.
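
As a minimal sketch of how a second-level start partition and the optional compression setting might be combined, the following DDL reuses the five example partitions described for startPartition. The table name odps_source_by_type and the columns are hypothetical. The option key 'compressAlgorithm' is written as it appears in the parameter list, which is an assumption about the exact spelling accepted in the WITH clause.

```sql
-- Hypothetical source table that starts from a second-level partition.
-- In the five-partition example, ds=20191201,type=c does not exist,
-- so reading starts from the next partition, ds=20191202,type=a.
CREATE TEMPORARY TABLE odps_source_by_type (
  id INT,
  content VARCHAR
) WITH (
  'connector' = 'continuous-odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  -- The second-level partition is placed after the first-level partition.
  'startpartition' = 'ds=20191201,type=c',
  -- Optional. Supported only in VVR 4.0.13 or later.
  'compressAlgorithm' = 'SNAPPY'
);
```

If the deployment runs a VVR version earlier than 4.0.13, the compression line should be removed, because the parameter is not supported there.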

Data type mappings

Data type of MaxCompute | Data type of Flink
TINYINT | TINYINT
SMALLINT | SMALLINT
INT | INT
BIGINT | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
BOOLEAN | BOOLEAN
DATETIME | TIMESTAMP
TIMESTAMP | TIMESTAMP
DECIMAL | DECIMAL
BINARY | VARBINARY
STRING | VARCHAR
Important
  • Incremental MaxCompute source tables do not support the CHAR, VARCHAR, ARRAY, MAP, or STRUCT data types of MaxCompute.
  • In the MaxCompute table, you can use the STRING data type instead of the VARCHAR data type.
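
The mappings above can be applied as in the following sketch. The MaxCompute table orders and its columns are hypothetical and are shown only in the comment. The Flink column types are taken from the mapping list.

```sql
-- Hypothetical MaxCompute table:
--   CREATE TABLE orders (
--     order_id BIGINT, amount DOUBLE, paid BOOLEAN,
--     created_at DATETIME, payload BINARY, remark STRING
--   ) PARTITIONED BY (ds STRING);

-- Matching Flink DDL: same field order, lowercase names, mapped types.
CREATE TEMPORARY TABLE odps_orders (
  order_id BIGINT,      -- MaxCompute BIGINT
  amount DOUBLE,        -- MaxCompute DOUBLE
  paid BOOLEAN,         -- MaxCompute BOOLEAN
  created_at TIMESTAMP, -- MaxCompute DATETIME
  payload VARBINARY,    -- MaxCompute BINARY
  remark VARCHAR        -- MaxCompute STRING
) WITH (
  'connector' = 'continuous-odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'startpartition' = 'ds=20191201'
);
```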

Sample code

In this example, one partition is generated in the MaxCompute table every day, and the partition key column is ds. The incremental MaxCompute source table loads data from the partitions whose ds values are greater than or equal to 20191201 and continuously listens for new partitions.
-- The incremental MaxCompute source table reads data from partitions in the range of [ds=20191201, ∞). 
CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'continuous-odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'startpartition' = 'ds=20191201'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink
SELECT cid, rt
FROM odps_source;