All Products
Search
Document Center

Data Transmission Service:Konfigurasikan ETL dalam tugas migrasi atau sinkronisasi DTS

Last Updated:Jun 21, 2026

Data Transmission Service (DTS) menyediakan kemampuan pemrosesan data extract, transform, and load (ETL) berbasis aliran. Dikombinasikan dengan replikasi data aliran yang efisien dari DTS, fitur ini memungkinkan ekstraksi, transformasi, manipulasi, dan pemuatan data secara real-time. Topik ini menjelaskan cara mengonfigurasi ETL dalam tautan DTS serta menyediakan informasi sintaks terkait untuk membantu Anda menggunakan ETL dalam berbagai skenario, seperti penyaringan data, penyamaran data, pencatatan stempel waktu modifikasi data, dan audit perubahan data.

Informasi latar belakang

DTS adalah layanan migrasi dan sinkronisasi data yang biasanya digunakan untuk relokasi data atau transmisi data real-time. Terkadang, pengguna perlu mentransformasi atau menyaring data real-time sebelum menuliskannya ke database tujuan. Untuk memenuhi kebutuhan tersebut, DTS menawarkan pemrosesan data ETL berbasis aliran menggunakan bahasa skrip domain-spesifik (DSL) guna mendefinisikan logika pemrosesan data secara fleksibel. Untuk ikhtisar DSL dan sintaks konfigurasinya, lihat Pengenalan sintaks DSL pemrosesan data.

DTS mendukung dua cara mengonfigurasi ETL:

Catatan

Baik tugas migrasi DTS maupun tugas sinkronisasi mendukung konfigurasi ETL. Topik ini menggunakan tugas sinkronisasi sebagai contoh. Metode konfigurasi untuk tugas migrasi serupa.

Database yang didukung

Tabel berikut mencantumkan database sumber dan tujuan yang didukung untuk ETL.

Source database

Destination database

SQL Server

  • AnalyticDB for MySQL 3.0

  • SQL Server

  • MySQL

  • PolarDB for MySQL

MySQL

  • AnalyticDB for MySQL 3.0

  • AnalyticDB for PostgreSQL

  • Kafka

  • ClickHouse cluster

  • MySQL

  • PolarDB for MySQL

  • Elasticsearch

  • Redis

Self-managed Oracle

  • AnalyticDB for MySQL 3.0

  • AnalyticDB for PostgreSQL

  • Kafka

  • MaxCompute

  • PolarDB-X 2.0

  • PolarDB for PostgreSQL (Compatible with Oracle)

PolarDB for MySQL

  • AnalyticDB for MySQL 3.0

  • MySQL

  • PolarDB for MySQL

PolarDB for PostgreSQL (Compatible with Oracle)

  • AnalyticDB for MySQL 3.0

  • PolarDB for PostgreSQL (Compatible with Oracle)

PolarDB-X 1.0

  • Kafka

  • Tablestore

PolarDB-X 2.0

  • PolarDB-X 2.0

  • AnalyticDB for MySQL 3.0

  • MySQL

  • PolarDB for MySQL

Self-managed Db2 for LUW

MySQL

Self-managed Db2 for i

MySQL

PolarDB for PostgreSQL

  • PolarDB for PostgreSQL

  • PostgreSQL

PostgreSQL

  • PolarDB for PostgreSQL

  • PostgreSQL

  • ApsaraDB SelectDB Edition

TiDB

  • PolarDB for MySQL

  • MySQL

  • AnalyticDB for MySQL 3.0

MongoDB

Lindorm

Konfigurasikan ETL saat membuat tugas sinkronisasi

Catatan

  • Jika skrip ETL Anda menambahkan kolom baru, tambahkan kolom tersebut secara manual ke database tujuan. Jika tidak, skrip ETL tidak akan berlaku. Misalnya, dalam e_set(`new_column`, dt_now()), Anda harus menambahkan new_column secara manual ke database tujuan.

  • Skrip DSL hanya menangani operasi transformasi dan pembersihan data. Skrip ini tidak mendukung pembuatan objek database.

  • Bidang yang dirujuk dalam skrip DSL harus ada di database sumber dan tidak boleh difilter oleh kondisi filter apa pun. Jika tidak, tugas mungkin gagal.

  • Skrip DSL bersifat case-sensitive. Nama database, nama tabel, dan nama bidang harus persis sama dengan yang ada di database sumber.

  • Skrip DSL tidak mendukung beberapa ekspresi. Gunakan fungsi e_compose untuk menggabungkan beberapa ekspresi menjadi satu.

  • Semua perubahan DML dari semua tabel di database sumber harus menghasilkan informasi kolom yang identik setelah pemrosesan DSL. Jika tidak, tugas mungkin gagal. Misalnya, jika Anda menggunakan fungsi e_set untuk menambahkan kolom, pastikan operasi INSERT, UPDATE, dan DELETE dari database sumber semuanya menghasilkan kolom tambahan yang sama di tabel tujuan. Untuk informasi lebih lanjut, lihat Catat waktu modifikasi data.

Prosedur

  1. Buat tugas sinkronisasi. Untuk detailnya, lihat Ikhtisar solusi sinkronisasi.

  2. Pada langkah Advanced Configurations, atur Configure ETL ke Yes.

  3. Di kotak input, masukkan pernyataan ETL Anda menggunakan sintaks DSL pemrosesan data.

    Catatan

    Sebagai contoh, untuk menghapus catatan di mana id lebih besar dari 3, gunakan e_if(op_gt(`id`, 3), e_drop()). Di sini, op_gt adalah fungsi ekspresi yang memeriksa apakah suatu nilai lebih besar dari nilai lain, dan id adalah variabel. Skrip ini menyaring catatan di mana id > 3.

  4. Selesaikan langkah-langkah yang tersisa sesuai kebutuhan.

Ubah konfigurasi ETL pada tugas sinkronisasi yang sudah ada

Mengubah konfigurasi ETL pada tugas sinkronisasi yang sudah ada mencakup:

  • Jika Anda memiliki tugas sinkronisasi yang sudah ada tanpa konfigurasi ETL—artinya saat membuat tugas sinkronisasi, Configure ETL diatur ke No—Anda dapat mengubah No menjadi Yes dan mengonfigurasi skrip DSL.

  • Jika ETL sudah dikonfigurasi, Anda dapat mengubah skrip DSL yang ada atau mengatur Configure ETL ke No.

    Penting
    • Untuk mengubah skrip DSL yang ada, pertama-tama pindahkan objek sinkronisasi dari Selected Objects kembali ke Source Objects, lalu tambahkan kembali ke Selected Objects sebelum mengedit skrip DSL.

    • Tugas migrasi tidak mendukung pengubahan skrip DSL.

Catatan

  • Mengubah konfigurasi ETL pada tugas sinkronisasi yang sudah ada tidak mendukung perubahan skema tabel di database tujuan. Untuk mengubah skema, lakukan di database tujuan sebelum memulai tugas sinkronisasi.

  • Mengubah konfigurasi ETL dapat mengganggu tautan data. Lakukan dengan hati-hati.

  • Perubahan konfigurasi ETL hanya berlaku untuk data inkremental yang diproses setelah tugas dimulai ulang. Perubahan ini tidak memengaruhi data historis yang diproses sebelum perubahan.

  • Skrip DSL hanya menangani operasi transformasi dan pembersihan data. Skrip ini tidak mendukung pembuatan objek database.

  • Bidang yang dirujuk dalam skrip DSL harus ada di database sumber dan tidak boleh difilter oleh kondisi filter apa pun. Jika tidak, tugas mungkin gagal.

  • Skrip DSL bersifat case-sensitive. Nama database, nama tabel, dan nama bidang harus persis sama dengan yang ada di database sumber.

  • Skrip DSL tidak mendukung beberapa ekspresi. Gunakan fungsi e_compose untuk menggabungkan beberapa ekspresi menjadi satu.

  • Semua perubahan DML dari semua tabel di database sumber harus menghasilkan informasi kolom yang identik setelah pemrosesan DSL. Jika tidak, tugas mungkin gagal. Misalnya, jika Anda menggunakan fungsi e_set untuk menambahkan kolom, pastikan operasi INSERT, UPDATE, dan DELETE dari database sumber semuanya menghasilkan kolom tambahan yang sama di tabel tujuan. Untuk informasi lebih lanjut, lihat Catat waktu modifikasi data.

Prosedur

  1. Masuk ke halaman daftar tugas sinkronisasi DTS baru.

  2. Pada tugas sinkronisasi target, klik 点点点 lalu pilih Modify ETL Configurations.

  3. Pada langkah Advanced Configurations, atur Configure ETL ke Yes.

  4. Di kotak input, masukkan pernyataan ETL Anda menggunakan sintaks DSL pemrosesan data.

    Catatan

    Sebagai contoh, untuk menghapus catatan di mana id lebih besar dari 3, gunakan e_if(op_gt(`id`, 3), e_drop()). Di sini, op_gt adalah fungsi ekspresi yang memeriksa apakah suatu nilai lebih besar dari nilai lain, dan id adalah variabel. Skrip ini menyaring catatan di mana id > 3.

  5. Selesaikan langkah-langkah yang tersisa sesuai kebutuhan.

Pengenalan sintaks DSL pemrosesan data

DSL pemrosesan data adalah bahasa skrip yang dirancang oleh DTS khusus untuk pemrosesan data dalam skenario sinkronisasi data. DSL ini mendukung fungsi kondisional serta menangani string, tanggal, dan nilai numerik, sehingga memungkinkan Anda mendefinisikan logika pemrosesan data secara fleksibel dengan fitur-fitur berikut:

  • Fungsionalitas kuat: Menyediakan banyak fungsi dan mendukung komposisi fungsi.

  • Sintaks relatif sederhana: Menyertakan contoh untuk skenario umum seperti penyaringan data, transformasi, dan penyamaran. Untuk detailnya, lihat Contoh skenario khas.

  • Efisiensi eksekusi tinggi: Menggunakan teknologi generasi kode untuk meminimalkan dampak performa pada proses sinkronisasi.

Catatan
  • Dalam sintaks DSL, nama kolom menggunakan backtick (``), bukan tanda kutip tunggal ('').

  • Produk ini merujuk sintaks pemrosesan data SLS. Produk ini mendukung fungsi JSON tetapi tidak mendukung fungsi pemisahan event. Untuk sintaks SLS, lihat Pengenalan sintaks.

Contoh skenario khas

Penyaringan data

  • Filter berdasarkan kolom numerik: Hapus catatan di mana id > 10000 agar tidak disinkronkan: e_if(op_gt(`id`, 10000), e_drop()).

  • Filter berdasarkan kecocokan string: Hapus catatan di mana name mengandung "hangzhou": e_if(str_contains(`name`, "hangzhou"), e_drop()).

  • Filter berdasarkan tanggal: Jangan sinkronkan catatan di mana order_timestamp lebih awal dari waktu tertentu: e_if(op_lt(`order_timestamp`, "2015-02-23 23:54:55"), e_drop()).

  • Filter berdasarkan beberapa kondisi:

    • Hapus catatan di mana id > 1000 dan name mengandung "hangzhou": e_if(op_and(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop()).

    • Hapus catatan di mana id > 1000 atau name mengandung "hangzhou": e_if(op_or(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop()).

Penyamaran data

Penyamaran: Ganti empat digit terakhir pada kolom nomor telepon dengan tanda bintang: e_set(`phone`, str_mask(`phone`, 7, 10, '*')).

Catat waktu modifikasi data

  • Tambahkan kolom ke semua tabel: Saat __OPERATION__ adalah INSERT, UPDATE, atau DELETE, tambahkan kolom bernama "dts_sync_time" dengan nilai stempel waktu commit log (__COMMIT_TIMESTAMP__).

    e_if(op_or(op_or(
            op_eq(__OPERATION__, __OP_INSERT__),
            op_eq(__OPERATION__, __OP_UPDATE__)),
            op_eq(__OPERATION__, __OP_DELETE__)),
        e_set(dts_sync_time, __COMMIT_TIMESTAMP__))
  • Tambahkan kolom ke tabel tertentu "dts_test_table": Saat __OPERATION__ adalah INSERT, UPDATE, atau DELETE, tambahkan kolom bernama "dts_sync_time" dengan nilai stempel waktu commit log (__COMMIT_TIMESTAMP__).

    e_if(op_and(
          op_eq(__TB__,'dts_test_table'),
          op_or(op_or(
            op_eq(__OPERATION__,__OP_INSERT__),
            op_eq(__OPERATION__,__OP_UPDATE__)),
            op_eq(__OPERATION__,__OP_DELETE__))),
          e_set(dts_sync_time,__COMMIT_TIMESTAMP__))
    Catatan

    Anda harus menambahkan kolom "dts_sync_time" secara manual ke tabel tujuan sebelum memulai tugas.

Audit perubahan data

Catat jenis dan waktu perubahan data: Tulis jenis perubahan ke kolom "operation_type" dan stempel waktu perubahan ke kolom "updated" di database tujuan.

e_compose(
    e_switch(
        op_eq(__OPERATION__,__OP_DELETE__), e_set(operation_type, 'DELETE'),
        op_eq(__OPERATION__,__OP_UPDATE__), e_set(operation_type, 'UPDATE'),
        op_eq(__OPERATION__,__OP_INSERT__), e_set(operation_type, 'INSERT')),
    e_set(updated, __COMMIT_TIMESTAMP__),
    e_set(__OPERATION__,__OP_INSERT__)
)
Catatan

Anda harus menambahkan kolom "operation_type" dan "updated" ke tabel tujuan sebelum memulai tugas.

Membedakan antara data penuh dan data inkremental

Catat apakah data berasal dari migrasi penuh atau inkremental di kolom is_increment_dml. Anda dapat membedakan antara migrasi penuh dan inkremental dengan memeriksa nilai __COMMIT_TIMESTAMP__. Dalam migrasi penuh, __COMMIT_TIMESTAMP__ bernilai 0 (1970-01-01 08:00:00, dipengaruhi zona waktu). Dalam migrasi inkremental, nilainya mencerminkan waktu penulisan log database sumber. Skrip ETL yang sesuai adalah:

e_if_else(__COMMIT_TIMESTAMP__ > DATETIME('2000-01-01 00:00:00'), 
    e_set(`is_increment_dml`, True),
    e_set(`is_increment_dml`, False)
)

Sintaks DSL pemrosesan data

Konstanta dan variabel

  • Konstanta

    Type

    Example

    int

    123

    float

    123.4

    string

    "hello1_world"

    boolean

    true or false

    datetime

    DATETIME('2021-01-01 10:10:01')

  • Variabel

    Variable

    Description

    Data type

    Example value

    __TB__

    Table name

    string

    table

    __DB__

    Database name

    string

    mydb

    __OPERATION__

    Operation type

    string

    __OP_INSERT__, __OP_UPDATE__, __OP_DELETE__

    __BEFORE__

    Nilai pre-image untuk operasi UPDATE (nilai sebelum perubahan)

    Catatan

    Operasi DELETE hanya memiliki nilai pre-image.

    Special marker, no type

    v(`column_name`,__BEFORE__)

    __AFTER__

    Nilai post-image untuk operasi UPDATE (nilai setelah perubahan)

    Catatan

    Operasi INSERT hanya memiliki nilai post-image.

    Special marker, no type

    v(`column_name`,__AFTER__)

    __COMMIT_TIMESTAMP__

    Transaction commit time

    datetime

    '2021-01-01 10:10:01'

    `column`

    Value of the specified column for a record

    string

    `id`, `name`

Fungsi ekspresi

  • Operasi numerik

    Function

    Syntax

    Value range

    Return value

    Example

    Addition

    op_sum(value1, value2)

    • value1: integer or floating-point number

    • value2: integer or floating-point number

    Returns an integer if both parameters are integers; otherwise, returns a floating-point number.

    op_sum(`col1`, 1.0)

    Subtraction

    op_sub(value1, value2)

    • value1: integer or floating-point number

    • value2: integer or floating-point number

    Returns an integer if both parameters are integers; otherwise, returns a floating-point number.

    op_sub(`col1`, 1.0)

    Multiplication

    op_mul(value1, value2)

    • value1: integer or floating-point number

    • value2: integer or floating-point number

    Returns an integer if both parameters are integers; otherwise, returns a floating-point number.

    op_mul(`col1`, 1.0)

    Division

    op_div_true(value1, value2)

    • value1: integer or floating-point number

    • value2: integer or floating-point number

    Returns an integer if both parameters are integers; otherwise, returns a floating-point number.

    op_div_true(`col1`, 2.0); if col1=15, returns 7.5.

    Modulo operation

    op_mod(value1, value2)

    • value1: integer or floating-point number

    • value2: integer or floating-point number

    Returns an integer if both parameters are integers; otherwise, returns a floating-point number.

    op_mod(`col1`, 10); if col1=23, returns 3.

  • Operasi logis

    Function

    Syntax

    Valid values

    Return value

    Example

    Equality check

    op_eq(value1, value2)

    • value1: integer, floating-point number, or string

    • value2: integer, floating-point number, or string

    Boolean: true or false

    op_eq(`col1`, 23)

    Greater-than check

    op_gt(value1, value2)

    • value1: integer, floating-point number, or string

    • value2: integer, floating-point number, or string

    Boolean: true or false

    op_gt(`col1`, 1.0)

    is less than

    op_lt(value1, value2)

    • value1: integer, floating-point number, or string

    • value2: integer, floating-point number, or string

    Boolean: true or false

    op_lt(`col1`, 1.0)

    Greater-than-or-equal check

    op_ge(value1, value2)

    • value1: integer, floating-point number, or string

    • value2: integer, floating-point number, or string

    Boolean: true or false

    op_ge(`col1`, 1.0)

    Less-than-or-equal check

    op_le(value1, value2)

    • value1: integer, floating-point number, or string

    • value2: integer, floating-point number, or string

    Boolean: true or false

    op_le(`col1`, 1.0)

    AND operation

    op_and(value1, value2)

    • value1: boolean

    • value2: boolean

    Boolean: true or false

    op_and(`is_male`, `is_student`)

    OR operation

    op_or(value1, value2)

    • value1: boolean

    • value2: boolean

    Boolean: true or false

    op_or(`is_male`, `is_student`)

    IN operation

    op_in(value, json_array)

    • value: any type

    • json_array: JSON-formatted string

    Boolean: true or false

    op_in(`id`,json_array('["0","1","2","3","4","5","6","7","8"]'))

    Check if value is null

    op_is_null(value)

    value: any type

    Boolean: true or false

    op_is_null(`name`)

    Check if value is not null

    op_is_not_null(value)

    value: any type

    Boolean: true or false

    op_is_not_null(`name`)

  • Fungsi string

    Function

    Syntax

    Value range

    Return value

    Example

    String concatenation

    op_add(str_1,str_2,...,str_n)

    • str_1: string

    • str_2: string

    • ...

    • str_n: string

    Concatenated string

    op_add(`col`,'hangzhou','dts')

    String formatting and concatenation

    str_format(format, value1, value2, value3, ...)

    • format: string with braces as placeholders, such as "part1: {}, part2: {}"

    • value1: any type

    • value2: any type

    Formatted string

    str_format("part1: {}, part2: {}", `col1`, `col2`); if col1="ab" and col2="12", returns "part1: ab, part2: 12".

    String replacement

    str_replace(original, oldStr, newStr, count)

    • original: original string

    • oldStr: string to replace

    • newStr: replacement string

    • count: integer, maximum number of replacements. Set to -1 for all occurrences.

    Replaced string

    str_replace(`name`, "a", 'b', 1); if name="aba", returns "bba". str_replace(`name`, "a", 'b', -1); if name="aba", returns "bbb".

    Replace values in all string-type fields (such as varchar, text, char)

    tail_replace_string_field(search, replace, all)

    • search: string to replace

    • replace: replacement string

    • all: whether to replace all matches; currently supports only true.

      Catatan

      If you do not want to replace all matches, use the str_replace function.

    Replaced string

    tail_replace_string_field('\u000f', '', true) replaces all occurrences of "\u000f" in string-type field values with a space.

    Remove specified characters from start and end of string

    str_strip(string_val, charSet)

    • string_val: original string

    • char_set: set of characters to remove

    String with leading and trailing characters removed

    str_strip(`name`, 'ab'); if name=axbzb, returns xbz.

    Convert string to lowercase

    str_lower(value)

    value: string column or string literal

    Lowercase string

    str_lower(`str_col`)

    Convert string to uppercase

    str_upper(value)

    value: string column or string literal

    Uppercase string

    str_upper(`str_col`)

    Convert string to number

    cast_string_to_long(value)

    value: string

    Integer

    cast_string_to_long(`col`)

    Convert number to string

    cast_long_to_string(value)

    value: integer

    String

    cast_long_to_string(`col`)

    Count substring occurrences

    str_count(str,pattern)

    • str: string column or string literal

    • pattern: substring to find

    Number of times the substring appears

    str_count(`str_col`, 'abc'); if str_col="zabcyabcz", returns 2.

    Find substring position

    str_find(str, pattern)

    • str: string column or string literal

    • pattern: substring to find

    Position of first match; returns `-1` if not found

    str_find(`str_col`, 'abc'); if `str_col="xabcy"`, returns `1`.

    Check if string consists only of letters

    str_isalpha(str)

    str: string column or string literal

    true or false

    str_isalpha(`str_col`)

    Check if string consists only of digits

    str_isdigit(str)

    • str: string column or string literal

    true or false

    str_isdigit(`str_col`)

    Regular expression matching

    regex_match(str,regex)

    • str: string column or string literal

    • regex: regular expression string column or string literal

    true or false

    regex_match(__TB__,'user_\\d+')

    Mask part of a string with a specified character for data masking, such as replacing the last four digits of a phone number with asterisks

    str_mask(str, start, end, maskStr)

    • str: string column or string literal

    • start: integer, starting position for masking (minimum 0)

    • end: integer, ending position for masking (maximum string length minus one)

    • maskStr: single-character string, such as '#'

    String with characters from start to end masked

    str_mask(`phone`, 7, 10, '#')

    Extract part of string after cond

    substring_after(str, cond)

    • str: original string

    • cond: string

    String

    Catatan

    Result does not include cond.

    substring_after(`col`, 'abc')

    Extract part of string before cond

    substring_before(str, cond)

    • str: original string

    • cond: string

    String

    Catatan

    Result does not include cond.

    substring_before(`col`, 'efg')

    Extract part of string between cond1 and cond2

    substring_between(str, cond1, cond2)

    • str: original string

    • cond1: string

    • cond2: string

    String

    Catatan

    Result does not include cond1 or cond2.

    substring_between(`col`, 'abc','efg')

    Check if value is a string type

    is_string_value(value)

    value: string or column name

    Boolean: true or false

    is_string_value(`col1`)

    Replace content in string-type fields; starts from the end in reverse order

    tail_replace_string_field(search, replace, all)

    search: string to replace

    replace: replacement string

    all: whether to replace all; true or false

    Replaced string

    Replace "\u000f" with a space in all string field values.

    tail_replace_string_field('\u000f','',true)

    Get value of a field in MongoDB

    bson_value("field1","field2","field3",...)

    • field1: top-level field name

    • field2: second-level field name

    Value of the specified field in the document

    • e_set(`user_id`, bson_value("id"))

    • e_set(`user_name`, bson_value("person","name"))

  • Fungsi tanggal dan waktu

    Function

    Syntax

    Value range

    Return value

    Example

    Current system time

    dt_now()

    None

    DATETIME, accurate to seconds

    dts_now()

    dt_now_millis()

    None

    DATETIME, accurate to milliseconds

    dt_now_millis()

    Convert UTC timestamp (seconds) to DATETIME

    dt_fromtimestamp(value,[timezone])

    • value: integer

    • timezone: time zone (optional)

    DATETIME, accurate to seconds

    dt_fromtimestamp(1626837629)

    dt_fromtimestamp(1626837629,'GMT+08')

    Convert UTC timestamp (milliseconds) to DATETIME

    dt_fromtimestamp_millis(value,[timezone])

    • value: integer

    • timezone: time zone (optional)

    DATETIME, accurate to milliseconds

    dt_fromtimestamp_millis(1626837629123);

    dt_fromtimestamp_millis(1626837629123,'GMT+08')

    Convert DATETIME to UTC timestamp (seconds)

    dt_parsetimestamp(value,[timezone])

    • value: DATETIME

    • timezone: time zone (optional)

    Integer

    dt_parsetimestamp(`datetime_col`)

    dt_parsetimestamp(`datetime_col`,'GMT+08')

    Convert DATETIME to UTC timestamp (milliseconds)

    dt_parsetimestamp_millis(value,[timezone])

    • value: DATETIME

    • timezone: time zone (optional)

    Integer

    dt_parsetimestamp_millis(`datetime_col`)

    dt_parsetimestamp_millis(`datetime_col`,'GMT+08')

    Convert DATETIME to string

    dt_str(value, format)

    • value: DATETIME

    • format: string in yyyy-MM-dd HH:mm:ss format

    String

    dt_str(`col1`, 'yyyy-MM-dd HH:mm:ss')

    Convert string to DATETIME

    dt_strptime(value,format)

    • value: string

    • format: string in yyyy-MM-dd HH:mm:ss format

    DATETIME

    dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss')

    Adjust time by adding or subtracting years, months, days, hours, minutes, or seconds

    dt_add(value, [years=intVal],

    [months=intVal],

    [days=intVal],

    [hours=intVal],

    [minutes=intVal]

    )

    • value: DATETIME

    • intVal: integer

      Catatan

      The minus sign (−) indicates subtraction.

    DATETIME

    • dt_add(datetime_col,years=-1)

    • dt_add(datetime_col,years=1,months=1)

  • Ekspresi kondisional

    Function

    Syntax

    Value range

    Return value

    Example

    Similar to the ternary operator (? :) in C, returns a value based on a condition

    (cond ? val_1 : val_2)

    • cond: boolean field or expression

    • val_1: return value 1

    • val_2: return value 2

      Catatan

      val_1 and val_2 must be of the same type.

    Returns val_1 if cond is true; otherwise, returns val_2

    (id>1000? 1 : 0)

  • Fungsi JSON

    Catatan

    The value type represents any field type in the database.

    Function

    Syntax

    Value range

    Return value

    Example

    Convert a JSON array string to a Set

    json_array(arrayText)

    Catatan

    Can only be used in expressions that return a boolean.

    arrayText: string, the JSON array string to convert

    Set

    op_in(`id`,json_array('["0","1","2","3"]')) returns the Set ["0","1","2","3"].

    Create a JSON array with specified data

    json_array2(item...)

    item...: value type, data for the JSON array

    JSON array

    json_array2("0","1","2","3") returns ["0","1","2","3"].

    Create a JSON object with specified data

    json_object(item...)

    item...: data of a JSON object (key-value pairs), consisting of a key name (string) and a key value (value type), separated by a comma (,).

    JSON

    json_object('name','ZhangSan','age',32, 'loginId',100) returns {"name":"ZhangSan","age":32,"loginId":100}.

    Insert data at a specified position (array) in a JSON object

    json_array_insert(json, kvPairs...)

    • json: string, the JSON object to modify

    • kvPairs...: data to insert. Each pair consists of a JSONPath (string) and a value (value type), separated by a comma.

    JSON

    Catatan
    • If the specified position does not exist, returns the original JSON object.

    • If the element at the specified position does not exist, the data is appended to the end of the target array.

    json_array_insert('{"Address":["City",1]}','$.Address[3]',100) returns {"Address":["City",1,100]}.

    Insert data at a specified position in a JSON object

    json_insert(json, kvPairs...)

    • json: string, the JSON object to modify

    • kvPairs...: data to insert. Each pair consists of a JSONPath (string) and a value (value type), separated by a comma.

    JSON

    Catatan
    • If the specified location exists, the system returns the JSON object to be operated on.

    • If the specified position does not exist, the data is added to the JSON object.

    json_insert('{"Address":["City","Xian","Number",1]}','$.ID',100) returns {"Address":["City","Xian","Number",1],"ID":100}.

    Insert or update data at a specified position in a JSON object

    json_set(json, kvPairs...)

    • json: string, the JSON object to modify

    • kvPairs...: data to insert or update. Each pair consists of a JSONPath (string) and a value (value type), separated by a comma.

    value type

    Catatan
    • If the specified position exists, updates the data.

    • If the specified position does not exist, adds the data to the JSON object.

    json_set('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100) returns {"ID":1,"Address":["City","Xian","Number",1], "IP":100}.

    Insert or update a key-value pair in a JSON object

    json_put(json, key, value)

    • json: string, the JSON object to modify

    • key: string, the key name to insert or update

    • value: value type, the value for the key

    JSON

    Catatan
    • If json is not a JSON object, returns null.

    • If the key exists, updates its value.

    • If the key does not exist, adds it to the JSON object.

    json_put('{"loginId":100}','loginTime','2024-10-10') returns {"loginId":100, "loginTime":"2024-10-10"}.

    Replace data at a specified position in a JSON object

    json_replace(json, kvPairs...)

    • json: string, the JSON object to modify

    • kvPairs...: data to replace. Each pair consists of a JSONPath (string) and a value (value type), separated by a comma.

    value type

    Catatan

    If the specified position does not exist, returns the original JSON object.

    json_replace('{"ID":1,"Address":["City","Xian","Number",1]}',"$.IP",100) returns {"ID":1,"Address":["City","Xian","Number",1]}.

    Check if specified data exists at a position in a JSON object

    json_contains(json, jsonPath, item)

    • json: string, the JSON object to query

    • jsonPath: string, the position in the JSON object

    • item: value type, the data to search for

    Boolean: true or false

    json_contains('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID',1) returns true.

    Check whether a specified position exists in a JSON object.

    json_contains_path(json, jsonPath)

    • json: string, the JSON object to query

    • jsonPath: string, the position to check

    Boolean: true or false

    json_contains_path('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID') returns true.

    Get data from a specified position in a JSON object

    json_extract(json, jsonPath)

    • json: string, the JSON object to query

    • jsonPath: string, the position in the JSON object

    value type

    json_extract('{"ID":1,"Address":["City","Xian","Number",1]}','$.ID') returns 1.

    Get the value of a specified key in a JSON object

    json_get(json, key)

    • json: string, the JSON object to query

    • key: string, the key name

    value type

    json_get('{"ID":1,"Address":["City","Xian","Number",1]}','ID') returns 1.

    Get all keys at a specified position in a JSON object

    json_keys(json, jsonPath)

    • json: string, the JSON object to query

    • jsonPath: string, the position in the JSON object

    JSON array

    json_keys('{"ID":1,"Address":["City","Xian","Number",1]}','$') returns ["ID","Address"].

    Get the length (number of keys) at a specified position in a JSON object

    json_length(json, jsonPath)

    • json: string, the JSON object to query

    • jsonPath: string, the position in the JSON object

      Catatan

      If jsonPath is "$", it is equivalent to json_length(json).

    Integer

    json_length('{"ID":1,"Address":["City","Xian","Number",1]}','$') returns 2.

    Get the length (number of keys) at the root of a JSON object

    json_length(json)

    json: string, the JSON object to query

    Integer

    json_length('{"ID":1,"Address":["City","Xian","Number",1]}') returns 2.

    Parse a JSON string into a JSON object

    json_parse(json)

    json: string, the JSON string to parse

    value type

    json_parse('{"ID":1,"Address":["City","Xian","Number",1]}') returns {"ID":1,"Address":["City","Xian","Number",1]}.

    Remove data from a specified position in a JSON object

    json_remove(json, jsonPath)

    • json: string, the JSON object to modify

    • jsonPath: string, the position in the JSON object

    JSON

    json_remove('{"loginId":100, "loginTime":"2024-10-10"}','$.loginTime') returns {"loginId":100}.

Fungsi global

  • Fungsi kontrol alur

    Function

    Syntax

    Parameters

    Example

    if statement

    e_if(bool_expr, func_invoke)

    • bool_expr: boolean constant or function call. Constants: true or false. Function call example: op_gt(`id`, 10).

    • func_invoke: function call. Supported: e_drop, e_keep, e_set, e_if, e_compose

    e_if(op_gt(`id`, 10), e_drop()); drops the record if ID > 10.

    if-else statement

    e_if_else(bool_expr, func_invoke1, func_invoke2)

    • bool_expr: boolean constant or function call. Constants: true or false. Function call example: op_gt(`id`, 10).

    • func_invoke1: function call executed if condition is true.

    • func_invoke2: function call executed if condition is false.

    e_if_else(op_gt(`id`, 10), e_set(`tag`, 'large'), e_set(`tag`, 'small')); sets tag to "large" if ID > 10, otherwise to "small".

    Switch-like statement that evaluates multiple conditions and executes the first matching operation. Executes a default operation if no conditions match.

    s_switch(condition1, func1, condition2, func2, ..., default=default_func)

    • condition1: boolean constant or function call. Constants: true or false. Function call example: op_gt(`id`, 10).

    • func_invoke: function call. Checks condition1; if true, executes this function and exits the switch. If false, checks the next condition.

    • default_func: function call executed if all conditions are false.

    e_switch(op_gt(`id`, 100), e_set(`str_col`, '>100'), op_gt(`id`, 90), e_set(`str_col`, '>90'), default=e_set(`str_col`, '<=90')).

    Combine multiple operations

    e_compose(func1, func2, func3, ...)

    • func1: function call. Can be e_set, e_drop, e_if.

    • func2: function call. Can be e_set, e_drop, e_if.

    e_compose(e_set(`str_col`, 'test'), e_set(`dt_col`, dt_now())); sets str_col to "test" and dt_col to the current time.

  • Fungsi manipulasi data

    Function

    Syntax

    Parameters

    Example

    Drop this record (do not synchronize)

    e_drop()

    None

    e_if(op_gt(`id`, 10), e_drop()); drops records where ID > 10.

    Keep this record (synchronize to destination)

    e_keep(condition)

    condition: boolean expression

    e_keep(op_gt(id, 1)); synchronizes only records where ID > 1.

    Set column value

    e_set(`col`, val, NEW)

    • col: column name

    • val: constant or function call. Must match col's data type.

    • NEW: converts col to val's data type (optional)

      Penting

      If you omit NEW, do not include the preceding comma. Ensure data type compatibility to avoid task errors.

    • e_set(`dt_col`, dt_now()); sets dt_col to current time.

    • e_set(`col1`, `col2` + 1); sets col1 to col2 + 1.

    • e_set(`col1`, 1, NEW); converts col1 to numeric type and sets it to 1.

    MongoDB field retention, field dropping, and field name mapping

    e_expand_bson_value('*', 'fieldA',{"fieldB":"fieldC"})

    • *: field names to retain; * means all fields.

    • fieldA: field names to drop.

    • {"fieldB":"fieldC"}: field name mapping; fieldB is the source field name, fieldC is the destination field name.

      Catatan

      Field name mapping is optional.

    e_expand_bson_value("*", "_id,name"); writes all fields except _id and name to the destination.