変更追跡タスクを構成した後、Flink プログラム内で flink-dts-connector を使用して追跡データを消費します。Data Transmission Service (DTS) では、2 種類の Flink プログラミングモデル(DataStream API および Table API & SQL)をサポートしています。
注意事項
Table API & SQL:各変更追跡タスクの構成は、1 つのテーブルからのみデータを消費します。複数のテーブルからデータを消費する場合は、各テーブルごとに個別のタスクを構成してください。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
変更追跡タスク。「変更追跡シナリオの概要」をご参照ください。
1 つ以上のコンシューマーグループ。詳細については、「コンシューマーグループの作成」をご参照ください。
flink-dts-connector のセットアップ
本例では、IntelliJ IDEA Community Edition 2020.1(Windows 版)を使用します。
flink-dts-connector リポジトリをダウンロードし、解凍します。
プロジェクトを IntelliJ IDEA で開きます。
起動画面で、[開くまたはインポート] をクリックします。

[プロジェクトとして開く] をクリックします。
次の依存関係を
pom.xmlに追加します。<dependency> <groupId>com.alibaba.flink</groupId> <artifactId>flink-dts-connector</artifactId> <version>1.1.1-SNAPSHOT</version> <classifier>jar-with-dependencies</classifier> </dependency>
DataStream API を使用した実行
flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java を開きます。
コネクタを実行するには、以下の手順を実行します。
上部メニューバーで、[実行] アイコンをクリックします。

[DtsExample] > [Edit] を選択します。

[プログラム引数] フィールドに接続パラメーターを入力し、[実行] をクリックします。プレースホルダー値は、実際の値に置き換えてください。
--broker-url <endpoint>:<port> --topic <topic-name> --sid <consumer-group-id> --user <username> --password <password> --checkpoint <unix-timestamp>例:
--broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043各パラメーターの説明および値の取得方法については、「パラメーター」をご参照ください。
コネクタが正常に実行されると、Flink プログラムによりソースデータベースからのデータ変更が追跡・表示されます。

特定の変更レコードをクエリするには、Flink プログラムの [タスクマネージャー] ページに移動します。
Table API & SQL を使用した実行
単一の DtsTableISelectTCaseTest.java ファイルは、1 つの変更追跡タスクおよび 1 つのテーブルのみをサポートします。複数のテーブルからデータを消費する場合は、各テーブルごとに個別のタスクを構成してください。flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java を開きます。
コネクタを実行するには、以下の手順を実行します。
既存の構成行をコメントアウトするため、各行の先頭に
//を追加します。
SQL ステートメントを使用して、追跡対象のテーブルを指定します。
変更追跡インスタンスの接続パラメーターを設定します。パラメーターの説明については、「パラメーター」をご参照ください。

上部メニューバーで、[‘DtsTableISelectTCaseTest’ の実行] をクリックします。
コネクタが正常に実行されると、Flink プログラムによりソースデータベースからのデータ変更が追跡・表示されます。

特定の変更レコードをクエリするには、Flink プログラムの [タスクマネージャー] ページに移動します。
パラメーター
| パラメーター(DataStream API) | パラメーター(Table API & SQL) | 説明 | 値の取得方法 |
|---|---|---|---|
broker-url | dts.server | 変更追跡インスタンスのエンドポイントおよびポート番号です。ネットワーク遅延を最小限に抑えるため、Flink プログラムはクラシックネットワーク上の Elastic Compute Service (ECS) インスタンス、または変更追跡インスタンスと同じ仮想プライベートクラウド(VPC)内にデプロイすることを推奨します。 | DTS コンソールでインスタンス ID をクリックし、[基本情報] ページの [Topic] および [ネットワーク] を確認します。 |
topic | topic | 変更追跡インスタンスのトピック名です。 | 上記と同様です。 |
sid | dts.sid | コンシューマーグループ ID です。 | DTS コンソールでインスタンス ID をクリックし、左側ナビゲーションウィンドウから [データ消費] をクリックします。[コンシューマーグループ ID/名] および [アカウント] を確認します。 |
user | dts.user | コンシューマーグループのユーザー名です。 警告 他のクライアント(flink-dts-connector 以外)を使用する場合、ユーザー名は | 上記と同様です。 |
password | dts.password | コンシューマーグループのパスワードです。 | コンシューマーグループ作成時に指定した値です。 |
checkpoint | dts.checkpoint | 消費チェックポイント — コネクタがデータ消費を開始する UNIX タイムスタンプです。「消費チェックポイントの設定」をご参照ください。 | タイムスタンプは、変更追跡インスタンスのデータ範囲内である必要があります。「変更追跡タスク」ページの [データ範囲] を確認してください。オンラインコンバーターを使用して UNIX タイムスタンプを取得できます。 |
| 該当なし | dts-cdc.table.name | 追跡対象のテーブル名で、<データベース名>.<テーブル名> の形式(例:dtstestdata.order)で指定します。1 つの構成につき 1 つのテーブルのみ対応します。 | DTS コンソールでインスタンス ID をクリックし、[基本情報] または [タスク管理] ページで [オブジェクトの表示] をクリックします。 |
消費チェックポイントの設定
checkpoint/dts.checkpoint パラメーターは、コネクタがデータ読み取りを開始する位置を制御します。以下の 2 つのシナリオで使用します。
中断後の再開:消費が中断された場合、最後に消費されたオフセットをチェックポイントとして設定することで、データ損失を防ぎながら再開できます。
特定時点からの開始:変更追跡インスタンスのデータ範囲内の任意のタイムスタンプをチェックポイントとして設定し、その時点以降のデータを消費できます。
値は UNIX タイムスタンプである必要があり、変更追跡インスタンスのデータ範囲内に収まっている必要があります。「変更追跡タスク」ページの [データ範囲] 列でデータ範囲を確認できます。
トラブルシューティング
| エラーメッセージ | 原因 | 解決方法 |
|---|---|---|
Cluster changed from * to *, consumer require restart. | DTS が増分データを読み取るために使用する DStore モジュールが切り替わったため、Flink プログラムがコンシューマーオフセットを失いました。 | Flink プログラムを再起動しないでください。現在のコンシューマーオフセットをクエリし、checkpoint(DataStream API)または dts.checkpoint(Table API & SQL)をその値に設定して、消費を再開してください。 |
