This topic describes the DDL syntax used to create a full MaxCompute source table, the parameters in the WITH clause, and the data type mappings, and answers some frequently asked questions.
What is MaxCompute?
MaxCompute is a fast, fully managed computing platform for large-scale data warehousing that can process exabytes of data. It provides solutions for storing and computing massive amounts of structured data in data warehouses, as well as analytics and modeling services. For more information, see What is MaxCompute?
Prerequisites
A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the MaxCompute connector.
Precautions
If you use a version of the MaxCompute connector later than vvr-3.0.4-flink-1.12, deployment failovers may occur. We therefore recommend that you use the MaxCompute connector of vvr-3.0.4-flink-1.12 or earlier.
DDL syntax
create table odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) with (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'tablename' = '<yourTableName>',
'accessid' = '<yourAccessKeyId>',
'accesskey' = '<yourAccessKeySecret>',
'partition' = 'ds=2018****'
);
- The order and data types of the fields in the DDL statement must match those of the MaxCompute physical table. Otherwise, the queried data may be null or an error may be returned.
- MaxCompute converts all field names to lowercase when a table is created. Therefore, all field names in the DDL statement must be in lowercase so that they match the field names of the MaxCompute physical table. If the DDL statement contains a field name with uppercase letters, an error indicating that the field cannot be identified may be returned during syntax verification. Example error message:
org.apache.flink.table.api.ValidationException: SQL validation failed. Unknown column MobileCountryCode!
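As a sketch of these two rules, assume a hypothetical non-partitioned MaxCompute table named `user_log` whose columns, in order, are `id BIGINT` and `mobilecountrycode STRING`. The Flink DDL below declares the same columns in the same order, in lowercase, with types taken from the data type mappings in this topic. The table name, column names, and the Flink table name `user_log_source` are illustrative assumptions only.

```sql
-- Hypothetical MaxCompute physical table user_log (non-partitioned), columns in order:
--   id                BIGINT
--   mobilecountrycode STRING
-- The Flink DDL lists the same columns in the same order and in lowercase.
CREATE TEMPORARY TABLE user_log_source (
  id BIGINT,                 -- MaxCompute BIGINT -> Flink BIGINT
  mobilecountrycode VARCHAR  -- MaxCompute STRING -> Flink VARCHAR
  -- Declaring this column as MobileCountryCode instead may fail validation
  -- with "Unknown column MobileCountryCode!".
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = 'user_log',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>'
);
```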
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
connector | The type of the source table. | Yes | Set the value to odps. |
endpoint | The endpoint of MaxCompute. | Yes | For more information, see Endpoints. |
tunnelEndpoint | The endpoint of MaxCompute Tunnel. | No | For more information, see Endpoints. Note: This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC). |
project | The name of the MaxCompute project. | Yes | N/A. |
tablename | The name of the MaxCompute table. | Yes | N/A. |
accessid | The AccessKey ID that is used to access MaxCompute. | Yes | N/A. |
accesskey | The AccessKey secret that is used to access MaxCompute. | Yes | N/A. |
partition | The name of a partition. | No | Example: 'partition' = 'ds=20180905'. |
compressAlgorithm | The compression algorithm used by MaxCompute Tunnel. | No | Valid values: |
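The following sketch sets only the parameters marked as required in the table above and shows the optional ones as comments. It assumes the deployment does not access MaxCompute from a VPC; if it does, tunnelEndpoint must also be set. The table name `odps_min_source` is hypothetical, and the columns reuse those of the DDL syntax example.

```sql
-- Minimal source table: only the required WITH parameters are set.
CREATE TEMPORARY TABLE odps_min_source (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',                  -- required; always 'odps'
  'endpoint' = '<yourEndpoint>',         -- required
  'project' = '<yourProjectName>',       -- required
  'tablename' = '<yourTableName>',       -- required
  'accessid' = '<yourAccessKeyId>',      -- required
  'accesskey' = '<yourAccessKeySecret>'  -- required
  -- Optional parameters: to use one, uncomment it and add a comma
  -- after the preceding option.
  -- 'tunnelEndpoint' = '<yourTunnelEndpoint>'  -- required in a VPC
  -- 'partition' = 'ds=20180905'
);
```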
Data type mappings
Data type of MaxCompute | Data type of Flink |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
DECIMAL | DECIMAL |
BINARY | VARBINARY |
STRING | VARCHAR |
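To illustrate the mappings, the sketch below assumes a hypothetical non-partitioned MaxCompute table `order_detail` whose columns use several of the MaxCompute types listed above. The Flink DDL declares each column, in the same order, with the corresponding Flink type from the table. The table, column names, and the Flink table name `order_detail_source` are assumptions for illustration only.

```sql
-- Hypothetical MaxCompute table order_detail, columns in this order:
--   order_id BIGINT, quantity INT, price DOUBLE, is_paid BOOLEAN,
--   created_at DATETIME, remark STRING, raw_payload BINARY
CREATE TEMPORARY TABLE order_detail_source (
  order_id BIGINT,       -- BIGINT   -> BIGINT
  quantity INT,          -- INT      -> INT
  price DOUBLE,          -- DOUBLE   -> DOUBLE
  is_paid BOOLEAN,       -- BOOLEAN  -> BOOLEAN
  created_at TIMESTAMP,  -- DATETIME -> TIMESTAMP
  remark VARCHAR,        -- STRING   -> VARCHAR
  raw_payload VARBINARY  -- BINARY   -> VARBINARY
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = 'order_detail',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>'
);
```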
Sample code
-- Source table: reads the ds=20180905 partition of the MaxCompute table.
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'tablename' = '<yourTableName>',
'accessid' = '<yourAccessKeyId>',
'accesskey' = '<yourAccessKeySecret>',
'partition' = 'ds=20180905'
);
-- Sink table: the blackhole connector discards every row it receives.
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
-- Count the rows for each cid and write the results to the sink.
INSERT INTO blackhole_sink
SELECT
cid,
COUNT(*) AS invoke_count
FROM odps_source
GROUP BY cid;