このチュートリアルでは、タイムウィンドウ化されたレコードを Kafka トピックから読み取り、パーティション化された MaxCompute テーブルに書き込む定期オフライン同期タスクを設定する方法について説明します。このタスクは、分、時間、日、週、月など、任意のスケジュールで実行できます。
前提条件
開始する前に、以下を確認してください。
-
Kafka および MaxCompute データソースが構成済みであること。詳細については、「データソース構成」をご参照ください。
-
リソースグループと両方のデータソース間のネットワーク接続性。詳細については、「ネットワーク接続ソリューションの概要」をご参照ください。
制限事項
-
ソースデータを MaxCompute 外部テーブルに同期することはサポートされていません。
-
Kafka のバージョンは 0.10.2 以降、2.2.x 以前である必要があります。
-
Kafka はレコードのタイムスタンプが有効になっており、各レコードは正しい業務タイムスタンプを保持している必要があります。
データ損失のリスク
定期インスタンスの開始後に、開始時刻以前または等しいタイムスタンプを持つレコードが Kafka トピックに到着する場合があります。これらのレコードは読み取られない可能性があります。Kafka トピックへの書き込みが遅延したり、タイムスタンプが順不同であったりすると、オフライン同期タスクでデータ損失が発生する可能性があります。
同期タスクの構成
このチュートリアルでは、新しい DataStudio インターフェイスを使用します。
ステップ 1: ノードの作成
コードレス UI 構成ガイドに従って、オフライン同期ノードを作成および構成します。このチュートリアルでは、Kafka から MaxCompute への同期に特化した構成の詳細に焦点を当てています。
ステップ 2: データソースと送信先の構成
データソース (Kafka) の構成
このチュートリアルでは、Kafka から MaxCompute への単一テーブルのオフライン同期について説明します。すべての Kafka Reader 構成オプションの完全なリファレンスについては、Kafka Reader ドキュメントをご参照ください。
以下の表は、このチュートリアルにおける主要な設定項目を網羅しています。
| 構成項目 | 主要構成 |
|---|---|
| [Topic] | 同期する Kafka トピックを選択します。標準モードの DataWorks ワークスペースでは、開発環境と本番環境の両方の Kafka クラスターに、同じ名前のトピックが存在する必要があります。 説明
開発環境にトピックが存在しない場合、[Topic] ドロップダウンリストに表示されません。本番環境にトピックが存在しない場合、タスクが公開された後に定期スケジュールが失敗します。これは、タスクが同期対象のトピックを特定できないためです。 |
| [Consumer Group ID] | Kafka クラスターの一意の ID を入力します。この ID は、Kafka 側での統計とモニタリングに使用されます。 |
| [読み取り開始オフセット] と [開始時刻] | [読み取り開始オフセット] を [特定の時刻] に設定し、[開始時刻] を ${startTime} に設定します。これにより、同期の開始位置が定義されます。${startTime} 以降のレコードが含まれます。 |
| [読み取り終了オフセット] と [終了時刻] | [読み取り終了オフセット] を [特定の時刻] に設定し、[終了時刻] を ${endTime} に設定します。${endTime} (含まない) までのレコードが同期されます。${startTime} と ${endTime} は両方とも、設定したスケジューリングパラメーター式に基づいて、ランタイム時に具体的なタイムスタンプに置き換えられます。 |
| [タイムゾーン] | 空白のままにすると、DataWorks リージョンのデフォルトのサーバータイムゾーンが使用されます。Alibaba Cloud サポートによってスケジューリングタイムゾーンを変更した場合は、ここでそのタイムゾーンを選択します。 |
| [キータイプ]、[値タイプ]、[エンコーディング] | Kafka トピック内の実際のレコードに基づいて選択します。 |
| [同期完了ポリシー] | 同期タスクが読み取りを停止するタイミングを制御します。ご利用の Kafka トラフィックパターンに基づいて選択してください。以下の比較をご参照ください。 |
| [詳細構成] | デフォルトのままにします。 |
同期完了ポリシーの選択
2 つのオプションは異なる動作をし、異なるシナリオに適しています。
| 1 分間新しいデータなし | 指定された終了位置で停止 | |
|---|---|---|
| 仕組み | すべてのパーティションで 1 分間連続して新しいレコードが到着しない場合、タスクは停止します。 | タスクは、${endTime} に対応するオフセットに到達するとすぐに停止します。 |
| 適用範囲 | 以下の両方が該当する場合:(1) 一部またはすべてのパーティションが定期的に長時間(例:10 分以上)サイレンス状態になること、および (2) 定期インスタンスが開始された後、トピックに ${endTime} よりも古いタイムスタンプを持つレコードが書き込まれないこと。 |
上記 (1) または (2) のいずれかを保証できない場合。こちらがより安全なデフォルト設定です。 |
| 誤用時のリスク | 1 分間のサイレンスが発生した時点で遅れて到着するレコードがまだ書き込まれている場合、データ損失が発生する可能性があります。 | なし — タスクは確実に設定された終了位置で停止します。 |
データ送信先 (MaxCompute) の構成
| 設定項目 | 主要な構成 |
|---|---|
| [Data Source] | 前のステップで選択した MaxCompute データソースが表示されます。標準モードのワークスペースでは、開発および本番プロジェクト名が表示されます。 |
| テーブル | 送信先 MaxCompute テーブルを選択します。標準モードのワークスペースでは、両方の環境に同じ名前とスキーマを持つテーブルが存在する必要があります。または、[Generate Target Table Schema] をクリックしてシステムにテーブルを自動的に作成させ、必要に応じて CREATE TABLE 文を調整します。 説明
開発環境にテーブルがない場合、ドロップダウンリストには表示されません。本番環境にテーブルがない場合、タスクはランタイム時に失敗します。環境間でスキーマが異なる場合、カラムマッピングによって不正なデータ書き込みが発生する可能性があります。 |
| [Partition] | パーティションテーブルの場合、パーティションキー値を入力します。例えば、ds=20220101 のような静的値、またはランタイム時に自動的に置き換えられる ds=${partition} のようなスケジューリングパラメーターを使用します。 |
ステップ 3: フィールドマッピングの構成
ソースと送信先を選択した後、Kafka リーダーと MaxCompute ライターの間で列をマップします。必要に応じて、[同じ名前のフィールドをマップ]、[同一行のフィールドをマップ]、[マッピングのクリア]、または [マッピングの手動編集] を使用します。
デフォルトの Kafka フィールド
Kafka は、MaxCompute カラムに直接マッピングできる 6 つの組み込みフィールドを公開しています。
| フィールド名 | 説明 |
|---|---|
__key__ |
Kafka レコードのキー。 |
__value__ |
Kafka レコードの値。 |
__partition__ |
レコードが配置されているパーティション番号。0 から始まります。 |
__headers__ |
Kafka レコードのヘッダー。 |
__offset__ |
パーティション内のレコードのオフセット。0 から始まります。 |
__timestamp__ |
13 桁のミリ秒整数としてのレコードのタイムスタンプ。 |
JSON フィールドの解析
JSON 形式の Kafka 値の場合、カスタムフィールド定義を追加するには、サブフィールドにアクセスするために . を使用し、配列要素にアクセスするために [] を使用します。
このレコード値の例を次に示します。
{
"a": {
"a1": "hello"
},
"b": "world",
"c": [
"xxxxxxx",
"yyyyyyy"
],
"d": [
{ "AA": "this", "BB": "is_data" },
{ "AA": "that", "BB": "is_also_data" }
],
"a.b": "unreachable"
}
| フィールド定義 | 取得値 | 備考 |
|---|---|---|
a.a1 |
"hello" |
サブフィールドアクセスは . |
b |
"world" |
トップレベルフィールド |
c[1] |
"yyyyyyy" |
配列要素アクセスは [] |
d[0].AA |
"this" |
複合アクセス |
a.b |
*(取得できません)* | . を含むフィールド名は解析できません。. はサブフィールド区切り文字として解釈され、曖昧さの原因となります。 |
マッピングルール
-
マッピングされていないソースフィールドは、同期インスタンスによって読み取られません。
-
マッピングされていない送信先フィールドには NULL が書き込まれます。
-
1 つのソースフィールドを複数の送信先フィールドにマッピングすることはできません。
-
1 つの送信先フィールドを複数のソースフィールドからマッピングすることはできません。
ステップ 4: 詳細パラメーターの構成
タスクの右側にある [詳細設定] をクリックします。 このチュートリアルでは、[ダーティデータレコードのポリシー] を [ダーティデータレコードを無視] に設定し、他のすべてのパラメーターはデフォルトのままにします。 パラメーターの詳細については、「コードレス UI 構成」をご参照ください。
ステップ 5: 同期のテスト
-
編集ページの右側にある[実行設定]をクリックします。テスト実行の[リソースグループ]と[スクリプトパラメータ]を設定し、上部のツールバーにある[実行]をクリックします。
-
テスト実行が完了したら、送信先テーブル内のデータを確認します。左側のナビゲーションウィンドウで、
.sql拡張子のファイルを作成し、次のクエリを実行します。説明-
この方法でデータをクエリするには、送信先 MaxCompute プロジェクトをコンピューティングリソースとして DataWorks にアタッチします。
-
.sqlファイル編集ページで、右側にある [実行構成] をクリックし、[コンピューティングリソース] と [リソースグループ] を指定してから、[実行] をクリックします。
SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20; -
ステップ 6: スケジューリングの構成と公開
タスクの右側にある[スケジューリング]をクリックして、定期的な実行のスケジューリング設定を行い、次に[公開]をクリックしてタスクを公開します。
このチュートリアルで使用される 3 つのスケジューリングパラメーター ${startTime}、${endTime}、および ${partition} は互いに関連しています。${startTime} と ${endTime} に注入される値は各インスタンスの Kafka タイムウィンドウを制御し、${partition} はデータを受信する MaxCompute パーティションを決定します。スケジューリングサイクルに基づいて、これら 3 つをまとめて設定してください。
以下の例は、最も一般的なスケジューリングパターンを網羅しています。
スケジューリングパラメーター式
| スケジューリングサイクル | startTime 式 |
endTime 式 |
partition 式 |
|---|---|---|---|
| 5 分ごと | $[yyyymmddhh24mi-8/24/60]00 |
$[yyyymmddhh24mi-3/24/60]00 |
$[yyyymmddhh24mi-8/24/60] |
| 1 時間ごと | $[yyyymmddhh24-1/24]0000 |
$[yyyymmddhh24]0000 |
$[yyyymmddhh24] |
| 2 時間ごと | $[yyyymmddhh24-2/24]0000 |
$[yyyymmddhh24]0000 |
$[yyyymmddhh24] |
| 3 時間ごと | $[yyyymmddhh24-3/24]0000 |
$[yyyymmddhh24]0000 |
$[yyyymmddhh24] |
| 毎日 | $[yyyymmdd-1]000000 |
$[yyyymmdd]000000 |
$[yyyymmdd-1] |
| 毎週 | $[yyyymmdd-7]000000 |
$[yyyymmdd]000000 |
$[yyyymmdd-1] |
| 毎月 | $[add_months(yyyymmdd,-1)]000000 |
$[yyyymmdd]000000 |
$[yyyymmdd-1] |
式の仕組み (時間ごとの例)
2022-11-22 の 10:05 に実行がスケジュールされたタスクの場合:
-
startTimeは20221122090000に解決されます — 09:00 からレコードを読み取ります(09:00 を含む) -
endTimeは20221122100000に解決されます — 10:00 で停止(10:00 は含まない) -
partitionは2022112210に解決され、MaxCompute の該当パーティションに書き込みます。
5 分の例
2022-11-22 の 10:00 に実行がスケジュールされたタスクの場合:
-
startTimeは20221122095200となります — 09:52 から (09:52 を含む) レコードを読み取ります。 -
endTimeは20221122095700に相当します — 09:57 で終了(排他) -
partitionは202211220952になります。
endTime は、インスタンスの実行時間の 3 分前に設定されます。このバッファーにより、インスタンスが読み取りを開始する前に、タイムウィンドウのすべてのレコードが Kafka に書き込まれることが保証されます。
スケジューリングサイクル設定
| スケジューリングサイクル | サイクル | 開始時刻 | 間隔 | 終了時刻 | 曜日/日付 |
|---|---|---|---|---|---|
| 5 分ごと | 分 | 00:00 | 5 分 | 23:59 | — |
| 1 時間ごと | 時間 | 00:15 | 1 時間 | 23:59 | — |
| 毎日 | 日 | 00:15 | — | — | — |
| 毎週 | 週 | 00:15 | — | — | 月曜日 |
| 毎月 | 月 | 00:15 | — | — | 毎月 1 日 |
時間ごと、日ごと、週ごと、月ごとのスケジュールでは、開始時刻を深夜直後 (たとえば、00:00 ではなく 00:15) に設定します。これにより、同期インスタンスが開始する前に Kafka がレコードの書き込みを完了する時間が与えられ、データ損失のリスクが軽減されます。
定期インスタンスがすでに開始された後に、タイムスタンプが ${startTime} 以下のレコードが Kafka トピックに書き込まれた場合、それらのレコードは読み取られない可能性があります。書き込みの遅延や順不同のタイムスタンプは、データ損失のリスクを高めます。
次のステップ
-
構成オプションの完全なリストについては、「Kafka Reader リファレンス」をご参照ください。
-
依存関係や再実行ポリシーなどの高度なスケジューリングオプションについては、「スケジューリング構成」をご参照ください。