本文为您介绍全量MaxCompute源表DDL定义、WITH参数、类型映射和常见问题。

说明 MaxCompute Connector可以作为Stream作业和Batch作业的源表使用。

什么是MaxCompute

大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute详情请参见什么是MaxCompute

前提条件

已创建MaxCompute表,详情请参见创建表

使用限制

仅Flink计算引擎VVR 2.0.0及以上版本支持全量MaxCompute Connector。

注意事项

建议您使用Flink 1.12-VVR 3.0.4及以下版本的MaxCompute Connector,因为Flink 1.12-VVR 3.0.4以上版本的MaxCompute Connector可能会导致作业Failover。

DDL定义

create table odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) with (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****'
);
说明
  • DDL定义的表字段顺序和类型必须与MaxCompute物理表保持一致,否则可能导致MaxCompute物理表中查询的数据为NULL或报错。
  • 因为MaxCompute产品侧会将建表后的字段名称全部自动转为小写,所以DDL定义的表字段名称需要全部为小写,才能和MaxCompute物理表大小写保持一致。如果DDL定义的字段名称有大写,在语法验证时,可能会报未识别的XXX字段的错。例如,org.apache.flink.table.api.ValidationException: SQL validation failed. Unknown column MobileCountryCode!

WITH参数

参数 说明 是否必填 备注
connector 源表类型。 固定值为odps
endpoint MaxCompute服务地址。 请参见Endpoint
tunnelEndpoint MaxCompute Tunnel服务的连接地址。 请参见Endpoint
说明 VPC环境下为必填。
project MaxCompute项目名称。
tablename MaxCompute表名。
accessid AccessKey ID。
accesskey AccessKey Secret。
partition 分区名。
  • 只存在一级分区的MaxCompute表

    例如,如果只存在1个分区列ds,则`partition` = 'ds=20180905' 表示读ds=20180905分区的数据。

  • 存在多级分区的MaxCompute表

    例如,如果存在2个分区列dshh,则`partition`='ds=20180905,hh=*'表示读ds=20180905分区的数据。

    说明 分区过滤时需要声明所有分区的值。例如,上述示例中,只声明`partition` = 'ds=20180905',则不会读取任何分区。
compressAlgorithm MaxCompute Tunnel使用的压缩算法。 参数取值如下:
  • RAW(无压缩)
  • ZLIB
  • SNAPPY

    SNAPPY相比ZLIB能带来明显的吞吐提升。在测试场景下,吞吐提升约50%。

说明
  • 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
  • VVR 4.0.13版本及以上版本,该参数默认值为ZLIB; VVR 6.0.1及以上版本,该参数默认值为SNAPPY。

类型映射

MaxCompute字段类型 Flink字段类型
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
BOOLEAN BOOLEAN
DATETIME TIMESTAMP
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
DECIMAL DECIMAL
BINARY VARBINARY
STRING VARCHAR

代码示例

CREATE TEMPORARY TABLE odps_source (
  cid varchar,
  rt DOUBLE
) with (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpointName>', 
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessId>',
  'accesskey' = '<yourAccessPassword>',
  'partition' = 'ds=20180905'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid varchar,
  invoke_count BIGINT
) with (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink 
SELECT 
   cid,
   count(*) as invoke_count
FROM odps_source GROUP BY cid;