This topic describes how to use Aliyun Log Java Producer to write data to Log Service.

Prerequisites

Log Service SDK for Java is installed. For more information, see Install Log Service SDK for Java.

Background information

Aliyun Log Java Producer is a high-performance class library for Java applications that run in big data and high-concurrency scenarios. Compared with the Log Service API and Log Service SDK for Java, Aliyun Log Java Producer offers several advantages for writing logs, such as high performance, separation of computing logic from I/O logic, and resource management.

Features

  • Thread safety: All methods exposed by the Producer API are thread-safe.
  • Asynchronous transmission: In most cases, a call to the Producer API returns quickly. Aliyun Log Java Producer caches and merges the data to be sent and then sends the data in batches to improve throughput.
  • Automatic retries: The producer retries failed requests based on the specified maximum number of retries and backoff time.
  • Behavior tracing: You can use callbacks or futures to check whether data was sent and to obtain information about each delivery attempt. This helps you trace issues and decide how to handle failures.
  • Contextual query: The logs that are generated by the same producer share a context. You can view the logs that are generated before and after a specified log on the server.
  • Graceful shutdown: When the close method returns, all data that is cached by the producer has been processed and you have received the corresponding notifications.
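
The automatic retry feature can be pictured as a capped backoff loop. The following is a minimal sketch only, not the producer's internal implementation: the class name RetryBackoffSketch, the method names, and the doubling policy are assumptions made for illustration. This topic states only that retries are governed by a maximum number of retries and a backoff time.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration of "maximum number of retries + backoff time".
// This is NOT the producer's code; all names and the doubling policy are assumptions.
final class RetryBackoffSketch {

    private RetryBackoffSketch() {}

    // Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
    static long backoffMs(int attempt, long baseMs, long maxMs) {
        if (attempt < 0 || baseMs <= 0 || maxMs < baseMs) {
            throw new IllegalArgumentException("invalid backoff arguments");
        }
        long factor = 1L << Math.min(attempt, 62);
        // Comparing against maxMs / factor applies the cap and avoids overflow.
        if (baseMs > maxMs / factor) {
            return maxMs;
        }
        return baseMs * factor;
    }

    // Runs `task` once, then retries up to `maxRetries` times if it keeps failing.
    static <T> T callWithRetries(Callable<T> task, int maxRetries, long baseMs, long maxMs)
            throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return task.call();
            } catch (InterruptedException e) {
                throw e; // do not retry when the thread is asked to stop
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted: surface the last error
                }
                Thread.sleep(backoffMs(attempt, baseMs, maxMs));
            }
        }
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("retry " + attempt + " waits "
                    + backoffMs(attempt, 100, 1000) + " ms");
        }
    }
}
```

With a base of 100 ms and a cap of 1000 ms, the sketch waits 100, 200, 400, 800, and 1000 ms before successive retries. Capping the delay keeps a long outage from pushing the wait time up without bound.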

Benefits

  • High performance

    Writing large amounts of data to Log Service with limited resources normally requires complex control logic, including multithreading, cache policies, and batch sending, to meet throughput requirements. You must also handle retries when data fails to be written. Aliyun Log Java Producer implements this logic for you, which delivers high performance and simplifies development.

  • Asynchronous mode

    If sufficient memory is available, Aliyun Log Java Producer caches the data to be sent to Logstores, and the send method returns immediately without blocking. This separates computing logic from I/O logic. You can then obtain the result of the send operation from the returned future object or from the callback that you pass to the send method.

  • Resource management

    You can configure parameters to limit the amount of memory that a producer uses to cache the data to be sent. You can also specify the number of threads that are used to send data. This prevents the producer from consuming resources without limit and allows you to balance resource consumption against write throughput based on your scenario.
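
The asynchronous mode and resource limits described above can be sketched with standard Java concurrency classes. This is an illustrative model under stated assumptions, not the producer's implementation: the class BoundedAsyncSenderSketch and its parameters totalSizeInBytes, ioThreadCount, and maxBlockMs are hypothetical names, and the network write is replaced by a stand-in that returns the payload size.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a sender with a bounded cache and a fixed I/O thread pool.
final class BoundedAsyncSenderSketch implements AutoCloseable {
    private final Semaphore memoryBudget; // one permit per byte of cache
    private final ExecutorService ioPool; // threads that perform the "I/O"
    private final long maxBlockMs;        // how long send() may wait for free cache

    BoundedAsyncSenderSketch(int totalSizeInBytes, int ioThreadCount, long maxBlockMs) {
        this.memoryBudget = new Semaphore(totalSizeInBytes);
        this.ioPool = Executors.newFixedThreadPool(ioThreadCount);
        this.maxBlockMs = maxBlockMs;
    }

    // Reserves cache space, hands the payload to the I/O pool, and returns at once.
    CompletableFuture<Integer> send(byte[] payload)
            throws InterruptedException, TimeoutException {
        if (!memoryBudget.tryAcquire(payload.length, maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("cache is full");
        }
        return CompletableFuture
                .supplyAsync(() -> payload.length, ioPool) // stand-in for a network write
                .whenComplete((sent, error) -> memoryBudget.release(payload.length));
    }

    int availableBytes() {
        return memoryBudget.availablePermits();
    }

    // Stops accepting work and waits for in-flight sends to finish.
    @Override
    public void close() throws InterruptedException {
        ioPool.shutdown();
        ioPool.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        try (BoundedAsyncSenderSketch sender = new BoundedAsyncSenderSketch(1024, 2, 100)) {
            CompletableFuture<Integer> future =
                    sender.send("key1=foo".getBytes(StandardCharsets.UTF_8));
            System.out.println("sent " + future.get(5, TimeUnit.SECONDS) + " bytes");
        }
    }
}
```

In this sketch, the caller computes and the pool threads perform I/O, so the two never block each other unless the cache budget is exhausted. A larger budget or more I/O threads raises throughput at the cost of memory and CPU, which is the trade-off that the resource management parameters let you tune.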

Install Aliyun Log Java Producer

To use Aliyun Log Java Producer in a Maven project, add the required dependencies to the pom.xml file. Maven automatically downloads the related JAR packages. For example, add the following content to <dependencies>:
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log-producer</artifactId>
    <version>0.3.10</version>
</dependency>
The jar-with-dependencies classifier can resolve version conflicts among the producer dependencies. To use it, add the following content to <dependencies>:
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.35</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

Sample code in Java

After you install Aliyun Log Java Producer, you can use the library to write Java code.

  • Callback
    In this example, a file named SampleProducerWithCallback.java is created and the generated log data is uploaded to Log Service.
    • Sample code
      Note: The first time you run the code, you must enable the indexing feature for your Logstore in the Log Service console. Then, wait for 1 minute before you perform a query.
      import com.aliyun.openservices.aliyun.log.producer.Callback;
      import com.aliyun.openservices.aliyun.log.producer.LogProducer;
      import com.aliyun.openservices.aliyun.log.producer.Producer;
      import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
      import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
      import com.aliyun.openservices.aliyun.log.producer.Result;
      import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
      import com.aliyun.openservices.log.common.LogItem;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.atomic.AtomicLong;
      
      public class SampleProducerWithCallback {
      
          private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);
      
          private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
      
          public static void main(String[] args) throws InterruptedException {
              final String project = "example-project";
              final String logstore = "example-logstore";
              String endpoint = "example-endpoint";
              String accessKeyId = "your-accesskey-id";
              String accessKeySecret = "your-access-key-secret";
      
              ProducerConfig producerConfig = new ProducerConfig();
              final Producer producer = new LogProducer(producerConfig);
              producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
      
              int nTask = 100;
              // The number of logs that have finished (either successfully sent or failed).
              final AtomicLong completed = new AtomicLong(0);
              final CountDownLatch latch = new CountDownLatch(nTask);
      
              for (int i = 0; i < nTask; ++i) {
                  threadPool.submit(
                          new Runnable() {
                              @Override
                              public void run() {
                                  LogItem logItem = new LogItem();
                                  logItem.PushBack("key1", "foo");
                                  logItem.PushBack("key2", "bar");
                                  try {
                                      producer.send(
                                              project,
                                              logstore,
                                              "your-topic",
                                              "your-source",
                                              logItem,
                                              new SampleCallback(project, logstore, logItem, completed));
                                  } catch (InterruptedException e) {
                                      LOGGER.warn("The current thread was interrupted while sending logs.");
                                  } catch (Exception e) {
                                      LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                                  } finally {
                                      latch.countDown();
                                  }
                              }
                          });
              }
      
              // The following shutdown logic is required only when the process is about to exit.
              latch.await();
              threadPool.shutdown();
              try {
                  producer.close();
              } catch (InterruptedException e) {
                  LOGGER.warn("The current thread was interrupted while closing the producer.");
              } catch (ProducerException e) {
                  LOGGER.error("Failed to close producer, e=", e);
              }
      
              LOGGER.info("All log complete, completed={}", completed.get());
          }
      
          private static final class SampleCallback implements Callback {
              private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
              private final String project;
              private final String logStore;
              private final LogItem logItem;
              private final AtomicLong completed;
      
              SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
                  this.project = project;
                  this.logStore = logStore;
                  this.logItem = logItem;
                  this.completed = completed;
              }
      
              @Override
              public void onCompletion(Result result) {
                  try {
                      if (result.isSuccessful()) {
                          LOGGER.info("Send log successfully.");
                      } else {
                          LOGGER.error(
                                  "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                                  project,
                                  logStore,
                                  logItem.ToJsonString(),
                                  result);
                      }
                  } finally {
                      completed.getAndIncrement();
                  }
              }
          }
      }
    • Expected result
      {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
      {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
      {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
      {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
      ......

  • Future
    In this example, a file named SampleProducerWithFuture.java is created and the generated log data is uploaded to Log Service.
    • Sample code
      Note: The first time you run the code, you must enable the indexing feature for your Logstore in the Log Service console. Then, wait for 1 minute before you perform a query.
      import com.aliyun.openservices.aliyun.log.producer.LogProducer;
      import com.aliyun.openservices.aliyun.log.producer.Producer;
      import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
      import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
      import com.aliyun.openservices.aliyun.log.producer.Result;
      import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
      import com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException;
      import com.aliyun.openservices.log.common.LogItem;
      import com.google.common.util.concurrent.FutureCallback;
      import com.google.common.util.concurrent.Futures;
      import com.google.common.util.concurrent.ListenableFuture;
      import org.checkerframework.checker.nullness.qual.Nullable;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicLong;
      
      public class SampleProducerWithFuture {
      
          private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithFuture.class);
      
          private static final ExecutorService threadPool = Executors
                  .newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors(), 1));
      
          public static void main(String[] args) throws InterruptedException {
              final String project = "example-project";
              final String logstore = "example-logstore";
              String endpoint = "example-endpoint";
              String accessKeyId = "your-accesskey-id";
              String accessKeySecret = "your-access-key-secret";
      
              ProducerConfig producerConfig = new ProducerConfig();
              final Producer producer = new LogProducer(producerConfig);
              producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
      
              int n = 100;
              // The number of logs that have finished (either successfully sent or failed).
              final AtomicLong completed = new AtomicLong(0);
      
              for (int i = 0; i < n; ++i) {
                  List<LogItem> logItems = new ArrayList<LogItem>();
                  for (int j = 0; j < 10; ++j) {
                      LogItem logItem = new LogItem();
                      logItem.PushBack("key1", "foo" + j);
                      logItem.PushBack("key2", "bar" + j);
                      logItems.add(logItem);
                  }
                  try {
                      ListenableFuture<Result> f = producer.send(project, logstore, logItems);
                      Futures.addCallback(
                              f, new SampleFutureCallback(project, logstore, logItems, completed), threadPool);
                  } catch (InterruptedException e) {
                      LOGGER.warn("The current thread was interrupted while sending logs.");
                  } catch (Exception e) {
                      LOGGER.error("Failed to send logs, e=", e);
                  }
              }
      
              try {
                  producer.close();
              } catch (InterruptedException e) {
                  LOGGER.warn("The current thread was interrupted while closing the producer.");
              } catch (ProducerException e) {
                  LOGGER.error("Failed to close producer, e=", e);
              }
      
              threadPool.shutdown();
              while (!threadPool.isTerminated()) {
                  threadPool.awaitTermination(100, TimeUnit.MILLISECONDS);
              }
              LOGGER.info("All log complete, completed={}", completed.get());
          }
      
          private static final class SampleFutureCallback implements FutureCallback<Result> {
      
              private static final Logger LOGGER = LoggerFactory.getLogger(SampleFutureCallback.class);
      
              private final String project;
              private final String logStore;
              private final List<LogItem> logItems;
              private final AtomicLong completed;
      
              SampleFutureCallback(
                      String project, String logStore, List<LogItem> logItems, AtomicLong completed) {
                  this.project = project;
                  this.logStore = logStore;
                  this.logItems = logItems;
                  this.completed = completed;
              }
      
              @Override
              public void onSuccess(@Nullable Result result) {
                  LOGGER.info("Send logs successfully.");
                  completed.getAndIncrement();
              }
      
              @Override
              public void onFailure(Throwable t) {
                  if (t instanceof ResultFailedException) {
                      Result result = ((ResultFailedException) t).getResult();
                      LOGGER.error(
                              "Failed to send logs, project={}, logStore={}, result={}", project, logStore, result);
                  } else {
                      LOGGER.error("Failed to send log, e=", t);
                  }
                  completed.getAndIncrement();
              }
          }
      }
    • Expected result
      {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo0","key2":"bar0"}
      {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo1","key2":"bar1"}
      {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo2","key2":"bar2"}
      {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo3","key2":"bar3"}
      ......

For more information, see Aliyun Log Java Producer.

In addition, Log Service provides sample applications that are specific to Aliyun Log Java Producer to help you get started. For more information, see Aliyun Log Producer Sample Application.

FAQ

If no data is written to Log Service, perform the following steps to troubleshoot the issue:
  1. Check whether the versions of the aliyun-log-producer, aliyun-log, and protobuf-java JAR packages used in your project are the same as the versions listed in the Install Aliyun Log Java Producer section of this topic. If the versions differ, upgrade the JAR packages.
  2. The send method of the Producer API sends data asynchronously. To identify the cause of a send failure, check the result that is passed to the callback or the returned future object.
  3. If the onCompletion method of the callback is never invoked, check whether the producer.close() method is called before your program exits. Data is sent asynchronously by background threads. Therefore, you must call the producer.close() method before the program exits to prevent the data that is cached in memory from being lost.
  4. The producer logs its key runtime behavior by using Simple Logging Facade for Java (SLF4J). Configure an SLF4J logging implementation in your program and enable DEBUG-level logging. Then, check whether ERROR-level logs are generated.
  5. If the issue persists, submit a ticket.
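
Step 3 can be illustrated with a simplified, single-threaded model of a batching sender. This is a sketch under stated assumptions rather than the producer's code: FlushOnCloseSketch, its batchSize parameter, and the in-memory delivered list (which stands in for Log Service) are hypothetical. It shows why data that is still cached is lost unless close is called before the program exits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: logs are cached and delivered in batches.
// Anything still cached when the program exits is never delivered.
final class FlushOnCloseSketch implements AutoCloseable {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();          // cached logs
    private final List<List<String>> delivered = new ArrayList<>(); // stand-in for the server

    FlushOnCloseSketch(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Caches one log and delivers a batch once the threshold is reached.
    synchronized void send(String log) {
        buffer.add(log);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Delivers whatever is cached as one batch. Callers must hold the lock.
    private void flush() {
        if (!buffer.isEmpty()) {
            delivered.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    synchronized int deliveredCount() {
        int total = 0;
        for (List<String> batch : delivered) {
            total += batch.size();
        }
        return total;
    }

    synchronized int pendingCount() {
        return buffer.size();
    }

    // Delivers the remaining cached logs so that nothing is lost on exit.
    @Override
    public synchronized void close() {
        flush();
    }

    public static void main(String[] args) {
        FlushOnCloseSketch sender = new FlushOnCloseSketch(4);
        for (int i = 0; i < 10; i++) {
            sender.send("log-" + i);
        }
        System.out.println("before close: pending=" + sender.pendingCount());
        sender.close();
        System.out.println("after close: delivered=" + sender.deliveredCount());
    }
}
```

With a batch size of 4 and 10 logs, two logs remain cached until close delivers them. In a real program, call producer.close() in the shutdown path, as both samples in this topic do.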