すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Aliyun Log Java Producer を使用した Simple Log Service へのログデータ書き込み

最終更新日:Dec 24, 2025

FlinkSparkStorm などのビッグデータコンピュートエンジンでは、ログの圧縮、Simple Log Service (SLS) へのバッチアップロード、ネットワークリソース消費の削減が求められますが、API や SDK だけでは不十分な場合があります。このようなシナリオでは、Aliyun Log Java Producer が SLS へのデータアップロードに便利で効率的なソリューションを提供します。

前提条件

  • Simple Log Service が有効化されていること。

  • Simple Log Service SDK for Python が初期化されていること。

Aliyun Log Java Producer の概要

Aliyun Log Java Producer は、ビッグデータおよび高同時実行性環境の Java アプリケーション向けに設計された高性能クラスライブラリです。標準の API や SDK と比較して、高性能、コンピューティングロジックと I/O ロジックの分離、リソース管理など、いくつかのメリットがあります。Alibaba Cloud SLS のシーケンシャル書き込み機能を利用して、順序付けられたログアップロードを保証します。

SLS は、迅速な開始を容易にするために、Aliyun Log Java Producer を使用したサンプルアプリケーションを提供しています。詳細については、「Aliyun Log Producer サンプルアプリケーション」をご参照ください。

仕組み

特長

  • スレッドセーフ:Producer インターフェイスによって公開されるすべてのメソッドはスレッドセーフです。

  • 非同期送信:Producer の送信インターフェイスを呼び出すと、通常はすぐに応答が返されます。Producer は内部で保留中のデータをキャッシュしてマージし、スループットを向上させるためにバッチで送信します。

  • 自動リトライ:Producer は、設定された最大リトライ回数とバックオフ時間に基づいてリトライします。

  • 動作の追跡:コールバックまたは Future を通じて、現在のデータが正常に送信されたかどうかに関する情報と、各送信試行に関する情報を取得できます。これは、問題の追跡と意思決定に役立ちます。

  • コンテキストの復元:同じ Producer インスタンスによって生成されたログは同じコンテキスト内にあり、サーバー側で特定のログの前後の関連ログを表示できます。

  • グレースフルシャットダウン:close メソッドは、Producer によってキャッシュされたすべてのデータが終了前に処理されることを保証し、対応する通知を受け取ります。

メリット

プロデューサーは、従来の API や SDK と比較して、以下のメリットがあります。

  • 高性能

    大量のデータと限られたリソースで、書き込み側で目標のスループットを達成するには、マルチスレッド、キャッシュ戦略、バッチ送信、障害リトライシナリオの包括的な考慮など、複雑な制御ロジックが必要です。プロデューサーはこれらの機能を実装し、プログラム開発を簡素化すると同時にパフォーマンス上の利点を提供します。

  • 非同期ノンブロッキング

    利用可能なメモリが十分にある場合、プロデューサーは Logstore に送信されるデータをキャッシュするため、send メソッドはブロックされることなくすぐに戻り、コンピューティングロジックと I/O ロジックの分離を実現します。その後、返された Future オブジェクトまたは提供されたコールバックを通じてデータ転送の結果を取得します。

  • 制御可能なリソース

    パラメーターを通じてプロデューサーが保留中のデータをキャッシュするために使用するメモリサイズを制御し、データ送信タスクの実行に使用するスレッド数を設定します。これにより、プロデューサーが無限にリソースを消費するのを防ぎ、実際のリソース消費と書き込みスループットのバランスを取ることができます。

  • 簡単な問題特定

    ログデータの転送が失敗した場合、プロデューサーはステータスコードだけでなく、失敗の理由と詳細情報を記述した String 型の例外メッセージも返します。たとえば、ネットワーク接続のタイムアウトにより転送が失敗した場合、返される例外メッセージは「connection timeout」となる可能性があります。サーバーが応答しないために転送が失敗した場合、返される例外メッセージは「server unresponsive」となる可能性があります。

注意事項

  • aliyun-log-producer は、内部で PutLogs 操作を呼び出してログをアップロードしますが、毎回書き込める生ログのサイズには制限があります。詳細については、「データの読み書き」をご参照ください。

  • プロジェクト、Logstore、シャード、LogtailConfig、マシングループ、単一の LogItem サイズ、LogItem (キー) の長さ、LogItem (値) の長さなど、SLS の基本リソースにはすべて制限があります。詳細については、「基本リソースの制限」をご参照ください。

  • コードを初めて実行した後、SLS コンソールで Logstore のインデックス作成を有効にし、1 分待ってからクエリを実行します。

  • コンソールでログをクエリする際、単一のフィールド値の長さが最大長を超えると、超過部分は切り捨てられ、分析には含まれません。詳細については、「インデックスの作成」をご参照ください。

課金

SDK を使用して発生するコストは、コンソールを使用して発生するコストと同じです。詳細については、「課金概要」をご参照ください。

ステップ 1:Aliyun Log Java Producer のインストール

Maven プロジェクトで Aliyun Log Java Producer を使用するには、pom.xml ファイルに対応する依存関係を追加します。その後、Maven は関連する JAR パッケージを自動的にダウンロードします。たとえば、<dependencies> に以下を追加します。

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log-producer</artifactId>
    <version>0.3.22</version>
</dependency>

追加して更新した後、Producer の依存関係とのバージョン競合が報告された場合は、<dependencies> に以下を追加します。

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.114</version>
  <classifier>jar-with-dependencies</classifier>
</dependency>

ステップ 2:ProducerConfig の設定

ProducerConfig は送信ポリシーを設定するために使用されます。さまざまなビジネスニーズに合わせてパラメーター値を調整します。パラメーターは次の表で説明されています。

Config producerConfig = new ProducerConfig();
producerConfig.setTotalSizeInBytes(104857600);

パラメーター

タイプ

説明

totalSizeInBytes

Integer

プロデューサーインスタンスがキャッシュできるログの最大サイズ。デフォルト値:100 MB。

maxBlockMs

Integer

send メソッドが呼び出されたときにプロデューサーインスタンスの利用可能なスペースが不足している場合の最大ブロッキング時間。デフォルト値:60 秒。

指定した最大ブロッキング時間が経過してもプロデューサーインスタンスの利用可能なスペースが不足している場合、send メソッドは TimeoutException エラーをスローします。

このパラメーターを 0 に設定し、プロデューサーインスタンスの利用可能なスペースが不足している場合、send メソッドはすぐに TimeoutException エラーをスローします。

プロデューサーインスタンスの利用可能なスペースが十分になるまで send メソッドをブロックしたい場合は、このパラメーターを負の値に設定する必要があります。

ioThreadCount

Integer

ログ送信タスク用のスレッド数。デフォルト値は、利用可能なプロセッサーの数です。

batchSizeThresholdInBytes

Integer

ログのバッチを送信するためのしきい値。デフォルト値:512 KB。最大値:5 MB。

batchCountThreshold

Integer

送信前のバッチ内のログ数。デフォルト値:4096。最大値:40960。

lingerMs

Integer

バッチが送信されるまでの遅延。デフォルト値:2 秒。最小値:100 ms。

retries

Integer

最初の失敗後のバッチのリトライ回数。デフォルト値:10。

このパラメーターが 0 以下に設定されている場合、バッチは最初の失敗後すぐに失敗キューに入ります。

maxReservedAttempts

Integer

ProducerBatch を送信する各試行は、アテンプトに対応します。このパラメーターは、ユーザーに報告されるアテンプトの数を制御し、デフォルトでは最新の 11 回のアテンプトのみを保持します。

このパラメーターを増やすと、より詳細な追跡が可能になりますが、メモリ消費量が増加します。

baseRetryBackoffMs

Integer

リトライの初期バックオフ時間。デフォルト値:100 ミリ秒。

プロデューサーは指数バックオフアルゴリズムを採用しており、N 回目のリトライ前の待機時間は baseRetryBackoffMs × 2^(N-1) として計算されます。

maxRetryBackoffMs

Integer

リトライの最大バックオフ時間。デフォルト値:50 秒。

adjustShardHash

Boolean

send メソッドが呼び出されたときに shardHash を調整するかどうかを決定します。デフォルト値:true。

buckets

Integer

このパラメーターは、adjustShardHash が true の場合に有効です。このパラメーターは、shardHash を指定された数のバケットに再グループ化します。

shardHash の値が異なると、データがマージおよびバッチ処理されなくなり、プロデューサーのスループットが制限されます。shardHash を再グループ化することで、データをより効果的にバッチ処理して転送できます。

このパラメーターの値は、[1, 256] の範囲内の 2 のべき乗の整数でなければなりません。デフォルト値:64。

ステップ 3:プロデューサーの作成

プロデューサーは、AK または STS トークンを使用した構成をサポートしています。STS トークンの場合、定期的に新しい ProjectConfig を作成し、ProjectConfigs に追加します。

LogProducer はプロデューサーの実装クラスであり、一意の producerConfig が必要です。producerConfig を準備した後、次のようにプロデューサーをインスタンス化します。

Producer producer = new LogProducer(producerConfig);

プロデューサーを作成すると、いくつかのスレッドが開始されますが、このプロセスはリソースを大量に消費します。アプリケーション全体でプロデューサーインスタンスを共有することを推奨します。LogProducer のすべてのメソッドは、同時使用に対してスレッドセーフです。次の表は、プロデューサーインスタンス内のスレッドを示しています。N はインスタンス番号で、0 から始まります。

スレッド名のフォーマット

数量

説明

aliyun-log-producer-<N>-mover

1

送信準備ができたバッチを送信スレッドプールに転送します。

aliyun-log-producer-<N>-io-thread

ioThreadCount

データ送信タスクを実行する IOThreadPool 内のスレッド。

aliyun-log-producer-<N>-success-batch-handler

1

正常に送信されたバッチを処理します。

aliyun-log-producer-<N>-failure-batch-handler

1

送信に失敗したバッチを管理します。

ステップ 4:ログプロジェクトの設定

ProjectConfig には、宛先プロジェクトのエンドポイント情報と、呼び出し元の ID を表すアクセス認証情報が含まれます。各ログプロジェクトは、1 つの ProjectConfig オブジェクトに対応します。

次のようにインスタンスを作成します。

ProjectConfig project1 = new ProjectConfig("your-project-1", "cn-hangzhou.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
ProjectConfig project2 = new ProjectConfig("your-project-2", "cn-shanghai.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
producer.putProject(project1);
producer.putProject(project2);

ステップ 5:データの送信

Future またはコールバックの作成

Aliyun Log Java Producer でログデータを送信する際、送信プロセスを処理するためにコールバック関数を指定します。このコールバック関数は、データ転送が成功したとき、または送信失敗時に例外が発生したときに呼び出されます。

説明

アプリケーションでの結果の後処理が単純で、プロデューサーをブロックしない場合は、コールバックを直接使用します。そうでない場合は、ListenableFuture を使用して、別のスレッドまたはスレッドプールでビジネスロジックを処理します。

メソッドのパラメーターは以下のとおりです。

パラメーター

説明

project

送信するデータの宛先プロジェクト。

logstore

送信するデータの宛先 Logstore。

logTem

送信するデータ。

completed

すべてのログが送信されたこと (成功と失敗の両方) を保証するための Java のアトミック型。

データの送信

プロデューサーインターフェイスは複数の送信メソッドを提供しており、それぞれに以下で説明する特定のパラメーターがあります。

パラメーター

説明

必須

project

宛先プロジェクト。

はい

logStore

宛先 Logstore。

はい

logItem

送信するログ。

はい

topic

ログのトピック。

いいえ

説明

指定しない場合、このパラメーターには "" が割り当てられます。

source

ログのソース。

いいえ

説明

指定しない場合、このパラメーターにはプロデューサーが存在するホストの IP アドレスが割り当てられます。

shardHash

送信するログのハッシュ値。必要に応じてハッシュ値を指定すると、ログはハッシュ値に基づいて指定された Logstore 内の特定のシャードに書き込まれます。

いいえ

説明

指定しない場合、データは宛先 Logstore 内のランダムなシャードに書き込まれます。

callback

ログの配信が成功したとき、または複数回のリトライ失敗後に破棄されたときに呼び出されるコールバック関数を定義します。

いいえ

一般的な例外

例外

説明

TimeoutException

TimeoutException は、プロデューサーのキャッシュされたログサイズがメモリ制限を超え、maxBlockMs ミリ秒後に十分なメモリを取得できない場合にスローされます。

maxBlockMs が -1 に設定されている場合、無期限のブロッキング期間を示し、TimeoutException は発生しません。

IllegalStateException

プロデューサーが閉じられた状態 (close メソッドが呼び出された) の場合、その後の send メソッドの呼び出しはすべて IllegalStateException になります。

ステップ 6:送信結果の取得

プロデューサーの送信メソッドは非同期であるため、送信結果は返された future または提供されたコールバックを介して取得する必要があります。

Future

send メソッドは ListenableFuture を返します。これは、標準の get メソッドに加えて、完了後のコールバック登録も可能です。以下のサンプルコードは ListenableFuture の使用法を示しています。future に FutureCallback を登録し、アプリケーションが提供する EXECUTOR_SERVICE スレッドプールで実行します。完全な例については、「SampleProducerWithFuture.java」をご参照ください。

package com.aliyun.openservices.aliyun.log.producer.sample;

import com.aliyun.openservices.aliyun.log.producer.*;
import com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException;
import com.aliyun.openservices.aliyun.log.producer.errors.MaxBatchCountExceedException;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException;
import com.aliyun.openservices.aliyun.log.producer.errors.TimeoutException;
import com.aliyun.openservices.log.common.LogItem;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SampleProducerWithFuture {

  private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithFuture.class);

  private static final ExecutorService EXECUTOR_SERVICE = Executors
      .newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors(), 1));

  public static void main(String[] args) throws InterruptedException {
    Producer producer = Utils.createProducer();
    int n = 100;
    int size = 20;

    // The number of logs that have finished (either successfully send, or failed)
    final AtomicLong completed = new AtomicLong(0);

    for (int i = 0; i < n; ++i) {
      List<LogItem> logItems = Utils.generateLogItems(size);
      try {
        String project = System.getenv("PROJECT");
        String logStore = System.getenv("LOG_STORE");
        ListenableFuture<Result> f = producer.send(project, logStore, logItems);
        Futures.addCallback(
            f, new SampleFutureCallback(project, logStore, logItems, completed), EXECUTOR_SERVICE);
      } catch (InterruptedException e) {
        LOGGER.warn("The current thread has been interrupted during send logs.");
      } catch (Exception e) {
        if (e instanceof MaxBatchCountExceedException) {
          LOGGER.error("The logs exceeds the maximum batch count, e={}", e);
        } else if (e instanceof LogSizeTooLargeException) {
          LOGGER.error("The size of log is larger than the maximum allowable size, e={}", e);
        } else if (e instanceof TimeoutException) {
          LOGGER.error("The time taken for allocating memory for the logs has surpassed., e={}", e);
        } else {
          LOGGER.error("Failed to send logs, e=", e);
        }
      }
    }

    Utils.doSomething();

    try {
      producer.close();
    } catch (InterruptedException e) {
      LOGGER.warn("The current thread has been interrupted from close.");
    } catch (ProducerException e) {
      LOGGER.info("Failed to close producer, e=", e);
    }

    EXECUTOR_SERVICE.shutdown();
    while (!EXECUTOR_SERVICE.isTerminated()) {
      EXECUTOR_SERVICE.awaitTermination(100, TimeUnit.MILLISECONDS);
    }
    LOGGER.info("All log complete, completed={}", completed.get());
  }

  private static final class SampleFutureCallback implements FutureCallback<Result> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleFutureCallback.class);

    private final String project;

    private final String logStore;

    private final List<LogItem> logItems;

    private final AtomicLong completed;

    SampleFutureCallback(
        String project, String logStore, List<LogItem> logItems, AtomicLong completed) {
      this.project = project;
      this.logStore = logStore;
      this.logItems = logItems;
      this.completed = completed;
    }

    @Override
    public void onSuccess(@Nullable Result result) {
      LOGGER.info("Send logs successfully.");
      completed.getAndIncrement();
    }

    @Override
    public void onFailure(Throwable t) {
      if (t instanceof ResultFailedException) {
        Result result = ((ResultFailedException) t).getResult();
        LOGGER.error(
            "Failed to send logs, project={}, logStore={}, result={}", project, logStore, result);
      } else {
        LOGGER.error("Failed to send log, e=", t);
      }
      completed.getAndIncrement();
    }
  }
}

コールバック

コールバックはプロデューサーの内部スレッドによって実行され、データスペースは完了後にのみ解放されます。プロデューサーをブロックしてスループットを低下させないように、コールバック内で長時間の操作を避けてください。さらに、コールバック内でリトライのために send メソッドを呼び出さないでください。代わりに、ListenableFuture コールバックでリトライを処理します。完全な例については、「SampleProducerWithCallback.java」をご参照ください。

package com.aliyun.openservices.aliyun.log.producer.sample;

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.aliyun.log.producer.errors.TimeoutException;
import com.aliyun.openservices.log.common.LogItem;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SampleProducerWithCallback {

  private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);

  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);

  public static void main(String[] args) throws InterruptedException {
    final Producer producer = Utils.createProducer();

    int nTask = 100;

    // The monotonically increasing sequence number we will put in the data of each log
    final AtomicLong sequenceNumber = new AtomicLong(0);

    // The number of logs that have finished (either successfully send, or failed)
    final AtomicLong completed = new AtomicLong(0);

    final CountDownLatch latch = new CountDownLatch(nTask);

    for (int i = 0; i < nTask; ++i) {
      EXECUTOR_SERVICE.submit(
          new Runnable() {
            @Override
            public void run() {
              LogItem logItem = Utils.generateLogItem(sequenceNumber.getAndIncrement());
              try {
                String project = System.getenv("PROJECT");
                String logStore = System.getenv("LOG_STORE");
                producer.send(
                    project,
                    logStore,
                    Utils.getTopic(),
                    Utils.getSource(),
                    logItem,
                    new SampleCallback(project, logStore, logItem, completed));
              } catch (InterruptedException e) {
                LOGGER.warn("The current thread has been interrupted during send logs.");
              } catch (Exception e) {
                if (e instanceof LogSizeTooLargeException) {
                  LOGGER.error(
                      "The size of log is larger than the maximum allowable size, e={}", e);
                } else if (e instanceof TimeoutException) {
                  LOGGER.error(
                      "The time taken for allocating memory for the logs has surpassed., e={}", e);
                } else {
                  LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                }
              } finally {
                latch.countDown();
              }
            }
          });
    }
    latch.await();
    EXECUTOR_SERVICE.shutdown();

    Utils.doSomething();

    try {
      producer.close();
    } catch (InterruptedException e) {
      LOGGER.warn("The current thread has been interrupted from close.");
    } catch (ProducerException e) {
      LOGGER.info("Failed to close producer, e=", e);
    }

    LOGGER.info("All log complete, completed={}", completed.get());
  }

  private static final class SampleCallback implements Callback {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);

    private final String project;

    private final String logStore;

    private final LogItem logItem;

    private final AtomicLong completed;

    SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
      this.project = project;
      this.logStore = logStore;
      this.logItem = logItem;
      this.completed = completed;
    }

    @Override
    public void onCompletion(Result result) {
      try {
        if (result.isSuccessful()) {
          LOGGER.info("Send log successfully.");
        } else {
          LOGGER.error(
              "Failed to send log, project={}, logStore={}, logItem={}, result={}",
              project,
              logStore,
              logItem.ToJsonString(),
              result);
        }
      } finally {
        completed.getAndIncrement();
      }
    }
  }
}

ステップ 7:プロデューサーのクローズ

プロデューサーが不要になった場合やプロセスが終了する際には、プロデューサーを閉じて、キャッシュされたすべてのデータが処理されるようにします。安全なシャットダウンと制限付きシャットダウンの 2 つのシャットダウンモードがサポートされています。

安全なシャットダウン

ほとんどの場合、close() メソッドを使用した安全なシャットダウンが推奨されます。close() メソッドは、キャッシュされたすべてのデータが処理され、スレッドが停止し、コールバックが実行され、future が完了するのを待ってから戻ります。

このメソッドはすべてのデータが処理されるのを待ちますが、コールバックがブロックされていなければすぐに戻り、バッチはクローズ後にリトライなしで即座に処理されます。

制限付きシャットダウン

コールバックがブロックされる可能性がある場合に迅速に戻るには、close(long timeoutMs) メソッドを使用した制限付きシャットダウンを使用します。指定された timeoutMs 後にプロデューサーが完全に閉じられていない場合、IllegalStateException がスローされ、データの損失や未実行のコールバックの可能性が示されます。

よくある質問

データ書き込み操作の数に制限はありますか?

  • SLS での読み書き操作の数とサイズには制限があります。詳細については、「データの読み書き」をご参照ください。

  • プロジェクト、Logstore、シャード、LogtailConfig、マシングループ、単一の LogItem サイズ、LogItem (キー) の長さ、LogItem (値) の長さなど、SLS の基本リソースにはすべて制限があります。詳細については、「基本リソースの制限」をご参照ください。

SLS にデータが書き込まれない場合はどうすればよいですか?

SLS にデータが書き込まれない場合は、次のトラブルシューティング手順に従ってください。

  1. プロジェクト内の aliyun-log-produceraliyun-log、および protobuf-java JAR パッケージのバージョンが、インストール ドキュメントで指定されているものと一致することを確認します。必要に応じてアップグレードしてください。

  2. Aliyun Log Java Producer の send メソッドは非同期であるため、戻りデータはすぐには利用できません。コールバックまたは Future オブジェクトを使用して、送信失敗の原因を特定します。

  3. コールバックインターフェイスの onCompletion メソッドが呼び出されない場合は、プログラム終了前に producer.close() メソッドが呼び出されていることを確認してください。データ転送はバックエンドスレッドによって処理されるため、producer.close() を呼び出すことでデータ損失を防ぎます。

  4. Aliyun Log Java Producer は、SLF4J ロギングフレームワークを使用してランタイムの動作を返します。プログラムにロギングフレームワークを設定し、DEBUG レベルのロギングを有効にして ERROR ログを確認してください。

  5. 上記の手順を完了しても問題が解決しない場合は、チケットを起票してください。

関連ドキュメント