A Hologres data source provides bidirectional channels to read data from and write data to Hologres. This topic describes the data synchronization capabilities of Hologres data sources in DataWorks.
Limitations
Offline read and write
A Hologres data source supports a Serverless resource group (recommended) and an exclusive resource group for Data Integration.
Hologres Writer does not support writing data to Hologres foreign tables.
A Hologres data source obtains a Hologres endpoint based on the following logic:
For a Hologres instance in the current region, endpoints are obtained in the following order: .
For a cross-region Hologres instance, endpoints are obtained in the following order: .
Real-time full database write
Real-time data synchronization tasks support only Serverless resource groups (recommended) and exclusive resource groups for Data Integration.
Real-time data synchronization tasks do not support tables without a primary key.
Supported field types
Field type | Offline read (Hologres Reader) | Offline write (Hologres Writer) | Real-time write |
UUID | Not supported | Not supported | Not supported |
CHAR | Supported | Supported | Supported |
NCHAR | Supported | Supported | Supported |
VARCHAR | Supported | Supported | Supported |
LONGVARCHAR | Supported | Supported | Supported |
NVARCHAR | Supported | Supported | Supported |
LONGNVARCHAR | Supported | Supported | Supported |
CLOB | Supported | Supported | Supported |
NCLOB | Supported | Supported | Supported |
SMALLINT | Supported | Supported | Supported |
TINYINT | Supported | Supported | Supported |
INTEGER | Supported | Supported | Supported |
BIGINT | Supported | Supported | Supported |
NUMERIC | Supported | Supported | Supported |
DECIMAL | Supported | Supported | Supported |
FLOAT | Supported | Supported | Supported |
REAL | Supported | Supported | Supported |
DOUBLE | Supported | Supported | Supported |
TIME | Supported | Supported | Supported |
DATE | Supported | Supported | Supported |
TIMESTAMP | Supported | Supported | Supported |
BINARY | Supported | Supported | Supported |
VARBINARY | Supported | Supported | Supported |
BLOB | Supported | Supported | Supported |
LONGVARBINARY | Supported | Supported | Supported |
BOOLEAN | Supported | Supported | Supported |
BIT | Supported | Supported | Supported |
JSON | Supported | Supported | Supported |
JSONB | Supported | Supported | Supported |
How it works
Offline read
Hologres Reader uses PSQL to read data from Hologres tables. It initiates multiple concurrent tasks based on the shard count of the table. Each shard corresponds to a concurrent SELECT task.
When you create a table in Hologres, you can configure its shard count by running the CALL set_table_property('table_name', 'shard_count', 'xx') statement within the same CREATE TABLE transaction. By default, the shard count of the database is used. The specific value depends on the configuration of the Hologres instance.
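For example, the following DDL sketch sets the shard count to 3 in the same transaction as the CREATE TABLE statement. The table and column names are illustrative:

```sql
BEGIN;
CREATE TABLE my_table (
    tag text NOT NULL,
    id int NOT NULL,
    PRIMARY KEY (tag, id)
);
-- Must run in the same transaction as the CREATE TABLE statement.
CALL set_table_property('my_table', 'shard_count', '3');
COMMIT;
```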
The SELECT statement uses the built-in hg_shard_id column of the table to filter data by shard.
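Conceptually, each concurrent read task issues a query similar to the following sketch. The actual SQL generated by Hologres Reader may differ; the table and column names are illustrative:

```sql
-- One SELECT per shard; hg_shard_id is a built-in hidden column.
-- Task 0 reads shard 0, task 1 reads shard 1, and so on.
SELECT tag, id, title, body
FROM my_table
WHERE hg_shard_id = 0;
```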
Offline write
Hologres Writer obtains the protocol data generated by the reader through the data synchronization framework, and then determines how to resolve primary key conflicts during writes based on the conflictMode (conflict policy) setting.
You can configure the conflictMode parameter to specify how to handle new data when a primary key conflict occurs between the new data and existing data:
conflictMode applies only to tables with a primary key. For more information about the write principles and performance, see Technical Principles.
If conflictMode is set to Replace (full row update), the new data overwrites the old data. All columns in the row are overwritten. Fields that are not mapped to a column are forcibly set to NULL.
If conflictMode is set to Update, the new data overwrites the old data. Only the fields that are mapped to a column are overwritten.
If conflictMode is set to Ignore, the new data is ignored.
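The three policies roughly correspond to PostgreSQL-style upsert statements. The following is a conceptual sketch, not the exact SQL that Hologres Writer executes; the table t and columns pk, a, and b are illustrative:

```sql
-- Replace: overwrite the whole row; columns that are not mapped are set to NULL.
INSERT INTO t (pk, a, b) VALUES (1, 'x', NULL)
ON CONFLICT (pk) DO UPDATE SET a = excluded.a, b = excluded.b;

-- Update: overwrite only the mapped columns; other columns keep their values.
INSERT INTO t (pk, a) VALUES (1, 'x')
ON CONFLICT (pk) DO UPDATE SET a = excluded.a;

-- Ignore: keep the existing row and discard the new one.
INSERT INTO t (pk, a) VALUES (1, 'x')
ON CONFLICT (pk) DO NOTHING;
```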
Add a data source
Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data Source Management. You can view the infotips of parameters in the DataWorks console to understand the meanings of the parameters when you add a data source.
Develop a data synchronization task
For the entry points and procedures for configuring synchronization tasks, see the following configuration guides.
Guide to configuring an offline synchronization task for a single table
For more information about the procedure, see Codeless UI configuration or Code editor configuration.
For a complete list of parameters and a code sample for script mode, see Appendix: Code and parameters.
Guide to configuring a real-time synchronization task for a single table
For more information about the operation flow, see Configure real-time sync tasks in DataStudio and Configure a real-time sync task in Data Integration.
Guide to configuring offline and real-time full database synchronization tasks
For more information about the procedure, see Configure an offline full-database sync task and Full-database real-time sync task configuration.
Appendix: Script demo and parameter description
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, configure the parameters in the script based on the unified script format requirements. For more information, see Configuration in the code editor. The following sections describe the data source parameters that you must configure.
Reader script demo
Configure a non-partitioned table
The following code shows how to configure a task that reads data from a non-partitioned Hologres table to memory.
{
  "transform": false,
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "holo",
      "parameter": {
        "datasource": "holo_db",
        "envType": 1,
        "column": [ "tag", "id", "title", "body" ],
        "where": "",
        "table": "holo_reader_basic_src"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "stream",
      "parameter": { "print": false, "fieldDelimiter": "," },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "executeMode": null,
    "failoverEnable": null,
    "errorLimit": { "record": "0" },
    "speed": { "concurrent": 2, "throttle": false }
  },
  "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }
}
The following code shows the DDL statement for the Hologres table.
begin;
drop table if exists holo_reader_basic_src;
create table holo_reader_basic_src(
    tag text not null,
    id int not null,
    title text not null,
    body text,
    primary key (tag, id));
call set_table_property('holo_reader_basic_src', 'orientation', 'column');
call set_table_property('holo_reader_basic_src', 'shard_count', '3');
commit;
Configure a partitioned table
You can configure a task to read data from a child table of a partitioned Hologres table to memory.
Note: Pay attention to the configuration of the partition parameter.
{
  "transform": false,
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "holo",
      "parameter": {
        "selectedDatabase": "public",
        "partition": "tag=foo",
        "datasource": "holo_db",
        "envType": 1,
        "column": [ "tag", "id", "title", "body" ],
        "tableComment": "",
        "where": "",
        "table": "public.holo_reader_basic_part_src"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": { "record": "0" },
    "speed": { "throttle": true, "concurrent": 1, "mbps": "12" }
  },
  "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }
}
The following code shows the DDL statement for the Hologres table.
begin;
drop table if exists holo_reader_basic_part_src;
create table holo_reader_basic_part_src(
    tag text not null,
    id int not null,
    title text not null,
    body text,
    primary key (tag, id))
partition by list( tag );
call set_table_property('holo_reader_basic_part_src', 'orientation', 'column');
call set_table_property('holo_reader_basic_part_src', 'shard_count', '3');
commit;

create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo');

-- Make sure that the child table of the partitioned table is created and data is imported.
postgres=# \d+ holo_reader_basic_part_src
                Table "public.holo_reader_basic_part_src"
 Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description
--------+---------+-----------+----------+---------+----------+--------------+-------------
 tag    | text    |           | not null |         | extended |              |
 id     | integer |           | not null |         | plain    |              |
 title  | text    |           | not null |         | extended |              |
 body   | text    |           |          |         | extended |              |
Partition key: LIST (tag)
Indexes:
    "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id)
Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')
Reader script parameters
Parameter | Description | Required | Default value |
database | The name of the database in the Hologres instance. | Yes | None |
table | The name of the Hologres table. If the table is a partitioned table, specify the name of the parent table. | Yes | None |
column | The columns from which you want to read data. The value must include the primary key columns of the source table. Example: | Yes | None |
partition | For a partitioned table, this parameter specifies the partition column and its value, in the format partition_column=partition_value (for example, tag=foo). Important: Make sure that the value is consistent with the partition configuration in the DDL statement of the table. | No | Empty. This indicates a non-partitioned table. |
Writer script demo
Configure a non-partitioned table
This example shows the configuration for importing data from MySQL to a Hologres standard table in JDBC mode.
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "mysql",
      "parameter": {
        "envType": 0,
        "useSpecialSecret": false,
        "column": [ "<column1>", "<column2>", ......, "<columnN>" ],
        "tableComment": "",
        "connection": [
          {
            "datasource": "<mysql_source_name>", // The name of the MySQL data source.
            "table": [ "<mysql_table_name>" ]
          }
        ],
        "where": "",
        "splitPk": "",
        "encoding": "UTF-8"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "holo",
      "parameter": {
        "selectedDatabase": "public",
        "schema": "public",
        "maxConnectionCount": 9,
        "truncate": true, // The cleanup rule.
        "datasource": "<holo_sink_name>", // The name of the Hologres data source.
        "conflictMode": "ignore",
        "envType": 0,
        "column": [ "<column1>", "<column2>", ......, "<columnN>" ],
        "tableComment": "",
        "table": "<holo_table_name>",
        "reShuffleByDistributionKey": false
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "executeMode": null,
    "errorLimit": { "record": "0" },
    "locale": "zh_CN",
    "speed": {
      "concurrent": 2, // The number of concurrent jobs.
      "throttle": false // Throttling.
    }
  },
  "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }
}
The following code shows the DDL statement for the Hologres table.
begin;
drop table if exists mysql_to_holo_test;
create table mysql_to_holo_test(
    tag text not null,
    id int not null,
    body text not null,
    birth date,
    primary key (tag, id));
call set_table_property('mysql_to_holo_test', 'orientation', 'column');
call set_table_property('mysql_to_holo_test', 'distribution_key', 'id');
call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth');
commit;
Configure a partitioned table
Note: Hologres supports only LIST partitioning. Only a single column can be used as the partition column, and the column must be of the INT4 or TEXT type.
Make sure that this parameter is consistent with the partition configuration in the DDL statement of the table.
You can configure a task to synchronize data from a MySQL table to a child table of a partitioned Hologres table.
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "mysql",
      "parameter": {
        "envType": 0,
        "useSpecialSecret": false,
        "column": [ "<column1>", "<column2>", ......, "<columnN>" ],
        "tableComment": "",
        "connection": [
          {
            "datasource": "<mysql_source_name>",
            "table": [ "<mysql_table_name>" ]
          }
        ],
        "where": "",
        "splitPk": "<mysql_pk>", // The primary key field of the MySQL table.
        "encoding": "UTF-8"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "holo",
      "parameter": {
        "selectedDatabase": "public",
        "maxConnectionCount": 9,
        "partition": "<partition_key>", // The partition key of the Hologres table.
        "truncate": "false",
        "datasource": "<holo_sink_name>", // The name of the Hologres data source.
        "conflictMode": "ignore",
        "envType": 0,
        "column": [ "<column1>", "<column2>", ......, "<columnN>" ],
        "tableComment": "",
        "table": "<holo_table_name>",
        "reShuffleByDistributionKey": false
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "executeMode": null,
    "failoverEnable": null,
    "errorLimit": { "record": "0" },
    "speed": {
      "concurrent": 2, // The number of concurrent jobs.
      "throttle": false // Throttling.
    }
  },
  "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }
}
The following code shows the DDL statement for the Hologres table.
BEGIN;
CREATE TABLE public.hologres_parent_table (
    a text,
    b int,
    c timestamp,
    d text,
    ds text,
    primary key (ds, b)
) PARTITION BY LIST(ds);
CALL set_table_property('public.hologres_parent_table', 'orientation', 'column');
CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN ('20201215');
CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN ('20201216');
CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN ('20201217');
COMMIT;
Writer script parameters
Parameter | Description | Required | Default value |
database | The name of the database in the Hologres instance. | Yes | None |
table | The name of the Hologres table. You can include the schema name in the table name. Example: | Yes | None |
conflictMode | The policy for handling primary key conflicts. Valid values: Replace, Update, and Ignore. For more information, see How it works. | Yes | None |
column | The columns to which you want to write data. The value must include the primary key columns of the destination table. Example: | Yes | None |
partition | For a partitioned table, this parameter specifies the partition column and its value, in the format partition_column=partition_value. Note: Make sure that the value is consistent with the partition configuration in the DDL statement of the table. | No | Empty. This indicates a non-partitioned table. |
reShuffleByDistributionKey | In Hologres, a batch import into a table that has a primary key triggers a table lock by default, which limits concurrent writes over multiple connections. In offline synchronization scenarios, you can enable this feature so that different tasks write data to specific Hologres shards based on the distribution key. This enables concurrent batch writes and significantly improves write performance. Compared with real-time writes in traditional JDBC mode, this feature also reduces the load on the Hologres server. Important: This feature can be enabled only for Serverless resource groups. | No | false |
truncate | Specifies whether to delete existing data from the destination table before data is written to it. | No | false |