ApsaraMQ for RocketMQ Java SDK では、プッシュモードまたはプルモードでメッセージの送信および消費が可能です。本トピックでは、利用可能なメソッドおよび構成パラメーターについて説明します。
プッシュモードとプルモード
ApsaraMQ for RocketMQ では、コンシューマーへのメッセージ配信を以下の 2 つのモードのいずれかで実行します。
プッシュモード:ブローカーがメッセージの利用可能状態に応じて、コンシューマーへ自動的にメッセージをプッシュします。このモードではバッチ消費をサポートしており、単一のバッチで複数のメッセージを配信できます。
プルモード:コンシューマーが要求に応じてブローカーから新規メッセージをポーリング(取得)します。このモードでは、読み取り対象のパーティション、メッセージ取得タイミング、およびオフセット管理方法を直接制御できます。
プルモードを利用するには、Enterprise Platinum Edition インスタンスが必要です。
接続パラメーター
プロデューサーおよびコンシューマーの両方で、以下のパラメーターを構成します。
| パラメーター | 説明 | デフォルト値 |
|---|---|---|
NAMESRV_ADDR | ApsaraMQ for RocketMQ インスタンスの TCP エンドポイントです。コンソールのインスタンスの詳細ページで確認できます。 | -- |
AccessKey | 認証時の一意の識別子として使用される AccessKey ID です。「AccessKey ペアの作成」をご参照ください。 | -- |
SecretKey | 認証時のパスワードとして使用される AccessKey Secret です。「AccessKey ペアの作成」をご参照ください。 | -- |
OnsChannel | ユーザーチャンネルです。CloudTmall ユーザーの場合は CLOUD を設定します。 | ALIYUN |
プロデューサーメソッド
プロデューサーインターフェイスでは、以下のメソッドが提供されます。

プロデューサーパラメーター
| パラメーター | 説明 | デフォルト値 | 単位 |
|---|---|---|---|
SendMsgTimeoutMillis | メッセージ送信のタイムアウト時間です。 | -- | ミリ秒 |
CheckImmunityTimeInSeconds | トランザクションメッセージのステータスをブローカーが初めて確認するまでの最小待機時間です。 | -- | 秒 |
shardingKey | 順序付きメッセージの配信先パーティションを決定するパーティションキーです。 | -- | -- |
コンシューマーメソッド
プッシュモード
プッシュコンシューマーインターフェイスでは、以下のメソッドが提供されます。

プルモード
プルコンシューマーインターフェイスでは、以下のメソッドが提供されます。

PullConsumer インターフェイス
public interface PullConsumer extends Admin {
/**
* 指定されたトピックのすべてのパーティションを返します。
* このメソッドは、プルコンシューマーが起動した後にのみ呼び出してください。
*/
Set<TopicPartition> topicPartitions(String topic);
/**
* パーティションをこのコンシューマーに手動で割り当てます。
* 自動リバランスは実行されません。すべてのパーティションが
* コンシューマー間でカバーされていることを確認してください。
* このメソッドを再度呼び出すと、以前の割り当てが完全に置き換えられます。
*/
void assign(Collection<TopicPartition> topicPartitions);
/**
* メッセージをポーリングします。最大 maxBatchMessageCount 個のメッセージを
* 返し、指定されたタイムアウト(ミリ秒単位)までブロックします。
*/
List<Message> poll(long timeout);
/**
* パーティションのコンシューマーオフセットを特定の位置にリセットします。
* オフセットは、パーティションの最小および最大オフセットの範囲内である必要があります。
* このメソッドは、コンシューマーが起動済みであり、対象パーティションが割り当て済みである場合にのみ呼び出してください。
*/
void seek(TopicPartition topicPartition, long offset);
/**
* パーティションのコンシューマーオフセットを、利用可能な最も古いメッセージの位置にリセットします。
* 対象パーティションが割り当て済みの起動済みコンシューマーが必要です。
*/
void seekToBeginning(TopicPartition topicPartition);
/**
* パーティションのコンシューマーオフセットを、最新のメッセージの位置にリセットします。
* 対象パーティションが割り当て済みの起動済みコンシューマーが必要です。
*/
void seekToEnd(TopicPartition topicPartition);
/**
* 指定されたパーティションでのメッセージ消費を一時停止します。
*/
void pause(Collection<TopicPartition> topicPartitions);
/**
* 以前に一時停止したパーティションでのメッセージ消費を再開します。
*/
void resume(Collection<TopicPartition> topicPartitions);
/**
* 指定されたパーティション内で、与えられたタイムスタンプ以降(または該当する最初の位置)に格納された
* 最初のメッセージのオフセットを返します。タイムスタンプは、メッセージがブローカーに格納された時刻を指し、
* 送信時刻ではありません。
*/
Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);
/**
* 指定されたパーティションの最新のコンシューマーオフセットを返します。
*/
Long committed(TopicPartition topicPartition);
/**
* 現在のコンシューマーオフセットを同期的にコミットします。
* オフセットはまずローカルクライアントに同期され、その後非同期でブローカーに書き込まれます。
*/
void commitSync();
/**
* パーティション変更イベント用のリスナーインターフェイスです。
*/
interface TopicPartitionChangeListener {
/**
* トピックのパーティションが変更されたときに呼び出されます(例:ブローカーのスケーリングにより
* パーティション数が調整された場合)。
*/
void onChanged(Set<TopicPartition> topicPartitions);
}
/**
* トピックのパーティション数が変更されたときに発火するリスナーを登録します(例:ブローカーのスケーリングによる変更)。
* デフォルトでは、コールバックの最大遅延は 5 秒です。
*/
void registerTopicPartitionChangedListener(String topic,
TopicPartitionChangeListener callback);
}コンシューマーパラメーター
共通パラメーター
以下のパラメーターは、プッシュコンシューマーおよびプルコンシューマーの両方に適用されます。
| パラメーター | 説明 | デフォルト値 | 有効な値 | 単位 |
|---|---|---|---|---|
GROUP_ID | ApsaraMQ for RocketMQ コンソールで作成したコンシューマーグループ ID です。「用語」をご参照ください。 | -- | -- | -- |
MessageModel | 消費モデルです。有効な値は CLUSTERING(クラスター消費)および BROADCASTING(ブロードキャスト消費)です。 | CLUSTERING | CLUSTERING、BROADCASTING | -- |
ConsumeThreadNums | コンシューマーがメッセージ処理に使用するスレッド数です。 | 20 | -- | -- |
MaxReconsumeTimes | メッセージの消費に失敗した場合の最大リトライ回数です。 | 16 | -- | -- |
ConsumeTimeout | 単一のメッセージを消費する際に許容される最大時間です。この時間を超えると、再試行間隔を経てメッセージが再配信されます。ビジネスロジックに応じて適切に設定してください。 | 15 | -- | 分 |
suspendTimeMillis | 順序付きメッセージの消費に失敗した場合の再試行間隔です。 | -- | -- | -- |
maxCachedMessageAmount | ローカルコンシューマークライアント上でキャッシュされるメッセージの最大数です。クォータは、購読中のすべてのトピックに均等に分割されます。たとえば、コンシューマーが 2 つのトピックを購読しており、この値が 1,000 の場合、各トピックは最大 500 メッセージをキャッシュできます。また、コンシューマークライアントが一度に複数のメッセージをプルする場合、実際のキャッシュメッセージ数がこの値を超えることがあります。コンシューマーが 1 秒間に処理するメッセージ数の約 2 倍を設定することを推奨します。 | 5,000 | 100~50,000 | -- |
maxCachedMessageSizeInMiB | ローカルコンシューマークライアント上でキャッシュされるメッセージの合計サイズの上限です。 | 512 | 16~2,048 | MiB |
maxCachedMessageAmount または maxCachedMessageSizeInMiB の値を高すぎると、クライアントでメモリ不足 (OOM) エラーが発生する可能性があります。
バッチプッシュパラメーター
以下のパラメーターは、プッシュモードにおいてブローカーがメッセージをバッチ処理して配信する際の動作を制御します。
| パラメーター | 説明 | デフォルト値 | 有効な値 | 単位 |
|---|---|---|---|---|
ConsumeMessageBatchMaxSize | 単一のバッチでコンシューマーに配信されるキャッシュ済みメッセージの最大数です。キャッシュがこのしきい値に達すると、すべてのキャッシュ済みメッセージが一度にプッシュされます。 | 32 | 1~1,024 | -- |
BatchConsumeMaxAwaitDurationInSeconds | キャッシュ済みメッセージがコンシューマーに一度にプッシュされるまでの最大待機時間です。 | 0 | 0~450 | 秒 |
プルモードパラメーター
以下のパラメーターは、プルコンシューマーにのみ適用されます。
| パラメーター | 説明 | デフォルト値 | 有効な値 | 単位 |
|---|---|---|---|---|
maxCachedMessageSizeInMiB | ローカルクライアント上で各パーティションごとにキャッシュされるメッセージの最大サイズです。 | 100 | 16~2,048 | MiB |
autoCommit | コンシューマーオフセットを自動コミットするかどうかを指定します。 | true | true、false | -- |
autoCommitIntervalMillis | 自動オフセットコミットの間隔です。 | 5 | -- | 秒 |
pollTimeoutMillis | poll() 呼び出しのタイムアウトです。 | 5 | -- | 秒 |
maxCachedMessageSizeInMiB の値を高すぎると、クライアントで OOM エラーが発生する可能性があります。
パーティションおよびオフセットに関する詳細については、「用語」をご参照ください。
参照
メッセージの送信および消費に関するサンプルコードは、以下のとおりです。