LindormTSDB SDK は、3 種類のインターフェイスを提供します。DDL(Data Definition Language)および DCL(Data Control Language)操作用の管理インターフェイス、時系列レコードの取り込み用の書き込みインターフェイス、および SQL を用いたデータ取得用のクエリインターフェイスです。
管理インターフェイス
管理インターフェイスは、LindormTSDB に対して SQL ステートメントを実行します。2 つのオーバーロードされたメソッドが利用可能です。
| メソッド署名 | 説明 |
|---|---|
Result execute(String sql) | デフォルトデータベースに対して SQL ステートメントを実行します |
Result execute(String database, String sql) | 指定したデータベースに対して SQL ステートメントを実行します |
Result オブジェクトには、実行結果が出力されます。
| フィールド | 型 | 説明 |
|---|---|---|
columns | List<String> | 返された結果のカラム名 |
metadata | List<String> | カラムのデータ型 |
rows | List<List<Object>> | 行単位で返される結果 |
データベースおよびテーブルの管理
DDL ステートメントを使用して、データベースおよびテーブルの作成、スキーマ確認(describe)、削除を行います。以下の例では、作成から削除までの完全なライフサイクルを説明します。
// 1. 既存のデータベース一覧を表示します。
String showDatabase = "show databases";
Result result = lindormTSDBClient.execute(showDatabase);
System.out.println("作成前、データベース一覧: " +
result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));
// 2. 「demo」という名前のデータベースを作成します。
String createDatabase = "create database demo";
result = lindormTSDBClient.execute(createDatabase);
System.out.println("データベース作成: " + result.isSuccessful());
// 3. データベースが正しく作成されたことを確認します。
result = lindormTSDBClient.execute(showDatabase);
System.out.println("作成後、データベース一覧: " +
result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));
String database = "demo";
// 4. 指定データベース内の既存テーブル一覧を表示します。
String showTables = "show tables";
result = lindormTSDBClient.execute(database, showTables);
System.out.println("作成前、テーブル一覧: " +
result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));
// 5. テーブルを作成します。
String createTable = "CREATE TABLE sensor (device_id VARCHAR TAG, region VARCHAR TAG, " +
"time BIGINT, temperature DOUBLE, humidity DOUBLE, PRIMARY KEY(device_id))";
result = lindormTSDBClient.execute(database, createTable);
System.out.println("テーブル作成: " + result.isSuccessful());
// 6. テーブルが正しく作成されたことを確認します。
result = lindormTSDBClient.execute(database, showTables);
System.out.println("作成後、テーブル一覧: " +
result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));
// 7. テーブルスキーマを確認します。
String describeTable = "describe table sensor";
result = lindormTSDBClient.execute(database, describeTable);
System.out.println("------------ テーブルスキーマ確認 -------------------");
List<String> columns = result.getColumns();
System.out.println("カラム名: " + columns);
List<String> metadata = result.getMetadata();
System.out.println("メタデータ: " + metadata);
List<List<Object>> rows = result.getRows();
for (int i = 0, size = rows.size(); i < size; i++) {
List<Object> row = rows.get(i);
System.out.println("カラム #" + i + " : " + row);
}
System.out.println("------------ テーブルスキーマ確認 -------------------");
// 8. テーブルを削除します。
String dropTable = "drop table sensor";
result = lindormTSDBClient.execute(database, dropTable);
System.out.println("テーブル削除: " + result.isSuccessful());
// 9. テーブルが正しく削除されたことを確認します。
result = lindormTSDBClient.execute(database, showTables);
System.out.println("削除後、テーブル一覧: " +
result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));
// 10. データベースを削除します。
String dropDatabase = "drop database demo";
result = lindormTSDBClient.execute(dropDatabase);
System.out.println("データベース削除: " + result.isSuccessful());
// 11. データベースが正しく削除されたことを確認します。
result = lindormTSDBClient.execute(showDatabase);
System.out.println("削除後、データベース一覧: " +
result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));期待される出力:
作成前、データベース一覧: [default]
データベース作成: true
作成後、データベース一覧: [default, demo]
作成前、テーブル一覧: []
テーブル作成: true
作成後、テーブル一覧: [sensor]
------------ テーブルスキーマ確認 -------------------
カラム名: [columnName, typeName, columnKind]
メタデータ: [VARCHAR, VARCHAR, VARCHAR]
カラム #0 : [device_id, VARCHAR, TAG]
カラム #1 : [region, VARCHAR, TAG]
カラム #2 : [time, TIMESTAMP, TIMESTAMP]
カラム #3 : [temperature, DOUBLE, FIELD]
カラム #4 : [humidity, DOUBLE, FIELD]
------------ テーブルスキーマ確認 -------------------
テーブル削除: true
削除後、テーブル一覧: []
データベース削除: true
削除後、データベース一覧: [default]継続的クエリ(continuous queries)に関する DDL 操作は本項では扱いません。LindormTSDB でサポートされるフル SQL 構文については、「SQL 構文」をご参照ください。
インターフェイスの作成
LindormTSDB SDK は、スループットの最大化を目的として、デフォルトで非同期でデータを書き込みます。書き込みインターフェイスは以下の機能をサポートします。
単一レコードおよびバッチ書き込み(バッチ書き込みは非同期キュー内のロック競合を軽減し、推奨されます)
2 種類の結果処理パターン:インライン処理向けの
CompletableFuture<WriteResult>、イベント駆動型処理向けのCallback.join()を呼び出すことで、返されたCompletableFutureを用いた同期書き込み
書き込みレコードの構築
Record オブジェクトは、1 行分の書き込みデータを保持します。テーブル名、タイムスタンプ、タグ、およびフィールド値を指定します。
Record record = Record
.table("sensor") // テーブル名
.time(currentTime) // タイムスタンプ(ミリ秒単位)
.tag("device_id", "F07A1260") // タグ:インデックス付きのキー/バリューディメンション
.tag("region", "north-cn")
.addField("temperature", 12.1) // フィールド:インデックスなしのメトリック値
.addField("humidity", 45.0)
.build();Record の構築時に、SDK はデフォルトで文字の有効性を検証します。false を build() に渡すことで、この検証をスキップできます。
CompletableFuture を用いた書き込み
すべてのオーバーロードメソッドは CompletableFuture<WriteResult> を返します。非同期キュー内のロック競合を軽減するため、バッチ書き込みを推奨します。
| メソッド署名 | 説明 |
|---|---|
CompletableFuture<WriteResult> write(Record record) | デフォルトデータベースへ単一レコードを書き込みます |
CompletableFuture<WriteResult> write(String database, Record record) | 指定データベースへ単一レコードを書き込みます |
CompletableFuture<WriteResult> write(List<Record> records) | デフォルトデータベースへ複数レコードを書き込みます(推奨) |
CompletableFuture<WriteResult> write(String database, List<Record> records) | 指定データベースへ複数レコードを書き込みます(推奨) |
単一レコード:
// デフォルトデータベース
CompletableFuture<WriteResult> future = lindormTSDBClient.write(record);
// 指定データベース
String database = "demo";
CompletableFuture<WriteResult> future = lindormTSDBClient.write(database, record);バッチ書き込み(推奨):
List<Record> records;
// デフォルトデータベース
CompletableFuture<WriteResult> future = lindormTSDBClient.write(records);
// 指定データベース
String database = "demo";
CompletableFuture<WriteResult> future = lindormTSDBClient.write(database, records);結果の処理:
CompletableFuture<WriteResult> future = lindormTSDBClient.write(records);
future.whenComplete((r, ex) -> {
if (ex != null) {
// 書き込み送信に失敗しました。
System.out.println("書き込みに失敗しました。");
Throwable throwable = ExceptionUtils.getRootCause(ex);
if (throwable instanceof LindormTSDBException) {
LindormTSDBException e = (LindormTSDBException) throwable;
System.out.println("エラーコード: " + e.getCode());
System.out.println("SQL ステート: " + e.getSqlstate());
System.out.println("エラーメッセージ: " + e.getMessage());
} else {
throwable.printStackTrace();
}
} else {
// 書き込み完了。
if (r.isSuccessful()) {
System.out.println("書き込みに成功しました。");
} else {
System.out.println("書き込みに失敗しました。");
}
}
});whenComplete 内で複雑または時間のかかる計算を実行しないでください。そのような処理は、独立したスレッドプールにオフロードしてください。エラーコードの詳細については、「共通エラーコード」をご参照ください。
コールバックを用いた書き込み
Callback を write() メソッドに渡します。onCompletion は、書き込み結果、対応するレコード、および例外を引数として受け取ります。
public interface Callback {
void onCompletion(WriteResult result, List<Record> records, Throwable e);
}| メソッド署名 | 説明 |
|---|---|
write(Record record, Callback callback) | デフォルトデータベースへ単一レコードをコールバック付きで書き込みます |
write(String database, Record record, Callback callback) | 指定データベースへ単一レコードをコールバック付きで書き込みます |
write(List<Record> records, Callback callback) | デフォルトデータベースへ複数レコードをコールバック付きで書き込みます(推奨) |
write(String database, List<Record> records, Callback callback) | 指定データベースへ複数レコードをコールバック付きで書き込みます(推奨) |
コールバックの実装と渡し方:
Callback callback = new Callback() {
@Override
public void onCompletion(WriteResult result, List<Record> list, Throwable throwable) {
if (throwable != null) {
// 書き込みに失敗しました。
if (throwable instanceof LindormTSDBException) {
LindormTSDBException ex = (LindormTSDBException) throwable;
System.out.println("エラーコード: " + ex.getCode());
System.out.println("SQL ステート: " + ex.getSqlstate());
System.out.println("メッセージ: " + ex.getMessage());
} else {
throwable.printStackTrace();
}
} else {
if (result.isSuccessful()) {
System.out.println("書き込みに成功しました。");
} else {
System.out.println("書き込みに失敗しました。");
}
}
}
};
// バッチ書き込み — 推奨
List<Record> records;
lindormTSDBClient.write(records, callback);
// 単一レコード書き込み
lindormTSDBClient.write(record, callback);onCompletion 内で複雑または時間のかかる計算を実行しないでください。そのような処理は、独立したスレッドプールにオフロードしてください。エラーコードの詳細については、「共通エラーコード」をご参照ください。
クエリインターフェイス
クエリインターフェイスは SQL ステートメントを実行し、結果をチャンク単位でストリーム形式で返します。すべてのオーバーロードメソッドは ResultSet を返します。
| メソッド署名 | 説明 |
|---|---|
ResultSet query(String sql) | デフォルトデータベースをクエリ対象とし、チャンクあたり最大 1,000 行を返します |
ResultSet query(String database, String sql) | 指定データベースをクエリ対象とし、チャンクあたり最大 1,000 行を返します |
ResultSet query(String database, String sql, int chunkSize) | 指定データベースをクエリ対象とし、カスタムのチャンクサイズを指定できます |
パラメーター:
| パラメーター | タイプ | 説明 |
|---|---|---|
database | String | クエリ対象のデータベース名 |
sql | String | SQL 構文実行する SQL ステートメント。サポートされている構文については、「」をご参照ください。 |
chunkSize | int | バッチごとに返される行数。デフォルト値:1,000。 |
クエリ結果の処理
ResultSet は next() を繰り返し呼び出してイテレートし、null が返された時点で終了します。その後、I/O リソースを解放するために結果セットを必ず閉じてください。
public interface ResultSet extends Closeable {
QueryResult next();
void close();
}各 QueryResult オブジェクトには以下のフィールドが含まれます。
| フィールド | 型 | 説明 |
|---|---|---|
columns | List<String> | クエリ結果のカラム名 |
metadata | List<String> | カラムのデータ型。詳細については、「データ型」をご参照ください。 |
rows | List<List<Object>> | 行単位で返されるクエリ結果 |
例:
String sql = "select * from sensor";
int chunkSize = 100;
ResultSet resultSet = lindormTSDBClient.query("demo", sql, chunkSize);
try {
QueryResult result = null;
// next() が null を返すまでイテレート — 全結果を取得済みです。
while ((result = resultSet.next()) != null) {
List<String> columns = result.getColumns();
System.out.println("カラム名: " + columns);
List<String> metadata = result.getMetadata();
System.out.println("メタデータ: " + metadata);
List<List<Object>> rows = result.getRows();
for (int i = 0, size = rows.size(); i < size; i++) {
List<Object> row = rows.get(i);
System.out.println("行 #" + i + " : " + row);
}
}
} finally {
// クエリの完了後は、成功・失敗に関わらず、必ず ResultSet を閉じます。
resultSet.close();
}クエリの完了後は、成功・失敗に関わらず、必ず resultSet.close() を呼び出してください。これを省略すると接続リークが発生します。