All Products
Search
Document Center

Data Transmission Service:Mengonsumsi data yang telah berlangganan menggunakan client Kafka

Last Updated:Jun 25, 2026

Versi terbaru pelacakan perubahan mendukung client Kafka versi 0.11 hingga 2.7 untuk mengonsumsi data langganan. DTS menyediakan demo client Kafka. Topik ini menjelaskan cara menggunakan client tersebut.

Catatan penting

  • Jika Anda menggunakan demo yang disediakan dalam topik ini dan mengaktifkan auto commit, beberapa data mungkin hilang karena operasi commit dapat dijalankan sebelum semua data dikonsumsi. Gunakan manual commit untuk menghindari masalah ini.

    Catatan

    Jika kegagalan mencegah commit berhasil, client akan melanjutkan konsumsi data dari offset terakhir yang terekam setelah restart. Hal ini dapat mengakibatkan data duplikat. Filter duplikat secara manual.

  • Data disimpan dalam format serialisasi Avro. Untuk detail tentang format tersebut, lihat dokumen Record.avsc.

    Peringatan

    Jika Anda tidak menggunakan client Kafka yang disediakan dalam topik ini, deserialisasi (lihat contoh deserialisasi Avro DTS) mungkin menghasilkan data yang tidak akurat. Verifikasi kebenaran data secara mandiri.

  • Untuk API offsetForTimes, DTS menggunakan detik sebagai satuan pencarian, sedangkan Kafka asli menggunakan milidetik.

  • Pemutusan jaringan sementara dapat terjadi pada server pelacakan perubahan karena disaster recovery atau alasan lain. Jika Anda tidak menggunakan client Kafka yang disediakan dalam topik ini, pastikan client Kafka Anda mendukung retry jaringan.

  • Jika Anda menggunakan client Kafka asli untuk mengonsumsi data langganan, DTS mungkin mengganti modul ingestion data inkrementalnya. Hal ini akan menghapus offset konsumen yang tersimpan di server dalam mode subscribe. Sesuaikan offset konsumen secara manual sesuai kebutuhan untuk mengonsumsi data.

Alur kerja client Kafka

Unduh kode demo client Kafka. Untuk detail lebih lanjut tentang penggunaan kode tersebut, lihat file Readme dalam demo.

Catatan
  • Klik code, lalu pilih Download ZIP untuk mengunduh file.

  • Untuk menggunakan client Kafka versi 2.0, ubah file subscribe_example-master/javaimpl/pom.xml dan ganti versi client Kafka menjadi 2.0.0.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

Tabel 1. Deskripsi alur kerja

Langkah

Direktori atau file terkait

1. Gunakan consumer Kafka asli untuk mengambil data inkremental dari saluran pelacakan perubahan.

subscribe_example-master/javaimpl/src/main/java/recordgenerator/

2. Deserialisasi data inkremental yang diambil untuk mendapatkan before image (nilai bidang sebelum perubahan), after image (nilai bidang setelah perubahan), dan properti lainnya.

Peringatan
  • Jika instans sumber adalah database Oracle yang dikelola sendiri, aktifkan full supplemental logging untuk memastikan konsumsi data berhasil dan kelengkapan before image serta after image.

  • Jika instans sumber bukan database Oracle yang dikelola sendiri, DTS tidak dapat menjamin kelengkapan before image. Validasi before image yang Anda terima.

subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java

3. Konversi field dataTypeNumber dalam data yang telah dideserialisasi ke tipe bidang database yang sesuai.

subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/

Prosedur

Topik ini menggunakan IntelliJ IDEA (Community Edition 2018.1.4 untuk Windows) sebagai contoh untuk menjelaskan cara menjalankan client guna mengonsumsi data dari saluran pelacakan perubahan.

  1. Buat saluran pelacakan perubahan baru. Untuk detailnya, lihat Buat saluran pelacakan perubahan RDS MySQL, Buat saluran pelacakan perubahan PolarDB MySQL, atau Buat saluran pelacakan perubahan Oracle.

  2. Buat satu atau beberapa kelompok konsumen. Untuk detailnya, lihat Tambahkan kelompok konsumen.

  3. Unduh kode demo client Kafka, lalu ekstrak file tersebut.

    Catatan

    Klik code, lalu pilih Download ZIP untuk mengunduh file.

  4. Buka IntelliJ IDEA, lalu klik Open.

  5. Pada kotak dialog yang muncul, buka direktori tempat Anda mengunduh kode demo client Kafka. Navigasi melalui folder untuk menemukan file model objek proyek: pom.xml.

    Navigasi ke kafkademo > subscribe_example-master > javaimpl, pilih pom.xml, lalu klik OK.

  6. Pada kotak dialog yang muncul, pilih Open as Project.

  7. Di antarmuka IntelliJ IDEA, navigasi melalui folder untuk menemukan dan klik ganda file demo client Kafka: NotifyDemoDB.java.

  8. Atur nilai parameter dalam file NotifyDemoDB.java.

    public static Properties getConfigs() {
        Properties properties = new Properties();
        // user password and sid for auth
        properties.setProperty(USER_NAME, "dtstest");
        properties.setProperty(PASSWORD_NAME, "xxx");
        properties.setProperty(SID_NAME, "dtsxxx");
        // kafka consumer group general same with sid
        properties.setProperty(GROUP_NAME, "dtsxxx");
        // topic to consume, partition is 0
        properties.setProperty(KAFKA_TOPIC, "cn_hangzhou_xxx");
        // kafka broker url
        properties.setProperty(KAFKA_BROKER_URL_NAME, "dts-cn-xxx.com:18001");
        // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019))
        properties.setProperty(INITIAL_CHECKPOINT_NAME, "1583307907");
        // if force use config checkpoint when start. for checkpoint reset
        properties.setProperty(USE_CONFIG_CHECKPOINT_NAME, "true");
        // use consumer assign or subscribe interface
        // when use subscribe mode, group config is required. kafka consumer group is enabled
        properties.setProperty(SUBSCRIBE_MODE_NAME, "assign");
        return properties;
    }

    Parameter

    Deskripsi

    Cara memperoleh

    USER_NAME

    Akun untuk kelompok konsumen.

    Peringatan

    Jika Anda tidak menggunakan client yang disediakan dalam topik ini, atur username dalam format <akun kelompok konsumen>-<ID kelompok konsumen> (misalnya, dtstest-dtsae******bpv). Jika tidak, koneksi akan gagal.

    Di Konsol DTS, klik ID instansi subscription target, lalu klik Consume Data. Anda dapat memperoleh informasi Consumer Group ID dan Account.

    Catatan

    Anda menentukan password untuk akun kelompok konsumen saat membuat kelompok konsumen.

    PASSWORD_NAME

    Password untuk akun tersebut.

    SID_NAME

    ID kelompok konsumen.

    GROUP_NAME

    Nama kelompok konsumen. Atur parameter ini dengan nilai yang sama seperti ID kelompok konsumen.

    KAFKA_TOPIC

    Topik subscription dari saluran pelacakan perubahan.

    Di Konsol DTS, klik ID instansi subscription target. Pada halaman Task Management, Anda dapat memperoleh informasi Topic dan alamat jaringan. Dapatkan nilai Subscription Topic dari bagian Basic Information dan VPC endpoint (format contoh: xxx.aliyuncs.com:18003) dari bagian Network.

    KAFKA_BROKER_URL_NAME

    Informasi alamat jaringan untuk saluran subscription data.

    Catatan
    • Jika instance ECS tempat Anda men-deploy client Kafka dan saluran pelacakan perubahan berada di jaringan klasik atau VPC yang sama, gunakan titik akhir internal untuk meminimalkan latensi jaringan.

    • Kami tidak menyarankan menggunakan titik akhir publik karena potensi ketidakstabilan jaringan.

    INITIAL_CHECKPOINT_NAME

    Waktu data untuk memulai konsumsi, dalam format stempel waktu UNIX (misalnya, 1592269238).

    Catatan
    • Simpan stempel waktu tersebut sendiri untuk:

      • Melanjutkan konsumsi data dari stempel waktu terakhir yang dikonsumsi setelah program terganggu guna mencegah kehilangan data.

      • Menentukan offset konsumen yang diinginkan saat memulai client subscription agar dapat mengonsumsi data sesuai kebutuhan.

    • Jika SUBSCRIBE_MODE_NAME diatur ke subscribe, nilai INITIAL_CHECKPOINT_NAME hanya berlaku selama startup pertama client subscription.

    Stempel waktu data harus berada dalam rentang waktu instansi subscription dan harus dikonversi ke stempel waktu UNIX. Di daftar tugas subscription DTS, periksa bidang Timestamp Range untuk tugas subscription guna menentukan rentang valid untuk INITIAL_CHECKPOINT_NAME.

    Catatan

    Gunakan mesin pencari untuk menemukan konverter stempel waktu UNIX.

    USE_CONFIG_CHECKPOINT_NAME

    Nilai default adalah true. Ini memaksa konsumsi dari stempel waktu data yang ditentukan untuk menghindari kehilangan data yang telah diterima tetapi belum diproses.

    Tidak ada

    SUBSCRIBE_MODE_NAME

    Untuk menjalankan dua atau lebih client Kafka dalam satu kelompok konsumen, atur parameter ini ke subscribe untuk semua client.

    Nilai default adalah assign, yang menonaktifkan fitur ini. Deploy hanya satu client dalam mode ini.

    Tidak ada

  9. Di menu atas antarmuka IntelliJ IDEA, pilih Run > Run untuk menjalankan client.

    Catatan

    Saat menjalankan pertama kali, perangkat lunak memerlukan waktu untuk memuat dan menginstal dependensi secara otomatis.

Hasil eksekusi

Saat Anda menjalankan client, client tersebut berhasil berlangganan perubahan data dari database sumber.

[2020-03-09 10:41:52,408] INFO [Consumer clientId=consumer-1, groupId=dts_xxx] Discovered coordinator xxx (id: xxx rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-03-09 10:41:57,203] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721711, offset: 1732521, info: 1583721711] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:41:57,571] INFO EtlRecordProcessor: haven't receive records from generator for  5s (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:02,203] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721721, offset: 1732539, info: 1583721721] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:07,204] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721726, offset: 1732544, info: 1583721726] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:12,205] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721731, offset: 1732548, info: 1583721731] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:17,205] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721736, offset: 1732554, info: 1583721736] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:22,205] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721741, offset: 1732559, info: 1583721741] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:27,206] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721746, offset: 1732569, info: 1583721746] (recordprocessor.EtlRecordProcessor)

Anda juga dapat mengaktifkan kembali baris pencetakan log dalam file NotifyDemoDB.java (hapus // dari baris 25: //log.info(ret);), lalu jalankan kembali client untuk melihat informasi perubahan data secara rinci.

FAQ

  • Q: Mengapa saya perlu mencatat offset konsumen sendiri?

    A: DTS mencatat offset konsumen berdasarkan waktu penerimaan operasi commit dari client consumer Kafka, yang mungkin berbeda dari waktu konsumsi aktual. Setelah aplikasi atau client consumer Kafka Anda mengalami gangguan abnormal, Anda dapat melanjutkan konsumsi dari offset yang telah Anda catat untuk menghindari data duplikat atau hilang.

Pemetaan antara tipe bidang MySQL dan nilai dataTypeNumber

Tipe bidang MySQL

Nilai dataTypeNumber yang sesuai

MYSQL_TYPE_DECIMAL

0

MYSQL_TYPE_INT8

1

MYSQL_TYPE_INT16

2

MYSQL_TYPE_INT32

3

MYSQL_TYPE_FLOAT

4

MYSQL_TYPE_DOUBLE

5

MYSQL_TYPE_NULL

6

MYSQL_TYPE_TIMESTAMP

7

MYSQL_TYPE_INT64

8

MYSQL_TYPE_INT24

9

MYSQL_TYPE_DATE

10

MYSQL_TYPE_TIME

11

MYSQL_TYPE_DATETIME

12

MYSQL_TYPE_YEAR

13

MYSQL_TYPE_DATE_NEW

14

MYSQL_TYPE_VARCHAR

15

MYSQL_TYPE_BIT

16

MYSQL_TYPE_TIMESTAMP_NEW

17

MYSQL_TYPE_DATETIME_NEW

18

MYSQL_TYPE_TIME_NEW

19

MYSQL_TYPE_JSON

245

MYSQL_TYPE_DECIMAL_NEW

246

MYSQL_TYPE_ENUM

247

MYSQL_TYPE_SET

248

MYSQL_TYPE_TINY_BLOB

249

MYSQL_TYPE_MEDIUM_BLOB

250

MYSQL_TYPE_LONG_BLOB

251

MYSQL_TYPE_BLOB

252

MYSQL_TYPE_VAR_STRING

253

MYSQL_TYPE_STRING

254

MYSQL_TYPE_GEOMETRY

255

Pemetaan antara tipe bidang Oracle dan nilai dataTypeNumber

Tipe bidang Oracle

Nilai dataTypeNumber yang sesuai

VARCHAR2/NVARCHAR2

1

NUMBER/FLOAT

2

LONG

8

DATE

12

RAW

23

LONG_RAW

24

UNDEFINED

29

XMLTYPE

58

ROWID

69

CHAR、NCHAR

96

BINARY_FLOAT

100

BINARY_DOUBLE

101

CLOB/NCLOB

112

BLOB

113

BFILE

114

TIMESTAMP

180

TIMESTAMP_WITH_TIME_ZONE

181

INTERVAL_YEAR_TO_MONTH

182

INTERVAL_DAY_TO_SECOND

183

UROWID

208

TIMESTAMP_WITH_LOCAL_TIME_ZONE

231

Pemetaan antara tipe bidang PostgreSQL dan nilai dataTypeNumber

Tipe bidang PostgreSQL

Nilai dataTypeNumber yang sesuai

INT2/SMALLINT

21

INT4/INTEGER/SERIAL

23

INT8/BIGINT

20

CHARACTER

18

CHARACTER VARYING

1043

REAL

700

DOUBLE PRECISION

701

NUMERIC

1700

MONEY

790

DATE

1082

TIME/TIME WITHOUT TIME ZONE

1083

TIME WITH TIME ZONE

1266

TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE

1114

TIMESTAMP WITH TIME ZONE

1184

BYTEA

17

TEXT

25

JSON

114

JSONB

3082

XML

142

UUID

2950

POINT

600

LSEG

601

PATH

602

BOX

603

POLYGON

604

LINE

628

CIDR

650

CIRCLE

718

MACADDR

829

INET

869

INTERVAL

1186

TXID_SNAPSHOT

2970

PG_LSN

3220

TSVECTOR

3614

TSQUERY

3615