Realtime Compute for Apache Flink: Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time

Last Updated: Feb 01, 2024

The Hologres connector of Realtime Compute for Apache Flink can consume binary log data of Hologres in real time. This topic describes how to use the connector for this purpose.

Limits

  • In Hologres V0.10 and earlier, you cannot enable the binary logging feature by modifying the table properties of an existing table. You must enable the feature when you create the table. In Hologres V1.1 and later, you can enable or disable the binary logging feature for an existing table and specify the time to live (TTL) of binary log data based on your business requirements. For more information, see Subscribe to Hologres binary logs.

  • Partitioned tables do not support the binary logging feature. If you want to enable this feature, you must use a non-partitioned table.

  • Realtime Compute for Apache Flink cannot consume data of the TIMESTAMP type in real time. Therefore, when you create a Hologres source table, make sure that the date and time data in the table is of the TIMESTAMPTZ type.

  • After you enable the binary logging feature, column-oriented tables have higher overhead than row-oriented tables in theory. Therefore, if data is frequently updated, we recommend that you enable the binary logging feature for row-oriented tables.

  • By default, binary log source tables do not support the ARRAY data type. The binary log source tables support only the INTEGER, BIGINT, TEXT, REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), and TIMESTAMPTZ data types.

    Note

    If a field in a table that is used in a deployment is of a data type that cannot be consumed, such as SMALLINT, the deployment cannot be published even if the data of the field is not consumed.

  • Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.3 or later supports binary log source tables in Java Database Connectivity (JDBC) mode. By default, the Hologres connector of Realtime Compute for Apache Flink that uses VVR 6.0.7 or later consumes Hologres binary log source tables in JDBC mode. Compared with binary log source tables in HoloHub mode, binary log source tables in JDBC mode support more data types, such as SMALLINT and ARRAY, and also support custom users. Custom users are not RAM users. For more information, see Binary log source table in JDBC mode.

    Hologres V2.0 and later no longer support the HoloHub mode and support only the JDBC mode. If your Realtime Compute for Apache Flink service uses VVR of a version that is earlier than 6.0.7, you must set the sdkMode parameter to jdbc or upgrade the VVR version.

  • In Hologres V1.3.41 and later, a binary log source table in JDBC mode can read data of the JSONB data type from upstream data stores. To enable this feature, you must configure a GUC-related parameter of Hologres for a database by executing the following statement:

    -- Configure the GUC-related parameter hg_experimental_enable_binlog_jsonb for a database. Only the superuser can perform this operation. You need to configure this parameter only once for each database. 
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;

  • The Hologres connector of Realtime Compute for Apache Flink that uses VVR 8.0.4 or later always consumes the binary log source table of a Hologres instance of V2.0 or later in JDBC mode. We recommend that you upgrade the Hologres instance to V2.1. This way, the system automatically changes the SDK mode from HoloHub to JDBC. If the Hologres instance is of V2.0 and your account is not a superuser, the account must be granted permissions to consume binary log source tables in JDBC mode. Otherwise, the error message "permission denied for database" may appear. The required permissions are the CREATE permission on the database and the permissions of the replication role. Sample statements:

    -- In the standard PostgreSQL authorization model, grant the CREATE permission and the permissions of the replication role to a user.
    GRANT CREATE ON DATABASE database_name TO <user_name>;
    alter role <user_name> replication;
    
    -- If the simple permission model (SPM) is enabled for the database and the GRANT statement cannot be executed, you can use spm_grant to grant the administrator permission on the database to the user. You can also directly grant the permission in the HoloWeb console.
    call spm_grant('{dbname}_admin', 'ID of the Alibaba Cloud account, Alibaba Mail address, or RAM user');
    alter role <user_name> replication;

Precautions

  • An UPDATE statement generates two binary log records: the data record before the update and the data record after the update. Both records are consumed. The binary logging feature ensures that the two records are consecutive and in the correct order, with the pre-update record placed before the post-update record.

  • We recommend that you set the parallelism of Realtime Compute for Apache Flink deployments to the number of shards in your Hologres table.

    You can log on to the Hologres console and execute the following statement to query the number of shards in your table. tablename indicates the name of your table.

    select tg.property_value
    from hologres.hg_table_properties tb
    join hologres.hg_table_group_properties tg
      on tb.property_value = tg.tablegroup_name
    where tb.property_key = 'table_group'
      and tg.property_key = 'shard_count'
      and table_name = '<tablename>';

Enable binary logging

By default, the binary logging feature is disabled. To enable it, configure the binlog.level and binlog.ttl parameters in the DDL statements that create the table. The following sample code provides an example.

begin;
CREATE TABLE test_message_src(
 id int primary key, 
 title text not null, 
 body text
);
call set_table_property('test_message_src', 'orientation', 'row');
call set_table_property('test_message_src', 'clustering_key', 'id');
call set_table_property('test_message_src', 'binlog.level', 'replica'); -- In Hologres V1.1 or later, you can enable the binary logging feature after you create a table. 
call set_table_property('test_message_src', 'binlog.ttl', '86400'); 
commit;

If binlog.level is set to replica, the binary logging feature is enabled. binlog.ttl indicates the TTL of binary logging. The unit of the value is seconds.
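
In Hologres V1.1 or later, the same table properties can also be set on a table that already exists. The following statements are a minimal sketch that reuses the test_message_src table from the preceding example. The TTL value of 2592000 seconds (30 days) is only an illustrative assumption, so choose a value that matches your retention requirements.

-- Enable binary logging for an existing table (Hologres V1.1 or later).
begin;
call set_table_property('test_message_src', 'binlog.level', 'replica');
commit;

-- Set the TTL of binary log data. Unit: seconds. The value here is an example only.
begin;
call set_table_property('test_message_src', 'binlog.ttl', '2592000');
commit;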

Consumption modes

Non-CDC mode

In this mode, the binary log data consumed by the source is transmitted to downstream nodes as regular Flink data. The change type of all the data is INSERT. You can process data of a specific change type based on the value of the hg_binlog_event_type field to meet your business requirements. The following sample code shows the DDL statement that is used to create a Hologres source table in this mode.

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
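
Because every record arrives as an INSERT in this mode, the change type has to be handled in the query itself. The following query is a minimal sketch that keeps only the current row images by filtering on hg_binlog_event_type. It assumes the source table defined above and the event type codes listed in the field table at the end of this topic (INSERT=5, AFTER_UPDATE=7).

-- Keep inserted rows and the post-update image of updated rows.
-- Records of the DELETE (2) and BEFORE_UPDATE (3) types are discarded.
SELECT id, title, body, hg_binlog_lsn
FROM test_message_src_binlog_table
WHERE hg_binlog_event_type IN (5, 7);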

CDC mode

In this mode, each row of the binary log data consumed by the source is automatically assigned an accurate Flink RowKind type, such as INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER, based on the type that is specified by the hg_binlog_event_type field. This way, the binary log data can be mirrored to the destination table. This is similar to the CDC feature in MySQL and PostgreSQL. The following sample code shows the DDL statement that is used to create a Hologres source table in this mode.

CREATE TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
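
As a sketch of the mirroring described above, the CDC source table can be written directly to a destination table that has a primary key. The table name test_message_sink and the placeholder <yourSinkTablename> are hypothetical and are not part of the original sample. The connection parameters are assumed to be the same as those of the source table.

-- Hypothetical destination table. The primary key lets UPDATE and DELETE changes be applied by key.
CREATE TEMPORARY TABLE test_message_sink(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourSinkTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>'
);

-- Each change read from the binary log is applied to the destination table.
INSERT INTO test_message_sink
SELECT id, title, body
FROM test_message_src_binlog_table;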

Consumption of full and incremental data in a source table

When you join a dimension table and a source table, some data of the source table may no longer be available for consumption, for example, because the TTL of the binary log data has elapsed. Previously, this issue was resolved by setting the TTL of binary log data to a large value. That method may cause the following issues:

  • Historical binary log data is stored for a long period of time, which occupies a large number of storage resources.

  • Binary log data contains data update records. If full binary log data is consumed, data that is not required for your business is also consumed. As a result, a large number of computing resources are occupied and users cannot focus only on the latest data.

In VVR 4.0.13 or later and Hologres V0.10 or later, Hologres binary log source tables in CDC mode support consumption of full and incremental data. This way, the Hologres source table connector reads full data from the database and then smoothly switches to read incremental data from binary log data. This method can be used to resolve the preceding issue.

Scenarios

  • This method is suitable for scenarios in which historical data does not contain binary log data and you want to consume full data.

  • You can use this method only if the destination table contains a primary key. We recommend that you use full and incremental Hologres source tables in CDC mode.

  • If you use Hologres V1.1 or later, you can enable the binary logging feature based on your business requirements, including for a table that already contains historical data.

Sample code

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', -- Read full historical data and then consume incremental data from binary log data. 
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
  );

JDBC mode used to consume binary log source tables

As described in Limits, Realtime Compute for Apache Flink that uses VVR 6.0.3 or later supports the JDBC mode for consuming binary log source tables, and VVR 6.0.7 or later uses this mode by default. The JDBC mode is not a consumption mode in the sense of the non-CDC and CDC modes. It determines how binary log data is obtained at the underlying layer: in JDBC mode, the connector uses an SDK that is based on a JDBC driver. Compared with the HoloHub mode, the JDBC mode has the following advantages for consuming binary log source tables:

  • Supports more data types, including SMALLINT, INTEGER, BIGINT, TEXT, REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC, DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, BYTEA, JSON, int4[], int8[], float4[], float8[], boolean[], text[], and JSONB. The JSONB data type is supported only if the Hologres version is V1.3.41 or later and the related GUC parameter of Hologres is configured for the database. For more information about the JSONB data type, see Limits.

  • Supports custom users of Hologres. Custom users are not RAM users.

The method of using a binary log source table in JDBC mode is similar to the method of using a common binary log source table. For a binary log source table in JDBC mode, you must set the sdkMode parameter to jdbc. Sample code:

create TEMPORARY table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc', -- Consume a binary log source table in JDBC mode.
  'jdbcBinlogSlotName'='replication_slot_name' -- Optional. If you do not configure this parameter, the Hologres connector automatically creates a slot and publication.
);

The jdbcBinlogSlotName parameter is optional for a binary log source table in JDBC mode. If you do not configure this parameter, the Hologres connector automatically creates and uses a slot and a publication. The name of the default publication is similar to publication_for_table_<table_name>_used_by_flink, and the name of the default slot is similar to slot_for_table_<table_name>_used_by_flink. If an error occurs when you use the slot and publication, you can delete them and then create another slot and publication.

If binary log source tables are consumed in JDBC mode and the version of the Hologres instance is V2.1 or later, you do not need to configure a slot. Therefore, the Hologres connector of Realtime Compute for Apache Flink that uses VVR 8.0.5 or later does not automatically create a default slot if the version of the Hologres instance is V2.1 or later.

The required permissions must be in place before a binary log source table is consumed in JDBC mode. The Hologres connector can automatically create a slot only if your account is a superuser, or if your account has the CREATE permission on the database and the permissions of the replication role on the instance. If you do not have the required permissions, the deployment fails. To resolve this issue, execute the following statements or follow the instructions provided in Use JDBC to consume Hologres binary logs.

-- In the standard PostgreSQL authorization model, grant the CREATE permission and the permissions of the replication role to a user.
GRANT CREATE ON DATABASE database_name TO <user_name>;
alter role <user_name> replication;

-- If the SPM is enabled for the database and the GRANT statement cannot be executed, you can use spm_grant to grant the administrator permission on the database to the user. You can also directly grant the permission in the HoloWeb console.
call spm_grant('{dbname}_admin', 'ID of the Alibaba Cloud account, Alibaba Mail address, or RAM user');
alter role <user_name> replication;

Note

If you delete a table that is used by a deployment and then create a table that has the same name, the error message "no table is defined in publication" or "The table xxx has no slot named xxx" may appear. This is because the publication that is associated with the deleted table is not deleted together with the table. If this error message appears, execute the select * from pg_publication where pubname not in (select pubname from pg_publication_tables); statement in Hologres to query the publications that remain after table deletion. Then, execute the drop publication xx; statement to delete each remaining publication and restart the deployment. Alternatively, you can change the engine version of Realtime Compute for Apache Flink to VVR 8.0.5, in which the Hologres connector automatically deletes the remaining publication.
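
The cleanup described in the note can be run as the following two statements in Hologres. This is a sketch only: <publication_name> is a placeholder for a publication name that the first query returns, and you should confirm that a publication is no longer needed before you drop it.

-- Step 1: List publications that are no longer associated with any table.
select * from pg_publication
where pubname not in (select pubname from pg_publication_tables);

-- Step 2: Drop each remaining publication returned by the preceding query, and then restart the deployment.
drop publication <publication_name>;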

Implementation principle of binary logging in Hologres

A binary log consists of binary log system fields and custom table fields. The following table describes the fields of binary logs.

Field name | Data type | Description
---------- | --------- | -----------
hg_binlog_lsn | BIGINT | A field in the binary log system. This field indicates the ordinal number of a binary log. The ordinal numbers for binary logs in a shard are generated in ascending order but may not be consecutive. The ordinal numbers in different shards can be the same and out of order.
hg_binlog_event_type | BIGINT | A field in the binary log system. This field indicates the change type of a record. Valid values: INSERT=5 (a row or column is inserted); DELETE=2 (a row or column is deleted); BEFORE_UPDATE=3 (a row or column is saved before the row or column is updated); AFTER_UPDATE=7 (a row or column is saved after the row or column is updated).
hg_binlog_timestamp_us | BIGINT | A field in the binary log system. This field indicates the timestamp of the system. Unit: microseconds.
user_table_column_1 | Custom | A field in a user table.
... | ... | A field in a user table.
user_table_column_n | Custom | A field in a user table.
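
To illustrate how the system fields can be read in a Flink SQL query, the following sketch decodes hg_binlog_event_type into a readable label and converts hg_binlog_timestamp_us into a timestamp. It assumes a source table in non-CDC mode that declares the three system fields, such as test_message_src_binlog_table in the Non-CDC mode section. It also assumes that hg_binlog_timestamp_us is measured from the Unix epoch, and that the Flink built-in function TO_TIMESTAMP_LTZ is available in your engine version.

SELECT
  id,
  -- Map the numeric change type to the names listed in the preceding table.
  CASE hg_binlog_event_type
    WHEN 5 THEN 'INSERT'
    WHEN 2 THEN 'DELETE'
    WHEN 3 THEN 'BEFORE_UPDATE'
    WHEN 7 THEN 'AFTER_UPDATE'
    ELSE 'UNKNOWN'
  END AS event_type,
  -- Convert microseconds to milliseconds, and then to a timestamp with millisecond precision.
  TO_TIMESTAMP_LTZ(hg_binlog_timestamp_us / 1000, 3) AS event_time
FROM test_message_src_binlog_table;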