DataHub's connector streams data from a DataHub topic into a Hologres table in real time or near real time. This guide walks you through creating a connector to enable live analytics, real-time monitoring, or real-time reporting on streaming data.
In DataHub, a Project corresponds to a Hologres Database, and a Topic corresponds to a Hologres Table.
The connector supports two ingestion modes:
Row-by-row insertion: Writes all DataHub records directly into Hologres, row by row. Use this mode to synchronize all DataHub data to Hologres.
Playback: Replays the binary log (Binlog) operations that Data Transmission Service (DTS) writes to DataHub, applying source database changes to Hologres to keep the target in sync. Use this mode when DTS first synchronizes data to DataHub (with DataHub acting as the Binlog store), and DataHub then synchronizes to Hologres.
When a primary key conflict occurs, choose replace (overwrite old data) or ignore (keep old data) to control how conflicts are resolved.
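For readers who think in SQL, the two policies behave roughly like PostgreSQL-style `INSERT ... ON CONFLICT` clauses. The sketch below is an analogy only: the table `conflict_demo` and its columns are hypothetical, and the statements the connector actually issues are not documented here.

```sql
-- Hypothetical table with a primary key, used only for illustration.
CREATE TABLE conflict_demo (
    id     BIGINT PRIMARY KEY,
    status TEXT
);

INSERT INTO conflict_demo (id, status) VALUES (1, 'old');

-- replace: the incoming row overwrites the existing row with the same key.
INSERT INTO conflict_demo (id, status) VALUES (1, 'new')
ON CONFLICT (id) DO UPDATE SET status = EXCLUDED.status;

-- ignore: the incoming row is discarded and the existing row is kept.
INSERT INTO conflict_demo (id, status) VALUES (1, 'newer')
ON CONFLICT (id) DO NOTHING;
```

After these statements, the row with `id = 1` still holds `'new'`: the second write replaced the original value, and the third was ignored.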
Prerequisites
Before you begin, ensure that you have:
A DataHub service enabled, with a topic that contains TUPLE type data
A Hologres instance purchased and accessible
A Hologres table created to receive the data (see Data type mapping for field type requirements)
An AccessKey ID and AccessKey Secret with access to the Hologres instance
(Partitioned tables only) Child tables created before starting synchronization
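As an illustration of the last prerequisite, a partitioned parent table and one child table might be created as follows. This is a sketch that assumes PostgreSQL-style list partitioning; the table name `lineitem_part`, the partition column, and the partition value are hypothetical.

```sql
BEGIN;
-- Hypothetical parent table, partitioned by ship mode.
CREATE TABLE lineitem_part (
    l_orderkey BIGINT NOT NULL,
    l_shipmode TEXT NOT NULL,
    l_comment  TEXT
) PARTITION BY LIST (l_shipmode);

-- Create child tables like this one before starting synchronization.
CREATE TABLE lineitem_part_air PARTITION OF lineitem_part
    FOR VALUES IN ('AIR');
COMMIT;
```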
Data type mapping
DataHub only supports synchronizing TUPLE type data to Hologres. Map DataHub field types to Hologres column types as follows:
| DataHub | Hologres |
|---|---|
| TINYINT | SMALLINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | REAL |
| DOUBLE | DOUBLE PRECISION |
| DECIMAL | DECIMAL |
| STRING | TEXT |
| BOOLEAN | BOOLEAN |
| TIMESTAMP | TIMESTAMPTZ |
The following example creates the lineitem table used in this guide:
BEGIN;
CREATE TABLE lineitem (
L_ORDERKEY BIGINT NOT NULL,
L_PARTKEY BIGINT NOT NULL,
L_SUPPKEY BIGINT NOT NULL,
L_LINENUMBER BIGINT NOT NULL,
L_QUANTITY DECIMAL(20,10),
L_EXTENDEDPRICE DECIMAL(20,10),
L_DISCOUNT DECIMAL(20,10),
L_TAX DECIMAL(20,10),
L_RETURNFLAG TEXT,
L_LINESTATUS TEXT,
L_SHIPDATE TIMESTAMPTZ,
L_COMMITDATE TIMESTAMPTZ,
L_RECEIPTDATE TIMESTAMPTZ,
L_SHIPINSTRUCT TEXT,
L_SHIPMODE TEXT,
L_COMMENT TEXT
);
CALL set_table_property('lineitem', 'orientation', 'column');
COMMIT;
Create a synchronization task
Log on to the DataHub console and open the topic you want to sync.
On the topic details page, click +Sync in the upper-right corner.
Click Hologres to open the Create Connector page, then configure the following parameters:
Each sync task consumes one connection per shard in the DataHub topic. Make sure your Hologres instance has enough available connections before creating the task.
| Parameter | Description | Default |
|---|---|---|
| Instance | The Hologres instance ID. Get it from the Hologres Management Console. | - |
| Database | The name of the Hologres database to receive data. | - |
| Table | The name of the Hologres table to receive data (for example, lineitem). | - |
| Primary Key Conflict Policy | How to handle primary key conflicts during write. replace: new data overwrites old data. ignore: new data is discarded and old data is kept. | replace |
| Synchronization Scenario | The ingestion mode. default: row-by-row insertion for all DataHub data. dts: Binlog Playback with new DTS auxiliary column naming rules. dts_old: Binlog Playback with legacy DTS auxiliary column naming rules. | default |
| Import Fields | The fields to sync into Hologres. Select all fields or a subset. | - |
| Authentication Mode | The authentication method for accessing Hologres. | AccessKey |
| AccessKey ID | The AccessKey ID for the Hologres instance. Get it from AccessKey Management. | - |
| AccessKey Secret | The AccessKey Secret for the Hologres instance. Get it from AccessKey Management. | - |
| Timestamp Unit | The time unit used for timestamp fields. MICROSECOND, MILLISECOND, or SECOND. | MICROSECOND |
Click Create to start the synchronization. After the connector is created, monitor sync progress on the Sync Tasks tab of the topic details page.
Query the synced data in Hologres. Connect to Hologres using a development tool (see Connect to Hologres), then run:
SELECT COUNT(*) FROM lineitem;
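Because each sync task holds one connection per shard, it can also be useful to check connection headroom from the same session. A minimal sketch, assuming the PostgreSQL-compatible `pg_stat_activity` view and `max_connections` setting are available on your instance:

```sql
-- Configured connection limit.
SHOW max_connections;

-- Connections currently open, grouped by database.
SELECT datname, COUNT(*) AS connections
FROM pg_stat_activity
GROUP BY datname
ORDER BY connections DESC;
```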
Troubleshooting
ErrorMessage: Import field not found in dest schema
A field specified in Import Fields does not exist in the Hologres table.
Possible causes:
The Hologres table is missing columns that you want to import.
Synchronization Scenario is set to default, but the topic contains DTS auxiliary columns that are not in the Hologres schema.
Solutions:
If the table is missing columns: recreate the table with the correct schema, or modify the sync task to exclude the missing fields.
If the issue is caused by DTS auxiliary columns: recreate the sync task and set Synchronization Scenario to dts or dts_old.
ErrorMessage: Column type not match with Holo column
A DataHub field type does not match the corresponding Hologres column type. Recreate the Hologres table with the correct column types based on the data type mapping.
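To find the mismatched column, one option is to list the column types of the target table and compare them with the data type mapping. A sketch, assuming the standard `information_schema.columns` view is available and using the `lineitem` table from this guide:

```sql
-- List each column's type, nullability, and default for comparison
-- against the DataHub topic schema.
SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_name = 'lineitem'
ORDER BY ordinal_position;
```

The `is_nullable` and `column_default` columns in the result are also useful when diagnosing NOT NULL errors on columns excluded from import.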
ErrorMessage: Not import column xxx not allow null and no default value
A column in the Hologres table has NOT NULL set but is not included in Import Fields and has no default value. When a column is excluded from import, Hologres must be able to fill it automatically.
Recreate the Hologres table: either add a default value to the excluded column, or remove the NOT NULL constraint from it.
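For example, a recreated table could give the excluded column a default value. A sketch with a hypothetical table `lineitem_subset`, in which `l_comment` is assumed to be left out of Import Fields:

```sql
BEGIN;
CREATE TABLE lineitem_subset (
    l_orderkey BIGINT NOT NULL,
    l_quantity DECIMAL(20,10),
    -- Excluded from Import Fields, so it needs a default (or must allow NULL).
    l_comment  TEXT NOT NULL DEFAULT ''
);
COMMIT;
```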