This topic describes how to consume Hologres binary logs in real time by using Apache Flink or Blink in exclusive mode. Blink in exclusive mode is a service type in the original product line of Realtime Compute for Apache Flink.

Precautions

Take note of the following items before you consume Hologres binary logs:
  • Only Hologres V0.9 and later support binary log consumption. If your Hologres instance runs a version earlier than V0.9, submit a ticket or join the official DingTalk group of Hologres for technical support. For more information about how to join the DingTalk group, see How do I obtain more online support for Hologres?
  • Hologres allows you to consume binary logs at the table level. Both row-oriented tables and column-oriented tables are supported. If the version of your Hologres instance is V1.1 or later, you can also consume the binary logs of a table whose data is stored in hybrid row-column storage mode.
  • For information about the compatibility of Hologres binary logs and how to enable and configure binary logging, see Subscribe to Hologres binlogs.
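As a rough illustration of what enabling binary logging involves, the following sketch creates a Hologres table with binary logging turned on through the binlog.level and binlog.ttl table properties. The table name test_message_src and the property values are assumptions chosen for this example; confirm the supported properties, values, and version requirements for your instance in Subscribe to Hologres binlogs.

```sql
-- Sketch only: hypothetical table test_message_src on a Hologres V0.9 or later instance.
-- Column names are lowercase, matching the user-defined fields of the Flink source table.
begin;
create table test_message_src (
  id int not null,
  title text not null,
  body text,
  primary key (id)
);
-- 'replica' turns on binary logging for the table.
call set_table_property('test_message_src', 'binlog.level', 'replica');
-- Retain binary logs for one day; the value is in seconds.
call set_table_property('test_message_src', 'binlog.ttl', '86400');
commit;
```

The properties are set in the same transaction as the CREATE TABLE statement so that the table is created with binary logging already enabled.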

Real-time consumption of binary logs by using Apache Flink

Flink VVP 2.4 and later allow you to consume binary logs in real time by using Hologres connectors. To consume binary logs, perform the following steps:

  1. Use a DDL statement to create a source table.
    • Syntax
      • Source table DDL (non-CDC mode)
        In this mode, the binary logs consumed by the source table are passed to downstream operators as regular Flink data, so every row has the change type INSERT regardless of the original operation. To handle a specific type of change based on your business needs, declare the hg_binlog_event_type system field as in the following statement, and then filter or branch on its value in downstream queries:
        create table test_message_src_binlog_table(
          hg_binlog_lsn BIGINT,
          hg_binlog_event_type BIGINT,
          hg_binlog_timestamp_us BIGINT,
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) with (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
        );
      • Source table DDL (CDC mode)
        In this mode, each row of binary log data consumed by the source table is automatically assigned the matching Flink RowKind (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER) based on hg_binlog_event_type. The source table therefore emits a changelog that mirrors the changes made to the Hologres table, similar to the Change Data Capture (CDC) feature in MySQL and PostgreSQL. To create a source table that consumes binary logs in CDC mode, execute the following statement:
        create table test_message_src_binlog_table(
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) with (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'cdcMode' = 'true',
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
        );
    • Parameters

      In the non-CDC sample, the three fields prefixed with hg_binlog_ are system fields: you cannot change their names or data types. The remaining fields are user-defined fields, and their names must be in lowercase. The following table describes the parameters in the WITH clause.

      Parameter Required Description
      connector Yes The type of the source table. Set the value to hologres.
      dbname Yes The name of the Hologres database that you want to read.
      tablename Yes The name of the Hologres table that you want to read.
      username Yes The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID from the Security Management page.
      password Yes The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret from the Security Management page.
      endpoint Yes The Virtual Private Cloud (VPC) endpoint of your Hologres instance. You can obtain the VPC endpoint and port number from the Hologres console.
      binlog Yes Specifies whether the current table serves as a source for binary logs. To use the table as a source for binary logs, set the binlog parameter to true.
      cdcMode No Specifies whether to read binary logs in CDC mode. To read binary logs in CDC mode, set the cdcMode parameter to true.
      binlogMaxRetryTimes No The maximum number of retries allowed after an error occurs in the process of retrieving binary logs. Default value: 60.
      binlogRetryIntervalMs No The interval between two consecutive retries after an error occurs in the process of retrieving binary logs. Unit: milliseconds. Default value: 2000.
      binlogBatchReadSize No The number of binary logs that are retrieved at a time. Default value: 16.
      startTime No The time from which Hologres binary logs are consumed, in the format yyyy-MM-dd hh:mm:ss. If this parameter is not specified and the job is not resumed from a state, consumption starts from the earliest generated binary log.
  2. Set the concurrency for binary log consumption.
    Binary logs are consumed per shard, so the effective concurrency of binary log consumption cannot exceed the number of shards in the Hologres table. We recommend that you set the concurrency to the number of shards in the table. To query the number of shards, execute the following statement and replace <tablename> with the name of your table:
    select tg.property_value
    from hologres.hg_table_properties tb
    join hologres.hg_table_group_properties tg
      on tb.property_value = tg.tablegroup_name
    where tb.property_key = 'table_group'
      and tg.property_key = 'shard_count'
      and tb.table_name = '<tablename>';
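In non-CDC mode, acting on a specific change type is left to the downstream query. The following Flink SQL sketch reads the non-CDC source table test_message_src_binlog_table from step 1 and forwards only rows that carry a new row image. The event-type codes (2 = DELETE, 3 = UPDATE_BEFORE, 5 = INSERT, 7 = UPDATE_AFTER) and the print_sink table are assumptions for illustration; verify the codes in Subscribe to Hologres binlogs before relying on them.

```sql
-- Sketch only. Assumed Hologres event-type codes:
--   2 = DELETE, 3 = UPDATE_BEFORE, 5 = INSERT, 7 = UPDATE_AFTER
-- print_sink is a hypothetical sink that writes each row to the task logs.
create table print_sink (
  event_type BIGINT,
  lsn BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector' = 'print'
);

-- Forward inserted rows and the new image of updated rows;
-- deletes and pre-update images are filtered out.
insert into print_sink
select
  hg_binlog_event_type,
  hg_binlog_lsn,
  id,
  title,
  body
from test_message_src_binlog_table
where hg_binlog_event_type in (5, 7);
```

A CDC-mode source table needs no such filter: the connector converts event types into RowKinds itself, so updates and deletes can flow directly into changelog-aware operations such as GROUP BY aggregations.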

Real-time consumption of binary logs by using Blink in exclusive mode

Blink V3.7 and later allow Holo-blink connectors to consume binary logs in real time. To consume binary logs, perform the following steps:

  1. Use a DDL statement to create a source table.
    • Syntax
      create table test_message_src_binlog_table(
        hg_binlog_lsn BIGINT,
        hg_binlog_event_type BIGINT,
        hg_binlog_timestamp_us BIGINT,
        id INTEGER,
        title VARCHAR,
        body VARCHAR
      ) with (
        type = 'hologres',
        'endpoint' = 'ip:port', -- The VPC endpoint of your Hologres instance.
        'username' = 'xxxx', -- The AccessKey ID of your Alibaba Cloud account.
        'password' = 'xxxx', -- The AccessKey secret of your Alibaba Cloud account.
        'dbname' = 'xxxx', -- The name of the Hologres database that you want to read.
        'tablename' = 'xxxx', -- The name of the Hologres table that you want to read.
        'binlog' = 'true',
        'binlogMaxRetryTimes' = '10',
        'binlogRetryIntervalMs' = '500',
        'binlogBatchReadSize' = '256'
      );
    • Parameters

      In the sample syntax, the three fields prefixed with hg_binlog_ are system fields: you cannot change their names or data types. The remaining fields are user-defined fields, and their names must be in lowercase. The following table describes the parameters in the WITH clause.

      Parameter Required Description
      type Yes The type of the source table. Set the value to hologres.
      dbname Yes The name of the Hologres database that you want to read.
      tablename Yes The name of the Hologres table that you want to read.
      username Yes The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID from the Security Management page.
      password Yes The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret from the Security Management page.
      endpoint Yes The VPC endpoint of your Hologres instance. You can obtain the VPC endpoint and port number from the Hologres console.
      binlog Yes Specifies whether the current table serves as a source for binary logs. To use the table as a source for binary logs, set this parameter to true.
      binlogMaxRetryTimes No The maximum number of retries allowed after an error occurs in the process of retrieving binary logs. Default value: 60.
      binlogRetryIntervalMs No The interval between two consecutive retries after an error occurs in the process of retrieving binary logs. Unit: milliseconds. Default value: 2000.
      binlogBatchReadSize No The number of binary logs that are retrieved at a time. Default value: 16.
      startTime No The time from which Hologres binary logs are consumed, in the format yyyy-MM-dd hh:mm:ss. If this parameter is not specified and the job is not resumed from a state, consumption starts from the earliest generated binary log.
  2. Set the concurrency for binary log consumption.
    Binary logs are consumed per shard, so the effective concurrency of binary log consumption cannot exceed the number of shards in the Hologres table. We recommend that you set the concurrency to the number of shards in the table. To query the number of shards, execute the following statement and replace <tablename> with the name of your table:
    select tg.property_value
    from hologres.hg_table_properties tb
    join hologres.hg_table_group_properties tg
      on tb.property_value = tg.tablegroup_name
    where tb.property_key = 'table_group'
      and tg.property_key = 'shard_count'
      and tb.table_name = '<tablename>';
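For both the Flink and Blink connectors, binlogMaxRetryTimes and binlogRetryIntervalMs together bound how long a job keeps retrying a failed binary log read. As a rough estimate that ignores the time each attempt itself takes, the retry window is their product:

```latex
\begin{aligned}
T_{\text{retry}} &\approx \texttt{binlogMaxRetryTimes} \times \texttt{binlogRetryIntervalMs} \\
T_{\text{default}} &\approx 60 \times 2000\ \text{ms} = 120\,000\ \text{ms} = 120\ \text{s} \\
T_{\text{sample}} &\approx 10 \times 500\ \text{ms} = 5\,000\ \text{ms} = 5\ \text{s}
\end{aligned}
```

The sample DDL statements therefore stop retrying after roughly 5 seconds, much sooner than the defaults; consider larger values if transient errors in your environment are expected to last longer.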