This topic describes the DDL syntax that is used to create a full MaxCompute source table, the parameters in the WITH clause, and the data type mappings, and answers some frequently asked questions.

Note The MaxCompute connector can be used for source tables in both streaming deployments and batch deployments.

What is MaxCompute?

MaxCompute is a fast, fully managed computing platform for large-scale data warehousing that can process exabytes of data. It provides solutions for storing and computing large volumes of structured data in data warehouses, as well as analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?.

Prerequisites

A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the MaxCompute connector.

Precautions

If you use the MaxCompute connector of a version later than vvr-3.0.4-flink-1.12, a deployment failover may occur. Therefore, we recommend that you use the MaxCompute connector of vvr-3.0.4-flink-1.12 or earlier.

DDL syntax

create table odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) with (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****'
);
Note
  • The order and data types of the fields in the DDL statement must be the same as those in the MaxCompute physical table. Otherwise, the queried data may be null or an error may be returned.
  • MaxCompute converts all field names to lowercase when a table is created. Therefore, all field names in the DDL statement must be in lowercase so that they match the field names of the MaxCompute physical table. If a field name in the DDL statement contains uppercase letters, an error indicating that the field cannot be identified may be returned during syntax verification. Example error message: org.apache.flink.table.api.ValidationException: SQL validation failed. Unknown column MobileCountryCode!.
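For example, a column that appears as MobileCountryCode in a design document is stored by MaxCompute as mobilecountrycode, so the Flink DDL should declare the lowercase name. The following sketch assumes a hypothetical non-partitioned MaxCompute table named user_profile; the table name, column names, and types are illustrative only.

```sql
-- Hypothetical MaxCompute physical table (created in MaxCompute, not in Flink):
--   CREATE TABLE user_profile (id BIGINT, mobilecountrycode STRING, content STRING);
-- The Flink DDL declares the same columns, in the same order, with lowercase names
-- and the mapped Flink types (BIGINT -> BIGINT, STRING -> VARCHAR).
CREATE TEMPORARY TABLE user_profile_source (
  id BIGINT,
  mobilecountrycode VARCHAR,  -- declaring MobileCountryCode here can fail validation
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = 'user_profile',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>'
);
```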

Parameters in the WITH clause

Parameter | Description | Required | Remarks
connector | The type of the source table. | Yes | Set the value to odps.
endpoint | The endpoint of MaxCompute. | Yes | For more information, see Endpoints.
tunnelEndpoint | The endpoint of MaxCompute Tunnel. | No | For more information, see Endpoints. This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC).
project | The name of the MaxCompute project. | Yes | N/A
tablename | The name of the MaxCompute table. | Yes | N/A
accessid | The AccessKey ID that is used to access MaxCompute. | Yes | N/A
accesskey | The AccessKey secret that is used to access MaxCompute. | Yes | N/A
partition | The partition from which data is read. | No | See the description of partition that follows this table.
compressAlgorithm | The compression algorithm used by MaxCompute Tunnel. | No | Valid values: RAW (no compression), ZLIB, and SNAPPY. See the description of compressAlgorithm that follows this table.

partition:
  • A MaxCompute table that has only one level of partitions: if only the partition key column ds exists, 'partition' = 'ds=20180905' reads data from the ds=20180905 partition.
  • A MaxCompute table that has multiple levels of partitions: if the partition key columns ds and hh exist, 'partition' = 'ds=20180905,hh=*' reads data from the ds=20180905 partition, including all of its hh subpartitions.

  Note When you filter partitions, you must specify a value for every partition key column. In the preceding example, if you specify only 'partition' = 'ds=20180905', no data is read.

compressAlgorithm:
  • Compared with ZLIB, SNAPPY can significantly improve throughput. In test scenarios, throughput increased by about 50%.
  • Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.
  • From VVR 4.0.13 up to, but not including, VVR 6.0.1, the default value is ZLIB. In VVR 6.0.1 and later, the default value is SNAPPY.
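The following sketch combines the partition and compressAlgorithm parameters. It assumes a hypothetical table named access_log with two partition key columns, ds and hh, and a deployment that runs on VVR 4.0.13 or later, which compressAlgorithm requires. The table and column names are illustrative only.

```sql
-- Hypothetical two-level partitioned MaxCompute table: partition key columns ds and hh.
CREATE TEMPORARY TABLE access_log_source (
  user_id VARCHAR,
  url VARCHAR,
  latency DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = 'access_log',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  -- Every partition key column gets a value; hh=* selects all hh subpartitions of ds=20180905.
  'partition' = 'ds=20180905,hh=*',
  -- Requires VVR 4.0.13 or later; SNAPPY is already the default in VVR 6.0.1 and later.
  'compressAlgorithm' = 'SNAPPY'
);
```

Specifying only 'ds=20180905' for this table would read no data, as described in the partition note.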

Data type mappings

MaxCompute data type | Flink data type
TINYINT | TINYINT
SMALLINT | SMALLINT
INT | INT
BIGINT | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
BOOLEAN | BOOLEAN
DATETIME | TIMESTAMP
TIMESTAMP | TIMESTAMP
VARCHAR | VARCHAR
DECIMAL | DECIMAL
BINARY | VARBINARY
STRING | VARCHAR
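As an illustration of these mappings, assume a hypothetical MaxCompute table named orders whose columns use the BIGINT, DOUBLE, BOOLEAN, DATETIME, and STRING types. A matching Flink DDL declares each non-partition column, in the same order, with the Flink type from the preceding table. As in the sample code of this topic, the partition key column ds is not declared as a field.

```sql
-- Hypothetical MaxCompute table (created in MaxCompute, not in Flink):
--   CREATE TABLE orders (
--     order_id BIGINT, amount DOUBLE, paid BOOLEAN, created_at DATETIME, buyer STRING
--   ) PARTITIONED BY (ds STRING);
CREATE TEMPORARY TABLE orders_source (
  order_id BIGINT,       -- BIGINT   -> BIGINT
  amount DOUBLE,         -- DOUBLE   -> DOUBLE
  paid BOOLEAN,          -- BOOLEAN  -> BOOLEAN
  created_at TIMESTAMP,  -- DATETIME -> TIMESTAMP
  buyer VARCHAR          -- STRING   -> VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = 'orders',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=20180905'
);
```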

Sample code

-- Source table: reads the ds=20180905 partition of a MaxCompute table.
CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=20180905'
);

-- Sink table: the blackhole connector discards all input rows.
CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

-- Count the rows for each cid and write the results to the sink.
INSERT INTO blackhole_sink
SELECT
  cid,
  COUNT(*) AS invoke_count
FROM odps_source
GROUP BY cid;
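To inspect the per-cid counts while testing, you could replace the blackhole sink with a sink that uses the open source Flink print connector, which writes each result row to the standard output of the TaskManager. This is a sketch that assumes the print connector is available in your VVR version and that the odps_source table from the sample code is defined in the same script; the table name print_sink is hypothetical.

```sql
-- Debugging variant: print the aggregated counts instead of discarding them.
CREATE TEMPORARY TABLE print_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink
SELECT
  cid,
  COUNT(*) AS invoke_count
FROM odps_source
GROUP BY cid;
```

Switch back to the blackhole sink, or to a real result table, once the output looks correct, because printing every row adds overhead.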