全部产品
Search
文档中心

DataWorks:Lampiran: Format pesan

更新时间:Jul 08, 2025

Topik ini menjelaskan format pesan yang ditulis ke Kafka dan arti setiap bidang dalam pesan tersebut.

Informasi latar belakang

Sebuah node yang menyinkronkan semua data dari sumber data ke Kafka menulis data yang dibaca DataWorks dari sumber data ke topik Kafka dalam format JSON. Pesan yang ditulis ke Kafka mencakup informasi perubahan kolom serta status data sebelum dan sesudah perubahan. Untuk memastikan konsumen mengetahui kemajuan node sinkronisasi saat mengonsumsi pesan Kafka, node sinkronisasi secara berkala menghasilkan pesan denyut jantung dengan bidang op bernilai MHEARTBEAT dan menulisnya ke topik Kafka. Informasi lebih lanjut tentang format pesan yang ditulis ke Kafka dapat ditemukan di Format Pesan Kafka, Format Pesan Denyut Jantung yang Dihasilkan oleh Node Sinkronisasi, dan Format Pesan Kafka untuk Perubahan Data dalam Sumber. Informasi lebih lanjut tentang tipe dan arti setiap bidang dalam pesan dapat ditemukan di Tipe Bidang dan Bidang.

Format pesan Kafka

Format pesan yang ditulis ke Kafka:

{
    "schema": { // Informasi perubahan metadata. Hanya nama kolom dan tipe kolom yang ditentukan.
        "dataColumn": [// Informasi perubahan kolom. Data dalam topik tujuan diperbarui berdasarkan informasi ini.
            {
                "name": "id",
                "type": "LONG"
            },
            {
                "name": "name",
                "type": "STRING"
            },
            {
                "name": "binData",
                "type": "BYTES"
            },
            {
                "name": "ts",
                "type": "DATE"
            },
            {
              "name":"rowid",// Jika sumber data Oracle digunakan, rowid ditambahkan sebagai kolom.
              "type":"STRING"
            }
        ],
        "primaryKey": [
            "pkName1",
            "pkName2"
        ],
        "source": {
            "dbType": "mysql",
            "dbVersion": "1.0.0",
            "dbName": "myDatabase",
            "schemaName": "mySchema",
            "tableName": "tableName"
        }
    },
    "payload": {
        "before": {
            "dataColumn":{
                "id": 111,
                "name":"scooter",
                "binData": "[string base64]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"// ID baris dalam sumber data Oracle. Nilainya bertipe STRING.
            }
        },
        "after": {
            "dataColumn":{
                "id": 222,
                "name":"donald",
                "binData": "[string base64]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"// ID baris dalam sumber data Oracle. Nilainya bertipe STRING.
            }
        },
        "sequenceId":"XXX",// ID urutan unik setiap catatan data yang dihasilkan setelah data tambahan dan semua data digabungkan. Nilainya bertipe STRING.
        "scn":"xxxx",// Nomor perubahan sistem (SCN) dari sumber data Oracle. Nilainya bertipe STRING.
        "op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",// Operasi yang dilakukan. Nilai parameter ini peka huruf besar-kecil.
        "timestamp": {
            "eventTime": 1,// Diperlukan. Waktu ketika data dalam database sumber berubah. Nilainya adalah timestamp 13-bit dalam milidetik.
            "systemTime": 2,// Opsional. Waktu ketika node sinkronisasi membaca pesan perubahan. Nilainya adalah timestamp 13-bit dalam milidetik.
            "checkpointTime": 3// Opsional. Waktu tertentu ketika offset sinkronisasi diatur ulang. Nilainya adalah timestamp 13-bit dalam milidetik dan sama dengan nilai bidang eventTime dalam banyak kasus.
        },
        "ddl": {
            "text": "ADD COLUMN ...",
            "ddlMeta": "[Pernyataan SQL diserialisasi biner, dinyatakan dalam string base64]"
        }
    },
    "version":"1.0.0"
}
Catatan

Untuk detail tentang tipe bidang dan deskripsi, lihat Tipe Bidang dan Bidang.

Format pesan denyut jantung yang dihasilkan oleh node sinkronisasi

{
    "schema": {
        "dataColumn": null,
        "primaryKey": null,
        "source": null
    },
    "payload": {
        "before": null,
        "after": null,
        "sequenceId": null,
        "timestamp": {
            "eventTime": 1620457659000,
            "checkpointTime": 1620457659000
        },
        "op": "MHEARTBEAT",
        "ddl": null
    },
    "version": "0.0.1"
}
Catatan

Untuk detail tentang tipe bidang dan deskripsi, lihat Tipe Bidang dan Bidang.

Format pesan Kafka untuk perubahan data dalam sumber

  • Format pesan Kafka untuk penyisipan data ke dalam sumber:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": null,
            "after": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "man",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "sequenceId": "1620457642589000000",
            "timestamp": {
                "eventTime": 1620457896000,
                "systemTime": 1620457896977,
                "checkpointTime": 1620457896000
            },
            "op": "INSERT",
            "ddl": null
        },
        "version": "0.0.1"
    }
  • Format pesan Kafka untuk pembaruan data dalam sumber:

    • Jika When one record in the source is updated, one Kafka record is generated tidak dipilih, dua pesan Kafka dihasilkan untuk pembaruan data dalam sumber. Satu pesan menjelaskan status data sebelum pembaruan, dan pesan lainnya menjelaskan status data setelah pembaruan. Contoh berikut menunjukkan format:

      Format pesan Kafka yang menjelaskan status data sebelum perubahan:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": null,
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_BEFOR",
              "ddl": null
          },
          "version": "0.0.1"
      }

      Format pesan Kafka yang menjelaskan status data setelah perubahan:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": null,
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
    • Jika When one record in the source is updated, one Kafka record is generated dipilih, hanya satu pesan Kafka yang dihasilkan untuk pembaruan data dalam sumber. Pesan tersebut menjelaskan status data sebelum dan sesudah pembaruan. Contoh berikut menunjukkan format:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
  • Format pesan Kafka untuk penghapusan data dari sumber:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "woman",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "after": null,
            "sequenceId": "1620457642589000002",
            "timestamp": {
                "eventTime": 1620458266000,
                "systemTime": 1620458266101,
                "checkpointTime": 1620458266000
            },
            "op": "DELETE",
            "ddl": null
        },
        "version": "0.0.1"
    }
    Catatan

    Untuk detail tentang tipe bidang dan deskripsi, lihat Tipe Bidang dan Bidang.

Format pesan untuk keluaran waktu nyata dari satu tabel

Saat mengonfigurasi Kafka sebagai tujuan untuk tugas sinkronisasi tabel tunggal waktu nyata, Anda harus mengonfirmasi format nilai yang ditulis ke Kafka. Format yang valid adalah Canal CDC dan JSON. Untuk detailnya, lihat Lampiran: Deskripsi Format Keluaran.

Tipe bidang

Data yang dibaca dari sumber dipetakan ke tipe BOOLEAN, DOUBLE, DATE, BYTES, LONG, dan STRING, lalu ditulis ke topik Kafka dalam format JSON yang berbeda.

Tipe bidang

Deskripsi

BOOLEAN

Berkorespondensi dengan tipe BOOLEAN dalam JSON. Nilai valid: true dan false.

DATE

Berkorespondensi dengan tipe NUMBER dalam JSON. Nilainya adalah timestamp 13-digit dalam milidetik.

BYTES

Berkorespondensi dengan tipe STRING dalam JSON. Sebelum data ditulis ke Kafka, larik byte dikodekan dalam Base64 dan diubah menjadi string. Konsumen perlu mendekode string yang dikodekan Base64 sebelum mengonsumsi string tersebut. Base64.getEncoder().encodeToString(text.getBytes("UTF-8")) digunakan untuk pengkodean dan Base64.getDecoder().decode(encodedText)) digunakan untuk dekode.

STRING

Berkorespondensi dengan tipe STRING dalam JSON.

LONG

Berkorespondensi dengan tipe NUMBER dalam JSON.

DOUBLE

Berkorespondensi dengan tipe NUMBER dalam JSON.

Bidang

Tabel berikut menjelaskan arti setiap bidang dalam pesan yang ditulis ke Kafka.

Bidang tingkat-1

Bidang tingkat-2

Deskripsi

schema

dataColumn

Nama dan tipe kolom. Nilainya bertipe JSONArray. dataColumn mencatat nama dan tipe kolom yang diperbarui dalam sumber. Operasi perubahan dapat berupa penambahan, penghapusan, atau modifikasi data, atau perubahan skema tabel dalam sumber.

  • name: nama kolom.

  • type: tipe kolom.

primaryKey

Informasi kunci utama. Nilainya bertipe List.

pk: nama kunci utama.

source

Informasi tentang database sumber atau tabel sumber. Nilainya bertipe Object.

  • dbType: tipe sumber. Nilainya bertipe STRING.

  • dbVersion: versi sumber. Nilainya bertipe STRING.

  • dbName: nama sumber. Nilainya bertipe STRING.

  • schemaName: nama skema. Bidang ini spesifik untuk sumber seperti PostgreSQL dan SQL Server. Nilainya bertipe STRING.

  • tableName: nama tabel sumber. Nilainya bertipe STRING.

payload

before

Data sebelum perubahan. Nilainya bertipe JSONObject. Misalnya, jika database MySQL adalah sumber dan data dalam sumber tersebut diperbarui, bidang before mencatat data sebelum pembaruan.

  • Setelah pesan pembaruan atau penghapusan data dibaca dari sumber, bidang before ditentukan dalam catatan tulis.

  • dataColumn: informasi kolom. Nilainya bertipe JSONObject. Nilai bidang dalam format Nama kolom:Tipe kolom. Nama kolom adalah string, dan tipe kolom bisa BOOLEAN, DOUBLE, DATE, BYTES, LONG, atau STRING.

after

Data setelah perubahan. Bidang after mencatat data setelah perubahan dalam format data yang sama dengan bidang before.

sequenceId

ID urutan unik setiap catatan data yang dihasilkan oleh StreamX setelah data tambahan dan semua data digabungkan. Nilainya bertipe STRING.

Catatan

Setelah pesan pembaruan data dibaca dari sumber, dua catatan tulis dihasilkan: update before dan update after. Kedua catatan tulis memiliki ID urutan yang sama.

scn

SCN dari sumber. Bidang ini valid ketika sumber adalah database Oracle.

op

Tipe operasi yang dilakukan pada data dalam sumber. Nilai valid:

  • INSERT: menyisipkan data.

  • UPDATE_BEFOR: memperbarui data (sebelum).

  • UPDATE_AFTER: memperbarui data (sesudah).

  • DELETE: menghapus data.

  • TRANSACTION_BEGIN: memulai transaksi database.

  • TRANSACTION_END: mengakhiri transaksi database.

  • CREATE: membuat tabel dalam sumber.

  • ALTER: memodifikasi tabel dalam sumber.

  • QUERY: meminta perubahan data dalam sumber dengan mengeksekusi pernyataan SQL yang menyebabkan perubahan data.

  • TRUNCATE: menghapus semua baris dari tabel dalam sumber.

  • RENAME: mengganti nama tabel dalam sumber.

  • CINDEX: membuat indeks.

  • DINDEX: menghapus indeks.

  • MHEARTBEAT: pesan detak jantung. Pesan ini menunjukkan bahwa node sinkronisasi berjalan sesuai harapan ketika tidak ada data baru yang dihasilkan dalam sumber.

timestamp

Timestamp dari catatan data. Nilainya bertipe JSONObject.

  • eventTime: waktu ketika data dalam sumber berubah. Nilainya adalah timestamp 13-digit dalam milidetik dan bertipe LONG.

  • systemTime: waktu ketika node sinkronisasi membaca pesan perubahan. Nilainya adalah timestamp 13-digit dalam milidetik dan bertipe LONG.

  • checkpointTime: waktu tertentu ketika offset sinkronisasi diatur ulang. Nilainya adalah timestamp 13-digit dalam milidetik dan bertipe LONG. Dalam banyak kasus, nilai bidang ini sama dengan nilai bidang eventTime.

ddl

Bidang ini ditentukan hanya jika skema tabel dalam sumber diubah. Nilainya NULL ketika operasi DDL, seperti penambahan, penghapusan, atau modifikasi data dilakukan dalam sumber.

  • text: teks pernyataan DDL dalam sumber. Nilainya bertipe STRING.

  • ddlMeta: string yang dikodekan Base64 yang diperoleh dari objek Java yang diserialisasi. Objek Java mencatat perubahan berbasis DDL. Nilainya bertipe STRING.

version

N/A

Versi data dalam format JSON.