本文说明如何创建MaxCompute Sink Connector将数据从消息队列Kafka版实例的数据源Topic导出至MaxCompute的表。

前提条件

在创建MaxCompute Sink Connector前,请确保您已完成以下操作:
  • 消息队列Kafka版实例开启Connector。更多信息,请参见开启Connector
  • 消息队列Kafka版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic

    本文以名称为maxcompute-test-input的Topic为例。

  • 通过MaxCompute客户端创建表。更多信息,请参见创建和查看表

    本文以名称为connector_test的项目下名称为test_kafka的表为例。该表的建表语句如下:

    CREATE TABLE IF NOT EXISTS test_kafka(topic STRING,partition BIGINT,offset BIGINT,key STRING,value STRING) PARTITIONED by (pt STRING);

操作流程

使用MaxCompute Sink Connector将数据从消息队列Kafka版实例的数据源Topic导出至MaxCompute的表操作流程如下:

  1. 授予消息队列Kafka版访问MaxCompute的权限
  2. 可选:创建MaxCompute Sink Connector依赖的Topic和Consumer Group

    如果您不需要自定义Topic和Consumer Group的名称,您可以直接跳过该步骤,在下一步骤选择自动创建。

    注意 部分MaxCompute Sink Connector依赖的Topic的存储引擎必须为Local存储,大版本为0.10.2的消息队列Kafka版实例不支持手动创建Local存储的Topic,只支持自动创建。
    1. 可选:创建MaxCompute Sink Connector依赖的Topic
    2. 可选:创建MaxCompute Sink Connector依赖的Consumer Group
  3. 创建并部署MaxCompute Sink Connector
  4. 结果验证
    1. 发送测试消息
    2. 查看表数据

创建RAM角色

由于RAM角色不支持直接选择消息队列Kafka版作为受信服务,您在创建RAM角色时,需要选择任意支持的服务作为受信服务。RAM角色创建后,手工修改信任策略。

  1. 登录访问控制控制台
  2. 在左侧导航栏,单击RAM角色管理
  3. RAM角色管理页面,单击创建RAM角色
  4. 创建RAM角色面板,执行以下操作。
    1. 当前可信实体类型区域,选择阿里云服务,然后单击下一步
    2. 角色类型区域,选择普通服务角色,在角色名称文本框,输入AliyunKafkaMaxComputeUser1,从选择受信服务列表,选择函数计算,然后单击完成
  5. RAM角色管理页面,找到AliyunKafkaMaxComputeUser1,单击AliyunKafkaMaxComputeUser1
  6. AliyunKafkaMaxComputeUser1页面,单击信任策略管理页签,单击修改信任策略
  7. 修改信任策略面板,将脚本中fc替换为alikafka,单击确定
    pg_ram

添加权限

为使Connector将消息同步到MaxCompute表,您需要为创建的RAM角色至少授予以下权限:

客体 操作 描述
Project CreateInstance 在项目中创建实例。
Table Describe 读取表的元信息。
Table Alter 修改表的元信息或添加删除分区。
Table Update 覆盖或添加表的数据。

关于以上权限的详细说明以及授权操作,请参见授权

为本文创建的AliyunKafkaMaxComputeUser1添加权限的示例步骤如下:

  1. 登录MaxCompute客户端。
  2. 执行以下命令添加RAM角色为用户。
    add user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
    说明 将<accountid>替换为您自己的阿里云账号ID。
  3. 为RAM用户授予访问MaxCompute所需的最小权限。
    1. 执行以下命令为RAM用户授予项目相关权限。
      grant CreateInstance on project connector_test to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      说明 将<accountid>替换为您自己的阿里云账号ID。
    2. 执行以下命令为RAM用户授予表相关权限。
      grant Describe, Alter, Update on table test_kafka to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      说明 将<accountid>替换为您自己的阿里云账号ID。

创建MaxCompute Sink Connector依赖的Topic

您可以在消息队列Kafka版控制台手动创建MaxCompute Sink Connector依赖的5个Topic。

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击实例列表
  4. 实例列表页面,单击目标实例名称。
  5. 在左侧导航栏,单击Topic管理
  6. Topic管理页面,单击创建 Topic
  7. 创建 Topic面板,设置Topic属性,然后单击创建
    Topic 描述
    任务位点Topic 用于存储消费位点的Topic。
    • Topic名称:建议以connect-offset开头。
    • 分区数:Topic的分区数量必须大于1。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    任务配置Topic 用于存储任务配置的Topic。
    • Topic名称:建议以connect-config开头。
    • 分区数:Topic的分区数量必须为1。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    任务状态Topic 用于存储任务状态的Topic。
    • Topic名称:建议以connect-status开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎必须为Local存储。
    • cleanup.policy:Topic的日志清理策略必须为compact。
    死信队列Topic 用于存储Connect框架的异常数据的Topic。该Topic可以和异常数据Topic为同一个Topic,以节省Topic资源。
    • Topic名称:建议以connect-error开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
    异常数据Topic 用于存储Sink的异常数据的Topic。该Topic可以和死信队列Topic为同一个Topic,以节省Topic资源。
    • Topic名称:建议以connect-error开头。
    • 分区数:Topic的分区数量建议为6。
    • 存储引擎:Topic的存储引擎可以为Local存储或云存储。

创建MaxCompute Sink Connector依赖的Consumer Group

您可以在消息队列Kafka版控制台手动创建MaxCompute Sink Connector依赖的2个Consumer Group。

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击实例列表
  4. 实例列表页面,单击目标实例名称。
  5. 在左侧导航栏,单击Consumer Group管理
  6. Consumer Group管理页面,单击创建Consumer Group
  7. 创建Consumer Group 面板,设置Consumer Group属性,然后单击创建
    Consumer Group 描述
    Connector任务消费组 Connector的数据同步任务使用的Consumer Group。该Consumer Group的名称必须为connect-任务名称
    Connector消费组 Connector使用的Consumer Group。该Consumer Group的名称建议以connect-cluster开头。

创建并部署MaxCompute Sink Connector

创建并部署用于将数据从消息队列Kafka版同步至MaxCompute的MaxCompute Sink Connector。

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击实例列表
  4. 实例列表页面,单击目标实例名称。
  5. 在左侧导航栏,单击Connector(公测组件)
  6. Connector(公测组件)页面,单击创建Connector
  7. 创建Connector面板,完成以下操作。
    1. 基础信息页签,配置以下参数,然后单击下一步
      参数 描述 示例值
      Connector名称 Connector的名称。命名规则:
      • 可以包含数字、小写英文字母和短划线(-),但不能以短划线(-)开头,长度限制为48个字符。
      • 同一个消息队列Kafka版实例内保持唯一。

      Connector的数据同步任务必须使用名称为connect-任务名称的Consumer Group。如果您未手动创建该Consumer Group,系统将为您自动创建。

      kafka-maxcompute-sink
      转储路径 配置数据转储的源和目标。第一个下拉列表中选择数据源,第二个下拉列表中选择目标。 消息队列Kafka版转储到MaxCompute
    2. 源实例配置页签,按需配置以下参数,然后单击下一步
      说明 如果您已创建好Topic和Consumer Group,那么请选择手动创建资源,并填写已创建的资源信息。否则,请选择自动创建资源。
      参数 描述 示例值
      VPC ID 数据同步任务所在的VPC。默认为消息队列Kafka版实例所在的VPC,您无需填写。 vpc-bp1xpdnd3l***
      VSwitch ID 数据同步任务所在的交换机。该交换机必须与消息队列Kafka版实例处于同一VPC。默认为部署消息队列Kafka版实例时填写的交换机。 vsw-bp1d2jgg81***
      数据源Topic 需要同步数据的Topic。 maxcompute-test-input
      消费初始位置 开始消费的位置。取值说明如下:
      • latest:从最新位点开始消费。
      • earliestearliest:从最初位点开始消费。
      latest
      Connector消费组 Connector使用的Consumer Group。该Consumer Group的名称建议以connect-cluster开头。 connect-cluster-kafka-maxcompute-sink
      任务位点Topic 用于存储消费位点的Topic。
      • Topic名称:建议以connect-offset开头。
      • 分区数:Topic的分区数量必须大于1。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-offset-kafka-maxcompute-sink
      任务配置Topic 用于存储任务配置的Topic。
      • Topic名称:建议以connect-config开头。
      • 分区数:Topic的分区数量必须为1。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-config-kafka-maxcompute-sink
      任务状态Topic 用于存储任务状态的Topic。
      • Topic名称:建议以connect-status开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎必须为Local存储。
      • cleanup.policy:Topic的日志清理策略必须为compact。
      connect-status-kafka-maxcompute-sink
      死信队列Topic 用于存储Connect框架的异常数据的Topic。该Topic可以和异常数据Topic为同一个Topic,以节省Topic资源。
      • Topic名称:建议以connect-error开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
      connect-error-kafka-maxcompute-sink
      异常数据Topic 用于存储Sink的异常数据的Topic。该Topic可以和死信队列Topic为同一个Topic,以节省Topic资源。
      • Topic名称:建议以connect-error开头。
      • 分区数:Topic的分区数量建议为6。
      • 存储引擎:Topic的存储引擎可以为Local存储或云存储。
      connect-error-kafka-maxcompute-sink
    3. 目标实例配置页签,按需配置以下参数,然后单击下一步
      参数 描述 示例值
      MaxCompute连接地址 MaxCompute的服务接入点。更多信息,请参见配置Endpoint
      • VPC网络Endpoint:低延迟,推荐。适用于消息队列Kafka版实例和MaxCompute处于同一地域场景。
      • 外网Endpoint:高延迟,不推荐。适用于消息队列Kafka版实例和MaxCompute处于不同地域的场景。如需使用公网Endpoint,您需要为Connector开启公网访问。更多信息,请参见为Connector开启公网访问
      http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api
      MaxCompute工作空间 MaxCompute的工作空间。 connector_test
      MaxCompute表 MaxCompute的表。 test_kafka
      MaxCompute表地域 MaxCompute表所在地域。 华东1(杭州)
      服务账号 MaxCompute的阿里云账号ID。 188***
      授权角色名 消息队列Kafka版的RAM角色的名称。更多信息,请参见创建RAM角色 AliyunKafkaMaxComputeUser1
      模式 消息同步到Connector的模式。默认为DEFAULT。取值说明如下:
      • KEY:只保留消息的Key,并将Key写入MaxCompute表的key列。
      • VALUE:只保留消息的Value,并将Value写入MaxCompute表的value列。
      • DEFAULT:同时保留消息的Key和Value,并将Key和Value分别写入MaxCompute表的key列和value列。
        注意 DEFAULT模式下,不支持选择CSV格式,只支持TEXT格式和BINARY格式。
      DEFAULT
      格式 消息同步到Connector的格式。默认为TEXT。取值说明如下:
      • TEXT:消息的格式为字符串。
      • BINARY:消息的格式为字节数组。
      • CSV:消息的格式为逗号(,)分隔的字符串。
        注意 CSV格式下,不支持DEFAULT模式,只支持KEY模式和VALUE模式:
        • KEY模式:只保留消息的Key,根据逗号(,)分隔Key字符串,并将分隔后的字符串按照索引顺序写入表。
        • VALUE模式:只保留消息的Value,根据逗号(,)分隔Value字符串,并将分隔后的字符串按照索引顺序写入表。
      TEXT
      分区 分区的粒度。默认为HOUR。取值说明如下:
      • DAY:每天将数据写入一个新分区。
      • HOUR:每小时将数据写入一个新分区。
      • MINUTE:每分钟将数据写入一个新分区。
      HOUR
      时区 向Connector的数据源Topic发送消息的消息队列Kafka版生产者客户端所在时区。默认为GMT 08:00。 GMT 08:00
    4. 预览/提交下方,确认Connector的配置,然后单击提交
  8. 创建Connector面板,单击部署

发送测试消息

部署MaxCompute Sink Connector后,您可以向消息队列Kafka版的数据源Topic发送消息,测试数据能否被同步至MaxCompute。

  1. Connector(公测组件)页面,找到目标Connector,在其操作列,单击测试
  2. Topic管理页面,选择实例,找到maxcompute-test-input,在其操作列,单击发送消息
  3. 发送消息面板,发送测试消息。
    1. 分区文本框,输入0
    2. Message Key文本框,输入1
    3. Message Value文本框,输入1
    4. 单击发送

查看表数据

消息队列Kafka版的数据源Topic发送消息后,在MaxCompute客户端查看表数据,验证是否收到消息。

查看本文写入的test_kafka的示例步骤如下:

  1. 登录MaxCompute客户端。
  2. 执行以下命令查看表的数据分区。
    show partitions test_kafka;
    返回结果示例如下:
    pt=11-17-2020 15
    
    OK
  3. 执行以下命令查看分区的数据。
    select * from test_kafka where pt ="11-17-2020 14";
    返回结果示例如下:
    +----------------------+------------+------------+-----+-------+---------------+
    | topic                | partition  | offset     | key | value | pt            |
    +----------------------+------------+------------+-----+-------+---------------+
    | maxcompute-test-input| 0          | 0          | 1   | 1     | 11-17-2020 14 |
    +----------------------+------------+------------+-----+-------+---------------+