Hologres: Hologres sink table

Last Updated: Mar 26, 2026

The Hologres connector for Real-time Compute for Flink (exclusive mode) lets you write streaming data from Flink jobs directly to Hologres and query it immediately — no staging layer or batch loading required.

Prerequisites

Before you begin, make sure that:

  • Your Real-time Compute for Flink service and Hologres instance are in the same region. Cross-region connections are not supported.

  • You are running Real-time Compute for Flink in exclusive mode V3.6 or later. Earlier versions do not include the built-in Hologres connector and require a JAR file reference instead. Upgrade to V3.6 or later before proceeding.

  • You have an AccessKey ID and AccessKey Secret for your Alibaba Cloud account.

Version compatibility

Different versions of Real-time Compute for Flink in exclusive mode have different capabilities. Confirm your version before configuring your job.

| Feature | V3.6 and later | V3.7 and later | V3.7.6 and later |
| --- | --- | --- | --- |
| Built-in Hologres connector | Yes | Yes | Yes |
| Automatic child partition table creation (createparttable) | No | Yes | Yes |
| blink.checkpoint.fail_on_checkpoint_error=true required | Yes | Yes | No |

Important: If you are on a version earlier than V3.6, see Common upgrade preparation failure errors before upgrading.

DDL syntax

Use the following statement to create a Hologres sink table in your Flink job:

CREATE TABLE Hologres_sink (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT
) WITH (
  type = 'hologres',
  dbname = '<your-db-name>',       -- Hologres database name
  tablename = '<your-table-name>', -- Target table in Hologres
  username = '<your-access-key-id>',     -- Alibaba Cloud AccessKey ID
  password = '<your-access-key-secret>', -- Alibaba Cloud AccessKey Secret
  endpoint = '<your-endpoint>'     -- VPC endpoint of the Hologres instance (ip:port)
);

Replace the placeholders as follows:

| Placeholder | Description | Example |
| --- | --- | --- |
| <your-db-name> | Name of your Hologres database | holodb |
| <your-table-name> | Name of the target table | blink_test |
| <your-access-key-id> | AccessKey ID of your Alibaba Cloud account | LTAI5tXxx |
| <your-access-key-secret> | AccessKey Secret of your Alibaba Cloud account | xXxXxXx |
| <your-endpoint> | VPC endpoint in ip:port format. Get this from the Network Information section on the Hologres Management Console. | demo-cn-hangzhou-vpc.hologres.aliyuncs.com:80 |

WITH parameters

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| type | Yes | (none) | Sink table type. Set to hologres. |
| endpoint | Yes | (none) | VPC network address of the Hologres instance in ip:port format. Get this from the Network Information section in the Hologres Management Console. |
| username | Yes | (none) | AccessKey ID. Get it from AccessKey Management. |
| password | Yes | (none) | AccessKey Secret. Get it from AccessKey Management. |
| dbname | Yes | (none) | Name of the Hologres database. |
| tablename | Yes | (none) | Name of the target table in the Hologres database. |
| arraydelimiter | No | \u0002 | Delimiter used to split a STRING field into an array before writing to Hologres. |
| mutatetype | No | insertorignore | Write mode. Controls behavior when a primary key conflict occurs. For details, see Write semantics. |
| ignoredelete | No | false | Whether to ignore retraction messages. Set to true to prevent retraction messages produced by Flink GROUP BY queries from generating DELETE requests in Hologres. Applies only when streaming semantics are used. |
| partitionrouter | No | false | Whether to route data to child partition tables automatically when writing to a partitioned table. |
| createparttable | No | false | Whether to auto-create child partition tables based on partition values. Requires V3.7 or later. Use with caution: dirty partition values can result in incorrect tables being created. |

The arraydelimiter, mutatetype, ignoredelete, partitionrouter, and createparttable parameters are not included in the DDL example above. Add them to the WITH clause as needed.
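
As a rough illustration of how the optional parameters fit into the WITH clause, the following sketch extends the DDL with four of them. The table name and column list are hypothetical, and the values shown are examples rather than recommendations. arraydelimiter is left at its default here.

```sql
-- Sketch only: the table name and columns are hypothetical.
CREATE TABLE hologres_sink_with_options (
  id   BIGINT,
  name VARCHAR,
  PRIMARY KEY (id)
) WITH (
  type = 'hologres',
  `endpoint`        = '<your-endpoint>',
  `username`        = '<your-access-key-id>',
  `password`        = '<your-access-key-secret>',
  `dbname`          = '<your-db-name>',
  `tablename`       = '<your-table-name>',
  `mutatetype`      = 'insertorupdate', -- update the existing row on a primary key conflict
  `ignoredelete`    = 'true',           -- do not turn retraction messages into DELETE requests
  `partitionrouter` = 'false',          -- default; set to 'true' for partitioned tables
  `createparttable` = 'false'           -- default; 'true' requires V3.7 or later
);
```

Any parameter that is omitted keeps the default listed in the table above.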

Write semantics

The Hologres connector buffers data in memory and flushes it asynchronously to Hologres. The mutatetype parameter controls how records are written when a primary key conflict occurs:

| Value | Behavior |
| --- | --- |
| insertorignore (default) | Inserts the record. If a record with the same primary key already exists, the new record is discarded. |
| insertorupdate | Upserts the record. If a record with the same primary key exists, the existing record is updated with the new values. |
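
The two write modes behave much like PostgreSQL-style INSERT ... ON CONFLICT statements run directly in Hologres. The sketch below is an analogy only, not a description of how the connector issues writes internally. The table mutate_demo is hypothetical.

```sql
-- Hypothetical table used only for this illustration.
CREATE TABLE mutate_demo (
  id  INT NOT NULL,
  val TEXT,
  PRIMARY KEY (id)
);

INSERT INTO mutate_demo (id, val) VALUES (1, 'first');

-- Comparable to mutatetype = 'insertorignore':
-- the conflicting record is discarded, so val stays 'first'.
INSERT INTO mutate_demo (id, val) VALUES (1, 'second')
ON CONFLICT (id) DO NOTHING;

-- Comparable to mutatetype = 'insertorupdate':
-- the existing record is updated, so val becomes 'third'.
INSERT INTO mutate_demo (id, val) VALUES (1, 'third')
ON CONFLICT (id) DO UPDATE SET val = EXCLUDED.val;
```

Running the statements by hand in a test database can help you decide which mode fits a table before you configure the job.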

Checkpoint behavior: Because Hologres writes are asynchronous, add blink.checkpoint.fail_on_checkpoint_error=true to your job configuration. This triggers a failover if a write failure occurs during a checkpoint. This setting is not required for V3.7.6 and later.

Write data to a regular Hologres table

Step 1: Create the target table in Hologres.

CREATE TABLE blink_test (a INT, b TEXT, c TEXT, d FLOAT8, e BIGINT);

Step 2: Create a Flink job.

Log on to the Real-time Compute for Flink console and create a job with the following SQL. This example generates random data and writes columns a, b, and c to the blink_test table.

CREATE TABLE randomSource (
  a INT,
  b VARCHAR,
  c VARCHAR,
  d DOUBLE,
  e BIGINT
) WITH (type = 'random');

CREATE TABLE test (
  a INT,
  b VARCHAR,
  c VARCHAR,
  PRIMARY KEY (a)
) WITH (
  type = 'hologres',
  `endpoint` = '$ip:$port',  -- VPC endpoint of the Hologres instance
  `username` = '<your-access-key-id>',
  `password` = '<your-access-key-secret>',
  `dbname` = '<your-db-name>',
  `tablename` = 'blink_test'
);

INSERT INTO test
SELECT a, b, c FROM randomSource;

Step 3: Publish the job.

  1. Click Syntax Check in the editor. Confirm that Successful is displayed.

  2. Click Save.

  3. Click Publish and configure the publishing settings.

Step 4: Start the job.

On the Real-time Compute for Flink Platform for Developers page, click O&M in the upper-right corner. Select your job and click Start.

Step 5: Query data in Hologres.

SELECT * FROM blink_test;
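
While the job is running, a narrower check can be easier to read than a full scan. The following queries are a sketch. They assume that columns d and e stay NULL because the Flink sink table declares only a, b, and c.

```sql
-- Sample a few of the rows written by the job.
SELECT a, b, c FROM blink_test LIMIT 10;

-- Count the rows written so far; rerun to watch the number change.
SELECT COUNT(*) AS row_count FROM blink_test;

-- Count rows where d or e is populated; expected to be 0 under the assumption above.
SELECT COUNT(*) AS unexpected_rows
FROM blink_test
WHERE d IS NOT NULL OR e IS NOT NULL;
```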

Write data from multiple streams to a wide table

Use this pattern when multiple Flink streams each carry different columns for the same Hologres table — a common scenario in real-time data integration pipelines.

Scenario: A Hologres wide table named WIDE_TABLE has columns A (primary key), B, C, D, and E. Stream 1 carries columns A, B, and C. Stream 2 carries columns A, D, and E.

Setup:

  1. In Flink SQL, declare two sink tables both mapped to WIDE_TABLE: one with columns A, B, C and one with columns A, D, E.

  2. Set mutatetype = insertorupdate on both sink tables so that each stream updates only its own columns.

  3. Set ignoredelete = true on both sink tables to prevent retraction messages from generating DELETE requests.

  4. Insert each stream into its respective sink table.

Example:

-- Sink table for Stream 1 (columns A, B, C)
CREATE TABLE wide_table_sink_1 (
  A VARCHAR,
  B VARCHAR,
  C VARCHAR,
  PRIMARY KEY (A)
) WITH (
  type = 'hologres',
  `endpoint` = '$ip:$port',
  `username` = '<your-access-key-id>',
  `password` = '<your-access-key-secret>',
  `dbname` = '<your-db-name>',
  `tablename` = 'WIDE_TABLE',
  `mutatetype` = 'insertorupdate',
  `ignoredelete` = 'true'
);

-- Sink table for Stream 2 (columns A, D, E)
CREATE TABLE wide_table_sink_2 (
  A VARCHAR,
  D VARCHAR,
  E VARCHAR,
  PRIMARY KEY (A)
) WITH (
  type = 'hologres',
  `endpoint` = '$ip:$port',
  `username` = '<your-access-key-id>',
  `password` = '<your-access-key-secret>',
  `dbname` = '<your-db-name>',
  `tablename` = 'WIDE_TABLE',
  `mutatetype` = 'insertorupdate',
  `ignoredelete` = 'true'
);

-- Insert from each stream
INSERT INTO wide_table_sink_1 SELECT A, B, C FROM stream1;
INSERT INTO wide_table_sink_2 SELECT A, D, E FROM stream2;

Limitations for this pattern:

  • WIDE_TABLE must have a primary key.

  • Each stream must include all primary key columns.

  • For column-oriented (columnar) tables, a high records-per-second (RPS) rate can cause elevated CPU usage. To mitigate this, disable dictionary encoding on the affected columns.
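
The effect of insertorupdate on a wide table can be approximated with plain Hologres SQL, where each statement touches only the columns carried by its stream. This is an analogy under stated assumptions, not the connector's actual write path. The table wide_table_demo and the sample values are hypothetical.

```sql
-- Hypothetical stand-in for WIDE_TABLE, for illustration only.
CREATE TABLE wide_table_demo (
  a TEXT NOT NULL,
  b TEXT,
  c TEXT,
  d TEXT,
  e TEXT,
  PRIMARY KEY (a)
);

-- A record from Stream 1 arrives first and fills a, b, and c.
INSERT INTO wide_table_demo (a, b, c) VALUES ('k1', 'b1', 'c1')
ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b, c = EXCLUDED.c;

-- A record from Stream 2 with the same key updates only d and e.
INSERT INTO wide_table_demo (a, d, e) VALUES ('k1', 'd1', 'e1')
ON CONFLICT (a) DO UPDATE SET d = EXCLUDED.d, e = EXCLUDED.e;

-- The row now carries the columns from both streams.
SELECT * FROM wide_table_demo WHERE a = 'k1';
```

The result is the same if the Stream 2 record arrives first, because each statement leaves the other stream's columns untouched. That is why both streams must carry the full primary key.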

Write data to a partitioned Hologres table

When writing to a Hologres partitioned table, data goes directly to the parent table and is automatically routed to the matching child partition table via the real-time data API.

Limitations for partitioned tables:

  • Only list partitioning is supported.

  • Partition key columns must be of type text or int4.

  • Partition values cannot contain hyphens (-). For example, 2020-09-12 is invalid as a partition value.

  • If the table has a primary key, all partition key columns must be part of the primary key.

  • In each child partition table, the column that corresponds to the partition key must hold a static field value.

  • Partition key values in data written to a child table must exactly match the values defined when that child table was created; mismatches result in an error.

  • The DEFAULT partition is not supported.

Step 1: Create the partitioned table in Hologres.

The following example creates a parent partitioned table test_message partitioned by bizdate.

DROP TABLE IF EXISTS test_message;

BEGIN;
CREATE TABLE test_message (
  "bizdate" TEXT NOT NULL,
  "tag"     TEXT NOT NULL,
  "id"      INT4 NOT NULL,
  "title"   TEXT NOT NULL,
  "body"    TEXT,
  PRIMARY KEY (bizdate, tag, id)
)
PARTITION BY LIST (bizdate);
COMMIT;

Note: For versions earlier than V3.7, create child partition tables in Hologres manually before starting the job. Automatic partition creation (createparttable) is only available in V3.7 and later.
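
If you need to create child partition tables by hand, a minimal sketch in Hologres looks like the following. The child table names are hypothetical. The partition values match the two values used in the Flink job in Step 2.

```sql
-- Child table names are hypothetical; pick names that fit your conventions.
CREATE TABLE test_message_20200327
  PARTITION OF test_message FOR VALUES IN ('20200327');

CREATE TABLE test_message_20200328
  PARTITION OF test_message FOR VALUES IN ('20200328');
```

Create one child table for every partition value the job can emit, because writes for a value with no matching child table fail when createparttable is not enabled.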

Step 2: Create a Flink job.

The following example applies to V3.7 and later. It generates random source data and routes it to the test_message partitioned table for two partition values.

CREATE TABLE test_message_src (
  tag   VARCHAR,
  id    INTEGER,
  title VARCHAR,
  body  VARCHAR
) WITH (
  type = 'random',
  `interval` = '10',
  `count` = '100'
);

CREATE TABLE test_message_sink (
  bizdate VARCHAR,
  tag     VARCHAR,
  id      INTEGER,
  title   VARCHAR,
  body    VARCHAR
) WITH (
  type = 'hologres',
  `endpoint`        = '$ip:$port',
  `username`        = '<your-access-key-id>',
  `password`        = '<your-access-key-secret>',
  `dbname`          = '<your-db-name>',
  `tablename`       = 'test_message',
  `partitionrouter` = 'true',   -- Route data to child partition tables
  `createparttable` = 'true'    -- Auto-create child partition tables (V3.7+ only)
);

INSERT INTO test_message_sink SELECT '20200327', * FROM test_message_src;
INSERT INTO test_message_sink SELECT '20200328', * FROM test_message_src;

Step 3: Publish and start the job.

Follow Step 3 and Step 4 in Write data to a regular Hologres table.

Step 4: Query data in Hologres.

Query across all partitions or filter by a specific partition:

SELECT * FROM test_message;
SELECT * FROM test_message WHERE bizdate = '20200327';
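
To confirm that rows were routed to both partitions, a per-partition count is a quick check. This is a sketch; the counts you see depend on how long the job has been running.

```sql
-- One output row per partition value that has received data.
SELECT bizdate, COUNT(*) AS row_count
FROM test_message
GROUP BY bizdate
ORDER BY bizdate;
```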

Data type mapping

For the data type mapping between Real-time Compute for Flink in exclusive mode and Hologres, see Data type summary.

What's next