This topic provides the DDL syntax that is used to create an incremental MaxCompute source table, describes the parameters in the WITH clause, and provides data type mappings and FAQ.

What is MaxCompute?

MaxCompute is a fast, fully managed computing platform for large-scale data warehousing that can process exabytes of data. It stores and computes large volumes of structured data in data warehouses and provides analytics and modeling services. For more information, see What is MaxCompute?

Prerequisites

A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.

Limits

The incremental MaxCompute source table connector is supported only by Flink that uses Ververica Runtime (VVR) 2.1.2 or later.

DDL syntax

CREATE TABLE odps_source (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'continuous-odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'startpartition' = 'ds=2018****'
);
Note
  • The parameters in the WITH clause must be all lowercase.
  • Incremental MaxCompute source tables cannot be used as dimension tables.
  • Incremental MaxCompute source tables must be partitioned tables.

Parameters in the WITH clause

Parameter | Description | Required | Remarks
connector | The type of the source table. | Yes | Set the value to continuous-odps.
endPoint | The endpoint of MaxCompute. | Yes | For more information, see Configure endpoints.
tunnelEndpoint | The endpoint of MaxCompute Tunnel. | Yes | For more information, see Configure endpoints.
project | The name of the project to which the table belongs. | Yes | N/A
tableName | The name of the table. | Yes | N/A
accessId | The AccessKey ID that is used to access MaxCompute. | Yes | N/A
accessKey | The AccessKey secret that is used to access MaxCompute. | Yes | N/A
startPartition | The start partition from which data is read. | Yes | See the description and examples that follow.

Description of startPartition
When the system loads a partitioned table, it compares startPartition with every partition in the table in alphabetical order and loads data from the partitions that meet the condition, which means the partitions that are greater than or equal to startPartition.
An incremental MaxCompute source table also continuously listens to the partitioned table. After the source table reads data from the existing partitions, it keeps listening for new partitions. When a new partition is generated, the source table reads data from the new partition.
Note
  • An incremental MaxCompute source table must have first-level partitions. Second-level partitions are optional.
  • If you specify a second-level partition, place it after the first-level partition.
  • If the partition specified by the startPartition parameter does not exist, the next partition is used as the start partition.

Examples for startPartition
If startPartition is set to ds=20191201, data is loaded from all partitions in the incremental MaxCompute partitioned table that meet the condition ds >= 20191201.
For example, an incremental MaxCompute partitioned table has the first-level partition key column ds and the second-level partition key column type, and contains the following partitions:
  • ds=20191201,type=a
  • ds=20191201,type=b
  • ds=20191202,type=a
  • ds=20191202,type=b
  • ds=20191202,type=c
The partitions from which data is read depend on the value of startPartition:
  • If startPartition is set to ds=20191202, data is read from the following partitions:
    • ds=20191202,type=a
    • ds=20191202,type=b
    • ds=20191202,type=c
  • If startPartition is set to ds=20191201,type=c, the specified partition does not exist, so reading starts from the next partition. Data is read from the following partitions:
    • ds=20191202,type=a
    • ds=20191202,type=b
    • ds=20191202,type=c
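
The selection rule described for startPartition can be modeled in a few lines of Python. This is only a sketch of the documented behavior, not the connector's implementation. It assumes that the "alphabetical order" comparison is equivalent to plain string comparison of partition specifications, which holds for fixed-width values such as the yyyymmdd dates used here. The function name partitions_to_read is hypothetical.

```python
def partitions_to_read(start_partition, partitions):
    """Return the partitions that sort at or after start_partition.

    Assumption: partition specs are compared as plain strings, which
    reproduces the examples in this topic for fixed-width values.
    """
    return sorted(p for p in partitions if p >= start_partition)


# Partitions of the example table in this topic.
existing = [
    "ds=20191201,type=a",
    "ds=20191201,type=b",
    "ds=20191202,type=a",
    "ds=20191202,type=b",
    "ds=20191202,type=c",
]

for start in ("ds=20191201", "ds=20191202", "ds=20191201,type=c"):
    print(start, "->", partitions_to_read(start, existing))
```

Under this model, ds=20191201,type=c is not an existing partition, so the first partition that sorts after it (ds=20191202,type=a) becomes the effective start, which matches the note about nonexistent start partitions.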

Data type mapping

MaxCompute data type | Flink data type
TINYINT | TINYINT
SMALLINT | SMALLINT
INT | INT
BIGINT | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
BOOLEAN | BOOLEAN
DATETIME | TIMESTAMP
TIMESTAMP | TIMESTAMP
DECIMAL | DECIMAL
BINARY | VARBINARY
STRING | VARCHAR
Notice
  • Incremental MaxCompute source tables do not support the CHAR, VARCHAR, ARRAY, MAP, or STRUCT data types.
  • You can use the STRING data type instead of the VARCHAR data type.
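
When you write the column list of the Flink DDL statement by hand, a small helper can apply the mapping and flag unsupported types early. The following Python sketch is illustrative only: the function name flink_columns is hypothetical, the dictionary is copied from the mapping in this topic, and type names are assumed to be given without parameters (for example, DECIMAL rather than DECIMAL(10,2)).

```python
# Mapping copied from the data type mapping in this topic.
MAXCOMPUTE_TO_FLINK = {
    "TINYINT": "TINYINT",
    "SMALLINT": "SMALLINT",
    "INT": "INT",
    "BIGINT": "BIGINT",
    "FLOAT": "FLOAT",
    "DOUBLE": "DOUBLE",
    "BOOLEAN": "BOOLEAN",
    "DATETIME": "TIMESTAMP",
    "TIMESTAMP": "TIMESTAMP",
    "DECIMAL": "DECIMAL",
    "BINARY": "VARBINARY",
    "STRING": "VARCHAR",
}

# MaxCompute types that this topic lists as unsupported.
UNSUPPORTED = {"CHAR", "VARCHAR", "ARRAY", "MAP", "STRUCT"}


def flink_columns(maxcompute_schema):
    """Translate (column name, MaxCompute type) pairs into Flink column definitions."""
    columns = []
    for name, mc_type in maxcompute_schema:
        base = mc_type.strip().upper()
        if base in UNSUPPORTED:
            raise ValueError(f"{name}: MaxCompute type {base} is not supported")
        if base not in MAXCOMPUTE_TO_FLINK:
            raise ValueError(f"{name}: no mapping is listed for {base}")
        columns.append(f"{name} {MAXCOMPUTE_TO_FLINK[base]}")
    return columns


# A MaxCompute table with a STRING column and a DOUBLE column.
print(flink_columns([("cid", "STRING"), ("rt", "DOUBLE")]))
```

A MaxCompute column declared as VARCHAR raises an error in this sketch. As the notice says, such a column can be declared as STRING in MaxCompute instead.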

Sample code

In this example, one partition is generated in the MaxCompute table every day, and the partition key column is ds. The incremental MaxCompute source table loads data from the partitions whose ds value is greater than or equal to 20191201 and then continuously listens for new partitions.
-- The incremental MaxCompute source table reads data from partitions in the range of [ds=20191201, ∞). 
CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'continuous-odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'startpartition' = 'ds=20191201'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink 
SELECT 
    cid, rt FROM odps_source;