本文为您介绍如何创建全量MaxCompute源表,以及创建源表时使用的WITH参数和类型映射。
- 本文仅适用于Blink 2.2.7及以上版本。
- 与数据总线DataHub、Kafka等数据源不同,全量MaxCompute源表通常作为有限流表使用。Blink 3.4.4版本临时支持将全量MaxCompute源表作为无限流表使用,实现不间断监听并读取新增分区的功能。Blink 3.5.0版本废弃该功能,如果您需要使用无限流MaxCompute源表,请参见创建增量MaxCompute源表。
DDL定义
create table odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) with (
type = 'odps',
endPoint = 'http://service.cn.maxcompute.aliyun-inc.com/api',
project = '<projectName>',
tableName = '<tableName>',
accessId = '<yourAccessKeyId>',
accessKey = '<yourAccessKeySecret>',
`partition` = 'ds=2018****' --如果您的MaxCompute源表为非分区表,不声明该参数即可。
);
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
endPoint | MaxCompute服务地址。 | 是 | 请参见Endpoint。 |
tunnelEndpoint | MaxCompute Tunnel服务的连接地址。 | 否 | 请参见Endpoint。 说明 VPC环境下为必填。
|
project | MaxCompute项目名称。 | 是 | 无。 |
tableName | MaxCompute表名。 | 是 | 无。 |
accessId | AccessKey ID。 | 是 | 无。 |
accessKey | AccessKey Secret。 | 是 | 无。 |
partition | 分区名。 | 否 |
|
subscribeNewPartition | 是否监听符合条件的新分区。 | 否 | 默认值为false,不监听新产生的分区。
说明
|
subscribeIntervalInSec | 监听新分区的间隔。 | 否 | 默认值为30,单位为秒。
说明 监听间隔设置太小,会对全量MaxCompute MetaData服务造成压力,有可能导致监听服务失败。
|
maxPartitionCount | 未设置Partition参数时,读取当前分区表的分区个数。 | 否 | 默认值为100。
说明 仅Blink 3.0及以上版本支持该参数。
|
类型映射
全量MaxCompute字段类型 | 实时计算Flink版字段类型 |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
DECIMAL | DECIMAL |
BINARY | VARBINARY |
STRING | VARCHAR |
代码示例
CREATE TABLE odps_source (
cid varchar,
rt DOUBLE,
) with (
type = 'odps',
endPoint = '<yourEndpointName>',
project = '<yourProjectName>',
tableName = '<yourTableName>',
accessId = '<yourAccessId>',
accessKey = '<yourAccessPassword>',
partition = 'ds=20190712'
);
CREATE TABLE test (
cid varchar,
invoke_count BIGINT
) with (
type='print'
);
INSERT INTO test
SELECT
cid,
count(*) as invoke_count
FROM odps_source GROUP BY cid;
常见问题
- Q:DDL中endPoint和tunnelEndpoint参数配置错误,有什么影响?
A:endPoint和tunnelEndpoint参数介绍请参见各地域Endpoint对照表(外网连接方式)。参数配置错误会导致以下问题:
- endPoint配置错误:任务上线进度停滞在91%处。
- tunnelEndpoint配置错误:任务运行失败。
- Q:全量MaxCompute数据存储底层是如何读取全量MaxCompute源表的?
A:全量MaxCompute数据存储通过Tunnel读取全量MaxCompute数据的,读取速度及带宽受全量MaxCompute Tunnel带宽限制。
- Q:实时计算Flink版作业启动后,全量MaxCompute源表是否能读取新写入到全量MaxCompute表或者全量MaxCompute分区的数据?
A:全量MaxCompute数据存储使用Tunnel读取表数据或者分区数据。实时计算Flink版作业启动后,读取完成Reader后,不再读取新写入全量MaxCompute表和分区的数据。
- Q:实时计算Flink版作业启动后,全量MaxCompute源表如何能够读取到全量MaxCompute表或者全量MaxCompute分区的数据?
A:实时计算Flink版 3.4及以上版本支持subscribeNewPartition参数,控制是否需要监听新产生的分区。新产生的数据可以写入到新的分区,示例代码如下。
CREATE TABLE blink_source ( cid varchar, rt DOUBLE, ) with ( type = 'odps', endPoint = '<yourEndpointName>', project = '<yourProjectName>', tableName = '<yourTableName>',_table_name', subscribeNewPartition = 'true' --请注意:如果需要监听新的分区,则不能指定partition值。 accessId = '<yourAccessKeyId>', accessKey = '<yourAccessKeySecret>', ); CREATE TABLE test ( cid varchar, invoke_count BIGINT ) with ( type='print' ); INSERT INTO test SELECT cid, count(*) as invoke_count FROM blink_source GROUP BY cid;
说明 全量MaxCompute表新分区产生的数据需要写入实时计算Flink版全量MaxCompute源表的新分区。如果写入现有分区,则不生效。 - Q:WITH参数中partition值是否可以使用
max_pt()
或者max_pt_with_done()
?A:不推荐使用。如果需要使用,请务必了解在没有开启监听新分区和开启监听新分区的场景下max_pt()
在全量MaxCompute源表的作用:- 未开启监听新分区
任务启动后,会通过全量MaxCompute MetaData服务获取当前全量MaxCompute 源表所有分区列表,从中读取
max_pt()
。读取结束后,Reader退出。不再读取该分区中的新数据或者监听新分区。 - 开启监听新分区
任务启动后,会通过全量MaxCompute MetaData服务获取当前全量MaxCompute源表所有分区列表,从中读取
max_pt()
。读取结束后,不再读取该分区中的新数据。间隔一定周期(参见subscribeIntervalInSec参数)监听是否有新分区产生。如果有新分区产生,读取新产生的分区列表,从中获取max_pt()
,并读取。读取完成后还会等待下次监听事件。如果没有新分区产生,等待下次监听事件。
- 未开启监听新分区
- Q:引用全量MaxCompute作为数据源,在作业启动后,向已有的分区或者表里追加数据,这些新数据是否能被读取?
A:数据不会被读取,而且可能导致作业失败。全量MaxCompute数据存储使用
ODPS DOWNLOAD SESSION
读取表数据或者分区数据。新建DOWNLOAD SESSION
,服务端会创建一个Index文件,相当于创建DOWNLOAD SESSION
那一瞬间数据的映射,后续的数据读取以这个映射为基础。因此在新建DOWNLOAD SESSION
后,追加到全量MaxCompute表或者分区里的数据,正常流程下是不会被读取的。分为两种情况:- Tunnel在读取数据的过程中,写入新数据会产生报错
ErrorCode=TableModified,ErrorMessage=The specified table has been modified since the download initiated.
。 - Tunnel已经关闭,写入新数据。这些数据不会被读取的,因为Tunnel已经关闭。而且一旦作业发生Failover或者暂停恢复作业后,不能保证数据正确性,例如,已经读过的数据可能会重读,新追加的数据可能读不全。
- Tunnel在读取数据的过程中,写入新数据会产生报错
- Q:全量MaxCompute源表作业是否支持暂停、恢复和修改源表的并发度?
A:全量MaxCompute源表作业不支持暂停、恢复和修改源表并发度。系统根据并发度,计算每个并发应该读哪些分区的哪一段内容,并且每个并发会把自己消费的情况记录到State里。以便暂停恢复后或者Failover后,可以从上次读取的位置继续读取。这个逻辑是建立在并发度确定的前提下的。如果修改了全量MaxCompute源表的并发度后进行暂停恢复操作,对作业产生的影响是无法预估的,因为已经读取的数据可能会再次读取,没有读的数据反而被遗漏。
- Q:作业启动位点设置了
2019-10-11 00:00:00
, 为什么启动位点前的分区也会被读取?A:设置启动位点只对消息队列类型的数据源有效 (例如DataHub),对全量MaxCompute源表无效。实时计算Flink版作业启动后数据读取的范围如下:- 分区表:读取当前所有分区。
- 非分区表:读取当前存在的数据。
- Q:作业运行过程中报错ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'
A:该报错是因为MaxCompute DDL定义中填写的用户身份信息无法访问MaxCompute。因此,您需要通过阿里云账号、RAM用户账号或RAM角色认证用户身份,详情请参见用户认证。