Lindorm SDK のマルチバリューモデルを使用すると、InfluxDB のマルチフィールド測定モデルと同様に、1 回の呼び出しでデータポイントごとに複数のフィールドを書き込むことができます。このドキュメントでは、Aliyun TSDB SDK for Java を使用してデータを書き込む方法について、同期書き込みと非同期書き込みの両方をカバーして説明します。
完全な例は、SDK リポジトリの test/example/ExampleOfMultiField および test/com.aliyun.hitsdb.client/TestHiTSDBClientMultiFieldFeatures にあります。
マルチバリューモデルには Lindorm SDK V0.2.0 以降が必要です。非同期マルチバリュー書き込みには V0.2.1 以降が必要です。V0.2.0 より前の SDK は、シミュレートされたマルチバリューモデルを使用します。シミュレートされたマルチバリューモデルは使用しないことを推奨します。MultiFieldクラス (V0.2.0 以降) で書き込まれたデータは、以前の SDK バージョンのMultiValueクラスを使用してクエリできません。最新の SDK バージョンとダウンロードリンクについては、SDK ドキュメントの「バージョン」セクションをご参照ください。
前提条件
開始する前に、以下を準備してください。
Aliyun TSDB SDK for Java V0.2.0 以降 (非同期書き込みの場合は V0.2.1 以降)
実行中の Lindorm Time Series Database インスタンス
データモデル
書き込む各 MultiFieldPoint には、次の 4 つのコンポーネントがあります。
| コンポーネント | 説明 | 例 |
|---|---|---|
| メトリック | InfluxDB の測定値に似たトップレベルカテゴリ | wind |
| フィールド | メトリック内のサブカテゴリ。単一のメトリックは複数のフィールドを保持できます。 | speed、direction、description |
| タグ | タイムラインを識別するメタデータ | sensor="95D8-7913"、city="hangzhou" |
| タイムスタンプ | データポイントの UNIX タイムスタンプ | 1537170208L |
データの同期書き込み
単一ポイントまたは複数ポイントの同期書き込みには、multiFieldPutSync() を使用します。
/*
* ビルダー API を使用して MultiFieldPoint を構築します。
* メトリック: トップレベルカテゴリ (例: "wind")
* フィールド: メトリック内の測定値 (数値または文字列の値)
* タグ: タイムライン識別子
* タイムスタンプ: データポイントの UNIX タイムスタンプ
*/
MultiFieldPoint multiFieldPoint = MultiFieldPoint.metric("wind")
.field("speed", 45.2)
.field("level", 1.2)
.field("direction", "S")
.field("description", "Breeze")
.tag("sensor", "95D8-7913")
.tag("city", "hangzhou")
.tag("province", "zhejiang")
.timestamp(1537170208L)
.build();
// 同期書き込み
tsdb.multiFieldPutSync(multiFieldPoint);データの非同期書き込み
非同期書き込みでは、バッファリングされたキューとともに multiFieldPut() を使用します。TSDBConfig を介して、キューサイズ、コンシューマースレッド、およびオプションのコールバックを構成します。
コールバックタイプの選択
| コールバックタイプ | クラス | response() の戻り値 | failed() の戻り値 |
|---|---|---|---|
| なし | (コールバックなし) | — | — |
| 共通 | MultiFieldBatchPutCallback | 成功したポイントと結果 | 失敗したポイントと例外 |
| 概要 | MultiFieldBatchPutSummaryCallback | 処理されたポイントと SummaryResult (成功/失敗のカウント) | 失敗したポイントと例外 |
| 詳細 | MultiFieldBatchPutDetailsCallback | 処理されたポイントと DetailsResult (失敗したポイントとそのエラー) | 失敗したポイントと例外 |
基本的な成功/失敗処理には 共通 コールバックを使用します。集計カウントが必要な場合は 概要 コールバックを使用します。ポイントごとのエラー詳細が必要な場合は 詳細 コールバックを使用します。
共通構成
すべての非同期書き込みの例では、同じベース TSDBConfig を使用します。アプリケーションごとに一度構成し、再利用してください。
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
// 単一値の非同期書き込みのバッチサイズ
.batchPutSize(500)
// 単一値の非同期書き込みのコンシューマースレッド
.batchPutConsumerThreadCount(1)
// マルチ値の非同期書き込みのバッファキューサイズ
.multiFieldBatchPutBufferSize(10000)
// マルチ値の非同期書き込みのコンシューマースレッド
.multiFieldBatchPutConsumerThreadCount(1)
// ここにコールバックをアタッチします (以下のオプションを参照)
.listenMultiFieldBatchPut(callback)
.config();
// アプリケーションライフサイクルごとに TSDB インスタンスを一度だけ作成します
TSDB tsdb = TSDBClientFactory.connect(config);TSDB インスタンスは、アプリケーションごとに一度だけ作成してください。アプリケーションが終了するまで閉じないでください。
コールバックなし
構成から .listenMultiFieldBatchPut() を省略します。クライアントは結果を返さずに書き込みを送信します。
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.batchPutSize(500)
.batchPutConsumerThreadCount(1)
.multiFieldBatchPutBufferSize(10000)
.multiFieldBatchPutConsumerThreadCount(1)
.config();
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// アプリケーションが終了する場合にのみ閉じます
tsdb.close();共通コールバック
MultiFieldBatchPutCallback は、発生した成功と失敗を報告します。
AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutCallback() {
final AtomicInteger num = new AtomicInteger();
@Override
public void response(String address, List<MultiFieldPoint> points, Result result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("システムは " + count + " ポイントを処理しました。");
}
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("コールバック関数が失敗しました。" + points.size() + " エラー!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
.batchPutSize(500)
.batchPutConsumerThreadCount(1)
.multiFieldBatchPutBufferSize(10000)
.multiFieldBatchPutConsumerThreadCount(1)
.listenMultiFieldBatchPut(callback)
.config();
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
tsdb.close();概要コールバック
MultiFieldBatchPutSummaryCallback は、SummaryResult で成功および失敗の集計カウントを返します。
AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutSummaryCallback() {
final AtomicInteger num = new AtomicInteger();
@Override
public void response(String address, List<MultiFieldPoint> points, SummaryResult result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("システムは " + count + " ポイントを処理しました。");
}
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("コールバック関数が失敗しました。" + points.size() + " エラー!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
.batchPutSize(500)
.batchPutConsumerThreadCount(1)
.multiFieldBatchPutBufferSize(10000)
.multiFieldBatchPutConsumerThreadCount(1)
.listenMultiFieldBatchPut(callback)
.config();
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
tsdb.close();詳細コールバック
MultiFieldBatchPutDetailsCallback は、DetailsResult でポイントごとのエラー詳細を返します。
MultiFieldBatchPutDetailsCallback callback = new MultiFieldBatchPutDetailsCallback() {
final AtomicInteger num = new AtomicInteger();
@Override
public void response(String address, List<MultiFieldPoint> points, DetailsResult result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("システムは " + count + " ポイントを処理しました。");
}
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("コールバック関数が失敗しました。" + points.size() + " エラー!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
.batchPutSize(500)
.batchPutConsumerThreadCount(1)
.multiFieldBatchPutBufferSize(10000)
.multiFieldBatchPutConsumerThreadCount(1)
.listenMultiFieldBatchPut(callback)
.config();
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
tsdb.close();