すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:移行、同期、またはサブスクリプションオブジェクトの説明

最終更新日:Nov 09, 2025

DTS は、API 操作を呼び出して DTS タスクの移行、同期、またはサブスクリプションオブジェクトを構成またはクエリすることをサポートしています。このトピックでは、関連する API 操作について説明し、オブジェクトの定義と構成例を示します。

関連する API 操作とパラメーター

API

説明

Dblist リクエストパラメーターで DTS タスクの移行、同期、またはサブスクリプションオブジェクトを構成します。

DbObject レスポンスパラメーターで DTS タスクの移行、同期、またはサブスクリプションオブジェクトをクエリします。

移行、同期、またはサブスクリプションオブジェクトの定義

オブジェクト関連のパラメーターの値は JSON 文字列です。次のセクションでは、オブジェクト関連のパラメーターについて説明します。

  • 移行、同期、またはサブスクリプションオブジェクトに複数のデータベースが含まれている場合は、次の定義をご参照ください。

    重要

    サブスクリプションインスタンスはマッピング機能をサポートしていません。サブスクリプションインスタンスの name パラメーターの値は、サブスクライブするデータベースまたはテーブルの名前と同じである必要があります。

    {
        "移行、同期、またはサブスクライブするデータベース 1 の名前": {
            "name": "宛先インスタンスのデータベース 1 の名前",
            "all": true (データベース全体が移行、同期、またはサブスクライブされることを示します)
        },
        "移行、同期、またはサブスクライブするデータベース 2 の名前": {
            "name": "宛先インスタンスのデータベース 2 の名前",
            "all": false (データベース全体が移行、同期、またはサブスクライブされないことを示します),
            "Table": {
                "移行、同期、またはサブスクライブするテーブル A の名前": {
                    "name": "宛先インスタンスのテーブル A の名前",
                    "all": true (テーブル全体が移行、同期、またはサブスクライブされることを示します),
                    "dml_op": "増分的に移行または同期する DML 操作",
                    "ddl_op": "増分的に移行または同期する DDL 操作"
                }
            }
        },
        "移行、同期、またはサブスクライブするデータベース 3 の名前": {
            "name": "宛先インスタンスのデータベース 3 の名前",
            "all": true (データベース全体が移行、同期、またはサブスクライブされることを示します),
            "dml_op": "増分的に移行または同期する DML 操作",
            "ddl_op": "増分的に移行または同期する DDL 操作"
        }
    }
  • 移行または同期オブジェクトが列レベルであるか、フィルター条件を含む場合は、次の定義をご参照ください。

    {
        "移行、同期、またはサブスクライブするデータベースの名前": {
            "name": "宛先インスタンスのデータベースの名前",
            "all": false (データベース全体が移行、同期、またはサブスクライブされないことを示します),
            "Table": {
                "移行、同期、またはサブスクライブするテーブル A の名前": {
                    "name": "宛先インスタンスのテーブル A の名前",
                    "all": false (テーブル全体が移行、同期、またはサブスクライブされないことを示します),
                    "filter": "id>10"
                    "column": {
                        "id": {
                            "key": "PRI",
                            "name": "id",
                            "type": "int(11)",
                            "sharedKey": false,
                            "state": "checked"
                        }
                    },
                    "shard": 12
                }
            }
        }
    }
  • 移行または同期オブジェクトの宛先データベースインスタンスが AnalyticDB for MySQL または AnalyticDB for PostgreSQL の場合は、次の定義をご参照ください。

    {
        "移行または同期するデータベースの名前": {
            "name": "宛先インスタンスのデータベースの名前",
            "all": false (false に固定。オブジェクトがデータベースレベルかテーブルレベルかに関係なく、宛先インスタンスが AnalyticDB for MySQL または AnalyticDB for PostgreSQL の場合、このパラメーターは false に固定され、テーブルのパーティションキーなどの情報も指定する必要があります),
            "Table": {
                "移行または同期するテーブル A の名前": {
                    "all": true (テーブル全体が移行または同期されることを示します),
                    "name": "宛先インスタンスのテーブル A の名前",
                    "primary_key": "id (プライマリキーを指定します)",
                    "type": "dimension (テーブルのタイプ)",
                }
                "移行または同期するテーブル B の名前": {
                    "all": true (テーブル全体が移行または同期されることを示します),
                    "name": "宛先インスタンスのテーブル B の名前",
                    "part_key": "id (パーティションキーを指定します)",
                    "primary_key": "id (プライマリキーを指定します)",
                    "type": "partition (テーブルのタイプ)",
                    "tagColumnValue": "タグ列の値"
                }
            }
        }
    }
  • 同期オブジェクトに独立した競合解決ポリシーを設定する必要がある場合は、次の定義をご参照ください。

    説明
    • この機能は、MySQL インスタンス間または PolarDB for MySQL クラスター間の双方向同期インスタンスでのみサポートされます。

    • データベースまたはテーブルレベルで独立した競合解決ポリシーを設定できます。

    • 独立した競合解決ポリシーが構成されている列の場合、グローバル競合解決ポリシーは有効になりません。

    テーブルレベルの設定

    {
        "同期するデータベース 1 の名前": {
          "name": "宛先インスタンスのデータベース 1 の名前",
          "all": true (データベース全体が同期されることを示します),
          "conflict": "タスクレベルの競合解決ポリシー"
        },
        "同期するデータベース 2 の名前": {
          "name": "宛先インスタンスのデータベース 2 の名前",
          "all": false (データベース全体が同期されないことを示します),
          "conflict": "overwrite",
          "Table": {
            "同期するテーブル A の名前": {
              "name": "宛先インスタンスのテーブル A の名前",
              "all": true (テーブル全体が同期されることを示します),
              "cdr_cmp_col": "競合検出列",
              "cdr_rslv_col": "競合検出列",
              "resolve_method": "テーブルレベルの競合解決ポリシー"
            }
          }
        }
    }

    データベースレベルの設定

    • 同期オブジェクトがデータベース全体の場合:

      "同期するデータベース 1 の名前": {
        "name": "宛先インスタンスのデータベース 1 の名前",
        "all": true (データベース全体が同期されることを示します),
        "conflict": "タスクレベルの競合解決ポリシー",
        "cdr_cmp_col": "競合検出列",
        "cdr_rslv_col": "競合検出列",
        "resolve_method": "データベースレベルの競合解決ポリシー"
        }
      }
    • 同期オブジェクトがデータベース全体ではない場合:

      "同期するデータベース 2 の名前": {
        "name": "宛先インスタンスのデータベース 2 の名前",
        "all": false (データベース全体が同期されないことを示します),
        "conflict": "タスクレベルの競合解決ポリシー",
        "cdr_cmp_col": "競合検出列",
        "cdr_rslv_col": "競合検出列",
        "resolve_method": "データベースレベルの競合解決ポリシー",
        "Table": {
          "同期するテーブル A の名前": {
            "name": "宛先インスタンスのテーブル A の名前",
            "all": true (テーブル全体が同期されることを示します)
          }
        }
      }
  • データレイクへのデータ統合タスクを構成する必要がある場合は、次の定義をご参照ください。

    パラメーター

    説明

    write_operation

    データ競合が発生した場合にデータを書き込むために使用されるメソッド。

    • append: 宛先データベースの現在のデータを保持し、新しいデータを追加します。

    • overwrite: 宛先データベースの競合するデータを上書きします。

    • errorIfExists: タスクはエラーを報告して終了します。

    • ignore: 現在のデータ書き込み操作をスキップし、実行を続行し、宛先データベースの競合データを使用します。

    targetType

    OSS に書き込まれた後のデータのフォーマット (強制変換)。次のフォーマットがサポートされています: ByteIntegerLongDoubleStringBinaryBooleanTimestamp、および Date

    説明

    このパラメーターが指定されていない場合、DTS はソースからサポートされているタイプにデータの型を自動的に変換します。

    etl_date

    追加する追加の列 (定数) の名前。

    説明

    2 つの etl_date パラメーターの値は同じである必要があります。

    syntacticType

    列が追加されることを示す ADD に固定。

    説明

    追加された列 (value) の値は定数のみであり、一重引用符 ('') で囲む必要があります。

    part_key

    宛先テーブルのパーティションキー。このパラメーターには 2 つの可能な値があります。

    説明

    このパラメーターは、宛先テーブルがパーティションテーブルの場合にのみ必須です。

    • ソースから統合される列。

    • 宛先に追加され、パーティションキーとして設定される定数列。

      説明

      フォーマットは <Key>=<Value> です。たとえば、dt=2025-07-07 は、値が 2025-07-07dt という名前の定数列を示します。

    {
        "統合するデータベース 1 の名前": {
            "all": false (false に固定), 
            "Table": {
                "統合するテーブル A の名前": {
                    "all": false (テーブル全体が統合されないことを示します), 
                    "filter": "", 
                    "write_operation": "データを書き込むために使用されるメソッド", 
                    "name": "宛先インスタンスのテーブル A の名前", 
                    "column": {
                        "統合する列 a の名前": {
                            "name": "宛先インスタンスの列 a の名前", 
                            "targetType": "OSS の列のタイプ"
                        }, 
                        ******, 
                        "etl_date": {
                            "syntacticType": "ADD (ADD に固定)", 
                            "name": "etl_date", 
                            "type": "String (String に固定)", 
                            "value": "'2025-07-08 03:30:00'"
                        }
                    }, 
                    "part_key": "dt=2025-07-07"
                }
            }, 
            "name": "dtstestdata (宛先インスタンスのデータベース 1 の名前)"
        }
    }

パラメーター

説明

name

宛先のソースデータベース、テーブル、または列がマップされる名前。たとえば、dtssource という名前のデータベースを dtstarget という名前のデータベースに移行する場合、name パラメーターを dtstarget に設定する必要があります。

all

すべてのテーブルまたは列を選択するかどうかを指定します。有効な値:

  • true: はい。

    説明

    すべてのテーブルまたは列を選択した場合、特定のテーブルまたは列の情報を構成する必要はありません。

  • false: いいえ。

Table

ソーステーブルの情報。

filter

移行、同期、またはサブスクライブするデータをフィルター処理するために使用されるフィルター条件。このパラメーターはテーブルレベルでのみ設定できます。

たとえば、このパラメーターを id>10 に設定して、ID 列の値が 10 より大きいデータのみを移行または同期できます。詳細については、「フィルター条件を設定する」をご参照ください。

説明

サブスクリプションタスクはフィルター条件の設定をサポートしていません。

column

ソース列の情報。

key

列がプライマリキーであるかどうかを指定します。有効な値:

  • PRI: はい。

  • 空の文字列: いいえ。

sharedKey

列がシャードキーであるかどうかを指定します。有効な値:

  • true: はい。

  • false: いいえ。

説明

このパラメーターは、移行または同期オブジェクトのデータベースタイプが Kafka の場合にのみ必須です。

type

フィールドのデータの型。

state

値が checked の場合、列が選択されます。

shard

移行または同期するテーブルのシャード数。

説明

このパラメーターは、移行または同期データのデータベースタイプが Kafka の場合にのみ必須です。

dml_op

増分的に移行または同期する DML 操作。有効な値:

  • i: INSERT。

  • u: UPDATE。

  • d: DELETE。

  • このパラメーターが空の場合、タスクでサポートされているすべての DML 操作が増分的に移行または同期されます。

  • none: DML 操作は増分的に移行または同期されません。

説明

さまざまな移行または同期タスクでサポートされている DML 操作をクエリするには、「移行ソリューション」または「同期ソリューション」の特定のタスクの構成ドキュメントをご参照ください。

ddl_op

増分的に移行または同期する DDL 操作。有効な値:

  • ct: CREATE TABLE。

  • at: ALTER TABLE。

  • dt: DROP TABLE。

  • rt: RENAME TABLE。

  • tt: TRUNCATE TABLE。

  • このパラメーターが空の場合、タスクでサポートされているすべての DDL 操作が増分的に移行または同期されます。

  • none: DDL 操作は増分的に移行または同期されません。

説明

さまざまな移行または同期タスクでサポートされている DDL 操作をクエリするには、「移行ソリューション」または「同期ソリューション」の特定のタスクの構成ドキュメントをご参照ください。

primary_key

プライマリキーを指定します。このパラメーターは、宛先インスタンスが AnalyticDB for MySQL または AnalyticDB for PostgreSQL の場合にのみ使用可能で、必須です。

part_key

パーティションキーを指定します。このパラメーターは、宛先インスタンスが AnalyticDB for MySQL または AnalyticDB for PostgreSQL の場合に利用可能で、指定する必要があります。

type

重要

この type パラメーターは、フィールドのデータの型を表す type パラメーターとは異なります。

宛先インスタンスが AnalyticDB for MySQL または AnalyticDB for PostgreSQL の場合、移行または同期するオブジェクトのテーブルタイプを指定する必要があります:

  • dimension: ディメンションテーブル。

  • partition: パーティションテーブル。

tagColumnValue

__dts_data_source タグ列のカスタム値。宛先インスタンスが AnalyticDB for MySQL の場合、このパラメーターは使用可能であり、渡す必要があります。

conflict

タスクレベルでのグローバル競合解決ポリシー。このパラメーターは、同期する各データベースに含める必要があり、値は同じでなければなりません。有効な値:

  • overwrite: データ同期の競合が発生した場合、宛先データベースの競合するレコードは直接上書きされます。

  • interrupt: データ同期の競合が発生した場合、同期タスクはエラーを報告して終了します。同期タスクは失敗状態になり、タスクを修正するために介入する必要があります。

    説明

    コンソールには [TaskFailed] と表示されます。

  • ignore: データ同期の競合が発生した場合、現在の同期文はスキップされ、実行が続行され、宛先データベースの競合するレコードが使用されます。

resolve_method

テーブルレベルでの独立した競合解決ポリシー (増分同期でのみサポート)。有効な値:

  • overwrite: データ同期の競合が発生した場合、宛先データベースの競合するレコードは直接上書きされます。

  • interrupt: データ同期の競合が発生した場合、同期タスクはエラーを報告して終了します。同期タスクは失敗状態になり、タスクを修正するために介入する必要があります。

    説明

    コンソールには [TaskFailed] と表示されます。

  • ignore: データ同期の競合が発生した場合、現在の同期文はスキップされ、実行が続行され、宛先データベースの競合するレコードが使用されます。

  • use_max: データ同期の競合が発生した場合、2 つの競合するレコードが比較され、値の大きいレコードが宛先データベースに書き込まれます。宛先レコードが存在しないか、フィールドタイプが要件を満たしていない場合、システムの処理方法は overwrite と同等です。

  • use_min: データ同期の競合が発生した場合、2 つの競合するレコードが比較され、値の小さいレコードが宛先データベースに書き込まれます。宛先レコードが存在しないか、フィールドタイプが要件を満たしていない場合、システムの処理方法は ignore と同等です。

cdr_cmp_col

  • テーブルレベル: 独立した競合解決ポリシーを設定する必要がある競合検出列。プライマリキーとユニークキーは除きます。値は同じである必要があります。

    重要
    • 競合解決ポリシーが use_max または use_min の場合、cdr_cmp_col および cdr_rslv_col パラメーターを指定する必要があります。

    • 値には、デフォルトでプライマリキーとユニークキーがすでに含まれています。対応する列を手動で指定する必要はありません。

  • データベースレベル: 独立した競合解決ポリシーを設定する必要がある競合検出列。値は同じである必要があります。

    重要

    cdr_cmp_col および cdr_rslv_col パラメーターを指定する必要があります。

cdr_rslv_col

移行、同期、またはサブスクリプションオブジェクトの構成例

  • 例 1: dtstestdata データベースのすべてのテーブルを移行、同期、またはサブスクライブします。

    {"dtstestdata": {   "name": "dtstestdata",   "all": true }}
  • 例 2: dtstestdata データベースを移行または同期し、dtstestdata_new に名前を変更します。

    {"dtstestdata": {   "name": "dtstestdata_new",   "all": true }}
  • 例 3: dtstestdata データベースの特定のテーブル (customer など) を移行、同期、またはサブスクライブします。

    {"dtstestdata": {
       "name": "dtstestdata",
       "all": false,
       "Table": {
         "customer": {
           "name": "customer",
           "all": true, 
           "column": { 
             "id": {
               "key": "PRI",
               "name": "id",
               "type": "int(11)",
               "sharedKey": false,
               "state": "checked"  
             },
             "gmt_create": {
               "key": "",
               "name": "gmt_create",
               "type": "datetime",
               "sharedKey": false,
               "state": "checked"
             },
             "gmt_modify": {
               "key": "",
               "name": "gmt_modify",
               "type": "datetime",
               "sharedKey": false,
               "state": "checked"
             },
             "valid_time": {
               "key": "",
               "name": "valid_time",
               "type": "datetime",
               "sharedKey": false,
               "state": "checked"
             },
             "creator": {
               "key": "",
               "name": "creator",
               "type": "varchar(200)",
               "sharedKey": false,
               "state": "checked"
             }
           },
           "shard": 12
         }
       }
     }
    }
  • 例 4: dtstestdata データベースのテーブル (customer や order など) の特定の列を移行または同期します。

    {"dtstestdata": {
       "name": "dtstestdata",
       "all": false,
       "Table": {
         "customer": {
           "name": "customer",
           "all": false, 
           "column": { 
             "id": {
               "key": "PRI",
               "name": "id",
               "type": "int(11)",
               "sharedKey": false,
               "state": "checked"  
             },
             "level": {
               "key": "",
               "name": "level",
               "type": "varchar(5000)",
               "sharedKey": false,
               "state": "checked"
             },
             "name": {
               "key": "",
               "name": "name",
               "type": "varchar(500)",
               "sharedKey": false,
               "state": "checked"
             },
           },
           "shard": 12
         },
         "order": {
           "name": "order",
           "all": false,
          "column": {
             "id": {
               "key": "PRI",
               "name": "id",
               "type": "int(11)",
               "sharedKey": false,
               "state": "checked"
             }
           },
           "shard": 12
         }
       }
     }
    }
  • 例 5: dtstestdata データベースから宛先の AnalyticDB for MySQL または AnalyticDB for PostgreSQL インスタンスにテーブル (customer、order、commodity など) を移行または同期します。

    {
        "dtstestdata": {
            "name": "dtstestdatanew",
            "all": false,
            "Table": {
                "order": {
                    "name": "ordernew",
                    "all": true,
                    "part_key": "id",
                    "primary_key": "id",
                    "type": "partition"
                },
                "customer": {
                    "name": "customernew",
                    "all": true,
                    "primary_key": "id",
                    "type": "dimension"
                },
                "commodity": {
                    "name": "commoditynew",
                    "all": false,
                    "filter": "id>10",
                    "column": {
                        "id": {
                            "key": "PRI",
                            "name": "id",
                            "type": "int(11)"
                        }
                    },
                    "part_key": "id",
                    "primary_key": "id",
                    "type": "partition"
                }
            }
        }
    }
  • 例 6: 同期オブジェクトに独立した競合解決ポリシーを設定します。

    テーブルレベルの設定

    同期タスクオブジェクトのグローバル競合解決ポリシーを interrupt に設定します。dtstestdata2 データベースの customer テーブルのプライマリキー列、ユニークキー列、および name 列の独立した競合解決ポリシーを overwrite に設定します。

    {
        "dtstestdata1": {
          "name": "dtstestdata1",
          "all": true,
          "conflict": "interrupt"
        },
        "dtstestdata2": {
          "name": "dtstestdata2",
          "all": false,
          "conflict": "interrupt",
          "Table": {
            "customer": {
              "name": "customer",
              "all": true,
              "cdr_cmp_col": "name",
              "cdr_rslv_col": "name",
              "resolve_method": "overwrite"
            }
          }
        }
      }

    データベースレベルの設定

    • 同期オブジェクトがデータベース全体の場合: dtstestdata1 データベースで同期するすべてのテーブルの name 列と addr 列の独立した競合解決ポリシーを use_max に設定します。

      "dtstestdata1": {
        "name": "dtstestdata1",
        "all": true,
        "conflict": "overwrite",
        "cdr_cmp_col": "name,addr",
        "cdr_rslv_col": "name,addr",
        "resolve_method": "use_max"
        }
      }
    • 同期オブジェクトがデータベース全体ではない場合: dtstestdata2 データベースで同期するすべてのテーブルの name 列と addr 列の独立した競合解決ポリシーを use_max に設定します。

      "dtstestdata2": {
        "name": "dtstestdata2",
        "all": false,
        "conflict": "overwrite",
        "cdr_cmp_col": "name,addr",
        "cdr_rslv_col": "name,addr",
        "resolve_method": "use_max",
        "Table": {
          "person": {
            "name": "person",
            "all": true
          },
          "class": {
            "name": "class",
            "all": true
          }
        }
      }

  • 例 7: ソースデータベースタイプが Tair/Redis の同期インスタンスの場合、DB 名が 0 と 1 の DB からキープレフィックスが HProp のデータのみを同期します (つまり、同期するプレフィックスHProp です)。DB 名が 2 の DB の場合、キープレフィックスが dts で、dtstest を含まないデータのみを同期します (つまり、同期するプレフィックスdts で、フィルタリングするプレフィックスdtstest です)。

    {
        "0": {
            "name": "0", 
            "all": true,
             "filter": "[{"condition":"HProp","filterType":"white","filterPattern":"prefix"}]"
        }, 
        "1": {
            "name": "1", 
            "all": true,
             "filter": "[{"condition":"HProp","filterType":"white","filterPattern":"prefix"}]"
        }, 
        "2": {
            "name": "2", 
            "all": true,
             "filter": "[{"condition":"dts","filterType":"white","filterPattern":"prefix"},{"condition":"dtstest","filterType":"black","filterPattern":"prefix"}]"
        }
    }
  • 例 8: dtstestdata データベースの commodity テーブルを Delta フォーマットで宛先 OSS に統合します。

    {
        "dtstestdata(": {
            "all": false, 
            "Table": {
                "commodity": {
                    "all": false(), 
                    "filter": "", 
                    "write_operation": "overwrite", 
                    "name": "commodity", 
                    "column": {
                        "IS_VALID": {
                            "name": "is_valid", 
                            "targetType": "String"
                        }, 
                        "BuiltinArchiveDate": {
                            "name": "builtinarchivedate", 
                            "targetType": "String"
                        }, 
                        "PRODUCT_NAME": {
                            "name": "product_name", 
                            "targetType": "String"
                        }, 
                        "PRODUCT_CODE": {
                            "name": "product_code", 
                            "targetType": "String"
                        }, 
                        "etl_date": {
                            "syntacticType": "ADD", 
                            "name": "etl_date", 
                            "type": "String", 
                            "value": "'2025-07-08 03:30:00'"
                        }
                    }, 
                    "part_key": "dt=2025-07-07"
                }
            }, 
            "name": "dtstestdata"
        }
    }