このトピックでは、Data Integration のコードレス UI を使用して、定期的にスケジュールされたバッチ同期タスクを設定する方法を説明します。
前提条件
- バッチ同期タスクを作成する前に、ソースと宛先のデータソースを設定します。そうすることで、タスク設定時にデータソースを名前で選択できます。説明 データソースの詳細については、「データソースの概要」をご参照ください。
- ビジネス要件を満たすデータ統合専用リソースグループを購入してください。詳細については、「Data Integration 専用リソースグループの使用」をご参照ください。
- データ統合専用リソースグループとデータソース間のネットワーク接続を確立してください。詳細については、「ネットワーク接続ソリューション」をご参照ください。
DataStudio への移動
- DataWorks コンソール にログインします。
- 左側のナビゲーションペインで、ワークスペースリスト をクリックします。
- ワークスペースのリージョンを選択し、対象のワークスペースを見つけて、Data Studio 列の [Data Studio] をクリックします。
操作手順
- ステップ 1: バッチ同期ノードの作成
- ステップ 2: バッチ同期タスクの設定
- ネットワーク接続の設定
- 同期するオブジェクトの選択
- フィールドマッピングの設定
- チャネルコントロールの設定
- スケジューリングプロパティの設定
- ステップ 3: タスクのコミットとデプロイ
ステップ 1: バッチ同期ノードの作成
- ワークフローを作成します。詳細については、「ワークフローの作成」をご参照ください。
- バッチ同期ノードを作成します。
バッチ同期ノードは、次のいずれかの方法で作成します。
- 方法 1:ワークフローを展開し、データ統合 を右クリックして、 を選択します。
- 方法 2:ワークフロー名をダブルクリックし、データ統合 フォルダーから Batch Synchronization ノードを右側のワークフローエディターキャンバスにドラッグします。
- 表示されるダイアログボックスで、バッチ同期ノードのパラメーターを設定します。
ステップ 2: バッチ同期タスクの設定
- ネットワーク接続を設定します。
バッチ同期タスクのソースと宛先、およびタスクを実行するリソースグループを選択し、接続をテストします。
- Data Integration は、シャーディングされたソースデータベースやテーブルから単一の宛先テーブルへのデータ同期もサポートしています。詳細については、「シャーディング同期」をご参照ください。
- データソースとリソースグループ間のネットワーク接続に失敗した場合は、画面の指示に従うか、「ネットワーク接続ソリューション」を参照して、ネットワーク接続を設定してください。
重要 設定オプションはプラグインによって異なります。以下では、一般的な設定を例に説明します。特定のプラグインが設定をサポートしているかどうか、およびその設定方法については、各プラグインのドキュメントをご参照ください。詳細については、「サポートされているデータソースと Reader/Writer プラグイン」をご参照ください。 - Next step をクリックして、同期タスクを設定します。
- 同期するオブジェクトを選択します。
データソース選択エリアで、ソースと宛先のテーブルを設定し、同期範囲を指定します。左側の [ソース] エリアで、データソースタイプ (例:MySQL)、データベース、テーブルを選択します。右側の [宛先] エリアで、宛先データソース (例:MaxCompute) と宛先テーブルを選択します。次に、[クリーンアップルール] (例:[書き込み前に既存データをクリーンアップ (Insert Overwrite)]) と [空の文字列を Null として扱う] オプションを設定します。[ターゲットテーブルの生成] をクリックすると、宛先テーブルを迅速に作成できます。
- Reader
アクション 説明 同期範囲の設定 - [データフィルター] テキストボックスでフィルター条件を指定した場合、同期タスクはフィルター条件を満たすデータのみを同期します。フィルター条件でスケジューリングパラメーターを使用して、タスクのスケジューリング時刻に応じてフィルター条件を動的に変更することもできます。これにより、増分データ同期が実現できます。増分データ同期の設定方法はプラグインによって異なります。詳細については、「シナリオ:増分データ用のバッチ同期タスクの設定」をご参照ください。説明
- [次のステップ] をクリックしてスケジューリングプロパティを設定する際に、データフィルターと宛先テーブルの設定で定義された変数に値を割り当てることができます。これにより、増分データや完全なデータを宛先テーブルの特定の時間ベースのパーティションに書き込むことができます。スケジューリングパラメーターの使用方法の詳細については、「サポートされているスケジューリングパラメーターの形式」をご参照ください。
- 増分同期フィルター条件の構文は、標準的なデータベース構文に似ています。同期中、バッチ同期タスクは完全な SQL ステートメントを連結して、ソースからデータを抽出します。
- データフィルター条件を指定しない場合、タスクはデフォルトでテーブル内のすべてのデータを同期します。
リレーショナルデータベースのシャードキーの設定 データを分割するためのソースフィールドを指定します。同期タスクの実行時に、このキーに基づいてデータが複数のサブタスクに分割され、並列でのバッチデータ読み取りが可能になります。説明- プライマリキーは通常均等に分散されているため、テーブルのプライマリキーを splitPk として使用してください。これにより、結果として得られるシャードでのデータホットスポットを防ぐことができます。
- 現在、splitPk は整数型のデータ分割のみをサポートしています。文字列、浮動小数点数、日付、その他のデータ型はサポートしていません。サポートされていないデータ型を指定した場合、splitPk は無視され、タスクは単一チャネルで同期を実行します。
- splitPk を指定しないか、空のままにした場合、タスクは単一チャネルでデータを同期します。
- すべてのプラグインがタスク分割ロジックを設定するためのシャードキーをサポートしているわけではありません。詳細については、ご使用のプラグインのドキュメント「サポートされているデータソースと Reader/Writer プラグイン」をご参照ください。
- [データフィルター] テキストボックスでフィルター条件を指定した場合、同期タスクはフィルター条件を満たすデータのみを同期します。フィルター条件でスケジューリングパラメーターを使用して、タスクのスケジューリング時刻に応じてフィルター条件を動的に変更することもできます。これにより、増分データ同期が実現できます。増分データ同期の設定方法はプラグインによって異なります。詳細については、「シナリオ:増分データ用のバッチ同期タスクの設定」をご参照ください。
- Writer
アクション 説明 同期前後の SQL ステートメントの設定 一部のデータソースでは、データが書き込まれる前 (同期前) またはデータが書き込まれた後 (同期後) に、宛先で SQL ステートメントを実行できます。 例:MySQL Writer は preSql と postSql をサポートしています。MySQL にデータが書き込まれる前後に MySQL コマンドを実行できます。たとえば、MySQL Writer の Prepare statement before import (preSql) 設定で、コマンド
truncate table tablenameを入力して、新しいデータが書き込まれる前にテーブルから古いデータをクリアできます。競合時の書き込みモードの定義 パスやプライマリキーの競合など、書き込み競合の処理方法を指定します。利用可能な設定は、データソースと Writer プラグインの機能によって異なります。設定の詳細については、ご使用の Writer プラグインのドキュメントをご参照ください。
- Reader
- フィールドマッピングを設定します。
フィールドマッピングを設定すると、タスクはマッピングに基づいてソースフィールドから対応する宛先フィールドにデータを書き込みます。
同期中にソースと宛先のフィールド間でデータ型の不一致があると、ダーティデータが発生し、書き込みに失敗する可能性があります。Channel ステップでダーティデータに対する許容度を設定できます。
フィールドは名前または位置でマッピングできます。また、次の操作も実行できます:説明 ソースフィールドが宛先フィールドにマッピングされていない場合、そのデータは同期されません。- 宛先フィールドに値を割り当てる:Add a row をクリックして、'123' や '${variable_name}' などの定数や変数を宛先テーブルに追加します。説明 [次のステップ] をクリックしてスケジューリングプロパティを設定する際に、ここで定義された変数に値を割り当てることができます。スケジューリングパラメーターの使用方法の詳細については、「サポートされているスケジューリングパラメーターの形式」をご参照ください。
- ソースフィールドを編集する:データ型の横にある
アイコンをクリックして、次の操作を実行します:- ソースデータベースでサポートされている関数を使用してフィールドを処理します。たとえば、
Max(id)を使用して、最大 ID を持つレコードのみを同期します。 - マッピングプロセス中に一部のフィールドが自動的に取得されなかった場合は、ソースフィールドを手動で編集してください。
説明 MaxCompute Reader は関数をサポートしていません。 - ソースデータベースでサポートされている関数を使用してフィールドを処理します。たとえば、
- 宛先フィールドに値を割り当てる:Add a row をクリックして、'123' や '${variable_name}' などの定数や変数を宛先テーブルに追加します。
- チャネルコントロールを設定します。
チャネル設定を構成して、データ同期プロセスのさまざまなプロパティを制御できます。
パラメーター 説明 [Expected Maximum Concurrency] ソースからの読み取り、または宛先への書き込みにおける最大並列スレッド数です。 説明 リソースの制約やその他の要因により、実際の同時セッション数が指定された値に達しない場合があります。デバッグリソースグループの料金は、実際の同時セッション数に基づきます。詳細については、「パフォーマンスメトリック」をご参照ください。[Bandwidth Throttling] データ同期レートです。 - レート制限:同期レートを制限して、ソースデータベースを過剰な負荷から保護できます。最小レートは 1 MB/s です。
- レート制限なし:タスクは、ハードウェア環境と並列設定で許可される最大速度で実行されます。
説明 このトラフィックメトリックは Data Integration によって測定されるものであり、実際のネットワークインターフェイスカード (NIC) のトラフィックを反映していない場合があります。通常、NIC トラフィックはチャネルトラフィックの 1~2 倍です。実際の差は、データストレージシステムのシリアル化方法によって異なります。[エラーレコード数 (ダーティデータ制御)] ダーティデータのしきい値と、タスクへの影響を設定します。 重要 大量のダーティデータは、全体の同期速度を低下させる可能性があります。- 設定されていない場合、タスクはデフォルトでダーティデータを許可し、タスクの実行には影響しません。
- 0 に設定した場合、ダーティデータは許可されません。ダーティデータが生成されるとタスクは失敗します。
- ダーティデータを許可し、しきい値を設定した場合:
- ダーティデータレコード数がしきい値内であれば、タスクはダーティデータを無視し (宛先には書き込まれません)、実行を継続します。
- ダーティデータレコード数がしきい値を超えた場合、タスクは失敗します。
説明 ダーティデータとは何ですか?ダーティデータとは、ビジネスにとって無意味なデータ、不正な形式のデータ、または同期中に問題を引き起こすデータのことです。Data Integration は、宛先データソースへの書き込み時にエラーが発生した場合、そのレコードをダーティデータと見なします。書き込みに失敗したレコードはすべてダーティデータとして分類されます。たとえば、ソースから VARCHAR 値を宛先の INT 列に書き込もうとすると、変換エラーが発生し、データの書き込みに失敗する可能性があります。このレコードはダーティデータと見なされます。同期タスクを設定する際に、ダーティデータを許可するかどうかを制御し、ダーティデータレコード数の上限を設定できます。上限を超えると、タスクは失敗します。
[Distributed Execution] タスクを分散モードで実行するかどうかを指定します。 - 有効:分散モードでは、タスクをスライスに分割し、複数のノードで同時に実行します。これにより、同期速度が実行クラスターのサイズに応じて水平にスケールし、単一マシンのボトルネックを克服できます。
- 無効:設定された並列数は単一マシンのスレッドにのみ適用され、複数マシンのコンピューティングは活用されません。
重要- 専用リソースグループにマシンが 1 台しかない場合は、複数マシンの機能を利用できないため、分散モードは使用しないでください。
- 単一マシンで既に速度要件を満たしている場合は、シンプルにするために単一マシンモードを使用してください。
- 分散処理機能は、同時セッション数が 8 以上の場合にのみ有効にできます。
- 分散モードのサポートはデータソースによって異なります。詳細については、ご使用のプラグインのドキュメント「サポートされているデータソースと Reader/Writer プラグイン」をご参照ください。
説明 上記の設定は、ソースデータソースのパフォーマンスやネットワーク環境とともに、全体の同期速度に影響します。速度調整の詳細については、「バッチ同期タスクの高速化または調整」をご参照ください。
- 同期するオブジェクトを選択します。
- Next step をクリックして、スケジューリングプロパティを設定します。
定期的にスケジュールされたバッチ同期タスクの場合、その自動スケジューリングプロパティを設定する必要があります。スケジューリングパラメーターの使用方法については、「Data Integration でのスケジューリングパラメーターの使用」をご参照ください。
- ノードスケジューリングプロパティの設定:スケジューリングパラメーターを、上記の設定で使用された変数に値として割り当てます。定数または変数を割り当てることができます。
- 時間プロパティの設定:本番環境でのタスクの定期的なスケジュールを設定します。スケジューリング設定の時間プロパティセクションで、インスタンス生成方法、スケジューリングタイプ、スケジューリングサイクルなどのプロパティを設定できます。
- リソースプロパティの設定:スケジューリングリソースグループを選択して、タスクを Data Integration の実行リソースグループにディスパッチします。スケジューリング設定のリソースプロパティセクションで、スケジュールされたタスクの実行に使用するリソースグループを選択できます。説明 スケジューリングリソースグループは、Data Integration のバッチタスクを実行リソースグループにディスパッチし、スケジューリング料金が発生します。タスクディスパッチメカニズムの詳細については、「DataWorks のリソースグループ」をご参照ください。
- Complete configuration をクリックします。
ステップ 3: タスクのコミットとデプロイ
タスクを定期的に実行する必要がある場合は、本番環境にデプロイする必要があります。タスクのデプロイの詳細については、「タスクのデプロイ」をご参照ください。