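The Lindorm search engine (the Elasticsearch API-compatible version) lets you access search indexes through the open source ES API, so workloads already built on the ES technology stack can migrate smoothly. Using a SQL table in the wide table engine as the scenario, this topic demonstrates best practices for accessing search indexes through the ES API.
Prerequisites
The search index service is activated, and it must be the Elasticsearch-compatible version of the Lindorm search engine.
An ES 7.10 client is recommended. If you need to connect with a client of another version, contact Lindorm technical support (DingTalk ID: s0s3eg3).
Data preparation
Prepare a SQL table and write sample data to it.
Create a SQL table in the wide table engine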
CREATE DATABASE searchindex_db;
USE searchindex_db;
CREATE TABLE search_table (
  user_id BIGINT,
  name VARCHAR,
  age SMALLINT,
  gender VARCHAR,
  address VARCHAR,
  email VARCHAR,
  city VARCHAR,
  PRIMARY KEY (user_id, name)
);
Write sample data
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (1, '張先生', 18, 'M', '北京市朝陽區', 'a***@example.net', '北京');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (6, '李先生', 32, 'M', '杭州市餘杭區', 'a***@example.net', '杭州');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (20, '王先生', 28, 'M', '杭州市濱江區', 'a***@example.net', '杭州');
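UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (28, '陳女士', 36, 'F', '深圳市南山區', 'a***@example.net', '深圳');
Create a search index
For a Lindorm SQL table, create the search index with SQL. This topic gives only a simple example; for more detailed options, see Manage search indexes.
Create the index
CREATE INDEX idx USING SEARCH ON search_table (
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (numShards=4);
Enable the index source
By default, a search index created from a SQL table does not store the original data (the source) in the search engine, which saves storage space. To retrieve the original data directly through the ES API, specify the SOURCE_SETTINGS attribute when you create the index.
Specifying SOURCE_SETTINGS requires wide table engine version 2.7.4.1 or later.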
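CREATE INDEX idx USING SEARCH ON search_table (
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (
  numShards=4,
  SOURCE_SETTINGS='
  {
    "enabled": true
  }
  '
);
Obtain the Elasticsearch index name
When a search index is created with SQL, the corresponding Elasticsearch index is named <wide table namespace>.<wide table name>.<search index name>. For the example above, the Elasticsearch index name is searchindex_db.search_table.idx. Log on to the UI and run the following query to view the index information.
GET searchindex_db.search_table.idx
Field mapping between the wide table and the Elasticsearch index
By default, field names in the SQL table and in the Elasticsearch index correspond one to one. For the example above, the resulting Elasticsearch index mappings are as follows: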
{
  "mappings": {
    "dynamic": "true",
    "dynamic_templates": [],
    "properties": {
      "_searchindex_id": {
        "type": "keyword",
        "index": false
      },
      "address": {
        "type": "text",
        "analyzer": "ik_max_word"
      },
      "age": {
        "type": "integer"
      },
      "city": {
        "type": "keyword"
      },
      "delete_version_l": {
        "type": "long"
      },
      "email": {
        "type": "keyword"
      },
      "gender": {
        "type": "keyword"
      },
      "update_version_l": {
        "type": "long"
      }
    }
  }
}
The age column of the SQL table corresponds to the age field of the index, the gender column to the gender field, and so on. The index also contains some automatically added built-in fields (such as update_version_l). They serve internal purposes such as data synchronization, do not affect normal use, and can be ignored.
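When the mappings are processed programmatically, the built-in fields can simply be skipped. The following is a minimal sketch, not part of any Lindorm or Elasticsearch API: the class BuiltInFieldFilter and the method userFields are hypothetical names, and the set of built-in field names is an assumption inferred from the sample mappings above (this topic names only update_version_l explicitly).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only.
final class BuiltInFieldFilter {

    // Assumption: these names are the automatically added built-in fields,
    // inferred from the sample mappings; verify against your own index.
    static final Set<String> BUILT_IN_FIELDS = new HashSet<>(
            Arrays.asList("_searchindex_id", "update_version_l", "delete_version_l"));

    // Returns the mapping fields that correspond to SQL table columns,
    // keeping the input order.
    static List<String> userFields(List<String> mappingFields) {
        List<String> result = new ArrayList<>();
        for (String field : mappingFields) {
            if (!BUILT_IN_FIELDS.contains(field)) {
                result.add(field);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("_searchindex_id", "address", "age", "city",
                "delete_version_l", "email", "gender", "update_version_l");
        System.out.println(userFields(fields)); // [address, age, city, email, gender]
    }
}
```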
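Default type mapping between wide table and Elasticsearch index fields
By default, wide table types map to search index types as follows:
Wide table type | Elasticsearch type |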
BOOLEAN/HBOOLEAN | bool |
BYTE/SHORT/HSHORT/INT/HINTEGER/UNSIGNED_BYTE/UNSIGNED_SHORT/UNSIGNED_INTEGER | integer |
LONG/HLONG/UNSIGNED_LONG/TIMESTAMP | long |
FLOAT/HFLOAT/UNSIGNED_FLOAT | float |
DOUBLE/HDOUBLE/UNSIGNED_DOUBLE | double |
STRING/HSTRING | keyword |
CHAR/BINARY/VARBINARY | keyword |
JSON | object |
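The table can be read as a plain lookup from wide table type to Elasticsearch type. The sketch below illustrates that reading under stated assumptions: DefaultTypeMapping and esTypeOf are hypothetical helper names, the pairs are copied verbatim from the table, and SQL type names that the table does not list (for example VARCHAR or BIGINT) are deliberately not handled.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical lookup helper; not part of the Lindorm client.
final class DefaultTypeMapping {

    private static final Map<String, String> ES_TYPE_BY_WIDE_TABLE_TYPE = new LinkedHashMap<>();

    // Registers one table row: several wide table types sharing one Elasticsearch type.
    private static void register(String esType, String... wideTableTypes) {
        for (String type : wideTableTypes) {
            ES_TYPE_BY_WIDE_TABLE_TYPE.put(type, esType);
        }
    }

    static {
        // Rows copied from the default type mapping table.
        register("bool", "BOOLEAN", "HBOOLEAN");
        register("integer", "BYTE", "SHORT", "HSHORT", "INT", "HINTEGER",
                "UNSIGNED_BYTE", "UNSIGNED_SHORT", "UNSIGNED_INTEGER");
        register("long", "LONG", "HLONG", "UNSIGNED_LONG", "TIMESTAMP");
        register("float", "FLOAT", "HFLOAT", "UNSIGNED_FLOAT");
        register("double", "DOUBLE", "HDOUBLE", "UNSIGNED_DOUBLE");
        register("keyword", "STRING", "HSTRING", "CHAR", "BINARY", "VARBINARY");
        register("object", "JSON");
    }

    // Looks up the default Elasticsearch type; the match is case-insensitive.
    static String esTypeOf(String wideTableType) {
        String esType = ES_TYPE_BY_WIDE_TABLE_TYPE.get(wideTableType.trim().toUpperCase(Locale.ROOT));
        if (esType == null) {
            throw new IllegalArgumentException("Type not listed in the table: " + wideTableType);
        }
        return esType;
    }

    public static void main(String[] args) {
        System.out.println(esTypeOf("SHORT"));     // integer
        System.out.println(esTypeOf("timestamp")); // long
    }
}
```

Throwing on an unlisted type, instead of guessing a fallback, keeps the sketch faithful to the table.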
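Primary key mapping between the wide table and the Elasticsearch index
Recommended approach
For a SQL table, mapping the index _id column back to the wide table primary key is relatively complex. If you do not need to save storage space, simply enable the index source so that no lookup back to the wide table is needed.
To balance storage savings with ease of use, add the wide table primary key columns to the index column list and store the original data of only those columns, so the primary key columns can be read directly from ES query results. For example, the following SQL statement adds both primary key columns, user_id and name, to the index columns and enables the source for just these two columns, which makes them retrievable through the ES API.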
CREATE INDEX idx USING SEARCH ON search_table (
  user_id,
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (
  numShards=4,
  SOURCE_SETTINGS='
  {
    "includes": ["user_id", "name"]
  }
  '
);
With the index created by the preceding statement, query it through the ES API:
GET /searchindex_db.search_table.idx/_search
The response is as follows:
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 4,
    "successful": 4,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 4,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000006e69d8ee58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 6,
          "name": "李先生"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000014e78e8be58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 20,
          "name": "王先生"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000001e5bca0e58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 1,
          "name": "張先生"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "800000000000001ce99988e5a5b3e5a3ab00",
        "_score": 1,
        "_source": {
          "user_id": 28,
          "name": "陳女士"
        }
      }
    ]
  }
}
Mapping between the index _id and the wide table primary key
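If you do not use the recommended approach above, follow this section to convert the _id column to the wide table primary key.
Index synchronization establishes a one-to-one mapping between the wide table primary key and the _id column of the Elasticsearch index: each wide table row corresponds to exactly one index document. If the index is created with source=false (original data not stored), you must look up the wide table by primary key to retrieve the original data. The following code sample shows how to map the _id column of the Elasticsearch index to the wide table primary key columns.
The following code sample covers only the basic data types that the wide table already supports.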
package org.example;

import com.alibaba.lindorm.client.core.utils.DataTypeUtils;
import com.alibaba.lindorm.client.schema.DataType;
import com.alibaba.lindorm.client.schema.SortOrder;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hbase.util.Bytes;

import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

public class SearchDocIdToLindormColumnValues {
    public static void main(String[] args) throws Exception {
        List<String> primaryKeyTypes = new ArrayList<>();
        List<SortOrder> primaryKeySortOrders = new ArrayList<>();
        // the "_id" value of a document in the Elasticsearch index
        String id = "00818001800000018000000000000001bf8ccccebff199999999999bc10b02007465737400800001992c36f418800001992c36f418800001992c36f41800000000e6b58be8af95e5ad97e7aca6e4b8b2";
        // get the primary key types and sort orders
        getPrimaryKeyTypesAndSortOrders(primaryKeyTypes, primaryKeySortOrders);
        // alternative sample: primary keys in descending order
        // String id = "017e7ffe7ffffffe7ffffffffffffffe40733331400e6666666666643febf4feff8b9a8c8bff7ffffe66d3c90be77ffffe66d3c90be77ffffe66d3c90be7ffffffffe6b58be8af95e5ad97e7aca6e4b8b2";
        // getPrimaryKeyTypesAndSortOrdersDesc(primaryKeyTypes, primaryKeySortOrders);
        // determine how the search index encoded the row key into "_id"
        RowKeyEncodedType rowKeyEncodedType = RowKeyEncodedType.getFormatterType(primaryKeyTypes, primaryKeySortOrders);
        byte[] rowKey;
        if (RowKeyEncodedType.HEX.equals(rowKeyEncodedType)) {
            rowKey = Hex.decodeHex(id.toCharArray());
        } else {
            rowKey = Bytes.toBytes(id);
        }
        // parse the Lindorm row key into the individual primary key values
        List<Object> values = parseRowKeyToValues(rowKey, primaryKeyTypes, primaryKeySortOrders);
        for (Object value : values) {
            System.out.println(value);
        }
    }

    private static void getPrimaryKeyTypesAndSortOrders(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        // Sample only: get the real schema of your table by running 'DESCRIBE TABLE' over a MySQL connection,
        // or simply list your primary key types in order here.
        primaryKeyTypes.add("BOOLEAN");
        primaryKeyTypes.add("TINYINT");
        primaryKeyTypes.add("SMALLINT");
        primaryKeyTypes.add("INT");
        primaryKeyTypes.add("BIGINT");
        primaryKeyTypes.add("FLOAT");
        primaryKeyTypes.add("DOUBLE");
        primaryKeyTypes.add("DECIMAL(10,2)");
        primaryKeyTypes.add("VARCHAR");
        primaryKeyTypes.add("DATE");
        primaryKeyTypes.add("TIME");
        primaryKeyTypes.add("TIMESTAMP");
        primaryKeyTypes.add("VARBINARY");
        for (int i = 0; i < 13; ++i) {
            primaryKeySortOrders.add(SortOrder.ASC);
        }
    }

    private static void getPrimaryKeyTypesAndSortOrdersDesc(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        // Sample only: get the real schema of your table by running 'DESCRIBE TABLE' over a MySQL connection,
        // or simply list your primary key types in order here.
        primaryKeyTypes.add("BOOLEAN");
        primaryKeyTypes.add("TINYINT");
        primaryKeyTypes.add("SMALLINT");
        primaryKeyTypes.add("INT");
        primaryKeyTypes.add("BIGINT");
        primaryKeyTypes.add("FLOAT");
        primaryKeyTypes.add("DOUBLE");
        primaryKeyTypes.add("DECIMAL(10,2)");
        primaryKeyTypes.add("VARCHAR");
        primaryKeyTypes.add("DATE");
        primaryKeyTypes.add("TIME");
        primaryKeyTypes.add("TIMESTAMP");
        primaryKeyTypes.add("VARBINARY");
        // the first 12 columns are descending; the trailing VARBINARY column is ascending
        for (int i = 0; i < 12; ++i) {
            primaryKeySortOrders.add(SortOrder.DESC);
        }
        primaryKeySortOrders.add(SortOrder.ASC);
    }

    private static List<Object> parseRowKeyToValues(byte[] rowKey, List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        List<Object> decoded = new ArrayList<>(primaryKeyTypes.size());
        int pos = 0;
        for (int i = 0; i < primaryKeyTypes.size(); i++) {
            String primaryKeyType = primaryKeyTypes.get(i).toLowerCase();
            SortOrder sortOrder = primaryKeySortOrders.get(i);
            int start = pos;
            if (primaryKeyType.equals("varchar")) {
                // variable length: scan to the separator byte, or to the end of the row key
                byte separator = getSeparatorByte(sortOrder);
                while (pos < rowKey.length && rowKey[pos] != separator) {
                    ++pos;
                }
                decoded.add(decodeString(rowKey, start, pos - start, sortOrder));
                ++pos; // skip the separator
            } else if (primaryKeyType.equals("boolean")) {
                decoded.add(decodeBoolean(rowKey, start, sortOrder));
                pos += Bytes.SIZEOF_BOOLEAN;
            } else if (primaryKeyType.equals("tinyint")) {
                decoded.add(decodeByte(rowKey, start, sortOrder));
                pos += Bytes.SIZEOF_BYTE;
            } else if (primaryKeyType.equals("smallint")) {
                decoded.add(decodeShort(rowKey, start, sortOrder));
                pos += Bytes.SIZEOF_SHORT;
            } else if (primaryKeyType.equals("int")) {
                decoded.add(decodeInt(rowKey, start, sortOrder));
                pos += Bytes.SIZEOF_INT;
            } else if (primaryKeyType.equals("bigint")) {
                decoded.add(decodeLong(rowKey, start, sortOrder));
                pos += Bytes.SIZEOF_LONG;
            } else if (primaryKeyType.equals("float")) {
                decoded.add(decodeFloat(rowKey, start, sortOrder));
                pos += Bytes.SIZEOF_FLOAT;
            } else if (primaryKeyType.equals("double")) {
                decoded.add(decodeDouble(rowKey, start, sortOrder));
                pos += Bytes.SIZEOF_DOUBLE;
            } else if (primaryKeyType.contains("decimal")) {
                // variable length, terminated by the separator byte like varchar
                byte separator = getSeparatorByte(sortOrder);
                while (pos < rowKey.length && rowKey[pos] != separator) {
                    ++pos;
                }
                decoded.add(decodeDecimal(rowKey, start, pos - start, sortOrder));
                ++pos; // skip the separator
            } else if (primaryKeyType.contains("varbinary")) {
                // consumes the rest of the row key, so this sample expects varbinary as the last column
                decoded.add(decodeVarbinary(rowKey, start, rowKey.length - start, sortOrder));
                pos = rowKey.length;
            } else if (primaryKeyType.contains("timestamp")) {
                // must be tested before "time": the string "timestamp" also contains "time"
                decoded.add(decodeTimestamp(rowKey, start, sortOrder));
                pos += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
            } else if (primaryKeyType.contains("date")) {
                decoded.add(decodeDate(rowKey, start, sortOrder));
                pos += Bytes.SIZEOF_LONG;
            } else if (primaryKeyType.contains("time")) {
                decoded.add(decodeTime(rowKey, start, sortOrder));
                pos += Bytes.SIZEOF_LONG;
            } else {
                // fail fast instead of silently misaligning the remaining columns
                throw new IllegalArgumentException("Unsupported primary key type: " + primaryKeyTypes.get(i));
            }
        }
        return decoded;
    }

    private static Object decodeString(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        if (SortOrder.ASC.equals(sortOrder)) {
            return new String(bytes, offset, length, StandardCharsets.UTF_8);
        } else {
            // descending values are stored with every byte inverted
            byte[] reversed = new byte[length];
            for (int i = 0; i < length; i++) {
                reversed[i] = (byte) (bytes[offset + i] ^ 0xFF);
            }
            return new String(reversed, StandardCharsets.UTF_8);
        }
    }

    private static Object decodeBoolean(byte[] bytes, int offset, SortOrder sortOrder) {
        return ((bytes[offset] == 0) == SortOrder.DESC.equals(sortOrder));
    }

    private static Object decodeByte(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeByte(bytes, offset, sortOrder);
    }

    private static Object decodeShort(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeShort(bytes, offset, sortOrder);
    }

    private static Object decodeInt(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeInt(bytes, offset, sortOrder);
    }

    private static Object decodeLong(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeLong(bytes, offset, sortOrder);
    }

    private static Object decodeFloat(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeFloat(bytes, offset, sortOrder);
    }

    private static Object decodeDouble(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeDouble(bytes, offset, sortOrder);
    }

    private static Object decodeDecimal(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        if (SortOrder.DESC.equals(sortOrder)) {
            return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder, DataType.DECIMAL_V2);
        }
        return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder);
    }

    private static Object decodeVarbinary(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        byte[] result = new byte[length];
        if (SortOrder.DESC.equals(sortOrder)) {
            DataTypeUtils.invert(bytes, offset, result, 0, length);
        } else {
            System.arraycopy(bytes, offset, result, 0, length);
        }
        return result;
    }

    private static Object decodeDate(byte[] bytes, int offset, SortOrder sortOrder) {
        Long result = (Long) decodeLong(bytes, offset, sortOrder);
        return new Date(result);
    }

    private static Object decodeTime(byte[] bytes, int offset, SortOrder sortOrder) {
        Long result = (Long) decodeLong(bytes, offset, sortOrder);
        return new Time(result);
    }

    private static Object decodeTimestamp(byte[] bytes, int offset, SortOrder sortOrder) {
        // a timestamp is stored as 8 bytes of milliseconds followed by 4 bytes of nanoseconds
        long ms = DataTypeUtils.decodeLong(bytes, offset, sortOrder);
        int nanos = DataTypeUtils.decodeUnsignedInt(bytes, offset + Bytes.SIZEOF_LONG, sortOrder);
        Timestamp ts = new Timestamp(ms);
        ts.setNanos(ts.getNanos() + nanos);
        return ts;
    }

    private static byte getSeparatorByte(SortOrder sortOrder) {
        if (sortOrder == SortOrder.DESC) {
            return (byte) (0xFF);
        } else {
            return (byte) 0;
        }
    }

    public enum RowKeyEncodedType {
        STRING,
        HEX;

        // "_id" is a plain string only when every primary key column is an ascending varchar/hstring;
        // in all other cases it is the hex encoding of the row key
        public static RowKeyEncodedType getFormatterType(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
            int n = primaryKeyTypes.size();
            for (int i = 0; i < n; i++) {
                String primaryKeyType = primaryKeyTypes.get(i);
                SortOrder sortOrder = primaryKeySortOrders.get(i);
                if (!(primaryKeyType.equalsIgnoreCase("varchar") || primaryKeyType.equalsIgnoreCase("hstring"))
                        || SortOrder.DESC.equals(sortOrder)) {
                    return RowKeyEncodedType.HEX;
                }
            }
            return RowKeyEncodedType.STRING;
        }
    }
}
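For the sample table in this topic, whose primary key is (user_id BIGINT, name VARCHAR) in ascending order, the same conversion can be sketched without the Lindorm client library. This is an illustration under assumptions inferred from the sample _id values and the code above, not a documented format: an ascending BIGINT is taken to be 8 big-endian bytes with the sign bit flipped, and an ascending VARCHAR to be UTF-8 bytes ending with a 0x00 separator. SampleIdDecoder and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the Lindorm client.
final class SampleIdDecoder {

    // Converts a hex string such as "80ff" into raw bytes.
    static byte[] hexToBytes(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("odd-length hex string");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Assumption: ascending BIGINT = 8 big-endian bytes with the sign bit flipped.
    static long decodeAscBigint(byte[] rowKey, int offset) {
        long value = 0;
        for (int i = 0; i < Long.BYTES; i++) {
            value = (value << 8) | (rowKey[offset + i] & 0xFF);
        }
        return value ^ Long.MIN_VALUE; // flip the sign bit back
    }

    // Assumption: ascending VARCHAR = UTF-8 bytes followed by a 0x00 separator.
    static String decodeAscVarchar(byte[] rowKey, int offset) {
        int end = offset;
        while (end < rowKey.length && rowKey[end] != 0) {
            end++;
        }
        return new String(rowKey, offset, end - offset, StandardCharsets.UTF_8);
    }

    // Decodes an _id of search_table into [user_id, name].
    static List<Object> decode(String id) {
        byte[] rowKey = hexToBytes(id);
        List<Object> values = new ArrayList<>();
        values.add(decodeAscBigint(rowKey, 0));
        values.add(decodeAscVarchar(rowKey, Long.BYTES));
        return values;
    }

    public static void main(String[] args) {
        // _id of the first hit in the sample response
        System.out.println(decode("8000000000000006e69d8ee58588e7949f00"));
    }
}
```

For the first hit in the sample response, the decoded values are 6 and 李先生, which matches the user_id and name returned in its _source.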