バッチデータ同期タスクをより詳細に設定するには、[Code Editor] を使用します。コードエディターでは、データ同期用の JSON スクリプトを記述し、DataWorks のスケジューリングパラメーターを使用して、単一のソーステーブルまたはシャーディングされたテーブルから宛先データテーブルへ、完全データまたは増分データを定期的に同期できます。このトピックでは、このようなタスクの一般的な設定について説明します。設定はデータソースによって異なります。詳細については、「データソース一覧」をご参照ください。
シナリオ
以下のシナリオでは、コードエディターを使用して同期タスクを設定します。
-
データソースがコードレス UI での設定をサポートしていません。
説明データソースがコードレス UI をサポートしているかどうかは、ユーザーインターフェイスに表示されます。
たとえば、宛先データソースとして HBase11xsql を選択すると、ネットワークとリソースの設定ページに「The current data source type does not support task editing in the codeless UI. The task will be configured in the Code Editor.」という黄色の警告メッセージが表示されます。この場合、ツールバーの [コードエディター] ボタンをクリックしてモードを切り替え、タスクを設定します。
-
一部のデータソースの 設定パラメーター は、コードエディターでのみ利用可能です。
-
コードエディターを使用して、DataWorks で直接作成できない一部のデータソースを設定できます。
前提条件
-
必要なソースデータソースと宛先データソースが DataWorks のデータソース管理セクションで設定されていることを確認してください。詳細については、「データソース一覧」をご参照ください。
説明-
バッチデータ同期でサポートされているデータソースとその設定の詳細については、「サポートされているデータソースと同期ソリューション」をご参照ください。
-
データソースの機能の詳細については、「データソース設定」をご参照ください。
-
適切な仕様のリソースグループを購入し、ワークスペースにバインドします。詳細については、「サーバーレスリソースグループの使用」をご参照ください。
リソースグループとデータソース間のネットワーク接続を確立します。詳細については、「ネットワーク接続の設定」をご参照ください。
ステップ 1:バッチ同期ノードの作成
DataStudio (新バージョン)
DataWorks コンソールにログインします。左側のナビゲーションウィンドウで、を選択します。ドロップダウンリストから目的のワークスペースを選択し、[<p><a href={url} target="_blank">Learn more.</a></p>][Data Studio]をクリックします。
ワークフローを作成します。詳細については、「ワークフロー」をご参照ください。
次のいずれかの方法で Data Integration ノードを作成します。
方法 1: ワークフローリストの右上隅にある
アイコンをクリックし、 を選択します。方法 2: ワークフロー名をダブルクリックし、Data Integration フォルダーから Data Integration ノードを右側のワークフローキャンバスにドラッグします。
ソースとターゲットのタイプを設定し、Single Table Batch Sync を選択して、OK をクリックします。
DataStudio (旧バージョン)
DataWorks コンソールにログインします。左側のナビゲーションペインで、 を選択します。ドロップダウンリストから目的のワークスペースを選択し、データ分析 をクリックします。
ワークフローを作成します。詳細については、「ワークフローの作成」をご参照ください。
次のいずれかの方法でバッチ同期ノードを作成します。
方法 1: ワークフローを展開し、Data Integration を右クリックして、 を選択します。
方法 2: ワークフロー名をダブルクリックし、Data Integration フォルダーから Batch Synchronization ノードを右側のワークフローキャンバスにドラッグします。
画面の指示に従ってバッチ同期ノードを作成します。
ステップ 2:データソースとリソースグループの設定
どのステップでも、コードレス UI からコードエディターに切り替えることができます。JSON スクリプトに情報を自動入力させるには、次の順序に従うことを推奨します。
-
コードレス UI で、データソースとリソースグループを選択し、「ネットワーク接続性」をテストします。
-
コードエディターに切り替えます。
システムが、生成された JSON スクリプトにこの情報を自動的に入力します。
または、コードエディターに直接切り替えて、タスクを手動で設定することもできます。JSON コードでデータソースを指定し、右側の [詳細設定] パネルで、タスクに必要なリソースグループとリソースサイズを設定します。
-
作成したリソースグループが表示されない場合は、ワークスペースに関連付けられているか確認してください。詳細については、「サーバーレスリソースグループの使用」および「Data Integration 専用リソースグループの使用」をご参照ください。
-
推奨されるリソースクォータについては、「Data Integration リソースグループのパフォーマンスメトリック」をご参照ください。
ステップ 3:コードエディターへの切り替えとテンプレートのインポート
ツールバーで、コードエディターの
アイコンをクリックします。
スクリプトがまだ設定されていない場合は、ツールバーの
アイコンをクリックして、プロンプトに従ってスクリプトテンプレートを素早くインポートできます。
ステップ 4:スクリプトの編集とタスクの設定
以下のコードは、コードエディターでの一般的な設定を示しています。
-
typeフィールドとversionフィールドにはデフォルト値が設定されており、変更できません。 -
スクリプト内の
Processor関連の設定は無視してかまいません。
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"plugin_name",
"parameter":{...},
"name":"Reader",
"category":"reader"
},
{
"stepType":"plugin_name",
"parameter":{...},
"name":"Writer",
"category":"writer"
},
{
"name":"Processor",
"stepType":null,
"category":"processor",
"copies":1,
"parameter":{...}
}
],
"setting":{
"executeMode":null,
"errorLimit":{
"record":""
},
"speed":{
"concurrent":2,
"throttle":false
},
"timeZone":"Asia/Shanghai"
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
-
Reader と Writer の基本情報とフィールドマッピングを設定します。
重要プラグインの設定は、プラグインによって異なります。以下の例では、一般的なパラメーターについて説明します。プラグインが特定の設定をサポートしているか、またその実装方法を確認するには、プラグインのドキュメントをご参照ください。詳細については、「データソース一覧」にある各データソースの [Reader スクリプトデモ] および [Writer スクリプトデモ] をご参照ください。
以下のパラメーターを使用できます。
-
Reader
操作
説明
where(同期範囲の設定)一部のソースタイプはデータフィルタリングをサポートしています。条件 (
WHERE句から where キーワードを除いたもの) を指定してソースデータをフィルタリングできます。実行時に、タスクは条件を満たすデータのみを同期します。詳細については、「シナリオ: 増分バッチ同期タスクを設定する」をご参照ください。増分同期を実装するには、このフィルター条件をスケジューリングパラメーターと組み合わせて動的に設定できます。たとえば、
gmt_create >= '${bizdate}'を使用すると、タスクが実行されるたびに当日の新しいデータのみが同期されます。また、スケジューリングプロパティでこの変数に値を割り当てる必要があります。詳細については、「サポートされているスケジューリングパラメーターの形式」をご参照ください。増分同期の設定方法は、データソース (プラグイン) によって異なります。
データフィルター条件が設定されていない場合、デフォルトでテーブル内のすべてのデータが同期されます。
splitPk(リレーショナルデータベースの分割キーの設定)ソースデータを分割するために使用するフィールドを定義します。実行時に、同期タスクはこのフィールドに基づいて複数のサブタスクに分割され、並列でデータを読み取ることができます。
-
通常、主キーは均等に分散されており、シャーディングされたテーブルでのデータホットスポットの防止に役立つため、テーブルの主キーを分割キーとして使用することを推奨します。
-
現在、
splitPkは整数データの分割のみをサポートしています。文字列、浮動小数点数、日付はサポートしていません。サポートされていない型を指定した場合、Data Integration は splitPk の設定を無視し、データは単一チャネルで同期されます。 -
分割キーを指定しないか、その値が空の場合、データは単一チャネルで同期されます。
-
すべてのプラグインが、タスク分割ロジックを設定するための分割キーの指定をサポートしているわけではありません。上記の情報は一例です。詳細については、特定のプラグインのドキュメントをご参照ください。詳細については、「サポートされているデータソースと同期ソリューション」をご参照ください。
column(ソースフィールドの定義)column配列で同期するソースフィールドを定義します。定数、変数、関数をカスタムフィールドとして宛先に書き込むことができます。例:'123'、'${variable_name}'、'now()'。 -
-
Writer
操作
説明
preSql&postSql(同期前後に実行するステートメントの設定)一部のデータソースでは、データが書き込まれる前 (同期前) と後 (同期後) に、宛先で SQL ステートメントを実行できます。
たとえば、MySQL Writer は、データの書き込み前または書き込み後にコマンドを実行する
preSqlとpostSqlをサポートしています。同期が開始される前にtruncate table tablenameコマンドでテーブルをクリアするには、[インポート前のステートメント] (preSql) を使用できます。writeMode(競合時の書き込みモードの定義)この設定は、データソースの機能と Writer プラグインのサポート状況に依存します。
-
-
チャネル制御
設定セクションで、同時実行性、同期レート、ダーティデータの処理などのパフォーマンス設定を構成できます。
パラメーター
説明
executeMode(分散処理機能)現在のタスクを分散モードで実行するかどうかを制御します。
-
distribute:分散処理を有効にします。このモードでは、タスクをシャードに分割し、複数の実行ノードに分散して並列実行します。これにより、同期速度をクラスターサイズに応じて水平にスケールさせ、単一ノードのボトルネックを解消できます。
-
null:分散処理を無効にします。このモードでは、並行処理は単一マシン上のプロセスに限定され、複数マシンでの計算は利用できません。
重要-
マシンが 1 台しかない Data Integration 専用リソースグループを使用している場合、複数マシンのリソースを活用できないため、分散モードの使用は推奨されません。
-
単一マシンで既に速度要件を満たしている場合は、タスクの実行を簡素化するために単一ノードモードを使用することを推奨します。
-
分散処理を有効にするには、8 以上の同時実行数が必要です。
-
一部のデータソースは、分散モードでのタスク実行をサポートしています。詳細については、特定のプラグインのドキュメントをご参照ください。
-
分散処理を有効にすると、より多くのリソースを消費します。実行時にメモリ不足 (OOM) エラーが発生した場合は、このスイッチを無効にしてみてください。
concurrent(最大期待同時実行数)ソースからの並列読み取りや宛先への書き込みに使用する最大スレッド数を定義します。
説明リソース仕様やその他の要因により、実行時の実際の同時実行数は設定値以下になる場合があります。デバッグリソースグループの料金は、実際の同時実行数に基づきます。詳細については、「パフォーマンスメトリック」をご参照ください。
throttle(同期レート)同期レートを制御します。
-
true:スロットリングを有効にします。これにより、高い抽出速度によってソースデータベースに過負荷をかけるのを防ぎます。最小レート制限は 1 MB/s です。
説明throttleをtrueに設定する場合、mbps (同期レート) パラメーターも設定する必要があります。 -
false:スロットリングを無効にします。スロットリングがない場合、タスクはハードウェア環境と設定された同時実行数制限内で利用可能な最大転送性能を使用します。
説明トラフィックメトリックは Data Integration 内部のものであり、実際のネットワークインターフェイスカード (NIC) のトラフィックを表すものではありません。通常、NIC トラフィックはチャネルトラフィックの 1〜2 倍です。実際のトラフィック増加量は、特定のデータストレージシステムのシリアル化に依存します。
errorLimit(ダーティデータレコード数の制御)ダーティデータのしきい値と、それがタスクに与える影響を定義します。
重要過剰なダーティデータは、タスクの全体的な同期速度を低下させる可能性があります。
-
設定されていない場合、デフォルトではダーティデータが許可され、その存在はタスクの実行に影響しません。
-
0 に設定した場合、ダーティデータは許可されません。同期中にダーティデータが生成されると、タスクは失敗して終了します。
-
ダーティデータが許可され、しきい値が設定されている場合:
-
ダーティデータの量がしきい値内である場合、同期タスクはダーティデータを無視し (宛先には書き込まれません)、実行を継続します。
-
ダーティデータの量がしきい値を超えた場合、同期タスクは失敗して終了します。
-
説明ダーティデータとは、ビジネスにとって無意味なデータ、形式が無効なデータ、または同期中に問題を引き起こすデータのことです。単一のレコードを宛先に書き込む際に例外が発生した場合、そのレコードはダーティデータとして扱われます。したがって、書き込みに失敗したレコードはすべてダーティデータに分類されます。
たとえば、ソースから VARCHAR 型のデータを宛先の INT 型の列に書き込むと、変換エラーが発生し、データが宛先に書き込まれなくなる可能性があります。同期中にダーティデータを許可するかどうかを制御し、ダーティデータレコード数のしきい値を設定できます。カウントが指定された数を超えると、タスクは失敗します。
timeZone(タイムゾーン設定)同期タスクのタイムゾーンを設定します。このパラメーターは、ソースと宛先の両方で時間関連フィールドの変換に影響します。
設定例:
"timeZone":"Asia/Shanghai"。-
このパラメーターは、コードエディターの設定セクションでのみ設定できます。コードレス UI では、宛先のタイムゾーンを設定できません。
-
タイムゾーン値は、
Asia/ShanghaiやAmerica/New_Yorkなどの標準 IANA タイムゾーン形式を使用します。 -
設定されていない場合、Data Integration はシステムのデフォルトタイムゾーンを使用します。
説明これらの設定に加えて、全体的な同期速度は、ソースデータソースのパフォーマンスやネットワーク環境などの要因にも影響を受けます。同期レートと最適化の詳細については、「バッチ同期タスクの最適化」をご参照ください。
-
ステップ 5:スケジューリングプロパティの設定
定期的にスケジュールされる単一テーブルバッチ同期タスクについては、その自動スケジューリングプロパティを設定する必要があります。ノード設定ページで、右側のパネルにある Scheduling Settings をクリックして、ノードのスケジューリングプロパティを設定します。
同期タスクに対して、スケジューリングパラメーター、スケジューリングポリシー、スケジュール時刻、スケジューリング依存関係を設定する必要があります。設定方法は他の Data Studio ノードと同じであるため、ここでは繰り返しません。
Data Studio (新バージョン) については、「ノードスケジューリング (新バージョン)」をご参照ください。
Data Studio (旧バージョン) については、「ノードスケジューリング設定 (旧バージョン)」をご参照ください。
スケジューリングパラメーターの使用方法については、「Data Integration でのスケジューリングパラメーター使用の典型的なシナリオ」をご参照ください。
ステップ 6:タスクの送信と公開
実行パラメーターを設定します。
タスク設定ページで、右側のパネルにある Run Configuration をクリックし、テスト実行用に次のパラメーターを設定します。
パラメーター
説明
[Resource Group]
データソースに接続されているリソースグループを選択します。
[Script Parameters]
データ同期タスクのプレースホルダーパラメーターに値を割り当てます。たとえば、タスクが
${bizdate}パラメーターを使用する場合、yyyymmdd形式で日付パラメーターを入力します。タスクを実行します。
ツールバーの
実行アイコンをクリックして、Data Studio でタスクを実行し、デバッグします。タスクが完了したら、ターゲットデータソースと同じ種類のノードを作成してテーブルデータをクエリし、結果を検証できます。タスクを発行します。
テスト実行が成功した後、タスクをスケジュールで実行する必要がある場合は、ノード設定ページの上部にある
アイコンをクリックして、タスクを本番環境に発行します。タスクの発行の詳細については、「タスクの発行」をご参照ください。
次のステップ
タスクを本番環境に発行した後、オペレーションセンターに移動してスケジュールされたタスクを表示します。タスクの実行と管理、ステータスの監視、リソースグループのメンテナンスの詳細については、「バッチ同期タスクの操作」をご参照ください。