全部产品
Search
文档中心

数据传输服务 DTS:从PolarDB PostgreSQL版(兼容Oracle)集群同步至自建Kafka

更新时间:Oct 09, 2023

Kafka是应用较为广泛的分布式、高吞吐量、高可扩展性消息队列服务,普遍用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,是大数据生态中不可或缺的产品之一。通过数据传输服务DTS(Data Transmission Service),您可以将PolarDB PostgreSQL版(兼容Oracle)集群同步至自建Kafka,扩展消息处理能力。

前提条件

  • PolarDB PostgreSQL版(兼容Oracle)集群需为最新版本,升级方式详情请见版本管理
  • PolarDB PostgreSQL版(兼容Oracle)集群中,待同步的表需具备主键或非空唯一索引。
  • PolarDB PostgreSQL版(兼容Oracle)集群中,wal_level参数的值需设置为logical,即在预写式日志WAL(Write-ahead logging)中增加支持逻辑编码所需的信息。详情请参见设置集群参数

注意事项

  • 本场景中,DTS仅支持 增量数据同步 ,不支持 结构初始化全量数据初始化
  • 一个数据同步作业只能同步一个数据库,如有多个数据库需要同步,您需要为每个数据库配置数据同步作业。
  • 为保障同步延迟时间展示的准确性,DTS会在源库中新增一个表,表名为dts_postgres_heartbeat,结构及内容如下图所示。表结构
  • 若源库有长事务,且实例包含增量同步任务,则可能会导致源库长事务提交前的预写日志WAL(Write-Ahead Logging)无法清理而堆积,从而造成源库磁盘空间不足。

费用说明

同步类型链路配置费用
库表结构同步和全量数据同步不收费。
增量数据同步收费,详情请参见计费概述

操作步骤

  1. 购买数据同步作业,详情请参见购买流程
    说明 购买时,源实例选择为PolarDB-O,目标实例选择为Kafka,并选择同步拓扑为单向同步
  2. 登录数据传输控制台

    说明

    若数据传输控制台自动跳转至数据管理DMS控制台,您可以在右下角的jiqiren中单击返回旧版,返回至旧版数据传输控制台。

  3. 在左侧导航栏,单击数据同步

  4. 同步作业列表页面顶部,选择同步的目标实例所属地域。

  5. 定位至已购买的数据同步实例,单击配置同步链路

  6. 配置同步作业的源和目标实例信息。
    类别配置说明
    同步作业名称DTS会自动生成一个同步作业名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
    源实例信息实例类型固定为PolarDB实例,不可变更。
    实例地区购买数据同步实例时选择的源实例地域信息,不可变更。
    PolarDB实例ID选择源PolarDB PostgreSQL版(兼容Oracle)集群ID。
    数据库名称填入待同步的数据库名称。
    数据库账号填入源PolarDB PostgreSQL版(兼容Oracle)集群的高权限账号。数据库账号创建方法请参见创建数据库账号
    数据库密码填入数据库账号的密码。
    目标实例信息实例类型根据自建Kafka的部署位置进行选择,本文以ECS上的自建数据库为例介绍配置流程。
    说明 当自建Kafka为其他实例类型时,您还需要执行相应的准备工作,详情请参见准备工作概览
    实例地区购买数据同步实例时选择的目标实例地域信息,不可变更。
    ECS实例ID选择部署了Kafka集群的ECS实例ID。
    数据库类型选择为Kafka
    端口自建Kafka对外提供服务的端口,默认为9092。
    数据库账号填入自建Kafka的用户名,如自建Kafka未开启验证可不填写。
    数据库密码填入该用户的密码,如自建Kafka未开启验证可不填写。
    Topic单击右侧的获取Topic列表,然后在下拉框中选择具体的Topic。
    Kafka版本根据自建Kafka的版本进行选择。
    连接方式根据业务及安全需求,选择非加密连接SCRAM-SHA-256
  7. 单击页面右下角的授权白名单并进入下一步

    如果源或目标数据库是阿里云数据库实例(例如RDS MySQL云数据库MongoDB版等),DTS会自动将对应地区DTS服务的IP地址添加到阿里云数据库实例的白名单中;如果源或目标数据库是ECS上的自建数据库,DTS会自动将对应地区DTS服务的IP地址添到ECS的安全规则中,您还需确保自建数据库没有限制ECS的访问(若数据库是集群部署在多个ECS实例,您需要手动将DTS服务对应地区的IP地址添到其余每个ECS的安全规则中);如果源或目标数据库是IDC自建数据库或其他云数据库,则需要您手动添加对应地区DTS服务的IP地址,以允许来自DTS服务器的访问。DTS服务的IP地址,请参见DTS服务器的IP地址段

    警告

    DTS自动添加或您手动添加DTS服务的公网IP地址段可能会存在安全风险,一旦使用本产品代表您已理解和确认其中可能存在的安全风险,并且需要您做好基本的安全防护,包括但不限于加强账号密码强度防范、限制各网段开放的端口号、内部各API使用鉴权方式通信、定期检查并限制不需要的网段,或者使用通过内网(专线/VPN网关/智能网关)的方式接入。

  8. 配置同步对象。
    选择同步对象
    配置说明
    选择同步对象源库对象框中单击待同步的对象(仅支持表粒度的选择),然后单击向右小箭头将其移动至已选择对象框。
    说明 DTS会自动将表名映射为步骤6中选择的Topic名称。如需更换同步的目标Topic(该Topic需是Kafka实例中真实存在),您可以将鼠标指针放置在要进行名称映射的表上,并单击出现的编辑进行调整,详情请参见设置同步对象在目标实例中的名称
    投递到kafka的数据格式同步到Kafka集群中的数据以avro格式或者Shareplex Json格式存储,定义详情请参见Kafka集群的数据存储格式
    同步到Kafka Partition策略根据业务需求选择同步的策略,详细介绍请参见Kafka Partition同步策略说明
    映射名称更改

    如需更改同步对象在目标实例中的名称,请使用对象名映射功能,详情请参见库表列映射

    源、目标库无法连接重试时间

    当源、目标库无法连接时,DTS默认重试720分钟(即12小时),您也可以自定义重试时间。如果DTS在设置的时间内重新连接上源、目标库,同步任务将自动恢复。否则,同步任务将失败。

    说明

    由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

  9. 上述配置完成后,单击页面右下角的下一步

  10. 配置同步初始化和过滤选项。
    数据同步高级配置
    配置说明
    同步初始化默认勾选增量数据初始化,即DTS会将同步过程中,源库产生的增量变更数据实时同步至目标库。
    过滤选项默认勾选忽略增量同步阶段的 DDL,即DTS不会将源库在增量数据同步阶段执行的DDL操作同步至目标库。
    说明 目前还不支持过滤选项,即无论是否勾选该选项,都无法将源库在增量同步阶段执行的DDL操作同步至目标库。
  11. 上述配置完成后,单击页面右下角的预检查并启动

    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。

    • 如果预检查失败,单击具体检查项后的提示,查看失败详情。

      • 您可以根据提示修复后重新进行预检查。

      • 如无需修复告警检测项,您也可以选择确认屏蔽忽略告警项并重新进行预检查,跳过告警检测项重新进行预检查。

  12. 预检查对话框中显示预检查通过后,关闭预检查对话框,同步作业将正式开始。

  13. 等待同步作业的链路初始化完成,直至处于同步中状态。

    您可以在数据同步页面,查看数据同步作业的状态。查看同步作业状态