Kafka是应用较为广泛的分布式、高吞吐量、高可扩展性消息队列服务,普遍用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,是大数据生态中不可或缺的产品之一。通过数据传输服务DTS(Data Transmission Service),您可以将PolarDB-O集群同步至自建Kafka,扩展消息处理能力。

前提条件

  • 源PolarDB-O集群需为最新版本,升级方式详情请见小版本升级
  • 源PolarDB-O集群中,待同步的表需具备主键或非空唯一索引。
  • 源PolarDB-O集群中,wal_level参数的值需设置为logical,即在预写式日志WAL(Write-ahead logging)中增加支持逻辑编码所需的信息。详情请参见设置集群参数

注意事项

  • 本场景中,DTS仅支持 增量数据同步 ,不支持 结构初始化全量数据初始化
  • 一个数据同步作业只能同步一个数据库,如有多个数据库需要同步,您需要为每个数据库配置数据同步作业。
  • 为保障同步延迟时间展示的准确性,DTS会在源库中新增一个表,表名为dts_postgres_heartbeat,结构及内容如下图所示。表结构

操作步骤

  1. 购买数据同步作业,详情请参见购买流程
    说明 购买时,源实例选择为POLARDB,目标实例选择为Kafka,并选择同步拓扑为单向同步
  2. 登录数据传输控制台
  3. 在左侧导航栏,单击数据同步
  4. 同步作业列表页面顶部,选择同步的目标实例所属地域。
    选择地域
  5. 定位至已购买的数据同步实例,单击配置同步链路
  6. 配置同步作业的源和目标实例信息。
    配置源和目标实例信息
    类别 配置 说明
    同步作业名称 DTS会自动生成一个同步作业名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
    源实例信息 实例类型 固定为PolarDB实例,不可变更。
    实例地区 购买数据同步实例时选择的源实例地域信息,不可变更。
    PolarDB实例ID 选择源PolarDB集群ID。
    数据库名称 填入待同步的数据库名称。
    数据库账号 填入源PolarDB集群的高权限账号。数据库账号创建方法请参见创建数据库账号
    数据库密码 填入数据库账号的密码。
    目标实例信息 实例类型 根据自建Kafka的部署位置进行选择,本文以ECS上的自建数据库为例介绍配置流程。
    说明 当自建Kafka为其他实例类型时,您还需要执行相应的准备工作,详情请参见准备工作概览
    实例地区 购买数据同步实例时选择的目标实例地域信息,不可变更。
    ECS实例ID 选择部署了Kafka集群的ECS实例ID。
    数据库类型 选择为Kafka
    端口 自建Kafka对外提供服务的端口,默认为9092。
    数据库账号 填入自建Kafka的用户名,如自建Kafka未开启验证可不填写。
    数据库密码 填入该用户的密码,如自建Kafka未开启验证可不填写。
    Topic 单击右侧的获取Topic列表,然后在下拉框中选择具体的Topic。
    Kafka版本 根据自建Kafka的版本进行选择。
    连接方式 根据业务及安全需求,选择非加密连接SCRAM-SHA-256
  7. 单击页面右下角的授权白名单并进入下一步
    说明 此步骤会将DTS服务器的IP地址自动添加到PolarDB集群的白名单和ECS实例的内网入方向安全规则中,用于保障DTS服务器能够正常连接源集群和目标实例。
  8. 配置同步对象。
    选择同步对象
    配置 说明
    选择同步对象 源库对象框中单击待同步的对象(仅支持表粒度的选择),然后单击向右小箭头将其移动至已选择对象框。
    说明 DTS会自动将表名映射为步骤6中选择的Topic名称。如需更换同步的目标Topic(该Topic需是Kafka实例中真实存在),您可以将鼠标指针放置在要进行名称映射的表上,并单击出现的编辑进行调整,详情请参见设置同步对象在目标实例中的名称
  9. 上述配置完成后,单击页面右下角的下一步
  10. 配置同步初始化和过滤选项。
    数据同步高级配置
    配置 说明
    同步初始化 默认勾选增量数据初始化,即DTS会将同步过程中,源库产生的增量变更数据实时同步至目标库。
    过滤选项 默认勾选忽略增量同步阶段的 DDL,即DTS不会将源库在增量数据同步阶段执行的DDL操作同步至目标库。
    说明 目前还不支持过滤选项,即无论是否勾选该选项,都无法将源库在增量同步阶段执行的DDL操作同步至目标库。
  11. 上述配置完成后,单击页面右下角的预检查并启动
    说明
    • 在数据同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动数据同步作业。
    • 如果预检查失败,单击具体检查项后的提示图标,查看失败详情。根据提示修复后,重新进行预检查。
  12. 预检查对话框中显示预检查通过后,关闭预检查对话框,同步作业将正式开始。
  13. 等待同步作业的链路初始化完成,直至处于同步中状态。
    您可以在数据同步页面,查看数据同步作业的状态。查看同步作业状态