すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Kafka プロトコルを用いたログのアップロード

最終更新日:Mar 26, 2026

Kafka Producer SDK、Beats、Collectd、Fluentd、Logstash、Telegraf、Vector などのログ収集ツールを使用して、Kafka プロトコル経由で Simple Log Service (SLS) へログをアップロードします。本トピックでは、これらのツールから SLS へ Kafka プロトコルを用いてログをアップロードする方法について説明します。

使用制限

  • サポートされる Kafka プロトコルの最も古いバージョンは 2.1.0 です。

  • 安全なログ送信を保証するため、SASL_SSL 接続プロトコルを使用する必要があります。

権限

以下のいずれかの権限が必要です。

  • AliyunLogFullAccess

    このポリシーは、SLS の管理権限を付与します。権限の付与方法については、「RAM ユーザーへの権限付与」および「RAM ロールの権限管理」をご参照ください。

  • カスタムポリシー

    1. カスタムポリシーを作成するには、スクリプトエディター タブで、構成ボックス内の既存の内容を以下のスクリプトに置き換えます。詳細については、「カスタムポリシーの作成」をご参照ください。

      説明

      スクリプト内で、project_name を実際のプロジェクト名に置き換えてください。

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": "log:GetProject",
                  "Resource": "acs:log:*:*:project/project_name",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "log:GetLogStore",
                      "log:ListShards",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "acs:log:*:*:project/project_name/logstore/*",
                  "Effect": "Allow"
              }
          ]
      }
    2. 作成したカスタムポリシーを RAM ユーザーにアタッチします。詳細については、「RAM ユーザーへの権限付与」をご参照ください。

構成

Kafka プロトコルを用いてログをアップロードする場合、以下のパラメーターを設定する必要があります。

構成名

構成値

説明

SLS_KAFKA_ENDPOINT

初期接続に使用するクラスターのエンドポイント。形式は プロジェクト名.エンドポイント:ポート です。このパラメーターは、プロジェクトのエンドポイントに基づいて設定する必要があります。詳細については、「エンドポイント」をご参照ください。

  • 内部ネットワーク:ポート番号は 10011 です。例:プロジェクト名.cn-hangzhou-intranet.log.aliyuncs.com:10011

  • インターネット:ポート番号は 10012 です。例:プロジェクト名.cn-hangzhou.log.aliyuncs.com:10012

aliyun-project-test はプロジェクト名、cn-hangzhou-xxx.aliyuncs.comエンドポイント10011 および 10012 はそれぞれ内部ネットワークおよびインターネット向けのポート番号です。

  • 内部ネットワーク:aliyun-project-test.cn-hangzhou-intranet.log.aliyuncs.com:10011

  • インターネット:aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012

SLS_PROJECT

プロジェクト名

SLS プロジェクトの名称です。

aliyun-project-test

SLS_LOGSTORE

Logstore 名

SLS の Logstore の名称です。Logstore 名の末尾に .json サフィックスを追加すると、SLS はログを JSON ログとして解析しようと試みます。

たとえば、Logstore 名が test-logstore の場合:

  • test-logstore を指定した場合、アップロードされたログの内容は content フィールドに格納されます。

  • test-logstore.json を指定した場合、アップロードされたログの内容は JSON ログとして解析されます。JSON データの最上位レイヤーのキーがフィールド名として使用され、対応する値がフィールド値として使用されます。

SLS_PASSWORD

SLS への書き込み権限を持つ AccessKey Secret です。

AccessKey ペアとは何か、およびその作成方法については、「AccessKey ペアの作成」をご参照ください。

値は、AccessKey ID と AccessKey Secret を # 記号で区切ったものです。

  • AccessKey ID:Alibaba Cloud アカウントまたは RAM ユーザーのAccessKey ID です。

  • AccessKey Secret:Alibaba Cloud アカウントまたは RAM ユーザーのAccessKey Secret です。

LTAI****************#yourAccessKeySecret

説明

Kafka コンシューマーグループを用いて SLS からリアルタイムでデータを消費したい場合は、チケットを送信し、Alibaba Cloud テクニカルサポートまでお問い合わせください。

例 1:Beats を用いたログのアップロード

MetricBeat、PacketBeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat などの Beats は、ログを収集し、Kafka プロトコル経由で SLS へアップロードできます。

  • 構成例

    例に記載されている SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    output.kafka:
      # クラスターメタデータ読み取りのための初期ブローカー
      hosts: ["SLS_KAFKA_ENDPOINT"]
      username: "SLS_PROJECT"
      password: "SLS_PASSWORD"
      ssl.certificate_authorities:
      # メッセージトピック選択 + パーティショニング
      topic: 'SLS_LOGSTORE'
      partition.round_robin:
        reachable_only: false
    
      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000

例 2:Collectd を用いたログのアップロード

Collectd は、システムおよびアプリケーションのパフォーマンスメトリクスを定期的に収集し、Kafka プロトコル経由で SLS へアップロードするデーモンです。詳細については、「Write Kafka Plugin」をご参照ください。

Collectd で収集したログを SLS へアップロードするには、Kafka プラグインおよびその依存関係もインストールする必要があります。たとえば CentOS では、sudo yum install collectd-write_kafka コマンドを実行して Kafka プラグインをインストールします。RPM パッケージマネージャー (RPM) パッケージのインストール方法については、「Collectd-write_kafka」をご参照ください。

  • 構成例

    • この例では出力形式を JSON に設定しています。Command や Graphite など、他の形式もサポートされています。詳細については、「Collectd 構成ドキュメント」をご参照ください。

    • 例に記載されている SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    LoadPlugin write_kafka
    
    <Plugin write_kafka>
      Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
      Property "security.protocol" "sasl_ssl"
      Property "sasl.mechanism" "PLAIN"
      Property "sasl.username" "SLS_PROJECT"
      Property "sasl.password" "SLS_PASSWORD"
      Property "broker.address.family" "v4"
      <Topic "SLS_LOGSTORE">
        Format JSON
        Key "content"
      </Topic>
    </Plugin>

例 3:Telegraf を用いたログのアップロード

Telegraf は、Go 言語で実装されたエージェントプログラムであり、少ないメモリ使用量でデータメトリクスの収集、処理、および集計を行います。Telegraf は多様なプラグインおよび統合機能を提供します。Telegraf を使用してホストシステムからさまざまなメトリクスを取得したり、サードパーティ API からメトリクスを取得したり、statsd や Kafka コンシューマーサービスを介してメトリクスを受信したりできます。

Telegraf で収集したログを Kafka プロトコル経由で SLS へアップロードする前に、構成ファイルを変更する必要があります。

  • 構成例

    • この例では出力形式を JSON に設定しています。Graphite や Carbon2 など、他の形式もサポートされています。詳細については、「Telegraf 出力形式」をご参照ください。

      説明

      Telegraf では、tls_ca に対して有効なパスを設定する必要があります。サーバーが提供するルート証明書のパスを使用してください。Linux 環境では、ルート CA 証明書のパスは通常 /etc/ssl/certs/ca-bundle.crt です。

    • 例に記載されている SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    # Kafka 出力プラグインの構成
    [[outputs.kafka]]
      ## Kafka ブローカーの URL
      brokers = ["SLS_KAFKA_ENDPOINT"]
      ## プロデューサーがメッセージを送信する Kafka トピック
      topic = "SLS_LOGSTORE"
      routing_key = "content"
      ## CompressionCodec は、Kafka で認識されるさまざまな圧縮コーデックを表します。
      ## 0 : 圧縮なし
      ## 1 : Gzip 圧縮
      ## 2 : Snappy 圧縮
      ## 3 : LZ4 圧縮
      compression_codec = 1
      ## オプションの TLS 構成 tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem"
      ## TLS を使用するが、チェーンおよびホスト検証をスキップ
      # insecure_skip_verify = false
      ## オプションの SASL 構成
      sasl_username = "SLS_PROJECT"
      sasl_password = "SLS_PASSWORD"
      ## 出力するデータ形式。
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
      data_format = "json"

例 4:Fluentd を用いたログのアップロード

Fluentd は、オープンソースのログコレクターであり、Apache 2 ライセンスのもとで Cloud Native Computing Foundation (CNCF) が開発したプロジェクトです。

Fluentd は、多様な入力、処理、出力プラグインをサポートしています。Kafka プラグインを使用してログを SLS へアップロードできます。Kafka プラグインをインストールおよび構成するだけで済みます。詳細については、「fluent-plugin-kafka」をご参照ください。

  • 構成例

    • この例では出力形式を JSON に設定しています。他にも多数の形式がサポートされています。詳細については、「Fluentd Formatter」をご参照ください。

    • 例に記載されている SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    <match **>
      @type kafka2
      brokers     SLS_KAFKA_ENDPOINT
      default_topic SLS_LOGSTORE
      default_message_key content
      sasl_over_ssl true
      use_event_time true
      username SLS_PROJECT
      password "SLS_PASSWORD"
      ssl_ca_certs_from_system true
      # ruby-kafka プロデューサーのオプション
      max_send_retries 1000
      required_acks 1
      compression_codec gzip
      use_event_time true
      max_send_limit_bytes 2097152
    
      <buffer hostlogs>
        flush_interval 10s
      </buffer>
      <format>
        @type json
      </format>
    </match>

例 5:Logstash を用いたログのアップロード

Logstash は、オープンソースのリアルタイムログ収集エンジンです。Logstash を使用して、さまざまなソースから動的にログを収集できます。

説明

Kafka プロトコルを用いてログをアップロードするには、Logstash 7.10.1 以降を使用する必要があります。

Logstash には組み込みの Kafka 出力プラグインがあります。Logstash を構成して、Kafka プロトコル経由でログを SLS へアップロードします。SLS では SASL_SSL 接続プロトコルが使用されるため、SSL 証明書および JAAS ファイルも構成する必要があります。

  • 構成例

    • この例では出力形式を JSON に設定しています。他にも多数の形式がサポートされています。詳細については、「Logstash Codec」をご参照ください。

      説明

      この例は接続性テスト用の構成を示しています。本番環境では、stdout 出力構成を削除することを推奨します。

    • 例に記載されている SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    
    output {
      stdout { codec => rubydebug }
      kafka {
        topic_id => "SLS_LOGSTORE"
        bootstrap_servers => "SLS_KAFKA_ENDPOINT"
        security_protocol => "SASL_SSL"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
        sasl_mechanism => "PLAIN"
        codec => "json"
        client_id => "kafka-logstash"
      }
    }

例 6:Fluent-bit を用いたログのアップロード

Fluent-bit は、軽量かつ高度にスケーラブルなログおよびメトリクスのプロセッサおよびフォワーダーです。多様な入力、処理、出力プラグインをサポートしています。Kafka プラグインを使用してログを SLS へアップロードできます。詳細については、「Kafka 出力プラグイン」をご参照ください。

  • 構成例

    例に記載されている SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    [Output]
        Name    kafka
        Match    *
        Brokers   SLS_KAFKA_ENDPOINT
        Topics    SLS_LOGSTORE
        Format    json
        rdkafka.sasl.username   SLS_PROJECT
        rdkafka.sasl.password   SLS_PASSWORD
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism      PLAIN

例 7:Kafka プロトコルを用いた Vector の構成

Vector は、軽量かつ高性能なログ処理ソフトウェアであり、Kafka プロトコルを用いたログレポートをサポートしています。以下では、Kafka 互換モードで Vector を構成して SLS へデータを書き込む方法について説明します。

  • 構成例

    例に記載されている SLS_ で始まるパラメーターの詳細については、「構成」をご参照ください。

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["test_logs"]
      bootstrap_servers = "SLS_KAFKA_ENDPOINT"
      compression = "gzip"
      healthcheck = true
      topic = "SLS_LOGSTORE"
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "SLS_PROJECT"
      sasl.password = "SLS_PASSWORD"
      tls.enabled = true

例 8:Kafka プロデューサーを用いたログのアップロード

Java

  • 依存関係

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
  • サンプルコード

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            // 構成情報。
            Properties props = new Properties();
            String project = "etl-shanghai-b";
            String logstore = "testlog";
            // プロデューサーの内容を JSON ログとして解析する場合は、このパラメーターを true に設定します。
            boolean parseJson = false;
            // Alibaba Cloud アカウントはすべての API オペレーションに対して完全な権限を持ちます。これは高いセキュリティリスクを伴います。API オペレーションの呼び出しや日常的な運用・保守作業を行う際には、RAM ユーザーの作成および利用を推奨します。RAM ユーザーを作成するには、RAM コンソールにログインしてください。
            // このセクションでは、AccessKey ID および AccessKey Secret を環境変数に保存する例を示します。必要に応じて、構成ファイルに保存することも可能です。
            // セキュリティリスクを回避するため、コード内に AccessKey ID および AccessKey Secret を保存しないことを推奨します。
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-shanghai.log.aliyuncs.com"; // このパラメーターは、プロジェクトのエンドポイントに基づいて設定してください。
            String port = "10012"; // インターネットではポート 10012、内部ネットワークではポート 10011 を使用します。
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
    
            // プロデューサーインスタンスを作成します。
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            // レコードを送信します。
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                // 必要に応じて、以下のメソッドを使用してメッセージのタイムスタンプを設定できます。
                // long timestamp = System.currentTimeMillis();
                // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content);
    
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("ERROR: メッセージの送信に失敗しました:" + exception.getMessage());
                        exception.printStackTrace();
                    } else {
                        System.out.println("メッセージが正常に送信されました。トピック:" + metadata.topic() +
                                         "、パーティション:" + metadata.partition() +
                                         "、オフセット:" + metadata.offset() +
                                         "、タイムスタンプ:" + metadata.timestamp());
                    }
                });
            }
    
            producer.close();
        }
    }
    

Python

  • 依存関係

    pip install confluent-kafka
  • サンプルコード

    #!/bin/env python3
    import time
    import os
    from confluent_kafka import Producer
    
    def delivery_report(err, msg):
        """ 各メッセージの配信結果を示すために、メッセージごとに 1 回呼び出されます。
            poll() または flush() によってトリガーされます。 """
        if err is not None:
            print('メッセージの配信に失敗しました:{}'.format(err))
        else:
            print('メッセージが {} [{}] へ配信されました。オフセット:{}'.format(msg.topic(), msg.partition(), msg.offset()))
    
    def main():
        project = "etl-shanghai-b"
        logstore = "testlog"
        parse_json = False
    
        # 環境変数から認証情報を取得
        access_key_id = os.getenv("SLS_ACCESS_KEY_ID")
        access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET")
        endpoint = "cn-shanghai.log.aliyuncs.com"
        port = "10012" # インターネットではポート 10012、内部ネットワークではポート 10011 を使用します。
    
        hosts = f"{project}.{endpoint}:{port}"
        topic = logstore
        if parse_json:
            topic = topic + ".json"
    
        # Kafka プロデューサーを構成
        conf = {
            'bootstrap.servers': hosts,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username': project,
            'sasl.password': f"{access_key_id}#{access_key_secret}",
            'enable.idempotence': False,
        }
    
        # プロデューサーインスタンスを作成
        producer = Producer(conf)
    
        # メッセージを送信
        content = "{\"msg\": \"Hello World\"}"
        producer.produce(topic=topic,
                         value=content.encode('utf-8'),
                         #timestamp=int(time.time() * 1000),  # (オプション) レコードのタイムスタンプ(ミリ秒単位)を設定します。
                         callback=delivery_report)
    
        # 未送信のメッセージがすべて配信され、配信レポートのコールバックが呼び出されるのを待機します。
        producer.flush()
    
    if __name__ == '__main__':
        main()
    

Golang

  • 依存関係

    go get github.com/confluentinc/confluent-kafka-go/kafka
  • サンプルコード

    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    //	"time"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
    	project := "etl-shanghai-b"
    	logstore := "testlog"
    	parseJson := false
    
    	// 環境変数から認証情報を取得
    	accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
    	accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
    	endpoint := "cn-shanghai.log.aliyuncs.com"
    	port := "10012" // インターネットではポート 10012、内部ネットワークではポート 10011 を使用します。
    
    	hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port)
    	topic := logstore
    	if parseJson {
    		topic = topic + ".json"
    	}
    
    	// Kafka プロデューサーを構成
    	config := &kafka.ConfigMap{
    		"bootstrap.servers":  hosts,
    		"security.protocol":  "sasl_ssl",
    		"sasl.mechanisms":    "PLAIN",
    		"sasl.username":      project,
    		"sasl.password":      accessKeyID + "#" + accessKeySecret,
    		"enable.idempotence": false,
    	}
    
    	// プロデューサーインスタンスを作成
    	producer, err := kafka.NewProducer(config)
    	if err != nil {
    		log.Fatalf("プロデューサーの作成に失敗しました:%v", err)
    	}
    	defer producer.Close()
    
    	// バッチでメッセージを送信
    	messages := []string{
    		"{\"msg\": \"Hello World 1\"}",
    		"{\"msg\": \"Hello World 2\"}",
    		"{\"msg\": \"Hello World 3\"}",
    	}
    
    	for _, content := range messages {
    		err := producer.Produce(&kafka.Message{
    			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    			Value:          []byte(content),
    			//Timestamp:      time.Now(), // 必要に応じて時間を設定
    		}, nil)
    
    		if err != nil {
    			log.Printf("メッセージの送信に失敗しました:%v", err)
    		}
    	}
    
    	// プロデューサーがメッセージを正常に送信したかどうかを監視するゴルーチンを有効化
    	go func() {
    		for e := range producer.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					fmt.Printf("配信に失敗しました:%v\n", ev.TopicPartition.Error)
    				} else {
    					fmt.Printf("トピック %s [%d] へメッセージを配信しました。オフセット:%v\n",
    						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
    				}
    			}
    		}
    	}()
    
    	producer.Flush(5 * 1000)
    }

エラーメッセージ

Kafka プロトコルを用いたログのアップロードに失敗した場合、Kafka のエラーメッセージ形式でエラーメッセージが返されます。以下の表に、エラーメッセージを示します。Kafka プロトコルのエラーメッセージの詳細については、「エラー一覧」をご参照ください。

エラーメッセージ

説明

推奨される解決策

NetworkException

ネットワークエラーが発生しました。

1 秒待ってから再試行してください。

TopicAuthorizationException

認証に失敗しました。

このエラーは、指定された AccessKey ペアが無効であるか、指定されたプロジェクトまたは Logstore に対する書き込み権限を有していない場合に発生します。有効な AccessKey ペアおよび必要な書き込み権限を確認してください。

UnknownTopicOrPartitionException

このエラーは、以下のいずれかの理由で発生する可能性があります:

  • 指定されたプロジェクトまたは Logstore が存在しません。

  • プロジェクトのリージョンとエンドポイントで指定されたリージョンが異なります。

指定されたプロジェクトおよび Logstore が存在することを確認してください。エラーが継続する場合は、プロジェクトのリージョンとエンドポイントのリージョンが一致していることを確認してください。

KafkaStorageException

サーバーで例外が発生しました。

1 秒待ってから再試行してください。