Data Integration DataWorks menyediakan MetaQ Reader untuk membaca data dari Message Queue. Topik ini menjelaskan kemampuan sinkronisasi data dari sumber data MetaQ.
Versi yang didukung
MetaQ Reader berlangganan pesan real-time di MetaQ menggunakan kit pengembangan perangkat lunak (SDK) Java untuk Message Queue. Versi SDK Java berikut ini didukung.
<dependency>
<groupId>com.taobao.metaq.final</groupId>
<artifactId>metaq-client</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-sdk</artifactId>
<version>1.3.1</version>
</dependency>Batasan
Anda hanya dapat menggunakan plugin MetaQ Reader untuk membaca data dari Message Queue di editor kode.
MetaQ Reader mendukung kelompok sumber daya Serverless (disarankan) dan kelompok sumber daya eksklusif untuk Data Integration.
Tipe data
Tabel berikut mencantumkan tipe bidang yang didukung.
Tipe bidang | Baca offline (MetaQ Reader) |
STRING | Didukung |
Transformasi berikut tersedia untuk tipe MetaQ di MetaQ Reader.
Tipe data Integrasi Data | Tipe data Message Queue |
STRING | STRING |
Kembangkan node sinkronisasi data
Untuk informasi mengenai titik masuk dan prosedur konfigurasi tugas sinkronisasi, lihat panduan konfigurasi berikut.
Untuk prosedur konfigurasi, lihat Konfigurasi tugas di editor kode.
Untuk daftar semua parameter dan contoh kode untuk editor kode, lihat Lampiran: Contoh kode MetaQ dan deskripsi parameter.
Lampiran: Kode dan parameter
Konfigurasi tugas sinkronisasi batch menggunakan editor kode
Jika Anda ingin mengonfigurasi tugas sinkronisasi batch menggunakan editor kode, Anda harus mengonfigurasi parameter terkait dalam skrip sesuai dengan persyaratan format skrip terpadu. Untuk informasi selengkapnya, lihat Konfigurasi tugas di editor kode. Informasi berikut menjelaskan parameter yang harus Anda konfigurasi untuk sumber data saat mengonfigurasi tugas sinkronisasi batch menggunakan editor kode.
Contoh kode Reader
{
"job": {
"content": [
{
"reader": {
"name": "metaqreader",
"parameter": {
"accessId": "<yourAccessKeyId>",
"accessKey": "<yourAccessKeySecret>",
"consumerId": "Test01",
"topicName": "test",
"subExpression": "*",
"onsChannel": "ALIYUN",
"domainName": "***.aliyun.com",
"contentType": "singlestringcolumn",
"beginOffset": "lastRead",
"nullCurrentOffset": "begin",
"fieldDelimiter": ",",
"column": [
"col0"
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}Parameter skrip Reader
Parameter | Deskripsi | Wajib |
accessId | Pasangan AccessKey Message Queue. Pasangan AccessKey digunakan untuk otentikasi identitas. | Ya |
accessKey | Ya | |
consumerId | Konsumen, juga dikenal sebagai subscriber, menerima dan mengonsumsi pesan. ID konsumen adalah pengenal jenis konsumen. Umumnya, konsumen yang memiliki ID konsumen yang sama menerima dan mengonsumsi jenis pesan yang sama serta menggunakan logika konsumsi yang sama. | Ya |
topicName | Topik pesan. Topik adalah tipe pesan utama yang digunakan untuk mengklasifikasikan pesan. | Ya |
subExpression | Subtopik pesan. | Ya |
onsChannel | Digunakan untuk otentikasi Message Queue. | Ya |
unitName | Unit tujuan yang menerima pesan. Unit umum tercantum di bawah ini:
| Tidak |
instanceName | Nama instans konsumen. | Tidak |
domainName | Titik akhir Message Queue. | Ya |
contentType | Tipe pesan. Tipe yang didukung adalah singlestringcolumn untuk pesan STRING, text untuk pesan teks, dan json untuk pesan JSON. | Ya |
beginOffset | Offset tempat tugas mulai membaca data. Nilai yang valid: begin (dari offset paling awal) dan lastRead (dari offset bacaan terakhir). | Tidak |
nullCurrentOffset | Posisi tempat memulai pembacaan data jika offset terakhir kosong. Nilai yang valid: begin (dari offset paling awal) dan current (dari offset saat ini). | Ya |
fieldDelimiter | Pemisah kolom untuk string pesan dalam mode pemisah, seperti koma (,). Karakter kontrol, seperti \u0001, didukung. | Ya |
column | Daftar bidang yang akan dibaca. | Ya |
beginDateTime | Waktu mulai untuk konsumsi data. Parameter ini menentukan batas kiri rentang waktu. Rentang waktu merupakan interval tertutup di kiri dan terbuka di kanan. Nilai parameter beginDateTime adalah string waktu dalam format yyyyMMddHHmmss. Parameter ini dapat digunakan bersama dengan parameter penjadwalan di DataWorks. | Tidak Catatan Parameter beginDateTime dan endDateTime harus digunakan bersama-sama. |
endDateTime | Waktu akhir untuk konsumsi data. Parameter ini menentukan batas kanan rentang waktu. Rentang waktu merupakan interval tertutup di kiri dan terbuka di kanan. Nilai parameter endDateTime adalah string waktu dalam format yyyyMMddHHmmss. Parameter ini dapat digunakan bersama dengan parameter penjadwalan di DataWorks. |