このトピックでは、ApsaraMQ for RocketMQ コネクタについて説明します。
ApsaraMQ for RocketMQ 4.x Standard Edition インスタンスには、毎秒 5,000 の共有エラスティック API 呼び出し制限があります。このバージョンのメッセージングミドルウェアを使用して Realtime Compute for Apache Flink に接続すると、この制限を超えるとスロットリングメカニズムがトリガーされ、Flink ジョブが不安定になる可能性があります。したがって、ApsaraMQ for RocketMQ 4.x Standard Edition を使用する影響を評価することをお勧めします。ビジネスシナリオで許可されている場合は、代替として Kafka、Simple Log Service (SLS)、DataHub などの他のミドルウェアの使用を検討してください。大規模なメッセージを処理するために ApsaraMQ for RocketMQ 4.x Standard Edition を使用する必要がある場合は、チケットを送信して ApsaraMQ for RocketMQ プロダクトチームに連絡し、スロットリング制限の引き上げをリクエストすることもできます。
背景情報
ApsaraMQ for RocketMQ は、Alibaba Cloud が Apache RocketMQ をベースに開発した分散ミドルウェアサービスです。低レイテンシー、高い同時実行性、高可用性 (HA)、および高い信頼性を提供します。分散アプリケーションに非同期デカップリングとピーク負荷シフトを提供します。また、大量のメッセージ蓄積、高スループット、信頼性の高いリトライなど、インターネットアプリケーションに必要な機能もサポートしています。
RocketMQ コネクタは、以下をサポートしています。
カテゴリ | 詳細 |
サポートされているタイプ | ソーステーブルとシンクテーブル |
実行モード | ストリームモードのみがサポートされています。 |
データ形式 | CSV およびバイナリ形式 |
特定の監視メトリック | |
API タイプ | DataStream (RocketMQ 4.x のみ) と SQL |
結果テーブルのデータの更新または削除をサポート | 結果テーブルのデータの更新または削除はサポートされていません。データの挿入のみをサポートします。 |
特徴
ApsaraMQ for RocketMQ のソーステーブルと結果テーブルは、プロパティフィールドをサポートしています。
ソーステーブルのプロパティフィールド
フィールド名
フィールドタイプ
説明
topic
VARCHAR METADATA VIRTUAL
メッセージトピック。
queue-id
INT METADATA VIRTUAL
メッセージキュー ID。
queue-offset
BIGINT METADATA VIRTUAL
メッセージキューのコンシューマオフセット。
msg-id
VARCHAR METADATA VIRTUAL
メッセージ ID。
store-timestamp
TIMESTAMP(3) METADATA VIRTUAL
メッセージが保存された時刻。
born-timestamp
TIMESTAMP(3) METADATA VIRTUAL
メッセージが生成された時刻。
keys
VARCHAR METADATA VIRTUAL
メッセージキー。
tags
VARCHAR METADATA VIRTUAL
メッセージタグ。
結果テーブルのプロパティフィールド
フィールド名
フィールドタイプ
説明
keys
VARCHAR METADATA
メッセージキー。
tags
VARCHAR METADATA
メッセージタグ。
前提条件
ApsaraMQ for RocketMQ リソースが作成されていること。詳細については、「リソースの作成」をご参照ください。
制限事項
Realtime Compute for Apache Flink の Ververica Runtime (VVR) 8.0.3 以降のみが ApsaraMQ for RocketMQ 5.x をサポートします。
ApsaraMQ for RocketMQ コネクタは、プルコンシューマーを使用してメッセージを消費します。すべてのサブタスクが消費負荷を共有します。
構文
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq5',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'consumerGroup' = '<yourConsumerGroup>'
);WITH パラメーター
全般
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
connector | コネクタタイプ。 | 文字列 | はい | - |
|
endPoint | エンドポイントアドレス。 | 文字列 | はい | - | ApsaraMQ for RocketMQ エンドポイントは、次のいずれかのタイプになります。
重要 Alibaba Cloud ネットワークセキュリティポリシーの動的な変更により、Realtime Compute for Apache Flink がパブリック ApsaraMQ for RocketMQ サービスに接続する際にネットワーク接続の問題が発生する可能性があります。内部 ApsaraMQ for RocketMQ サービスを使用することをお勧めします。
|
topic | トピック名。 | 文字列 | はい | なし | なし。 |
accessId |
| 文字列 |
| なし |
重要 AccessKey 情報の漏洩を防ぐために、変数を使用して AccessKey ペアを指定します。詳細については、「プロジェクト変数」をご参照ください。
|
accessKey |
| 文字列 |
| - | |
tag | サブスクライブまたは書き込むタグ。 | 文字列 | いいえ | - |
説明 結果テーブルとして使用する場合、このパラメーターは ApsaraMQ for RocketMQ 4.x でのみサポートされます。ApsaraMQ for RocketMQ 5.x の場合、結果テーブルのプロパティフィールドを使用して出力メッセージのタグを指定します。 |
encoding | エンコード形式。 | 文字列 | いいえ | UTF-8 | なし。 |
instanceID | ApsaraMQ for RocketMQ インスタンス ID。 | 文字列 | いいえ | - |
説明 このパラメーターは ApsaraMQ for RocketMQ 4.x でのみサポートされます。 |
ソーステーブル固有
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
consumerGroup | コンシューマーグループの名前。 | 文字列 | はい | - | なし。 |
pullIntervalMs | アップストリームから消費できるデータがない場合のソースの休止期間。 | 整数 | はい | なし | 単位: ミリ秒。 スロットリングメカニズムはありません。ApsaraMQ for RocketMQ からデータを読み取るレートを設定することはできません。 説明 このパラメーターは ApsaraMQ for RocketMQ 4.x でのみサポートされます。 |
timeZone | タイムゾーン。 | 文字列 | いいえ | - | 例: Asia/Shanghai。 |
startTimeMs | 開始時刻。 | ロング | いいえ | なし | UNIX タイムスタンプ。単位: ミリ秒。 |
startMessageOffset | メッセージの開始オフセット。 | 整数 | いいえ | - | このパラメーターが設定されている場合、データ読み込みは優先的に |
lineDelimiter | ブロックを解析するために使用される行区切り文字。 | 文字列 | いいえ | \n | なし。 |
fieldDelimiter | フィールド区切り文字。 | 文字列 | いいえ | \u0001 | 区切り文字は、ApsaraMQ for RocketMQ クライアントのモードによって異なります。
|
lengthCheck | 1 行のフィールド数を確認するポリシー。 | 整数 | いいえ | NONE | 有効な値:
|
columnErrorDebug | デバッグを有効にするかどうかを指定します。 | ブール値 | いいえ | false | このパラメーターを true に設定すると、解析例外のログが出力されます。 |
pullBatchSize | 一度にプルするメッセージの最大数。 | Int | いいえ | 64 | このパラメーターは、Realtime Compute for Apache Flink の VVR 8.0.7 以降でサポートされています。 |
結果テーブル固有
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
producerGroup | 書き込み先のグループ。 | String | はい | - | なし。 |
retryTimes | 書き込み操作のリトライ回数。 | Int | いいえ | 10 | なし。 |
sleepTimeMs | 再試行間隔。 | Long | いいえ | 5000 | なし。 |
partitionField | パーティションキー列として使用するフィールドの名前を指定します。 | String | いいえ | なし |
説明 このパラメーターは、Realtime Compute for Apache Flink の VVR 8.0.5 以降でサポートされています。 |
deliveryTimestampMode | 遅延メッセージのモードを指定します。このパラメーターは、 | String | いいえ | - | 有効な値:
説明 このパラメーターは、Realtime Compute for Apache Flink の VVR 11.1 以降でサポートされています。 |
deliveryTimestampType | 遅延メッセージの時間ベースタイプを指定します。 | String | いいえ | processing_time | 有効な値:
説明 このパラメーターは、Realtime Compute for Apache Flink の VVR 11.1 以降でサポートされています。 |
deliveryTimestampValue | 遅延メッセージの配信時間。 | Long | いいえ | なし | このパラメーターの意味は、
説明 このパラメーターは、Realtime Compute for Apache Flink の VVR 11.1 以降でサポートされています。 |
deliveryTimestampField | 遅延メッセージの配信時間に使用するフィールドを指定します。フィールドタイプは | String | いいえ | - | このパラメーターは、 説明 このパラメーターは、Realtime Compute for Apache Flink の VVR 11.1 以降でサポートされています。 |
タイプマッピング
Flink フィールドタイプ | ApsaraMQ for RocketMQ フィールドタイプ |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
コード例
ソーステーブルの例
CSV 形式
CSV 形式で次のメッセージレコードがあると仮定します。
1,name,male 2,name,female説明RocketMQ メッセージには、
\nで区切られた 0 個以上のデータレコードを含めることができます。次のデータ定義言語 (DDL) 文は、Flink ジョブで ApsaraMQ for RocketMQ ソーステーブルを宣言する方法を示しています。
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq5', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );RocketMQ 4.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '1000', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );バイナリ形式
RocketMQ 5.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq5', 'endpoint' = '<yourEndpoint>', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;ApsaraMQ for RocketMQ 4.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '500', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
結果テーブルの例
シンクテーブルを作成する
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );ApsaraMQ for RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );説明ApsaraMQ for RocketMQ メッセージがバイナリ形式の場合、DDL 文で定義できるフィールドは 1 つだけで、フィールドタイプは VARBINARY である必要があります。
keysおよびtagsフィールドを RocketMQ メッセージのキーおよびタグとして指定する結果テーブルを作成しますApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
DataStream API
DataStream API を使用してデータを読み書きする場合、対応する DataStream コネクタを使用して完全マネージド型 Flink に接続する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用」をご参照ください。
Ververica Runtime (VVR) は、ApsaraMQ for RocketMQ からデータを読み取るための MetaQSource を提供します。また、ApsaraMQ for RocketMQ にデータを書き込むための OutputFormat クラスの実装である MetaQOutputFormat も提供します。次のコードは、ApsaraMQ for RocketMQ からデータを読み取り、ApsaraMQ for RocketMQ にデータを書き込む方法の例を示しています。
ApsaraMQ for RocketMQ 5.x
ApsaraMQ for RocketMQ 5.x では、AccessKey ペアはインスタンスに設定されたユーザー名とパスワードに対応します。内部ネットワーク経由で ApsaraMQ for RocketMQ インスタンスにアクセスし、インスタンスで Access Control List (ACL) 認証が有効になっていない場合、AccessKey ペアパラメーターを設定する必要はありません。
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
* RocketMQ からメッセージを消費し、メッセージを変換してから、RocketMQ にメッセージを生成する方法を示すデモです。
*/
public class RocketMQ5DataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// ストリーミング実行環境をセットアップします
Configuration conf = new Configuration();
// 次の 2 つの構成は、ローカルデバッグ専用です。ジョブをパッケージ化して Realtime Compute for Apache Flink にアップロードする前に削除してください。
conf.setString("pipeline.classpaths", "file://" + "the absolute path of the uber JAR file");
conf.setString(
"classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
final DataStreamSource<String> ds =
env.fromSource(
RocketMQSource.<String>builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopic(SOURCE_TOPIC)
.setConsumerGroup(CONSUMER_GROUP)
.setDeserializationSchema(new MyDeserializer())
.setStartOffset(1)
.build(),
WatermarkStrategy.noWatermarks(),
"source");
ds.map(new ToMessage())
.addSink(
new OutputFormatSinkFunction<>(
new RocketMQOutputFormat.Builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.build()));
env.execute();
}
private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
@Override
public void deserialize(List<MessageExt> record, Collector<String> out) {
for (MessageExt messageExt : record) {
out.collect(new String(messageExt.getBody()));
}
}
@Override
public TypeInformation<String> getProducedType() {
return Types.STRING;
}
}
private static class ToMessage implements MapFunction<String, List<MessageExt>> {
public ToMessage() {
}
@Override
public List<MessageExt> map(String s) {
final MessageExt message = new MessageExt();
message.setBody(s.getBytes());
message.setWaitStoreMsgOK(true);
return Collections.singletonList(message);
}
}
}ApsaraMQ for RocketMQ 4.x
import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;
/**
* RocketMQ からメッセージを消費し、メッセージを変換してから、RocketMQ にメッセージを生成する方法を示すデモです。
*/
public class RocketMQDataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String INSTANCE_ID = "<instanceID>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// ストリーミング実行環境をセットアップします
Configuration conf = new Configuration();
// 次の 2 つの構成は、ローカルデバッグ専用です。ジョブをパッケージ化して Realtime Compute for Apache Flink にアップロードする前に削除してください。
conf.setString("pipeline.classpaths", "file://" + "the absolute path of the uber JAR file");
conf.setString("classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// RocketMQ ソースを作成して追加します。
env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
// メッセージ本文を大文字に変換します。
.map(RocketMQDataStreamDemo2::convertMessages)
// RocketMQ シンクを作成して追加します。
.addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
.name(RocketMQDataStreamDemo2.class.getSimpleName());
// ジョブをコンパイルして送信します。
env.execute("RocketMQ connector end-to-end DataStream demo");
}
private static MetaQSource<MessageExt> createRocketMQSource() {
Properties mqProperties = createMQProperties();
return new MetaQSource<>(SOURCE_TOPIC,
CONSUMER_GROUP,
null, // 常に null
null, // 消費するメッセージのタグ
Long.MAX_VALUE, // 停止タイムスタンプ (ミリ秒)
-1, // 開始タイムスタンプ (ミリ秒)。-1 に設定すると、オフセットからの開始を無効にします。
0, // 開始オフセット。
300_000, // パーティション検出間隔。
mqProperties,
Boundedness.CONTINUOUS_UNBOUNDED,
new MyDeserializationSchema());
}
private static MetaQOutputFormat createRocketMQOutputFormat() {
return new MetaQOutputFormat.Builder()
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.setMqProperties(createMQProperties())
.build();
}
private static Properties createMQProperties() {
Properties properties = new Properties();
properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
properties.put(NAMESRV_ADDR, ENDPOINT);
properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
return properties;
}
private static List<MessageExt> convertMessages(MessageExt messages) {
return Collections.singletonList(messages);
}
public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
@Override
public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
for (MessageExt messageExt : list) {
collector.collect(messageExt);
}
}
@Override
public TypeInformation<MessageExt> getProducedType() {
return TypeInformation.of(MessageExt.class);
}
}
}
}
}XML
MQ 4.x: MQ DataStream コネクタ。
MQ 5.x: MQ DataStream コネクタ。
<!--MQ 5.x-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq5</artifactId>
<version>${vvr-version}</version>
<scope>provided</scope>
</dependency>
<!--MQ 4.x-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq</artifactId>
<version>${vvr-version}</version>
</dependency>ApsaraMQ for RocketMQ のエンドポイントの設定方法の詳細については、「TCP 内部エンドポイントの設定に関するお知らせ」をご参照ください。
よくある質問
Topic がスケールアウトされたときに、ApsaraMQ for RocketMQ は Topic パーティション数の変更をどのように検出しますか?