DTS は、API 操作を呼び出して DTS タスクの移行、同期、またはサブスクリプションオブジェクトを構成またはクエリすることをサポートしています。このトピックでは、関連する API 操作について説明し、オブジェクトの定義と構成例を示します。
関連する API 操作とパラメーター
API | 説明 |
| |
|
移行、同期、またはサブスクリプションオブジェクトの定義
オブジェクト関連のパラメーターの値は JSON 文字列です。次のセクションでは、オブジェクト関連のパラメーターについて説明します。
移行、同期、またはサブスクリプションオブジェクトに複数のデータベースが含まれている場合は、次の定義をご参照ください。
重要サブスクリプションインスタンスはマッピング機能をサポートしていません。サブスクリプションインスタンスの
nameパラメーターの値は、サブスクライブするデータベースまたはテーブルの名前と同じである必要があります。{ "移行、同期、またはサブスクライブするデータベース 1 の名前": { "name": "宛先インスタンスのデータベース 1 の名前", "all": true (データベース全体が移行、同期、またはサブスクライブされることを示します) }, "移行、同期、またはサブスクライブするデータベース 2 の名前": { "name": "宛先インスタンスのデータベース 2 の名前", "all": false (データベース全体が移行、同期、またはサブスクライブされないことを示します), "Table": { "移行、同期、またはサブスクライブするテーブル A の名前": { "name": "宛先インスタンスのテーブル A の名前", "all": true (テーブル全体が移行、同期、またはサブスクライブされることを示します), "dml_op": "増分的に移行または同期する DML 操作", "ddl_op": "増分的に移行または同期する DDL 操作" } } }, "移行、同期、またはサブスクライブするデータベース 3 の名前": { "name": "宛先インスタンスのデータベース 3 の名前", "all": true (データベース全体が移行、同期、またはサブスクライブされることを示します), "dml_op": "増分的に移行または同期する DML 操作", "ddl_op": "増分的に移行または同期する DDL 操作" } }移行または同期オブジェクトが列レベルであるか、フィルター条件を含む場合は、次の定義をご参照ください。
{ "移行、同期、またはサブスクライブするデータベースの名前": { "name": "宛先インスタンスのデータベースの名前", "all": false (データベース全体が移行、同期、またはサブスクライブされないことを示します), "Table": { "移行、同期、またはサブスクライブするテーブル A の名前": { "name": "宛先インスタンスのテーブル A の名前", "all": false (テーブル全体が移行、同期、またはサブスクライブされないことを示します), "filter": "id>10" "column": { "id": { "key": "PRI", "name": "id", "type": "int(11)", "sharedKey": false, "state": "checked" } }, "shard": 12 } } } }移行または同期オブジェクトの宛先データベースインスタンスが AnalyticDB for MySQL または AnalyticDB for PostgreSQL の場合は、次の定義をご参照ください。
{ "移行または同期するデータベースの名前": { "name": "宛先インスタンスのデータベースの名前", "all": false (false に固定。オブジェクトがデータベースレベルかテーブルレベルかに関係なく、宛先インスタンスが AnalyticDB for MySQL または AnalyticDB for PostgreSQL の場合、このパラメーターは false に固定され、テーブルのパーティションキーなどの情報も指定する必要があります), "Table": { "移行または同期するテーブル A の名前": { "all": true (テーブル全体が移行または同期されることを示します), "name": "宛先インスタンスのテーブル A の名前", "primary_key": "id (プライマリキーを指定します)", "type": "dimension (テーブルのタイプ)", } "移行または同期するテーブル B の名前": { "all": true (テーブル全体が移行または同期されることを示します), "name": "宛先インスタンスのテーブル B の名前", "part_key": "id (パーティションキーを指定します)", "primary_key": "id (プライマリキーを指定します)", "type": "partition (テーブルのタイプ)", "tagColumnValue": "タグ列の値" } } } }同期オブジェクトに独立した競合解決ポリシーを設定する必要がある場合は、次の定義をご参照ください。
説明この機能は、MySQL インスタンス間または PolarDB for MySQL クラスター間の双方向同期インスタンスでのみサポートされます。
データベースまたはテーブルレベルで独立した競合解決ポリシーを設定できます。
独立した競合解決ポリシーが構成されている列の場合、グローバル競合解決ポリシーは有効になりません。
テーブルレベルの設定
{ "同期するデータベース 1 の名前": { "name": "宛先インスタンスのデータベース 1 の名前", "all": true (データベース全体が同期されることを示します), "conflict": "タスクレベルの競合解決ポリシー" }, "同期するデータベース 2 の名前": { "name": "宛先インスタンスのデータベース 2 の名前", "all": false (データベース全体が同期されないことを示します), "conflict": "overwrite", "Table": { "同期するテーブル A の名前": { "name": "宛先インスタンスのテーブル A の名前", "all": true (テーブル全体が同期されることを示します), "cdr_cmp_col": "競合検出列", "cdr_rslv_col": "競合検出列", "resolve_method": "テーブルレベルの競合解決ポリシー" } } } }データベースレベルの設定
同期オブジェクトがデータベース全体の場合:
"同期するデータベース 1 の名前": { "name": "宛先インスタンスのデータベース 1 の名前", "all": true (データベース全体が同期されることを示します), "conflict": "タスクレベルの競合解決ポリシー", "cdr_cmp_col": "競合検出列", "cdr_rslv_col": "競合検出列", "resolve_method": "データベースレベルの競合解決ポリシー" } }同期オブジェクトがデータベース全体ではない場合:
"同期するデータベース 2 の名前": { "name": "宛先インスタンスのデータベース 2 の名前", "all": false (データベース全体が同期されないことを示します), "conflict": "タスクレベルの競合解決ポリシー", "cdr_cmp_col": "競合検出列", "cdr_rslv_col": "競合検出列", "resolve_method": "データベースレベルの競合解決ポリシー", "Table": { "同期するテーブル A の名前": { "name": "宛先インスタンスのテーブル A の名前", "all": true (テーブル全体が同期されることを示します) } } }
データレイクへのデータ統合タスクを構成する必要がある場合は、次の定義をご参照ください。
パラメーター
説明
write_operationデータ競合が発生した場合にデータを書き込むために使用されるメソッド。
append: 宛先データベースの現在のデータを保持し、新しいデータを追加します。overwrite: 宛先データベースの競合するデータを上書きします。errorIfExists: タスクはエラーを報告して終了します。ignore: 現在のデータ書き込み操作をスキップし、実行を続行し、宛先データベースの競合データを使用します。
targetTypeOSS に書き込まれた後のデータのフォーマット (強制変換)。次のフォーマットがサポートされています:
Byte、Integer、Long、Double、String、Binary、Boolean、Timestamp、およびDate。説明このパラメーターが指定されていない場合、DTS はソースからサポートされているタイプにデータの型を自動的に変換します。
etl_date追加する追加の列 (定数) の名前。
説明2 つの
etl_dateパラメーターの値は同じである必要があります。syntacticType列が追加されることを示す ADD に固定。
説明追加された列 (
value) の値は定数のみであり、一重引用符 ('') で囲む必要があります。part_key宛先テーブルのパーティションキー。このパラメーターには 2 つの可能な値があります。
説明このパラメーターは、宛先テーブルがパーティションテーブルの場合にのみ必須です。
ソースから統合される列。
宛先に追加され、パーティションキーとして設定される定数列。
説明フォーマットは
<Key>=<Value>です。たとえば、dt=2025-07-07は、値が2025-07-07のdtという名前の定数列を示します。
{ "統合するデータベース 1 の名前": { "all": false (false に固定), "Table": { "統合するテーブル A の名前": { "all": false (テーブル全体が統合されないことを示します), "filter": "", "write_operation": "データを書き込むために使用されるメソッド", "name": "宛先インスタンスのテーブル A の名前", "column": { "統合する列 a の名前": { "name": "宛先インスタンスの列 a の名前", "targetType": "OSS の列のタイプ" }, ******, "etl_date": { "syntacticType": "ADD (ADD に固定)", "name": "etl_date", "type": "String (String に固定)", "value": "'2025-07-08 03:30:00'" } }, "part_key": "dt=2025-07-07" } }, "name": "dtstestdata (宛先インスタンスのデータベース 1 の名前)" } }
パラメーター | 説明 |
| 宛先のソースデータベース、テーブル、または列がマップされる名前。たとえば、dtssource という名前のデータベースを dtstarget という名前のデータベースに移行する場合、name パラメーターを dtstarget に設定する必要があります。 |
| すべてのテーブルまたは列を選択するかどうかを指定します。有効な値:
|
| ソーステーブルの情報。 |
| 移行、同期、またはサブスクライブするデータをフィルター処理するために使用されるフィルター条件。このパラメーターはテーブルレベルでのみ設定できます。 たとえば、このパラメーターを 説明 サブスクリプションタスクはフィルター条件の設定をサポートしていません。 |
| ソース列の情報。 |
| 列がプライマリキーであるかどうかを指定します。有効な値:
|
| 列がシャードキーであるかどうかを指定します。有効な値:
説明 このパラメーターは、移行または同期オブジェクトのデータベースタイプが Kafka の場合にのみ必須です。 |
| フィールドのデータの型。 |
| 値が |
| 移行または同期するテーブルのシャード数。 説明 このパラメーターは、移行または同期データのデータベースタイプが Kafka の場合にのみ必須です。 |
| 増分的に移行または同期する DML 操作。有効な値:
|
| 増分的に移行または同期する DDL 操作。有効な値:
|
| プライマリキーを指定します。このパラメーターは、宛先インスタンスが AnalyticDB for MySQL または AnalyticDB for PostgreSQL の場合にのみ使用可能で、必須です。 |
| パーティションキーを指定します。このパラメーターは、宛先インスタンスが AnalyticDB for MySQL または AnalyticDB for PostgreSQL の場合に利用可能で、指定する必要があります。 |
重要 この | 宛先インスタンスが AnalyticDB for MySQL または AnalyticDB for PostgreSQL の場合、移行または同期するオブジェクトのテーブルタイプを指定する必要があります:
|
| __dts_data_source タグ列のカスタム値。宛先インスタンスが AnalyticDB for MySQL の場合、このパラメーターは使用可能であり、渡す必要があります。 |
| タスクレベルでのグローバル競合解決ポリシー。このパラメーターは、同期する各データベースに含める必要があり、値は同じでなければなりません。有効な値:
|
| テーブルレベルでの独立した競合解決ポリシー (増分同期でのみサポート)。有効な値:
|
|
|
|
移行、同期、またはサブスクリプションオブジェクトの構成例
例 1: dtstestdata データベースのすべてのテーブルを移行、同期、またはサブスクライブします。
{"dtstestdata": { "name": "dtstestdata", "all": true }}例 2: dtstestdata データベースを移行または同期し、dtstestdata_new に名前を変更します。
{"dtstestdata": { "name": "dtstestdata_new", "all": true }}例 3: dtstestdata データベースの特定のテーブル (customer など) を移行、同期、またはサブスクライブします。
{"dtstestdata": { "name": "dtstestdata", "all": false, "Table": { "customer": { "name": "customer", "all": true, "column": { "id": { "key": "PRI", "name": "id", "type": "int(11)", "sharedKey": false, "state": "checked" }, "gmt_create": { "key": "", "name": "gmt_create", "type": "datetime", "sharedKey": false, "state": "checked" }, "gmt_modify": { "key": "", "name": "gmt_modify", "type": "datetime", "sharedKey": false, "state": "checked" }, "valid_time": { "key": "", "name": "valid_time", "type": "datetime", "sharedKey": false, "state": "checked" }, "creator": { "key": "", "name": "creator", "type": "varchar(200)", "sharedKey": false, "state": "checked" } }, "shard": 12 } } } }例 4: dtstestdata データベースのテーブル (customer や order など) の特定の列を移行または同期します。
{"dtstestdata": { "name": "dtstestdata", "all": false, "Table": { "customer": { "name": "customer", "all": false, "column": { "id": { "key": "PRI", "name": "id", "type": "int(11)", "sharedKey": false, "state": "checked" }, "level": { "key": "", "name": "level", "type": "varchar(5000)", "sharedKey": false, "state": "checked" }, "name": { "key": "", "name": "name", "type": "varchar(500)", "sharedKey": false, "state": "checked" }, }, "shard": 12 }, "order": { "name": "order", "all": false, "column": { "id": { "key": "PRI", "name": "id", "type": "int(11)", "sharedKey": false, "state": "checked" } }, "shard": 12 } } } }例 5: dtstestdata データベースから宛先の AnalyticDB for MySQL または AnalyticDB for PostgreSQL インスタンスにテーブル (customer、order、commodity など) を移行または同期します。
{ "dtstestdata": { "name": "dtstestdatanew", "all": false, "Table": { "order": { "name": "ordernew", "all": true, "part_key": "id", "primary_key": "id", "type": "partition" }, "customer": { "name": "customernew", "all": true, "primary_key": "id", "type": "dimension" }, "commodity": { "name": "commoditynew", "all": false, "filter": "id>10", "column": { "id": { "key": "PRI", "name": "id", "type": "int(11)" } }, "part_key": "id", "primary_key": "id", "type": "partition" } } } }例 6: 同期オブジェクトに独立した競合解決ポリシーを設定します。
テーブルレベルの設定
同期タスクオブジェクトのグローバル競合解決ポリシーを
interruptに設定します。dtstestdata2 データベースの customer テーブルのプライマリキー列、ユニークキー列、およびname列の独立した競合解決ポリシーをoverwriteに設定します。{ "dtstestdata1": { "name": "dtstestdata1", "all": true, "conflict": "interrupt" }, "dtstestdata2": { "name": "dtstestdata2", "all": false, "conflict": "interrupt", "Table": { "customer": { "name": "customer", "all": true, "cdr_cmp_col": "name", "cdr_rslv_col": "name", "resolve_method": "overwrite" } } } }データベースレベルの設定
同期オブジェクトがデータベース全体の場合: dtstestdata1 データベースで同期するすべてのテーブルの
name列とaddr列の独立した競合解決ポリシーをuse_maxに設定します。"dtstestdata1": { "name": "dtstestdata1", "all": true, "conflict": "overwrite", "cdr_cmp_col": "name,addr", "cdr_rslv_col": "name,addr", "resolve_method": "use_max" } }同期オブジェクトがデータベース全体ではない場合: dtstestdata2 データベースで同期するすべてのテーブルの
name列とaddr列の独立した競合解決ポリシーをuse_maxに設定します。"dtstestdata2": { "name": "dtstestdata2", "all": false, "conflict": "overwrite", "cdr_cmp_col": "name,addr", "cdr_rslv_col": "name,addr", "resolve_method": "use_max", "Table": { "person": { "name": "person", "all": true }, "class": { "name": "class", "all": true } } }
例 7: ソースデータベースタイプが Tair/Redis の同期インスタンスの場合、DB 名が 0 と 1 の DB からキープレフィックスが
HPropのデータのみを同期します (つまり、同期するプレフィックス はHPropです)。DB 名が 2 の DB の場合、キープレフィックスがdtsで、dtstestを含まないデータのみを同期します (つまり、同期するプレフィックス はdtsで、フィルタリングするプレフィックス はdtstestです)。{ "0": { "name": "0", "all": true, "filter": "[{"condition":"HProp","filterType":"white","filterPattern":"prefix"}]" }, "1": { "name": "1", "all": true, "filter": "[{"condition":"HProp","filterType":"white","filterPattern":"prefix"}]" }, "2": { "name": "2", "all": true, "filter": "[{"condition":"dts","filterType":"white","filterPattern":"prefix"},{"condition":"dtstest","filterType":"black","filterPattern":"prefix"}]" } }例 8: dtstestdata データベースの commodity テーブルを Delta フォーマットで宛先 OSS に統合します。
{ "dtstestdata(": { "all": false, "Table": { "commodity": { "all": false(), "filter": "", "write_operation": "overwrite", "name": "commodity", "column": { "IS_VALID": { "name": "is_valid", "targetType": "String" }, "BuiltinArchiveDate": { "name": "builtinarchivedate", "targetType": "String" }, "PRODUCT_NAME": { "name": "product_name", "targetType": "String" }, "PRODUCT_CODE": { "name": "product_code", "targetType": "String" }, "etl_date": { "syntacticType": "ADD", "name": "etl_date", "type": "String", "value": "'2025-07-08 03:30:00'" } }, "part_key": "dt=2025-07-07" } }, "name": "dtstestdata" } }