このトピックでは、DataHub コネクタの使用方法について説明します。
背景
Alibaba Cloud DataHub は、ストリーミングデータを処理するために設計されたリアルタイムデータ配信プラットフォームです。DataHub でストリーミングデータをパブリッシュおよびサブスクライブし、他のプラットフォームにデータを配信できます。DataHub を使用すると、ストリーミングデータを分析し、ストリーミングデータに基づいてアプリケーションを構築できます。詳細については、「DataHub とは」をご参照ください。
DataHub は Kafka プロトコルと互換性があります。Upsert Kafka コネクタの代わりに標準の Kafka コネクタを使用して、DataHub からのデータの読み取りや DataHub へのデータの書き込みができます。詳細については、「Kafka との互換性」をご参照ください。
次の表に、DataHub コネクタでサポートされている機能を示します。
項目 | 説明 |
サポートされるタイプ | ソースおよびシンク |
実行モード | ストリーミングおよびバッチ |
データフォーマット | N/A |
メトリクス | N/A |
API タイプ | DataStream および SQL |
シンクでのデータ更新/削除のサポート | サポートされていません。シンクは、挿入のみの行をターゲット Topic に書き込むことができます。 |
構文
CREATE TEMPORARY TABLE datahub_input (
`time` BIGINT,
`sequence` STRING METADATA VIRTUAL,
`shard-id` BIGINT METADATA VIRTUAL,
`system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
'connector' = 'datahub',
'subId' = '<yourSubId>',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'topic' = '<yourTopicName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);コネクタオプション
一般
オプション
説明
型
必須
デフォルト値
備考
connector
使用するコネクタ。
文字列
はい
デフォルト値なし
値は datahub である必要があります。
endPoint
コンシューマーエンドポイント。
文字列
はい
デフォルト値なし
オプション値は、DataHub プロジェクトのリージョンによって異なります。詳細については、「エンドポイント」をご参照ください。
project
DataHub プロジェクト名。
文字列
はい
デフォルト値なし
DataHub プロジェクトの作成方法については、「DataHub の使用開始」をご参照ください。
topic
DataHub Topic 名。
文字列
はい
デフォルト値なし
DataHub Topic の作成方法については、「DataHub の使用開始」をご参照ください。
説明BLOB タイプ (型なしおよび非構造化データ用) の DataHub Topic の場合、対応する Flink テーブルには VARBINARY 列が 1 つだけ含まれている必要があります。
accessId
Alibaba Cloud アカウントの AccessKey ID。
文字列
はい
デフォルト値なし
詳細については、「コンソール操作」をご参照ください。
重要AccessKey ペアを保護するには、変数を使用して情報を指定します。詳細については、「変数の管理」をご参照ください。
accessKey
Alibaba Cloud アカウントの AccessKey Secret。
文字列
はい
デフォルト値なし
retryTimeout
リトライの最大タイムアウト期間。
整数
いいえ
1800000
デフォルト値を使用することをお勧めします。単位: ミリ秒。
retryInterval
再試行間隔。
整数
いいえ
1000
デフォルト値を使用することをお勧めします。単位: ミリ秒。
CompressType
読み取りおよび書き込みの圧縮ポリシー。
文字列
いいえ
lz4
lz4: lz4 圧縮アルゴリズム。
deflate: deflate 圧縮アルゴリズム。
"": 空の文字列。データ圧縮が無効であることを示します。
説明VVR 6.0.5 以降を使用する Realtime Compute for Apache Flink のみがこのオプションをサポートします。
ソース固有
オプション
説明
型
必須
デフォルト値
備考
subId
サブスクリプション ID。
文字列
はい
デフォルト値なし
DataHub サブスクリプションの作成方法の詳細については、「サブスクリプションの作成」をご参照ください。
maxFetchSize
一度に読み取られるデータレコードの数。
整数
いいえ
50
このオプションは読み取りパフォーマンスに影響します。読み取りスループットを向上させるには、より大きな値を設定できます。
maxBufferSize
非同期で読み取られるキャッシュデータレコードの最大数。
整数
いいえ
50
このオプションは読み取りパフォーマンスに影響します。読み取りスループットを向上させるには、より大きな値を設定できます。
fetchLatestDelay
データソースからデータがフェッチされなくなった後のスリープ期間。
整数
いいえ
500
単位: ミリ秒。データソースからデータがまれにしか送信されない場合は、このオプションに小さい値を指定して、読み取りスループットを最適化します。
lengthCheck
行ごとのフィールド数を確認するためのルール。
文字列
いいえ
NONE
NONE
行から解析されたフィールドの数が定義されたフィールドの数より大きい場合、データは定義されたフィールドの数に基づいて左から右に抽出されます。
行から解析されたフィールドの数が定義されたフィールドの数より少ない場合、この行はスキップされます。
SKIP: 行から解析されたフィールドの数が定義されたフィールドの数と異なる場合、この行はスキップされます。
EXCEPTION: 行から解析されたフィールドの数が定義されたフィールドの数と異なる場合、例外が報告されます。
PAD: データは、定義されたフィールドの順序に基づいて左から右にパディングされます。
行から解析されたフィールドの数が定義されたフィールドの数より大きい場合、データは定義されたフィールドの数に基づいて左から右に抽出されます。
行から解析されたフィールドの数が定義されたフィールドの数より少ない場合、欠落しているフィールドの値は左から右に "Null" でパディングされます。
columnErrorDebug
デバッグを有効にするかどうかを指定します。
ブール値
いいえ
false
false: デバッグは無効です。
true: デバッグが有効になり、解析例外に関するログが出力されます。
startTime
ログ消費が開始される時間。
文字列
いいえ
デフォルト値なし
フォーマット: yyyy-MM-dd hh:mm:ss。
endTime
ログ消費が停止される時間。
文字列
いいえ
デフォルト値なし
フォーマット: yyyy-MM-dd hh:mm:ss。
startTimeMs
ログ消費が開始される時間。
Long
いいえ
-1
単位: ミリ秒。このオプションは
startTimeよりも優先されます。デフォルト値 -1 は、DataHub Topic の最新のオフセットから消費することを示します。オフセットが存在しない場合、消費は最も早いオフセットから開始されます。重要デフォルト値に依存すると、データが失われる可能性があります。最初のチェックポイントの前にタスクが失敗した場合、DataHub Topic の最新のオフセットが進んでいる可能性があり、データを見逃す原因となります。これを防ぐには、デフォルトを使用する代わりに、このオプションを明示的に構成してください。
シンク固有
オプション
説明
型
必須
デフォルト値
備考
batchCount
一度に書き込むことができる行数。
整数
いいえ
500
このオプション値を大きくすると、レイテンシーは高くなりますが、書き込みスループットは向上します。
batchSize
一度に書き込むことができるデータのサイズ。
整数
いいえ
512000
この値を大きくすると、レイテンシーは高くなりますが、書き込みスループットは向上します。単位: バイト。
flushInterval
データフラッシュ間隔。
整数
いいえ
5000
このオプション値を大きくすると、レイテンシーは高くなりますが、書き込みスループットは向上します。単位: ミリ秒。
hashFields
列名。列名を指定すると、同じ名前の列の値が同じシャードに書き込まれます。
文字列
いいえ
null
複数の列値をコンマ (,) で区切ります (例:
hashFields=a,b)。デフォルト値 "null" はランダム書き込みを示します。timeZone
データのタイムゾーン。
文字列
いいえ
デフォルト値なし
オプション値は、タイムゾーン間の TIMESTAMP フィールドの変換に影響します。
schemaVersion
登録されたスキーマのバージョン。
整数
いいえ
-1
N/A
データ型のマッピング
Flink | DataHub |
TINYINT | TINYINT |
BOOLEAN | BOOLEAN |
INTEGER | INTEGER |
BIGINT | BIGINT |
BIGINT | TIMESTAMP |
TIMESTAMP | |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
VARCHAR | STRING |
SMALLINT | SMALLINT |
VARBINARY | BLOB |
メタデータ
キー | 型 | 説明 |
shard-id | BIGINT METADATA VIRTUAL | シャード ID。 |
sequence | STRING METADATA VIRTUAL | データシーケンス。 |
system-time | TIMESTAMP METADATA VIRTUAL | システム時間。 |
前述の DataHub メタデータは、VVR 3.0.1 以降を使用している場合にのみ取得できます。
サンプルコード
ソース
CREATE TEMPORARY TABLE datahub_input ( `time` BIGINT, `sequence` STRING METADATA VIRTUAL, `shard-id` BIGINT METADATA VIRTUAL, `system-time` TIMESTAMP METADATA VIRTUAL ) WITH ( 'connector' = 'datahub', 'subId' = '<yourSubId>', 'endPoint' = '<yourEndPoint>', 'project' = '<yourProjectName>', 'topic' = '<yourTopicName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}' ); CREATE TEMPORARY TABLE test_out ( `time` BIGINT, `sequence` STRING, `shard-id` BIGINT, `system-time` TIMESTAMP ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO test_out SELECT `time`, `sequence` , `shard-id`, `system-time` FROM datahub_input;シンク
CREATE TEMPORARY table datahub_source( name VARCHAR ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'subId'='<yourSubId>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'startTime'='2018-06-01 00:00:00' ); CREATE TEMPORARY table datahub_sink( name varchar ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'batchSize'='512000', 'batchCount'='500' ); INSERT INTO datahub_sink SELECT LOWER(name) from datahub_source;
Datastream API
DataStream API を呼び出してデータを読み書きする場合は、関連するタイプの DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。DataStream コネクタの構成方法の詳細については、「DataStream コネクタの設定」をご参照ください。
DataHub ソース
VVR は、SourceFunction インターフェイスを実装する DatahubSourceFunction クラスを提供します。このクラスを使用して、DataHub ソースからデータを読み取ることができます。次のサンプルコードは、DataHub からデータを読み取る方法を示しています。
env.setParallelism(1);
-- 接続構成を指定します。
DatahubSourceFunction datahubSource =
new DatahubSourceFunction(
<yourEndPoint>,
<yourProjectName>,
<yourTopicName>,
<yourSubId>,
<yourAccessId>,
<yourAccessKey>,
"public",
<yourStartTime>,
<yourEndTime>
);
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();
env.addSource(datahubSource)
.map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
.print();
env.execute();
private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
Tuple2<String, Long> tuple2 = new Tuple2<>();
TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
tuple2.f0 = (String) recordData.getField(0);
tuple2.f1 = (Long) recordData.getField(1);
return tuple2;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataHub シンク
VVR は、DatahubSinkFunction インターフェイスを実装する OutputFormatSinkFunction クラスを提供します。このクラスを使用して、DataHub にデータを書き込むことができます。次のサンプルコードは、DataHub にデータを書き込む方法を示しています。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-- 接続構成を指定します。
env.generateSequence(0, 100)
.map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
.addSink(
new DatahubSinkFunction<>(
<yourEndPoint>,
<yourProjectName>,
<yourTopicName>,
<yourSubId>,
<yourAccessId>,
<yourAccessKey>,
"public",
<schemaVersion> // schemaRegistry が有効な場合、データ書き込みのために schemaVersion の値を指定する必要があります。それ以外の場合は、この schemaVersion を 0 に設定できます。
);
env.execute();
private RecordEntry getRecordEntry(Long message, String s) {
RecordSchema recordSchema = new RecordSchema();
recordSchema.addField(new Field("f1", FieldType.STRING));
recordSchema.addField(new Field("f2", FieldType.BIGINT));
recordSchema.addField(new Field("f3", FieldType.DOUBLE));
recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
recordSchema.addField(new Field("f6", FieldType.DECIMAL));
RecordEntry recordEntry = new RecordEntry();
TupleRecordData recordData = new TupleRecordData(recordSchema);
recordData.setField(0, s + message);
recordData.setField(1, message);
recordEntry.setRecordData(recordData);
return recordEntry;
}XML
Maven 中央リポジトリに格納されているさまざまなバージョンの DataHub DataStream コネクタ を使用できます。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-datahub</artifactId>
<version>${vvr-version}</version>
</dependency>リファレンス
Realtime Compute for Apache Flink でサポートされているコネクタの詳細については、「サポートされているコネクタ」をご参照ください。
Kafka コネクタを使用して DataHub にアクセスする方法については、「Kafka」をご参照ください。
DataHub Topic が分割またはスケールインされた後に失敗した Flink タスクを回復するにはどうすればよいですか?