Fully managed Flink allows you to ingest log data into data warehouses or data lakes in real time. This topic describes how to create a job that synchronizes data from a Message Queue for Apache Kafka instance to a Hologres instance in the console of fully managed Flink.

Background information

For example, a topic named users is created for a Message Queue for Apache Kafka instance, and 100 JSON data records exist in the topic. The following figure shows the data distribution.

In this topic, the CREATE TABLE AS statement that is provided by fully managed Flink is used to synchronize log data with one click and synchronize table schema changes in real time.

Prerequisites

Step 1: Configure an IP address whitelist

To allow fully managed Flink to access the Message Queue for Apache Kafka instance and Hologres instance, you must add the CIDR block of the vSwitch to which the fully managed Flink instance belongs to the whitelists of the Message Queue for Apache Kafka and Hologres instances.

  1. Obtain the CIDR block of the vSwitch to which the fully managed Flink instance belongs.
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage, and choose More > Workspace Details in the Actions column.
    3. In the Workspace Details dialog box, view the CIDR block of the vSwitch to which the fully managed Flink instance belongs.
  2. Add the CIDR block of the vSwitch to which the fully managed Flink instance belongs to the IP address whitelist of the Message Queue for Apache Kafka instance.
    For more information, see Configure the whitelist.
  3. Add the CIDR block of the vSwitch to which the fully managed Flink instance belongs to the IP address whitelist of the Hologres instance.
    For more information, see Configure IP address whitelists.

Step 2: Prepare test data of the Message Queue for Apache Kafka instance

This step uses a Faker source table of fully managed Flink as a data generator and writes the generated data to the Message Queue for Apache Kafka instance. Perform the following steps in the console of fully managed Flink.

  1. Create a topic named users in the Message Queue for Apache Kafka console.
    For more information, see Step 1: Create a topic.
  2. Create a job that writes data to a specified Message Queue for Apache Kafka instance.
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
    3. In the left-side navigation pane, click Draft Editor.
    4. Click New.
    5. In the New Draft dialog box, configure the parameters of the job. The following table describes the parameters.
      Parameter Example value Description
      Name kafka-data-input The name of the job.
      Note The job name must be unique in the current project.
      Type STREAM / SQL A data synchronization job can only be of the STREAM / SQL type.
      Deployment Target vvp-workload The name of the cluster in which you want to deploy the job. Fully managed Flink supports two cluster types: per-job cluster and session cluster. For more information about the differences between the two types of clusters, see Configure a development and test environment (session cluster).
      Location Development The folder in which the code file of the job is saved. By default, the code file of the job is stored in the Development folder.

      You can click the New Folder icon to the right of an existing folder to create a subfolder.

    6. Click OK.
    7. Copy the following job code to the code editor.
      CREATE TEMPORARY TABLE source (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        event_time TIMESTAMP
      ) WITH (
        'connector' = 'faker',
        'number-of-rows' = '100',
        'rows-per-second' = '10',
        'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}',
        'fields.first_name.expression' = '#{name.firstName}',
        'fields.last_name.expression' = '#{name.lastName}',
        'fields.address.country.expression' = '#{Address.country}',
        'fields.address.state.expression' = '#{Address.state}',
        'fields.address.city.expression' = '#{Address.city}',
        'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}'
      );
      
      CREATE TEMPORARY TABLE sink (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        `timestamp` TIMESTAMP METADATA
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json'
      );
      
      INSERT INTO sink SELECT * FROM source;
    8. Modify the following parameter configurations based on your business requirements.
      Parameter Example value Description
      properties.bootstrap.servers alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000 The IP addresses or endpoints of Kafka brokers.

      Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

      topic users The name of the Kafka topic.
  3. Start the job.
    1. In the upper-right corner of the Draft Editor page, click Publish.
    2. On the Deployments page in the console of fully managed Flink, find the job that you want to start and click Start in the Actions column.
    3. In the Deployment Starting Configuration dialog box, click Confirm Running.
      After you click Confirm Running, you can view the transition process from a current state to a desired state and the final result. When the state changes to RUNNING, the job is running properly.
      The Faker data source provides bounded streams. Therefore, the job finishes about one minute after it enters the RUNNING state. When the job is complete, the data is written to the users topic of the Message Queue for Apache Kafka instance. The following sample code shows the format of the JSON data that is written to the Message Queue for Apache Kafka instance.
      {
        "id": 765,
        "first_name": "Barry",
        "last_name": "Pollich",
        "address": {
          "country": "United Arab Emirates",
          "state": "Nevada",
          "city": "Powlowskifurt"
        }
      }
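As a cross-check of the format above, the following Python sketch (hypothetical, not part of the Flink job) shows how a source row with a nested address ROW serializes to the JSON structure that lands in the users topic. Note that the sink's `timestamp` column is declared as METADATA, so it is written as the Kafka message timestamp rather than into the JSON payload, which is why no timestamp field appears in the sample.

```python
import json

# Hypothetical record mirroring the source table schema. The nested
# `address` ROW<country, state, city> becomes a nested JSON object.
record = {
    "id": 765,
    "first_name": "Barry",
    "last_name": "Pollich",
    "address": {
        "country": "United Arab Emirates",
        "state": "Nevada",
        "city": "Powlowskifurt",
    },
}

# With 'format' = 'json', the Kafka sink serializes each row like this.
message = json.dumps(record)

# Round-trip to confirm the nested structure survives serialization.
parsed = json.loads(message)
print(parsed["address"]["city"])  # Powlowskifurt
```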

Step 3: Create a Hologres catalog

If you want to perform single-table synchronization, you must create a destination table in a destination catalog. You can create a destination catalog in the console of fully managed Flink. In this topic, a Hologres catalog is used as the destination catalog. This section describes how to create a Hologres catalog.

  1. Create a Hologres catalog named holo.
    For more information, see Create a Hologres catalog.
    Notice You must make sure that a database named flink_test_db is created in the instance to which you want to synchronize data. Otherwise, an error is returned when you create a catalog.
  2. On the Schemas tab, verify that the catalog named holo is created.

Step 4: Create and start a data synchronization job

  1. Log on to the console of fully managed Flink and create a data synchronization job.
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
    3. In the left-side navigation pane, click Draft Editor.
    4. Click New.
    5. In the New Draft dialog box, configure the parameters of the job. The following table describes the parameters.
      Parameter Example value Description
      Name flink-quickstart-test The name of the job.
      Note The job name must be unique in the current project.
      Type STREAM / SQL A data synchronization job can only be of the STREAM / SQL type.
      Deployment Target vvp-workload The name of the cluster in which you want to deploy the job.
      Location Development The folder in which the code file of the job is saved. By default, the code file of the job is stored in the Development folder.

      You can click the New Folder icon to the right of an existing folder to create a subfolder.

    6. Click OK.
  2. Copy the following job code to the code editor.
    You can use one of the following methods to synchronize data of the users topic from the Message Queue for Apache Kafka instance to the sync_kafka_users table of the flink_test_db database in Hologres:
    • Use the CREATE TABLE AS statement
      If you execute the CREATE TABLE AS statement to synchronize data, you do not need to manually create the table in Hologres or configure the types of the columns to which data is written as JSON or JSONB.
      CREATE TEMPORARY TABLE kafka_users (
        `id` INT NOT NULL,
        `address` STRING,
        `offset` BIGINT NOT NULL METADATA,
        `partition` BIGINT NOT NULL METADATA,
        `timestamp` TIMESTAMP METADATA,
        `date` AS CAST(`timestamp` AS DATE),
        `country` AS JSON_VALUE(`address`, '$.country'),
        PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json',
        'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Automatically expand nested columns. 
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users
      WITH (
        'connector' = 'hologres'
      ) AS TABLE kafka_users;
      Note To prevent duplicate data from being written to Hologres after a job failover, you can add the related primary key to the table to uniquely identify data. If data is retransmitted, Hologres ensures that only one copy of data that has the same values of the partition and offset fields is retained.
    • Use the INSERT INTO statement

      Hologres applies special optimizations to JSON and JSONB data. Therefore, you can use the INSERT INTO statement to write nested JSON data to Hologres.

      If you use the INSERT INTO statement to synchronize data, you must manually create a table in Hologres and configure the types of the columns to which data is written as JSON or JSONB. Then, you can execute the INSERT INTO statement to write the address data to the column of the JSON type in Hologres.
      CREATE TEMPORARY TABLE kafka_users (
        `id` INT NOT NULL,
        `address` STRING, -- The data in this column is nested JSON data. 
        `offset` BIGINT NOT NULL METADATA,
        `partition` BIGINT NOT NULL METADATA,
        `timestamp` TIMESTAMP METADATA,
        `date` AS CAST(`timestamp` AS DATE),
        `country` AS JSON_VALUE(`address`, '$.country')
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json',
        'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Automatically expand nested columns. 
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TEMPORARY TABLE holo (
        `id` INT NOT NULL,
        `address` STRING,
        `offset` BIGINT,
        `partition` BIGINT,
        `timestamp` TIMESTAMP,
        `date` DATE,
        `country` STRING
      ) WITH (
        'connector' = 'hologres',
        'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80',
        'username' = 'LTAI5tE572UJ44Xwhx6i****',
        'password' = 'KtyIXK3HIDKA9VzKX4tpct9xTm****',
        'dbname' = 'flink_test_db',
        'tablename' = 'sync_kafka_users'
      );
      
      INSERT INTO holo
      SELECT * FROM kafka_users;
  3. Modify the following parameter configurations based on your business requirements.
    Parameter Example value Description
    properties.bootstrap.servers alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000 The IP addresses or endpoints of Kafka brokers.

    Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

    topic users The name of the Kafka topic.
    endpoint hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80 The endpoint of the Hologres instance.

    Format: <ip>:<port>.

    username LTAI5tE572UJ44Xwhx6i**** The username that is used to access the Hologres database. You must enter the AccessKey ID of your Alibaba Cloud account.
    password KtyIXK3HIDKA9VzKX4tpct9xTm**** The password that is used to access the Hologres database. You must enter the AccessKey secret of your Alibaba Cloud account.
    dbname flink_test_db The name of the Hologres database that you want to access.
    tablename sync_kafka_users The name of the Hologres table.
    Note
    • If you use the INSERT INTO statement to synchronize data, you must create the sync_kafka_users table and define required fields in the database of the destination instance.
    • If the public schema is not used, you must set tablename to schema.tableName.
  4. Click Save.
  5. In the upper-right corner of the Draft Editor page, click Publish.
  6. On the Deployments page in the console of fully managed Flink, find the job that you want to start and click Start in the Actions column.
    In the Deployment Starting Configuration dialog box, click Confirm Running. Then, you can view the transition process from a current state to a desired state and the final result. When the state changes to RUNNING, the job is running properly. You can view the status and information about the job on the Deployments page.
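The deduplication behavior that the Note in the CREATE TABLE AS example describes can be sketched in a few lines of Python (an illustration of upsert semantics on the (partition, offset) primary key, not Hologres code): a record that is replayed after a failover overwrites its earlier copy instead of creating a duplicate.

```python
# Records as a Kafka source might deliver them, including one replayed
# after a job failover (same partition and offset as an earlier record).
records = [
    {"partition": 0, "offset": 1, "id": 765, "first_name": "Barry"},
    {"partition": 0, "offset": 2, "id": 100001, "first_name": "Dennise"},
    {"partition": 0, "offset": 1, "id": 765, "first_name": "Barry"},  # replay
]

# An upsert sink keyed on the primary key keeps one copy per key.
table = {}
for r in records:
    table[(r["partition"], r["offset"])] = r

print(len(table))  # 2: the replayed record replaced its earlier copy
```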

Step 5: View the result of full data synchronization

  1. Log on to the Hologres console.
  2. On the Instances page, click the name of the instance.
  3. In the upper-right corner of the page, click Connect to Instance.
  4. On the Metadata Management tab, view the schema and data of the sync_kafka_users table to which data is synchronized in the flink_test_db database.
    The following figures show the schema and data of the sync_kafka_users table after full data synchronization.
    • Table schema

      Double-click the name of the sync_kafka_users table to view the table schema.

      Note During data synchronization, we recommend that you declare the partition and offset fields of Kafka as the primary key for the Hologres table. This way, if data is retransmitted due to a job failover, only one copy of the data that has the same values of the partition and offset fields is stored.
    • Table data
      In the upper-right corner of the page for the sync_kafka_users table, click Query table. In the SQL editor, enter the following statement and click Run:
      SELECT * FROM public.sync_kafka_users ORDER BY partition, "offset";
      The following figure shows the data of the sync_kafka_users table.

Step 6: Check whether table schema changes are automatically synchronized

  1. Manually send a message that contains a new column in the Message Queue for Apache Kafka console.
    1. Log on to the Message Queue for Apache Kafka console.
    2. On the Instances page, click the name of the instance for data synchronization.
    3. In the left-side navigation pane of the page that appears, click Topics. On the right side of the page, find the topic named users.
    4. Choose More > Quick Start in the Actions column.
    5. In the Start to Send and Consume Message panel, configure the parameters and enter the content of the test message.
      Parameter Example
      Method of Sending Select Console.
      Message Key Enter flinktest.
      Message Content Copy and paste the following JSON content to the Message Content field.
      {
        "id": 100001,
        "first_name": "Dennise",
        "last_name": "Schuppe",
        "address": {
          "country": "Isle of Man",
          "state": "Montana",
          "city": "East Coleburgh"
        },
        "house-points": {
          "house": "Pukwudgie",
          "points": 76
        }
      }
      Note In this example, house-points is a new nested column.
      Send to Specified Partition Select Yes.
      Partition ID Enter 0.
    6. Click OK.
  2. In the Hologres console, view the changes in the schema and data of the sync_kafka_users table.
    1. Log on to the Hologres console.
    2. On the Instances page, click the name of the instance for data synchronization.
    3. In the upper-right corner of the page, click Connect to Instance.
    4. On the Metadata Management tab, double-click the name of the sync_kafka_users table.
    5. In the upper-right corner of the page for the sync_kafka_users table, click Query table. In the SQL editor, enter the following statement and click Run:
      SELECT * FROM public.sync_kafka_users ORDER BY partition, "offset";
    6. View the data of the table.
      The following figure shows the data of the sync_kafka_users table.
      The figure shows that the data record whose id is 100001 is written to Hologres. In addition, the house-points.house and house-points.points columns are added to Hologres.
      Note The message that you sent is the only one that contains the nested column house-points. Because json.infer-schema.flatten-nested-columns.enable is declared in the parameters of the WITH clause for the kafka_users table, fully managed Flink automatically expands the new nested column. After the column is expanded, the JSON path that is used to access the nested field is used as the name of the new column.
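The naming rule that the Note describes can be sketched in Python (a simplified illustration of the column-expansion idea, not the connector's implementation): each nested field is lifted to a top-level column whose name is its JSON access path.

```python
import json

def flatten(obj, prefix=""):
    """Expand nested JSON objects into flat columns named by access path."""
    columns = {}
    for key, value in obj.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            columns.update(flatten(value, name))
        else:
            columns[name] = value
    return columns

message = json.loads(
    '{"id": 100001, "house-points": {"house": "Pukwudgie", "points": 76}}'
)
columns = flatten(message)
print(sorted(columns))  # ['house-points.house', 'house-points.points', 'id']
```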

(Optional) Step 7: Adjust job resource configurations

To ensure optimal job performance, we recommend that you adjust the parallelism of jobs and resource configurations of different nodes based on the amount of data that needs to be processed. To adjust the parallelism of jobs and the number of CUs in a simple manner, use the basic resource configuration mode. To adjust the parallelism of jobs and resource configurations of nodes in a more fine-grained manner, use the expert resource configuration mode.

  1. Log on to the console of fully managed Flink and go to the job details page.
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
    3. In the left-side navigation pane, click Deployments.
    4. Click the name of the desired job.
  2. Modify resource configurations.
    1. Click Configure.
    2. On the right side of the Draft Editor page, click the Resources tab.
    3. In the Resource Configuration panel, set Configuration Mode to Expert.
    4. Click Get Plan Now in the Resource Plan section.
    5. Click Expand All.
      View the complete topology to learn the data synchronization plan of the job. The plan shows the tables that need to be synchronized.
    6. Manually configure the PARALLELISM parameter for each node.
      The users topic of the Message Queue for Apache Kafka instance has four partitions. Therefore, you can set the PARALLELISM parameter for Message Queue for Apache Kafka to 4. Log data is written to only one Hologres table. To reduce the number of connections to Hologres, you can set the PARALLELISM parameter for Hologres to 2. For more information about resource configurations, see Configure resources in expert mode. The following figure shows the resource configuration plan of the job after the parallelism is adjusted.
    7. Click Save Plan.
    8. Click Publish.
    9. On the Deployments page, find the job whose resource configurations are modified and click Start in the Actions column.
  3. Click the name of the desired job. On the Overview tab, view the effect after the adjustment.

References