すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Kafka プロトコルを使用したログのアップロード

最終更新日:Oct 30, 2025

Kafka Producer SDK、Beats、Collectd、Fluentd、Logstash、Telegraf、Vector などのログ収集ツールを使用して、Kafka プロトコルで Simple Log Service (SLS) にログをアップロードします。このトピックでは、これらのツールから Kafka プロトコルを使用して SLS にログをアップロードする方法について説明します。

制限

  • サポートされている Kafka プロトコルの最も古いバージョンは 2.1.0 です。

  • 安全なログ伝送を確保するには、SASL_SSL 接続プロトコルを使用する必要があります。

権限

次のいずれかの権限が必要です。

  • AliyunLogFullAccess

    このポリシーは、SLS を管理する権限を付与します。権限の付与方法の詳細については、「RAM ユーザーへの権限付与」および「RAM ロールへの権限付与」をご参照ください。

  • カスタムポリシー

    1. カスタムポリシーを作成するには、[スクリプトエディター] タブで、設定ボックス内の既存のコンテンツを次のスクリプトに置き換えます。詳細については、「カスタムポリシーの作成」をご参照ください。

      説明

      スクリプトで、project_name を実際のプロジェクト名に置き換えます。

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": "log:GetProject",
                  "Resource": "acs:log:*:*:project/project_name",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "log:GetLogStore",
                      "log:ListShards",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "acs:log:*:*:project/project_name/logstore/*",
                  "Effect": "Allow"
              }
          ]
      }
    2. 作成したカスタムポリシーを RAM ユーザーにアタッチします。詳細については、「RAM ユーザーへの権限付与」をご参照ください。

構成

Kafka プロトコルを使用してログをアップロードする場合、次のパラメーターを設定する必要があります。

構成名

構成値

説明

SLS_KAFKA_ENDPOINT

初期接続に使用されるクラスターのエンドポイント。フォーマットは Project name.Endpoint:Port です。このパラメーターは、プロジェクトのエンドポイントに基づいて設定する必要があります。詳細については、「エンドポイント」をご参照ください。

  • 内部ネットワーク: ポート番号は 10011 です。例: Project name.cn-hangzhou-intranet.log.aliyuncs.com:10011

  • インターネット: ポート番号は 10012 です。例: Project name.cn-hangzhou.log.aliyuncs.com:10012

aliyun-project-test はプロジェクト名です。cn-hangzhou-xxx.aliyuncs.comエンドポイント です。1001110012 は、内部ネットワークとインターネットのポート番号です。

  • 内部ネットワーク: aliyun-project-test.cn-hangzhou-intranet.log.aliyuncs.com:10011

  • インターネット: aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012

SLS_PROJECT

プロジェクト名

SLS プロジェクトの名前。

aliyun-project-test

SLS_LOGSTORE

Logstore 名

Logstore の名前。.json を Logstore 名に追加すると、SLS はログを JSON ログとして解析しようとします。

たとえば、Logstore 名が test-logstore の場合です。

  • 値を test-logstore に設定すると、アップロードされたログコンテンツは content フィールドに保存されます。

  • 値を test-logstore.json に設定すると、アップロードされたログコンテンツは JSON ログとして解析されます。JSON データの第 1 層のキーがフィールド名として使用され、対応する値がフィールド値として使用されます。

SLS_PASSWORD

SLS への書き込み権限を持つ AccessKey シークレット。

AccessKey ペアとは何か、およびその作成方法については、「AccessKey ペアの作成」をご参照ください。

値は、# 記号で区切られた AccessKey ID と AccessKey シークレットで構成されます。

  • AccessKey ID: Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID

  • AccessKey secret: Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey secret

LTAI****************#yourAccessKeySecret

説明

Kafka コンシューマーグループを使用して SLS からリアルタイムでデータを消費する場合は、チケットを送信

例 1:Beats を使用してログをアップロードする

MetricBeat、PacketBeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat などの Beats は、ログを収集し、Kafka プロトコルを使用して SLS にアップロードできます。

  • 構成例

    この例の SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    output.kafka:
      # クラスタメタデータを読み取るための初期ブローカー
      hosts: ["SLS_KAFKA_ENDPOINT"]
      username: "SLS_PROJECT"
      password: "SLS_PASSWORD"
      ssl.certificate_authorities:
      # メッセージトピックの選択 + パーティショニング
      topic: 'SLS_LOGSTORE'
      partition.round_robin:
        reachable_only: false
    
      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000

例 2: Collectd を使用したログのアップロード

Collectd は、システムとアプリケーションのパフォーマンスメトリックを定期的に収集し、Kafka プロトコルを使用してメトリックを SLS にアップロードするデーモンです。詳細については、「Write Kafka Plugin」をご参照ください。

Collectd によって収集されたログを SLS にアップロードする場合、Kafka プラグインとその依存関係もインストールする必要があります。たとえば、CentOS では、sudo yum install collectd-write_kafka コマンドを実行して Kafka プラグインをインストールします。RPM Package Manager (RPM) パッケージのインストール方法の詳細については、「Collectd-write_kafka」をご参照ください。

  • 構成例

    • この例では、出力フォーマットは JSON に設定されています。Command や Graphite などの他のフォーマットもサポートされています。詳細については、「Collectd 構成ドキュメント」をご参照ください。

    • この例の SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    LoadPlugin write_kafka
    
    <Plugin write_kafka>
      Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
      Property "security.protocol" "sasl_ssl"
      Property "sasl.mechanism" "PLAIN"
      Property "sasl.username" "SLS_PROJECT"
      Property "sasl.password" "SLS_PASSWORD"
      Property "broker.address.family" "v4"
      <Topic "SLS_LOGSTORE">
        Format JSON
        Key "content"
      </Topic>
    </Plugin>

例 3:Telegraf を使用してログをアップロードする

Telegraf は、少量のメモリを使用してデータメトリックを収集、処理、集約する Go ベースのエージェントプログラムです。Telegraf は、幅広いプラグインと統合機能を提供します。Telegraf を使用して、ホストシステムからさまざまなメトリックを取得したり、サードパーティ API からメトリックを取得したり、statsd および Kafka コンシューマーサービスを使用してメトリックをリッスンしたりします。

Kafka プロトコルを使用して Telegraf によって収集されたログを SLS にアップロードする前に、構成ファイルを変更する必要があります。

  • 構成例

    • この例では、出力フォーマットは JSON に設定されています。Graphite や Carbon2 などの他のフォーマットもサポートされています。詳細については、「Telegraf 出力フォーマット」をご参照ください。

      説明

      Telegraf で tls_ca に有効なパスを設定する必要があります。サーバーから提供されたルート証明書のパスを使用します。Linux 環境では、ルート CA 証明書のパスは通常 /etc/ssl/certs/ca-bundle.crt です。

    • この例の SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    # Kafka 出力プラグインの構成
    [[outputs.kafka]]
      ## Kafka ブローカーの URL
      brokers = ["SLS_KAFKA_ENDPOINT"]
      ## プロデューサーメッセージの Kafka トピック
      topic = "SLS_LOGSTORE"
      routing_key = "content"
      ## CompressionCodec は、メッセージで Kafka によって認識されるさまざまな圧縮コーデックを表します。
      ## 0:圧縮なし
      ## 1:Gzip 圧縮
      ## 2:Snappy 圧縮
      ## 3:LZ4 圧縮
      compression_codec = 1
      ## オプションの TLS 構成 tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem"
      ## TLS を使用しますが、チェーンとホストの検証はスキップします
      # insecure_skip_verify = false
      ## オプションの SASL 構成
      sasl_username = "SLS_PROJECT"
      sasl_password = "SLS_PASSWORD"
      ## 出力するデータフォーマット。
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
      data_format = "json"

例 4:Fluentd を使用してログをアップロードする

Fluentd は、オープンソースのログコレクターであり、Apache 2 ライセンスの下で開発された Cloud Native Computing Foundation (CNCF) のプロジェクトです。

Fluentd は、さまざまな入力、処理、および出力プラグインをサポートしています。Kafka プラグインを使用して SLS にログをアップロードします。Kafka プラグインをインストールして構成するだけで済みます。詳細については、「fluent-plugin-kafka」をご参照ください。

  • 構成例

    • この例では、出力フォーマットは JSON に設定されています。その他にも数十のフォーマットがサポートされています。詳細については、「Fluentd Formatter」をご参照ください。

    • この例の SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    <match **>
      @type kafka2
      brokers     SLS_KAFKA_ENDPOINT
      default_topic SLS_LOGSTORE
      default_message_key content
      sasl_over_ssl true
      use_event_time true
      username SLS_PROJECT
      password "SLS_PASSWORD"
      ssl_ca_certs_from_system true
      # ruby-kafka プロデューサーオプション
      max_send_retries 1000
      required_acks 1
      compression_codec gzip
      use_event_time true
      max_send_limit_bytes 2097152
    
      <buffer hostlogs>
        flush_interval 10s
      </buffer>
      <format>
        @type json
      </format>
    </match>

例 5:Logstash を使用してログをアップロードする

Logstash は、オープンソースのリアルタイムログ収集エンジンです。Logstash を使用して、さまざまなソースから動的にログを収集します。

説明

Kafka プロトコルを使用してログをアップロードするには、Logstash 7.10.1 以降を使用する必要があります。

Logstash には、組み込みの Kafka 出力プラグインがあります。Kafka プロトコルを使用して SLS にログをアップロードするように Logstash を構成します。SLS は SASL_SSL 接続プロトコルを使用するため、SSL 証明書と JAAS ファイルも構成する必要があります。

  • 構成例

    • この例では、出力フォーマットは JSON に設定されています。その他にも数十のフォーマットがサポートされています。詳細については、「Logstash Codec」をご参照ください。

      説明

      この例は、接続性テストの構成を示しています。本番環境では stdout 出力構成を削除することをお勧めします。

    • この例の SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    
    output {
      stdout { codec => rubydebug } // ネットワーク接続テスト用
      kafka {
        topic_id => "SLS_LOGSTORE"
        bootstrap_servers => "SLS_KAFKA_ENDPOINT"
        security_protocol => "SASL_SSL"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';" // 認証情報
        sasl_mechanism => "PLAIN"
        codec => "json"
        client_id => "kafka-logstash"
      }
    }

例 6: Fluent-bit を使用したログのアップロード

Fluent-bit は、軽量で拡張性の高いログおよびメトリックプロセッサー兼フォワーダーです。さまざまな入力、処理、および出力プラグインをサポートしています。Kafka プラグインを使用して SLS にログをアップロードします。詳細については、「Kafka 出力プラグイン」をご参照ください。

  • 構成例

    この例の SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    [Output]
        Name    kafka
        Match    *
        Brokers   SLS_KAFKA_ENDPOINT
        Topics    SLS_LOGSTORE
        Format    json
        rdkafka.sasl.username   SLS_PROJECT
        rdkafka.sasl.password   SLS_PASSWORD
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism      PLAIN

例 7: Kafka プロトコルを使用してログをアップロードするように Vector を構成する

Vector は、Kafka プロトコルを使用したログレポートをサポートする、軽量でパフォーマンス専有型のログ処理ソフトウェアプログラムです。次のセクションでは、Kafka 互換モードで SLS にデータを書き込むように Vector を構成する方法について説明します。

  • 構成例

    この例の SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["test_logs"]
      bootstrap_servers = "SLS_KAFKA_ENDPOINT"
      compression = "gzip"
      healthcheck = true
      topic = "SLS_LOGSTORE"
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "SLS_PROJECT"
      sasl.password = "SLS_PASSWORD"
      tls.enabled = true

例 8: Kafka プロデューサーを使用したログのアップロード

Java

  • 依存関係

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
  • サンプルコード

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            // 構成。
            Properties props = new Properties();
            String project = "etl-shanghai-b";
            String logstore = "testlog";
            // プロデューサーのコンテンツを JSON ログとして解析する場合は、このパラメーターを true に設定します。
            boolean parseJson = false;
            // Alibaba Cloud アカウントは、すべての API 操作に対する完全な権限を持っています。これは高いセキュリティ脅威をもたらします。API 操作を呼び出したり、日常の O&M を実行したりするには、RAM ユーザーを作成して使用することをお勧めします。RAM ユーザーを作成するには、RAM コンソールにログインします。
            // このセクションでは、AccessKey ID と AccessKey シークレットを環境変数に保存する方法の例を示します。必要に応じて、AccessKey ID と AccessKey シークレットを構成ファイルに保存することもできます。
            // セキュリティリスクを防ぐため、コードに AccessKey ID と AccessKey シークレットを保存しないことをお勧めします。
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-shanghai.log.aliyuncs.com"; // このパラメーターは、プロジェクトのエンドポイントに基づいて設定します。
            String port = "10012"; // インターネットにはポート 10012 を、内部ネットワークにはポート 10011 を使用します。
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
    
            //プロデューサーインスタンスを作成します。
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            //レコードを送信します。
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                // 必要に応じて、次のメソッドを使用してメッセージのタイムスタンプを設定します。
                // long timestamp = System.currentTimeMillis();
                // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content);
    
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("ERROR: Failed to send message: " + exception.getMessage());
                        exception.printStackTrace();
                    } else {
                        System.out.println("Message sent successfully to topic: " + metadata.topic() +
                                         ", partition: " + metadata.partition() +
                                         ", offset: " + metadata.offset() +
                                         ", timestamp: " + metadata.timestamp());
                    }
                });
            }
    
            producer.close();
        }
    }
    

Python

  • 依存関係

    pip install confluent-kafka
  • サンプルコード

    #!/bin/env python3
    import time
    import os
    from confluent_kafka import Producer
    
    def delivery_report(err, msg):
        """ 生成された各メッセージに対して一度呼び出され、配信結果を示します。
            poll() または flush() によってトリガーされます。"""
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}] at offset {}'.format(msg.topic(), msg.partition(), msg.offset()))
    
    def main():
        project = "etl-shanghai-b"
        logstore = "testlog"
        parse_json = False
    
        # 環境変数から認証情報を取得します
        access_key_id = os.getenv("SLS_ACCESS_KEY_ID")
        access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET")
        endpoint = "cn-shanghai.log.aliyuncs.com"
        port = "10012" # インターネットにはポート 10012 を、内部ネットワークにはポート 10011 を使用します。
    
        hosts = f"{project}.{endpoint}:{port}"
        topic = logstore
        if parse_json:
            topic = topic + ".json"
    
        # Kafka プロデューサーを構成します
        conf = {
            'bootstrap.servers': hosts,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username': project,
            'sasl.password': f"{access_key_id}#{access_key_secret}",
            'enable.idempotence': False,
        }
    
        # プロデューサーインスタンスを作成します
        producer = Producer(conf)
    
        # メッセージを送信します
        content = "{\"msg\": \"Hello World\"}"
        producer.produce(topic=topic,
                         value=content.encode('utf-8'),
                         #timestamp=int(time.time() * 1000),  # (オプション) レコードのタイムスタンプをミリ秒単位で設定します。
                         callback=delivery_report)
    
        # 未処理のメッセージが配信され、配信レポートの
        # コールバックがトリガーされるのを待ちます。
        producer.flush()
    
    if __name__ == '__main__':
        main()
    

Golang

  • 依存関係

    go get github.com/confluentinc/confluent-kafka-go/kafka
  • サンプルコード

    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    //	"time"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
    	project := "etl-shanghai-b"
    	logstore := "testlog"
    	parseJson := false
    
    	// 環境変数から認証情報を取得します
    	accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
    	accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
    	endpoint := "cn-shanghai.log.aliyuncs.com"
    	port := "10012" // インターネットにはポート 10012 を、内部ネットワークにはポート 10011 を使用します。
    
    	hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port)
    	topic := logstore
    	if parseJson {
    		topic = topic + ".json"
    	}
    
    	// Kafka プロデューサーを構成します
    	config := &kafka.ConfigMap{
    		"bootstrap.servers":  hosts,
    		"security.protocol":  "sasl_ssl",
    		"sasl.mechanisms":    "PLAIN",
    		"sasl.username":      project,
    		"sasl.password":      accessKeyID + "#" + accessKeySecret,
    		"enable.idempotence": false,
    	}
    
    	// プロデューサーインスタンスを作成します
    	producer, err := kafka.NewProducer(config)
    	if err != nil {
    		log.Fatalf("Failed to create producer: %v", err)
    	}
    	defer producer.Close()
    
    	// メッセージをバッチで送信します。
    	messages := []string{
    		"{\"msg\": \"Hello World 1\"}",
    		"{\"msg\": \"Hello World 2\"}",
    		"{\"msg\": \"Hello World 3\"}",
    	}
    
    	for _, content := range messages {
    		err := producer.Produce(&kafka.Message{
    			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    			Value:          []byte(content),
    			//Timestamp:      time.Now(), // 必要に応じて時間を設定します。
    		}, nil)
    
    		if err != nil {
    			log.Printf("Failed to produce message: %v", err)
    		}
    	}
    
    	// プロデューサーがメッセージを正常に送信したかどうかをリッスンするゴルーチンを有効にします。
    	go func() {
    		for e := range producer.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
    				} else {
    					fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
    						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
    				}
    			}
    		}
    	}()
    
    	producer.Flush(5 * 1000)
    }

エラーメッセージ

Kafka プロトコルを使用してログのアップロードに失敗した場合、Kafka エラーメッセージフォーマットでエラーメッセージが返されます。次の表にエラーメッセージを示します。Kafka プロトコルのエラーメッセージの詳細については、「エラーリスト」をご参照ください。

エラーメッセージ

説明

推奨される解決策

NetworkException

ネットワークエラーが発生しました。

1 秒待ってからリトライしてください。

TopicAuthorizationException

認証に失敗しました。

このエラーは通常、無効な AccessKey ペア、または指定されたプロジェクトまたは Logstore にデータを書き込む権限が不十分なことが原因で発生します。書き込み権限を持つ有効な AccessKey ペアを指定してください。

UnknownTopicOrPartitionException

このエラーは、次のいずれかの理由で発生する可能性があります:

  • 指定されたプロジェクトまたは Logstore が存在しない。

  • プロジェクトのリージョンがエンドポイントで指定されたリージョンと異なる。

指定されたプロジェクトと Logstore が存在することを確認してください。エラーが解決しない場合は、プロジェクトのリージョンがエンドポイントで指定されたリージョンと同じであるかどうかを確認してください。

KafkaStorageException

サーバーで例外が発生しました。

1 秒待ってからリトライしてください。