This topic describes the DDL syntax that is used to create a full MaxCompute source table, the parameters in the WITH clause, the data type mappings, and sample code.

Note The MaxCompute connector can be used to read source table data in both streaming jobs and batch jobs.

What is MaxCompute?

MaxCompute is a fast, fully managed computing platform for large-scale data warehousing that can process exabytes of data. It provides solutions for storing and computing large volumes of structured data in data warehouses, together with analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?

Prerequisites

A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.

Limits

Full MaxCompute source tables are supported only in Flink that uses Ververica Runtime (VVR) 2.0.0 or later.

DDL syntax

CREATE TABLE odps_source (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****'
);

Note The parameters in the WITH clause must be all lowercase.

Parameters in the WITH clause

Parameter | Description | Required | Remarks
connector | The type of the source table. | Yes | Set the value to odps.
endpoint | The endpoint of MaxCompute. | Yes | For more information, see Endpoints in different regions (Internet).
tunnelEndpoint | The endpoint of MaxCompute Tunnel. | No | For more information, see Endpoints in different regions (Internet). Note: This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC).
project | The name of the MaxCompute project. | Yes | N/A
tablename | The name of the MaxCompute table. | Yes | N/A
accessid | The AccessKey ID that is used to access MaxCompute. | Yes | N/A
accesskey | The AccessKey secret that is used to access MaxCompute. | Yes | N/A
partition | The name of the partition to read. | No | The format depends on the number of partition levels:
  • A MaxCompute table that has only one level of partitions

    For example, if the table has a single partition key column ds, 'partition' = 'ds=20180905' indicates that the data in the ds=20180905 partition is read.

  • A MaxCompute table that has multiple levels of partitions

    For example, if the table has two partition key columns ds and hh, 'partition' = 'ds=20180905,hh=*' indicates that the data in all hh partitions under ds=20180905 is read.

    Note When you filter partitions, you must declare a value for every partition level. In the preceding example, if you declare only 'partition' = 'ds=20180905', no partition data is read.
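
The following DDL is a minimal sketch of the multi-level case described above. The table name odps_hourly_source and the columns id and content are hypothetical and used only for illustration. The sketch assumes a MaxCompute table partitioned by ds and hh, and it reuses the placeholder values from the DDL syntax section.

```sql
-- Hypothetical source table over a MaxCompute table partitioned by ds and hh.
-- Both partition levels are declared: ds is fixed and hh uses the * wildcard.
CREATE TEMPORARY TABLE odps_hourly_source (
  id INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=20180905,hh=*'
);
```

Declaring only 'ds=20180905' for this table would leave the hh level unspecified, which, according to the preceding note, results in no partition data being read.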

Data type mapping

MaxCompute data type | Flink data type
TINYINT | TINYINT
SMALLINT | SMALLINT
INT | INT
BIGINT | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
BOOLEAN | BOOLEAN
DATETIME | TIMESTAMP
TIMESTAMP | TIMESTAMP
VARCHAR | VARCHAR
DECIMAL | DECIMAL
BINARY | VARBINARY
STRING | VARCHAR
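
As an illustration of how the mapping might be applied, the following sketch declares Flink columns for a hypothetical MaxCompute table whose columns use the BIGINT, DATETIME, BINARY, and STRING types. The table name and column names are assumptions made for this example. Only the type pairs come from the mapping above.

```sql
-- Hypothetical table: each Flink column type follows the data type mapping.
CREATE TEMPORARY TABLE odps_typed_source (
  event_id BIGINT,       -- MaxCompute BIGINT
  event_time TIMESTAMP,  -- MaxCompute DATETIME
  payload VARBINARY,     -- MaxCompute BINARY
  message VARCHAR        -- MaxCompute STRING
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>'
);
```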

Sample code

-- Source table: reads the ds=20180905 partition of the MaxCompute table.
CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=20180905'
);

-- Sink table: the blackhole connector discards every row it receives.
CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

-- Count the rows for each cid and write the result to the sink.
INSERT INTO blackhole_sink
SELECT
  cid,
  COUNT(*) AS invoke_count
FROM odps_source
GROUP BY cid;