本文为您介绍如何在阿里云实时计算Flink版上实现Flink CDC以Iceberg REST与DLF Catalog对接。
前提条件
已创建Flink全托管工作空间。如未创建,详情请参见开通实时计算Flink版。
请确保Flink工作空间与DLF位于同一地域下,且添加Flink所在VPC为白名单,请参见配置VPC白名单。
使用限制
仅实时计算引擎VVR 11.6.0及以上版本支持以 Iceberg REST对接DLF Catalog。
创建Catalog
这里仅用于建立与DLF Catalog的映射连接,创建或删除Catalog仅影响映射关系,不会对DLF中的实际数据产生影响。
通过Iceberg REST 在DLF Catalog创建的表均为Iceberg表。
登录实时计算控制台。
单击目标工作空间操作列下的控制台,进入对应的工作空间。
在左侧导航栏,单击。
在数据查询文本编辑区域,输入以下SQL,单击页面右下角的执行环境,选择对应版本的Session集群(VVR 11.2.0及以上引擎版本),执行SQL,即可以Iceberg REST创建DLF Catalog。
CREATE CATALOG `catalog_name` WITH ( 'type' = 'iceberg', 'catalog-type' = 'rest', 'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg', 'warehouse' = 'iceberg_test', 'rest.signing-region' = 'cn-hangzhou', 'io-impl' = 'org.apache.iceberg.rest.DlfFileIO' );参数说明如下:
参数
描述
是否必填
示例
type
类型,固定为iceberg。从自定义Jar自动解析,请勿更改。
是
iceberg
catalog-type
Catalog类型,固定为rest。
是
rest
token.provider
Token提供方,固定为dlf。
是
dlf
uri
访问DLF Rest Catalog的URI,详见Iceberg REST。
是
http://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg
warehouse
DLF Catalog名称。
是
iceberg_test
rest.signing-region
DLF的Region ID,详见服务接入点。
是
cn-hangzhou
io-impl
固定值:org.apache.iceberg.rest.DlfFileIO
是
org.apache.iceberg.rest.DlfFileIO
Flink CDC对接Catalog配置参数
创建数据摄入作业的操作流程,请参见Flink CDC数据摄入作业开发。
在预先创建好 Flink Catalog映射的前提下,参考复用已有Catalog获取连接信息中复用已有Catalog获取连接信息的功能,Flink中数据摄入作业的Sink使用以下配置:
sink:
type: iceberg
using.built-in-catalog: catalog_name配置示例
下面为您介绍几种典型的通过Flink CDC YAML作业将数据同步到数据湖DLF的配置方案:
MySQL整库同步数据湖DLF
MySQL整库同步数据到DLF的CDC YAML作业如下所示:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可选)同步增量阶段新创建的表的数据
scan.binlog.newly-added-table.enabled: true
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: iceberg
using.built-in-catalog: catalog_name【MySQL Source配置】,建议设置下列配置项,详情请参见MySQL。
参数:scan.binlog.newly-added-table.enabled
作用:同步增量阶段新创建的表的数据。
参数:include-comments.enabled
作用:同步表注释和字段注释。
参数:scan.incremental.snapshot.unbounded-chunk-first.enabled
作用:避免可能出现的TaskManager OutOfMemory问题。
参数:scan.only.deserialize.captured.tables.changelog.enabled: true
作用:仅对作业匹配的表的数据进行解析,加速读取。
写入数据湖DLF分区表
数据摄入作业的源表通常不包含分区字段信息,如果希望写入的下游表为分区表,您需要通过Flink CDC数据摄入作业开发参考中的partition-keys设置分区字段,配置示例如下:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可选)同步增量阶段新创建的表的数据
scan.binlog.newly-added-table.enabled: true
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: iceberg
using.built-in-catalog: catalog_name
transform:
- source-table: mysql_test.tbl1
#(可选)设置分区字段
partition-keys: id,pt
- source-table: mysql_test.tbl2
partition-keys: id,pt写入数据湖DLF Append Only表
数据摄入作业的源表包含完整的变更类型,如果希望写入的下游表为将删除操作转化为插入操作实现逻辑删除的功能,您可以通过Flink CDC数据摄入作业开发参考实现该需求,配置示例如下:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可选)同步增量阶段新创建的表的数据
scan.binlog.newly-added-table.enabled: true
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: iceberg
using.built-in-catalog: catalog_name
transform:
- source-table: mysql_test.tbl1
#(可选)设置分区字段
partition-keys: id,pt
#(可选)实现软删除
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
#(可选)设置分区字段
partition-keys: id,pt
#(可选)实现软删除
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE通过在projection中添加__data_event_type,将变更类型作为新增字段写入到下游表中。同时设置converter-after-transform为SOFT_DELETE,可以将删除操作转化为插入操作,使得下游能够完整记录全部变更操作。详见Flink CDC数据摄入作业开发参考。
Kafka CDC数据实时同步到数据湖DLF
假设Kafka的inventory Topic中存储了两张表(customers和products)的变更数据,且数据格式为Debezium JSON。以下示例作业可将这两张表的数据分别同步到DLF对应的目标表:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: iceberg
using.built-in-catalog: catalog_name
# debezium-json不包含主键信息,需要另外为表添加主键
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idKafka数据源读取的格式支持canal-json、debezium-json(默认)和json格式。
当数据格式为debezium-json时,由于debezium-json消息不记录主键信息,需要通过transform规则手动为表添加主键:
transform: - source-table: \.*.\.* projection: \* primary-keys: id当单表的数据分布在多个分区中,或数据位于不同分区中的表需要进行分库分表合并时,需要将配置项debezium-json.distributed-tables或canal-json.distributed-tables设为true。
kafka数据源支持多种Schema推导策略,可以通过配置项schema.inference.strategy设置,Schema推导和变更同步策略详情请参见消息队列Kafka。
Kafka 日志数据实时同步到数据湖DLF
如果您的Kafka集群中存储的是自定义的JSON格式的数据,您可以配置CDC YAML作业同步Kafka数据到DLF存储,我们会为您提供自动数据类型推导、表结构推导和表结构演进的支持。
假设Kafka的inventory Topic中存储了一张日志表的数据,且数据格式为JSON。以下示例作业可将这张表的数据同步到DLF对应的目标表:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: json
# (可选)递归式地展开JSON中的嵌套列
json.infer-schema.flatten-nested-columns.enable: true
# (可选)跳过前 100 次出现的解析异常;若超过 100 次则作业失败。
ingestion.ignore-errors: true
ingestion.error-tolerance.max-count: 100
sink:
type: iceberg
using.built-in-catalog: catalog_name
# 为表添加主键信息
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
# 将 inventory topic 中所有的数据都写入到 test_database.inventory 表中
route:
- source-table: inventory
sink-table: test_database.inventory
pipeline:
# (可选)将会导致处理异常的脏数据记录到日志中
dirty-data.collector:
name: Logger Dirty Data Collector
type: logger假设Kafka的inventory Topic中存储了多张日志表的数据,数据格式为JSON,且在JSON内容中的databaseName、tableName字段中提供了库名、表名信息。以下示例作业可将这个Topic中的多张表的数据同步到DLF对应的目标表:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: json
# (可选)递归式地展开JSON中的嵌套列
json.infer-schema.flatten-nested-columns.enable: true
# 使用 databaseName 字段中的值作为库名,使用 tableName 字段中的值作为表名
json.decode.parser-table-id.fields: databaseName,tableName
# (可选)跳过前 100 次出现的解析异常;若超过 100 次则作业失败。
ingestion.ignore-errors: true
ingestion.error-tolerance.max-count: 100
sink:
type: iceberg
using.built-in-catalog: catalog_name
# 为表添加主键信息
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
# 将 ods.inventory、ods.customer,ods.user 中的数据分别写入到 test_database.inventory,test_database.customer,test_database.user 表中
route:
- source-table: ods.inventory
sink-table: test_database.inventory
- source-table: ods.customer
sink-table: test_database.customer
- source-table: ods.user
sink-table: test_database.user
pipeline:
# (可选)将会导致处理异常的脏数据记录到日志中
dirty-data.collector:
name: Logger Dirty Data Collector
type: logger如果您希望了解更多JSON格式的Kafka源表的表结构推导与演进策略,可以参考表结构解析和变更同步策略说明。
如果您希望添加进行更精细的作业配置,可以查询Flink CDC数据摄入作业开发参考。