すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ApsaraMQ for RocketMQ

最終更新日:Nov 09, 2025

このトピックでは、ApsaraMQ for RocketMQ コネクタについて説明します。

重要

ApsaraMQ for RocketMQ 4.x Standard Edition インスタンスには、毎秒 5,000 の共有エラスティック API 呼び出し制限があります。このバージョンのメッセージングミドルウェアを使用して Realtime Compute for Apache Flink に接続すると、この制限を超えるとスロットリングメカニズムがトリガーされ、Flink ジョブが不安定になる可能性があります。したがって、ApsaraMQ for RocketMQ 4.x Standard Edition を使用する影響を評価することをお勧めします。ビジネスシナリオで許可されている場合は、代替として Kafka、Simple Log Service (SLS)、DataHub などの他のミドルウェアの使用を検討してください。大規模なメッセージを処理するために ApsaraMQ for RocketMQ 4.x Standard Edition を使用する必要がある場合は、チケットを送信して ApsaraMQ for RocketMQ プロダクトチームに連絡し、スロットリング制限の引き上げをリクエストすることもできます。

背景情報

ApsaraMQ for RocketMQ は、Alibaba Cloud が Apache RocketMQ をベースに開発した分散ミドルウェアサービスです。低レイテンシー、高い同時実行性、高可用性 (HA)、および高い信頼性を提供します。分散アプリケーションに非同期デカップリングとピーク負荷シフトを提供します。また、大量のメッセージ蓄積、高スループット、信頼性の高いリトライなど、インターネットアプリケーションに必要な機能もサポートしています。

RocketMQ コネクタは、以下をサポートしています。

カテゴリ

詳細

サポートされているタイプ

ソーステーブルとシンクテーブル

実行モード

ストリームモードのみがサポートされています。

データ形式

CSV およびバイナリ形式

特定の監視メトリック

監視メトリック

  • ソーステーブル

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

  • シンクテーブル

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

説明

メトリックの詳細については、「メトリックの説明」をご参照ください。

API タイプ

DataStream (RocketMQ 4.x のみ) と SQL

結果テーブルのデータの更新または削除をサポート

結果テーブルのデータの更新または削除はサポートされていません。データの挿入のみをサポートします。

特徴

ApsaraMQ for RocketMQ のソーステーブルと結果テーブルは、プロパティフィールドをサポートしています。

  • ソーステーブルのプロパティフィールド

    フィールド名

    フィールドタイプ

    説明

    topic

    VARCHAR METADATA VIRTUAL

    メッセージトピック。

    queue-id

    INT METADATA VIRTUAL

    メッセージキュー ID。

    queue-offset

    BIGINT METADATA VIRTUAL

    メッセージキューのコンシューマオフセット。

    msg-id

    VARCHAR METADATA VIRTUAL

    メッセージ ID。

    store-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    メッセージが保存された時刻。

    born-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    メッセージが生成された時刻。

    keys

    VARCHAR METADATA VIRTUAL

    メッセージキー。

    tags

    VARCHAR METADATA VIRTUAL

    メッセージタグ。

  • 結果テーブルのプロパティフィールド

    フィールド名

    フィールドタイプ

    説明

    keys

    VARCHAR METADATA

    メッセージキー。

    tags

    VARCHAR METADATA

    メッセージタグ。

前提条件

ApsaraMQ for RocketMQ リソースが作成されていること。詳細については、「リソースの作成」をご参照ください。

制限事項

  • Realtime Compute for Apache Flink の Ververica Runtime (VVR) 8.0.3 以降のみが ApsaraMQ for RocketMQ 5.x をサポートします。

  • ApsaraMQ for RocketMQ コネクタは、プルコンシューマーを使用してメッセージを消費します。すべてのサブタスクが消費負荷を共有します。

構文

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

WITH パラメーター

全般

パラメーター

説明

データ型

必須

デフォルト値

備考

connector

コネクタタイプ。

文字列

はい

-

  • ApsaraMQ for RocketMQ 4.x の場合、このパラメーターを mq に設定します。

  • ApsaraMQ for RocketMQ 5.x の場合、このパラメーターを mq5 に設定します。

endPoint

エンドポイントアドレス。

文字列

はい

-

ApsaraMQ for RocketMQ エンドポイントは、次のいずれかのタイプになります。

  • 内部 ApsaraMQ for RocketMQ サービス (Alibaba Cloud クラシックネットワークまたは VPC) のエンドポイント: ApsaraMQ for RocketMQ コンソールで、宛先インスタンスの詳細ページに移動します。[エンドポイント] > [TCP プロトコルクライアントアクセスポイント] > [内部ネットワークアクセス] を選択してエンドポイントを取得します。

  • パブリック ApsaraMQ for RocketMQ サービスのエンドポイント: ApsaraMQ for RocketMQ コンソールで、宛先インスタンスの詳細ページに移動します。[エンドポイント] > [TCP プロトコル] > [クライアントアクセスポイント] > [パブリックネットワークアクセス] を選択してエンドポイントを取得します。

重要

Alibaba Cloud ネットワークセキュリティポリシーの動的な変更により、Realtime Compute for Apache Flink がパブリック ApsaraMQ for RocketMQ サービスに接続する際にネットワーク接続の問題が発生する可能性があります。内部 ApsaraMQ for RocketMQ サービスを使用することをお勧めします。

  • 内部サービスはクロスドメインアクセスをサポートしていません。たとえば、Realtime Compute for Apache Flink サービスが中国 (杭州) リージョンにあり、ApsaraMQ for RocketMQ サービスが中国 (上海) リージョンにある場合、アクセスは拒否されます。

  • インターネット経由で ApsaraMQ for RocketMQ にアクセスするには、パブリックネットワークアクセス機能を有効にする必要があります。詳細については、「ネットワーク接続タイプの選択」をご参照ください。

topic

トピック名。

文字列

はい

なし

なし。

accessId

  • 4.x: Alibaba Cloud アカウントの AccessKey ID。

  • 5.x:

    5.x: ApsaraMQ for RocketMQ インスタンスのユーザー名。

文字列

  • ApsaraMQ for RocketMQ 4.x: はい

  • ApsaraMQ for RocketMQ 5.x: いいえ

なし

重要

AccessKey 情報の漏洩を防ぐために、変数を使用して AccessKey ペアを指定します。詳細については、「プロジェクト変数」をご参照ください。

  • ApsaraMQ for RocketMQ 5.x: パブリックネットワークアクセスのエンドポイントからインスタンスにアクセスする場合、このパラメーターをコンソールで ApsaraMQ for RocketMQ インスタンスのユーザー名に設定する必要があります。内部ネットワーク経由で Alibaba Cloud ECS インスタンスからインスタンスにアクセスする場合、このパラメーターを設定する必要はありません。

accessKey

  • 4.x: Alibaba Cloud アカウントの AccessKey Secret。

  • 5.x: インスタンスのパスワード。

文字列

  • ApsaraMQ for RocketMQ 4.x: はい

  • ApsaraMQ for RocketMQ 5.x: いいえ

-

tag

サブスクライブまたは書き込むタグ。

文字列

いいえ

-

  • ApsaraMQ for RocketMQ をソーステーブルとして使用する場合、1 つのタグのみを読み取ることができます。

  • ApsaraMQ for RocketMQ を結果テーブルとして使用する場合、複数のタグを設定できます。タグはコンマ (,) で区切ります。

説明

結果テーブルとして使用する場合、このパラメーターは ApsaraMQ for RocketMQ 4.x でのみサポートされます。ApsaraMQ for RocketMQ 5.x の場合、結果テーブルのプロパティフィールドを使用して出力メッセージのタグを指定します。

encoding

エンコード形式。

文字列

いいえ

UTF-8

なし。

instanceID

ApsaraMQ for RocketMQ インスタンス ID。

文字列

いいえ

-

  • ApsaraMQ for RocketMQ インスタンスに独立した名前空間がない場合、instanceID パラメーターは使用できません。

  • ApsaraMQ for RocketMQ インスタンスに独立した名前空間がある場合、instanceID パラメーターは必須です。

説明

このパラメーターは ApsaraMQ for RocketMQ 4.x でのみサポートされます。

ソーステーブル固有

パラメーター

説明

データ型

必須

デフォルト値

備考

consumerGroup

コンシューマーグループの名前。

文字列

はい

-

なし。

pullIntervalMs

アップストリームから消費できるデータがない場合のソースの休止期間。

整数

はい

なし

単位: ミリ秒。

スロットリングメカニズムはありません。ApsaraMQ for RocketMQ からデータを読み取るレートを設定することはできません。

説明

このパラメーターは ApsaraMQ for RocketMQ 4.x でのみサポートされます。

timeZone

タイムゾーン。

文字列

いいえ

-

例: Asia/Shanghai。

startTimeMs

開始時刻。

ロング

いいえ

なし

UNIX タイムスタンプ。単位: ミリ秒。

startMessageOffset

メッセージの開始オフセット。

整数

いいえ

-

このパラメーターが設定されている場合、データ読み込みは優先的に startMessageOffset のオフセットから開始されます。

lineDelimiter

ブロックを解析するために使用される行区切り文字。

文字列

いいえ

\n

なし。

fieldDelimiter

フィールド区切り文字。

文字列

いいえ

\u0001

区切り文字は、ApsaraMQ for RocketMQ クライアントのモードによって異なります。

  • 読み取り専用モード (デフォルトモード) では、デリミタは \u0001 です。このモードでは、デリミタは表示されません。

  • 編集モードでは、区切り文字は ^A です。

lengthCheck

1 行のフィールド数を確認するポリシー。

整数

いいえ

NONE

有効な値:

  • NONE: これがデフォルト値です。

    • 解析されたフィールドの数が定義されたフィールドの数より大きい場合、定義された数のフィールドが左から右に取得されます。

    • 解析されたフィールドの数が定義されたフィールドの数より少ない場合、その行のデータはスキップされます。

  • SKIP: 解析されたフィールドの数が定義されたフィールドの数と異なる場合、データはスキップされます。

  • EXCEPTION: 解析されたフィールドの数が定義されたフィールドの数と異なる場合、例外が返されます。

  • PAD: フィールドは左から右に埋められます。

    • 解析されたフィールドの数が定義されたフィールドの数より大きい場合、定義された数のフィールドが左から右に取得されます。

    • 解析されたフィールドの数が定義されたフィールドの数より少ない場合、行の末尾にある欠落しているフィールドは null 値で埋められます。

columnErrorDebug

デバッグを有効にするかどうかを指定します。

ブール値

いいえ

false

このパラメーターを true に設定すると、解析例外のログが出力されます。

pullBatchSize

一度にプルするメッセージの最大数。

Int

いいえ

64

このパラメーターは、Realtime Compute for Apache Flink の VVR 8.0.7 以降でサポートされています。

結果テーブル固有

パラメーター

説明

データ型

必須

デフォルト値

備考

producerGroup

書き込み先のグループ。

String

はい

-

なし。

retryTimes

書き込み操作のリトライ回数。

Int

いいえ

10

なし。

sleepTimeMs

再試行間隔。

Long

いいえ

5000

なし。

partitionField

パーティションキー列として使用するフィールドの名前を指定します。

String

いいえ

なし

modepartition に設定されている場合、このパラメーターは必須です。

説明

このパラメーターは、Realtime Compute for Apache Flink の VVR 8.0.5 以降でサポートされています。

deliveryTimestampMode

遅延メッセージのモードを指定します。このパラメーターは、deliveryTimestampValue パラメーターとともに、遅延メッセージの配信時間を決定します。

String

いいえ

-

有効な値:

  • fixed: 固定タイムスタンプモード。

  • relative: 相対遅延時間モード。

  • field: 指定されたフィールドを配信時間として使用するモード。

説明

このパラメーターは、Realtime Compute for Apache Flink の VVR 11.1 以降でサポートされています。

deliveryTimestampType

遅延メッセージの時間ベースタイプを指定します。

String

いいえ

processing_time

有効な値:

  • event_time: イベント時間。

  • processing_time: 処理時間。

説明

このパラメーターは、Realtime Compute for Apache Flink の VVR 11.1 以降でサポートされています。

deliveryTimestampValue

遅延メッセージの配信時間。

Long

いいえ

なし

このパラメーターの意味は、deliveryTimestampMode の値によって異なります。

  • deliveryTimestampMode=fixed: メッセージは、指定されたタイムスタンプ (ミリ秒) まで遅延されます。現在の時刻が指定されたタイムスタンプより後の場合、メッセージはすぐに配信されます。

  • deliveryTimestampMode=relative: deliveryTimestampType で指定された時間タイプに基づく遅延時間。デフォルトの単位はミリ秒です。

  • deliveryTimestampMode=field: このパラメーターは効果がありません。遅延時間は、deliveryTimestampField で指定されたフィールドの値によって決まります。

説明

このパラメーターは、Realtime Compute for Apache Flink の VVR 11.1 以降でサポートされています。

deliveryTimestampField

遅延メッセージの配信時間に使用するフィールドを指定します。フィールドタイプは BIGINT である必要があります。

String

いいえ

-

このパラメーターは、deliveryTimestampModefield に設定されている場合にのみ有効です。

説明

このパラメーターは、Realtime Compute for Apache Flink の VVR 11.1 以降でサポートされています。

タイプマッピング

Flink フィールドタイプ

ApsaraMQ for RocketMQ フィールドタイプ

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

コード例

ソーステーブルの例

  • CSV 形式

    CSV 形式で次のメッセージレコードがあると仮定します。

    1,name,male 
    2,name,female
    説明

    RocketMQ メッセージには、\n で区切られた 0 個以上のデータレコードを含めることができます。

    次のデータ定義言語 (DDL) 文は、Flink ジョブで ApsaraMQ for RocketMQ ソーステーブルを宣言する方法を示しています。

    • ApsaraMQ for RocketMQ 5.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq5',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
    • RocketMQ 4.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'pullIntervalMs' = '1000',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
  • バイナリ形式

    • RocketMQ 5.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq5',
        'endpoint' = '<yourEndpoint>',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;
    • ApsaraMQ for RocketMQ 4.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '500',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;

結果テーブルの例

  • シンクテーブルを作成する

    • ApsaraMQ for RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • ApsaraMQ for RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
      説明

      ApsaraMQ for RocketMQ メッセージがバイナリ形式の場合、DDL 文で定義できるフィールドは 1 つだけで、フィールドタイプは VARBINARY である必要があります。

  • keys および tags フィールドを RocketMQ メッセージのキーおよびタグとして指定する結果テーブルを作成します

    • ApsaraMQ for RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );

DataStream API

重要

DataStream API を使用してデータを読み書きする場合、対応する DataStream コネクタを使用して完全マネージド型 Flink に接続する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用」をご参照ください。

Ververica Runtime (VVR) は、ApsaraMQ for RocketMQ からデータを読み取るための MetaQSource を提供します。また、ApsaraMQ for RocketMQ にデータを書き込むための OutputFormat クラスの実装である MetaQOutputFormat も提供します。次のコードは、ApsaraMQ for RocketMQ からデータを読み取り、ApsaraMQ for RocketMQ にデータを書き込む方法の例を示しています。

ApsaraMQ for RocketMQ 5.x

説明

ApsaraMQ for RocketMQ 5.x では、AccessKey ペアはインスタンスに設定されたユーザー名とパスワードに対応します。内部ネットワーク経由で ApsaraMQ for RocketMQ インスタンスにアクセスし、インスタンスで Access Control List (ACL) 認証が有効になっていない場合、AccessKey ペアパラメーターを設定する必要はありません。

import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
 * RocketMQ からメッセージを消費し、メッセージを変換してから、RocketMQ にメッセージを生成する方法を示すデモです。
 */
public class RocketMQ5DataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // ストリーミング実行環境をセットアップします
        Configuration conf = new Configuration();

        // 次の 2 つの構成は、ローカルデバッグ専用です。ジョブをパッケージ化して Realtime Compute for Apache Flink にアップロードする前に削除してください。
        conf.setString("pipeline.classpaths", "file://" + "the absolute path of the uber JAR file");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");

        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));

        env.execute();
    }

    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
                out.collect(new String(messageExt.getBody()));
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    private static class ToMessage implements MapFunction<String, List<MessageExt>> {

        public ToMessage() {
        }

        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
            message.setBody(s.getBytes());
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}

ApsaraMQ for RocketMQ 4.x

import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;
/**
 * RocketMQ からメッセージを消費し、メッセージを変換してから、RocketMQ にメッセージを生成する方法を示すデモです。
 */
public class RocketMQDataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // ストリーミング実行環境をセットアップします
        Configuration conf = new Configuration();

        // 次の 2 つの構成は、ローカルデバッグ専用です。ジョブをパッケージ化して Realtime Compute for Apache Flink にアップロードする前に削除してください。
        conf.setString("pipeline.classpaths", "file://" + "the absolute path of the uber JAR file");
        conf.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // RocketMQ ソースを作成して追加します。
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // メッセージ本文を大文字に変換します。
                .map(RocketMQDataStreamDemo2::convertMessages)
                // RocketMQ シンクを作成して追加します。
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo2.class.getSimpleName());
        // ジョブをコンパイルして送信します。
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();

        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // 常に null
                null, // 消費するメッセージのタグ
                Long.MAX_VALUE, // 停止タイムスタンプ (ミリ秒)
                -1, // 開始タイムスタンプ (ミリ秒)。-1 に設定すると、オフセットからの開始を無効にします。
                0, // 開始オフセット。
                300_000, // パーティション検出間隔。
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}
    }
}

XML

<!--MQ 5.x-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq5</artifactId>
    <version>${vvr-version}</version>
    <scope>provided</scope>
</dependency>

<!--MQ 4.x-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq</artifactId>
    <version>${vvr-version}</version>
</dependency>
説明

ApsaraMQ for RocketMQ のエンドポイントの設定方法の詳細については、「TCP 内部エンドポイントの設定に関するお知らせ」をご参照ください。

よくある質問

Topic がスケールアウトされたときに、ApsaraMQ for RocketMQ は Topic パーティション数の変更をどのように検出しますか?