Topik ini menjelaskan ApsaraMQ for RocketMQ.
Instans Edisi Standar ApsaraMQ for RocketMQ versi 4.x memiliki batas panggilan API elastis bersama sebesar 5.000 per detik. Jika Anda menggunakan middleware messaging versi ini untuk terhubung ke Realtime Compute for Apache Flink dan melebihi batas tersebut, mekanisme throttling akan dipicu sehingga pekerjaan Flink Anda menjadi tidak stabil. Oleh karena itu, kami menyarankan agar Anda mengevaluasi dampak penggunaan ApsaraMQ for RocketMQ 4.x Edisi Standar. Jika skenario bisnis Anda memungkinkan, pertimbangkan alternatif lain seperti Kafka, Simple Log Service (SLS), atau DataHub. Namun, jika Anda harus menggunakan ApsaraMQ for RocketMQ 4.x Edisi Standar untuk memproses pesan dalam skala besar, Anda dapat membuat tiket untuk menghubungi tim produk ApsaraMQ for RocketMQ dan meminta peningkatan batas throttling.
Informasi latar belakang
ApsaraMQ for RocketMQ adalah layanan middleware terdistribusi yang dikembangkan oleh Alibaba Cloud berdasarkan Apache RocketMQ. Layanan ini menyediakan latensi rendah, konkurensi tinggi, ketersediaan tinggi (HA), serta keandalan tinggi. Layanan ini mendukung penguraian keterkaitan asinkron dan penggeseran beban puncak untuk aplikasi terdistribusi, serta fitur-fitur penting bagi aplikasi internet seperti akumulasi pesan dalam jumlah besar, throughput tinggi, dan retry yang andal.
Konektor RocketMQ mendukung hal-hal berikut.
Kategori | Rincian |
Jenis yang didukung | Tabel sumber dan tabel sink |
Mode jalankan | Hanya mode stream yang didukung. |
Format data | Format CSV dan biner |
Metrik pemantauan spesifik | |
Jenis API | Datastream (hanya untuk RocketMQ 4.x) dan SQL |
Mendukung pembaruan atau penghapusan data di tabel sink | Tidak mendukung pembaruan atau penghapusan data di tabel sink. Hanya mendukung penyisipan data. |
Fitur
Tabel sumber dan tabel sink ApsaraMQ for RocketMQ mendukung bidang properti.
Bidang properti tabel sumber
Nama bidang
Jenis bidang
Deskripsi
topic
VARCHAR METADATA VIRTUAL
Topik pesan.
queue-id
INT METADATA VIRTUAL
ID antrian pesan.
queue-offset
BIGINT METADATA VIRTUAL
Offset konsumen dari antrian pesan.
msg-id
VARCHAR METADATA VIRTUAL
ID pesan.
store-timestamp
TIMESTAMP(3) METADATA VIRTUAL
Waktu saat pesan disimpan.
born-timestamp
TIMESTAMP(3) METADATA VIRTUAL
Waktu saat pesan dihasilkan.
keys
VARCHAR METADATA VIRTUAL
Kunci pesan.
tags
VARCHAR METADATA VIRTUAL
Tag pesan.
Bidang properti tabel sink
Nama bidang
Jenis bidang
Deskripsi
keys
VARCHAR METADATA
Kunci pesan.
tags
VARCHAR METADATA
Tag pesan.
Prasyarat
Sumber daya ApsaraMQ for RocketMQ telah dibuat. Untuk informasi selengkapnya, lihat Buat sumber daya.
Batasan
Hanya Ververica Runtime (VVR) 8.0.3 atau yang lebih baru dari Realtime Compute for Apache Flink yang mendukung ApsaraMQ for RocketMQ 5.x.
Konektor ApsaraMQ for RocketMQ menggunakan konsumen pull untuk mengonsumsi pesan. Semua subtugas berbagi beban konsumsi.
Sintaksis
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq5',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'consumerGroup' = '<yourConsumerGroup>'
);Parameter WITH
Umum
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
connector | Jenis konektor. | String | Ya | - |
|
endPoint | Alamat titik akhir. | String | Ya | - | Titik akhir ApsaraMQ for RocketMQ dapat berupa salah satu jenis berikut:
Penting Karena perubahan dinamis pada kebijakan Keamanan Jaringan Alibaba Cloud, masalah konektivitas jaringan dapat terjadi ketika Realtime Compute for Apache Flink terhubung ke layanan ApsaraMQ for RocketMQ publik. Kami menyarankan agar Anda menggunakan layanan ApsaraMQ for RocketMQ internal.
|
topic | Nama topik. | String | Ya | Tidak ada | Tidak ada. |
accessId |
| String |
| Tidak ada |
Penting Untuk mencegah informasi AccessKey Anda bocor, gunakan variabel untuk menentukan pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Variabel proyek.
|
accessKey |
| String |
| - | |
tag | Tag yang akan berlangganan atau ditulis. | String | Tidak | - |
Catatan Saat digunakan sebagai tabel sink, parameter ini hanya didukung untuk ApsaraMQ for RocketMQ 4.x. Untuk ApsaraMQ for RocketMQ 5.x, gunakan bidang properti tabel sink untuk menentukan tag pesan keluaran. |
encoding | Format penyandian. | String | Tidak | UTF-8 | Tidak ada. |
instanceID | ID instans ApsaraMQ for RocketMQ. | String | Tidak | - |
Catatan Parameter ini hanya didukung untuk ApsaraMQ for RocketMQ 4.x. |
Khusus tabel sumber
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
consumerGroup | Nama grup konsumen. | String | Ya | - | Tidak ada. |
pullIntervalMs | Durasi hibernasi untuk sumber saat tidak ada data yang tersedia untuk dikonsumsi dari hulu. | Int | Ya | Tidak ada | Satuan: milidetik. Tidak tersedia mekanisme throttling. Anda tidak dapat mengatur laju pembacaan data dari ApsaraMQ for RocketMQ. Catatan Parameter ini hanya didukung untuk ApsaraMQ for RocketMQ 4.x. |
timeZone | Zona waktu. | String | Tidak | - | Contoh: Asia/Shanghai. |
startTimeMs | Waktu mulai. | Long | Tidak | Tidak ada | Stempel waktu UNIX. Unit: milidetik. |
startMessageOffset | Offset awal pesan. | Int | Tidak | - | Jika parameter ini diatur, pemuatan data diprioritaskan dimulai dari offset |
lineDelimiter | Pemisah baris yang digunakan untuk mengurai blok. | String | Tidak | \n | Tidak ada. |
fieldDelimiter | Pemisah bidang. | String | Tidak | \u0001 | Pemisah bervariasi berdasarkan mode klien ApsaraMQ for RocketMQ:
|
lengthCheck | Kebijakan untuk memeriksa jumlah bidang dalam satu baris. | Int | Tidak | NONE | Nilai valid:
|
columnErrorDebug | Menentukan apakah debugging diaktifkan. | Boolean | Tidak | false | Jika Anda mengatur parameter ini ke true, log pengecualian parsing dicetak. |
pullBatchSize | Jumlah maksimum pesan yang ditarik sekaligus. | Int | Tidak | 64 | Parameter ini hanya didukung di VVR 8.0.7 atau yang lebih baru dari Realtime Compute for Apache Flink. |
Khusus tabel sink
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
producerGroup | Kelompok tempat menulis. | String | Ya | - | Tidak ada. |
retryTimes | Jumlah percobaan ulang untuk operasi tulis. | Int | Tidak | 10 | Tidak ada. |
sleepTimeMs | Interval percobaan ulang. | Long | Tidak | 5000 | Tidak ada. |
partitionField | Menentukan nama bidang yang akan digunakan sebagai kolom kunci partisi. | String | Tidak | Tidak ada | Jika Catatan Parameter ini hanya didukung di VVR 8.0.5 atau yang lebih baru dari Realtime Compute for Apache Flink. |
deliveryTimestampMode | Menentukan mode untuk pesan tertunda. Parameter ini, bersama dengan parameter | String | Tidak | - | Nilai valid:
Catatan Parameter ini hanya didukung di VVR 11.1 atau yang lebih baru dari Realtime Compute for Apache Flink. |
deliveryTimestampType | Menentukan jenis dasar waktu untuk pesan tertunda. | String | Tidak | processing_time | Nilai valid:
Catatan Parameter ini hanya didukung di VVR 11.1 atau yang lebih baru dari Realtime Compute for Apache Flink. |
deliveryTimestampValue | Waktu pengiriman untuk pesan tertunda. | Long | Tidak | Tidak ada | Makna parameter ini bervariasi berdasarkan nilai
Catatan Parameter ini hanya didukung di VVR 11.1 atau yang lebih baru dari Realtime Compute for Apache Flink. |
deliveryTimestampField | Menentukan bidang yang akan digunakan untuk waktu pengiriman pesan tertunda. Jenis bidang harus | String | Tidak | - | Parameter ini hanya berlaku ketika Catatan Parameter ini hanya didukung di VVR 11.1 atau yang lebih baru dari Realtime Compute for Apache Flink. |
Pemetaan jenis
Jenis bidang Flink | Jenis bidang ApsaraMQ for RocketMQ |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
Contoh kode
Contoh tabel sumber
Format CSV
Asumsikan Anda memiliki catatan pesan berikut dalam format CSV.
1,name,male 2,name,femaleCatatanPesan RocketMQ dapat berisi nol atau lebih catatan data, dipisahkan oleh
\n.Pernyataan Data Definition Language (DDL) berikut menunjukkan cara mendeklarasikan tabel sumber ApsaraMQ for RocketMQ dalam pekerjaan Flink.
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq5', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );RocketMQ 4.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '1000', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );Format Biner
RocketMQ 5.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq5', 'endpoint' = '<yourEndpoint>', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;ApsaraMQ for RocketMQ 4.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '500', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
Contoh tabel sink
Buat Tabel Sink
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );ApsaraMQ for RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );CatatanJika pesan ApsaraMQ for RocketMQ Anda dalam format biner, Anda hanya dapat mendefinisikan satu bidang dalam pernyataan DDL, dan jenis bidang tersebut harus VARBINARY.
Buat tabel sink yang menentukan bidang
keysdantagssebagai kunci dan tag untuk pesan RocketMQApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
API DataStream
Saat membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink yang sepenuhnya dikelola. Untuk informasi selengkapnya tentang cara mengonfigurasi konektor DataStream, lihat Gunakan konektor DataStream.
Ververica Runtime (VVR) menyediakan MetaQSource untuk membaca data dari ApsaraMQ for RocketMQ. VVR juga menyediakan MetaQOutputFormat, yang merupakan implementasi dari kelas OutputFormat, untuk menulis data ke ApsaraMQ for RocketMQ. Kode berikut memberikan contoh cara membaca dan menulis data ke ApsaraMQ for RocketMQ:
ApsaraMQ for RocketMQ 5.x
Pada ApsaraMQ for RocketMQ 5.x, pasangan AccessKey sesuai dengan nama pengguna dan kata sandi yang dikonfigurasi untuk instans. Jika Anda mengakses instans ApsaraMQ for RocketMQ melalui jaringan internal dan autentikasi Access Control List (ACL) tidak diaktifkan untuk instans tersebut, Anda tidak perlu mengonfigurasi parameter pasangan AccessKey.
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
* Demo yang menggambarkan cara mengonsumsi pesan dari RocketMQ, mengonversi
* pesan, lalu menghasilkan pesan ke RocketMQ.
*/
public class RocketMQ5DataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Menyiapkan lingkungan eksekusi streaming
Configuration conf = new Configuration();
// Dua konfigurasi berikut hanya untuk debugging lokal. Hapus sebelum Anda mengemas pekerjaan dan mengunggahnya ke Realtime Compute for Apache Flink.
conf.setString("pipeline.classpaths", "file://" + "jalur mutlak file JAR uber");
conf.setString(
"classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
final DataStreamSource<String> ds =
env.fromSource(
RocketMQSource.<String>builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopic(SOURCE_TOPIC)
.setConsumerGroup(CONSUMER_GROUP)
.setDeserializationSchema(new MyDeserializer())
.setStartOffset(1)
.build(),
WatermarkStrategy.noWatermarks(),
"source");
ds.map(new ToMessage())
.addSink(
new OutputFormatSinkFunction<>(
new RocketMQOutputFormat.Builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.build()));
env.execute();
}
private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
@Override
public void deserialize(List<MessageExt> record, Collector<String> out) {
for (MessageExt messageExt : record) {
out.collect(new String(messageExt.getBody()));
}
}
@Override
public TypeInformation<String> getProducedType() {
return Types.STRING;
}
}
private static class ToMessage implements MapFunction<String, List<MessageExt>> {
public ToMessage() {
}
@Override
public List<MessageExt> map(String s) {
final MessageExt message = new MessageExt();
message.setBody(s.getBytes());
message.setWaitStoreMsgOK(true);
return Collections.singletonList(message);
}
}
}ApsaraMQ for RocketMQ 4.x
import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;
/**
* Demo yang menggambarkan cara mengonsumsi pesan dari RocketMQ, mengonversi
* pesan, lalu menghasilkan pesan ke RocketMQ.
*/
public class RocketMQDataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String INSTANCE_ID = "<instanceID>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Menyiapkan lingkungan eksekusi streaming
Configuration conf = new Configuration();
// Dua konfigurasi berikut hanya untuk debugging lokal. Hapus sebelum Anda mengemas pekerjaan dan mengunggahnya ke Realtime Compute for Apache Flink.
conf.setString("pipeline.classpaths", "file://" + "jalur mutlak file JAR uber");
conf.setString("classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// Membuat dan menambahkan sumber RocketMQ.
env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
// Mengonversi isi pesan menjadi huruf kapital.
.map(RocketMQDataStreamDemo2::convertMessages)
// Membuat dan menambahkan sink RocketMQ.
.addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
.name(RocketMQDataStreamDemo2.class.getSimpleName());
// Mengompilasi dan mengirimkan pekerjaan.
env.execute("Demo end-to-end konektor RocketMQ DataStream");
}
private static MetaQSource<MessageExt> createRocketMQSource() {
Properties mqProperties = createMQProperties();
return new MetaQSource<>(SOURCE_TOPIC,
CONSUMER_GROUP,
null, // selalu null
null, // tag pesan yang akan dikonsumsi
Long.MAX_VALUE, // stempel waktu berhenti dalam milidetik
-1, // Stempel waktu mulai dalam milidetik. Atur ke -1 untuk menonaktifkan mulai dari offset.
0, // Offset awal.
300_000, // Interval penemuan partisi.
mqProperties,
Boundedness.CONTINUOUS_UNBOUNDED,
new MyDeserializationSchema());
}
private static MetaQOutputFormat createRocketMQOutputFormat() {
return new MetaQOutputFormat.Builder()
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.setMqProperties(createMQProperties())
.build();
}
private static Properties createMQProperties() {
Properties properties = new Properties();
properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
properties.put(NAMESRV_ADDR, ENDPOINT);
properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
return properties;
}
private static List<MessageExt> convertMessages(MessageExt messages) {
return Collections.singletonList(messages);
}
public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
@Override
public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
for (MessageExt messageExt : list) {
collector.collect(messageExt);
}
}
@Override
public TypeInformation<MessageExt> getProducedType() {
return TypeInformation.of(MessageExt.class);
}
}
}
}
}XML
MQ 4.x: Konektor DataStream MQ.
MQ 5.x: Konektor DataStream MQ.
<!--MQ 5.x-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq5</artifactId>
<version>${vvr-version}</version>
<scope>provided</scope>
</dependency>
<!--MQ 4.x-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq</artifactId>
<version>${vvr-version}</version>
</dependency>Untuk informasi selengkapnya tentang cara mengonfigurasi titik akhir untuk ApsaraMQ for RocketMQ, lihat Pengumuman tentang pengaturan titik akhir internal TCP.
FAQ
Bagaimana ApsaraMQ for RocketMQ mendeteksi perubahan jumlah partisi topik saat topik diskalakan?