MaxComputeとKafkaの統合により、効率的で信頼性の高いデータ処理および分析機能が提供されます。 MaxComputeとKafkaの統合は、リアルタイム処理、大規模なデータストリーム、複雑なデータ分析が必要なシナリオに適しています。 このトピックでは、ApsaraMQ for KafkaおよびセルフマネージドKafkaのデータをMaxComputeにインポートする方法について説明します。 このトピックでは、セルフマネージドKafkaのデータをMaxComputeにインポートする方法の例も提供します。
ApsaraMQ for KafkaからMaxComputeへのデータのインポート
MaxComputeはApsaraMQ for Kafkaと密接に統合されています。 ApsaraMQ for Kafkaが提供するMaxComputeシンクコネクタを直接使用して、特定のトピックのデータをMaxComputeテーブルに継続的にインポートできます。 サードパーティのツールを使用したり、カスタム開発を実行したりする必要はありません。 MaxComputeシンクコネクタの作成方法の詳細については、「MaxComputeシンクコネクタの作成」をご参照ください。
セルフマネージドApache KafkaからMaxComputeへのデータのインポート
前提条件
V2.2以降のKafkaサービスがデプロイされ、Kafkaトピックが作成されます。 V3.4.0のKafkaサービスをデプロイすることを推奨します。
MaxComputeプロジェクトとMaxComputeテーブルが作成されます。 詳細については、「MaxCompute プロジェクトを作成する」および「テーブルの作成」をご参照ください。
注意事項
Kafka-connectorサービスを使用すると、TEXT、CSV、JSON、またはFLATTENタイプのKafkaデータをMaxComputeに書き込むことができます。 さまざまな種類のKafkaデータを書き込む場合は、次の項目に注意してください。 データ型の詳細については、formatパラメーターの説明をご参照ください。
次の表に、TEXTまたはJSONタイプのKafkaデータが書き込まれるMaxComputeテーブルの要件を示します。
フィールド名
データ型
必須
topic
STRING
はい。
partition
BIGINT
はい。
offset
BIGINT
はい。
key
TEXT型のKafkaデータを記述する場合、フィールドはSTRING型でなければなりません。
JSON型のKafkaデータを記述する場合、フィールドは、記述されたデータのデータ型設定に基づいてSTRING型またはJSON型にすることができます。
このフィールドは、KafkaメッセージのキーをMaxComputeテーブルに同期する必要がある場合に必要です。 KafkaメッセージがMaxComputeに同期されるモードの詳細については、modeパラメーターの説明をご参照ください。
value
TEXT型のKafkaデータを記述する場合、フィールドはSTRING型でなければなりません。
JSON型のKafkaデータを記述する場合、フィールドは、記述されたデータのデータ型設定に基づいてSTRING型またはJSON型にすることができます。
このフィールドは、Kafkaメッセージの値をMaxComputeテーブルに同期する必要がある場合に必要です。 KafkaメッセージがMaxComputeに同期されるモードの詳細については、modeパラメーターの説明をご参照ください。
pt
STRING (パーティションフィールド)
はい。
FLATTEN型またはCSV型のKafkaデータをMaxComputeに書き込む場合、次の表に示すフィールドが含まれ、必要なデータ型である必要があります。 書き込まれたデータに基づいてカスタムフィールドを設定することもできます。
フィールド名
データ型
topic
STRING
partition
BIGINT
offset
BIGINT
pt
STRING (パーティションフィールド)
CSVタイプのKafkaデータをMaxComputeテーブルに書き込む場合、MaxComputeテーブルのカスタムフィールドシーケンスとフィールドタイプは、Kafkaデータのものと一致している必要があります。 これにより、Kafkaデータを正しく書き込むことができます。
FLATTEN型のKafkaデータをMaxComputeテーブルに書き込む場合、MaxComputeテーブルのカスタムフィールド名は、Kafkaデータのフィールド名と一致している必要があります。 これにより、Kafkaデータを正しく書き込むことができます。
たとえば、書き込むFLATTEN型のKafkaデータが
{"A":a,"B":"b","C":{"D":"d","E":"e"}}の場合、次の文を実行してデータを保存するMaxComputeテーブルを作成できます。CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, A BIGINT, B STRING, C JSON ) PARTITIONED BY (pt STRING);
Kafka-connectorサービスの設定と開始
Linux環境で、CLIで次のコマンドを実行するか、ダウンロードリンクをクリックして
kafka-connector-2.0.jarパッケージをダウンロードします。wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar依存関係の競合を防ぐために、
$KAFKA_HOME/libsにconnectorなどのサブフォルダーを作成してkafka-connector-2.0.jarパッケージを格納することを推奨します。説明kafka-connector-2.0.jarパッケージの展開環境がKafkaデータの展開環境と同じでない場合は、aliware-Kafka-demosに記載されている手順に従って、kafka-connectorサービスを設定して起動する必要があります。$KAFKA_HOME/configディレクトリで、connect-distributed.propertiesファイルを設定します。connect-distributed.propertiesファイルに次の設定を追加します。## Add the following content: plugin.path=<KAFKA_HOME>/libs/connector ## Update the values of the key.converter and value.converter parameters. key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter$KAFKA_HOME/ディレクトリで次のコマンドを実行し、Kafka-connectorサービスを開始します。## Run the following command: bin/connect-distributed.sh config/connect-distributed.properties &
Kafka-connectorタスクの設定と開始
odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。次のコードと表は、
odps-sink-connector.jsonファイルの内容とパラメーターを示しています。{ "name": "Kafka connector task name", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "your_topic", "endpoint": "endpoint", "tunnel_endpoint": "your_tunnel endpoint", "project": "project", "schema":"default", "table": "your_table", "account_type": "account type (STS or ALIYUN)", "access_id": "access id", "access_key": "access key", "account_id": "account id for sts", "sts.endpoint": "sts endpoint", "region_id": "region id for sts", "role_name": "role name for sts", "client_timeout_ms": "STS Token valid period (ms)", "format": "TEXT", "mode": "KEY", "partition_window_type": "MINUTE", "use_streaming": false, "buffer_size_kb": 65536, "sink_pool_size":"150", "record_batch_size":"8000", "runtime.error.topic.name":"kafka topic when runtime errors happens", "runtime.error.topic.bootstrap.servers":"kafka bootstrap servers of error topic queue", "skip_error":"false" } }共通パラメーター
パラメーター
必須
説明
name
必須
タスクの名前。 名前は一意にする必要があります。
connector.class
必須
Kafka-connectorサービスのクラス名。 デフォルト値:com.aliyun.odps.kafka.connect.MaxComputeSinkConnectortasks.max
必須
Kafka-connectorサービスのコンシューマプロセスの最大数。 値は0より大きい整数でなければなりません。トピック
必須
Kafkaトピックの名前。
endpoint
必須
MaxComputeのエンドポイント。
このパラメーターは、MaxComputeプロジェクトの作成時に選択したリージョンとネットワーク接続タイプに基づいて設定する必要があります。 各リージョンの異なるネットワークタイプのエンドポイントの詳細については、「エンドポイント」をご参照ください。
tunnel_endpoint
選択可能
MaxCompute Tunnelのパブリックエンドポイント。
このパラメーターを設定しないと、トラフィックはMaxComputeが存在するネットワークに対応するTunnelエンドポイントに自動的にルーティングされます。 このパラメーターを設定すると、トラフィックは指定されたエンドポイントにルーティングされ、自動ルーティングは実行されません。
各リージョンの異なるネットワークタイプのトンネルエンドポイントの詳細については、「エンドポイント」をご参照ください。
project
必須
アクセスするMaxComputeプロジェクトの名前。
schema
選択可能
このパラメーターは、ターゲットMaxComputeプロジェクトに3層のスキーマモデルがある場合に必要です。 デフォルト値: Default。
ターゲットMaxComputeプロジェクトに3層スキーマモデルがない場合、このパラメーターを設定する必要はありません。
スキーマの詳細については、「スキーマ関連の操作」をご参照ください。
table
必須
ターゲットMaxComputeプロジェクト内のテーブルの名前。
format
選択可能
書かれたメッセージの形式。 有効な値:
TEXT: 文字列。 デフォルト値です。
BINARY: バイト配列。
CSV: コンマ (,) で区切られた文字列のリスト。
JSON: JSON文字列。 MaxCompute JSONデータ型の詳細については、「MaxComputeのJSON型を使用する手順 (ベータ版) 」をご参照ください。
FLATTEN: JSON文字列。 JSON文字列のキーと値が解析され、指定されたMaxComputeテーブルに書き込まれます。 JSON文字列のキーは、MaxComputeテーブルの列名に対応している必要があります。
さまざまな形式のメッセージをインポートする方法の詳細については、「例」をご参照ください。
mode
選択可能
メッセージがMaxComputeに同期されるモード。 有効な値:
KEY: メッセージのキーのみが保持され、宛先MaxComputeテーブルに書き込まれます。
VALUE: メッセージの値のみが保持され、宛先MaxComputeテーブルに書き込まれます。
DEFAULT: メッセージのキーと値の両方が保持され、宛先MaxComputeテーブルに書き込まれます。 デフォルト値です。
このパラメーターをDEFAULTに設定すると、TEXT型またはBINARY型のデータのみを書き込むことができます。
partition_window_type
選択可能
データは、システム時間に基づいて分割される。 有効な値: DAY、HOUR、MINUTE。 デフォルト値: HOUR。
use_streaming
選択可能
Streaming Tunnelを使用するかどうかを指定します。 有効な値:
false: Streaming Tunnelは使用されません。 デフォルト値です。
true: ストリーミングトンネルが使用されます。
buffer_size_kb
選択可能
odpsパーティションライターの内部バッファサイズ。 (単位:KB) デフォルトのサイズは65,536 KBです。
sink_pool_size
選択可能
マルチスレッド書き込み用のスレッドの最大数。 デフォルト値は、システム内のCPUコアの数です。
record_batch_size
選択可能
Kafka-connectorタスクでスレッドが同時に送信できるメッセージの最大数。
skip_error
選択可能
不明なエラーが発生したときに生成されるレコードをスキップするかどうかを指定します。 有効な値:
false: レコードはスキップされません。 デフォルト値です。
true: レコードはスキップされます。
説明skip_errorがfalseに設定され、runtime.error.topic.nameパラメーターが設定されていない場合、その後のデータ書き込み操作は停止され、プロセスはブロックされ、不明なエラーが発生すると例外がログに記録されます。
skip_errorがtrueに設定されていて、runtime.error.topic.nameパラメーターが設定されていない場合、データの書き込みプロセスはデータの書き込みを続行し、異常なデータは破棄されます。
skip_errorパラメーターがfalseに設定され、runtime.error.topic.nameパラメーターが設定されている場合、データ書き込みのプロセスはデータの書き込みを続行し、runtime.error.topic.nameトピックで指定されたトピックに異常なデータが記録されます。
異常データの処理例については、「異常データの処理」をご参照ください。
runtime.error.topic.name
選択可能
不明なエラーが発生したときにデータが書き込まれるKafkaトピックの名前。
runtime.error.topic.bootstrap.servers
選択可能
ブートストラップサーバー構成のアドレス。 アドレスは、未知のエラーが発生したときにデータが書き込まれるKafkaブローカーのアドレスです。
account_type
必須
ターゲットMaxComputeサービスへのアクセスに使用されるメソッド。 有効な値: STSおよびALIYUN。 デフォルト値: ALIYUN。
MaxComputeにアクセスするには、異なる方法に対して異なるアクセス資格情報パラメーターを設定する必要があります。 詳細については、このトピックの「ALIYUNメソッドを使用したMaxComputeへのアクセス」および「STSメソッドを使用したMaxComputeへのアクセス」をご参照ください。
ALIYUNメソッドを使用したMaxComputeへのアクセス: 共通パラメーターに加えて、次のパラメーターを設定する必要があります。
パラメーター
説明
access_id
Alibaba CloudアカウントまたはAlibaba Cloudアカウント内のRAMユーザーのAccessKey ID。
AccessKeyペアページからAccessKey IDを取得できます。
access_key
AccessKey IDに対応するAccessKeyシークレット。
STSメソッドを使用したMaxComputeへのアクセス: 共通パラメーターに加えて、次のパラメーターを設定する必要があります。
パラメーター
説明
account_id
ターゲットMaxComputeプロジェクトへのアクセスに使用されるアカウントのID。 アカウントセンターでアカウントIDを確認できます。
region_id
ターゲットMaxComputeプロジェクトが存在するリージョンのID。 各リージョンのIDの詳細については、「エンドポイント」をご参照ください。
role_name
ターゲットMaxComputeプロジェクトへのアクセスに使用されるロールの名前。 ロール名は [ロール] ページで確認できます。
client_timeout_ms
STSトークンが更新される間隔。 単位:ミリ秒。 デフォルト値: 11。
sts.endpoint
ID認証にSTSトークンを使用する場合に必要なSTSサービスエンドポイント。
各リージョンの異なるネットワークタイプのエンドポイントの詳細については、「エンドポイント」をご参照ください。
次のコマンドを実行して、Kafka-connectorタスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json
例
TEXTタイプの書き込みデータ
データを準備します。
MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールを使用して、MaxComputeテーブルを作成します。
CREATE TABLE IF NOT EXISTS table_text( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value STRING ) PARTITIONED BY (pt STRING);Kafkaデータを作成します。
$KAFKA_HOME/bin/ディレクトリで、次のコマンドを実行してKafkaトピックを作成します。 この例では、topic_textという名前のKafkaトピックが作成されます。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text次のコマンドを実行してKafkaメッセージを作成します。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true >123 abc >456 edf
(オプション)
Kafka-connectorサービスを開始します。 詳細については、「Kafka-connectorサービスの設定と開始」をご参照ください。説明Kafka-connectorサービスが開始されている場合は、この手順をスキップします。odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。 この例では、odps-sink-connector.jsonファイルは$KAFKA_HOME/configディレクトリにアップロードされます。次のコードは、
odps-sink-connector.jsonファイルの内容を示しています。odps-sink-connector.jsonファイルの詳細については、「Kafka-connectorタスクの設定と開始」をご参照ください。{ "name": "odps-test-text", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_text", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_text", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"TEXT", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }次のコマンドを実行して、Kafka-connectorタスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json結果を確認します。
MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールで次のコマンドを実行して、データ書き込み結果を照会します。
set odps.sql.allow.fullscan=true; select * from table_text;次の応答が返されます。
# The mode value in the odps-sink-connector.json configuration file is VALUE. Therefore, only the value is retained and the key field is NULL. +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | topic_text | 0 | 0 | NULL | abc | 07-13-2023 21:13 | | topic_text | 0 | 1 | NULL | edf | 07-13-2023 21:13 | +-------+------------+------------+-----+-------+----+
CSVタイプのデータの書き込み
データを準備します。
MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールを使用して、ターゲットMaxComputeテーブルを作成します。
CREATE TABLE IF NOT EXISTS table_csv( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, region STRING ) PARTITIONED BY (pt STRING);Kafkaデータを作成します。
$KAFKA_HOME/bin/ディレクトリで、次のコマンドを実行してKafkaトピックを作成します。 この例では、topic_csvという名前のKafkaトピックが作成されます。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv次のコマンドを実行してKafkaメッセージを作成します。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true >123 1103,zhangsan,china >456 1104,lisi,usa
(オプション)
Kafka-connectorサービスを開始します。 詳細については、「Kafka-connectorサービスの設定と開始」をご参照ください。説明Kafka-connectorサービスが開始されている場合は、この手順をスキップします。odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。 この例では、odps-sink-connector.jsonファイルは$KAFKA_HOME/configディレクトリにアップロードされます。次のコードは、
odps-sink-connector.jsonファイルの内容を示しています。odps-sink-connector.jsonファイルの詳細については、「Kafka-connectorタスクの設定と開始」をご参照ください。{ "name": "odps-test-csv", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_csv", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_csv", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "format":"CSV", "mode":"VALUE", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }次のコマンドを実行して、Kafka-connectorタスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json結果を確認します。
MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールで次のコマンドを実行して、データ書き込み結果を照会します。
set odps.sql.allow.fullscan=true; select * from table_csv;次の応答が返されます。
+-------+------------+------------+------------+------+--------+----+ | topic | partition | offset | id | name | region | pt | +-------+------------+------------+------------+------+--------+----+ | csv_test | 0 | 0 | 1103 | zhangsan | china | 07-14-2023 00:10 | | csv_test | 0 | 1 | 1104 | lisi | usa | 07-14-2023 00:10 | +-------+------------+------------+------------+------+--------+----+
JSON型のデータの書き込み
データを準備します。
MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールを使用して、ターゲットMaxComputeテーブルを作成します。
CREATE TABLE IF NOT EXISTS table_json( topic STRING, `partition` BIGINT, `offset` BIGINT, key STRING, value JSON ) PARTITIONED BY (pt STRING);Kafkaデータを作成します。
$KAFKA_HOME/bin/ディレクトリで、次のコマンドを実行してKafkaトピックを作成します。 この例では、topic_jsonという名前のトピックが作成されます。sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json次のコマンドを実行してKafkaメッセージを作成します。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true >123 {"id":123,"name":"json-1","region":"beijing"} >456 {"id":456,"name":"json-2","region":"hangzhou"}
(オプション)
Kafka-connectorサービスを開始します。 詳細については、「Kafka-connectorサービスの設定と開始」をご参照ください。説明Kafka-connectorサービスが開始されている場合は、この手順をスキップします。odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。 この例では、odps-sink-connector.jsonファイルは$KAFKA_HOME/configディレクトリにアップロードされます。次のコードは、
odps-sink-connector.jsonファイルの内容を示しています。odps-sink-connector.jsonファイルの詳細については、「Kafka-connectorタスクの設定と開始」をご参照ください。{ "name": "odps-test-json", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_json", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_json", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"JSON", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }次のコマンドを実行して、Kafka-connectorタスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json結果を確認します。
MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールで次のコマンドを実行して、データ書き込み結果を照会します。
set odps.sql.allow.fullscan=true; select * from table_json;次の応答が返されます。
# Write JSON data to the value field. +-------+------------+------------+-----+-------+----+ | topic | partition | offset | key | value | pt | +-------+------------+------------+-----+-------+----+ | Topic_json | 0 | 0 | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 | | Topic_json | 0 | 1 | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 | +-------+------------+------------+-----+-------+----+
FLATTENタイプの書き込みデータ
データを準備します。
MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールを使用して、ターゲットMaxComputeテーブルを作成します。
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);Kafkaデータを作成します。
$KAFKA_HOME/bin/ディレクトリで、次のコマンドを実行してKafkaトピックを作成します。 この例では、topic_flattenという名前のトピックが作成されます。./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten次のコマンドを実行してKafkaメッセージを作成します。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true >123 {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}} >456 {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}
(オプション)
Kafka-connectorサービスを開始します。 詳細については、「Kafka-connectorサービスの設定と開始」をご参照ください。説明Kafka-connectorサービスが開始されている場合は、この手順をスキップします。odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。 この例では、odps-sink-connector.jsonファイルは$KAFKA_HOME/configディレクトリにアップロードされます。次のコードは、
odps-sink-connector.jsonファイルの内容を示しています。odps-sink-connector.jsonファイルの詳細については、「Kafka-connectorタスクの設定と開始」をご参照ください。{ "name": "odps-test-flatten", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_flatten", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "table_flatten", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000" } }次のコマンドを実行して、Kafka-connectorタスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json結果を確認します。
MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールで次のコマンドを実行して、データ書き込み結果を照会します。
set odps.sql.allow.fullscan=true; select * from table_flatten;次の応答が返されます。
# JSON data is parsed and written to the MaxCompute table. The exteninfo field in the nested JSON format can be a JSON field. +-------+------------+--------+-----+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+--------+-----+------+------------+----+ | topic_flatten | 0 | 0 | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 | | topic_flatten | 0 | 1 | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 | +-------+------------+--------+-----+------+------------+----+
プロセス異常データ
データを準備します。
MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールを使用して、ターゲットMaxComputeテーブルを作成します。
CREATE TABLE IF NOT EXISTS table_flatten( topic STRING, `partition` BIGINT, `offset` BIGINT, id BIGINT, name STRING, extendinfo JSON ) PARTITIONED BY (pt STRING);Kafkaデータを作成します。
$KAFKA_HOME/bin/ディレクトリで、次のKafkaトピックを作成するコマンドを実行します。トピック_異常sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormalruntime_errorsh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error説明データの書き込み時に不明なエラーが発生した場合、異常なデータは
runtime_errorトピックに書き込まれます。 ほとんどの場合、Kafkaデータの形式がMaxComputeテーブルの形式と一致していないため、不明なエラーが発生します。
次のコマンドを実行してKafkaメッセージを作成します。
次のメッセージでは、1つのメッセージのデータ形式がMaxComputeテーブルの形式と同じではありません。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true >100 {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}} >101 {"id":101,"name":"json-4","extendinfos":"null"} >102 {"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}}
(オプション)
Kafka-connectorサービスを開始します。 詳細については、「Kafka-connectorサービスの設定と開始」をご参照ください。説明Kafka-connectorサービスが開始されている場合は、この手順をスキップします。odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。 この例では、odps-sink-connector.jsonファイルは$KAFKA_HOME/configディレクトリにアップロードされます。次のコードは、
odps-sink-connector.jsonファイルの内容を示しています。odps-sink-connector.jsonファイルの詳細については、「Kafka-connectorタスクの設定と開始」をご参照ください。{ "name": "odps-test-runtime-error", "config": { "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector", "tasks.max": "3", "topics": "topic_abnormal", "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api", "project": "project_name", "schema":"default", "table": "test_flatten", "account_type": "ALIYUN", "access_id": "<yourAccessKeyId>", "access_key": "<yourAccessKeySecret>", "partition_window_type": "MINUTE", "mode":"VALUE", "format":"FLATTEN", "sink_pool_size":"150", "record_batch_size":"9000", "buffer_size_kb":"600000", "runtime.error.topic.name":"runtime_error", "runtime.error.topic.bootstrap.servers":"http://XXXX", "skip_error":"false" } }次のコマンドを実行して、Kafka-connectorタスクを開始します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json結果を確認します。
MaxComputeテーブルのデータを照会します。
MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールで次のコマンドを実行して、データ書き込み結果を照会します。
set odps.sql.allow.fullscan=true; select * from table_flatten;次の応答が返されます。
# The last two records are displayed. This is because the skip_error parameter is set to true. The data with the id of 101 is not written to the MaxCompute table, and subsequent records are not blocked from being written to the MaxCompute table. +-------+------------+------------+------------+------+------------+----+ | topic | partition | offset | id | name | extendinfo | pt | +-------+------------+------------+------------+------+------------+----+ | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 | | flatten_test | 0 | 0 | 123 | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 1 | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 | | flatten_test | 0 | 2 | 100 | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | | flatten_test | 0 | 4 | 102 | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 | +-------+------------+------------+------------+------+------------+----+runtime_errorトピックのメッセージを照会します。$KAFKA_HOME/bin/ディレクトリで、次のコマンドを実行してメッセージの書き込み結果を表示します。sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning次の応答が返されます。
# Abnormal data is written to the runtime_error topic. {"id":101,"name":"json-4","extendinfos":"null"}