The Hologres connector for Real-time Compute for Flink (exclusive mode) lets you write streaming data from Flink jobs directly to Hologres, where it can be queried as soon as it is written. No staging layer or batch loading is required.
Prerequisites
Before you begin, make sure that:
- Your Real-time Compute for Flink service and Hologres instance are in the same region. Cross-region connections are not supported.
- You are running Real-time Compute for Flink in exclusive mode V3.6 or later. Earlier versions do not include the built-in Hologres connector and require a JAR file reference instead. Upgrade to V3.6 or later before proceeding.
- You have an AccessKey ID and AccessKey Secret for your Alibaba Cloud account.
Version compatibility
Different versions of Real-time Compute for Flink in exclusive mode have different capabilities. Confirm your version before configuring your job.
| Feature | V3.6 | V3.7 (earlier than V3.7.6) | V3.7.6 and later |
|---|---|---|---|
| Built-in Hologres connector | Yes | Yes | Yes |
| Automatic child partition table creation (createparttable) | No | Yes | Yes |
| blink.checkpoint.fail_on_checkpoint_error=true required | Yes | Yes | No |
If you are on a version earlier than V3.6, see Common upgrade preparation failure errors before upgrading.
DDL syntax
Use the following statement to create a Hologres sink table in your Flink job:
CREATE TABLE Hologres_sink (
name VARCHAR,
age BIGINT,
birthday BIGINT
) WITH (
type = 'hologres',
dbname = '<your-db-name>', -- Hologres database name
tablename = '<your-table-name>', -- Target table in Hologres
username = '<your-access-key-id>', -- Alibaba Cloud AccessKey ID
password = '<your-access-key-secret>', -- Alibaba Cloud AccessKey Secret
endpoint = '<your-endpoint>' -- VPC endpoint of the Hologres instance (host:port)
);
Replace the placeholders as follows:
| Placeholder | Description | Example |
|---|---|---|
| <your-db-name> | Name of your Hologres database | holodb |
| <your-table-name> | Name of the target table | blink_test |
| <your-access-key-id> | AccessKey ID of your Alibaba Cloud account | LTAI5tXxx |
| <your-access-key-secret> | AccessKey Secret of your Alibaba Cloud account | xXxXxXx |
| <your-endpoint> | VPC endpoint in host:port format. Get it from the Network Information section of the Hologres Management Console. | demo-cn-hangzhou-vpc.hologres.aliyuncs.com:80 |
WITH parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| type | Yes | None | Sink table type. Set to hologres. |
| endpoint | Yes | None | VPC endpoint of the Hologres instance in host:port format. Get it from the Network Information section of the Hologres Management Console. |
| username | Yes | None | AccessKey ID. Get it from AccessKey Management. |
| password | Yes | None | AccessKey Secret. Get it from AccessKey Management. |
| dbname | Yes | None | Name of the Hologres database. |
| tablename | Yes | None | Name of the target table in the Hologres database. |
| arraydelimiter | No | \u0002 | Delimiter used to split a STRING field into an array before the value is written to Hologres. |
| mutatetype | No | insertorignore | Write mode. Controls behavior when a primary key conflict occurs. For details, see Write semantics. |
| ignoredelete | No | false | Whether to ignore retraction messages. Set to true to prevent the retraction messages produced by Flink GROUP BY operations from generating DELETE requests in Hologres. Applies only when streaming semantics are used. |
| partitionrouter | No | false | Whether to automatically route data to child partition tables when writing to a partitioned table. |
| createparttable | No | false | Whether to automatically create child partition tables based on partition values. Requires V3.7 or later. Use with caution: dirty partition values can cause unintended child partition tables to be created. |
The arraydelimiter, mutatetype, ignoredelete, partitionrouter, and createparttable parameters are not included in the DDL example above. Add them to the WITH clause as needed.
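As a sketch of how the optional parameters fit into the WITH clause, the following example writes a comma-separated VARCHAR field into a Hologres array column. The table user_tags, its columns, and the comma delimiter are hypothetical, and the example assumes that arraydelimiter takes effect when the target column is an array type such as TEXT[].

```sql
-- Run in Hologres: hypothetical target table with an array column
CREATE TABLE user_tags (
  user_id BIGINT NOT NULL,
  tags TEXT[],
  PRIMARY KEY (user_id)
);

-- Run in the Flink job: sink table that sets the optional parameters
CREATE TABLE user_tags_sink (
  user_id BIGINT,
  tags VARCHAR,                     -- for example 'red,green,blue'
  PRIMARY KEY (user_id)
) WITH (
  type = 'hologres',
  `endpoint` = '<your-endpoint>',
  `username` = '<your-access-key-id>',
  `password` = '<your-access-key-secret>',
  `dbname` = '<your-db-name>',
  `tablename` = 'user_tags',
  `arraydelimiter` = ',',           -- split tags on commas into an array value
  `mutatetype` = 'insertorupdate',  -- update the row if user_id already exists
  `ignoredelete` = 'true'           -- do not turn retractions into DELETE requests
);
```

Under these assumptions, a tags value of 'red,green,blue' would be stored in Hologres as the array {red,green,blue}.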
Write semantics
The Hologres connector buffers data in memory and flushes it asynchronously to Hologres. The mutatetype parameter controls how records are written when a primary key conflict occurs:
| Value | Behavior |
|---|---|
| insertorignore (default) | Inserts the record. If a record with the same primary key already exists, the new record is discarded. |
| insertorupdate | Upserts the record. If a record with the same primary key already exists, the existing record is updated with the new values. |
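The difference between the two modes can be illustrated with PostgreSQL-style statements. This is an analogy only: the table demo_pk is hypothetical, the connector does not necessarily issue these statements internally, and the example assumes your Hologres version supports INSERT ... ON CONFLICT.

```sql
-- Hypothetical table with a primary key
CREATE TABLE demo_pk (id INT NOT NULL, val TEXT, PRIMARY KEY (id));
INSERT INTO demo_pk VALUES (1, 'first');

-- Like insertorignore: the conflicting record is discarded, so val remains 'first'
INSERT INTO demo_pk VALUES (1, 'second') ON CONFLICT (id) DO NOTHING;

-- Like insertorupdate: the existing record is updated, so val becomes 'second'
INSERT INTO demo_pk VALUES (1, 'second')
ON CONFLICT (id) DO UPDATE SET val = EXCLUDED.val;
```

In both statements the conflict is detected through the primary key of demo_pk, which is why the write mode only matters for tables that have one.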
Checkpoint behavior: Because Hologres writes are asynchronous, add blink.checkpoint.fail_on_checkpoint_error=true to your job configuration. This triggers a failover if a write failure occurs during a checkpoint. This setting is not required for V3.7.6 and later.
Write data to a regular Hologres table
Step 1: Create the target table in Hologres.
CREATE TABLE blink_test (a INT, b TEXT, c TEXT, d FLOAT8, e BIGINT);
Step 2: Create a Flink job.
Log on to the Real-time Compute for Flink console and create a job with the following SQL. This example generates random data and writes columns a, b, and c to the blink_test table.
CREATE TABLE randomSource (
a INT,
b VARCHAR,
c VARCHAR,
d DOUBLE,
e BIGINT
) WITH (type = 'random');
CREATE TABLE test (
a INT,
b VARCHAR,
c VARCHAR,
PRIMARY KEY (a)
) WITH (
type = 'hologres',
`endpoint` = '<your-endpoint>', -- VPC endpoint of the Hologres instance (host:port)
`username` = '<your-access-key-id>',
`password` = '<your-access-key-secret>',
`dbname` = '<your-db-name>',
`tablename` = 'blink_test'
);
INSERT INTO test
SELECT a, b, c FROM randomSource;
Step 3: Publish the job.
- Click Syntax Check in the editor and confirm that Successful is displayed.
- Click Save.
- Click Publish and configure the publishing settings.
Step 4: Start the job.
On the Real-time Compute for Flink Platform for Developers page, click O&M in the upper-right corner. Select your job and click Start.
Step 5: Query data in Hologres.
SELECT * FROM blink_test;
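Because the job writes only columns a, b, and c, columns d and e of blink_test are expected to stay NULL, assuming no other process writes to the table. A quick sanity check, as a sketch:

```sql
-- count(column) counts only non-NULL values
SELECT count(*) AS total_rows,
       count(d) AS non_null_d,  -- expected to be 0
       count(e) AS non_null_e   -- expected to be 0
FROM blink_test;
```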
Write data from multiple streams to a wide table
Use this pattern when multiple Flink streams each carry different columns for the same Hologres table — a common scenario in real-time data integration pipelines.
Scenario: A Hologres wide table named WIDE_TABLE has columns A (primary key), B, C, D, and E. Stream 1 carries columns A, B, and C. Stream 2 carries columns A, D, and E.
Setup:
- In Flink SQL, declare two sink tables that both map to WIDE_TABLE: one with columns A, B, and C, and one with columns A, D, and E.
- Set mutatetype = insertorupdate on both sink tables so that each stream updates only its own columns.
- Set ignoredelete = true on both sink tables to prevent retraction messages from generating DELETE requests.
- Insert each stream into its respective sink table.
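The scenario assumes that WIDE_TABLE already exists in Hologres. A minimal sketch of its DDL follows; the TEXT column types are an assumption chosen to match the VARCHAR columns in the Flink sink tables. Hologres follows PostgreSQL identifier rules, so unquoted names such as WIDE_TABLE and A are stored in lowercase.

```sql
-- Run in Hologres before starting the Flink job
CREATE TABLE WIDE_TABLE (
  A TEXT NOT NULL,  -- primary key; every stream must carry it
  B TEXT,           -- written by Stream 1
  C TEXT,           -- written by Stream 1
  D TEXT,           -- written by Stream 2
  E TEXT,           -- written by Stream 2
  PRIMARY KEY (A)
);
```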
Example:
-- Sink table for Stream 1 (columns A, B, C)
CREATE TABLE wide_table_sink_1 (
A VARCHAR,
B VARCHAR,
C VARCHAR,
PRIMARY KEY (A)
) WITH (
type = 'hologres',
`endpoint` = '<your-endpoint>',
`username` = '<your-access-key-id>',
`password` = '<your-access-key-secret>',
`dbname` = '<your-db-name>',
`tablename` = 'WIDE_TABLE',
`mutatetype` = 'insertorupdate',
`ignoredelete` = 'true'
);
-- Sink table for Stream 2 (columns A, D, E)
CREATE TABLE wide_table_sink_2 (
A VARCHAR,
D VARCHAR,
E VARCHAR,
PRIMARY KEY (A)
) WITH (
type = 'hologres',
`endpoint` = '<your-endpoint>',
`username` = '<your-access-key-id>',
`password` = '<your-access-key-secret>',
`dbname` = '<your-db-name>',
`tablename` = 'WIDE_TABLE',
`mutatetype` = 'insertorupdate',
`ignoredelete` = 'true'
);
-- Insert from each stream
INSERT INTO wide_table_sink_1 SELECT A, B, C FROM stream1;
INSERT INTO wide_table_sink_2 SELECT A, D, E FROM stream2;
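The example does not define stream1 and stream2. For a quick test, you could declare them as random sources, following the randomSource pattern from the previous section. The declarations below are hypothetical and must appear before the INSERT statements.

```sql
-- Hypothetical test sources; replace them with your real streams
CREATE TABLE stream1 (
  A VARCHAR,
  B VARCHAR,
  C VARCHAR
) WITH (type = 'random');

CREATE TABLE stream2 (
  A VARCHAR,
  D VARCHAR,
  E VARCHAR
) WITH (type = 'random');
```

Because the two generators produce A values independently, their keys are unlikely to match, so most rows in WIDE_TABLE will have only B and C, or only D and E, filled in. To see columns from both streams merge into one row, use real streams that share primary key values.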
Limitations for this pattern:
- WIDE_TABLE must have a primary key.
- Each stream must include all primary key columns.
- For column-oriented (columnar) tables, a high records-per-second (RPS) rate can cause elevated CPU usage. Disable dictionary encoding on the affected columns to mitigate this.
Write data to a partitioned Hologres table
When you write to a Hologres partitioned table with partitionrouter set to true, the connector writes to the parent table, and the data is automatically routed to the matching child partition table through the real-time data API.
Limitations for partitioned tables:
- Only list partitioning is supported.
- Partition key columns must be of type text or int4.
- Partition values cannot contain hyphens (-). For example, 2020-09-12 is not a valid partition value.
- If the table has a primary key, all partition key columns must be part of the primary key.
- Child partition tables must be defined with static values for the partition key column.
- Partition key values in data written to a child partition table must exactly match the value defined when that child table was created; a mismatch results in an error.
- The DEFAULT partition is not supported.
Step 1: Create the partitioned table in Hologres.
The following example creates a parent partitioned table test_message partitioned by bizdate.
DROP TABLE IF EXISTS test_message;
BEGIN;
CREATE TABLE test_message (
"bizdate" TEXT NOT NULL,
"tag" TEXT NOT NULL,
"id" INT4 NOT NULL,
"title" TEXT NOT NULL,
"body" TEXT,
PRIMARY KEY (bizdate, tag, id)
)
PARTITION BY LIST (bizdate);
COMMIT;
For versions earlier than V3.7, create child partition tables in Hologres manually before starting the job. Automatic partition creation (createparttable) is only available in V3.7 and later.
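For those earlier versions, the manual step can be sketched with the PostgreSQL-style PARTITION OF syntax. The child table names are hypothetical, and the values must be exactly the partition values the job will write. In that case, also omit createparttable from the sink table.

```sql
-- Run in Hologres before starting the job (versions earlier than V3.7)
-- Each child table accepts exactly one static partition value
CREATE TABLE test_message_20200327 PARTITION OF test_message FOR VALUES IN ('20200327');
CREATE TABLE test_message_20200328 PARTITION OF test_message FOR VALUES IN ('20200328');
```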
Step 2: Create a Flink job.
The following example applies to V3.7 and later. It generates random source data and routes it to the test_message partitioned table for two partition values.
CREATE TABLE test_message_src (
tag VARCHAR,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
type = 'random',
`interval` = '10',
`count` = '100'
);
CREATE TABLE test_message_sink (
bizdate VARCHAR,
tag VARCHAR,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
type = 'hologres',
`endpoint` = '<your-endpoint>',
`username` = '<your-access-key-id>',
`password` = '<your-access-key-secret>',
`dbname` = '<your-db-name>',
`tablename` = 'test_message',
`partitionrouter` = 'true', -- Route data to child partition tables
`createparttable` = 'true' -- Auto-create child partition tables (V3.7+ only)
);
INSERT INTO test_message_sink SELECT '20200327', * FROM test_message_src;
INSERT INTO test_message_sink SELECT '20200328', * FROM test_message_src;
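Because partition values cannot contain hyphens, a source that carries dates such as 2020-03-27 needs the hyphens stripped before writing. The sketch below assumes a hypothetical source table dated_src with a VARCHAR column dt, and assumes that the REPLACE string function is available in your Flink version; check the built-in function list first. Avoid testing this with a random source while createparttable is true, because every random dt value would create a new child partition table.

```sql
-- Hypothetical: dated_src is your own source table with columns dt, tag, id, title, body
-- REPLACE turns '2020-03-27' into the valid partition value '20200327'
INSERT INTO test_message_sink
SELECT REPLACE(dt, '-', ''), tag, id, title, body
FROM dated_src;
```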
Step 3: Publish and start the job.
Follow the same steps as in Write data to a regular Hologres table.
Step 4: Query data in Hologres.
Query across all partitions or filter by a specific partition:
SELECT * FROM test_message;
SELECT * FROM test_message WHERE bizdate = '20200327';
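To confirm that rows reached both partitions, you can count rows per partition value. With the job above and the freshly created test_message table, the result is expected to list 20200327 and 20200328.

```sql
SELECT bizdate, count(*) AS row_count
FROM test_message
GROUP BY bizdate
ORDER BY bizdate;
```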
Data type mapping
For the data type mapping between Real-time Compute for Flink in exclusive mode and Hologres, see Data type summary.
What's next
- Real-time data API: Learn how Hologres routes data to child partition tables.
- Data type summary: Full mapping of Flink and Hologres data types.