您可以使用Kafka Producer SDK、Beats系列軟體、Collectd、Fluentd、Logstash、Telegraf、Vector等採集工具採集日誌,並通過Kafka協議上傳到Log Service。本文介紹通過採集工具採集到日誌後,利用Kafka協議上傳日誌到Log Service的操作步驟。
相關限制
支援的Kafka協議版本最低為2.1.0。
為保證日誌傳輸安全性,必須使用SASL_SSL連線協定。
許可權說明
以下兩個許可權規則滿足其中之一即可。
自訂權限原則
建立一個自訂權限原則,其中在指令碼編輯頁簽,請使用以下指令碼替換配置框中的原有內容。具體操作,請參見建立自訂權限原則。
說明指令碼中的
Project名稱請根據實際情況替換。{ "Version": "1", "Statement": [ { "Action": "log:GetProject", "Resource": "acs:log:*:*:project/project名稱", "Effect": "Allow" }, { "Action": [ "log:GetLogStore", "log:ListShards", "log:PostLogStoreLogs" ], "Resource": "acs:log:*:*:project/project名稱/logstore/*", "Effect": "Allow" } ] }為RAM使用者添加建立的自訂權限原則。具體操作,請參見為RAM使用者授權。
配置方式
使用Kafka協議上傳日誌時,您需要配置以下參數。
配置名 | 配置值 | 說明 | 樣本 |
SLS_KAFKA_ENDPOINT | 初始串連的叢集地址,格式為 |
| aliyun-project-test為Project名稱,
|
SLS_PROJECT | Project名稱 | Log Service對應的Project名稱。 | aliyun-project-test |
SLS_LOGSTORE | Logstore名稱 | Log Service對應的Logstore名稱。Logstore名稱尾碼加上 | 例如Logstore名稱是
|
SLS_PASSWORD | 具備sls寫入許可權的AccessKeySecret。 | AK的概念和建立步驟,請參見建立AccessKey。 值為AccessKey ID和AccessKey Secret用
| LTAI****************#yourAccessKeySecret |
如果您要通過Kafka消費組即時消費Log Service的資料,請提交工單諮詢阿里雲支援人員工程師。
樣本一:通過Beats系列軟體上傳日誌
Beats系列軟體(MetricBeat、PacketBeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat等)採集到日誌後,支援通過Kafka協議將日誌上傳到Log Service。
配置樣本
樣本中用到的
SLS_開頭的參數配置請參見配置方式。output.kafka: # initial brokers for reading cluster metadata hosts: ["SLS_KAFKA_ENDPOINT"] username: "SLS_PROJECT" password: "SLS_PASSWORD" ssl.certificate_authorities: # message topic selection + partitioning topic: 'SLS_LOGSTORE' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
樣本二:通過Collectd上傳日誌
Collectd是一個守護(daemon)進程,用於定期採集系統和應用程式的效能指標,並支援通過Kafka協議上傳到Log Service。更多資訊,請參見Write Kafka Plugin。
將Collectd採集到日誌上傳到Log Service時,還需安裝Kafka外掛程式以及相關依賴。例如:在Linux CentOS中,可以使用yum安裝Kafka外掛程式,命令為sudo yum install collectd-write_kafka,安裝RPM請參見Collectd-write_kafka。
配置樣本
樣本中將日誌輸出格式(Format)設定為JSON,除此之外,還支援Command、Graphite類型。更多資訊,請參見Collectd配置文檔。
樣本中用到的
SLS_開頭的參數配置請參見配置方式。
LoadPlugin write_kafka <Plugin write_kafka> Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT" Property "security.protocol" "sasl_ssl" Property "sasl.mechanism" "PLAIN" Property "sasl.username" "SLS_PROJECT" Property "sasl.password" "SLS_PASSWORD" Property "broker.address.family" "v4" <Topic "SLS_LOGSTORE"> Format JSON Key "content" </Topic> </Plugin>
樣本三:使用Telegraf上傳日誌
Telegraf是由Go語言編寫的代理程式,記憶體佔用小,用於收集、處理、摘要資料指標。Telegraf具有豐富的外掛程式及整合功能,可從其啟動並執行系統中擷取各種指標、從第三方API中擷取指標以及通過statsd和Kafka消費者服務監聽指標。
將Telegraf採集到的日誌通過Kafka協議上傳到Log Service前,您需要先修改設定檔。
配置樣本
樣本中將日誌輸出格式(Format)設定為JSON,除此之外還支援Graphite、Carbon2等類型。更多資訊,請參見Telegraf輸出格式。
說明Telegraf必須配置一個合法的tls_ca路徑,使用伺服器內建的根憑證的路徑即可。Linux環境中,根憑證CA路徑一般為/etc/ssl/certs/ca-bundle.crt。
樣本中用到的
SLS_開頭的參數配置請參見配置方式。
# Kafka output plugin configuration [[outputs.kafka]] ## URLs of kafka brokers brokers = ["SLS_KAFKA_ENDPOINT"] ## Kafka topic for producer messages topic = "SLS_LOGSTORE" routing_key = "content" ## CompressionCodec represents the various compression codecs recognized by ## Kafka in messages. ## 0 : No compression ## 1 : Gzip compression ## 2 : Snappy compression ## 3 : LZ4 compression compression_codec = 1 ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt" tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false ## Optional SASL Config sasl_username = "SLS_PROJECT" sasl_password = "SLS_PASSWORD" ## Data format to output. ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "json"
樣本四:使用Fluentd上傳日誌
Fluentd是一個開源的記錄收集器,是雲端原生計算基金會(CNCF)的成員專案之一,遵循Apache 2 License協議。
Fluentd支援眾多輸入、處理、輸出外掛程式,支援通過Kafka外掛程式將日誌上傳到Log Service,您只需安裝並配置Kafka外掛程式即可。更多資訊,請參見fluent-plugin-kafka。
配置樣本
樣本中將日誌輸出格式(Format)設定為JSON,除此之外還支援數十種Format類型。更多資訊,請參見Fluentd Formatter。
樣本中用到的
SLS_開頭的參數配置請參見配置方式。
<match **> @type kafka2 brokers SLS_KAFKA_ENDPOINT default_topic SLS_LOGSTORE default_message_key content sasl_over_ssl true use_event_time true username SLS_PROJECT password "SLS_PASSWORD" ssl_ca_certs_from_system true # ruby-kafka producer options max_send_retries 1000 required_acks 1 compression_codec gzip use_event_time true max_send_limit_bytes 2097152 <buffer hostlogs> flush_interval 10s </buffer> <format> @type json </format> </match>
樣本五:使用Logstash上傳日誌
Logstash是一個具備即時處理能力、開源的日誌採集引擎,可以動態採集不同來源的日誌。
Kafka協議上傳日誌功能要求Logstash的版本號碼至少為7.10.1。
Logstash內建Kafka輸出外掛程式,您可以配置Logstash實現日誌通過Kafka協議上傳到Log Service。由於Log Service使用SASL_SSL連線協定,因此還需要配置SSL認證和jaas檔案。
配置樣本
樣本中將日誌輸出格式(Format)設定為JSON,除此之外還支援數十種Format類型。更多資訊,請參見Logstash Codec。
說明本樣本為連通性測試的配置,您的生產環境中建議刪除stdout的輸出配置。
樣本中用到的
SLS_開頭的參數配置請參見配置方式。
output { stdout { codec => rubydebug } kafka { topic_id => "SLS_LOGSTORE" bootstrap_servers => "SLS_KAFKA_ENDPOINT" security_protocol => "SASL_SSL" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';" sasl_mechanism => "PLAIN" codec => "json" client_id => "kafka-logstash" } }
樣本六:通過Fluent-bit上傳日誌
Fluent-bit是一個輕量級、高可擴充的日誌與指標的處理器、轉寄站,支援眾多輸入、處理和輸出外掛程式,支援通過Kafka外掛程式將日誌上傳到Log Service。更多資訊,請參見Kafka output plugin。
配置樣本
樣本中用到的
SLS_開頭的參數配置請參見配置方式。[Output] Name kafka Match * Brokers SLS_KAFKA_ENDPOINT Topics SLS_LOGSTORE Format json rdkafka.sasl.username SLS_PROJECT rdkafka.sasl.password SLS_PASSWORD rdkafka.security.protocol SASL_SSL rdkafka.sasl.mechanism PLAIN
樣本七 :Vector配置Kafka協議上傳
Vector是一款輕量級、高效能的Tlog軟體,它支援Kafka協議的方式上報日誌。下面是Vector通過Kafka相容模式寫入SLS的配置方法。
配置樣本
樣本中用到的
SLS_開頭的參數配置請參見配置方式。[sinks.aliyun_sls] type = "kafka" inputs = ["test_logs"] bootstrap_servers = "SLS_KAFKA_ENDPOINT" compression = "gzip" healthcheck = true topic = "SLS_LOGSTORE" encoding.codec = "json" sasl.enabled = true sasl.mechanism = "PLAIN" sasl.username = "SLS_PROJECT" sasl.password = "SLS_PASSWORD" tls.enabled = true
樣本八:通過Kafka生產者(produce)上傳日誌
Java
依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency>程式碼範例
package org.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProduceExample { public static void main(String[] args) { //配置資訊。 Properties props = new Properties(); String project = "etl-shanghai-b"; String logstore = "testlog"; // 如果希望produce的內容被json解析展開,則設定為true boolean parseJson = false; // 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運,請登入RAM控制台建立RAM使用者。 // 此處以把AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。您可以根據業務需要,儲存到設定檔裡。 // 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險 String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET"); String endpoint = "cn-shanghai.log.aliyuncs.com"; // 根據實際project所在的endpoint配置 String port = "10012"; // 公網用10012,私網用10011 String hosts = project + "." + endpoint + ":" + port; String topic = logstore; if(parseJson) { topic = topic + ".json"; } props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put("security.protocol", "sasl_ssl"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); //建立生產者執行個體。 KafkaProducer<String,String> producer = new KafkaProducer<>(props); //發送記錄 for(int i=0;i<1;i++){ String content = "{\"msg\": \"Hello World\"}"; // 如果有需要可以用下面的方式設定訊息的時間戳記 // long timestamp = System.currentTimeMillis(); // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content); ProducerRecord<String, String> record = new ProducerRecord<>(topic, content); producer.send(record, (metadata, exception) -> { if (exception != null) { System.err.println("ERROR: Failed to send message: " + exception.getMessage()); exception.printStackTrace(); } else { System.out.println("Message sent successfully to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset() + ", timestamp: " + metadata.timestamp()); } }); } producer.close(); } }
Python
依賴
pip install confluent-kafka程式碼範例
#!/bin/env python3 import time import os from confluent_kafka import Producer def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}] at offset {}'.format(msg.topic(), msg.partition(), msg.offset())) def main(): project = "etl-shanghai-b" logstore = "testlog" parse_json = False # Get credentials from environment variables access_key_id = os.getenv("SLS_ACCESS_KEY_ID") access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET") endpoint = "cn-shanghai.log.aliyuncs.com" port = "10012" # 公網用10012,私網用10011 hosts = f"{project}.{endpoint}:{port}" topic = logstore if parse_json: topic = topic + ".json" # Configure Kafka producer conf = { 'bootstrap.servers': hosts, 'security.protocol': 'sasl_ssl', 'sasl.mechanisms': 'PLAIN', 'sasl.username': project, 'sasl.password': f"{access_key_id}#{access_key_secret}", 'enable.idempotence': False, } # Create producer instance producer = Producer(conf) # Send message content = "{\"msg\": \"Hello World\"}" producer.produce(topic=topic, value=content.encode('utf-8'), #timestamp=int(time.time() * 1000), # (可選) 設定record時間戳記, 單位毫秒 callback=delivery_report) # Wait for any outstanding messages to be delivered and delivery report # callbacks to be triggered. producer.flush() if __name__ == '__main__': main()
Golang
依賴
go get github.com/confluentinc/confluent-kafka-go/kafka程式碼範例
package main import ( "fmt" "log" "os" // "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { project := "etl-shanghai-b" logstore := "testlog" parseJson := false // Get credentials from environment variables accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID") accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET") endpoint := "cn-shanghai.log.aliyuncs.com" port := "10012" // 公網用10012,私網用10011 hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port) topic := logstore if parseJson { topic = topic + ".json" } // Configure Kafka producer config := &kafka.ConfigMap{ "bootstrap.servers": hosts, "security.protocol": "sasl_ssl", "sasl.mechanisms": "PLAIN", "sasl.username": project, "sasl.password": accessKeyID + "#" + accessKeySecret, "enable.idempotence": false, } // Create producer instance producer, err := kafka.NewProducer(config) if err != nil { log.Fatalf("Failed to create producer: %v", err) } defer producer.Close() // 批量發送訊息 messages := []string{ "{\"msg\": \"Hello World 1\"}", "{\"msg\": \"Hello World 2\"}", "{\"msg\": \"Hello World 3\"}", } for _, content := range messages { err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(content), //Timestamp: time.Now(), // 如有需要可以設定時間 }, nil) if err != nil { log.Printf("Failed to produce message: %v", err) } } // 啟用一個go routine 監聽producer發送是否成功或者失敗 go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error) } else { fmt.Printf("Delivered message to topic %s [%d] at offset %v\n", *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset) } } } }() producer.Flush(5 * 1000) }
錯誤資訊
使用Kafka協議上傳日誌失敗時,會按照Kafka的錯誤資訊返回對應的錯誤資訊,如下表所示,Kafka協議錯誤資訊詳情,請參見error list。
錯誤資訊 | 說明 | 推薦解決方式 |
NetworkException | 出現網路錯誤時返回該錯誤資訊。 | 一般等待1秒後重試即可。 |
TopicAuthorizationException | 鑒權失敗時返回該錯誤資訊。 | 一般是您提供的AccessKey錯誤或沒有寫入對應Project、Logstore的許可權。請填寫正確的且具備寫入許可權的AccessKey。 |
UnknownTopicOrPartitionException | 出現該錯誤可能有兩種原因:
| 請確保已建立對應的Project和Logstore。如果已建立還是提示該錯誤,請檢查Project所在地區是否和填入的Endpoint一致。 |
KafkaStorageException | 服務端出現異常時返回該錯誤資訊。 | 一般等待1秒後重試即可。 |