Data Transmission Service: Configure ETL in a DTS migration or sync task

Last Updated: Dec 05, 2025

Data Transmission Service (DTS) provides a streaming data extract, transform, and load (ETL) feature. This feature, combined with the efficient data replication capabilities of DTS, lets you extract, transform, process, and load streaming data. This topic describes how to configure ETL for a DTS task and provides related syntax information. You can use the ETL feature in scenarios such as data filtering, data masking, recording data modification times, and auditing data changes.

Background information

DTS is a data migration and synchronization service that is typically used for data relocation or real-time data transmission. However, you may sometimes need to process data, such as transforming or filtering real-time data before writing it to a database. To meet these needs, DTS provides a streaming data ETL feature. This feature supports a domain-specific language (DSL) scripting language to flexibly define data processing logic. For an introduction to DSL and its configuration syntax, see Introduction to the data processing DSL syntax.

DTS supports two methods for configuring ETL: you can configure it when you create a task, or modify the ETL configuration of an existing task.

Note

You can configure ETL for both DTS migration and sync tasks. This topic uses a sync task as an example. The configuration method for a migration task is similar.

Supported databases

The following table lists the source and destination databases that support ETL.

Source database

Destination database

SQL Server

  • AnalyticDB for MySQL 3.0

  • SQL Server

  • MySQL

  • PolarDB for MySQL

MySQL

  • AnalyticDB for MySQL 3.0

  • AnalyticDB for PostgreSQL

  • Kafka

  • ApsaraDB for ClickHouse cluster

  • MySQL

  • PolarDB for MySQL

  • Elasticsearch

  • Redis

Self-managed Oracle

  • AnalyticDB for MySQL 3.0

  • AnalyticDB for PostgreSQL

  • Kafka

  • MaxCompute

  • PolarDB-X 2.0

  • PolarDB for PostgreSQL (Compatible with Oracle)

PolarDB for MySQL

  • AnalyticDB for MySQL 3.0

  • MySQL

  • PolarDB for MySQL

PolarDB for PostgreSQL (Compatible with Oracle)

  • AnalyticDB for MySQL 3.0

  • PolarDB for PostgreSQL (Compatible with Oracle)

PolarDB-X 1.0

  • Kafka

  • Tablestore

PolarDB-X 2.0

  • PolarDB-X 2.0

  • AnalyticDB for MySQL 3.0

  • MySQL

  • PolarDB for MySQL

Self-managed Db2 for LUW

MySQL

Self-managed Db2 for i

MySQL

PolarDB for PostgreSQL

  • PolarDB for PostgreSQL

  • PostgreSQL

PostgreSQL

  • PolarDB for PostgreSQL

  • PostgreSQL

TiDB

  • PolarDB for MySQL

  • MySQL

  • AnalyticDB for MySQL 3.0

MongoDB

Lindorm

Configure ETL when you create a sync task

Notes

  • If your ETL script includes an operation to add a new column, you must manually add that column to the destination database. Otherwise, the ETL script will not take effect. For example, if you use e_set(`new_column`, dt_now()), you must manually add the new_column column to the destination database.

  • DSL scripts are used primarily for data transformation and cleansing. They do not support the creation of database objects.

  • The fields configured in the DSL script must exist in the source database and cannot be fields that are filtered out by filter conditions. Otherwise, the task may fail.

  • DSL scripts are case-sensitive. The database, table, and field names must match the names in the source database exactly.

  • DSL scripts do not support multiple expressions. You can use the e_compose function to combine multiple expressions into a single expression.

  • After the DSL script processes them, the DML changes for all tables in the source database must produce the same set of columns. Otherwise, the task may fail. For example, if you use the e_set function in a DSL script to add a new column, you must ensure that INSERT, UPDATE, and DELETE operations in the source database all result in the new column being added to the destination table. For more information, see Record data modification time.

Procedure

  1. Create a sync task. For more information, see Sync solutions.

  2. In the Advanced Configurations step, set Configure ETL to Yes.

  3. In the text box, enter the data transformation (ETL) statement based on the Data processing DSL syntax.


    Note

    For example, to use DSL to drop records whose ID is greater than 3, you can use e_if(op_gt(`id`, 3), e_drop()). In this script, op_gt is a conditional function that checks whether one value is greater than another, and `id` refers to the id column. The script filters out records where the ID is greater than 3 so that they are not synchronized to the destination database.

  4. Complete the subsequent steps as needed.

Modify the ETL configuration of an existing sync task

You can modify the ETL configuration of an existing sync task in the following ways:

  • If a sync task was created without ETL configured (the Configure ETL option was set to No), you can change the setting from No to Yes and configure a DSL script.

  • If a sync task has ETL configured, you can modify the DSL script or set Configure ETL to No.

    Important
    • Before you modify an existing DSL script, you must move the sync objects from the Selected Objects list to the Source Objects list and then add them back to the Selected Objects list.

    • You cannot modify the DSL script for a migration task.

Notes

  • When you modify the ETL configuration of an existing sync task, you cannot change the schema of the destination table. To make schema changes, change the table schema in the destination database before you start the sync task.

  • Modifying the ETL configuration may interrupt the task. Proceed with caution.

  • Changes to the ETL configuration take effect only on incremental data after the sync task is started. These changes do not affect historical data that was synchronized before the modification.

  • DSL scripts are used primarily for data transformation and cleansing. They do not support the creation of database objects.

  • The fields configured in the DSL script must exist in the source database and cannot be fields that are filtered out by filter conditions. Otherwise, the task may fail.

  • DSL scripts are case-sensitive. The database, table, and field names must match the names in the source database exactly.

  • DSL scripts do not support multiple expressions. You can use the e_compose function to combine multiple expressions into a single expression.

  • After the DSL script processes them, the DML changes for all tables in the source database must produce the same set of columns. Otherwise, the task may fail. For example, if you use the e_set function in a DSL script to add a new column, you must ensure that INSERT, UPDATE, and DELETE operations in the source database all result in the new column being added to the destination table. For more information, see Record data modification time.

Procedure

  1. Log on to the Data Synchronization Tasks page of the new DTS console.

  2. In the row for the target sync task, click the ... (more) icon and select Modify ETL Configurations.

  3. In the Advanced Configurations step, set Configure ETL to Yes.

  4. In the text box, enter the data transformation (ETL) statement based on the Data processing DSL syntax.


    Note

    For example, to use DSL to drop records whose ID is greater than 3, you can use e_if(op_gt(`id`, 3), e_drop()). In this script, op_gt is a conditional function that checks whether one value is greater than another, and `id` refers to the id column. The script filters out records where the ID is greater than 3 so that they are not synchronized to the destination database.

  5. Complete the subsequent steps as needed.

Introduction to the data processing DSL syntax

The data processing DSL is a scripting language designed by DTS for data processing in data synchronization scenarios. It supports conditional functions and can process strings, dates, and numbers. It lets you flexibly define data processing logic and has the following attributes:

  • Powerful: Provides many functions and supports function composition.

  • Relatively simple syntax: Provides examples for typical scenarios, such as data filtering, data transformation, and data masking. For more information, see Examples for typical scenarios.

  • High performance: Based on code generation technology, it has a minimal performance impact on the synchronization process.

Note
  • In DSL syntax, column names are enclosed in backticks (``), not single quotation marks ('').

  • This feature is based on the data processing syntax of Simple Log Service (SLS). It supports JSON functions but does not support event-splitting functions. For more information about the SLS syntax, see Syntax overview.

Examples for typical scenarios

Data filtering

  • Filter by a numeric column: If `id` > 10000, drop the record and do not synchronize it to the destination database: e_if(op_gt(`id`, 10000), e_drop()).

  • Filter by a string match condition: If the `name` column contains "hangzhou", drop the record: e_if(str_contains(`name`, "hangzhou"), e_drop()).

  • Filter by date: If the order time is earlier than a specific time, do not synchronize the record: e_if(op_lt(`order_timestamp`, "2015-02-23 23:54:55"), e_drop()).

  • Filter by multiple conditions:

    • If `id` > 1000 AND the `name` column contains "hangzhou", drop the record: e_if(op_and(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop()).

    • If `id` > 1000 OR the `name` column contains "hangzhou", drop the record: e_if(op_or(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop()).

Data masking

Masking: Replace the last four digits of the `phone` column with asterisks: e_set(`phone`, str_mask(`phone`, 7, 10, '*')).
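As a mental model, the masking behavior shown above (with the str_mask parameters documented later under string functions: 0-based, inclusive start and end positions) can be sketched in Python. This is an illustrative approximation of the documented semantics, not the DSL implementation, and the phone number is a made-up value:

```python
def str_mask(s, start, end, mask_char):
    """Rough model of the DSL str_mask: replace the characters from
    position `start` to `end` (0-based, inclusive) with mask_char."""
    return s[:start] + mask_char * (end - start + 1) + s[end + 1:]

# Mask the last four digits of an 11-digit phone number (made-up value).
print(str_mask("13812345678", 7, 10, "*"))  # 1381234****
```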

Record data modification time

  • Add a new column to all tables: If the value of __OPERATION__ is INSERT, UPDATE, or DELETE, add a new column named "dts_sync_time". Set its value to the log commit time (__COMMIT_TIMESTAMP__).

    e_if(op_or(op_or(
            op_eq(__OPERATION__, __OP_INSERT__),
            op_eq(__OPERATION__, __OP_UPDATE__)),
            op_eq(__OPERATION__, __OP_DELETE__)),
        e_set(dts_sync_time, __COMMIT_TIMESTAMP__))
  • Add a new column to a specific table named "dts_test_table": If the value of __OPERATION__ is INSERT, UPDATE, or DELETE, add a new column named "dts_sync_time". Set its value to the log commit time (__COMMIT_TIMESTAMP__).

    e_if(op_and(
          op_eq(__TB__,'dts_test_table'),
          op_or(op_or(
            op_eq(__OPERATION__,__OP_INSERT__),
            op_eq(__OPERATION__,__OP_UPDATE__)),
            op_eq(__OPERATION__,__OP_DELETE__))),
          e_set(dts_sync_time,__COMMIT_TIMESTAMP__))
    Note

    For the preceding operations that add a new column, you must modify the destination table definition to add the "dts_sync_time" column before you start the task.

Audit data changes

Audit data changes for a table: Record the data change type in the "operation_type" column and the time of the data change in the "updated" column of the destination table.

e_compose(
    e_switch(
        op_eq(__OPERATION__,__OP_DELETE__), e_set(operation_type, 'DELETE'),
        op_eq(__OPERATION__,__OP_UPDATE__), e_set(operation_type, 'UPDATE'),
        op_eq(__OPERATION__,__OP_INSERT__), e_set(operation_type, 'INSERT')),
    e_set(updated, __COMMIT_TIMESTAMP__),
    e_set(__OPERATION__,__OP_INSERT__)
)
Note

You must add the "operation_type" and "updated" columns to the destination table before you start the task.

Distinguish between full and incremental data

Record whether each row originates from a full migration or an incremental migration in the is_increment_dml column of the destination table. You can distinguish the two phases by checking the value of __COMMIT_TIMESTAMP__: for a full migration, the value is 0 (displayed as 1970-01-01 08:00:00 in the UTC+8 time zone; the displayed value depends on the time zone). For an incremental migration, the value is the time when the log was written to the source database. The corresponding ETL script is as follows:

e_if_else(__COMMIT_TIMESTAMP__ > DATETIME('2000-01-01 00:00:00'), 
    e_set(`is_increment_dml`, True),
    e_set(`is_increment_dml`, False)
)

Data processing DSL syntax

Constants and variables

  • Constants

    Type

    Example

    int

    123

    float

    123.4

    string

    "hello1_world"

    boolean

    true or false

    datetime

    DATETIME('2021-01-01 10:10:01')

  • Variables

    Variable

    Description

    Data type

    Example value

    __TB__

    Table name

    string

    table

    __DB__

    Database name

    string

    mydb

    __OPERATION__

    Operation type

    string

    __OP_INSERT__,__OP_UPDATE__,__OP_DELETE__

    __BEFORE__

    The before image of an UPDATE operation (the value before the update)

    Note

    A DELETE operation has only a before image.

    Special mark, no type

    v(`column_name`,__BEFORE__)

    __AFTER__

    The after image of an UPDATE operation (the value after the update)

    Note

    An INSERT operation has only an after image.

    Special mark, no type

    v(`column_name`,__AFTER__)

    __COMMIT_TIMESTAMP__

    Transaction commit time

    datetime

    '2021-01-01 10:10:01'

    `column`

    The value of the specified column in a data record

    string

    `id`, `name`

Expression functions

  • Numeric operations

    Feature

    Syntax

    Value range

    Return value

    Example

    Addition

    op_sum(value1, value2)

    • value1: integer or floating-point number

    • value2: integer or floating-point number

    If both parameters are integers, an integer is returned. Otherwise, a floating-point number is returned.

    op_sum(`col1`, 1.0)

    Subtraction

    op_sub(value1, value2)

    • value1: integer or floating-point number

    • value2: integer or floating-point number

    If both parameters are integers, an integer is returned. Otherwise, a floating-point number is returned.

    op_sub(`col1`, 1.0)

    Multiplication

    op_mul(value1, value2)

    • value1: integer or floating-point number

    • value2: integer or floating-point number

    If both parameters are integers, an integer is returned. Otherwise, a floating-point number is returned.

    op_mul(`col1`, 1.0)

    Division

    op_div_true(value1, value2)

    • value1: integer or floating-point number

    • value2: integer or floating-point number

    If both parameters are integers, an integer is returned. Otherwise, a floating-point number is returned.

    op_div_true(`col1`, 2.0). If col1=15, 7.5 is returned.

    Modulo operation

    op_mod(value1, value2)

    • value1: integer or floating-point number

    • value2: integer or floating-point number

    If both parameters are integers, an integer is returned. Otherwise, a floating-point number is returned.

    op_mod(`col1`, 10). If col1=23, 3 is returned.
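The numeric-operation rules above (two integer inputs yield an integer, otherwise a floating-point number) can be sketched in Python. The truncating behavior for integer-only division is an assumption, since the table does not specify how an integer result is produced in that case:

```python
def _both_ints(a, b):
    # The table's rule: an integer result only when both inputs are integers.
    return isinstance(a, int) and isinstance(b, int)

def op_div_true(a, b):
    # Assumption: truncating division when both inputs are integers
    # (not confirmed by the source); true division otherwise.
    return a // b if _both_ints(a, b) else a / b

def op_mod(a, b):
    return a % b

print(op_div_true(15, 2.0))  # 7.5 (matches the documented example)
print(op_mod(23, 10))        # 3
```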

  • Logical operations

    Feature

    Syntax

    Value range

    Return value

    Example

    Equals

    op_eq(value1, value2)

    • value1: integer, floating-point number, or string

    • value2: integer, floating-point number, or string

    boolean: true or false

    op_eq(`col1`, 23)

    Greater than

    op_gt(value1, value2)

    • value1: integer, floating-point number, or string

    • value2: integer, floating-point number, or string

    boolean: true or false

    op_gt(`col1`, 1.0)

    Less than

    op_lt(value1, value2)

    • value1: integer, floating-point number, or string

    • value2: integer, floating-point number, or string

    boolean: true or false

    op_lt(`col1`, 1.0)

    Greater than or equal to

    op_ge(value1, value2)

    • value1: integer, floating-point number, or string

    • value2: integer, floating-point number, or string

    boolean: true or false

    op_ge(`col1`, 1.0)

    Less than or equal to

    op_le(value1, value2)

    • value1: integer, floating-point number, or string

    • value2: integer, floating-point number, or string

    boolean: true or false

    op_le(`col1`, 1.0)

    AND operation

    op_and(value1, value2)

    • value1: boolean

    • value2: boolean

    boolean: true or false

    op_and(`is_male`, `is_student`)

    OR operation

    op_or(value1, value2)

    • value1: boolean

    • value2: boolean

    boolean: true or false

    op_or(`is_male`, `is_student`)

    IN operation

    op_in(value, json_array)

    • value: any type

    • json_array: a string in the JSON format

    boolean: true or false

    op_in(`id`,json_array('["0","1","2","3","4","5","6","7","8"]'))

    Is null

    op_is_null(value)

    value: any type

    boolean: true or false

    op_is_null(`name`)

    Is not null

    op_is_not_null(value)

    value: any type

    boolean: true or false

    op_is_not_null(`name`)

  • String functions

    Feature

    Syntax

    Value range

    Return value

    Example

    String concatenation

    op_add(str_1,str_2,...,str_n)

    • str_1: string

    • str_2: string

    • ...

    • str_n: string

    The concatenated string

    op_add(`col`,'hangzhou','dts')

    String formatting and concatenation

    str_format(format, value1, value2, value3, ...)

    • format: a string that uses curly braces ({}) as placeholders, such as "part1: {}, part2: {}".

    • value1: any

    • value2: any

    The formatted string

    str_format("part1: {}, part2: {}", `col1`, `col2`). If col1="ab" and col2="12", "part1: ab, part2: 12" is returned.

    String replacement

    str_replace(original, oldStr, newStr, count)

    • original: the original string

    • oldStr: the string to be replaced

    • newStr: the replacement string

    • count: an integer that specifies the maximum number of replacements. If you set this parameter to -1, all occurrences are replaced.

    The string after replacement

    str_replace(`name`, "a", 'b', 1). If name="aba", "bba" is returned. str_replace(`name`, "a", 'b', -1). If name="aba", "bbb" is returned.

    Replace values in all string-type fields (such as varchar, text, and char)

    tail_replace_string_field(search, replace, all)

    • search: the string to be replaced

    • replace: the replacement string

    • all: specifies whether to replace all matched strings. This parameter supports only true.

      Note

      If you do not need to replace all matched strings, use the str_replace function.

    The string after replacement

    tail_replace_string_field('\u000f','',true). Replaces "\u000f" with an empty string in the values of all string-type fields.

    Remove specific characters from the beginning and end of a string

    str_strip(string_val, charSet)

    • string_val: the original string

    • charSet: the set of characters to be removed

    The string after the characters are removed from the beginning and end

    str_strip(`name`, 'ab'). If name=axbzb, xbz is returned.

    Convert a string to lowercase

    str_lower(value)

    value: a string column or string constant

    A lowercase string

    str_lower(`str_col`)

    Convert a string to uppercase

    str_upper(value)

    value: a string column or string constant

    An uppercase string

    str_upper(`str_col`)

    Convert a string to a number

    cast_string_to_long(value)

    value: string

    Integer

    cast_string_to_long(`col`)

    Convert a number to a string

    cast_long_to_string(value)

    value: integer

    String

    cast_long_to_string(`col`)

    Count occurrences of a substring

    str_count(str,pattern)

    • str: a string column or string constant

    • pattern: the substring to find

    The number of occurrences of the substring

    str_count(`str_col`, 'abc'). If str_col="zabcyabcz", 2 is returned.

    Find a substring

    str_find(str, pattern)

    • str: a string column or string constant

    • pattern: the substring to find

    The position of the first match of the substring. If no match is found, `-1` is returned.

    str_find(`str_col`, 'abc'). If `str_col="xabcy"`, `1` is returned.

    Check if a string consists of only letters

    str_isalpha(str)

    str: a string column or string constant

    true or false

    str_isalpha(`str_col`)

    Check if a string consists of only digits

    str_isdigit(str)

    • str: a string column or string constant

    true or false

    str_isdigit(`str_col`)

    Regular expression matching

    regex_match(str,regex)

    • str: a string column or string constant

    • regex: a regular expression string column or string constant

    true or false

    regex_match(__TB__,'user_\\d+')

    Mask a part of a string with a specified character. This can be used for data masking, such as replacing the last four digits of a phone number with asterisks.

    str_mask(str, start, end, maskStr)

    • str: a string column or string constant

    • start: an integer that specifies the start position of the mask. The minimum value is 0.

    • end: an integer that specifies the end position of the mask. The maximum value is the string length minus 1.

    • maskStr: a string of length 1, such as '#'.

    The string after the part from the start position to the end position is masked

    str_mask(`phone`, 7, 10, '#')

    Get the substring after a specified string

    substring_after(str, cond)

    • str: the original string

    • cond: string

    String

    Note

    The return value does not include the string cond.

    substring_after(`col`, 'abc')

    Get the substring before a specified string

    substring_before(str, cond)

    • str: the original string

    • cond: string

    String

    Note

    The return value does not include the string cond.

    substring_before(`col`, 'efg')

    Get the substring between two specified strings

    substring_between(str, cond1, cond2)

    • str: the original string

    • cond1: string

    • cond2: string

    String

    Note

    The return value does not include the strings cond1 and cond2.

    substring_between(`col`, 'abc','efg')

    Check if the value is a string

    is_string_value(value)

    value: a string or column name

    boolean: true or false

    is_string_value(`col1`)

    Replace content in string-type fields, starting in reverse from the end.

    tail_replace_string_field(search, replace, all)

    search: the string to be replaced

    replace: the replacement string

    all: specifies whether to replace all matched strings. Only true is supported.

    The string after replacement

    Replace "\u000f" with an empty string in the values of all string-type fields.

    tail_replace_string_field('\u000f','',true)

    Get the value of a field in MongoDB

    bson_value("field1","field2","field3",...)

    • field1: the name of the level-1 field.

    • field2: the name of the level-2 field.

    The value of the corresponding field in the document

    • e_set(`user_id`, bson_value("id"))

    • e_set(`user_name`, bson_value("person","name"))
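The substring and search functions above can be modeled in Python as follows. The behavior when the pattern is not found is an assumption (the source specifies it only for str_find), so treat this as a sketch rather than the DSL's exact semantics:

```python
def str_count(s, pattern):
    # Number of non-overlapping occurrences of pattern in s.
    return s.count(pattern)

def str_find(s, pattern):
    # 0-based position of the first match; -1 if not found (as documented).
    return s.find(pattern)

def substring_after(s, cond):
    # Portion of s after the first occurrence of cond (cond excluded).
    # Assumption: empty string when cond is absent.
    i = s.find(cond)
    return s[i + len(cond):] if i >= 0 else ""

def substring_before(s, cond):
    # Portion of s before the first occurrence of cond (cond excluded).
    i = s.find(cond)
    return s[:i] if i >= 0 else ""

def substring_between(s, cond1, cond2):
    # Portion of s between cond1 and cond2 (both excluded).
    return substring_before(substring_after(s, cond1), cond2)

print(str_count("zabcyabcz", "abc"))             # 2
print(str_find("xabcy", "abc"))                  # 1
print(substring_between("xx[abc]yy", "[", "]"))  # abc
```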

  • Time functions

    Feature

    Syntax

    Value range

    Return value

    Example

    Current system time

    dt_now()

    None

    DATETIME, accurate to the second

    dt_now()

    Current system time, accurate to the millisecond

    dt_now_millis()

    None

    DATETIME, accurate to the millisecond

    dt_now_millis()

    Convert a UTC timestamp (in seconds) to DATETIME

    dt_fromtimestamp(value,[timezone])

    • value: integer

    • timezone: the time zone. This is an optional parameter.

    DATETIME, accurate to the second

    dt_fromtimestamp(1626837629)

    dt_fromtimestamp(1626837629,'GMT+08')

    Convert a UTC timestamp (in milliseconds) to DATETIME

    dt_fromtimestamp_millis(value,[timezone])

    • value: integer

    • timezone: the time zone. This is an optional parameter.

    DATETIME, accurate to the millisecond

    dt_fromtimestamp_millis(1626837629123)

    dt_fromtimestamp_millis(1626837629123,'GMT+08')

    Convert DATETIME to a UTC timestamp (in seconds)

    dt_parsetimestamp(value,[timezone])

    • value: DATETIME

    • timezone: the time zone. This is an optional parameter.

    Integer

    dt_parsetimestamp(`datetime_col`)

    dt_parsetimestamp(`datetime_col`,'GMT+08')

    Convert DATETIME to a UTC timestamp (in milliseconds)

    dt_parsetimestamp_millis(value,[timezone])

    • value: DATETIME

    • timezone: the time zone. This is an optional parameter.

    Integer

    dt_parsetimestamp_millis(`datetime_col`)

    dt_parsetimestamp_millis(`datetime_col`,'GMT+08')

    Convert DATETIME to a string

    dt_str(value, format)

    • value: DATETIME

    • format: a string in the yyyy-MM-dd HH:mm:ss format

    String

    dt_str(`col1`, 'yyyy-MM-dd HH:mm:ss')

    Convert a string to DATETIME

    dt_strptime(value,format)

    • value: string

    • format: a string in the yyyy-MM-dd HH:mm:ss format

    DATETIME

    dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd HH:mm:ss')

    Modify the time by adding or subtracting a value from the year, month, day, hour, minute, or second

    dt_add(value, [years=intVal],

    [months=intVal],

    [days=intVal],

    [hours=intVal],

    [minutes=intVal]

    )

    • value: DATETIME

    • intVal: integer

      Note

      A minus sign (-) indicates subtraction.

    DATETIME

    • dt_add(datetime_col,years=-1)

    • dt_add(datetime_col,years=1,months=1)
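As an illustrative Python model of the timestamp conversions above: the DSL accepts time zone names such as 'GMT+08', which this sketch simplifies to a plain hour offset, and the example timestamp is taken from the table:

```python
from datetime import datetime, timezone, timedelta

def dt_fromtimestamp(ts, tz_offset_hours=0):
    """Rough model of dt_fromtimestamp: convert a UTC timestamp in seconds
    to a second-precision datetime string in the given zone offset."""
    tz = timezone(timedelta(hours=tz_offset_hours))
    return datetime.fromtimestamp(ts, tz).strftime("%Y-%m-%d %H:%M:%S")

def dt_parsetimestamp(s, tz_offset_hours=0):
    """Inverse direction: parse a datetime string interpreted in the given
    zone offset back to a UTC timestamp in seconds."""
    tz = timezone(timedelta(hours=tz_offset_hours))
    dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tz)
    return int(dt.timestamp())

print(dt_fromtimestamp(1626837629))     # 2021-07-21 03:20:29 (UTC)
print(dt_fromtimestamp(1626837629, 8))  # 2021-07-21 11:20:29 (GMT+08)
```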

  • Conditional expressions

    Feature

    Syntax

    Value range

    Return value

    Example

    Similar to the ternary operator (? :) in C. Returns a value based on a condition.

    (cond ? val_1 : val_2)

    • cond: a boolean field or expression

    • val_1: return value 1

    • val_2: return value 2

      Note

      val_1 and val_2 must be of the same type.

    If cond is true, val_1 is returned. Otherwise, val_2 is returned.

    (id>1000? 1 : 0)

  • JSON functions

    Note

    The `value` type represents the type of any field in the database.

    Feature

    Syntax

    Value range

    Return value

    Example

    Convert a JSON array string to a set collection

    json_array(arrayText)

    Note

    This function can be used only in expressions that return a boolean value.

    arrayText: A string that specifies the JSON array string to be converted.

    A set collection

    op_in(`id`,json_array('["0","1","2","3"]')). The set collection ["0","1","2","3"] is returned.

    Create a JSON array that contains specified data

    json_array2(item...)

    item...: The data for the JSON array. The data type is value.

    A JSON array

    json_array2("0","1","2","3"). ["0","1","2","3"] is returned.

    Create a JSON object that contains specified data

    json_object(item...)

    item...: The data (key-value pairs) for the JSON object. It consists of a key name (string) and a key value (value type), separated by a comma (,).

    JSON

    json_object('name','ZhangSan','age',32, 'loginId',100). {"name":"ZhangSan","age":32,"loginId":100} is returned.

    Insert data into a specified position in a JSON object (array)

    json_array_insert(json, kvPairs...)

    • json: A string that specifies the JSON object to be processed.

    • kvPairs...: The data to be inserted. It consists of an array position (jsonPath, string type) and specific data (value type), separated by a comma (,).

    JSON

    Note
    • If the specified position does not exist, the original JSON object is returned.

    • If the element at the specified position does not exist, the data is added to the end of the destination array.

    json_array_insert('{"Address":["City",1]}','$.Address[3]',100). {"Address":["City",1,100]} is returned.

    Insert data into a specified position in a JSON object

    json_insert(json, kvPairs...)

    • json: A string that specifies the JSON object to be processed.

    • kvPairs...: The data to be inserted. It consists of a position (jsonPath, string type) and specific data (value type), separated by a comma (,).

    JSON

    Note
    • If the specified position exists, the original JSON object is returned.

    • If the specified position does not exist, the data is added to the JSON object.

    json_insert('{"Address":["City","Xian","Number",1]}','$.ID',100). {"Address":["City","Xian","Number",1],"ID":100} is returned.

    Insert or update data at a specified position in a JSON object

    json_set(json, kvPairs...)

    • json: A string that specifies the JSON object to be processed.

    • kvPairs...: The data to be inserted. It consists of a position (jsonPath, string type) and specific data (value type), separated by a comma (,).

    value type

    Note
    • If the specified position exists, the data at that position is updated.

    • If the specified position does not exist, the data is added to the JSON object.

    json_set('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100). {"ID":1,"Address":["City","Xian","Number",1], "IP":100} is returned.

    Insert or update data (a key-value pair) in a JSON object

    json_put(json, key, value)

    • json: A string that specifies the JSON object to be processed.

    • key: A string that specifies the key name to be inserted.

    • value: The key value to be inserted. The data type is value.

    JSON

    Note
    • If the json parameter is not a JSON object, null is returned.

    • If the specified key name exists, the corresponding key value is updated.

    • If the specified key name does not exist, the data is added to the JSON object.

    json_put('{"loginId":100}','loginTime','2024-10-10'). {"loginId":100,"loginTime":"2024-10-10"} is returned.

    Replace data at a specified position in a JSON object

    json_replace(json, kvPairs...)

    • json: A string that specifies the JSON object to be processed.

    • kvPairs...: The data to be replaced. It consists of a position (jsonPath, string type) and specific data (value type), separated by a comma (,).

    value type

    Note

    If the specified position does not exist, the original JSON object is returned.

    json_replace('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100). {"ID":1,"Address":["City","Xian","Number",1]} is returned.

    Check whether specified data exists at a specified position in a JSON object

    json_contains(json, jsonPath, item)

    • json: A string that specifies the JSON object to be processed.

    • jsonPath: A string that specifies the position in the JSON object.

    • item: The specific data to query. The data type is value.

    boolean: true or false

    json_contains('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID',1). true is returned.

    Check whether a specified position exists in a JSON object

    json_contains_path(json, jsonPath)

    • json: A string that specifies the JSON object to be processed.

    • jsonPath: A string that specifies the position information to query.

    boolean: true or false

    json_contains_path('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID'). true is returned.

    Retrieve data from a specified position in a JSON object

    json_extract(json, jsonPath)

    • json: A string that specifies the JSON object to be processed.

    • jsonPath: A string that specifies the position in the JSON object.

    value type

    json_extract('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID'). 1 is returned.

    Retrieve the value of a specified key in a JSON object

    json_get(json, key)

    • json: A string that specifies the JSON object to be processed.

    • key: A string that specifies the key name of the data to retrieve.

    value type

    json_get('{"ID":1,"Address":["City","Xian","Number",1]}','ID'). 1 is returned.

    Retrieve all keys at a specified position in a JSON object

    json_keys(json, jsonPath)

    • json: A string that specifies the JSON object to be processed.

    • jsonPath: A string that specifies the position in the JSON object.

    A JSON array

    json_keys('{"ID":1,"Address":["City","Xian","Number",1]}','$'). ["ID","Address"] is returned.

    Query the length (number of keys) at a specified position in a JSON object

    json_length(json, jsonPath)

    • json: A string that specifies the JSON object to be processed.

    • jsonPath: A string that specifies the position in the JSON object.

      Note

      If the value of jsonPath is "$", it is equivalent to json_length(json).

    Integer

    json_length('{"ID":1,"Address":["City","Xian","Number",1]}','$'). 2 is returned.

    Query the length (number of keys) of the root node of a JSON object

    json_length(json)

    json: A string that specifies the JSON object to be processed.

    Integer

    json_length('{"ID":1,"Address":["City","Xian","Number",1]}'). 2 is returned.

    Convert a JSON string to a JSON object

    json_parse(json)

    json: A string that specifies the JSON object to be processed.

    value type

    json_parse('{"ID":1,"Address":["City","Xian","Number",1]}'). {"ID":1,"Address":["City","Xian","Number",1]} is returned.

    Remove data from a specified position in a JSON object

    json_remove(json, jsonPath)

    • json: A string that specifies the JSON object to be processed.

    • jsonPath: A string that specifies the position in the JSON object.

    JSON

    json_remove('{"loginId":100, "loginTime":"2024-10-10"}','$.loginTime'). {"loginId":100} is returned.
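
    The JSON functions above follow common JSON-path semantics. As a rough illustration only (not the DTS implementation), the following Python sketch reproduces the behavior of json_extract, json_keys, json_length, and json_remove on the sample documents from this table:

    ```python
    import json

    doc = json.loads('{"ID":1,"Address":["City","Xian","Number",1]}')

    # json_extract(json, '$.ID') — read the value at a top-level key:
    print(doc["ID"])           # 1

    # json_keys(json, '$') — list the keys at the root of the object:
    print(list(doc.keys()))    # ['ID', 'Address']

    # json_length(json) — count the keys at the root:
    print(len(doc))            # 2

    # json_remove(json, '$.loginTime') — drop a key and re-serialize:
    login = json.loads('{"loginId":100, "loginTime":"2024-10-10"}')
    login.pop("loginTime", None)
    print(json.dumps(login))   # {"loginId": 100}
    ```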

Global functions

  • Flow control functions

    Feature

    Syntax

    Parameter description

    Example

    if statement

    e_if(bool_expr, func_invoke)

    • bool_expr: a boolean constant or function invocation. Constant: true or false. Function invocation: op_gt(`id`, 10).

    • func_invoke: a function invocation. Supported functions: e_drop, e_keep, e_set, e_if, and e_compose.

    e_if(op_gt(`id`, 10), e_drop()). Drops the record if the ID is greater than 10.

    if else statement

    e_if_else(bool_expr, func_invoke1, func_invoke2)

    • bool_expr: a boolean constant or function invocation. Constant: true or false. Function invocation: op_gt(`id`, 10).

    • func_invoke1: a function invocation. Executed if the condition is true.

    • func_invoke2: a function invocation. Executed if the condition is false.

    e_if_else(op_gt(`id`, 10), e_set(`tag`, 'large'), e_set(`tag`, 'small')). If the ID is greater than 10, sets the tag column to "large". Otherwise, sets it to "small".

    switch statement. Evaluates multiple conditions in order and executes the operation for the first matching condition. If no condition matches, the default operation is executed.

    e_switch(condition1, func1, condition2, func2, ..., default=default_func)

    • condition1: A Boolean constant or a function invocation. Constant: true or false. Function invocation: op_gt(`id`, 10).

    • func1: A function invocation. If condition1 is true, this function is executed and the switch statement exits. If condition1 is false, evaluation proceeds to the next condition.

    • default_func: A function invocation. This function is executed if all previous conditions are false.

    e_switch(op_gt(`id`, 100), e_set(`str_col`, '>100'), op_gt(`id`, 90), e_set(`str_col`, '>90'), default=e_set(`str_col`, '<=90')). Sets str_col to ">100" if the ID is greater than 100, to ">90" if the ID is greater than 90, and to "<=90" otherwise.

    Combine multiple operations

    e_compose(func1, func2, func3, ...)

    • func1: a function invocation. Can be e_set, e_drop, or e_if.

    • func2: a function invocation. Can be e_set, e_drop, or e_if.

    e_compose(e_set(`str_col`, 'test'), e_set(`dt_col`, dt_now())). Sets the value of the str_col column to test and sets the value of the dt_col column to the current time.
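
    The flow control functions evaluate conditions in order against each record. The following Python sketch is an analogue of e_switch only, using callables for conditions and actions; the helper name and record shape are hypothetical, and this is not the DSL runtime:

    ```python
    def e_switch(record, *pairs, default=None):
        """Evaluate (condition, action) pairs in order and run the first match.
        A Python analogue of the DSL e_switch."""
        for cond, action in zip(pairs[::2], pairs[1::2]):
            if cond(record):
                action(record)
                return record
        if default:
            default(record)
        return record

    row = {"id": 95, "str_col": ""}
    e_switch(
        row,
        lambda r: r["id"] > 100, lambda r: r.__setitem__("str_col", ">100"),
        lambda r: r["id"] > 90,  lambda r: r.__setitem__("str_col", ">90"),
        default=lambda r: r.__setitem__("str_col", "<=90"),
    )
    print(row["str_col"])  # >90
    ```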

  • Data manipulation functions

    Feature

    Syntax

    Parameter description

    Example

    Drop this data record and do not sync it

    e_drop()

    None

    e_if(op_gt(`id`, 10), e_drop()). Drops records where the ID is greater than 10.

    Keep this data record and sync it to the destination

    e_keep(condition)

    condition: a boolean expression

    e_keep(op_gt(`id`, 1)). Syncs only data where the ID is greater than 1.

    Set a column value

    e_set(`col`, val, NEW)

    • col: column name

    • val: a constant or function invocation. The type must match the type of col.

    • NEW: converts the data type of the col column to the data type of val. This is an optional field.

      Important

      If you omit NEW, also omit the comma (,) before it. Ensure that the data types of col and val are compatible. Otherwise, the task may report an error.

    • e_set(`dt_col`, dt_now()). Sets dt_col to the current time.

    • e_set(`col1`, `col2` + 1). Sets col1 to col2+1.

    • e_set(`col1`, 1, NEW). Converts the col1 column to a numeric type and sets its value to 1.

    Keep fields, drop fields, and map field names (MongoDB only)

    e_expand_bson_value('*', 'fieldA', {"fieldB":"fieldC"})

    • *: the name of the field to keep. An asterisk (*) indicates all fields.

    • fieldA: the name of the field to drop.

    • {"fieldB":"fieldC"}: the field name mapping. fieldB is the source field name, and fieldC is the destination field name.

      Note

      Field name mapping is an optional expression.

    e_expand_bson_value("*", "_id,name"). Writes all fields except for the _id and name fields to the destination.
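
    The keep/drop/rename behavior of e_expand_bson_value can be pictured with a small Python analogue. The helper expand_fields below is hypothetical and operates on a plain dict rather than a BSON document; it is a sketch of the semantics, not the DTS implementation:

    ```python
    def expand_fields(doc, keep="*", drop="", rename=None):
        """Keep, drop, and rename top-level fields of a dict.
        An analogue of e_expand_bson_value's three arguments."""
        drop_set = {f for f in drop.split(",") if f}
        keys = doc.keys() if keep == "*" else [k for k in keep.split(",") if k]
        rename = rename or {}
        return {rename.get(k, k): doc[k] for k in keys if k not in drop_set}

    src = {"_id": 1, "name": "a", "city": "Xian"}
    # Keep all fields except _id and name, as in the example above:
    print(expand_fields(src, "*", "_id,name"))  # {'city': 'Xian'}
    ```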