本文为您介绍增量MaxCompute源表DDL定义、WITH参数、类型映射和常见问题。
什么是MaxCompute
大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute详情请参见什么是MaxCompute。
前提条件
已创建MaxCompute表,详情请参见创建表。
使用限制
- 仅Flink计算引擎VVR 2.1.2及以上版本支持增量MaxCompute Connector。
- 增量MaxCompute源表不支持作为维表使用。
- 增量MaxCompute源表只支持MaxCompute分区表,不支持非分区表。
注意事项
建议您使用Flink 1.12-VVR 3.0.4及以下版本的MaxCompute Connector,因为Flink 1.12-VVR 3.0.4以上版本的MaxCompute Connector可能会导致作业Failover。
DDL定义
CREATE TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'continuous-odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'tablename' = '<yourTableName>',
'accessid' = '<yourAccessKeyId>',
'accesskey' = '<yourAccessKeySecret>',
'startpartition' = 'ds=2018****'
);
说明
- DDL定义的表字段顺序和类型必须与MaxCompute物理表保持一致,否则可能导致MaxCompute物理表中查询的数据为NULL或报错。
- 因为MaxCompute产品侧会将建表后的字段名称全部自动转为小写,所以DDL定义的表字段名称需要全部为小写,才能和MaxCompute物理表大小写保持一致。如果DDL定义的字段名称有大写,在语法验证时,可能会报未识别的XXX字段的错。例如,
org.apache.flink.table.api.ValidationException: SQL validation failed. Unknown column MobileCountryCode!
。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
connector | 源表类型。 | 是 | 固定值为continuous-odps 。
|
endPoint | MaxCompute服务本身的连接地址。 | 是 | 请参见Endpoint。 |
tunnelEndpoint | MaxCompute Tunnel服务的连接地址。 | 是 | 请参见Endpoint。 说明 VPC环境下为必填。
|
project | 表所属的project名称。 | 是 | 无。 |
tableName | 表名。 | 是 | 无。 |
accessId | AccessKey ID。 | 是 | 无。 |
accessKey | AccessKey Secret。 | 是 | 无。 |
startPartition | 指定读取的起始分区。系统加载分区列表时,会把每个分区列表的所有分区和startPartition按照字母顺序进行比较,加载满足条件的分区的数据。
此外,增量MaxCompute源表可以持续监听增量MaxCompute分区表。读完已有的分区后,任务不会退出,且持续监听并读入新分区。
说明
|
是 | 例如,指定startPartition是ds=20191201,表示加载增量MaxCompute表里所有满足ds >= 20191201的分区数据。
如果一个增量MaxCompute分区表,有一级分区ds和二级分区type两个分区列,假设表里有以下5个分区:
|
compressAlgorithm | MaxCompute Tunnel使用的压缩算法。 | 否 | 参数取值如下:
说明
|
类型映射
MaxCompute字段类型 | Flink字段类型 |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
DECIMAL | DECIMAL |
BINARY | VARBINARY |
STRING | VARCHAR |
重要
- 增量MaxCompute源表暂不支持CHAR、VARCHAR、ARRAY、MAP和STRUCT数据类型。
- 您可以临时使用STRING替换VARCHAR。
代码示例
增量MaxCompute源表每天产生一个分区,分区列是ds,从ds=20191201分区开始,加载后续的所有分区,并一直监听新分区的产生。
--读增量MaxCompute表,读取的分区范围是[ds=20191201,∞)。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'continuous-odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'tablename' = '<yourTableName>',
'accessid' = '<yourAccessKeyId>',
'accesskey' = '<yourAccessKeySecret>',
'startpartition' = 'ds=20191201'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector'='blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid, rt FROM odps_source;