すべてのプロダクト
Search
ドキュメントセンター

DataWorks:オフライン同期に関するよくある質問

最終更新日:Nov 15, 2025

このトピックでは、オフライン同期に関する一般的な問題について説明します。

ドキュメントの概要

このドキュメント内のキーワードを検索して、一般的な問題とその解決策を見つけることができます。

問題の分類

問題のキーワード

リファレンス

オフライン同期タスクの一般的な O&M の問題

ネットワーク通信の問題

データソースの接続性テストは成功するのに、オフライン同期タスクがデータソースに接続できないのはなぜですか?

リソースグループの切り替え

オフライン同期タスクの実行リソースグループを切り替えるにはどうすればよいですか?

ダーティデータ

実行タイムアウト

長時間実行されるオフライン同期タスクのトラブルシューティング方法

WHERE 句にインデックスがないためにデータ同期タスクが遅くなり、全表スキャンが発生する

ソーステーブルのデフォルト値を保持する

ソーステーブルにデフォルト値がある場合、それらは Data Integration によって作成されたターゲットテーブルに、NOT NULL 制約とともに保持されますか?

シャードキー

オフライン統合タスクを設定する際に、複合プライマリキーをシャードキーとして使用できますか?

データ欠損

データ同期後、ターゲットテーブルのデータがソーステーブルと一致しない

非プラグインエラーの原因と解決策

ダーティデータ

エンコード形式の設定や文字化けによって引き起こされるダーティデータエラーの対処方法

SSRF 攻撃

「Task have SSRF attacks」エラーの対処方法

ネットワーク通信の問題

オフライン同期タスクが成功したり失敗したりするのはなぜですか?

テーブル/列名のキーワード

テーブル名または列名のキーワードによって引き起こされる同期タスクの失敗に対処するにはどうすればよいですか?

テーブルへの列の追加

オフライン同期タスクのソーステーブルで列を追加または変更するにはどうすればよいですか?

日付の書き込み

日付と時刻のデータをテキストファイルに書き込む際に、ミリ秒を保持したり、カスタムの日付と時刻の形式を指定したりするにはどうすればよいですか?

特定のプラグインエラーの原因と解決策

MongoDB

OSS

OSS から読み取れるファイルの数に制限はありますか?

DataHub

DataHub に書き込む際に、単一書き込みデータ制限を超えたことによる書き込み失敗を処理するにはどうすればよいですか?

Lindorm

Lindorm の一括メソッドを使用してデータを書き込む場合、毎回既存データが置き換えられますか?

Elasticsearch

ES インデックス配下のすべてのフィールドをクエリするにはどうすればよいですか?

OTS Writer の設定

自動採番主キー列を含むターゲットテーブルにデータを書き込むように OTS Writer を設定するにはどうすればよいですか?

時系列モデルの設定

時系列モデルの設定で、_tag フィールドと is_timeseries_tag フィールドは何を意味しますか?

オフライン同期のシナリオと解決策

カスタムテーブル名

オフライン同期タスクのテーブル名をカスタマイズするにはどうすればよいですか?

MaxCompute

タスク設定の問題

オフライン同期ノードを設定する際に、すべてのテーブルを表示できません。これを処理するにはどうすればよいですか?

LogHub

Kafka

OSS

MySQL

TTL の変更

同期されたデータテーブルの場合、TTL は ALTER メソッドでのみ変更できますか?

関数の集約

API 経由で同期する場合、集約のためにソース側の関数 (例: MaxCompute) を使用することはサポートされていますか?たとえば、ソーステーブルに Lindorm のプライマリキーとして列 a と b がある場合。

Elasticsearch

フィールドマッピング

非構造化データソースの場合、[データプレビュー] をクリックしてもフィールドをマッピングできません。これを処理するにはどうすればよいですか?

エラーメッセージと解決策

リソース設定の問題

OSS

OSS からデータを読み取る際にエラーが発生します: AccessDenied The bucket you access does not belong to you.

Redis

ハッシュモードで Redis に書き込む際にエラーが発生します: Code:[RedisWriter-04], Description:[Dirty data]. - source column number is in valid!

PostgreSQL

PostgreSQL からデータを読み取る際にエラーが発生します: org.postgresql.util.PSQLException: FATAL: terminating connection due to conflict with recovery

MySQL

タスクインスタンスの競合

オフラインタスクがエラーで失敗します: Duplicate entry 'xxx' for key 'uk_uk_op'

ネットワーク通信の問題

MySQL データソースを使用したオフライン同期タスクが接続タイムアウトエラーで失敗します: Communications link failure

フィールドマッピング

オフラインタスクがエラーで失敗します: plugin xx does not specify column

MaxCompute

RestAPI

RestAPI Writer がエラーを報告します: The JSON string found through path:[] is not an array type

RDS

Amazon RDS ソースを使用したオフライン同期タスクがエラーで失敗します: Host is blocked

MongoDB

Elasticsearch

Hive

ローカル Hive インスタンスへのオフライン同期タスクがエラーで失敗します: Could not get block locations.

実行タイムアウト

MongoDB ソースを使用したオフライン同期タスクがエラーで失敗します: MongoDBReader$Task - operation exceeded time limitcom.mongodb.MongoExecutionTimeoutException: operation exceeded time limit.

ネットワーク通信の問題

データソースの接続性テストは成功するのに、オフライン同期タスクがデータソースに接続できないのはなぜですか?

  • 以前に接続性テストに合格した場合は、再度実行して、リソースグループとデータベースがまだ接続されていることを確認します。データベースに変更が加えられていないことを確認してください。

  • 接続性テストに使用されたリソースグループが、タスク実行に使用されたリソースグループと同じであるかどうかを確認します。

    タスクが実行されているリソースグループを表示するには:

    • タスクがデフォルトのリソースグループで実行される場合、ログには次の情報が含まれます: running in Pipeline[basecommon_ group_xxxxxxxxx]

    • タスクがデータ統合専用リソースグループで実行される場合、ログには次の情報が含まれます: running in Pipeline[basecommon_S_res_group_xxx]

    • タスクが Serverless リソースグループで実行される場合、ログには次の情報が含まれます: running in Pipeline[basecommon_Serverless_res_group_xxx]

  • スケジュールされたタスクが早朝に断続的に失敗し、再実行すると成功する場合は、エラー発生時のデータベースの負荷を確認してください。

オフライン同期タスクが成功したり失敗したりするのはなぜですか?

ホワイトリスト設定が不完全な場合、オフライン同期タスクが断続的に失敗することがあります。データベースのホワイトリストが完全に設定されているか確認してください。

データ統合専用リソースグループを使用する場合:

  • データ統合専用リソースグループの Elastic Network Interface (ENI) IP アドレスをデータソースのホワイトリストに追加した場合、リソースグループをスケールアウトした後にホワイトリストを更新する必要があります。スケールアウトされたリソースグループの ENI IP アドレスをデータソースのホワイトリストに追加してください。

  • リソースグループをスケールアウトした後にホワイトリストを更新するのを避けるために、データ統合専用リソースグループの vSwitch CIDR ブロックをデータベースのホワイトリストに追加できます。詳細については、「ホワイトリストの追加」をご参照ください。

Serverless リソースグループを使用する場合: 詳細については、「ホワイトリストの追加」をご参照ください。リソースグループのホワイトリスト設定を確認し、ネットワーク設定が正しいことを確認してください。

ホワイトリストが正しく設定されている場合は、データベースの負荷が高すぎないか確認してください。負荷が高いと接続が中断される可能性があります。

リソース設定の問題

オフライン同期タスクが次のエラーで失敗します: [TASK_MAX_SLOT_EXCEED]:Unable to find a gateway that meets resource requirements. 20 slots are requested, but the maximum is 16 slots.

  • 考えられる原因:

    並行処理レベルが高すぎるため、リソースが不足しています。

  • 解決策:

    オフライン同期タスクの並行処理を減らします。

    • コードレス UI でオフライン同期タスクを設定する場合、チャネルコントロール設定で [最大並行数] の値を減らします。詳細については、「コードレス UI で同期ノードを設定する」をご参照ください。

    • コードエディタでオフライン同期タスクを設定する場合、チャネルコントロール設定で [concurrent] パラメーターの値を減らします。詳細については、「コードエディタで同期ノードを設定する」をご参照ください。

オフライン同期タスクが次のエラーで失敗します: OutOfMemoryError: Java heap space

このエラーを解決するには、次の手順を実行します:

  1. プラグイン設定が `batchSize` や `maxFileSize` などのパラメーターをサポートしている場合は、それらの値を減らします。

    どのプラグインがこれらのパラメーターをサポートしているかを確認するには、「サポートされているデータソースとリーダーおよびライタープラグイン」に移動し、関連するプラグインをクリックしてパラメーターの詳細を表示します。

  2. 並行処理を減らします。

    • コードレス UI でオフライン同期タスクを設定する場合、チャネルコントロール設定で [最大並行数] の値を減らします。詳細については、「コードレス UI で同期ノードを設定する」をご参照ください。

    • コードエディタでオフライン同期タスクを設定する場合、チャネルコントロール設定で [concurrent] パラメーターの値を減らします。詳細については、「コードエディタで同期ノードを設定する」をご参照ください。

  3. OSS からのファイル同期など、ファイル同期の場合は、読み取るファイルの数を減らします。

  4. タスク設定の [実行リソース] セクションで、[リソース消費 (CU)] の値を適切なレベルに上げて、他の実行中のタスクに影響を与えないようにします。

タスクインスタンスの競合

オフラインタスクがエラーで失敗します: Duplicate entry 'xxx' for key 'uk_uk_op'

  • エラーメッセージ: Error updating database. Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Duplicate entry 'cfc68cd0048101467588e97e83ffd7a8-0' for key 'uk_uk_op'

  • 考えられる原因: Data Integration は、同じ同期タスクノードの異なるインスタンスが同時に実行されることを許可しません。これは、同じ JSON 設定を持つ複数の同期タスクが同時に実行できないことを意味します。たとえば、同期タスクが 5 分ごとに実行されるようにスケジュールされており、上流の遅延が発生したとします。00:05 に、00:00 のインスタンスと 00:05 のインスタンスの両方がトリガーされます。これにより、一方のインスタンスが失敗する可能性があります。タスクインスタンスの実行中にデータバックフィルまたは再実行操作が実行された場合にも競合が発生する可能性があります。

  • 解決策: インスタンスの実行時間をずらします。毎時または毎分実行されるタスクの場合、自己依存を設定できます。これにより、現在のサイクルのインスタンスは、前のサイクルのインスタンスが完了した後にのみ開始されるようになります。以前のバージョンのデータ開発については、「自己依存」をご参照ください。新しいバージョンのデータ開発については、「依存関係タイプの選択 (クロスサイクル依存)」をご参照ください。

実行タイムアウト

MongoDB ソースを使用したオフライン同期タスクがエラーで失敗します: MongoDBReader$Task - operation exceeded time limitcom.mongodb.MongoExecutionTimeoutException: operation exceeded time limit.

  • エラーメッセージ: データ同期タスクがエラー MongoDBReader$Task - operation exceeded time limitcom.mongodb.MongoExecutionTimeoutException: operation exceeded time limit で失敗します。

  • 考えられる原因: プルされるデータ量が大きすぎます。

  • 解決策:

    • 並行処理を増やします。

    • BatchSize を減らします。

    • Reader パラメーター設定で、cursorTimeoutInMs 設定を追加します。3,600,000 ms などのより大きな値を設定してみてください。

MySQL データソースを使用したオフライン同期タスクが接続タイムアウトエラーで失敗します: Communications link failure

  • 読み取りエラー

    • 症状:

      データが読み取られると、次のエラーが発生します: Communications link failure The last packet successfully received from the server was 7,200,100 milliseconds ago. The last packet sent successfully to the server was 7,200,100 milliseconds ago. - com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

    • 考えられる原因:

      データベースが SQL クエリをゆっくりと実行しているため、MySQL の読み取りタイムアウトが発生します。

    • 解決策:

      • where フィルター条件が設定されているかどうかを確認し、フィルターされたフィールドにインデックスが追加されていることを確認します。

      • ソースデータテーブルにデータが多すぎないか確認します。多すぎる場合は、タスクを複数の小さなタスクに分割します。

      • ログでブロッキング SQL 文を見つけ、データベース管理者 (DBA) に相談して問題を解決します。

  • 書き込みエラー

    • 症状:

      データが書き込まれると、次のエラーが発生します: Caused by: java.util.concurrent.ExecutionException: ERR-CODE: [TDDL-4614][ERR_EXECUTE_ON_MYSQL] Error occurs when execute on GROUP 'xxx' ATOM 'dockerxxxxx_xxxx_trace_shard_xxxx': Communications link failure The last packet successfully received from the server was 12,672 milliseconds ago. The last packet sent successfully to the server was 12,013 milliseconds ago. More...

    • 考えられる原因:

      スロークエリがソケットタイムアウトを引き起こしています。TDDL 接続のデフォルトのソケットタイムアウトは 12 秒です。SQL 文が MySQL 側で結果を返さずに 12 秒以上実行されると、4614 エラーが報告されます。このエラーは、データ量が多い場合やサーバーがビジー状態の場合に断続的に発生する可能性があります。

    • 解決策:

      • データベースが安定した後に同期タスクを再実行します。

      • データベース管理者に連絡してタイムアウト期間を調整します。

長時間実行されるオフライン同期タスクのトラブルシューティング方法

考えられる原因 1: 長い実行時間

  • `preSql` や `postSql` などの実行前ステートメントまたは実行後ステートメントがデータベースで実行されるのに時間がかかりすぎるため、タスクの実行が遅くなります。

  • シャードキーが正しく設定されていないため、タスクの実行が遅くなります。

    オフライン同期では、シャードキー (`splitPk`) を使用してデータをシャーディングします。データ同期プロセスは、この設定に基づいて同時タスクを開始し、効率を向上させます。関連するプラグインのドキュメントを確認して、シャードキーを設定する必要があるかどうかを判断してください。

解決策 1:

  • 実行前または実行後のステートメントを設定する場合は、データフィルタリングにインデックス付きフィールドを使用します。

  • プラグインがシャードキーをサポートしている場合は、正しく設定します。以下に、MySQL Reader プラグインのシャードキーの設定方法について説明します。

    • `splitPk` にはテーブルのプライマリキーを使用します。プライマリキーは通常、均等に分散されているため、作成されたシャードでのデータホットスポットを防ぐのに役立ちます。

    • 現在、`splitPk` はシャーディングに整数データのみをサポートしています。文字列、浮動小数点数、日付などの他のデータ型はサポートしていません。サポートされていないデータ型を指定すると、同期に単一のチャネルが使用されます。

    • `splitPk` を指定しない場合、または `splitPk` の値が空の場合、データ同期は単一のチャネルを使用してテーブルデータを同期します。

考えられる原因 2: タスクがデータ統合実行リソースを待機している

解決策 2: ログに長時間の `WAIT` ステータスが表示される場合、タスクが使用しているデータ統合専用リソースグループに、タスクを実行するための利用可能な同時リソースが不足しています。原因と解決策の詳細については、「Data Integration タスクが常に「待機」ステータスを表示するのはなぜですか?」をご参照ください。

説明

オフライン同期タスクは、スケジューリングリソースグループからデータ統合実行リソースグループにディスパッチされます。したがって、オフライン同期タスクは 1 つのスケジューリングリソースを消費します。オフライン同期タスクがリソースを解放せずに長時間実行されると、他のオフラインタスクだけでなく、他の種類のスケジュールされたタスクもブロックします。

WHERE 句にインデックスがないためにデータ同期タスクが遅くなり、全表スキャンが発生する

  • シナリオ例

    実行された SQL 文は次のとおりです。

    SELECT bid,inviter,uid,createTime FROM `relatives` WHERE createTime>='2016-10-2300:00:00' AND reateTime<'2016-10-24 00:00:00';

    実行は 2016-10-25 11:01:24.875 に開始され、結果は 2016-10-25 11:11:05.489 に返され始めます。同期プログラムは、データベースが SQL クエリの結果を返すのを待ちます。その結果、MaxCompute は実行する前に長時間待機する必要があります。

  • 原因分析

    `WHERE` 句でクエリを実行する際、createTime 列にインデックスがないため、全表スキャンが発生します。

  • 解決策

    パフォーマンスを向上させるために、where 句でインデックス付きの列を使用します。必要な列にインデックスを追加することもできます。

リソースグループの切り替え

オフライン同期タスクの実行リソースグループを切り替えるにはどうすればよいですか?

以前のバージョンのデータ開発の場合:

[DataStudio] のオフライン同期タスク詳細ページでデバッグ用のリソースグループを変更できます。また、[オペレーションセンター] でタスクスケジューリング用のデータ統合実行リソースグループを変更することもできます。詳細については、「Data Integration リソースグループの切り替え」をご参照ください。

新しいバージョンのデータ開発の場合:

[DataStudio] でデータ統合タスクのデバッグに使用するリソースグループを変更できます。また、[オペレーションセンター] でスケジュールされたデータ統合タスクの実行リソースグループを変更することもできます。詳細については、「リソースグループの O&M」をご参照ください。

ダーティデータ

ダーティデータのトラブルシューティングと特定方法

ダーティデータの定義: ターゲットデータソースへの書き込み時に例外を引き起こす単一のデータレコードは、ダーティデータと見なされます。書き込みに失敗したデータはすべてダーティデータとして分類されます。

ダーティデータの影響: ダーティデータは宛先に書き込まれません。ダーティデータを許可するかどうかを制御し、ダーティデータレコード数のしきい値を設定できます。デフォルトでは、Data Integration はダーティデータを許可します。同期タスクを設定する際に、許可されるダーティデータレコード数を指定できます。詳細については、「コードレス UI で同期ノードを設定する」をご参照ください。

  • タスクがダーティデータを許可するように設定されている場合: ダーティデータが生成されると、タスクは実行を続行しますが、ダーティデータは破棄され、宛先には書き込まれません。

  • 許可されるダーティデータレコードの数:

    • 許可されるダーティデータレコード数が 0 の場合、最初のダーティデータレコードが生成されるとタスクは失敗して停止します。

    • 許可されるダーティデータレコード数が x に設定されている場合、ダーティデータレコード数が x を超えるとタスクは失敗して停止します。ダーティデータレコード数が x 未満の場合、タスクは実行を続行しますが、ダーティデータは破棄され、宛先には書き込まれません。

ダーティデータのシナリオ:

  • シナリオ 1:

    • エラーメッセージ: {"message":"Dirty data was encountered when writing to the ODPS destination table: An error occurred in the data of the [3rd] field. Please check the data and modify it, or you can increase the threshold to ignore this record.","record":[{"byteSize":0,"index":0,"type":"DATE"},{"byteSize":0,"index":1,"type":"DATE"},{"byteSize":1,"index":2,"rawData":0,"type":"LONG"},{"byteSize":0,"index":3,"type":"STRING"},{"byteSize":1,"index":4,"rawData":0,"type":"LONG"},{"byteSize":0,"index":5,"type":"STRING"},{"byteSize":0,"index":6,"type":"STRING"}]}

    • 解決策: ログは、ダーティデータが 3 番目のフィールドにあることを示しています。

      • ライターがダーティデータを報告します。ライターのテーブル作成ステートメントを確認してください。ODPS のテーブルに指定されたフィールドサイズが、MySQL の対応するフィールドのデータサイズよりも小さい場合にこのエラーが発生することがあります。

      • データ同期の原則: ソースデータソースからのデータは、宛先データソースに書き込み可能でなければなりません。これは、ソースと宛先のデータ型が一致し、定義されたフィールドサイズに互換性がある必要があることを意味します。たとえば、ソースの `VARCHAR` 型のデータは、`INT` 型のターゲット列に書き込むことはできません。宛先で定義されたデータ型のサイズは、ソースからマッピングされたフィールドの実際のデータサイズを収容できる必要があります。ソースデータが `LONG`、`VARCHAR`、または `DOUBLE` 型の場合、宛先は `string` や `text` などの大きな範囲の型を使用できます。

      • ダーティデータエラーの原因が不明な場合は、ログからダーティデータレコード全体をコピーします。次に、データを調べて宛先のデータ型と比較し、どの部分が仕様に準拠していないかを特定します。

      例:

      {"byteSize":28,"index":25,"rawData":"ohOM71vdGKqXOqtmtriUs5QqJsf4","type":"STRING"}

      この例では、`byteSize` はバイト数、`index: 25` は 26 番目のフィールドを示し、`rawData` は特定の値、`type` はデータ型です。

  • シナリオ 2:

    • 症状: DataX は、MySQL から null 値を読み取るときにダーティデータを報告します。

    • 解決策: ソースで null 値を持つフィールドが、ターゲットテーブルでマッピングされたフィールドのデータ型と一致するかどうかを確認します。データ型が一致しない場合、エラーが報告されます。たとえば、文字列型の null 値を int 型のターゲットフィールドに書き込もうとするとエラーが発生します。

ダーティデータを表示するにはどうすればよいですか?

ログ詳細ページに移動し、[詳細ログ URL] をクリックしてオフライン同期ログとダーティデータ情報を表示できます。查看日志

オフライン同期タスク中にダーティデータのレコード数が上限を超えた場合、すでに同期されたデータは保持されますか?

ダーティデータのレコード数は、タスクの実行中に蓄積されます。この数が指定されたダーティデータの上限を超えると、タスクは直ちに中止されます。

  • データの保持: タスクが中止される前に宛先に正常に書き込まれたデータは保持されます。ロールバック操作は実行されません。

  • ゼロトレランスポリシー: ダーティデータの上限が 0 に設定されている場合、システムはゼロトレランスポリシーを採用します。これは、最初のダーティデータレコードを検出した直後にタスクが失敗して停止することを意味します。

エンコード形式の設定や文字化けによって引き起こされるダーティデータエラーの対処方法

  • 症状:

    データに絵文字が含まれている場合、同期中にダーティデータエラーが報告されることがあります: [13350975-0-0-writer] ERROR StdoutPluginCollector - Dirty data {"exception":"Incorrect string value: '\\xF0\\x9F\\x98\\x82\\xE8\\xA2...' for column 'introduction' at row 1","record":[{"byteSize":8,"index":0,"rawData":9642,"type":"LONG"}],"type":"writer"}

  • 考えられる原因:

    • データベースのエンコーディングが `utf8mb4` に設定されていないため、絵文字の同期時にエラーが発生します。

    • ソースデータ自体が文字化けしています。

    • データベースとクライアントが異なる文字セットを使用しています。

    • ブラウザが異なる文字セットを使用しているため、プレビューが失敗したり文字化けしたりします。

  • 解決策:

    文字化けの原因に応じて、適切な解決策を選択してください:

    • 生データが文字化けしている場合は、同期タスクを実行する前に生データを処理します。

    • データベースとクライアントが一致しない文字セットを使用している場合は、まず文字セットを統一します。

    • ブラウザがデータベースまたはクライアントの文字セットと一致しない文字セットを使用している場合は、まず文字セットを統一してからデータをプレビューします。

    次の操作を試すことができます:

    1. Java Database Connectivity (JDBC) 形式を使用して追加されたデータソースの場合、接続文字列を `utf8mb4` を使用するように変更します: jdbc:mysql://xxx.x.x.x:3306/database?com.mysql.jdbc.faultInjection.serverCharsetIndex=45

    2. インスタンス ID を使用して追加されたデータソースの場合、データベース名の後にパラメーターを追加します: database?com.mysql.jdbc.faultInjection.serverCharsetIndex=45

    3. 関連するデータベースのエンコード形式を `utf8mb4` に変更します。たとえば、RDS コンソールで RDS データベースのエンコード形式を変更できます。

      説明

      RDS データソースのエンコード形式を設定するコマンド: set names utf8mb4。RDS データベースのエンコード形式を表示するコマンド: show variables like 'char%'

ソーステーブルのデフォルト値を保持する

ソーステーブルにデフォルト値がある場合、それらは Data Integration によって作成されたターゲットテーブルに、NOT NULL 制約とともに保持されますか?

DataWorks がターゲットテーブルを作成する際、ソーステーブルから保持されるのは列名、データ型、コメントのみです。ソーステーブルのデフォルト値や `NOT NULL` 制約、インデックスなどの制約は保持されません。

シャードキー

オフライン統合タスクを設定する際に、複合プライマリキーをシャードキーとして使用できますか?

いいえ。オフライン統合タスクは、複合プライマリキーをシャードキーとして使用することをサポートしていません。

データ欠損

データ同期後、ターゲットテーブルのデータがソーステーブルと一致しない

データ同期が完了した後にデータ品質の問題が発生した場合は、「オフライン同期におけるデータ品質問題のトラブルシューティング」をご参照ください。詳細なトラブルシューティングについて説明しています。

SSRF 攻撃

タスクにおける SSRF 攻撃Task have SSRF attacks: 解決方法は?

Q: 「Task have SSRF attacks」エラーの対処方法

原因: クラウドのセキュリティを確保するため、DataWorks はタスクがパブリック IP アドレスを使用してクラウド環境の内部ネットワークアドレスに直接アクセスすることを禁止しています。このセキュリティ対策は、HTTP Reader などのプラグイン設定で内部 IP アドレスまたは VPC ドメイン名を指す URL が入力されたときにトリガーされます。

解決策:

内部データソースで実行されるタスクについては、パブリックリソースグループの使用を中止してください。代わりに、安全で信頼性の高いServerless リソースグループ (推奨) またはデータ統合専用リソースグループに切り替えてください。

日付の書き込み

日付と時刻のデータをテキストファイルに書き込む際に、ミリ秒を保持したり、カスタムの日付と時刻の形式を指定したりするにはどうすればよいですか?

同期タスクをコードエディタに切り替えた後、タスク設定ページの setting セクションに次の設定項目を追加します:

"common": {
  "column": {
    "dateFormat": "yyyyMMdd",
    "datetimeFormatInNanos": "yyyyMMdd HH:mm:ss.SSS"
  }
}

image.png

ここで:

  • `dateFormat` は、ソースの `DATE` 型 (時刻なし) がテキストに変換されるときに使用する日付形式を指定します。

  • `datetimeFormatInNanos` は、ソースの `DATETIME` または `TIMESTAMP` 型 (時刻あり) がテキストに変換されるときに使用する日付形式を指定し、精度はミリ秒までです。

MaxCompute

MaxCompute (ODPS) テーブルからデータを読み取る際のソーステーブルフィールドの「行の追加」または「フィールドの追加」に関する注意点

  1. 定数を入力できます。値は、'abc''123' のように単一引用符で囲む必要があります。

  2. '${bizdate}' のように、スケジューリングパラメーターと組み合わせて使用できます。スケジューリングパラメーターの使用方法の詳細については、「スケジューリングパラメーターのサポートされている形式」をご参照ください。

  3. `pt` のように、同期したいパーティション列の名前を入力できます。

  4. 入力した値が解析できない場合、型は 'カスタム' として表示されます。

  5. ODPS 関数は設定できません。

  6. 手動で追加した列がカスタムとして表示される場合 (MaxCompute パーティション列や LogHub データでプレビューされない列など)、タスクの実行には影響しません。

MaxCompute (ODPS) テーブルからデータを読み取る際に、パーティションフィールドを同期するにはどうすればよいですか?

フィールドマッピングリストの [ソーステーブルフィールド] で、[行の追加] または [フィールドの追加] をクリックします。pt などのパーティション列名を入力し、ターゲットテーブルフィールドへのマッピングを設定します。

image

MaxCompute (ODPS) テーブルからデータを読み取る際に、複数のパーティションからデータを同期するにはどうすればよいですか?

データを読み取るパーティションを指定します。

  • ODPS パーティション設定は、Linux シェルのワイルドカード文字をサポートしています。アスタリスク (*) は 0 文字以上を表し、疑問符 (?) は 1 文字を表します。

  • デフォルトでは、存在しないパーティションを読み取ろうとするとタスクはエラーを報告します。パーティションが存在しなくてもタスクを成功させるには、[パーティションが存在しない場合] パラメーターを [存在しないパーティションを無視] に設定します。または、コードエディタに切り替えて、ODPS パラメーターに "successOnNoPartition": true 設定を追加することもできます。

たとえば、パーティションテーブル `test` に `pt=1,ds=hangzhou`、`pt=1,ds=shanghai`、`pt=2,ds=hangzhou`、`pt=2,ds=beijing` の 4 つのパーティションが含まれている場合、異なるパーティションからデータを読み取るための設定は次のようになります:

  • `pt=1,ds=hangzhou` パーティションからデータを読み取るには、パーティション情報の設定は "partition":"pt=1,ds=hangzhou" です。

  • `pt=1` のすべてのパーティションからデータを読み取るには、パーティション情報の設定は "partition":"pt=1,ds=*" です。

  • `test` テーブルのすべてのパーティションからデータを読み取るには、パーティション情報の設定は "partition":"pt=*,ds=*" です。

条件を設定してパーティションデータを取得することもできます。次の操作では、コードエディタでタスクを設定する必要があります:

  • 最大パーティションを指定するには、/*query*/ ds=(select MAX(ds) from DataXODPSReaderPPR) 設定を追加できます。

  • 条件でフィルタリングするには、/*query*/ pt+expression 設定などの条件を追加できます。たとえば、/*query*/ pt>=20170101 and pt<20170110 は、2017 年 1 月 1 日 (含む) 以降、2017 年 1 月 10 日 (含まない) より前の `pt` パーティションからすべてのデータを取得します。

説明

/*query*/ は、後続のコンテンツが `WHERE` 条件として認識されることを示します。

MaxCompute で列のフィルタリング、並べ替え、null パディングを実行するにはどうすればよいですか?

MaxCompute Writer を設定することで、列のフィルタリング、並べ替え、null パディングなど、MaxCompute がサポートしていない操作を実行できます。たとえば、インポートするフィールドのリストにすべてのフィールドが含まれている場合、"column": ["*"] のように設定できます。

MaxCompute テーブルに `a`、`b`、`c` の 3 つのフィールドがあり、`c` と `b` のフィールドのみを同期したい場合は、列を "column": ["c","b"] のように設定できます。これは、リーダーからの 1 番目と 2 番目の列が MaxCompute テーブルの `c` と `b` フィールドにインポートされ、MaxCompute テーブルの新しく挿入された行の `a` フィールドが null に設定されることを意味します。

MaxCompute の列設定エラーの処理

書き込まれたデータの信頼性を確保し、余分な列データの損失によるデータ品質の問題を回避するために、MaxCompute Writer は余分な列を検出するとエラーを報告します。たとえば、MaxCompute テーブルに `a`、`b`、`c` のフィールドがあり、MaxCompute Writer が 3 つ以上の列を書き込もうとすると、エラーが報告されます。

MaxCompute パーティション設定に関する注意点

MaxCompute Writer は、最終レベルのパーティションへのデータ書き込みのみをサポートします。特定のフィールドに基づくパーティションルーティングなどの機能はサポートしていません。たとえば、テーブルに 3 つのレベルのパーティションがある場合、パーティション設定では第 3 レベルのパーティションへの書き込みを明示的に指定する必要があります。たとえば、テーブルの第 3 レベルのパーティションにデータを書き込むには、pt=20150101, type=1, biz=2 のように設定できますが、pt=20150101, type=1pt=20150101 のようには設定できません。

MaxCompute タスクの再実行とフェールオーバー

MaxCompute Writer は、"truncate": true を設定すると書き込みのべき等性を保証します。これは、書き込み操作が失敗してタスクが再実行された場合、MaxCompute Writer は以前のデータをクリアし、新しいデータをインポートして、再実行ごとのデータ整合性を確保することを意味します。実行中に他の例外によってタスクが中断された場合、データの原子性は保証されません。データはロールバックされたり、自動的に再実行されたりしません。データの整合性を確保するには、べき等性機能を使用してタスクを再実行する必要があります。

説明

`truncate` が `true` に設定されている場合、指定されたパーティションまたはテーブルのすべてのデータがクリアされます。このパラメーターは注意して使用してください。

MaxCompute (ODPS) テーブルからデータを読み取る際にエラーが発生します: The download session is expired.

  • エラーメッセージ:

    Code:DATAX_R_ODPS_005:Failed to read ODPS data, Solution:[Please contact the ODPS administrator]. RequestId=202012091137444331f60b08cda1d9, ErrorCode=StatusConflict, ErrorMessage=The download session is expired.

  • 考えられる原因:

    オフライン同期では、MaxCompute Tunnel コマンドを使用してデータをアップロードおよびダウンロードします。サーバー上の Tunnel セッションのライフサイクルは 24 時間です。したがって、オフライン同期タスクが 24 時間以上実行されると、タスクは失敗して停止します。Tunnel の詳細については、「命令」をご参照ください。

  • 解決策:

    オフライン同期タスクの並行処理を増やし、同期するデータ量を計画して、タスクが 24 時間以内に完了できるようにします。

MaxCompute (ODPS) への書き込み時にエラーが発生します: Error writing request body to server

  • エラーメッセージ:

    Code:[OdpsWriter-09], Description:[Failed to write data to the ODPS destination table.]. - ODPS destination table write block:0 failed, uploadId=[202012081517026537dc0b0160354b]. Please contact the ODPS administrator. - java.io.IOException: Error writing request body to server.

  • 考えられる原因:

    • 考えられる原因 1: データ型の例外。ソースデータが ODPS データ型仕様に準拠していません。たとえば、ODPS `decimal(18,10)` データ型のフィールドに値 `4.2223` を書き込もうとしています。

    • 考えられる原因 2: ODPS ブロックまたは通信の例外。

  • 解決策:

    データをデータ型仕様に準拠するデータ型に変換します。

MySQL

シャーディングされた MySQL テーブルを単一の MaxCompute テーブルに同期するにはどうすればよいですか?

シャーディングされたテーブルの同期」の指示に従ってこれを設定できます。

宛先の MySQL テーブルの文字セットが utf8mb4 の場合、同期後に文字化けした中国語文字を処理するにはどうすればよいですか?

接続文字列を使用してデータソースを追加する場合、JDBC 形式を次のように変更します: jdbc:mysql://xxx.x.x.x:3306/database?com.mysql.jdbc.faultInjection.serverCharsetIndex=45。詳細については、「MySQL データソースの設定」をご参照ください。

MySQL への書き込みまたは読み取り時にエラーが発生します: Application was streaming results when the connection failed. Consider raising value of 'net_write_timeout/net_read_timeout' on the server.

  • 原因:

    • `net_read_timeout`: DataX は、`SplitPk` に基づいて RDS for MySQL からのデータを複数の等しいデータ取得 SQL 文 (`SELECT` 文) に分割します。SQL 文の実行時間が RDS 側で許可される最大実行時間を超えると、このエラーが発生します。

    • `net_write_timeout`: クライアントにブロックを送信するのを待つタイムアウト期間が短すぎます。

  • 解決策:

    データソースの URL 接続に `net_write_timeout` または `net_read_timeout` パラメーターを追加してより大きな値を設定するか、RDS コンソールでこのパラメーターを調整します。

  • 提案:

    タスクが再実行可能な場合は、エラー時にタスクが自動的に再実行されるように設定します。

数据源参数设置

例: jdbc:mysql://192.168.1.1:3306/lizi?useUnicode=true&characterEncoding=UTF8&net_write_timeout=72000

MySQL へのオフライン同期タスクがエラーで失敗します: [DBUtilErrorCode-05]ErrorMessage: Code:[DBUtilErrorCode-05]Description:[Failed to write data to the configured destination table.]. - com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: No operations allowed after connection closed

原因:

MySQL の wait_timeout パラメーターのデフォルト値は 8 時間です。この時間に達したときにまだデータがフェッチされている場合、同期タスクは中断されます。

解決策:

MySQL 設定ファイル my.cnf (Windows では my.ini) を変更します。MySQL モジュールで、次のパラメーター (秒単位) を追加します: wait_timeout=2592000 interactive_timeout=2592000。次に、MySQL を再起動してログインし、次のステートメントを実行して設定が成功したかどうかを確認します: show variables like '%wait_time%'

MySQL データベースから読み取り時にエラーが発生します: The last packet successfully received from the server was 902,138 milliseconds ago

CPU 使用率は正常でもメモリ使用率が高い場合、接続が切断されることがあります。

タスクが自動的に再実行できることを確認した場合は、タスクを [エラー時に再実行] に設定します。詳細については、「時間プロパティの設定」をご参照ください。

PostgreSQL

PostgreSQL からデータを読み取る際にエラーが発生します: org.postgresql.util.PSQLException: FATAL: terminating connection due to conflict with recovery

  • シナリオ: オフライン同期ツールを使用して PostgreSQL データを同期すると、次のエラーが発生します: org.postgresql.util.PSQLException: FATAL: terminating connection due to conflict with recovery

  • 考えられる原因: これは、データベースからのデータ取得に時間がかかるために発生します。max_standby_archive_delay および max_standby_streaming_delay パラメーターの値を増やしてください。詳細については、「スタンバイサーバーイベント」をご参照ください。

RDS

Amazon RDS ソースを使用したオフライン同期タスクがエラーで失敗します: Host is blocked

Amazon RDS への接続で Host is blocked が返される場合は、Amazon ロードバランサーのヘルスチェックを無効にする必要があります。無効にすると、ブロックの問題は報告されなくなります。

MongoDB

root ユーザーで MongoDB データソースを追加するとエラーが発生するのはなぜですか?

root ユーザーで MongoDB データソースを追加するとエラーが発生するのは、同期する必要のあるテーブルのデータベースで作成されたユーザー名を使用する必要があるためです。root ユーザーは使用できません。

たとえば、`name` テーブルをインポートし、`name` テーブルが `test` データベースにある場合、ここのデータベース名は `test` でなければならず、`test` データベースで作成されたユーザーのユーザー名を使用する必要があります。

MongoDB から読み取る際に、クエリパラメーターでタイムスタンプを使用して増分同期を実行するにはどうすればよいですか?

代入ノードを使用して、まず日付型の時刻をタイムスタンプに処理し、この値を MongoDB データ同期の入力パラメーターとして使用できます。詳細については、「MongoDB のタイムスタンプ型フィールドで増分同期を実行する方法」をご参照ください。

MongoDB から宛先データソースにデータを同期した後、タイムゾーンが +8 時間ずれます。これを修正するにはどうすればよいですか?

MongoDB Reader の設定でタイムゾーンを設定する必要があります。詳細については、「MongoDB Reader」をご参照ください。

MongoDB からのデータ読み取り中に、ソースでレコードが更新されても宛先に同期されません。これを処理するにはどうすればよいですか?

クエリ条件を変更せずに、一定期間後にタスクを再起動できます。これは、設定を変更せずに同期タスクの実行時間を遅らせることを意味します。

MongoDB Reader は大文字と小文字を区別しますか?

データを読み取る際、ユーザーが設定した Column.name は大文字と小文字を区別します。間違って設定すると、読み取られたデータは null になります。例:

  • MongoDB ソースデータ:

    {
        "MY_NAME": "zhangsan"
    }
  • 同期タスクの列設定:

    {
        "column":
        [
            {
                "name": "my_name"
            }
        ]
    }

同期タスクの設定の大文字と小文字がソースデータと一致しないため、データ読み取り例外が発生します。

MongoDB Reader のタイムアウト期間を設定するにはどうすればよいですか?

タイムアウト設定パラメーターは cursorTimeoutInMs で、デフォルト値は 600,000 ms (10 分) です。このパラメーターは、MongoDB サーバーがクエリを実行するのにかかる合計時間を表し、データ転送時間は含まれません。フルプルで読み取るデータ量が多い場合、エラー MongoDBReader$Task - operation exceeded time limitcom.mongodb.MongoExecutionTimeoutException: operation exceeded time limit が発生する可能性があります。

MongoDB から読み取り時にエラーが発生します: no master

現在、DataWorks 同期タスクはセカンダリノードからのデータ読み取りをサポートしていません。セカンダリノードからの読み取りを設定すると、エラー no master が報告されます。

MongoDB から読み取り時にエラーが発生します: MongoExecutionTimeoutException: operation exceeded time limit

  • 原因分析:

    このエラーはカーソルのタイムアウトが原因です。

  • 解決策:

    cursorTimeoutInMs パラメーターの値を増やします。

MongoDB から読み取るオフライン同期タスクがエラーで失敗します: DataXException: operation exceeded time limit

タスクの並行処理と読み取りの `BatchSize` を増やす必要があります。

MongoDB 同期タスクがエラーで失敗します: no such cmd splitVector

  • 考えられる原因:

    同期タスクが実行されると、デフォルトでタスクのシャーディングに splitVector コマンドを使用します。一部の MongoDB バージョンは splitVector コマンドをサポートしていないため、no such cmd splitVector エラーが発生します。

  • 解決策:

    1. 同期タスク設定ページに移動し、[スクリプトに変換] 转换脚本 ボタンをクリックします。タスクをコードエディタに変更します。

    2. MongoDB パラメーター設定で、パラメーター

      "useSplitVector" : false

      を追加して、splitVector の使用を回避します。

MongoDB のオフライン同期タスクがエラーで失敗する:更新を適用後、(不変) フィールド '_id' が _id: "2" に変更されていることが検出されました

  • 症状:

    この問題は、コードレス UI などで、[書き込みモード (上書き)][はい] に設定され、_id 以外のフィールドが [ビジネスプライマリキー] として設定されている場合に、同期タスクで発生する可能性があります。写入模式报错

  • 考えられる原因:

    データには、[_id] が設定済みの [ビジネス主キー] と一致しないレコードが含まれています (上記の構成例の my_id など)。

  • 解決策:

    • 解決策 1: オフライン同期タスクを変更して、設定された [ビジネスプライマリキー]_id と一致するようにします。

    • 解決策 2: データ同期のビジネスプライマリキーとして _id を使用します。

Redis

ハッシュモードで Redis に書き込む際にエラーが発生します: Code:[RedisWriter-04], Description:[Dirty data]. - source column number is in valid!

  • 原因:

    Redis がストレージにハッシュモードを使用する場合、ハッシュの属性と値はペアで表示される必要があります。例: odpsReader: "column":[ "id", "name", "age", "address" ]。宛先では、RedisWriter は `"keyIndexes":[ 0, 1]` で設定されています。Redis では、`id` と `name` がキーとして使用され、`age` が属性として、`address` が値として、ハッシュ型で保存されます。ODPS ソースが 2 つの列のみで設定されている場合、ハッシュモードを使用して Redis キャッシュを保存することはできず、この例外が報告されます。

  • 解決策:

    2 つの列のみを使用したい場合は、Redis が情報を文字列モードで保存するように設定します。ハッシュモードを使用する必要がある場合は、ソースに少なくとも 3 つの列を設定します。

OSS

複数のデリミタを持つ CSV ファイルを読み取る際に、ダーティデータを処理するにはどうすればよいですか?

  • 症状:

    オフライン同期タスクを設定して OSS や FTP などのファイルストレージからデータを読み取る場合、ファイルが CSV 形式で、列区切り文字として複数の文字 (たとえば、|,##、または ;;) を使用していると、タスクが「ダーティデータ」エラーで失敗することがあります。タスクの実行ログには、同様の IndexOutOfBoundsException (範囲外の配列インデックス) エラーが表示され、ダーティデータが生成されます。

  • 原因分析:

    DataWorks の組み込み csv リーダー ("fileFormat": "csv") は、複数の文字で構成されるデリミタを処理する際に制限があり、データ行の列分割が不正確になります。

  • 解決策:

    • コードレス UI: テキストタイプを text に切り替え、複数文字のデリミタを明示的に指定します。

    • コードエディタ: "fileFormat": "csv""fileFormat": "text" に変更し、デリミタを正しく設定します: "fieldDelimiter":"<multi-delimiter>", "fieldDelimiterOrigin":"<multi-delimiter>"

OSS から読み取れるファイルの数に制限はありますか?

オフライン同期自体は、OSS リーダープラグインが読み取れるファイルの数を制限しません。ファイル読み取りの主な制限は、タスク自体のリソース消費 (CU) から来ます。一度にあまりにも多くのファイルを読み取ると、メモリ不足エラーが発生しやすくなります。したがって、OutOfMemoryError: Java heap space エラーを防ぐために、object パラメーターを `*` に設定しないことをお勧めします。

OSS に書き込む際に、ファイル名に表示されるランダムな文字列を削除するにはどうすればよいですか?

OSS Writer はファイル名を使用してディレクトリ構造をシミュレートします。OSS にはオブジェクト名に関する次の制限があります: `"object": "datax"` を使用すると、書き込まれたオブジェクトは `datax` で始まり、接尾辞としてランダムな文字列が追加されます。ファイルの数は、シャーディングされたタスクの実際の数によって決まります。

ランダムな UUID 接尾辞が不要な場合は、`"writeSingleObject" : "true"` を設定します。詳細については、「OSS データソース」ドキュメントの `writeSingleObject` の説明をご参照ください。

OSS からデータを読み取る際にエラーが発生します: AccessDenied The bucket you access does not belong to you.

  • 原因:

    データソースに設定された AccessKey アカウントに、バケットに対する必要な権限がありません。

  • 解決策:

    OSS データソースに設定されている AccessKey アカウントに、バケットの読み取り権限を付与します。

Hive

ローカル Hive インスタンスへのオフライン同期タスクがエラーで失敗します: Could not get block locations.

  • 原因分析:

    mapred.task.timeout パラメーターが短すぎる値に設定されている可能性があり、その結果 Hadoop がタスクを終了し、一時ディレクトリをクリーンアップします。これにより、一時データが見つからなくなります。

  • 解決策:

    オフライン同期タスクのデータソース設定で、[Hive 読み取り方法][Hive JDBC に基づくデータ読み取り (条件付きフィルタリングをサポート)] を選択した場合、[セッション設定] フィールドで mapred.task.timeout パラメーターを設定できます。例: mapred.task.timeout=600000

DataHub

DataHub に書き込む際に、単一書き込みデータ制限を超えたことによる書き込み失敗を処理するにはどうすればよいですか?

  • エラーメッセージ:

    ERROR JobContainer - Exception when job runcom.alibaba.datax.common.exception.DataXException: Code:[DatahubWriter-04], Description:[Write data failed.]. - com.aliyun.datahub.exception.DatahubServiceException: Record count 12498 exceed max limit 10000 (Status Code: 413; Error Code: TooLargePayload; Request ID: 20201201004200a945df0bf8e11a42)

  • 考えられる原因:

    このエラーは、DataX が単一のバッチで DataHub に送信するデータ量が DataHub の制限を超えたために発生します。DataHub に送信されるデータ量に影響を与える設定パラメーターは主に次のとおりです:

    • maxCommitSize: データがバッチで宛先に送信される前のデータバッファーの最大サイズ (メガバイト (MB))。デフォルト値は 1 MB (1,048,576 バイト) です。

    • batchSize: DataX-On-Flume バッファーに蓄積されるデータレコードの数。蓄積されたレコード数が指定された `batchSize` に達すると、レコードはバッチとして宛先に送信されます。

  • 解決策:

    [maxCommitSize] および [batchSize] パラメーターを減らします。

LogHub

LogHub から読み取る際に、データを含むフィールドが空として同期される

このプラグインはフィールド名の大文字と小文字を区別します。LogHub リーダーの列設定を確認してください。

LogHub から読み取る際にデータが欠落する

Data Integration は、データが LogHub に入った時刻を使用します。LogHub コンソールで、`receive_time` メタデータフィールドがタスクに設定された時間範囲内にあるかどうかを確認してください。

LogHub から読み取る際に、マッピングされたフィールドが期待どおりでない

これが発生した場合は、インターフェイスで列を手動で編集してください。

Lindorm

Lindorm の一括メソッドを使用してデータを書き込む場合、毎回既存データが置き換えられますか?

ロジックは API 書き込みメソッドと同じです。同じ行と列のデータは上書きされますが、他のデータは変更されません。

Elasticsearch

ES インデックス配下のすべてのフィールドをクエリするにはどうすればよいですか?

curl コマンドを使用して ES インデックスマッピングを取得し、マッピングからすべてのフィールドを抽出できます。

  • クエリ Shell コマンド:

    //es7
    curl -u username:password --request GET 'http://esxxx.elasticsearch.aliyuncs.com:9200/indexname/_mapping'
    //es6
    curl -u username:password --request GET 'http://esxxx.elasticsearch.aliyuncs.com:9200/indexname/typename/_mapping'
  • 結果からフィールドを取得:

    {
        "indexname": {
            "mappings": {
                "typename": {
                    "properties": {
                        "field1": {
                            "type": "text"
                        },
                        "field2": {
                            "type": "long"
                        },
                        "field3": {
                            "type": "double"
                        }
                    }
                }
            }
        }
    }

    返された結果では、`properties` セクションにインデックスのすべてのフィールドとそのプロパティ定義が含まれています。上記のインデックスには、`field1`、`field2`、`field3` の 3 つのフィールドが含まれています。

ES から他のデータソースにデータをオフラインで同期する際、インデックス名が毎日変わります。これを設定するにはどうすればよいですか?

インデックス設定に日付スケジューリング変数を追加できます。これにより、インデックス文字列が異なる日付に基づいて計算され、Elasticsearch Reader のインデックス名が自動的に変更されるようになります。設定プロセスには、日付変数の定義、インデックス変数の設定、タスクの公開と実行の 3 つのステップが含まれます。

  1. 日付変数の定義: 同期タスクのスケジューリング設定で、[パラメーターの追加] を選択して日付変数を定義します。以下の例では、`var1` はタスクの実行時間 (当日) を表し、`var2` はタスクのデータタイムスタンプ (前日) を表すように設定されています。定义日期变量

  2. インデックス変数の設定: タスクをコードエディタに切り替え、Elasticsearch Reader のインデックスを設定します。設定形式は `${variable_name}` です。下の図を参照してください。配置索引变量

  3. タスクの公開と実行: 検証後、タスクをオペレーションセンターに送信して公開し、定期的なスケジュールまたはデータバックフィルを使用して実行します。

    1. [パラメーター付きで実行] ボタンをクリックして、検証のためにタスクを実行します。この操作により、タスク設定のスケジューリングシステムパラメーターが置き換えられます。タスクが完了したら、ログをチェックして、同期されたインデックスが期待どおりであることを確認します。

      説明

      パラメーター付きで実行する場合、テスト用のパラメーター値を直接入力します。

      运行运行

    2. 検証が成功した場合は、[保存][送信] をクリックして、同期タスクを本番環境に追加できます。提交任务

      標準プロジェクトの場合、[公開] ボタンをクリックして公開センターを開き、同期タスクを本番環境に公開します。发布

  4. 結果: 次の図は、設定と実行からの実際のインデックス結果を示しています。

    スクリプトインデックス設定: "index": "esstress_1_${var1}_${var2}"

    実行中のインデックスは次のように置き換えられました: esstress_1_20230106_20230105

    运行结果

Elasticsearch Reader は Object または Nested フィールドのプロパティをどのように同期しますか?(例: object.field1 の同期)

オブジェクトフィールドのプロパティを同期するには、コードエディタを使用する必要があります。コードエディタで、`multi` を次のように設定し、`property.sub-property` 形式を使用して列を設定します。

"multi":{
   "multi":true 
 }

設定については、次の例を参照できます:

#例:
##ES のデータ
"hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "7XAOOoMB4GR_1Dmrrust",
        "_score": 1.0,
        "_source": {
            "level1": {
                "level2": [
                    {
                        "level3": "testlevel3_1"
                    },
                    {
                        "level3": "testlevel3_2"
                    }
                ]
            }
        }
    }
]
##Reader の設定
"parameter": {
  "column": [
      "level1",
      "level1.level2",
      "level1.level2[0]"
  ],
  "multi":{
        "multi":true
    }
}
##Writer の結果: 1 行 3 列。列の順序は reader の設定と同じです。
COLUMN              VALUE
level1:             {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
level1.level2:      [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
level1.level2[0]:   {"level3":"testlevel3_1"}

ODPS から ES に文字列型を同期した後、周囲の引用符が欠落しています。これを修正するにはどうすればよいですか?ソースからの JSON 型の文字列を ES の NESTED オブジェクトとして同期できますか?

  1. 文字の周りの余分な二重引用符は、Kibana ツールの表示上の問題です。実際のデータにはこれらの周囲の二重引用符はありません。curl コマンドまたは Postman を使用して実際のデータを表示できます。データを取得するための curl コマンドは次のとおりです:

    //es7
    curl -u username:password --request GET 'http://esxxx.elasticsearch.aliyuncs.com:9200/indexname/_mapping'
    //es6
    curl -u username:password --request GET 'http://esxxx.elasticsearch.aliyuncs.com:9200/indexname/typename/_mapping'

    结果

  2. ES 書き込みフィールドタイプを `nested` に設定して、ODPS からの JSON 型文字列データを ES に同期し、ネスト形式で保存できます。次の例では、`name` フィールドをネスト形式で ES に同期します。

    • 同期設定: `name` のタイプを nested に設定します。同步配置

    • 同期結果: `name` はネストされたオブジェクトタイプです。同步结果

ソースデータが 文字列「[1,2,3,4,5]」の場合、それを ES に配列として同期するにはどうすればよいですか?

ES に配列型として書き込むには 2 つの設定方法があります。ソースデータの形式に基づいて適切な同期方法を選択できます。

  • ソースデータを JSON 形式で解析して ES に配列型として書き込みます。たとえば、ソースデータが `"[1,2,3,4,5]"` の場合、json_array=true を設定してソースデータを解析し、ES フィールドに配列として書き込みます。ColumnList を json_array=true に設定します。

    • コードレス UI 設定:Codeless UI configuration

    • コードエディタ設定:

      "column":[
        {
          "name":"docs",
          "type":"keyword",
          "json_array":true
        }
      ]
  • ソースデータをデリミタで解析して ES に配列型として書き込みます。たとえば、ソースデータが `"1,2,3,4,5"` の場合、デリミタ `splitter=","` を設定して解析し、ES フィールドに配列として書き込みます。

    • 制限事項:

      • タスクごとに 1 種類の区切り文字のみがサポートされます。splitter はグローバルに一意です。複数の Array フィールドに異なる区切り文字を設定することはできません。たとえば、ソース列が col1="1,2,3,4,5" , col2="6-7-8-9-10" の場合、各列に個別の splitter を設定することはできません。

      • スプリッターは正規表現にすることができます。たとえば、ソース列の値が `"6-,-7-,-8+,*9-,-10"` の場合、`splitter:".,."` を設定できます。これはコードレス UI でサポートされています。

    • コードレス UI 設定: 脚本模式配置splitter: デフォルトは "-,-"

    • コードエディタ設定:

      "parameter" : {
            "column": [
              {
                "name": "col1",
                "array": true,
                "type": "long"
              }
            ],
            "splitter":","
      }

ES にデータを書き込む際、ユーザー名なしで送信が行われますが、それでもユーザー名の検証が必要となり、送信が失敗して要求されたすべてのデータがログに記録されます。これにより、毎日多くの監査ログが生成されます。これを処理するにはどうすればよいですか?

  • 原因分析:

    HttpClient は、接続が確立されるたびに、まず資格情報なしでリクエストを行うように事前設定されています。認証要件を受け取り、応答に基づいて認証方法を決定した後、資格情報付きでリクエストを行います。ES にデータを書き込むたびに接続を確立する必要があるため、書き込みごとに資格情報なしのリクエストが 1 つあります。これにより、各データ書き込みが監査ログに記録されます。

  • 解決策:

    コードエディタで "preemptiveAuth":true 設定を追加する必要があります。

ES にデータを Date 型として同期するにはどうすればよいですか?

日付の書き込みを設定するには 2 つの方法があります。必要に応じて選択できます。

  • リーダーが読み取ったフィールドの内容を ES の Date フィールドに直接書き込みます:

    • `origin:true` を設定して、読み取った内容を直接 ES に書き込みます。

    • ES 書き込みを使用してマッピングを作成するときに、フィールドのフォーマット属性を指定するために `"format"` を設定します。

      "parameter" : {
          "column": [
              {
                  "name": "col_date",
                  "type": "date",
                  "format": "yyyy-MM-dd HH:mm:ss",
                  "origin": true
              }
                ]
      }
  • タイムゾーン変換: Data Integration にタイムゾーン変換を実行させる必要がある場合は、Timezone パラメーターを追加します。

    "parameter" : {
        "column": [
            {
                "name": "col_date",
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss",
                "Timezone": "UTC"
            }
              ]
    }

指定された外部バージョンのため、Elasticsearch Writer がデータの書き込みに失敗します。これを処理するにはどうすればよいですか?

  • `type:version` を設定した場合、ES は現在、外部バージョンの指定をサポートしていません。

        "column":[
                                {
                                    "name":"id",
                                    "type":"version"
                                },
      ]
  • 解決策:

    `"type":"version"` 設定を削除する必要があります。Elasticsearch Writer は現在、外部バージョンの指定をサポートしていません。

Elasticsearch から読み取るオフライン同期タスクがエラーで失敗します: ERROR ESReaderUtil - ES_MISSING_DATE_FORMAT, Unknown date value. please add "dataFormat". sample value:

  • 原因分析:

    ES データソースの対応する日付フィールドのマッピングにフォーマットが設定されていないため、Elasticsearch Reader は日付型フィールドの日付フォーマットを解析できません。

  • 解決策:

    • `dateFormat` パラメーターを ES 日付フィールドのフォーマットと同じフォーマットで、"||" で区切って設定します。フォーマットにはすべての日付型フォーマットを含める必要があります。例:

      "parameter" : {
            "column": [
           			"dateCol1",
              	"dateCol2",
                "otherCol"
            ],
           "dateFormat" : "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss",
      }
    • ES データベースのすべての日付フィールドのマッピングをフォーマットに設定します。

Elasticsearch から読み取るオフライン同期タスクがエラーで失敗します: com.alibaba.datax.common.exception.DataXException: Code:[Common-00].

  • 原因分析:

    fastjson のキーワード制限により、インデックスまたは列に `$ref` などのキーワードが含まれている可能性があります。

  • 解決策:

    Elasticsearch Reader は現在、フィールドにキーワード `$ref` を含むインデックスの同期をサポートしていません。詳細については、「Elasticsearch Reader」をご参照ください。

Elasticsearch に書き込むオフライン同期タスクがエラーで失敗します: version_conflict_engine_exception.

  • 原因分析:

    ES の楽観的ロックメカニズムがトリガーされました。現在のバージョン番号は xxx であるべきですが、更新コマンドが異なるバージョン番号を渡したため、バージョン競合が発生しました。データが更新されている間に、別のプロセスがインデックスデータを削除していました。

  • 解決策:

    1. データ削除の動作が発生したかどうかを確認します。

    2. タスクの同期方法を [更新] から [インデックス] に変更します。

Elasticsearch に書き込むオフライン同期タスクがエラーで失敗します: illegal_argument_exception.

  • 原因分析:

    列フィールドに `similarity` や `properties` などの高度なプロパティを設定する場合、プラグインがそれらを認識するためには `other_params` が必要です。原因

  • 解決策:

    列に other_params を設定し、other_params 内に `similarity` を追加します。以下に示します:

    {"name":"dim2_name",...,"other_params":{"similarity":"len_similarity"}}

ODPS Array フィールドから Elasticsearch へのオフライン同期タスクがエラーで失敗します: dense_vector

  • 原因分析:

    現在、Elasticsearch へのオフライン同期は dense_vector 型をサポートしていません。次の型のみがサポートされています:

    ID,PARENT,ROUTING,VERSION,STRING,TEXT,KEYWORD,LONG,
    INTEGER,SHORT,BYTE,DOUBLE,FLOAT,DATE,BOOLEAN,BINARY,
    INTEGER_RANGE,FLOAT_RANGE,LONG_RANGE,DOUBLE_RANGE,DATE_RANGE,
    GEO_POINT,GEO_SHAPE,IP,IP_RANGE,COMPLETION,TOKEN_COUNT,OBJECT,NESTED;
  • 解決策:

    Elasticsearch Writer がサポートしていない型を処理するには:

    • Elasticsearch Writer を使用してインデックスマッピングを作成しないでください。代わりに顧客が作成したマッピングを使用してください。

    • 対応する型を `NESTED` に変更します。

    • 設定を次のように変更します: dynamic = true および cleanup=false

Elasticsearch Writer で設定された設定がインデックス作成時に有効にならないのはなぜですか?

  • 原因:

    #不正な設定
    "settings": {
      "index": {
        "number_of_shards": 1,
        "number_of_replicas": 0
      }
    }
    #正しい設定
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 0
    }
  • 解決策:

    設定はインデックスが作成されるときにのみ有効になります。インデックス作成は、インデックスが存在しない場合と cleanup=true の 2 つの場合に発生します。cleanup=true の場合、設定構成は `"index"` を使用する必要はありません。

自作のインデックスのネスト型のプロパティが、自動生成後にキーワード型に変わるのはなぜですか? (自動生成とは、cleanup=true を設定した同期タスクの実行を指します。)

#元のマッピング
{
  "name":"box_label_ret",
  "properties":{
    "box_id":{
      "type":"keyword"
    }
}
#cleanup=true で再構築後、次のようになります
{
    "box_label_ret": {
      "properties": {
        "box_id": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }}}}
}
  • 原因分析:

    ネストされた型の場合、Elasticsearch Writer は最上位のマッピングのみを使用し、ES が下の複合型を適応させることができます。プロパティタイプを `text` に変更し、`fields:keyword` を追加することは ES の適応的な動作であり、その使用には影響しません。このマッピング形式が気になる場合は、「Data Integration 同期タスクの機能」をご参照ください。

  • 解決策:

    同期の前に、期待される ES インデックスマッピングを作成し、ES 同期タスクの cleanup パラメーターを false に設定して同期タスクを実行します。

Kafka

Kafka から読み取る際に、endDateTime を設定してデータ範囲を指定したが、この時間を超えるデータが宛先で見つかる

Kafka Reader はデータをバッチで読み取ります。データのバッチに endDateTime を超えるレコードが含まれている場合、同期は停止しますが、endDateTime を超えるこのデータも宛先データソースに書き込まれます。

  • `skipExceedRecord` 設定項目を使用して、超過データを同期するかどうかを指定できます。使用方法の詳細については、「Kafka Reader」をご参照ください。[データ損失の原因となる可能性があるため、これを同期しないように設定しないことをお勧めします。]

  • Kafka の `max.poll.records` パラメーターを設定して、一度にプルするデータ量を指定できます。これを並行処理と組み合わせることで、潜在的な超過データ量を制御できます。超過データ量は `max.poll.records` × 並行処理未満です。

Kafka 内のデータ量が少ないタスクが、データを読み取らずに長時間実行され続けるのはなぜですか?

  • 原因分析:

    データ量が少ないか、データ分布が不均一なため、一部の Kafka パーティションに新しいデータがないか、新しいデータが読み取りの指定された終了オフセットに達していません。タスクの終了条件はすべてのパーティションが指定された終了オフセットに達することであるため、これらの「アイドル」パーティションは条件を満たすことができず、タスク全体の正常な完了を妨げます。

  • 解決策:

    この場合、[同期終了ポリシー][1 分間新しいデータが読み取られない場合は停止] に設定します (コードエディタでは、stopWhenPollEmpty パラメーターを true に、stopWhenReachEndOffset パラメーターを true に設定します)。これにより、すべてのパーティションから最新のオフセットデータを読み取った直後にタスクが終了し、ドライランを回避できます。ただし、タスクが終了した後に、設定された終了オフセットより前のタイムスタンプを持つレコードがパーティションに書き込まれた場合、これらのレコードは消費されないことに注意してください。

RestAPI

RestAPI Writer がエラーを報告します: The JSON string found through path:[] is not an array type

RestAPI Writer は 2 つの書き込みモードを提供します。複数のデータレコードを同期する場合は、`dataMode` を `multiData` に設定する必要があります。詳細については、「RestAPI Writer」をご参照ください。また、RestAPI Writer スクリプトに `dataPath:"data.list"` パラメーターを追加する必要もあります。参数

重要

Column を設定する際に "data.list" プレフィックスを追加する必要はありません。

OTS Writer の設定

自動採番主キー列を含むターゲットテーブルにデータを書き込むように OTS Writer を設定するにはどうすればよいですか?

  1. OTS Writer の設定には、次の 2 行を含める必要があります:

    "newVersion": "true",
    "enableAutoIncrement": "true",
  2. OTS Writer には、自動採番主キー列の名前を設定する必要はありません。

  3. OTS Writer で設定された primaryKey エントリの数 + column エントリの数は、上流の OTS Reader データの列数と等しくなければなりません。

時系列モデルの設定

時系列モデルの設定で、_tagis_timeseries_tag フィールドは何を意味しますか?

例: データレコードには `[phone=Xiaomi, memory=8G, camera=Leica]` の 3 つのタグがあります。数据

  • データエクスポートの例 (OTS Reader)

    • 上記のタグを 1 つの列にマージしてエクスポートする場合は、次のように設定します:

      "column": [
            {
              "name": "_tags",
            }
          ],

      DataWorks はタグを 1 つのデータ列としてエクスポートし、次の形式になります:

      ["phone=xiaomi","camera=LEICA","RAM=8G"]
    • phonecamera タグをエクスポートして、各タグが別々の列になるようにするには、次の設定を使用します:

      "column": [
            {
              "name": "phone",
              "is_timeseries_tag":"true",
            },
            {
              "name": "camera",
              "is_timeseries_tag":"true",
            }
          ],

      次の 2 つのデータ列が取得されます:

      xiaomi, LEICA
  • データインポートの例 (OTS Writer)

    上流のデータソース (Reader) に 2 つのデータ列があるとします:

    • 1 つのデータ列は: ["phone=xiaomi","camera=LEICA","RAM=8G"] です。

    • もう 1 つのデータ列は: 6499 です。

    これらの両方の列をタグに追加し、書き込み後の期待されるタグフィールド形式が以下のようになる場合: 格式次のように設定します:

    "column": [
          {
            "name": "_tags",
          },
          {
            "name": "price",
            "is_timeseries_tag":"true",
          },
        ],
    • 最初の列の設定では、["phone=xiaomi","camera=LEICA","RAM=8G"] を全体としてタグフィールドにインポートします。

    • 2 番目の列の設定では、price=6499 を個別にタグフィールドにインポートします。

カスタムテーブル名

オフライン同期タスクのテーブル名をカスタマイズするにはどうすればよいですか?

テーブル名が orders_20170310orders_20170311orders_20170312 のように規則的なパターンに従い、日によって区別され、テーブル構造が一貫している場合、スケジューリングパラメーターとコードエディタ設定を組み合わせてテーブル名をカスタマイズできます。これにより、毎朝ソースデータベースから前日のテーブルデータを自動的に読み取ることができます。

たとえば、今日が 2017 年 3 月 15 日の場合、タスクは自動的に orders_20170314 テーブルからデータを読み取り、インポートします。自定义表名

コードエディタで、ソーステーブル名を orders_${tablename} のように変数に変更します。テーブル名は日によって区別され、毎日前日のデータを読み取る必要があるため、タスクのパラメーター設定で、変数を tablename=${yyyymmdd} として割り当てます。

説明

スケジューリングパラメーターの使用方法の詳細については、「スケジューリングパラメーターのサポートされている形式」をご参照ください。

テーブルの変更と列の追加

オフライン同期タスクのソーステーブルで列を追加または変更するにはどうすればよいですか?

同期タスク設定ページに移動し、フィールドマッピングを変更して、タスク設定で変更されたフィールドを更新します。その後、変更を有効にするには、タスクを再送信して実行する必要があります。

タスク設定の問題

オフライン同期ノードを設定する際に、すべてのテーブルを表示できません。これを処理するにはどうすればよいですか?

オフライン同期ノードを設定する際、[ソースの選択] エリアには、選択したデータソースの最初の 25 テーブルのみがデフォルトで表示されます。さらに多くのテーブルがある場合は、テーブル名で検索するか、開発にコードエディタを使用できます。

テーブル/列名のキーワード

テーブル名または列名のキーワードによって引き起こされる同期タスクの失敗に対処するにはどうすればよいですか?

  • 原因: 列に予約語が含まれているか、列設定に数値で始まるフィールドが含まれています。

  • 解決策: Data Integration 同期タスクをコードエディタに切り替え、列設定で特殊なフィールドをエスケープします。コードエディタでタスクを設定する方法の詳細については、「コードエディタで同期ノードを設定する」をご参照ください。

    • MySQL のエスケープ文字は `keyword` です。

    • Oracle と PostgreSQL のエスケープ文字は "keyword" です。

    • SQL Server のエスケープ文字は [keyword] です。

    MySQL の例:字段冲突

  • MySQL データソースを例にとります:

    1. 次のステートメントを実行して、`aliyun` という名前の新しいテーブルを作成します。create table aliyun (`table` int ,msg varchar(10));

    2. 次のステートメントを実行してビューを作成し、`table` 列にエイリアスを付けます。create view v_aliyun as select `table` as col1,msg as col2 from aliyun;

      説明
      • `table` は MySQL のキーワードです。データ同期中に生成されるコードはエラーを引き起こします。したがって、`table` 列にエイリアスを付けるためにビューを作成する必要があります。

      • テーブルの列名としてキーワードを使用しないでください。

    3. 上記のステートメントを実行することで、キーワードを持つ列にエイリアスを割り当てることができます。同期タスクを設定するときは、`aliyun` テーブルの代わりに `v_aliyun` ビューを選択します。

フィールドマッピング

オフラインタスクがエラーで失敗します: plugin xx does not specify column

このエラーは、同期タスクのフィールドマッピングが正しく設定されていないか、プラグインの列が正しく設定されていないために発生する可能性があります。

  1. フィールドマッピングが設定されているかどうかを確認します。

  2. プラグインの列が正しく設定されているかどうかを確認します。

非構造化データソースの場合、[データプレビュー] をクリックしてもフィールドをマッピングできません。これを処理するにはどうすればよいですか?

  • 症状:

    [データプレビュー] をクリックすると、次のメッセージが表示され、フィールドが最大バイトサイズを超えていることを示します。

    问题现象

  • 原因: メモリ不足 (OOM) エラーを回避するため、データソースサービスはデータプレビューリクエストを処理する際にフィールドの長さをチェックします。単一の列が 1,000 バイトを超えると、上記のメッセージが表示されます。このメッセージはタスクの通常の操作には影響しません。このエラーを無視して、オフライン同期タスクを直接実行できます。

    説明

    ファイルが存在し、接続が正常な場合、データプレビューが失敗する他の理由には次のものがあります:

    • ファイルの単一行のバイトサイズが 10 MB の制限を超えています。この場合、上記のメッセージと同様にデータは表示されません。

    • ファイルの単一行の列数が 1,000 の制限を超えています。この場合、最初の 1,000 列のみが表示され、1001 番目の列にメッセージが表示されます。

TTL の変更

同期されたデータテーブルの場合、TTL は ALTER メソッドでのみ変更できますか?

生存時間 (TTL) はテーブルに設定されます。同期タスクの設定にはそのようなオプションはありません。

関数の集約

API 経由で同期する場合、集約のためにソース側の関数 (例: MaxCompute) を使用することはサポートされていますか?たとえば、ソーステーブルに Lindorm のプライマリキーとして列 a と b がある場合

API 同期はソース側の関数の使用をサポートしていません。まずソース側で関数を使用してデータを処理し、その後データをインポートする必要があります。