全部產品
Search
文件中心

Time Series Database:SDK多值寫入

更新時間:Jul 06, 2024

本文通過範例程式碼指導您使用多值模型寫入資料。詳細樣本請參見Aliyun TSDB SDK for Java中的test/example/ExampleOfMultiField和test/com.aliyun.hitsdb.client/TestHiTSDBClientMultiFieldFeatures

說明 多值模型僅在0.2.0版本及以上的SDK支援。關於最新SDK版本的說明及下載,請參見SDK的《版本說明》。之前版本的SDK所支援的多值模型實際是類比多值模型,不鼓勵繼續使用。之前通過MultiValue 類寫入的資料並不能通過最新的 SDK 多值模型相關類 (MultiField) 進行查詢。

多值模型的資料寫入

範例程式碼:

        /*
         *  通過 multiFieldPutSync() API。
         *  該 API 支援單點和多點(List)寫入。
         *
         *  下面是必須寫入時必須提供的資訊:
         *  Metric: 資料指標的類別。相當於 influxdb 的 Measurement。
         *  Fields: 資料指標的度量資訊(相當於指標下的子類別)。即一個 metric 支援多個 fields。如 metric 為 wind,該 metric 可以有多個 fields:direction, speed, direction, description 和 temperature。
         *  Timestamp: 資料點的時間戳記
         *  Tags: 時間軸額外的資訊,如"型號=ABC123"、"出廠編號=1234567890"等。
         */
        MultiFieldPoint multiFieldPoint = MultiFieldPoint.metric("wind")
                .field("speed", 45.2)
                .field("level", 1.2)
                .field("direction", "S")
                .field("description", "Breeze")
                .tag("sensor", "95D8-7913")
                .tag("city", "hangzhou")
                .tag("province", "zhejiang")
                .timestamp(1537170208L)
                .build();

        // 同步寫入
        tsdb.multiFieldPutSync(multiFieldPoint);

多值模型的非同步寫入

注意:多值模型非同步僅在 0.2.1 版本及以上的 SDK 支援。關於最新 SDK 版本的說明及下載,請參見SDK的《版本說明》。

不使用回調方法

預設的TSDBConfig沒有設定多值非同步寫入回調方法。範例程式碼:
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
                // 批次寫入時每次提交資料量
                .batchPutSize(500)
                // 單值非同步寫入線程數
                .batchPutConsumerThreadCount(1)
                // 多值非同步寫入緩衝隊列長度
                .multiFieldBatchPutBufferSize(10000)
                // 多值非同步寫入線程數
                .multiFieldBatchPutConsumerThreadCount(1)
                .config();

        // 特別注意,TSDB只需初始化一次即可
        TSDB tsdb = TSDBClientFactory.connect(config);

        MultiFieldPoint point = MultiFieldPoint
                .metric("test-test-test")
                .tag("a", "1")
                .tag("b", "2")
                .timestamp(System.currentTimeMillis())
                .field("f1", Math.random())
                .field("f2", Math.random())
                .build();
        tsdb.multiFieldPut(point);

        // 特別注意,在應用生命週期內不用關閉TSDB執行個體,直到應用結束時關閉即可
        tsdb.close();
    }

使用普通回調方法

普通回調方法僅報告當寫入成功時的資料點列表,或者當寫入失敗時的資料點列表以及對應的寫入異常。範例程式碼:
      // 建立一個普通回調方法,用於處理寫入成功或者失敗後的情況
      AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutCallback() {

            @Override
            public void response(String address, List<MultiFieldPoint> points, Result result) {
                int count = num.addAndGet(points.size());
                System.out.println(result);
                System.out.println("已處理" + count + "個點");
            }
            final AtomicInteger num = new AtomicInteger();
            @Override
            public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
                System.err.println("業務回調出錯!" + points.size() + " error!");
                ex.printStackTrace();
            }
        };

       TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
                .httpConnectTimeout(90)
                // 批次寫入時每次提交資料量
                .batchPutSize(500)
                // 單值非同步寫入線程數
                .batchPutConsumerThreadCount(1)
                // 多值非同步寫入緩衝隊列長度
                .multiFieldBatchPutBufferSize(10000)
                // 多值非同步寫入線程數
                .multiFieldBatchPutConsumerThreadCount(1)
                // 多值非同步寫入回調方法
                .listenMultiFieldBatchPut(callback)
                .config();

        // 特別注意,TSDB只需初始化一次即可
        TSDB tsdb = TSDBClientFactory.connect(config);

        MultiFieldPoint point = MultiFieldPoint
                .metric("test-test-test")
                .tag("a", "1")
                .tag("b", "2")
                .timestamp(System.currentTimeMillis())
                .field("f1", Math.random())
                .field("f2", Math.random())
                .build();
        tsdb.multiFieldPut(point);

        // 特別注意,在應用生命週期內不用關閉TSDB執行個體,直到應用結束時關閉即可
        tsdb.close();
    }

使用摘要回調方法

摘要回調方法,當寫入成功時,報告這次寫入成功時的資料點列表,以及對應的寫入摘要情況,包括成功點數,失敗點數;當寫入失敗時,報告這次寫入失敗時的資料點列表以及對應的寫入異常。範例程式碼:
      // 建立一個摘要回調方法,用於處理寫入成功或者失敗後的情況
      AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutSummaryCallback() {

            final AtomicInteger num = new AtomicInteger();
            @Override
            public void response(String address, List<MultiFieldPoint> points, SummaryResult result) {
                int count = num.addAndGet(points.size());
                System.out.println(result);
                System.out.println("已處理" + count + "個點");
            }



            @Override
            public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
                System.err.println("業務回調出錯!" + points.size() + " error!");
                ex.printStackTrace();
            }
        };

       TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
                .httpConnectTimeout(90)
                // 批次寫入時每次提交資料量
                .batchPutSize(500)
                // 單值非同步寫入線程數
                .batchPutConsumerThreadCount(1)
                // 多值非同步寫入緩衝隊列長度
                .multiFieldBatchPutBufferSize(10000)
                // 多值非同步寫入線程數
                .multiFieldBatchPutConsumerThreadCount(1)
                // 多值非同步寫入回調方法
                .listenMultiFieldBatchPut(callback)
                .config();

        // 特別注意,TSDB只需初始化一次即可
        TSDB tsdb = TSDBClientFactory.connect(config);

        MultiFieldPoint point = MultiFieldPoint
                .metric("test-test-test")
                .tag("a", "1")
                .tag("b", "2")
                .timestamp(System.currentTimeMillis())
                .field("f1", Math.random())
                .field("f2", Math.random())
                .build();
        tsdb.multiFieldPut(point);

        // 特別注意,在應用生命週期內不用關閉TSDB執行個體,直到應用結束時關閉即可
        tsdb.close();
    }

使用詳情回調方法

詳情回調方法,當寫入成功時,報告這次寫入成功時的資料點列表,以及對應的寫入詳情,包括寫入失敗的點以及對應錯誤資訊等;當寫入失敗時,報告這次寫入失敗時的資料點列表以及對應的寫入異常。範例程式碼:
      // 建立一個詳情回調方法,用於處理寫入成功或者失敗後的情況
      MultiFieldBatchPutDetailsCallback callback = new MultiFieldBatchPutDetailsCallback() {

            final AtomicInteger num = new AtomicInteger();

            @Override
            public void response(String address, List<MultiFieldPoint> points, DetailsResult result) {
                int count = num.addAndGet(points.size());
                System.out.println(result);
                System.out.println("已處理" + count + "個點");
            }

            @Override
            public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
                System.err.println("業務回調出錯!" + points.size() + " error!");
                ex.printStackTrace();
            }
        };

       TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
                .httpConnectTimeout(90)
                // 批次寫入時每次提交資料量
                .batchPutSize(500)
                // 單值非同步寫入線程數
                .batchPutConsumerThreadCount(1)
                // 多值非同步寫入緩衝隊列長度
                .multiFieldBatchPutBufferSize(10000)
                // 多值非同步寫入線程數
                .multiFieldBatchPutConsumerThreadCount(1)
                // 多值非同步寫入回調方法
                .listenMultiFieldBatchPut(callback)
                .config();

        // 特別注意,TSDB只需初始化一次即可
        TSDB tsdb = TSDBClientFactory.connect(config);

        MultiFieldPoint point = MultiFieldPoint
                .metric("test-test-test")
                .tag("a", "1")
                .tag("b", "2")
                .timestamp(System.currentTimeMillis())
                .field("f1", Math.random())
                .field("f2", Math.random())
                .build();
        tsdb.multiFieldPut(point);

        // 特別注意,在應用生命週期內不用關閉TSDB執行個體,直到應用結束時關閉即可
        tsdb.close();
    }