データ統合は、コードを記述することなく、ソーステーブル (シャーディングされたテーブルを含む) からターゲットテーブルに完全データまたは増分データを定期的に同期できるコードレス UI を提供します。UI でソースと宛先を選択し、DataWorks でスケジューリングパラメーターを設定することで、同期タスクを設定できます。このトピックでは、コードレス UI でのバッチ同期タスクの一般的な設定について説明します。設定は、データソースによって異なる場合があります。詳細については、「サポートされているデータソースと同期ソリューション」をご参照ください。
準備
データソースを構成します。Data Integration 同期タスクを構成する前に、DataWorks の[データソース管理]でソースデータベースとターゲットデータベースを構成しておく必要があります。データソース構成の詳細については、「データソースリスト」をご参照ください。
説明バッチ同期でサポートされているデータソースとその設定の詳細については、「サポートされているデータソースと同期ソリューション」をご参照ください。
データソースの特徴の詳細については、「データソース管理」をご参照ください。
適切な仕様のリソースグループを購入し、ワークスペースにアタッチします。詳細については、「データ統合に Serverless リソースグループを使用する」および「データ統合専用リソースグループを使用する」をご参照ください。
リソースグループとデータソース間のネットワーク接続を確立します。詳細については、「ネットワーク接続の設定」をご参照ください。
ステップ 1: バッチ同期ノードの作成
新しいデータ開発
DataWorks コンソールにログインします。ターゲットリージョンに切り替えます。左側のナビゲーションウィンドウで、 を選択します。ドロップダウンリストから目的のワークスペースを選択し、[DataStudio に入る] をクリックします。
ワークフローを作成します。詳細については、「ワークフローの編成」をご参照ください。
バッチ同期ノードを作成します。次のいずれかの方法を使用できます。
方法 1: ワークフローリストの右上隅にある
アイコンをクリックし、 を選択します。方法 2: ワークフロー名をダブルクリックし、[データ統合] ディレクトリから [バッチ同期] ノードを右側のワークフローエディターにドラッグします。
ノードの基本情報、ソース、および宛先を設定します。次に、[確認] をクリックします。
以前のデータ開発
DataWorks コンソールにログインします。ターゲットリージョンに切り替えます。左側のナビゲーションウィンドウで、 をクリックします。ドロップダウンリストから目的のワークスペースを選択し、[データ開発に入る] をクリックします。
ワークフローを作成します。詳細については、「ワークフローの作成」をご参照ください。
バッチ同期ノードを作成します。次のいずれかの方法を使用できます。
方法 1: ワークフローを展開し、[データ統合] を右クリックして、 を選択します。
方法 2: ワークフロー名をダブルクリックし、[データ統合] ディレクトリから [バッチ同期] ノードを右側のワークフローエディターにドラッグします。
プロンプトに従ってバッチ同期ノードを作成します。
ステップ 2: データソースとリソースグループの設定
バッチ同期タスクのソースデータソースとターゲットデータソースを選択します。
タスクを実行するためのリソースグループとリソースクォータを選択します。推奨されるリソースクォータ設定については、「データ統合のパフォーマンスメトリック」をご参照ください。
データソースとリソースグループ間のネットワーク接続をテストします。接続に失敗した場合は、プロンプトに従って、またはドキュメントの説明に従ってネットワーク接続を設定します。詳細については、「ネットワーク接続の設定」をご参照ください。
リソースグループを作成したのに表示されない場合は、リソースグループがワークスペースにアタッチされているかどうかを確認してください。詳細については、「データ統合に Serverless リソースグループを使用する」および「データ統合専用リソースグループを使用する」をご参照ください。
Serverless リソースグループでは、同期タスクのコンピューティングユニット (CU) の上限を指定できます。リソース不足によるメモリ不足 (OOM) エラーで同期タスクが失敗した場合は、リソースグループの CU 使用量を調整できます。
ステップ 3: ソースと宛先の設定
ソースセクションと宛先セクションで、データの読み取り元と書き込み先のテーブルを設定します。同期するデータ範囲を指定することもできます。
プラグインの設定は異なる場合があります。次のセクションでは、一般的な設定の例を示します。プラグインが特定の設定をサポートしているかどうか、およびその実装方法を確認するには、そのプラグインのドキュメントをご参照ください。詳細については、「データソースリスト」をご参照ください。
ソース
一部のソースタイプはデータフィルタリングをサポートしています。条件 (
WHERE句 (`where` キーワードなし)) を指定して、ソースデータをフィルタリングできます。実行時に、タスクは条件を満たすデータのみを同期します。詳細については、「シナリオ: 増分データ用のバッチ同期タスクを設定する」をご参照ください。増分同期を実行するには、このフィルター条件をスケジューリングパラメーターと組み合わせて動的にすることができます。たとえば、
gmt_create >= '${bizdate}'を使用すると、タスクは実行されるたびに、その日の新しいデータのみを同期します。スケジューリングプロパティを設定する際に、ここで定義された変数に値を割り当てる必要もあります。詳細については、「サポートされているスケジューリングパラメーターのフォーマット」をご参照ください。増分同期を設定するメソッドは、データソース (プラグイン) によって異なります。
フィルター条件を設定しない場合、タスクはデフォルトでテーブルからすべてのデータを同期します。
`splitPk` にはテーブルのプライマリキーを使用することをお勧めします。プライマリキーは通常、均等に分散されるためです。これにより、作成されたシャードでのデータホットスポットを防ぐことができます。
現在、`splitPk` はシャーディングに整数データのみをサポートしています。文字列、浮動小数点数、日付、またはその他のタイプはサポートしていません。サポートされていないタイプを指定した場合、`splitPk` 機能は無視され、タスクは単一のチャンネルを使用して同期します。
`splitPk` を指定しない場合、または値が空の場合、データ同期タスクは単一のチャンネルを使用してテーブルデータを同期します。
すべてのプラグインがタスクシャーディングロジックを設定するためのシャードキーの指定をサポートしているわけではありません。上記の情報は一例です。特定のプラグインのドキュメントをご参照ください。詳細については、「サポートされているデータソースと同期ソリューション」をご参照ください。
データ処理
重要データ処理は、新しいデータ開発で利用可能な機能です。以前のバージョンを使用している場合は、この機能を使用するためにワークスペースをアップグレードする必要があります。アップグレード方法については、「DataStudio アップグレードガイド」をご参照ください。
データ処理を使用すると、処理されたデータを宛先テーブルに書き込む前に、文字列の置換、AI 支援処理、データベクトル化などのメソッドを使用してソーステーブルのデータを処理できます。

スイッチをクリックしてデータ処理をオンにします。
[データ処理リスト] で [ノードの追加] をクリックし、データ処理タイプ ([文字列の置換]、[AI 支援処理]、または [データベクトル化]) を選択します。複数のデータ処理ノードを追加でき、DataWorks はそれらを順次処理します。
プロンプトに従ってデータ処理ルールを設定します。AI 支援処理とデータベクトル化については、「インテリジェントデータ処理」をご参照ください。
説明データ処理には追加の計算リソースが必要であり、これによりデータ同期タスクのリソースオーバーヘッドと実行時間が増加します。同期効率に影響を与えないように、処理ロジックはできるだけシンプルにしてください。
宛先
操作
説明
同期の前後に実行するステートメントの設定
一部のデータソースでは、データが書き込まれる前 (同期前) とデータが書き込まれた後 (同期後) に宛先で SQL ステートメントを実行できます。
MySQL Writer は `preSql` および `postSql` 設定項目をサポートしており、データが MySQL に書き込まれる前または後に MySQL コマンドを実行できます。たとえば、[Pre-SQL Statement] (preSql) 設定項目で MySQL コマンド
truncate table tablenameを設定して、同期前にテーブルから既存のデータをクリアできます。競合時の書き込みモードの定義
パスやプライマリキーの競合などが発生した場合に、宛先にデータを書き込む方法を定義します。この設定は、データソースの属性とライタープラグインのサポートによって異なります。設定の詳細については、特定のライタープラグインのドキュメントをご参照ください。
操作 | 説明 |
同期範囲の設定 | |
リレーショナルデータベースのシャードキーの設定 | ソースデータ内のシャードキーとして使用されるフィールドを定義します。同期タスクは、このキーに基づいてデータを複数のタスクに分割し、同時バッチデータ読み取りを行います。 |
ステップ 4: フィールドマッピングの設定
ソースと宛先を選択した後、ソース列と宛先列のマッピングを指定する必要があります。タスクは、これらのマッピングに基づいてソースフィールドから対応する宛先フィールドにデータを書き込みます。
同期中にソースと宛先の間でフィールドタイプが一致しない場合、ダーティデータが生成され、書き込みに失敗する可能性があります。ダーティデータの許容値を設定するには、次のステップの [チャネル制御] 設定をご参照ください。
ソースフィールドが宛先フィールドにマッピングされていない場合、そのデータは同期されません。
自動マッピングが期待どおりでない場合は、手動でマッピングを調整できます。
特定のフィールドのマッピングが不要な場合は、ソースフィールドと宛先フィールドを接続する線をを手動で削除できます。そのソースフィールドのデータは同期されません。
名前によるマッピングと行によるマッピングがサポートされています。次の操作も実行できます。
宛先フィールドへの値の割り当て: 行の追加 を使用して、'123'、'${scheduling_parameter}'、'#{built_in_variable}#' などの定数、スケジューリングパラメーター、または組み込み変数を宛先テーブルに追加できます。
説明次のステップでスケジューリングを設定する際に、スケジューリングパラメーターに値を割り当てることができます。スケジューリングパラメーターの使用方法の詳細については、「サポートされているスケジューリングパラメーターのフォーマット」をご参照ください。
組み込み変数の追加: 組み込み変数を手動で追加し、それらを宛先フィールドにマッピングして、ダウンストリームノードに出力できます。
各プラグインで利用可能な組み込み変数は次のとおりです。
組み込み変数
説明
サポートされているプラグイン
'
#{DATASOURCE_NAME_SRC}#'ソースデータソース名
MySQL Reader
MySQL (sharded) Reader
PolarDB Reader
PolarDB (sharded) Reader
PostgreSQL Reader
PolarDB-O Reader
PolarDB-O (sharded) Reader
'
#{DB_NAME_SRC}#'ソーステーブルが配置されているデータベースの名前
MySQL Reader
MySQL (sharded) Reader
PolarDB Reader
PolarDB (sharded) Reader
PostgreSQL Reader
PolarDB-O Reader
PolarDB-O (sharded) Reader
'
#{SCHEMA_NAME_SRC}#'ソーステーブルが配置されているスキーマの名前
PolarDB Reader
PolarDB (sharded) Reader
PostgreSQL Reader
PolarDB-O Reader
PolarDB-O (sharded) Reader
'
#{TABLE_NAME_SRC}#'ソーステーブル名
MySQL Reader
MySQL (sharded) Reader
PolarDB Reader
PolarDB (sharded) Reader
PostgreSQL Reader
PolarDB-O Reader
PolarDB-O (sharded) Reader
[ソースフィールドの編集]: [手動でマッピングを編集] をクリックして、次の操作を実行します。
ソースデータベースでサポートされている関数を使用してフィールドを処理します。たとえば、`Max(id)` を使用して最大値のみを同期できます。
フィールドマッピングプロセス中にすべてのフィールドがプルされなかった場合は、ソースフィールドを手動で編集します。
説明MaxCompute Reader は関数の使用をサポートしていません。
ステップ 5: チャネルの設定
新しいデータ開発では、チャネルの設定機能はタスク設定インターフェイスの右側にある [高度な設定] セクションにあります。
チャネル制御を使用して、データ同期プロセスに関連するプロパティを設定できます。パラメーターの詳細については、「バッチ同期の同時実行性とスロットリングの関係」をご参照ください。
パラメーター | 説明 |
[最大同時実行数] | 現在のタスクでソースからの同時読み取りまたは宛先への同時書き込みを行うスレッドの最大数を定義します。 説明
|
[同期レート] | 同期レートを制御します。
説明 トラフィックメジャーはデータ統合自体のメトリックであり、実際のネットワークインターフェイスカード (NIC) トラフィックを表すものではありません。通常、NIC トラフィックはチャネルトラフィックの 1 ~ 2 倍です。実際のトラフィックの増加は、データストレージシステムの転送シリアル化に依存します。 |
[ダーティデータポリシー] | ダーティデータとは、タイプの競合や制約違反などの例外により、宛先への書き込みに失敗したレコードを指します。バッチ同期はダーティデータポリシーの定義をサポートしており、ダーティデータの許容値とそのタスクへの影響を設定できます。
重要 ダーティデータが過剰にあると、同期タスク全体の速度に影響を与える可能性があります。 |
[分散処理能力] | 現在のタスクを実行するために分散モードを使用するかどうかを制御します。
同期パフォーマンスに対する要件が高い場合は、分散モードを使用できます。分散モードは、断片化されたマシンリソースも使用できるため、リソース使用率にフレンドリです。 重要
|
[タイムゾーン] | ソースと宛先でタイムゾーンをまたいだ同期が必要な場合は、ソースのタイムゾーンを設定してタイムゾーン変換を実行できます。 |
上記の設定に加えて、全体的な同期速度は、ソースデータソースのパフォーマンスや同期ネットワーク環境などの要因にも影響されます。同期速度と最適化の詳細については、「バッチ同期タスクの高速化または速度制限」をご参照ください。
ステップ 6: スケジューリングプロパティの設定
定期的にスケジュールされるバッチ同期タスクの場合、そのスケジューリングプロパティを設定する必要があります。ノードの編集ページで、右側の [スケジューling 設定] をクリックして設定します。
同期タスクには、スケジューリングパラメーター、スケジューリングポリシー、スケジューリング時間、およびスケジューリングの依存関係を設定する必要があります。設定プロセスは他のデータ開発ノードと同じであり、このトピックでは説明しません。
新しいデータ開発でのスケジューリング設定については、「ノードスケジューリング (新バージョン)」をご参照ください。
以前のデータ開発でのスケジューリング設定については、「ノードスケジューリング設定 (旧バージョン)」をご参照ください。
スケジューリングパラメーターの使用方法の詳細については、「Data Integration におけるスケジューリングパラメーターの一般的なシナリオ」をご参照ください。
ステップ 7: タスクのテストと公開
テストパラメーターの設定
バッチ同期タスクの設定ページで、右側の [テスト設定] をクリックし、次のパラメーターを設定してテストを実行できます。
設定項目
説明
[リソースグループ]
データソースに接続されているリソースグループを選択します。
[スクリプトパラメーター]
データ同期タスク内のプレースホルダーパラメーターに値を割り当てます。たとえば、タスクが
${bizdate}パラメーターで設定されている場合、yyyymmdd形式で日付パラメーターを設定する必要があります。タスクの実行
ツールバーの
実行アイコンをクリックして、データ開発でタスクを実行およびテストします。タスクの実行後、宛先テーブルタイプのノードを作成して宛先テーブルデータをクエリし、同期されたデータが期待どおりかどうかを確認できます。タスクの公開
タスクが正常に実行された後、定期的にスケジュールする必要がある場合は、ノード設定ページのツールバーにある
アイコンをクリックして、タスクを本番環境に公開します。タスクの公開方法の詳細については、「タスクの公開」をご参照ください。
制限事項
一部のデータソースは、コードレス UI でのバッチ同期タスクの設定をサポートしていません。
データソースを選択した後、コードレス UI がサポートされていないことを示すメッセージが表示された場合は、ツールバーの
アイコンをクリックしてコードエディターに切り替え、タスクの設定を続行します。詳細については、「コードエディターでタスクを設定する」をご参照ください。
コードレス UI は使いやすいですが、一部の高度な機能をサポートしていません。より詳細な構成管理が必要な場合は、ツールバーのスクリプトに変換アイコンをクリックしてコードエディターに切り替え、バッチ同期タスクを設定できます。
次のステップ
タスクが本番環境に公開された後、本番環境のオペレーションセンターに移動して、スケジュールされたタスクを表示できます。バッチ同期タスクの実行と管理、ステータスの監視、およびリソースグループでの O&M の実行方法の詳細については、「バッチ同期タスクの O&M」をご参照ください。