本文为您介绍全量MaxCompute源表DDL定义、WITH参数、类型映射和常见问题。
说明 MaxCompute Connector可以作为Stream作业和Batch作业的源表使用。
什么是MaxCompute
大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute详情请参见什么是MaxCompute。
前提条件
已创建MaxCompute表,详情请参见创建表。
使用限制
仅Flink计算引擎VVR 2.0.0及以上版本支持全量MaxCompute Connector。
注意事项
建议您使用Flink 1.12-VVR 3.0.4及以下版本的MaxCompute Connector,因为Flink 1.12-VVR 3.0.4以上版本的MaxCompute Connector可能会导致作业Failover。
DDL定义
create table odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) with (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'tablename' = '<yourTableName>',
'accessid' = '<yourAccessKeyId>',
'accesskey' = '<yourAccessKeySecret>',
'partition' = 'ds=2018****'
);
说明
- DDL定义的表字段顺序和类型必须与MaxCompute物理表保持一致,否则可能导致MaxCompute物理表中查询的数据为NULL或报错。
- 因为MaxCompute产品侧会将建表后的字段名称全部自动转为小写,所以DDL定义的表字段名称需要全部为小写,才能和MaxCompute物理表大小写保持一致。如果DDL定义的字段名称有大写,在语法验证时,可能会报未识别的XXX字段的错。例如,
org.apache.flink.table.api.ValidationException: SQL validation failed. Unknown column MobileCountryCode!
。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
connector | 源表类型。 | 是 | 固定值为odps 。
|
endpoint | MaxCompute服务地址。 | 是 | 请参见Endpoint。 |
tunnelEndpoint | MaxCompute Tunnel服务的连接地址。 | 否 | 请参见Endpoint。 说明 VPC环境下为必填。
|
project | MaxCompute项目名称。 | 是 | 无 |
tablename | MaxCompute表名。 | 是 | 无 |
accessid | AccessKey ID。 | 是 | 无 |
accesskey | AccessKey Secret。 | 是 | 无 |
partition | 分区名。 | 否 |
|
compressAlgorithm | MaxCompute Tunnel使用的压缩算法。 | 否 | 参数取值如下:
说明
|
类型映射
MaxCompute字段类型 | Flink字段类型 |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
DECIMAL | DECIMAL |
BINARY | VARBINARY |
STRING | VARCHAR |
代码示例
CREATE TEMPORARY TABLE odps_source (
cid varchar,
rt DOUBLE
) with (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'tablename' = '<yourTableName>',
'accessid' = '<yourAccessId>',
'accesskey' = '<yourAccessPassword>',
'partition' = 'ds=20180905'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid varchar,
invoke_count BIGINT
) with (
'connector'='blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid,
count(*) as invoke_count
FROM odps_source GROUP BY cid;