This topic describes how to use a DataConnector to synchronize data from a DataHub topic to a Hologres table in real time.

Prerequisites

  • A Hologres instance is purchased. A development tool is connected to the instance. For more information, see PSQL quick start.
  • DataHub is activated. For more information, see Get started with DataHub.
  • You can synchronize only data of the TUPLE type from a DataHub topic to a Hologres table.

Background information

DataHub is a real-time data distribution platform designed to process streaming data. You can publish and subscribe to streaming data in DataHub and distribute the data to other platforms. DataHub also allows you to analyze streaming data and build applications based on it.

DataHub can synchronize data from a data source to a data sink. By using a DataConnector, you can synchronize data from a DataHub topic to Hologres in real time, and then analyze and explore the data from multiple dimensions.

The following table describes the mappings between terms in DataHub and Hologres.
DataHub    Hologres
Project    Database
Topic      Table

Synchronization

Two synchronization modes and two synchronization policies are available to synchronize data from DataHub to Hologres. You can also combine the synchronization modes and synchronization policies to achieve different effects.
Note
  • The synchronization modes and synchronization policies described below are not task-level settings in DataHub. They are table properties of the Hologres table, and you must specify them when you create the table.
  • Synchronizing data from DataHub to Hologres conflicts with synchronizing data to Hologres in SDK write mode by using batch sync nodes provided by the Data Integration service of DataWorks. For more information about the SDK write mode, see Hologres Writer.
  • Synchronization modes
    • One-by-one insert
      One-by-one insert means that DataHub data is written to Hologres one by one. You must specify the following table attributes when you create a Hologres table:
      call set_table_property('<table_name>', 'datahub_sync_mode', 'node');
    • Playback
      Playback means replaying the DML operations of the upstream database. In this mode, DataHub plays a role similar to binary logs. If you use the dts-datahub-hologres link to write data, you must set the Apply New Naming Rules of Additional Columns parameter in Data Transmission Service (DTS), and then set the matching table property when you create the Hologres table:
      • If you set the Apply New Naming Rules of Additional Columns parameter to Yes, you must configure the following table attributes when you create a Hologres table:
        call set_table_property('<table_name>', 'datahub_sync_mode', 'dts');
      • If you set the Apply New Naming Rules of Additional Columns parameter to No, you must configure the following table attributes when you create a Hologres table:
        call set_table_property('<table_name>', 'datahub_sync_mode', 'dts_old');
      When DTS synchronizes data to DataHub, it adds eight columns to the original data columns to describe each replayed change (INSERT, UPDATE, or DELETE). The naming rules and the additional columns are described below.
      • Name format of additional columns
        Previous data column name      New data column name
        dts_${Original column name}    ${Original column name}
      • Additional column description
        Previous additional column name    New additional column name         Data type    Description
        dts_record_id                      new_dts_sync_dts_record_id         String       The unique ID of the incremental log entry.
        dts_operation_flag                 new_dts_sync_dts_operation_flag    String       The operation type. Valid values: I (an INSERT operation), D (a DELETE operation), and U (an UPDATE operation).
        dts_instance_id                    new_dts_sync_dts_instance_id       String       The server ID of the database. The value is set to NULL. To ensure database security, the actual value is not displayed.
        dts_db_name                        new_dts_sync_dts_db_name           String       The name of the database.
        dts_table_name                     new_dts_sync_dts_table_name        String       The name of the table.
        dts_utc_timestamp                  new_dts_sync_dts_utc_timestamp     String       The operation timestamp, in UTC. It is also the timestamp of the binary log file.
        dts_before_flag                    new_dts_sync_dts_before_flag       String       Indicates whether the column values are pre-update values. Valid values: Y and N.
        dts_after_flag                     new_dts_sync_dts_after_flag        String       Indicates whether the column values are post-update values. Valid values: Y and N.
  • Synchronization policy (primary key conflict policy)
    If you have set a primary key for a Hologres table, the following two primary key conflict policies are available for the data written from DataHub:
    • Overwrite
      Overwrite means that if a primary key conflict occurs when you write data, new data overwrites original data. In this case, you must specify the following table attributes when you create a Hologres table:
      call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_replace');
    • Ignore
      Ignore means that if a primary key conflict occurs when you write data, new data is ignored. In other words, the data is not updated, and original data is still used. In this case, you must specify the following table attributes when you create a Hologres table:
      call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_ignore');
  • Combinations of synchronization modes and synchronization policies
    The preceding modes and policies of synchronizing data from DataHub to Hologres can achieve different effects in different combinations. The following part describes the combinations.
    • Combination of the insert mode and the overwrite policy
      This combination is equivalent to executing the following SQL statement in Hologres:
      INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
    • Combination of the insert mode and the ignore policy
      This combination is equivalent to executing the following SQL statement in Hologres:
      INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
    • Combination of the playback mode and the overwrite policy
      • If dts_operation_flag=I, this combination is equivalent to executing the following SQL statement in Hologres:
        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
      • If dts_operation_flag=D, this combination is equivalent to executing the following SQL statement in Hologres:
        DELETE FROM target_table where pk=?
      • If dts_operation_flag=U and dts_before_flag=Y, this combination is equivalent to executing the following SQL statement in Hologres:
        DELETE FROM target_table where pk=?
      • If dts_operation_flag=U and dts_after_flag=Y, this combination is equivalent to executing the following SQL statement in Hologres:
        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
    • Combination of the playback mode and the ignore policy
      • If dts_operation_flag=I, this combination is equivalent to executing the following SQL statement in Hologres:
        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
      • If dts_operation_flag=D, this combination is equivalent to executing the following SQL statement in Hologres:
        DELETE FROM target_table where pk=?
      • If dts_operation_flag=U and dts_before_flag=Y, this combination is equivalent to executing the following SQL statement in Hologres:
        DELETE FROM target_table where pk=?
      • If dts_operation_flag=U and dts_after_flag=Y, this combination is equivalent to executing the following SQL statement in Hologres:
        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
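The four combinations above can be illustrated with a small in-memory simulation. This is only a sketch of the described behavior, not how DataHub or Hologres implements synchronization. The function names `upsert` and `apply_record`, the mode labels "insert" and "playback", the policy labels "overwrite" and "ignore", and the dict-based table are assumptions made for this example. Only the flag names (dts_operation_flag, dts_before_flag, dts_after_flag) and the resulting INSERT and DELETE behavior are taken from the description above.

```python
from typing import Any, Dict, Optional

Row = Dict[str, Any]
Table = Dict[Any, Row]  # hypothetical in-memory table: primary key -> row


def upsert(table: Table, row: Row, pk: str, policy: str) -> None:
    """Mimic INSERT ... ON CONFLICT(PK) DO UPDATE / DO NOTHING."""
    if policy not in ("overwrite", "ignore"):
        raise ValueError(f"unknown policy: {policy!r}")
    key = row[pk]
    if key in table and policy == "ignore":
        return  # DO NOTHING: the original row is kept
    table[key] = dict(row)  # new key, or DO UPDATE with the new row


def apply_record(table: Table, row: Row, pk: str, mode: str, policy: str,
                 flags: Optional[Dict[str, str]] = None) -> None:
    """Apply one record under the given (hypothetical) mode and policy labels."""
    if mode == "insert":
        upsert(table, row, pk, policy)
        return
    if mode != "playback":
        raise ValueError(f"unknown mode: {mode!r}")
    flags = flags or {}
    op = flags.get("dts_operation_flag")
    if op == "I":
        upsert(table, row, pk, policy)
    elif op == "D":
        table.pop(row[pk], None)  # DELETE FROM target_table WHERE pk = ?
    elif op == "U":
        if flags.get("dts_before_flag") == "Y":
            table.pop(row[pk], None)  # pre-update image: delete the old row
        elif flags.get("dts_after_flag") == "Y":
            upsert(table, row, pk, policy)  # post-update image: insert the new row
    else:
        raise ValueError(f"unknown dts_operation_flag: {op!r}")


# Replay an INSERT followed by an UPDATE that changes v from "a" to "b".
table: Table = {}
apply_record(table, {"id": 1, "v": "a"}, "id", "playback", "ignore",
             {"dts_operation_flag": "I"})
apply_record(table, {"id": 1, "v": "a"}, "id", "playback", "ignore",
             {"dts_operation_flag": "U", "dts_before_flag": "Y", "dts_after_flag": "N"})
apply_record(table, {"id": 1, "v": "b"}, "id", "playback", "ignore",
             {"dts_operation_flag": "U", "dts_before_flag": "N", "dts_after_flag": "Y"})
print(table)  # {1: {'id': 1, 'v': 'b'}}
```

In this sketch, an UPDATE still takes effect in playback mode under the ignore policy, because the pre-update record deletes the old row before the post-update record is inserted.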

Procedure

  1. Prepare a data source in DataHub.
    1. Create a DataHub project.
      1. Log on to the DataHub console. In the left-side navigation pane, click Projects.
      2. On the Projects page, click Create Project in the upper-right corner.
      3. In the Create Project panel, set the parameters as required and click Create.
    2. Create a DataHub topic.
      1. On the Projects page, find the created project and click the project name.
      2. In the upper-right corner of the project details page, click Create Topic. In the Create Topic panel, set the parameters as required.
      Parameter           Description
      Creation Type       • Create Directly: Create a new topic.
                          • Import MaxCompute Tables: Select an existing table schema in MaxCompute to create a topic.
      Name                The custom name of the topic.
      Type                The type of the topic. Valid values:
                          • TUPLE: structured data
                          • BLOB: unstructured data
      Schema Details      The details of the schema. The Schema Details parameter is displayed if you set the Type parameter to TUPLE. You can create fields based on your business requirements. If you select Allow Null for a field, the field is set to NULL if the field does not exist in the upstream. If you clear Allow Null for a field, the field configuration is strictly verified. An error is returned if the type specified for the field is invalid.
      Number of Shards    The number of shards in the topic. Shards ensure the concurrent data transmission of a topic. Each shard has a unique ID. A shard may be in one of the following states:
                          • Opening: The shard is starting.
                          • Active: The shard is started and available.
                          Each active shard consumes server resources. We recommend that you create shards as needed.
      Lifecycle           The period for which each record is retained in the topic. Unit: day. Valid values: 1 to 7.
      Description         The description of the topic.
    3. Write data to the topic.
      Use a tool, such as Realtime Compute for Apache Flink, or an application to write data to the created topic.
  2. Create a Hologres table to which the data is to be synchronized.
    Create a Hologres table used to receive data. You must create fields with data types corresponding to those of the DataHub topic.
    The following table describes the mappings between data types in DataHub and Hologres.
    DataHub      Hologres
    BIGINT       BIGINT
    STRING       TEXT
    BOOLEAN      BOOLEAN
    DOUBLE       DOUBLE PRECISION
    TIMESTAMP    TIMESTAMPTZ
    DECIMAL      DECIMAL
    For example, you can execute the following statements to create a table:
    BEGIN;
    CREATE TABLE lineitem ( 
    L_ORDERKEY BIGINT NOT NULL,
    L_PARTKEY BIGINT NOT NULL,
    L_SUPPKEY BIGINT NOT NULL,
    L_LINENUMBER BIGINT NOT NULL,
    L_QUANTITY DECIMAL(20,10),
    L_EXTENDEDPRICE DECIMAL(20,10),
    L_DISCOUNT DECIMAL(20,10),
    L_TAX DECIMAL(20,10),
    L_RETURNFLAG TEXT,
    L_LINESTATUS TEXT,
    L_SHIPDATE TIMESTAMPTZ,
    L_COMMITDATE TIMESTAMPTZ,
    L_RECEIPTDATE TIMESTAMPTZ,
    L_SHIPINSTRUCT TEXT,
    L_SHIPMODE TEXT,
    L_COMMENT TEXT
    );
    
    CALL set_table_property('lineitem', 'orientation', 'column');
    CALL set_table_property('lineitem', 'shard_count', '8');
    
    COMMIT;
  3. Create a DataConnector in DataHub to synchronize data to Hologres.
    1. On the Topic List tab of the project details page, find the created topic and click the topic name.
    2. On the details page of the topic, click Connector in the upper-right corner.
    3. In the Create Connector panel, click Hologres. Then, set the parameters as required and click Create.
      Parameter        Description
      Instance         The ID of the Hologres instance. You can view the instance ID in the Hologres console.
      Project          The name of the Hologres database to which you want to connect.
      Topic            The name of the Hologres table to which data is to be synchronized.
      Import Fields    The fields to be synchronized to the Hologres table. You can synchronize all or part of the fields of the DataHub topic based on your business requirements.
      Auth Mode        The mode in which the access to the Hologres table is authenticated. Default value: AK.
      AccessId         The AccessKey ID of the current Alibaba Cloud account used to access the Hologres instance. You can obtain the AccessKey ID from the Security Management page.
      AccessKey        The AccessKey secret of the current Alibaba Cloud account used to access the Hologres instance. You can obtain the AccessKey secret from the Security Management page.
  4. Wait until the data is synchronized to Hologres.
    On the topic details page, click the Connectors tab. Find the created DataConnector and view the status of the DataConnector. The status indicates the progress of the data synchronization.
  5. Query the synchronized data in Hologres.
    Connect the Hologres instance to a development tool and use the tool to check whether the data is synchronized to the Hologres instance in real time. For more information about development tools, see Overview. For example, you can execute the following statement to query the synchronized data:
    SELECT COUNT(*) FROM lineitem;
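The data type mapping in step 2 can be applied mechanically when a topic has many fields. The following sketch builds a CREATE TABLE statement from a list of DataHub field names and types. The function name `build_create_table` and the input format are assumptions made for this example. Only the type mapping comes from the table in step 2. Table and column names are interpolated without quoting, so the sketch is suitable only for names that you control. You still need to add the set_table_property calls and wrap the statement in BEGIN and COMMIT as shown in step 2.

```python
from typing import List, Tuple

# Type mapping taken from the DataHub-to-Hologres table in step 2.
DATAHUB_TO_HOLOGRES = {
    "BIGINT": "BIGINT",
    "STRING": "TEXT",
    "BOOLEAN": "BOOLEAN",
    "DOUBLE": "DOUBLE PRECISION",
    "TIMESTAMP": "TIMESTAMPTZ",
    "DECIMAL": "DECIMAL",
}


def build_create_table(table_name: str, fields: List[Tuple[str, str]]) -> str:
    """Build a CREATE TABLE statement from (field name, DataHub type) pairs."""
    columns = []
    for name, datahub_type in fields:
        hologres_type = DATAHUB_TO_HOLOGRES.get(datahub_type.upper())
        if hologres_type is None:
            # Types outside the documented mapping are rejected, not guessed.
            raise ValueError(f"no documented mapping for DataHub type {datahub_type!r}")
        columns.append(f"    {name} {hologres_type}")
    body = ",\n".join(columns)
    return f"CREATE TABLE {table_name} (\n{body}\n);"


print(build_create_table("lineitem", [("l_orderkey", "BIGINT"),
                                      ("l_shipdate", "TIMESTAMP"),
                                      ("l_comment", "STRING")]))
```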

Common error troubleshooting

This section describes common errors that are reported when you synchronize data from DataHub to Hologres, along with possible causes and solutions.

  • Scenario 1: The following error message is returned when you query data:
    ErrorMessage [Import field not found in dest schema;

    Possible cause: The datahub_sync_mode property is not set to dts.

    Solution: Create a Hologres table and set the table property datahub_sync_mode to dts.

  • Scenario 2: The following error message is returned when you query data:
    ErrorCode=InternalServerError; ErrorMessage =Field already exists 

    Possible cause: The datahub_sync_mode property for a Hologres table is set to dts, and eight additional columns are included when the table is created.

    Solution: Recreate the Hologres table. If you set the datahub_sync_mode property to dts, keep the fields the same as those of the upstream table. You do not need to add the eight additional system columns.
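
For Scenario 2, a quick check of the planned column list can catch the problem before you create the table. The following sketch flags any column whose name matches one of the eight additional columns, under either the previous or the new naming rule. The helper name `find_additional_columns` is an assumption made for this example, and you must supply the column names yourself, for example from your CREATE TABLE statement. Whether both naming schemes trigger the error in every case is not stated above, so treat the result as a hint.

```python
from typing import Iterable, List

# The eight additional columns listed in the Playback section.
DTS_ADDITIONAL_COLUMNS = (
    "dts_record_id", "dts_operation_flag", "dts_instance_id", "dts_db_name",
    "dts_table_name", "dts_utc_timestamp", "dts_before_flag", "dts_after_flag",
)
NEW_NAME_PREFIX = "new_dts_sync_"  # prefix used by the new naming rule


def find_additional_columns(table_columns: Iterable[str]) -> List[str]:
    """Return the columns that collide with the DTS additional columns."""
    reserved = set(DTS_ADDITIONAL_COLUMNS)
    reserved |= {NEW_NAME_PREFIX + name for name in DTS_ADDITIONAL_COLUMNS}
    return [column for column in table_columns if column.lower() in reserved]


planned = ["l_orderkey", "l_comment", "dts_record_id", "new_dts_sync_dts_after_flag"]
print(find_additional_columns(planned))
# ['dts_record_id', 'new_dts_sync_dts_after_flag']
```

An empty result means that none of the planned columns uses a name reserved for the additional columns.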