
Realtime Compute for Apache Flink:DataHub connector

Last Updated: Mar 26, 2026

The DataHub connector lets you read streaming data from Alibaba Cloud DataHub into Flink jobs and write processed results back to DataHub topics. It supports both Flink SQL and the DataStream API.

Note DataHub is compatible with the Kafka protocol. To connect Flink to DataHub using the Kafka protocol, use the standard Kafka connector — not the Upsert Kafka connector. For details, see Compatibility with Kafka.
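As a minimal sketch of that scenario, a table backed by the standard Kafka connector might look as follows. The endpoint, topic name, group ID, and format shown here are illustrative placeholders; see Compatibility with Kafka for the actual values required by your DataHub project.

CREATE TEMPORARY TABLE datahub_via_kafka (
  name VARCHAR
) WITH (
  -- Use the standard Kafka connector, not upsert-kafka.
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourDatahubKafkaEndpoint>',
  'topic' = '<yourProjectName>.<yourTopicName>',
  'properties.group.id' = '<yourGroupId>',
  'format' = 'csv',
  'scan.startup.mode' = 'latest-offset'
);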

Capabilities

Item Description
Supported type Source and sink
Running mode Streaming and batch
Data format N/A
Metrics N/A
API type DataStream and SQL
Data update/deletion support in the sink Not supported. The sink writes insert-only rows to the target topic.

Prerequisites

Before you begin, ensure that you have:

  • A DataHub project and topic in your Alibaba Cloud account.

  • A DataHub subscription for the topic if you use the connector as a source. The subscription ID is required by the subId option.

  • An AccessKey pair with permission to access DataHub. Store the AccessKey ID and secret as variables instead of hardcoding them. See Manage variables.

Syntax

CREATE TEMPORARY TABLE datahub_input (
  `time` BIGINT,
  `sequence`  STRING METADATA VIRTUAL,
  `shard-id` BIGINT METADATA VIRTUAL,
  `system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

Connector options

General

Option Type Required Default Description
connector String Yes (none) The connector type. Set this to datahub.
endPoint String Yes (none) The endpoint of the DataHub project. The value varies by region. See Endpoints.
project String Yes (none) The DataHub project name.
topic String Yes (none) The DataHub topic name. For BLOB topics (untyped, unstructured data), the Flink table must contain exactly one VARBINARY column.
accessId String Yes (none) The AccessKey ID of your Alibaba Cloud account. Store it as a variable instead of hardcoding it. See Manage variables.
accessKey String Yes (none) The AccessKey secret of your Alibaba Cloud account.
retryTimeout Integer No 1800000 The maximum timeout for a retry attempt, in milliseconds.
retryInterval Integer No 1000 The interval between retry attempts, in milliseconds.
compressType String No lz4 The compression algorithm for reads and writes. Valid values: lz4, deflate, "" (disabled). Requires VVR 6.0.5 or later.

Source-specific

Option Type Required Default Description
subId String Yes (none) The DataHub subscription ID.
maxFetchSize Integer No 50 The number of records fetched per request. Increase this value to improve read throughput.
maxBufferSize Integer No 50 The maximum number of records cached from asynchronous reads. Increase this value to improve read throughput.
fetchLatestDelay Integer No 500 The sleep duration in milliseconds when no data is available. Decrease this value to reduce read latency for low-traffic topics.
lengthCheck String No NONE The rule for handling rows where the parsed field count does not match the defined column count. Valid values: NONE, SKIP, EXCEPTION, PAD. See Field count validation rules.
columnErrorDebug Boolean No false Specifies whether to enable debug logging for field parsing errors. Set to true to print parsing exception logs.
startTime String No (none) The timestamp to start consuming from. Format: yyyy-MM-dd hh:mm:ss.
endTime String No (none) The timestamp to stop consuming at. Format: yyyy-MM-dd hh:mm:ss.
startTimeMs Long No -1 The timestamp to start consuming from, in milliseconds. Takes precedence over startTime. See Consumption start position.
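As an illustrative sketch, the throughput-related options above can be combined in the WITH clause of a source table. The values shown are examples, not tuning recommendations:

CREATE TEMPORARY TABLE datahub_tuned_source (
  `time` BIGINT
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Fetch and cache more records per request to raise read throughput.
  'maxFetchSize' = '200',
  'maxBufferSize' = '200',
  -- Sleep for a shorter time when the topic is idle to reduce read latency.
  'fetchLatestDelay' = '100'
);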

Consumption start position

The startTimeMs option controls where the source starts reading:

  • -1 (default): Starts from the latest offset in the topic. If no offset exists, falls back to the earliest offset.

  • A specific timestamp: Starts from the first record at or after the specified timestamp.

Important

The default value of -1 can cause data loss. If your job fails before its first checkpoint, the latest offset in the topic may have advanced, and records written during that window are skipped. Set startTimeMs explicitly to a specific timestamp to control the starting position.
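For example, to pin the starting position instead of relying on the default, set startTimeMs to an epoch timestamp in milliseconds. The timestamp below is illustrative:

CREATE TEMPORARY TABLE datahub_input (
  `time` BIGINT
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Start from 2024-01-01 00:00:00 UTC instead of the latest offset.
  'startTimeMs' = '1704067200000'
);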

Field count validation rules

The lengthCheck option determines what happens when the number of parsed fields in a row does not match the number of defined columns:

Value Behavior
NONE (default) If the parsed field count exceeds the defined column count, reads fields from left to right up to the defined count. If it is lower, skips the row.
SKIP Skips rows whose parsed field count differs from the defined column count.
EXCEPTION Throws an exception when the parsed field count differs from the defined column count.
PAD If the parsed field count exceeds the defined column count, reads fields from left to right up to the defined count. If it is lower, pads the missing fields with null.
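For example, to tolerate short rows by padding missing fields with null while logging parse errors, the two parsing options can be set together. The table below is an illustrative sketch with placeholder columns:

CREATE TEMPORARY TABLE datahub_padded_source (
  a VARCHAR,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Rows with fewer than three parsed fields are padded with null.
  'lengthCheck' = 'PAD',
  -- Print parsing exception logs for troubleshooting.
  'columnErrorDebug' = 'true'
);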

Sink-specific

Option Type Required Default Description
batchCount Integer No 500 The maximum number of rows per write batch.
batchSize Integer No 512000 The maximum size of a write batch, in bytes.
flushInterval Integer No 5000 The flush interval, in milliseconds.
hashFields String No null A comma-separated list of column names used to route rows to shards. Rows with the same values in these columns are written to the same shard. If this option is not set, rows are written to random shards. Example: hashFields=a,b.
timeZone String No (none) The time zone used when converting TIMESTAMP fields.
schemaVersion Integer No -1 The schema version in the registered schema registry.

Write batch flush behavior

A write batch is flushed to DataHub as soon as any of the following conditions is met:

  • The number of buffered rows reaches batchCount.

  • The total size of buffered data reaches batchSize.

  • The time since the last flush exceeds flushInterval.

Increasing batchCount, batchSize, or flushInterval improves write throughput at the cost of higher latency.
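As an illustrative sketch, the flush options and hashFields can be combined in a sink table definition. The values and column names are examples, not recommendations:

CREATE TEMPORARY TABLE datahub_tuned_sink (
  a VARCHAR,
  b VARCHAR
) WITH (
  'connector' = 'datahub',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Larger, less frequent batches raise write throughput at the cost of latency.
  'batchCount' = '1000',
  'batchSize' = '1048576',
  'flushInterval' = '10000',
  -- Route rows with the same (a, b) values to the same shard.
  'hashFields' = 'a,b'
);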

Data type mappings

Flink type DataHub type
TINYINT TINYINT
BOOLEAN BOOLEAN
INTEGER INTEGER
BIGINT BIGINT
BIGINT TIMESTAMP
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL DECIMAL
VARCHAR STRING
SMALLINT SMALLINT
VARBINARY BLOB

Metadata

Metadata fields are read-only (R). Declare them as METADATA VIRTUAL in the source table definition to include them in queries without writing them back to DataHub.

Note Metadata fields are available only when using VVR 3.0.1 or later.
Key Data type Description R/W
shard-id BIGINT The shard ID of the record. R
sequence STRING The sequence number of the record within the shard. R
system-time TIMESTAMP The time at which DataHub received the record. R

Examples

Source

The following example reads data from a DataHub topic and prints it to the console.

CREATE TEMPORARY TABLE datahub_input (
  `time` BIGINT,
  `sequence`  STRING METADATA VIRTUAL,
  `shard-id` BIGINT METADATA VIRTUAL,
  `system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

CREATE TEMPORARY TABLE test_out (
  `time` BIGINT,
  `sequence`  STRING,
  `shard-id` BIGINT,
  `system-time` TIMESTAMP
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO test_out
SELECT
  `time`,
  `sequence`,
  `shard-id`,
  `system-time`
FROM datahub_input;

Sink

The following example reads from one DataHub topic, converts the name field to lowercase, and writes the results to another DataHub topic.

CREATE TEMPORARY TABLE datahub_source (
  name VARCHAR
) WITH (
  'connector' = 'datahub',
  'endPoint' = '<endPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'subId' = '<yourSubId>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startTime' = '2018-06-01 00:00:00'
);

CREATE TEMPORARY TABLE datahub_sink (
  name VARCHAR
) WITH (
  'connector' = 'datahub',
  'endPoint' = '<endPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'batchSize' = '512000',
  'batchCount' = '500'
);

INSERT INTO datahub_sink
SELECT
  LOWER(name)
FROM datahub_source;

DataStream API

Important

To use the DataStream API with DataHub, configure a DataStream connector for Realtime Compute for Apache Flink. See Settings of DataStream connectors.

Read from DataHub

VVR provides the DatahubSourceFunction class, which implements Flink's SourceFunction interface.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Configure the DataHub source
DatahubSourceFunction datahubSource =
    new DatahubSourceFunction(
        <yourEndPoint>,
        <yourProjectName>,
        <yourTopicName>,
        <yourSubId>,
        <yourAccessId>,
        <yourAccessKey>,
        "public",
        <yourStartTime>,
        <yourEndTime>
    );
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();

env.addSource(datahubSource)
    .map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
    .print();
env.execute();

private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
    Tuple2<String, Long> tuple2 = new Tuple2<>();
    TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
    tuple2.f0 = (String) recordData.getField(0);
    tuple2.f1 = (Long) recordData.getField(1);
    return tuple2;
}

Write to DataHub

VVR provides the DatahubSinkFunction class, which builds on Flink's OutputFormatSinkFunction class.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure the DataHub sink
env.generateSequence(0, 100)
    .map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
    .addSink(
        new DatahubSinkFunction<>(
            <yourEndPoint>,
            <yourProjectName>,
            <yourTopicName>,
            <yourSubId>,
            <yourAccessId>,
            <yourAccessKey>,
            "public",
            <schemaVersion> // If schema registry is enabled, you must specify the schema version. Otherwise, set this to 0.
        )
    );
env.execute();

private RecordEntry getRecordEntry(Long message, String s) {
    // Define the topic schema. Only the first two fields are populated below;
    // the remaining fields are left unset.
    RecordSchema recordSchema = new RecordSchema();
    recordSchema.addField(new Field("f1", FieldType.STRING));
    recordSchema.addField(new Field("f2", FieldType.BIGINT));
    recordSchema.addField(new Field("f3", FieldType.DOUBLE));
    recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
    recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
    recordSchema.addField(new Field("f6", FieldType.DECIMAL));
    RecordEntry recordEntry = new RecordEntry();
    TupleRecordData recordData = new TupleRecordData(recordSchema);
    recordData.setField(0, s + message);
    recordData.setField(1, message);
    recordEntry.setRecordData(recordData);
    return recordEntry;
}

Maven dependency

Add the DataHub DataStream connector to your project. All available versions are listed in the Maven central repository.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-datahub</artifactId>
    <version>${vvr-version}</version>
</dependency>
