Kafka是应用较为广泛的分布式、高吞吐量、高可扩展性消息队列服务,普遍用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,是大数据生态中不可或缺的产品之一。通过数据传输服务DTS(Data Transmission Service),您可以将PolarDB MySQL同步至自建Kafka集群,扩展消息处理能力。

前提条件

  • Kafka集群的版本为0.10.1.0-1.0.2版本。
  • PolarDB MySQL已开启Binlog,详情请参见如何开启Binlog

注意事项

如果源数据库没有主键或唯一约束,且所有字段没有唯一性,可能会导致目标数据库中出现重复数据。

功能限制

  • 仅支持表粒度的数据同步。
  • 不支持自动调整同步对象。
    说明 如果在同步的过程中,对源库中待同步的表执行了重命名操作,且重命名后的名称不在同步对象中,那么该表将不再被同步到目标Kafka集群中。如果该表还需要同步,那么您需要新增同步对象

消息格式

同步到Kafka集群中的数据以avro格式存储,schema定义详情请参见DTS avro schema定义

说明 数据同步到Kafka集群后,您需要根据avro schema定义进行数据解析。

操作步骤

  1. 购买数据同步作业,详情请参见购买流程
    说明 购买时,选择源实例为PolarDB、目标实例为Kafka,并选择同步拓扑为单向同步
  2. 登录数据传输控制台
  3. 在左侧导航栏,单击数据同步
  4. 同步作业列表页面顶部,选择同步的目标实例所属地域。
    选择地域
  5. 定位至已购买的数据同步实例,单击配置同步链路
  6. 配置源实例及目标实例信息。
    同步通道的源和目标实例配置
    类别 配置 说明
    同步作业名称 DTS会自动生成一个同步作业名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
    源实例信息 实例类型 固定为PolarDB实例,不可变更。
    实例地区 购买数据同步实例时选择的源实例地域,不可变更。
    PolarDB实例ID 选择PolarDB集群ID。
    数据库账号 填入PolarDB集群的数据库账号,需要具备待同步数据库的读权限。
    数据库密码 填入该数据库账号的密码。
    目标实例信息 实例类型 根据Kafka集群的部署位置选择,本文以ECS上的自建数据库为例介绍配置流程。
    说明 当选择为其他实例类型时,您还需要执行相应的准备工作,详情请参见准备工作概览
    实例地区 购买数据同步实例时选择的目标实例地域,不可变更。
    ECS实例ID 选择部署了Kafka集群的ECS实例ID。
    数据库类型 选择为Kafka
    端口 Kafka集群对外提供服务的端口,默认为9092。
    数据库账号 填入Kafka集群的用户名,如Kafka集群未开启验证可不填写。
    数据库密码 填入Kafka集群用户名对应的密码,如Kafka集群未开启验证可不填写。
    Topic 单击右侧的获取Topic列表,然后在下拉框中选择具体的Topic。
    Kafka版本 根据目标Kafka集群版本,选择对应的版本信息。
    连接方式 根据业务及安全需求,选择非加密连接SCRAM-SHA-256
  7. 单击页面右下角的授权白名单并进入下一步
    说明 此步骤会将DTS服务器的IP地址自动添加到源PolarDB集群的白名单和目标ECS实例的内网入方向安全组规则中,用于保障DTS服务器能够正常连接源和目标实例。
  8. 配置同步对象信息。
    配置同步对象
    配置 说明
    同步对象 源库对象区域框中,选择需要同步的对象(选择的粒度为表),然后单击向右箭头图标将其移动到已选对象区域框中。
    说明 DTS会自动将表名映射为配置同步的源和目标实例信息时选择的Topic名称。如果需要更换同步的目标Topic,请参见步骤9。
  9. 上述配置完成后单击页面右下角的下一步
  10. 配置同步初始化的高级配置信息。
    Kafka同步初始化高级配置
    配置 说明
    同步初始化 默认选择结构初始化全量数据初始化,DTS会在增量数据同步之前,将源库中待同步对象的结构和存量数据,同步到目标库。
    过滤选项 默认选择忽略增量同步阶段的 DDL,即增量同步阶段源库执行的DDL操作不会被DTS同步至目标库。
  11. 上述配置完成后,单击页面右下角的预检查并启动
    说明
    • 在数据同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动数据同步作业。
    • 如果预检查失败,单击具体检查项后的提示图标,查看失败详情。根据提示修复后,重新进行预检查。
  12. 预检查对话框中显示预检查通过后,关闭预检查对话框,数据同步作业正式开始。
    您可以在数据同步页面,查看数据同步状态。查看数据同步状态