本文为您介绍MaxCompute结果表的DDL定义、WITH参数、类型映射和代码示例。

说明 MaxCompute Connector可以作为Stream作业和Batch作业的源表使用。

什么是MaxCompute

大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute详情请参见什么是MaxCompute

前提条件

已创建MaxCompute表,详情请参见创建表

使用限制

仅Flink计算引擎VVR 2.0.0及以上版本支持MaxCompute Connector。

注意事项

建议您使用Flink 1.12-VVR 3.0.4及以下版本的MaxCompute Connector,因为Flink 1.12-VVR 3.0.4以上版本的MaxCompute Connector可能会导致作业Failover。

实现原理

MaxCompute Sink可以分为以下两个阶段:
  1. 写入数据。调用MaxCompute SDK中的接口将数据写入缓冲区,在缓冲区大小超过64 MB或者每隔指定的时间间隔时,上传数据到MaxCompute的临时文件中。
  2. 提交会话。在任务进行Checkpoint时, MaxCompute Sink会调用Tunnel的Commit方法,提交会话,移动临时文件到MaxCompute表的数据目录,并修改元数据。
    说明 Commit方法不能提供原子性。因此,MaxCompute Sink提供的是At least Once方式,而不是Exactly Once方式。

DDL定义

Flink支持将MaxCompute作为结果输出,示例代码如下。
create table odps_sink(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****'
);
说明 DDL定义中的字段名称、顺序和类型必须与MaxCompute物理表保持一致,否则可能导致MaxCompute物理表中查询的数据为/n

WITH参数

参数 说明 是否必填 备注
connector 结果表类型。 固定值为odps
endPoint MaxCompute服务地址。 请参见Endpoint
tunnelEndpoint MaxCompute Tunnel服务的连接地址。 请参见Endpoint
说明 VPC环境下为必填。
project MaxCompute项目名称。 无。
tableName 表名。 无。
accessId AccessKey ID。 无。
accessKey AccessKey Secret。 无。
partition 分区名。 如果存在分区表,则必填partition。填写partition需要注意以下两点:
  • 固定分区

    例如`partition` = 'ds=20180905'表示将数据写入分区ds= 20180905

  • 动态分区

    如果未明文显示分区值,则根据写入数据中分区列具体值,写入对应分区。例如`partition`='ds'表示根据ds字段值写入对应分区。

    如果要创建多级动态分区,Partition中多个字段顺序必须和MaxCompute物理表保持一致,各个分区字段之间使用逗号(,)分割。

    说明
    • 动态分区列需要显式写在建表语句中。
    • 在动态分区字段为空时,如果数据源中ds=null或者ds='',则输出结果为ds=NULL的分区。
flushIntervalMs MaxCompute Tunnel Writer缓冲区Flush间隔。

MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs),再把缓冲区里的数据写到目标 MaxCompute表。

单位为毫秒,默认值为30000(30秒)。
说明 本参数可以与batchSize一同使用,满足任一条件即会Flush数据。
batchSize MaxCompute Tunnel Writer缓冲区Flush的大小。

MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区达到一定大小(batchSize),再把缓冲区里的数据写到目标MaxCompute表。

单位为字节,默认值为67108864(64 MB)。
说明
  • 仅实时计算引擎VVR 4.0.14及以上版本支持该参数。
  • 本参数可以与flushIntervalMs一同使用,满足任一条件即会Flush数据。
numFlushThreads MaxCompute Tunnel Writer缓冲区Flush的线程数。

每个MaxCompute Sink并发将创建numFlushThreads个线程用于flush数据。当该值大于1时,将允许不同分区的数据并发Flush,提升Flush的效率。

默认值为1。
说明 仅实时计算引擎VVR 4.0.14及以上版本支持该参数。
dynamicPartitionLimit 分区数目最大值。 默认值为100,系统会把已写入的分区和TunnelBufferedWriter的映射关系维护到一个Map里,如果该Map大小超过了dynamicPartitionLimit设定值,则会出现Too many dynamic partitions: 100, which exceeds the size limit: 100报错。
compressAlgorithm MaxCompute Tunnel使用的压缩算法。 参数取值如下:
  • RAW(无压缩)
  • ZLIB
  • SNAPPY

    相比ZLIB,SNAPPY能带来明显的吞吐提升。在测试场景下,吞吐提升约50%。

说明
  • 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
  • VVR 4.0.13版本及以上版本,该参数默认值为ZLIB; VVR 6.0.1及以上版本,该参数默认值为SNAPPY。
useStreamTunnel 是否使用MaxCompute Stream Tunnel上传数据。
参数取值如下:
  • true:使用MaxCompute Stream Tunnel上传数据。
  • false(默认值):使用MaxCompute Batch Tunnel上传数据。

    对于使用MaxCompute Batch Tunnel的作业,在Checkpoint进行的很慢甚至超时,且确认下游可以接受重复数据时,可以考虑使用MaxCompute Stream Tunnel。

说明 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。如何选择数据通道详情请参见Batch Tunnel与Streaming Tunnel的选择

Batch Tunnel与Streaming Tunnel的选择

Batch Tunnel与Streaming Tunnel是MaxCompute提供的两种数据通道,根据您对一致性与运行效率的需求不同,您可以选择不同的数据通道。
需求 Batch Tunnel Streaming Tunnel
一致性 相比Streaming Tunnel,在绝大多数情况下都能将数据不多不少地写入MaxCompute表,保证数据不少(At Least Once语义)。

只有当Checkpoint过程中出现异常,且作业同时写入多个分区时,才有可能在一部分分区中产生重复数据。

保证数据不少(At Least Once语义),当作业在任意情况下出现异常时,都有可能产生重复数据。
运行效率 由于需要在Checkpoint过程中Commit数据以及需要在服务端创建文件等操作,整体效率低于Streaming Tunnel。 无需在Checkpoint过程中Commit数据。

如果使用了Streaming Tunnel,同时设置numFlushThreads值大于1,在Flush数据的过程中也能不间断地接受接收上游数据,整体效率高于Batch Tunnel。

类型映射

MaxCompute字段类型 Flink字段类型
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
BOOLEAN BOOLEAN
DATETIME TIMESTAMP
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
STRING VARCHAR
BINARY VARBINARY
说明 仅实时计算引擎VVR 6.0.1及以上版本支持VARBINARY。
DECIMAL DECIMAL

代码示例

  • 写入固定分区
    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      len INT,
      content VARCHAR
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE odps_sink (
      id INT,
      len INT,
      content VARCHAR
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'tunnelEndpoint' = '<yourTunnelEndpoint>',
      'project' = '<yourProjectName>',
      'tablename' = '<yourTableName>',
      'accessid' = '<yourAccessKeyId>',
      'accesskey' = '<yourAccessKeySecret>',
      'partition' = 'ds=20180905'
    );
    
    INSERT INTO odps_sink 
    SELECT 
      id, len, content 
    FROM datagen_source;
  • 写入动态分区
    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      len INT,
      content VARCHAR,
      c TIMESTAMP 
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE odps_sink (
      id  INT,
      len INT,
      content VARCHAR,
      ds VARCHAR --动态分区列需要显式声明。
    ) WITH (
      'connector' = 'odps',
      'endpoint' = '<yourEndpoint>',
      'tunnelEndpoint' = '<yourTunnelEndpoint>',
      'project' = '<yourProjectName>',
      'tablename' = '<yourTableName>',
      'accessid' = '<yourAccessKeyId>',
      'accesskey' = '<yourAccessKeySecret>',
      'partition' ='ds' --不写分区的值,表示根据ds字段的值写入不同分区。
    );
    
    INSERT INTO odps_sink 
    SELECT 
       id, 
       len, 
       content,
       DATE_FORMAT(c, 'yyMMdd') as ds
    FROM datagen_source;

常见问题

报错:ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'