This topic describes how to use the Hologres connector to synchronize data in a YAML-based data ingestion job.
Background information
Hologres is an end-to-end real-time data warehouse engine that supports large-scale real-time data ingestion, updates, and analytics. It uses standard SQL, is compatible with the PostgreSQL protocol, and supports OLAP and ad hoc analysis on petabytes of data while delivering high-concurrency, low-latency online data services. It integrates tightly with MaxCompute, Flink, and DataWorks to provide full-stack online and offline data warehouse solutions. The following table lists the capabilities of the Hologres YAML connector.
Category | Description |
Supported type | Data ingestion sink |
Running mode | Streaming and batch modes |
Data format | Not supported |
Monitoring metrics | Note For more information about the metrics, see Monitoring metrics. |
API type | YAML |
Support for updating or deleting data in sink tables | Yes |
Features
Feature | Description |
Synchronize all tables in a database | Synchronizes full and incremental data from all tables in a database, or from multiple tables, to each corresponding sink table. |
Synchronize table schema changes | While synchronizing all tables in a database, also synchronizes schema changes in each source table, such as added, deleted, or renamed columns, to its sink table in real time. |
Merge sharded tables | Uses regular expressions to match source tables across multiple sharded databases, merges their data, and synchronizes it to downstream sink tables with matching names. |
Write to partitioned tables | Writes data from an upstream table to a Hologres partitioned table. |
Wide type mapping | Maps upstream data types to wider Hologres data types by using one of multiple mapping strategies. |
Syntax
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
Parameters
Parameter | Description | Data type | Required | Default value | Remarks |
type | The sink type. | String | Yes | None | Set to |
name | The sink name. | String | No | None | None. |
dbname | The database name. | String | Yes | None | None. |
username | The username. Enter your Alibaba Cloud account's AccessKey ID. | String | Yes | None | For more information, see How do I view the AccessKey ID and AccessKey secret? Important To protect your AccessKey pair, use variables to configure the AccessKey ID. For more information, see Project variables. |
password | The password. Enter your Alibaba Cloud account's AccessKey secret. | String | Yes | None | |
endpoint | The Hologres endpoint. | String | Yes | None | For more information, see Endpoints. |
jdbcRetryCount | The maximum number of retries for write and query operations if a connection fails. | Integer | No | 10 | None. |
jdbcRetrySleepInitMs | The fixed wait time before each retry. | Long | No | 1000 | Unit: milliseconds. The actual wait time is calculated as |
jdbcRetrySleepStepMs | The incremental wait time added before each retry. | Long | No | 5000 | Unit: milliseconds. The actual wait time is calculated as |
jdbcConnectionMaxIdleMs | The maximum idle time for a JDBC connection. | Long | No | 60000 | Unit: milliseconds. If a connection stays idle longer than this value, it is closed and released. |
jdbcMetaCacheTTL | The time-to-live (TTL) for cached TableSchema information. | Long | No | 60000 | Unit: milliseconds. |
jdbcMetaAutoRefreshFactor | If the remaining cache time is less than the trigger time, the system refreshes the cache automatically. | Integer | No | 4 | The remaining time of the cache is calculated using the following formula: Remaining time = Time-to-live (TTL) - Elapsed time. After the cache is automatically refreshed, the elapsed time is reset to 0. Trigger time = jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor. |
mutatetype | The data write mode. | String | No | INSERT_OR_UPDATE | If the Hologres physical table has a primary key, the Hologres streaming sink provides exactly-once semantics based on the primary key. If multiple records with the same primary key arrive, use the mutatetype parameter to specify how the sink table is updated. Valid values of the mutatetype parameter:
|
createparttable | Whether to automatically create missing child partitioned tables based on partition values. | Boolean | No | false | None. |
sink.delete-strategy | How to handle retraction messages. | String | No | None | Valid values:
|
jdbcWriteBatchSize | The maximum number of rows per batch when writing in JDBC mode. | Integer | No | 256 | Unit: rows. Note A write is triggered as soon as any one of jdbcWriteBatchSize, jdbcWriteBatchByteSize, or jdbcWriteFlushInterval is reached. |
jdbcWriteBatchByteSize | The maximum number of bytes per batch when writing in JDBC mode. | Long | No | 2097152 (2 × 1024 × 1024 bytes, or 2 MB) | Unit: bytes. Note A write is triggered as soon as any one of jdbcWriteBatchSize, jdbcWriteBatchByteSize, or jdbcWriteFlushInterval is reached. |
jdbcWriteFlushInterval | The maximum wait time before a batch is flushed to Hologres in JDBC mode. | Long | No | 10000 | Unit: milliseconds. Note A write is triggered as soon as any one of jdbcWriteBatchSize, jdbcWriteBatchByteSize, or jdbcWriteFlushInterval is reached. |
ignoreNullWhenUpdate | Whether to ignore null values in update writes when mutatetype is set to INSERT_OR_UPDATE. | Boolean | No | false | Valid values:
|
jdbcEnableDefaultForNotNullColumn | Whether the connector fills in a default value when null is written to a NOT NULL column that has no default value in the Hologres table. | Boolean | No | true | Valid values:
|
remove-u0000-in-text.enabled | Whether to remove invalid \u0000 characters from STRING data before writing. | Boolean | No | false | Valid values:
|
deduplication.enabled | Whether to deduplicate during batch writes in JDBC or jdbc_fixed mode. | Boolean | No | true | Valid values:
|
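sink.type-normalize-strategy | The data mapping strategy. | String | No | STANDARD | The strategy that the Hologres sink uses to convert upstream data types to Hologres types. Valid values: STANDARD, BROADEN, and ONLY_BIGINT_OR_TEXT. For more information, see Data type mapping. |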
table_property.* | Hologres physical table properties. | String | No | None | When a Hologres table is created, you can set physical table properties in the WITH clause. Proper settings help organize and query data efficiently. Warning The default value of table_property.distribution_key is the primary key. Do not change it unless necessary, because it affects write correctness. |
connection.ssl.mode | Whether to enable SSL encryption in transit, and which mode to use. | String | No | disable |
Note
|
connection.ssl.root-cert.location | The path to the certificate file when encryption in transit requires a certificate. | String | No | None | Required when connection.ssl.mode is set to verify-ca or verify-full. Upload the CA certificate using the File Management feature in the Realtime Compute console. Uploaded files go to /flink/usrlib. For example, if your CA certificate is named certificate.crt, set this parameter to Note For instructions on obtaining the CA certificate, see Encryption in transit — Download the CA certificate. |
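The following sink block is a minimal sketch that combines several of the optional parameters above. The connection placeholders match the Syntax section. The tuning values and the certificate placeholder <yourCaCertificatePath> are illustrative assumptions, not recommendations; adjust them to your workload and to the path of the CA certificate you uploaded.

```yaml
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Retry failed connections up to 10 times (the default value).
  jdbcRetryCount: 10
  # Illustrative batching values: a batch is written as soon as ANY one
  # threshold is reached: 512 rows, 4194304 bytes (4 MB), or 5000 ms.
  jdbcWriteBatchSize: 512
  jdbcWriteBatchByteSize: 4194304
  jdbcWriteFlushInterval: 5000
  # Schema cache: trigger time = jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor
  # = 60000 / 4 = 15000 ms. The cache is refreshed automatically once less than
  # 15000 ms of its TTL remains, that is, after more than 45000 ms have elapsed.
  jdbcMetaCacheTTL: 60000
  jdbcMetaAutoRefreshFactor: 4
  # Encryption in transit with certificate verification.
  # <yourCaCertificatePath> is a hypothetical placeholder for the uploaded CA certificate.
  connection.ssl.mode: verify-full
  connection.ssl.root-cert.location: <yourCaCertificatePath>
```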
Data type mapping
Use the sink.type-normalize-strategy parameter to define how upstream data types map to Hologres data types.
Set sink.type-normalize-strategy before you start a YAML job for the first time. If you change it after the job has started, delete the downstream tables and restart the job statelessly for the new setting to take effect.
Supported ARRAY element types include INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, and VARCHAR.
Hologres does not support NUMERIC as a primary key. If a primary key maps to NUMERIC, Hologres converts it to VARCHAR.
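As a sketch, the strategy is set in the sink block next to the connection parameters. ONLY_BIGINT_OR_TEXT is used here only for illustration, and the placeholders follow the Syntax section.

```yaml
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map integer types to int8 and most other types to text.
  # Set this before the first start; changing it later requires deleting
  # the downstream tables and restarting the job statelessly.
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
```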
STANDARD
When sink.type-normalize-strategy is STANDARD, the mapping is as follows:
Flink CDC type | Hologres type |
CHAR | bpchar |
STRING | text |
VARCHAR | text (if length > 10,485,760 bytes); varchar (if length ≤ 10,485,760 bytes) |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | bytea |
DECIMAL | numeric |
TINYINT | int2 |
SMALLINT | int2 |
INTEGER | int4 |
BIGINT | int8 |
FLOAT | float4 |
DOUBLE | float8 |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | Arrays of various types |
MAP | Not supported |
ROW | Not supported |
BROADEN
When sink.type-normalize-strategy is BROADEN, Flink CDC types map to broader Hologres types. The mapping is as follows:
Flink CDC type | Hologres type |
CHAR | text |
STRING | text |
VARCHAR | text |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | bytea |
DECIMAL | numeric |
TINYINT | int8 |
SMALLINT | int8 |
INTEGER | int8 |
BIGINT | int8 |
FLOAT | float8 |
DOUBLE | float8 |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | Arrays of various types |
MAP | Not supported |
ROW | Not supported |
ONLY_BIGINT_OR_TEXT
When sink.type-normalize-strategy is ONLY_BIGINT_OR_TEXT, Flink CDC types map to either BIGINT (int8) or TEXT in Hologres, with the exceptions of ARRAY, MAP, and ROW. The mapping is as follows:
Flink CDC type | Hologres type |
TINYINT | int8 |
SMALLINT | int8 |
INTEGER | int8 |
BIGINT | int8 |
BOOLEAN | text |
BINARY | text |
VARBINARY | text |
DECIMAL | text |
FLOAT | text |
DOUBLE | text |
DATE | text |
TIME_WITHOUT_TIME_ZONE | text |
TIMESTAMP_WITHOUT_TIME_ZONE | text |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | text |
ARRAY | Arrays of various types |
MAP | Not supported |
ROW | Not supported |
Write to partitioned tables
Hologres sinks support writing to partitioned tables. Use them together with the transform module to write upstream data to Hologres partitioned tables. Note the following:
The partition key must be part of the primary key. If you use an upstream column that is not part of the primary key as the partition key, the primary keys of the upstream and downstream tables may become inconsistent, which causes data inconsistency during synchronization.
You can use columns of the TEXT, VARCHAR, and INT data types as partition keys. In Hologres V1.3.22 and later, you can also use columns of the DATE data type as partition keys.
Set createparttable to true to create child partitioned tables automatically. Otherwise, you must create them manually.
For an example, see Write to partitioned tables.
Synchronize table schema changes
CDC YAML pipeline jobs use different policies to handle schema evolution. These policies are specified using the pipeline-level configuration item schema.change.behavior. Valid values for schema.change.behavior are IGNORE, LENIENT, TRY_EVOLVE, EVOLVE, and EXCEPTION. Hologres sinks do not support the TRY_EVOLVE policy. The LENIENT and EVOLVE policies involve schema evolution. The following sections describe how to handle different schema change events.
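schema.change.behavior belongs to the pipeline block, not to the sink block. The following minimal sketch sets the default LENIENT policy explicitly; the lowercase value follows the style of the Enable EVOLVE mode example.

```yaml
pipeline:
  name: MySQL to Hologres Pipeline
  # Pipeline-level schema evolution policy. lenient is the default;
  # Hologres sinks do not support try_evolve.
  schema.change.behavior: lenient
```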
LENIENT (default)
In LENIENT mode, schema changes are handled as follows:
Add a nullable column: adds the column to the end of the sink table schema and synchronizes data to it.
Delete a nullable column: fills the column with null instead of removing it from the sink table.
Add a non-nullable column: adds the column to the end of the sink table schema as a nullable column and synchronizes data to it. Rows written before the column was added contain null in that column.
Rename a column: treated as add + delete. Adds the renamed column to the end of the sink table and fills the original column with null. For example, if col_a becomes col_b, col_b is added and col_a is filled with null.
Change column type: not supported, because Hologres does not allow column type changes. To tolerate upstream type changes, use sink.type-normalize-strategy to map upstream types to wider Hologres types.
The following schema changes are not supported:
Changes to constraints such as primary keys or indexes.
Deletion of non-nullable columns.
Changing from NOT NULL to nullable.
EVOLVE
In EVOLVE mode, schema changes are handled as follows:
Add a nullable column: supported.
Delete a nullable column: not supported.
Add a non-nullable column: adds the column to the sink table as a nullable column.
Rename a column: supported. Renames the column in the sink table.
Change column type: not supported, because Hologres does not allow column type changes. To tolerate upstream type changes, use sink.type-normalize-strategy to map upstream types to wider Hologres types.
The following schema changes are not supported:
Changes to constraints such as primary keys or indexes.
Deletion of non-nullable columns.
Changing from NOT NULL to nullable.
In EVOLVE mode, if you restart a job statelessly without deleting the sink table, the schemas of the upstream and sink tables may become inconsistent, which can cause the job to fail. In this case, manually adjust the sink table schema.
For an example of enabling EVOLVE mode, see Enable EVOLVE mode.
Code examples
Wide type mapping
Use the sink.type-normalize-strategy parameter to broaden data type mapping.
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline
Write to partitioned tables
Convert the upstream timestamp field create_time to a date string in yyyy-MM-dd format and use it as the partition key of the Hologres table.
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Create missing partitioned tables automatically.
  createparttable: true
transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key
pipeline:
  name: MySQL to Hologres Pipeline
Enable EVOLVE mode
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Create missing partitioned tables automatically.
  createparttable: true
pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve
Synchronize a single table
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline
Synchronize all tables in a database
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline
Merge sharded tables
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN
route:
  # Merge all sharded tables in MySQL test_db into one Hologres table: test_db.user.
  - source-table: test_db.user\.*
    sink-table: test_db.user
pipeline:
  name: MySQL to Hologres Pipeline
Synchronize to a specific schema
A schema in Hologres corresponds to a database in MySQL. You can use a route rule to specify the schema of the sink table.
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  # Capture all tables in test_db so that the route rule below matches them.
  tables: test_db.\.*
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN
route:
  # Synchronize all tables from MySQL test_db to the Hologres test_db2 schema, keeping table names unchanged.
  - source-table: test_db.\.*
    sink-table: test_db2.<>
    replace-symbol: <>
pipeline:
  name: MySQL to Hologres Pipeline
Synchronize new tables without restarting
To synchronize newly created tables in real time while the job runs, set scan.binlog.newly-added-table.enabled to true.
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  # Capture data from new tables created while the job runs.
  scan.binlog.newly-added-table.enabled: true
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline
Synchronize newly added existing tables after a restart
To synchronize existing tables that are newly matched by the tables parameter, set scan.newly-added-table.enabled to true and restart the job.
If you already use scan.binlog.newly-added-table.enabled = true to capture newly created tables, do not also set scan.newly-added-table.enabled to true to capture existing tables after a restart. Doing so causes duplicate data.
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.startup.mode: initial
  # On restart, check the tables parameter for new tables and run snapshots.
  # Requires scan.startup.mode: initial.
  scan.newly-added-table.enabled: true
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline
Exclude tables during full-database sync
source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  # Tables matched by this regex will not be synchronized.
  tables.exclude: test_db.table1
  server-id: 5401-5499
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN
pipeline:
  name: MySQL to Hologres Pipeline
References
For development references on sources, sinks, transforms, and routes, see Flink CDC data ingestion development reference.
For steps to develop a YAML data ingestion job, see Develop a YAML data ingestion job (public preview).