このトピックでは、定期スケジュールを設定して Kafka から MaxCompute へ増分データを同期する方法について説明します。この例では、分、時間、または日単位でデータを同期し、MaxCompute の時間単位または日単位のパーティションテーブルに書き込む方法をカバーします。
注意事項
ご利用の Kafka のバージョンは 0.10.2 以降、かつ 2.2.x 以前である必要があります。Kafka でレコードのタイムスタンプを有効にし、レコードに正しいビジネスタイムスタンプが含まれている必要があります。
増分データ同期の開始後、開始時刻以前のタイムスタンプを持つレコードが Kafka Topic に書き込まれる可能性があります。これらのレコードは読み取られない場合があります。Kafka Topic へのデータ書き込みが遅延したり、タイムスタンプが順不同になったりする場合、オフライン同期タスクでデータが損失するリスクがあることにご注意ください。
Kafka 側の同期終了ポリシーについては、以下の条件を満たす場合にのみ [1分間新しいデータなし] を選択できます。そうでない場合、データ損失が発生する可能性があります。
Kafka Topic の一部またはすべてのパーティションに、10 分以上などの長期間にわたって新しいデータが書き込まれていない。
各定期インスタンスの開始後、終了時刻パラメーターより前のタイムスタンプを持つレコードが Kafka Topic に書き込まれない。
前提条件
Serverless リソースグループを購入します。
Kafka と MaxCompute のデータソースを作成します。詳細については、「データソースの設定」をご参照ください。
リソースグループとデータソース間のネットワーク接続を確立します。詳細については、「ネットワーク接続ソリューションの概要」をご参照ください。
制限事項
ソースデータを MaxCompute の外部テーブルに同期することはサポートされていません。
操作手順
このトピックでは、新しい DataStudio インターフェイスを使用して、オフライン同期タスクを設定する方法を説明します。
ステップ 1:ノードの作成とタスクの設定
コードレス UI でノードを作成および設定する一般的な手順については、「コードレス UI 設定」ガイドをご参照ください。このトピックでは、これらの手順は繰り返しません。
ステップ 2:データソースと宛先の設定
データソース (Kafka) の設定
このトピックでは、Kafka から MaxCompute への単一テーブルのオフライン同期を実行する方法について説明します。データソースは Kafka です。主要な設定項目は以下のとおりです。
Kafka データソースの設定項目に関する一般的な説明については、「Kafka Reader」ドキュメントをご参照ください。以下は、このチュートリアルのリファレンス設定です。
設定項目 | 主要な設定 |
Topic | 同期する Kafka Topic を選択します。標準モードの DataWorks ワークスペースを使用する場合、開発環境と本番環境の両方の Kafka クラスターに同じ名前の Topic が存在している必要があります。[Topic] には、この Topic を選択します。 説明 以下の場合:
|
コンシューマーグループ ID | ビジネスニーズに基づいて、Kafka クラスターの一意の ID を入力します。これは、Kafka クラスター側での統計およびモニタリングに役立ちます。 |
読み取り開始オフセット、開始時刻、読み取り終了オフセット、終了時刻 | [読み取り開始オフセット] と [読み取り終了オフセット] の両方を [指定時刻] に設定します。[開始時刻] と [終了時刻] を、スケジューリングパラメーターの これらのパラメーターは、データ同期の開始位置と終了位置を定義します。この設定は、同期が |
タイムゾーン | この項目は空のままにするか、DataWorks が配置されているリージョンのデフォルトのサーバータイムゾーンを選択できます。 説明 Alibaba Cloud のテクニカルサポートに連絡してスケジューリングタイムゾーンを変更した場合は、ここで変更後のタイムゾーンを選択できます。 |
キータイプ、値タイプ、エンコーディング | Kafka Topic の実際のレコードに基づいて選択します。 |
同期完了ポリシー | 同期終了ポリシーには、条件を満たす場合は [1 分間新規データなし] を選択し、それ以外の場合は [指定された終了位置で停止] を選択します。
|
詳細設定 | デフォルト設定のままにします。 |
データ宛先 (MaxCompute) の設定
このトピックでは、Kafka から MaxCompute への単一テーブルのオフライン同期を実行する方法について説明します。データ宛先はテーブルです。主要な設定項目は以下のとおりです。
次の表で説明されていないパラメーターについては、デフォルト設定のままにすることができます。
設定項目 | 主要な設定 |
データソース | 前のステップで選択した MaxCompute データソースが表示されます。標準モードの DataWorks ワークスペースを使用する場合、開発プロジェクトと本番プロジェクトの名前が表示されます。 |
テーブル | 同期先の MaxCompute テーブルを選択します。標準モードの DataWorks ワークスペースを使用する場合、開発環境と本番環境の両方に同じ名前とスキーマを持つ MaxCompute テーブルが存在することを確認してください。 [ターゲットテーブルスキーマの生成] をクリックすることもできます。システムが、データを受信するためのテーブルを自動的に作成します。テーブル作成文を手動で調整できます。 説明 以下の場合:
|
パーティション | テーブルがパーティションテーブルの場合、パーティションキー列の値を入力できます。
|
ステップ 3:フィールドマッピングの設定
データソースと宛先を選択した後、リーダーとライター間の列マッピングを指定する必要があります。[同じ名前のフィールドをマッピング]、[同じ行のフィールドをマッピング]、[マッピングをクリア]、または [マッピングを手動で編集] を選択できます。
Kafka 側には 6 つのデフォルトフィールドがあります。
フィールド名
説明
__key__
Kafka レコードのキー。
__value__
Kafka レコードの値。
__partition__
Kafka レコードが配置されているパーティション番号。パーティション番号は 0 から始まります。
__headers__
Kafka レコードのヘッダー。
__offset__
パーティション内での Kafka レコードのオフセット。オフセットは 0 から始まります。
__timestamp__
Kafka レコードの 13 桁の整数ミリ秒タイムスタンプ。
Kafka 側のフィールドに対してカスタム JSON 解析を設定できます。JSON 形式の Kafka レコードの値フィールドからコンテンツを取得するには、
. (サブフィールドを取得)と[] (配列要素を取得)の構文を使用します。重要JSON フィールド名に「.」文字が含まれている場合、フィールド定義の構文があいまいになるため、フィールドを定義してフィールド値を取得することはできません。
以下は、Kafka の JSON 形式レコードのデータ値の例です。
{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ], "a.b": "unreachable" }a1 のデータ
"hello"を同期するには、Kafka 側にフィールドa.a1を追加します。b のデータ
"world"を同期するには、Kafka 側にフィールドbを追加します。c のデータ
"yyyyyyy"を同期するには、Kafka 側にフィールドc[1]を追加します。AA のデータ
"this"を同期するには、Kafka 側にフィールドd[0].AAを追加します。Kafka 側のフィールドが
a.bと定義されている場合、データ"unreachable"を同期することはできません。
ソースまたは宛先テーブルのフィールドは、マッピングから除外できます。同期インスタンスは、マッピングされていないソースフィールドを読み取りません。マッピングされていない宛先フィールドには NULL が書き込まれます。
1 つのソースフィールドを複数の宛先フィールドにマッピングすることはできません。また、1 つの宛先フィールドを複数のソースフィールドからマッピングすることもできません。
ステップ 4:詳細パラメーターの設定
タスクの右側にある[高度な設定]をクリックして、[予想される最大同時実行数]や[ダーティデータレコードのポリシー]などのパラメーターを設定します。このチュートリアルでは、[ダーティデータレコードのポリシー]を[ダーティデータレコードを無視]に設定し、その他のパラメーターはデフォルト値のままにします。詳細については、「コードレス UI 設定」をご参照ください。
ステップ 5:テストの設定と実行
オフライン同期ノード編集ページの右側で、[デバッグ設定] をクリックします。テスト実行用に [リソースグループ] と [スクリプトパラメーター] を設定します。次に、上部のツールバーで [実行] をクリックして、同期リンクが正常に実行されるかテストします。
左側のナビゲーションウィンドウで
アイコンをクリックし、次に [個人用ディレクトリ] の右側にある
アイコンをクリックすると、.sql拡張子のファイルを作成できます。その後、次の SQL クエリを実行して、宛先テーブルのデータが期待どおりであるかどうかを確認できます。説明この方法でデータをクエリするには、宛先の MaxCompute プロジェクトを計算リソースとして DataWorks にアタッチする必要があります。
.sqlファイルの編集ページで、右側の [デバッグ設定] をクリックし、データソースの [タイプ]、[コンピューティングリソース]、および [リソースグループ] を指定します。 次に、上部のツールバーで [実行] をクリックします。
SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20;
ステップ 6:スケジューリングの設定とタスクの公開
オフライン同期タスクの右側にある[スケジューリング]をクリックします。 定期実行用にスケジューリング構成パラメーターを設定します。 次に、上部のツールバーにある[公開]をクリックして、公開パネルを開きます。 画面の指示に従って、タスクを公開します。
データソースと宛先を設定する際に、${startTime}、${endTime}、${partition} の 3 つのスケジューリングパラメーターを使用しました。スケジューリング設定では、同期のニーズに基づいてこれらのパラメーターの置換ポリシーを指定する必要があります。以下は、いくつかの典型的なシナリオの設定例です。
典型的なシナリオ | 推奨設定 | シナリオの説明 |
同期タスクは 5 分ごとにスケジュールされる |
| 同期タスクが 2022-11-22 10:00 に開始するようにスケジュールされている場合:
|
同期タスクは 1 時間ごとにスケジュールされる |
説明
| 同期タスクが 2022-11-22 10:05 に開始するようにスケジュールされている場合:
|
同期タスクは毎日スケジュールされる |
| 同期タスクが 2022-11-22 00:05 に開始するようにスケジュールされている場合:
|
同期タスクは毎週スケジュールされる |
| 同期タスクが 2022-11-22 00:05 に開始するようにスケジュールされている場合:
|
同期タスクは毎月スケジュールされる |
| 同期タスクが 2022-11-22 00:05 に開始するようにスケジュールされている場合:
|
希望する間隔に基づいてスケジューリング周期を設定します。
典型的なシナリオ | 推奨設定 | シナリオの説明 |
同期タスクは 5 分ごとにスケジュールされる |
| なし |
同期タスクは 1 時間ごとにスケジュールされる |
| 開始時刻を 00:00 より少し遅い時刻 (例:00:15) に設定します。この方法により、同期タスクインスタンスが開始する前に、対象期間のすべてのデータが Kafka Topic に書き込まれることが保証されます。 |
同期タスクは毎日スケジュールされる |
| スケジューリング時刻を 00:00 より少し遅い時刻 (例:00:15) に設定します。この方法により、同期タスクインスタンスが開始する前に、対象期間のすべてのデータが Kafka Topic に書き込まれることが保証されます。 |
同期タスクは毎週スケジュールされる |
| スケジューリング時刻を 00:00 より少し遅い時刻 (例:00:15) に設定します。この方法により、同期タスクインスタンスが開始する前に、対象期間のすべてのデータが Kafka Topic に書き込まれることが保証されます。 |
同期タスクは毎月スケジュールされる |
| スケジューリング時刻を 00:00 より少し遅い時刻 (例:00:15) に設定します。この方法により、同期タスクインスタンスが開始する前に、対象期間のすべてのデータが Kafka Topic に書き込まれることが保証されます。 |
インスタンスの開始後に開始時刻以前のタイムスタンプを持つレコードが Kafka Topic に書き込まれた場合、これらのレコードは読み取られない可能性があります。Kafka Topic へのデータ書き込みが遅延したり、タイムスタンプが順不同になったりする場合、オフライン同期タスクでデータが損失するリスクがあることにご注意ください。