全部產品
Search
文件中心

Simple Log Service:使用Aliyun Log Java Producer寫入日誌資料

更新時間:Nov 19, 2025

如果您在使用FlinkSparkStorm等巨量資料計算引擎時,需要將日誌進行壓縮、批量上傳日誌到Log Service、減少網路傳輸資源的佔用,API或者SDK往往無法滿足巨量資料情境對資料寫入能力的要求,您可以使用Aliyun Log Java Producer,便捷高效地將資料上傳到Log Service。

前提條件

什麼是Aliyun Log Java Producer

Aliyun Log Java Producer是為運行在巨量資料、高並發情境下的Java應用量身打造的高效能類庫。相對於原始的API或SDK,使用該類庫寫日誌資料能為您帶來諸多優勢,包括高效能、計算與I/O邏輯分離、資源可控制等。Aliyun LOG Java Producer使用阿里雲Log Service提供的順序寫入功能來保證日誌的上傳順序。

Log Service提供基於Aliyun Log Java Producer的範例應用程式,便於您快速上手。更多資訊,請參見Aliyun Log Producer Sample Application

工作流程

特點

  • 安全執行緒:Producer介面暴露的所有方法都是安全執行緒的。

  • 非同步發送:調用Producer的發送介面通常能夠立即返迴響應。Producer內部會緩衝併合並待發送資料,然後批量發送以提高輸送量。

  • 自動重試:Producer會根據配置的最大重試次數和重試退避時間進行重試。

  • 行為追溯:通過Callback或Future能擷取當前資料是否發送成功的資訊,也可以獲得該資料每次被嘗試發送的資訊,有利於問題追溯和行為決策。

  • 上下文還原:同一個Producer執行個體產生的日誌在同一上下文中,在服務端可以查看某條日誌前後相關的日誌。

  • 優雅關閉:保證close方法退出時,Producer緩衝的所有資料都能被處理,同時您也能得到相應的通知。

應用情境

producer對比原始的API或SDK的優勢如下:

  • 高效能

    在海量資料、資源有限的前提下,寫入端要達到目標輸送量需要實現複雜的控制邏輯,包括多線程、緩衝策略、批量發送等,另外還要充分考慮失敗重試的情境。Producer實現了上述功能,在為您帶來效能優勢的同時簡化了程式開發步驟。

  • 非同步非阻塞

    在可用記憶體充足的前提下,Producer會對發往日誌庫的資料進行緩衝,因此調用send方法時能夠立即返迴響應且不會阻塞,可達到計算與I/O邏輯分離的目的。隨後,您可以通過返回的Future對象或傳入的Callback獲得資料發送的結果。

  • 資源可控制

    可以通過參數控制Producer用於緩衝待發送資料的記憶體大小,同時還可以配置用於執行資料發送任務的線程數量。這樣可避免Producer無限制地消耗資源,且可以讓您根據實際情況平衡資源消耗和寫入輸送量。

  • 定位問題簡單

    如果日誌資料發送失敗,Producer除了返回狀態代碼,還會返回一個String類型的異常資訊,用於描述失敗的原因和詳細資料。例如,如果發送失敗是因為網路連接逾時,則返回的異常資訊可能是“連線逾時”;如果發送失敗是因為伺服器無響應,則返回的異常資訊可能是“伺服器無響應”。

使用限制

  • aliyun-log-producer底層調用PutLogs介面上傳日誌,每次可以寫入的原始日誌大小存在限制。更多資訊,請參見資料讀寫

  • Log Service的基礎資源,包括建立Project個數、Logstore個數、Shard個數、LogtailConfig個數、機器組個數、單個LogItem大小、LogItem(Key)長度和LogItem(Value)長度等均存在限制。更多資訊,請參見基礎資源

  • 代碼首次運行後,請在Log Service控制台開啟日誌庫索引,等待一分鐘後,進行查詢。

  • 在控制台進行日誌查詢時,當單個欄位值長度超過最大長度時,超出部分被截斷,不參與分析。更多資訊,請參考建立索引

費用說明

使用SDK產生的費用和使用控制台產生的費用一致。更多資訊,請參見計費概述

步驟一:安裝Aliyun Log Java Producer

在Maven工程中使用Log ServiceAliyun Log Java Producer,只需在pom.xml中加入相應依賴。Maven專案管理工具會自動下載相關JAR包。例如,在<dependencies>中加入如下內容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log-producer</artifactId>
    <version>0.3.22</version>
</dependency>

添加更新後,如果提示Producer依賴的版本衝突,在<dependencies>中加入如下內容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.114</version>
  <classifier>jar-with-dependencies</classifier>
</dependency>

步驟二:配置ProducerConfig

ProducerConfig用於配置發送策略,您可以根據不同的業務情境為參數指定不同的值,各參數含義如下表所示:

Config producerConfig = new ProducerConfig();
producerConfig.setTotalSizeInBytes(104857600);

參數

類型

描述

totalSizeInBytes

整型

單個Producer執行個體能緩衝的日誌大小上限,預設為 100MB。

maxBlockMs

整型

如果Producer可用空間不足,調用者在send方法上的最大阻塞時間,預設為60秒。

如果超過這個時間後所需空間仍無法得到滿足,send方法會拋出TimeoutException

如果將該值設為0,當所需空間無法得到滿足時,send 方法會立即拋出 TimeoutException。

如果您希望send方法一直阻塞直到所需空間得到滿足,可將該值設為負數。

ioThreadCount

整型

執行日誌發送任務的線程池大小,預設為可用處理器個數。

batchSizeThresholdInBytes

整型

當一個ProducerBatch中緩衝的日誌大小大於等於 batchSizeThresholdInBytes 時,該batch將被發送,預設為512KB,最大可設定成 5MB。

batchCountThreshold

整型

當一個ProducerBatch中緩衝的日誌條數大於等於 batchCountThreshold時,該batch將被發送,預設4096,最大可設定成40960。

lingerMs

整型

一個ProducerBatch從建立到可發送的逗留時間,預設為2秒,最小可設定成100毫秒。

retries

整型

如果某個ProducerBatch首次發送失敗,能夠對其重試的次數,預設為10次。

如果retries小於等於 0,該ProducerBatch首次發送失敗後將直接進入失敗隊列。

maxReservedAttempts

整型

每個ProducerBatch每次被嘗試發送都對應著一個Attempt,此參數用來控制返回給使用者的attempt個數,預設只保留最近的11次attempt資訊。

該參數越大能讓您追溯更多的資訊,但同時也會消耗更多的記憶體。

baseRetryBackoffMs

整型

首次重試的退避時間,預設為100毫秒。

Producer採樣指數退避演算法,第N次重試的計劃等待時間為 baseRetryBackoffMs * 2^(N-1)。

maxRetryBackoffMs

整型

重試的最大退避時間,預設為50秒。

adjustShardHash

布爾

如果調用send方法時指定了 shardHash,該參數用於控制是否需要對其進行調整,預設為true。

buckets

整型

若且唯若adjustShardHash為true時,該參數才生效。此時,producer會自動將shardHash重新分組,分組數量為buckets。

如果兩條資料的shardHash不同,它們是無法合并到一起發送的,會降低producer輸送量。將shardHash重新分組後,能讓資料有更多地機會被批量發送。

該參數的取值範圍是 [1, 256],且必須是2的整數次冪,預設為64。

步驟三:建立Producer

Producer 支援使用者配置AK或STS token。如果使用STS token,需要定期建立新的ProjectConfig然後將其添加到ProjectConfigs裡。

LogProducer是介面Producer的實作類別,它接收唯一的參數producerConfig。當您準備好producerConfig後,可以按照下列方式建立producer執行個體。

Producer producer = new LogProducer(producerConfig);

建立producer的同時會建立一系列線程,這是一個相對昂貴的操作,因此建議一個應用共用一個producer執行個體。一個producer執行個體包含的線程如下表所示,其中N為該producer執行個體在當前進程中的編號,從 0 開始。另外,LogProducer提供的所有方法都是安全執行緒的,可以在多線程環境下安全執行。

線程名格式

數量

描述

aliyun-log-producer-<N>-mover

1

負責將滿足發送條件的batch投遞到發送線程池裡。

aliyun-log-producer-<N>-io-thread

ioThreadCount

IOThreadPool中真正用於執行資料發送任務的線程。

aliyun-log-producer-<N>-success-batch-handler

1

用於處理髮送成功的batch。

aliyun-log-producer-<N>-failure-batch-handler

1

用於處理髮送失敗的batch。

步驟四:配置記錄項目

ProjectConfig包含目標Project的服務入口資訊以及表徵調用者身份的訪問憑證。每個記錄項目對應一個ProjectConfig對象。

可以按照如下方式建立執行個體。

ProjectConfig project1 = new ProjectConfig("your-project-1", "cn-hangzhou.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
ProjectConfig project2 = new ProjectConfig("your-project-2", "cn-shanghai.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
producer.putProject(project1);
producer.putProject(project2);

步驟五:發送資料

建立Future或Callback

在使用Aliyun Log Java Producer發送日誌資料時,需要指定一個回呼函數來處理髮送過程中的各種情況。當日誌資料發送成功時,回呼函數會被調用,並返回一個發送結果;當日誌資料發送失敗時,回呼函數也會被調用,並傳入一個異常對象。

說明

如果擷取結果後,應用的處理邏輯比較簡單且不會造成producer阻塞,建議直接使用callback。否則,建議使用ListenableFuture,在單獨的線程(池)中執行後續業務

方法的各個參數含義如下:

參數

描述

project

待發送資料的目標 project。

logstore

待發送資料的目標 logStore。

logTem

待發送資料。

completed

Java提供的一個原子類型,用來確保所有日誌發送完成(成功或者失敗)。

發送資料

Producer介面提供多種發送方法,方法的各個參數含義如下。

參數

描述

是否必選

project

目標Project。

logStore

目標LogStore。

logItem

要發送的日誌/日誌列表。

topic

日誌主題

說明

如果留空或沒有指定,該欄位將被賦予""。

source

發送源。

說明

如果留空或沒有指定,該欄位將被賦予producer所在宿主機的 IP。

shardHash

可為發送的日誌設定自訂雜湊,服務端將根據此雜湊選擇對應的日誌庫Shard分區寫入日誌。

說明

如果留空或沒有指定,資料將被隨機寫入目標LogStore的某個shard中。

callback

可設定一個回呼函數。該回呼函數將在日誌被成功發送或者重試多次失敗後被丟棄時調用。

常見異常

異常

說明

TimeoutException

當Producer緩衝的日誌大小超過設定的記憶體上限時,且阻塞maxBlockMs毫秒後仍未擷取到足夠記憶體時,將拋出TimeoutException。

maxBlockMs 為-1時,阻塞沒有時間上限,將永遠不會拋出 TimeoutException。

IllegalStateException

當Producer已經處於關閉狀態(調用過close方法)時,再調用send 方法,會拋出IllegalStateException。

步驟六:擷取發送資料

由於producer提供的所有發送方法都是非同步,需要通過返回的future或者傳入的callback擷取發送結果。

Future

Send 方法會返回一個ListenableFuture,它除了可以像普通future那樣通過調用get方法阻塞獲得發送結果外,還允許你註冊回調方法(回調方法會在完成 future 設定後被調用)。以下程式碼片段展示了ListenableFuture的使用方法,使用者需要為該future註冊一個FutureCallback並將其投遞到應用提供的線程池EXECUTOR_SERVICE中執行,完整範例請參見SampleProducerWithFuture.java

package com.aliyun.openservices.aliyun.log.producer.sample;

import com.aliyun.openservices.aliyun.log.producer.*;
import com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException;
import com.aliyun.openservices.aliyun.log.producer.errors.MaxBatchCountExceedException;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException;
import com.aliyun.openservices.aliyun.log.producer.errors.TimeoutException;
import com.aliyun.openservices.log.common.LogItem;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SampleProducerWithFuture {

  private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithFuture.class);

  private static final ExecutorService EXECUTOR_SERVICE = Executors
      .newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors(), 1));

  public static void main(String[] args) throws InterruptedException {
    Producer producer = Utils.createProducer();
    int n = 100;
    int size = 20;

    // The number of logs that have finished (either successfully send, or failed)
    final AtomicLong completed = new AtomicLong(0);

    for (int i = 0; i < n; ++i) {
      List<LogItem> logItems = Utils.generateLogItems(size);
      try {
        String project = System.getenv("PROJECT");
        String logStore = System.getenv("LOG_STORE");
        ListenableFuture<Result> f = producer.send(project, logStore, logItems);
        Futures.addCallback(
            f, new SampleFutureCallback(project, logStore, logItems, completed), EXECUTOR_SERVICE);
      } catch (InterruptedException e) {
        LOGGER.warn("The current thread has been interrupted during send logs.");
      } catch (Exception e) {
        if (e instanceof MaxBatchCountExceedException) {
          LOGGER.error("The logs exceeds the maximum batch count, e={}", e);
        } else if (e instanceof LogSizeTooLargeException) {
          LOGGER.error("The size of log is larger than the maximum allowable size, e={}", e);
        } else if (e instanceof TimeoutException) {
          LOGGER.error("The time taken for allocating memory for the logs has surpassed., e={}", e);
        } else {
          LOGGER.error("Failed to send logs, e=", e);
        }
      }
    }

    Utils.doSomething();

    try {
      producer.close();
    } catch (InterruptedException e) {
      LOGGER.warn("The current thread has been interrupted from close.");
    } catch (ProducerException e) {
      LOGGER.info("Failed to close producer, e=", e);
    }

    EXECUTOR_SERVICE.shutdown();
    while (!EXECUTOR_SERVICE.isTerminated()) {
      EXECUTOR_SERVICE.awaitTermination(100, TimeUnit.MILLISECONDS);
    }
    LOGGER.info("All log complete, completed={}", completed.get());
  }

  private static final class SampleFutureCallback implements FutureCallback<Result> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleFutureCallback.class);

    private final String project;

    private final String logStore;

    private final List<LogItem> logItems;

    private final AtomicLong completed;

    SampleFutureCallback(
        String project, String logStore, List<LogItem> logItems, AtomicLong completed) {
      this.project = project;
      this.logStore = logStore;
      this.logItems = logItems;
      this.completed = completed;
    }

    @Override
    public void onSuccess(@Nullable Result result) {
      LOGGER.info("Send logs successfully.");
      completed.getAndIncrement();
    }

    @Override
    public void onFailure(Throwable t) {
      if (t instanceof ResultFailedException) {
        Result result = ((ResultFailedException) t).getResult();
        LOGGER.error(
            "Failed to send logs, project={}, logStore={}, result={}", project, logStore, result);
      } else {
        LOGGER.error("Failed to send log, e=", t);
      }
      completed.getAndIncrement();
    }
  }
}

Callback

Callback由producer內部線程負責執行,並且只有在執行完畢後資料“佔用”的空間才會釋放。為了不阻塞producer造成整體輸送量的下降,要避免在callback裡執行耗時的操作。另外,在callback中調用send方法進行重試也是不建議的,您可以在ListenableFuture的callback中進行重試。完整範例請參見SampleProducerWithCallback.java

package com.aliyun.openservices.aliyun.log.producer.sample;

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.aliyun.log.producer.errors.TimeoutException;
import com.aliyun.openservices.log.common.LogItem;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SampleProducerWithCallback {

  private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);

  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);

  public static void main(String[] args) throws InterruptedException {
    final Producer producer = Utils.createProducer();

    int nTask = 100;

    // The monotonically increasing sequence number we will put in the data of each log
    final AtomicLong sequenceNumber = new AtomicLong(0);

    // The number of logs that have finished (either successfully send, or failed)
    final AtomicLong completed = new AtomicLong(0);

    final CountDownLatch latch = new CountDownLatch(nTask);

    for (int i = 0; i < nTask; ++i) {
      EXECUTOR_SERVICE.submit(
          new Runnable() {
            @Override
            public void run() {
              LogItem logItem = Utils.generateLogItem(sequenceNumber.getAndIncrement());
              try {
                String project = System.getenv("PROJECT");
                String logStore = System.getenv("LOG_STORE");
                producer.send(
                    project,
                    logStore,
                    Utils.getTopic(),
                    Utils.getSource(),
                    logItem,
                    new SampleCallback(project, logStore, logItem, completed));
              } catch (InterruptedException e) {
                LOGGER.warn("The current thread has been interrupted during send logs.");
              } catch (Exception e) {
                if (e instanceof LogSizeTooLargeException) {
                  LOGGER.error(
                      "The size of log is larger than the maximum allowable size, e={}", e);
                } else if (e instanceof TimeoutException) {
                  LOGGER.error(
                      "The time taken for allocating memory for the logs has surpassed., e={}", e);
                } else {
                  LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                }
              } finally {
                latch.countDown();
              }
            }
          });
    }
    latch.await();
    EXECUTOR_SERVICE.shutdown();

    Utils.doSomething();

    try {
      producer.close();
    } catch (InterruptedException e) {
      LOGGER.warn("The current thread has been interrupted from close.");
    } catch (ProducerException e) {
      LOGGER.info("Failed to close producer, e=", e);
    }

    LOGGER.info("All log complete, completed={}", completed.get());
  }

  private static final class SampleCallback implements Callback {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);

    private final String project;

    private final String logStore;

    private final LogItem logItem;

    private final AtomicLong completed;

    SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
      this.project = project;
      this.logStore = logStore;
      this.logItem = logItem;
      this.completed = completed;
    }

    @Override
    public void onCompletion(Result result) {
      try {
        if (result.isSuccessful()) {
          LOGGER.info("Send log successfully.");
        } else {
          LOGGER.error(
              "Failed to send log, project={}, logStore={}, logItem={}, result={}",
              project,
              logStore,
              logItem.ToJsonString(),
              result);
        }
      } finally {
        completed.getAndIncrement();
      }
    }
  }
}

步驟七:關閉Producer

當您已經沒有資料需要發送或者當前進程準備退出時,需要關閉Producer,目的是讓Producer中緩衝的資料全部被處理。目前,Producer提供安全關閉和有限關閉兩種模式。

安全關閉

在大多數情況下,建議您使用安全關閉。安全關閉對應的方法是close(),它會等到Producer中緩衝的資料全部被處理、線程全部停止、註冊的callback全部執行,返回future全部被設定後才會返回。

雖然要等到資料全部處理完成,但Producer被關閉後,緩衝的batch會被立刻處理且不會被重試。因此,如果callback不被阻塞,close方法往往能在很短的時間內返回。

有限關閉

如果您的callback在執行過程中有可能阻塞,但您又希望close方法能在短時間內返回,可以使用有限關閉。有限關閉對應的方法是close(long timeoutMs),如果超過指定的timeoutMs後Producer仍未完全關閉,它會拋出IllegalStateException異常,這意味著緩衝的資料可能還沒來得及處理就被丟棄,使用者註冊的Callback也可能不會被執行。

常見問題

寫入資料次數是否存在限制?

  • Log Service讀寫資料的次數和大小均存在限制。更多資訊,請參見資料讀寫

  • Log Service的基礎資源,包括建立Project個數、Logstore個數、Shard個數、LogtailConfig個數、機器組個數、單個LogItem大小、LogItem(Key)長度和LogItem(Value)長度等均存在限制。更多資訊,請參見基礎資源

為什麼資料沒有寫入Log Service?

如果您探索資料沒有寫入Log Service,可通過如下步驟診斷問題。

  1. 檢查您專案中引入的aliyun-log-produceraliyun-logprotobuf-java Jar包的版本是否和文檔中安裝部分列出的Jar包版本一致,如果不一致請進行升級。

  2. Producer介面的send方法非同步發送資料,無法及時擷取返回的值。請通過Callback介面或返回的Future對象擷取資料發送失敗的原因。

  3. 如果您發現並沒有回調Callback介面的onCompletion方法,請檢查在您的程式退出之前是否有調用producer.close()方法。因為資料發送是由後台線程非同步完成的,為了防止緩衝在記憶體裡的少量資料丟失,請務必在程式退出之前調用producer.close()方法。

  4. Producer介面會把運行過程中的關鍵行為通過日誌架構slf4j進行輸出,您可以在程式中配置好相應的日誌實現架構並開啟DEBUG層級的日誌。重點檢查是否輸出ERROR層級的日誌。

  5. 如果通過上述步驟仍然沒有解決,請提工單

相關文檔