本文为您介绍全量MaxCompute源表DDL定义、WITH参数、类型映射和常见问题。

说明 MaxCompute Connector可以作为Stream作业和Batch作业的源表使用。

什么是MaxCompute

大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute详情请参见什么是MaxCompute

前提条件

已创建MaxCompute表,详情请参见创建表

使用限制

仅Flink计算引擎VVR 2.0.0及以上版本支持全量MaxCompute Connector。

注意事项

建议您使用Flink 1.12-VVR 3.0.4及以下版本的MaxCompute Connector,因为Flink 1.12-VVR 3.0.4以上版本的MaxCompute Connector可能会导致作业Failover。

DDL定义

create table odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) with (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****'
);
说明 WITH参数需要全部小写。

WITH参数

参数 说明 是否必填 备注
connector 源表类型。 固定值为odps
endPoint MaxCompute服务地址。 请参见Endpoint
tunnelEndpoint MaxCompute Tunnel服务的连接地址。 请参见Endpoint
说明 VPC环境下为必填。
project MaxCompute项目名称。
tableName MaxCompute表名。
accessId AccessKey ID。
accessKey AccessKey Secret。
partition 分区名。
  • 只存在一级分区的MaxCompute表

    例如,如果只存在1个分区列ds,则`partition` = 'ds=20180905' 表示读ds=20180905分区的数据。

  • 存在多级分区的MaxCompute表

    例如,如果存在2个分区列dshh,则`partition`='ds=20180905,hh=*'表示读ds=20180905分区的数据。

    说明 分区过滤时需要声明所有分区的值。例如,上述示例中,只声明`partition` = 'ds=20180905',则不会读取任何分区。

类型映射

MaxCompute字段类型 Flink字段类型
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
BOOLEAN BOOLEAN
DATETIME TIMESTAMP
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
DECIMAL DECIMAL
BINARY VARBINARY
STRING VARCHAR

代码示例

CREATE TEMPORARY TABLE odps_source (
  cid varchar,
  rt DOUBLE
) with (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpointName>', 
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessId>',
  'accesskey' = '<yourAccessPassword>',
  'partition' = 'ds=20180905'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid varchar,
  invoke_count BIGINT
) with (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink 
SELECT 
   cid,
   count(*) as invoke_count
FROM odps_source GROUP BY cid;