Lindorm 検索エンジンは、オープンソースの Elasticsearch (ES) API と互換性があります。これにより、検索インデックスにアクセスでき、ES 技術スタックを使用するサービスのスムーズな移行が容易になります。このトピックでは、LindormTable の SQL テーブルを例に、ES API を使用して検索インデックスにアクセスするためのベストプラクティスを説明します。
前提条件
検索インデックスサービスを有効化していること。Elasticsearch 互換バージョンの Lindorm 検索エンジンを使用する必要があります。
ES 7.10 クライアントを使用してサービスにアクセスする必要があります。異なるバージョンのクライアントを使用する場合は、Lindorm のテクニカルサポート (DingTalk ID:s0s3eg3) にご連絡ください。
データの準備
SQL テーブルを準備し、サンプルデータを書き込みます。
LindormTable で SQL テーブルを作成する
CREATE DATABASE searchindex_db;
USE searchindex_db;
CREATE TABLE search_table (
user_id BIGINT,
name VARCHAR,
age SMALLINT,
gender VARCHAR,
address VARCHAR,
email VARCHAR,
city VARCHAR,
PRIMARY KEY (user_id, name)
);サンプルデータを書き込む
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (1, 'Mr. Zhang', 18, 'M', 'Chaoyang District, Beijing', 'a***@example.net', 'Beijing');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (6, 'Mr. Li', 32, 'M', 'Yuhang District, Hangzhou', 'a***@example.net', 'Hangzhou');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (20, 'Mr. Wang', 28, 'M', 'Binjiang District, Hangzhou', 'a***@example.net', 'Hangzhou');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (28, 'Ms. Chen', 36, 'F', 'Nanshan District, Shenzhen', 'a***@example.net', 'Shenzhen');検索インデックスの作成
Lindorm の SQL テーブルでは、SQL を使用して検索インデックスを作成できます。このトピックでは、簡単な例のみを示します。詳細については、「検索インデックスの管理」をご参照ください。
インデックスの作成
CREATE INDEX idx USING SEARCH ON search_table (
name,
age,
gender,
address(type=text, analyzer=ik),
email,
city
) WITH (numShards=4);インデックスソースを有効にする
デフォルトでは、SQL テーブル用に作成された検索インデックスは、生データを検索エンジンに保存しません。これにより、ストレージ容量を節約できます。ES API を使用して生データを直接取得するには、インデックスの作成時に SOURCE_SETTINGS プロパティを指定します。
SOURCE_SETTINGS を指定するには、LindormTable のバージョンが 2.7.4.1 以降である必要があります。
CREATE INDEX idx USING SEARCH ON search_table (
name,
age,
gender,
address(type=text, analyzer=ik),
email,
city
) WITH (
numShards=4,
SOURCE_SETTINGS='
{
"enabled": true
}
'
);Elasticsearch インデックス名を取得する
SQL を使用して検索インデックスを作成すると、対応する Elasticsearch インデックス名は namespace_name.table_name.search_index_name というフォーマットになります。上記の例では、Elasticsearch インデックス名は searchindex_db.search_table.idx です。UI にログインし、次のクエリを実行して、対応するインデックス情報を取得できます。
GET searchindex_db.search_table.idxワイドテーブルと Elasticsearch インデックスフィールドのマッピング
デフォルトでは、SQL テーブルのフィールド名と Elasticsearch インデックスのフィールド名は 1 対 1 でマッピングされます。例えば、上記の例では、最終的な Elasticsearch インデックスのマッピング構造は次のようになります。
{
"mappings": {
"dynamic": "true",
"dynamic_templates": [],
"properties": {
"_searchindex_id": {
"type": "keyword",
"index": false
},
"address": {
"type": "text",
"analyzer": "ik_max_word"
},
"age": {
"type": "integer"
},
"city": {
"type": "keyword"
},
"delete_version_l": {
"type": "long"
},
"email": {
"type": "keyword"
},
"gender": {
"type": "keyword"
},
"update_version_l": {
"type": "long"
}
}
}
}SQL テーブルの age 列はインデックスの age フィールドに対応し、gender 列は gender フィールドに対応します。インデックスには、update_version_l などの自動的に追加される組み込みフィールドも含まれています。これらのフィールドは、データ同期などの内部目的で使用され、通常の使用には影響しないため、無視してかまいません。
ワイドテーブルと Elasticsearch インデックスフィールドのデフォルトの型マッピング
ワイドテーブルと検索インデックスの間のデフォルトの型マッピングは次のとおりです。
ワイドテーブルの型 | Elasticsearch の型 |
BOOLEAN/HBOOLEAN | bool |
BYTE/SHORT/HSHORT/INT/HINTEGER/UNSIGNED_BYTE/UNSIGNED_SHORT/UNSIGNED_INTEGER | integer |
LONG/HLONG/UNSIGNED_LONG/TIMESTAMP | long |
FLOAT/HFLOAT/UNSIGNED_FLOAT | float |
DOUBLE/HDOUBLE/UNSIGNED_DOUBLE | double |
STRING/HSTRING | keyword |
CHAR/BINARY/VARBINARY | keyword |
JSON | object |
ワイドテーブルと Elasticsearch インデックスのプライマリキーのマッピング
推奨される方法
SQL テーブルの場合、インデックスの _id 列をワイドテーブルのプライマリキーにマッピングし直すには、複雑なロジックが必要です。ストレージ容量を節約する必要がない場合は、直接インデックスソースを有効化できます。これにより、ワイドテーブルをクエリする必要がなくなります。
容量を節約し、使用を簡素化するために、ワイドテーブルのプライマリキー列をインデックス列リストに追加し、プライマリキー列の生データのみを保存するように指定できます。これにより、ES クエリ結果からワイドテーブルのプライマリキー列を直接取得できます。例えば、次の SQL 文は、ワイドテーブルのプライマリキー列である user_id, name をインデックス列に追加します。user_id, name 列のソースを有効化できます。その後、ES API を使用してワイドテーブルのプライマリキー列を取得できます。
CREATE INDEX idx USING SEARCH ON search_table (
user_id,
name,
age,
gender,
address(type=text, analyzer=ik),
email,
city
) WITH (
numShards=4,
SOURCE_SETTINGS='
{
"includes": ["user_id", "name"]
}
'
);上記の文を使用してインデックスを作成した後、ES API を使用してクエリを実行できます。
GET /searchindex_db.search_table.idx/_search次の結果が返されます。
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 4,
"successful": 4,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 4,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "searchindex_db.search_table.idx",
"_id": "8000000000000006e69d8ee58588e7949f00",
"_score": 1,
"_source": {
"user_id": 6,
"name": "Mr. Li"
}
},
{
"_index": "searchindex_db.search_table.idx",
"_id": "8000000000000014e78e8be58588e7949f00",
"_score": 1,
"_source": {
"user_id": 20,
"name": "Mr. Wang"
}
},
{
"_index": "searchindex_db.search_table.idx",
"_id": "8000000000000001e5bca0e58588e7949f00",
"_score": 1,
"_source": {
"user_id": 1,
"name": "Mr. Zhang"
}
},
{
"_index": "searchindex_db.search_table.idx",
"_id": "800000000000001ce99988e5a5b3e5a3ab00",
"_score": 1,
"_source": {
"user_id": 28,
"name": "Ms. Chen"
}
}
]
}
}インデックスの _id とワイドテーブルのプライマリキー間のマッピング
推奨される方法を使用しない場合は、このセクションの手順に従って _id 列をワイドテーブルのプライマリキーに変換できます。
ワイドテーブルのプライマリキーと Elasticsearch インデックスの _id 列は、インデックス同期によって確立される 1 対 1 のマッピング関係にあります。ワイドテーブルデータの各行は、一意のインデックスドキュメントに対応します。インデックス作成時に source=false (生データを保存しない) を設定した場合、生データを取得するにはプライマリキーでワイドテーブルをクエリする必要があります。次のコード例は、Elasticsearch インデックスの _id 列をワイドテーブルのプライマリキー列にマッピングする方法を示しています。
次のコード例では、ワイドテーブルでサポートされている基本データ型のみを考慮しています。
package org.example;
import com.alibaba.lindorm.client.core.utils.DataTypeUtils;
import com.alibaba.lindorm.client.schema.DataType;
import com.alibaba.lindorm.client.schema.SortOrder;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hbase.util.Bytes;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
public class SearchDocIdToLindormColumnValues {
public static void main(String[] args) throws Exception {
List<String> primaryKeyTypes = new ArrayList<>();
List<SortOrder> primaryKeySortOrders = new ArrayList<>();
// これは Elasticsearch インデックスの "_id" の値です
String id = "00818001800000018000000000000001bf8ccccebff199999999999bc10b02007465737400800001992c36f418800001992c36f418800001992c36f418000000007465737420737472696e67";
// プライマリキーの型とソート順を取得します
getPrimaryKeyTypesAndSortOrders(primaryKeyTypes, primaryKeySortOrders);
// desc 型のプライマリキー
// String id = "017e7ffe7ffffffe7ffffffffffffffe40733331400e6666666666643febf4feff8b9a8c8bff7ffffe66d3c90be77ffffe66d3c90be77ffffe66d3c90be7ffffffff7465737420737472696e67";
// getPrimaryKeyTypesAndSortOrdersDesc(primaryKeyTypes, primaryKeySortOrders);
// 検索インデックスから rowKey のエンコード型を取得します
RowKeyEncodedType rowKeyEncodedType = RowKeyEncodedType.getFormatterType(primaryKeyTypes, primaryKeySortOrders);
byte[] rowKey;
if (RowKeyEncodedType.HEX.equals(rowKeyEncodedType)) {
rowKey = Hex.decodeHex(id.toCharArray());
} else {
rowKey = Bytes.toBytes(id);
}
// Lindorm の rowKey を複数のプライマリキー値に解析します
List<Object> values = parseRowKeyToValues(rowKey, primaryKeyTypes, primaryKeySortOrders);
for (Object value : values) {
System.out.println(value);
}
}
private static void getPrimaryKeyTypesAndSortOrders(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
// これはサンプルです。mysql 接続で 'DESCRIBE TABLE' を実行してテーブルから実際のスキーマを取得できます
// または、ここにプライマリキーの型を順番に記述することもできます
primaryKeyTypes.add("BOOLEAN");
primaryKeyTypes.add("TINYINT");
primaryKeyTypes.add("SMALLINT");
primaryKeyTypes.add("INT");
primaryKeyTypes.add("BIGINT");
primaryKeyTypes.add("FLOAT");
primaryKeyTypes.add("DOUBLE");
primaryKeyTypes.add("DECIMAL(10,2)");
primaryKeyTypes.add("VARCHAR");
primaryKeyTypes.add("DATE");
primaryKeyTypes.add("TIME");
primaryKeyTypes.add("TIMESTAMP");
primaryKeyTypes.add("VARBINARY");
for (int i = 0; i < 13; ++i) {
primaryKeySortOrders.add(SortOrder.ASC);
}
}
private static void getPrimaryKeyTypesAndSortOrdersDesc(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
// これはサンプルです。mysql 接続で 'DESCRIBE TABLE' を実行してテーブルから実際のスキーマを取得できます
// または、ここにプライマリキーの型を順番に記述することもできます
primaryKeyTypes.add("BOOLEAN");
primaryKeyTypes.add("TINYINT");
primaryKeyTypes.add("SMALLINT");
primaryKeyTypes.add("INT");
primaryKeyTypes.add("BIGINT");
primaryKeyTypes.add("FLOAT");
primaryKeyTypes.add("DOUBLE");
primaryKeyTypes.add("DECIMAL(10,2)");
primaryKeyTypes.add("VARCHAR");
primaryKeyTypes.add("DATE");
primaryKeyTypes.add("TIME");
primaryKeyTypes.add("TIMESTAMP");
primaryKeyTypes.add("VARBINARY");
for (int i = 0; i < 12; ++i) {
primaryKeySortOrders.add(SortOrder.DESC);
}
primaryKeySortOrders.add(SortOrder.ASC);
}
private static List<Object> parseRowKeyToValues(byte[] rowKey, List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
List<Object> decoded = new ArrayList<>(primaryKeyTypes.size());
int pos = 0;
for (int i = 0; i < primaryKeyTypes.size(); i++) {
String primaryKeyType = primaryKeyTypes.get(i).toLowerCase();
int start = pos;
if (primaryKeyType.equalsIgnoreCase("varchar")) {
while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
++pos;
}
decoded.add(decodeString(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
++pos;
} else if (primaryKeyType.equalsIgnoreCase("boolean")) {
decoded.add(decodeBoolean(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_BOOLEAN;
} else if (primaryKeyType.equalsIgnoreCase("tinyint")) {
decoded.add(decodeByte(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_BYTE;
} else if (primaryKeyType.equalsIgnoreCase("smallint")) {
decoded.add(decodeShort(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_SHORT;
} else if (primaryKeyType.equalsIgnoreCase("int")) {
decoded.add(decodeInt(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_INT;
} else if (primaryKeyType.equalsIgnoreCase("bigint")) {
decoded.add(decodeLong(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG;
} else if (primaryKeyType.equalsIgnoreCase("float")) {
decoded.add(decodeFloat(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_FLOAT;
} else if (primaryKeyType.equalsIgnoreCase("double")) {
decoded.add(decodeDouble(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_DOUBLE;
} else if (primaryKeyType.contains("decimal")) {
while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
++pos;
}
decoded.add(decodeDecimal(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
++pos;
} else if (primaryKeyType.contains("varbinary")) {
decoded.add(decodeVarbinary(rowKey, start, rowKey.length - start, primaryKeySortOrders.get(i)));
pos = rowKey.length;
} else if (primaryKeyType.contains("date")) {
decoded.add(decodeDate(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG;
} else if (primaryKeyType.contains("time")) {
decoded.add(decodeTime(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG;
} else if (primaryKeyType.contains("timestamp")) {
decoded.add(decodeTimestamp(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
}
}
return decoded;
}
private static Object decodeString(byte[] bytes, int offset, int length, SortOrder sortOrder) {
if (SortOrder.ASC.equals(sortOrder)) {
return new String(bytes, offset, length, StandardCharsets.UTF_8);
} else {
byte[] reversed = new byte[length];
for (int i = 0; i < length; i++) {
reversed[i] = (byte) (bytes[offset + i] ^ 0xFF);
}
return new String(reversed, StandardCharsets.UTF_8);
}
}
private static Object decodeBoolean(byte[] bytes, int offset, SortOrder sortOrder) {
return ((bytes[offset] == 0) == SortOrder.DESC.equals(sortOrder));
}
private static Object decodeByte(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeByte(bytes, offset, sortOrder);
}
private static Object decodeShort(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeShort(bytes, offset, sortOrder);
}
private static Object decodeInt(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeInt(bytes, offset, sortOrder);
}
private static Object decodeLong(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeLong(bytes, offset, sortOrder);
}
private static Object decodeFloat(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeFloat(bytes, offset, sortOrder);
}
private static Object decodeDouble(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeDouble(bytes, offset, sortOrder);
}
private static Object decodeDecimal(byte[] bytes, int offset, int length, SortOrder sortOrder) {
if (SortOrder.DESC.equals(sortOrder)) {
return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder, DataType.DECIMAL_V2);
}
return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder);
}
private static Object decodeVarbinary(byte[] bytes, int offset, int length, SortOrder sortOrder) {
byte[] result = new byte[length];
if (SortOrder.DESC.equals(sortOrder)) {
DataTypeUtils.invert(bytes, offset, result, 0, length);
} else {
System.arraycopy(bytes, offset, result, 0, length);
}
return result;
}
private static Object decodeDate(byte[] bytes, int offset, SortOrder sortOrder) {
Long result = (Long) decodeLong(bytes, offset, sortOrder);
return new Date(result);
}
private static Object decodeTime(byte[] bytes, int offset, SortOrder sortOrder) {
Long result = (Long) decodeLong(bytes, offset, sortOrder);
return new Time(result);
}
private static Object decodeTimestamp(byte[] bytes, int offset, SortOrder sortOrder) {
long ms = DataTypeUtils.decodeLong(bytes, offset, sortOrder);
int nanos = DataTypeUtils.decodeUnsignedInt(bytes, offset + Bytes.SIZEOF_LONG, sortOrder);
Timestamp ts = new Timestamp(ms);
ts.setNanos(ts.getNanos() + nanos);
return ts;
}
private static byte getSeparatorByte(SortOrder sortOrder) {
if (sortOrder == SortOrder.DESC) {
return (byte) (0xFF);
} else {
return (byte) 0;
}
}
public enum RowKeyEncodedType {
STRING,
HEX;
public static RowKeyEncodedType getFormatterType(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
int n = primaryKeyTypes.size();
for (int i = 0; i < n; i++) {
String primaryKeyType = primaryKeyTypes.get(i);
SortOrder sortOrder = primaryKeySortOrders.get(i);
if (!(primaryKeyType.equalsIgnoreCase("varchar") || primaryKeyType.equalsIgnoreCase("hstring"))
|| SortOrder.DESC.equals(sortOrder)) {
return RowKeyEncodedType.HEX;
}
}
return RowKeyEncodedType.STRING;
}
}
}