This topic describes how to use a DataConnector to synchronize data from a DataHub topic to a Hologres table in real time.
Prerequisites
- A Hologres instance is purchased. A development tool is connected to the instance. For more information, see PSQL quick start.
- DataHub is activated. For more information, see Get started with DataHub.
- A DataHub topic of the TUPLE type is created. You can synchronize data only from a DataHub topic of the TUPLE type to a Hologres table.
Background information
DataHub is a real-time data distribution platform designed to process streaming data. You can publish and subscribe to streaming data in DataHub and distribute the data to other platforms. DataHub allows you to analyze streaming data and build applications based on the streaming data.
DataHub supports data synchronization as a data source or a data sink. You can synchronize data from a DataHub topic to Hologres in real time by using a DataConnector, and analyze the data from multiple aspects and explore the data in real time.
DataHub | Hologres |
---|---|
Project | Database |
Topic | Table |
Limits
- If you enable the IP address whitelist feature for a Hologres instance, you cannot synchronize data from DataHub to the Hologres instance. Do not enable the IP address whitelist feature.
- You must create a child partitioned table in Hologres before you write data to a partitioned table. For more information, see CREATE TABLE.
Synchronization
- The following two synchronization modes and two synchronization policies are not the task-level configurations of DataHub, but the table properties of a Hologres table. You must specify them when you create a Hologres table.
- Synchronizing data from DataHub to Hologres conflicts with synchronizing data to Hologres in SDK write mode by using batch synchronization nodes provided by the Data Integration service of DataWorks. For more information about the SDK write mode, see Hologres Writer.
- Synchronization modes
- One-by-one insert
One-by-one insert means that DataHub data is written to Hologres one by one. You must specify the following table property when you create a Hologres table:
call set_table_property('<table_name>', 'datahub_sync_mode', 'none');
- Playback
Playback means the playback of DML operations on the upstream database. DataHub is equivalent to binary logs. If you use the
dts-datahub-hologres
link to write data, you must set the Apply New Naming Rules of Additional Columns parameter in Data Transmission Service (DTS). You must also specify the following table property when you create a Hologres table:- If you set the Apply New Naming Rules of Additional Columns parameter to Yes, you must specify the following table property when you create a Hologres table:
call set_table_property('<table_name>', 'datahub_sync_mode', 'dts');
- If you set the Apply New Naming Rules of Additional Columns parameter to No, you must specify the following table property when you create a Hologres table:
call set_table_property('<table_name>', 'datahub_sync_mode', 'dts_old');
When DTS synchronizes data to DataHub, eight columns are added to the synchronized data columns to describe the playback data information, such as the INSERT, UPDATE, or DELETE operation. The following part describes the additional columns.- Name format of additional columns
Previous data column name New data column name dts_${Original column name} ${Original column name} - Additional column description
Previous additional column name New additional column name Data type Description dts_record_id new_dts_sync_dts_record_id String The unique ID of the incremental log entry. dts_operation_flag new_dts_sync_dts_operation_flag String The operation type. Valid values: - I: an INSERT operation
- D: a DELETE operation
- U: an UPDATE operation
dts_instance_id new_dts_sync_dts_instance_id String The server ID of the database. The value is set to NULL. The actual value is not displayed to ensure data consistency. dts_db_name new_dts_sync_dts_db_name String The name of the database. dts_table_name new_dts_sync_dts_table_name String The name of the table. dts_utc_timestamp new_dts_sync_dts_utc_timestamp String The operation timestamp, in UTC. It is also the timestamp of the binary log file. dts_before_flag new_dts_sync_dts_before_flag String Indicates whether the column values are pre-update values. Valid values: Y and N. dts_after_flag new_dts_sync_dts_after_flag String Indicates whether the column values are post-update values. Valid values: Y and N.
- If you set the Apply New Naming Rules of Additional Columns parameter to Yes, you must specify the following table property when you create a Hologres table:
- One-by-one insert
- Synchronization policies for primary key conflicts
If you have set a primary key for a Hologres table, the following two primary key conflict policies are available for the data written from DataHub:
- Overwrite
Overwrite means that if a primary key conflict occurs when you write data, new data overwrites original data. In this case, you must specify the following table property when you create a Hologres table:
call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_replace');
- Ignore
Ignore means that if a primary key conflict occurs when you write data, new data is ignored. In other words, the data is not updated, and original data is still used. In this case, you must specify the following table property when you create a Hologres table:
call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_ignore');
- Overwrite
- Combinations of synchronization modes and synchronization policies
The preceding modes and policies of synchronizing data from DataHub to Hologres can achieve different effects in different combinations. The following part describes the combinations.
- Combination of the insert mode and the overwrite policy
This combination is equivalent to executing the following SQL statement in Hologres:
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
- Combination of the insert mode and the ignore policy
This combination is equivalent to executing the following SQL statement in Hologres:
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
- Combination of the playback mode and the overwrite policy
- If the value of the dts_operation_flag field is I, this combination is equivalent
to executing the following SQL statement in Hologres:
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
- If the value of the dts_operation_flag field is D, this combination is equivalent
to executing the following SQL statement in Hologres:
DELETE FROM target_table where pk=?
- If the value of the dts_operation_flag field is U and the value of the dts_before_flag
field is Y, this combination is equivalent to executing the following SQL statement
in Hologres:
DELETE FROM target_table where pk=?
- If the value of the dts_operation_flag field is U and the value of the dts_after_flag
field is Y, this combination is equivalent to executing the following SQL statement
in Hologres:
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
- If the value of the dts_operation_flag field is I, this combination is equivalent
to executing the following SQL statement in Hologres:
- Combination of the playback mode and the ignore policy
- If the value of the dts_operation_flag field is I, this combination is equivalent
to executing the following SQL statement in Hologres:
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
- If the value of the dts_operation_flag field is D, this combination is equivalent
to executing the following SQL statement in Hologres:
DELETE FROM target_table where pk=?
- If the value of the dts_operation_flag field is U and the value of the dts_before_flag
field is Y, this combination is equivalent to executing the following SQL statement
in Hologres:
DELETE FROM target_table where pk=?
- If the value of the dts_operation_flag field is U and the value of the dts_after_flag
field is Y, this combination is equivalent to executing the following SQL statement
in Hologres:
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
- If the value of the dts_operation_flag field is I, this combination is equivalent
to executing the following SQL statement in Hologres:
- Combination of the insert mode and the overwrite policy
Procedure
Common errors and troubleshooting
This section describes common errors that are reported when you use Hologres to help you troubleshoot the errors.
-
Scenario 1: The following error message is returned when you query data:
ErrorMessage [Import field not found in dest schema;
Possible cause: The datahub_sync_mode property of the Hologres table is not set to
dts
.Solution: Create another Hologres table and set the table property datahub_sync_mode to
dts
. -
Scenario 2: The following error message is returned when you query data:
ErrorCode=InternalServerError; ErrorMessage =Field already exists
Possible cause: The datahub_sync_mode property of the Hologres table is set to
dts
, and eight additional columns are included when you create the table.Solution: Create another Hologres table. If you set the datahub_sync_mode property to
dts
, keep the fields the same as those of the upstream. You do not need to add eight additional columns.