全部产品
Search
文档中心

Data Transmission Service:Gunakan klien Kafka untuk mengonsumsi data yang dilacak

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan demo klien Kafka untuk mengonsumsi data yang dilacak. Fitur pelacakan perubahan versi baru memungkinkan Anda mengonsumsi data yang dilacak dengan menggunakan klien Kafka V0.11 hingga V2.7.

Catatan penggunaan

  • Jika Anda mengaktifkan komit otomatis saat menggunakan fitur pelacakan perubahan, beberapa data mungkin dikomit sebelum dikonsumsi, yang dapat menyebabkan kehilangan data. Kami menyarankan Anda untuk secara manual mengkomit data.

    Catatan

    Jika data gagal dikomit, Anda dapat memulai ulang klien untuk melanjutkan konsumsi data dari titik pemeriksaan terakhir yang direkam. Namun, data duplikat mungkin dihasilkan selama periode ini. Anda harus secara manual menyaring data duplikat tersebut.

  • Data diserialisasi dan disimpan dalam format Avro. Untuk informasi lebih lanjut, lihat Record.avsc.

    Peringatan

    Jika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, Anda harus mengurai data yang dilacak berdasarkan skema Avro dan memeriksa data yang telah diurai.

  • Satuan pencarian adalah detik ketika Data Transmission Service (DTS) memanggil operasi offsetForTimes. Satuan pencarian adalah milidetik ketika klien Kafka asli memanggil operasi ini.

  • Koneksi transien mungkin terjadi antara klien Kafka dan server pelacakan perubahan karena beberapa alasan, seperti pemulihan bencana. Jika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, klien Kafka Anda harus memiliki kemampuan penyambungan ulang jaringan.

  • Jika Anda menggunakan klien Kafka asli untuk mengonsumsi data yang dilacak, modul pengumpulan data inkremental mungkin berubah di DTS. Dalam mode langganan, titik pemeriksaan konsumsi yang disimpan oleh klien Kafka ke server DTS dihapus. Anda perlu menentukan titik pemeriksaan konsumsi untuk mengonsumsi data yang dilacak sesuai dengan kebutuhan bisnis Anda. Jika Anda ingin mengonsumsi data dalam mode langganan, kami menyarankan Anda menggunakan demo SDK yang disediakan oleh DTS untuk melacak dan mengonsumsi data, atau secara manual mengelola titik pemeriksaan konsumsi. Untuk informasi lebih lanjut, lihat Gunakan Demo SDK untuk Mengonsumsi Data yang Dilacak dan bagian Kelola Titik Pemeriksaan Konsumsi dari topik ini.

Jalankan klien Kafka

Unduh demo klien Kafka. Untuk informasi lebih lanjut tentang cara menggunakan demo, lihat Readme.

Catatan
  • Klik code dan pilih Download ZIP untuk mengunduh paket.

  • Jika Anda menggunakan klien Kafka versi 2.0, Anda harus mengubah nomor versi dalam file subscribe_example-master/javaimpl/pom.xml menjadi 2.0.0.

kafka2.0

Tabel 1 Deskripsi Proses

Langkah

Direktori atau file terkait

1. Gunakan konsumen Kafka asli untuk mendapatkan data inkremental dari instance pelacakan perubahan.

subscribe_example-master/javaimpl/src/main/java/recordgenerator/

2. Deserialize gambar data inkremental, dan dapatkan pre-image , post-image , dan atribut lainnya.

Peringatan
  • Jika instance sumber adalah database Oracle yang dikelola sendiri, Anda harus mengaktifkan logging tambahan untuk semua kolom. Ini memastikan bahwa klien dapat berhasil mengonsumsi data yang dilacak dan memastikan integritas pre-image dan post-image.

  • Jika instance sumber bukan database Oracle yang dikelola sendiri, DTS tidak menjamin integritas pre-image. Kami menyarankan Anda memverifikasi pre-image yang diperoleh.

subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java

3. Konversikan nilai dataTypeNumber dalam data yang telah dideserialisasi menjadi tipe data dari database yang sesuai.

Catatan

Untuk informasi lebih lanjut, lihat Pemetaan antara tipe data dan nilai dataTypeNumber dalam topik ini.

subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/

Prosedur

Langkah-langkah berikut menunjukkan cara menjalankan klien Kafka untuk mengonsumsi data yang dilacak. Dalam contoh ini, IntelliJ IDEA Community Edition 2018.1.4 untuk Windows digunakan.

  1. Buat instance pelacakan perubahan. Untuk informasi lebih lanjut, lihat Ikhtisar Skenario Pelacakan Perubahan.

  2. Buat satu atau lebih grup konsumen. Untuk informasi lebih lanjut, lihat Buat Grup Konsumen.

  3. Unduh paket demo klien Kafka dan dekompresi paket tersebut.

    Catatan

    Klik code dan pilih Download ZIP untuk mengunduh paket.

  4. Buka IntelliJ IDEA. Di jendela yang muncul, klik Open.

    打开项目

  5. Di kotak dialog yang muncul, pergi ke direktori tempat demo yang diunduh berada. Temukan file pom.xml.

    打开项目文件

  6. Di kotak dialog yang muncul, pilih Open as Project.

  7. Di jendela Alat Proyek IntelliJ IDEA, klik folder untuk menemukan file demo klien Kafka, dan klik dua kali file tersebut. Nama file adalah NotifyDemoDB.java.

  8. Tentukan parameter dalam file NotifyDemoDB.java.

    设置参数值

    Parameter

    Deskripsi

    Cara mendapatkan nilai parameter

    USER_NAME

    Nama pengguna akun grup konsumen.

    Peringatan

    Jika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, Anda harus menentukan parameter ini dalam format berikut: <Username>-<ID Grup Konsumen>. Contoh: dtstest-dtsae******bpv. Jika tidak, koneksi akan gagal.

    Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di panel navigasi di sebelah kiri, klik Consume Data. Di halaman yang muncul, Anda dapat melihat informasi tentang grup konsumen, seperti ID atau nama dan akun grup konsumen.

    Catatan

    Kata sandi akun grup konsumen ditentukan saat Anda membuat grup konsumen.

    PASSWORD_NAME

    Kata sandi akun.

    SID_NAME

    ID grup konsumen.

    GROUP_NAME

    Nama grup konsumen. Tetapkan parameter ini ke ID grup konsumen.

    KAFKA_TOPIC

    Nama topik yang dilacak dari instance pelacakan perubahan.

    Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di halaman Basic Information, Anda dapat melihat informasi tentang topik dan jaringan.

    KAFKA_BROKER_URL_NAME

    Titik akhir instance pelacakan perubahan.

    Catatan
    • Jika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance Elastic Compute Service (ECS) tempat Anda menerapkan klien Kafka berada di jaringan klasik atau di virtual private cloud (VPC) yang sama dengan instance pelacakan perubahan.

    • Untuk memastikan stabilitas jaringan, kami menyarankan Anda untuk tidak menggunakan titik akhir publik.

    INITIAL_CHECKPOINT_NAME

    Titik pemeriksaan konsumsi data yang dikonsumsi. Nilainya adalah timestamp UNIX. Contoh: 1592269238.

    Catatan
    • Anda harus menyimpan titik pemeriksaan konsumsi untuk alasan berikut:

      • Jika proses konsumsi terganggu, Anda dapat menentukan titik pemeriksaan konsumsi pada klien Kafka untuk melanjutkan konsumsi data. Ini mencegah kehilangan data.

      • Saat Anda memulai klien Kafka, Anda dapat menentukan titik pemeriksaan konsumsi untuk mengonsumsi data berdasarkan kebutuhan bisnis Anda.

    • Jika parameter SUBSCRIBE_MODE_NAME diatur ke subscribe, parameter INITIAL_CHECKPOINT_NAME yang Anda tentukan hanya berlaku saat Anda memulai klien Kafka untuk pertama kalinya.

    Titik pemeriksaan konsumsi data yang dikonsumsi harus berada dalam rentang data instance pelacakan perubahan. Titik pemeriksaan konsumsi harus dikonversi menjadi timestamp UNIX.

    Catatan
    • Anda dapat melihat rentang data instance pelacakan perubahan di kolom Data Range pada halaman Tugas Pelacakan Perubahan.

    • Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX.

    USE_CONFIG_CHECKPOINT_NAME

    Menentukan apakah klien dipaksa untuk mengonsumsi data dari titik pemeriksaan konsumsi yang ditentukan. Nilai default: true. Anda dapat mengatur parameter ini ke true untuk mencegah data yang diterima tetapi belum diproses hilang.

    Tidak ada

    SUBSCRIBE_MODE_NAME

    Menentukan apakah menjalankan dua atau lebih klien Kafka untuk grup konsumen. Jika Anda ingin menggunakan fitur ini, atur parameter ini ke subscribe untuk klien Kafka ini.

    Nilai default adalah assign, yang menunjukkan bahwa fitur ini tidak digunakan. Kami menyarankan Anda hanya menerapkan satu klien Kafka untuk grup konsumen.

    Tidak ada

  9. Di bilah menu atas IntelliJ IDEA, pilih Run > Run untuk menjalankan klien.

    Catatan

    Jika Anda menjalankan IntelliJ IDEA untuk pertama kali, waktu tertentu diperlukan untuk memuat dan menginstal dependensi yang relevan.

Hasil pada klien Kafka

Gambar berikut menunjukkan bahwa klien Kafka dapat melacak perubahan data dari database sumber.

Kafka客户端订阅结果

Anda dapat menghapus dua garis miring ke depan (//) dari string //log.info(ret) di baris 25 file NotifyDemoDB.java. Kemudian, jalankan klien lagi untuk melihat informasi perubahan data.

FAQ

  • T: Mengapa saya perlu mencatat titik pemeriksaan konsumsi klien Kafka?

    J: Titik pemeriksaan konsumsi yang dicatat oleh DTS adalah titik waktu ketika DTS menerima operasi commit dari klien Kafka. Titik pemeriksaan konsumsi yang dicatat mungkin berbeda dari waktu konsumsi aktual. Jika aplikasi bisnis atau klien Kafka terganggu secara tak terduga, Anda dapat menentukan titik pemeriksaan konsumsi yang akurat untuk melanjutkan konsumsi data. Ini mencegah kehilangan data atau konsumsi data duplikat.

Kelola titik pemeriksaan konsumsi

  1. Konfigurasikan klien Kafka untuk mendengarkan pergantian modul pengumpulan data di DTS.

    Anda dapat mengonfigurasi properti konsumen klien Kafka untuk mendengarkan pergantian modul pengumpulan data di DTS. Kode berikut memberikan contoh tentang cara mengonfigurasi properti konsumen:

    properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());

    Kode berikut memberikan contoh tentang cara mengimplementasikan ClusterSwitchListener:

    public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor {
        private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class);
        private ClusterResource originClusterResource = null;
        private ClusterResource currentClusterResource = null;
    
        public ConsumerRecords onConsume(ConsumerRecords records) {
            return records;
        }
    
    
        public void close() {
        }
    
        public void onCommit(Map offsets) {
        }
    
    
        public void onUpdate(ClusterResource clusterResource) {
            synchronized (this) {
                originClusterResource = currentClusterResource;
                currentClusterResource = clusterResource;
                if (null == originClusterResource) {
                    LOG.info("Cluster updated to " + currentClusterResource.clusterId());
                } else {
                    if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                        LOG.info("Cluster not changed on update:" + clusterResource.clusterId());
                    } else {
                        LOG.error("Cluster changed");
                        throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId() + " to " + currentClusterResource.clusterId()
                                + ", consumer require restart");
                    }
                }
            }
        }
    
        public boolean isClusterResourceChanged() {
            if (null == originClusterResource) {
                return false;
            }
            if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                return false;
            }
            return true;
        }
    
        public void configure(Map<String, ?> configs) {
        }
    
        public static class ClusterSwitchException extends KafkaException {
            public ClusterSwitchException(String message, Throwable cause) {
                super(message, cause);
            }
    
            public ClusterSwitchException(String message) {
                super(message);
            }
    
            public ClusterSwitchException(Throwable cause) {
                super(cause);
            }
    
            public ClusterSwitchException() {
                super();
            }
    
        }
  2. Tentukan titik pemeriksaan konsumsi berdasarkan pergantian modul pengumpulan data yang ditangkap di DTS.

    Atur titik pemeriksaan konsumsi awal pelacakan data berikutnya ke timestamp entri data terlacak terbaru yang dikonsumsi oleh klien. Kode berikut memberikan contoh tentang cara menentukan titik pemeriksaan konsumsi:

    try{
       //do some action
    } catch (ClusterSwitchListener.ClusterSwitchException e) {
       reset();
    }
    
    // Reset the consumption checkpoint.
    public reset() {
      long offset = kafkaConsumer.offsetsForTimes(timestamp);
      kafkaConsumer.seek(tp,offset);
    }
    Catatan

    Untuk informasi lebih lanjut tentang contoh-contoh, lihat KafkaRecordFetcher.

Pemetaan antara tipe data dan nilai dataTypeNumber

Pemetaan antara tipe data MySQL dan nilai dataTypeNumber

Tipe data MySQL

Nilai dataTypeNumber

MYSQL_TYPE_DECIMAL

0

MYSQL_TYPE_INT8

1

MYSQL_TYPE_INT16

2

MYSQL_TYPE_INT32

3

MYSQL_TYPE_FLOAT

4

MYSQL_TYPE_DOUBLE

5

MYSQL_TYPE_NULL

6

MYSQL_TYPE_TIMESTAMP

7

MYSQL_TYPE_INT64

8

MYSQL_TYPE_INT24

9

MYSQL_TYPE_DATE

10

MYSQL_TYPE_TIME

11

MYSQL_TYPE_DATETIME

12

MYSQL_TYPE_YEAR

13

MYSQL_TYPE_DATE_NEW

14

MYSQL_TYPE_VARCHAR

15

MYSQL_TYPE_BIT

16

MYSQL_TYPE_TIMESTAMP_NEW

17

MYSQL_TYPE_DATETIME_NEW

18

MYSQL_TYPE_TIME_NEW

19

MYSQL_TYPE_JSON

245

MYSQL_TYPE_DECIMAL_NEW

246

MYSQL_TYPE_ENUM

247

MYSQL_TYPE_SET

248

MYSQL_TYPE_TINY_BLOB

249

MYSQL_TYPE_MEDIUM_BLOB

250

MYSQL_TYPE_LONG_BLOB

251

MYSQL_TYPE_BLOB

252

MYSQL_TYPE_VAR_STRING

253

MYSQL_TYPE_STRING

254

MYSQL_TYPE_GEOMETRY

255

Pemetaan antara tipe data Oracle dan nilai dataTypeNumber

Tipe data Oracle

Nilai dataTypeNumber

VARCHAR2/NVARCHAR2

1

NUMBER/FLOAT

2

LONG

8

DATE

12

RAW

23

LONG_RAW

24

UNDEFINED

29

XMLTYPE

58

ROWID

69

CHAR and NCHAR

96

BINARY_FLOAT

100

BINARY_DOUBLE

101

CLOB/NCLOB

112

BLOB

113

BFILE

114

TIMESTAMP

180

TIMESTAMP_WITH_TIME_ZONE

181

INTERVAL_YEAR_TO_MONTH

182

INTERVAL_DAY_TO_SECOND

183

UROWID

208

TIMESTAMP_WITH_LOCAL_TIME_ZONE

231

Pemetaan antara tipe data PostgreSQL dan nilai dataTypeNumber

Tipe data PostgreSQL

Nilai dataTypeNumber

INT2/SMALLINT

21

INT4/INTEGER/SERIAL

23

INT8/BIGINT

20

CHARACTER

18

CHARACTER VARYING

1043

REAL

700

DOUBLE PRECISION

701

NUMERIC

1700

MONEY

790

DATE

1082

TIME/TIME WITHOUT TIME ZONE

1083

TIME WITH TIME ZONE

1266

TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE

1114

TIMESTAMP WITH TIME ZONE

1184

BYTEA

17

TEXT

25

JSON

114

JSONB

3082

XML

142

UUID

2950

POINT

600

LSEG

601

PATH

602

BOX

603

POLYGON

604

LINE

628

CIDR

650

CIRCLE

718

MACADDR

829

INET

869

INTERVAL

1186

TXID_SNAPSHOT

2970

PG_LSN

3220

TSVECTOR

3614

TSQUERY

3615