全部产品
Search
文档中心

Data Transmission Service:Menggunakan klien Kafka untuk mengonsumsi data yang dilacak

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan demo klien Kafka untuk mengonsumsi data yang dilacak. Fitur pelacakan perubahan versi baru memungkinkan Anda mengonsumsi data yang dilacak menggunakan klien Kafka versi V0.11 hingga V2.7.

Catatan penggunaan

  • Jika Anda mengaktifkan komit otomatis saat menggunakan fitur pelacakan perubahan, beberapa data mungkin dikomit sebelum dikonsumsi, sehingga menyebabkan kehilangan data. Kami menyarankan Anda untuk secara manual mengkomit data.

    Catatan

    Jika data gagal dikomit, Anda dapat memulai ulang klien untuk melanjutkan konsumsi data dari titik pemeriksaan terakhir yang tercatat. Namun, data duplikat mungkin dihasilkan selama periode ini. Anda harus secara manual menyaring data duplikat tersebut.

  • Data diserialisasi dan disimpan dalam format Avro. Untuk informasi lebih lanjut, lihat Record.avsc.

    Peringatan

    Jika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, Anda harus mengurai data yang dilacak berdasarkan skema Avro.

  • Satuan pencarian adalah detik ketika Data Transmission Service (DTS) memanggil operasi offsetForTimes. Satuan pencarian adalah milidetik ketika klien Kafka asli memanggil operasi ini.

  • Koneksi transien mungkin terjadi antara klien Kafka dan server pelacakan perubahan karena beberapa alasan, seperti pemulihan bencana. Jika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, klien Kafka Anda harus memiliki kemampuan penyambungan ulang jaringan.

Jalankan klien Kafka

Unduh demo klien Kafka. Untuk informasi lebih lanjut tentang cara menggunakan demo, lihat Readme.

Catatan
  • Klik ikon code dan pilih Download ZIP untuk mengunduh paket.

  • Jika Anda menggunakan klien Kafka versi 2.0, Anda harus mengubah nomor versi dalam file subscribe_example-master/javaimpl/pom.xml menjadi 2.0.0.

kafka2.0

Tabel 1. Langkah-langkah untuk Menjalankan Klien Kafka

Langkah

File atau direktori

1. Gunakan konsumen Kafka asli untuk mendapatkan data inkremental dari instance pelacakan perubahan.

subscribe_example-master/javaimpl/src/main/java/recordgenerator/

2. Deserialize gambar data inkremental, dan dapatkan pre-image (Nilai setiap bidang sebelum entri data diperbarui), post-image (Nilai setiap bidang setelah entri data diperbarui), dan atribut lainnya.

Peringatan
  • Jika instance sumber adalah database Oracle yang dikelola sendiri, Anda harus mengaktifkan logging tambahan untuk semua kolom. Ini memastikan bahwa klien dapat berhasil mengonsumsi data yang dilacak dan memastikan integritas pre-image dan post-image.

  • Jika instance sumber bukan database Oracle yang dikelola sendiri, DTS tidak menjamin integritas pre-image. Kami menyarankan Anda memverifikasi pre-image yang diperoleh.

subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java

3. Konversikan nilai dataTypeNumber dalam data yang telah dideserialize menjadi tipe data dari database yang sesuai.

subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/

Prosedur

Langkah-langkah berikut menunjukkan cara menjalankan klien Kafka untuk mengonsumsi data yang dilacak. Dalam contoh ini, IntelliJ IDEA Community Edition 2018.1.4 untuk Windows digunakan.

  1. Buat tugas pelacakan perubahan. Untuk informasi lebih lanjut, lihat Lacak Perubahan Data dari Instance ApsaraDB RDS for MySQL, Lacak Perubahan Data dari Kluster PolarDB for MySQL, atau Lacak Perubahan Data dari Database Oracle yang Dikelola Sendiri.

  2. Buat satu atau lebih grup konsumen. Untuk informasi lebih lanjut, lihat Buat Grup Konsumen.

  3. Unduh demo klien Kafka dan ekstrak paket tersebut.

    Catatan

    Klik ikon code dan pilih Download ZIP untuk mengunduh paket.

  4. Buka IntelliJ IDEA. Di jendela yang muncul, klik Open.

    Open a project

  5. Di kotak dialog yang muncul, navigasikan ke direktori tempat demo yang diunduh berada. Temukan file pom.xml.

    Open the pom.xml file

  6. Di kotak dialog yang muncul, pilih Open as Project.

  7. Di jendela Alat Proyek IntelliJ IDEA, klik folder untuk menemukan file demo klien Kafka, lalu klik dua kali file tersebut. Nama file adalah NotifyDemoDB.java.

  8. Konfigurasikan parameter dalam file NotifyDemoDB.java.

    Configure the parameters

    Parameter

    Deskripsi

    Cara mendapatkan nilai parameter

    USER_NAME

    Akun grup konsumen.

    Peringatan

    Jika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, Anda harus menentukan akun dalam format berikut: <Username>-<Consumer group ID>. Contoh: dtstest-dtsae******bpv. Jika tidak, koneksi akan gagal.

    Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di panel navigasi di sebelah kiri, klik Consume Data. Di halaman yang muncul, Anda dapat memperoleh ID grup konsumen dan nama pengguna yang sesuai.

    Catatan

    Kata sandi akun grup konsumen ditentukan secara otomatis saat Anda membuat grup konsumen.

    View the consumer group ID and username

    PASSWORD_NAME

    Kata sandi akun.

    SID_NAME

    ID grup konsumen.

    GROUP_NAME

    Nama grup konsumen. Atur parameter ini ke ID grup konsumen.

    KAFKA_TOPIC

    Topik instance pelacakan perubahan.

    Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Pada halaman Task Management, Anda dapat memperoleh topik dan informasi jaringan. Obtain the topic and network information

    KAFKA_BROKER_URL_NAME

    Titik akhir instance pelacakan perubahan.

    Catatan

    Jika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance Elastic Compute Service (ECS) tempat Anda menerapkan klien Kafka berada pada jaringan klasik atau di virtual private cloud (VPC) yang sama dengan instance pelacakan perubahan.

    INITIAL_CHECKPOINT_NAME

    Titik pemeriksaan konsumsi data yang dikonsumsi. Nilainya adalah timestamp UNIX. Contoh: 1592269238.

    Catatan
    • Anda harus menyimpan titik pemeriksaan konsumsi untuk alasan berikut:

      • Jika proses konsumsi terganggu, Anda dapat menentukan titik pemeriksaan konsumsi pada klien Kafka untuk melanjutkan konsumsi data. Ini mencegah kehilangan data.

      • Saat Anda memulai klien Kafka, Anda dapat menentukan titik pemeriksaan konsumsi untuk mengonsumsi data sesuai permintaan.

    • Jika parameter SUBSCRIBE_MODE_NAME diatur ke subscribe, parameter INITIAL_CHECKPOINT_NAME yang Anda tentukan hanya berlaku saat Anda memulai klien Kafka untuk pertama kalinya.

    Titik pemeriksaan konsumsi data yang dikonsumsi harus berada dalam rentang data instance pelacakan perubahan, seperti yang ditunjukkan pada gambar berikut. Titik pemeriksaan konsumsi harus dikonversi menjadi timestamp UNIX. Data range

    Catatan

    Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX.

    USE_CONFIG_CHECKPOINT_NAME

    Menentukan apakah akan memaksa klien mengonsumsi data dari titik pemeriksaan konsumsi yang ditentukan. Nilai default: true. Anda dapat mengatur parameter ini ke true untuk mencegah data yang diterima tetapi belum diproses hilang.

    Tidak tersedia

    SUBSCRIBE_MODE_NAME

    Menentukan apakah akan menjalankan dua atau lebih klien Kafka untuk grup konsumen. Jika Anda ingin menggunakan fitur ini, atur parameter ini ke subscribe untuk klien Kafka ini.

    Nilai default adalah assign, yang menunjukkan bahwa fitur ini tidak digunakan. Kami menyarankan Anda hanya menerapkan satu klien Kafka untuk grup konsumen.

    Tidak tersedia

  9. Di bilah menu atas IntelliJ IDEA, pilih Run > Run untuk menjalankan klien.

    Catatan

    Jika Anda menjalankan IntelliJ IDEA untuk pertama kali, waktu tertentu diperlukan untuk memuat dan menginstal dependensi yang relevan.

Hasil pada klien Kafka

Gambar berikut menunjukkan bahwa klien Kafka dapat melacak perubahan data dari database sumber.

Results on the Kafka client

Anda dapat menghapus dua garis miring ke depan (//) dari string //log.info(ret); di baris 25 file NotifyDemoDB.java. Kemudian, jalankan klien lagi untuk melihat informasi perubahan data.

FAQ

  • T: Mengapa saya perlu mencatat titik pemeriksaan konsumsi klien Kafka?

    J: Titik pemeriksaan konsumsi yang dicatat oleh DTS adalah titik waktu ketika DTS menerima operasi commit dari klien Kafka. Titik pemeriksaan konsumsi yang dicatat mungkin berbeda dari waktu konsumsi aktual. Jika aplikasi bisnis atau klien Kafka terganggu secara tak terduga, Anda dapat menentukan titik pemeriksaan konsumsi yang akurat untuk melanjutkan konsumsi data. Ini mencegah kehilangan data atau konsumsi data duplikat.

Pemetaan antara tipe data MySQL dan nilai dataTypeNumber

Untuk informasi lebih lanjut, lihat SQL Type field.

Pemetaan antara tipe data Oracle dan nilai dataTypeNumber

Tipe data Oracle

Nilai dataTypeNumber

VARCHAR2 dan NVARCHAR2

1

NUMBER dan FLOAT

2

LONG

8

DATE

12

RAW

23

LONG_RAW

24

UNDEFINED

29

XMLTYPE

58

ROWID

69

CHAR dan NCHAR

96

BINARY_FLOAT

100

BINARY_DOUBLE

101

CLOB dan NCLOB

112

BLOB

113

BFILE

114

TIMESTAMP

180

TIMESTAMP_WITH_TIME_ZONE

181

INTERVAL_YEAR_TO_MONTH

182

INTERVAL_DAY_TO_SECOND

183

UROWID

208

TIMESTAMP_WITH_LOCAL_TIME_ZONE

231

Pemetaan antara tipe data PostgreSQL dan nilai dataTypeNumber

Tipe data PostgreSQL

Nilai dataTypeNumber

INT2 dan SMALLINT

21

INT4, INTEGER, dan SERIAL

23

INT8 dan BIGINT

20

CHARACTER

18

CHARACTER VARYING

1043

REAL

700

DOUBLE PRECISION

701

NUMERIC

1700

MONEY

790

DATE

1082

TIME dan TIME WITHOUT TIME ZONE

1083

TIME WITH TIME ZONE

1266

TIMESTAMP dan TIMESTAMP WITHOUT TIME ZONE

1114

TIMESTAMP WITH TIME ZONE

1184

BYTEA

17

TEXT

25

JSON

114

JSONB

3082

XML

142

UUID

2950

POINT

600

LSEG

601

PATH

602

BOX

603

POLYGON

604

LINE

628

CIDR

650

CIRCLE

718

MACADDR

829

INET

869

INTERVAL

1186

TXID_SNAPSHOT

2970

PG_LSN

3220

TSVECTOR

3614

TSQUERY

3615