本文介绍如何创建使用MaxCompute Sink Connector,您可以通过MaxCompute Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至MaxCompute的表中。

前提条件

详细步骤,请参见创建前提

注意事项

如需使用MaxCompute分区功能,创建表时需额外创建一个分区列,列名为time,类型为STRING。

步骤一:创建目标资源

通过MaxCompute客户端创建表。更多信息,请参见创建表
本文以名称为kafka_to_maxcompute的表为例。表中有3列数据,并使用分区功能。该表的建表语句如下:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT) PARTITIONED by (time STRING);
如不使用分区功能,语句如下:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT);
执行成功后,如下图所示:执行成果
表管理页面,查看创建的表信息。表

步骤二:创建MaxCompute Sink Connector并启动

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
  2. 在左侧导航栏,选择Connector生态集成 > 消息流出(Sink)
  3. 消息流出(Sink)页面,单击创建任务
  4. 消息流出创建面板,配置以下参数,单击确定
    1. 基础信息区域,设置任务名称,将流出类型选择为大数据计算服务MaxCompute
    2. 资源配置区域,设置以下参数。
      表 1. 源(云消息队列 Kafka 版
      参数说明示例
      地域源Kafka实例所在的地域。华东1(杭州)
      kafka实例数据源所在的Kafka实例ID。alikafka_post-cn-9hdsbdhd****
      Topic数据源所在的Kafka实例Topic。guide-sink-topic
      Group ID数据源所在的Kafka实例中的Group ID。
      • 快速创建:自动创建以GID_EVENTBRIDGE_xxx命名的Group ID。
      • 使用已有:选择已创建的Group,请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发。
      使用已有
      并发配额(消费者数)消费Topic数据的并发线程数,线程和Topic分区的对应关系如下:
      • Topic分区数=并发消费数:一个线程消费一个Topic分区。建议使用。
      • Topic分区数>并发消费数:多个并发消费会均摊所有分区消费。
      • Topic分区数<并发消费数:一个线程消费一个Topic分区,多出的消费数无效。
      2
      消费位点
      • 最新位点:从最新位点开始消费。
      • 最早位点:从最初位点开始消费。
      最新位点
      网络配置有跨境传输数据需求时选择自建公网,其他情况可选择默认网络默认网络
      表 2. 目标(大数据计算服务MaxCompute)
      参数说明示例
      账号AccessKey ID阿里云账号的AccessKey ID,用于访问MaxCompute服务。LTAI5tHPVCZywsoEVvFu****
      账号AccessKey Secret阿里云账号的AccessKey Secret。4RAKUQpZtUntDgvoKu0tvrkrOM****
      MaxCompute项目名称选择已创建的MaxCompute项目。test_compute
      MaxCompute表名称选择已创建的MaxCompute表。kafka_to_maxcompute
      MaxCompute表入参选择MaxCompute表后,此处会展示表的列名和类型信息,配置数据提取规则即可。下面是一条消息示例,本示例中Topic列对应的值从Topic字段提取,则定义数值提取规则$.topic
      {
          'topic': 'guide-sink-topic',
          'partition': 2,
          'offset': 1,
          'timestamp': 1681372713689,
          'headers': {
              'headers': [],
              'isReadOnly': False
          },
          'key': 'fc_k1',
          'value': 'fc_v1'
      }
      $.topic
      是否开启分区能力
      • 开启:开启分区能力。
      • 关闭:不开启分区能力。
      开启
      分区配置仅当是否开启分区能力参数设置为开启时需配置此参数。
      • DAY:time列的时间格式为YYYY-MM-DD,例如:2023-01-01。
      • HOUR:time列的时间格式为YYYY-MM-DD HH,例如:2023-01-01 12。
      • MINUTE:time列的时间格式为YYYY-MM-DD HH:mm,例如:2023-01-01 12:30。
      HOUR
      网络配置
      • 专有网络:通过专有网络VPC将Kafka消息投递到MaxCompute。
      • 公网:通过公网将Kafka消息投递到MaxCompute。
      公网
      VPC选择VPC ID。仅当网络配置专有网络时需配置此参数。vpc-bp17fapfdj0dwzjkd****
      交换机选择vSwitch ID。仅当网络配置专有网络时需配置此参数。vsw-bp1gbjhj53hdjdkg****
      安全组选择安全组。仅当网络配置专有网络时需配置此参数。test_group
      批量推送批量推送可帮您批量聚合多个事件,当批量推送条数批量推送间隔两个条件达到任意一个时即会触发批量推送。例如:您设置的推送条数为100条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等待15 s后再推送。
      • 开启:开启批量推送功能。
      • 关闭:关闭批量推动功能。默认每次仅投递给FC一条消息。
      开启
      批量推送条数一次调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求。取值范围为[1,10000]。仅当批量推送参数设置为开启时需要配置此参数。100
      批量推送间隔调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算。取值范围为[0,15],单位:秒。取值为0表示没有等待时间,直接投递。仅当批量推送参数设置为开启时需要配置此参数。10
    完成上述配置后,在消息流出(Sink)页面,找到刚创建的MaxCompute Sink Connector任务,单击其右侧操作列的启动。当状态栏由启动中变为运行中时,Connector创建成功。

步骤三:测试MaxCompute Sink Connector

  1. 消息流出(Sink)页面,在MaxCompute Sink Connector任务的事件源列单击源Topic。
  2. 在Topic详情页面,单击体验发送消息
  3. 快速体验消息收发面板,按照下图配置消息内容,然后单击确定
    发送消息
  4. 进入MaxCompute控制台,执行以下SQL语句查看分区信息。
    show PARTITIONS kafka_to_maxcompute;
    查询结果如下所示:分区
  5. 根据分区信息,执行以下语句,查看分区内数据。
    SELECT * FROM kafka_to_maxcompute WHERE time="2023-04-13 17";
    查询结果如下所示:分区内数据