全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor DataHub

更新时间:Nov 06, 2025

Topik ini menjelaskan penggunaan konektor DataHub.

Latar Belakang

Alibaba Cloud DataHub adalah platform distribusi data real-time yang dirancang untuk memproses data streaming. Anda dapat mempublikasikan dan berlangganan data streaming di DataHub serta mendistribusikannya ke platform lain. DataHub memungkinkan analisis data streaming dan pembangunan aplikasi berbasis data tersebut. Untuk informasi lebih lanjut, lihat Apa itu DataHub.

Catatan

DataHub kompatibel dengan protokol Kafka. Anda dapat menggunakan konektor Kafka standar alih-alih Upsert Kafka connector untuk membaca atau menulis data dari/ke DataHub. Untuk detailnya, lihat Kompatibilitas dengan Kafka.

Tabel berikut menggambarkan kemampuan yang didukung oleh konektor DataHub.

Item

Deskripsi

Tipe yang didukung

Sumber dan sink

Mode operasi

Streaming dan batch

Format data

Tidak tersedia

Metrik

Tidak tersedia

Jenis API

DataStream dan SQL

Dukungan pembaruan/penghapusan data di sink

Tidak didukung. Sink dapat menulis baris insert-only ke topik target.

Sintaksis

CREATE TEMPORARY TABLE datahub_input (
  `time` BIGINT,
  `sequence`  STRING METADATA VIRTUAL,
  `shard-id` BIGINT METADATA VIRTUAL,
  `system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

Opsi konektor

  • Umum

    Opsi

    Deskripsi

    Tipe

    Wajib

    Nilai default

    Catatan

    connector

    Konektor yang ingin Anda gunakan.

    String

    Ya

    Tidak ada nilai default

    Nilainya harus datahub.

    endPoint

    Titik akhir konsumen.

    String

    Ya

    Tidak ada nilai default

    Nilai opsi bervariasi berdasarkan wilayah Proyek DataHub. Untuk detailnya, lihat Titik Akhir.

    project

    Nama proyek DataHub.

    String

    Ya

    Tidak ada nilai default

    Untuk informasi tentang cara membuat proyek DataHub, lihat Memulai dengan DataHub.

    topic

    Nama topik DataHub.

    String

    Ya

    Tidak ada nilai default

    Untuk informasi tentang cara membuat topik DataHub, lihat Memulai dengan DataHub.

    Catatan

    Untuk topik DataHub tipe BLOB (untuk data tidak bertipe dan tidak terstruktur), tabel Flink yang sesuai harus berisi tepat satu kolom VARBINARY.

    accessId

    ID AccessKey akun Alibaba Cloud Anda.

    String

    Ya

    Tidak ada nilai default

    Untuk informasi lebih lanjut, lihat Operasi konsol.

    Penting

    Untuk melindungi pasangan AccessKey Anda, sediakan informasi menggunakan variabel. Untuk informasi lebih lanjut, lihat Kelola variabel.

    accessKey

    Rahasia AccessKey akun Alibaba Cloud Anda.

    String

    Ya

    Tidak ada nilai default

    retryTimeout

    Maksimum periode timeout untuk percobaan ulang.

    Integer

    Tidak

    1800000

    Kami sarankan menggunakan nilai default. Unit: milidetik.

    retryInterval

    Interval percobaan ulang.

    Integer

    Tidak

    1000

    Kami sarankan menggunakan nilai default. Unit: milidetik.

    CompressType

    Kebijakan kompresi untuk pembacaan dan penulisan.

    String

    Tidak

    lz4

    • lz4: algoritma kompresi lz4.

    • deflate: algoritma kompresi deflate.

    • "": string kosong, menunjukkan bahwa kompresi data dinonaktifkan.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.5 atau lebih baru yang mendukung opsi ini.

  • Spesifik sumber

    Opsi

    Deskripsi

    Tipe

    Wajib

    Nilai default

    Catatan

    subId

    ID langganan.

    String

    Ya

    Tidak ada nilai default

    Untuk informasi lebih lanjut tentang cara membuat langganan DataHub, lihat Buat langganan.

    maxFetchSize

    Jumlah catatan data yang dibaca sekaligus.

    Integer

    Tidak

    50

    Opsi ini mempengaruhi kinerja pembacaan. Anda dapat menyetelnya ke nilai yang lebih besar untuk meningkatkan throughput pembacaan.

    maxBufferSize

    Jumlah maksimum catatan data yang dibaca secara asinkron.

    Integer

    Tidak

    50

    Opsi ini mempengaruhi kinerja pembacaan. Anda dapat menyetelnya ke nilai yang lebih besar untuk meningkatkan throughput pembacaan.

    fetchLatestDelay

    Durasi tidur setelah tidak ada data yang diambil dari sumber data.

    Integer

    Tidak

    500

    Unit: milidetik. Jika data jarang dikirim dari sumber data, tentukan nilai yang lebih kecil untuk opsi ini untuk mengoptimalkan throughput pembacaan.

    lengthCheck

    Aturan untuk memeriksa jumlah bidang per baris.

    String

    Tidak

    NONE

    • NONE

      • Jika jumlah bidang yang diuraikan dari sebuah baris lebih besar dari jumlah bidang yang didefinisikan, data diekstraksi dari kiri ke kanan berdasarkan jumlah bidang yang didefinisikan.

      • Jika jumlah bidang yang diuraikan dari sebuah baris kurang dari jumlah bidang yang didefinisikan, baris ini dilewati.

    • SKIP: Jika jumlah bidang yang diuraikan dari sebuah baris berbeda dari jumlah bidang yang didefinisikan, baris ini dilewati.

    • EXCEPTION: Jika jumlah bidang yang diuraikan dari sebuah baris berbeda dari jumlah bidang yang didefinisikan, pengecualian dilaporkan.

    • PAD: Data dipadati dari kiri ke kanan berdasarkan urutan bidang yang didefinisikan.

      • Jika jumlah bidang yang diuraikan dari sebuah baris lebih besar dari jumlah bidang yang didefinisikan, data diekstraksi dari kiri ke kanan berdasarkan jumlah bidang yang didefinisikan.

      • Jika jumlah bidang yang diuraikan dari sebuah baris kurang dari jumlah bidang yang didefinisikan, nilai bidang yang hilang dipadati dengan "Null" dari kiri ke kanan.

    columnErrorDebug

    Menentukan apakah akan mengaktifkan debugging.

    Boolean

    Tidak

    false

    • false: Debugging dinonaktifkan.

    • true: Debugging diaktifkan dan log tentang pengecualian penguraian dicetak.

    startTime

    Waktu mulai konsumsi log.

    String

    Tidak

    Tidak ada nilai default

    Format: yyyy-MM-dd hh:mm:ss.

    endTime

    Waktu berhenti konsumsi log.

    String

    Tidak

    Tidak ada nilai default

    Format: yyyy-MM-dd hh:mm:ss.

    startTimeMs

    Waktu mulai konsumsi log.

    Long

    Tidak

    -1

    Unit: milidetik. Opsi ini memiliki prioritas lebih tinggi daripada startTime. Nilai default -1 menunjukkan konsumsi dari offset terbaru dalam topik DataHub. Jika tidak ada offset, konsumsi akan dimulai dari offset paling awal.

    Penting

    Mengandalkan nilai default dapat mengakibatkan kehilangan data. Jika pekerjaan Anda gagal sebelum checkpoint pertama, offset terbaru dalam topik DataHub mungkin telah maju, menyebabkan Anda melewatkan data. Untuk mencegah hal ini, konfigurasikan opsi ini secara eksplisit alih-alih menggunakan nilai default.

  • Spesifik sink

    Opsi

    Deskripsi

    Tipe

    Wajib

    Nilai default

    Catatan

    batchCount

    Jumlah baris yang dapat ditulis sekaligus.

    Integer

    Tidak

    500

    Menambahkan nilai opsi ini meningkatkan throughput penulisan dengan biaya latensi yang lebih tinggi.

    batchSize

    Ukuran data yang dapat ditulis sekaligus.

    Integer

    Tidak

    512000

    Menambahkan nilai ini meningkatkan throughput penulisan dengan biaya latensi yang lebih tinggi. Unit: byte.

    flushInterval

    Interval flush data.

    Integer

    Tidak

    5000

    Menambahkan nilai opsi ini meningkatkan throughput penulisan dengan biaya latensi yang lebih tinggi. Unit: milidetik.

    hashFields

    Nama kolom. Setelah nama kolom ditentukan, nilai kolom dengan nama yang sama ditulis ke shard yang sama.

    String

    Tidak

    null

    Pisahkan beberapa nilai kolom dengan koma (,), contohnya hashFields=a,b. Nilai default "null" menunjukkan penulisan acak.

    timeZone

    Zona waktu data.

    String

    Tidak

    Tidak ada nilai default

    Nilai opsi memengaruhi konversi bidang TIMESTAMP antar zona waktu.

    schemaVersion

    Versi dalam skema terdaftar.

    Integer

    Tidak

    -1

    Tidak tersedia

Pemetaan tipe data

Flink

DataHub

TINYINT

TINYINT

BOOLEAN

BOOLEAN

INTEGER

INTEGER

BIGINT

BIGINT

BIGINT

TIMESTAMP

TIMESTAMP

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL

DECIMAL

VARCHAR

STRING

SMALLINT

SMALLINT

VARBINARY

BLOB

Metadata

Kunci

Tipe

Deskripsi

shard-id

BIGINT METADATA VIRTUAL

ID shard.

sequence

STRING METADATA VIRTUAL

Urutan data.

system-time

TIMESTAMP METADATA VIRTUAL

Waktu sistem.

Catatan

Anda hanya dapat memperoleh metadata DataHub sebelumnya jika menggunakan VVR 3.0.1 atau versi lebih baru.

Contoh kode

  • Sumber

    CREATE TEMPORARY TABLE datahub_input (
      `time` BIGINT,
      `sequence`  STRING METADATA VIRTUAL,
      `shard-id` BIGINT METADATA VIRTUAL,
      `system-time` TIMESTAMP METADATA VIRTUAL
    ) WITH (
      'connector' = 'datahub',
      'subId' = '<yourSubId>',
      'endPoint' = '<yourEndPoint>',
      'project' = '<yourProjectName>',
      'topic' = '<yourTopicName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}'
    );
    
    CREATE TEMPORARY TABLE test_out (
      `time` BIGINT,
      `sequence`  STRING,
      `shard-id` BIGINT,
      `system-time` TIMESTAMP
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO test_out
    SELECT
      `time`,
      `sequence` ,
      `shard-id`,
      `system-time`
    FROM datahub_input;
  • Sink

    CREATE TEMPORARY table datahub_source(
      name VARCHAR
    ) WITH (
      'connector'='datahub',
      'endPoint'='<endPoint>',
      'project'='<yourProjectName>',
      'topic'='<yourTopicName>',
      'subId'='<yourSubId>',
      'accessId'='${secret_values.ak_id}',
      'accessKey'='${secret_values.ak_secret}',
      'startTime'='2018-06-01 00:00:00'
    );
    
    CREATE TEMPORARY table datahub_sink(
      name varchar
    ) WITH (
      'connector'='datahub',
      'endPoint'='<endPoint>',
      'project'='<yourProjectName>',
      'topic'='<yourTopicName>',
      'accessId'='${secret_values.ak_id}',
      'accessKey'='${secret_values.ak_secret}',
      'batchSize'='512000',
      'batchCount'='500'
    );
    
    INSERT INTO datahub_sink
    SELECT
      LOWER(name)
    from datahub_source;

API Datastream

Penting

Jika ingin memanggil API DataStream untuk membaca atau menulis data, Anda harus menggunakan konektor DataStream dari tipe terkait untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi lebih lanjut tentang konfigurasi konektor DataStream, lihat Pengaturan Konektor DataStream.

Sumber DataHub

VVR menyediakan kelas DatahubSourceFunction yang mengimplementasikan antarmuka SourceFunction. Anda dapat menggunakan kelas tersebut untuk membaca data dari sumber DataHub. Contoh kode berikut menunjukkan cara membaca data dari DataHub.


env.setParallelism(1);
-- Tentukan konfigurasi koneksi. 
DatahubSourceFunction datahubSource =
    new DatahubSourceFunction(
    <yourEndPoint>,
    <yourProjectName>,
    <yourTopicName>,
    <yourSubId>,
    <yourAccessId>,
    <yourAccessKey>,
    "public",
    <yourStartTime>,
    <yourEndTime>
    );
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();
env.addSource(datahubSource)
    .map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
    .print();
env.execute();
private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
    Tuple2<String, Long> tuple2 = new Tuple2<>();
    TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
    tuple2.f0 = (String) recordData.getField(0);
    tuple2.f1 = (Long) recordData.getField(1);
    return tuple2;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Sink DataHub

VVR menyediakan kelas OutputFormatSinkFunction yang mengimplementasikan antarmuka DatahubSinkFunction. Anda dapat menggunakan kelas tersebut untuk menulis data ke DataHub. Contoh kode berikut menunjukkan cara menulis data ke DataHub.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-- Tentukan konfigurasi koneksi. 
env.generateSequence(0, 100)
    .map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
    .addSink(
    new DatahubSinkFunction<>(
       <yourEndPoint>,
       <yourProjectName>,
       <yourTopicName>,
       <yourSubId>,
       <yourAccessId>,
       <yourAccessKey>,
       "public",
       <schemaVersion> // Jika schemaRegistry diaktifkan, Anda harus menentukan nilai schemaVersion untuk penulisan data. Dalam kasus lain, Anda dapat menyetel schemaVersion ini ke 0. 
       );
env.execute();
private RecordEntry getRecordEntry(Long message, String s) {
    RecordSchema recordSchema = new RecordSchema();
    recordSchema.addField(new Field("f1", FieldType.STRING));
    recordSchema.addField(new Field("f2", FieldType.BIGINT));
    recordSchema.addField(new Field("f3", FieldType.DOUBLE));
    recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
    recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
    recordSchema.addField(new Field("f6", FieldType.DECIMAL));
    RecordEntry recordEntry = new RecordEntry();
    TupleRecordData recordData = new TupleRecordData(recordSchema);
    recordData.setField(0, s + message);
    recordData.setField(1, message);
    recordEntry.setRecordData(recordData);
    return recordEntry;
}

XML

Anda dapat menggunakan Konektor DataStream DataHub dari versi yang berbeda yang disimpan di repositori pusat Maven.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-datahub</artifactId>
    <version>${vvr-version}</version>
</dependency>

Referensi