This topic provides the DDL syntax that is used to create an incremental MaxCompute source table, describes the parameters in the WITH clause, and provides data type mappings and answers to some frequently asked questions.
What is MaxCompute?
MaxCompute is a fast, fully managed computing platform for large-scale data warehousing. MaxCompute can process exabytes of data. It provides solutions for storing and computing massive amounts of structured data in data warehouses, and provides analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?.
Prerequisites
A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.
Limits
- Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.1.2 or later supports the incremental MaxCompute connector.
- Incremental MaxCompute source tables cannot be used as dimension tables.
- Incremental MaxCompute source tables must be partitioned tables.
Precautions
If you use the MaxCompute connector with a Flink compute engine version later than vvr-3.0.4-flink-1.12, a deployment failover may occur. Therefore, we recommend that you use the MaxCompute connector with the Flink compute engine of vvr-3.0.4-flink-1.12 or earlier.
DDL syntax
CREATE TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'continuous-odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'tablename' = '<yourTableName>',
'accessid' = '<yourAccessKeyId>',
'accesskey' = '<yourAccessKeySecret>',
'startpartition' = 'ds=2018****'
);
- The sequence and data types of the table fields in the DDL statement must be the same as those in the MaxCompute physical table. Otherwise, the data queried from the MaxCompute physical table may be null or an error may be returned.
- MaxCompute automatically converts all field names to lowercase letters when a table is created. Therefore, all field names defined in the DDL statement must be in lowercase letters so that they match the field names of the MaxCompute physical table. If a field name in the DDL statement contains uppercase letters, an error that indicates a failure to identify the field may be returned during syntax verification. For example, the following error message is returned:
org.apache.flink.table.api.ValidationException: SQL validation failed. Unknown column MobileCountryCode!
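A quick way to catch this issue before deployment is to check the field names declared in the DDL statement for uppercase characters. The following is a minimal sketch of such a check; the `invalid_field_names` helper and the field list are hypothetical, not part of the connector:

```python
# Minimal sketch: verify that all field names declared in the Flink DDL
# are fully lowercase, because MaxCompute stores column names in lowercase.

def invalid_field_names(fields):
    """Return the declared field names that contain uppercase letters."""
    return [name for name in fields if name != name.lower()]

# Hypothetical field list taken from a DDL statement.
declared = ["id", "user_name", "MobileCountryCode"]
print(invalid_field_names(declared))
# ['MobileCountryCode']
```

Any name reported by such a check would trigger the validation error shown above and must be rewritten in lowercase in the DDL statement.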
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
connector | The type of the source table. | Yes | Set the value to `continuous-odps`. |
endPoint | The endpoint of MaxCompute. | Yes | For more information, see Endpoints. |
tunnelEndpoint | The endpoint of MaxCompute Tunnel. | Yes | For more information, see Endpoints. Note: This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC). |
project | The name of the project to which the table belongs. | Yes | N/A. |
tableName | The name of the table in the database. | Yes | N/A. |
accessId | The AccessKey ID that is used to access MaxCompute. | Yes | N/A. |
accessKey | The AccessKey secret that is used to access MaxCompute. | Yes | N/A. |
startPartition | The start partition from which data is read. When the system loads the partitioned table, it compares startPartition with each partition of the table in alphabetical order and loads the data from all partitions that meet the condition. An incremental MaxCompute source table also continuously listens for new partitions: after the source table reads data from the existing partitions, it continues to listen for the generation of new partitions and reads data from each new partition after the partition is generated. | Yes | For example, if startPartition is set to `ds=20191201`, data of all partitions in the incremental MaxCompute partitioned table that meet the condition `ds >= 20191201` is loaded. The same alphabetical comparison applies to multi-level partitioned tables, such as a table with the first-level partition key column ds and the second-level partition key column type. |
compressAlgorithm | The compression algorithm that is used by MaxCompute Tunnel. | No | Valid values include `raw`, `zlib`, and `snappy`. |
Data type mappings
Data type of MaxCompute | Data type of Flink |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
DECIMAL | DECIMAL |
BINARY | VARBINARY |
STRING | VARCHAR |
- Incremental MaxCompute source tables do not support the CHAR, VARCHAR, ARRAY, MAP, or STRUCT data type.
- You can use the STRING data type instead of the VARCHAR data type.
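The mapping table above can be expressed as a simple lookup. The following sketch is illustrative only; the `flink_type` helper is hypothetical, and any MaxCompute type absent from the mapping (for example, CHAR, ARRAY, MAP, or STRUCT) is unsupported by incremental MaxCompute source tables:

```python
# Sketch of the MaxCompute-to-Flink data type mapping from the table above.
TYPE_MAPPING = {
    "TINYINT": "TINYINT",
    "SMALLINT": "SMALLINT",
    "INT": "INT",
    "BIGINT": "BIGINT",
    "FLOAT": "FLOAT",
    "DOUBLE": "DOUBLE",
    "BOOLEAN": "BOOLEAN",
    "DATETIME": "TIMESTAMP",
    "TIMESTAMP": "TIMESTAMP",
    "DECIMAL": "DECIMAL",
    "BINARY": "VARBINARY",
    "STRING": "VARCHAR",
}

def flink_type(odps_type):
    """Return the Flink type for a MaxCompute type, or None if unsupported."""
    return TYPE_MAPPING.get(odps_type.upper())

print(flink_type("DATETIME"))  # TIMESTAMP
print(flink_type("STRUCT"))    # None
```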
Sample code
-- The incremental MaxCompute source table reads data from partitions in the range of [ds=20191201, ∞).
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'continuous-odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'tablename' = '<yourTableName>',
'accessid' = '<yourAccessKeyId>',
'accesskey' = '<yourAccessKeySecret>',
'startpartition' = 'ds=20191201'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector'='blackhole'
);
INSERT INTO blackhole_sink
SELECT cid, rt FROM odps_source;