すべてのプロダクト
Search
ドキュメントセンター

DataWorks:コードレス UI での単一テーブルバッチ同期タスクの設定

最終更新日:Jun 23, 2026

Data Integration は、タスク開発をガイドするコードレス UI を提供します。コードレス UI を使用すると、ソースとターゲットを設定し、DataWorks のスケジューリングパラメーターを使用して、単一のソーステーブルまたはシャーディングされたデータベースとテーブルからターゲットテーブルへ、完全なデータまたは増分データを定期的に同期できます。このトピックでは、コードレス UI で単一テーブルバッチ同期タスクを設定する方法について説明します。具体的な設定はデータソースによって異なります。詳細については、「サポートされるデータソースと同期ソリューション」をご参照ください。

前提条件

  • DataWorks コンソールの [データソース管理] で、必要なソースデータベースとターゲットデータベースを設定します。詳細については、「データソースリスト」をご参照ください。

    説明
  • 適切な仕様のリソースグループを購入し、ワークスペースにバインドします。詳細については、「サーバーレスリソースグループの使用」をご参照ください。

  • リソースグループとデータソース間のネットワーク接続を確立します。詳細については、「ネットワーク接続の設定」をご参照ください。

  • 現在のワークスペースにバインドされていない MaxCompute テーブル (たとえば、プロジェクト間の同期) を同期する必要がある場合は、まずターゲットの MaxCompute プロジェクトを DataWorks データソースとして追加する必要があります。これにより、同期タスクでテーブルをソースまたはターゲットとして選択できるようになります。データソースの設定に関する詳細については、「データソース管理」をご参照ください。

ステップ 1:Data Integration ノードの作成

DataStudio (新バージョン)

  1. DataWorks コンソールにログインします。左側のナビゲーションウィンドウで、データ開発と О&М > DataStudioを選択します。ドロップダウンリストから目的のワークスペースを選択し、[<p><a href={url} target="_blank">Learn more.</a></p>][Data Studio]をクリックします。

  2. ワークフローを作成します。詳細については、「ワークフロー」をご参照ください。

  3. 次のいずれかの方法で Data Integration ノードを作成します。

    • 方法 1: ワークフローリストの右上隅にある image アイコンをクリックし、Create Node > Data Integration を選択します。

    • 方法 2: ワークフロー名をダブルクリックし、Data Integration フォルダーから Data Integration ノードを右側のワークフローキャンバスにドラッグします。

  4. ソースとターゲットのタイプを設定し、Single Table Batch Sync を選択して、OK をクリックします。

DataStudio (旧バージョン)

  1. DataWorks コンソールにログインします。左側のナビゲーションペインで、データ開発と О&М > DataStudio を選択します。ドロップダウンリストから目的のワークスペースを選択し、データ分析 をクリックします。

  2. ワークフローを作成します。詳細については、「ワークフローの作成」をご参照ください。

  3. 次のいずれかの方法でバッチ同期ノードを作成します。

    • 方法 1: ワークフローを展開し、Data Integration を右クリックして、Create Node > Batch Synchronization を選択します。

    • 方法 2: ワークフロー名をダブルクリックし、Data Integration フォルダーから Batch Synchronization ノードを右側のワークフローキャンバスにドラッグします。

  4. 画面の指示に従ってバッチ同期ノードを作成します。

ステップ 2:データソースとランタイムリソースの設定

この例では、[ソース]mysql という名前の MySQL データソースで、[ターゲット]own_mc という名前の MaxCompute データソースです。[リソースグループ]dwGroup に設定され、[CU]0.5 CU に設定されています。

  1. Source InformationDestination セクションで、読み取り元と書き込み先のデータソースを選択します。

  2. Runtime Resource セクションで、同期タスクの Resource Group を選択し、Resource Group からタスクに CU を割り当てます。メモリ不足 (OOM) エラーで同期タスクが失敗した場合は、リソースグループの CU 割り当てを調整してください。推奨されるリソースクォータの設定については、「リソースグループのパフォーマンスメトリクス - Data Integration」をご参照ください。

  3. ソースとターゲットの両方のデータソースが Connectivity Check に合格することを確認します。データソースがテストに失敗した場合は、画面の指示に従うか、「ネットワーク接続の設定」を参照してネットワーク接続を確立してください。

説明
  • 作成したリソースグループが表示されない場合は、ワークスペースにバインドされていることを確認してください。詳細については、「サーバーレスリソースグループの使用」をご参照ください。

ステップ 3:同期ソリューションの設定

[ソース] と [ターゲット] セクションで、読み取り元と書き込み先のテーブルを設定し、同期するデータの範囲を指定します。

重要

プラグインの設定はさまざまです。次のセクションでは、一般的な設定について説明します。サポートされている設定とその実装の詳細については、特定のプラグインのドキュメントをご参照ください。詳細については、「データソースリスト」をご参照ください。

1. ソース

ソースセクションで、ソーステーブルを設定し、プロンプトに従って必要なパラメーターを指定します。

アクション

説明

データフィルタリング

  • 一部のソースタイプはデータフィルタリングをサポートしています。条件 (WHERE キーワードなしの WHERE 句) を指定して、ソースデータをフィルタリングできます。タスク実行時、この条件を満たすデータのみが同期されます。詳細については、「シナリオ:増分データのバッチ同期タスクの設定」をご参照ください。

  • データフィルターは WHERE 句の式のみをサポートします。SELECTJOIN のような完全な SQL ステートメントはサポートしていません。同期中に UDF の呼び出し、複数テーブルの結合、インメモリ SQL 変換などの複雑な SQL クエリを実行する必要がある場合、コードレス UI はこれらの操作を直接サポートしていません。多段階のアプローチを推奨します。まず、MaxCompute SQL または PyODPS ノードを使用して複雑なロジックとデータクレンジングを行い、結果を一時テーブルに書き込みます。次に、Data Integration ノードを使用して一時テーブルから読み取り、最終的なターゲットに同期します。

  • 増分同期を実装するには、このフィルター条件とスケジューリングパラメーターを組み合わせて動的に設定できます。たとえば、gmt_create >= '${bizdate}' を使用すると、タスクは実行されるたびに現在の日に追加されたデータのみを同期します。スケジューリングプロパティを設定する際には、ここで定義された変数に値を割り当てる必要もあります。詳細については、「スケジューリングパラメーターのサポート形式」をご参照ください。

増分同期を設定する方法は、データソース (プラグイン) によって異なります。

データフィルターを設定しない場合、デフォルトでテーブル全体が同期されます。

リレーショナルデータベースのシャーディングキー

同期のためにデータを分割するために使用されるソースフィールドです。タスクが実行されると、このフィールドに基づいてデータを複数のサブタスクに分割し、並列でバッチ読み取りを実行します。

プライマリキーは通常均等に分散されているため、シャーディングキー (splitPk) としてテーブルのプライマリキーを使用することを推奨します。これにより、シャード間のデータスキューを防ぐことができます。

現在、シャーディングキーは整数データ型のみの分割をサポートしています。文字列、浮動小数点、日付などの型はサポートされていません。サポートされていない型のフィールドを指定した場合、DataWorks はシャーディングキーを無視し、単一チャネルで同期します。

シャーディングキーを指定しない場合、または値が空の場合は、単一チャネルでテーブル内のデータが同期されます。

すべてのプラグインがシャーディングキーの設定をサポートしているわけではありません。上記の情報は参考用です。詳細については、特定のプラグインのドキュメントをご参照ください。詳細については、「データソースリスト」をご参照ください。

2. データ処理

重要

データ処理機能は Data Studio の新バージョンで利用できます。旧バージョンでこの機能を使用するには、タスクを作成する際に [新バージョンを使用する (データ処理機能付き)] を選択する必要があります。すべての機能にアクセスするために、ワークスペースを最新バージョンにアップグレードすることを推奨します。詳細については、「Data Studio アップグレードガイド」をご参照ください。

データ処理機能を使用すると、文字列置換、AI 支援処理、データ埋め込みなどの方法でソースデータを変換してから、ターゲットテーブルに書き込むことができます。

たとえば、[文字列置換] タスクを設定するには、[名前][説明] を設定する必要があります。置換ルールで、[フィールド名] を選択し、[置換されるコンテンツ] ([正規表現マッチング][大文字と小文字を区別するマッチング] をサポート) を入力し、[置換コンテンツ] を指定します。[ルールを追加] をクリックして複数の置換ルールを追加し、右上隅の [データ出力プレビュー] を使用して結果を表示できます。

  1. スイッチをクリックしてデータ処理を有効にします。

  2. Data Processing List で、Add Node をクリックし、処理タイプ (Replace String[AI 支援処理]、または [データ埋め込み]) を選択します。複数のデータ処理ノードを追加でき、DataWorks はそれらを順次処理します。

  3. プロンプトに従ってデータ処理ルールを設定します。AI 支援処理とデータ埋め込みの詳細については、「データ処理」をご参照ください。

    説明

    データ処理は追加のコンピューティングリソースを消費し、タスクの実行時間が増加します。同期効率を維持するために、処理ロジックはできるだけシンプルにしてください。

3. ターゲット

ターゲットセクションで、ターゲットテーブルを設定し、プロンプトに従って必要なパラメーターを指定します。

アクション

説明

同期前後のステートメント

一部のデータソースでは、データが書き込まれる前 (同期前) とデータが書き込まれた後 (同期後) に、ターゲットで SQL ステートメントを実行できます。

例:MySQL Writer は preSql および postSql パラメーターをサポートしており、MySQL へのデータ書き込み前後で MySQL コマンドを実行できます。たとえば、Statement Run Before Writing (preSql) パラメーターを truncate table tablename に設定できます。これにより、新しいデータが書き込まれる前に、ターゲットテーブルから既存のデータがクリアされます。

競合時の書き込みモード

パスやプライマリキーの競合など、書き込み競合の処理方法を指定します。利用可能なオプションは、ターゲットデータソースとライタープラグインによって異なります。詳細については、特定のライタープラグインのドキュメントをご参照ください。

MaxCompute パーティションテーブルに関する注意事項

ターゲットが MaxCompute パーティションテーブルである場合、次の点に注意してください。

  • パーティションフィールドの検出:DataWorks は、ターゲットの MaxCompute テーブルのパーティション構造を自動的に検出します。UI にパーティションフィールドの一部しか表示されない場合は、開発環境と本番環境の両方で、すべてのパーティションフィールドが正しく定義されているかを確認してください。タスクがテーブルパーティション情報の設定を求めるエラーで失敗した場合は、ターゲット設定でパーティションパラメーターを入力してください。

  • フィールドマッピングの更新:ソースまたはターゲットに追加された新しいフィールドがフィールドマッピングエリアに表示されない場合は、次の方法でキャッシュを更新してみてください。

    1. テーブル構造が開発環境と本番環境の間で同期されていることを確認します。

    2. 設定ページで、別のテーブルに切り替えてから元のテーブルに戻してキャッシュを更新します。

    3. フィールドがまだ更新されない場合は、ブラウザを再起動するか、シークレットモードを使用して設定ページを再度開きます。

4. フィールドマッピングの設定

ソースとターゲットを選択した後、ソース列とターゲット列間のマッピングを指定する必要があります。タスクは、このマッピングに基づいて各ソースフィールドから対応するターゲットフィールドにデータを書き込みます。

  • どのターゲットフィールドにもマッピングされていないソースフィールドは同期されません。

  • 自動マッピングが期待通りでない場合は、手動で調整する必要があります。

  • フィールドを同期から除外するには、ソースフィールドとターゲットフィールド間の接続線を手動で削除できます。

ソースフィールドとターゲットフィールドの型が一致しない場合、書き込みの失敗やダーティデータの発生につながる可能性があります。Advanced configuration セクションでダーティデータの許容範囲を設定できます。これについては次のステップで説明します。

名前によるマッピング、行によるマッピング、インテリジェントマッピング、またはルールベースのマッピングを使用してフィールドをマッピングできます。また、次のアクションも実行できます。

  • [インテリジェントマッピング]:効率を向上させ、エラーを減らすために、Data Integration はインテリジェントフィールドマッピングをサポートしています。この AI 搭載機能は、フィールド名、データ型、コメントを分析して、ワンクリックで最適なマッピングを推奨します。推奨事項を確認するか、わずかな調整を行うだけで済みます。

    フィールドマッピングエリアで、[インテリジェントマッピング] をクリックしてダイアログボックスを開きます。マッピング要件を自然言語で記述できます。

    • ユースケース

      典型例

      推奨プロンプト

      グローバルなセマンティックマッチング

      フィールド名は完全に異なるが、意味は同じ
      (例:user_id ↔ device_id)

      ソーステーブルとターゲットテーブルのすべてのフィールドをセマンティックマッチングし、同じ意味を持つフィールドを自動的に識別します。

      特定のビジネスドメインのマッチング

      特定のカテゴリのビジネスフィールドのみをマッピングする。
      (例:「ユーザー」または「注文」フィールドのみ)

      ソーステーブルの「ユーザー情報」(名前、携帯電話番号、ID など) を含むフィールドのみを、ターゲットテーブルの対応するフィールドにマッピングします。
      (注:キーワードは「注文」、「物流」、「支払い」などの他の用語に置き換えることができます。)

      プレフィックス/サフィックスのバリエーション

      同じコア名で、プレフィックスまたはサフィックスが異なる
      (例:src_user_name ↔ tgt_user_name)

      フィールドのプレフィックスとサフィックスの違いを無視し、コア名のみに基づいてセマンティックマッチングを実行します。

      略語とフルネームのマッチング

      片方が略語を使用し、もう片方がフルネームを使用している。
      (例:amt ↔ amount)

      一般的な英語の略語とそれに対応するフルネーム (amt=amount, addr=address など) を識別し、マッピングを作成します。

      特定のフィールドの除外

      一部の類似したフィールドを同期する必要はありません。
      (例:create_time フィールド)

      セマンティックマッチングを実行しますが、「time」または「log」という文字列を含むすべてのフィールドを除外します。

      複雑なロジックの修正

      自動マッチングの結果が正しくなく、手動でのガイダンスが必要。

      ソーステーブルの id フィールドをターゲットテーブルの order_id にマッピングしないでください。マッピングの提案を再生成してください。

    • プロンプトを入力した後、[プレビューを生成] をクリックします。[マッチング結果のプレビュー] エリアに、推奨されるマッピングが表示されます。目的のマッピングを確認して選択し、[適用] をクリックしてフィールドマッピング設定に適用できます。結果に満足できない場合は、プロンプトを調整して新しいプレビューを生成できます。

  • [ルールベースのマッピング]:ソースとターゲットのフィールド名に一貫した差異パターンがある場合は、[ルールベースのマッピング] を使用して、プレフィックス/サフィックスのマッチングや文字置換などのルールを設定することで、マッピングを一括作成します。フィールドマッピングエリアで、[ルールベースのマッピング] をクリックし、マッピング方法とルールを選択し、設定を行い、結果をプレビューして [適用] をクリックします。

  • [ターゲットフィールドに値を割り当てる]Source Table Field 列で、Add Fields をクリックして、定数、スケジューリングパラメーター、または組み込み変数をターゲットテーブルに追加します。例:'123'、'${scheduling_parameter}'、または '#{built_in_variable}#'。

    説明

    スケジューリングパラメーターの使用方法の詳細については、「スケジューリングパラメーターのサポート形式」をご参照ください。

  • [組み込み変数を追加する]:組み込み変数を手動で追加し、ターゲットフィールドにマッピングして、変数をダウンストリームコンポーネントに出力できます。

    次の表に、さまざまなプラグインで利用可能な組み込み変数を示します。

    組み込み変数

    説明

    サポートされているプラグイン

    '#{DATASOURCE_NAME_SRC}#'

    ソースデータソースの名前

    • MySQL Reader

    • MySQL (シャーディング) Reader

    • PolarDB Reader

    • PolarDB (シャーディング) Reader

    • PostgreSQL Reader

    • PolarDB-O Reader

    • PolarDB-O (シャーディング) Reader

    '#{DB_NAME_SRC}#'

    ソーステーブルが属するデータベースの名前

    • MySQL Reader

    • MySQL (シャーディング) Reader

    • PolarDB Reader

    • PolarDB (シャーディング) Reader

    • PostgreSQL Reader

    • PolarDB-O Reader

    • PolarDB-O (シャーディング) Reader

    '#{SCHEMA_NAME_SRC}#'

    ソーステーブルが属するスキーマの名前

    • PolarDB Reader

    • PolarDB (シャーディング) Reader

    • PostgreSQL Reader

    • PolarDB-O Reader

    • PolarDB-O (シャーディング) Reader

    '#{TABLE_NAME_SRC}#'

    ソーステーブルの名前

    • MySQL Reader

    • MySQL (シャーディング) Reader

    • PolarDB Reader

    • PolarDB (シャーディング) Reader

    • PostgreSQL Reader

    • PolarDB-O Reader

    • PolarDB-O (シャーディング) Reader

    '#{FILE_NAME_SRC}#'

    ファイル名

    • OSS Reader

    • HDFS Reader

    • FTP Reader

    • TOS Reader

    • COS Reader

    • S3 Reader

    • Azure Blob Reader

    '#{FILE_PATH_SRC}#'

    絶対ファイルパス

    • OSS Reader

    • HDFS Reader

    • FTP Reader

    • TOS Reader

    • COS Reader

    • S3 Reader

    • Azure Blob Reader

  • [ソースフィールドの編集]Manually Edit Mapping をクリックして、次のアクションを実行します。

    • ソースデータベースでサポートされている関数を適用してフィールドを処理します。たとえば、Max(id) を使用して最大値のみを同期します。

    • 自動マッピング中にすべてのフィールドが取得されなかった場合は、ソースフィールドを手動で編集します。

    説明

    MaxCompute Reader は関数をサポートしていません。

ステップ 4:詳細設定

重要

以前のバージョンのデータ同期における Channel 機能は、現在、詳細設定として知られています。

詳細設定を使用して、データ同期プロセスのプロパティを制御できます。パラメーターの詳細については、「バッチ同期の同時実行数とスロットリングの関係」をご参照ください。

パラメーター

説明

[Expected Maximum Concurrency]

ソースからの同時読み取り、またはターゲットへの同時書き込みに使用されるスレッドの最大数を指定します。

説明
  • リソースの仕様やその他の要因により、実際の実行時の同時実行数は設定値以下になる場合があります。デバッグ時のリソースグループ料金は、この実際の同時実行数に基づきます。詳細については、「パフォーマンスメトリクス」をご参照ください。

  • タスクのスケジューリング料金は、設定された同時実行数ではなく、単一テーブルバッチ同期タスクの数に関連します。

[Sync Rate]

データ転送速度を制御します。

  • スロットリング:同期レートを制御して、高い抽出速度によるソースデータベースへの過剰な負荷を防ぎます。最小速度制限は 1 MB/s です。

  • スロットリングなし:タスクは、設定された同時実行数の制限内で、ハードウェアがサポートする最大転送レートで実行されます。

説明

トラフィックメトリクスは、Data Integration 自体内のスループットを反映するものであり、実際のネットワークインターフェイスカード (NIC) のトラフィックではありません。NIC のトラフィックは、データストレージシステムの転送プロトコルのシリアル化オーバーヘッドに応じて、通常、チャネルトラフィックの 1 〜 2 倍になります。

[Policy for Dirty Data Records]

ダーティデータとは、型の競合や制約違反などの例外により、タスクがターゲットに書き込めなかったレコードのことです。許容しきい値を設定し、タスクの動作を決定するポリシーを定義できます。

  • デフォルトでは、ダーティデータは許可されており、タスクの実行に影響しません。

  • しきい値を 0 に設定した場合、ダーティデータは許可されません。同期中にダーティデータが生成されると、タスクは失敗します。

  • ダーティデータを許可し、しきい値を設定した場合:

    • ダーティデータレコードの数がしきい値内であれば、タスクはこれらのレコードを無視し (ターゲットには書き込まれません)、実行を継続します。

    • ダーティデータレコードの数がしきい値を超えると、タスクは失敗します。

重要

ダーティデータが過剰に発生すると、全体の同期速度が低下する可能性があります。

[Distributed Execution]

タスクを分散モードで実行するかどうかを制御します。

  • 有効:分散実行は、タスクを複数の並列プロセスに分割し、単一プロセスのボトルネックを解消して同期効率を向上させます。

  • 無効:タスクは単一プロセスとして実行されます。

高いパフォーマンス要件がある場合は、分散モードを使用できます。このモードは、断片化されたマシンリソースを効率的に使用することもできます。

重要
  • 分散実行は、同時実行数が 8 以上に設定されている場合にのみ有効にできます。

  • 分散実行を有効にすると、より多くのリソースを消費します。実行時にメモリ不足 (OOM) エラーが発生した場合は、この設定を無効にしてみてください。

[Time Zone]

タイムゾーンをまたいでデータを同期する必要がある場合は、ソースのタイムゾーンを設定してタイムゾーン変換を実行できます。

説明

全体の同期速度は、上記の設定だけでなく、ソースデータソースのパフォーマンスやネットワーク状況などの要因にも影響されます。同期速度とチューニングの詳細については、「バッチ同期タスクの高速化またはレート制限」をご参照ください。

ステップ 5:スケジューリングプロパティの設定

定期的にスケジュールされる単一テーブルバッチ同期タスクについては、その自動スケジューリングプロパティを設定する必要があります。ノード設定ページで、右側のパネルにある Scheduling Settings をクリックして、ノードのスケジューリングプロパティを設定します。

同期タスクに対して、スケジューリングパラメーター、スケジューリングポリシー、スケジュール時刻、スケジューリング依存関係を設定する必要があります。設定方法は他の Data Studio ノードと同じであるため、ここでは繰り返しません。

スケジューリングパラメーターの使用方法については、「Data Integration でのスケジューリングパラメーター使用の典型的なシナリオ」をご参照ください。

ステップ 6:タスクのテストと発行

  • 実行パラメーターを設定します。

    タスク設定ページで、右側のパネルにある Run Configuration をクリックし、テスト実行用に次のパラメーターを設定します。

    パラメーター

    説明

    [Resource Group]

    データソースに接続されているリソースグループを選択します。

    [Script Parameters]

    データ同期タスクのプレースホルダーパラメーターに値を割り当てます。たとえば、タスクが ${bizdate} パラメーターを使用する場合、yyyymmdd 形式で日付パラメーターを入力します。

  • タスクを実行します。

    ツールバーの image 実行アイコンをクリックして、Data Studio でタスクを実行し、デバッグします。タスクが完了したら、ターゲットデータソースと同じ種類のノードを作成してテーブルデータをクエリし、結果を検証できます。

  • タスクを発行します。

    テスト実行が成功した後、タスクをスケジュールで実行する必要がある場合は、ノード設定ページの上部にある image アイコンをクリックして、タスクを本番環境に発行します。タスクの発行の詳細については、「タスクの発行」をご参照ください。

制限事項

  • 単一テーブルバッチ同期タスクは Data Studio でのみ設定できます。

  • コードレス UI は、一部のデータソースの単一テーブルバッチ同期をサポートしていません。

    選択したデータソースでコードレス UI がサポートされていないことを示すメッセージが表示された場合は、ツールバーの image.png アイコンをクリックしてスクリプトモードに切り替え、タスクの設定を続行してください。詳細については、「スクリプトモードでのタスク設定」をご参照ください。

  • コードレス UI は習得が容易ですが、一部の高度な機能をサポートしていません。より詳細な設定管理が必要な場合は、ツールバーのスクリプト変換アイコンをクリックしてスクリプトモードに切り替えることができます。

  • コードレス UI の単一テーブルバッチ同期タスクは、単一テーブルまたは一部のシャーディングされたデータベースとテーブル (シャーディングは特定のデータソースタイプでのみサポートされており、一貫したテーブル構造が必要です) の同期のみをサポートします。テーブル構造とデータの両方のバッチ同期を含む、完全なデータベース同期はサポートしていません。完全なデータベース同期については、「完全なデータベースのバッチ同期タスク」をご参照ください。

  • バッチ同期タスクをリアルタイム同期タスクに直接変換することはできません。データをリアルタイムで同期するには、新しい単一テーブルリアルタイム同期タスクノードを作成する必要があります。

  • 発行時に「node name too long」というエラーが表示された場合は、発行ページの高度な設定セクションでノード名を変更してください。名前が 128 文字を超えないようにしてください。

次のステップ

タスクを本番環境に発行した後、オペレーションセンターに移動してスケジュールされたタスクを表示します。タスクの実行と管理、ステータスの監視、リソースグループのメンテナンスの詳細については、「バッチ同期タスクの操作」をご参照ください。

関連ドキュメント