Kafka是应用较为广泛的分布式、高吞吐量、高可扩展性消息队列服务,普遍用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,是大数据生态中不可或缺的产品之一。通过数据传输服务DTS(Data
Transmission Service),您可以将PolarDB MySQL同步至自建Kafka集群,扩展消息处理能力。
前提条件
- Kafka集群的版本为0.10.1.0-1.0.2版本。
- PolarDB MySQL已开启Binlog,详情请参见如何开启Binlog。
注意事项
如果源数据库没有主键或唯一约束,且所有字段没有唯一性,可能会导致目标数据库中出现重复数据。
功能限制
- 仅支持表粒度的数据同步。
- 不支持自动调整同步对象。
说明 如果在同步的过程中,对源库中待同步的表执行了重命名操作,且重命名后的名称不在同步对象中,那么该表将不再被同步到目标Kafka集群中。如果该表还需要同步,那么您需要
新增同步对象。
消息格式
同步到Kafka集群中的数据以avro格式存储,schema定义详情请参见DTS avro schema定义。
说明 数据同步到Kafka集群后,您需要根据avro schema定义进行数据解析。
操作步骤
- 购买数据同步作业,详情请参见购买流程。
说明 购买时,选择源实例为PolarDB、目标实例为Kafka,并选择同步拓扑为单向同步。
- 登录数据传输控制台。
- 在左侧导航栏,单击数据同步。
- 在同步作业列表页面顶部,选择同步的目标实例所属地域。
- 定位至已购买的数据同步实例,单击配置同步链路。
- 配置源实例及目标实例信息。

类别 |
配置 |
说明 |
无 |
同步作业名称 |
DTS会自动生成一个同步作业名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。 |
源实例信息 |
实例类型 |
固定为PolarDB实例,不可变更。
|
实例地区 |
购买数据同步实例时选择的源实例地域,不可变更。 |
PolarDB实例ID |
选择PolarDB集群ID。 |
数据库账号 |
填入PolarDB集群的数据库账号,需要具备待同步数据库的读权限。 |
数据库密码 |
填入该数据库账号的密码。 |
目标实例信息 |
实例类型 |
根据Kafka集群的部署位置选择,本文以ECS上的自建数据库为例介绍配置流程。
说明 当选择为其他实例类型时,您还需要执行相应的准备工作,详情请参见 准备工作概览。
|
实例地区 |
购买数据同步实例时选择的目标实例地域,不可变更。 |
ECS实例ID |
选择部署了Kafka集群的ECS实例ID。 |
数据库类型 |
选择为Kafka。
|
端口 |
Kafka集群对外提供服务的端口,默认为9092。 |
数据库账号 |
填入Kafka集群的用户名,如Kafka集群未开启验证可不填写。 |
数据库密码 |
填入Kafka集群用户名对应的密码,如Kafka集群未开启验证可不填写。 |
Topic |
单击右侧的获取Topic列表,然后在下拉框中选择具体的Topic。
|
Kafka版本 |
根据目标Kafka集群版本,选择对应的版本信息。 |
连接方式 |
根据业务及安全需求,选择非加密连接或SCRAM-SHA-256。
|
- 单击页面右下角的授权白名单并进入下一步。
说明 此步骤会将DTS服务器的IP地址自动添加到源PolarDB集群的白名单和目标ECS实例的内网入方向安全组规则中,用于保障DTS服务器能够正常连接源和目标实例。
- 配置同步对象信息。

配置 |
说明 |
同步对象 |
在源库对象区域框中,选择需要同步的对象(选择的粒度为表),然后单击 图标将其移动到已选对象区域框中。
说明 DTS会自动将表名映射为配置同步的源和目标实例信息时选择的Topic名称。如果需要更换同步的目标Topic,请参见步骤9。
|
- 上述配置完成后单击页面右下角的下一步。
- 配置同步初始化的高级配置信息。

配置 |
说明 |
同步初始化 |
默认选择结构初始化和全量数据初始化,DTS会在增量数据同步之前,将源库中待同步对象的结构和存量数据,同步到目标库。
|
过滤选项 |
默认选择忽略增量同步阶段的 DDL,即增量同步阶段源库执行的DDL操作不会被DTS同步至目标库。
|
- 上述配置完成后,单击页面右下角的预检查并启动。
说明
- 在数据同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动数据同步作业。
- 如果预检查失败,单击具体检查项后的
图标,查看失败详情。根据提示修复后,重新进行预检查。
- 在预检查对话框中显示预检查通过后,关闭预检查对话框,数据同步作业正式开始。
您可以在
数据同步页面,查看数据同步状态。
