SDK を使用して Simple Log Service から直接データを消費する方法は、負荷分散やフェールオーバーなどの複雑なタスクにおいて、サードパーティ製ソフトウェア、マルチ言語アプリケーション、クラウドサービス、ストリーム処理フレームワークとの統合時などに十分でない場合があります。このような場合は、コンシューマーグループを使用してデータを消費してください。コンシューマーグループは、通常秒単位の遅延でほぼリアルタイムでのデータ消費を実現します。本トピックでは、コンシューマーグループを用いたデータ消費方法について説明します。
概要
Logstore には複数のシャードが含まれます。コンシューマーグループを使用してデータを消費する場合、シャードはグループ内のコンシューマーに以下のように割り当てられます:
新しいコンシューマーがコンシューマーグループに参加すると、負荷を均等化するためにシャードの再割り当てが行われます。この再割り当ては、上記の原則に従って実行されます。
基本概念
|
用語
|
説明
|
|
consumer group
|
コンシューマーグループとは、Logstore からのデータ処理を共同で行う複数のコンシューマーから構成されるグループです。同一グループ内のコンシューマーはワークロードを共有し、各データレコードがグループ内の 1 つのコンシューマーによってのみ処理されることを保証します。
重要
Logstore ごとに最大 30 個のコンシューマーグループを作成できます。
|
|
consumer
|
コンシューマーグループ内でデータを消費する単位です。
重要
同一コンシューマーグループ内のコンシューマーは、それぞれ固有の名前を持つ必要があります。
|
|
Logstore
|
データ収集、保存、および照会のための基本単位です。詳細については、「Logstore」をご参照ください。
|
|
shard
|
Logstore の基本単位であり、固定の読み取りおよび書き込み能力を提供します。Logstore 内のすべてのデータはシャード内に格納されます。詳細については、「shard」をご参照ください。
|
|
checkpoint
|
コンシューマーが最後に消費した位置を示すマーカーです。再起動後、コンシューマーはこのチェックポイントを使用して消費を再開します。
説明
コンシューマーグループを使用する場合、コンシューマープロセスが障害で終了した際に、チェックポイントは自動的に保存されます。復旧後、プロセスは保存されたチェックポイントから消費を再開することで、重複消費を防止します。
|
手順 1:コンシューマーグループの作成
本セクションでは、SDK、API、または CLI を使用したコンシューマーグループの作成方法を説明します。
SDK
以下のコードは、コンシューマーグループの作成方法を示しています:
CreateConsumerGroup.java
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
public class CreateConsumerGroup {
public static void main(String[] args) throws LogException {
// この例では、AccessKey ID および AccessKey Secret を環境変数から取得します。
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// プロジェクト名を入力します。
String projectName = "ali-test-project";
// Logstore 名を入力します。
String logstoreName = "ali-test-logstore";
// Simple Log Service のエンドポイントを設定します。この例では中国 (杭州) リージョンのエンドポイントを使用しています。実際のエンドポイントに置き換えてください。
String host = "https://cn-hangzhou.log.aliyuncs.com";
// Simple Log Service クライアントを作成します。
Client client = new Client(host, accessId, accessKey);
try {
// コンシューマーグループ名を設定します。
String consumerGroupName = "ali-test-consumergroup2";
System.out.println("コンシューマーグループの作成準備完了");
ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);
client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);
System.out.println(String.format("コンシューマーグループ %s の作成に成功しました", consumerGroupName));
} catch (LogException e) {
System.out.println("LogException e :" + e.toString());
System.out.println("エラーコード :" + e.GetErrorCode());
System.out.println("エラーメッセージ :" + e.GetErrorMessage());
throw e;
}
}
}
その他のコード例については、「Java SDK を使用したコンシューマーグループの管理」および「Python 向け Log Service SDK を使用したコンシューマーグループの管理」をご参照ください。
手順 2:ログの消費
仕組み
コンシューマーが初めて起動すると、SDK はコンシューマーグループが存在しない場合に自動的に作成します。「消費開始チェックポイント」の設定は、コンシューマーグループが初めて作成されたときにのみ適用され、初期位置を決定します。その後の再起動では、コンシューマーはサーバーに保存された最終消費チェックポイントから消費を再開します。たとえば:
消費の例
Java、C++、Python、Go 向けの SDK を使用して、コンシューマーグループからデータを消費できます。本セクションでは、Java SDK を使用した例を示します。
SDK
-
Maven 依存関係を追加します。
pom.xml ファイルに、以下の依存関係を追加します:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.50</version>
</dependency>
-
消費ロジックを作成します。以下のコードは一例です:
SampleLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
public class SampleLogHubProcessor implements ILogHubProcessor {
private int shardId;
// 最終チェックポイント保存時刻を追跡します。
private long mLastSaveTime = 0;
// プロセッサの初期化時に呼び出されます。
public void initialize(int shardId) {
this.shardId = shardId;
}
// 主要なデータ消費ロジック。このメソッド内で発生するすべての例外を処理し、再スローしないでください。
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
// 取得したデータを出力します。
for (LogGroupData logGroup : logGroups) {
FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
System.out.println("Tags");
for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
FastLogTag logTag = fastLogGroup.getLogTags(i);
System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
}
for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
FastLog log = fastLogGroup.getLogs(i);
System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int j = 0; j < log.getContentsCount(); ++j) {
FastLogContent content = log.getContents(j);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
// 30 秒ごとにチェックポイントをサーバーに保存します。ワーカーが予期せず終了した場合、新しいワーカーは最後のチェックポイントから消費を再開しますが、少量のデータが重複して消費される可能性があります。
try {
if (curTime - mLastSaveTime > 30 * 1000) {
// <code>true
false
その他のコードサンプルについては、「aliyun-log-consumer-java」および「Aliyun LOG Go Consumer」をご参照ください。
-
コンシューマーインスタンスファクトリを作成します。以下のコードは一例です:
SampleLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
public ILogHubProcessor generatorProcessor() {
// コンシューマーインスタンスを生成します。注意:このメソッドは毎回新しい SampleLogHubProcessor オブジェクトを返す必要があります。
return new SampleLogHubProcessor();
}
}
-
コンシューマーを作成し、指定された Logstore からデータを消費するコンシューマースレッドを起動します。以下のコードは一例です:
Main.java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
// Simple Log Service のエンドポイント。 実際の エンドポイントに置き換えてください。 この例では、中国 (杭州) リージョンのエンドポイントを使用します。
private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
// プロジェクト名。
private static String Project = "ali-test-project";
// Logstore 名。
private static String Logstore = "ali-test-logstore";
// コンシューマーグループ名。 コンシューマーグループを事前に作成する必要はありません。 プログラムによってランタイムで自動的に作成されます。
private static String ConsumerGroup = "ali-test-consumergroup2";
// 環境変数から AccessKey ID と AccessKey Secret を取得します。
private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
// 'consumer_1' はコンシューマー名で、コンシューマーグループ内で一意である必要があります。 複数のマシンで Logstore からの消費のバランスを取るには、各コンシューマーにマシン IP アドレスなどの一意の識別子を使用します。
// maxFetchLogGroupSize: 1 回のリクエストでフェッチするロググループの最大数。 デフォルト値が推奨されます。 変更するには、config.setMaxFetchLogGroupSize(100) を使用します。 値は (0, 1000] の範囲である必要があります。
LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
// スレッドは ClientWorker を開始します。これは Runnable インターフェイスを実装し、自動的に実行されます。
thread.start();
Thread.sleep(60 * 60 * 1000);
// worker.shutdown() を呼び出してコンシューマーインスタンスを停止します。 関連付けられたスレッドは自動的に停止します。
worker.shutdown();
// ClientWorker は、ランタイム中に非同期タスクを作成します。 シャットダウン後、実行中のタスクが安全に終了するのを待ちます。 30 秒間のスリープが推奨されます。
Thread.sleep(30 * 1000);
}
}
-
Main.java を実行します。
たとえば、NGINX ログを消費する場合、出力は以下のようになります:
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
SDK および SPL
-
Maven 依存関係を追加します。
pom.xml ファイルに、以下の依存関係を追加します:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.50</version>
</dependency>
-
消費ロジックを作成します。以下のコードは一例です:
SPLLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
public class SPLLogHubProcessor implements ILogHubProcessor {
private int shardId;
// 最終チェックポイント保存時刻を追跡します。
private long mLastSaveTime = 0;
// プロセッサの初期化時に呼び出されます。
public void initialize(int shardId) {
this.shardId = shardId;
}
// 主要なデータ消費ロジック。このメソッド内で発生するすべての例外を処理し、再スローしないでください。
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
// 取得したデータを出力します。
for (LogGroupData logGroup : logGroups) {
FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
System.out.println("Tags");
for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
FastLogTag logTag = fastLogGroup.getLogTags(i);
System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
}
for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
FastLog log = fastLogGroup.getLogs(i);
System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int j = 0; j < log.getContentsCount(); ++j) {
FastLogContent content = log.getContents(j);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
// 30 秒ごとにチェックポイントをサーバーに保存します。ワーカーが予期せず終了した場合、新しいワーカーは最後のチェックポイントから消費を再開しますが、少量のデータが重複して消費される可能性があります。
try {
if (curTime - mLastSaveTime > 30 * 1000) {
// <code>true
false
-
コンシューマーインスタンスファクトリを作成します。以下のコードは一例です:
SPLLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
public ILogHubProcessor generatorProcessor() {
// コンシューマーインスタンスを生成します。注意:このメソッドは毎回新しい SPLLogHubProcessor オブジェクトを返す必要があります。
return new SPLLogHubProcessor();
}
}
-
コンシューマーを作成し、指定された Logstore からデータを消費するコンシューマースレッドを起動します。以下のコードは一例です:
Main.java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
// Simple Log Service のエンドポイント。 実際のエンドポイントに置き換えてください。 この例では、中国 (杭州) リージョンのエンドポイントを使用します。
private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
// プロジェクト名。
private static String Project = "ali-test-project";
// Logstore 名。
private static String Logstore = "ali-test-logstore";
// コンシューマーグループ名。 コンシューマーグループを事前に作成する必要はありません。 プログラムが実行時に自動的に作成します。
private static String ConsumerGroup = "ali-test-consumergroup2";
// 環境変数から AccessKey ID と AccessKey Secret を取得します。
private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
// 'consumer_1' はコンシューマー名であり、コンシューマーグループ内で一意である必要があります。 複数のマシン間で Logstore からの消費のバランスを取るには、各コンシューマーにマシンの IP アドレスなどの一意の識別子を使用します。
// maxFetchLogGroupSize: 1 回のリクエストでフェッチするロググループの最大数。 デフォルト値の使用を推奨します。 変更するには、config.setMaxFetchLogGroupSize(100) を使用します。 値は (0, 1000] の範囲である必要があります。
LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
// 消費のための Simple Log Service Processing Language (SPL) クエリを指定します。
config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
// スレッドは ClientWorker を開始します。ClientWorker は Runnable インターフェイスを実装し、自動的に実行されます。
thread.start();
Thread.sleep(60 * 60 * 1000);
// worker.shutdown() を呼び出してコンシューマーインスタンスを停止します。 関連付けられたスレッドは自動的に停止します。
worker.shutdown();
// ClientWorker は実行時に非同期タスクを作成します。 シャットダウン後、実行中のタスクが安全に終了するのを待ちます。 30 秒間のスリープが推奨されます。
Thread.sleep(30 * 1000);
}
}
-
Main.java を実行します。
たとえば、NGINX ログを消費する場合、出力は以下のようになります:
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
手順 3:コンシューマーグループのステータスの確認
本セクションでは、コンシューマーグループのステータスを確認する 2 つの方法について説明します。
Java SDK
-
以下のコード例は、各シャードの消費チェックポイントを確認する方法を示しています:
ConsumerGroupTest.java
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "cn-hangzhou.log.aliyuncs.com";
static String project = "ali-test-project";
static String logstore = "ali-test-logstore";
static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
// Logstore 内のすべてのコンシューマーグループを取得します。コンシューマーグループが存在しない場合は空リストを返します。
List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
for(ConsumerGroup c: consumerGroups){
// コンシューマーグループのプロパティ(名前、ハートビートタイムアウト、順序付き消費の有無)を出力します。
System.out.println("Name: " + c.getConsumerGroupName());
System.out.println("Heartbeat timeout: " + c.getTimeout());
System.out.println("Ordered consumption: " + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
// マイクロ秒単位のタイムスタンプ (long)。
System.out.println("Last checkpoint update time: " + cp.getUpdateTime());
System.out.println("Consumer name: " + cp.getConsumer());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "消費が開始されていません";
else{
// UNIX タイムスタンプ(秒単位)。注意:出力用にこの値をフォーマットしてください。
try{
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
consumerPrg = "" + prg;
}
catch(LogException e){
if(e.GetErrorCode().equals("InvalidCursor"))
// チェックポイントが無効なのは、Logstore の保持期間より古いからです。
consumerPrg = "無効です。最後の消費チェックポイントは Logstore の保持期間より古いです。";
else{
// 内部サーバーエラー
throw e;
}
}
}
System.out.println("Consumption checkpoint: " + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
int endPrg = 0;
try{
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
}
catch(LogException e){
// 何もしません
}
// UNIX タイムスタンプ(秒単位)。注意:出力用にこの値をフォーマットしてください。
System.out.println("Latest data arrival time: " + endPrg);
}
}
}
}
-
以下の出力が返されます:
Name: ali-test-consumergroup2
Heartbeat timeout: 60
Ordered consumption: false
shard: 0
Last checkpoint update time: 0
Consumer name: consumer_1
Consumption checkpoint: 消費が開始されていません
Latest data arrival time: 1729583617
shard: 1
Last checkpoint update time: 0
Consumer name: consumer_1
Consumption checkpoint: 消費が開始されていません
Latest data arrival time: 1729583738
Process finished with exit code 0
コンソール
-
Simple Log Service コンソール にログインします。
プロジェクト一覧から、対象のプロジェクトをクリックします。

-
[]タブで、対象の Logstore の横にある
アイコンをクリックし、次に [データ消費] の横にある
アイコンをクリックします。
-
コンシューマーグループ一覧から、対象のコンシューマーグループをクリックします。
-
コンシューマーグループのステータス ページで、各シャードの消費チェックポイントを確認できます。
関連操作
-
RAM ユーザーへの権限付与
RAM ユーザーを使用してコンシューマーグループを管理するには、ユーザーに必要な権限を付与する必要があります。詳細については、「RAM ユーザーの作成と権限の付与」をご参照ください。
以下の表に、必要なアクションを示します。
|
アクション
|
説明
|
リソース
|
|
log:GetCursorOrData (GetCursor)
|
指定された時刻に基づいてカーソルを取得します。
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}
|
|
log:CreateConsumerGroup (CreateConsumerGroup)
|
指定された Logstore 用のコンシューマーグループを作成します。
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}
|
|
log:ListConsumerGroup (ListConsumerGroup)
|
指定された Logstore のすべてのコンシューマーグループを一覧表示します。
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/*
|
|
log:ConsumerGroupUpdateCheckPoint (ConsumerGroupUpdateCheckPoint)
|
指定されたコンシューマーグループのシャードのチェックポイントを更新します。
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}
|
|
log:ConsumerGroupHeartBeat (ConsumerGroupHeartBeat)
|
コンシューマーからサーバーへハートビートを送信します。
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}
|
|
log:UpdateConsumerGroup (UpdateConsumerGroup)
|
指定されたコンシューマーグループのプロパティを変更します。
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}
|
|
log:GetConsumerGroupCheckPoint (GetCheckPoint)
|
指定されたコンシューマーグループの 1 つまたはすべてのシャードのチェックポイントを取得します。
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}
|
たとえば、以下の詳細を持つコンシューマーグループに対して RAM ユーザーに権限を付与する場合、下記のポリシーを使用します。
-
Alibaba Cloud アカウント:174649****602745。
-
リージョン ID:cn-hangzhou。
-
プロジェクト名:project-test。
-
Logstore 名:logstore-test。
-
コンシューマーグループ名:consumergroup-test。
ポリシーの例:
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"log:GetCursorOrData"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
},
{
"Effect": "Allow",
"Action": [
"log:CreateConsumerGroup",
"log:ListConsumerGroup"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
},
{
"Effect": "Allow",
"Action": [
"log:ConsumerGroupUpdateCheckPoint",
"log:ConsumerGroupHeartBeat",
"log:UpdateConsumerGroup",
"log:GetConsumerGroupCheckPoint"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
}
]
}
-
トラブルシューティング
コンシューマーアプリケーションで Log4j を構成して、コンシューマーグループから発生する例外をログ出力することで、問題の特定と診断を容易にします。以下のコードは、log4j.properties の構成例です:
log4j.rootLogger = info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
Log4j を構成した後、コンシューマーアプリケーションを実行すると、以下の例のような例外メッセージが表示されます:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
-
特定の時刻からデータを消費
// consumerStartTimeInSeconds パラメーターは、消費を開始する時刻を指定します。
public LogHubConfig(String consumerGroupName,
String consumerName,
String loghubEndPoint,
String project, String logStore,
String accessId, String accessKey,
int consumerStartTimeInSeconds);
// position パラメーターは列挙型です。LogHubConfig.ConsumePosition.BEGIN_CURSOR は最も古いデータから消費を開始し、LogHubConfig.ConsumePosition.END_CURSOR は最新のデータから消費を開始します。
public LogHubConfig(String consumerGroupName,
String consumerName,
String loghubEndPoint,
String project, String logStore,
String accessId, String accessKey,
ConsumePosition position);
説明
-
消費要件に合ったコンストラクターを使用してください。
-
サーバーに既にチェックポイントが保存されている場合、他の設定に関係なく、保存されたチェックポイントから消費が再開されます。
-
デフォルトでは、Simple Log Service はデータ消費時にチェックポイントを優先します。開始時刻を指定する場合は、consumerStartTimeInSeconds の値が生存時間 (TTL) 期間内にあることを確認してください。そうでない場合、この設定は無効になります。
-
チェックポイントのリセット
public static void updateCheckpoint() throws Exception {
Client client = new Client(host, accessId, accessKey);
// タイムスタンプは UNIX タイムスタンプ(秒単位)である必要があります。ミリ秒単位のタイムスタンプを使用する場合は、例のように 1000 で除算してください。
long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
for (Shard shard : response.GetShards()) {
int shardId = shard.GetShardId();
String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
}
}