When you replicate data with Data Transmission Service (DTS), the source data often needs transformation before it reaches the destination—filtering out rows that don't belong, masking sensitive values, or enriching each record with metadata such as the time it changed. DTS includes a streaming extract, transform, and load (ETL) feature that runs this processing logic in-flight, between the replication engine and the destination database. ETL logic is expressed in a domain-specific language (DSL) designed for data synchronization scenarios.
Common use cases:
- Data filtering: Drop records that don't meet sync criteria, such as records with IDs above a threshold or orders older than a cutoff date.
- Data masking: Obscure sensitive values, such as replacing the last four digits of a phone number with asterisks.
- Change tracking: Stamp each synchronized row with the commit timestamp so the destination table records when data changed.
- Audit logging: Write the operation type (INSERT, UPDATE, DELETE) and change time into dedicated columns for compliance or debugging.
How ETL works
ETL runs as a processing step between the DTS replication engine and the destination database. Each incoming data record passes through your DSL script, which can drop the record, modify column values, or add new columns before the record is written to the destination.
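As a mental model (not the DTS API), the per-record flow can be sketched in plain Python: a transform function receives each record and either returns nothing to drop it, or returns a modified record to be written. The record shape, field names, and threshold below are illustrative only.

```python
# Illustrative model of the ETL step: one function per record, which can
# drop the record, rewrite a column, or add a column. Not a DTS interface.
def etl_transform(record):
    if record["id"] > 3:          # filtering: drop records that match
        return None               # None = record is not written
    record["phone"] = record["phone"][:7] + "****"   # masking
    record["dts_sync_time"] = record["_commit_ts"]   # change tracking
    return record

rows = [
    {"id": 1, "phone": "13900001234", "_commit_ts": "2024-01-01 00:00:00"},
    {"id": 9, "phone": "13900005678", "_commit_ts": "2024-01-01 00:00:01"},
]
out = [r for r in (etl_transform(dict(r)) for r in rows) if r is not None]
# only the id=1 record reaches the destination, masked and stamped
```
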
The DSL is based on the data processing syntax of Simple Log Service (SLS). It supports conditional logic, string manipulation, date/time operations, numeric arithmetic, and JSON processing. It does not support event-splitting functions. The DSL has the following attributes:
- Powerful: Provides many functions and supports function composition.
- Relatively simple syntax: Provides examples for typical scenarios, such as data filtering, data transformation, and data masking.
- High performance: Based on code generation technology, it has a minimal performance impact on the synchronization process.
Column names in DSL scripts are enclosed in backticks (`` ` ``), not single quotation marks (`'`).
Supported databases
ETL is available for the following source and destination database combinations.
| Source database | Destination database |
|---|---|
| SQL Server | AnalyticDB for MySQL 3.0, SQL Server, MySQL, PolarDB for MySQL |
| MySQL | AnalyticDB for MySQL 3.0, AnalyticDB for PostgreSQL, Kafka, ApsaraDB for ClickHouse cluster, MySQL, PolarDB for MySQL, Elasticsearch, Redis |
| Self-managed Oracle | AnalyticDB for MySQL 3.0, AnalyticDB for PostgreSQL, Kafka, MaxCompute, PolarDB-X 2.0, PolarDB for PostgreSQL (Compatible with Oracle) |
| PolarDB for MySQL | |
| PolarDB for PostgreSQL (Compatible with Oracle) | AnalyticDB for MySQL 3.0, PolarDB for PostgreSQL (Compatible with Oracle) |
| PolarDB-X 1.0 | Kafka, Tablestore |
| PolarDB-X 2.0 | |
| Self-managed Db2 for LUW | MySQL |
| Self-managed Db2 for i | MySQL |
| PolarDB for PostgreSQL | PolarDB for PostgreSQL, PostgreSQL |
| PostgreSQL | PolarDB for PostgreSQL, PostgreSQL |
| TiDB | PolarDB for MySQL, MySQL, AnalyticDB for MySQL 3.0 |
| MongoDB | Lindorm |
Configure ETL when creating a sync task
This topic uses a sync task as an example. The configuration steps for a migration task are similar.
Usage notes
| Constraint | Consequence if violated |
|---|---|
| DSL scripts are case-sensitive. Database, table, and field names must match the source exactly. | The script silently fails to match records, or the task reports an error. |
| All fields referenced in the DSL script must exist in the source database and must not be filtered out by filter conditions. | The task fails with a field-not-found error. |
| DSL scripts support only a single top-level expression. Use `e_compose` to combine multiple operations. | The task rejects the script at validation time. |
| If a DSL script adds a new column, add that column to the destination table manually before starting the task. For example, if the script uses `e_set(new_column, dt_now())`, add new_column to the destination table first. | The ETL script has no effect because the destination column does not exist. |
| After DSL processing, DML changes for all tables must produce consistent column information. For example, if `e_set` adds a column, INSERT, UPDATE, and DELETE operations must all result in that column being added. | The task may fail due to schema inconsistency between operation types. |
| DSL scripts are for data transformation and cleansing only. | Database object creation (such as CREATE TABLE) is not supported and will fail. |
Procedure
1. Create a sync task. For details, see Sync solutions.
2. In the Advanced Configurations step, set Configure ETL to Yes.
3. In the text box, enter your DSL script based on the data processing DSL syntax.

   For example, to filter out records where `id` is greater than 3, use `e_if(op_gt(id, 3), e_drop())`. This drops any record whose `id` value exceeds 3, so it is not synchronized to the destination.

4. Complete the remaining steps as needed.
Modify ETL on an existing sync task
To update the ETL configuration of an existing sync task, choose one of the following:
- If Configure ETL was set to No, change it to Yes and enter a DSL script.
- If Configure ETL was set to Yes, modify the DSL script or set Configure ETL back to No.
- Before modifying a DSL script, move the sync objects from the Selected Objects list back to the Source Objects list, then add them again. Skipping this step may leave the task in an inconsistent state.
- ETL configuration cannot be modified for migration tasks. Only sync tasks support this operation.
Usage notes
| Constraint | Consequence if violated |
|---|---|
| Modifying the ETL configuration may interrupt the running task. | Data synchronization pauses during the update. Proceed during a maintenance window. |
| Changes take effect only on incremental data after the task restarts. Historical data already synchronized is not reprocessed. | Records synchronized before the modification are not affected—intentional by design. |
| Destination table schema changes are not supported during ETL modification. | To change the schema, update the destination table before starting the sync task. |
| DSL script constraints (case sensitivity, field existence, single expression, new columns) are the same as for new tasks. | Same consequences as described in the new task usage notes above. |
Procedure
1. Log on to the Data Synchronization Tasks page of the new DTS console.
2. In the row for the target sync task, click the icon and select Modify ETL Configurations.
3. In the Advanced Configurations step, set Configure ETL to Yes.
4. In the text box, enter the updated DSL script.

   For example, to filter out records where `id` is greater than 3, use `e_if(op_gt(id, 3), e_drop())`. This drops any record whose `id` value exceeds 3, so it is not synchronized to the destination.

5. Complete the remaining steps as needed.
ETL script examples
The following examples cover the most common scenarios. Each uses DSL functions described in the DSL syntax reference section.
Data filtering
Use data filtering to sync only a subset of records—for example, to replicate recent orders to an analytics database, exclude test data from a production sync, or enforce region-specific data residency rules by dropping records that don't belong.
- Drop records where `id` > 10000: `` e_if(op_gt(`id`, 10000), e_drop()) ``
- Drop records where `name` contains "hangzhou": `` e_if(str_contains(`name`, "hangzhou"), e_drop()) ``
- Drop orders placed before a cutoff date: `` e_if(op_lt(`order_timestamp`, "2015-02-23 23:54:55"), e_drop()) ``
- Drop records matching multiple conditions (AND): `` e_if(op_and(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop()) ``
- Drop records matching either condition (OR): `` e_if(op_or(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop()) ``
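To make the drop semantics concrete, here is a plain-Python restatement of the AND filter above. The helper and record shape are illustrative stand-ins, not part of any DTS SDK; the DSL itself runs inside DTS.

```python
# A record survives unless BOTH conditions hold, mirroring
# e_if(op_and(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop()).
def keep(record):
    dropped = ("hangzhou" in record["name"]) and (record["id"] > 1000)
    return not dropped

rows = [
    {"id": 500,  "name": "hangzhou-east"},   # name matches, id too small: kept
    {"id": 2000, "name": "hangzhou-west"},   # both conditions match: dropped
    {"id": 2000, "name": "shanghai"},        # id matches, name doesn't: kept
]
kept = [r for r in rows if keep(r)]
```
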
Data masking
Use data masking when the destination database is accessed by a broader audience than the source—for example, when syncing production data to a reporting system or sharing data with a third party. Masking lets you protect personally identifiable information (PII) while preserving data for analytics.
Replace the last four digits of a phone number (positions 7–10) with asterisks:
`` e_set(`phone`, str_mask(`phone`, 7, 10, '*')) ``
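If the index convention is unclear, here is a plain-Python model of the masking. It assumes, based on the stated parameter limits (start min 0, end max length−1), that `start` and `end` are 0-based and inclusive; the helper name mirrors the DSL function but is only an illustration.

```python
# Hypothetical model of str_mask: replace characters at 0-based, inclusive
# positions start..end with the mask character.
def str_mask(s, start, end, mask_char):
    return s[:start] + mask_char * (end - start + 1) + s[end + 1:]

masked = str_mask("13912345678", 7, 10, "*")
# positions 7 through 10 (the last four digits) are replaced
```
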
Change tracking
Use change tracking when downstream consumers need to know when a row was last modified—for example, to implement incremental processing pipelines, detect stale records, or audit data freshness without querying the source database.
Before starting the task, add the dts_sync_time column to the destination table.
Add a dts_sync_time column to all tables for INSERT, UPDATE, and DELETE operations:
e_if(op_or(op_or(
op_eq(__OPERATION__, __OP_INSERT__),
op_eq(__OPERATION__, __OP_UPDATE__)),
op_eq(__OPERATION__, __OP_DELETE__)),
e_set(dts_sync_time, __COMMIT_TIMESTAMP__))
To apply this only to a specific table (for example, dts_test_table):
e_if(op_and(
op_eq(__TB__,'dts_test_table'),
op_or(op_or(
op_eq(__OPERATION__,__OP_INSERT__),
op_eq(__OPERATION__,__OP_UPDATE__)),
op_eq(__OPERATION__,__OP_DELETE__))),
e_set(dts_sync_time,__COMMIT_TIMESTAMP__))
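The condition structure of the table-scoped script above — nested `op_or` calls covering all three DML types, wrapped in `op_and` with the table-name check — can be restated in Python. The dict keys and values here are illustrative, not the DTS runtime representation.

```python
# Model of the table-scoped change-tracking script: stamp dts_sync_time only
# for DML on dts_test_table; leave other tables' records untouched.
DML_OPS = {"INSERT", "UPDATE", "DELETE"}

def stamp(record):
    if record["__TB__"] == "dts_test_table" and record["__OPERATION__"] in DML_OPS:
        record["dts_sync_time"] = record["__COMMIT_TIMESTAMP__"]
    return record

r1 = stamp({"__TB__": "dts_test_table", "__OPERATION__": "UPDATE",
            "__COMMIT_TIMESTAMP__": "2024-05-01 12:00:00"})
r2 = stamp({"__TB__": "other_table", "__OPERATION__": "UPDATE",
            "__COMMIT_TIMESTAMP__": "2024-05-01 12:00:00"})
# r1 gains dts_sync_time; r2 does not
```
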
Audit logging
Use audit logging when you need a full history of data changes in the destination table—for example, to meet compliance requirements, support rollback scenarios, or build a change log without a separate audit system. This pattern writes every incoming change as an INSERT, so the destination table accumulates a complete record of all modifications.
Before starting the task, add the operation_type and updated columns to the destination table.
Write the DML operation type and commit timestamp into the destination row, then convert all operations to INSERT:
e_compose(
e_switch(
op_eq(__OPERATION__,__OP_DELETE__), e_set(operation_type, 'DELETE'),
op_eq(__OPERATION__,__OP_UPDATE__), e_set(operation_type, 'UPDATE'),
op_eq(__OPERATION__,__OP_INSERT__), e_set(operation_type, 'INSERT')),
e_set(updated, __COMMIT_TIMESTAMP__),
e_set(__OPERATION__,__OP_INSERT__)
)
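The audit pattern above can be modeled in Python: record the original operation type and commit time, then rewrite the operation to INSERT so every change is appended at the destination rather than applied in place. This is an illustrative model of the `e_compose`/`e_switch` logic, not the DSL runtime.

```python
# Model of the audit-logging script: capture the DML type and commit time,
# then convert the operation itself to INSERT.
def audit(record):
    record["operation_type"] = record["__OPERATION__"]   # the e_switch branches
    record["updated"] = record["__COMMIT_TIMESTAMP__"]   # e_set(updated, ...)
    record["__OPERATION__"] = "INSERT"                   # e_set(__OPERATION__, ...)
    return record

out = audit({"__OPERATION__": "DELETE",
             "__COMMIT_TIMESTAMP__": "2024-05-01 12:00:00"})
# the destination receives an INSERT row carrying operation_type='DELETE'
```
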
Distinguish full and incremental data
Use this pattern when the destination table receives both full migration data and incremental changes, and downstream consumers need to tell them apart—for example, to skip reprocessing historical rows in a streaming pipeline, or to validate that the initial load completed before applying incremental logic.
During a full migration, __COMMIT_TIMESTAMP__ is 0 (1970-01-01 08:00:00, adjusted for the local time zone). During incremental migration, it holds the actual log commit time. The script below writes True or False into an is_increment_dml column:
e_if_else(__COMMIT_TIMESTAMP__ > DATETIME('2000-01-01 00:00:00'),
e_set(`is_increment_dml`, True),
e_set(`is_increment_dml`, False)
)
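The comparison logic works because full-load records carry a zero (epoch) commit timestamp while incremental records carry the real log commit time, so any cutoff safely after the epoch separates the two. A Python sketch of that check, with illustrative values:

```python
# Full-migration rows have an epoch commit timestamp; incremental rows have
# the real commit time, so comparing against 2000-01-01 distinguishes them.
from datetime import datetime

CUTOFF = datetime(2000, 1, 1)

def is_increment_dml(commit_ts):
    return commit_ts > CUTOFF

full = is_increment_dml(datetime(1970, 1, 1, 8, 0, 0))   # full-load row (epoch, UTC+8)
incr = is_increment_dml(datetime(2024, 5, 1, 12, 0, 0))  # incremental row
```
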
DSL syntax reference
Built-in variables
The following built-in variables are available in all DSL scripts.
| Variable | Description | Data type | Example value |
|---|---|---|---|
| `__TB__` | Table name | string | `table` |
| `__DB__` | Database name | string | `mydb` |
| `__OPERATION__` | DML operation type | string | `__OP_INSERT__`, `__OP_UPDATE__`, `__OP_DELETE__` |
| `__BEFORE__` | Before image of an UPDATE (value before the change). A DELETE operation has only a before image. | Special mark | `v(column_name, __BEFORE__)` |
| `__AFTER__` | After image of an UPDATE (value after the change). An INSERT operation has only an after image. | Special mark | `v(column_name, __AFTER__)` |
| `__COMMIT_TIMESTAMP__` | Transaction commit time | datetime | `'2021-01-01 10:10:01'` |
| `` `column` `` | Value of the specified column in the current record | string | `` `id` ``, `` `name` `` |
Constants
| Type | Example |
|---|---|
| int | 123 |
| float | 123.4 |
| string | "hello1_world" |
| boolean | true or false |
| datetime | DATETIME('2021-01-01 10:10:01') |
Expression functions
Numeric operations
| Operation | Syntax | Parameters | Return value | Example |
|---|---|---|---|---|
| Addition | `op_sum(value1, value2)` | integer or float | Integer if both inputs are integers; otherwise float | `op_sum(col1, 1.0)` |
| Subtraction | `op_sub(value1, value2)` | integer or float | Integer if both inputs are integers; otherwise float | `op_sub(col1, 1.0)` |
| Multiplication | `op_mul(value1, value2)` | integer or float | Integer if both inputs are integers; otherwise float | `op_mul(col1, 1.0)` |
| Division | `op_div_true(value1, value2)` | integer or float | Integer if both inputs are integers; otherwise float | `op_div_true(col1, 2.0)` — if col1=15, returns 7.5 |
| Modulo | `op_mod(value1, value2)` | integer or float | Integer if both inputs are integers; otherwise float | `op_mod(col1, 10)` — if col1=23, returns 3 |
Logical operations
| Operation | Syntax | Parameters | Return value | Example |
|---|---|---|---|---|
| Equals | `op_eq(value1, value2)` | integer, float, or string | boolean | `op_eq(col1, 23)` |
| Greater than | `op_gt(value1, value2)` | integer, float, or string | boolean | `op_gt(col1, 1.0)` |
| Less than | `op_lt(value1, value2)` | integer, float, or string | boolean | `op_lt(col1, 1.0)` |
| Greater than or equal to | `op_ge(value1, value2)` | integer, float, or string | boolean | `op_ge(col1, 1.0)` |
| Less than or equal to | `op_le(value1, value2)` | integer, float, or string | boolean | `op_le(col1, 1.0)` |
| AND | `op_and(value1, value2)` | boolean | boolean | `op_and(is_male, is_student)` |
| OR | `op_or(value1, value2)` | boolean | boolean | `op_or(is_male, is_student)` |
| IN | `op_in(value, json_array)` | any type; JSON array string | boolean | `op_in(id, json_array('["0","1","2","3"]'))` |
| Is null | `op_is_null(value)` | any type | boolean | `op_is_null(name)` |
| Is not null | `op_is_not_null(value)` | any type | boolean | `op_is_not_null(name)` |
String functions
| Operation | Syntax | Parameters | Return value | Example |
|---|---|---|---|---|
| Concatenate strings | `op_add(str_1, str_2, ..., str_n)` | strings | Concatenated string | `op_add(col, 'hangzhou', 'dts')` |
| Format and concatenate | `str_format(format, value1, value2, ...)` | format: string with `{}` placeholders; values: any | Formatted string | `str_format("part1: {}, part2: {}", col1, col2)` — if col1="ab" and col2="12", returns "part1: ab, part2: 12" |
| Replace substring | `str_replace(original, oldStr, newStr, count)` | count: max replacements; -1 replaces all | String after replacement | `str_replace(name, "a", 'b', -1)` — if name="aba", returns "bbb" |
| Replace in all string-type fields | `tail_replace_string_field(search, replace, all)` | all: must be true | String after replacement | `tail_replace_string_field('\u000f', '', true)` — replaces \u000f with an empty string in all varchar, text, and char fields |
| Strip characters from both ends | `str_strip(string_val, charSet)` | charSet: characters to remove | Stripped string | `str_strip(name, 'ab')` — if name="axbzb", returns "xbz" |
| Lowercase | `str_lower(value)` | string | Lowercase string | `str_lower(str_col)` |
| Uppercase | `str_upper(value)` | string | Uppercase string | `str_upper(str_col)` |
| String to integer | `cast_string_to_long(value)` | string | Integer | `cast_string_to_long(col)` |
| Integer to string | `cast_long_to_string(value)` | integer | String | `cast_long_to_string(col)` |
| Count substring occurrences | `str_count(str, pattern)` | string; substring | Integer | `str_count(str_col, 'abc')` — if str_col="zabcyabcz", returns 2 |
| Find substring position | `str_find(str, pattern)` | string; substring | Position of the first match; -1 if not found | `str_find(str_col, 'abc')` — if str_col="xabcy", returns 1 |
| Check if alphabetic | `str_isalpha(str)` | string | boolean | `str_isalpha(str_col)` |
| Check if numeric | `str_isdigit(str)` | string | boolean | `str_isdigit(str_col)` |
| Regular expression match | `regex_match(str, regex)` | string; regex string | boolean | `regex_match(__TB__, 'user_\\d+')` |
| Mask a substring | `str_mask(str, start, end, maskStr)` | start min: 0; end max: length−1; maskStr: single character | Masked string | `str_mask(phone, 7, 10, '#')` |
| Substring after a marker | `substring_after(str, cond)` | string; marker string | String after the marker (marker not included) | `substring_after(col, 'abc')` |
| Substring before a marker | `substring_before(str, cond)` | string; marker string | String before the marker (marker not included) | `substring_before(col, 'efg')` |
| Substring between two markers | `substring_between(str, cond1, cond2)` | string; two marker strings | String between the markers (markers not included) | `substring_between(col, 'abc', 'efg')` |
| Check if value is a string | `is_string_value(value)` | string or column name | boolean | `is_string_value(col1)` |
| Get a MongoDB document field | `bson_value("field1", "field2", ...)` | field names at each level | Field value | `e_set(user_name, bson_value("person", "name"))` |
Time functions
| Operation | Syntax | Parameters | Return value | Example |
|---|---|---|---|---|
| Current system time (second precision) | `dt_now()` | None | datetime | `dt_now()` |
| Current system time (millisecond precision) | `dt_now_millis()` | None | datetime | `dt_now_millis()` |
| UTC timestamp (seconds) to datetime | `dt_fromtimestamp(value, [timezone])` | integer; optional timezone | datetime (second precision) | `dt_fromtimestamp(1626837629, 'GMT+08')` |
| UTC timestamp (milliseconds) to datetime | `dt_fromtimestamp_millis(value, [timezone])` | integer; optional timezone | datetime (millisecond precision) | `dt_fromtimestamp_millis(1626837629123, 'GMT+08')` |
| Datetime to UTC timestamp (seconds) | `dt_parsetimestamp(value, [timezone])` | datetime; optional timezone | Integer | `dt_parsetimestamp(datetime_col, 'GMT+08')` |
| Datetime to UTC timestamp (milliseconds) | `dt_parsetimestamp_millis(value, [timezone])` | datetime; optional timezone | Integer | `dt_parsetimestamp_millis(datetime_col, 'GMT+08')` |
| Datetime to string | `dt_str(value, format)` | datetime; format string | String | `dt_str(col1, 'yyyy-MM-dd HH:mm:ss')` |
| String to datetime | `dt_strptime(value, format)` | string; format string | datetime | `dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss')` |
| Add or subtract time units | `dt_add(value, [years=n], [months=n], [days=n], [hours=n], [minutes=n])` | datetime; integer offsets (negative for subtraction) | datetime | `dt_add(datetime_col, years=-1)` |
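As a rough cross-check on the timestamp conversions above, the pairing of `dt_fromtimestamp` and `dt_parsetimestamp` should round-trip. The Python analogues below model the optional timezone argument with a fixed UTC+8 offset; the helper names mirror the DSL but are illustrative only.

```python
# Python analogues of dt_fromtimestamp / dt_parsetimestamp with 'GMT+08'
# modeled as a fixed UTC+8 offset.
from datetime import datetime, timezone, timedelta

TZ = timezone(timedelta(hours=8))  # stand-in for the 'GMT+08' argument

def dt_fromtimestamp(seconds):     # epoch seconds -> timezone-aware datetime
    return datetime.fromtimestamp(seconds, TZ)

def dt_parsetimestamp(dt):         # datetime -> epoch seconds
    return int(dt.timestamp())

roundtrip = dt_parsetimestamp(dt_fromtimestamp(1626837629))
# converting to a datetime and back recovers the original timestamp
```
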
Conditional expressions
| Syntax | Description | Example |
|---|---|---|
| `(cond ? val_1 : val_2)` | Ternary operator. Returns val_1 if cond is true; otherwise val_2. val_1 and val_2 must be the same type. | `(id>1000 ? 1 : 0)` |
JSON functions
The value type represents the type of any field in the database.
| Operation | Syntax | Parameters | Return value | Example |
|---|---|---|---|---|
| Convert JSON array string to a set | `json_array(arrayText)` — usable only in boolean expressions | arrayText: JSON array string | Set collection | `op_in(id, json_array('["0","1","2","3"]'))` |
| Create a JSON array | `json_array2(item...)` | items of any type | JSON array | `json_array2("0","1","2","3")` returns `["0","1","2","3"]` |
| Create a JSON object | `json_object(item...)` | key-value pairs (key is string, value is any type) | JSON | `json_object('name','ZhangSan','age',32,'loginId',100)` returns `{"name":"ZhangSan","age":32,"loginId":100}` |
| Insert into a JSON array at a position | `json_array_insert(json, kvPairs...)` | JSONPath position; data to insert. If the position doesn't exist, returns the original JSON. If the element doesn't exist, appends to the array. | JSON | `json_array_insert('{"Address":["City",1]}','$.Address[3]',100)` returns `{"Address":["City",1,100]}` |
| Insert into a JSON object (only if absent) | `json_insert(json, kvPairs...)` | JSONPath position; data to insert. If the position exists, returns the original JSON. | JSON | `json_insert('{"Address":["City","Xian","Number",1]}','$.ID',100)` returns `{"Address":["City","Xian","Number",1],"ID":100}` |
| Insert or update at a JSON path | `json_set(json, kvPairs...)` | JSONPath position; data. Updates if the position exists; inserts if absent. | value type | `json_set('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100)` returns `{"ID":1,"Address":["City","Xian","Number",1],"IP":100}` |
| Insert or update a key-value pair | `json_put(json, key, value)` | JSON object; key string; value. Returns null if json is not a JSON object. | JSON | `json_put('{"loginId":100}','loginTime','2024-10-10')` returns `{"loginId":100,"loginTime":"2024-10-10"}` |
| Replace at a JSON path | `json_replace(json, kvPairs...)` | JSONPath position; replacement data. If the position doesn't exist, returns the original JSON. | value type | `json_replace('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100)` returns the original JSON (position absent) |
| Check if a value exists at a JSON path | `json_contains(json, jsonPath, item)` | JSON object; JSONPath; data to check | boolean | `json_contains('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID',1)` returns true |
| Check if a JSON path exists | `json_contains_path(json, jsonPath)` | JSON object; JSONPath | boolean | `json_contains_path('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID')` returns true |
| Get value at a JSON path | `json_extract(json, jsonPath)` | JSON object; JSONPath | value type | `json_extract('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID')` returns 1 |
| Get value by key name | `json_get(json, key)` | JSON object; key string | value type | `json_get('{"ID":1,"Address":["City","Xian","Number",1]}','ID')` returns 1 |
| Get all keys at a JSON path | `json_keys(json, jsonPath)` | JSON object; JSONPath | JSON array | `json_keys('{"ID":1,"Address":["City","Xian","Number",1]}','$')` returns `["ID","Address"]` |
| Get key count at a JSON path | `json_length(json, jsonPath)` | JSON object; JSONPath (`'$'` is equivalent to `json_length(json)`) | Integer | `json_length('{"ID":1,"Address":["City","Xian","Number",1]}','$')` returns 2 |
| Get root key count | `json_length(json)` | JSON object | Integer | `json_length('{"ID":1,"Address":["City","Xian","Number",1]}')` returns 2 |
| Parse a JSON string | `json_parse(json)` | JSON string | value type | `json_parse('{"ID":1}')` returns `{"ID":1}` |
| Remove a value at a JSON path | `json_remove(json, jsonPath)` | JSON object; JSONPath | JSON | `json_remove('{"loginId":100,"loginTime":"2024-10-10"}','$.loginTime')` returns `{"loginId":100}` |
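For intuition, a couple of the simpler JSON functions above can be approximated with Python's standard `json` module. These helpers handle only top-level keys; the DSL's JSONPath support is richer, and the names merely mirror the DSL functions.

```python
# Rough Python approximations of json_get and json_put for top-level keys.
import json

def json_get(doc, key):            # json_get: fetch a value by key name
    return json.loads(doc).get(key)

def json_put(doc, key, value):     # json_put: insert or update a key-value pair
    obj = json.loads(doc)
    obj[key] = value
    return json.dumps(obj, separators=(",", ":"))

doc = '{"loginId":100}'
v = json_get(doc, "loginId")
doc2 = json_put(doc, "loginTime", "2024-10-10")
# doc2 now contains both loginId and loginTime
```
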
Global functions
Flow control functions
| Function | Syntax | Description | Example |
|---|---|---|---|
| If | `e_if(bool_expr, func_invoke)` | Runs func_invoke if bool_expr is true. | `e_if(op_gt(id, 10), e_drop())` — drops records where id > 10 |
| If-else | `e_if_else(bool_expr, func_invoke1, func_invoke2)` | Runs func_invoke1 if true; runs func_invoke2 if false. | `e_if_else(op_gt(id, 10), e_set(tag, 'large'), e_set(tag, 'small'))` |
| Switch | `e_switch(condition1, func1, condition2, func2, ..., default=default_func)` | Evaluates conditions in order and runs the first match. Runs default_func if none match. | `e_switch(op_gt(id, 100), e_set(str_col, '>100'), op_gt(id, 90), e_set(str_col, '>90'), default=e_set(str_col, '<=90'))` |
| Compose | `e_compose(func1, func2, func3, ...)` | Runs multiple functions in sequence as a single expression. | `e_compose(e_set(str_col, 'test'), e_set(dt_col, dt_now()))` — sets two columns in one expression |
Data manipulation functions
| Function | Syntax | Description | Example |
|---|---|---|---|
| Drop record | `e_drop()` | Drops the current record so it is not written to the destination. | `e_if(op_gt(id, 10), e_drop())` |
| Keep record | `e_keep(condition)` | Keeps the record only if condition is true. | `e_keep(op_gt(id, 1))` — syncs only records where id > 1 |
| Set column value | `e_set(col, val, NEW)` | Sets col to val. Pass the optional NEW flag to also convert the column to the data type of val. If you omit NEW, ensure the types are compatible; a mismatch may cause the task to fail. | `e_set(col1, 1, NEW)` — converts col1 to a numeric type and sets it to 1 |
| MongoDB field mapping | `e_expand_bson_value('*', 'fieldA', {"fieldB":"fieldC"})` | Selects fields to keep (`*` = all), drops fieldA, and renames fieldB to fieldC. Field name mapping is optional. | `e_expand_bson_value("*", "_id,name")` — writes all fields except _id and name to the destination |
What's next
- For the full SLS-based DSL syntax reference, see Syntax overview.
- To create a sync task, see Sync solutions.