
Realtime Compute for Apache Flink:Hologres connector for data ingestion (YAML)

Last Updated: Feb 06, 2026

This topic describes how to use the Hologres connector to synchronize data in a YAML-based data ingestion job.

Background information

Hologres is an end-to-end real-time data warehouse engine that supports large-scale real-time data ingestion, updates, and analytics. It uses standard SQL, is compatible with the PostgreSQL protocol, and supports OLAP and ad hoc analysis on petabytes of data while delivering high-concurrency, low-latency online data services. Hologres integrates tightly with MaxCompute, Flink, and DataWorks to provide full-stack online and offline data warehouse solutions. The capabilities of the Hologres YAML connector are as follows.

  • Supported type: data ingestion sink.

  • Running mode: streaming and batch modes.

  • Data format: not supported.

  • Monitoring metrics: numRecordsOut and numRecordsOutPerSecond. For more information about the metrics, see Monitoring metrics.

  • API type: YAML.

  • Support for updating or deleting data in sink tables: yes.

Features

  • Synchronize all tables in a database: synchronizes full and incremental data from all tables in a database, or from multiple tables, to the corresponding sink tables.

  • Synchronize table schema changes: while synchronizing all tables in a database, also synchronizes schema changes in each source table, such as adding, deleting, or renaming columns, to its sink table in real time.

  • Merge sharded tables: uses regular expressions to match source tables across multiple sharded databases, merges their data, and synchronizes it to sink tables with matching names.

  • Write to partitioned tables: writes data from an upstream table to a Hologres partitioned table.

  • Data type mapping: maps upstream data types to wider Hologres data types using multiple mapping strategies.

Syntax

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}

Parameters

Each entry below lists the parameter name followed by its data type, whether it is required, and its default value in parentheses, and then its description and remarks.

  • type (String, required, no default): The sink type. Set it to hologres.

  • name (String, optional, no default): The sink name.

  • dbname (String, required, no default): The database name.

  • username (String, required, no default): The username. Enter the AccessKey ID of your Alibaba Cloud account. For more information, see How do I view the AccessKey ID and AccessKey secret?

    Important: To protect your AccessKey pair, use variables to configure the AccessKey ID. For more information, see Project variables.

  • password (String, required, no default): The password. Enter the AccessKey secret of your Alibaba Cloud account.

  • endpoint (String, required, no default): The Hologres endpoint. For more information, see Endpoints.

  • jdbcRetryCount (Integer, optional, default 10): The maximum number of retries for write and query operations after a connection failure.

  • jdbcRetrySleepInitMs (Long, optional, default 1000): The fixed component of the wait time before each retry, in milliseconds. The actual wait time is jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs.

  • jdbcRetrySleepStepMs (Long, optional, default 5000): The incremental component of the wait time before each retry, in milliseconds. The actual wait time is jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs. For example, with the defaults, the third retry waits 1000 + 3 × 5000 = 16000 milliseconds.
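The retry parameters can be tuned together in the sink block. The following sketch uses illustrative values, not recommendations; the endpoint and credentials are placeholders:

```yaml
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Retry up to 5 times after a connection failure.
  jdbcRetryCount: 5
  # Wait before retry n: 2000 + n × 3000 ms
  # (for example, the 3rd retry waits 11000 ms).
  jdbcRetrySleepInitMs: 2000
  jdbcRetrySleepStepMs: 3000
```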

  • jdbcConnectionMaxIdleMs (Long, optional, default 60000): The maximum idle time for a JDBC connection, in milliseconds. A connection that stays idle longer than this value is closed and released.

  • jdbcMetaCacheTTL (Long, optional, default 60000): The time-to-live (TTL) of the cached TableSchema information, in milliseconds.

  • jdbcMetaAutoRefreshFactor (Integer, optional, default 4): Controls automatic refresh of the schema cache. The trigger time is jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor. When the remaining cache time (TTL minus elapsed time) falls below the trigger time, the system refreshes the cache automatically and resets the elapsed time to 0. For example, with the defaults, the cache is refreshed when fewer than 60000 / 4 = 15000 milliseconds remain.

  • mutatetype (String, optional, default INSERT_OR_UPDATE): The data write mode. If primary keys are configured in the Hologres physical table, the Hologres streaming sink provides exactly-once semantics based on those primary keys. When multiple records share the same primary key, mutatetype determines how the sink table is updated. Valid values:

    • INSERT_OR_IGNORE: keeps the first record and ignores later records with the same primary key.

    • INSERT_OR_REPLACE: replaces the entire existing row with the new one.

    • INSERT_OR_UPDATE: updates only the columns that are written. For example, a table has fields a, b, c, and d, where a is the primary key. If only a and b are written to Hologres, a duplicate primary key updates only b; fields c and d remain unchanged.
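For example, to keep only the first record for each primary key, mutatetype can be set explicitly. This is a sketch; the endpoint and credentials are placeholders:

```yaml
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Keep the first record per primary key and ignore later duplicates.
  mutatetype: INSERT_OR_IGNORE
```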

  • createparttable (Boolean, optional, default false): Specifies whether to automatically create missing partitioned tables based on partition values.

  • sink.delete-strategy (String, optional, no default): Specifies how to handle retraction messages. Valid values:

    • IGNORE_DELETE: ignores UPDATE_BEFORE and DELETE messages. Use this value when you only insert or update data and never delete it.

    • DELETE_ROW_ON_PK: applies deletions by primary key. Updates run as a delete followed by an insert to ensure accuracy.

  • jdbcWriteBatchSize (Integer, optional, default 256): The maximum number of rows per batch when writing in JDBC mode. Note: a flush is triggered as soon as any one of jdbcWriteBatchSize, jdbcWriteBatchByteSize, or jdbcWriteFlushInterval is reached.

  • jdbcWriteBatchByteSize (Long, optional, default 2097152, that is, 2 MB): The maximum number of bytes per batch when writing in JDBC mode. Note: a flush is triggered as soon as any one of jdbcWriteBatchSize, jdbcWriteBatchByteSize, or jdbcWriteFlushInterval is reached.

  • jdbcWriteFlushInterval (Long, optional, default 10000): The maximum wait time, in milliseconds, before a batch is flushed to Hologres in JDBC mode. Note: a flush is triggered as soon as any one of jdbcWriteBatchSize, jdbcWriteBatchByteSize, or jdbcWriteFlushInterval is reached.
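The three batching thresholds work together: a flush happens when whichever threshold is reached first. The following sketch uses illustrative values; the endpoint and credentials are placeholders:

```yaml
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Flush when 512 rows, 4 MB, or 5 seconds is reached, whichever comes first.
  jdbcWriteBatchSize: 512
  jdbcWriteBatchByteSize: 4194304
  jdbcWriteFlushInterval: 5000
```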

  • ignoreNullWhenUpdate (Boolean, optional, default false): Specifies whether to ignore null values in update writes when mutatetype is set to INSERT_OR_UPDATE. Valid values:

    • false (default): writes null values to the Hologres sink table.

    • true: ignores null values in update writes.

  • jdbcEnableDefaultForNotNullColumn (Boolean, optional, default true): Specifies whether the connector fills in a default value when null is written to a NOT NULL column that has no default value in the Hologres table. Valid values:

    • true (default): the connector fills in and writes a default value:

      • STRING columns: an empty string ("").

      • NUMBER columns: 0.

      • DATE, TIMESTAMP, and TIMESTAMPTZ columns: 1970-01-01 00:00:00.

    • false: no default is filled in, and writing null to a NOT NULL column throws an error.

  • remove-u0000-in-text.enabled (Boolean, optional, default false): Specifies whether to remove invalid \u0000 characters from STRING data before writing. Valid values:

    • false (default): the connector does not modify the data. If it encounters such dirty data, the write may fail with: ERROR: invalid byte sequence for encoding "UTF8": 0x00. In this case, remove the dirty data from the source table in advance, or define dirty-data handling logic in the SQL statement.

    • true: the connector removes \u0000 from STRING data to prevent this error.

  • deduplication.enabled (Boolean, optional, default true): Specifies whether to deduplicate within a batch when writing in jdbc or jdbc_fixed mode. Valid values:

    • true (default): rows in a batch that share the same primary key are deduplicated, and only the last arriving row is kept. For example, for a table with two fields where the first field is the primary key:

      • INSERT (1,'a') and INSERT (1,'b') arrive in order. After deduplication, only (1,'b') is written to the Hologres sink table.

      • (1,'a') already exists in the sink table, and then DELETE (1,'a') and INSERT (1,'b') arrive in order. Only (1,'b') is written, as a direct update rather than a delete followed by an insert.

    • false: no deduplication is performed during batching. If new data shares a primary key with data in the current batch, the batch is written first and the new data is written afterward.

  • sink.type-normalize-strategy (String, optional, default STANDARD): The strategy that the Hologres sink uses to convert upstream data types to Hologres types. Valid values:

    • STANDARD: maps Flink CDC types to PostgreSQL types according to standard rules.

    • BROADEN: maps Flink CDC types to broader Hologres types.

    • ONLY_BIGINT_OR_TEXT: maps all Flink CDC types to BIGINT or STRING in Hologres.

  • table_property.* (String, optional, no default): Hologres physical table properties. When a Hologres table is created, you can set physical table properties in the WITH clause. Proper settings help organize and query data efficiently.

    Warning: The default value of table_property.distribution_key is the primary key. Do not change it unless necessary, because an incorrect distribution key affects write correctness.

  • connection.ssl.mode (String, optional, default disable): Specifies whether to enable SSL encryption in transit, and which mode to use. Valid values:

    • disable (default): disables encryption in transit.

    • require: enables SSL and encrypts the data link only.

    • verify-ca: enables SSL, encrypts the data link, and verifies the Hologres server identity using a CA certificate.

    • verify-full: enables SSL, encrypts the data link, verifies the Hologres server identity using a CA certificate, and checks that the CN or DNS name in the certificate matches the configured Hologres endpoint.

    Note:

    • Hologres 2.1 and later support verify-ca and verify-full. See Encryption in transit.

    • When you use verify-ca or verify-full, you must also set connection.ssl.root-cert.location.

  • connection.ssl.root-cert.location (String, optional, no default): The path to the certificate file when encryption in transit requires a certificate. Required when connection.ssl.mode is set to verify-ca or verify-full. Upload the CA certificate using the File Management feature in the Realtime Compute console; uploaded files are stored under /flink/usrlib. For example, if your CA certificate is named certificate.crt, set this parameter to '/flink/usrlib/certificate.crt'.

    Note: For instructions on obtaining the CA certificate, see Encryption in transit - Download the CA certificate.
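Putting the two SSL parameters together, a verify-ca configuration might look like the following sketch. The certificate file name is an assumption, and the endpoint and credentials are placeholders:

```yaml
sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Encrypt the data link and verify the server against a CA certificate.
  connection.ssl.mode: verify-ca
  # CA certificate uploaded via File Management; files are placed in /flink/usrlib.
  connection.ssl.root-cert.location: /flink/usrlib/certificate.crt
```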

Data type mapping

Use the sink.type-normalize-strategy parameter to define how upstream data types map to Hologres data types.

Note
  • Set sink.type-normalize-strategy when you start a YAML job for the first time. If you enable or change it after the job has started, delete the downstream tables and restart the job statelessly for the setting to take effect.

  • Supported array types include INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, and VARCHAR.

  • Hologres does not support NUMERIC as a primary key. If a primary key maps to NUMERIC, Hologres converts it to VARCHAR.

STANDARD

When sink.type-normalize-strategy is STANDARD, the mapping is as follows:

Flink CDC type → Hologres type:

  • CHAR → bpchar

  • STRING → text

  • VARCHAR → varchar; text if the length exceeds 10,485,760 bytes

  • BOOLEAN → bool

  • BINARY, VARBINARY → bytea

  • DECIMAL → numeric

  • TINYINT, SMALLINT → int2

  • INTEGER → int4

  • BIGINT → int8

  • FLOAT → float4

  • DOUBLE → float8

  • DATE → date

  • TIME_WITHOUT_TIME_ZONE → time

  • TIMESTAMP_WITHOUT_TIME_ZONE → timestamp

  • TIMESTAMP_WITH_LOCAL_TIME_ZONE → timestamptz

  • ARRAY → arrays of the corresponding element types

  • MAP, ROW → not supported

BROADEN

When sink.type-normalize-strategy is BROADEN, Flink CDC types map to broader Hologres types. The mapping is as follows:

Flink CDC type → Hologres type:

  • CHAR, STRING, VARCHAR → text

  • BOOLEAN → bool

  • BINARY, VARBINARY → bytea

  • DECIMAL → numeric

  • TINYINT, SMALLINT, INTEGER, BIGINT → int8

  • FLOAT, DOUBLE → float8

  • DATE → date

  • TIME_WITHOUT_TIME_ZONE → time

  • TIMESTAMP_WITHOUT_TIME_ZONE → timestamp

  • TIMESTAMP_WITH_LOCAL_TIME_ZONE → timestamptz

  • ARRAY → arrays of the corresponding element types

  • MAP, ROW → not supported

ONLY_BIGINT_OR_TEXT

When sink.type-normalize-strategy is ONLY_BIGINT_OR_TEXT, all Flink CDC types map to BIGINT or STRING in Hologres. The mapping is as follows:

Flink CDC type → Hologres type:

  • TINYINT, SMALLINT, INTEGER, BIGINT → int8

  • BOOLEAN, BINARY, VARBINARY, DECIMAL, FLOAT, DOUBLE, DATE, TIME_WITHOUT_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE → text

  • ARRAY → arrays of the corresponding element types

  • MAP, ROW → not supported

Write to partitioned tables

Hologres sinks support writing to partitioned tables. Combine them with the transform module to write upstream data to Hologres partitioned tables. Note the following:

  • The partition key must be part of the primary key. If you use an upstream column that is not part of the primary key as the partition key, the primary keys of the upstream and downstream tables become inconsistent, which causes data inconsistency during synchronization.

  • You can use columns of the TEXT, VARCHAR, and INT data types as partition keys. In Hologres V1.3.22 and later, you can also use columns of the DATE data type as partition keys.

  • Set createparttable to true to create child partitioned tables automatically. Otherwise, create them manually.

For an example, see Write to partitioned tables.

Synchronize table schema changes

CDC YAML pipeline jobs use different policies to handle schema evolution. These policies are specified using the pipeline-level configuration item schema.change.behavior. Valid values for schema.change.behavior are IGNORE, LENIENT, TRY_EVOLVE, EVOLVE, and EXCEPTION. Hologres sinks do not support the TRY_EVOLVE policy. The LENIENT and EVOLVE policies involve schema evolution. The following sections describe how to handle different schema change events.

LENIENT (default)

In LENIENT mode, schema changes are handled as follows:

  • Add a nullable column: adds the column to the end of the sink table schema and synchronizes data to it.

  • Delete a nullable column: fills the column with null instead of removing it from the sink table.

  • Add a non-nullable column: adds the column to the end of the sink table schema and synchronizes data to it. The new column defaults to nullable. Data before the column was added defaults to null.

  • Rename a column: treated as add + delete. Adds the renamed column to the end of the sink table and fills the original column with null. For example, if col_a becomes col_b, col_b is added and col_a is filled with null.

  • Change column type: not supported. Hologres does not allow column type changes. Use sink.type-normalize-strategy instead.

  • The following schema changes are not supported:

    • Changes to constraints such as primary keys or indexes.

    • Deletion of non-nullable columns.

    • Changing from NOT NULL to nullable.

EVOLVE

In EVOLVE mode, schema changes are handled as follows:

  • Add a nullable column: supported.

  • Delete a nullable column: not supported.

  • Add a non-nullable column: supported. The column is added to the sink table as a nullable column.

  • Rename a column: supported. Renames the column in the sink table.

  • Change column type: not supported. Hologres does not allow column type changes. Use sink.type-normalize-strategy instead.

  • The following schema changes are not supported:

    • Changes to constraints such as primary keys or indexes.

    • Deletion of non-nullable columns.

    • Changing from NOT NULL to nullable.

Warning

In EVOLVE mode, if you restart a job statelessly without deleting the sink table, the schemas of the upstream and sink tables may become inconsistent and the job may fail. In this case, manually adjust the sink table schema.

For an example of enabling EVOLVE mode, see Enable EVOLVE mode.

Code examples

Broaden the data type mapping

Use the sink.type-normalize-strategy parameter to broaden data type mapping.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Write to partitioned tables

Convert the upstream timestamp field create_time to date type and use it as the Hologres table partition key.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Create missing partitioned tables automatically.
  createparttable: true
 
transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key 

pipeline:
  name: MySQL to Hologres Pipeline

Enable EVOLVE mode

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Create missing partitioned tables automatically.
  createparttable: true

pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve

Synchronize a single table

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Synchronize all tables in a database

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Merge sharded tables

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.  
  sink.type-normalize-strategy: BROADEN
  
route:
  # Merge all sharded tables in MySQL test_db into one Hologres table: test_db.user.
  - source-table: test_db.user\.*
    sink-table: test_db.user

pipeline:
  name: MySQL to Hologres Pipeline

Synchronize to a specific schema

A schema in Hologres corresponds to a database in MySQL. You can specify the schema for the sink table.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN
  
route:
  # Synchronize all tables from MySQL test_db to Hologres test_db2 schema, keeping table names unchanged.
  - source-table: test_db.\.*
    sink-table: test_db2.<>
    replace-symbol: <>

pipeline:
  name: MySQL to Hologres Pipeline

Synchronize new tables without restarting

To synchronize newly created tables in real time while the job runs, set scan.binlog.newly-added-table.enabled to true.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  # Capture data from new tables created while the job runs.
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Synchronize newly added existing tables after a restart

To add synchronization for existing tables, set scan.newly-added-table.enabled to true and restart the job.

Warning

If you already use scan.binlog.newly-added-table.enabled = true to capture new tables, do not use scan.newly-added-table.enabled = true again to capture existing tables after restart. Doing so causes duplicate data.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.startup.mode: initial
  # On restart, check the tables parameter for new tables and run snapshots.
  # Requires scan.startup.mode: initial.
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Exclude tables during full-database sync

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  # Tables matched by this regex will not be synchronized.
  tables.exclude: test_db.table1
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Map CDC data types to broader Hologres types.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

References