本文通過範例程式碼指導您使用多值模型寫入資料。詳細樣本請參見Aliyun TSDB SDK for Java中的test/example/ExampleOfMultiField和test/com.aliyun.hitsdb.client/TestHiTSDBClientMultiFieldFeatures。
說明 多值模型僅在0.2.0版本及以上的SDK支援。關於最新SDK版本的說明及下載,請參見SDK的《版本說明》。之前版本的SDK所支援的多值模型實際是類比多值模型,不鼓勵繼續使用。之前通過MultiValue 類寫入的資料並不能通過最新的 SDK 多值模型相關類 (MultiField) 進行查詢。
多值模型的資料寫入
範例程式碼:
/*
* 通過 multiFieldPutSync() API。
* 該 API 支援單點和多點(List)寫入。
*
* 下面是必須寫入時必須提供的資訊:
* Metric: 資料指標的類別。相當於 influxdb 的 Measurement。
* Fields: 資料指標的度量資訊(相當於指標下的子類別)。即一個 metric 支援多個 fields。如 metric 為 wind,該 metric 可以有多個 fields:direction, speed, direction, description 和 temperature。
* Timestamp: 資料點的時間戳記
* Tags: 時間軸額外的資訊,如"型號=ABC123"、"出廠編號=1234567890"等。
*/
MultiFieldPoint multiFieldPoint = MultiFieldPoint.metric("wind")
.field("speed", 45.2)
.field("level", 1.2)
.field("direction", "S")
.field("description", "Breeze")
.tag("sensor", "95D8-7913")
.tag("city", "hangzhou")
.tag("province", "zhejiang")
.timestamp(1537170208L)
.build();
// 同步寫入
tsdb.multiFieldPutSync(multiFieldPoint);多值模型的非同步寫入
注意:多值模型非同步僅在 0.2.1 版本及以上的 SDK 支援。關於最新 SDK 版本的說明及下載,請參見SDK的《版本說明》。
不使用回調方法
預設的TSDBConfig沒有設定多值非同步寫入回調方法。範例程式碼:
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
// 批次寫入時每次提交資料量
.batchPutSize(500)
// 單值非同步寫入線程數
.batchPutConsumerThreadCount(1)
// 多值非同步寫入緩衝隊列長度
.multiFieldBatchPutBufferSize(10000)
// 多值非同步寫入線程數
.multiFieldBatchPutConsumerThreadCount(1)
.config();
// 特別注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特別注意,在應用生命週期內不用關閉TSDB執行個體,直到應用結束時關閉即可
tsdb.close();
}使用普通回調方法
普通回調方法僅報告當寫入成功時的資料點列表,或者當寫入失敗時的資料點列表以及對應的寫入異常。範例程式碼:
// 建立一個普通回調方法,用於處理寫入成功或者失敗後的情況
AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutCallback() {
@Override
public void response(String address, List<MultiFieldPoint> points, Result result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("已處理" + count + "個點");
}
final AtomicInteger num = new AtomicInteger();
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("業務回調出錯!" + points.size() + " error!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
// 批次寫入時每次提交資料量
.batchPutSize(500)
// 單值非同步寫入線程數
.batchPutConsumerThreadCount(1)
// 多值非同步寫入緩衝隊列長度
.multiFieldBatchPutBufferSize(10000)
// 多值非同步寫入線程數
.multiFieldBatchPutConsumerThreadCount(1)
// 多值非同步寫入回調方法
.listenMultiFieldBatchPut(callback)
.config();
// 特別注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特別注意,在應用生命週期內不用關閉TSDB執行個體,直到應用結束時關閉即可
tsdb.close();
}使用摘要回調方法
摘要回調方法,當寫入成功時,報告這次寫入成功時的資料點列表,以及對應的寫入摘要情況,包括成功點數,失敗點數;當寫入失敗時,報告這次寫入失敗時的資料點列表以及對應的寫入異常。範例程式碼:
// 建立一個摘要回調方法,用於處理寫入成功或者失敗後的情況
AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutSummaryCallback() {
final AtomicInteger num = new AtomicInteger();
@Override
public void response(String address, List<MultiFieldPoint> points, SummaryResult result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("已處理" + count + "個點");
}
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("業務回調出錯!" + points.size() + " error!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
// 批次寫入時每次提交資料量
.batchPutSize(500)
// 單值非同步寫入線程數
.batchPutConsumerThreadCount(1)
// 多值非同步寫入緩衝隊列長度
.multiFieldBatchPutBufferSize(10000)
// 多值非同步寫入線程數
.multiFieldBatchPutConsumerThreadCount(1)
// 多值非同步寫入回調方法
.listenMultiFieldBatchPut(callback)
.config();
// 特別注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特別注意,在應用生命週期內不用關閉TSDB執行個體,直到應用結束時關閉即可
tsdb.close();
}使用詳情回調方法
詳情回調方法,當寫入成功時,報告這次寫入成功時的資料點列表,以及對應的寫入詳情,包括寫入失敗的點以及對應錯誤資訊等;當寫入失敗時,報告這次寫入失敗時的資料點列表以及對應的寫入異常。範例程式碼:
// 建立一個詳情回調方法,用於處理寫入成功或者失敗後的情況
MultiFieldBatchPutDetailsCallback callback = new MultiFieldBatchPutDetailsCallback() {
final AtomicInteger num = new AtomicInteger();
@Override
public void response(String address, List<MultiFieldPoint> points, DetailsResult result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("已處理" + count + "個點");
}
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("業務回調出錯!" + points.size() + " error!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
// 批次寫入時每次提交資料量
.batchPutSize(500)
// 單值非同步寫入線程數
.batchPutConsumerThreadCount(1)
// 多值非同步寫入緩衝隊列長度
.multiFieldBatchPutBufferSize(10000)
// 多值非同步寫入線程數
.multiFieldBatchPutConsumerThreadCount(1)
// 多值非同步寫入回調方法
.listenMultiFieldBatchPut(callback)
.config();
// 特別注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特別注意,在應用生命週期內不用關閉TSDB執行個體,直到應用結束時關閉即可
tsdb.close();
}