すべてのプロダクト
Search
ドキュメントセンター

DataWorks:バッチ同期に関するよくある質問

最終更新日:Jun 24, 2026

このトピックでは、バッチ同期に関する一般的な質問について説明します。

概要

キーワードを照合することで、一般的な問題とその解決策を見つけることができます。

カテゴリ

キーワード

関連トピック

バッチ同期タスクの一般的な運用保守の問題

ネットワーク接続

データソースの接続テストは成功するのに、バッチ同期タスクがデータソース接続エラーで失敗するのはなぜですか?

リソースグループの切り替え

バッチ同期タスクの実行リソースグループを切り替えるにはどうすればよいですか?

ダーティデータ

実行タイムアウト

実行に時間がかかりすぎるバッチ同期タスクのトラブルシューティング方法

インデックス未設定の WHERE 句による全表スキャンでデータ同期タスクが遅延する場合の対処法

デフォルト値の保持

Data Integration は、送信先テーブルを作成する際に、デフォルト値や NULL 非許容制約などのプロパティを保持しますか?

シャードキー

バッチ同期タスクのシャードキーを設定する際、複合主キーをシャードキーとして使用できますか?

データ損失

同期完了後、送信先テーブルのデータがソーステーブルのデータと一致しない場合の対処法

プラグイン以外のエラーの原因と解決策

ダーティデータ

エンコード形式の設定や文字化けによって引き起こされるダーティデータエラーの対処法

SSRF 攻撃

タスクが「Task have SSRF attacks」というエラーを報告し、サーバーサイドリクエストフォージェリ (SSRF) 攻撃の可能性を示している場合の対処法

ネットワーク接続

バッチ同期タスクが断続的に失敗する場合の対処法

テーブル/列名のキーワード

テーブル名または列名にキーワードが含まれているためにタスクが失敗する場合の対処法

列の追加

バッチ同期タスクのソーステーブルで列が追加または変更された場合の対処法

日付/時刻データの書き込み

日付/時刻データをテキストファイルに書き込む際に、ミリ秒を保持したり、カスタムの日付/時刻形式を指定したりするにはどうすればよいですか?

特定のプラグインエラーの原因と解決策

MongoDB

OSS

一度に OSS から読み取れるファイル数に制限はありますか?

DataHub

1 回のバッチで書き込まれるデータ量が上限を超えたために DataHub への書き込み操作が失敗した場合の対処法

Lindorm

Lindorm のバルクモードを使用してデータを書き込む場合、既存のデータは毎回置き換えられますか?

Elasticsearch

Elasticsearch インデックス内のすべてのフィールドをクエリするにはどうすればよいですか?

Tablestore Writer の設定

自動採番主キー列を含む送信先テーブルにデータを書き込むように Tablestore Writer を設定するにはどうすればよいですか?

時系列モデルの設定

時系列モデルの設定において、_tag フィールドと is_timeseries_tag フィールドは何を意味しますか?

バッチ同期のシナリオと解決策

カスタムテーブル名

バッチ同期タスクにカスタムテーブル名を指定するにはどうすればよいですか?

MaxCompute

タスク設定

バッチ同期ノードを設定する際に、すべてのテーブルが表示されない場合の対処法

LogHub

Kafka

OSS

MySQL

TTL の変更

ALTER 文のみを使用して同期されたテーブルの生存時間 (TTL) を変更できますか?

関数集約

API を使用してデータを同期する際に、ソース側の関数 (例:MaxCompute 関数) を集約に使用できますか?たとえば、ソーステーブルの 2 つの列 a と b を Lindorm の主キーとして使用できますか?

Elasticsearch

フィールドマッピング

[データプレビュー] をクリックしたときに、非構造化データソースのフィールドをマッピングできない場合の対処法

エラーメッセージと解決策

リソース設定

OSS

OSS からデータを読み取る際に「AccessDenied: The bucket you access does not belong to you.」というエラーが報告されます。

Redis

ハッシュストレージモードを使用して Redis にデータを書き込む際に「Code:[RedisWriter-04], Description:[Dirty data]. - source column number is in valid!」というエラーが報告されます。

PostgreSQL

PostgreSQL からデータを読み取る際に「org.postgresql.util.PSQLException: FATAL: terminating connection due to conflict with recovery」というエラーが報告されます。

MySQL

インスタンス実行の競合

バッチ同期タスクが失敗し、エラー「Duplicate entry 'xxx' for key 'uk_uk_op'」が報告されます。

ネットワーク接続

MySQL データソースを使用したバッチ同期タスクが「Communications link failure」タイムアウトエラーで失敗します。

フィールドマッピング

バッチ同期タスクが失敗し、エラー「plugin xx does not specify column」が報告されます。

MaxCompute

RESTful API

RESTful API Writer がエラー「The JSON string found by path:[] is not an array.」を報告します。

RDS

Amazon RDS ソースを使用したバッチ同期タスクがエラー「Host is blocked」で失敗します。

MongoDB

Elasticsearch

Hive

ローカルの Hive インスタンスにデータを同期する際に「Could not get block locations.」というエラーが報告されます。

実行タイムアウト

MongoDB ソースを使用したバッチ同期タスクが失敗し、エラー「MongoDBReader$Task - operation exceeded time limitcom.mongodb.MongoExecutionTimeoutException: operation exceeded time limit.」が報告されます。

ネットワーク接続の問題

タスク接続の失敗

  • 以前にネットワーク接続テストが成功していた場合は、再度テストを実行してください。リソースグループがまだデータベースに接続でき、データベースの設定が変更されていないことを確認します。

  • タスクがネットワーク接続テストに合格したのと同じリソースグループを使用していることを確認してください。

    タスクが使用するリソースグループを確認するには:

    • タスクがデフォルトのリソースグループで実行されている場合、ログには次の情報が含まれます:running in Pipeline[basecommon_ group_xxxxxxxxx]

    • タスクがデータ統合専用リソースグループで実行されている場合、ログには次の情報が含まれます:running in Pipeline[basecommon_S_res_group_xxx]

    • タスクが Serverless リソースグループで実行されている場合、ログには次の情報が含まれます:running in Pipeline[basecommon_Serverless_res_group_xxx]

  • 定期タスクが早朝に断続的に失敗し、再実行すると成功する場合は、エラーが発生した時点のデータベースの負荷を確認してください。

タスクの断続的な失敗

IP アドレスホワイトリストの設定が不完全な場合、バッチ同期タスクが断続的に失敗することがあります。データベースの IP アドレスホワイトリストが完全に設定されていることを確認してください。

データ統合専用リソースグループを使用する場合:

  • データ統合専用リソースグループの Elastic Network Interface (ENI) の IP アドレスをデータソースの IP アドレスホワイトリストに追加した場合、リソースグループをスケールアウトする際に、新しい ENI の IP アドレスでホワイトリストを更新する必要があります。

  • スケールアウト後にホワイトリストを更新する手間を省くために、データ統合専用リソースグループの vSwitch の CIDR ブロック全体をデータベースの IP アドレスホワイトリストに追加することを推奨します。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。

Serverless リソースグループを使用する場合は、「IP アドレスホワイトリストの設定」を参照してホワイトリストの設定を確認し、ネットワーク設定が正しいことを確認してください。

IP アドレスホワイトリストが正しく設定されている場合は、データベースの負荷が高いために接続が中断されていないか確認してください。

リソース設定の問題

[TASK_MAX_SLOT_EXCEED]:Unable to find a gateway that meets resource requirements. 20 slots are requested, but the maximum is 16 slots.

  • 原因:

    指定された並列スレッド数が利用可能なリソースを超えています。

  • 解決策:

    バッチ同期タスクの並列スレッド数を減らします。

    • コードレス UI でバッチ同期タスクを設定する場合は、チャネルコントロールポリシーセクションで Expected Maximum Concurrency パラメーターを減らしてください。 詳細については、「コードレス UI を使用してタスクを設定する」をご参照ください。

    • スクリプトモードでバッチ同期タスクを設定する場合、スクリプトのチャネル制御ポリシーセクションの concurrent パラメーターを減らします。詳細については、「スクリプトモードでのタスクの設定」をご参照ください。

OutOfMemoryError: Java heap space

この問題を解決するには、次の 1 つ以上の操作を実行します。

  1. リーダーまたはライターの設定に batchsize または maxfilesize パラメーターが含まれている場合は、その値を減らします。

    リーダーまたはライターがこれらのパラメーターをサポートしているか確認するには、「サポートされているデータソース、リーダー、ライター」を参照し、特定のリーダーまたはライターをクリックしてそのパラメーターを表示します。

  2. 並列スレッド数を減らします。

    • コードレス UI でバッチ同期タスクを設定する場合は、チャンネルコントロールポリシーセクションの Expected Maximum Concurrency パラメーターを小さくしてください。 詳細については、「コードレス UI を使用してタスクを設定する」をご参照ください。

    • スクリプトモードでバッチ同期タスクを設定する場合、スクリプトのチャネル制御ポリシーセクションの concurrent パラメーターを減らします。詳細については、「スクリプトモードでのタスクの設定」をご参照ください。

  3. Object Storage Service (OSS) オブジェクトなどのファイルを同期している場合は、各タスクで読み取るファイル数を減らします。

  4. タスク構成の Running Resources セクションで、Resource Usage (CU) を増やします。他の実行中のタスクに影響を与えないように、CU 値を慎重に設定してください。

インスタンスの同時実行の競合

バッチタスクエラー:Duplicate entry 'xxx' for key 'uk_uk_op'

  • 症状:次のエラーメッセージが返されます:Error updating database. Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Duplicate entry 'cfc68cd0048101467588e97e83ffd7a8-0' for key 'uk_uk_op'

  • 原因:Data Integration は、同じノードの複数のインスタンスが同時に実行されることを許可しません。つまり、同じ JSON 設定を持つ複数の同期タスクインスタンスを同時に実行することはできません。たとえば、5 分ごとに実行するようにスケジュールされた同期タスクで、上流の遅延により、00:00 にスケジュールされたインスタンスと 00:05 にスケジュールされたインスタンスの両方が 00:05 に開始される場合があります。この競合により、いずれかのインスタンスが開始できなくなります。この問題は、インスタンスのデータを補填したり、実行中に再実行したりする場合にも発生する可能性があります。

  • 解決策:インスタンスの開始時刻をずらします。時間単位または分単位のタスクの場合は、自己依存を設定して、前のサイクルのインスタンスが完了した後にのみインスタンスが開始されるようにします。DataWorks (クラシック) では、「自己依存」をご参照ください。DataWorks (新規) では、「クロスサイクル依存」をご参照ください。

タイムアウトエラー

MongoDB ソースのタイムアウトエラー

  • 症状:バッチ同期タスクが次のエラーメッセージで失敗します:MongoDBReader$Task - operation exceeded time limitcom.mongodb.MongoExecutionTimeoutException: operation exceeded time limit

  • 原因:フルロード中にタスクが過剰な量のデータをプルします。

  • 解決策:

    • 同時実行数を増やします。

    • BatchSize の値を減らします。

    • cursorTimeoutInMs パラメーターをリーダー設定に追加し、3,600,000 ms などのより大きな値を設定します。

MySQL をデータソースとして使用するバッチ同期タスクが接続タイムアウトエラーを報告します:Communications link failure

  • 読み取りエラー

    • 症状:

      データを読み取ると、次のエラーが報告されます:Communications link failure The last packet successfully received from the server was 7,200,100 milliseconds ago. The last packet sent successfully to the server was 7,200,100 milliseconds ago. - com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

    • 原因:

      データベースでの SQL クエリの実行が遅いため、MySQL の読み取りタイムアウトが発生します。

    • 解決策:

      • WHERE 句が設定されているか確認し、フィルタリングされた列にインデックスが付けられていることを確認します。

      • ソーステーブルに大量のデータが含まれているか確認します。その場合は、タスクを複数の小さなタスクに分割することを検討してください。

      • ログを調べてボトルネックの原因となっている SQL ステートメントを特定し、データベース管理者に相談して問題を解決します。

  • 書き込みエラー

    • 症状:

      データを書き込む際、タスクは次のエラーで失敗します:Caused by: java.util.concurrent.ExecutionException: ERR-CODE: [TDDL-4614][ERR_EXECUTE_ON_MYSQL] Error occurs when execute on GROUP 'xxx' ATOM 'dockerxxxxx_xxxx_trace_shard_xxxx': Communications link failure The last packet successfully received from the server was 12,672 milliseconds ago. The last packet sent successfully to the server was 12,013 milliseconds ago. More...

    • 原因:

      スロークエリによりソケットタイムアウトが発生します。TDDL 接続のデフォルトの socketTimeout は 12 秒です。SQL ステートメントが MySQL サーバーで実行され、結果を返すのに 12 秒以上かかると、TDDL はエラー 4614 を報告します。このエラーは、データ量が多い場合やサーバーがビジー状態の場合に断続的に発生する可能性があります。

    • 解決策:

      • データベースのパフォーマンスが安定したら、同期タスクを再実行します。

      • データベース管理者に連絡して、タイムアウト設定を調整します。

長時間実行タスク

原因 1:実行時間が長い

  • preSqlpostSql などの実行前 SQL 文または実行後 SQL 文が長時間実行されると、タスクが遅くなる可能性があります。

  • シャードキーが不適切に設定されていると、タスクが遅くなる可能性があります。

    バッチ同期タスクは、シャードキー (splitPk) を使用してデータをパーティション分割します。その後、Data Integration はこの設定に基づいて同時サブタスクを開始し、同期効率を向上させます。プラグインのドキュメントを参照して、シャードキーが必要かどうかを判断してください。

解決策 1:

  • 実行前または実行後の SQL 文を設定する場合は、データフィルタリングにインデックス付きの列を使用します。

  • プラグインがシャードキーをサポートしている場合は、適切に設定します。たとえば、MySQL リーダーのシャードキーを設定する場合は、次の点を考慮してください。

    • テーブルの主キーを splitPk として使用します。主キーは通常、均一な分布を持つため、データホットスポットを防ぐのに役立ちます。

    • 現在、splitPk はシャーディングに整数データ型のみをサポートしています。文字列、浮動小数点、日付型はサポートされていません。サポートされていないデータ型を指定すると、同期は単一のスレッドで実行されます。

    • splitPk を指定しないか、空のままにすると、タスクは単一のスレッドでデータを同期します。

原因 2:Data Integration ランタイムリソースの待機

解決策 2:ログに長時間の WAIT ステータスが表示される場合、データ統合専用リソースグループにはタスクを実行するための十分な同時スロットがありません。原因と解決策の詳細については、「Data Integration タスクが常に「待機」ステータスを表示するのはなぜですか?」をご参照ください。

説明

バッチ同期タスクは、スケジューリング用のリソースグループからデータ統合専用リソースグループにディスパッチされます。したがって、各タスクは 1 つのスケジューリングリソースを消費します。長時間実行されるタスクがリソースを解放しない場合、他のバッチ同期タスクだけでなく、他の種類の定期タスクもブロックする可能性があります。

インデックス未設定の WHERE 句

  • シナリオ例

    SQL ステートメントの例:

    SELECT bid,inviter,uid,createTime FROM `relatives` WHERE createTime>='2016-10-2300:00:00' AND reateTime<'2016-10-24 00:00:00';

    クエリは 2016-10-25 11:01:24.875 に開始され、2016-10-25 11:11:05.489 に結果を返し始めます。タスクはデータベースがクエリ結果を返すのを待つため、MaxCompute が処理を進める前に大幅な遅延が発生します。

  • 原因

    WHERE 句で使用されている createTime 列にインデックスが付けられていないため、全表スキャンが強制されます。

  • 解決策

    パフォーマンスを向上させるために、WHERE 句でインデックス付きの列を使用します。必要に応じて、列にインデックスを追加します。

リソースグループの切り替え

バッチ同期用のリソースグループ

以前のバージョンのデータ開発の場合:

DataStudio では、バッチ同期タスクの詳細ページで、デバッグ用のリソースグループを変更できます。Operation Center では、スケジュール実行用の Data Integration リソースグループを変更できます。詳細については、「Data Integration リソースグループを切り替える」をご参照ください。

新しいバージョンのデータ開発の場合:

DataStudio では、Data Integration タスクのデバッグ用のリソースグループを変更できます。Operation Center では、定期実行用の Data Integration リソースグループを変更できます。詳細については、「リソースグループの運用保守」をご参照ください。

ダーティデータ

ダーティデータのトラブルシューティング

データレコードは、エラーのために送信先データソースへの書き込みに失敗した場合、ダーティデータと見なされます。

影響:ダーティデータは送信先に書き込まれません。デフォルトでは、Data Integration はダーティデータを許可します。この動作は、バッチ同期タスク設定で許可されるダーティデータレコードの最大数を設定することで制御できます。詳細については、「コードレス UI 設定」をご参照ください。

  • ダーティデータを許可する:タスクがダーティデータを許可するように設定されている場合、ダーティデータに遭遇しても実行を継続します。ただし、これらのレコードは破棄され、送信先には書き込まれません。

  • 許可されるダーティデータレコードの数を制御する:

    • しきい値を 0 に設定すると、最初のダーティデータレコードが見つかった時点でタスクは失敗し、即座に停止します。

    • しきい値を x に設定すると、ダーティデータレコードの数が x を超えた場合にタスクは失敗し、停止します。カウントが x を超えない場合、タスクは継続しますが、ダーティデータレコードは送信先に書き込まずに破棄します。

ダーティデータシナリオの分析:

  • シナリオ 1:

    • 症状:タスクは次のエラーメッセージを返します:{"message":"Dirty data was found when writing to the destination MaxCompute table: An error occurred in the data of the [3rd] field. Check the data and correct it, or you can increase the threshold to ignore this record.","record":[{"byteSize":0,"index":0,"type":"DATE"},{"byteSize":0,"index":1,"type":"DATE"},{"byteSize":1,"index":2,"rawData":0,"type":"LONG"},{"byteSize":0,"index":3,"type":"STRING"},{"byteSize":1,"index":4,"rawData":0,"type":"LONG"},{"byteSize":0,"index":5,"type":"STRING"},{"byteSize":0,"index":6,"type":"STRING"}]}

    • 解決策:ログは 3 番目のフィールドにエラーがあることを示しています。

      • ライターがこのダーティデータエラーを報告します。送信先でテーブル作成ステートメントを確認してください。このエラーは通常、送信先 MaxCompute テーブルに指定されたフィールドサイズが、ソース MySQL テーブルの対応するフィールドのデータサイズよりも小さい場合に発生します。

      • データ同期の原則:ソースデータは送信先に書き込み可能でなければなりません。つまり、データ型とフィールドサイズに互換性がある必要があります。たとえば、VARCHAR データを INT 列に書き込むことはできません。送信先データ型のサイズは、マッピングされたソースフィールドのデータを収容できる大きさでなければなりません。LONG、VARCHAR、DOUBLE などのソースデータ型の場合、STRING や TEXT などのより大きな範囲を持つ送信先データ型を使用できます。

      • ダーティデータのエラーメッセージが明確でない場合は、ログから完全なダーティデータレコードをコピーします。値を送信先の列の型と比較して、準拠していないフィールドを特定します。

      例:

      {"byteSize":28,"index":25,"rawData":"ohOM71vdGKqXOqtmtriUs5QqJsf4","type":"STRING"}

      byteSize:バイト数。index:25、26 番目のフィールドを示します。rawData:実際の値。type:データ型。

  • シナリオ 2:

    • 症状:DataX は、MySQL から NULL 値を読み取るときにダーティデータを報告します。

    • 解決策:ソースフィールドのデータ型が、マッピングされた送信先フィールドのデータ型と互換性があるかどうかを確認します。不一致があるとエラーが発生します。たとえば、STRING 型のソースフィールドから NULL 値を INT 型の送信先フィールドに書き込むと失敗します。

ダーティデータの表示

ダーティデータに関する情報を見つけるには、ログ詳細ページに移動し、[Detail log url] の横にあるリンクをクリックしてバッチ同期ログを表示します。查看日志

上限を超えた場合のデータ保持

タスクは実行中にダーティデータレコードをカウントします。このカウントが設定されたしきい値を超えると、タスクは直ちに停止します。

  • データ保持:送信先は、タスクが停止する前に正常に書き込まれたすべてのデータを保持します。システムはロールバックを実行しません。

  • ゼロトレランスポリシー:ダーティデータのしきい値を 0 に設定すると、最初のダーティデータレコードを検出した時点でタスクは失敗し、停止します。

エンコーディングと文字化け

  • 症状:

    データに絵文字が含まれている場合、同期中に次のようなダーティデータエラーが発生することがあります:[13350975-0-0-writer] ERROR StdoutPluginCollector - Dirty data {"exception":"Incorrect string value: '\\xF0\\x9F\\x98\\x82\\xE8\\xA2...' for column 'introduction' at row 1","record":[{"byteSize":8,"index":0,"rawData":9642,"type":"LONG"}],"type":"writer"}

  • 考えられる原因:

    • データベースのエンコーディングが utf8mb4 ではないため、絵文字を正しく同期できません。

    • ソースデータ自体に文字化けが含まれています。

    • データベースのエンコード形式がクライアントのエンコード形式と異なります。

    • ブラウザのエンコーディングがデータベースまたはクライアントのエンコーディングと異なるため、プレビューの失敗や文字化けが発生します。

  • 解決策:

    根本原因に基づいて解決策を選択します。

    • ソースデータに文字化けが含まれている場合は、まずソースデータを修正してからバッチ同期タスクを実行します。

    • データベースとクライアントが異なるエンコード形式を使用している場合は、同じ形式を使用するように設定します。

    • ブラウザのエンコーディングが異なる場合は、データをプレビューする前にデータベースとクライアントのエンコーディングに合わせて設定します。

    次の方法も試すことができます。

    1. JDBC URL を介して追加されたデータソースの場合、URL に utf8mb4 設定を追加します:jdbc:mysql://xxx.x.x.x:3306/database?com.mysql.jdbc.faultInjection.serverCharsetIndex=45

    2. Alibaba Cloud インスタンスモードで追加されたデータソースの場合、データベース名に次を追加します:database?com.mysql.jdbc.faultInjection.serverCharsetIndex=45

    3. データベースのエンコーディングを utf8mb4 に変更します。たとえば、ApsaraDB RDS コンソールで ApsaraDB RDS インスタンスのデータベースエンコーディングを変更します。

      説明

      ApsaraDB RDS データソースのエンコード形式を設定するには、set names utf8mb4 を実行します。ApsaraDB RDS データベースのエンコード形式を表示するには、show variables like 'char%' を実行します。

デフォルト値の保持

デフォルト値とプロパティ

DataWorks が送信先テーブルを作成するとき、ソーステーブルから保持されるのは列名、データ型、コメントのみです。デフォルト値や制約 (NULL 非許容制約やインデックスを含む) などの他のプロパティは保持されません。

シャードキー

複合主キー

いいえ、複合主キーはバッチ同期タスクのシャードキーとして使用できません。

データ損失

データ不整合

データ同期後にデータ品質の問題が見つかった場合は、「オフライン同期におけるデータ品質問題のトラブルシューティング」をご参照ください。

SSRF 攻撃

Task have SSRF attacks

Q:「Task have SSRF attacks」エラーを解決するにはどうすればよいですか?

原因:セキュリティのため、DataWorks はタスクがパブリック IP アドレスを使用して内部ネットワークアドレスにアクセスすることをブロックします。この制御は、HTTP Reader などのプラグイン設定の URL がプライベート IP アドレスまたは VPC ドメイン名を指している場合にトリガーされます。

推奨されるアプローチ:

解決策:内部データソースでタスクを実行するには、パブリックリソースグループの代わりに Serverless リソースグループ (推奨) または データ統合専用リソースグループ を使用します。

日付と時刻データの書き込み

カスタム日付形式

同期タスクをスクリプトモードに切り替え、タスク設定ページの common キーの下に次の設定を追加します。

"common": {
  "column": {
    "dateFormat": "yyyyMMdd",
    "datetimeFormatInNanos": "yyyyMMdd HH:mm:ss.SSS"
  }
}

image.png

パラメーター:

  • dateFormat:時、分、秒を含まないソース DATE 値のテキスト形式を指定します。

  • datetimeFormatInNanos:ソース DATETIME または TIMESTAMP 値のテキスト形式を指定します。この形式はミリ秒の精度をサポートします。

MaxCompute

ソースフィールドマッピング

  1. 定数を入力できます。値は 'abc''123' のように単一引用符で囲む必要があります。

  2. '${bizdate}' などのスケジューリングパラメーターを使用できます。詳細については、「スケジューリングパラメーターでサポートされている形式」をご参照ください。

  3. pt などの同期するパーティションキー列を入力できます。

  4. 入力した値が解析できない場合、その型は 'custom' と表示されます。

  5. MaxCompute 関数はサポートされていません。

  6. 手動で追加した列が「custom」として分類されても、タスクは期待どおりに実行できます。これは、MaxCompute パーティションキー列や、LogHub データプレビューに表示されない列で発生する可能性があります。

パーティションキー列の同期

フィールドマッピングリストのソースフィールド列で、Add または Create Field をクリックします。パーティションキー列の名前 (例: pt) を入力し、送信先フィールドにマップします。

image

複数のパーティションからのデータ同期

データを読み取るパーティションを指定します。

  • ODPS のパーティション設定は、Linux シェルのワイルドカードをサポートしています。アスタリスク (*) は 0 文字以上を表し、疑問符 (?) は 1 文字を表します。

  • デフォルトでは、指定されたパーティションが存在する必要があります。存在しない場合、タスクでエラーが報告されます。パーティションが存在しない場合でもタスクを成功させるには、 When partitions do not exist, オプションを「無視する」に設定します。または、スクリプトモードに切り替えて "successOnNoPartition": true を ODPS の Parameter 構成に追加します。

たとえば、test という名前のパーティションテーブルには、pt=1,ds=hangzhoupt=1,ds=shanghaipt=2,ds=hangzhoupt=2,ds=beijing の 4 つのパーティションが含まれています。次のように、異なるパーティションからデータを読み取るようにタスクを設定できます。

  • pt=1,ds=hangzhou パーティションからデータを読み取るには、"partition":"pt=1,ds=hangzhou" を指定します。

  • pt=1 パーティション内のすべての ds パーティションからデータを読み取るには、"partition":"pt=1,ds=*" を指定します。

  • test テーブル内のすべてのパーティションからデータを読み取るには、"partition":"pt=*,ds=*" を指定します。

パーティションをフィルタリングする条件を設定することもできます。次の操作にはスクリプトモードが必要です。

  • パーティションキーに基づいて最新のパーティションからデータを読み取るには、/*query*/ ds=(select MAX(ds) from DataXODPSReaderPPR) などの設定を追加できます。

  • 条件でパーティションをフィルタリングするには、/*query*/ pt+expression などの式を追加できます。たとえば、/*query*/ pt>=20170101 and pt<20170110 は、2017 年 1 月 1 日 (含む) から 2017 年 1 月 10 日 (含まない) までのすべての pt パーティションからデータを読み取ります。

説明

/*query*/ に続く内容は WHERE 句として扱われます。

列のフィルタリング、並べ替え、パディング

MaxCompute Writer を設定することで、列のフィルタリング、並べ替え、NULL 値のパディングなど、MaxCompute 自体がサポートしていない操作を実行できます。たとえば、すべてのフィールドをインポートするには、設定を "column": ["*"] に設定できます。

MaxCompute テーブルに abc の 3 つの列があり、列 cb にのみデータを同期したい場合は、列を "column": ["c","b"] として設定できます。この設定は、リーダーの最初の列と 2 番目の列を、それぞれ MaxCompute テーブルの列 cb にマッピングします。新しく挿入された行では、列 a は NULL に設定されます。

列設定エラー

データ損失を防ぐため、MaxCompute Writer は、送信先テーブルに含まれる列よりも多くの列を書き込もうとするとエラーを報告します。

パーティション設定

MaxCompute Writer は、最下位レベルのパーティションにのみデータを書き込むことができます。フィールド値に基づいてパーティションにデータをルーティングするなどの機能はサポートしていません。テーブルに 3 つのレベルのパーティションがある場合は、パーティション設定で 3 番目のレベルのパーティションへの完全なパスを指定する必要があります。たとえば、3 番目のレベルのパーティションにデータを書き込むには、pt=20150101, type=1, biz=2 を指定できます。pt=20150101, type=1pt=20150101 のような中間レベルのパーティションは指定できません。

タスクの再実行とフェイルオーバー

MaxCompute Writer は、"truncate": true 設定で書き込みのべき等性を保証します。失敗後のタスクの再実行時に、新しいデータをインポートする前に以前に書き込まれたデータをクリアし、一貫性を保証します。ただし、他の例外がタスクを中断した場合、原子性は保証されません。データはロールバックされず、タスクは自動的に再実行されません。データ整合性を確保するために、べき等性を利用してタスクを手動で再実行する必要があります。

説明

truncatetrue に設定すると、指定されたパーティションまたはテーブルのすべてのデータがクリアされます。この設定は注意して使用してください。

読み取りエラー:The download session is expired.

  • 症状

    Code:DATAX_R_ODPS_005:Failed to read ODPS data, Solution:[Please contact the ODPS administrator]. RequestId=202012091137444331f60b08cda1d9, ErrorCode=StatusConflict, ErrorMessage=The download session is expired.

  • 考えられる原因

    バッチ同期タスクは、MaxCompute Tunnel サービスを使用してデータをアップロードおよびダウンロードします。Tunnel セッションはサーバー上で 24 時間のライフサイクルを持ちます。バッチ同期タスクが 24 時間以上実行されると、セッションが期限切れになり、タスクは失敗します。Tunnel の詳細については、「使用上の注意」をご参照ください。

  • 解決策

    タスクの並列スレッド数を増やし、バッチ同期が 24 時間以内に完了するようにデータ量を計画します。

書き込みエラー:Error writing request body to server

  • 症状

    Code:[OdpsWriter-09], Description:[Failed to write data to the ODPS destination table.]. - Failed to write block:0 to the ODPS destination table, uploadId=[202012081517026537dc0b0160354b]. Please contact the ODPS administrator for assistance. - java.io.IOException: Error writing request body to server.

  • 考えられる原因

    • 不正なデータ型:ソースデータが ODPS データ型仕様に準拠していません。たとえば、値 4.2223 を ODPS の DECIMAL(18,10) 型の列に書き込むことはできません。

    • ODPS ブロックの問題または通信エラー。

  • 解決策

    ソースデータが送信先データ型仕様に準拠していることを確認します。必要に応じて、データをサポートされている型に変換します。

MySQL

シャーディングされたテーブルの同期

設定手順については、「シャーディングされたデータベースとテーブルからのデータ同期」をご参照ください。

文字化けの修正

接続文字列を使用してデータソースを追加する場合、JDBC 形式を jdbc:mysql://xxx.x.x.x:3306/database?com.mysql.jdbc.faultInjection.serverCharsetIndex=45 に変更します。詳細については、「MySQL データソースの追加」をご参照ください。

読み取り/書き込みエラー:Application was streaming results when the connection failed. Consider raising value of 'net_write_timeout/net_read_timeout' on the server.

  • 原因:

    • net_read_timeout:Data Integration が ApsaraDB RDS for MySQL からデータを読み取る際、シャードキー (splitPk) に基づいてクエリを複数の SELECT ステートメントに分割します。これらのステートメントのいずれかの実行時間が ApsaraDB RDS インスタンスで許可されている最大実行時間を超えると、このエラーが発生します。

    • net_write_timeout:このエラーは、クライアントへのデータブロックの送信時間が設定されたタイムアウトを超えた場合に発生します。

  • 解決策:

    データソースの JDBC URL にパラメーターを追加するか、ApsaraDB RDS コンソールで変更できます。

  • 推奨事項:

    タスクが安全に再実行できる場合は、タスクの自動再実行を有効にします。

数据源参数设置

例:jdbc:mysql://192.168.1.1:3306/lizi?useUnicode=true&characterEncoding=UTF8&net_write_timeout=72000

MySQL へのバッチ同期中のエラー:[DBUtilErrorCode-05]ErrorMessage: Code:[DBUtilErrorCode-05]Description:[Failed to write data to the destination table that you configured.]. - com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: No operations allowed after connection closed

原因:

MySQL では、wait_timeout パラメーターのデフォルト値は 8 時間です。このタイムアウトに達したときに同期タスクがまだデータをフェッチしている場合、タスクは中断されます。

解決策:

MySQL サーバーの my.cnf 設定ファイル (Windows の場合は my.ini) を変更します。MySQL セクションに、秒単位の値で次のパラメーターを追加します:wait_timeout=2592000 interactive_timeout=2592000。変更を保存した後、MySQL サービスを再起動し、MySQL にログオンして show variables like '%wait_time%' コマンドを実行して新しい設定を確認します。

読み取りエラー:The last packet successfully received from the server was 902,138 milliseconds ago

このエラーは、CPU 使用率が正常であっても、高いメモリ使用量のために接続が切断された場合に発生する可能性があります。

タスクが安全に再実行できる場合は、[エラー時に自動再実行] を有効にします。詳細については、「時間プロパティ」をご参照ください。

PostgreSQL

PostgreSQL 読み取りエラー:org.postgresql.util.PSQLException: FATAL: terminating connection due to conflict with recovery

  • 症状:PostgreSQL データベースからデータを読み取るバッチ同期タスクが次のエラーで失敗します:org.postgresql.util.PSQLException: FATAL: terminating connection due to conflict with recovery

  • 原因:このエラーは、長時間実行されるクエリがデータベースのリカバリプロセスと競合する場合に、スタンバイ PostgreSQL インスタンスで発生します。サーバーは、クエリが max_standby_archive_delay または max_standby_streaming_delay パラメーターで設定された時間制限を超えたため、クエリを終了します。これを解決するには、これらのパラメーター値を増やして、クエリが完了するまでの時間を長くします。詳細については、「スタンバイサーバーイベント」をご参照ください。

RDS

Host is blocked エラー

Amazon RDS データソースへの接続が Host is blocked エラーを返す場合は、Amazon のロードバランシングヘルスチェックを無効にして、このエラーが再発しないようにします。

MongoDB

root ユーザー使用時のエラー

MongoDB データソースを追加する場合、同期したいコレクションを含むデータベースで作成されたユーザーを使用する必要があります。root ユーザーは使用できません。

たとえば、test データベースから name コレクションをインポートしたい場合は、test データベースで作成されたユーザーを使用する必要があります。

タイムスタンプによる増分同期

代入ノードを使用して日付値をタイムスタンプに変換し、それを MongoDB 同期タスクの入力パラメーターとして使用できます。詳細については、「MongoDB のタイムスタンプフィールドで増分同期を実行するにはどうすればよいですか?」をご参照ください。

同期後の時刻の不一致

MongoDB Reader の設定でタイムゾーンを設定します。詳細については、「MongoDB Reader」をご参照ください。

ソースの更新が同期されない

他のすべての設定を変更せずに、一定の間隔をあけてタスクを再起動します。

MongoDB Reader の大文字と小文字の区別

はい。データが読み取られるとき、設定した Column.name は大文字と小文字を区別します。設定された大文字と小文字がソースと一致しない場合、データは NULL として読み取られます。例:

  • MongoDB のソースデータ:

    {
        "MY_NAME": "zhangsan"
    }
  • 同期タスクの列設定:

    {
        "column":
        [
            {
                "name": "my_name"
            }
        ]
    }

この不一致により、データ読み取りエラーが発生します。

MongoDB Reader のタイムアウト設定

タイムアウトパラメーターは cursorTimeoutInMs です。デフォルト値は 600,000 ms (10 分) です。このパラメーターは、データ転送時間を除く、MongoDB サーバーでの最大クエリ実行時間を指定します。完全同期で大量のデータを読み取ると、次のエラーが発生する可能性があります:MongoDBReader$Task - operation exceeded time limitcom.mongodb.MongoExecutionTimeoutException: operation exceeded time limit

読み取りエラー:no master

同期タスクは現在、セカンダリインスタンスからの読み取りをサポートしていません。セカンダリインスタンスから読み取るようにタスクを設定すると、no master エラーが発生します。

読み取りエラー:MongoExecutionTimeoutException: operation exceeded time limit

  • 原因:

    カーソルのタイムアウトが発生しました。

  • 解決策:

    cursorTimeoutInMs パラメーターの値を増やします。

バッチ同期エラー:DataXException: operation exceeded time limit

タスクの並列スレッド数とバッチサイズを増やします。

同期タスクエラー:no such cmd splitVector

  • 考えられる原因:

    デフォルトでは、同期タスクはタスクスライシングに splitVector コマンドを使用します。ただし、一部のバージョンの MongoDB は splitVector コマンドをサポートしていないため、no such cmd splitVector エラーが発生します。

  • 解決策:

    1. 同期タスク設定ページで、上部のツールバーにあるスクリプトに変換アイコン 转换脚本 をクリックしてスクリプトモードに入ります。

    2. MongoDB パラメーター設定で、パラメーターを追加します。

      "useSplitVector" : false

      splitVector の使用を避けるため。

不変の _id フィールド変更エラー

  • 症状:

    この問題は、バッチ同期タスクで Write Mode (Overwrite)Yes に設定され、設定された Business Key_id フィールドではない場合に発生します。書き込みモードのエラー

  • 考えられる原因:

    書き込むデータには、_id が設定済みの Business Key と一致しないエントリ (前の設定例の my_id など) が含まれています。

  • 解決策:

    • 解決策 1: ソースデータで、Business Key フィールドの値が _id フィールドの値と一致するようにしてください。

    • 解決策 2:タスク設定を変更して、_id フィールドをビジネスキーとして使用します。

Redis

ハッシュモード書き込みエラー:Code:[RedisWriter-04], Description:[Dirty data]. - source column number is in valid!

  • 原因:

    ハッシュモードを使用して Redis にデータを保存する場合、属性と値はペアで出現する必要があります。たとえば、ソースが odpsReader: "column":[ "id", "name", "age", "address" ] で設定され、ターゲットの Redis ライターが RedisWriter: "keyIndexes":[ 0, 1] で設定されている場合、id と name の列がキーとして使用されます。残りの列、age と address は、ハッシュの属性と値になります。ソースが 2 つの列しか提供せず、両方がキーとして指定されている場合、属性と値のペアを形成する列が残らないため、このエラーが発生します。

  • 解決策:

    2 つの列のみを使用する必要がある場合は、Redis が文字列モードを使用するように設定します。ハッシュモードを使用する必要がある場合は、ソースがキー、属性、値の少なくとも 3 つの列を提供することを確認してください。

OSS

CSV の複数文字区切り文字

  • 症状:

    OSS や FTP などのファイルストレージサービスから読み取る際、バッチ同期タスクがダーティデータエラーで失敗することがあります。この問題は、ソースファイルが CSV 形式で、|,##;; などの複数文字の区切り文字を使用している場合に発生します。タスクの実行ログには IndexOutOfBoundsException とダーティデータエラーが表示されます。

  • 原因:

    DataWorks に組み込まれている csv リーダー ("fileFormat": "csv") は、複数文字の区切り文字を正しく処理できないため、列の分割が正しく行われません。

  • 解決策:

    • ウィザードモード:ファイルタイプをテキストに切り替え、複数文字の区切り文字を明示的に指定します。

    • スクリプトモード:"fileFormat": "csv""fileFormat": "text" に変更し、次のパラメーターで区切り文字を設定します: "fieldDelimiter":"<your_multi-character_delimiter>", "fieldDelimiterOrigin":"<your_multi-character_delimiter>"

OSS ファイル制限

OSS リーダーは、バッチ同期タスクで読み取れるファイル数に制限を設けていません。主な制限は、タスクによって消費される計算ユニット (CU) から来ます。一度に多くのファイルを読み取ると、メモリ不足エラーが発生する可能性があります。OutOfMemoryError: Java heap space エラーを防ぐために、object パラメーターを * に設定しないでください。

ファイル名のランダムな文字列の削除

OSS Writer は、書き込まれたオブジェクトのファイル名を使用してディレクトリ構造をシミュレートします。たとえば、「object」:「datax」と指定すると、結果のオブジェクト名は datax で始まり、ランダムな文字列が付加されます。作成されるファイル数は、バッチ同期タスクのサブタスクの数によって決まります。

ランダムな UUID サフィックスを避けるには、「writeSingleObject」:「true」を設定します。詳細については、OSS データソースドキュメントの writeSingleObject の説明をご参照ください。

OSS エラー:AccessDenied

  • 原因:

    データソースの AccessKey に、バケットに対する必要な権限がありません。

  • 解決策:

    OSS データソースに使用される AccessKey に、バケットに対する読み取り権限を付与します。

Hive

エラー:Could not get block locations

  • 原因:

    このエラーは、mapred.task.timeout パラメーターの値が低すぎる場合に発生する可能性があります。タイムアウトに達すると、Hadoop はタスクを終了し、一時ディレクトリをクリーンアップします。その後、タスクは一時データを見つけられないため失敗します。

  • 解決策:

    バッチ同期タスクのデータソース設定で、Hive read methods[Hive JDBC を使用してデータを読み取る (条件付きフィルタリングをサポート)] に設定した場合、Session Configuration セクションで mapred.task.timeout パラメーターの値を増やすことができます。たとえば、パラメーターを mapred.task.timeout=600000 に設定します。

DataHub

ペイロードサイズ超過による失敗

  • 症状:

    次のようなエラーメッセージが表示されます:ERROR JobContainer - Exception when job runcom.alibaba.datax.common.exception.DataXException: Code:[DatahubWriter-04], Description:[Failed to write data.]. - com.aliyun.datahub.exception.DatahubServiceException: Record count 12498 exceed max limit 10000 (Status Code: 413; Error Code: TooLargePayload; Request ID: 20201201004200a945df0bf8e11a42)

  • 原因:

    このエラーは、単一の DataX リクエストのデータが DataHub サービスの制限を超えているために発生します。次のパラメーターが各バッチコミットのサイズを制御します。

    • maxCommitSize パラメーターは、DataX がバッチを送信先にコミットする前にバッファリングするデータの最大サイズをメガバイト (MB) 単位で設定します。デフォルト値は 1 MB、つまり 1,048,576 バイトです。

    • batchSize パラメーターは、DataX がバッチを送信先にコミットする前にバッファリングする最大レコード数を設定します。

  • 解決策:

    この問題を解決するには、maxCommitSizebatchSize パラメーターを減らします。

LogHub

同期後の空のフィールド

このプラグインのフィールドは大文字と小文字を区別します。LogHub リーダーの column 設定を確認してください。

同期後のデータ欠落

Data Integration は、LogHub への到着時間に基づいてデータを取得します。LogHub コンソールで、データの receive_time メタデータフィールドがタスクの設定された時間範囲内にあることを確認してください。

マッピングにおける予期しないフィールド

これが発生した場合は、UI で column を手動で編集してください。

読み取られた __time__ の値が設定された時間範囲外である、またはコンソールのレコード数が同じ時間範囲の同期タスクによって読み取られたエントリ数と一致しないのはなぜですか?

バッチ同期タスクに設定された開始時刻と終了時刻は、Reader が SLS GetCursor API を呼び出して開始カーソルと終了カーソルを決定するために使用されます。これらの時刻は、SLS サーバーの受信時間に基づいて読み取り範囲を定義するために使用されます。タスクは実際にカーソル範囲内のデータを読み取ります。これは、__time__ 出力フィールドによるフィルタリングとは異なります。

出力フィールド __time__ は、各ログの log.getTime() から派生し、ログ自体のタイムスタンプを表します。SLS コンソールでのクエリは通常、クエリ時間範囲、クエリ文、およびインデックスフィールドに基づいており、ログ時間 __time__ が一般的な時間参照です。したがって、同期タスクとコンソールクエリが同じ時間値を使用しても、異なる時間参照を使用している場合、取得された __time__ の範囲またはレコード数が一致しない場合があります。

一般的なシナリオは次のとおりです。

  1. ログの収集または配信に遅延がある場合、過去のログが再送信される場合、またはクライアントの時刻が不正確な場合、ログ時間 __time__ は SLS サーバーの受信時間よりも早くなったり遅くなったりすることがあります。同期タスクはサーバーの受信時間に基づいてカーソルを配置し、コンソールは __time__ に基づいてデータをクエリするため、2 つの結果が異なる場合があります。

  2. SLS データ処理を使用して別の LogStore にデータを書き込む場合、データ処理ステートメントが __time__ を明示的に設定しない限り、送信先ログの __time__ は通常、データ処理タスクの実行時間ではなく、ソースログのタイムスタンプを保持します。この場合、同期タスクは、これらのログが送信先 LogStore に書き込まれる時間範囲内でこれらのログを読み取る可能性があります。ただし、処理の実行時間または現在の時間範囲に基づいて送信先 LogStore コンソールをクエリしても、これらのログが見つからない場合があります。ログの実際の __time__ に対応する時間範囲に基づいてクエリする必要があります。

  3. コンソールのクエリ文、インデックスフィールド、時間範囲、および同期タスクのルールフィルターステートメント (SPL) が、全体的な時間枠が同じであっても一致しない場合、レコード数が異なる場合があります。

トラブルシューティングの提案:

  1. コンソールのクエリ時間範囲、ステートメント、およびインデックスフィールドが、同期タスクの開始時刻、終了時刻、およびフィルター規則 (SPL) と一致していることを確認します。

  2. column で、__time__ (ログ時間) と __tag__:__receive_time__ (ログタグに存在する必要がある SLS サーバー側受信時間の観測可能なフィールド) の両方を出力して、ログ時間とサーバー側受信時間を比較します。

  3. データが SLS データ処理から来ている場合は、処理ステートメントが __time__ を明示的に設定していることを確認し、実際の __time__ に従って送信先 LogStore のコンソールでクエリ時間範囲を調整します。

  4. 下流システムがログ時間による厳密な照合を必要とする場合は、ターゲットに書き込んだ後、__time__ に基づいてフィルタリングまたは統計を実行できます。

たとえば、ソースログの __time__2026-06-01 10:00:00 です。SLS データ処理タスクは、2026-06-12 10:00:00 にログを送信先 LogStore に書き込み、処理ステートメントは __time__ を明示的に変更しません。この場合、送信先ログの __time__ は依然として 2026-06-01 10:00:00 です。同期タスクに設定された開始時刻と終了時刻が 2026-06-12 10:00:00 をカバーしている場合、タスクはこのログを読み取る可能性があります。ただし、コンソールで送信先 LogStore をクエリする場合、2026-06-12 10:00:00 前後の時間範囲を選択し、コンソールが __time__ でログをフィルタリングすると、ログが見つからない場合があります。この場合、コンソールのクエリ時間を 2026-06-01 10:00:00 前後の時間範囲に調整するか、ビジネス要件に基づいてデータ処理中に送信先ログの __time__ を明示的に設定する必要があります。

同期後の空のフィールド

Reader は、column 設定に基づいて、フェッチされたログコンテンツ、その組み込みメタデータフィールドマッピング、および LogTag からフィールド名を照合します。フィールド名は大文字と小文字を区別し、一致が見つからない場合、Reader はエラーを報告せずに null を出力します。

一般的な原因は次のとおりです。

  1. column に設定されたフィールド名の大文字と小文字が、元のログフィールドキーの大文字と小文字と一致しません。

  2. コンソールには、エイリアス、インデックスフィールド、または展開された JSON フィールドが表示されますが、これらはリーダーが受け取る生のログキーとは異なります。

  3. フィールドは LogTag から派生しており、__tag__:<tagKey> として設定する必要があります。

  4. ルールフィルタリングステートメント (SPL) または処理設定を設定した後、処理された出力フィールド名が column と完全に一致しません。

トラブルシューティングの際には、まずビジュアルページのソースフィールドとデータプレビューを使用して、Reader が実際に認識するフィールドを確認できます。スクリプトモードでは、一時的に column["*"] に設定し、Reader が取得する一般的なログコンテンツのフィールドキーを観察してから、元のキーを使用して column を設定することもできます。

Lindorm

バルク書き込みデータの置換

書き込みロジックは API 呼び出しのロジックと同じです。一致する行と列のデータを上書きし、他のすべてのデータは変更しません。

Elasticsearch

インデックス内のすべてのフィールドのクエリ

curl コマンドを使用して ES インデックスのマッピングを取得できます。その後、マッピングからすべてのフィールド定義を抽出できます。

  • クエリシェルコマンド:

    // ES 7.x の場合
    curl -u username:password --request GET 'http://esxxx.elasticsearch.aliyuncs.com:9200/indexname/_mapping'
    // ES 6.x の場合
    curl -u username:password --request GET 'http://esxxx.elasticsearch.aliyuncs.com:9200/indexname/typename/_mapping'
  • 応答の例:

    {
        "indexname": {
            "mappings": {
                "typename": {
                    "properties": {
                        "field1": {
                            "type": "text"
                        },
                        "field2": {
                            "type": "long"
                        },
                        "field3": {
                            "type": "double"
                        }
                    }
                }
            }
        }
    }

    応答では、properties オブジェクトにインデックスのすべてのフィールドとそのプロパティ定義が含まれています。この例では、インデックスには field1field2field3 の 3 つのフィールドが含まれています。

動的インデックス名による同期

インデックス設定で日付ベースのスケジューリング変数を使用できます。これにより、システムは実行日に基づいてインデックス名を動的に生成できます。このプロセスには、日付変数の定義、インデックス変数の設定、タスクのデプロイと実行の 3 つのステップが含まれます。

  1. 日付変数の定義:同期タスクのスケジューリング設定で、新しいパラメーターを追加して日付変数を定義します。次の例では、var1 はタスク実行時間 (当日) を表し、var2 は業務日 (前日) を表します。定义日期变量

  2. インデックス変数の設定:スクリプトモードに切り替え、Elasticsearch Reader のインデックスを設定します。次の図に示すように、${variable_name} 形式を使用します。配置索引变量

  3. タスクのデプロイと実行:タスクを確認した後、コミットしてオペレーションセンターにデプロイします。その後、スケジュールに基づいて実行するか、データ補填機能を使用して実行できます。

    1. Running with Parameters をクリックしてタスクを実行し、テストします。この操作により、構成内のスケジューリングパラメーターが実際の値に置き換えられます。タスクの実行後、ログを確認して、正しいインデックスが使用されたことを確認します。

      説明

      [パラメーター付きで実行] を使用する場合は、パラメーター値を直接入力してテストします。

      运行运行

    2. テスト結果が期待どおりの場合、タスク構成は完了です。SaveCommitを順にクリックして、同期タスクを本番環境にコミットします。提交任务

      プロジェクトが標準モードの場合、Deploy をクリックしてデプロイメントセンターを開き、同期タスクを本番環境にデプロイする必要があります。发布

  4. 結果:次の例は、設定とランタイムで使用される実際のインデックス名を示しています。

    スクリプトモードでのインデックス設定:"index": "esstress_1_${var1}_${var2}"

    ランタイムで使用されるインデックス名:esstress_1_20230106_20230105

    运行结果

オブジェクトまたはネストされたフィールドのプロパティの同期

オブジェクトフィールドのプロパティを同期するには、スクリプトモードを使用する必要があります。スクリプトモードでは、次のコードに示すように multi パラメーターを設定し、ドット表記 (例:property.child_property) を使用して列パスを指定します。

"multi":{
   "multi":true 
 }

次の例は設定を示しています。

# 例:
## ES のデータ
"hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "7XAOOoMB4GR_1Dmrrust",
        "_score": 1.0,
        "_source": {
            "level1": {
                "level2": [
                    {
                        "level3": "testlevel3_1"
                    },
                    {
                        "level3": "testlevel3_2"
                    }
                ]
            }
        }
    }
]
## Reader の設定
"parameter": {
  "column": [
      "level1",
      "level1.level2",
      "level1.level2[0]"
  ],
  "multi":{
        "multi":true
    }
}
## 送信先の結果:リーダー設定と同じ順序で 3 つの列を持つ 1 行。
COLUMN              VALUE
level1:             {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
level1.level2:      [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
level1.level2[0]:   {"level3":"testlevel3_1"}

文字列の引用符と JSON からネストへの同期の処理

  1. 文字列の周りの二重引用符は Kibana の表示上のアーティファクトです。Elasticsearch の実際のデータにはこれらの余分な引用符はありません。curl コマンドや Postman などのツールを使用して生のデータを表示できます。データを取得するための curl コマンドは次のとおりです。

    // ES 7.x の場合
    curl -u username:password --request GET 'http://esxxx.elasticsearch.aliyuncs.com:9200/indexname/_search'
    // ES 6.x の場合
    curl -u username:password --request GET 'http://esxxx.elasticsearch.aliyuncs.com:9200/indexname/typename/_search'

    结果

  2. はい、送信先 ES フィールドの型を nested に設定して、MaxCompute の JSON 文字列を ES の nested オブジェクトに同期できます。次の例は、name フィールドを ES の nested 形式に同期する方法を示しています。

    • 同期設定:name フィールドの typenested に設定します。同步配置

    • 同期結果:name フィールドは nested オブジェクトです。同步结果

ソースデータが 文字列「[1,2,3,4,5]」 の場合、ES に配列として保存するにはどうすればよいですか?

データを ES に配列として書き込むには、ソースデータの形式に基づいて 2 つの方法のいずれかを使用できます。

  • ソースデータを JSON 配列として解析します。ソースデータが "[1,2,3,4,5]" の場合は、json_array=true を設定してソースデータを解析します。これにより、データが ES フィールドに配列として書き込まれます。列設定で json_array=true を設定します。

    • コードレス UI での設定:向导模式配置

    • スクリプトモードでの設定:

      "column":[
        {
          "name":"docs",
          "type":"keyword",
          "json_array":true
        }
      ]
  • 区切り文字を使用してソースデータを解析します。ソースデータが「1,2,3,4,5」の場合は、splitter="," を設定して文字列を解析し、ES フィールドに配列として書き込みます。

    • 制限事項:

      • 各タスクは 1 つの splitter (区切り文字) のみをサポートします。異なる配列フィールドに異なる区切り文字を設定することはできません。たとえば、ソース列 col1="1,2,3,4,5"col2="6-7-8-9-10" がある場合、各列に異なる splitter を指定することはできません。

      • splitter は正規表現にすることができます。たとえば、ソース列の値が「6-,-7-,-8+,*9-,-10」の場合、splitter"[-,*+]" に設定できます。これはコードレス UI でサポートされています。

    • コードレス UI での設定:脚本模式配置 デフォルトの splitter"-,-" です。

    • スクリプトモードでの設定:

      "parameter" : {
            "column": [
              {
                "name": "col1",
                "array": true,
                "type": "long"
              }
            ],
            "splitter":","
      }

認証失敗と過剰なログの防止

  • 原因:

    HttpClient は、まず認証されていないリクエストを送信して、必要な認証方式を決定します。サーバーが認証チャレンジで応答した後、クライアントは 2 番目の認証済みリクエストを送信します。このプロセスは、すべての接続で発生します。したがって、ES へのすべての書き込み操作は、最初に認証されていないリクエストを生成し、それが監査ログに記録されます。

  • 解決策:

    スクリプトモードで "preemptiveAuth":true パラメーターを追加します。

日付型としてのデータ同期

日付ベースの書き込みを設定するには、要件に基づいて次のいずれかの方法を使用します。

  • リーダーによって読み取られたコンテンツを ES date フィールドに直接書き込みます。

    • origin:true を設定して、コンテンツを ES に直接書き込みます。

    • format パラメーターは、ライターが ES でマッピングを作成するときにフィールドの format プロパティを指定します。

      "parameter" : {
          "column": [
              {
                  "name": "col_date",
                  "type": "date",
                  "format": "yyyy-MM-dd HH:mm:ss",
                  "origin": true
              }
                ]
      }
  • タイムゾーン変換:Data Integration にタイムゾーンを変換させたい場合は、Timezone パラメーターを追加します。

    "parameter" : {
        "column": [
            {
                "name": "col_date",
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss",
                "Timezone": "UTC"
            }
              ]
    }

外部バージョンによる書き込み失敗の処理

  • type: "version" を持つ列を設定しました。Elasticsearch は外部バージョンの指定をサポートしていません。

        "column":[
                                {
                                    "name":"id",
                                    "type":"version"
                                },
      ]
  • 解決策:

    type: "version" 設定を削除します。

エラー:ES_MISSING_DATE_FORMAT

  • 原因:

    ES データソースのフィールドのマッピングに format が指定されていないため、Elasticsearch Reader は date フィールドの日付形式を解析できません。

  • 解決策:

    • dateFormat パラメーターを使用して、考えられるすべての日付形式を || で区切って指定します。形式は、ソース date フィールドの日付値と一致する必要があります。例:

      "parameter" : {
            "column": [
           			"dateCol1",
              	"dateCol2",
                "otherCol"
            ],
           "dateFormat" : "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss",
      }
    • ES インデックスのすべての date フィールドのマッピングを更新して、format 定義を含めます。

エラー:DataXException

  • 原因:

    このエラーは、Fastjson ライブラリの制限により、インデックス名またはフィールド名に $ref などの制限されたキーワードが含まれていることが原因で発生する可能性があります。

  • 解決策:

    Elasticsearch Reader は、それらを含むインデックスの同期をサポートしていないため、インデックス名またはフィールド名に $ref などの制限されたキーワードを使用しないでください。詳細については、「Elasticsearch Reader」をご参照ください。

エラー:version_conflict_engine_exception

  • 原因:

    このエラーは、ES の楽観的ロックメカニズムによってトリガーされるバージョン競合が原因で発生します。これは、更新コマンドのドキュメントのバージョンがドキュメントの現在のバージョンと一致しないことを示します。これは、ドキュメントが別のプロセスによって同時に削除または更新されている場合に発生する可能性があります。

  • 解決策:

    1. 他のプロセスがインデックスからデータを削除しているかどうかを確認します。

    2. タスクの同期方法を Update から Index に変更します。

エラー:illegal_argument_exception

  • 原因:

    similarityproperties などの高度なプロパティを列に設定する場合、プラグインがそれらを認識できるように、other_params オブジェクト内に配置する必要があります。原因

  • 解決策:

    Columnother_params を設定し、other_params で次のように similarity を追加します。

    {"name":"dim2_name",...,"other_params":{"similarity":"len_similarity"}}

配列フィールド同期エラー:dense_vector

  • 原因:

    現在、Elasticsearch Writer は dense_vector 型をサポートしていません。サポートされている型は次のとおりです。

    ID,PARENT,ROUTING,VERSION,STRING,TEXT,KEYWORD,LONG,
    INTEGER,SHORT,BYTE,DOUBLE,FLOAT,DATE,BOOLEAN,BINARY,
    INTEGER_RANGE,FLOAT_RANGE,LONG_RANGE,DOUBLE_RANGE,DATE_RANGE,
    GEO_POINT,GEO_SHAPE,IP,IP_RANGE,COMPLETION,TOKEN_COUNT,OBJECT,NESTED;
  • 解決策:

    サポートされていない型を操作するには、次の手順を実行します。

    • 代わりに、ES でインデックスマッピングを手動で作成します。

    • 送信先フィールドのデータ型を NESTED に変更します。

    • ライター設定で、dynamic = truecleanup=false を設定します。

ライター設定が有効にならない

  • 原因:

    # 不正な設定
    "settings": {
      "index": {
        "number_of_shards": 1,
        "number_of_replicas": 0
      }
    }
    # 正しい設定
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 0
    }
  • 解決策:

    設定はインデックスが作成されたときにのみ適用されます。インデックスは、インデックスが存在しない、または cleanup=true が設定されているという 2 つのシナリオで作成されます。cleanup=true を設定する場合は、settings 設定からラップしている index オブジェクトを削除します。

自己構築インデックスのキーワード型ネスト属性が、自動生成 (cleanup=true で同期タスクを実行) 後にキーワードに変わるのはなぜですか?

# 元のマッピング
{
  "name":"box_label_ret",
  "properties":{
    "box_id":{
      "type":"keyword"
    }
}
# cleanup=true で再構築後のマッピング
{
    "box_label_ret": {
      "properties": {
        "box_id": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }}}}
}
  • 原因:

    nested 型を処理する場合、Elasticsearch Writer はトップレベルのマッピングのみを適用し、Elasticsearch にネストされたプロパティのマッピングを推論させます。この変更は Elasticsearch の適応動作であり、その機能には影響しません。特定のマッピング形式を使用する必要がある場合は、「Data Integration の同期タスクの機能」をご参照ください。

  • 解決策:

    目的のマッピングを維持するには、同期タスクを実行する前に、必要なマッピングで ES インデックスを作成します。次に、タスク設定で cleanup パラメーターを false に設定します。

Kafka

データが endDateTime を超える

Kafka Reader はデータをバッチで読み取ります。バッチに指定された endDateTime より後のレコードが含まれている場合、リーダーは後続のバッチの同期を停止します。ただし、リーダーは、endDateTime を超える現在のバッチのレコードを送信先データソースに書き込みます。

  • skipExceedRecord パラメーターを使用して、これらの超過レコードを書き込むかどうかを制御できます。詳細については、「Kafka Reader」をご参照ください。データ損失につながる可能性があるため、これらのレコードをスキップすることはお勧めしません。

  • Kafka の max.poll.records パラメーターを設定して、1 回のポーリングでフェッチされるレコード数を制限できます。このパラメーターを並列スレッド数とともに調整することで、超過データの最大量を制御できます。超過データの量は、max.poll.records * 並列スレッド数よりも少なくなります。

タスクが完了しない

  • 原因:

    データ量が少ないか、データ分散が不均一な場合、一部の Kafka パーティションが新しいデータを受信しないか、新しいデータが指定された終了オフセットに到達しないことがあります。バッチ同期タスクは、すべてのパーティションが指定された終了オフセットに到達した場合にのみ終了できます。これらのアイドル状態のパーティションにより、タスクが完了できなくなります。

  • 解決策:

    この状況では、[終了戦略][1 分間新しいデータが読み取られない場合に終了] に設定します。スクリプトモードでは、stopWhenPollEmpty パラメーターを true に、stopWhenReachEndOffset パラメーターを true に設定します。これにより、タスクはすべてのパーティションの最新のオフセットまでのすべてのデータを読み取った後に終了でき、アイドル状態の実行を防ぎます。ただし、タスクの終了後に設定された endDateTime より前のタイムスタンプを持つレコードが生成された場合、それらのレコードは消費できないことに注意してください。

RestAPI

RestAPI Writer エラー:The JSON data at path:[] is not an array

RestAPI Writer は 2 つの書き込みモードを提供します。複数のデータレコードを同期するには、dataMode パラメーターを multiData に設定し、RestAPI Writer スクリプトに dataPath:"data.list" を追加します。詳細については、「RestAPI Writer」をご参照ください。参数

重要

列を設定する際には、「data.list」プレフィックスを追加しないでください。

Tablestore Writer の設定

自動採番主キー

  1. Tablestore Writer の設定には、次のパラメーターを含める必要があります。

    "newVersion": "true",
    "enableAutoIncrement": "true",
  2. 自動採番主キー列の名前を指定しないでください。

  3. primaryKey パラメーターと column パラメーターで定義された列の総数は、上流の Tablestore Reader の列数と等しくなければなりません。

時系列モデルの設定

_tagsis_timeseries_tag

たとえば、データレコードには phone=xiaomi、RAM=8G、camera=LEICA の 3 つのタグがあります。数据

  • データエクスポートの例 (Tablestore Reader)

    • これらのタグを結合して単一の列としてエクスポートするには、次の設定を使用します。

      "column": [
            {
              "name": "_tags",
            }
          ],

      DataWorks は、次の形式でタグを単一の列としてエクスポートします。

      ["phone=xiaomi","camera=LEICA","RAM=8G"]
    • phone タグと camera タグを別々の列としてエクスポートするには、次の設定を使用します。

      "column": [
            {
              "name": "phone",
              "is_timeseries_tag":"true",
            },
            {
              "name": "camera",
              "is_timeseries_tag":"true",
            }
          ],

      DataWorks は、次の形式で 2 つの列をエクスポートします。

      xiaomi, LEICA
  • データインポートの例 (Tablestore Writer)

    上流のデータソースに 2 列のデータが含まれていると仮定します。

    • 1 つの列には ["phone=xiaomi","camera=LEICA","RAM=8G"] が含まれています。

    • もう 1 つの列には 6499 が含まれています。

    両方の列のデータをタグフィールドに追加するには、格式 この設定を使用します。

    "column": [
          {
            "name": "_tags",
          },
          {
            "name": "price",
            "is_timeseries_tag":"true",
          },
        ],
    • 最初の列の設定では、["phone=xiaomi","camera=LEICA","RAM=8G"] 配列全体をタグフィールドにインポートします。

    • 2 番目の列の設定では、price=6499 を別のタグとしてタグフィールドにインポートします。

カスタムテーブル名

カスタムテーブル名

テーブル名が orders_20170310orders_20170311orders_20170312 のような日次テーブルなどのパターンに従い、同じテーブルスキーマを共有している場合は、スクリプトモード設定でスケジューリングパラメーターを使用してカスタムテーブル名を指定できます。これにより、タスクは毎朝ソースデータベースの前日のテーブルから自動的にデータを読み取ることができます。

たとえば、タスクが 2017 年 3 月 15 日に実行される場合、orders_20170314 テーブルからデータを読み取ります。自定义表名

スクリプトモードでは、ソーステーブル名を orders_${tablename} などの変数に置き換えます。前日のデータを読み取る必要があるため、タスクのパラメーター設定で tablename=${yyyymmdd-1} を設定します。

説明

スケジューリングパラメーターの使用方法の詳細については、「スケジューリングパラメーターの形式」をご参照ください。

テーブル列の変更

ソース列の変更

同期タスク設定ページに移動し、列マッピングを更新してタスクを再送信します。

タスク設定の問題

不完全なテーブルリスト

バッチ同期タスクを設定すると、[ソースの選択] セクションにはデフォルトで最初の 25 テーブルのみがリストされます。必要なテーブルがリストにない場合は、テーブル名で検索するか、コードエディタを使用してください。

テーブル名または列名のキーワード

キーワード関連のタスクの失敗

  • 原因:列名が予約キーワードであるか、数字で始まる場合にタスクが失敗します。

  • 解決策:Data Integration 同期タスクをスクリプトモードに切り替え、列設定の特殊なフィールドにエスケープ文字を使用します。詳細については、「スクリプトモードでのタスクの設定」をご参照ください。

    • MySQL の場合は、キーワードをバッククォートで囲みます (例:`keyword`)。

    • Oracle および PostgreSQL の場合は、キーワードを二重引用符で囲みます (例:"keyword")。

    • SQL Server の場合は、キーワードを角括弧で囲みます (例:[keyword])。

    以下は MySQL を使用した例です。字段冲突

  • 次の例では、MySQL データソースを使用しています。

    1. 次のステートメントを実行して、aliyun という名前のテーブルを作成します。create table aliyun (`table` int ,msg varchar(10));

    2. 次のステートメントを実行してビューを作成し、table 列にエイリアスを割り当てます。create view v_aliyun as select `table` as col1,msg as col2 from aliyun;

      説明
      • table という単語は MySQL のキーワードです。その結果、生成された SQL ステートメントはデータ同期中に失敗します。これを防ぐには、ビューを作成し、table 列にエイリアスを割り当てます。

      • ベストプラクティスとして、キーワードを列名として使用しないでください。

    3. 同期タスクを設定する際、キーワード列に割り当てたエイリアスを使用するために、aliyun テーブルの代わりに v_aliyun ビューを選択します。

フィールドマッピング

バッチタスクエラー:plugin xx does not specify column

このエラーは、同期タスクのフィールドマッピングが正しくないか、プラグインの column パラメーターが正しく設定されていない場合に発生します。

  1. フィールドマッピングが設定されていることを確認します。

  2. プラグインの column パラメーターが正しく設定されていることを確認します。

非構造化データマッピングの失敗

  • 症状:

    Preview Data をクリックすると、フィールドのサイズがバイト制限を超えているというメッセージが表示されます。

    问题现象

  • 原因:メモリ不足 (OOM) エラーを防ぐため、データソースサービスはデータプレビューリクエストを処理する際に各フィールドの長さをチェックします。単一の列が 1,000 バイトを超えると、このメッセージが表示されます。このメッセージはタスクの通常の操作には影響しません。エラーを無視してバッチ同期タスクを実行できます。

    説明

    ソースファイルが存在し、ネットワーク接続が正常な場合、次の理由でデータプレビューが失敗することもあります。

    • ファイルの 1 行のデータが 10 MB の制限を超えています。この場合、データは表示されず、同様のメッセージが表示されます。

    • 1 行の列数が 1,000 列の制限を超えています。この場合、最初の 1,000 列のみが表示され、1,001 番目の列にメッセージが表示されます。

TTL の変更

同期されたテーブルの TTL

TTL はテーブルレベルの設定です。バッチ同期タスクで設定することはできません。

関数集約

API による関数集約

いいえ。API ベースのデータ同期はソース関数をサポートしていません。データを送信先に同期する前に、ソースでデータを処理してください。