The Hologres connector for Apache Flink lets you write data streams from an open-source Flink cluster into Hologres tables in real time. The connector supports both the Flink SQL interface and the DataStream API. It is open source, supports Apache Flink 1.11 and later, and its release packages are published in the Maven repository.
Prerequisites
Before you begin, ensure that you have:
- A Hologres instance with a connected development tool. See Connect to HoloWeb.
- An Apache Flink cluster (this topic uses Flink 1.15 in standalone mode). To set up a cluster, download the binary from the Apache Flink website and follow Local installation of Flink.
- Network connectivity between the Flink cluster and your Hologres instance:
  - Same region: use the Virtual Private Cloud (VPC) endpoint of your Hologres instance.
  - Different regions: use the public endpoint.
Add the Maven dependency
Add the Hologres connector dependency to your pom.xml. Use the version that matches your Flink installation:
| Apache Flink version | Hologres connector |
|---|---|
| 1.11 | hologres-connector-flink-1.11:1.0.1 |
| 1.12 | hologres-connector-flink-1.12:1.0.1 |
| 1.13 | hologres-connector-flink-1.13:1.3.2 |
| 1.14 | hologres-connector-flink-1.14:1.3.2 |
| 1.15 | hologres-connector-flink-1.15:1.4.1 |
| 1.17 | hologres-connector-flink-1.17:1.4.1 |
Use Flink 1.15 or later to access more connector features.
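The lookup in the compatibility table can be sketched as a small helper. This is only an illustration: the function name connector_coordinates is hypothetical, and the version numbers are copied from the table above rather than queried from Maven.

```python
# Connector versions copied from the compatibility table in this topic.
CONNECTOR_VERSIONS = {
    "1.11": "1.0.1",
    "1.12": "1.0.1",
    "1.13": "1.3.2",
    "1.14": "1.3.2",
    "1.15": "1.4.1",
    "1.17": "1.4.1",
}


def connector_coordinates(flink_version: str) -> str:
    """Return groupId:artifactId:version for a Flink version such as '1.15.4'."""
    # Only the major.minor part selects the connector artifact.
    major_minor = ".".join(flink_version.split(".")[:2])
    if major_minor not in CONNECTOR_VERSIONS:
        raise ValueError(f"No connector listed for Flink {major_minor}")
    artifact = f"hologres-connector-flink-{major_minor}"
    return f"com.alibaba.hologres:{artifact}:{CONNECTOR_VERSIONS[major_minor]}"


if __name__ == "__main__":
    print(connector_coordinates("1.15.4"))
```

A Flink version that is not in the table (for example 1.16) raises an error instead of guessing an artifact name.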
The following example uses Flink 1.15:
<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-flink-1.15</artifactId>
    <version>1.4.1</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>
Choose a write mode
The Hologres connector supports three write modes. Choose based on your requirements and table schema:
| Write mode | Options | Best for |
|---|---|---|
| INSERT (default) | jdbcCopyWriteMode=false | General streaming workloads |
| Fixed copy | jdbcCopyWriteMode=true, bulkLoad=false | High-throughput writes |
| Bulk load | jdbcCopyWriteMode=true, bulkLoad=true | Highest-performance batch-style writes to tables without a primary key, or to empty primary key tables |
Bulk load notes:
- Bulk load reduces Hologres instance load by approximately 66.7% compared to fixed copy.
- Writing to a table with a primary key causes table-level locks. To reduce lock granularity to the shard level, set target-shards.enabled=true. This allows concurrent bulk load jobs.
- If you use bulk load to write to a table with a primary key, the table must be empty.
- Version requirements: fixed copy requires Hologres V1.3.1 or later, bulk load requires V1.4.0 or later, and target-shards.enabled requires V1.4.1 or later.
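How the two options select a write mode, and which Hologres version each mode needs, can be sketched as follows. The function names are hypothetical and the check is not part of the connector. The sketch also assumes that bulkLoad has no effect unless jdbcCopyWriteMode=true, which this topic implies but does not state outright.

```python
def parse_version(text: str) -> tuple:
    """Turn 'V1.4.0' or '1.4.0' into a comparable tuple such as (1, 4, 0)."""
    return tuple(int(part) for part in text.lstrip("vV").split("."))


def resolve_write_mode(jdbc_copy_write_mode: bool = False, bulk_load: bool = False) -> str:
    """Map the two options to the write mode named in the write mode table."""
    if not jdbc_copy_write_mode:
        # Assumption: bulkLoad is ignored when jdbcCopyWriteMode=false.
        return "INSERT"
    return "Bulk load" if bulk_load else "Fixed copy"


# Minimum Hologres versions stated in this topic (None means no stated minimum).
MIN_HOLOGRES_VERSION = {"INSERT": None, "Fixed copy": "1.3.1", "Bulk load": "1.4.0"}


def check_write_mode(hologres_version: str,
                     jdbc_copy_write_mode: bool = False,
                     bulk_load: bool = False) -> str:
    """Return the selected write mode, or raise if the instance is too old for it."""
    mode = resolve_write_mode(jdbc_copy_write_mode, bulk_load)
    required = MIN_HOLOGRES_VERSION[mode]
    if required and parse_version(hologres_version) < parse_version(required):
        raise ValueError(f"{mode} requires Hologres V{required}+, got {hologres_version}")
    return mode


if __name__ == "__main__":
    print(check_write_mode("V1.4.0", jdbc_copy_write_mode=True, bulk_load=True))
```

Running a check like this before submitting a job surfaces a version mismatch early rather than at write time.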
Flush behavior: A write batch is flushed to Hologres when any of the following conditions is met:
- The number of records reaches jdbcWriteBatchSize (default: 256).
- The data size in a single thread reaches jdbcWriteBatchByteSize (default: 2 MB).
- The data size across all threads reaches jdbcWriteBatchTotalByteSize (default: 20 MB).
- The time since the last flush reaches jdbcWriteFlushInterval (default: 10 seconds).
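The four flush conditions combine with a logical OR. The following sketch models that decision with the documented defaults. FlushConfig and flush_reasons are hypothetical names for illustration and do not mirror the connector's internal classes.

```python
from dataclasses import dataclass


@dataclass
class FlushConfig:
    """Documented defaults for the four flush thresholds."""
    jdbc_write_batch_size: int = 256                           # records
    jdbc_write_batch_byte_size: int = 2 * 1024 * 1024          # bytes per thread (2 MB)
    jdbc_write_batch_total_byte_size: int = 20 * 1024 * 1024   # bytes across threads (20 MB)
    jdbc_write_flush_interval_ms: int = 10_000                 # milliseconds (10 seconds)


def flush_reasons(cfg, records, thread_bytes, total_bytes, ms_since_last_flush):
    """Return the names of every threshold that has been reached.

    A non-empty result means the batch would be flushed.
    """
    reasons = []
    if records >= cfg.jdbc_write_batch_size:
        reasons.append("jdbcWriteBatchSize")
    if thread_bytes >= cfg.jdbc_write_batch_byte_size:
        reasons.append("jdbcWriteBatchByteSize")
    if total_bytes >= cfg.jdbc_write_batch_total_byte_size:
        reasons.append("jdbcWriteBatchTotalByteSize")
    if ms_since_last_flush >= cfg.jdbc_write_flush_interval_ms:
        reasons.append("jdbcWriteFlushInterval")
    return reasons


if __name__ == "__main__":
    # 300 small records buffered for 2 seconds: only the record count triggers a flush.
    print(flush_reasons(FlushConfig(), 300, 30_000, 30_000, 2_000))
```

With low traffic, the time-based threshold is usually the one that fires, so jdbcWriteFlushInterval sets the upper bound on write latency in that case.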
Write data using Flink SQL
Use a Flink SQL sink table to write data to Hologres. The connector type is hologres.
CREATE TABLE sink (
  user_id BIGINT,
  user_name STRING,
  price DECIMAL(38, 2),
  sale_timestamp TIMESTAMP
) WITH (
  'connector' = 'hologres',
  'dbname' = '<your-database>',
  'tablename' = '<your-table>',
  'username' = '<your-access-key-id>',
  'password' = '<your-access-key-secret>',
  'endpoint' = '<your-endpoint>' -- Format: IP:Port
);
INSERT INTO sink SELECT * FROM source;
Replace the placeholders with your actual values:
| Placeholder | Description |
|---|---|
| <your-database> | Name of the Hologres database |
| <your-table> | Name of the target Hologres table |
| <your-access-key-id> | Your Alibaba Cloud AccessKey ID. Get it from the AccessKey Pair page |
| <your-access-key-secret> | Your Alibaba Cloud AccessKey secret. Get it from the AccessKey Pair page |
| <your-endpoint> | The VPC endpoint of your Hologres instance, in IP:Port format. Find it on the instance details page in the Hologres console |
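If you generate the sink DDL from configuration, the WITH clause can be rendered from a dictionary of connector options. The helper below is a hypothetical sketch, not part of the connector. It assumes that every option value is passed as a quoted string and that booleans are written in lowercase.

```python
def render_with_clause(options: dict) -> str:
    """Render connector options as a Flink SQL WITH (...) clause."""
    lines = []
    for key, value in options.items():
        if isinstance(value, bool):
            # Assumption: boolean options are written as 'true' / 'false'.
            value = "true" if value else "false"
        # A single quote inside a SQL string literal is escaped by doubling it.
        escaped = str(value).replace("'", "''")
        lines.append(f"  '{key}' = '{escaped}'")
    return "WITH (\n" + ",\n".join(lines) + "\n)"


if __name__ == "__main__":
    print(render_with_clause({
        "connector": "hologres",
        "dbname": "<your-database>",
        "tablename": "<your-table>",
        "username": "<your-access-key-id>",
        "password": "<your-access-key-secret>",
        "endpoint": "<your-endpoint>",
        "jdbcCopyWriteMode": True,
    }))
```

Keeping the options in a dictionary makes it easier to add optional settings from the connector options reference without editing the SQL text by hand.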
Write data using the DataStream API
The following example applications demonstrate common patterns. All examples write to Hologres using the connector options described in Connector options reference.
| Application | Description |
|---|---|
| FlinkSQLToHoloExample | Write data using the Flink SQL interface |
| FlinkDSAndSQLToHoloExample | Convert a DataStream to a table, then write using the Flink SQL interface |
| FlinkDataStreamToHoloExample | Write data streams directly using the Flink DataStream interface |
| FlinkRoaringBitmapAggJob | Count unique visitors (UVs) in real time using roaring bitmaps and Hologres dimension tables, then write the results to Hologres |
| FlinkToHoloRePartitionExample | Partition data by shard before writing, using the DataStream interface. Suitable for bulk-importing data into multiple empty tables with primary keys; produces an effect similar to INSERT OVERWRITE |
Connector options reference
Required options
| Option | Description |
|---|---|
| connector | The sink connector type. Set to hologres |
| dbname | Name of the Hologres database |
| tablename | Name of the target Hologres table |
| username | Your AccessKey ID |
| password | Your AccessKey secret |
| endpoint | Endpoint of your Hologres instance, in IP:Port format. Use the VPC endpoint for same-region access and the public endpoint for cross-region access |
Connection options
| Option | Default | Description |
|---|---|---|
| connectionSize | 3 | Number of Java Database Connectivity (JDBC) connections per connection pool per Flink task. Increase proportionally with throughput requirements |
| connectionPoolName | (none) | Name of the connection pool. Tables sharing a pool must have the same connectionSize value. By default, each table uses its own pool |
| fixedConnectionMode | false | When true, write and point query operations do not consume connections. Requires connector 1.2.0+ and Hologres 1.3+. This feature is in beta |
| jdbcRetryCount | 10 | Maximum number of retry attempts on connection failure |
| jdbcRetrySleepInitMs | 1000 | Base retry interval in milliseconds. Retry interval = jdbcRetrySleepInitMs + (retry count × jdbcRetrySleepStepMs) |
| jdbcRetrySleepStepMs | 5000 | Step increment for retry intervals in milliseconds |
| jdbcConnectionMaxIdleMs | 60000 | Idle timeout for write and point query connections in milliseconds. The connection is released after the timeout expires |
| jdbcMetaCacheTTL | 60000 | Time-to-live (TTL) of the table schema cache in milliseconds |
| jdbcMetaAutoRefreshFactor | -1 | Factor that triggers automatic cache refresh. The cache is refreshed when its remaining TTL drops below jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor. -1 disables automatic refresh |
| connection.ssl.mode | disable | SSL-encrypted transmission mode. Valid values: disable, require, verify-ca, verify-full |
| connection.ssl.root-cert.location | (none) | Path to the CA certificate file on the Flink cluster. Required when connection.ssl.mode is verify-ca or verify-full |
| jdbcDirectConnect | false | When true, the connector checks whether Flink can directly connect to Hologres Frontend (FE) nodes in the current environment, and uses direct connections if available |
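The retry interval formula and the schema cache refresh threshold from the table can be worked through with the default values. This is a sketch under two assumptions that this topic does not confirm: the retry count in the formula starts at 0 for the first retry, and any jdbcMetaAutoRefreshFactor other than -1 is a positive number. Both function names are hypothetical.

```python
def retry_intervals_ms(retry_count=10, sleep_init_ms=1000, sleep_step_ms=5000):
    """Wait before each retry: jdbcRetrySleepInitMs + retry count * jdbcRetrySleepStepMs.

    Assumption: counting starts at 0, so the first retry waits sleep_init_ms.
    """
    return [sleep_init_ms + attempt * sleep_step_ms for attempt in range(retry_count)]


def meta_refresh_threshold_ms(cache_ttl_ms=60000, auto_refresh_factor=-1):
    """Remaining TTL below which the schema cache is refreshed, or None if disabled."""
    if auto_refresh_factor == -1:
        return None  # -1 disables automatic refresh
    if auto_refresh_factor <= 0:
        # Assumption: other non-positive factors are not meaningful.
        raise ValueError("jdbcMetaAutoRefreshFactor must be -1 or a positive number")
    return cache_ttl_ms / auto_refresh_factor


if __name__ == "__main__":
    intervals = retry_intervals_ms()
    print(intervals)                           # one wait time per retry, in milliseconds
    print(sum(intervals) / 1000)               # total wait across all retries, in seconds
    print(meta_refresh_threshold_ms(60000, 4)) # refresh threshold for a factor of 4
```

Under these assumptions, the default settings spend 235 seconds waiting across all ten retries, which is worth keeping in mind when sizing Flink checkpoint and task timeouts.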
Sink options
| Option | Default | Description |
|---|---|---|
| mutatetype | insertorignore | Write mode for the sink. See Streaming semantics |
| ignoredelete | true | When true, retract messages (DELETE requests generated by Flink GROUP BY operations) are ignored. Applies only when streaming semantics is used |
| createparttable | false | When true, partitions are automatically created based on partition key values when writing to a partitioned table. Use with caution: invalid partition key values create invalid partitions |
| ignoreNullWhenUpdate | false | When true, null values are not written during an update if mutatetype='insertOrUpdate' |
| jdbcWriteBatchSize | 256 | Maximum number of records per write batch per streaming sink node |
| jdbcWriteBatchByteSize | 2097152 | Maximum data size per write batch per thread, in bytes (default: 2 MB) |
| jdbcWriteBatchTotalByteSize | 20971520 | Maximum data size per write batch across all threads, in bytes (default: 20 MB) |
| jdbcWriteFlushInterval | 10000 | Maximum wait time before flushing a write batch, in milliseconds (default: 10 seconds) |
| jdbcUseLegacyPutHandler | false | Controls the SQL syntax used for writes. true: INSERT INTO xxx(c0,c1,...) VALUES (?,...) ON CONFLICT. false: INSERT INTO xxx(c0,c1,...) SELECT unnest(?), unnest(?),... ON CONFLICT |
| jdbcEnableDefaultForNotNullColumn | true | When true, null values written to NOT NULL columns without a default are replaced with: "" (STRING), 0 (NUMBER), or 1970-01-01 00:00:00 (DATE/TIMESTAMP/TIMESTAMPTZ) |
| remove-u0000-in-text.enabled | false | When true, removes invalid \u0000 characters from data written to TEXT columns |
| deduplication.enabled | true | When true, only the last record with each primary key value is retained when multiple records share the same key in the same write batch. When false, all records are written without deduplication. Disabling deduplication may cause write failures if all records in a batch share the same primary key |
| aggressive.enabled | false | When true, the connector commits data immediately when an idle connection is detected, regardless of batch size. Reduces write latency under low traffic |
| jdbcCopyWriteMode | false | When true, uses the COPY protocol instead of INSERT. See Choose a write mode. Requires Hologres V1.3.1+ |
| jdbcCopyWriteFormat | binary | Protocol format when jdbcCopyWriteMode=true. binary is faster; text is available as an alternative. Requires Hologres V1.3.1+ |
| bulkLoad | false | When true and jdbcCopyWriteMode=true, uses bulk load instead of fixed copy. Requires Hologres V1.4.0+ |
| target-shards.enabled | false | When true, bulk load targets individual shards, reducing lock granularity and allowing concurrent bulk load jobs. Requires Hologres V1.4.1+ |
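The effect of deduplication.enabled=true on a single write batch can be sketched as follows. The function is a hypothetical model of the documented behavior (keep the last record per primary key), not the connector's implementation. The output order shown here, by last occurrence, is an assumption.

```python
def deduplicate_batch(records, key_columns):
    """Keep only the last record for each primary key within one batch."""
    latest = {}
    for record in records:
        key = tuple(record[col] for col in key_columns)
        # Remove any earlier record with this key, then append the new one,
        # so the result is ordered by each key's last occurrence.
        latest.pop(key, None)
        latest[key] = record
    return list(latest.values())


if __name__ == "__main__":
    batch = [
        {"user_id": 1, "price": 10},
        {"user_id": 2, "price": 5},
        {"user_id": 1, "price": 12},  # replaces the first user_id=1 record
    ]
    for row in deduplicate_batch(batch, ["user_id"]):
        print(row)
```

In this model, the two records for user_id=1 collapse into the later one, so the batch written to Hologres contains at most one row per primary key.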
Point query options
These options apply when using Hologres as a Flink dimension table for lookup joins.
| Option | Default | Description |
|---|---|---|
| jdbcReadBatchSize | 128 | Maximum number of requests per batch per thread for dimension table point queries |
| jdbcReadBatchQueueSize | 256 | Maximum number of queued requests per thread |
| async | false | When true, the connector processes multiple requests and responses concurrently, improving query throughput. Requests are not guaranteed to complete in order |
| cache | None | Cache policy for dimension table lookups. None: no caching. LRU: cache a subset of dimension table data in memory |
| cachesize | 10000 | Maximum number of cached records. Applies only when cache=LRU |
| cachettlms | (no expiry) | Cache refresh interval in milliseconds. Applies only when cache=LRU. By default, cached entries do not expire |
| cacheempty | true | When true, the results of JOIN queries that return no rows are also cached |
Data type mappings
For data type mappings between Apache Flink and Hologres, see the "Data type mappings between Realtime Compute for Apache Flink or Blink and Hologres" section in Data types.
What's next
- Create a Hologres result table: learn about streaming semantics and the mutatetype option
- Data types: full data type mapping reference