All Products
Search
Document Center

DataHub:Buat langganan

Last Updated:Jun 30, 2025

Ikhtisar fitur langganan

Konsumsi yang dapat dilanjutkan diperlukan dalam skenario di mana Anda mengonsumsi data dalam topik DataHub dan ingin melanjutkan konsumsi dari waktu aplikasi Anda gagal. Jika Anda perlu melanjutkan konsumsi, Anda harus menyimpan offset konsumsi saat ini dan memastikan bahwa layanan yang digunakan untuk menyimpan offset mendukung ketersediaan tinggi. Hal ini meningkatkan kompleksitas pengembangan aplikasi. Fitur langganan DataHub memungkinkan Anda menyimpan offset konsumsi ke server untuk menyelesaikan masalah tersebut. Anda hanya perlu mengaktifkan fitur ini dan menambahkan beberapa baris kode ke aplikasi Anda untuk mendapatkan layanan pemeliharaan offset konsumsi dengan ketersediaan tinggi. Selain itu, fitur langganan memungkinkan Anda mengatur ulang offset konsumsi, memastikan bahwa data dapat dikonsumsi setidaknya sekali. Misalnya, jika terjadi kesalahan ketika aplikasi Anda memproses data dalam periode tertentu dan Anda ingin mengonsumsi data tersebut lagi, Anda dapat mengatur ulang offset konsumsi tanpa memulai ulang aplikasi. Aplikasi Anda secara otomatis akan mengonsumsi data dari offset konsumsi yang ditentukan.

Buat langganan

Pastikan akun Anda memiliki izin untuk berlangganan topik dalam proyek tertentu. Untuk informasi lebih lanjut, lihat Kontrol Akses. Ikuti langkah-langkah berikut untuk membuat langganan:

  • Buka halaman detail topik. Klik Langganan di pojok kanan atas.

8

  • Di panel Buat Langganan, atur parameter sesuai kebutuhan, lalu klik Buat.

    • Aplikasi: nama aplikasi untuk mana Anda ingin membuat langganan.

    • Deskripsi: deskripsi langganan.

    9

Pada tab Daftar Langganan, cari langganan yang telah dibuat dan klik ikon pencarian di kolom Consumer Offset untuk melihat status konsumsi semua shard.

Di tab Daftar Langganan, temukan langganan yang dibuat dan klik ikon pencarian di kolom Offset Konsumen untuk melihat status konsumsi semua shard.

10

2. Contoh

Fitur langganan memungkinkan Anda menyimpan offset konsumsi. Anda dapat menggunakan kemampuan baca dan tulis DataHub bersama dengan kemampuan menyimpan offset konsumsi dalam skenario di mana Anda harus menyimpan offset konsumsi setelah data dibaca. Untuk informasi lebih lanjut tentang kemampuan baca dan tulis DataHub, lihat DataHub SDK for Java.

  • Contoh Kode

// Kode contoh berikut memberikan contoh cara mengonsumsi data dari offset konsumsi yang disimpan dan mengirimkan offset konsumsi selama konsumsi.
public void offset_consumption(int maxRetry) {
    String endpoint = "<YourEndPoint>";
    String accessId = "<YourAccessId>";
    String accessKey = "<YourAccessKey>";
    String projectName = "<YourProjectName>";
    String topicName = "<YourTopicName>";
    String subId = "<YourSubId>";
    String shardId = "0";
    List<String> shardIds = Arrays.asList(shardId);
    // Buat klien DataHub.
    DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
            .setDatahubConfig(
                    new DatahubConfig(endpoint,
                            // Tentukan apakah akan mengaktifkan transmisi data biner. Server versi V2.12 atau lebih baru mendukung transmisi data biner.
                            new AliyunAccount(accessId, accessKey), true))
            .build();
    RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
    OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
    SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
    // 1. Dapatkan kursor rekaman pada offset konsumsi saat ini. Jika rekaman kedaluwarsa atau rekaman belum dikonsumsi, dapatkan kursor rekaman pertama dalam periode time-to-live (TTL) topik.
    String cursor = "";
    // Jika nomor urut lebih kecil dari 0, rekaman belum dikonsumsi.
    if (subscriptionOffset.getSequence() < 0) {
        // Dapatkan kursor rekaman pertama dalam periode TTL topik.
        cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
    } else {
        // Dapatkan kursor rekaman berikutnya.
        long nextSequence = subscriptionOffset.getSequence() + 1;
        try {
            // Jika error SeekOutOfRange dikembalikan setelah Anda mendapatkan kursor berdasarkan nomor urut, rekaman telah kedaluwarsa.
            cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
        } catch (SeekOutOfRangeException e) {
            // Dapatkan kursor rekaman pertama dalam periode TTL topik.
            cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
        }
    }
    // 2. Baca rekaman dan simpan offset konsumsi. Dalam contoh ini, Anda membaca rekaman tipe TUPLE dan menyimpan offset konsumsi setiap kali 1.000 rekaman dibaca.
    long recordCount = 0L;
    // Baca 1.000 rekaman setiap kali.
    int fetchNum = 1000;
    int retryNum = 0;
    int commitNum = 1000;
    while (retryNum < maxRetry) {
        try {
            GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
            if (getRecordsResult.getRecordCount() <= 0) {
                // Jika tidak ada rekaman yang dapat dibaca, jeda thread selama 1 detik dan lanjutkan membaca rekaman.
                System.out.println("tidak ada data, tidur 1 detik");
                Thread.sleep(1000);
                continue;
            }
            for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
                // Konsumsi data.
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                System.out.println("field1:" + data.getField("field1") + "\t"
                        + "field2:" + data.getField("field2"));
                // Simpan offset konsumsi setelah data dikonsumsi.
                recordCount++;
                subscriptionOffset.setSequence(recordEntry.getSequence());
                subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
                // commit offset setiap 1000 rekaman
                if (recordCount % commitNum == 0) {
                    // Kirim offset konsumsi.
                    Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                    offsetMap.put(shardId, subscriptionOffset);
                    datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
                    System.out.println("commit offset berhasil");
                }
            }
            cursor = getRecordsResult.getNextCursor();
        } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
            // Sesi langganan keluar. Offline: Langganan offline; SessionChange: Langganan sedang dikonsumsi oleh klien lain.
            e.printStackTrace();
            throw e;
        } catch (SubscriptionOffsetResetException e) {
            // Offset konsumsi diatur ulang. Anda harus mendapatkan informasi versi offset konsumsi lagi.
            SubscriptionOffset offset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
            subscriptionOffset.setVersionId(offset.getVersionId());
            // Setelah offset konsumsi diatur ulang, Anda harus mendapatkan kursor rekaman pada offset konsumsi lagi. Metode untuk mendapatkan kursor bergantung pada metode yang digunakan untuk mengatur ulang offset konsumsi. 
            // Jika baik nomor urut maupun timestamp ditentukan untuk mengatur ulang offset konsumsi, Anda bisa mendapatkan kursor berdasarkan nomor urut atau timestamp.
            // Jika hanya nomor urut yang ditentukan untuk mengatur ulang offset konsumsi, Anda hanya bisa mendapatkan kursor berdasarkan nomor urut. 
            // Jika hanya timestamp yang ditentukan untuk mengatur ulang offset konsumsi, Anda hanya bisa mendapatkan kursor berdasarkan timestamp.
            // Dalam sebagian besar kasus, prioritaskan mendapatkan kursor berdasarkan nomor urut. Jika kursor gagal didapatkan berdasarkan nomor urut atau timestamp, dapatkan kursor rekaman paling awal.
            cursor = null;
            if (cursor == null) {
                try {
                    long nextSequence = offset.getSequence() + 1;
                    cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
                    System.out.println("dapatkan kursor berhasil");
                } catch (DatahubClientException exception) {
                    System.out.println("dapatkan kursor oleh SEQUENCE gagal, coba dapatkan kursor oleh SYSTEM_TIME");
                }
            }
            if (cursor == null) {
                try {
                    cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, offset.getTimestamp()).getCursor();
                    System.out.println("dapatkan kursor berhasil");
                } catch (DatahubClientException exception) {
                    System.out.println("dapatkan kursor oleh SYSTEM_TIME gagal, coba dapatkan kursor oleh OLDEST");
                }
            }
            if (cursor == null) {
                try {
                    cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
                    System.out.println("dapatkan kursor berhasil");
                } catch (DatahubClientException exception) {
                    System.out.println("dapatkan kursor oleh OLDEST gagal");
                    System.out.println("dapatkan kursor gagal!!");
                    throw e;
                }
            }
        } catch (LimitExceededException e) {
            // batas terlampaui, coba lagi
            e.printStackTrace();
            retryNum++;
        } catch (DatahubClientException e) {
            // kesalahan lain, coba lagi
            e.printStackTrace();
            retryNum++;
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }
}
  • Hasil

1. Saat Anda memulai aplikasi untuk pertama kalinya, aplikasi mengonsumsi data dari rekaman paling awal. Saat aplikasi berjalan, Anda dapat menyegarkan tab Daftar Langganan di konsol DataHub. Offset konsumsi shard akan bergerak maju. 2. Jika Anda mengatur ulang offset konsumsi dengan mengklik Atur Ulang di konsol DataHub selama konsumsi, aplikasi Anda secara otomatis mendeteksi perubahan offset konsumsi dan mengonsumsi data dari offset konsumsi yang ditentukan. Ketika aplikasi menangkap OffsetResetedException, aplikasi memanggil metode getSubscriptionOffset untuk meminta offset konsumsi terbaru dari server. Kemudian, aplikasi dapat mengonsumsi data dari offset konsumsi terbaru. 3. Perhatikan bahwa shard dalam langganan tidak dapat dikonsumsi oleh beberapa thread atau proses pada saat yang sama. Jika tidak, offset konsumsi yang dikirimkan oleh satu thread akan ditimpa oleh yang dikirimkan oleh thread lain, dan server tidak dapat menentukan thread mana yang dimiliki oleh offset konsumsi yang disimpan. Dalam kasus ini, server melempar OffsetSessionChangedException. Kami sarankan Anda keluar dari sesi langganan untuk memeriksa apakah data dikonsumsi berulang kali jika pengecualian ini tertangkap.