MaxCompute と Kafka を統合することで、効率的で信頼性の高いデータ処理および分析機能が提供されます。この統合は、リアルタイム処理、大規模なデータストリーム、および複雑なデータ分析を必要とするシナリオに最適です。このトピックでは、Message Queue for Apache Kafka およびセルフマネージド Kafka インスタンスから MaxCompute にデータを書き込む方法について説明し、セルフマネージド Kafka インスタンスの詳細な例を示します。
Kafka データの MaxCompute への書き込み:Alibaba Cloud フルマネージド Kafka
MaxCompute は Message Queue for Apache Kafka と密接に統合されています。Message Queue for Apache Kafka 用の MaxCompute Sink Connector を使用すると、サードパーティツールやカスタム開発を必要とせずに、指定したトピックから MaxCompute テーブルにデータを継続的にインポートできます。詳細については、「MaxCompute Sink Connector の作成」をご参照ください。
Kafka データの MaxCompute への書き込み:セルフマネージドオープンソース Kafka
前提条件
Kafka V2.2 以降をデプロイし、Kafka トピックを作成済みであること。バージョン 3.4.0 を推奨します。
MaxCompute プロジェクトとテーブルを作成済みであること。詳細については、「MaxCompute プロジェクトの作成」および「テーブルの作成」をご参照ください。
注意事項
Kafka コネクタは、TEXT、CSV、JSON、および FLATTEN フォーマットでのデータ書き込みをサポートしています。以下の注意事項は各フォーマットに適用されます。データ型の詳細については、「データ型の説明」をご参照ください。
TEXT または JSON フォーマットの Kafka データを MaxCompute に書き込む場合、MaxCompute テーブルは次の要件を満たす必要があります:
フィールド名
フィールドの型
固定フィールド
topic
STRING
はい
partition
BIGINT
はい
offset
BIGINT
はい
key
TEXT フォーマットの Kafka データを書き込む場合、フィールドの型は STRING である必要があります。
JSON フォーマットの Kafka データを書き込む場合、フィールドの型は書き込むデータに応じて STRING または JSON になります。
このフィールドは、Kafka メッセージから MaxCompute テーブルにキーを同期するために固定されています。Kafka メッセージを MaxCompute に同期するモードの詳細については、「mode」をご参照ください。
value
TEXT フォーマットの Kafka データを書き込む場合、フィールドの型は STRING である必要があります。
JSON フォーマットの Kafka データを書き込む場合、フィールドの型は書き込むデータに応じて STRING または JSON になります。
このフィールドは、Kafka メッセージから MaxCompute テーブルに値を同期するために固定されています。Kafka メッセージを MaxCompute に同期するモードの詳細については、「mode」をご参照ください。
pt
STRING (パーティションフィールド)
はい
FLATTEN または CSV フォーマットの Kafka データを MaxCompute に書き込む場合、テーブルには次のフィールドとデータ型を含める必要があります。書き込むデータに基づいて他のフィールドを定義できます。
フィールド名
フィールドの型
topic
STRING
partition
BIGINT
offset
BIGINT
pt
STRING (パーティションフィールド)
CSV フォーマットの Kafka データを MaxCompute テーブルに書き込む場合、書き込み操作を正常に完了させるために、MaxCompute テーブルのカスタムフィールドの順序とデータ型が Kafka データの列と一致する必要があります。
FLATTEN フォーマットの Kafka データを MaxCompute テーブルに書き込む場合、書き込み操作を正常に完了させるために、MaxCompute テーブルのカスタムフィールドの名前が Kafka データのフィールド名と一致する必要があります。
たとえば、FLATTEN フォーマットの Kafka データが
{"A":a,"B":"b","C":{"D":"d","E":"e"}}の場合、MaxCompute テーブルは次のように設定する必要があります。CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, A BIGINT, B STRING, C JSON ) PARTITIONED BY (pt STRING);
Kafka コネクタサービスの設定と起動
この例では Linux 環境を使用します。コマンドウィンドウで、次のコマンドを実行するか、ダウンロードリンクを使用して
kafka-connector-2.0.jarパッケージをダウンロードします。wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar依存関係の競合を防ぐために、
$KAFKA_HOME/libsディレクトリにconnectorなどのサブフォルダを作成し、その中にkafka-connector-2.0.jarパッケージを配置します。説明kafka-connector-2.0.jarパッケージがご利用の Kafka デプロイメント環境と互換性がない場合は、「Kafka-connector の設定」でKafka-connectorサービスの設定と起動方法に関する詳細をご参照ください。$KAFKA_HOME/configディレクトリで、connect-distributed.propertiesファイルを設定します。connect-distributed.propertiesファイルに次の内容を追加します。## 次の内容を追加します plugin.path=<KAFKA_HOME>/libs/connector ## key.converter および value.converter パラメーターの値を更新します key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter$KAFKA_HOME/ディレクトリで、次のコマンドを実行してKafka-connectorサービスを起動します。## 起動コマンド bin/connect-distributed.sh config/connect-distributed.properties &
Kafka コネクタタスクの設定と起動
odps-sink-connector.json設定ファイルを作成して設定します。次に、odps-sink-connector.jsonファイルを任意の場所にアップロードします。odps-sink-connector.json設定ファイルの内容とパラメーターについては、次のセクションで説明します。{ "name": "Kafka コネクタタスク名", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "your_topic", "endpoint": "エンドポイント", "tunnel_endpoint": "your_tunnel エンドポイント", "project": "プロジェクト", "schema":"default", "table": "your_table", "account_type": "アカウントタイプ (STS または ALIYUN)", "access_id": "アクセス ID", "access_key": "アクセスキー", "account_id": "sts のアカウント ID", "sts.endpoint": "sts エンドポイント", "region_id": "sts のリージョン ID", "role_name": "sts のロール名", "client_timeout_ms": "STS トークンの有効期間 (ms)", "format": "TEXT", "mode": "KEY", "partition_window_type": "MINUTE", "use_streaming": false, "buffer_size_kb": 65536, "sink_pool_size":"150", "record_batch_size":"8000", "runtime.error.topic.name":"実行時エラー発生時の kafka トピック", "runtime.error.topic.bootstrap.servers":"エラーキューの kafka ブートストラップサーバー", "skip_error":"false" } }共通パラメーター
パラメーター
必須
説明
name
はい
タスクの名前。名前は一意である必要があります。
connector.class
はい
Kafka コネクタサービスを起動するためのクラス名。デフォルト値はcom.aliyun.odps.kafka.connect.MaxComputeSinkConnectorです。tasks.max
はい
Kafka コネクタ内のコンシューマープロセスの最大数。値は 0 より大きい整数である必要があります。topics
はい
Kafka トピックの名前。
endpoint
はい
MaxCompute サービスのエンドポイント。
MaxCompute プロジェクトの作成時に選択したリージョンとネットワーク接続タイプに基づいてエンドポイントを設定する必要があります。各リージョンとネットワークのエンドポイントについては、「エンドポイント」をご参照ください。
tunnel_endpoint
いいえ
Tunnel サービスのパブリックエンドポイント。
Tunnel エンドポイントを設定しない場合、トンネルは MaxCompute サービスが配置されているネットワークに対応する Tunnel エンドポイントに自動的にルーティングされます。Tunnel エンドポイントを設定した場合、その設定が優先され、自動ルーティングは無効になります。
各リージョンとネットワークのエンドポイントについては、「エンドポイント」をご参照ください。
project
はい
ターゲットの MaxCompute プロジェクトの名前。
schema
いいえ
このパラメーターは、ターゲットの MaxCompute プロジェクトが3層スキーマモデルで設定されている場合に必須です。デフォルト値は default です。
このパラメーターは、ターゲットの MaxCompute プロジェクトが3層スキーマモデルで設定されていない場合は不要です。
スキーマの詳細については、「スキーマ操作」をご参照ください。
table
はい
ターゲットの MaxCompute プロジェクト内のテーブルの名前。
format
いいえ
書き込むメッセージのフォーマット。有効な値:
TEXT (デフォルト):メッセージは文字列です。
BINARY:メッセージはバイト配列です。
CSV:メッセージはカンマ (,) で区切られた値を持つ文字列です。
JSON:メッセージは JSON データ型の文字列です。MaxCompute の JSON 型の詳細については、「JSON データ型」をご参照ください。
FLATTEN:メッセージは JSON データ型の文字列です。JSON 文字列のキーと値が解析され、MaxCompute テーブルの対応する列に書き込まれます。JSON データのキーは、MaxCompute テーブルの列名に対応している必要があります。
さまざまなフォーマットのメッセージをインポートする例については、「使用例」をご参照ください。
mode
いいえ
メッセージを MaxCompute に同期するモード。有効な値:
KEY:メッセージキーのみを保持し、キーをターゲットの MaxCompute テーブルに書き込みます。
VALUE:メッセージ値のみを保持し、値をターゲットの MaxCompute テーブルに書き込みます。
DEFAULT (デフォルト):メッセージキーと値の両方を保持し、それらをターゲットの MaxCompute テーブルに書き込みます。
DEFAULT モードでは、TEXT および BINARY データフォーマットのみがサポートされます。
partition_window_type
いいえ
システム時刻によってデータをパーティション分割します。有効な値:DAY、HOUR (デフォルト)、および MINUTE。
use_streaming
いいえ
ストリーミングデータトンネルを使用するかどうかを指定します。有効な値:
false (デフォルト):無効。
true:有効。
buffer_size_kb
いいえ
odps パーティションライターの内部バッファーのサイズ (KB)。デフォルト値は 65536 KB です。
sink_pool_size
いいえ
マルチスレッド書き込みの最大スレッド数。デフォルト値はシステムの CPU コア数です。
record_batch_size
いいえ
Kafka コネクタタスク内の単一スレッドが一度に並行して送信できるメッセージの最大数。
skip_error
いいえ
不明なエラーを引き起こすレコードをスキップするかどうかを指定します。有効な値:
false (デフォルト):レコードをスキップしません。
true:レコードをスキップします。
説明skip_error が false に設定され、runtime.error.topic.name パラメーターが設定されていない場合、不明なエラーが発生すると、プロセスはデータ書き込みを停止します。プロセスはブロックされ、ログに例外がスローされます。
skip_error が true に設定され、runtime.error.topic.name が設定されていない場合、データ書き込みプロセスは続行され、異常データは破棄されます。
skip_error が false に設定され、runtime.error.topic.name が設定されている場合、データ書き込みプロセスは続行され、異常データは runtime.error.topic.name で指定されたトピックに記録されます。
異常データの処理方法の例については、「異常データの処理例」をご参照ください。
runtime.error.topic.name
いいえ
書き込み操作中に不明なエラーを引き起こしたデータが書き込まれる Kafka トピックの名前。
runtime.error.topic.bootstrap.servers
いいえ
書き込み操作中に不明なエラーを引き起こしたデータが書き込まれる Kafka インスタンスのブートストラップサーバーアドレス。
account_type
はい
ターゲットの MaxCompute サービスにアクセスするメソッド。有効な値は STS と ALIYUN です。デフォルト値は ALIYUN です。
アクセスするメソッドが異なれば、必要なアクセス認証情報パラメーターも異なります。詳細については、「ALIYUN メソッドを使用した MaxCompute へのアクセス」および「STS メソッドを使用した MaxCompute へのアクセス」をご参照ください。
共通パラメーターに加えて、次のパラメーターも設定する必要があります。
パラメーター名
説明
access_id
ご利用の Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。
AccessKey ID は [AccessKey 管理] ページで取得できます。
access_key
AccessKey ID に対応する AccessKey Secret。
共通パラメーターに加えて、次のパラメーターも設定する必要があります。
パラメーター
説明
account_id
ターゲットの MaxCompute プロジェクトへのアクセスに使用するアカウントの ID。アカウント ID は [アカウントセンター] で確認できます。
region_id
ターゲットの MaxCompute プロジェクトのリージョン ID。各リージョンの ID については、「エンドポイント」をご参照ください。
role_name
ターゲットの MaxCompute プロジェクトへのアクセスに使用するロールの名前。ロール名は [ロール] ページで確認できます。
client_timeout_ms
Security Token Service (STS) トークンの更新間隔 (ミリ秒)。デフォルト値は 11 ms です。
sts.endpoint
一時的なセキュリティトークン (STS) を使用した身分認証に必要な STS サービスのエンドポイント。
各リージョンとネットワークのエンドポイントについては、「エンドポイント」をご参照ください。
次のコマンドを実行して、Kafka コネクタのデータ移行タスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json
使用例
TEXT データの書き込み
データの準備
ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、ターゲットテーブルを作成します。
CREATE TABLE IF NOT EXISTS table_text( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value STRING ) PARTITIONED BY (pt STRING);Kafka データの作成
$KAFKA_HOME/bin/ディレクトリで、次のコマンドを実行して Kafka トピックを作成します。この例では、トピック名としてtopic_textを使用します。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text次のコマンドを実行して Kafka メッセージを作成します。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true >123 abc >456 edf
(任意)
Kafka-connectorサービスを起動します。詳細については、「Kafka コネクタサービスの設定と起動」をご参照ください。説明Kafka-connectorサービスがすでに実行中の場合は、このステップをスキップできます。odps-sink-connector.jsonファイルを作成して設定します。次に、odps-sink-connector.jsonファイルを$KAFKA_HOME/configパスなどの任意の場所にアップロードします。次のコードは
odps-sink-connector.jsonファイルの例です。odps-sink-connector.jsonファイルの詳細については、「Kafka コネクタタスクの設定と起動」をご参照ください。{ "name": "odps-test-text", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_text", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_text", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"TEXT", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }次のコマンドを実行して、Kafka コネクタのデータ移行タスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json結果の検証
ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、次のコマンドを実行してデータをクエリし、結果を検証します。
set odps.sql.allow.fullscan=true; select * from table_text;次の出力が返されます:
# odps-sink-connector.json 設定ファイルの mode パラメーターが VALUE に設定されているため、値の内容のみが保持されます。キーフィールドは NULL です。 +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | topic_text | 0 | 0 | NULL | abc | 07-13-2023 21:13 | | topic_text | 0 | 1 | NULL | edf | 07-13-2023 21:13 | +-------+------------+------------+-----+-------+----+
CSV データの書き込み
データの準備
ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、ターゲットテーブルを作成します。
CREATE TABLE IF NOT EXISTS table_csv( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, region STRING ) PARTITIONED BY (pt STRING);Kafka へのデータの書き込み
$KAFKA_HOME/bin/ディレクトリで、次のコマンドを実行してtopic_csvという名前の Kafka トピックを作成します。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv次のコマンドを実行して Kafka メッセージを作成します。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true >123 1103,zhangsan,china >456 1104,lisi,usa
(任意)
Kafka-connectorサービスを起動します。詳細については、「Kafka コネクタサービスの設定と起動」をご参照ください。説明Kafka-connectorサービスがすでに実行中の場合は、このステップをスキップできます。odps-sink-connector.jsonファイルを作成して設定し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。このトピックでは、例として$KAFKA_HOME/configパスを使用します。次のコードは
odps-sink-connector.jsonファイルの例です。odps-sink-connector.jsonファイルの詳細については、「Kafka コネクタタスクの設定と起動」をご参照ください。{ "name": "odps-test-csv", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_csv", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_csv", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "format":"CSV", "mode":"VALUE", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }次のコマンドを実行して、Kafka コネクタのデータ移行タスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json結果の検証
ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、次のコマンドを実行してデータをクエリし、結果を検証します。
set odps.sql.allow.fullscan=true; select * from table_csv;次の出力が返されます:
+-------+------------+------------+------------+------+--------+----+ | topic | partition | offset | id | name | region | pt | +-------+------------+------------+------------+------+--------+----+ | csv_test | 0 | 0 | 1103 | zhangsan | china | 07-14-2023 00:10 | | csv_test | 0 | 1 | 1104 | lisi | usa | 07-14-2023 00:10 | +-------+------------+------------+------------+------+--------+----+
JSON データの書き込み
データの準備
ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、ターゲットテーブルを作成します。
CREATE TABLE IF NOT EXISTS table_json( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value JSON ) PARTITIONED BY (pt STRING);Kafka データの作成
$KAFKA_HOME/bin/ディレクトリで、次のコマンドを実行して Kafka トピックを作成します。この例では、トピック名としてtopic_jsonを使用します。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json次のコマンドを実行して、Kafka メッセージを作成します。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true >123 {"id":123,"name":"json-1","region":"beijing"} >456 {"id":456,"name":"json-2","region":"hangzhou"}
(任意)
Kafka-connectorサービスを起動します。詳細については、「Kafka コネクタサービスの設定と起動」をご参照ください。説明Kafka-connectorサービスがすでに実行中の場合は、このステップをスキップできます。odps-sink-connector.jsonファイルを作成して設定します。次に、odps-sink-connector.jsonファイルを$KAFKA_HOME/configパスなどの任意の場所にアップロードします。次のコードは
odps-sink-connector.jsonファイルの例です。odps-sink-connector.jsonファイルの詳細については、「Kafka コネクタタスクの設定と起動」をご参照ください。{ "name": "odps-test-json", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_json", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_json", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"JSON", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }次のコマンドを実行して、Kafka コネクタのデータ移行タスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json結果の検証
ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、次のコマンドを実行してデータをクエリし、結果を検証します。
set odps.sql.allow.fullscan=true; select * from table_json;次の出力が返されます:
# JSON データは正常に value フィールドに書き込まれます。 +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | Topic_json | 0 | 0 | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 | | Topic_json | 0 | 1 | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 | +-------+------------+------------+-----+-------+----+
FLATTEN データの書き込み
データの準備
ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、ターゲットテーブルを作成します。
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);Kafka データの作成
$KAFKA_HOME/bin/ディレクトリで、次のコマンドを実行して Kafka トピックを作成します。この例では、トピック名としてtopic_flattenを使用します。./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten次のコマンドを実行して Kafka メッセージを作成します。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true >123 {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}} >456 {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}
(任意)
Kafka-connectorサービスを起動します。詳細については、「Kafka コネクタサービスの設定と起動」をご参照ください。説明Kafka-connectorサービスがすでに実行中の場合は、このステップをスキップできます。odps-sink-connector.jsonファイルを作成して設定し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。このトピックでは、例として$KAFKA_HOME/configパスを使用します。次のコードは
odps-sink-connector.jsonファイルの例です。odps-sink-connector.jsonファイルの詳細については、「Kafka コネクタタスクの設定と起動」をご参照ください。{ "name": "odps-test-flatten", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_flatten", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_flatten", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }次のコマンドを実行して、Kafka コネクタタスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json結果の検証
ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、次のコマンドを実行してデータをクエリし、結果を検証します。
set odps.sql.allow.fullscan=true; select * from table_flatten;次の結果が表示されます:
# JSON データは解析されて MaxCompute テーブルに書き込まれ、extendinfo はネストをサポートする JSON フィールドになります。 +-------+------------+--------+-----+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+--------+-----+------+------------+----+ | topic_flatten | 0 | 0 | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 | | topic_flatten | 0 | 1 | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 | +-------+------------+--------+-----+------+------------+----+
異常データの処理例
データの準備
ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、ターゲットテーブルを作成します。
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);Kafka データの作成
$KAFKA_HOME/bin/ディレクトリで、次のコマンドを実行して Kafka トピックを作成します。topic_abnormalトピック。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormalruntime_error例外のメッセージトピック。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error説明データ書き込み操作中にエラーが発生した場合、異常データは
runtime_errorトピックに書き込まれます。この種のエラーは通常、Kafka データと MaxCompute テーブルスキーマの不一致が原因で発生します。
次のコマンドを実行して Kafka メッセージを作成します。
次のコマンドのメッセージの1つが、ターゲットの MaxCompute テーブルのスキーマと一致しません。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true >100 {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}} >101 {"id":101,"name":"json-4","extendinfos":"null"} >102 {"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}}
(任意)
Kafka-connectorサービスを起動します。詳細については、「Kafka コネクタサービスの設定と起動」をご参照ください。説明Kafka-connectorサービスがすでに実行中の場合は、このステップをスキップできます。odps-sink-connector.jsonファイルを作成して設定し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。このトピックでは、例として$KAFKA_HOME/configパスを使用します。次のコードは
odps-sink-connector.jsonファイルの例です。odps-sink-connector.jsonファイルの詳細については、「Kafka コネクタタスクの設定と起動」をご参照ください。{ "name": "odps-test-runtime-error", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_abnormal", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "test_flatten", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000", "runtime.error.topic.name":"runtime_error", "runtime.error.topic.bootstrap.servers":"http://XXXX", "skip_error":"false" } }次のコマンドを実行して、Kafka コネクタタスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json結果の検証
MaxCompute テーブルデータのクエリ
ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、次のコマンドを実行してデータをクエリし、結果を検証します。
set odps.sql.allow.fullscan=true; select * from table_flatten;次の出力が返されます:
# 結果からわかるように、ID 101 のデータはテーブルスキーマと一致しなかったため、MaxCompute に書き込まれませんでした。 # runtime.error.topic.name パラメーターが設定されていたため、プロセスはブロックされず、後続のデータは正常に書き込まれました。 +-------+------------+------------+------------+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+------------+------------+------+------------+----+ | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 | | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 | | flatten_test | 0 | 2 | 100 | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 4 | 102 | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | +-------+------------+------------+------------+------+------------+----+runtime_errorトピックのメッセージのクエリ$KAFKA_HOME/bin/ディレクトリで、次のコマンドを実行してメッセージを表示します。sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning次の結果が返されます:
# 異常データは正常に runtime_error メッセージキューに書き込まれます。 {"id":101,"name":"json-4","extendinfos":"null"}