Data Transmission Service: Configure ETL in DTS tasks

Last Updated: Mar 30, 2026

When you replicate data with Data Transmission Service (DTS), the source data often needs transformation before it reaches the destination—filtering out rows that don't belong, masking sensitive values, or enriching each record with metadata such as the time it changed. DTS includes a streaming extract, transform, and load (ETL) feature that runs this processing logic in-flight, between the replication engine and the destination database. ETL logic is expressed in a domain-specific language (DSL) designed for data synchronization scenarios.

Common use cases:

  • Data filtering: Drop records that don't meet sync criteria, such as records with IDs above a threshold or orders older than a cutoff date.

  • Data masking: Obscure sensitive values, such as replacing the last four digits of a phone number with asterisks.

  • Change tracking: Stamp each synchronized row with the commit timestamp so the destination table records when data changed.

  • Audit logging: Write the operation type (INSERT, UPDATE, DELETE) and change time into dedicated columns for compliance or debugging.

How ETL works

ETL runs as a processing step between the DTS replication engine and the destination database. Each incoming data record passes through your DSL script, which can drop the record, modify column values, or add new columns before the record is written to the destination.
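Conceptually, this processing step can be sketched in Python. This is a hypothetical emulation for illustration only (DTS executes DSL scripts, not Python): each record is modeled as a dictionary, and the transform returns None to drop the record or a modified record to forward it.

```python
# Hypothetical emulation of the DTS ETL step: each replicated record
# passes through a transform that may drop it, change column values,
# or add new columns before it is written to the destination.

def etl_step(records, transform):
    """Apply `transform` to each record; a None result drops the record."""
    for record in records:
        result = transform(dict(record))  # copy so the source record is untouched
        if result is not None:
            yield result

# Example transform mirroring e_if(op_gt(`id`, 3), e_drop()) plus an e_set
# that adds a new column.
def transform(record):
    if record["id"] > 3:
        return None          # e_drop(): not synchronized
    record["synced"] = True  # e_set(...): adds a column
    return record

records = [{"id": 1}, {"id": 5}, {"id": 3}]
out = list(etl_step(records, transform))
# out == [{"id": 1, "synced": True}, {"id": 3, "synced": True}]
```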

The DSL is based on the data processing syntax of Simple Log Service (SLS). It supports conditional logic, string manipulation, date/time operations, numeric arithmetic, and JSON processing. It does not support event-splitting functions. The DSL has the following attributes:

  • Powerful: Provides many functions and supports function composition.

  • Relatively simple: The syntax is concise, and examples are provided for typical scenarios such as data filtering, data transformation, and data masking.

  • High performance: Based on code generation technology, it has a minimal performance impact on the synchronization process.

Column names in DSL scripts are enclosed in backticks (`), not single quotation marks (').

Supported databases

ETL is available for the following source and destination database combinations.

Source database Destination database
SQL Server AnalyticDB for MySQL 3.0, SQL Server, MySQL, PolarDB for MySQL
MySQL AnalyticDB for MySQL 3.0, AnalyticDB for PostgreSQL, Kafka, ApsaraDB for ClickHouse cluster, MySQL, PolarDB for MySQL, Elasticsearch, Redis
Self-managed Oracle AnalyticDB for MySQL 3.0, AnalyticDB for PostgreSQL, Kafka, MaxCompute, PolarDB-X 2.0, PolarDB for PostgreSQL (Compatible with Oracle)
PolarDB for MySQL AnalyticDB for MySQL 3.0, MySQL, PolarDB for MySQL
PolarDB for PostgreSQL (Compatible with Oracle) AnalyticDB for MySQL 3.0, PolarDB for PostgreSQL (Compatible with Oracle)
PolarDB-X 1.0 Kafka, Tablestore
PolarDB-X 2.0 PolarDB-X 2.0, AnalyticDB for MySQL 3.0, MySQL, PolarDB for MySQL
Self-managed Db2 for LUW MySQL
Self-managed Db2 for i MySQL
PolarDB for PostgreSQL PolarDB for PostgreSQL, PostgreSQL
PostgreSQL PolarDB for PostgreSQL, PostgreSQL
TiDB PolarDB for MySQL, MySQL, AnalyticDB for MySQL 3.0
MongoDB Lindorm

Configure ETL when creating a sync task

This topic uses a sync task as an example. The configuration steps for a migration task are similar.

Usage notes

Constraint Consequence if violated
DSL scripts are case-sensitive. Database, table, and field names must match the source exactly. The script silently fails to match records, or the task reports an error.
All fields referenced in the DSL script must exist in the source database and must not be filtered out by filter conditions. The task fails with a field-not-found error.
DSL scripts support only a single top-level expression. Use e_compose to combine multiple operations. The task rejects the script at validation time.
If a DSL script adds a new column, add that column to the destination table manually before starting the task. For example, if the script uses e_set(new_column, dt_now()), add new_column to the destination table first. The ETL script has no effect because the destination column does not exist.
After DSL processing, DML changes for all tables must produce consistent column information. For example, if e_set adds a column, INSERT, UPDATE, and DELETE operations must all result in that column being added. The task may fail due to schema inconsistency between operation types.
DSL scripts are for data transformation and cleansing only. Database object creation (such as CREATE TABLE) is not supported and will fail.

Procedure

  1. Create a sync task. For details, see Sync solutions.

  2. In the Advanced Configurations step, set Configure ETL to Yes.

  3. In the text box, enter your DSL script based on the data processing DSL syntax.

    To filter records where id is greater than 3, use e_if(op_gt(id, 3), e_drop()). This drops any record where the id value exceeds 3, so it is not synchronized to the destination.


  4. Complete the remaining steps as needed.

Modify ETL on an existing sync task

To update the ETL configuration of an existing sync task, choose one of the following:

  • If Configure ETL was set to No, change it to Yes and enter a DSL script.

  • If Configure ETL was set to Yes, modify the DSL script or set Configure ETL back to No.

Important
  • Before modifying a DSL script, move the sync objects from the Selected Objects list back to the Source Objects list, then add them again. Skipping this step may leave the task in an inconsistent state.

  • ETL configuration cannot be modified for migration tasks. Only sync tasks support this operation.

Usage notes

Constraint Consequence if violated
Modifying the ETL configuration may interrupt the running task. Data synchronization pauses during the update. Proceed during a maintenance window.
Changes take effect only on incremental data after the task restarts. Historical data already synchronized is not reprocessed; leaving records synchronized before the modification untouched is by design.
Destination table schema changes are not supported during ETL modification. To change the schema, update the destination table before starting the sync task.
DSL script constraints (case sensitivity, field existence, single expression, new columns) are the same as for new tasks. Same consequences as described in the new task usage notes above.

Procedure

  1. Log on to the Data Synchronization Tasks page of the new DTS console.

  2. In the row for the target sync task, click the ... (more) icon and select Modify ETL Configurations.

  3. In the Advanced Configurations step, set Configure ETL to Yes.

  4. In the text box, enter the updated DSL script.

    To filter records where id is greater than 3, use e_if(op_gt(id, 3), e_drop()). This drops any record where the id value exceeds 3, so it is not synchronized to the destination.


  5. Complete the remaining steps as needed.

ETL script examples

The following examples cover the most common scenarios. Each uses DSL functions described in the DSL syntax reference section.

Data filtering

Use data filtering to sync only a subset of records—for example, to replicate recent orders to an analytics database, exclude test data from a production sync, or enforce region-specific data residency rules by dropping records that don't belong.

  • Drop records where id > 10000:

    e_if(op_gt(`id`, 10000), e_drop())
  • Drop records where name contains "hangzhou":

    e_if(str_contains(`name`, "hangzhou"), e_drop())
  • Drop orders placed before a cutoff date:

    e_if(op_lt(`order_timestamp`, "2015-02-23 23:54:55"), e_drop())
  • Drop records matching multiple conditions (AND):

    e_if(op_and(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())
  • Drop records matching either condition (OR):

    e_if(op_or(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())
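The drop conditions above can be emulated in Python to check your filter logic before deploying it. This is a hypothetical sketch, not DTS code; each predicate mirrors a DSL operator (str_contains, op_gt, op_and).

```python
# Hypothetical emulation of a compound filter:
# e_if(op_and(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())

def keep(record):
    """Return False for records the DSL script would drop."""
    if "hangzhou" in record["name"] and record["id"] > 1000:
        return False  # both conditions hold: e_drop()
    return True

rows = [
    {"id": 2000, "name": "hangzhou-dc"},  # dropped: both conditions hold
    {"id": 500,  "name": "hangzhou-dc"},  # kept: id not > 1000
    {"id": 2000, "name": "beijing-dc"},   # kept: name lacks "hangzhou"
]
kept = [r for r in rows if keep(r)]
# kept contains only the last two rows
```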

Data masking

Use data masking when the destination database is accessed by a broader audience than the source—for example, when syncing production data to a reporting system or sharing data with a third party. Masking lets you protect personally identifiable information (PII) while preserving data for analytics.

Replace the last four digits of a phone number (positions 7–10) with asterisks:

e_set(`phone`, str_mask(`phone`, 7, 10, '*'))
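The masking semantics can be sketched in Python. This is a hypothetical emulation assuming str_mask replaces characters at inclusive, 0-based positions start through end, consistent with the parameter bounds given in the function reference below (start min 0, end max length−1).

```python
def str_mask(value, start, end, mask_char):
    """Emulate the DSL str_mask function (assumed semantics): replace the
    characters at inclusive 0-based positions start..end with mask_char."""
    return value[:start] + mask_char * (end - start + 1) + value[end + 1:]

# For an 11-digit phone number, positions 7-10 are the last four digits.
str_mask("13812345678", 7, 10, "*")  # '1381234****'
```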

Change tracking

Use change tracking when downstream consumers need to know when a row was last modified—for example, to implement incremental processing pipelines, detect stale records, or audit data freshness without querying the source database.

Important

Before starting the task, add the dts_sync_time column to the destination table.

Add a dts_sync_time column to all tables for INSERT, UPDATE, and DELETE operations:

e_if(op_or(op_or(
        op_eq(__OPERATION__, __OP_INSERT__),
        op_eq(__OPERATION__, __OP_UPDATE__)),
        op_eq(__OPERATION__, __OP_DELETE__)),
    e_set(dts_sync_time, __COMMIT_TIMESTAMP__))

To apply this only to a specific table (for example, dts_test_table):

e_if(op_and(
      op_eq(__TB__,'dts_test_table'),
      op_or(op_or(
        op_eq(__OPERATION__,__OP_INSERT__),
        op_eq(__OPERATION__,__OP_UPDATE__)),
        op_eq(__OPERATION__,__OP_DELETE__))),
      e_set(dts_sync_time,__COMMIT_TIMESTAMP__))
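The logic of both scripts can be emulated in Python. This is a hypothetical sketch, not DTS code; the parameters stand in for the DSL built-ins __OPERATION__, __COMMIT_TIMESTAMP__, and __TB__.

```python
def stamp_sync_time(record, operation, commit_ts, table, only_table=None):
    """Emulate the change-tracking script: stamp DML records with the
    transaction commit time, optionally for a single table only."""
    if only_table is not None and table != only_table:
        return record  # op_eq(__TB__, 'dts_test_table') is false: no change
    if operation in ("INSERT", "UPDATE", "DELETE"):  # the nested op_or chain
        record["dts_sync_time"] = commit_ts  # e_set(dts_sync_time, __COMMIT_TIMESTAMP__)
    return record

r = stamp_sync_time({"id": 1}, "UPDATE", "2024-05-01 10:00:00",
                    "dts_test_table", only_table="dts_test_table")
# r["dts_sync_time"] == "2024-05-01 10:00:00"
```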

Audit logging

Use audit logging when you need a full history of data changes in the destination table—for example, to meet compliance requirements, support rollback scenarios, or build a change log without a separate audit system. This pattern writes every incoming change as an INSERT, so the destination table accumulates a complete record of all modifications.

Important

Before starting the task, add the operation_type and updated columns to the destination table.

Write the DML operation type and commit timestamp into the destination row, then convert all operations to INSERT:

e_compose(
    e_switch(
        op_eq(__OPERATION__,__OP_DELETE__), e_set(operation_type, 'DELETE'),
        op_eq(__OPERATION__,__OP_UPDATE__), e_set(operation_type, 'UPDATE'),
        op_eq(__OPERATION__,__OP_INSERT__), e_set(operation_type, 'INSERT')),
    e_set(updated, __COMMIT_TIMESTAMP__),
    e_set(__OPERATION__,__OP_INSERT__)
)
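The audit-logging pattern can be emulated in Python. This is a hypothetical sketch of the script's effect, not DTS code: the operation type and commit time are written into the row, and the operation itself is rewritten as an INSERT so deletes and updates append rows instead of removing or overwriting them.

```python
def audit_transform(record, operation, commit_ts):
    """Emulate the audit-logging script: record the DML type and commit
    time, then rewrite every operation as an INSERT so the destination
    table accumulates a full change history."""
    record["operation_type"] = operation  # the e_switch branches
    record["updated"] = commit_ts         # e_set(updated, __COMMIT_TIMESTAMP__)
    return record, "INSERT"               # e_set(__OPERATION__, __OP_INSERT__)

row, op = audit_transform({"id": 7}, "DELETE", "2024-05-01 10:00:00")
# op == "INSERT"; row records that the original operation was a DELETE
```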

Distinguish full and incremental data

Use this pattern when the destination table receives both full migration data and incremental changes, and downstream consumers need to tell them apart—for example, to skip reprocessing historical rows in a streaming pipeline, or to validate that the initial load completed before applying incremental logic.

During a full migration, __COMMIT_TIMESTAMP__ is 0 (rendered as 1970-01-01 08:00:00 in UTC+8). During incremental migration, it holds the actual log commit time. The script below writes True or False into an is_increment_dml column:

e_if_else(__COMMIT_TIMESTAMP__ > DATETIME('2000-01-01 00:00:00'),
    e_set(`is_increment_dml`, True),
    e_set(`is_increment_dml`, False)
)
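The comparison can be emulated in Python. This is a hypothetical sketch of the check, not DTS code: any commit timestamp before the year-2000 cutoff is treated as full-migration data.

```python
from datetime import datetime

def is_increment_dml(commit_ts):
    """Emulate the e_if_else check: full-migration records carry a zero
    (epoch) commit timestamp; incremental records carry the real log time."""
    return commit_ts > datetime(2000, 1, 1)

is_increment_dml(datetime(1970, 1, 1, 8))      # False: full data
is_increment_dml(datetime(2024, 5, 1, 10, 0))  # True: incremental data
```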

DSL syntax reference

Built-in variables

The following built-in variables are available in all DSL scripts.

Variable Description Data type Example value
__TB__ Table name string table
__DB__ Database name string mydb
__OPERATION__ DML operation type string __OP_INSERT__, __OP_UPDATE__, __OP_DELETE__
__BEFORE__ Before image of an UPDATE (value before the change). A DELETE operation has only a before image. Special mark v(column_name,__BEFORE__)
__AFTER__ After image of an UPDATE (value after the change). An INSERT operation has only an after image. Special mark v(column_name,__AFTER__)
__COMMIT_TIMESTAMP__ Transaction commit time datetime '2021-01-01 10:10:01'
`column` Value of the specified column in the current record string `id`, `name`

Constants

Type Example
int 123
float 123.4
string "hello1_world"
boolean true or false
datetime DATETIME('2021-01-01 10:10:01')

Expression functions

Numeric operations

Operation Syntax Parameters Return value Example
Addition op_sum(value1, value2) integer or float Integer if both inputs are integers; otherwise float op_sum(col1, 1.0)
Subtraction op_sub(value1, value2) integer or float Integer if both inputs are integers; otherwise float op_sub(col1, 1.0)
Multiplication op_mul(value1, value2) integer or float Integer if both inputs are integers; otherwise float op_mul(col1, 1.0)
Division op_div_true(value1, value2) integer or float Integer if both inputs are integers; otherwise float op_div_true(col1, 2.0) — if col1=15, returns 7.5
Modulo op_mod(value1, value2) integer or float Integer if both inputs are integers; otherwise float op_mod(col1, 10) — if col1=23, returns 3

Logical operations

Operation Syntax Parameters Return value Example
Equals op_eq(value1, value2) integer, float, or string boolean op_eq(col1, 23)
Greater than op_gt(value1, value2) integer, float, or string boolean op_gt(col1, 1.0)
Less than op_lt(value1, value2) integer, float, or string boolean op_lt(col1, 1.0)
Greater than or equal to op_ge(value1, value2) integer, float, or string boolean op_ge(col1, 1.0)
Less than or equal to op_le(value1, value2) integer, float, or string boolean op_le(col1, 1.0)
AND op_and(value1, value2) boolean boolean op_and(is_male, is_student)
OR op_or(value1, value2) boolean boolean op_or(is_male, is_student)
IN op_in(value, json_array) any type; JSON array string boolean op_in(id,json_array('["0","1","2","3"]'))
Is null op_is_null(value) any type boolean op_is_null(name)
Is not null op_is_not_null(value) any type boolean op_is_not_null(name)

String functions

Operation Syntax Parameters Return value Example
Concatenate strings op_add(str_1, str_2, ..., str_n) strings Concatenated string op_add(col,'hangzhou','dts')
Format and concatenate str_format(format, value1, value2, ...) format: string with {} placeholders; values: any Formatted string str_format("part1: {}, part2: {}", col1, col2) — if col1="ab" and col2="12", returns "part1: ab, part2: 12"
Replace substring str_replace(original, oldStr, newStr, count) count: max replacements; -1 replaces all String after replacement str_replace(name, "a", 'b', -1) — if name="aba", returns "bbb"
Replace in all string-type fields tail_replace_string_field(search, replace, all) all: must be true String after replacement tail_replace_string_field('\u000f','',true) — replaces \u000f with an empty string in all varchar, text, and char fields
Strip characters from both ends str_strip(string_val, charSet) charSet: characters to remove Stripped string str_strip(name, 'ab') — if name="axbzb", returns "xbz"
Lowercase str_lower(value) string Lowercase string str_lower(str_col)
Uppercase str_upper(value) string Uppercase string str_upper(str_col)
String to integer cast_string_to_long(value) string Integer cast_string_to_long(col)
Integer to string cast_long_to_string(value) integer String cast_long_to_string(col)
Count substring occurrences str_count(str, pattern) string; substring Integer str_count(str_col, 'abc') — if str_col="zabcyabcz", returns 2
Find substring position str_find(str, pattern) string; substring Position of the first match; -1 if not found str_find(str_col, 'abc') — if str_col="xabcy", returns 1
Check if alphabetic str_isalpha(str) string boolean str_isalpha(str_col)
Check if numeric str_isdigit(str) string boolean str_isdigit(str_col)
Regular expression match regex_match(str, regex) string; regex string boolean regex_match(__TB__,'user_\\\d+')
Mask a substring str_mask(str, start, end, maskStr) start min: 0; end max: length−1; maskStr: single character Masked string str_mask(phone, 7, 10, '#')
Substring after a marker substring_after(str, cond) string; marker string String after the marker (marker not included) substring_after(col, 'abc')
Substring before a marker substring_before(str, cond) string; marker string String before the marker (marker not included) substring_before(col, 'efg')
Substring between two markers substring_between(str, cond1, cond2) string; two marker strings String between the markers (markers not included) substring_between(col, 'abc','efg')
Check if value is a string is_string_value(value) string or column name boolean is_string_value(col1)
Get a MongoDB document field bson_value("field1", "field2", ...) field names at each level Field value e_set(user_name, bson_value("person","name"))
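The substring marker functions can be emulated in Python. This is a hypothetical sketch of their semantics; the behavior when a marker is not found (returning an empty string) is an assumption, not documented above.

```python
def substring_after(s, marker):
    """Emulate DSL substring_after: text after the first marker occurrence.
    Not-found behavior (empty string) is an assumption."""
    i = s.find(marker)
    return s[i + len(marker):] if i >= 0 else ""

def substring_before(s, marker):
    """Emulate DSL substring_before: text before the first marker occurrence."""
    i = s.find(marker)
    return s[:i] if i >= 0 else ""

def substring_between(s, m1, m2):
    """Emulate DSL substring_between: text between the two markers."""
    return substring_before(substring_after(s, m1), m2)

substring_after("xxabcyyefgzz", "abc")           # 'yyefgzz'
substring_before("xxabcyyefgzz", "abc")          # 'xx'
substring_between("xxabcyyefgzz", "abc", "efg")  # 'yy'
```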

Time functions

Operation Syntax Parameters Return value Example
Current system time (second precision) dt_now() None datetime dt_now()
Current system time (millisecond precision) dt_now_millis() None datetime dt_now_millis()
UTC timestamp (seconds) to datetime dt_fromtimestamp(value, [timezone]) integer; optional timezone datetime (second precision) dt_fromtimestamp(1626837629,'GMT+08')
UTC timestamp (milliseconds) to datetime dt_fromtimestamp_millis(value, [timezone]) integer; optional timezone datetime (millisecond precision) dt_fromtimestamp_millis(1626837629123,'GMT+08')
Datetime to UTC timestamp (seconds) dt_parsetimestamp(value, [timezone]) datetime; optional timezone Integer dt_parsetimestamp(datetime_col,'GMT+08')
Datetime to UTC timestamp (milliseconds) dt_parsetimestamp_millis(value, [timezone]) datetime; optional timezone Integer dt_parsetimestamp_millis(datetime_col,'GMT+08')
Datetime to string dt_str(value, format) datetime; format string String dt_str(col1, 'yyyy-MM-dd HH:mm:ss')
String to datetime dt_strptime(value, format) string; format string datetime dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss')
Add or subtract time units dt_add(value, [years=n], [months=n], [days=n], [hours=n], [minutes=n]) datetime; integer offsets (negative for subtraction) datetime dt_add(datetime_col, years=-1)

Conditional expressions

Syntax Description Example
(cond ? val_1 : val_2) Ternary operator. Returns val_1 if cond is true; otherwise val_2. val_1 and val_2 must be the same type. (id>1000 ? 1 : 0)

JSON functions

The value type represents the type of any field in the database.
Operation Syntax Parameters Return value Example
Convert JSON array string to a set json_array(arrayText) — usable only in boolean expressions arrayText: JSON array string Set collection op_in(id,json_array('["0","1","2","3"]'))
Create a JSON array json_array2(item...) items of any type JSON array json_array2("0","1","2","3") returns ["0","1","2","3"]
Create a JSON object json_object(item...) key-value pairs (key is string, value is any type) JSON json_object('name','ZhangSan','age',32,'loginId',100) returns {"name":"ZhangSan","age":32,"loginId":100}
Insert into a JSON array at a position json_array_insert(json, kvPairs...) JSONPath position; data to insert. If the position doesn't exist, returns the original JSON. If the element doesn't exist, appends to the array. JSON json_array_insert('{"Address":["City",1]}','$.Address[3]',100) returns {"Address":["City",1,100]}
Insert into a JSON object (only if absent) json_insert(json, kvPairs...) JSONPath position; data to insert. If the position exists, returns the original JSON. JSON json_insert('{"Address":["City","Xian","Number",1]}','$.ID',100) returns {"Address":["City","Xian","Number",1],"ID":100}
Insert or update at a JSON path json_set(json, kvPairs...) JSONPath position; data. Updates if position exists; inserts if absent. value type json_set('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100) returns {"ID":1,"Address":["City","Xian","Number",1],"IP":100}
Insert or update a key-value pair json_put(json, key, value) JSON object; key string; value. Returns null if json is not a JSON object. JSON json_put('{"loginId":100}','loginTime','2024-10-10') returns {"loginId":100,"loginTime":"2024-10-10"}
Replace at a JSON path json_replace(json, kvPairs...) JSONPath position; replacement data. If position doesn't exist, returns the original JSON. value type json_replace('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100) returns the original JSON (position absent)
Check if a value exists at a JSON path json_contains(json, jsonPath, item) JSON object; JSONPath; data to check boolean json_contains('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID',1) returns true
Check if a JSON path exists json_contains_path(json, jsonPath) JSON object; JSONPath boolean json_contains_path('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID') returns true
Get value at a JSON path json_extract(json, jsonPath) JSON object; JSONPath value type json_extract('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID') returns 1
Get value by key name json_get(json, key) JSON object; key string value type json_get('{"ID":1,"Address":["City","Xian","Number",1]}','ID') returns 1
Get all keys at a JSON path json_keys(json, jsonPath) JSON object; JSONPath JSON array json_keys('{"ID":1,"Address":["City","Xian","Number",1]}','$') returns ["ID","Address"]
Get key count at a JSON path json_length(json, jsonPath) JSON object; JSONPath ("$" is equivalent to json_length(json)) Integer json_length('{"ID":1,"Address":["City","Xian","Number",1]}','$') returns 2
Get root key count json_length(json) JSON object Integer json_length('{"ID":1,"Address":["City","Xian","Number",1]}') returns 2
Parse a JSON string json_parse(json) JSON string value type json_parse('{"ID":1}') returns {"ID":1}
Remove a value at a JSON path json_remove(json, jsonPath) JSON object; JSONPath JSON json_remove('{"loginId":100,"loginTime":"2024-10-10"}','$.loginTime') returns {"loginId":100}
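The simpler key-based JSON functions can be emulated with Python's standard json module. This is a hypothetical sketch of json_put and json_get for top-level keys only; the full JSONPath-based functions are not modeled here.

```python
import json

def json_put(obj_text, key, value):
    """Emulate DSL json_put: insert or update a top-level key-value pair.
    Returns None if the input is not a JSON object, per the table above."""
    obj = json.loads(obj_text)
    if not isinstance(obj, dict):
        return None
    obj[key] = value
    return json.dumps(obj)

def json_get(obj_text, key):
    """Emulate DSL json_get: fetch a value by top-level key name."""
    return json.loads(obj_text).get(key)

json_put('{"loginId":100}', "loginTime", "2024-10-10")
# '{"loginId": 100, "loginTime": "2024-10-10"}'
json_get('{"ID":1}', "ID")  # 1
```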

Global functions

Flow control functions

Function Syntax Description Example
If e_if(bool_expr, func_invoke) Runs func_invoke if bool_expr is true. e_if(op_gt(id, 10), e_drop()) — drops records where id > 10
If-else e_if_else(bool_expr, func_invoke1, func_invoke2) Runs func_invoke1 if true; runs func_invoke2 if false. e_if_else(op_gt(id, 10), e_set(tag, 'large'), e_set(tag, 'small'))
Switch e_switch(condition1, func1, condition2, func2, ..., default=default_func) Evaluates conditions in order and runs the first match. Runs default_func if none match. e_switch(op_gt(id, 100), e_set(str_col, '>100'), op_gt(id, 90), e_set(str_col, '>90'), default=e_set(str_col, '<=90'))
Compose e_compose(func1, func2, func3, ...) Runs multiple functions in sequence as a single expression. e_compose(e_set(str_col, 'test'), e_set(dt_col, dt_now())) — sets two columns in one expression

Data manipulation functions

Function Syntax Description Example
Drop record e_drop() Drops the current record. It is not written to the destination. e_if(op_gt(id, 10), e_drop())
Keep record e_keep(condition) Keeps the record only if condition is true. e_keep(op_gt(id, 1)) — syncs only records where id > 1
Set column value e_set(col, val, NEW) Sets col to val. Pass the NEW flag as an optional third argument to convert the column to the data type of val. If NEW is omitted, ensure the types are compatible; a mismatch may cause the task to fail. e_set(col1, 1, NEW) — converts col1 to a numeric type and sets it to 1
MongoDB field mapping e_expand_bson_value('*', 'fieldA', {"fieldB":"fieldC"}) Selects fields to keep (* = all), drops fieldA, and renames fieldB to fieldC. Field name mapping is optional. e_expand_bson_value("*", "_id,name") — writes all fields except _id and name to the destination

What's next