Kafka Producer SDK、Beats、Collectd、Fluentd、Logstash、Telegraf、Vector などのログ収集ツールを使用して、Kafka プロトコル経由で Simple Log Service (SLS) へログをアップロードします。本トピックでは、これらのツールから SLS へ Kafka プロトコルを用いてログをアップロードする方法について説明します。
使用制限
サポートされる Kafka プロトコルの最も古いバージョンは 2.1.0 です。
安全なログ送信を保証するため、SASL_SSL 接続プロトコルを使用する必要があります。
権限
以下のいずれかの権限が必要です。
このポリシーは、SLS の管理権限を付与します。権限の付与方法については、「RAM ユーザーへの権限付与」および「RAM ロールの権限管理」をご参照ください。
カスタムポリシー
カスタムポリシーを作成するには、スクリプトエディター タブで、構成ボックス内の既存の内容を以下のスクリプトに置き換えます。詳細については、「カスタムポリシーの作成」をご参照ください。
説明スクリプト内で、
project_nameを実際のプロジェクト名に置き換えてください。{ "Version": "1", "Statement": [ { "Action": "log:GetProject", "Resource": "acs:log:*:*:project/project_name", "Effect": "Allow" }, { "Action": [ "log:GetLogStore", "log:ListShards", "log:PostLogStoreLogs" ], "Resource": "acs:log:*:*:project/project_name/logstore/*", "Effect": "Allow" } ] }作成したカスタムポリシーを RAM ユーザーにアタッチします。詳細については、「RAM ユーザーへの権限付与」をご参照ください。
構成
Kafka プロトコルを用いてログをアップロードする場合、以下のパラメーターを設定する必要があります。
構成名 | 構成値 | 説明 | 例 |
SLS_KAFKA_ENDPOINT | 初期接続に使用するクラスターのエンドポイント。形式は |
| aliyun-project-test はプロジェクト名、
|
SLS_PROJECT | プロジェクト名 | SLS プロジェクトの名称です。 | aliyun-project-test |
|
SLS_LOGSTORE |
Logstore 名 |
SLS の Logstore の名称です。Logstore 名の末尾に |
たとえば、Logstore 名が
|
SLS_PASSWORD | SLS への書き込み権限を持つ AccessKey Secret です。 | AccessKey ペアとは何か、およびその作成方法については、「AccessKey ペアの作成」をご参照ください。 値は、AccessKey ID と AccessKey Secret を
| LTAI****************#yourAccessKeySecret |
Kafka コンシューマーグループを用いて SLS からリアルタイムでデータを消費したい場合は、チケットを送信し、Alibaba Cloud テクニカルサポートまでお問い合わせください。
例 1:Beats を用いたログのアップロード
MetricBeat、PacketBeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat などの Beats は、ログを収集し、Kafka プロトコル経由で SLS へアップロードできます。
構成例
例に記載されている
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。output.kafka: # クラスターメタデータ読み取りのための初期ブローカー hosts: ["SLS_KAFKA_ENDPOINT"] username: "SLS_PROJECT" password: "SLS_PASSWORD" ssl.certificate_authorities: # メッセージトピック選択 + パーティショニング topic: 'SLS_LOGSTORE' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
例 2:Collectd を用いたログのアップロード
Collectd は、システムおよびアプリケーションのパフォーマンスメトリクスを定期的に収集し、Kafka プロトコル経由で SLS へアップロードするデーモンです。詳細については、「Write Kafka Plugin」をご参照ください。
Collectd で収集したログを SLS へアップロードするには、Kafka プラグインおよびその依存関係もインストールする必要があります。たとえば CentOS では、sudo yum install collectd-write_kafka コマンドを実行して Kafka プラグインをインストールします。RPM パッケージマネージャー (RPM) パッケージのインストール方法については、「Collectd-write_kafka」をご参照ください。
構成例
この例では出力形式を JSON に設定しています。Command や Graphite など、他の形式もサポートされています。詳細については、「Collectd 構成ドキュメント」をご参照ください。
例に記載されている
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。
LoadPlugin write_kafka <Plugin write_kafka> Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT" Property "security.protocol" "sasl_ssl" Property "sasl.mechanism" "PLAIN" Property "sasl.username" "SLS_PROJECT" Property "sasl.password" "SLS_PASSWORD" Property "broker.address.family" "v4" <Topic "SLS_LOGSTORE"> Format JSON Key "content" </Topic> </Plugin>
例 3:Telegraf を用いたログのアップロード
Telegraf は、Go 言語で実装されたエージェントプログラムであり、少ないメモリ使用量でデータメトリクスの収集、処理、および集計を行います。Telegraf は多様なプラグインおよび統合機能を提供します。Telegraf を使用してホストシステムからさまざまなメトリクスを取得したり、サードパーティ API からメトリクスを取得したり、statsd や Kafka コンシューマーサービスを介してメトリクスを受信したりできます。
Telegraf で収集したログを Kafka プロトコル経由で SLS へアップロードする前に、構成ファイルを変更する必要があります。
構成例
この例では出力形式を JSON に設定しています。Graphite や Carbon2 など、他の形式もサポートされています。詳細については、「Telegraf 出力形式」をご参照ください。
説明Telegraf では、tls_ca に対して有効なパスを設定する必要があります。サーバーが提供するルート証明書のパスを使用してください。Linux 環境では、ルート CA 証明書のパスは通常 /etc/ssl/certs/ca-bundle.crt です。
例に記載されている
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。
# Kafka 出力プラグインの構成 [[outputs.kafka]] ## Kafka ブローカーの URL brokers = ["SLS_KAFKA_ENDPOINT"] ## プロデューサーがメッセージを送信する Kafka トピック topic = "SLS_LOGSTORE" routing_key = "content" ## CompressionCodec は、Kafka で認識されるさまざまな圧縮コーデックを表します。 ## 0 : 圧縮なし ## 1 : Gzip 圧縮 ## 2 : Snappy 圧縮 ## 3 : LZ4 圧縮 compression_codec = 1 ## オプションの TLS 構成 tls_ca = "/etc/ssl/certs/ca-bundle.crt" tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem" ## TLS を使用するが、チェーンおよびホスト検証をスキップ # insecure_skip_verify = false ## オプションの SASL 構成 sasl_username = "SLS_PROJECT" sasl_password = "SLS_PASSWORD" ## 出力するデータ形式。 ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "json"
例 4:Fluentd を用いたログのアップロード
Fluentd は、オープンソースのログコレクターであり、Apache 2 ライセンスのもとで Cloud Native Computing Foundation (CNCF) が開発したプロジェクトです。
Fluentd は、多様な入力、処理、出力プラグインをサポートしています。Kafka プラグインを使用してログを SLS へアップロードできます。Kafka プラグインをインストールおよび構成するだけで済みます。詳細については、「fluent-plugin-kafka」をご参照ください。
構成例
この例では出力形式を JSON に設定しています。他にも多数の形式がサポートされています。詳細については、「Fluentd Formatter」をご参照ください。
例に記載されている
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。
<match **> @type kafka2 brokers SLS_KAFKA_ENDPOINT default_topic SLS_LOGSTORE default_message_key content sasl_over_ssl true use_event_time true username SLS_PROJECT password "SLS_PASSWORD" ssl_ca_certs_from_system true # ruby-kafka プロデューサーのオプション max_send_retries 1000 required_acks 1 compression_codec gzip use_event_time true max_send_limit_bytes 2097152 <buffer hostlogs> flush_interval 10s </buffer> <format> @type json </format> </match>
例 5:Logstash を用いたログのアップロード
Logstash は、オープンソースのリアルタイムログ収集エンジンです。Logstash を使用して、さまざまなソースから動的にログを収集できます。
Kafka プロトコルを用いてログをアップロードするには、Logstash 7.10.1 以降を使用する必要があります。
Logstash には組み込みの Kafka 出力プラグインがあります。Logstash を構成して、Kafka プロトコル経由でログを SLS へアップロードします。SLS では SASL_SSL 接続プロトコルが使用されるため、SSL 証明書および JAAS ファイルも構成する必要があります。
構成例
この例では出力形式を JSON に設定しています。他にも多数の形式がサポートされています。詳細については、「Logstash Codec」をご参照ください。
説明この例は接続性テスト用の構成を示しています。本番環境では、stdout 出力構成を削除することを推奨します。
例に記載されている
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。
output { stdout { codec => rubydebug } kafka { topic_id => "SLS_LOGSTORE" bootstrap_servers => "SLS_KAFKA_ENDPOINT" security_protocol => "SASL_SSL" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';" sasl_mechanism => "PLAIN" codec => "json" client_id => "kafka-logstash" } }
例 6:Fluent-bit を用いたログのアップロード
Fluent-bit は、軽量かつ高度にスケーラブルなログおよびメトリクスのプロセッサおよびフォワーダーです。多様な入力、処理、出力プラグインをサポートしています。Kafka プラグインを使用してログを SLS へアップロードできます。詳細については、「Kafka 出力プラグイン」をご参照ください。
構成例
例に記載されている
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。[Output] Name kafka Match * Brokers SLS_KAFKA_ENDPOINT Topics SLS_LOGSTORE Format json rdkafka.sasl.username SLS_PROJECT rdkafka.sasl.password SLS_PASSWORD rdkafka.security.protocol SASL_SSL rdkafka.sasl.mechanism PLAIN
例 7:Kafka プロトコルを用いた Vector の構成
Vector は、軽量かつ高性能なログ処理ソフトウェアであり、Kafka プロトコルを用いたログレポートをサポートしています。以下では、Kafka 互換モードで Vector を構成して SLS へデータを書き込む方法について説明します。
構成例
例に記載されている
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。[sinks.aliyun_sls] type = "kafka" inputs = ["test_logs"] bootstrap_servers = "SLS_KAFKA_ENDPOINT" compression = "gzip" healthcheck = true topic = "SLS_LOGSTORE" encoding.codec = "json" sasl.enabled = true sasl.mechanism = "PLAIN" sasl.username = "SLS_PROJECT" sasl.password = "SLS_PASSWORD" tls.enabled = true
例 8:Kafka プロデューサーを用いたログのアップロード
Java
依存関係
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency>サンプルコード
package org.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProduceExample { public static void main(String[] args) { // 構成情報。 Properties props = new Properties(); String project = "etl-shanghai-b"; String logstore = "testlog"; // プロデューサーの内容を JSON ログとして解析する場合は、このパラメーターを true に設定します。 boolean parseJson = false; // Alibaba Cloud アカウントはすべての API オペレーションに対して完全な権限を持ちます。これは高いセキュリティリスクを伴います。API オペレーションの呼び出しや日常的な運用・保守作業を行う際には、RAM ユーザーの作成および利用を推奨します。RAM ユーザーを作成するには、RAM コンソールにログインしてください。 // このセクションでは、AccessKey ID および AccessKey Secret を環境変数に保存する例を示します。必要に応じて、構成ファイルに保存することも可能です。 // セキュリティリスクを回避するため、コード内に AccessKey ID および AccessKey Secret を保存しないことを推奨します。 String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET"); String endpoint = "cn-shanghai.log.aliyuncs.com"; // このパラメーターは、プロジェクトのエンドポイントに基づいて設定してください。 String port = "10012"; // インターネットではポート 10012、内部ネットワークではポート 10011 を使用します。 String hosts = project + "." + endpoint + ":" + port; String topic = logstore; if(parseJson) { topic = topic + ".json"; } props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put("security.protocol", "sasl_ssl"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); // プロデューサーインスタンスを作成します。 KafkaProducer<String,String> producer = new KafkaProducer<>(props); // レコードを送信します。 for(int i=0;i<1;i++){ String content = "{\"msg\": \"Hello World\"}"; // 必要に応じて、以下のメソッドを使用してメッセージのタイムスタンプを設定できます。 // long timestamp = System.currentTimeMillis(); // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content); ProducerRecord<String, String> record = new ProducerRecord<>(topic, content); producer.send(record, (metadata, exception) -> { if (exception != null) { System.err.println("ERROR: メッセージの送信に失敗しました:" + exception.getMessage()); exception.printStackTrace(); } else { System.out.println("メッセージが正常に送信されました。トピック:" + metadata.topic() + "、パーティション:" + metadata.partition() + "、オフセット:" + metadata.offset() + "、タイムスタンプ:" + metadata.timestamp()); } }); } producer.close(); } }
Python
依存関係
pip install confluent-kafkaサンプルコード
#!/bin/env python3 import time import os from confluent_kafka import Producer def delivery_report(err, msg): """ 各メッセージの配信結果を示すために、メッセージごとに 1 回呼び出されます。 poll() または flush() によってトリガーされます。 """ if err is not None: print('メッセージの配信に失敗しました:{}'.format(err)) else: print('メッセージが {} [{}] へ配信されました。オフセット:{}'.format(msg.topic(), msg.partition(), msg.offset())) def main(): project = "etl-shanghai-b" logstore = "testlog" parse_json = False # 環境変数から認証情報を取得 access_key_id = os.getenv("SLS_ACCESS_KEY_ID") access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET") endpoint = "cn-shanghai.log.aliyuncs.com" port = "10012" # インターネットではポート 10012、内部ネットワークではポート 10011 を使用します。 hosts = f"{project}.{endpoint}:{port}" topic = logstore if parse_json: topic = topic + ".json" # Kafka プロデューサーを構成 conf = { 'bootstrap.servers': hosts, 'security.protocol': 'sasl_ssl', 'sasl.mechanisms': 'PLAIN', 'sasl.username': project, 'sasl.password': f"{access_key_id}#{access_key_secret}", 'enable.idempotence': False, } # プロデューサーインスタンスを作成 producer = Producer(conf) # メッセージを送信 content = "{\"msg\": \"Hello World\"}" producer.produce(topic=topic, value=content.encode('utf-8'), #timestamp=int(time.time() * 1000), # (オプション) レコードのタイムスタンプ(ミリ秒単位)を設定します。 callback=delivery_report) # 未送信のメッセージがすべて配信され、配信レポートのコールバックが呼び出されるのを待機します。 producer.flush() if __name__ == '__main__': main()
Golang
依存関係
go get github.com/confluentinc/confluent-kafka-go/kafkaサンプルコード
package main import ( "fmt" "log" "os" // "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { project := "etl-shanghai-b" logstore := "testlog" parseJson := false // 環境変数から認証情報を取得 accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID") accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET") endpoint := "cn-shanghai.log.aliyuncs.com" port := "10012" // インターネットではポート 10012、内部ネットワークではポート 10011 を使用します。 hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port) topic := logstore if parseJson { topic = topic + ".json" } // Kafka プロデューサーを構成 config := &kafka.ConfigMap{ "bootstrap.servers": hosts, "security.protocol": "sasl_ssl", "sasl.mechanisms": "PLAIN", "sasl.username": project, "sasl.password": accessKeyID + "#" + accessKeySecret, "enable.idempotence": false, } // プロデューサーインスタンスを作成 producer, err := kafka.NewProducer(config) if err != nil { log.Fatalf("プロデューサーの作成に失敗しました:%v", err) } defer producer.Close() // バッチでメッセージを送信 messages := []string{ "{\"msg\": \"Hello World 1\"}", "{\"msg\": \"Hello World 2\"}", "{\"msg\": \"Hello World 3\"}", } for _, content := range messages { err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(content), //Timestamp: time.Now(), // 必要に応じて時間を設定 }, nil) if err != nil { log.Printf("メッセージの送信に失敗しました:%v", err) } } // プロデューサーがメッセージを正常に送信したかどうかを監視するゴルーチンを有効化 go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("配信に失敗しました:%v\n", ev.TopicPartition.Error) } else { fmt.Printf("トピック %s [%d] へメッセージを配信しました。オフセット:%v\n", *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset) } } } }() producer.Flush(5 * 1000) }
エラーメッセージ
Kafka プロトコルを用いたログのアップロードに失敗した場合、Kafka のエラーメッセージ形式でエラーメッセージが返されます。以下の表に、エラーメッセージを示します。Kafka プロトコルのエラーメッセージの詳細については、「エラー一覧」をご参照ください。
エラーメッセージ | 説明 | 推奨される解決策 |
NetworkException | ネットワークエラーが発生しました。 | 1 秒待ってから再試行してください。 |
TopicAuthorizationException |
認証に失敗しました。 |
このエラーは、指定された AccessKey ペアが無効であるか、指定されたプロジェクトまたは Logstore に対する書き込み権限を有していない場合に発生します。有効な AccessKey ペアおよび必要な書き込み権限を確認してください。 |
UnknownTopicOrPartitionException |
このエラーは、以下のいずれかの理由で発生する可能性があります:
|
指定されたプロジェクトおよび Logstore が存在することを確認してください。エラーが継続する場合は、プロジェクトのリージョンとエンドポイントのリージョンが一致していることを確認してください。 |
KafkaStorageException | サーバーで例外が発生しました。 | 1 秒待ってから再試行してください。 |