Flink全托管产品提供丰富强大的日志数据实时入仓入湖能力。本文为您介绍如何在Flink全托管控制台上快速构建一个从Kafka到Hologres的数据同步作业。

背景信息

假设消息队列Kafka实例中有一个名称为users的Topic,其中有100条JSON数据,其数据分布大致如下图所示。数据分布

本文使用Flink全托管提供的CREATE TABLE AS(CTAS)语句,一键完成日志数据的同步,以及实时的表结构变更同步。

前提条件

步骤一:配置IP白名单

为了让Flink能访问Kafka和Hologres实例,您需要将Flink全托管实例的网段添加到在Kafka和Hologres的白名单中。

  1. 获取Flink全托管实例的VPC网段。
    1. 登录实时计算控制台
    2. 在目标工作空间右侧操作列,选择其他 > 工作空间详情
    3. 工作空间详情对话框,查看Flink全托管虚拟交换机的网段信息。
      网段信息
  2. 在消息队列Kafka的IP白名单中,添加Flink全托管网段信息。
    操作步骤详情请参见配置白名单Kafka白名单
  3. 在Hologres的IP白名单中,添加Flink全托管网段信息。
    操作步骤详情请参见IP白名单Holo白名单

步骤二:准备Kafka测试数据

使用Flink全托管的模拟数据生成源表作为数据生成器,将数据写入到Kafka中。请按以下步骤使用Flink全托管开发控制台将数据写入至消息队列Kafka。

  1. 在Kafka控制台创建一个名称为users的Topic。
    操作详情请参见步骤一:创建Topic
  2. 创建将数据写入到Kafka的作业。
    1. 登录实时计算管理控制台
    2. Flink全托管页签,单击目标工作空间操作列下的控制台
    3. 在左侧导航栏,单击作业开发
    4. 单击新建
    5. 新建文件对话框,填写作业配置信息。
      作业参数 示例 说明
      文件名称 kafka-data-input 作业的名称。
      说明 作业名称在当前项目中必须保持唯一。
      文件类型 流作业/SQL 数据同步作业仅支持流作业/SQL类型。
      部署目标 vvp-workload 选择作业需要部署的集群名称。Flink全托管支持Per-Job集群和Session集群两种集群模式。两种集群模式的区别说明,请参见配置开发测试环境(Session集群)
      存储位置 作业开发 指定该作业的代码文件所属的文件夹。默认存放在作业开发目录。

      您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    6. 单击确认
    7. 将以下作业代码拷贝到作业文本编辑区。
      CREATE TEMPORARY TABLE source (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        event_time TIMESTAMP
      ) WITH (
        'connector' = 'faker',
        'number-of-rows' = '100',
        'rows-per-second' = '10',
        'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}',
        'fields.first_name.expression' = '#{name.firstName}',
        'fields.last_name.expression' = '#{name.lastName}',
        'fields.address.country.expression' = '#{Address.country}',
        'fields.address.state.expression' = '#{Address.state}',
        'fields.address.city.expression' = '#{Address.city}',
        'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}'
      );
      
      CREATE TEMPORARY TABLE sink (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        `timestamp` TIMESTAMP METADATA
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json'
      );
      
      INSERT INTO sink SELECT * FROM source;
    8. 请按您的实际配置,修改以下参数配置信息。
      参数 示例值 说明
      properties.bootstrap.servers alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000 Kafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。

      topic users Kafka Topic名称。
  3. 启动作业。
    1. 作业开发页面,单击上线
    2. 作业运维页面,单击目标作业名称操作列中的启动
    3. 单击确认启动
      单击启动后,您可以看到作业从当前状态到期望状态的变化过程及最终结果。直到状态变为RUNNING,则代表作业运行正常。状态变化
      由于faker数据源是一个有限流,因此在作业处于运行状态后,大约1分钟左右后,作业就会处于完成状态。当作业结束运行代表作业已经将相关的数据写入到Kafka的users中。其中,写入到消息队列Kafka的JSON数据格式大致如下。
      {
        "id": 765,
        "first_name": "Barry",
        "last_name": "Pollich",
        "address": {
          "country": "United Arab Emirates",
          "state": "Nevada",
          "city": "Powlowskifurt"
        }
      }

步骤三:创建Hologres Catalog

单表同步都需要依赖目标Catalog来创建目标表。因此,您需要通过控制台创建目标Catalog。本文将以目标Catalog为Hologres Catalog为例,为您进行介绍。

  1. 创建名称为holo的Hologres Catalog。
    操作步骤详情请参见配置Hologres Catalogholo catalog
    注意 您需要在您的目标实例中已创建flink_test_db数据库,否则创建Catalog会报错。
  2. Schemas页签,确认已创建名为holo的Catalog。
    刷新按钮

步骤四:创建并启动数据同步作业

  1. 登录Flink全托管开发控制台,创建数据同步作业。
    1. 登录实时计算管理控制台
    2. Flink全托管页签,单击目标工作空间操作列下的控制台
    3. 在左侧导航栏,单击作业开发
    4. 单击新建
    5. 新建文件对话框,填写作业配置信息。
      作业参数 示例 说明
      文件名称 flink-quickstart-test 作业的名称。
      说明 作业名称在当前项目中必须保持唯一。
      文件类型 流作业/SQL 数据同步作业仅支持流作业/SQL类型。
      部署目标 vvp-workload 选择作业需要部署的集群名称。
      存储位置 作业开发 指定该作业的代码文件所属的文件夹。默认存放在作业开发目录。

      您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    6. 单击确认
  2. 将以下作业代码拷贝到作业文本编辑区。
    将消息队列Kafka中名称为users的Topic数据同步至Hologres的flink_test_db数据库的sync_kafka_users表中。您可以通过以下任意一种方式进行:
    • 通过CATS语句同步
      该方式无需您手动在Hologres中创建该表并指名需要对应的列类型为JSON或JSONB。
      CREATE TEMPORARY TABLE kafka_users (
        `id` INT NOT NULL,
        `address` STRING,
        `offset` BIGINT NOT NULL METADATA,
        `partition` BIGINT NOT NULL METADATA,
        `timestamp` TIMESTAMP METADATA,
        `date` AS CAST(`timestamp` AS DATE),
        `country` AS JSON_VALUE(`address`, '$.country'),
        PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json',
        'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自动展开嵌套列。
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users
      WITH (
        'connector' = 'hologres'
      ) AS TABLE kafka_users;
      说明 为了避免作业Failover后,作业重启将重复数据写入到Hologres中,您可以添加相关主键从而唯一地标识数据。当数据重发时,Hologres将会保证相同partition和offset的数据只会保留一份。
    • 通过INSERT INTO语句同步

      考虑到Hologres中对于JSON和JSONB类型的数据会进行特殊的优化,您也可以通过INSERT INTO语句将嵌套JSON写入到Hologres中。

      该方式需要您手动在Hologres中创建该表并指名需要对应的列类型为JSON或JSONB,然后通过下文的SQL,会将address数据写入到 Hologres中类型为JSON的列。
      CREATE TEMPORARY TABLE kafka_users (
        `id` INT NOT NULL,
        `address` STRING, -- 该列对应的数据为嵌套JSON。
        `offset` BIGINT NOT NULL METADATA,
        `partition` BIGINT NOT NULL METADATA,
        `timestamp` TIMESTAMP METADATA,
        `date` AS CAST(`timestamp` AS DATE),
        `country` AS JSON_VALUE(`address`, '$.country')
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json',
        'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自动展开嵌套列。
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TEMPORARY TABLE holo (
        `id` INT NOT NULL,
        `address` STRING,
        `offset` BIGINT,
        `partition` BIGINT,
        `timestamp` TIMESTAMP,
        `date` DATE,
        `country` STRING
      ) WITH (
        'connector' = 'hologres',
        'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80',
        'username' = 'LTAI5tE572UJ44Xwhx6i****',
        'password' = 'KtyIXK3HIDKA9VzKX4tpct9xTm****',
        'dbname' = 'flink_test_db',
        'tablename' = 'sync_kafka_users'
      );
      
      INSERT INTO holo
      SELECT * FROM kafka_users;
  3. 请按您的实际配置,修改以下参数配置信息。
    参数 示例值 说明
    properties.bootstrap.servers alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000 Kafka Broker地址。

    格式为host:port,host:port,host:port,以英文逗号(,)分割。

    topic users Kafka Topic名称。
    endpoint hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80 Hologres端点。

    格式为<ip>:<port>。

    username LTAI5tE572UJ44Xwhx6i**** Hologres用户名,请填写阿里云账号的AccessKey ID。
    password KtyIXK3HIDKA9VzKX4tpct9xTm**** Hologres密码,请填写阿里云账号的AccessKey Secret。
    dbname flink_test_db Hologres数据库名称。
    tablename sync_kafka_users Hologres表名称。
    说明
    • 如果您通过INSERT INTO方式同步数据,则需要提前在目标实例的数据库中创建sync_kafka_users表和字段。
    • 如果Schema不为Public时,则tablename需要填写为schema.tableName。
  4. 单击保存
  5. 作业开发页面,单击上线
  6. 作业运维页面,单击目标作业名称操作列中的启动
    单击确认启动后,您可以看到作业从当前状态到期望状态的变化过程及最终结果。直到状态变为RUNNING,则代表作业运行正常。可以在作业运维页面观察作业的运行信息和状态。状态变化

步骤五:观察全量同步结果

  1. 登录Hologres管理控制台
  2. 实例列表页面,单击目标实例名称。
  3. 在页面右上角,单击登录实例
  4. 元数据管理页签,查看users数据库中同步的sync_kafka_users表结构和数据。
    sync_kafka_users表
    同步后的表结构和数据如下图所示。
    • 表结构

      双击sync_kafka_users表名称,查看表结构。

      表结构
      说明 在同步过程中,建议声明Kafka的Metadata partition和offset作为Hologres表中的主键。这样可以避免由于作业Failover,数据重发导致下游存储多份相同数据。
    • 表数据
      在sync_kafka_users表信息页面右上角,单击查询表后,输入如下命令,单击运行
      SELECT * FROM public.sync_kafka_users order by partition, "offset";
      表数据结果如下图所示。表数据

步骤六:观察自动同步表结构变更

  1. 在Kafka控制台手动发送一条包含新增列的消息。
    1. 登录消息队列Kafka版控制台
    2. 实例列表页面,单击目标实例名称。
    3. Topic管理页面,单击目标Topic名称users。
    4. 单击体验发送消息
    5. 填写消息内容。
      消息内容
      配置项 示例
      发送方式 选中控制台
      消息Key 填写为flinktest。
      消息内容 将以下JSON内容复制粘贴到消息内容中。
      {
        "id": 100001,
        "first_name": "Dennise",
        "last_name": "Schuppe",
        "address": {
          "country": "Isle of Man",
          "state": "Montana",
          "city": "East Coleburgh"
        },
        "house-points": {
          "house": "Pukwudgie",
          "points": 76
        }
      }
      说明 该示例中house-points是一个新增的嵌套列。
      发送到指定分区 选中
      分区ID 填写为0。
    6. 单击确定
  2. 在Hologres控制台,查看sync_kafka_users表结构和数据的变化。
    1. 登录Hologres管理控制台
    2. 实例列表页面,单击目标实例名称。
    3. 在页面右上角,单击登录实例
    4. 元数据管理页签,双击sync_kafka_users表名称。
    5. 单击查询表后,输入如下命令,单击运行
      SELECT * FROM public.sync_kafka_users order by partition, "offset";
    6. 查看表数据结果。
      表数据结果如下图所示。Hologres表结果
      可以观察到id为100001的数据已经成功地写入到了Hologres中。同时,Hologres中多了house-points.house和house-points.points 两列。
      说明 虽然插入到Kafka中的数据仅只有一个嵌套列house-points,但是由于在kafka_users表的WITH参数内声明要求json.infer-schema.flatten-nested-columns.enable,那么Flink 就会自动展平新增的嵌套列,并用访问该列的路径作为展开后的列的名字。

(可选)步骤七:调整作业资源配置

根据数据量的不同,我们往往需要调节不同节点的并发和资源,以达到更优的作业性能。您可以使用资源配置的基础模式简单配置作业并发度和CU数,也可以使用资源配置的专家模式细粒度地调整节点的并发和资源。

  1. 登录Flink全托管开发控制台,进入作业详情页面。
    1. 登录实时计算管理控制台
    2. Flink全托管页签,单击目标工作空间操作列下的控制台
    3. 在左侧导航栏,单击作业运维
    4. 单击目标作业名称。
  2. 修改资源配置。
    1. 单击编辑
    2. 在页面右侧,单击资源配置页签。
    3. 资源模式选择为专家模式
    4. 单击立刻获取
    5. 单击展开全部
      观察完整的拓扑图,通过完整的拓扑图能了解到整个数据的同步计划,即具体同步哪些表。
    6. 手动设置每个节点的并发。
      由于Kafka users Topic有四个分区,因此可以设置作业为4并发。由于日志数据只是写入到Hologres一张表中,为了降低Hologres的连接数,可以调节Hologres的并发为2。资源配置步骤详情请参见配置细粒度资源。经过调节后的作业资源配置计划如下图所示。作业配置计划
    7. 单击保存配置计划
    8. 单击上线
    9. 运维页面目标作业名称的操作列,单击启动
  3. 作业总览页面,查看调整效果。
    调整效果

相关文档