Use the Hologres connector to sink data from a Flink Change Data Capture (CDC) YAML pipeline into Hologres in real time. The connector supports both streaming and batch modes, handles schema evolution, and can write to partitioned tables.
Connector overview
| Category | Value |
|---|---|
| Supported type | Data ingestion sink |
| Running mode | Streaming and batch modes |
| Data format | Not supported |
| Monitoring metrics | numRecordsOut, numRecordsOutPerSecond |
| API type | YAML |
| Update or delete data in sink tables | Yes |
For details on the monitoring metrics, see Monitoring metrics.
Quickstart
The following is a minimal working configuration. Replace the placeholders before running.
```yaml
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
```
Store your AccessKey ID and AccessKey secret as project variables rather than hardcoding them. The syntax ${secret_values.ak_id} references a variable named ak_id.
To find your endpoint, see Endpoints. To look up your AccessKey pair, see How do I view the AccessKey ID and AccessKey secret?
Usage notes
Read these before configuring the connector:
- Primary keys are required for exactly-once semantics. If the Hologres physical table has primary keys, the sink applies exactly-once semantics based on those keys. If duplicate primary keys arrive, set `mutatetype` to control how conflicts are resolved.
- `sink.type-normalize-strategy` must be set at job start. Changing it after the job starts requires deleting the downstream tables and restarting the job statelessly.
- `table_property.distribution_key` defaults to the primary key. Do not change it unless necessary, because it affects write correctness.
- Partitioned table writes require the partition key to be part of the primary key. A non-primary-key partition column causes inconsistent primary keys between upstream and downstream, leading to data inconsistency.
- Hologres does not support column type changes. Use `sink.type-normalize-strategy` to handle type mapping instead.
- EVOLVE mode has a restart restriction. In EVOLVE mode, restarting a job statelessly without first deleting the sink table may cause schema inconsistency between upstream and sink tables, and job failure. Manually adjust the sink table schema before restarting.
- TRY_EVOLVE is not supported. Valid values for `schema.change.behavior` are IGNORE, LENIENT, EVOLVE, and EXCEPTION.
- `remove-u0000-in-text.enabled` is off by default. If your source data contains `\u0000` null bytes, writing fails with `ERROR: invalid byte sequence for encoding "UTF8": 0x00`. Set this parameter to `true` to strip them automatically.
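What `remove-u0000-in-text.enabled: true` does amounts to a simple character replacement before the write. A minimal Python sketch of that behavior (illustrative only, not the connector's actual code):

```python
def strip_null_bytes(value: str) -> str:
    """Remove \u0000 (null) bytes, which Hologres TEXT columns reject."""
    return value.replace("\u0000", "")

# This value would otherwise fail with:
#   ERROR: invalid byte sequence for encoding "UTF8": 0x00
raw = "abc\u0000def"
print(strip_null_bytes(raw))  # -> abcdef
```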
Parameters
Required parameters
| Parameter | Type | Description |
|---|---|---|
| `type` | String | Set to `hologres`. |
| `endpoint` | String | The Hologres endpoint. See Endpoints. |
| `dbname` | String | The database name. |
| `username` | String | Your Alibaba Cloud AccessKey ID. Use a project variable: `${secret_values.ak_id}`. |
| `password` | String | Your Alibaba Cloud AccessKey secret. Use a project variable: `${secret_values.ak_secret}`. |
Optional parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | String | None | A label for this sink. |
| `mutatetype` | String | INSERT_OR_UPDATE | How to handle rows with duplicate primary keys. See Write mode. |
| `createparttable` | Boolean | false | Set to true to create missing child partitioned tables automatically based on partition values. |
| `sink.delete-strategy` | String | None | How to handle retraction messages. See Delete strategy. |
| `ignoreNullWhenUpdate` | Boolean | false | When `mutatetype` is INSERT_OR_UPDATE, set to true to skip null values instead of writing them to the sink table. |
| `deduplication.enabled` | Boolean | true | Deduplicate rows with the same primary key within a batch, keeping only the last arriving row. When set to false, no deduplication is performed: if new data has the same primary key as data already in the current batch, the buffered batch is written first, then the new data is written. |
| `sink.type-normalize-strategy` | String | STANDARD | How to map upstream Flink CDC types to Hologres types. See Data type mapping. |
| `remove-u0000-in-text.enabled` | Boolean | false | Strip `\u0000` characters from STRING data before writing. |
| `jdbcEnableDefaultForNotNullColumn` | Boolean | true | When writing null to a NOT NULL column with no default, fill a type-appropriate default: "" for STRING, 0 for NUMBER, 1970-01-01 00:00:00 for DATE/TIMESTAMP/TIMESTAMPTZ. Set to false to throw an error instead. |
| `table_property.*` | String | None | Hologres physical table properties set in the WITH clause when creating a table. |
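The `deduplication.enabled` behavior within a batch can be sketched in a few lines of Python (a hypothetical helper, not connector code): for each primary key, only the last-arriving row survives.

```python
def deduplicate_batch(rows, pk):
    """Keep only the last-arriving row per primary key (deduplication.enabled: true)."""
    latest = {}
    for row in rows:
        latest[row[pk]] = row  # a later row with the same key overwrites the earlier one
    return list(latest.values())

batch = [
    {"id": 1, "v": "first"},
    {"id": 2, "v": "only"},
    {"id": 1, "v": "last"},  # same primary key as the first row
]
print(deduplicate_batch(batch, "id"))
# -> [{'id': 1, 'v': 'last'}, {'id': 2, 'v': 'only'}]
```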
Connection and retry parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `jdbcRetryCount` | Integer | 10 | Maximum retries for write and query operations on connection failure. |
| `jdbcRetrySleepInitMs` | Long | 1000 | Base wait time before each retry, in milliseconds. Actual wait = jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs. |
| `jdbcRetrySleepStepMs` | Long | 5000 | Incremental wait time added per retry, in milliseconds. Actual wait = jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs. |
| `jdbcConnectionMaxIdleMs` | Long | 60000 | Maximum idle time for a JDBC connection, in milliseconds. Idle connections beyond this threshold are closed. |
| `jdbcMetaCacheTTL` | Long | 60000 | Time-to-live (TTL) for cached table schema metadata, in milliseconds. |
| `jdbcMetaAutoRefreshFactor` | Integer | 4 | Controls automatic refresh of the metadata cache. Trigger time = jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor. When the remaining TTL (TTL - elapsed time) falls below the trigger time, the cache is refreshed automatically and the elapsed time resets to 0. |
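The two formulas above are easy to check numerically. This sketch (not connector code) plugs in the default values:

```python
def retry_wait_ms(retry: int, init_ms: int = 1000, step_ms: int = 5000) -> int:
    """Wait before a given retry: jdbcRetrySleepInitMs + retry * jdbcRetrySleepStepMs."""
    return init_ms + retry * step_ms

def refresh_trigger_ms(ttl_ms: int = 60000, factor: int = 4) -> float:
    """Remaining-TTL threshold below which the metadata cache is refreshed."""
    return ttl_ms / factor

# With the defaults, retries 1..3 wait 6, 11, and 16 seconds.
print([retry_wait_ms(r) for r in range(1, 4)])  # -> [6000, 11000, 16000]
# With the default TTL and factor, a refresh triggers at 15 s of remaining TTL.
print(refresh_trigger_ms())                     # -> 15000.0
```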
Batch write parameters
A write is triggered when any one of the following thresholds is reached:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `jdbcWriteBatchSize` | Integer | 256 | Maximum rows per batch. |
| `jdbcWriteBatchByteSize` | Long | 2097152 (2 MB) | Maximum bytes per batch. |
| `jdbcWriteFlushInterval` | Long | 10000 | Maximum wait time before flushing a batch, in milliseconds. |
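The "any one threshold" rule above can be sketched as a simple OR over the three limits (illustrative only, with the default values as parameters):

```python
def should_flush(num_rows, num_bytes, ms_since_last_flush,
                 max_rows=256, max_bytes=2 * 1024 * 1024, max_interval_ms=10000):
    """A batch is flushed when ANY of the three thresholds is reached."""
    return (num_rows >= max_rows
            or num_bytes >= max_bytes
            or ms_since_last_flush >= max_interval_ms)

print(should_flush(256, 1024, 50))    # row count reached -> True
print(should_flush(10, 2097152, 50))  # byte size reached -> True
print(should_flush(10, 1024, 10000))  # flush interval reached -> True
print(should_flush(10, 1024, 50))     # nothing reached -> False
```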
SSL parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `connection.ssl.mode` | String | disable | SSL encryption mode. See SSL encryption. |
| `connection.ssl.root-cert.location` | String | None | Path to the CA certificate file. Required when `connection.ssl.mode` is verify-ca or verify-full. |
Write mode
Use mutatetype to control how the sink handles rows with duplicate primary keys:
| Value | Behavior |
|---|---|
| `INSERT_OR_UPDATE` (default) | Updates only the columns being written. Other columns remain unchanged. |
| `INSERT_OR_REPLACE` | Replaces the entire existing row with the new one. |
| `INSERT_OR_IGNORE` | Keeps the first row and ignores later duplicates. |
Example: A table has columns a (primary key), b, c, d. Only a and b are written. With INSERT_OR_UPDATE, a duplicate primary key updates only b. Columns c and d stay unchanged.
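The example above can be sketched as a dictionary merge; this is an illustration of the three semantics, not the connector's implementation:

```python
def apply_row(existing, new, mutatetype):
    """Resolve a duplicate-primary-key write under each mutatetype (illustrative)."""
    if mutatetype == "INSERT_OR_IGNORE":
        return existing                               # keep the first row
    if mutatetype == "INSERT_OR_REPLACE":
        return {c: new.get(c) for c in existing}      # unwritten columns become null
    return {**existing, **new}                        # INSERT_OR_UPDATE: written columns only

existing = {"a": 1, "b": "old", "c": "keep", "d": "keep"}
new = {"a": 1, "b": "new"}  # only columns a and b are written
print(apply_row(existing, new, "INSERT_OR_UPDATE"))
# -> {'a': 1, 'b': 'new', 'c': 'keep', 'd': 'keep'}
```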
Delete strategy
Use sink.delete-strategy to control how the sink handles retraction messages:
| Value | Behavior |
|---|---|
| `IGNORE_DELETE` | Ignores UPDATE BEFORE and DELETE messages. Use when you only insert or update data, never delete. |
| `DELETE_ROW_ON_PK` | Deletes by primary key. Updates run as delete-then-insert to ensure accuracy. |
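The two strategies can be sketched as a filter over changelog messages. The RowKind abbreviations (+I insert, -U update-before, +U update-after, -D delete) follow Flink's convention; the code is illustrative, not connector code:

```python
def handle_message(kind, strategy):
    """Return the sink operations performed for one changelog message (illustrative)."""
    if strategy == "IGNORE_DELETE" and kind in ("-U", "-D"):
        return []                    # retraction messages are dropped
    if kind in ("+I", "+U"):
        return ["upsert"]
    if strategy == "DELETE_ROW_ON_PK":
        return ["delete_by_pk"]      # an update thus runs as delete (-U), then insert (+U)
    return []

print(handle_message("-D", "IGNORE_DELETE"))     # -> []
print(handle_message("-D", "DELETE_ROW_ON_PK"))  # -> ['delete_by_pk']
```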
SSL encryption
connection.ssl.mode accepts the following values:
| Value | Behavior |
|---|---|
| `disable` (default) | No encryption in transit. |
| `require` | Enables SSL and encrypts the data link only. |
| `verify-ca` | Encrypts the data link and verifies the Hologres server identity using a CA certificate. |
| `verify-full` | Encrypts the data link, verifies the Hologres server identity using a CA certificate, and checks that the CN or DNS in the certificate matches the configured endpoint. |

`verify-ca` and `verify-full` require Hologres 2.1 or later. See Encryption in transit. When using either mode, also set `connection.ssl.root-cert.location`.
To set connection.ssl.root-cert.location, upload the CA certificate through File Management in the Realtime Compute console. Uploaded files go to /flink/usrlib. For example, if your certificate file is named certificate.crt, set this parameter to /flink/usrlib/certificate.crt. To download the CA certificate, see Encryption in transit — Download the CA certificate.
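Putting the two SSL parameters together, a sink configured for CA verification might look like the following fragment (the endpoint and database are placeholders, and the certificate is assumed to have been uploaded as certificate.crt):

```yaml
sink:
  type: hologres
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # verify-ca requires Hologres 2.1 or later and an uploaded CA certificate.
  connection.ssl.mode: verify-ca
  connection.ssl.root-cert.location: /flink/usrlib/certificate.crt
```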
Data type mapping
Use sink.type-normalize-strategy to control how Flink CDC types map to Hologres types.
- Set `sink.type-normalize-strategy` when starting the job for the first time. Changing it after startup requires deleting the downstream tables and restarting the job statelessly.
- Supported array element types: INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, and VARCHAR.
- Hologres does not support NUMERIC as a primary key. If a primary key maps to NUMERIC, Hologres converts it to VARCHAR.
STANDARD (default)
Maps Flink CDC types to PostgreSQL-standard Hologres types.
| Flink CDC type | Hologres type | Notes |
|---|---|---|
| CHAR | bpchar | Fixed-length character type. |
| STRING | text | |
| VARCHAR (length ≤ 10,485,760 bytes) | varchar | Lengths up to 10 MB map to varchar. |
| VARCHAR (length > 10,485,760 bytes) | text | Lengths exceeding 10 MB fall back to text. |
| BOOLEAN | bool | |
| BINARY | bytea | |
| VARBINARY | bytea | |
| DECIMAL | numeric | |
| TINYINT | int2 | TINYINT and SMALLINT both map to int2 (16-bit integer). |
| SMALLINT | int2 | |
| INTEGER | int4 | |
| BIGINT | int8 | |
| FLOAT | float4 | |
| DOUBLE | float8 | |
| DATE | date | |
| TIME_WITHOUT_TIME_ZONE | time | |
| TIMESTAMP_WITHOUT_TIME_ZONE | timestamp | |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz | |
| ARRAY | Arrays of various types | Supported element types: INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, VARCHAR. |
| MAP | Not supported | |
| ROW | Not supported |
BROADEN
Maps Flink CDC types to wider Hologres types to reduce schema mismatch errors. Use when write compatibility is more important than type precision.
| Flink CDC type | Hologres type | Notes |
|---|---|---|
| CHAR | text | All character types map to text for maximum compatibility. |
| STRING | text | |
| VARCHAR | text | |
| BOOLEAN | bool | |
| BINARY | bytea | |
| VARBINARY | bytea | |
| DECIMAL | numeric | |
| TINYINT | int8 | All integer types widen to int8 to prevent overflow. |
| SMALLINT | int8 | |
| INTEGER | int8 | |
| BIGINT | int8 | |
| FLOAT | float8 | Both float types widen to float8 for precision. |
| DOUBLE | float8 | |
| DATE | date | |
| TIME_WITHOUT_TIME_ZONE | time | |
| TIMESTAMP_WITHOUT_TIME_ZONE | timestamp | |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz | |
| ARRAY | Arrays of various types | Supported element types: INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, VARCHAR. |
| MAP | Not supported | |
| ROW | Not supported |
ONLY_BIGINT_OR_TEXT
Maps all Flink CDC types to either BIGINT or text in Hologres. Use when maximum type compatibility is required.
| Flink CDC type | Hologres type | Notes |
|---|---|---|
| TINYINT | int8 | All integer types map to int8. |
| SMALLINT | int8 | |
| INTEGER | int8 | |
| BIGINT | int8 | |
| BOOLEAN | text | All non-integer types map to text. |
| BINARY | text | |
| VARBINARY | text | |
| DECIMAL | text | |
| FLOAT | text | |
| DOUBLE | text | |
| DATE | text | |
| TIME_WITHOUT_TIME_ZONE | text | |
| TIMESTAMP_WITHOUT_TIME_ZONE | text | |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | text | |
| ARRAY | Arrays of various types | Supported element types: INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, VARCHAR. |
| MAP | Not supported | |
| ROW | Not supported |
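The practical difference between the three strategies is easiest to see side by side. The sketch below tabulates a few representative mappings taken from the tables above (an illustration, not connector code):

```python
# Hologres type chosen for a few Flink CDC types under each strategy.
MAPPING = {
    # Flink CDC type   STANDARD    BROADEN    ONLY_BIGINT_OR_TEXT
    "TINYINT":        ("int2",     "int8",    "int8"),
    "INTEGER":        ("int4",     "int8",    "int8"),
    "FLOAT":          ("float4",   "float8",  "text"),
    "STRING":         ("text",     "text",    "text"),
    "DATE":           ("date",     "date",    "text"),
}

for cdc_type, (standard, broaden, bigint_or_text) in MAPPING.items():
    print(f"{cdc_type:8} -> {standard:7} {broaden:7} {bigint_or_text}")
```

STANDARD preserves precision, BROADEN trades precision for write compatibility, and ONLY_BIGINT_OR_TEXT collapses everything to the two most permissive types.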
Schema evolution
Set schema.change.behavior at the pipeline level to control how schema changes in source tables are applied to sink tables. The Hologres connector supports IGNORE, LENIENT, EVOLVE, and EXCEPTION. TRY_EVOLVE is not supported.
LENIENT (default)
| Schema change | Behavior |
|---|---|
| Add nullable column | Adds the column to the end of the sink table schema and synchronizes data. |
| Delete nullable column | Fills the column with null. The column is not removed from the sink table. |
| Add non-nullable column | Adds the column to the end of the sink table schema as nullable. Pre-existing rows default to null. |
| Rename column | Treated as add + delete. The renamed column is added to the end of the sink table. The original column is filled with null. |
| Change column type | Not supported. Use sink.type-normalize-strategy instead. |
| Change constraints (primary keys, indexes) | Not supported. |
| Delete non-nullable column | Not supported. |
| Change NOT NULL to nullable | Not supported. |
Example: Renaming col_a to col_b adds col_b to the end of the sink table and fills col_a with null.
EVOLVE
| Schema change | Behavior |
|---|---|
| Add nullable column | Supported. |
| Delete nullable column | Not supported. |
| Add non-nullable column | Added to the sink table as a nullable column. |
| Rename column | Supported. Renames the column in the sink table directly. |
| Change column type | Not supported. Use sink.type-normalize-strategy instead. |
| Change constraints (primary keys, indexes) | Not supported. |
| Delete non-nullable column | Not supported. |
| Change NOT NULL to nullable | Not supported. |
In EVOLVE mode, restarting a job statelessly without deleting the sink table may cause schema inconsistency between upstream and sink tables, leading to job failure. Manually adjust the sink table schema before restarting.
Write to partitioned tables
The Hologres sink supports writing to partitioned tables. Use the transform module to compute the partition key from upstream data.
Requirements:
- The partition key must be part of the primary key. A non-primary-key partition column causes inconsistent primary keys between upstream and downstream, leading to data inconsistency during synchronization.
- Supported partition key types: TEXT, VARCHAR, and INT. Hologres V1.3.22 and later also support DATE.
- Set `createparttable: true` to create child partitioned tables automatically. Otherwise, create them manually before starting the job.

For an example, see Write to partitioned tables in the code examples section.
Code examples
Quickstart: sync a single table
The simplest end-to-end configuration: sync one MySQL table to Hologres with wide type mapping.
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline
```
Sync all tables in a database
Set tables to a regex pattern to capture all tables in a database.
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db\..*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline
```
Merge sharded tables
Use a route rule to merge multiple sharded tables from MySQL into a single Hologres table.
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\..*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

route:
  # Merge all sharded tables in MySQL test_db into one Hologres table: test_db.user.
  - source-table: test_db.user\..*
    sink-table: test_db.user

pipeline:
  name: MySQL to Hologres Pipeline
```
Write to partitioned tables
Convert the upstream timestamp field create_time to a DATE partition key using the transform module.
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Create missing child partitioned tables automatically.
  createparttable: true

transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key

pipeline:
  name: MySQL to Hologres Pipeline
```
Sync to a specific schema
A schema in Hologres corresponds to a database in MySQL. Use a route rule with replace-symbol to redirect all tables from the source database into a target Hologres schema.
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\..*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

route:
  # Sync all tables from MySQL test_db to Hologres schema test_db2, keeping table names unchanged.
  - source-table: test_db\..*
    sink-table: test_db2.<>
    replace-symbol: <>

pipeline:
  name: MySQL to Hologres Pipeline
```
Enable EVOLVE mode
Set schema.change.behavior: evolve at the pipeline level to enable schema evolution.
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true

pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve
```
Sync newly created tables without restarting
Set scan.binlog.newly-added-table.enabled: true to capture tables created after the job starts.
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db\..*
  server-id: 5401-5499
  # Capture data from new tables created while the job is running.
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline
```
Add existing tables on restart
To add synchronization for existing tables, set scan.newly-added-table.enabled: true and restart the job.
Do not combine scan.binlog.newly-added-table.enabled: true (for tables created at runtime) with scan.newly-added-table.enabled: true (for tables added on restart) in the same job. Using both causes duplicate data.
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db\..*
  server-id: 5401-5499
  scan.startup.mode: initial
  # On restart, detect new tables in the tables parameter and run a full snapshot for them.
  # Requires scan.startup.mode: initial.
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline
```
Exclude tables during full-database sync
Use tables.exclude to skip specific tables when syncing an entire database.
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db\..*
  # Tables matching this regex are not synchronized.
  tables.exclude: test_db.table1
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline
```
What's next
- For source, sink, transform, and route reference documentation, see Flink CDC data ingestion development reference.
- To create and run a YAML data ingestion job, see Develop a YAML data ingestion job (public preview).