You can use KafkaProducer SDKs or collection agents to collect logs and upload the collected logs to Log Service by using the Kafka protocol. This topic describes how to upload logs to Log Service by using the Kafka protocol.
Limits
- Only Kafka 0.8.0 to Kafka 2.1.1 (message format version 2) are supported.
- You must use the SASL_SSL protocol to ensure the security of log transmission.
- If your Logstore contains multiple shards, you must upload logs in load balancing mode.
- You can use the Kafka protocol to upload to Log Service only the logs that are collected by using KafkaProducer SDKs or collection agents.
Data parsing
Logs that are uploaded by using the Kafka protocol are stored in the content field. If the logs are of the JSON type, you can set a JSON index for the content field. For more information, see JSON type.
If you use a Kafka producer or Beats to upload logs, you can configure the topic or headers parameter in the collection configuration to automatically expand JSON logs. In this case, Log Service expands the content field into separate fields, and you do not need to configure a JSON index for the content field. For more information, see Configurations.
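For example, assume that a producer uploads the hypothetical record {"status": 200, "latency": 15}. If you do not configure automatic expansion, the record is stored in the content field:

```
content: {"status": 200, "latency": 15}
```

If you configure the topic or headers parameter as described in Configurations, Log Service expands the record into separate fields, and no JSON index for the content field is required:

```
status: 200
latency: 15
```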
Configurations
Parameter | Description |
---|---|
Connection type | The security protocol. You must use the SASL_SSL protocol to ensure the security of log transmission. |
hosts | The address to which an initial connection is established. Specify the endpoint of your Log Service project in the Project name.Endpoint format. The endpoint varies based on the region of your Log Service project. For more information, see Endpoints. |
topic | The name of the Logstore. If you use a Kafka producer or Beats to upload logs and set the output format to JSON, you can set the topic parameter to a value in the Logstore name.json format (for example, test3.json in Example 6) to automatically expand JSON logs. For more information, see Example 6: Use a Kafka producer to upload logs. |
headers | If you use a Kafka producer or Beats to upload logs and set the output format to JSON, you can set the headers parameter to the following value to automatically expand JSON logs: data-parse-format: json. For more information, see Example 1: Use Beats to upload logs. |
username | The name of the project. |
password | The AccessKey pair in the ${access-key-id}#${access-key-secret} format. Replace ${access-key-id} and ${access-key-secret} with your AccessKey ID and AccessKey secret. We recommend that you use the AccessKey pair of a RAM user. For more information, see Create a RAM user and authorize the RAM user to access Log Service. |
Certificate file | The certificate file of the endpoint. Each Log Service endpoint has a certificate. Set this parameter to the path to the root certificate on your server. Example: /etc/ssl/certs/ca-bundle.crt. |
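The following minimal Java sketch illustrates how these parameters map to standard Kafka producer client properties. The project name, Logstore name, endpoint, and AccessKey values are placeholders; replace them with your own values. For a complete example, see Example 6: Use a Kafka producer to upload logs.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SlsKafkaSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // hosts: the address in the "Project name.Endpoint" format.
        props.put("bootstrap.servers", "test-project-1.cn-hangzhou.log.aliyuncs.com:10012");
        // Connection type: the SASL_SSL protocol is required.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // username: the project name; password: "${access-key-id}#${access-key-secret}".
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"test-project-1\" password=\"access-key-id#access-key-secret\";");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // topic: the Logstore name.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-logstore-1", "{\"logName\": \"info\"}"));
        }
    }
}
```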
Example 1: Use Beats to upload logs
You can use Beats such as Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat to collect logs. After the logs are collected, you can use the Kafka protocol to upload the logs to Log Service. For more information, see Beats-Kafka-Output.
- Example 1
- Configuration example
```yaml
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"]
  username: "yourusername"
  password: "yourpassword"
  ssl.certificate_authorities:
  # message topic selection + partitioning
  topic: 'test-logstore-1'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
```
- Sample log
By default, Beats provides JSON-formatted logs. The logs are uploaded to Log Service and stored in the content field. You can create a JSON index for the content field. For more information, see JSON type.
- Example 2
- Configuration example
```yaml
output.kafka:
  enabled: true
  hosts: ["cn-hangzhou-intranet.log.aliyuncs.com:10011"]
  username: "test-project-1"
  password: "access-key-id#access-key-secret"
  ssl.certificate_authorities:
  topic: 'test-logstore-1'
  headers:
    - key: "data-parse-format"
      value: "json"
  partition.hash:
    reachable_only: false
```
- Sample log
You can configure the headers parameter to automatically expand JSON logs.
Example 2: Use Collectd to upload logs
collectd is a daemon process that periodically collects the metrics of systems and applications. You can upload the collected metrics to Log Service by using the Kafka protocol. For more information about collectd, see collectd. For more information about how to upload metrics to Log Service, see Write Kafka Plugin.
Before you upload the logs that are collected by using collectd to Log Service, you must install the collectd-write_kafka plug-in and related dependencies. On a CentOS server, you can run the sudo yum install collectd-write_kafka command to install the plug-in. For more information about how to install RPM Package Manager (RPM) packages, visit RPM resource collectd-write_kafka.
- Configuration example
In this example, collectd provides JSON-formatted logs. Command- and Graphite-formatted logs are also supported. For more information, see collectd.
```
<Plugin write_kafka>
  Property "metadata.broker.list" "test-project-1.cn-hangzhou.log.aliyuncs.com:10012"
  Property "security.protocol" "sasl_ssl"
  Property "sasl.mechanism" "PLAIN"
  Property "sasl.username" "yourusername"
  Property "sasl.password" "yourpassword"
  Property "broker.address.family" "v4"
  <Topic "test-logstore-1">
    Format JSON
    Key "content"
  </Topic>
</Plugin>
```
- Sample log
After JSON-formatted logs are uploaded to Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.
Example 3: Use Telegraf to upload logs
Telegraf is an agent that is written in the Go programming language and is used to collect, process, and aggregate metrics. Telegraf consumes only a small amount of memory resources. For more information, see Telegraf. Telegraf provides various plug-ins and integration capabilities. You can use Telegraf to retrieve metrics from the systems on which it runs or from third-party APIs. You can also use Telegraf to listen for metrics by using StatsD and Kafka consumer services.
Before you upload the logs that are collected by using Telegraf to Log Service, you must modify the configuration file of Telegraf.
- Configuration example
In this example, Telegraf provides JSON-formatted logs. Graphite- and Carbon2-formatted logs are also supported. For more information, see Telegraf.
Note: You must specify a valid path for tls_ca. You can specify the path to the root certificate on your server. In most cases, the path to the root certificate on a Linux server is /etc/ssl/certs/ca-bundle.crt.
```toml
[[outputs.kafka]]
  ## URLs of kafka brokers
  brokers = ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"]
  ## Kafka topic for producer messages
  topic = "test-logstore-1"
  routing_key = "content"
  ## CompressionCodec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : No compression
  ##  1 : Gzip compression
  ##  2 : Snappy compression
  ##  3 : LZ4 compression
  compression_codec = 1
  ## Optional TLS Config
  tls_ca = "/etc/ssl/certs/ca-bundle.crt"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false
  ## Optional SASL Config
  sasl_username = "yourusername"
  sasl_password = "yourpassword"
  ## Data format to output.
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  data_format = "json"
```
- Sample log
After JSON-formatted logs are uploaded to Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.
Example 4: Use Fluentd to upload logs
Fluentd is an open source log collector. Fluentd is a project under the Cloud Native Computing Foundation (CNCF), and all components are available under the Apache 2 License. For more information, see Fluentd.
Fluentd supports various input, processing, and output plug-ins. You can collect logs by using Fluentd and upload the collected logs to Log Service by using the fluent-plugin-kafka plug-in. You only need to install and configure the plug-in. For more information, see fluent-plugin-kafka.
- Configuration example
In this example, Fluentd provides JSON-formatted logs. Dozens of formats are supported. For more information, see Fluentd Formatter.
```
<match **>
  @type kafka
  # Brokers: you can choose either brokers or zookeeper.
  brokers test-project-1.cn-hangzhou.log.aliyuncs.com:10012
  default_topic test-logstore-1
  default_message_key content
  output_data_type json
  output_include_tag true
  output_include_time true
  sasl_over_ssl true
  # Replace yourusername and yourpassword with your actual values.
  username yourusername
  password yourpassword
  ssl_ca_certs_from_system true
  # ruby-kafka producer options
  max_send_retries 10000
  required_acks 1
  compression_codec gzip
</match>
```
- Sample log
After JSON-formatted logs are uploaded to Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.
Example 5: Use Logstash to upload logs
Logstash is an open source log collection engine that provides real-time processing capabilities. You can use Logstash to dynamically collect logs from different sources. For more information, see Logstash.
- Configuration example
- Create a JAAS file and save the file to a directory. Example: /etc/kafka/kafka_client_jaas.conf.
Add the following content to the JAAS file:
```
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="yourusername"
  password="yourpassword";
};
```
- Configure an SSL certificate and save the certificate to a directory. Example: /etc/kafka/client-root.truststore.jks.
Download the root certificate and save the certificate to a directory. Example: /etc/kafka/root.pem. Then, run a keytool command to import the certificate and generate a truststore file in the .jks format. When you run the command for the first time, you must configure a password for the truststore.

```
keytool -keystore client-root.truststore.jks -alias root -import -file /etc/kafka/root.pem
```
- Configure Logstash.
In this example, Logstash provides JSON-formatted logs. Dozens of formats are supported. For more information, see Logstash Codec.
Note: The following configuration is used for a connectivity test. In a production environment, we recommend that you remove the stdout field.
```
input {
  stdin { }
}
output {
  stdout { codec => rubydebug }
  kafka {
    topic_id => "test-logstore-1"
    bootstrap_servers => "test-project-1.cn-hangzhou.log.aliyuncs.com:10012"
    security_protocol => "SASL_SSL"
    ssl_truststore_location => "/etc/client-root.truststore.jks"
    ssl_truststore_password => "123456"
    jaas_path => "/etc/kafka_client_jaas.conf"
    sasl_mechanism => "PLAIN"
    codec => "json"
    client_id => "kafka-logstash"
  }
}
```
- Sample log
After JSON-formatted logs are uploaded to Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.
Example 6: Use a Kafka producer to upload logs
If you use a Kafka producer to upload logs to Log Service, you can configure the topic or headers parameter in the collection configuration to automatically expand JSON logs.
- Configuration example
```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;

public static void produce(){
    // The configuration information.
    Properties props2 = new Properties();
    String project = "etl-test-tlj";
    String topic = "test3.json";
    props2.put("bootstrap.servers", "kafka.log.aliyuncs.com:9093");
    props2.put("security.protocol", "sasl_ssl");
    props2.put("sasl.mechanism", "PLAIN");
    props2.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project + "\" password=\"access-key-id#access-key-secret\";");

    // Specify the serialization class for data keys and values.
    props2.put("key.serializer", StringSerializer.class);
    props2.put("value.serializer", StringSerializer.class);

    // Create a producer instance.
    KafkaProducer<String, String> producer = new KafkaProducer<>(props2);

    // Send records.
    for (int i = 0; i < 1; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "{\"logName\": \"error4\"}");
        record.headers().add(new RecordHeader("data-parse-format", "json".getBytes()));
        producer.send(record);
    }
    producer.close();
}
```
- Sample log
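Based on the record that is sent in the preceding configuration example, the data-parse-format header causes the JSON log to be automatically expanded. The stored log therefore contains a field similar to the following:

```
logName: error4
```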
Error messages
Error message | Description | Solution |
---|---|---|
NetworkException | The error message returned because a network exception occurred. | Try again after 1 second. |
TopicAuthorizationException | The error message returned because the authentication failed. | If your AccessKey pair is invalid or does not have permissions to write data to the specified project or Logstore, the authentication fails. In this case, enter a valid AccessKey pair and make sure that it has the required write permissions. |
UnknownTopicOrPartitionException | The error message returned because the specified project or Logstore does not exist, or the region of the specified project is different from the region of the specified endpoint. | Make sure that the specified project and Logstore exist. If they exist but the error persists, check whether the region of the specified project is the same as the region of the specified endpoint. |
KafkaStorageException | The error message returned because a server error occurred. | Try again after 1 second. |
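For NetworkException and KafkaStorageException, the recommended handling is to wait about 1 second and retry. The following minimal Java sketch illustrates such a retry loop; the class name, method name, and retry limit are assumptions for illustration, not part of the Kafka client API.

```java
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;

public class RetrySender {
    // Sends a record and retries after 1 second on transient errors such as
    // NetworkException or KafkaStorageException (both are retriable in the Kafka client).
    public static void sendWithRetry(KafkaProducer<String, String> producer,
                                     ProducerRecord<String, String> record,
                                     int maxRetries) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                producer.send(record).get(); // wait for the broker acknowledgment
                return;
            } catch (ExecutionException e) {
                boolean transientError = e.getCause() instanceof RetriableException;
                if (!transientError || attempt >= maxRetries) {
                    // Non-retriable errors (for example, TopicAuthorizationException)
                    // or exhausted retries are rethrown to the caller.
                    throw e;
                }
                Thread.sleep(1000); // try again after 1 second
            }
        }
    }
}
```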