このトピックでは、ApsaraMQ for Kafka Topic から MaxCompute テーブルにデータをエクスポートする MaxCompute シンクコネクタを作成する方法について説明します。
前提条件
前提条件については、「前提条件」をご参照ください。
使用上の注意
MaxCompute のパーティション機能を使用する場合は、テーブルを作成するときに、名前が time で型が string の追加のパーティションキー列を作成する必要があります。
ステップ 1:MaxCompute リソースを作成する
MaxCompute クライアントでテーブルを作成します。詳細については、「テーブルを作成する」をご参照ください。
この例では、kafka_to_maxcompute という名前のテーブルが作成されます。テーブルには 3 つの列が含まれており、パーティション機能が有効になっています。次のコードは、テーブルを作成するために実行されるステートメントを示しています。
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT) PARTITIONED by (time STRING);次のコードは、パーティション機能が無効になっている場合にテーブルを作成するために実行されるステートメントを示しています。
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT);ステートメントが実行されると、次の結果が表示されます。
[テーブル] ページで、作成されたテーブルに関する情報を表示します。
ステップ 2:MaxCompute シンクコネクタを作成して起動する
ApsaraMQ for Kafka コンソール にログインします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
[タスク] ページで、[タスクの作成] をクリックします。
タスクの作成 ページで、タスク名 パラメーターと 説明 パラメーターを構成します。次に、画面の指示に従って他のパラメーターを構成します。
タスクの作成
Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを構成します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。
パラメーター
説明
例
Region
ApsaraMQ for Kafka インスタンスが存在するリージョン。
中国 (杭州)
Apsaramq For Kafka インスタンス
ルーティングするデータが生成される ApsaraMQ for Kafka インスタンスの ID。
alikafka_post-cn-9hdsbdhd****
Topic
ルーティングするデータが生成される ApsaraMQ for Kafka インスタンスのトピック。
guide-sink-topic
グループ ID
ルーティングするデータが生成される ApsaraMQ for Kafka インスタンスのグループの ID。
[クイック作成]: システムは、ID が GID_EVENTBRIDGE_xxx 形式のグループを自動的に作成します。
[既存のグループを使用]: 使用されていない既存のグループの ID を選択します。使用中の既存のグループを選択すると、既存のメッセージの発行とサブスクライブに影響します。
既存のグループを使用
Consumer Offset
[最新のオフセット]: メッセージは最新のオフセットから消費されます。
[最も古いオフセット]: メッセージは最も古いオフセットから消費されます。
最新のオフセット
ネットワーク設定
国境を越えたデータ転送が必要な場合は、[セルフマネージドインターネット] を選択します。それ以外の場合は、[ベーシックネットワーク] を選択します。
ベーシックネットワーク
データ形式
データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。複数のデータ形式がサポートされています。エンコーディングに特別な要件がない場合は、値として Json を指定します。
Json: バイナリデータは UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。これはデフォルト値です。
テキスト: バイナリデータは UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
バイナリ: バイナリデータは Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
Json
一括プッシュの件数
各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。
2000
バッチプッシュ間隔 (単位:秒)
関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを示します。
3
Filtering (フィルタリング) ステップで、リクエストをフィルタリングするデータパターンを定義します。詳細については、「イベントパターン」をご参照ください。
Transform (変換) ステップで、分割、マッピング、エンリッチメント、動的ルーティングなどのデータ処理機能を実装するデータクレンジング方法を指定します。詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。
Sink (ターゲット) ステップで、サービスタイプ パラメーターを [maxcompute Acs.maxcompute] に設定し、画面の指示に従って他のパラメーターを構成します。次の表にパラメーターを示します。
パラメーター
説明
例
AccessKey ID
Alibaba Cloud アカウントの AccessKey ID。 AccessKey ID は、MaxCompute へのアクセスに使用されます。
yourAccessKeyID
AccessKey Secret
Alibaba Cloud アカウントの AccessKey Secret。
yourAccessKeySecret
Maxcompute プロジェクト名
作成した MaxCompute プロジェクト。
test_compute
Maxcompute テーブル名
作成した MaxCompute テーブル。
kafka_to_maxcompute
Maxcompute テーブル入力パラメーター
MaxCompute テーブルを選択すると、テーブルの列名と型がこのセクションに表示されます。[値抽出ルール] パラメーターのみを構成する必要があります。次のコードは、メッセージの値抽出ルールを構成する方法を示しています。この例では、topic 列の値はメッセージの topic フィールドから抽出されます。したがって、[値抽出ルール] パラメーターは
$.topicに設定されます。{ 'data': { 'topic': 't_test', 'partition': 2, 'offset': 1, 'timestamp': 1717048990499, 'headers': { 'headers': [], 'isReadOnly': False }, 'key': 'MaxCompute-K1', 'value': 'MaxCompute-V1' }, 'id': '9b05fc19-9838-4990-bb49-ddb942307d3f-2-1', 'source': 'acs:alikafka', 'specversion': '1.0', 'type': 'alikafka:Topic:Message', 'datacontenttype': 'application/json; charset=utf-8', 'time': '2024-05-30T06:03:10.499Z', 'aliyunaccountid': '1413397765616316' }topic:
$.data.topicvaluename:
$.data.valuevalueage:
$.data.offsetパーティションディメンション
有効な値:
[無効]
[有効]
パーティション機能を有効にする場合は、[パーティション値] などのパラメーターも構成する必要があります。 [パーティション値] パラメーターの値は、次のいずれかの形式にすることができます。
時間変数: {yyyy}、{MM}、{dd}、{HH}、{mm}。これらの変数は大文字と小文字が区別され、それぞれ年、月、日、時、分を示します。
定数。
はい
{yyyy}-{MM}-{dd}.{HH}:{mm}.suffix
ネットワーク設定
[VPC]: ApsaraMQ for Kafka のメッセージは、VPC (Virtual Private Cloud) 内で MaxCompute に配信されます。
[インターネット]: ApsaraMQ for Kafka のメッセージは、インターネット経由で MaxCompute に配信されます。
インターネット
VPC
VPC ID。このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。
vpc-bp17fapfdj0dwzjkd****
vSwitch
vSwitch ID。このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。
vsw-bp1gbjhj53hdjdkg****
セキュリティグループ
セキュリティグループ ID。このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。
test_group
タスクのプロパティ
イベントのプッシュに失敗した場合に使用するリトライポリシーと、エラーを処理するために使用するメソッドを構成します。詳細については、「リトライポリシーと配信不能キュー」をご参照ください。
[保存] をクリックします。[タスク] ページで、作成した MaxCompute シンクコネクタを見つけます。[ステータス] 列のステータスが [開始中] から [実行中] に変わると、コネクタが作成されます。
ステップ 3:MaxCompute シンクコネクタをテストする
[タスク] ページで、作成した MaxCompute シンクコネクタを見つけ、[イベントソース] 列のソース Topic の名前をクリックします。
トピック詳細ページで、[メッセージの送信] をクリックします。
[メッセージの送受信を開始] パネルで、次の図に基づいてパラメーターを構成し、[OK] をクリックします。

MaxCompute コンソールに移動し、次の SQL ステートメントを実行して、パーティションに関する情報をクエリします。
show PARTITIONS kafka_to_maxcompute;次の結果が返されます。

パーティション情報に基づいて次のステートメントを実行して、パーティション内のデータをクエリします。
SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";次の結果が返されます。
