すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:DataHub コネクタ

最終更新日:Nov 06, 2025

このトピックでは、DataHub コネクタの使用方法について説明します。

背景

Alibaba Cloud DataHub は、ストリーミングデータを処理するために設計されたリアルタイムデータ配信プラットフォームです。DataHub でストリーミングデータをパブリッシュおよびサブスクライブし、他のプラットフォームにデータを配信できます。DataHub を使用すると、ストリーミングデータを分析し、ストリーミングデータに基づいてアプリケーションを構築できます。詳細については、「DataHub とは」をご参照ください。

説明

DataHub は Kafka プロトコルと互換性があります。Upsert Kafka コネクタの代わりに標準の Kafka コネクタを使用して、DataHub からのデータの読み取りや DataHub へのデータの書き込みができます。詳細については、「Kafka との互換性」をご参照ください。

次の表に、DataHub コネクタでサポートされている機能を示します。

項目

説明

サポートされるタイプ

ソースおよびシンク

実行モード

ストリーミングおよびバッチ

データフォーマット

N/A

メトリクス

N/A

API タイプ

DataStream および SQL

シンクでのデータ更新/削除のサポート

サポートされていません。シンクは、挿入のみの行をターゲット Topic に書き込むことができます。

構文

CREATE TEMPORARY TABLE datahub_input (
  `time` BIGINT,
  `sequence`  STRING METADATA VIRTUAL,
  `shard-id` BIGINT METADATA VIRTUAL,
  `system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

コネクタオプション

  • 一般

    オプション

    説明

    必須

    デフォルト値

    備考

    connector

    使用するコネクタ。

    文字列

    はい

    デフォルト値なし

    値は datahub である必要があります。

    endPoint

    コンシューマーエンドポイント。

    文字列

    はい

    デフォルト値なし

    オプション値は、DataHub プロジェクトのリージョンによって異なります。詳細については、「エンドポイント」をご参照ください。

    project

    DataHub プロジェクト名。

    文字列

    はい

    デフォルト値なし

    DataHub プロジェクトの作成方法については、「DataHub の使用開始」をご参照ください。

    topic

    DataHub Topic 名。

    文字列

    はい

    デフォルト値なし

    DataHub Topic の作成方法については、「DataHub の使用開始」をご参照ください。

    説明

    BLOB タイプ (型なしおよび非構造化データ用) の DataHub Topic の場合、対応する Flink テーブルには VARBINARY 列が 1 つだけ含まれている必要があります。

    accessId

    Alibaba Cloud アカウントの AccessKey ID。

    文字列

    はい

    デフォルト値なし

    詳細については、「コンソール操作」をご参照ください。

    重要

    AccessKey ペアを保護するには、変数を使用して情報を指定します。詳細については、「変数の管理」をご参照ください。

    accessKey

    Alibaba Cloud アカウントの AccessKey Secret。

    文字列

    はい

    デフォルト値なし

    retryTimeout

    リトライの最大タイムアウト期間。

    整数

    いいえ

    1800000

    デフォルト値を使用することをお勧めします。単位: ミリ秒。

    retryInterval

    再試行間隔。

    整数

    いいえ

    1000

    デフォルト値を使用することをお勧めします。単位: ミリ秒。

    CompressType

    読み取りおよび書き込みの圧縮ポリシー。

    文字列

    いいえ

    lz4

    • lz4: lz4 圧縮アルゴリズム。

    • deflate: deflate 圧縮アルゴリズム。

    • "": 空の文字列。データ圧縮が無効であることを示します。

    説明

    VVR 6.0.5 以降を使用する Realtime Compute for Apache Flink のみがこのオプションをサポートします。

  • ソース固有

    オプション

    説明

    必須

    デフォルト値

    備考

    subId

    サブスクリプション ID。

    文字列

    はい

    デフォルト値なし

    DataHub サブスクリプションの作成方法の詳細については、「サブスクリプションの作成」をご参照ください。

    maxFetchSize

    一度に読み取られるデータレコードの数。

    整数

    いいえ

    50

    このオプションは読み取りパフォーマンスに影響します。読み取りスループットを向上させるには、より大きな値を設定できます。

    maxBufferSize

    非同期で読み取られるキャッシュデータレコードの最大数。

    整数

    いいえ

    50

    このオプションは読み取りパフォーマンスに影響します。読み取りスループットを向上させるには、より大きな値を設定できます。

    fetchLatestDelay

    データソースからデータがフェッチされなくなった後のスリープ期間。

    整数

    いいえ

    500

    単位: ミリ秒。データソースからデータがまれにしか送信されない場合は、このオプションに小さい値を指定して、読み取りスループットを最適化します。

    lengthCheck

    行ごとのフィールド数を確認するためのルール。

    文字列

    いいえ

    NONE

    • NONE

      • 行から解析されたフィールドの数が定義されたフィールドの数より大きい場合、データは定義されたフィールドの数に基づいて左から右に抽出されます。

      • 行から解析されたフィールドの数が定義されたフィールドの数より少ない場合、この行はスキップされます。

    • SKIP: 行から解析されたフィールドの数が定義されたフィールドの数と異なる場合、この行はスキップされます。

    • EXCEPTION: 行から解析されたフィールドの数が定義されたフィールドの数と異なる場合、例外が報告されます。

    • PAD: データは、定義されたフィールドの順序に基づいて左から右にパディングされます。

      • 行から解析されたフィールドの数が定義されたフィールドの数より大きい場合、データは定義されたフィールドの数に基づいて左から右に抽出されます。

      • 行から解析されたフィールドの数が定義されたフィールドの数より少ない場合、欠落しているフィールドの値は左から右に "Null" でパディングされます。

    columnErrorDebug

    デバッグを有効にするかどうかを指定します。

    ブール値

    いいえ

    false

    • false: デバッグは無効です。

    • true: デバッグが有効になり、解析例外に関するログが出力されます。

    startTime

    ログ消費が開始される時間。

    文字列

    いいえ

    デフォルト値なし

    フォーマット: yyyy-MM-dd hh:mm:ss。

    endTime

    ログ消費が停止される時間。

    文字列

    いいえ

    デフォルト値なし

    フォーマット: yyyy-MM-dd hh:mm:ss。

    startTimeMs

    ログ消費が開始される時間。

    Long

    いいえ

    -1

    単位: ミリ秒。このオプションは startTime よりも優先されます。デフォルト値 -1 は、DataHub Topic の最新のオフセットから消費することを示します。オフセットが存在しない場合、消費は最も早いオフセットから開始されます。

    重要

    デフォルト値に依存すると、データが失われる可能性があります。最初のチェックポイントの前にタスクが失敗した場合、DataHub Topic の最新のオフセットが進んでいる可能性があり、データを見逃す原因となります。これを防ぐには、デフォルトを使用する代わりに、このオプションを明示的に構成してください。

  • シンク固有

    オプション

    説明

    必須

    デフォルト値

    備考

    batchCount

    一度に書き込むことができる行数。

    整数

    いいえ

    500

    このオプション値を大きくすると、レイテンシーは高くなりますが、書き込みスループットは向上します。

    batchSize

    一度に書き込むことができるデータのサイズ。

    整数

    いいえ

    512000

    この値を大きくすると、レイテンシーは高くなりますが、書き込みスループットは向上します。単位: バイト。

    flushInterval

    データフラッシュ間隔。

    整数

    いいえ

    5000

    このオプション値を大きくすると、レイテンシーは高くなりますが、書き込みスループットは向上します。単位: ミリ秒。

    hashFields

    列名。列名を指定すると、同じ名前の列の値が同じシャードに書き込まれます。

    文字列

    いいえ

    null

    複数の列値をコンマ (,) で区切ります (例: hashFields=a,b)。デフォルト値 "null" はランダム書き込みを示します。

    timeZone

    データのタイムゾーン。

    文字列

    いいえ

    デフォルト値なし

    オプション値は、タイムゾーン間の TIMESTAMP フィールドの変換に影響します。

    schemaVersion

    登録されたスキーマのバージョン。

    整数

    いいえ

    -1

    N/A

データ型のマッピング

Flink

DataHub

TINYINT

TINYINT

BOOLEAN

BOOLEAN

INTEGER

INTEGER

BIGINT

BIGINT

BIGINT

TIMESTAMP

TIMESTAMP

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL

DECIMAL

VARCHAR

STRING

SMALLINT

SMALLINT

VARBINARY

BLOB

メタデータ

キー

説明

shard-id

BIGINT METADATA VIRTUAL

シャード ID。

sequence

STRING METADATA VIRTUAL

データシーケンス。

system-time

TIMESTAMP METADATA VIRTUAL

システム時間。

説明

前述の DataHub メタデータは、VVR 3.0.1 以降を使用している場合にのみ取得できます。

サンプルコード

  • ソース

    CREATE TEMPORARY TABLE datahub_input (
      `time` BIGINT,
      `sequence`  STRING METADATA VIRTUAL,
      `shard-id` BIGINT METADATA VIRTUAL,
      `system-time` TIMESTAMP METADATA VIRTUAL
    ) WITH (
      'connector' = 'datahub',
      'subId' = '<yourSubId>',
      'endPoint' = '<yourEndPoint>',
      'project' = '<yourProjectName>',
      'topic' = '<yourTopicName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}'
    );
    
    CREATE TEMPORARY TABLE test_out (
      `time` BIGINT,
      `sequence`  STRING,
      `shard-id` BIGINT,
      `system-time` TIMESTAMP
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO test_out
    SELECT
      `time`,
      `sequence` ,
      `shard-id`,
      `system-time`
    FROM datahub_input;
  • シンク

    CREATE TEMPORARY table datahub_source(
      name VARCHAR
    ) WITH (
      'connector'='datahub',
      'endPoint'='<endPoint>',
      'project'='<yourProjectName>',
      'topic'='<yourTopicName>',
      'subId'='<yourSubId>',
      'accessId'='${secret_values.ak_id}',
      'accessKey'='${secret_values.ak_secret}',
      'startTime'='2018-06-01 00:00:00'
    );
    
    CREATE TEMPORARY table datahub_sink(
      name varchar
    ) WITH (
      'connector'='datahub',
      'endPoint'='<endPoint>',
      'project'='<yourProjectName>',
      'topic'='<yourTopicName>',
      'accessId'='${secret_values.ak_id}',
      'accessKey'='${secret_values.ak_secret}',
      'batchSize'='512000',
      'batchCount'='500'
    );
    
    INSERT INTO datahub_sink
    SELECT
      LOWER(name)
    from datahub_source;

Datastream API

重要

DataStream API を呼び出してデータを読み書きする場合は、関連するタイプの DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。DataStream コネクタの構成方法の詳細については、「DataStream コネクタの設定」をご参照ください。

DataHub ソース

VVR は、SourceFunction インターフェイスを実装する DatahubSourceFunction クラスを提供します。このクラスを使用して、DataHub ソースからデータを読み取ることができます。次のサンプルコードは、DataHub からデータを読み取る方法を示しています。


env.setParallelism(1);
-- 接続構成を指定します。
DatahubSourceFunction datahubSource =
    new DatahubSourceFunction(
    <yourEndPoint>,
    <yourProjectName>,
    <yourTopicName>,
    <yourSubId>,
    <yourAccessId>,
    <yourAccessKey>,
    "public",
    <yourStartTime>,
    <yourEndTime>
    );
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();
env.addSource(datahubSource)
    .map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
    .print();
env.execute();
private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
    Tuple2<String, Long> tuple2 = new Tuple2<>();
    TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
    tuple2.f0 = (String) recordData.getField(0);
    tuple2.f1 = (Long) recordData.getField(1);
    return tuple2;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataHub シンク

VVR は、DatahubSinkFunction インターフェイスを実装する OutputFormatSinkFunction クラスを提供します。このクラスを使用して、DataHub にデータを書き込むことができます。次のサンプルコードは、DataHub にデータを書き込む方法を示しています。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-- 接続構成を指定します。
env.generateSequence(0, 100)
    .map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
    .addSink(
    new DatahubSinkFunction<>(
       <yourEndPoint>,
       <yourProjectName>,
       <yourTopicName>,
       <yourSubId>,
       <yourAccessId>,
       <yourAccessKey>,
       "public",
       <schemaVersion> // schemaRegistry が有効な場合、データ書き込みのために schemaVersion の値を指定する必要があります。それ以外の場合は、この schemaVersion を 0 に設定できます。
       );
env.execute();
private RecordEntry getRecordEntry(Long message, String s) {
    RecordSchema recordSchema = new RecordSchema();
    recordSchema.addField(new Field("f1", FieldType.STRING));
    recordSchema.addField(new Field("f2", FieldType.BIGINT));
    recordSchema.addField(new Field("f3", FieldType.DOUBLE));
    recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
    recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
    recordSchema.addField(new Field("f6", FieldType.DECIMAL));
    RecordEntry recordEntry = new RecordEntry();
    TupleRecordData recordData = new TupleRecordData(recordSchema);
    recordData.setField(0, s + message);
    recordData.setField(1, message);
    recordEntry.setRecordData(recordData);
    return recordEntry;
}

XML

Maven 中央リポジトリに格納されているさまざまなバージョンの DataHub DataStream コネクタ を使用できます。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-datahub</artifactId>
    <version>${vvr-version}</version>
</dependency>

リファレンス