Topik ini menjelaskan penggunaan konektor DataHub.
Latar Belakang
Alibaba Cloud DataHub adalah platform distribusi data real-time yang dirancang untuk memproses data streaming. Anda dapat mempublikasikan dan berlangganan data streaming di DataHub serta mendistribusikannya ke platform lain. DataHub memungkinkan analisis data streaming dan pembangunan aplikasi berbasis data tersebut. Untuk informasi lebih lanjut, lihat Apa itu DataHub.
DataHub kompatibel dengan protokol Kafka. Anda dapat menggunakan konektor Kafka standar alih-alih Upsert Kafka connector untuk membaca atau menulis data dari/ke DataHub. Untuk detailnya, lihat Kompatibilitas dengan Kafka.
Tabel berikut menggambarkan kemampuan yang didukung oleh konektor DataHub.
Item | Deskripsi |
Tipe yang didukung | Sumber dan sink |
Mode operasi | Streaming dan batch |
Format data | Tidak tersedia |
Metrik | Tidak tersedia |
Jenis API | DataStream dan SQL |
Dukungan pembaruan/penghapusan data di sink | Tidak didukung. Sink dapat menulis baris insert-only ke topik target. |
Sintaksis
CREATE TEMPORARY TABLE datahub_input (
`time` BIGINT,
`sequence` STRING METADATA VIRTUAL,
`shard-id` BIGINT METADATA VIRTUAL,
`system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
'connector' = 'datahub',
'subId' = '<yourSubId>',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'topic' = '<yourTopicName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);Opsi konektor
Umum
Opsi
Deskripsi
Tipe
Wajib
Nilai default
Catatan
connector
Konektor yang ingin Anda gunakan.
String
Ya
Tidak ada nilai default
Nilainya harus datahub.
endPoint
Titik akhir konsumen.
String
Ya
Tidak ada nilai default
Nilai opsi bervariasi berdasarkan wilayah Proyek DataHub. Untuk detailnya, lihat Titik Akhir.
project
Nama proyek DataHub.
String
Ya
Tidak ada nilai default
Untuk informasi tentang cara membuat proyek DataHub, lihat Memulai dengan DataHub.
topic
Nama topik DataHub.
String
Ya
Tidak ada nilai default
Untuk informasi tentang cara membuat topik DataHub, lihat Memulai dengan DataHub.
CatatanUntuk topik DataHub tipe BLOB (untuk data tidak bertipe dan tidak terstruktur), tabel Flink yang sesuai harus berisi tepat satu kolom VARBINARY.
accessId
ID AccessKey akun Alibaba Cloud Anda.
String
Ya
Tidak ada nilai default
Untuk informasi lebih lanjut, lihat Operasi konsol.
PentingUntuk melindungi pasangan AccessKey Anda, sediakan informasi menggunakan variabel. Untuk informasi lebih lanjut, lihat Kelola variabel.
accessKey
Rahasia AccessKey akun Alibaba Cloud Anda.
String
Ya
Tidak ada nilai default
retryTimeout
Maksimum periode timeout untuk percobaan ulang.
Integer
Tidak
1800000
Kami sarankan menggunakan nilai default. Unit: milidetik.
retryInterval
Interval percobaan ulang.
Integer
Tidak
1000
Kami sarankan menggunakan nilai default. Unit: milidetik.
CompressType
Kebijakan kompresi untuk pembacaan dan penulisan.
String
Tidak
lz4
lz4: algoritma kompresi lz4.
deflate: algoritma kompresi deflate.
"": string kosong, menunjukkan bahwa kompresi data dinonaktifkan.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.5 atau lebih baru yang mendukung opsi ini.
Spesifik sumber
Opsi
Deskripsi
Tipe
Wajib
Nilai default
Catatan
subId
ID langganan.
String
Ya
Tidak ada nilai default
Untuk informasi lebih lanjut tentang cara membuat langganan DataHub, lihat Buat langganan.
maxFetchSize
Jumlah catatan data yang dibaca sekaligus.
Integer
Tidak
50
Opsi ini mempengaruhi kinerja pembacaan. Anda dapat menyetelnya ke nilai yang lebih besar untuk meningkatkan throughput pembacaan.
maxBufferSize
Jumlah maksimum catatan data yang dibaca secara asinkron.
Integer
Tidak
50
Opsi ini mempengaruhi kinerja pembacaan. Anda dapat menyetelnya ke nilai yang lebih besar untuk meningkatkan throughput pembacaan.
fetchLatestDelay
Durasi tidur setelah tidak ada data yang diambil dari sumber data.
Integer
Tidak
500
Unit: milidetik. Jika data jarang dikirim dari sumber data, tentukan nilai yang lebih kecil untuk opsi ini untuk mengoptimalkan throughput pembacaan.
lengthCheck
Aturan untuk memeriksa jumlah bidang per baris.
String
Tidak
NONE
NONE
Jika jumlah bidang yang diuraikan dari sebuah baris lebih besar dari jumlah bidang yang didefinisikan, data diekstraksi dari kiri ke kanan berdasarkan jumlah bidang yang didefinisikan.
Jika jumlah bidang yang diuraikan dari sebuah baris kurang dari jumlah bidang yang didefinisikan, baris ini dilewati.
SKIP: Jika jumlah bidang yang diuraikan dari sebuah baris berbeda dari jumlah bidang yang didefinisikan, baris ini dilewati.
EXCEPTION: Jika jumlah bidang yang diuraikan dari sebuah baris berbeda dari jumlah bidang yang didefinisikan, pengecualian dilaporkan.
PAD: Data dipadati dari kiri ke kanan berdasarkan urutan bidang yang didefinisikan.
Jika jumlah bidang yang diuraikan dari sebuah baris lebih besar dari jumlah bidang yang didefinisikan, data diekstraksi dari kiri ke kanan berdasarkan jumlah bidang yang didefinisikan.
Jika jumlah bidang yang diuraikan dari sebuah baris kurang dari jumlah bidang yang didefinisikan, nilai bidang yang hilang dipadati dengan "Null" dari kiri ke kanan.
columnErrorDebug
Menentukan apakah akan mengaktifkan debugging.
Boolean
Tidak
false
false: Debugging dinonaktifkan.
true: Debugging diaktifkan dan log tentang pengecualian penguraian dicetak.
startTime
Waktu mulai konsumsi log.
String
Tidak
Tidak ada nilai default
Format: yyyy-MM-dd hh:mm:ss.
endTime
Waktu berhenti konsumsi log.
String
Tidak
Tidak ada nilai default
Format: yyyy-MM-dd hh:mm:ss.
startTimeMs
Waktu mulai konsumsi log.
Long
Tidak
-1
Unit: milidetik. Opsi ini memiliki prioritas lebih tinggi daripada
startTime. Nilai default -1 menunjukkan konsumsi dari offset terbaru dalam topik DataHub. Jika tidak ada offset, konsumsi akan dimulai dari offset paling awal.PentingMengandalkan nilai default dapat mengakibatkan kehilangan data. Jika pekerjaan Anda gagal sebelum checkpoint pertama, offset terbaru dalam topik DataHub mungkin telah maju, menyebabkan Anda melewatkan data. Untuk mencegah hal ini, konfigurasikan opsi ini secara eksplisit alih-alih menggunakan nilai default.
Spesifik sink
Opsi
Deskripsi
Tipe
Wajib
Nilai default
Catatan
batchCount
Jumlah baris yang dapat ditulis sekaligus.
Integer
Tidak
500
Menambahkan nilai opsi ini meningkatkan throughput penulisan dengan biaya latensi yang lebih tinggi.
batchSize
Ukuran data yang dapat ditulis sekaligus.
Integer
Tidak
512000
Menambahkan nilai ini meningkatkan throughput penulisan dengan biaya latensi yang lebih tinggi. Unit: byte.
flushInterval
Interval flush data.
Integer
Tidak
5000
Menambahkan nilai opsi ini meningkatkan throughput penulisan dengan biaya latensi yang lebih tinggi. Unit: milidetik.
hashFields
Nama kolom. Setelah nama kolom ditentukan, nilai kolom dengan nama yang sama ditulis ke shard yang sama.
String
Tidak
null
Pisahkan beberapa nilai kolom dengan koma (,), contohnya
hashFields=a,b. Nilai default "null" menunjukkan penulisan acak.timeZone
Zona waktu data.
String
Tidak
Tidak ada nilai default
Nilai opsi memengaruhi konversi bidang TIMESTAMP antar zona waktu.
schemaVersion
Versi dalam skema terdaftar.
Integer
Tidak
-1
Tidak tersedia
Pemetaan tipe data
Flink | DataHub |
TINYINT | TINYINT |
BOOLEAN | BOOLEAN |
INTEGER | INTEGER |
BIGINT | BIGINT |
BIGINT | TIMESTAMP |
TIMESTAMP | |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
VARCHAR | STRING |
SMALLINT | SMALLINT |
VARBINARY | BLOB |
Metadata
Kunci | Tipe | Deskripsi |
shard-id | BIGINT METADATA VIRTUAL | ID shard. |
sequence | STRING METADATA VIRTUAL | Urutan data. |
system-time | TIMESTAMP METADATA VIRTUAL | Waktu sistem. |
Anda hanya dapat memperoleh metadata DataHub sebelumnya jika menggunakan VVR 3.0.1 atau versi lebih baru.
Contoh kode
Sumber
CREATE TEMPORARY TABLE datahub_input ( `time` BIGINT, `sequence` STRING METADATA VIRTUAL, `shard-id` BIGINT METADATA VIRTUAL, `system-time` TIMESTAMP METADATA VIRTUAL ) WITH ( 'connector' = 'datahub', 'subId' = '<yourSubId>', 'endPoint' = '<yourEndPoint>', 'project' = '<yourProjectName>', 'topic' = '<yourTopicName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}' ); CREATE TEMPORARY TABLE test_out ( `time` BIGINT, `sequence` STRING, `shard-id` BIGINT, `system-time` TIMESTAMP ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO test_out SELECT `time`, `sequence` , `shard-id`, `system-time` FROM datahub_input;Sink
CREATE TEMPORARY table datahub_source( name VARCHAR ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'subId'='<yourSubId>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'startTime'='2018-06-01 00:00:00' ); CREATE TEMPORARY table datahub_sink( name varchar ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'batchSize'='512000', 'batchCount'='500' ); INSERT INTO datahub_sink SELECT LOWER(name) from datahub_source;
API Datastream
Jika ingin memanggil API DataStream untuk membaca atau menulis data, Anda harus menggunakan konektor DataStream dari tipe terkait untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi lebih lanjut tentang konfigurasi konektor DataStream, lihat Pengaturan Konektor DataStream.
Sumber DataHub
VVR menyediakan kelas DatahubSourceFunction yang mengimplementasikan antarmuka SourceFunction. Anda dapat menggunakan kelas tersebut untuk membaca data dari sumber DataHub. Contoh kode berikut menunjukkan cara membaca data dari DataHub.
env.setParallelism(1);
-- Tentukan konfigurasi koneksi.
DatahubSourceFunction datahubSource =
new DatahubSourceFunction(
<yourEndPoint>,
<yourProjectName>,
<yourTopicName>,
<yourSubId>,
<yourAccessId>,
<yourAccessKey>,
"public",
<yourStartTime>,
<yourEndTime>
);
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();
env.addSource(datahubSource)
.map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
.print();
env.execute();
private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
Tuple2<String, Long> tuple2 = new Tuple2<>();
TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
tuple2.f0 = (String) recordData.getField(0);
tuple2.f1 = (Long) recordData.getField(1);
return tuple2;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Sink DataHub
VVR menyediakan kelas OutputFormatSinkFunction yang mengimplementasikan antarmuka DatahubSinkFunction. Anda dapat menggunakan kelas tersebut untuk menulis data ke DataHub. Contoh kode berikut menunjukkan cara menulis data ke DataHub.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-- Tentukan konfigurasi koneksi.
env.generateSequence(0, 100)
.map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
.addSink(
new DatahubSinkFunction<>(
<yourEndPoint>,
<yourProjectName>,
<yourTopicName>,
<yourSubId>,
<yourAccessId>,
<yourAccessKey>,
"public",
<schemaVersion> // Jika schemaRegistry diaktifkan, Anda harus menentukan nilai schemaVersion untuk penulisan data. Dalam kasus lain, Anda dapat menyetel schemaVersion ini ke 0.
);
env.execute();
private RecordEntry getRecordEntry(Long message, String s) {
RecordSchema recordSchema = new RecordSchema();
recordSchema.addField(new Field("f1", FieldType.STRING));
recordSchema.addField(new Field("f2", FieldType.BIGINT));
recordSchema.addField(new Field("f3", FieldType.DOUBLE));
recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
recordSchema.addField(new Field("f6", FieldType.DECIMAL));
RecordEntry recordEntry = new RecordEntry();
TupleRecordData recordData = new TupleRecordData(recordSchema);
recordData.setField(0, s + message);
recordData.setField(1, message);
recordEntry.setRecordData(recordData);
return recordEntry;
}XML
Anda dapat menggunakan Konektor DataStream DataHub dari versi yang berbeda yang disimpan di repositori pusat Maven.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-datahub</artifactId>
<version>${vvr-version}</version>
</dependency>Referensi
Untuk informasi lebih lanjut tentang konektor yang didukung oleh Realtime Compute for Apache Flink, lihat Konektor yang Didukung.
Untuk informasi tentang cara menggunakan konektor Kafka untuk mengakses DataHub, lihat Konektor Kafka.
Apakah saya bisa menghapus topik DataHub yang sedang dikonsumsi?