このトピックでは、LindormTSDB SDK が提供する共通インターフェースの使用方法について説明します。
LindormTSDB SDK は、管理インターフェース、書き込みインターフェース、およびクエリインターフェースを提供します。
管理インターフェース
LindormTSDB では、SQL ステートメントを実行して、データ定義言語 (DDL) およびデータ制御言語 (DCL) 操作を実行できます。このセクションでは、SQL ステートメントを送信して DDL および DCL 操作を実行する方法の例を示します。 LindormTSDB SDK では、次のオーバーロードメソッドのいずれかを使用して、SQL ステートメントを LindormTSDB に送信し、DDL および DCL 操作を実行できます。
// SQL ステートメントを直接送信します。
Result execute(String sql);
// 指定されたデータベースに対して DDL 操作を実行します。たとえば、データベースにテーブルを作成する DDL 操作を実行できます。
Result execute(String database, String sql);Result オブジェクトには、SQL ステートメントの実行結果が含まれています。たとえば、SHOW DATABASES ステートメントを送信した後、Result オブジェクトから既存のデータベースのリストを取得できます。 Result オブジェクトは、columns、metadata、および rows フィールドで構成されています。 columns フィールドには、返された結果のすべての列の名前が含まれています。 metadata フィールドには、列のデータ型が含まれています。 rows フィールドには、行ごとに返される結果が含まれています。
public class Result {
private List<String> columns;
private List<String> metadata;
private List<List<Object>> rows;
....
}データベース管理とテーブル管理の例
DDL 操作を実行して、データベースまたはテーブルの作成、削除、およびクエリを実行できます。次のコマンドを使用して、スキーマ制約ポリシーを弱い制約に設定できます。
// 1. 既存のデータベースをクエリします。
String showDatabase = "show databases";
Result result = lindormTSDBClient.execute(showDatabase);
System.out.println("before create, db list: " + result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));
// 2. データベースを作成します。この例では、demo という名前のデータベースが作成されます。
String createDatabase = "create database demo";
result = lindormTSDBClient.execute(createDatabase);
System.out.println("create database:" + result.isSuccessful());
// 3. 既存のデータベースをクエリします。
result = lindormTSDBClient.execute(showDatabase);
System.out.println("after create, db list: " + result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));
String database = "demo";
// 4. 既存のすべてのテーブルをクエリします。
String showTables = "show tables";
result = lindormTSDBClient.execute(database, showTables);
System.out.println("before create, table list: " + result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));
// 5. テーブルを作成します。
String createTable = "CREATE TABLE sensor (device_id VARCHAR TAG,region VARCHAR TAG,time BIGINT,temperature DOUBLE,humidity DOUBLE,PRIMARY KEY(device_id))";
result = lindormTSDBClient.execute(database, createTable);
System.out.println("create table: " + result.isSuccessful());
// 6. 既存のテーブルをクエリして、テーブル sensor が作成されているかどうかを確認します。
result = lindormTSDBClient.execute(database, showTables);
System.out.println("after create, table list: " + result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));
// 7. テーブルに関する詳細情報をクエリします。
String describeTable = "describe table sensor";
result = lindormTSDBClient.execute(database, describeTable);
System.out.println("------------ describe table -------------------");
List<String> columns = result.getColumns();
System.out.println("columns: " + columns);
List<String> metadata = result.getMetadata();
System.out.println("metadata: " + metadata);
List<List<Object>> rows = result.getRows();
for (int i = 0, size = rows.size(); i < size; i++) {
List<Object> row = rows.get(i);
System.out.println("column #" + i + " : " + row);
}
System.out.println("------------ describe table -------------------");
// 8. テーブルを削除します。
String dropTable = "drop table sensor";
result = lindormTSDBClient.execute(database, dropTable);
System.out.println("drop table: " + result.isSuccessful());
// 9. 既存のテーブルをクエリして、テーブルが削除されているかどうかを確認します。
result = lindormTSDBClient.execute(database, showTables);
System.out.println("after drop, table list: " + result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));
// 10. データベースを削除します。
String dropDatabase = "drop database demo";
result = lindormTSDBClient.execute(dropDatabase);
System.out.println("drop database:" + result.isSuccessful());
// 11. 既存のデータベースをクエリして、データベースが削除されているかどうかを確認します。
result = lindormTSDBClient.execute(showDatabase);
System.out.println("after drop, db list : " + result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));サンプル結果:
before create, db list: [default]
create database:true
after create, db list: [default, demo]
before create, table list: []
create table: true
after create, table list: [sensor]
------------ describe table -------------------
columns: [columnName, typeName, columnKind]
metadata: [VARCHAR, VARCHAR, VARCHAR]
column #0 : [device_id, VARCHAR, TAG]
column #1 : [region, VARCHAR, TAG]
column #2 : [time, TIMESTAMP, TIMESTAMP]
column #3 : [temperature, DOUBLE, FIELD]
column #4 : [humidity, DOUBLE, FIELD]
------------ describe table -------------------
drop table: true
after drop, table list: []
drop database:true
after drop, db list : [default]継続的なクエリの作成と削除を実行する DDL 操作など、その他の管理関連の操作については、このセクションでは説明していません。 LindormTSDB でサポートされている SQL 構文を参照して、これらの操作を実行できます。 LindormTSDB でサポートされている SQL ステートメントの詳細については、System policies for Lindorm をご参照ください。
書き込みインターフェース
LindormTSDB SDK は、複数の書き込みインターフェースを提供します。これらの書き込みインターフェースは、名前は同じですが、異なるパラメーターを使用する書き込みメソッドです。書き込みインターフェースは、インターフェースが書き込み結果を処理する方法に基づいて、次のタイプに分類できます。 CompletableFuture<WriteResult> クラスを使用して書き込み結果を処理する書き込みインターフェースと、コールバック関数を使用して非同期書き込み結果を処理する書き込みインターフェースです。
デフォルトでは、LindormTSDBClient は非同期バッチ書き込みメソッドを使用して、書き込み操作の効率を向上させます。同期書き込み操作を実行する場合は、書き込みメソッドによって返される CompletableFuture<WriteResult> クラスの join メソッドを呼び出します。
書き込み操作レコード
LindormTSDB SDK は、Record オブジェクトを使用して、テーブルに書き込まれる行に関する情報を格納します。 Record オブジェクトには、テーブル名、タイムスタンプ、タグ、フィールド名、およびフィールド値を指定する必要があります。タイムスタンプとタグは、インデックスの作成に使用されます。
デフォルトでは、LindormTSDB SDK は、Record オブジェクトを作成するときに文字の有効性を検証します。 build() メソッドでパラメーター 'false' を指定できます。
Record record = Record
// テーブルの名前。
.table("sensor")
// ミリ秒単位のタイムスタンプ。
.time(currentTime)
// タグ。
.tag("device_id", "F07A1260")
.tag("region", "north-cn")
// フィールド名とフィールド値。
.addField("temperature", 12.1)
.addField("humidity", 45.0)
.build();CompletableFuture オブジェクトを使用して書き込み結果を処理するインターフェース
書き込み操作レコードを一度に 1 つだけ送信します。
// デフォルトのデータベースに書き込みます。
CompletableFuture<WriteResult> future = lindormTSDBClient.write(record);
// 指定されたデータベースにデータを書き込みます。
String database = "demo";
CompletableFuture<WriteResult> future = lindormTSDBClient.write(database, record);複数の書き込み操作レコードをバッチで送信します。このメソッドを使用することをお勧めします。このメソッドを使用すると、LindormTSDB SDK を使用してタスクを送信するときに、非同期キューの保留中のタスクで発生するロック競合の問題を軽減できます。
List<Record> records;
// デフォルトのデータベースにデータを書き込みます。
CompletableFuture<WriteResult> future = lindormTSDBClient.write(records);
// 指定されたデータベースにデータを書き込みます。
String database = "demo";
CompletableFuture<WriteResult> future = lindormTSDBClient.write(database, records);CompletableFuture<WriteResult> オブジェクトによって返された結果を処理します。次のコードブロックは例を示しています。ビジネス要件に基づいて、CompletableFuture オブジェクトの他のメソッドを呼び出すことができます。
CompletableFuture<WriteResult> future = lindormTSDBClient.write(records);
// 非同期書き込み結果を処理します。
future.whenComplete((r, ex) -> {
if (ex != null) { // 送信例外が発生します。ほとんどの場合、書き込み操作が失敗したために例外が返されます。
System.out.println("Failed to write.");
Throwable throwable = ExceptionUtils.getRootCause(ex);
if (throwable instanceof LindormTSDBException) {
LindormTSDBException e = (LindormTSDBException) throwable;
System.out.println("Caught an LindormTSDBException, which means your request made it to Lindorm TSDB, "
+ "but was rejected with an error response for some reason.");
System.out.println("Error Code: " + e.getCode());
System.out.println("SQL State: " + e.getSqlstate());
System.out.println("Error Message: " + e.getMessage());
} else {
throwable.printStackTrace();
}
} else { // 書き込み操作は成功です。
if (r.isSuccessful()) {
System.out.println("Write successfully.");
} else {
System.out.println("Write failure.");
}
}
});CompletableFuture オブジェクトの whenComplete メソッドを使用して、複雑で時間のかかる計算を実行しないでください。そうしないと、計算が停止する可能性があります。複雑で時間のかかる計算を実行する場合は、計算タスクを他の独立したスレッドプールに送信します。エラーコードの詳細については、一般的なエラーコード をご参照ください。
コールバック関数を使用して非同期書き込み結果を処理するインターフェース
次のコードブロックは、書き込みコールバックを示しています。 onCompletion メソッドの result パラメーターは書き込み結果を示し、records パラメーターはコールバックに対応する書き込み操作レコードを示し、e パラメーターは書き込み操作の失敗時にスローされる例外を示します。
public interface Callback {
void onCompletion(WriteResult result, List<Record> records, Throwable e);
}次の例は、コールバックを使用して書き込み結果を処理する方法を示しています。
Callback callback = new Callback() {
@Override
public void onCompletion(WriteResult result, List<Record> list,
Throwable throwable) {
if (throwable != null) { // 書き込み操作の失敗。
if (throwable instanceof LindormTSDBException) {
LindormTSDBException ex = (LindormTSDBException) throwable;
System.out.println("errorCode: " + ex.getCode());
System.out.println("sqlstate: " + ex.getSqlstate());
System.out.println("message: " + ex.getMessage());
} else {
// その他のエラー。
throwable.printStackTrace();
}
} else {
if (result.isSuccessful()) {
System.out.println("Write successfully.");
} else {
System.out.println("Write failure.");
}
}
}
};コールバックの onCompletion メソッドを使用して、複雑で時間のかかる計算を実行しないでください。そうしないと、計算が停止する可能性があります。複雑で時間のかかる計算を実行する場合は、計算タスクを他の独立したスレッドプールに送信します。エラーコードの詳細については、一般的なエラーコード をご参照ください。
書き込み操作レコードを一度に 1 つだけ送信します。
// デフォルトのデータベースにデータを書き込みます。
lindormTSDBClient.write(record, callback);
// 指定されたデータベースにデータを書き込みます。
String database = "demo";
lindormTSDBClient.write(database, record, callback);複数の書き込み操作レコードをバッチで送信します。このメソッドを使用することをお勧めします。このメソッドを使用すると、LindormTSDB SDK を使用してタスクを送信するときに、非同期キューの保留中のタスクで発生するロック競合の問題を軽減できます。
List<Record> records;
// デフォルトのデータベースにデータを書き込みます。
lindormTSDBClient.write(records, callback);
// 指定されたデータベースにデータを書き込みます。
String database = "demo";
lindormTSDBClient.write(database, records, callback);クエリ操作
LindormTSDB SDK が提供するクエリインターフェースを使用すると、SQL ステートメントを使用してデータをクエリできます。詳細については、System policies for Lindorm をご参照ください。
次のコードブロックは、LindormTSDB SDK が提供するクエリインターフェースの例を示しています。入力パラメーターを構成して、クエリするデータベース、データのクエリに使用する SQL ステートメント、および各バッチで返す行数を指定するチャンクサイズを指定する必要があります。
ResultSet query(String database, String sql, int chunkSize);次のコードブロックは、LindormTSDB SDK が SQL ステートメントの結果を表示するために使用する ResultSet インターフェースを示しています。
public interface ResultSet extends Closeable {
QueryResult next();
void close();
}ResultSet インターフェースの next() メソッドを使用して、ループでクエリ結果を取得できます。 next() メソッドによって返される QueryResult オブジェクトが null の場合、すべてのクエリ結果が取得されます。すべてのクエリ結果が取得された後、ResultSet インターフェースの close メソッドを呼び出して、占有されている入力および出力リソースを解放できます。
QueryResult オブジェクトは、columns、metadata、および rows フィールドで構成されています。 columns フィールドには、クエリ結果のすべての列の名前が含まれています。 metadata フィールドには、列のデータ型が含まれています。 rows フィールドには、行ごとに返されるクエリ結果が含まれています。 LindormTSDB でサポートされているデータ型の詳細については、データ型をご参照ください。次の例は、ResultSet を使用してクエリ結果を処理する方法を示しています。
String sql = "select * from sensor";
int chunkSize = 100;
ResultSet resultSet = lindormTSDBClient.query("demo", sql, chunkSize);
// クエリ結果を処理します。
try {
QueryResult result = null;
// ResultSet の next() メソッドが null を返す場合、すべてのクエリ結果が取得されます。
while ((result = resultSet.next()) != null) {
List<String> columns = result.getColumns();
System.out.println("columns: " + columns);
List<String> metadata = result.getMetadata();
System.out.println("metadata: " + metadata);
List<List<Object>> rows = result.getRows();
for (int i = 0, size = rows.size(); i < size; i++) {
List<Object> row = rows.get(i);
System.out.println("row #" + i + " : " + row);
}
}
} finally {
// すべてのクエリ結果が取得された後、ResultSet の close メソッドを呼び出して、占有されている入力および出力リソースを解放してください。
resultSet.close();
}クエリの成否に関係なく、クエリが完了したら、ResultSet の close メソッドを明示的に呼び出して、入力および出力リソースを解放する必要があります。そうしないと、接続リークが発生します。
LindormTSDB SDK は、クエリインターフェースをオーバーロードするための複数の実装も提供します。ビジネス要件に基づいて、次の実装のいずれかを選択できます。
// SQL ステートメント。
String sql = "xxxx";
// 1. SQL ステートメントを実行して、デフォルトのデータベースをクエリします。デフォルトでは、各バッチで 1,000 行が返されます。
ResultSet resultSet = lindormTSDBClient.query(sql);
String database = "demo";
// 2. SQL ステートメントを実行して、指定されたデータベースをクエリします。デフォルトでは、各バッチで 1,000 行が返されます。
ResultSet resultSet = lindormTSDBClient.query(database, sql);