This topic describes how to use Realtime Compute for Apache Flink or Blink to consume Hologres binary log data in real time.

Usage notes

Take note of the following items before you consume Hologres binary log data:
  • Only Hologres V0.9 and later allow you to consume binary log data. If the version of your Hologres instance is earlier than V0.9, submit a ticket or join the official DingTalk group of Hologres for technical support. For more information about how to join the DingTalk group, see Obtain online support for Hologres.
  • Hologres allows you to consume binary log data at the table level. Both row-oriented tables and column-oriented tables are supported. If the version of your Hologres instance is V1.1 or later, you can also consume the binary log data from tables that store data in hybrid row-column storage mode.
  • For information about the compatibility of Hologres binary log data and how to enable and configure binary logging, see Subscribe to Hologres binary logs.
  • Do not configure a whitelist for a Hologres instance for which binary logging is enabled. If a whitelist is configured, the binary log data of the instance cannot be consumed.
  • Fields of the ARRAY type cannot be consumed. Only fields of the following data types are supported: INTEGER, BIGINT, TEXT, REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), DATE, and TIMESTAMPTZ. If the table specified for a job contains fields of SMALLINT or another unsupported data type, the job may fail to run even if those fields are not declared in the source table.
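Before you create a job, you can check whether a table contains columns whose data types are not in the supported list. The following Hologres query is a sketch. It assumes that Hologres exposes the standard PostgreSQL information_schema.columns view and that the table is in the public schema. <yourTablename> is a placeholder. An empty result suggests that all columns use supported types. Columns declared as VARCHAR appear as character varying and are therefore also flagged, because only TEXT is in the supported list.

```sql
-- Sketch: list columns of <yourTablename> whose types are not in the
-- supported list. Assumes the table is in the public schema.
select column_name, data_type, numeric_precision, numeric_scale
from information_schema.columns
where table_schema = 'public'
  and table_name = '<yourTablename>'
  and (
    data_type not in ('integer', 'bigint', 'text', 'real', 'double precision',
                      'boolean', 'numeric', 'date', 'timestamp with time zone')
    -- NUMERIC is supported only as NUMERIC(38,8).
    or (data_type = 'numeric'
        and (coalesce(numeric_precision, 0) <> 38
             or coalesce(numeric_scale, 0) <> 8))
  );
```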

Use Realtime Compute for Apache Flink to consume binary log data in real time

Realtime Compute for Apache Flink allows you to use Hologres connectors to consume binary log data in real time. To consume binary log data, perform the following steps:

  1. Execute a DDL statement to create a source table.
    • Syntax
      • Source table DDL in non-change data capture (CDC) mode
        In this mode, the binary log data that the source table consumes is passed downstream as regular Flink data, and the change type of every row is INSERT. The actual type of each change is recorded in the hg_binlog_event_type field, so you can filter or process each type of change based on your business requirements. The following sample code provides an example of the CREATE TABLE statement:
        create table test_message_src_binlog_table(
          hg_binlog_lsn BIGINT,
          hg_binlog_event_type BIGINT,
          hg_binlog_timestamp_us BIGINT,
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) with (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
        );
      • Source table DDL in CDC mode
        In this mode, each row of the binary log data that the source table consumes is automatically assigned the matching Flink RowKind, such as INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER, based on the value of hg_binlog_event_type. This way, the source table mirrors the changes made to the Hologres table, similar to the CDC feature in MySQL and PostgreSQL.
        Note You cannot define watermarks for a CDC Hologres source table that is created for binary log data. If you want to perform window aggregation on such a source table, you can use a different method to perform aggregation. For more information, see How do I perform window aggregation if watermarks are not supported?.
        After binary logging is enabled for a Hologres table, you can execute the following DDL statement to consume binary log data in the CDC source table in Realtime Compute for Apache Flink.
        create table test_message_src_binlog_table(
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) with (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'cdcMode' = 'true',
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
        );
      • Consumption of full and incremental data by a source table

        Ververica Runtime (VVR) V1.13-vvr-4.0.13 and later, used with Hologres V0.10 and later, supports the consumption of full and incremental data by a source table in CDC mode. In this mode, the source table first reads the full data of the Hologres table and then reads the incremental binary log data. To use this mode, set binlogStartupMode to initial. For more information, see Create a Hologres source table.

    • Parameters

      In the non-CDC sample syntax, the three fields that are prefixed with hg_binlog_ are system fields. You cannot change the names or data types of these fields. The remaining fields are user-defined fields, and their names must be in lowercase. The following table describes the parameters in the WITH clause.

      Parameter | Required | Description
      connector | Yes | The type of the source table. Set the value to hologres.
      dbname | Yes | The name of the Hologres database from which you want to read data.
      tablename | Yes | The name of the Hologres table from which you want to read data.
      username | Yes | The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID from the Security Management page.
      password | Yes | The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret from the Security Management page.
      endpoint | Yes | The virtual private cloud (VPC) endpoint of your Hologres instance. You can obtain the VPC endpoint and port number from the Hologres console.
      binlog | Yes | Specifies whether to consume the binary log data of the table. To use the table as a source for binary log data, set the value to true.
      cdcMode | No | Specifies whether to read binary log data in CDC mode. To read binary log data in CDC mode, set the value to true.
      binlogMaxRetryTimes | No | The maximum number of retries after an error occurs while binary log data is being read. Default value: 60.
      binlogRetryIntervalMs | No | The interval between two consecutive retries after an error occurs while binary log data is being read. Unit: milliseconds. Default value: 2000.
      binlogBatchReadSize | No | The number of binary log entries to read at a time. Default value: 16.
      startTime | No | The point in time from which binary log consumption starts, in the yyyy-MM-dd hh:mm:ss format. If this parameter is not specified and the job is not resumed from saved state, consumption starts from the earliest binary log entry. Note: If you specify this parameter, binlogStartupMode is automatically set to timestamp.
      binlogStartupMode | No | The startup mode of binary log consumption. We recommend that you specify this parameter in CDC mode. Valid values:
        • timestamp: Consumption starts from the point in time specified by the startTime parameter.
        • initial: Consumption first reads the full data of the table and then reads the incremental binary log data.
        • earliestOffset: Consumption starts from the earliest binary log entry. This is the default value.
  2. Set a concurrency value for binary log consumption.
    The concurrency of binary log consumption corresponds to the number of shards in the Hologres table, so we recommend that you set the concurrency value to the number of shards. To query the number of shards, execute the following statement and replace <tablename> with the name of your table:
    select tg.property_value
    from hologres.hg_table_properties tb
    join hologres.hg_table_group_properties tg
      on tb.property_value = tg.tablegroup_name
    where tb.property_key = 'table_group'
      and tg.property_key = 'shard_count'
      and tb.table_name = '<tablename>';
  3. Monitor consumption latency.
    Ververica Platform (VVP) 4.0.12 and later supports the EventTimeLag metric. You can use the metric to monitor the latency of binary log consumption. A value of 0 indicates no latency.
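In non-CDC mode, every row arrives as INSERT, so downstream logic must inspect hg_binlog_event_type itself. The following Flink SQL is a minimal sketch that reads the non-CDC source table defined in step 1 and keeps only inserted rows and the new values of updated rows. The numeric event codes (5 for INSERT and 7 for AFTER_UPDATE, with 2 for DELETE and 3 for BEFORE_UPDATE) are an assumption based on the Hologres binary log documentation, so verify them in Subscribe to Hologres binary logs. The print_sink table is hypothetical and uses the open source Flink print connector.

```sql
-- Hypothetical sink that writes rows to the TaskManager logs.
create temporary table print_sink (
  event_type BIGINT,
  id INTEGER,
  title VARCHAR
) with (
  'connector' = 'print'
);

-- Keep inserts and the new values of updates. Drop deletes and
-- pre-update images. Assumed codes: 5 = INSERT, 7 = AFTER_UPDATE.
insert into print_sink
select hg_binlog_event_type, id, title
from test_message_src_binlog_table
where hg_binlog_event_type in (5, 7);
```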

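A CDC source table can feed a non-windowed aggregation, which does not depend on the watermarks that the note in step 1 says cannot be defined. The following Flink SQL is a sketch with these assumptions: the table names test_message_cdc_src and count_sink are hypothetical, and binlogStartupMode is set to initial so that the count includes existing rows before incremental changes arrive. This startup mode requires the VVR and Hologres versions listed in step 1. Because the source emits DELETE and UPDATE_BEFORE rows, the printed count decreases when rows are deleted from the Hologres table and stays the same when rows are updated.

```sql
-- Hypothetical CDC source that reads existing rows first, then binary log changes.
create temporary table test_message_cdc_src (
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessID>',
  'password' = '<yourAccessSecret>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartupMode' = 'initial'
);

-- Hypothetical sink. The print connector accepts update and delete rows.
create temporary table count_sink (
  row_count BIGINT
) with (
  'connector' = 'print'
);

-- Non-windowed aggregation, so no watermark is required.
insert into count_sink
select count(*) as row_count
from test_message_cdc_src;
```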
Use Blink to consume binary log data in real time

Blink V3.7 and later allow you to use Holo-blink connectors to consume binary log data in real time. To consume binary log data, perform the following steps:

  1. Execute a DDL statement to create a source table.
    • Syntax
      create table test_message_src_binlog_table(
        hg_binlog_lsn BIGINT,
        hg_binlog_event_type BIGINT,
        hg_binlog_timestamp_us BIGINT,
        id INTEGER,
        title VARCHAR,
        body VARCHAR
      ) with (
        type = 'hologres',
        'endpoint' = 'ip:port', -- The VPC endpoint of your Hologres instance.
        'username' = 'xxxx', -- The AccessKey ID of your Alibaba Cloud account.
        'password' = 'xxxx', -- The AccessKey secret of your Alibaba Cloud account.
        'dbname' = 'xxxx', -- The name of the Hologres database from which you want to read data.
        'tablename' = 'xxxx', -- The name of the Hologres table from which you want to read data.
        'binlog' = 'true',
        'binlogMaxRetryTimes' = '10',
        'binlogRetryIntervalMs' = '500',
        'binlogBatchReadSize' = '256'
      );
    • Parameters

      In the sample syntax, the three fields that are prefixed with hg_binlog_ are system fields. You cannot change the names or data types of these fields. The remaining fields are user-defined fields, and their names must be in lowercase. The following table describes the parameters in the WITH clause.

      Parameter | Required | Description
      type | Yes | The type of the source table. Set the value to hologres.
      dbname | Yes | The name of the Hologres database from which you want to read data.
      tablename | Yes | The name of the Hologres table from which you want to read data.
      username | Yes | The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID from the Security Management page.
      password | Yes | The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret from the Security Management page.
      endpoint | Yes | The VPC endpoint of your Hologres instance. You can obtain the VPC endpoint and port number from the Hologres console.
      binlog | Yes | Specifies whether to consume the binary log data of the table. To use the table as a source for binary log data, set the value to true.
      binlogMaxRetryTimes | No | The maximum number of retries after an error occurs while binary log data is being read. Default value: 60.
      binlogRetryIntervalMs | No | The interval between two consecutive retries after an error occurs while binary log data is being read. Unit: milliseconds. Default value: 2000.
      binlogBatchReadSize | No | The number of binary log entries to read at a time. Default value: 16.
      startTime | No | The point in time from which binary log consumption starts, in the yyyy-MM-dd hh:mm:ss format. If this parameter is not specified and the job is not resumed from saved state, consumption starts from the earliest binary log entry.
  2. Set a concurrency value for binary log consumption.
    The concurrency of binary log consumption corresponds to the number of shards in the Hologres table, so we recommend that you set the concurrency value to the number of shards. To query the number of shards, execute the following statement and replace <tablename> with the name of your table:
    select tg.property_value
    from hologres.hg_table_properties tb
    join hologres.hg_table_group_properties tg
      on tb.property_value = tg.tablegroup_name
    where tb.property_key = 'table_group'
      and tg.property_key = 'shard_count'
      and tb.table_name = '<tablename>';
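To start consuming binary log data in Blink from a specific point in time, add the startTime parameter to the WITH clause of the source table. The following DDL is a sketch. The table name test_message_src_binlog_from_time and the timestamp value are hypothetical placeholders, and the connection values use the same xxxx placeholders as the sample in step 1.

```sql
create table test_message_src_binlog_from_time(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  type = 'hologres',
  'endpoint' = 'ip:port',
  'username' = 'xxxx',
  'password' = 'xxxx',
  'dbname' = 'xxxx',
  'tablename' = 'xxxx',
  'binlog' = 'true',
  -- Hypothetical start point in the yyyy-MM-dd hh:mm:ss format.
  'startTime' = '2021-11-01 00:00:00'
);
```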