Log Service allows you to write log data in compliance with the Kafka protocol. You can use collection agents or Kafka Producer SDKs in various languages to write data into Log Service.

Limits

  • The supported Kafka protocol versions are from Kafka 0.8.0 to Kafka 2.1.1.
  • You must use the SASL_SSL connection protocol for secure data transmission.
  • If your Logstore contains multiple shards, you must write data in load balancing mode.
  • Currently, you can use only collection agents or Kafka Producer SDKs to write data into Log Service in compliance with the Kafka protocol.

Configuration

The following parameters are required for writing data into Log Service in compliance with the Kafka protocol.
  • Connection protocol: the connection protocol for secure data transmission. This parameter must be set to SASL_SSL. Example: SASL_SSL.
  • hosts: the address of the cluster node that you initially connect to. Use the service endpoint of the region where your target project resides. For more information, see Service endpoint. The port number for a classic network or VPC endpoint is 10011. The port number for an Internet endpoint is 10012. Examples:
      • cn-hangzhou-intranet.log.aliyuncs.com:10011
      • cn-hangzhou.log.aliyuncs.com:10012
  • topic: the name of the Kafka topic, which maps to the name of a Logstore in Log Service. You must create the Logstore in advance. Example: test-logstore-1.
  • username: the username, which maps to the name of a project in Log Service. Example: <yourusername>.
  • password: your AccessKey pair in the format ${access-key-id}#${access-key-secret}. Replace ${access-key-id} with your AccessKey ID and ${access-key-secret} with your AccessKey secret. We recommend that you use the AccessKey pair of a RAM user. For more information, see Grant a RAM user the permission to access Log Service. Example: <yourpassword>.
  • Certificate: the path to a certificate. Each domain name in Log Service has a CA certificate, so you only need to use the default root certificate of the server. Example: /etc/ssl/certs/ca-bundle.crt.
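
The following snippet shows how these parameters map onto a Kafka producer. It is a minimal sketch that assumes the open source kafka-python client; any other Kafka Producer SDK accepts equivalent settings. Replace the placeholders with the values described above.

    # Minimal sketch, assuming the open source kafka-python client (pip install kafka-python).
    import json
    from kafka import KafkaProducer

    producer = KafkaProducer(
        # hosts: the endpoint of the region where the project resides (Internet port 10012)
        bootstrap_servers=["cn-hangzhou.log.aliyuncs.com:10012"],
        # Connection protocol: must be SASL_SSL; the SASL mechanism is PLAIN
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        # username: the project name; password: ${access-key-id}#${access-key-secret}
        sasl_plain_username="<yourusername>",
        sasl_plain_password="<yourpassword>",
        # Certificate: the default root certificate of the server
        ssl_cafile="/etc/ssl/certs/ca-bundle.crt",
        # Serialize each record as JSON so that a JSON type index can be created for it
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    # topic: the Logstore name. No message key is set, so records are distributed across
    # partitions, which matches the load balancing requirement for Logstores with multiple shards.
    producer.send("test-logstore-1", {"content": "test message"})
    producer.flush()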

Error codes

If an error occurs when you collect logs in compliance with the Kafka protocol, Log Service returns a Kafka error code that indicates the cause of the failure. For more information about Kafka error codes, see the error list. The specific error codes, their descriptions, and the corresponding solutions are as follows:

  • NetworkException: a network error has occurred. Wait for one second and try again.
  • TopicAuthorizationException: authentication failed because your AccessKey pair is invalid or you do not have permission to write data to the corresponding project or Logstore. Enter a valid AccessKey pair that has the required write permission.
  • UnknownTopicOrPartitionException: either of the following errors has occurred:
      • The corresponding project or Logstore does not exist.
      • The region where the project resides is different from the region of the endpoint that you specified.
    To fix this error, create the project and Logstore in advance, and ensure that the region where the project resides is the same as the region of the endpoint that you specified.
  • KafkaStorageException: a server error has occurred. Wait for one second and try again.
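
NetworkException and KafkaStorageException are transient, so a producer can simply wait for one second and resend. The following sketch shows one way to implement this retry on top of the hypothetical kafka-python setup shown above; the helper name and the retry count of 3 are illustrative choices, not part of Log Service.

    import time
    from kafka.errors import KafkaError

    def send_with_retry(producer, topic, record, retries=3):
        """Send one record, waiting one second before each retry on a Kafka error."""
        for _ in range(retries):
            try:
                # Block until the broker acknowledges the record or raises an error.
                producer.send(topic, record).get(timeout=10)
                return True
            except KafkaError:
                # Covers transient failures such as NetworkException and KafkaStorageException.
                time.sleep(1)
        return False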

Configuration examples

Assume that you want to write data into Log Service with the following configurations. The project in Log Service is named test-project-1 and the Logstore is named test-logstore-1. The region where the project resides is cn-hangzhou. The AccessKey ID of the RAM user with the corresponding write permission is <yourAccessKeyId> and the AccessKey secret is <yourAccessKeySecret>.

  • Example 1: Use Beats software to write data into Log Service
    You can export the collected data to Kafka by using Beats software such as Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat. For more information, see Beats-Kafka-Output. The sample code is as follows:
    output.kafka: 
      # initial brokers for reading cluster metadata 
      hosts: ["cn-hangzhou.log.aliyuncs.com:10012"] 
      username: "<yourusername>" 
      password: "<yourpassword>" 
      ssl.certificate_authorities: ["/etc/ssl/certs/ca-bundle.crt"]
      # message topic selection + partitioning 
      topic: 'test-logstore-1' 
      partition.round_robin: 
        reachable_only: false 
    
      required_acks: 1 
      compression: gzip 
      max_message_bytes: 1000000
    By default, Beats software exports JSON-formatted logs to Kafka. You can also create a JSON type index for the content field. For more information, see JSON type. The following figure shows a sample log.
  • Example 2: Use Collectd to write data into Log Service

    Collectd is a daemon used to collect the performance metrics of a system or application regularly. You can also use Collectd to export the collected data to Kafka. For more information, see Write Kafka plug-in.

    If you want to export the collected data from Collectd to Kafka, you must install the Write Kafka plug-in and relevant dependencies. On Community Enterprise Operating System (CentOS), you can run the sudo yum install collectd-write_kafka command to install the plug-in. For more information about Red Hat Package Manager (RPM) resources, see RPM resource collectd-write_kafka.

    The sample code is as follows:
    <Plugin write_kafka>
      Property "metadata.broker.list" "cn-hangzhou.log.aliyuncs.com:10012" 
      Property "security.protocol" "sasl_ssl" 
      Property "sasl.mechanism" "PLAIN" 
      Property "sasl.username" "<yourusername>" 
      Property "sasl.password" "<yourpassword>" 
      Property "broker.address.family" "v4"  
      <Topic "test-logstore-1">
        Format JSON 
        Key "content"  
      </Topic>
    </Plugin>

    In the preceding sample code, the format of the data that is exported to Kafka is set to JSON. In addition to JSON, Collectd also supports the Command and Graphite formats. For more information, see Collectd configuration documentation.

    If you use the JSON format, you can create a JSON type index for the content field. For more information, see JSON type. The following figure shows a sample log.
  • Example 3: Use Telegraf to write data into Log Service

    Telegraf is a sub-project of InfluxData. It is an agent written in Go for collecting, processing, and aggregating metrics. Telegraf is designed to minimize memory consumption. It can be used to build services and collect the metrics of third-party components through plug-ins. In addition, Telegraf supports integrations: it can obtain metrics from the system on which it runs, pull metrics from third-party APIs, and even listen for metrics through StatsD and Kafka consumer services.

    Telegraf can export data to Kafka. Therefore, you only need to modify the configuration file to use Telegraf to collect data and write the data into Log Service. The sample code is as follows:
    [[outputs.kafka]] 
      ## URLs of kafka brokers 
      brokers = ["cn-hangzhou.log.aliyuncs.com:10012"] 
      ## Kafka topic for producer messages 
      topic = "test-logstore-1" 
      routing_key = "content" 
      ## CompressionCodec represents the various compression codecs recognized by 
      ## Kafka in messages. 
      ## 0 : No compression 
      ## 1 : Gzip compression 
      ## 2 : Snappy compression 
      ## 3 : LZ4 compression 
      compression_codec = 1 
      ## Optional TLS Config
      tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      # tls_cert = "/etc/telegraf/cert.pem"
      # tls_key = "/etc/telegraf/key.pem"
      ## Use TLS but skip chain & host verification 
      # insecure_skip_verify = false 
      ## Optional SASL Config 
      sasl_username = "<yourusername>" 
      sasl_password = "<yourpassword>" 
      ## Data format to output. 
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md 
      data_format = "json"
    Note You must set tls_ca to a valid root certificate path for Telegraf. You only need to use the default root certificate of the server. Typically, the root certificate path in a Linux environment is /etc/ssl/certs/ca-bundle.crt.
    In the preceding sample code, the format of the data exported to Kafka is set to JSON. In addition to JSON, Telegraf also supports other formats such as Graphite and Carbon2. For more information, see Telegraf output data formats.
    If you use the JSON format, you can create a JSON type index for the content field. For more information, see JSON type. The following figure shows a sample log.
  • Example 4: Use Fluentd to write data into Log Service

    Fluentd is an open-source data collector that provides a unified logging layer. Fluentd allows you to collect data in a uniform manner so that you can easily use and understand the data. Fluentd is a member project of the Cloud Native Computing Foundation (CNCF) and is licensed under the Apache License 2.0.

    Fluentd provides many input, processing, and output plug-ins. Specifically, the Kafka plug-in of Fluentd can export data to Kafka. You only need to install and configure this plug-in.

    The sample code is as follows:
    <match **>
      @type kafka 
      # Brokers: you can choose either brokers or zookeeper. 
      brokers      cn-hangzhou.log.aliyuncs.com:10012 
      default_topic test-logstore-1 
      default_message_key content 
      output_data_type json 
      output_include_tag true 
      output_include_time true 
      sasl_over_ssl true 
      username <yourusername> 
      password <yourpassword> 
      ssl_ca_certs_from_system true 
      # ruby-kafka producer options 
      max_send_retries 10000 
      required_acks 1 
      compression_codec gzip 
    </match>
    In the preceding sample code, the format of the data exported to Kafka is set to JSON. In addition to JSON, Fluentd also supports more than 10 formats. For more information, see Fluentd Formatter.
    If you use the JSON format, you can create a JSON type index for the content field. For more information, see JSON type. The following figure shows a sample log.
  • Example 5: Use Logstash to write data into Log Service

    Logstash is an open-source engine for collecting data in real time. You can use Logstash to dynamically collect data from different sources, process the data (for example, filter or convert the data), and export the results to a target directory. You can analyze the data further based on the output results.

    You can use the built-in Kafka output plug-in of Logstash to write data into Log Service. However, because Kafka in Log Service uses the SASL_SSL connection protocol, you must configure an SSL certificate and a JAAS file.
    1. Create a JAAS file and save it to a target directory, for example, /etc/kafka/kafka_client_jaas.conf.
      KafkaClient { 
        org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="<yourusername>" 
        password="<yourpassword>"; 
      };
    2. Generate the SSL truststore file and save it to a target directory, for example, /etc/kafka/client-root.truststore.jks.
      Each domain name in Log Service has a CA certificate. Therefore, you only need to download the root certificate GlobalSign Root CA, encode the certificate in Base64, and save it to a target directory, for example, /etc/kafka/ca-root. Then, run a keytool command to generate a JKS file. You must set a password when a JKS file is generated for the first time.
      keytool -keystore /etc/kafka/client-root.truststore.jks -alias root -import -file /etc/kafka/ca-root
    3. Configure the Logstash. The sample code is as follows:
      input { stdin { } } 
      output { 
        stdout { codec => rubydebug } 
        kafka { 
          topic_id => "test-logstore-1" 
          bootstrap_servers => "cn-hangzhou.log.aliyuncs.com:10012" 
          security_protocol => "SASL_SSL" 
          ssl_truststore_location => "/etc/kafka/client-root.truststore.jks" 
          ssl_truststore_password => "123456" 
          jaas_path => "/etc/kafka/kafka_client_jaas.conf" 
          sasl_mechanism => "PLAIN" 
          codec => "json" 
          client_id => "kafka-logstash" 
        } 
      }
      Note The preceding example describes the configuration of a connectivity test. However, in a production environment, we recommend that you remove the stdout output configuration.
      In the preceding example, the format of the data exported to Kafka is set to JSON. In addition to JSON, Logstash supports more than 10 formats. For more information, see Logstash Codec plug-ins.
      If you use the JSON format, you can create a JSON type index for the content field. For more information, see JSON type. The following figure shows a sample log.