本トピックでは、Data Integration のスクリプトモードを使用して、バッチ同期タスクを定期的に実行するように設定する方法について説明します。
前提条件
- バッチ同期タスクを設定する前に、ソースデータソースと送信先データソースを設定する必要があります。これにより、タスク設定時に名前でデータソースを選択できます。サポートされているデータソースと、その Reader および Writer プラグインの詳細については、「サポートされているデータソースと Reader および Writer プラグイン」をご参照ください。説明 データソースの詳細については、「データソースの概要」をご参照ください。
- データ統合専用リソースグループを購入済みであること。詳細については、「データ統合専用リソースグループの使用」をご参照ください。
- データ統合専用リソースグループとデータソース間のネットワーク接続が確立されていること。詳細については、「ネットワーク接続ソリューション」をご参照ください。
DataStudio ページへの移動
- DataWorks コンソールにログインします。
- 左側のナビゲーションウィンドウで、ワークスペースリスト をクリックします。
- ワークスペースが配置されているリージョンを選択し、対象のワークスペースを見つけて、[操作] 列の Data Studio をクリックします。
操作手順
- ステップ 1:バッチ同期ノードの作成
- ステップ 2:バッチ同期タスクの設定
- 同期ネットワークリンクの設定
- スクリプトモードへの切り替えとテンプレートのインポート
- スクリプトを編集して、データソース、送信先、転送レート制限、ダーティデータ処理ルールを設定
- スケジューリングプロパティの設定
- ステップ 3:タスクのコミットと公開
ステップ 1:バッチ同期ノードの作成
- ワークフローを作成します。詳細については、「ワークフローの作成」をご参照ください。
- バッチ同期ノードを作成します。
バッチ同期ノードは、次の 2 つの方法のいずれかで作成します:
- 方法 1:ワークフローを展開し、データ統合 を右クリックして、 を選択します。
- 方法 2:ワークフロー名をダブルクリックします。データ統合 フォルダから、Batch Synchronization ノードを右側のワークフロー編集キャンバスにドラッグします。
- 表示されたダイアログボックスで、バッチ同期ノードのパラメーターを設定します。
ステップ 2:バッチ同期タスクの設定
- 同期ネットワークリンクを設定します。
ソースデータソースと送信先データソース、およびタスク用のリソースグループを選択します。その後、接続性をテストします。説明
- ソースのシャーディングされたデータベースとテーブルから、送信先の単一テーブルにデータを同期できます。詳細については、「シャーディングされたデータベースとテーブルからのデータ同期」をご参照ください。
- データソースとリソースグループ間のネットワーク接続に失敗した場合は、画面の指示またはドキュメントに従ってネットワーク接続を設定してください。詳細については、「ネットワーク接続ソリューション」をご参照ください。
- スクリプトモードに切り替えます。
ツールバーの [スクリプトに切り替え] アイコンをクリックします。
スクリプトが設定されていない場合は、ツールバーの
アイコンをクリックし、プロンプトに従ってスクリプトテンプレートをインポートできます。 - スクリプトを編集して同期タスクを設定します。
次のコードは、スクリプトモードにおけるスクリプトの一般的な設定を示しています:説明
typeおよびversionフィールドにはデフォルト値が設定されており、変更できません。- スクリプト内の
Processor設定は無視してかまいません。設定は不要です。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "plugin_name", "parameter": {...}, "name": "Reader", "category": "reader" }, { "stepType": "plugin_name", "parameter": {...}, "name": "Writer", "category": "writer" }, { "name": "Processor", "stepType": null, "category": "processor", "copies": 1, "parameter": {...} } ], "setting": { "executeMode": null, "errorLimit": { "record": "" }, "speed": { "concurrent": 2, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }- Reader と Writer の基本情報とフィールドマッピングを設定します。
これらのパラメーターを設定することで、以下の操作を実行できます。詳細については、各プラグインのドキュメントをご参照ください:「サポートされているデータソースと Reader および Writer プラグイン」。
- Reader
操作 説明 同期範囲の設定 一部のプラグインは、増分同期を実行するためのフィルターパラメーターをサポートしています。たとえば、MySQL Reader プラグインを使用して MySQL データを同期する場合、where パラメーターを DataWorks のスケジューリングパラメーターと組み合わせて使用することで、増分同期を実装できます。詳細については、「シナリオ:増分バッチ同期タスクの設定」をご参照ください。 説明- プラグインが増分同期をサポートするかどうか、またその実装方法はプラグインによって異なります。詳細については、特定のプラグインのドキュメントをご参照ください。
- 増分同期をサポートするプラグインを使用している場合でも、データフィルター条件を指定しないと、デフォルトで完全なデータ同期が実行されます。
- スケジューリングプロパティを設定する際に、データフィルターと送信先テーブルの設定で定義した変数に値を割り当てることができます。これにより、増分データや完全なデータを送信先テーブルの対応する時間パーティションに書き込むなどの操作が可能になります。スケジューリングパラメーターの詳細については、「サポートされているスケジューリングパラメーターのフォーマット」をご参照ください。
- 増分同期のフィルター条件の構文は、データベースの構文とほぼ同じです。同期中、バッチ同期タスクは完全な SQL ステートメントを組み立てて、データソースからデータを抽出します。
リレーショナルデータベースのシャードキーの設定 ソースデータ内でシャーディングに使用するフィールドを指定します。タスクは、このフィールドに基づいて複数のサブタスクに分割され、データを並行して読み取ります。説明- テーブルのプライマリキーを
splitPkの値として使用することを推奨します。プライマリキーは通常、データ分散が均等であるため、シャードでのデータホットスポットの発生を防ぐのに役立ちます。 - 現在、
splitPkは整数ベースのシャーディングのみをサポートしており、文字列、浮動小数点数、日付などの他の型はサポートしていません。サポートされていない型の列を指定した場合、splitPkパラメーターは無視され、タスクは単一チャネルを使用してデータを同期します。 splitPkを指定しない場合、またはその値が空の場合、データ同期は単一チャネルで実行されます。- すべてのプラグインが、シャードキーを指定してタスクのシャーディングロジックを設定することをサポートしているわけではありません。ここで提供される情報は一例です。詳細については、特定のプラグインのドキュメントをご参照ください。詳細については、「サポートされているデータソースと Reader および Writer プラグイン」をご参照ください。
送信先フィールドへの値の割り当て 定数値や変数 ('123' や '${variable_name}' など) が入力された列を出力に追加できます。ここで定義した変数には、次のステップでスケジューリングプロパティを設定する際に値を割り当てることができます。スケジューリングパラメーターの詳細については、「サポートされているスケジューリングパラメーターのフォーマット」をご参照ください。 ソーステーブルフィールドの編集 ソースデータベースでサポートされている関数を使用して、フィールドを処理できます。たとえば、 Max(id)を使用して、ID 値が最大のレコードのみを同期します。説明 MaxCompute Reader は関数をサポートしていません。 - Writer
操作 説明 同期前後の SQL ステートメントの設定 一部のデータソースでは、データを書き込む前後に送信先で SQL ステートメントを実行できます。 例:MySQL Writer は preSql と postSql の設定をサポートしており、MySQL にデータが書き込まれる前後に MySQL コマンドを実行できます。たとえば、[同期前 SQL ステートメント] (preSql) パラメーターで、
truncate table tablenameコマンドを指定して、同期タスクが開始される前にテーブルから古いデータをクリアできます。競合時の書き込みモードの設定 送信先にデータを書き込む際の、パスやプライマリキーの競合などの処理方法を定義します。利用可能なオプションは、送信先データソースと Writer プラグインによって異なります。設定の詳細については、特定の Writer プラグインのドキュメントをご参照ください。
- Reader
- チャネル制御。
setting セクションで、同時実行数、転送レート、ダーティデータ処理などのパフォーマンス設定を構成できます。
パラメーター 説明 executeMode (分散処理能力) タスクを分散モードで実行するかどうかを決定します。 distribute:分散モードを有効にします。このモードでは、タスクが分割され、複数の実行ノードに分散されて並列実行されます。これにより、同期速度がクラスターのサイズに応じて水平にスケールし、シングルノードのパフォーマンスボトルネックを克服できます。null:分散モードを無効にします。設定された同時実行数は単一ノードのスレッドに制限され、タスクは複数のマシンの計算能力を活用できません。
重要- データ統合専用リソースグループにマシンが 1 台しかない場合は、分散モードを使用しないでください。
- 単一のマシンでパフォーマンス要件がすでに満たされている場合は、タスクの実行を簡素化するためにシングルノードモードを使用することを推奨します。
- 分散モードは、同時実行数が 8 以上に設定されている場合にのみ有効にできます。
- 分散モードのサポートはデータソースによって異なります。詳細については、特定のプラグインのドキュメントをご参照ください。
concurrent (想定される最大同時実行数) ソースからの読み取りと送信先への書き込みのための最大同時実行スレッド数を指定します。 説明 リソース仕様などの要因により、実行中の実際の同時実行数は設定値以下になる場合があります。請求は、実際に使用された同時実行数に基づいて行われます。詳細については、「パフォーマンスメトリック」をご参照ください。throttle (転送レート) 転送レートを制御します。 true:速度制限を有効にします。これにより、データ抽出速度を制限することで、ソースデータベースを過剰な負荷から保護します。最小レートは 1 MB/s です。説明throttleを true に設定した場合は、mbps パラメーターも設定して最大転送レートを指定する必要があります。false:速度制限を無効にします。タスクは、ハードウェア環境と設定された同時実行数に基づいて、最大利用可能帯域幅を使用します。
説明 転送レートは Data Integration 内部のメトリックであり、実際のネットワークインターフェースカード (NIC) のトラフィックを表すものではありません。通常、NIC のトラフィックは、ストレージシステムのデータシリアル化に応じて、チャネルトラフィックの 1〜2 倍になります。errorLimit (エラーレコード制御) ダーティデータの許容度を設定します。 重要 ダーティデータが多すぎると、全体の同期速度が低下する可能性があります。- このパラメーターが設定されていない場合、デフォルトでダーティデータは許可され、ダーティデータが生成されてもタスクは実行を継続します。
- このパラメーターが
0に設定されている場合、ダーティデータは許可されません。ダーティデータが生成されるとタスクは失敗します。 - ダーティデータを許可し、しきい値を設定した場合:
- ダーティデータのレコード数がしきい値内であれば、そのレコードは無視され (送信先には書き込まれません)、タスクは正常に実行を継続します。
- ダーティデータのレコード数がしきい値を超えると、タスクは失敗します。
説明 ダーティデータとは? ダーティデータとは、ビジネスにとって無意味なデータ、フォーマットが無効なデータ、または同期中にエラーを引き起こすデータのことです。送信先データソースへの書き込みに失敗したレコードは、すべてダーティデータと見なされます。たとえば、ソースから VARCHAR データを送信先の INT 列に書き込もうとすると、変換エラーが発生してレコードは書き込まれません。このレコードはダーティデータと見なされます。ダーティデータを許可するかどうか、およびダーティデータのレコード数の上限を設定できます。ダーティデータのレコード数が指定された上限を超えると、タスクは失敗します。
説明 これらの設定に加えて、全体の同期速度は、ソースデータソースのパフォーマンスやネットワーク環境などの要因にも影響されます。転送レートとパフォーマンスチューニングの詳細については、「バッチ同期タスクの速度を高速化または制限する」をご参照ください。
- [次へ] をクリックして、スケジューリングプロパティを設定します。
バッチ同期タスクを定期的に実行するには、そのスケジューリングプロパティを設定する必要があります。スケジューリングパラメーターの詳細については、「Data Integration でのスケジューリングパラメーターの使用」をご参照ください。
- ノードスケジューリングプロパティの設定:前のステップで定義した変数にスケジューリングパラメーターを割り当てます。定数と変数の両方を割り当てることができます。
- 時間プロパティの設定:本番環境でタスクが定期的にスケジュールされる方法を定義します。インスタンス生成モード、スケジューリングタイプ、スケジューリング周期などのプロパティを設定できます。
- リソースプロパティの設定:タスクを Data Integration の実行リソースグループに送信するために使用されるスケジューリングリソースグループを定義します。このセクションでタスクを実行するためのリソースグループを選択できます。説明 Data Integration のバッチタスクは、スケジューリングリソースグループによって対応する Data Integration の実行リソースグループに送信されます。このプロセスには、スケジューリング関連の料金が発生します。送信メカニズムの詳細については、「DataWorks リソースグループの概要」をご参照ください。
ステップ 3:タスクのコミットと公開
タスクを定期的なスケジュールで実行する必要がある場合は、本番環境にデプロイする必要があります。タスクのデプロイの詳細については、「タスクのデプロイ」をご参照ください。