Hologres: Streaming writes to Hologres with Apache Flink 1.11+

Last Updated: Mar 26, 2026

The Hologres connector for Apache Flink lets you write data streams from an open-source Flink cluster into Hologres tables in real time. The connector supports both the Flink SQL interface and the DataStream API. It is open source, works with Apache Flink 1.11 and later, and its release packages are published in the Maven repository.

Prerequisites

Before you begin, ensure that you have:

  • A Hologres instance with a connected development tool. See Connect to HoloWeb

  • An Apache Flink cluster (this topic uses Flink 1.15 in standalone mode). To set up a cluster, download the binary from the Apache Flink website and follow Local installation of Flink

  • Network connectivity between the Flink cluster and your Hologres instance:

    • Same region: use the Virtual Private Cloud (VPC) endpoint of your Hologres instance

    • Different regions: use the public endpoint

Add the Maven dependency

Add the Hologres connector dependency to your pom.xml. Use the version that matches your Flink installation:

| Apache Flink version | Hologres connector |
| --- | --- |
| 1.11 | hologres-connector-flink-1.11:1.0.1 |
| 1.12 | hologres-connector-flink-1.12:1.0.1 |
| 1.13 | hologres-connector-flink-1.13:1.3.2 |
| 1.14 | hologres-connector-flink-1.14:1.3.2 |
| 1.15 | hologres-connector-flink-1.15:1.4.1 |
| 1.17 | hologres-connector-flink-1.17:1.4.1 |

Use Flink 1.15 or later to access more connector features.

The following example uses Flink 1.15:

<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-flink-1.15</artifactId>
    <version>1.4.1</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

Choose a write mode

The Hologres connector supports three write modes. Choose based on your requirements and table schema:

| Write mode | Options | Best for |
| --- | --- | --- |
| INSERT (default) | jdbcCopyWriteMode=false | General streaming workloads |
| Fixed copy | jdbcCopyWriteMode=true, bulkLoad=false | High-throughput writes |
| Bulk load | jdbcCopyWriteMode=true, bulkLoad=true | Highest-performance batch-style writes to tables without a primary key, or to empty primary key tables |

Bulk load notes:

  • Bulk load reduces Hologres instance load by approximately 66.7% compared to fixed copy.

  • Writing to a table with a primary key causes table-level locks. To reduce lock granularity to the shard level, set target-shards.enabled=true. This allows concurrent bulk load jobs.

  • If you use bulk load to write to a table with a primary key, the table must be empty.

  • Version requirements: fixed copy (jdbcCopyWriteMode=true) requires Hologres V1.3.1+, bulk load requires Hologres V1.4.0+, and target-shards.enabled requires Hologres V1.4.1+.
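
The mapping from the two options to a write mode can be sketched in a few lines of Java. This is only an illustration of the table and notes above: WriteModeSketch, WriteMode, resolve, and minimumHologresVersion are hypothetical names, not connector classes, and treating bulkLoad=true as ignored when jdbcCopyWriteMode=false is an assumption.

```java
// Illustrative sketch only: these names are hypothetical and are not part of the connector.
final class WriteModeSketch {
    enum WriteMode { INSERT, FIXED_COPY, BULK_LOAD }

    // Maps the two documented options to the write mode they select.
    static WriteMode resolve(boolean jdbcCopyWriteMode, boolean bulkLoad) {
        if (!jdbcCopyWriteMode) {
            // Assumption: bulkLoad has no effect unless jdbcCopyWriteMode=true.
            return WriteMode.INSERT;
        }
        return bulkLoad ? WriteMode.BULK_LOAD : WriteMode.FIXED_COPY;
    }

    // Minimum Hologres version stated in this topic for each mode; null if none is stated.
    static String minimumHologresVersion(WriteMode mode) {
        switch (mode) {
            case FIXED_COPY:
                return "1.3.1";
            case BULK_LOAD:
                return "1.4.0";
            default:
                return null;
        }
    }

    public static void main(String[] args) {
        WriteMode mode = resolve(true, true);
        System.out.println(mode + " requires Hologres V" + minimumHologresVersion(mode) + "+");
    }
}
```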

Flush behavior: A write batch is flushed to Hologres when any of the following conditions is met:

  • The number of records reaches jdbcWriteBatchSize (default: 256)

  • The data size in a single thread reaches jdbcWriteBatchByteSize (default: 2 MB)

  • The data size across all threads reaches jdbcWriteBatchTotalByteSize (default: 20 MB)

  • The time since the last flush reaches jdbcWriteFlushInterval (default: 10 seconds)
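
The four flush conditions combine with a logical OR. The following Java sketch models that rule with the default thresholds listed above. FlushPolicySketch and shouldFlush are hypothetical names used for illustration, and the sketch does not reflect how the connector tracks these counters internally.

```java
// Illustrative sketch only: models the documented flush rule, not the connector's internals.
final class FlushPolicySketch {
    static final int BATCH_SIZE = 256;                  // jdbcWriteBatchSize default
    static final long BATCH_BYTES = 2L * 1024 * 1024;   // jdbcWriteBatchByteSize default (2 MB)
    static final long TOTAL_BYTES = 20L * 1024 * 1024;  // jdbcWriteBatchTotalByteSize default (20 MB)
    static final long FLUSH_INTERVAL_MS = 10_000L;      // jdbcWriteFlushInterval default (10 seconds)

    // Returns true as soon as any one of the four thresholds is reached.
    static boolean shouldFlush(int records, long threadBytes, long totalBytes, long msSinceLastFlush) {
        return records >= BATCH_SIZE
                || threadBytes >= BATCH_BYTES
                || totalBytes >= TOTAL_BYTES
                || msSinceLastFlush >= FLUSH_INTERVAL_MS;
    }

    public static void main(String[] args) {
        // A small, recent batch is held back; an old batch is flushed regardless of size.
        System.out.println(shouldFlush(10, 1_024L, 1_024L, 500L));
        System.out.println(shouldFlush(10, 1_024L, 1_024L, 10_000L));
    }
}
```

Under low traffic the interval condition is usually the one that fires, so jdbcWriteFlushInterval effectively bounds write latency.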

Write data using Flink SQL

Use a Flink SQL sink table to write data to Hologres. The connector type is hologres.

CREATE TABLE sink (
    user_id     BIGINT,
    user_name   STRING,
    price       DECIMAL(38, 2),
    sale_timestamp TIMESTAMP
) WITH (
    'connector'  = 'hologres',
    'dbname'     = '<your-database>',
    'tablename'  = '<your-table>',
    'username'   = '<your-access-key-id>',
    'password'   = '<your-access-key-secret>',
    'endpoint'   = '<your-endpoint>'   -- Format: IP:Port
);

INSERT INTO sink SELECT * FROM source;

Replace the placeholders with your actual values:

| Placeholder | Description |
| --- | --- |
| <your-database> | Name of the Hologres database |
| <your-table> | Name of the target Hologres table |
| <your-access-key-id> | Your Alibaba Cloud AccessKey ID. Get it from the AccessKey Pair page |
| <your-access-key-secret> | Your Alibaba Cloud AccessKey secret. Get it from the AccessKey Pair page |
| <your-endpoint> | The endpoint of your Hologres instance, in IP:Port format. Use the VPC endpoint for same-region access and the public endpoint for cross-region access. Find it on the instance details page in the Hologres console |
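
If you generate the sink DDL from application code, one option is to render the WITH clause from a map so that credentials come from the environment instead of being hardcoded. The Java sketch below assumes hypothetical environment variable names (HOLO_DB, HOLO_TABLE, HOLO_ACCESS_ID, HOLO_ACCESS_KEY, HOLO_ENDPOINT), and SinkDdlSketch and withClause are illustrative helpers, not part of Flink or the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Illustrative sketch only: a hypothetical helper that renders connector options as a WITH clause.
final class SinkDdlSketch {
    // Renders options in insertion order and doubles single quotes inside values.
    static String withClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(",\n", "WITH (\n", "\n)");
        for (Map.Entry<String, String> option : options.entrySet()) {
            String value = option.getValue().replace("'", "''");
            joiner.add("    '" + option.getKey() + "' = '" + value + "'");
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "hologres");
        // The environment variable names below are assumptions made for this example.
        options.put("dbname", env.getOrDefault("HOLO_DB", "<your-database>"));
        options.put("tablename", env.getOrDefault("HOLO_TABLE", "<your-table>"));
        options.put("username", env.getOrDefault("HOLO_ACCESS_ID", "<your-access-key-id>"));
        options.put("password", env.getOrDefault("HOLO_ACCESS_KEY", "<your-access-key-secret>"));
        options.put("endpoint", env.getOrDefault("HOLO_ENDPOINT", "<your-endpoint>"));

        String ddl = "CREATE TABLE sink (\n"
                + "    user_id BIGINT,\n"
                + "    user_name STRING,\n"
                + "    price DECIMAL(38, 2),\n"
                + "    sale_timestamp TIMESTAMP\n"
                + ") " + withClause(options) + ";";
        System.out.println(ddl);
    }
}
```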

Write data using the DataStream API

The following example applications demonstrate common patterns. All examples write to Hologres using the connector options described in Connector options reference.

| Application | Description |
| --- | --- |
| FlinkSQLToHoloExample | Write data using the Flink SQL interface |
| FlinkDSAndSQLToHoloExample | Convert a DataStream to a table, then write using the Flink SQL interface |
| FlinkDataStreamToHoloExample | Write data streams directly using the Flink DataStream interface |
| FlinkRoaringBitmapAggJob | Count unique visitors (UVs) in real time using roaring bitmaps and Hologres dimension tables, then write the results to Hologres |
| FlinkToHoloRePartitionExample | Partition data by shard before writing, using the DataStream interface. Suitable for bulk-importing data into multiple empty tables with primary keys; produces an effect similar to INSERT OVERWRITE |

Connector options reference

Required options

| Option | Description |
| --- | --- |
| connector | The sink connector type. Set to hologres |
| dbname | Name of the Hologres database |
| tablename | Name of the target Hologres table |
| username | Your AccessKey ID |
| password | Your AccessKey secret |
| endpoint | Endpoint of your Hologres instance, in IP:Port format. Use the VPC endpoint for same-region access; use the public endpoint for cross-region access |

Connection options

| Option | Default | Description |
| --- | --- | --- |
| connectionSize | 3 | Number of Java Database Connectivity (JDBC) connections per connection pool per Flink task. Increase proportionally with throughput requirements |
| connectionPoolName | (none) | Name of the connection pool. Tables sharing a pool must have the same connectionSize value. By default, each table uses its own pool |
| fixedConnectionMode | false | When true, write and point query operations do not consume connections. Requires connector 1.2.0+ and Hologres 1.3+. This feature is in beta |
| jdbcRetryCount | 10 | Maximum number of retry attempts on connection failure |
| jdbcRetrySleepInitMs | 1000 | Base retry interval in milliseconds. Retry interval = jdbcRetrySleepInitMs + (retry count × jdbcRetrySleepStepMs) |
| jdbcRetrySleepStepMs | 5000 | Step increment for retry intervals in milliseconds |
| jdbcConnectionMaxIdleMs | 60000 | Idle timeout for write and point query connections in milliseconds. The connection is released after the timeout expires |
| jdbcMetaCacheTTL | 60000 | Time-to-live (TTL) of the table schema cache in milliseconds |
| jdbcMetaAutoRefreshFactor | -1 | Factor that triggers automatic cache refresh. The cache is refreshed when its remaining TTL drops below jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor. A value of -1 disables automatic refresh |
| connection.ssl.mode | disable | SSL-encrypted transmission mode. Valid values: disable, require, verify-ca, verify-full |
| connection.ssl.root-cert.location | (none) | Path to the CA certificate file on the Flink cluster. Required when connection.ssl.mode is verify-ca or verify-full |
| jdbcDirectConnect | false | When true, the connector checks whether Flink can directly connect to Hologres Frontend (FE) nodes in the current environment, and uses direct connections if available |
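
The retry interval formula and the schema cache refresh threshold are plain arithmetic, worked through in the Java sketch below with the default values. ConnectionTimingSketch and its methods are hypothetical names. Whether the first retry counts as 0 or 1 is not stated in this topic, so the retry count is passed in explicitly, and the factor of 4 used in main is an arbitrary example value.

```java
// Illustrative sketch only: worked arithmetic for the documented formulas, not connector code.
final class ConnectionTimingSketch {
    // Retry interval = jdbcRetrySleepInitMs + (retry count × jdbcRetrySleepStepMs).
    static long retrySleepMs(long initMs, long stepMs, int retryCount) {
        return initMs + (long) retryCount * stepMs;
    }

    // The schema cache is refreshed when its remaining TTL drops below TTL / factor.
    static boolean shouldRefreshMeta(long remainingTtlMs, long cacheTtlMs, int autoRefreshFactor) {
        if (autoRefreshFactor <= 0) {
            // -1 disables automatic refresh; treating other non-positive values the same way is an assumption.
            return false;
        }
        return remainingTtlMs < cacheTtlMs / autoRefreshFactor;
    }

    public static void main(String[] args) {
        // Defaults: jdbcRetrySleepInitMs=1000, jdbcRetrySleepStepMs=5000.
        for (int retry = 1; retry <= 3; retry++) {
            System.out.println("retry " + retry + ": sleep " + retrySleepMs(1000L, 5000L, retry) + " ms");
        }
        // With jdbcMetaCacheTTL=60000 and an example factor of 4, the threshold is 15000 ms.
        System.out.println(shouldRefreshMeta(10_000L, 60_000L, 4));
    }
}
```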

Sink options

| Option | Default | Description |
| --- | --- | --- |
| mutatetype | insertorignore | Write mode for the sink. See Streaming semantics |
| ignoredelete | true | When true, retract messages (DELETE requests generated by Flink GROUP BY operations) are ignored. Applies only when streaming semantics is used |
| createparttable | false | When true, partitions are automatically created based on partition key values when writing to a partitioned table. Use with caution: invalid partition key values create invalid partitions |
| ignoreNullWhenUpdate | false | When true, null values are not written during an update if mutatetype='insertOrUpdate' |
| jdbcWriteBatchSize | 256 | Maximum number of records per write batch per streaming sink node |
| jdbcWriteBatchByteSize | 2097152 | Maximum data size per write batch per thread, in bytes (default: 2 MB) |
| jdbcWriteBatchTotalByteSize | 20971520 | Maximum data size per write batch across all threads, in bytes (default: 20 MB) |
| jdbcWriteFlushInterval | 10000 | Maximum wait time before flushing a write batch, in milliseconds (default: 10 seconds) |
| jdbcUseLegacyPutHandler | false | Controls the SQL syntax used for writes. true: INSERT INTO xxx(c0,c1,...) VALUES (?,...) ON CONFLICT. false: INSERT INTO xxx(c0,c1,...) SELECT unnest(?), unnest(?),... ON CONFLICT |
| jdbcEnableDefaultForNotNullColumn | true | When true, null values written to NOT NULL columns without a default are replaced with: "" (STRING), 0 (NUMBER), or 1970-01-01 00:00:00 (DATE/TIMESTAMP/TIMESTAMPTZ) |
| remove-u0000-in-text.enabled | false | When true, removes invalid \u0000 characters from data written to TEXT columns |
| deduplication.enabled | true | When true, only the last record with each primary key value is retained when multiple records share the same key in the same write batch. When false, all records are written without deduplication. Disabling deduplication may cause write failures if all records in a batch share the same primary key |
| aggressive.enabled | false | When true, the connector commits data immediately when an idle connection is detected, regardless of batch size. Reduces write latency under low traffic |
| jdbcCopyWriteMode | false | When true, uses the COPY protocol instead of INSERT. See Choose a write mode. Requires Hologres V1.3.1+ |
| jdbcCopyWriteFormat | binary | Protocol format when jdbcCopyWriteMode=true. binary is faster; text is available as an alternative. Requires Hologres V1.3.1+ |
| bulkLoad | false | When true and jdbcCopyWriteMode=true, uses bulk load instead of fixed copy. Requires Hologres V1.4.0+ |
| target-shards.enabled | false | When true, bulk load targets individual shards, reducing lock granularity and allowing concurrent bulk load jobs. Requires Hologres V1.4.1+ |
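
The effect of deduplication.enabled=true on a single write batch can be pictured as "keep the last record per primary key". The Java sketch below shows that idea with a LinkedHashMap. BatchDedupSketch and Row are hypothetical types with a single-column key, and the output order (by first appearance of each key) is a property of this sketch, not a documented guarantee of the connector.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: keeps the last record per primary key within one batch.
final class BatchDedupSketch {
    // Hypothetical record shape: one primary key column and one payload column.
    static final class Row {
        final long key;
        final String value;

        Row(long key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    static List<Row> keepLastPerKey(List<Row> batch) {
        Map<Long, Row> latest = new LinkedHashMap<>();
        for (Row row : batch) {
            // A later row replaces an earlier row with the same key; the key keeps its original position.
            latest.put(row.key, row);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Row> batch = new ArrayList<>();
        batch.add(new Row(1L, "a"));
        batch.add(new Row(2L, "b"));
        batch.add(new Row(1L, "c"));
        for (Row row : keepLastPerKey(batch)) {
            System.out.println(row.key + " -> " + row.value);
        }
    }
}
```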

Point query options

These options apply when using Hologres as a Flink dimension table for lookup joins.

| Option | Default | Description |
| --- | --- | --- |
| jdbcReadBatchSize | 128 | Maximum number of requests per batch per thread for dimension table point queries |
| jdbcReadBatchQueueSize | 256 | Maximum number of queued requests per thread |
| async | false | When true, the connector processes multiple requests and responses concurrently, improving query throughput. Requests are not guaranteed to complete in order |
| cache | None | Cache policy for dimension table lookups. None: no caching. LRU: cache a subset of dimension table data in memory |
| cachesize | 10000 | Maximum number of cached records. Applies only when cache=LRU |
| cachettlms | (no expiry) | Cache refresh interval in milliseconds. Applies only when cache=LRU. By default, cached entries do not expire |
| cacheempty | true | When true, the results of JOIN queries that return no rows are also cached |
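
To make the interaction of cachesize and cachettlms concrete, the Java sketch below implements a small LRU cache with an optional per-entry expiry on top of an access-ordered LinkedHashMap. It is a generic illustration of the policy, not the connector's cache: LookupCacheSketch is a hypothetical class, and the caller passes the clock in explicitly so the behavior is easy to follow.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: a generic LRU cache with optional expiry, not the connector's implementation.
final class LookupCacheSketch<K, V> {
    private static final class Cached<T> {
        final T value;
        final long loadedAtMs;

        Cached(T value, long loadedAtMs) {
            this.value = value;
            this.loadedAtMs = loadedAtMs;
        }
    }

    private final int maxSize;  // plays the role of cachesize
    private final long ttlMs;   // plays the role of cachettlms; <= 0 means entries never expire
    private final LinkedHashMap<K, Cached<V>> map;

    LookupCacheSketch(int maxSize, long ttlMs) {
        this.maxSize = maxSize;
        this.ttlMs = ttlMs;
        // accessOrder=true makes iteration order least-recently-used first.
        this.map = new LinkedHashMap<K, Cached<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Cached<V>> eldest) {
                return size() > LookupCacheSketch.this.maxSize;
            }
        };
    }

    void put(K key, V value, long nowMs) {
        map.put(key, new Cached<>(value, nowMs));
    }

    // Returns null on a miss or when the entry has expired.
    V get(K key, long nowMs) {
        Cached<V> cached = map.get(key);
        if (cached == null) {
            return null;
        }
        if (ttlMs > 0 && nowMs - cached.loadedAtMs >= ttlMs) {
            map.remove(key);
            return null;
        }
        return cached.value;
    }

    int size() {
        return map.size();
    }

    public static void main(String[] args) {
        LookupCacheSketch<String, String> cache = new LookupCacheSketch<>(2, 1_000L);
        cache.put("user-1", "Alice", 0L);
        System.out.println(cache.get("user-1", 500L));   // still cached
        System.out.println(cache.get("user-1", 1_000L)); // expired, so a real lookup would be needed
    }
}
```

A bounded size with an expiry trades memory and staleness against lookup traffic: a larger cache or longer expiry sends fewer point queries to Hologres but can serve older dimension data.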

Data type mappings

For data type mappings between Apache Flink and Hologres, see the "Data type mappings between Realtime Compute for Apache Flink or Blink and Hologres" section in Data types.

What's next