変更追跡タスクを設定した後、flink-dts-connectorファイルを使用して追跡データを消費できます。 このトピックでは、flink-dts-connectorファイルを使用して追跡データを消費する方法について説明します。
制限事項
Data Transmission Service (DTS) は、次のタイプのFlinkプログラムをサポートしています。DataStream APIおよびTable API & SQL。
Table APIとSQLプログラムを使用する場合、変更追跡タスクを設定するたびに1つのテーブルのデータのみを使用できます。 複数のテーブルのデータを使用する場合は、テーブルごとにタスクを設定する必要があります。
手順
この例では、Windows用IntelliJ IDEA Community Edition 2020.1が使用されています。
変更追跡タスクを作成します。 詳細については、「ApsaraDB RDS For MySQLインスタンスからのデータ変更の追跡」、「PolarDB for MySQLクラスターからのデータ変更の追跡」、または「自己管理型Oracleデータベースからのデータ変更の追跡」をご参照ください。
1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。
flink-dts-connectorファイルをダウンロードして解凍します。
IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] または [インポート] をクリックします。
表示されるダイアログボックスで、flink-dts-connectorファイルが解凍されているディレクトリに移動し、フォルダを展開してpom.xmlファイルを見つけます。
表示されるダイアログボックスで、プロジェクトとして開く.
次の依存関係をpom.xmlファイルに追加します。
<dependency> <groupId>com.alibaba.flink</groupId> <artifactId>flink-dts-connector</artifactId> <version>1.1.1-スナップショット </version> <classifier>jar-with-dependencies</classifier> </dependency>
IntelliJ IDEAで、フォルダを展開してJavaファイルを見つけます。 次に、使用するFlinkコネクタのタイプに基づいてJavaファイルをダブルクリックします。
DataStream APIコネクタを使用する場合は、flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.javaファイルをダブルクリックし、次の操作を実行する必要があります。
IntelliJ IDEAの上部メニューバーで、実行アイコンをクリックします。
表示されるダイアログボックスで、
を選択します。[プログラム引数] フィールドにパラメーターと対応する値を入力し、[実行] をクリックしてflink-dts-connectorを実行します。
説明パラメーターとパラメーター値を取得するためのメソッドの詳細については、このトピックの「パラメーター」をご参照ください。
-- broker-url dts-cn-*******. ******.***: *******-トピックcn_hangzhou_rm_*********_dtstest_version2-sid dts ********-ユーザーdtstest-パスワードTest123456-チェックポイント1624440043
次の図は、Flinkプログラムがソースデータベースからのデータ変更を追跡できることを示しています。
説明データ変更の特定のレコードを照会するには、FlinkプログラムのTask Managerページに移動します。
Table APIとSQLコネクタを使用する場合は、flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.javaファイルをダブルクリックし、次の操作を実行する必要があります。
説明単一の
DtsTableISelectTCaseTest.java
ファイルを使用して、1つの変更追跡タスクのみを構成し、1つのテーブルのデータのみを使用できます。 複数のテーブルのデータを使用する場合は、テーブルごとにタスクを設定する必要があります。次の図に示すように、スラッシュ (
//
) を2つ入力し、コメントを追加します。データの変更を追跡するテーブルに関する情報を指定します。 SQL文がサポートされています。
変更追跡インスタンスに必要なパラメーターを設定します。 詳細については、このトピックの「パラメーター」セクションをご参照ください。
IntelliJ IDEAの上部メニューバーで、[Run'DtsTableISelectTCaseTest'] をクリックしてflink-dts-connectorを実行します。
次の図は、Flinkプログラムがソースデータベースからのデータ変更を追跡できることを示しています。
説明データ変更の特定のレコードを照会するには、FlinkプログラムのTask Managerページに移動します。
パラメーター
DstExampleファイルのパラメーター | DtsTableISelectTCaseTestファイルのパラメーター | 説明 | パラメータ値を取得するメソッド |
|
| 変更追跡インスタンスのネットワークアドレスとポート番号。 説明 内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、FlinkプログラムをデプロイするElastic Compute Service (ECS) インスタンスがクラシックネットワーク上にある場合、または変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) にある場合に適用されます。 | DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 [タスク設定の表示] ページで、追跡対象のトピック、ネットワークアドレス、ポート番号を表示できます。 |
|
| 変更追跡インスタンスのトピックの名前。 | |
|
| コンシューマーグループの ID です。 | DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 左側のナビゲーションウィンドウで、[データの使用] をクリックします。 コンシューマーグループのIDとユーザー名を表示できます。 説明 コンシューマーグループのユーザー名のパスワードは、コンシューマーグループの作成時に指定します。 |
|
| コンシューマーグループのユーザー名。 警告 このトピックで説明するflink-dts-connectorファイルを使用していない場合は、このパラメーターを | |
|
| コンシューマーグループのユーザー名のパスワード。 | |
|
| 消費者オフセット。 flink-dts-connectorが最初のデータレコードを消費したときに生成されるタイムスタンプです。 この値は UNIX タイムスタンプです。 例: 1624440043。 説明 コンシューマオフセットは、次のシナリオで使用できます。
| 次の図に示すように、コンシューマオフセットは変更追跡インスタンスのデータ範囲内にある必要があります。 コンシューマオフセットは、UNIXタイムスタンプに変換する必要があります。 説明 検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。 |
なし |
| 変更追跡用のオブジェクト。The object for change tracking. 指定できるテーブルは1つだけです。 テーブル名を指定するときは、次の要件に従う必要があります。
| DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 [タスク設定の表示] ページで、右上隅の [変更追跡用のオブジェクトの表示] をクリックします。 表示されるダイアログボックスで、変更追跡のオブジェクトが属するデータベースとテーブルを表示できます。 |
よくある質問
エラーメッセージ | 考えられる原因 | 解決策 |
| 増分データを読み取るためにDTSが使用するDStoreモジュールが切り替えられます。 その結果、Flinkプログラムのコンシューマオフセットが失われる。 | Flinkプログラムを再起動する必要はありません。 データ消費を再開するには、Flinkプログラムのコンシューマオフセットをクエリし、DtsExample.javaおよびDtsTableISelectTCaseTest.javaファイルに |