本トピックでは、Kafka から MaxCompute へ増分データを定期的に同期するスケジュールを設定する方法について説明します。例として、1 分ごと、1 時間ごと、または 1 日ごとの同期を実行し、MaxCompute の時間単位または日単位のパーティションテーブルにデータを書き込む方法を示します。
注意事項
Kafka のバージョンは 0.10.2 以降かつ 2.2.x 以前である必要があります。また、Kafka ではレコードのタイムスタンプが有効化されており、レコードに正しいビジネスタイムスタンプが含まれている必要があります。
増分データ同期が開始された後も、開始時刻以前または等しいタイムスタンプを持つレコードが Kafka Topic に書き込まれる場合があります。これらのレコードは読み取られない可能性があります。Kafka Topic へのデータ書き込みが遅延している場合や、タイムスタンプの順序が保たれていない場合、オフライン同期タスクでデータ損失が発生するリスクがあることにご注意ください。
Kafka 側の同期終了ポリシーとして、「1 分間新しいデータがない」を選択できるのは、以下の条件をすべて満たす場合のみです。それ以外の場合、データ損失が発生する可能性があります。
Kafka Topic の一部またはすべてのパーティションに、長期間(例:10 分以上)新しいデータが書き込まれていない状態が継続している。
各定期インスタンスが開始した後、終了時刻パラメーター以前のタイムスタンプを持つレコードが Kafka Topic に書き込まれていない。
前提条件
サーバーレスリソースグループ を購入します。
Kafka および MaxCompute のデータソースを作成します。詳細については、「データソースの構成」をご参照ください。
リソースグループとデータソース間のネットワーク接続を確立します。詳細については、「ネットワーク接続ソリューションの概要」をご参照ください。
制限事項
ソースデータを MaxCompute の外部テーブルに同期することはサポートされていません。
操作手順
本トピックでは、オフライン同期タスクの構成方法を説明するために、新しい DataStudio インターフェイスを使用します。
ステップ 1:ノードの作成とタスクの構成
コードレス UI でノードを作成・構成する一般的な手順については、「コードレス UI の構成」ガイドをご参照ください。本トピックでは、これらの手順は繰り返しません。
ステップ 2:データソースと送信先の構成
Kafka のデータソースの構成
本トピックでは、Kafka から MaxCompute への単一テーブルのオフライン同期の方法について説明します。データソースは Kafka です。主な構成ポイントは以下のとおりです。
Kafka データソースの構成項目の一般的な説明については、「Kafka Reader」ドキュメントをご参照ください。以下は、本チュートリアルの参考構成です。
設定項目 | 主な構成 |
Topic | 同期対象の Kafka Topic を選択します。標準モードの DataWorks ワークスペースを使用する場合、開発環境および本番環境の両方の Kafka クラスターに、同じ名前の Topic が存在している必要があります。「Topic」には、この Topic を選択します。 説明 以下の場合は:
|
Consumer Group ID | 業務要件に基づき、Kafka クラスターに対して一意の ID を入力します。これにより、Kafka クラスター側での統計およびモニタリングが可能になります。 |
Read Start Offset、Start Time、Read End Offset、End Time | Read Start Offset および Read End Offset の両方を「特定の時刻」に設定します。Start Time および End Time には、それぞれスケジューリングパラメーター これらのパラメーターは、データ同期の開始位置および終了位置を定義します。この構成では、同期は |
Time Zone | 空欄のままにするか、DataWorks が配置されているリージョンのデフォルトサーバー時刻帯を選択できます。 説明 Alibaba Cloud のテクニカルサポートに連絡してスケジューリング時刻帯を変更済みの場合は、ここで変更後の時刻帯を選択できます。 |
Key Type、Value Type、Encoding | Kafka Topic 内の実際のレコードに基づき選択します。 |
同期完了ポリシー | 同期終了ポリシーとして、条件を満たす場合のみ「1 分間新しいデータがない」を選択します。それ以外の場合は、「指定された終了位置で停止」を選択します。
|
高度な構成 | デフォルト設定のままとします。 |
MaxCompute のデータ送信先の構成
本トピックでは、Kafka から MaxCompute への単一テーブルのオフライン同期の方法について説明します。データ送信先はテーブルです。主な構成ポイントは以下のとおりです。
以下の表で説明していないパラメーターについては、デフォルト設定のままとします。
設定項目 | 主な構成 |
データソース | 前工程で選択した MaxCompute データソースが表示されます。標準モードの DataWorks ワークスペースを使用する場合、開発プロジェクトおよび本番プロジェクトの名称が表示されます。 |
Table | 同期先の MaxCompute テーブルを選択します。標準モードの DataWorks ワークスペースを使用する場合、開発環境および本番環境の両方に、同じ名前およびスキーマを持つ MaxCompute テーブルが存在することを確認してください。 また、「ターゲットテーブルスキーマの生成」をクリックすると、システムが自動的に受信用テーブルを作成します。テーブル作成文は手動で調整可能です。 説明 以下の条件が該当する場合:
|
Partition | テーブルがパーティションテーブルの場合、パーティションキー列の値を入力できます。
|
ステップ 3:フィールドマッピングの構成
データソースおよび送信先を選択した後、リーダーとライター間のカラムマッピングを指定する必要があります。「同名フィールドのマップ」、「同一行のフィールドをマップ」、「マッピングのクリア」、または「マッピングの手動編集」を選択できます。
Kafka 側には、6 つのデフォルトフィールドがあります。
フィールド名
説明
__key__
Kafka レコードのキー。
__value__
Kafka レコードの値。
__partition__
Kafka レコードが配置されているパーティション番号。パーティション番号は 0 から始まります。
__headers__
Kafka レコードのヘッダー。
__offset__
Kafka レコードのパーティション内におけるオフセット。オフセットは 0 から始まります。
__timestamp__
Kafka レコードの 13 桁整数ミリ秒タイムスタンプ。
Kafka 側のフィールドに対してカスタム JSON 解析を構成できます。
.(サブフィールドを取得)および[](配列要素を取得)の構文を使用することで、JSON 形式の Kafka レコードの value フィールドから内容を取得できます。重要JSON フィールド名に「.」文字が含まれる場合、フィールド定義によってそのフィールド値を取得できません。これは、フィールド定義構文において曖昧さが生じるためです。
以下は、Kafka 内の JSON 形式レコードのデータ値の例です。
{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ], "a.b": "unreachable" }a1 のデータ(
"hello")を同期するには、Kafka 側にフィールドa.a1を追加します。b のデータ(
"world")を同期するには、Kafka 側にフィールドbを追加します。c のデータ(
"yyyyyyy")を同期するには、Kafka 側にフィールドc[1]を追加します。AA のデータ(
"this")を同期するには、Kafka 側にフィールドd[0].AAを追加します。Kafka 側のフィールドを
a.bとして定義した場合、データ"unreachable"を同期できません。
ソースまたは送信先テーブルのフィールドをマッピングから除外できます。マッピングされていないソースフィールドは同期インスタンスによって読み取られず、マッピングされていない送信先フィールドには NULL が書き込まれます。
1 つのソースフィールドを複数の送信先フィールドにマッピングすることはできません。また、1 つの送信先フィールドを複数のソースフィールドからマッピングすることもできません。
ステップ 4:高度なパラメーターの構成
タスク右側の「高度な構成」をクリックし、「最大予想同時実行数」および「不正データレコードの処理ポリシー」などのパラメーターを設定します。本チュートリアルでは、「不正データレコードの処理ポリシー」を「不正データレコードを無視」に設定し、その他のパラメーターはデフォルト値のままとします。詳細については、「コードレス UI の構成」をご参照ください。
ステップ 5:テストの構成と実行
オフライン同期ノードの編集ページ右側で、「Run Configuration」をクリックします。テスト実行用に「リソースグループ」および「スクリプトパラメーター」を設定し、上部ツールバーの「実行」をクリックして、同期リンクが正常に動作するかをテストします。
左側のナビゲーションウィンドウで
アイコンをクリックし、次に [個人ディレクトリ] の右側にある
アイコンをクリックすると、.sql拡張子のファイルを作成できます。 次に、以下の SQL クエリを実行して、宛先テーブルのデータが期待どおりであるかを確認できます。説明この方法でデータを照会するには、送信先 MaxCompute プロジェクトを DataWorks に計算リソースとしてアタッチ する必要があります。
.sqlファイルの編集ページで、右側の「Run Configuration」をクリックします。データソースの「タイプ」、「計算リソース」、および「リソースグループ」を指定し、上部ツールバーの「実行」をクリックします。
SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20;
ステップ 6:スケジューリングの構成とタスクの公開
オフライン同期タスク右側の「スケジューリング」をクリックします。「スケジューリング構成」パラメーターを定期実行用に設定し、上部ツールバーの「公開」をクリックして公開パネルを開きます。画面上の指示に従って、タスクを公開 します。
データソースおよび送信先の構成時に、3 つのスケジューリングパラメーター(${startTime}、${endTime}、${partition})を使用しました。スケジューリング構成では、同期要件に応じて、これらのパラメーターの置き換えポリシーを指定する必要があります。以下に、いくつかの典型的なシナリオの構成例を示します。
典型的なシナリオ | 推奨構成 | シナリオの説明 |
同期タスクが 5 分ごとにスケジュールされる |
| 同期タスクが 2022-11-22 の 10:00 にスケジュールされる場合:
|
同期タスクが 1 時間ごとにスケジュールされる |
説明
| 同期タスクが 2022-11-22 の 10:05 にスケジュールされる場合:
|
同期タスクが 1 日ごとにスケジュールされる |
| 同期タスクが 2022-11-22 の 00:05 にスケジュールされる場合:
|
同期タスクが 1 週間ごとにスケジュールされる |
| 同期タスクが 2022-11-22 の 00:05 にスケジュールされる場合:
|
同期タスクが 1 ヶ月ごとにスケジュールされる |
| 同期タスクが 2022-11-22 の 00:05 にスケジュールされる場合:
|
目的の間隔に応じてスケジューリングサイクルを設定します。
典型的なシナリオ | 推奨構成 | シナリオの説明 |
同期タスクが 5 分ごとにスケジュールされる |
| 該当なし |
同期タスクが 1 時間ごとにスケジュールされる |
| 開始時刻を 00:00 より若干遅らせて(例:00:15)設定します。これにより、同期タスクインスタンスの開始前に、対象時間範囲のすべてのデータが Kafka Topic に書き込まれることを保証します。 |
同期タスクが 1 日ごとにスケジュールされる |
| スケジュール時刻を 00:00 より若干遅らせて(例:00:15)設定します。これにより、同期タスクインスタンスの開始前に、対象時間範囲のすべてのデータが Kafka Topic に書き込まれることを保証します。 |
同期タスクが 1 週間ごとにスケジュールされる |
| スケジュール時刻を 00:00 より若干遅らせて(例:00:15)設定します。これにより、同期タスクインスタンスの開始前に、対象時間範囲のすべてのデータが Kafka Topic に書き込まれることを保証します。 |
同期タスクが 1 ヶ月ごとにスケジュールされる |
| スケジュール時刻を 00:00 より若干遅らせて(例:00:15)設定します。これにより、同期タスクインスタンスの開始前に、対象時間範囲のすべてのデータが Kafka Topic に書き込まれることを保証します。 |
インスタンス開始後に、開始時刻以前または等しいタイムスタンプを持つレコードが Kafka Topic に書き込まれた場合、これらのレコードは読み取られない可能性があります。Kafka Topic へのデータ書き込みが遅延している場合や、タイムスタンプの順序が保たれていない場合、オフライン同期タスクでデータ損失が発生するリスクがあることにご注意ください。