すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:DTS タスクにおける ETL の設定

最終更新日:Mar 29, 2026

Data Transmission Service (DTS) を使用してデータをレプリケーションする場合、ソースデータはターゲットに到達する前に変換が必要になることがよくあります。例えば、条件に合わない行のフィルタリング、機密性の高い値のマスキング、各レコードへの変更時刻などのメタデータの付与などです。DTS には、この処理ロジックをレプリケーションエンジンとターゲットデータベースの間でインフライトで実行する、ストリーミング抽出・変換・書き出し (ETL) 機能が含まれています。ETL ロジックは、データ同期シナリオ向けに設計されたドメイン特化言語 (DSL) で表現されます。

一般的なユースケース:

  • データフィルタリング:しきい値を超える ID を持つレコードや、特定の日付より古い注文など、同期基準を満たさないレコードを削除します。

  • データマスキング:電話番号の下 4 桁をアスタリスクに置き換えるなど、機密性の高い値を隠します。

  • 変更追跡:同期された各行にコミットタイムスタンプを付与し、ターゲットテーブルにデータがいつ変更されたかを記録します。

  • 監査ログ:コンプライアンスやデバッグのために、操作タイプ (INSERT、UPDATE、DELETE) と変更時刻を専用の列に書き込みます。

ETL の仕組み

ETL は、DTS レプリケーションエンジンとターゲットデータベースの間の処理ステップとして実行されます。受信する各データレコードは DSL スクリプトを通過し、スクリプトはレコードをターゲットに書き込む前に、レコードの削除、列の値の変更、または新しい列の追加を行うことができます。

DSL は、Simple Log Service (SLS) のデータ処理構文に基づいています。条件付きロジック、文字列操作、日時操作、数値演算、JSON 処理をサポートしています。イベント分割関数はサポートしていません。DSL には次の属性があります:

  • 強力:多くの関数を提供し、関数の組み合わせをサポートします。

  • 比較的シンプルな構文:データフィルタリング、データ変換、データマスキングなどの典型的なシナリオの例を提供します。

  • 高性能:コード生成技術に基づいており、同期プロセスへのパフォーマンスへの影響を最小限に抑えます。

DSL スクリプト内の列名は、バックティック (` `) で囲む必要があります。シングルクォート (') ではありません。

サポートされるデータベース

ETL は、以下のソースデータベースとターゲットデータベースの組み合わせで利用できます。

ソースデータベースターゲットデータベース
SQL ServerAnalyticDB for MySQL 3.0、SQL Server、MySQL、PolarDB for MySQL
MySQLAnalyticDB for MySQL 3.0、AnalyticDB for PostgreSQL、Kafka、ApsaraDB for ClickHouse クラスター、MySQL、PolarDB for MySQL、Elasticsearch、Redis
セルフマネージド OracleAnalyticDB for MySQL 3.0、AnalyticDB for PostgreSQL、Kafka、MaxCompute、PolarDB-X 2.0、PolarDB for PostgreSQL (Oracle 互換)
PolarDB for MySQL
  • AnalyticDB for MySQL 3.0

  • MySQL

  • PolarDB for MySQL

PolarDB for PostgreSQL (Oracle 互換)AnalyticDB for MySQL 3.0、PolarDB for PostgreSQL (Oracle 互換)
PolarDB-X 1.0Kafka、Tablestore
PolarDB-X 2.0
  • PolarDB-X 2.0

  • AnalyticDB for MySQL 3.0

  • MySQL

  • PolarDB for MySQL

セルフマネージド Db2 for LUWMySQL
セルフマネージド Db2 for iMySQL
PolarDB for PostgreSQLPolarDB for PostgreSQL、PostgreSQL
PostgreSQLPolarDB for PostgreSQL、PostgreSQL
TiDBPolarDB for MySQL、MySQL、AnalyticDB for MySQL 3.0
MongoDBLindorm

同期タスク作成時の ETL の設定

このトピックでは、同期タスクを例に説明します。移行タスクの設定手順も同様です。

注意事項

制約違反した場合の結果
DSL スクリプトでは大文字と小文字が区別されます。データベース名、テーブル名、フィールド名はソースと完全に一致する必要があります。スクリプトは通知なくレコードのマッチングに失敗するか、タスクがエラーを報告します。
DSL スクリプトで参照されるすべてのフィールドは、ソースデータベースに存在し、フィルター条件によって除外されていない必要があります。フィールドが見つからないというエラーでタスクが失敗します。
DSL スクリプトは、単一のトップレベルの式のみをサポートします。複数の操作を組み合わせるには、e_compose を使用してください。検証時にタスクがスクリプトを拒否します。
DSL スクリプトが新しい列を追加する場合、タスクを開始する前にその列をターゲットテーブルに手動で追加してください。例えば、スクリプトが e_set(new_column, dt_now()) を使用する場合、まず new_column をターゲットテーブルに追加します。ターゲット列が存在しないため、ETL スクリプトは効果がありません。
DSL 処理後、すべてのテーブルの DML 変更は一貫した列情報を生成する必要があります。例えば、e_set が列を追加する場合、INSERT、UPDATE、DELETE のすべての操作でその列が追加される結果になる必要があります。操作タイプ間のスキーマの不整合により、タスクが失敗する可能性があります。
DSL スクリプトは、データ変換とクレンジング専用です。データベースオブジェクトの作成 (CREATE TABLE など) はサポートされておらず、失敗します。

操作手順

  1. 同期タスクを作成します。詳細については、「同期ソリューション」をご参照ください。

  2. [詳細設定] ステップで、[ETL の設定][はい] に設定します。

  3. テキストボックスに、データ処理 DSL 構文に基づいて DSL スクリプトを入力します。

    id が 3 より大きいレコードをフィルタリングするには、e_if(op_gt(id, 3), e_drop()) を使用します。これにより、id の値が 3 を超えるレコードはすべて削除され、ターゲットに同期されません。

    image

  4. 必要に応じて残りのステップを完了します。

既存の同期タスクの ETL の変更

既存の同期タスクの ETL 設定を更新するには、次のいずれかを選択します:

  • [ETL の設定][いいえ] に設定されていた場合は、[はい] に変更して DSL スクリプトを入力します。

  • [ETL の設定][はい] に設定されていた場合は、DSL スクリプトを変更するか、[ETL の設定][いいえ] に戻します。

重要
  • DSL スクリプトを変更する前に、[選択済みオブジェクト] リストから [ソースオブジェクト] リストに同期オブジェクトを戻し、再度追加してください。このステップを省略すると、タスクが不整合な状態になる可能性があります。

  • 移行タスクの ETL 設定は変更できません。この操作は同期タスクのみがサポートしています。

注意事項

制約違反した場合の結果
ETL 設定を変更すると、実行中のタスクが中断される可能性があります。更新中はデータ同期が一時停止します。メンテナンスウィンドウ中に実行してください。
変更は、タスクの再起動後の増分データにのみ適用されます。すでに同期されている履歴データは再処理されません。変更前に同期されたレコードは影響を受けません。これは意図された仕様です。
ETL の変更中にターゲットテーブルのスキーマ変更はサポートされていません。スキーマを変更するには、同期タスクを開始する前にターゲットテーブルを更新してください。
DSL スクリプトの制約 (大文字と小文字の区別、フィールドの存在、単一の式、新しい列) は、新規タスクの場合と同じです。上記、新規タスクの注意事項に記載されている結果と同じです。

操作手順

  1. 新しい DTS コンソールのデータ同期タスクページにログインします。

  2. 対象の同期タスクの行で、点点点 アイコンをクリックし、[ETL 設定の変更] を選択します。

  3. [詳細設定] ステップで、[ETL の設定][はい] に設定します。

  4. テキストボックスに、更新された DSL スクリプトを入力します。

    id が 3 より大きいレコードをフィルタリングするには、e_if(op_gt(id, 3), e_drop()) を使用します。これにより、id の値が 3 を超えるレコードはすべて削除され、ターゲットに同期されません。

    image

  5. 必要に応じて残りのステップを完了します。

ETL スクリプトの例

以下の例は、最も一般的なシナリオをカバーしています。それぞれ、「DSL 構文リファレンス」セクションで説明されている DSL 関数を使用しています。

データフィルタリング

データフィルタリングを使用して、レコードのサブセットのみを同期します。例えば、最近の注文を分析データベースにレプリケーションする、本番同期からテストデータを除外する、または地域固有のデータレジデンシールールを適用するために属さないレコードを削除するなどです。

  • id > 10000 のレコードを削除する:

    e_if(op_gt(`id`, 10000), e_drop())
  • name に "hangzhou" が含まれるレコードを削除する:

    e_if(str_contains(`name`, "hangzhou"), e_drop())
  • 締切日より前の注文を削除する:

    e_if(op_lt(`order_timestamp`, "2015-02-23 23:54:55"), e_drop())
  • 複数の条件に一致するレコードを削除する (AND):

    e_if(op_and(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())
  • いずれかの条件に一致するレコードを削除する (OR):

    e_if(op_or(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())

データマスキング

ターゲットデータベースがソースよりも広い範囲のオーディエンスにアクセスされる場合、データマスキングを使用します。例えば、本番データをレポートシステムに同期したり、第三者とデータを共有したりする場合です。マスキングにより、分析用のデータを保持しつつ、個人識別情報 (PII) を保護できます。

電話番号の下 4 桁 (7〜10 番目の位置) をアスタリスクに置き換える:

e_set(`phone`, str_mask(`phone`, 7, 10, '*'))

変更追跡

ダウンストリームのコンシューマーが行が最後にいつ変更されたかを知る必要がある場合、変更追跡を使用します。例えば、増分処理パイプラインを実装する、古いレコードを検出する、またはソースデータベースをクエリせずにデータの新鮮さを監査するなどです。

重要

タスクを開始する前に、dts_sync_time 列をターゲットテーブルに追加してください。

INSERT、UPDATE、DELETE 操作のために、すべてのテーブルに dts_sync_time 列を追加する:

e_if(op_or(op_or(
        op_eq(__OPERATION__, __OP_INSERT__),
        op_eq(__OPERATION__, __OP_UPDATE__)),
        op_eq(__OPERATION__, __OP_DELETE__)),
    e_set(dts_sync_time, __COMMIT_TIMESTAMP__))

これを特定のテーブル (例:dts_test_table) にのみ適用する場合:

e_if(op_and(
      op_eq(__TB__,'dts_test_table'),
      op_or(op_or(
        op_eq(__OPERATION__,__OP_INSERT__),
        op_eq(__OPERATION__,__OP_UPDATE__)),
        op_eq(__OPERATION__,__OP_DELETE__))),
      e_set(dts_sync_time,__COMMIT_TIMESTAMP__))

監査ログ

ターゲットテーブルにデータ変更の完全な履歴が必要な場合、監査ログを使用します。例えば、コンプライアンス要件を満たす、ロールバックシナリオをサポートする、または別の監査システムなしで変更ログを構築するなどです。このパターンは、すべての受信変更を INSERT として書き込むため、ターゲットテーブルにはすべての変更の完全なレコードが蓄積されます。

重要

タスクを開始する前に、operation_typeupdated 列をターゲットテーブルに追加してください。

DML 操作タイプとコミットタイムスタンプをターゲット行に書き込み、すべての操作を INSERT に変換する:

e_compose(
    e_switch(
        op_eq(__OPERATION__,__OP_DELETE__), e_set(operation_type, 'DELETE'),
        op_eq(__OPERATION__,__OP_UPDATE__), e_set(operation_type, 'UPDATE'),
        op_eq(__OPERATION__,__OP_INSERT__), e_set(operation_type, 'INSERT')),
    e_set(updated, __COMMIT_TIMESTAMP__),
    e_set(__OPERATION__,__OP_INSERT__)
)

完全データと増分データの区別

このパターンは、ターゲットテーブルが完全移行データと増分変更の両方を受け取り、ダウンストリームのコンシューマーがそれらを区別する必要がある場合に使用します。例えば、ストリーミングパイプラインで履歴行の再処理をスキップする、または増分ロジックを適用する前に初期ロードが完了したことを検証するなどです。

完全移行中、__COMMIT_TIMESTAMP__0 (1970-01-01 08:00:00、ローカルタイムゾーンに合わせて調整) です。増分移行中は、実際のログコミット時刻を保持します。以下のスクリプトは、True または Falseis_increment_dml 列に書き込みます:

e_if_else(__COMMIT_TIMESTAMP__ > DATETIME('2000-01-01 00:00:00'),
    e_set(`is_increment_dml`, True),
    e_set(`is_increment_dml`, False)
)

DSL 構文リファレンス

組み込み変数

以下の組み込み変数は、すべての DSL スクリプトで利用できます。

変数説明データの型値の例
__TB__テーブル名stringtable
__DB__データベース名stringmydb
__OPERATION__DML 操作タイプstring__OP_INSERT____OP_UPDATE____OP_DELETE__
__BEFORE__UPDATE の更新前イメージ (変更前の値)。DELETE 操作には更新前イメージのみがあります。特殊マークv(column_name,__BEFORE__)
__AFTER__UPDATE の更新後イメージ (変更後の値)。INSERT 操作には更新後イメージのみがあります。特殊マークv(column_name,__AFTER__)
__COMMIT_TIMESTAMP__トランザクションのコミット時刻datetime'2021-01-01 10:10:01'
` column `現在のレコードの指定された列の値string` id , name `

定数

タイプ
int123
float123.4
string"hello1_world"
booleantrue または false
datetimeDATETIME('2021-01-01 10:10:01')

式関数

数値演算

操作構文パラメーター戻り値
加算op_sum(value1, value2)integer または float両方の入力が integer の場合は Integer、それ以外は floatop_sum(col1, 1.0)
減算op_sub(value1, value2)integer または float両方の入力が integer の場合は Integer、それ以外は floatop_sub(col1, 1.0)
乗算op_mul(value1, value2)integer または float両方の入力が integer の場合は Integer、それ以外は floatop_mul(col1, 1.0)
部門op_div_true(value1, value2)integer または float両方の入力が integer の場合は Integer、それ以外は floatop_div_true(col1, 2.0)col1=15 の場合、7.5
剰余op_mod(value1, value2)integer または float両方の入力が integer の場合は Integer、それ以外は floatop_mod(col1, 10)col1=23 の場合、3

論理演算

操作構文パラメーター戻り値
等しいop_eq(value1, value2)integer、float、または stringbooleanop_eq(col1, 23)
より大きいop_gt(value1, value2)integer、float、または stringbooleanop_gt(col1, 1.0)
より小さいop_lt(value1, value2)integer、float、または stringbooleanop_lt(col1, 1.0)
以上op_ge(value1, value2)integer、float、または stringbooleanop_ge(col1, 1.0)
以下op_le(value1, value2)integer、float、または stringbooleanop_le(col1, 1.0)
ANDop_and(value1, value2)booleanbooleanop_and(is_male, is_student)
ORop_or(value1, value2)booleanbooleanop_or(is_male, is_student)
INop_in(value, json_array)任意の型; JSON 配列文字列booleanop_in(id,json_array('["0","1","2","3"]'))
Is nullop_is_null(value)任意の型booleanop_is_null(name)
Is not nullop_is_not_null(value)任意の型booleanop_is_not_null(name)

文字列関数

操作構文パラメーター戻り値
文字列の連結op_add(str_1, str_2, ..., str_n)文字列連結された文字列op_add(col,'hangzhou','dts')
フォーマットと連結str_format(format, value1, value2, ...)format{} プレースホルダーを含む文字列; values:任意フォーマットされた文字列str_format("part1: {}, part2: {}", col1, col2)col1="ab" かつ col2="12" の場合、"part1: ab, part2: 12"
部分文字列の置換str_replace(original, oldStr, newStr, count)count:最大置換回数; -1 はすべて置換置換後の文字列str_replace(name, "a", 'b', -1)name="aba" の場合、"bbb"
すべての文字列型フィールドで置換tail_replace_string_field(search, replace, all)alltrue置換後の文字列tail_replace_string_field('\u000f','',true) — すべての varchar、text、char フィールドで \u000f を空文字列に置換します
両端の文字を削除str_strip(string_val, charSet)charSet:削除する文字削除後の文字列str_strip(name, 'ab')name="axbzb" の場合、"xbz"
小文字化str_lower(value)string小文字の文字列str_lower(str_col)
大文字化str_upper(value)string大文字の文字列str_upper(str_col)
文字列を整数に変換cast_string_to_long(value)stringIntegercast_string_to_long(col)
整数を文字列に変換cast_long_to_string(value)integerStringcast_long_to_string(col)
部分文字列の出現回数をカウントstr_count(str, pattern)string; 部分文字列Integerstr_count(str_col, 'abc')str_col="zabcyabcz" の場合、2
部分文字列の位置を検索str_find(str, pattern)string; 部分文字列最初の一致の位置; 見つからない場合は -1str_find(str_col, 'abc')str_col="xabcy" の場合、1
アルファベットかどうかをチェックstr_isalpha(str)stringbooleanstr_isalpha(str_col)
数値かどうかをチェックstr_isdigit(str)stringbooleanstr_isdigit(str_col)
正規表現マッチregex_match(str, regex)string; regex 文字列booleanregex_match(__TB__,'user_\\\d+')
部分文字列をマスクstr_mask(str, start, end, maskStr)start 最小値:0; end 最大値:length−1; maskStr:単一文字マスクされた文字列str_mask(phone, 7, 10, '#')
マーカーの後の部分文字列substring_after(str, cond)string; マーカー文字列マーカーの後の文字列 (マーカーは含まない)substring_after(col, 'abc')
マーカーの前の部分文字列substring_before(str, cond)string; マーカー文字列マーカーの前の文字列 (マーカーは含まない)substring_before(col, 'efg')
2 つのマーカー間の部分文字列substring_between(str, cond1, cond2)string; 2 つのマーカー文字列マーカー間の文字列 (マーカーは含まない)substring_between(col, 'abc','efg')
値が文字列かどうかをチェックis_string_value(value)string または列名booleanis_string_value(col1)
MongoDB ドキュメントフィールドを取得bson_value("field1", "field2", ...)各レベルのフィールド名フィールド値e_set(user_name, bson_value("person","name"))

時間関数

操作構文パラメーター戻り値
現在のシステム時刻 (秒精度)dt_now()なしdatetimedt_now()
現在のシステム時刻 (ミリ秒精度)dt_now_millis()なしdatetimedt_now_millis()
協定世界時 (UTC) タイムスタンプ (秒) を datetime に変換dt_fromtimestamp(value, [timezone])integer; オプションのタイムゾーンdatetime (秒精度)dt_fromtimestamp(1626837629,'GMT+08')
UTC タイムスタンプ (ミリ秒) を datetime に変換dt_fromtimestamp_millis(value, [timezone])integer; オプションのタイムゾーンdatetime (ミリ秒精度)dt_fromtimestamp_millis(1626837629123,'GMT+08')
datetime を UTC タイムスタンプ (秒) に変換dt_parsetimestamp(value, [timezone])datetime; オプションのタイムゾーンIntegerdt_parsetimestamp(datetime_col,'GMT+08')
datetime を UTC タイムスタンプ (ミリ秒) に変換dt_parsetimestamp_millis(value, [timezone])datetime; オプションのタイムゾーンIntegerdt_parsetimestamp_millis(datetime_col,'GMT+08')
datetime を文字列に変換dt_str(value, format)datetime; フォーマット文字列Stringdt_str(col1, 'yyyy-MM-dd HH:mm:ss')
文字列を datetime に変換dt_strptime(value, format)string; フォーマット文字列datetimedt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss')
時間単位の加算または減算dt_add(value, [years=n], [months=n], [days=n], [hours=n], [minutes=n])datetime; 整数オフセット (減算の場合は負)datetimedt_add(datetime_col, years=-1)

条件式

構文説明
(cond ? val_1 : val_2)三項演算子。cond が true の場合は val_1 を、それ以外の場合は val_2 を返します。val_1val_2 は同じ型である必要があります。(id>1000 ? 1 : 0)

JSON 関数

value 型は、データベース内の任意のフィールドの型を表します。
操作構文パラメーター戻り値
JSON 配列文字列をセットに変換json_array(arrayText) — boolean 式でのみ使用可能arrayText:JSON 配列文字列セットコレクションop_in(id,json_array('["0","1","2","3"]'))
JSON 配列を作成json_array2(item...)任意の型のアイテムJSON 配列json_array2("0","1","2","3")["0","1","2","3"]
JSON オブジェクトを作成json_object(item...)キーと値のペア (キーは文字列、値は任意)JSONjson_object('name','田中太郎','age',32,'loginId',100){"name":"田中太郎","age":32,"loginId":100}
JSON 配列の位置に挿入json_array_insert(json, kvPairs...)JSONPath の位置; 挿入するデータ。位置が存在しない場合は元の JSON を返します。要素が存在しない場合は配列の末尾に追加します。JSONjson_array_insert('{"Address":["City",1]}','$.Address[3]',100){"Address":["City",1,100]}
JSON オブジェクトに挿入 (存在しない場合のみ)json_insert(json, kvPairs...)JSONPath の位置; 挿入するデータ。位置が存在する場合は元の JSON を返します。JSONjson_insert('{"Address":["City","東京","Number",1]}','$.ID',100){"Address":["City","東京","Number",1],"ID":100}
JSON パスで挿入または更新json_set(json, kvPairs...)JSONPath の位置; データ。位置が存在する場合は更新し、存在しない場合は挿入します。value 型json_set('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100) を実行すると、{"ID":1,"Address":["City","Xian","Number",1],"IP":100}
キーと値のペアを挿入または更新json_put(json, key, value)JSON オブジェクト; キー文字列; 値。json が JSON オブジェクトでない場合は null を返します。JSONjson_put('{"loginId":100}','loginTime','2024-10-10'){"loginId":100,"loginTime":"2024-10-10"}
JSON パスで置換json_replace(json, kvPairs...)JSONPath の位置; 置換データ。位置が存在しない場合は元の JSON を返します。value 型json_replace('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100) は、元の JSON を返します (位置が存在しないため)
JSON パスに値が存在するかをチェックjson_contains(json, jsonPath, item)JSON オブジェクト; JSONPath; チェックするデータbooleanjson_contains('{"ID":1,"Address":["City","東京","Number",1]}','$.ID',1)true
JSON パスが存在するかをチェックjson_contains_path(json, jsonPath)JSON オブジェクト; JSONPathbooleanjson_contains_path('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID')true
JSON パスで値を取得json_extract(json, jsonPath)JSON オブジェクト; JSONPathvalue 型json_extract('{"ID":1,"Address":["City","東京","Number",1]}','$.ID')1
キー名で値を取得json_get(json, key)JSON オブジェクト; キー文字列value 型json_get('{"ID":1,"Address":["City","東京","Number",1]}','ID')1
JSON パスのすべてのキーを取得json_keys(json, jsonPath)JSON オブジェクト; JSONPathJSON 配列json_keys('{"ID":1,"Address":["City","Xian","Number",1]}','$')["ID","Address"]
JSON パスのキー数を取得json_length(json, jsonPath)JSON オブジェクト; JSONPath ("$"json_length(json) と同等)Integerjson_length('{"ID":1,"Address":["City","Xian","Number",1]}','$')2
ルートキー数を取得json_length(json)JSON オブジェクトIntegerjson_length('{"ID":1,"Address":["City","Xian","Number",1]}')2
JSON 文字列を解析json_parse(json)JSON 文字列value 型json_parse('{"ID":1}'){"ID":1}
JSON パスの値を削除json_remove(json, jsonPath)JSON オブジェクト; JSONPathJSONjson_remove('{"loginId":100,"loginTime":"2024-10-10"}','$.loginTime'){"loginId":100}

グローバル関数

フロー制御関数

関数構文説明
Ife_if(bool_expr, func_invoke)bool_expr が true の場合に func_invoke を実行します。e_if(op_gt(id, 10), e_drop())id > 10 のレコードを削除します
If-elsee_if_else(bool_expr, func_invoke1, func_invoke2)true の場合は func_invoke1 を実行し、false の場合は func_invoke2 を実行します。e_if_else(op_gt(id, 10), e_set(tag, 'large'), e_set(tag, 'small'))
Switche_switch(condition1, func1, condition2, func2, ..., default=default_func)条件を順に評価し、最初に一致したものを実行します。一致するものがない場合は default_func を実行します。e_switch(op_gt(id, 100), e_set(str_col, '>100'), op_gt(id, 90), e_set(str_col, '>90'), default=e_set(str_col, '<=90'))
Composee_compose(func1, func2, func3, ...)複数の関数を単一の式として順に実行します。e_compose(e_set(str_col, 'test'), e_set(dt_col, dt_now())) — 1 つの式で 2 つの列を設定します

データ操作関数

関数構文説明
レコードの削除e_drop()現在のレコードを削除します。ターゲットには書き込まれません。e_if(op_gt(id, 10), e_drop())
レコードの保持e_keep(condition)condition が true の場合にのみレコードを保持します。e_keep(op_gt(id, 1))id > 1 のレコードのみを同期します
列の値の設定e_set(col, val, NEW)colval に設定します。NEW (前にカンマを付けない) を渡すと、列を val のデータ型に変換します。NEW を省略し、型の互換性を確保してください。不一致があるとタスクが失敗する可能性があります。e_set(col1, 1, NEW)col1 を数値型に変換し、1
MongoDB フィールドマッピングe_expand_bson_value('*', 'fieldA', {"fieldB":"fieldC"})保持するフィールドを選択し (* = すべて)、fieldA を削除し、fieldBfieldC に名前変更します。フィールド名のマッピングはオプションです。e_expand_bson_value("*", "_id,name")_idname を除くすべてのフィールドをターゲットに書き込みます

次のステップ

  • 完全な SLS ベースの DSL 構文リファレンスについては、「構文の概要」をご参照ください。

  • 同期タスクを作成するには、「同期ソリューション」をご参照ください。