このトピックでは、ApsaraMQ for Kafka のトピックから OSS のオブジェクトにデータを同期するために、Object Storage Service (OSS) シンクコネクタを作成する方法について説明します。
前提条件
前提条件については、「前提条件」をご参照ください。
注意事項
OSS シンクコネクタは、イベントが生成された時刻ではなく、イベントが処理された時刻に基づいて、OSS にデータをルーティングします。 OSS シンクコネクタの作成時に、時刻によってサブディレクトリを設定すると、時間境界で生成されたデータが次のサブディレクトリに配信される場合があります。
不正なデータ処理: OSS シンクコネクタの作成時に、JSONPath 構文を使用してカスタムパスまたはオブジェクトコンテンツを設定すると、コネクタは JSONPath 構文に一致しないデータを、設定したバッチポリシーに基づいて、バケット内の invalidRuleData/ という名前のディレクトリにルーティングします。 invalidRuleData/ ディレクトリがバケットに表示されている場合は、JSONPath 構文が正しいかどうかを確認し、すべてのメッセージがコンシューマーによって消費されていることを確認してください。
ルーティングには、数秒から数分のレイテンシが発生する可能性があります。
ソース ApsaraMQ for Kafka トピックのメッセージ本文を、カスタムパスまたはオブジェクトコンテンツに使用される JSONPath 構文に基づいて抽出する必要がある場合は、ソース ApsaraMQ for Kafka トピックのメッセージ本文を JSON 形式にエンコードまたはデコードする必要があります。
OSS シンクコネクタは、アップストリームアプリケーションからのデータを、既存のオブジェクトにリアルタイムで追加することにより、OSS に書き込みます。そのため、サブディレクトリのないパスでは、データは最後に表示されたオブジェクトに書き込まれます。この場合、メッセージを消費する際には注意が必要です。
課金ルール
OSS シンクコネクタは、Alibaba Cloud Function Compute 上で実行されます。 OSS シンクコネクタのデータが処理および送信されると、消費された Function Compute リソースに対して課金されます。 詳細については、「課金の概要」をご参照ください。
手順 1: OSS リソースを作成する
OSS コンソールでバケットを作成します。 詳細については、「バケットを作成する」をご参照ください。
このトピックでは、oss-sink-connector-bucket という名前のバケットが作成されます。
手順 2: OSS シンクコネクタを作成して起動する
ApsaraMQ for Kafka コンソール にログインします。 リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
[タスク] ページで、[タスクの作成] をクリックします。
タスクの作成
Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを設定します。 次に、[次のステップ] をクリックします。 次の表にパラメーターを示します。
パラメーター
説明
例
[リージョン]
ソース ApsaraMQ for Kafka インスタンスが存在するリージョン。
中国 (北京)
[apsaramq For Kafka インスタンス]
ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンス。
alikafka_post-cn-jte3****
[トピック]
ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンス上のトピック。
demo-topic
[グループ ID]
ソース ApsaraMQ for Kafka インスタンス上のコンシューマーグループの名前。
[クイック作成]: システムは、
GID_EVENTBRIDGE_xxx
形式で名前が付けられたコンシューマーグループを自動的に作成します。 この値を選択することをお勧めします。[既存のグループを使用]: 使用されていない既存のグループの ID を選択します。 使用中の既存のグループを選択すると、既存のメッセージの発行とサブスクライブに影響します。
クイック作成
[コンシューマーオフセット]
メッセージが消費されるオフセット。 有効な値:
[最新のオフセット]
[最も古いオフセット]
最新のオフセット
[ネットワーク設定]
メッセージをルーティングするネットワークのタイプ。 有効な値:
[ベーシックネットワーク]
[セルフマネージドインターネット]
ベーシックネットワーク
[VPC]
ApsaraMQ for Kafka インスタンスがデプロイされている virtual private cloud (VPC) の ID。 このパラメーターは、[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ必須です。
vpc-bp17fapfdj0dwzjkd****
[vswitch]
ApsaraMQ for Kafka インスタンスが属する vSwitch の ID。 このパラメーターは、[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ必須です。
vsw-bp1gbjhj53hdjdkg****
[セキュリティグループ]
ApsaraMQ for Kafka インスタンスが属するセキュリティグループの ID。 このパラメーターは、[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ必須です。
alikafka_pre-cn-7mz2****
データ形式
データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。 複数のデータ形式がサポートされています。 エンコードに関する特別な要件がない場合は、値として Json を指定します。
[json]: バイナリデータは UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。
[テキスト]: バイナリデータは UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。 これはデフォルト値です。
[バイナリ]: バイナリデータは Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
Json
一括プッシュの件数
各関数呼び出しで送信できるメッセージの最大数。 リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。 有効な値: 1 ~ 10000。
100
バッチプッシュ間隔 (単位:秒)
関数が呼び出される時間間隔。 システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。 有効な値: 0 ~ 15。 単位: 秒。 値 0 は、集約後すぐにメッセージが送信されることを指定します。
3
Filtering (フィルタリング) ステップで、パターン内容 コードエディターでデータパターンを定義してデータをフィルタリングします。 詳細については、「イベントパターン」をご参照ください。
Transform (変換) ステップで、データ分割、マッピング、エンリッチメント、およびルーティング機能を実装するためのデータクレンジング方法を指定します。 詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。
Sink (ターゲット) ステップで、サービスタイプ パラメーターを [OSS] に設定し、画面の指示に従って他のパラメーターを設定します。 次に、[保存] をクリックします。 次の表にパラメーターを示します。
パラメーター
説明
例
[OSS バケット]
作成した OSS バケット。
重要指定されたバケットが手動で作成され、コネクタの実行時に削除されないことを確認してください。
バケットを作成するときは、[ストレージクラス] パラメーターを 標準 または IA に設定します。 アーカイブバケットは、ApsaraMQ for Kafka シンクコネクタではサポートされていません。
OSS シンクコネクタを作成すると、システムファイルパス .tmp/ が OSS バケットのレベル 1 ディレクトリに生成されます。 パス内の OSS オブジェクトを削除または使用しないでください。
oss-sink-connector-bucket
[ストレージパス]
ルーティングされたメッセージが格納されるオブジェクト。 OSS オブジェクトのキーは、パスと名前で構成されます。 たとえば、ObjectKey パラメーターが
a/b/c/a.txt
に設定されている場合、オブジェクトのパスはa/b/c/
で、オブジェクトの名前はa.txt
です。 オブジェクトのパスにカスタム値を指定できます。 オブジェクトの名前は、{ミリ秒単位の UNIX タイムスタンプ}_{8 ビットのランダムな文字列}
という形式に基づいてコネクタによって生成されます。 例: 1705576353794_elJmxu3v。このパラメーターをスラッシュ (/) に設定すると、バケットにサブディレクトリは使用できません。 データはバケットのレベル 1 ディレクトリに格納されます。
このパラメーターの値には、時間変数 {yyyy}、{MM}、{dd}、および {HH} を使用できます。 これらの変数は大文字と小文字が区別され、それぞれ年、月、日、および時間を指定します。
このパラメーターの値には、JSONPath 構文を使用できます。 例: {$.data.topic} および {$.data.partition}。 JSONPath 変数は、標準 JSONPath 式の要件を満たしている必要があります。 データ書き込み例外を防ぐために、JSONPath を使用して抽出された値は int または string 型であり、UTF-8 でエンコードされた文字を含み、スペース、2 つの連続するピリオド (.)、絵文字、スラッシュ (/)、またはバックスラッシュ (\) を含まないようにすることをお勧めします。
このパラメーターの値には、定数を使用できます。
説明サブディレクトリを使用すると、データを適切にグループ化できます。 これにより、単一のサブディレクトリに多数の小さなオブジェクトが存在することによって発生する問題を防ぐことができます。
OSS シンクコネクタのスループットは、サブディレクトリの数と正の相関があります。 サブディレクトリがない場合、またはサブディレクトリの数が少ない場合、コネクタのスループットは低くなります。 これにより、アップストリームアプリケーションでメッセージが蓄積される可能性があります。 多数のサブディレクトリは、データの分散、書き込み数の増加、パーツの過剰などの問題を引き起こす可能性があります。 次の提案に基づいてサブディレクトリを設定することをお勧めします。
ソース ApsaraMQ for Kafka トピック: 時間とパーティションごとにサブディレクトリを設定できます。 この方法では、ApsaraMQ for Kafka インスタンスのパーティション数を増やすことで、コネクタのスループットを向上させることができます。 例: prefix/{yyyy}/{MM}/{dd}/{HH}/{$.data.partition}/。
ビジネスグループ: データの特定のビジネスフィールドを使用してサブディレクトリを設定できます。 コネクタのスループットは、ビジネスフィールド値の数によって決まります。 例: prefixV2/{$.data.body.field}/。
異なる OSS シンクコネクタには、異なる定数プレフィックスを設定することをお勧めします。 これにより、複数のコネクタが同じディレクトリにデータを書き込むことを防ぎます。
alikafka_post-cn-9dhsaassdd****/guide-oss-シンク-topic/YYYY/MM/dd/HH
[バッチ集約オブジェクトサイズ]
集約されるオブジェクトのサイズ。 有効な値: 1 ~ 1024。 単位: MB。
説明OSS シンクコネクタは、同じ OSS オブジェクトにバッチでデータを書き込みます。 各バッチのデータサイズは 0 MB より大きく、16 MB 以下です。 そのため、OSS オブジェクトのサイズは、設定された値よりもわずかに大きくなる場合があります。 超過サイズは最大 16 MB です。
トラフィック量の多いシナリオでは、[バッチ集約オブジェクトサイズ] パラメーターを 100 MB より大きい値に設定し、[バッチ集約タイムウィンドウ] パラメーターを 1 時間より大きい値に設定することをお勧めします。 [バッチ集約オブジェクトサイズ] パラメーターの例: 128 MB および 512 MB。 [バッチ集約タイムウィンドウ] パラメーターの例: 60 分および 120 分。
5
[バッチ集約タイムウィンドウ]
集約のタイムウィンドウ。 有効な値: 1 ~ 1440。 単位: 分。
1
[ファイル圧縮]
[圧縮不要]: 接尾辞のないオブジェクトを生成します。
[GZIP]: .gz 接尾辞を持つオブジェクトを生成します。
[Snappy]: .snappy 接尾辞を持つオブジェクトを生成します。
[Zstd]: .zstd 接尾辞を持つオブジェクトを生成します。
オブジェクトを圧縮する場合、OSS シンクコネクタは、圧縮前のデータサイズに基づいてバッチでデータを書き込みます。 その結果、OSS に表示されるオブジェクトサイズは、バッチサイズよりも小さくなります。 オブジェクトを解凍すると、オブジェクトサイズはバッチサイズに近くなります。
圧縮不要
[ファイルコンテンツ]
[完全なデータ]: コネクタは、CloudEvents 仕様を使用して元のメッセージをパッケージ化します。 この値を選択すると、OSS にルーティングされたデータには、CloudEvents 仕様のメタデータが含まれます。 次のサンプルコードは例を示しています。 サンプルコードでは、data フィールドはデータを指定し、他のフィールドは CloudEvents 仕様のメタデータを指定します。
{ "specversion": "1.0", "id": "8e215af8-ca18-4249-8645-f96c1026****", "source": "acs:alikafka", "type": "alikafka:Topic:Message", "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****", "datacontenttype": "application/json; charset=utf-8", "time": "2022-06-23T02:49:51.589Z", "aliyunaccountid": "182572506381****", "data": { "topic": "****", "partition": 7, "offset": 25, "timestamp": 1655952591589, "headers": { "headers": [], "isReadOnly": false }, "key": "keytest", "value": "hello kafka msg" } }
[データ抽出]: JSONPath を使用して抽出されたデータは OSS にルーティングされます。 たとえば、$.data 式を設定すると、data フィールドの値のみが OSS にルーティングされます。
CloudEvents 仕様の追加フィールドが不要な場合は、[部分イベント] を選択し、$.data 式を設定して元のメッセージを OSS にルーティングすることをお勧めします。 これにより、コストを削減し、伝送効率を向上させることができます。
データ抽出
$.data
タスクのプロパティ
イベントのプッシュに失敗した場合に使用するリトライポリシーと、エラーの処理に使用するメソッドを設定します。 詳細については、「リトライポリシーとデッドレターキュー」をご参照ください。
タスクリスト ページに戻り、作成した OSS シンクコネクタを見つけ、操作する 列の 有効化する をクリックします。
ヒント メッセージで、OK をクリックします。
シンクコネクタが有効になるまで 30 ~ 60 秒かかります。 タスクリスト ページの Status 列で進行状況を確認できます。
手順 3: OSS シンクコネクタをテストする
[タスク] ページで、作成した OSS シンクコネクタを見つけ、[イベントソース] 列のソース トピックをクリックします。
- トピック詳細ページで、[メッセージの送信] をクリックします。
[メッセージの送受信を開始] パネルで、次の図に基づいてパラメーターを設定し、[OK] をクリックします。
[タスク] ページで、作成した OSS シンクコネクタを見つけ、[イベントターゲット] 列のデスティネーションバケットをクリックします。
表示されるページの左側のナビゲーションウィンドウで、 を選択します。
/tmp ディレクトリ: コネクタが依存するシステムファイルパス。 このパス内の OSS オブジェクトを削除または使用しないでください。
データファイルディレクトリ: このディレクトリには、コネクタに設定したパスに基づいてサブディレクトリが生成されます。 データオブジェクトは最下層のディレクトリにアップロードされます。
管理するオブジェクトを見つけ、[アクション] 列で を選択します。アクション
ダウンロードしたオブジェクトを開いて、メッセージの詳細を表示します。
前の図は、メッセージ詳細の例を示しています。 複数のメッセージは改行で区切られます。