Kafka Producer SDK、Beats、Collectd、Fluentd、Logstash、Telegraf、Vector などのログ収集ツールを使用して、Kafka プロトコルで Simple Log Service (SLS) にログをアップロードします。このトピックでは、これらのツールから Kafka プロトコルを使用して SLS にログをアップロードする方法について説明します。
制限
サポートされている Kafka プロトコルの最も古いバージョンは 2.1.0 です。
安全なログ伝送を確保するには、SASL_SSL 接続プロトコルを使用する必要があります。
権限
次のいずれかの権限が必要です。
このポリシーは、SLS を管理する権限を付与します。権限の付与方法の詳細については、「RAM ユーザーへの権限付与」および「RAM ロールへの権限付与」をご参照ください。
カスタムポリシー
カスタムポリシーを作成するには、[スクリプトエディター] タブで、設定ボックス内の既存のコンテンツを次のスクリプトに置き換えます。詳細については、「カスタムポリシーの作成」をご参照ください。
説明スクリプトで、
project_nameを実際のプロジェクト名に置き換えます。{ "Version": "1", "Statement": [ { "Action": "log:GetProject", "Resource": "acs:log:*:*:project/project_name", "Effect": "Allow" }, { "Action": [ "log:GetLogStore", "log:ListShards", "log:PostLogStoreLogs" ], "Resource": "acs:log:*:*:project/project_name/logstore/*", "Effect": "Allow" } ] }作成したカスタムポリシーを RAM ユーザーにアタッチします。詳細については、「RAM ユーザーへの権限付与」をご参照ください。
構成
Kafka プロトコルを使用してログをアップロードする場合、次のパラメーターを設定する必要があります。
構成名 | 構成値 | 説明 | 例 |
SLS_KAFKA_ENDPOINT | 初期接続に使用されるクラスターのエンドポイント。フォーマットは |
| aliyun-project-test はプロジェクト名です。
|
SLS_PROJECT | プロジェクト名 | SLS プロジェクトの名前。 | aliyun-project-test |
SLS_LOGSTORE | Logstore 名 | Logstore の名前。 | たとえば、Logstore 名が
|
SLS_PASSWORD | SLS への書き込み権限を持つ AccessKey シークレット。 | AccessKey ペアとは何か、およびその作成方法については、「AccessKey ペアの作成」をご参照ください。 値は、
| LTAI****************#yourAccessKeySecret |
Kafka コンシューマーグループを使用して SLS からリアルタイムでデータを消費する場合は、チケットを送信。
例 1:Beats を使用してログをアップロードする
MetricBeat、PacketBeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat などの Beats は、ログを収集し、Kafka プロトコルを使用して SLS にアップロードできます。
構成例
この例の
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。output.kafka: # クラスタメタデータを読み取るための初期ブローカー hosts: ["SLS_KAFKA_ENDPOINT"] username: "SLS_PROJECT" password: "SLS_PASSWORD" ssl.certificate_authorities: # メッセージトピックの選択 + パーティショニング topic: 'SLS_LOGSTORE' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
例 2: Collectd を使用したログのアップロード
Collectd は、システムとアプリケーションのパフォーマンスメトリックを定期的に収集し、Kafka プロトコルを使用してメトリックを SLS にアップロードするデーモンです。詳細については、「Write Kafka Plugin」をご参照ください。
Collectd によって収集されたログを SLS にアップロードする場合、Kafka プラグインとその依存関係もインストールする必要があります。たとえば、CentOS では、sudo yum install collectd-write_kafka コマンドを実行して Kafka プラグインをインストールします。RPM Package Manager (RPM) パッケージのインストール方法の詳細については、「Collectd-write_kafka」をご参照ください。
構成例
この例では、出力フォーマットは JSON に設定されています。Command や Graphite などの他のフォーマットもサポートされています。詳細については、「Collectd 構成ドキュメント」をご参照ください。
この例の
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。
LoadPlugin write_kafka <Plugin write_kafka> Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT" Property "security.protocol" "sasl_ssl" Property "sasl.mechanism" "PLAIN" Property "sasl.username" "SLS_PROJECT" Property "sasl.password" "SLS_PASSWORD" Property "broker.address.family" "v4" <Topic "SLS_LOGSTORE"> Format JSON Key "content" </Topic> </Plugin>
例 3:Telegraf を使用してログをアップロードする
Telegraf は、少量のメモリを使用してデータメトリックを収集、処理、集約する Go ベースのエージェントプログラムです。Telegraf は、幅広いプラグインと統合機能を提供します。Telegraf を使用して、ホストシステムからさまざまなメトリックを取得したり、サードパーティ API からメトリックを取得したり、statsd および Kafka コンシューマーサービスを使用してメトリックをリッスンしたりします。
Kafka プロトコルを使用して Telegraf によって収集されたログを SLS にアップロードする前に、構成ファイルを変更する必要があります。
構成例
この例では、出力フォーマットは JSON に設定されています。Graphite や Carbon2 などの他のフォーマットもサポートされています。詳細については、「Telegraf 出力フォーマット」をご参照ください。
説明Telegraf で tls_ca に有効なパスを設定する必要があります。サーバーから提供されたルート証明書のパスを使用します。Linux 環境では、ルート CA 証明書のパスは通常 /etc/ssl/certs/ca-bundle.crt です。
この例の
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。
# Kafka 出力プラグインの構成 [[outputs.kafka]] ## Kafka ブローカーの URL brokers = ["SLS_KAFKA_ENDPOINT"] ## プロデューサーメッセージの Kafka トピック topic = "SLS_LOGSTORE" routing_key = "content" ## CompressionCodec は、メッセージで Kafka によって認識されるさまざまな圧縮コーデックを表します。 ## 0:圧縮なし ## 1:Gzip 圧縮 ## 2:Snappy 圧縮 ## 3:LZ4 圧縮 compression_codec = 1 ## オプションの TLS 構成 tls_ca = "/etc/ssl/certs/ca-bundle.crt" tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem" ## TLS を使用しますが、チェーンとホストの検証はスキップします # insecure_skip_verify = false ## オプションの SASL 構成 sasl_username = "SLS_PROJECT" sasl_password = "SLS_PASSWORD" ## 出力するデータフォーマット。 ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "json"
例 4:Fluentd を使用してログをアップロードする
Fluentd は、オープンソースのログコレクターであり、Apache 2 ライセンスの下で開発された Cloud Native Computing Foundation (CNCF) のプロジェクトです。
Fluentd は、さまざまな入力、処理、および出力プラグインをサポートしています。Kafka プラグインを使用して SLS にログをアップロードします。Kafka プラグインをインストールして構成するだけで済みます。詳細については、「fluent-plugin-kafka」をご参照ください。
構成例
この例では、出力フォーマットは JSON に設定されています。その他にも数十のフォーマットがサポートされています。詳細については、「Fluentd Formatter」をご参照ください。
この例の
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。
<match **> @type kafka2 brokers SLS_KAFKA_ENDPOINT default_topic SLS_LOGSTORE default_message_key content sasl_over_ssl true use_event_time true username SLS_PROJECT password "SLS_PASSWORD" ssl_ca_certs_from_system true # ruby-kafka プロデューサーオプション max_send_retries 1000 required_acks 1 compression_codec gzip use_event_time true max_send_limit_bytes 2097152 <buffer hostlogs> flush_interval 10s </buffer> <format> @type json </format> </match>
例 5:Logstash を使用してログをアップロードする
Logstash は、オープンソースのリアルタイムログ収集エンジンです。Logstash を使用して、さまざまなソースから動的にログを収集します。
Kafka プロトコルを使用してログをアップロードするには、Logstash 7.10.1 以降を使用する必要があります。
Logstash には、組み込みの Kafka 出力プラグインがあります。Kafka プロトコルを使用して SLS にログをアップロードするように Logstash を構成します。SLS は SASL_SSL 接続プロトコルを使用するため、SSL 証明書と JAAS ファイルも構成する必要があります。
構成例
この例では、出力フォーマットは JSON に設定されています。その他にも数十のフォーマットがサポートされています。詳細については、「Logstash Codec」をご参照ください。
説明この例は、接続性テストの構成を示しています。本番環境では stdout 出力構成を削除することをお勧めします。
この例の
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。
output { stdout { codec => rubydebug } // ネットワーク接続テスト用 kafka { topic_id => "SLS_LOGSTORE" bootstrap_servers => "SLS_KAFKA_ENDPOINT" security_protocol => "SASL_SSL" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';" // 認証情報 sasl_mechanism => "PLAIN" codec => "json" client_id => "kafka-logstash" } }
例 6: Fluent-bit を使用したログのアップロード
Fluent-bit は、軽量で拡張性の高いログおよびメトリックプロセッサー兼フォワーダーです。さまざまな入力、処理、および出力プラグインをサポートしています。Kafka プラグインを使用して SLS にログをアップロードします。詳細については、「Kafka 出力プラグイン」をご参照ください。
構成例
この例の
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。[Output] Name kafka Match * Brokers SLS_KAFKA_ENDPOINT Topics SLS_LOGSTORE Format json rdkafka.sasl.username SLS_PROJECT rdkafka.sasl.password SLS_PASSWORD rdkafka.security.protocol SASL_SSL rdkafka.sasl.mechanism PLAIN
例 7: Kafka プロトコルを使用してログをアップロードするように Vector を構成する
Vector は、Kafka プロトコルを使用したログレポートをサポートする、軽量でパフォーマンス専有型のログ処理ソフトウェアプログラムです。次のセクションでは、Kafka 互換モードで SLS にデータを書き込むように Vector を構成する方法について説明します。
構成例
この例の
SLS_で始まるパラメーターの詳細については、「構成」をご参照ください。[sinks.aliyun_sls] type = "kafka" inputs = ["test_logs"] bootstrap_servers = "SLS_KAFKA_ENDPOINT" compression = "gzip" healthcheck = true topic = "SLS_LOGSTORE" encoding.codec = "json" sasl.enabled = true sasl.mechanism = "PLAIN" sasl.username = "SLS_PROJECT" sasl.password = "SLS_PASSWORD" tls.enabled = true
例 8: Kafka プロデューサーを使用したログのアップロード
Java
依存関係
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency>サンプルコード
package org.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProduceExample { public static void main(String[] args) { // 構成。 Properties props = new Properties(); String project = "etl-shanghai-b"; String logstore = "testlog"; // プロデューサーのコンテンツを JSON ログとして解析する場合は、このパラメーターを true に設定します。 boolean parseJson = false; // Alibaba Cloud アカウントは、すべての API 操作に対する完全な権限を持っています。これは高いセキュリティ脅威をもたらします。API 操作を呼び出したり、日常の O&M を実行したりするには、RAM ユーザーを作成して使用することをお勧めします。RAM ユーザーを作成するには、RAM コンソールにログインします。 // このセクションでは、AccessKey ID と AccessKey シークレットを環境変数に保存する方法の例を示します。必要に応じて、AccessKey ID と AccessKey シークレットを構成ファイルに保存することもできます。 // セキュリティリスクを防ぐため、コードに AccessKey ID と AccessKey シークレットを保存しないことをお勧めします。 String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET"); String endpoint = "cn-shanghai.log.aliyuncs.com"; // このパラメーターは、プロジェクトのエンドポイントに基づいて設定します。 String port = "10012"; // インターネットにはポート 10012 を、内部ネットワークにはポート 10011 を使用します。 String hosts = project + "." + endpoint + ":" + port; String topic = logstore; if(parseJson) { topic = topic + ".json"; } props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put("security.protocol", "sasl_ssl"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); //プロデューサーインスタンスを作成します。 KafkaProducer<String,String> producer = new KafkaProducer<>(props); //レコードを送信します。 for(int i=0;i<1;i++){ String content = "{\"msg\": \"Hello World\"}"; // 必要に応じて、次のメソッドを使用してメッセージのタイムスタンプを設定します。 // long timestamp = System.currentTimeMillis(); // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content); ProducerRecord<String, String> record = new ProducerRecord<>(topic, content); producer.send(record, (metadata, exception) -> { if (exception != null) { System.err.println("ERROR: Failed to send message: " + exception.getMessage()); exception.printStackTrace(); } else { System.out.println("Message sent successfully to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset() + ", timestamp: " + metadata.timestamp()); } }); } producer.close(); } }
Python
依存関係
pip install confluent-kafkaサンプルコード
#!/bin/env python3 import time import os from confluent_kafka import Producer def delivery_report(err, msg): """ 生成された各メッセージに対して一度呼び出され、配信結果を示します。 poll() または flush() によってトリガーされます。""" if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}] at offset {}'.format(msg.topic(), msg.partition(), msg.offset())) def main(): project = "etl-shanghai-b" logstore = "testlog" parse_json = False # 環境変数から認証情報を取得します access_key_id = os.getenv("SLS_ACCESS_KEY_ID") access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET") endpoint = "cn-shanghai.log.aliyuncs.com" port = "10012" # インターネットにはポート 10012 を、内部ネットワークにはポート 10011 を使用します。 hosts = f"{project}.{endpoint}:{port}" topic = logstore if parse_json: topic = topic + ".json" # Kafka プロデューサーを構成します conf = { 'bootstrap.servers': hosts, 'security.protocol': 'sasl_ssl', 'sasl.mechanisms': 'PLAIN', 'sasl.username': project, 'sasl.password': f"{access_key_id}#{access_key_secret}", 'enable.idempotence': False, } # プロデューサーインスタンスを作成します producer = Producer(conf) # メッセージを送信します content = "{\"msg\": \"Hello World\"}" producer.produce(topic=topic, value=content.encode('utf-8'), #timestamp=int(time.time() * 1000), # (オプション) レコードのタイムスタンプをミリ秒単位で設定します。 callback=delivery_report) # 未処理のメッセージが配信され、配信レポートの # コールバックがトリガーされるのを待ちます。 producer.flush() if __name__ == '__main__': main()
Golang
依存関係
go get github.com/confluentinc/confluent-kafka-go/kafkaサンプルコード
package main import ( "fmt" "log" "os" // "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { project := "etl-shanghai-b" logstore := "testlog" parseJson := false // 環境変数から認証情報を取得します accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID") accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET") endpoint := "cn-shanghai.log.aliyuncs.com" port := "10012" // インターネットにはポート 10012 を、内部ネットワークにはポート 10011 を使用します。 hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port) topic := logstore if parseJson { topic = topic + ".json" } // Kafka プロデューサーを構成します config := &kafka.ConfigMap{ "bootstrap.servers": hosts, "security.protocol": "sasl_ssl", "sasl.mechanisms": "PLAIN", "sasl.username": project, "sasl.password": accessKeyID + "#" + accessKeySecret, "enable.idempotence": false, } // プロデューサーインスタンスを作成します producer, err := kafka.NewProducer(config) if err != nil { log.Fatalf("Failed to create producer: %v", err) } defer producer.Close() // メッセージをバッチで送信します。 messages := []string{ "{\"msg\": \"Hello World 1\"}", "{\"msg\": \"Hello World 2\"}", "{\"msg\": \"Hello World 3\"}", } for _, content := range messages { err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(content), //Timestamp: time.Now(), // 必要に応じて時間を設定します。 }, nil) if err != nil { log.Printf("Failed to produce message: %v", err) } } // プロデューサーがメッセージを正常に送信したかどうかをリッスンするゴルーチンを有効にします。 go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error) } else { fmt.Printf("Delivered message to topic %s [%d] at offset %v\n", *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset) } } } }() producer.Flush(5 * 1000) }
エラーメッセージ
Kafka プロトコルを使用してログのアップロードに失敗した場合、Kafka エラーメッセージフォーマットでエラーメッセージが返されます。次の表にエラーメッセージを示します。Kafka プロトコルのエラーメッセージの詳細については、「エラーリスト」をご参照ください。
エラーメッセージ | 説明 | 推奨される解決策 |
NetworkException | ネットワークエラーが発生しました。 | 1 秒待ってからリトライしてください。 |
TopicAuthorizationException | 認証に失敗しました。 | このエラーは通常、無効な AccessKey ペア、または指定されたプロジェクトまたは Logstore にデータを書き込む権限が不十分なことが原因で発生します。書き込み権限を持つ有効な AccessKey ペアを指定してください。 |
UnknownTopicOrPartitionException | このエラーは、次のいずれかの理由で発生する可能性があります:
| 指定されたプロジェクトと Logstore が存在することを確認してください。エラーが解決しない場合は、プロジェクトのリージョンがエンドポイントで指定されたリージョンと同じであるかどうかを確認してください。 |
KafkaStorageException | サーバーで例外が発生しました。 | 1 秒待ってからリトライしてください。 |