Data Transmission Service (DTS) を使用してデータをレプリケーションする場合、ソースデータはターゲットに到達する前に変換が必要になることがよくあります。例えば、条件に合わない行のフィルタリング、機密性の高い値のマスキング、各レコードへの変更時刻などのメタデータの付与などです。DTS には、この処理ロジックをレプリケーションエンジンとターゲットデータベースの間でインフライトで実行する、ストリーミング抽出・変換・書き出し (ETL) 機能が含まれています。ETL ロジックは、データ同期シナリオ向けに設計されたドメイン特化言語 (DSL) で表現されます。
一般的なユースケース:
データフィルタリング:しきい値を超える ID を持つレコードや、特定の日付より古い注文など、同期基準を満たさないレコードを削除します。
データマスキング:電話番号の下 4 桁をアスタリスクに置き換えるなど、機密性の高い値を隠します。
変更追跡:同期された各行にコミットタイムスタンプを付与し、ターゲットテーブルにデータがいつ変更されたかを記録します。
監査ログ:コンプライアンスやデバッグのために、操作タイプ (INSERT、UPDATE、DELETE) と変更時刻を専用の列に書き込みます。
ETL の仕組み
ETL は、DTS レプリケーションエンジンとターゲットデータベースの間の処理ステップとして実行されます。受信する各データレコードは DSL スクリプトを通過し、スクリプトはレコードをターゲットに書き込む前に、レコードの削除、列の値の変更、または新しい列の追加を行うことができます。
DSL は、Simple Log Service (SLS) のデータ処理構文に基づいています。条件付きロジック、文字列操作、日時操作、数値演算、JSON 処理をサポートしています。イベント分割関数はサポートしていません。DSL には次の属性があります:
強力:多くの関数を提供し、関数の組み合わせをサポートします。
比較的シンプルな構文:データフィルタリング、データ変換、データマスキングなどの典型的なシナリオの例を提供します。
高性能:コード生成技術に基づいており、同期プロセスへのパフォーマンスへの影響を最小限に抑えます。
DSL スクリプト内の列名は、バックティック (``) で囲む必要があります。シングルクォート (') ではありません。
サポートされるデータベース
ETL は、以下のソースデータベースとターゲットデータベースの組み合わせで利用できます。
| ソースデータベース | ターゲットデータベース |
|---|---|
| SQL Server | AnalyticDB for MySQL 3.0、SQL Server、MySQL、PolarDB for MySQL |
| MySQL | AnalyticDB for MySQL 3.0、AnalyticDB for PostgreSQL、Kafka、ApsaraDB for ClickHouse クラスター、MySQL、PolarDB for MySQL、Elasticsearch、Redis |
| セルフマネージド Oracle | AnalyticDB for MySQL 3.0、AnalyticDB for PostgreSQL、Kafka、MaxCompute、PolarDB-X 2.0、PolarDB for PostgreSQL (Oracle 互換) |
| PolarDB for MySQL |
|
| PolarDB for PostgreSQL (Oracle 互換) | AnalyticDB for MySQL 3.0、PolarDB for PostgreSQL (Oracle 互換) |
| PolarDB-X 1.0 | Kafka、Tablestore |
| PolarDB-X 2.0 |
|
| セルフマネージド Db2 for LUW | MySQL |
| セルフマネージド Db2 for i | MySQL |
| PolarDB for PostgreSQL | PolarDB for PostgreSQL、PostgreSQL |
| PostgreSQL | PolarDB for PostgreSQL、PostgreSQL |
| TiDB | PolarDB for MySQL、MySQL、AnalyticDB for MySQL 3.0 |
| MongoDB | Lindorm |
同期タスク作成時の ETL の設定
このトピックでは、同期タスクを例に説明します。移行タスクの設定手順も同様です。
注意事項
| 制約 | 違反した場合の結果 |
|---|---|
| DSL スクリプトでは大文字と小文字が区別されます。データベース名、テーブル名、フィールド名はソースと完全に一致する必要があります。 | スクリプトは通知なくレコードのマッチングに失敗するか、タスクがエラーを報告します。 |
| DSL スクリプトで参照されるすべてのフィールドは、ソースデータベースに存在し、フィルター条件によって除外されていない必要があります。 | フィールドが見つからないというエラーでタスクが失敗します。 |
DSL スクリプトは、単一のトップレベルの式のみをサポートします。複数の操作を組み合わせるには、e_compose を使用してください。 | 検証時にタスクがスクリプトを拒否します。 |
DSL スクリプトが新しい列を追加する場合、タスクを開始する前にその列をターゲットテーブルに手動で追加してください。例えば、スクリプトが e_set(new_column, dt_now()) を使用する場合、まず new_column をターゲットテーブルに追加します。 | ターゲット列が存在しないため、ETL スクリプトは効果がありません。 |
DSL 処理後、すべてのテーブルの DML 変更は一貫した列情報を生成する必要があります。例えば、e_set が列を追加する場合、INSERT、UPDATE、DELETE のすべての操作でその列が追加される結果になる必要があります。 | 操作タイプ間のスキーマの不整合により、タスクが失敗する可能性があります。 |
| DSL スクリプトは、データ変換とクレンジング専用です。 | データベースオブジェクトの作成 (CREATE TABLE など) はサポートされておらず、失敗します。 |
操作手順
同期タスクを作成します。詳細については、「同期ソリューション」をご参照ください。
[詳細設定] ステップで、[ETL の設定] を [はい] に設定します。
テキストボックスに、データ処理 DSL 構文に基づいて DSL スクリプトを入力します。
idが 3 より大きいレコードをフィルタリングするには、e_if(op_gt(id, 3), e_drop())を使用します。これにより、idの値が 3 を超えるレコードはすべて削除され、ターゲットに同期されません。
必要に応じて残りのステップを完了します。
既存の同期タスクの ETL の変更
既存の同期タスクの ETL 設定を更新するには、次のいずれかを選択します:
[ETL の設定] が [いいえ] に設定されていた場合は、[はい] に変更して DSL スクリプトを入力します。
[ETL の設定] が [はい] に設定されていた場合は、DSL スクリプトを変更するか、[ETL の設定] を [いいえ] に戻します。
DSL スクリプトを変更する前に、[選択済みオブジェクト] リストから [ソースオブジェクト] リストに同期オブジェクトを戻し、再度追加してください。このステップを省略すると、タスクが不整合な状態になる可能性があります。
移行タスクの ETL 設定は変更できません。この操作は同期タスクのみがサポートしています。
注意事項
| 制約 | 違反した場合の結果 |
|---|---|
| ETL 設定を変更すると、実行中のタスクが中断される可能性があります。 | 更新中はデータ同期が一時停止します。メンテナンスウィンドウ中に実行してください。 |
| 変更は、タスクの再起動後の増分データにのみ適用されます。すでに同期されている履歴データは再処理されません。 | 変更前に同期されたレコードは影響を受けません。これは意図された仕様です。 |
| ETL の変更中にターゲットテーブルのスキーマ変更はサポートされていません。 | スキーマを変更するには、同期タスクを開始する前にターゲットテーブルを更新してください。 |
| DSL スクリプトの制約 (大文字と小文字の区別、フィールドの存在、単一の式、新しい列) は、新規タスクの場合と同じです。 | 上記、新規タスクの注意事項に記載されている結果と同じです。 |
操作手順
新しい DTS コンソールのデータ同期タスクページにログインします。
対象の同期タスクの行で、
アイコンをクリックし、[ETL 設定の変更] を選択します。[詳細設定] ステップで、[ETL の設定] を [はい] に設定します。
テキストボックスに、更新された DSL スクリプトを入力します。
idが 3 より大きいレコードをフィルタリングするには、e_if(op_gt(id, 3), e_drop())を使用します。これにより、idの値が 3 を超えるレコードはすべて削除され、ターゲットに同期されません。
必要に応じて残りのステップを完了します。
ETL スクリプトの例
以下の例は、最も一般的なシナリオをカバーしています。それぞれ、「DSL 構文リファレンス」セクションで説明されている DSL 関数を使用しています。
データフィルタリング
データフィルタリングを使用して、レコードのサブセットのみを同期します。例えば、最近の注文を分析データベースにレプリケーションする、本番同期からテストデータを除外する、または地域固有のデータレジデンシールールを適用するために属さないレコードを削除するなどです。
id> 10000 のレコードを削除する:e_if(op_gt(`id`, 10000), e_drop())nameに "hangzhou" が含まれるレコードを削除する:e_if(str_contains(`name`, "hangzhou"), e_drop())締切日より前の注文を削除する:
e_if(op_lt(`order_timestamp`, "2015-02-23 23:54:55"), e_drop())複数の条件に一致するレコードを削除する (AND):
e_if(op_and(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())いずれかの条件に一致するレコードを削除する (OR):
e_if(op_or(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())
データマスキング
ターゲットデータベースがソースよりも広い範囲のオーディエンスにアクセスされる場合、データマスキングを使用します。例えば、本番データをレポートシステムに同期したり、第三者とデータを共有したりする場合です。マスキングにより、分析用のデータを保持しつつ、個人識別情報 (PII) を保護できます。
電話番号の下 4 桁 (7〜10 番目の位置) をアスタリスクに置き換える:
e_set(`phone`, str_mask(`phone`, 7, 10, '*'))変更追跡
ダウンストリームのコンシューマーが行が最後にいつ変更されたかを知る必要がある場合、変更追跡を使用します。例えば、増分処理パイプラインを実装する、古いレコードを検出する、またはソースデータベースをクエリせずにデータの新鮮さを監査するなどです。
タスクを開始する前に、dts_sync_time 列をターゲットテーブルに追加してください。
INSERT、UPDATE、DELETE 操作のために、すべてのテーブルに dts_sync_time 列を追加する:
e_if(op_or(op_or(
op_eq(__OPERATION__, __OP_INSERT__),
op_eq(__OPERATION__, __OP_UPDATE__)),
op_eq(__OPERATION__, __OP_DELETE__)),
e_set(dts_sync_time, __COMMIT_TIMESTAMP__))これを特定のテーブル (例:dts_test_table) にのみ適用する場合:
e_if(op_and(
op_eq(__TB__,'dts_test_table'),
op_or(op_or(
op_eq(__OPERATION__,__OP_INSERT__),
op_eq(__OPERATION__,__OP_UPDATE__)),
op_eq(__OPERATION__,__OP_DELETE__))),
e_set(dts_sync_time,__COMMIT_TIMESTAMP__))監査ログ
ターゲットテーブルにデータ変更の完全な履歴が必要な場合、監査ログを使用します。例えば、コンプライアンス要件を満たす、ロールバックシナリオをサポートする、または別の監査システムなしで変更ログを構築するなどです。このパターンは、すべての受信変更を INSERT として書き込むため、ターゲットテーブルにはすべての変更の完全なレコードが蓄積されます。
タスクを開始する前に、operation_type と updated 列をターゲットテーブルに追加してください。
DML 操作タイプとコミットタイムスタンプをターゲット行に書き込み、すべての操作を INSERT に変換する:
e_compose(
e_switch(
op_eq(__OPERATION__,__OP_DELETE__), e_set(operation_type, 'DELETE'),
op_eq(__OPERATION__,__OP_UPDATE__), e_set(operation_type, 'UPDATE'),
op_eq(__OPERATION__,__OP_INSERT__), e_set(operation_type, 'INSERT')),
e_set(updated, __COMMIT_TIMESTAMP__),
e_set(__OPERATION__,__OP_INSERT__)
)完全データと増分データの区別
このパターンは、ターゲットテーブルが完全移行データと増分変更の両方を受け取り、ダウンストリームのコンシューマーがそれらを区別する必要がある場合に使用します。例えば、ストリーミングパイプラインで履歴行の再処理をスキップする、または増分ロジックを適用する前に初期ロードが完了したことを検証するなどです。
完全移行中、__COMMIT_TIMESTAMP__ は 0 (1970-01-01 08:00:00、ローカルタイムゾーンに合わせて調整) です。増分移行中は、実際のログコミット時刻を保持します。以下のスクリプトは、True または False を is_increment_dml 列に書き込みます:
e_if_else(__COMMIT_TIMESTAMP__ > DATETIME('2000-01-01 00:00:00'),
e_set(`is_increment_dml`, True),
e_set(`is_increment_dml`, False)
)DSL 構文リファレンス
組み込み変数
以下の組み込み変数は、すべての DSL スクリプトで利用できます。
| 変数 | 説明 | データの型 | 値の例 |
|---|---|---|---|
__TB__ | テーブル名 | string | table |
__DB__ | データベース名 | string | mydb |
__OPERATION__ | DML 操作タイプ | string | __OP_INSERT__、__OP_UPDATE__、__OP_DELETE__ |
__BEFORE__ | UPDATE の更新前イメージ (変更前の値)。DELETE 操作には更新前イメージのみがあります。 | 特殊マーク | v(column_name,__BEFORE__) |
__AFTER__ | UPDATE の更新後イメージ (変更後の値)。INSERT 操作には更新後イメージのみがあります。 | 特殊マーク | v(column_name,__AFTER__) |
__COMMIT_TIMESTAMP__ | トランザクションのコミット時刻 | datetime | '2021-01-01 10:10:01' |
` column ` | 現在のレコードの指定された列の値 | string | ` id , name ` |
定数
| タイプ | 例 |
|---|---|
| int | 123 |
| float | 123.4 |
| string | "hello1_world" |
| boolean | true または false |
| datetime | DATETIME('2021-01-01 10:10:01') |
式関数
数値演算
| 操作 | 構文 | パラメーター | 戻り値 | 例 |
|---|---|---|---|---|
| 加算 | op_sum(value1, value2) | integer または float | 両方の入力が integer の場合は Integer、それ以外は float | op_sum(col1, 1.0) |
| 減算 | op_sub(value1, value2) | integer または float | 両方の入力が integer の場合は Integer、それ以外は float | op_sub(col1, 1.0) |
| 乗算 | op_mul(value1, value2) | integer または float | 両方の入力が integer の場合は Integer、それ以外は float | op_mul(col1, 1.0) |
| 部門 | op_div_true(value1, value2) | integer または float | 両方の入力が integer の場合は Integer、それ以外は float | op_div_true(col1, 2.0) — col1=15 の場合、7.5 |
| 剰余 | op_mod(value1, value2) | integer または float | 両方の入力が integer の場合は Integer、それ以外は float | op_mod(col1, 10) — col1=23 の場合、3 |
論理演算
| 操作 | 構文 | パラメーター | 戻り値 | 例 |
|---|---|---|---|---|
| 等しい | op_eq(value1, value2) | integer、float、または string | boolean | op_eq(col1, 23) |
| より大きい | op_gt(value1, value2) | integer、float、または string | boolean | op_gt(col1, 1.0) |
| より小さい | op_lt(value1, value2) | integer、float、または string | boolean | op_lt(col1, 1.0) |
| 以上 | op_ge(value1, value2) | integer、float、または string | boolean | op_ge(col1, 1.0) |
| 以下 | op_le(value1, value2) | integer、float、または string | boolean | op_le(col1, 1.0) |
| AND | op_and(value1, value2) | boolean | boolean | op_and(is_male, is_student) |
| OR | op_or(value1, value2) | boolean | boolean | op_or(is_male, is_student) |
| IN | op_in(value, json_array) | 任意の型; JSON 配列文字列 | boolean | op_in(id,json_array('["0","1","2","3"]')) |
| Is null | op_is_null(value) | 任意の型 | boolean | op_is_null(name) |
| Is not null | op_is_not_null(value) | 任意の型 | boolean | op_is_not_null(name) |
文字列関数
| 操作 | 構文 | パラメーター | 戻り値 | 例 |
|---|---|---|---|---|
| 文字列の連結 | op_add(str_1, str_2, ..., str_n) | 文字列 | 連結された文字列 | op_add(col,'hangzhou','dts') |
| フォーマットと連結 | str_format(format, value1, value2, ...) | format:{} プレースホルダーを含む文字列; values:任意 | フォーマットされた文字列 | str_format("part1: {}, part2: {}", col1, col2) — col1="ab" かつ col2="12" の場合、"part1: ab, part2: 12" |
| 部分文字列の置換 | str_replace(original, oldStr, newStr, count) | count:最大置換回数; -1 はすべて置換 | 置換後の文字列 | str_replace(name, "a", 'b', -1) — name="aba" の場合、"bbb" |
| すべての文字列型フィールドで置換 | tail_replace_string_field(search, replace, all) | all:true | 置換後の文字列 | tail_replace_string_field('\u000f','',true) — すべての varchar、text、char フィールドで \u000f を空文字列に置換します |
| 両端の文字を削除 | str_strip(string_val, charSet) | charSet:削除する文字 | 削除後の文字列 | str_strip(name, 'ab') — name="axbzb" の場合、"xbz" |
| 小文字化 | str_lower(value) | string | 小文字の文字列 | str_lower(str_col) |
| 大文字化 | str_upper(value) | string | 大文字の文字列 | str_upper(str_col) |
| 文字列を整数に変換 | cast_string_to_long(value) | string | Integer | cast_string_to_long(col) |
| 整数を文字列に変換 | cast_long_to_string(value) | integer | String | cast_long_to_string(col) |
| 部分文字列の出現回数をカウント | str_count(str, pattern) | string; 部分文字列 | Integer | str_count(str_col, 'abc') — str_col="zabcyabcz" の場合、2 |
| 部分文字列の位置を検索 | str_find(str, pattern) | string; 部分文字列 | 最初の一致の位置; 見つからない場合は -1 | str_find(str_col, 'abc') — str_col="xabcy" の場合、1 |
| アルファベットかどうかをチェック | str_isalpha(str) | string | boolean | str_isalpha(str_col) |
| 数値かどうかをチェック | str_isdigit(str) | string | boolean | str_isdigit(str_col) |
| 正規表現マッチ | regex_match(str, regex) | string; regex 文字列 | boolean | regex_match(__TB__,'user_\\\d+') |
| 部分文字列をマスク | str_mask(str, start, end, maskStr) | start 最小値:0; end 最大値:length−1; maskStr:単一文字 | マスクされた文字列 | str_mask(phone, 7, 10, '#') |
| マーカーの後の部分文字列 | substring_after(str, cond) | string; マーカー文字列 | マーカーの後の文字列 (マーカーは含まない) | substring_after(col, 'abc') |
| マーカーの前の部分文字列 | substring_before(str, cond) | string; マーカー文字列 | マーカーの前の文字列 (マーカーは含まない) | substring_before(col, 'efg') |
| 2 つのマーカー間の部分文字列 | substring_between(str, cond1, cond2) | string; 2 つのマーカー文字列 | マーカー間の文字列 (マーカーは含まない) | substring_between(col, 'abc','efg') |
| 値が文字列かどうかをチェック | is_string_value(value) | string または列名 | boolean | is_string_value(col1) |
| MongoDB ドキュメントフィールドを取得 | bson_value("field1", "field2", ...) | 各レベルのフィールド名 | フィールド値 | e_set(user_name, bson_value("person","name")) |
時間関数
| 操作 | 構文 | パラメーター | 戻り値 | 例 |
|---|---|---|---|---|
| 現在のシステム時刻 (秒精度) | dt_now() | なし | datetime | dt_now() |
| 現在のシステム時刻 (ミリ秒精度) | dt_now_millis() | なし | datetime | dt_now_millis() |
| 協定世界時 (UTC) タイムスタンプ (秒) を datetime に変換 | dt_fromtimestamp(value, [timezone]) | integer; オプションのタイムゾーン | datetime (秒精度) | dt_fromtimestamp(1626837629,'GMT+08') |
| UTC タイムスタンプ (ミリ秒) を datetime に変換 | dt_fromtimestamp_millis(value, [timezone]) | integer; オプションのタイムゾーン | datetime (ミリ秒精度) | dt_fromtimestamp_millis(1626837629123,'GMT+08') |
| datetime を UTC タイムスタンプ (秒) に変換 | dt_parsetimestamp(value, [timezone]) | datetime; オプションのタイムゾーン | Integer | dt_parsetimestamp(datetime_col,'GMT+08') |
| datetime を UTC タイムスタンプ (ミリ秒) に変換 | dt_parsetimestamp_millis(value, [timezone]) | datetime; オプションのタイムゾーン | Integer | dt_parsetimestamp_millis(datetime_col,'GMT+08') |
| datetime を文字列に変換 | dt_str(value, format) | datetime; フォーマット文字列 | String | dt_str(col1, 'yyyy-MM-dd HH:mm:ss') |
| 文字列を datetime に変換 | dt_strptime(value, format) | string; フォーマット文字列 | datetime | dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss') |
| 時間単位の加算または減算 | dt_add(value, [years=n], [months=n], [days=n], [hours=n], [minutes=n]) | datetime; 整数オフセット (減算の場合は負) | datetime | dt_add(datetime_col, years=-1) |
条件式
| 構文 | 説明 | 例 |
|---|---|---|
(cond ? val_1 : val_2) | 三項演算子。cond が true の場合は val_1 を、それ以外の場合は val_2 を返します。val_1 と val_2 は同じ型である必要があります。 | (id>1000 ? 1 : 0) |
JSON 関数
value 型は、データベース内の任意のフィールドの型を表します。| 操作 | 構文 | パラメーター | 戻り値 | 例 |
|---|---|---|---|---|
| JSON 配列文字列をセットに変換 | json_array(arrayText) — boolean 式でのみ使用可能 | arrayText:JSON 配列文字列 | セットコレクション | op_in(id,json_array('["0","1","2","3"]')) |
| JSON 配列を作成 | json_array2(item...) | 任意の型のアイテム | JSON 配列 | json_array2("0","1","2","3") は ["0","1","2","3"] |
| JSON オブジェクトを作成 | json_object(item...) | キーと値のペア (キーは文字列、値は任意) | JSON | json_object('name','田中太郎','age',32,'loginId',100) は {"name":"田中太郎","age":32,"loginId":100} |
| JSON 配列の位置に挿入 | json_array_insert(json, kvPairs...) | JSONPath の位置; 挿入するデータ。位置が存在しない場合は元の JSON を返します。要素が存在しない場合は配列の末尾に追加します。 | JSON | json_array_insert('{"Address":["City",1]}','$.Address[3]',100) は {"Address":["City",1,100]} |
| JSON オブジェクトに挿入 (存在しない場合のみ) | json_insert(json, kvPairs...) | JSONPath の位置; 挿入するデータ。位置が存在する場合は元の JSON を返します。 | JSON | json_insert('{"Address":["City","東京","Number",1]}','$.ID',100) は {"Address":["City","東京","Number",1],"ID":100} |
| JSON パスで挿入または更新 | json_set(json, kvPairs...) | JSONPath の位置; データ。位置が存在する場合は更新し、存在しない場合は挿入します。 | value 型 | json_set('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100) を実行すると、{"ID":1,"Address":["City","Xian","Number",1],"IP":100} |
| キーと値のペアを挿入または更新 | json_put(json, key, value) | JSON オブジェクト; キー文字列; 値。json が JSON オブジェクトでない場合は null を返します。 | JSON | json_put('{"loginId":100}','loginTime','2024-10-10') は {"loginId":100,"loginTime":"2024-10-10"} |
| JSON パスで置換 | json_replace(json, kvPairs...) | JSONPath の位置; 置換データ。位置が存在しない場合は元の JSON を返します。 | value 型 | json_replace('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100) は、元の JSON を返します (位置が存在しないため) |
| JSON パスに値が存在するかをチェック | json_contains(json, jsonPath, item) | JSON オブジェクト; JSONPath; チェックするデータ | boolean | json_contains('{"ID":1,"Address":["City","東京","Number",1]}','$.ID',1) は true |
| JSON パスが存在するかをチェック | json_contains_path(json, jsonPath) | JSON オブジェクト; JSONPath | boolean | json_contains_path('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID') は true |
| JSON パスで値を取得 | json_extract(json, jsonPath) | JSON オブジェクト; JSONPath | value 型 | json_extract('{"ID":1,"Address":["City","東京","Number",1]}','$.ID') は 1 |
| キー名で値を取得 | json_get(json, key) | JSON オブジェクト; キー文字列 | value 型 | json_get('{"ID":1,"Address":["City","東京","Number",1]}','ID') は 1 |
| JSON パスのすべてのキーを取得 | json_keys(json, jsonPath) | JSON オブジェクト; JSONPath | JSON 配列 | json_keys('{"ID":1,"Address":["City","Xian","Number",1]}','$') は ["ID","Address"] |
| JSON パスのキー数を取得 | json_length(json, jsonPath) | JSON オブジェクト; JSONPath ("$" は json_length(json) と同等) | Integer | json_length('{"ID":1,"Address":["City","Xian","Number",1]}','$') は 2 |
| ルートキー数を取得 | json_length(json) | JSON オブジェクト | Integer | json_length('{"ID":1,"Address":["City","Xian","Number",1]}') は 2 |
| JSON 文字列を解析 | json_parse(json) | JSON 文字列 | value 型 | json_parse('{"ID":1}') は {"ID":1} |
| JSON パスの値を削除 | json_remove(json, jsonPath) | JSON オブジェクト; JSONPath | JSON | json_remove('{"loginId":100,"loginTime":"2024-10-10"}','$.loginTime') は {"loginId":100} |
グローバル関数
フロー制御関数
| 関数 | 構文 | 説明 | 例 |
|---|---|---|---|
| If | e_if(bool_expr, func_invoke) | bool_expr が true の場合に func_invoke を実行します。 | e_if(op_gt(id, 10), e_drop()) — id > 10 のレコードを削除します |
| If-else | e_if_else(bool_expr, func_invoke1, func_invoke2) | true の場合は func_invoke1 を実行し、false の場合は func_invoke2 を実行します。 | e_if_else(op_gt(id, 10), e_set(tag, 'large'), e_set(tag, 'small')) |
| Switch | e_switch(condition1, func1, condition2, func2, ..., default=default_func) | 条件を順に評価し、最初に一致したものを実行します。一致するものがない場合は default_func を実行します。 | e_switch(op_gt(id, 100), e_set(str_col, '>100'), op_gt(id, 90), e_set(str_col, '>90'), default=e_set(str_col, '<=90')) |
| Compose | e_compose(func1, func2, func3, ...) | 複数の関数を単一の式として順に実行します。 | e_compose(e_set(str_col, 'test'), e_set(dt_col, dt_now())) — 1 つの式で 2 つの列を設定します |
データ操作関数
| 関数 | 構文 | 説明 | 例 |
|---|---|---|---|
| レコードの削除 | e_drop() | 現在のレコードを削除します。ターゲットには書き込まれません。 | e_if(op_gt(id, 10), e_drop()) |
| レコードの保持 | e_keep(condition) | condition が true の場合にのみレコードを保持します。 | e_keep(op_gt(id, 1)) — id > 1 のレコードのみを同期します |
| 列の値の設定 | e_set(col, val, NEW) | col を val に設定します。NEW (前にカンマを付けない) を渡すと、列を val のデータ型に変換します。NEW を省略し、型の互換性を確保してください。不一致があるとタスクが失敗する可能性があります。 | e_set(col1, 1, NEW) — col1 を数値型に変換し、1 |
| MongoDB フィールドマッピング | e_expand_bson_value('*', 'fieldA', {"fieldB":"fieldC"}) | 保持するフィールドを選択し (* = すべて)、fieldA を削除し、fieldB を fieldC に名前変更します。フィールド名のマッピングはオプションです。 | e_expand_bson_value("*", "_id,name") — _id と name を除くすべてのフィールドをターゲットに書き込みます |