The DataHub connector lets you read streaming data from Alibaba Cloud DataHub into Flink jobs and write processed results back to DataHub topics. It supports both Flink SQL and the DataStream API.
Capabilities
| Item | Description |
|---|---|
| Supported type | Source and sink |
| Running mode | Streaming and batch |
| Data format | N/A |
| Metrics | N/A |
| API type | DataStream and SQL |
| Data update/deletion support in the sink | Not supported. The sink writes insert-only rows to the target topic. |
Prerequisites
Before you begin, ensure that you have:
- A DataHub project and topic. See Get started with DataHub.
- A DataHub subscription (required for the source). See Create a subscription.
- An Alibaba Cloud AccessKey ID and AccessKey secret. See Console operations.
Syntax
```sql
CREATE TEMPORARY TABLE datahub_input (
  `time` BIGINT,
  `sequence` STRING METADATA VIRTUAL,
  `shard-id` BIGINT METADATA VIRTUAL,
  `system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);
```
Connector options
General
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | (none) | The connector type. Set this to datahub. |
| endPoint | String | Yes | (none) | The endpoint of the DataHub project. The value varies by region. See Endpoints. |
| project | String | Yes | (none) | The DataHub project name. |
| topic | String | Yes | (none) | The DataHub topic name. For BLOB topics (untyped, unstructured data), the Flink table must contain exactly one VARBINARY column. |
| accessId | String | Yes | (none) | The AccessKey ID of your Alibaba Cloud account. Store it as a variable instead of hardcoding it. See Manage variables. |
| accessKey | String | Yes | (none) | The AccessKey secret of your Alibaba Cloud account. |
| retryTimeout | Integer | No | 1800000 | The maximum timeout for a retry attempt, in milliseconds. |
| retryInterval | Integer | No | 1000 | The interval between retry attempts, in milliseconds. |
| compressType | String | No | lz4 | The compression algorithm for reads and writes. Valid values: lz4, deflate, and "" (empty string, compression disabled). Requires VVR 6.0.5 or later. |
Source-specific
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| subId | String | Yes | (none) | The DataHub subscription ID. |
| maxFetchSize | Integer | No | 50 | The number of records fetched per request. Increase this value to improve read throughput. |
| maxBufferSize | Integer | No | 50 | The maximum number of records cached from asynchronous reads. Increase this value to improve read throughput. |
| fetchLatestDelay | Integer | No | 500 | The sleep duration in milliseconds when no data is available. Decrease this value to reduce read latency for low-traffic topics. |
| lengthCheck | String | No | NONE | The rule for handling rows where the parsed field count does not match the defined column count. Valid values: NONE, SKIP, EXCEPTION, PAD. See Field count validation rules. |
| columnErrorDebug | Boolean | No | false | Specifies whether to enable debug logging for field parsing errors. Set to true to print parsing exception logs. |
| startTime | String | No | (none) | The timestamp to start consuming from. Format: yyyy-MM-dd hh:mm:ss. |
| endTime | String | No | (none) | The timestamp to stop consuming at. Format: yyyy-MM-dd hh:mm:ss. |
| startTimeMs | Long | No | -1 | The timestamp to start consuming from, in milliseconds. Takes precedence over startTime. See Consumption start position. |
Consumption start position
The startTimeMs option controls where the source starts reading:
- -1 (default): Starts from the latest offset in the topic. If no offset exists, falls back to the earliest offset.
- A specific timestamp: Starts from the first record at or after the specified timestamp.
The default value of -1 can cause data loss. If your job fails before its first checkpoint, the latest offset in the topic may have advanced, and records written during that window are skipped. Set startTimeMs explicitly to a specific timestamp to control the starting position.
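For example, a source table that pins the start position to a fixed point in time can set startTimeMs explicitly. The following is a sketch; the endpoint, project, topic, and subscription values are placeholders, and the timestamp corresponds to 2024-01-01 00:00:00 UTC:

```sql
CREATE TEMPORARY TABLE datahub_input (
  `time` BIGINT
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Start reading from 2024-01-01 00:00:00 UTC, expressed in milliseconds.
  -- This takes precedence over startTime and avoids the data-loss window
  -- described above.
  'startTimeMs' = '1704067200000'
);
```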
Field count validation rules
The lengthCheck option determines what happens when the number of parsed fields in a row does not match the number of defined columns:
| Value | Behavior |
|---|---|
| NONE (default) | If the parsed field count is greater than the defined column count, fields are read from left to right up to the defined count. If it is less, the row is skipped. |
| SKIP | Skips rows where the parsed field count differs from the defined column count. |
| EXCEPTION | Throws an exception when the parsed field count differs from the defined column count. |
| PAD | Reads fields from left to right. If the parsed field count is greater than the defined column count, fields are read up to the defined count. If it is less, missing fields are padded with null. |
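As a sketch, a source table that pads short rows with null instead of skipping them, and logs each parsing failure, could combine lengthCheck and columnErrorDebug as follows (all connection values are placeholders):

```sql
CREATE TEMPORARY TABLE datahub_padded_input (
  c1 BIGINT,
  c2 STRING,
  c3 STRING
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Pad missing trailing fields with null instead of skipping the row.
  'lengthCheck' = 'PAD',
  -- Print an exception log entry for each field parsing error.
  'columnErrorDebug' = 'true'
);
```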
Sink-specific
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| batchCount | Integer | No | 500 | The maximum number of rows per write batch. |
| batchSize | Integer | No | 512000 | The maximum size of a write batch, in bytes. |
| flushInterval | Integer | No | 5000 | The flush interval, in milliseconds. |
| hashFields | String | No | null | A comma-separated list of column names used to route rows to shards. Rows with the same values in these columns are written to the same shard. Default (null) uses random writes. Example: hashFields=a,b. |
| timeZone | String | No | (none) | The time zone used when converting TIMESTAMP fields. |
| schemaVersion | Integer | No | -1 | The schema version in the registered schema registry. |
Write batch flush behavior
A write batch is flushed to DataHub as soon as any one of the following conditions is met:
- The number of buffered rows reaches batchCount.
- The total size of buffered data reaches batchSize.
- The time since the last flush exceeds flushInterval.
Increasing batchCount, batchSize, or flushInterval improves write throughput at the cost of higher latency.
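For instance, a sink tuned for higher throughput might raise all three thresholds and route related rows to the same shard. The values below are illustrative, not recommendations, and the connection values are placeholders:

```sql
CREATE TEMPORARY TABLE datahub_tuned_sink (
  a STRING,
  b STRING,
  cnt BIGINT
) WITH (
  'connector' = 'datahub',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Flush when 1000 rows or 1 MB is buffered, or every 10 seconds,
  -- whichever happens first.
  'batchCount' = '1000',
  'batchSize' = '1048576',
  'flushInterval' = '10000',
  -- Rows with the same (a, b) values are written to the same shard.
  'hashFields' = 'a,b'
);
```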
Data type mappings
| Flink type | DataHub type |
|---|---|
| TINYINT | TINYINT |
| BOOLEAN | BOOLEAN |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| BIGINT | TIMESTAMP |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| VARCHAR | STRING |
| SMALLINT | SMALLINT |
| VARBINARY | BLOB |
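Applying the mapping table above, a hypothetical DataHub topic with the fields (id BIGINT, name STRING, ts TIMESTAMP, price DECIMAL) would be declared on the Flink side like this (column names and connection values are placeholders):

```sql
CREATE TEMPORARY TABLE typed_input (
  id    BIGINT,   -- DataHub BIGINT
  name  VARCHAR,  -- DataHub STRING
  ts    BIGINT,   -- DataHub TIMESTAMP is read as BIGINT in Flink
  price DECIMAL   -- DataHub DECIMAL
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);
```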
Metadata
Metadata fields are read-only (R). Declare them as METADATA VIRTUAL in the source table definition to include them in queries without writing them back to DataHub.
| Key | Data type | Description | R/W |
|---|---|---|---|
| shard-id | BIGINT METADATA VIRTUAL | The shard ID of the record. | R |
| sequence | STRING METADATA VIRTUAL | The sequence number of the record within the shard. | R |
| system-time | TIMESTAMP METADATA VIRTUAL | The time at which DataHub received the record. | R |
Examples
Source
The following example reads data from a DataHub topic and prints it to the console.
```sql
CREATE TEMPORARY TABLE datahub_input (
  `time` BIGINT,
  `sequence` STRING METADATA VIRTUAL,
  `shard-id` BIGINT METADATA VIRTUAL,
  `system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

CREATE TEMPORARY TABLE test_out (
  `time` BIGINT,
  `sequence` STRING,
  `shard-id` BIGINT,
  `system-time` TIMESTAMP
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO test_out
SELECT
  `time`,
  `sequence`,
  `shard-id`,
  `system-time`
FROM datahub_input;
```
Sink
The following example reads from one DataHub topic, converts the name field to lowercase, and writes the results to another DataHub topic.
```sql
CREATE TEMPORARY TABLE datahub_source (
  name VARCHAR
) WITH (
  'connector' = 'datahub',
  'endPoint' = '<endPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'subId' = '<yourSubId>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startTime' = '2018-06-01 00:00:00'
);

CREATE TEMPORARY TABLE datahub_sink (
  name VARCHAR
) WITH (
  'connector' = 'datahub',
  'endPoint' = '<endPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'batchSize' = '512000',
  'batchCount' = '500'
);

INSERT INTO datahub_sink
SELECT
  LOWER(name)
FROM datahub_source;
```
DataStream API
To use the DataStream API with DataHub, configure a DataStream connector for Realtime Compute for Apache Flink. See Settings of DataStream connectors.
Read from DataHub
VVR provides the DatahubSourceFunction class, which implements Flink's SourceFunction interface.
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Configure the DataHub source.
DatahubSourceFunction datahubSource =
    new DatahubSourceFunction(
        <yourEndPoint>,
        <yourProjectName>,
        <yourTopicName>,
        <yourSubId>,
        <yourAccessId>,
        <yourAccessKey>,
        "public",
        <yourStartTime>,
        <yourEndTime>);
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();

env.addSource(datahubSource)
    .map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
    .print();
env.execute();

private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
    Tuple2<String, Long> tuple2 = new Tuple2<>();
    TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
    tuple2.f0 = (String) recordData.getField(0);
    tuple2.f1 = (Long) recordData.getField(1);
    return tuple2;
}
```
Write to DataHub
VVR provides the DatahubSinkFunction class, which is built on Flink's OutputFormatSinkFunction.
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure the DataHub sink.
env.generateSequence(0, 100)
    .map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
    .addSink(
        new DatahubSinkFunction<>(
            <yourEndPoint>,
            <yourProjectName>,
            <yourTopicName>,
            <yourSubId>,
            <yourAccessId>,
            <yourAccessKey>,
            "public",
            // If the schema registry is enabled, you must specify the
            // schema version. Otherwise, set this to 0.
            <schemaVersion>));
env.execute();

private RecordEntry getRecordEntry(Long message, String s) {
    RecordSchema recordSchema = new RecordSchema();
    recordSchema.addField(new Field("f1", FieldType.STRING));
    recordSchema.addField(new Field("f2", FieldType.BIGINT));
    recordSchema.addField(new Field("f3", FieldType.DOUBLE));
    recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
    recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
    recordSchema.addField(new Field("f6", FieldType.DECIMAL));

    RecordEntry recordEntry = new RecordEntry();
    TupleRecordData recordData = new TupleRecordData(recordSchema);
    // Only the first two fields are populated; the remaining fields are null.
    recordData.setField(0, s + message);
    recordData.setField(1, message);
    recordEntry.setRecordData(recordData);
    return recordEntry;
}
```
Maven dependency
Add the DataHub DataStream connector to your project. All available versions are listed in the Maven central repository.
```xml
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-datahub</artifactId>
    <version>${vvr-version}</version>
</dependency>
```
What's next
- For a full list of connectors supported by Realtime Compute for Apache Flink, see Supported connectors.
- To connect to DataHub using the Kafka connector instead, see Message Queue for Apache Kafka.
- If a deployment fails after a DataHub topic is split or scaled in, see How do I resume a deployment that fails after a DataHub topic is split or scaled in?