このトピックでは、ApsaraMQ for Kafka のトピックから OSS のオブジェクトにデータを同期するために、Object Storage Service(OSS)シンクコネクタを作成する方法について説明します。
前提条件
前提条件については、前提条件をご参照ください。
手順 1:OSSリソースの作成
OSSコンソールでバケットを作成します。詳細については、バケットの作成をご参照ください。
このトピックでは、oss-sink-connector-bucket という名前のバケットが作成されます。
手順 2:OSSシンクコネクタの作成と開始
ApsaraMQ for Kafka コンソール にログオンします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
- 左側のナビゲーションペインで、 を選択します。
- [メッセージアウトフロー] ページで、[タスクの作成] をクリックします。
[基本情報] 手順で、[タスク名] フィールドにタスク名を指定し、OSS[メッセージアウトフロータスクタイプ] ドロップダウンリストから を選択します。
[リソース設定] 手順で、パラメータを設定します。次の表にパラメータを示します。
表 1. ApsaraMQ for Kafka 用に構成されたパラメータ
パラメータ
説明
例
リージョン
Message Queue for Apache Kafka インスタンスが存在するリージョンを選択します。
中国 (杭州)
Message Queue for Apache Kafka インスタンス
Message Queue for Apache Kafka インスタンスの ID。
alikafka_post-cn-9hdsbdhd****
トピック
Message Queue for Apache Kafka インスタンスのソース トピック。
guide-sink-topic
グループ ID
Message Queue for Apache Kafka インスタンスのグループの ID。
[クイック作成]: システムは、ID が GID_EVENTBRIDGE_xxx 形式のグループを自動的に作成します。
[既存のグループを使用]: 使用されていない既存のグループの ID を選択します。使用中の既存のグループを選択すると、既存のメッセージの発行とサブスクリプションに影響します。
既存のグループを使用
同時実行クォータ (コンシューマー)
トピックのデータを同時に消費するスレッドの数。次の項目では、トピック内のスレッドとパーティション間のマッピングについて説明します。
トピックのパーティション数が、トピックのデータを同時に消費するスレッド数と同じ場合、1 つのスレッドが 1 つのパーティションのデータを消費します。この構成を使用することをお勧めします。
トピックのパーティション数が、トピックのデータを同時に消費するスレッド数よりも大きい場合、スレッドはすべてのパーティションのデータを均等に消費します。
トピックのパーティション数が、トピックのデータを同時に消費するスレッド数よりも小さい場合、1 つのスレッドが 1 つのパーティションのデータを消費します。追加のスレッドはデータを消費しません。
2
コンシューマー オフセット
[最新のオフセット]: 最新のオフセットからメッセージを消費します。
[最も古いオフセット]: 最も古いオフセットからメッセージを消費します。
最新のオフセット
ネットワーク構成
国境を越えたデータ転送が必要な場合は、[インターネット] を選択します。それ以外の場合は、[デフォルト ネットワーク] を選択します。
デフォルト ネットワーク
表 1. OSS 用に構成されたパラメータ
パラメータ
説明
例
OSSバケット
作成した OSS バケット。
oss-sink-connector-bucket
ストレージパス
[サブディレクトリ不要]: データストレージパスは {Kafka インスタンス ID}/{トピック名} 形式です。
[時間ベースのサブディレクトリ]:
[yyyy/mm/dd/hh]: 生成されるサブディレクトリは {Kafka インスタンス ID}/{トピック名}/YYYY/MM/dd/HH 形式です。
[yyyy/mm/dd/hh]: 生成されるサブディレクトリは {Kafka インスタンス ID}/{トピック名}/YYYY/MM/dd 形式です。
[yyyymmddhh]: 生成されるサブディレクトリは {Kafka インスタンス ID}/{トピック名}/YYYYMMddHH 形式です。
[yyyymmdd]: 生成されるサブディレクトリは {Kafka インスタンス ID}/{トピック名}/YYYYMMdd 形式です。
説明上記の値で、YYYY は年、MM は月、dd は日、HH は時間を表します。
alikafka_post-cn-9dhsaassdd****/guide-oss-sink-topic/YYYY/MM/dd/HH
詳細設定
バッチ集約オブジェクトサイズ パラメータまたは バッチ集約時間ウィンドウ パラメータで指定された条件がバックログ内のメッセージで満たされると、新しいメッセージは新しいオブジェクトに書き込まれます。
なし
バッチ集約オブジェクトサイズ
集約するオブジェクトのサイズを指定します。有効な値: 1 ~ 128。単位: MiB。
5
バッチ集約時間ウィンドウ
集約の時間ウィンドウを指定します。単位: 分。
1
上記の手順を実行した後、[メッセージアウトフロー] ページに移動し、作成した OSS シンクコネクタを見つけ、[アクション] 列の [開始] をクリックします。[ステータス] 列の状態が [開始中] から [実行中] に変わると、コネクタが作成されます。
手順 3:OSSシンクコネクタのテスト
[メッセージアウトフロー] ページで、作成した OSS シンクコネクタを見つけ、[イベントソース] 列のソース トピックをクリックします。
- トピック詳細ページで、[メッセージの送信] をクリックします。
[メッセージの送受信を開始] パネルで、次の図に基づいてパラメータを設定し、[OK] をクリックします。

[メッセージアウトフロー] ページで、作成した OSS シンクコネクタを見つけ、[イベントターゲット] 列のデスティネーション バケットをクリックします。
バケット詳細ページの左側のナビゲーションペインで、 を選択します。表示されるページで、バケットの最も深いパスを入力します。

このパスには、次のタイプのオブジェクトがあります。
システム メタオブジェクト: システム メタオブジェクトの名前は oss_meta_file_partition_{partitionID} 形式です。オブジェクトの数は、アップストリーム トピックのパーティション数と同じです。システム メタオブジェクトは、バッチ情報を記録するために使用されます。システム メタオブジェクトは無視できます。
データ オブジェクト: データ オブジェクトの名前は partition_{partitionID}_offset_{offset}_{8 ビットのランダムな文字列} 形式です。パーティションの複数のメッセージが 1 つのオブジェクトに集約されている場合、オブジェクト名の {offset} の値は、メッセージの中で最小のオフセットです。
管理するオブジェクトを見つけ、[アクション] 列の を選択します。アクション
ダウンロードしたオブジェクトを開いて、メッセージの詳細を表示します。

上の図は、メッセージ詳細の例です。複数のメッセージは改行で区切られています。