すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:OSSシンクコネクタの作成(旧バージョン)

最終更新日:Feb 17, 2025

このトピックでは、ApsaraMQ for Kafka のトピックから OSS のオブジェクトにデータを同期するために、Object Storage Service(OSS)シンクコネクタを作成する方法について説明します。

前提条件

前提条件については、前提条件をご参照ください。

手順 1:OSSリソースの作成

OSSコンソールでバケットを作成します。詳細については、バケットの作成をご参照ください。

このトピックでは、oss-sink-connector-bucket という名前のバケットが作成されます。

手順 2:OSSシンクコネクタの作成と開始

  1. ApsaraMQ for Kafka コンソール にログオンします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションペインで、[コネクタエコシステム統合] > [メッセージアウトフロー] を選択します。
  3. [メッセージアウトフロー] ページで、[タスクの作成] をクリックします。
  4. [メッセージアウトフロータスクの作成] パネルで、パラメータを設定し、[確認] をクリックします。

    1. [基本情報] 手順で、[タスク名] フィールドにタスク名を指定し、OSS[メッセージアウトフロータスクタイプ] ドロップダウンリストから を選択します。

    2. [リソース設定] 手順で、パラメータを設定します。次の表にパラメータを示します。

      表 1. ApsaraMQ for Kafka 用に構成されたパラメータ

      パラメータ

      説明

      リージョン

      Message Queue for Apache Kafka インスタンスが存在するリージョンを選択します。

      中国 (杭州)

      Message Queue for Apache Kafka インスタンス

      Message Queue for Apache Kafka インスタンスの ID。

      alikafka_post-cn-9hdsbdhd****

      トピック

      Message Queue for Apache Kafka インスタンスのソース トピック。

      guide-sink-topic

      グループ ID

      Message Queue for Apache Kafka インスタンスのグループの ID。

      • [クイック作成]: システムは、ID が GID_EVENTBRIDGE_xxx 形式のグループを自動的に作成します。

      • [既存のグループを使用]: 使用されていない既存のグループの ID を選択します。使用中の既存のグループを選択すると、既存のメッセージの発行とサブスクリプションに影響します。

      既存のグループを使用

      同時実行クォータ (コンシューマー)

      トピックのデータを同時に消費するスレッドの数。次の項目では、トピック内のスレッドとパーティション間のマッピングについて説明します。

      • トピックのパーティション数が、トピックのデータを同時に消費するスレッド数と同じ場合、1 つのスレッドが 1 つのパーティションのデータを消費します。この構成を使用することをお勧めします。

      • トピックのパーティション数が、トピックのデータを同時に消費するスレッド数よりも大きい場合、スレッドはすべてのパーティションのデータを均等に消費します。

      • トピックのパーティション数が、トピックのデータを同時に消費するスレッド数よりも小さい場合、1 つのスレッドが 1 つのパーティションのデータを消費します。追加のスレッドはデータを消費しません。

      2

      コンシューマー オフセット

      • [最新のオフセット]: 最新のオフセットからメッセージを消費します。

      • [最も古いオフセット]: 最も古いオフセットからメッセージを消費します。

      最新のオフセット

      ネットワーク構成

      国境を越えたデータ転送が必要な場合は、[インターネット] を選択します。それ以外の場合は、[デフォルト ネットワーク] を選択します。

      デフォルト ネットワーク

      表 1. OSS 用に構成されたパラメータ

      パラメータ

      説明

      OSSバケット

      作成した OSS バケット。

      oss-sink-connector-bucket

      ストレージパス

      • [サブディレクトリ不要]: データストレージパスは {Kafka インスタンス ID}/{トピック名} 形式です。

      • [時間ベースのサブディレクトリ]:

        • [yyyy/mm/dd/hh]: 生成されるサブディレクトリは {Kafka インスタンス ID}/{トピック名}/YYYY/MM/dd/HH 形式です。

        • [yyyy/mm/dd/hh]: 生成されるサブディレクトリは {Kafka インスタンス ID}/{トピック名}/YYYY/MM/dd 形式です。

        • [yyyymmddhh]: 生成されるサブディレクトリは {Kafka インスタンス ID}/{トピック名}/YYYYMMddHH 形式です。

        • [yyyymmdd]: 生成されるサブディレクトリは {Kafka インスタンス ID}/{トピック名}/YYYYMMdd 形式です。

      説明

      上記の値で、YYYY は年、MM は月、dd は日、HH は時間を表します。

      alikafka_post-cn-9dhsaassdd****/guide-oss-sink-topic/YYYY/MM/dd/HH

      詳細設定

      バッチ集約オブジェクトサイズ パラメータまたは バッチ集約時間ウィンドウ パラメータで指定された条件がバックログ内のメッセージで満たされると、新しいメッセージは新しいオブジェクトに書き込まれます。

      なし

      バッチ集約オブジェクトサイズ

      集約するオブジェクトのサイズを指定します。有効な値: 1 ~ 128。単位: MiB。

      5

      バッチ集約時間ウィンドウ

      集約の時間ウィンドウを指定します。単位: 分。

      1

    上記の手順を実行した後、[メッセージアウトフロー] ページに移動し、作成した OSS シンクコネクタを見つけ、[アクション] 列の [開始] をクリックします。[ステータス] 列の状態が [開始中] から [実行中] に変わると、コネクタが作成されます。

手順 3:OSSシンクコネクタのテスト

  1. [メッセージアウトフロー] ページで、作成した OSS シンクコネクタを見つけ、[イベントソース] 列のソース トピックをクリックします。

  2. トピック詳細ページで、[メッセージの送信] をクリックします。
  3. [メッセージの送受信を開始] パネルで、次の図に基づいてパラメータを設定し、[OK] をクリックします。

    发送消息

  4. [メッセージアウトフロー] ページで、作成した OSS シンクコネクタを見つけ、[イベントターゲット] 列のデスティネーション バケットをクリックします。

  5. バケット詳細ページの左側のナビゲーションペインで、[ファイル] > [オブジェクト] を選択します。表示されるページで、バケットの最も深いパスを入力します。

    最深层路径

    このパスには、次のタイプのオブジェクトがあります。

    • システム メタオブジェクト: システム メタオブジェクトの名前は oss_meta_file_partition_{partitionID} 形式です。オブジェクトの数は、アップストリーム トピックのパーティション数と同じです。システム メタオブジェクトは、バッチ情報を記録するために使用されます。システム メタオブジェクトは無視できます。

    • データ オブジェクト: データ オブジェクトの名前は partition_{partitionID}_offset_{offset}_{8 ビットのランダムな文字列} 形式です。パーティションの複数のメッセージが 1 つのオブジェクトに集約されている場合、オブジェクト名の {offset} の値は、メッセージの中で最小のオフセットです。

  6. 管理するオブジェクトを見つけ、[アクション] 列の 图标 > [ダウンロード] を選択します。アクション

  7. ダウンロードしたオブジェクトを開いて、メッセージの詳細を表示します。

    消息

    上の図は、メッセージ詳細の例です。複数のメッセージは改行で区切られています。