
Realtime Compute for Apache Flink:Hologres data ingestion

Last Updated: Mar 26, 2026

Use the Hologres connector to sink data from a Flink Change Data Capture (CDC) YAML pipeline into Hologres in real time. The connector supports both streaming and batch modes, handles schema evolution, and can write to partitioned tables.

Connector overview

Category Value
Supported type Data ingestion sink
Running mode Streaming and batch modes
Data format Not supported
Monitoring metrics numRecordsOut, numRecordsOutPerSecond
API type YAML
Update or delete data in sink tables Yes

For details on the monitoring metrics, see Monitoring metrics.

Quickstart

The following is a minimal working configuration. Replace the placeholders before running.

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}

Store your AccessKey ID and AccessKey secret as project variables rather than hardcoding them. The syntax ${secret_values.ak_id} references a variable named ak_id.

To find your endpoint, see Endpoints. To look up your AccessKey pair, see How do I view the AccessKey ID and AccessKey secret?

Usage notes

Read these before configuring the connector:

  • Primary keys are required for exactly-once semantics. If the Hologres physical table has primary keys, the sink applies exactly-once semantics based on those keys. If duplicate primary keys arrive, set mutatetype to control how conflicts are resolved.

  • `sink.type-normalize-strategy` must be set at job start. Changing it after the job starts requires deleting the downstream tables and restarting the job statelessly.

  • `table_property.distribution_key` defaults to the primary key. Do not change it unless necessary, because it affects write correctness.

  • Partitioned table writes require the partition key to be part of the primary key. A non-primary-key partition column causes inconsistent primary keys between upstream and downstream, leading to data inconsistency.

  • Hologres does not support column type changes. Use sink.type-normalize-strategy to handle type mapping instead.

  • EVOLVE mode has a restart restriction. In EVOLVE mode, restarting a job statelessly without first deleting the sink table may cause schema inconsistency between upstream and sink tables, and job failure. Manually adjust the sink table schema before restarting.

  • TRY_EVOLVE is not supported. Valid values for schema.change.behavior are IGNORE, LENIENT, EVOLVE, and EXCEPTION.

  • `remove-u0000-in-text.enabled` is off by default. If your source data contains \u0000 null bytes, writing fails with ERROR: invalid byte sequence for encoding "UTF8": 0x00. Set this parameter to true to strip them automatically.
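For instance, the null-byte stripping described above is enabled directly in the sink block (a fragment with connection placeholders elided; see Quickstart for the full configuration):

```yaml
sink:
  type: hologres
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Strip \u0000 bytes from STRING data before writing,
  # avoiding: ERROR: invalid byte sequence for encoding "UTF8": 0x00
  remove-u0000-in-text.enabled: true
```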

Parameters

Required parameters

Parameter Type Description
type String Set to hologres.
endpoint String The Hologres endpoint. See Endpoints.
dbname String The database name.
username String Your Alibaba Cloud AccessKey ID. Use a project variable: ${secret_values.ak_id}.
password String Your Alibaba Cloud AccessKey secret. Use a project variable: ${secret_values.ak_secret}.

Optional parameters

Parameter Type Default Description
name String None A label for this sink.
mutatetype String INSERT_OR_UPDATE How to handle rows with duplicate primary keys. See Write mode.
createparttable Boolean false Set to true to create missing child partitioned tables automatically based on partition values.
sink.delete-strategy String None How to handle retraction messages. See Delete strategy.
ignoreNullWhenUpdate Boolean false When mutatetype is INSERT_OR_UPDATE, set to true to skip null values instead of writing them to the sink table.
deduplication.enabled Boolean true Deduplicate rows with the same primary key within a batch, keeping only the last arriving row. When set to false, no deduplication is performed: if an incoming row has the same primary key as a row already buffered in the current batch, the buffered batch is written first and the new row is written afterward.
sink.type-normalize-strategy String STANDARD How to map upstream Flink CDC types to Hologres types. See Data type mapping.
remove-u0000-in-text.enabled Boolean false Strip \u0000 characters from STRING data before writing.
jdbcEnableDefaultForNotNullColumn Boolean true When writing null to a NOT NULL column with no default, fill a type-appropriate default: "" for STRING, 0 for NUMBER, 1970-01-01 00:00:00 for DATE/TIMESTAMP/TIMESTAMPTZ. Set to false to throw an error instead.
table_property.* String None Hologres physical table properties set in the WITH clause when creating a table.
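As an illustration of table_property.*, the fragment below passes two common Hologres WITH-clause properties through the sink. The property names (orientation, time_to_live_in_seconds) are standard Hologres table properties; verify them against your Hologres version before relying on them:

```yaml
sink:
  type: hologres
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Forwarded to the WITH clause when the sink creates the physical table.
  table_property.orientation: column
  table_property.time_to_live_in_seconds: '86400'
```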

Connection and retry parameters

Parameter Type Default Description
jdbcRetryCount Integer 10 Maximum retries for write and query operations on connection failure.
jdbcRetrySleepInitMs Long 1000 Base wait time before each retry, in milliseconds. Actual wait = jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs.
jdbcRetrySleepStepMs Long 5000 Incremental wait time added per retry, in milliseconds. Actual wait = jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs.
jdbcConnectionMaxIdleMs Long 60000 Maximum idle time for a JDBC connection, in milliseconds. Idle connections beyond this threshold are closed.
jdbcMetaCacheTTL Long 60000 Time-to-live (TTL) for cached table schema metadata, in milliseconds.
jdbcMetaAutoRefreshFactor Integer 4 Triggers an automatic refresh of the metadata cache when its remaining TTL falls below the trigger time, where trigger time = jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor. The remaining time is the TTL minus the elapsed time; a refresh resets the elapsed time to 0.
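The retry wait formula above can be checked with a few lines. This is a sketch of the documented arithmetic, not connector code:

```python
def retry_wait_ms(retry: int, init_ms: int = 1000, step_ms: int = 5000) -> int:
    # Documented formula: wait = jdbcRetrySleepInitMs + retry * jdbcRetrySleepStepMs
    return init_ms + retry * step_ms

# With the defaults, retry 1 waits 6000 ms and retry 10 waits 51000 ms.
```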

Batch write parameters

A write is triggered when any one of the following thresholds is reached:

Parameter Type Default Description
jdbcWriteBatchSize Integer 256 Maximum rows per batch.
jdbcWriteBatchByteSize Long 2097152 (2 MB) Maximum bytes per batch.
jdbcWriteFlushInterval Long 10000 Maximum wait time before flushing a batch, in milliseconds.
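The any-threshold trigger can be sketched as follows. This is illustrative only; the parameter defaults mirror the table above:

```python
def should_flush(buffered_rows: int, buffered_bytes: int, elapsed_ms: int,
                 max_rows: int = 256, max_bytes: int = 2 * 1024 * 1024,
                 max_wait_ms: int = 10_000) -> bool:
    # A batch is flushed as soon as ANY one threshold is reached.
    return (buffered_rows >= max_rows
            or buffered_bytes >= max_bytes
            or elapsed_ms >= max_wait_ms)
```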

SSL parameters

Parameter Type Default Description
connection.ssl.mode String disable SSL encryption mode. See SSL encryption.
connection.ssl.root-cert.location String None Path to the CA certificate file. Required when connection.ssl.mode is verify-ca or verify-full.

Write mode

Use mutatetype to control how the sink handles rows with duplicate primary keys:

Value Behavior
INSERT_OR_UPDATE (default) Updates only the columns being written. Other columns remain unchanged.
INSERT_OR_REPLACE Replaces the entire existing row with the new one.
INSERT_OR_IGNORE Keeps the first row and ignores later duplicates.

Example: A table has columns a (primary key), b, c, d. Only a and b are written. With INSERT_OR_UPDATE, a duplicate primary key updates only b. Columns c and d stay unchanged.
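The three modes can be modeled with a small in-memory sketch (a hypothetical helper, not connector internals):

```python
def write_row(table: dict, pk, new_row: dict,
              mutatetype: str = "INSERT_OR_UPDATE") -> None:
    """Apply one incoming row to a table keyed by primary key."""
    if pk not in table:
        table[pk] = dict(new_row)
    elif mutatetype == "INSERT_OR_UPDATE":
        table[pk].update(new_row)   # only the written columns change
    elif mutatetype == "INSERT_OR_REPLACE":
        table[pk] = dict(new_row)   # the whole row is replaced
    elif mutatetype == "INSERT_OR_IGNORE":
        pass                        # the first row wins

# Matching the example above: writing only a and b leaves c and d untouched.
rows = {1: {"a": 1, "b": "old", "c": "x", "d": "y"}}
write_row(rows, 1, {"a": 1, "b": "new"})
# rows[1] -> {"a": 1, "b": "new", "c": "x", "d": "y"}
```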

Delete strategy

Use sink.delete-strategy to control how the sink handles retraction messages:

Value Behavior
IGNORE_DELETE Ignores UPDATE BEFORE and DELETE messages. Use when you only insert or update data, never delete.
DELETE_ROW_ON_PK Deletes by primary key. Updates run as delete-then-insert to ensure accuracy.
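For example, a sink that only ever inserts or updates can drop retractions outright (a fragment with connection placeholders elided):

```yaml
sink:
  type: hologres
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Drop UPDATE BEFORE and DELETE messages instead of deleting rows.
  sink.delete-strategy: IGNORE_DELETE
```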

SSL encryption

connection.ssl.mode accepts the following values:

Value Behavior
disable (default) No encryption in transit.
require Enables SSL and encrypts the data link only.
verify-ca Encrypts the data link and verifies the Hologres server identity using a CA certificate.
verify-full Encrypts the data link, verifies the Hologres server identity using a CA certificate, and checks that the CN or DNS in the certificate matches the configured endpoint.

Note: verify-ca and verify-full require Hologres 2.1 or later; see Encryption in transit. When using either mode, also set connection.ssl.root-cert.location.

To set connection.ssl.root-cert.location, upload the CA certificate through File Management in the Realtime Compute console. Uploaded files go to /flink/usrlib. For example, if your certificate file is named certificate.crt, set this parameter to /flink/usrlib/certificate.crt. To download the CA certificate, see Encryption in transit — Download the CA certificate.
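Putting the two SSL parameters together, assuming the certificate was uploaded as certificate.crt (a fragment with connection placeholders elided):

```yaml
sink:
  type: hologres
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # verify-ca requires Hologres 2.1 or later.
  connection.ssl.mode: verify-ca
  # Uploaded files land in /flink/usrlib.
  connection.ssl.root-cert.location: /flink/usrlib/certificate.crt
```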

Data type mapping

Use sink.type-normalize-strategy to control how Flink CDC types map to Hologres types.

Note
  • Set sink.type-normalize-strategy when starting the job for the first time. Changing it after startup requires deleting the downstream tables and restarting the job statelessly.

  • Supported array element types: INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, and VARCHAR.

  • Hologres does not support NUMERIC as a primary key. If a primary key maps to NUMERIC, Hologres converts it to VARCHAR.

STANDARD (default)

Maps Flink CDC types to PostgreSQL-standard Hologres types.

Flink CDC type Hologres type Notes
CHAR bpchar Fixed-length character type.
STRING text
VARCHAR (length ≤ 10,485,760 bytes) varchar Lengths up to 10 MB map to varchar.
VARCHAR (length > 10,485,760 bytes) text Lengths exceeding 10 MB fall back to text.
BOOLEAN bool
BINARY bytea
VARBINARY bytea
DECIMAL numeric
TINYINT int2 TINYINT and SMALLINT both map to int2 (16-bit integer).
SMALLINT int2
INTEGER int4
BIGINT int8
FLOAT float4
DOUBLE float8
DATE date
TIME_WITHOUT_TIME_ZONE time
TIMESTAMP_WITHOUT_TIME_ZONE timestamp
TIMESTAMP_WITH_LOCAL_TIME_ZONE timestamptz
ARRAY Arrays of various types Supported element types: INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, VARCHAR.
MAP Not supported
ROW Not supported

BROADEN

Maps Flink CDC types to wider Hologres types to reduce schema mismatch errors. Use when write compatibility is more important than type precision.

Flink CDC type Hologres type Notes
CHAR text All character types map to text for maximum compatibility.
STRING text
VARCHAR text
BOOLEAN bool
BINARY bytea
VARBINARY bytea
DECIMAL numeric
TINYINT int8 All integer types widen to int8 to prevent overflow.
SMALLINT int8
INTEGER int8
BIGINT int8
FLOAT float8 Both float types widen to float8 for precision.
DOUBLE float8
DATE date
TIME_WITHOUT_TIME_ZONE time
TIMESTAMP_WITHOUT_TIME_ZONE timestamp
TIMESTAMP_WITH_LOCAL_TIME_ZONE timestamptz
ARRAY Arrays of various types Supported element types: INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, VARCHAR.
MAP Not supported
ROW Not supported

ONLY_BIGINT_OR_TEXT

Maps all Flink CDC types to either int8 or text in Hologres. Use when maximum type compatibility is required.

Flink CDC type Hologres type Notes
TINYINT int8 All integer types map to int8.
SMALLINT int8
INTEGER int8
BIGINT int8
BOOLEAN text All non-integer types map to text.
BINARY text
VARBINARY text
DECIMAL text
FLOAT text
DOUBLE text
DATE text
TIME_WITHOUT_TIME_ZONE text
TIMESTAMP_WITHOUT_TIME_ZONE text
TIMESTAMP_WITH_LOCAL_TIME_ZONE text
ARRAY Arrays of various types Supported element types: INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, VARCHAR.
MAP Not supported
ROW Not supported

Schema evolution

Set schema.change.behavior at the pipeline level to control how schema changes in source tables are applied to sink tables. The Hologres connector supports IGNORE, LENIENT, EVOLVE, and EXCEPTION. TRY_EVOLVE is not supported.

LENIENT (default)

Schema change Behavior
Add nullable column Adds the column to the end of the sink table schema and synchronizes data.
Delete nullable column Fills the column with null. The column is not removed from the sink table.
Add non-nullable column Adds the column to the end of the sink table schema as nullable. Pre-existing rows default to null.
Rename column Treated as add + delete. The renamed column is added to the end of the sink table. The original column is filled with null.
Change column type Not supported. Use sink.type-normalize-strategy instead.
Change constraints (primary keys, indexes) Not supported.
Delete non-nullable column Not supported.
Change NOT NULL to nullable Not supported.

Example: Renaming col_a to col_b adds col_b to the end of the sink table and fills col_a with null.

EVOLVE

Schema change Behavior
Add nullable column Supported.
Delete nullable column Not supported.
Add non-nullable column Added to the sink table as a nullable column.
Rename column Supported. Renames the column in the sink table directly.
Change column type Not supported. Use sink.type-normalize-strategy instead.
Change constraints (primary keys, indexes) Not supported.
Delete non-nullable column Not supported.
Change NOT NULL to nullable Not supported.
Warning

In EVOLVE mode, restarting a job statelessly without deleting the sink table may cause schema inconsistency between upstream and sink tables, leading to job failure. Manually adjust the sink table schema before restarting.

Write to partitioned tables

The Hologres sink supports writing to partitioned tables. Use the transform module to compute the partition key from upstream data.

Requirements:

  • The partition key must be part of the primary key. A non-primary-key partition column causes inconsistent primary keys between upstream and downstream, leading to data inconsistency during synchronization.

  • Supported partition key types: TEXT, VARCHAR, and INT. Hologres V1.3.22 and later also support DATE.

  • Set createparttable: true to create child partitioned tables automatically. Otherwise, create them manually before starting the job.

For an example, see Write to partitioned tables in the code examples section.

Code examples

Quickstart: sync a single table

The simplest end-to-end configuration: sync one MySQL table to Hologres with wide type mapping.

source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Sync all tables in a database

Set tables to a regex pattern to capture all tables in a database.

source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db\..*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Merge sharded tables

Use a route rule to merge multiple sharded tables from MySQL into a single Hologres table.

source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\..*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

route:
  # Merge all sharded tables in MySQL test_db into one Hologres table: test_db.user.
  - source-table: test_db.user\..*
    sink-table: test_db.user

pipeline:
  name: MySQL to Hologres Pipeline

Write to partitioned tables

Convert the upstream timestamp field create_time to a DATE partition key using the transform module.

source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Create missing child partitioned tables automatically.
  createparttable: true

transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key

pipeline:
  name: MySQL to Hologres Pipeline

Sync to a specific schema

A schema in Hologres corresponds to a database in MySQL. Use a route rule with replace-symbol to redirect all tables from the source database into a target Hologres schema.

source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\..*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

route:
  # Sync all tables from MySQL test_db to Hologres schema test_db2, keeping table names unchanged.
  - source-table: test_db\..*
    sink-table: test_db2.<>
    replace-symbol: <>

pipeline:
  name: MySQL to Hologres Pipeline

Enable EVOLVE mode

Set schema.change.behavior: evolve at the pipeline level to enable schema evolution.

source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true

pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve

Sync newly created tables without restarting

Set scan.binlog.newly-added-table.enabled: true to capture tables created after the job starts.

source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db\..*
  server-id: 5401-5499
  # Capture data from new tables created while the job is running.
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Add existing tables on restart

To add synchronization for existing tables, set scan.newly-added-table.enabled: true and restart the job.

Warning

Do not combine scan.binlog.newly-added-table.enabled: true (for tables created at runtime) with scan.newly-added-table.enabled: true (for tables added on restart) in the same job. Using both causes duplicate data.

source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db\..*
  server-id: 5401-5499
  scan.startup.mode: initial
  # On restart, detect new tables in the tables parameter and run a full snapshot for them.
  # Requires scan.startup.mode: initial.
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Exclude tables during full-database sync

Use tables.exclude to skip specific tables when syncing an entire database.

source:
  type: mysql
  name: MySQL Source
  hostname: <your-hostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db\..*
  # Tables matching this regex are not synchronized.
  tables.exclude: test_db.table1
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <your-endpoint>
  dbname: <your-database>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

What's next