All Products
Search
Document Center

Data Transmission Service:Konsumsi data langganan dengan Kafka

Last Updated:May 08, 2026

Fitur change tracking dari Data Transmission Service (DTS) memungkinkan Anda mengonsumsi data menggunakan client Kafka (versi 0.11 hingga 2.7). Topik ini menjelaskan cara memulai dengan demo client Kafka yang disediakan.

Catatan

  • Saat menggunakan demo dalam topik ini, auto commit dapat menyebabkan kehilangan data karena offset mungkin dikomit sebelum pemrosesan data selesai. Untuk mencegah hal ini, kami merekomendasikan manual commit.

    Catatan

    Jika commit gagal, client akan memulai ulang konsumsi dari titik pemeriksaan konsumsi terakhir yang terekam. Hal ini dapat menyebabkan duplikasi data, yang harus Anda tangani dalam logika aplikasi Anda.

  • Data disimpan menggunakan serialisasi Avro. Untuk detail format, lihat file Record.avsc.

    Peringatan

    Jika Anda menggunakan client Kafka selain yang disediakan dalam topik ini, data mungkin diurai secara salah selama deserialisasi (Contoh deserialisasi Avro DTS). Anda harus memverifikasi data tersebut.

  • Di DTS, API offsetForTimes menggunakan satuan detik, sedangkan API Kafka asli menggunakan milidetik.

  • Server Data Subscription mungkin mengalami gangguan jaringan sementara akibat peristiwa seperti disaster recovery. Jika Anda tidak menggunakan client Kafka yang disediakan dalam topik ini, pastikan client Anda mendukung retry jaringan.

  • Jika Anda menggunakan client Kafka asli dalam mode subscribe, titik pemeriksaan konsumsi yang disimpan di server mungkin dihapus saat DTS mengganti modul pengumpulan data inkrementalnya. Dalam kasus ini, Anda harus menyesuaikan titik pemeriksaan konsumsi secara manual. Jika Anda perlu menggunakan mode subscribe, kami merekomendasikan menggunakan SDK yang disediakan DTS untuk mengonsumsi data Data Subscription dan mengelola titik pemeriksaan konsumsi sendiri. Untuk informasi lebih lanjut, lihat Konsumsi data Data Subscription menggunakan SDK dan Mengelola titik pemeriksaan konsumsi.

Waktu proses client Kafka

Unduh demo client Kafka. Untuk instruksi penggunaan, lihat file Readme pada demo tersebut.

Catatan
  • Klik dan pilih Download ZIP.

  • Untuk menggunakan client Kafka 2.0, ubah versi client menjadi 2.0.0 dalam file subscribe_example-master/javaimpl/pom.xml.

Tabel 1. Cara kerja

Langkah

Direktori atau file

1. Gunakan consumer Kafka asli untuk mengambil data perubahan dari saluran langganan.

subscribe_example-master/javaimpl/src/main/java/recordgenerator/

2. Deserialisasi data perubahan untuk mengekstrak before image , after image , dan atribut lainnya.

Peringatan
  • Jika instans sumber adalah database Oracle yang dikelola sendiri, Anda harus mengaktifkan full supplemental logging untuk memastikan client dapat mengonsumsi data langganan dan before image serta after image lengkap.

  • Jika instans sumber bukan database Oracle yang dikelola sendiri, DTS tidak dapat menjamin integritas before image. Kami merekomendasikan untuk memverifikasi before image tersebut.

subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java

3. Konversi field dataTypeNumber ke tipe field database yang sesuai.

Catatan

Untuk detail pemetaan, lihat Pemetaan antara tipe field dan nilai dataTypeNumber.

subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/

Prosedur

Topik ini menjelaskan cara menjalankan demo client Kafka di IntelliJ IDEA untuk mengonsumsi data dari saluran langganan data.

  1. Buat saluran langganan data. Untuk informasi lebih lanjut, lihat Ikhtisar solusi langganan.

  2. Buat satu atau beberapa kelompok konsumen. Untuk informasi lebih lanjut, lihat Buat kelompok konsumen.

  3. Unduh demo client Kafka dan ekstrak file tersebut.

    Catatan

    Klik , lalu pilih Download ZIP untuk mengunduh file.

  4. Buka IntelliJ IDEA dan klik Open.

  5. Pada kotak dialog, arahkan ke direktori tempat Anda mengekstrak demo client Kafka. Buka folder seperti yang ditunjukkan pada gambar berikut untuk menemukan file Project Object Model (POM): pom.xml.

  6. Pada kotak dialog yang muncul, klik Open as Project.

  7. Di jendela IntelliJ IDEA, buka folder, lalu temukan dan klik ganda file demo client Kafka: NotifyDemoDB.java.

  8. Atur nilai parameter dalam file NotifyDemoDB.java.

    Parameter

    Deskripsi

    Cara memperoleh

    USER_NAME

    Username untuk kelompok konsumen.

    Peringatan

    Jika Anda menggunakan client berbeda, Anda harus mengatur username dalam format <akun kelompok konsumen>-<ID kelompok konsumen>. Contoh: dtstest-dtsae******bpv.

    Di Konsol DTS, klik ID instans langganan target. Di panel navigasi kiri, klik Consume Data. Pada halaman yang muncul, Anda dapat menemukan informasi Consumer Group ID/Name dan Account untuk kelompok konsumen.

    Catatan

    Password untuk akun kelompok konsumen ditentukan saat Anda membuat kelompok konsumen.

    PASSWORD_NAME

    Password untuk akun kelompok konsumen.

    SID_NAME

    ID kelompok konsumen.

    GROUP_NAME

    Nilai ini harus sesuai dengan ID kelompok konsumen.

    KAFKA_TOPIC

    Topik langganan untuk saluran langganan data.

    Di Konsol DTS, klik ID instans langganan target. Pada halaman Basic Information, Anda dapat menemukan informasi Topic dan Network.

    KAFKA_BROKER_URL_NAME

    Alamat jaringan untuk saluran langganan data.

    Catatan
    • Jika client Kafka dan saluran langganan data Anda berada dalam jaringan klasik atau VPC yang sama, gunakan alamat IP internal untuk meminimalkan latensi jaringan.

    • Kami tidak merekomendasikan penggunaan titik akhir publik karena potensi ketidakstabilan jaringan.

    INITIAL_CHECKPOINT_NAME

    Waktu untuk memulai konsumsi data, ditentukan sebagai Stempel waktu UNIX. Contoh: 1592269238.

    Catatan
    • Menyimpan stempel waktu memungkinkan Anda:

      • Melanjutkan konsumsi dari stempel waktu terakhir yang disimpan setelah gangguan untuk mencegah kehilangan data.

      • Memulai konsumsi data dari titik waktu tertentu.

    • Jika SUBSCRIBE_MODE_NAME diatur ke subscribe, nilai INITIAL_CHECKPOINT_NAME hanya berlaku saat client langganan pertama kali dijalankan.

    Waktu mulai konsumsi harus berada dalam rentang data instans langganan dan harus dikonversi ke Stempel waktu UNIX.

    Catatan
    • Anda dapat melihat rentang data instans langganan di kolom Data Range pada daftar tugas langganan.

    • Anda dapat menggunakan mesin pencari untuk menemukan konverter Stempel waktu UNIX.

    USE_CONFIG_CHECKPOINT_NAME

    Nilai default adalah true, yang memaksa client untuk memulai dari waktu yang ditentukan. Hal ini mencegah kehilangan data yang telah diterima tetapi belum diproses.

    Tidak ada

    SUBSCRIBE_MODE_NAME

    Untuk menjalankan dua atau lebih client Kafka dalam kelompok konsumen yang sama, atur parameter ini ke subscribe pada semua client.

    Nilai default adalah assign. Dalam mode ini, fitur ini dinonaktifkan, dan kami merekomendasikan hanya menerapkan satu client.

    Tidak ada

  9. Dari bilah menu atas, pilih Run > Run untuk menjalankan client.

    Catatan

    Saat menjalankan aplikasi untuk pertama kalinya, IntelliJ IDEA mungkin memerlukan waktu untuk mengunduh dan menginstal paket dependensi yang diperlukan secara otomatis.

Hasil

Client berhasil berlangganan data perubahan dari database sumber.

Untuk melihat data perubahan secara detail, hapus // dari //log.info(ret); pada baris 25 dalam file NotifyDemoDB.java dan jalankan kembali client tersebut.

FAQ

  • T: Mengapa saya perlu melacak titik pemeriksaan konsumsi client saya secara manual?

    J: Titik pemeriksaan konsumsi yang direkam oleh DTS menunjukkan kapan DTS menerima commit dari client Kafka, bukan kapan aplikasi Anda selesai memproses data. Jika aplikasi atau client Kafka Anda berhenti secara tak terduga, menyediakan titik pemeriksaan yang Anda rekam sendiri memungkinkan Anda melanjutkan konsumsi dari titik gangguan yang tepat, sehingga mencegah duplikasi data maupun kehilangan data.

Kelola checkpoint

  1. Konfigurasikan client langganan untuk mendengarkan pergantian kluster dalam modul pengumpulan data DTS.

    Untuk melakukan ini, daftarkan ClusterSwitchListener sebagai interceptor konsumen dengan mengatur properti berikut untuk consumer:

    properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());

    Kode berikut menunjukkan contoh implementasi ClusterSwitchListener:

    public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor {
        private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class);
        private ClusterResource originClusterResource = null;
        private ClusterResource currentClusterResource = null;
    
        public ConsumerRecords onConsume(ConsumerRecords records) {
            return records;
        }
    
    
        public void close() {
        }
    
        public void onCommit(Map offsets) {
        }
    
    
        public void onUpdate(ClusterResource clusterResource) {
            synchronized (this) {
                originClusterResource = currentClusterResource;
                currentClusterResource = clusterResource;
                if (null == originClusterResource) {
                    LOG.info("Cluster updated to " + currentClusterResource.clusterId());
                } else {
                    if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                        LOG.info("Cluster not changed on update:" + clusterResource.clusterId());
                    } else {
                        LOG.error("Cluster changed");
                        throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId() + " to " + currentClusterResource.clusterId()
                                + ", consumer require restart");
                    }
                }
            }
        }
    
        public boolean isClusterResourceChanged() {
            if (null == originClusterResource) {
                return false;
            }
            if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                return false;
            }
            return true;
        }
    
        public void configure(Map<String, ?> configs) {
        }
    
        public static class ClusterSwitchException extends KafkaException {
            public ClusterSwitchException(String message, Throwable cause) {
                super(message, cause);
            }
    
            public ClusterSwitchException(String message) {
                super(message);
            }
    
            public ClusterSwitchException(Throwable cause) {
                super(cause);
            }
    
            public ClusterSwitchException() {
                super();
            }
    
        }
  2. Tangani event pergantian kluster dari modul pengumpulan data DTS.

    Saat ClusterSwitchException tertangkap, atur ulang checkpoint awal untuk langganan berikutnya ke stempel waktu dari catatan terakhir yang berhasil dikonsumsi. Potongan kode berikut memberikan contoh:

    try{
       //do some action
    } catch (ClusterSwitchListener.ClusterSwitchException e) {
       reset();
    }
    
    // Reset the checkpoint.
    public reset() {
      long offset = kafkaConsumer.offsetsForTimes(timestamp);
      kafkaConsumer.seek(tp,offset);
    }
    Catatan

    Untuk contoh implementasi, lihat KafkaRecordFetcher.

Pemetaan tipe data dan dataTypeNumber

Pemetaan tipe data MySQL dan dataTypeNumber

Tipe data MySQL

Nilai dataTypeNumber

MYSQL_TYPE_DECIMAL

0

MYSQL_TYPE_INT8

1

MYSQL_TYPE_INT16

2

MYSQL_TYPE_INT32

3

MYSQL_TYPE_FLOAT

4

MYSQL_TYPE_DOUBLE

5

MYSQL_TYPE_NULL

6

MYSQL_TYPE_TIMESTAMP

7

MYSQL_TYPE_INT64

8

MYSQL_TYPE_INT24

9

MYSQL_TYPE_DATE

10

MYSQL_TYPE_TIME

11

MYSQL_TYPE_DATETIME

12

MYSQL_TYPE_YEAR

13

MYSQL_TYPE_DATE_NEW

14

MYSQL_TYPE_VARCHAR

15

MYSQL_TYPE_BIT

16

MYSQL_TYPE_TIMESTAMP_NEW

17

MYSQL_TYPE_DATETIME_NEW

18

MYSQL_TYPE_TIME_NEW

19

MYSQL_TYPE_JSON

245

MYSQL_TYPE_DECIMAL_NEW

246

MYSQL_TYPE_ENUM

247

MYSQL_TYPE_SET

248

MYSQL_TYPE_TINY_BLOB

249

MYSQL_TYPE_MEDIUM_BLOB

250

MYSQL_TYPE_LONG_BLOB

251

MYSQL_TYPE_BLOB

252

MYSQL_TYPE_VAR_STRING

253

MYSQL_TYPE_STRING

254

MYSQL_TYPE_GEOMETRY

255

Pemetaan tipe data Oracle dan dataTypeNumber

Tipe data Oracle

dataTypeNumber value

VARCHAR2/NVARCHAR2

1

NUMBER/FLOAT

2

LONG

8

DATE

12

RAW

23

LONG_RAW

24

UNDEFINED

29

XMLTYPE

58

ROWID

69

CHAR, NCHAR

96

BINARY_FLOAT

100

BINARY_DOUBLE

101

CLOB/NCLOB

112

BLOB

113

BFILE

114

TIMESTAMP

180

TIMESTAMP_WITH_TIME_ZONE

181

INTERVAL_YEAR_TO_MONTH

182

INTERVAL_DAY_TO_SECOND

183

UROWID

208

TIMESTAMP_WITH_LOCAL_TIME_ZONE

231

Pemetaan tipe data PostgreSQL dan dataTypeNumber

Tipe data PostgreSQL

dataTypeNumber value

INT2/SMALLINT

21

INT4/INTEGER/SERIAL

23

INT8/BIGINT

20

CHARACTER

18

CHARACTER VARYING

1043

REAL

700

DOUBLE PRECISION

701

NUMERIC

1700

MONEY

790

DATE

1082

TIME/TIME WITHOUT TIME ZONE

1083

TIME WITH TIME ZONE

1266

TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE

1114

TIMESTAMP WITH TIME ZONE

1184

BYTEA

17

TEXT

25

JSON

114

JSONB

3082

XML

142

UUID

2950

POINT

600

LSEG

601

PATH

602

BOX

603

POLYGON

604

LINE

628

CIDR

650

CIRCLE

718

MACADDR

829

INET

869

INTERVAL

1186

TXID_SNAPSHOT

2970

PG_LSN

3220

TSVECTOR

3614

TSQUERY

3615