本文为您介绍如何使用大数据计算服务MaxCompute连接器。
背景信息
大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute详情请参见什么是MaxCompute。
MaxCompute连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
特有监控指标 |
说明 指标的含义及如何查看监控指标,请参见查看监。 |
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | 不支持更新和删除结果表数据,只支持插入数据。 |
前提条件
已创建MaxCompute表,详情请参见创建表。
使用限制
仅实时计算引擎VVR 2.0.0及以上版本支持MaxCompute连接器。
MaxCompute连接器仅支持At Least Once语义。
说明At Least Once语义会保证数据不缺失,但在少部分情况下,可能会将重复数据写入MaxCompute。不同的MaxCompute Tunnel出现重复数据的情况不同,MaxCompute Tunnel详情请参见如何选择数据通道?。
默认情况下源表为全量模式,仅会读取partition参数中指定的分区,在读完所有数据后结束运行,状态转换为finished,不会监控是否有新分区产生。
如果您需要持续监控新分区,请通过WITH参数中指定startPartition使用增量源表模式。
说明维表每次更新时都会检查最新分区,不受这一限制。
在源表开始运行后,向分区里添加的新数据不会被读取,请在分区数据完整的情况下运行作业。
语法结构
CREATE TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '<yourAccessKeyId>',
'accessKey' = '<yourAccessKeySecret>',
'partition' = 'ds=2018****'
);
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为odps。
endpoint
MaxCompute服务地址。
String
是
无
请参见Endpoint。
tunnelEndpoint
MaxCompute Tunnel服务的连接地址。
String
否
无
请参见Endpoint。
说明VPC环境下为必填。
如果未填写,MaxCompute会根据内部的负载均衡服务分配Tunnel的连接。
project
MaxCompute项目名称。
String
是
无
无。
tableName
MaxCompute表名。
String
是
无
无。
accessId
MaxCompute AccessKey ID。
String
是
无
无。
accessKey
MaxCompute AccessKey Secret。
String
是
无
无。
partition
MaxCompute分区名。
String
否
无
对于非分区表和增量源表无需填写。
说明分区表详情请参见在读取或写入分区时,如何填写Partition参数?。
compressAlgorithm
MaxCompute Tunnel使用的压缩算法。
String
否
VVR 4.0.13及以上版本:ZLIB
VVR 6.0.1及以上版本:SNAPPY
参数取值如下:
RAW(无压缩)
ZLIB
SNAPPY
SNAPPY相比ZLIB能带来明显的吞吐提升。在测试场景下,吞吐提升约50%。
说明仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
maxPartitionCount
可以读取的最大分区数量。
Integer
否
100
如果读取的分区数量超过了该参数,则会出现报错
The number of matched partitions exceeds the default limit
。重要由于一次性读取大量分区会给MaxCompute服务带来一定压力,同时也会让作业启动速度变慢,因此您需要确认是否需要读取这么多分区(而不是误填partition参数)。如果确实需要,需要手动调大maxPartitionCount参数。
增量源表独有
增量源表通过间歇轮询MaxCompute服务器获取所有的分区信息来发现新增的分区,读取新分区时要求分区内数据已写入完毕,详情参见增量MaxCompute源表监听到新分区时,如果该分区还有数据没有写完,如何处理?。通过startPartition可以指定起始点位,但注意只读取字典序大于等于起始点位的分区,例如分区
year=2023,month=10
字典序小于分区year=2023,month=9
,对于这种类型的分区声明可以通过加0补齐的方式来保证字典序正确,例如year=2023,month=09
。参数
说明
数据类型
是否必填
默认值
备注
startPartition
增量读取的起始MaxCompute分区点位(包含)。
String
是
无
使用该参数后启用增量源表模式,将忽略partition参数。
多级分区必须按分区级别从大到小声明每个分区列的值。
说明startPartition参数详情,请参见如何填写增量MaxCompute的startPartition参数?。
subscribeIntervalInSec
轮询MaxCompute获取分区列表的时间间隔。
Integer
否
30
单位为秒。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
useStreamTunnel
是否使用MaxCompute Stream Tunnel上传数据。
Boolean
否
false
参数取值如下:
true:使用MaxCompute Stream Tunnel上传数据。
false:使用MaxCompute Batch Tunnel上传数据。
说明仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
数据通道选择详情请参见如何选择数据通道?。
flushIntervalMs
MaxCompute Tunnel Writer缓冲区flush间隔。
Long
否
30000(30秒)
MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs),再把缓冲区里的数据写到目标MaxCompute表。
对于Stream Tunnel,flush的数据立即可见;对于Batch Tunnel,数据flush后仍需要等待checkpoint完成后才可见,建议设置该参数为0来关闭定时flush。
单位为毫秒。
说明本参数可以与batchSize一同使用,满足任一条件即会Flush数据。
batchSize
MaxCompute Tunnel Writer缓冲区flush的大小。
Long
否
67108864(64 MB)
MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区达到一定大小(batchSize),再把缓冲区里的数据写到目标MaxCompute表。
单位为字节。
说明仅实时计算引擎VVR 4.0.14及以上版本支持该参数。
本参数可以与flushIntervalMs一同使用,满足任一条件即会Flush数据。
numFlushThreads
MaxCompute Tunnel Writer缓冲区flush的线程数。
Integer
否
1
每个MaxCompute Sink并发将创建numFlushThreads个线程用于flush数据。当该值大于1时,将允许不同分区的数据并发Flush,提升Flush的效率。
说明仅实时计算引擎VVR 4.0.14及以上版本支持该参数。
dynamicPartitionLimit
写入动态分区的最大数量。
Integer
否
100
当结果表在两次Checkpoint之间写入的动态分区数量超过了dynamicPartitionLimit,则会出现报错
Too many dynamic partitions
。重要由于一次性写入大量分区会给MaxCompute服务带来一定压力,同时也会导致结果表flush和作业Checkpoint变慢。因此当报错出现时,您需要确认是否需要写入这么多分区。如果确实需要,需要手动调大dynamicPartitionLimit参数。
retryTimes
向MaxCompute服务器请求最大重试次数。
Integer
否
3
创建session、提交session、flush数据时可能存在短暂的MaxCompute服务不可用时,会根据该配置进行重试。
sleepMillis
重试间隔时间。
Integer
否
1000
单位为毫秒。
维表独有
MaxCompute维表在作业启动时从指定的分区拉取全量数据,partition参数支持使用max_pt()等函数。当缓存过期重新加载时会重新解析partition参数拉取最新的分区,使用max_two_pt()时维表可拉取两个分区,其他情况下只支持指定单个分区。
参数
说明
数据类型
是否必填
默认值
备注
cache
缓存策略。
String
是
无
目前MaxCompute维表仅支持
ALL
策略,必须显式声明。 适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。ALL策略:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
说明因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的至少4倍,具体值与MaxCompute存储压缩算法有关。
如果MaxCompute维表数据量较大,可以考虑使用SHUFFLE_HASH注解将维表数据均匀分散到各个并发中。详情请参见如何使用维表SHUFFLE_HASH注解?。
在使用超大MaxCompute维表时,如果JVM频繁GC导致作业异常,且在增加维表JOIN节点的内存仍无改善的情况下,建议改为支持LRU Cache策略的KV型维表,例如云数据库Hbase版维表。
cacheSize
最多缓存的数据条数。
Long
否
100000
如果维表数据量超过了cacheSize,则会出现报错
Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit
。重要由于维表数据量太大会占用大量JVM堆内存,同时也会让作业启动和维表更新变慢,因此您需要确认是否需要缓存这么多数据,如果确实需要,需要手动调大该参数。
cacheTTLMs
缓存超时时间,也就是缓存更新的间隔时间。
Long
否
Long.MAX_VALUE(相当于永不更新)
单位为毫秒。
cacheReloadTimeBlackList
更新时间黑名单。在该参数规定的时间段内不会更新缓存。
String
否
无
用于防止缓存在关键时间段(例如活动流量峰值期间)更新导致作业不稳定。填写方式详情请参见如何填写CacheReloadTimeBlackList参数?。
maxLoadRetries
缓存更新时(包含作业启动时初次拉取数据)最多尝试次数,超过该次数后作业运行失败。
Integer
否
10
无。
类型映射
MaxCompute字段类型 | Flink字段类型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
DECIMAL | DECIMAL |
BINARY | VARBINARY |
STRING | VARCHAR |
使用示例
SQL
源表示例
全量读取
CREATE TEMPORARY TABLE odps_source ( cid VARCHAR, rt DOUBLE ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpointName>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessPassword>', 'partition' = 'ds=201809*' ); CREATE TEMPORARY TABLE blackhole_sink ( cid VARCHAR, invoke_count BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT cid, COUNT(*) AS invoke_count FROM odps_source GROUP BY cid;
增量读取
CREATE TEMPORARY TABLE odps_source ( cid VARCHAR, rt DOUBLE ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpointName>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessPassword>', 'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 从20180905对应分区开始读取 ); CREATE TEMPORARY TABLE blackhole_sink ( cid VARCHAR, invoke_count BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT cid, COUNT(*) AS invoke_count FROM odps_source GROUP BY cid;
结果表示例
写入固定分区
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessKeyId>', 'accessKey' = '<yourAccessKeySecret>', 'partition' = 'ds=20180905' -- 写入固定分区ds=20180905。 ); INSERT INTO odps_sink SELECT id, len, content FROM datagen_source;
写入动态分区
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR, c TIMESTAMP ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR, ds VARCHAR --需要显式声明动态分区列。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessKeyId>', 'accessKey' = '<yourAccessKeySecret>', 'partition' = 'ds' --不写分区的值,表示根据ds字段的值写入不同分区。 ); INSERT INTO odps_sink SELECT id, len, content, DATE_FORMAT(c, 'yyMMdd') as ds FROM datagen_source;
维表示例
一对一维表
CREATE TEMPORARY TABLE datagen_source ( k INT, v VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_dim ( k INT, v VARCHAR, PRIMARY KEY (k) NOT ENFORCED -- 一对一维表需要声明主键。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessKeyId>', 'accessKey' = '<yourAccessKeySecret>', 'partition' = 'ds=20180905', 'cache' = 'ALL' ); CREATE TEMPORARY TABLE blackhole_sink ( k VARCHAR, v1 VARCHAR, v2 VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT k, s.v, d.v FROM datagen_source AS s INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
一对多维表
CREATE TEMPORARY TABLE datagen_source ( k INT, v VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_dim ( k INT, v VARCHAR -- 一对多维表无需声明主键。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessKeyId>', 'accessKey' = '<yourAccessKeySecret>', 'partition' = 'ds=20180905', 'cache' = 'ALL' ); CREATE TEMPORARY TABLE blackhole_sink ( k VARCHAR, v1 VARCHAR, v2 VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT k, s.v, d.v FROM datagen_source AS s INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
DataStream
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了MaxCompute DataStream连接器。
为了保护知识产权,从实时计算引擎VVR6.0.6版本起,此连接器在本地调试单次运行作业的时间为5分钟,5分钟后作业会报错并退出。
MaxCompute连接器的Maven依赖包含了构建全量源表、增量源表、结果表和维表的所需要的类。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${connector.version}</version>
</dependency>
在DataStream中使用MaxCompute连接器推荐使用SQL声明MaxCompute表,通过Table/DataStream相互转换来连接MaxCompute表和数据流。
连接源表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql(String.join( "\n", "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (", " cid VARCHAR,", " rt DOUBLE", ") WITH (", " 'connector' = 'odps',", " 'endpoint' = '<yourEndpointName>',", " 'project' = '<yourProjectName>',", " 'accessId' = '<yourAccessId>',", " 'accessKey' = '<yourAccessPassword>',", " 'partition' = 'ds=201809*'", ")"); DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source")); source.print(); env.execute("odps source");
连接结果表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql(String.join( "\n", "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (", " cid VARCHAR,", " rt DOUBLE", ") WITH (", " 'connector' = 'odps',", " 'endpoint' = '<yourEndpointName>',", " 'project' = '<yourProjectName>',", " 'accessId' = '<yourAccessId>',", " 'accessKey' = '<yourAccessPassword>',", " 'partition' = 'ds=20180905'", ")"); DataStream<Row> data = env.fromElements( Row.of("id0", 3.), Row.of("id1", 4.)); tEnv.fromDataStream(data).insertInto("odps_sink"); env.execute("odps sink");