The Lindorm search engine is compatible with the open source Elasticsearch (ES) API, letting you access search indexes and migrate services built on the ES technology stack with minimal changes. This topic uses an SQL table in LindormTable as an example to demonstrate how to access a search index using the ES API.
Prerequisites
Before you begin, ensure that you have:
-
Enabled the search index service using the Elasticsearch-compatible version of the Lindorm search engine
-
An ES 7.10 client (to use a different version, contact Lindorm technical support at DingTalk ID: s0s3eg3)
Key concepts
The following table maps LindormTable SQL concepts to their Elasticsearch equivalents. Use this as a reference when reading ES API documentation alongside Lindorm SQL documentation.
| LindormTable SQL | Elasticsearch | Description |
|---|---|---|
| Column | Field | The named entry that stores a single value |
| Row | Document | A collection of fields; each wide table row maps to one ES document |
| Table | Index | The target against which queries are run |
| Database (namespace) | Index prefix | The namespace in the ES index name format: namespace.table.index |
Prepare data
Create an SQL table in LindormTable
CREATE DATABASE searchindex_db;
USE searchindex_db;
CREATE TABLE search_table (
user_id BIGINT,
name VARCHAR,
age SMALLINT,
gender VARCHAR,
address VARCHAR,
email VARCHAR,
city VARCHAR,
PRIMARY KEY (user_id, name)
);
Write sample data
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (1, 'Mr. Zhang', 18, 'M', 'Chaoyang District, Beijing', 'a***@example.net', 'Beijing');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (6, 'Mr. Li', 32, 'M', 'Yuhang District, Hangzhou', 'a***@example.net', 'Hangzhou');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (20, 'Mr. Wang', 28, 'M', 'Binjiang District, Hangzhou', 'a***@example.net', 'Hangzhou');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (28, 'Ms. Chen', 36, 'F', 'Nanshan District, Shenzhen', 'a***@example.net', 'Shenzhen');
Create a search index
For SQL tables, create search indexes using SQL. Before writing the CREATE INDEX statement, decide how much raw data the index should store, because this choice affects both storage cost and how you retrieve primary keys from ES query results.
| Storage option | How to configure | Trade-off |
|---|---|---|
| Store primary key columns only (recommended) | SOURCE_SETTINGS='{"includes": ["user_id", "name"]}' |
Saves space; primary keys are available directly in ES results, no need to decode _id |
| Store all fields | SOURCE_SETTINGS='{"enabled": true}' |
Convenient for direct retrieval; uses more storage |
| Store nothing (default) | Omit SOURCE_SETTINGS |
Minimum storage; you must decode _id to query the wide table for raw data |
SOURCE_SETTINGS requires LindormTable version 2.7.4.1 or later.
Recommended: store primary key columns only
Add the primary key columns (user_id, name) to the index column list and configure SOURCE_SETTINGS to store only those columns. This lets you retrieve primary keys directly from ES results without decoding _id.
CREATE INDEX idx USING SEARCH ON search_table (
user_id,
name,
age,
gender,
address(type=text, analyzer=ik),
email,
city
) WITH (
numShards=4,
SOURCE_SETTINGS='
{
"includes": ["user_id", "name"]
}
'
);
The non-primary-key fields (age, city, gender, and so on) remain indexed and searchable — they are just not stored in _source.
Store all fields
To retrieve any field directly from ES results without querying the wide table:
CREATE INDEX idx USING SEARCH ON search_table (
name,
age,
gender,
address(type=text, analyzer=ik),
email,
city
) WITH (
numShards=4,
SOURCE_SETTINGS='
{
"enabled": true
}
'
);
Store nothing (default)
To minimize storage, omit SOURCE_SETTINGS:
CREATE INDEX idx USING SEARCH ON search_table (
name,
age,
gender,
address(type=text, analyzer=ik),
email,
city
) WITH (numShards=4);
With this option, raw data is not stored in the search engine. To retrieve a row's full data, decode the _id field to obtain the wide table primary key, then query the wide table. See Map the index _id to the wide table primary key for details.
For more index creation options, see Manage search indexes.
Get the Elasticsearch index name
When you create a search index using SQL, the corresponding Elasticsearch index name follows the format namespace_name.table_name.search_index_name. For the example above, the name is searchindex_db.search_table.idx.
To verify the index, log on to the UI and run:
GET searchindex_db.search_table.idx
Field mapping
Field names
Field names map one-to-one between the SQL table and the Elasticsearch index. For example, after creating the index with the recommended statement above, the Elasticsearch mapping looks like this:
{
"mappings": {
"dynamic": "true",
"dynamic_templates": [],
"properties": {
"_searchindex_id": {
"type": "keyword",
"index": false
},
"address": {
"type": "text",
"analyzer": "ik_max_word"
},
"age": {
"type": "integer"
},
"city": {
"type": "keyword"
},
"delete_version_l": {
"type": "long"
},
"email": {
"type": "keyword"
},
"gender": {
"type": "keyword"
},
"update_version_l": {
"type": "long"
}
}
}
}
The index also contains built-in fields such as update_version_l and _searchindex_id. These are used for internal data synchronization and can be ignored.
Data types
The following table shows the default type mapping from SQL table columns to Elasticsearch fields.
| Wide table type | Elasticsearch type |
|---|---|
| BOOLEAN/HBOOLEAN | bool |
| BYTE/SHORT/HSHORT/INT/HINTEGER/UNSIGNED_BYTE/UNSIGNED_SHORT/UNSIGNED_INTEGER | integer |
| LONG/HLONG/UNSIGNED_LONG/TIMESTAMP | long |
| FLOAT/HFLOAT/UNSIGNED_FLOAT | float |
| DOUBLE/HDOUBLE/UNSIGNED_DOUBLE | double |
| STRING/HSTRING | keyword |
| CHAR/BINARY/VARBINARY | keyword |
| JSON | object |
keyword fields support exact-match queries. If you need full-text search on a field, set its type explicitly to text when creating the index (for example, address(type=text, analyzer=ik)).
Query using the ES API
After creating the index with the recommended SOURCE_SETTINGS, query it using the ES API:
GET /searchindex_db.search_table.idx/_search
The response includes _source with the stored primary key columns:
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 4,
"successful": 4,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 4,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "searchindex_db.search_table.idx",
"_id": "8000000000000006e69d8ee58588e7949f00",
"_score": 1,
"_source": {
"user_id": 6,
"name": "Mr. Li"
}
},
{
"_index": "searchindex_db.search_table.idx",
"_id": "8000000000000014e78e8be58588e7949f00",
"_score": 1,
"_source": {
"user_id": 20,
"name": "Mr. Wang"
}
},
{
"_index": "searchindex_db.search_table.idx",
"_id": "8000000000000001e5bca0e58588e7949f00",
"_score": 1,
"_source": {
"user_id": 1,
"name": "Mr. Zhang"
}
},
{
"_index": "searchindex_db.search_table.idx",
"_id": "800000000000001ce99988e5a5b3e5a3ab00",
"_score": 1,
"_source": {
"user_id": 28,
"name": "Ms. Chen"
}
}
]
}
}
Each _source object contains the user_id and name primary key columns. The other indexed fields (age, city, gender, and so on) remain searchable but are not returned in _source. Use the primary key values to query the wide table directly when you need the full row.
Map the index _id to the wide table primary key
Skip this section if you used SOURCE_SETTINGS to store the primary key columns in the index — you can retrieve primary keys directly from _source.
Each wide table row maps one-to-one to an Elasticsearch document through index synchronization. If you did not store raw data in the index, decode the _id field to get the primary key, then query the wide table.
The following example covers only the basic data types supported by wide tables.
package org.example;
import com.alibaba.lindorm.client.core.utils.DataTypeUtils;
import com.alibaba.lindorm.client.schema.DataType;
import com.alibaba.lindorm.client.schema.SortOrder;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hbase.util.Bytes;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
public class SearchDocIdToLindormColumnValues {
public static void main(String[] args) throws Exception {
List<String> primaryKeyTypes = new ArrayList<>();
List<SortOrder> primaryKeySortOrders = new ArrayList<>();
// this is "_id" value in ElasticSearch index
String id = "00818001800000018000000000000001bf8ccccebff199999999999bc10b02007465737400800001992c36f418800001992c36f418800001992c36f418000000007465737420737472696e67";
// get primary key types and sort orders
getPrimaryKeyTypesAndSortOrders(primaryKeyTypes, primaryKeySortOrders);
// primary keys in desc type
// String id = "017e7ffe7ffffffe7ffffffffffffffe40733331400e6666666666643febf4feff8b9a8c8bff7ffffe66d3c90be77ffffe66d3c90be77ffffe66d3c90be7ffffffff7465737420737472696e67";
// getPrimaryKeyTypesAndSortOrdersDesc(primaryKeyTypes, primaryKeySortOrders);
// get rowKey encoded type from search index
RowKeyEncodedType rowKeyEncodedType = RowKeyEncodedType.getFormatterType(primaryKeyTypes, primaryKeySortOrders);
byte[] rowKey;
if (RowKeyEncodedType.HEX.equals(rowKeyEncodedType)) {
rowKey = Hex.decodeHex(id.toCharArray());
} else {
rowKey = Bytes.toBytes(id);
}
// parse rowKey in Lindorm to several primary key values
List<Object> values = parseRowKeyToValues(rowKey, primaryKeyTypes, primaryKeySortOrders);
for (Object value : values) {
System.out.println(value);
}
}
private static void getPrimaryKeyTypesAndSortOrders(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
// sample here, you can get this through mysql connection by 'DESCRIBE TABLE' and get real schema from your table
// OR you can simply write down your primary types in order here
primaryKeyTypes.add("BOOLEAN");
primaryKeyTypes.add("TINYINT");
primaryKeyTypes.add("SMALLINT");
primaryKeyTypes.add("INT");
primaryKeyTypes.add("BIGINT");
primaryKeyTypes.add("FLOAT");
primaryKeyTypes.add("DOUBLE");
primaryKeyTypes.add("DECIMAL(10,2)");
primaryKeyTypes.add("VARCHAR");
primaryKeyTypes.add("DATE");
primaryKeyTypes.add("TIME");
primaryKeyTypes.add("TIMESTAMP");
primaryKeyTypes.add("VARBINARY");
for (int i = 0; i < 13; ++i) {
primaryKeySortOrders.add(SortOrder.ASC);
}
}
private static void getPrimaryKeyTypesAndSortOrdersDesc(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
// sample here, you can get this through mysql connection by 'DESCRIBE TABLE' and get real schema from your table
// OR you can simply write down your primary types in order here
primaryKeyTypes.add("BOOLEAN");
primaryKeyTypes.add("TINYINT");
primaryKeyTypes.add("SMALLINT");
primaryKeyTypes.add("INT");
primaryKeyTypes.add("BIGINT");
primaryKeyTypes.add("FLOAT");
primaryKeyTypes.add("DOUBLE");
primaryKeyTypes.add("DECIMAL(10,2)");
primaryKeyTypes.add("VARCHAR");
primaryKeyTypes.add("DATE");
primaryKeyTypes.add("TIME");
primaryKeyTypes.add("TIMESTAMP");
primaryKeyTypes.add("VARBINARY");
for (int i = 0; i < 12; ++i) {
primaryKeySortOrders.add(SortOrder.DESC);
}
primaryKeySortOrders.add(SortOrder.ASC);
}
private static List<Object> parseRowKeyToValues(byte[] rowKey, List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
List<Object> decoded = new ArrayList<>(primaryKeyTypes.size());
int pos = 0;
for (int i = 0; i < primaryKeyTypes.size(); i++) {
String primaryKeyType = primaryKeyTypes.get(i).toLowerCase();
int start = pos;
if (primaryKeyType.equalsIgnoreCase("varchar")) {
while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
++pos;
}
decoded.add(decodeString(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
++pos;
} else if (primaryKeyType.equalsIgnoreCase("boolean")) {
decoded.add(decodeBoolean(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_BOOLEAN;
} else if (primaryKeyType.equalsIgnoreCase("tinyint")) {
decoded.add(decodeByte(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_BYTE;
} else if (primaryKeyType.equalsIgnoreCase("smallint")) {
decoded.add(decodeShort(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_SHORT;
} else if (primaryKeyType.equalsIgnoreCase("int")) {
decoded.add(decodeInt(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_INT;
} else if (primaryKeyType.equalsIgnoreCase("bigint")) {
decoded.add(decodeLong(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG;
} else if (primaryKeyType.equalsIgnoreCase("float")) {
decoded.add(decodeFloat(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_FLOAT;
} else if (primaryKeyType.equalsIgnoreCase("double")) {
decoded.add(decodeDouble(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_DOUBLE;
} else if (primaryKeyType.contains("decimal")) {
while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
++pos;
}
decoded.add(decodeDecimal(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
++pos;
} else if (primaryKeyType.contains("varbinary")) {
decoded.add(decodeVarbinary(rowKey, start, rowKey.length - start, primaryKeySortOrders.get(i)));
pos = rowKey.length;
} else if (primaryKeyType.contains("date")) {
decoded.add(decodeDate(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG;
} else if (primaryKeyType.contains("time")) {
decoded.add(decodeTime(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG;
} else if (primaryKeyType.contains("timestamp")) {
decoded.add(decodeTimestamp(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
}
}
return decoded;
}
private static Object decodeString(byte[] bytes, int offset, int length, SortOrder sortOrder) {
if (SortOrder.ASC.equals(sortOrder)) {
return new String(bytes, offset, length, StandardCharsets.UTF_8);
} else {
byte[] reversed = new byte[length];
for (int i = 0; i < length; i++) {
reversed[i] = (byte) (bytes[offset + i] ^ 0xFF);
}
return new String(reversed, StandardCharsets.UTF_8);
}
}
private static Object decodeBoolean(byte[] bytes, int offset, SortOrder sortOrder) {
return ((bytes[offset] == 0) == SortOrder.DESC.equals(sortOrder));
}
private static Object decodeByte(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeByte(bytes, offset, sortOrder);
}
private static Object decodeShort(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeShort(bytes, offset, sortOrder);
}
private static Object decodeInt(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeInt(bytes, offset, sortOrder);
}
private static Object decodeLong(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeLong(bytes, offset, sortOrder);
}
private static Object decodeFloat(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeFloat(bytes, offset, sortOrder);
}
private static Object decodeDouble(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeDouble(bytes, offset, sortOrder);
}
private static Object decodeDecimal(byte[] bytes, int offset, int length, SortOrder sortOrder) {
if (SortOrder.DESC.equals(sortOrder)) {
return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder, DataType.DECIMAL_V2);
}
return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder);
}
private static Object decodeVarbinary(byte[] bytes, int offset, int length, SortOrder sortOrder) {
byte[] result = new byte[length];
if (SortOrder.DESC.equals(sortOrder)) {
DataTypeUtils.invert(bytes, offset, result, 0, length);
} else {
System.arraycopy(bytes, offset, result, 0, length);
}
return result;
}
private static Object decodeDate(byte[] bytes, int offset, SortOrder sortOrder) {
Long result = (Long) decodeLong(bytes, offset, sortOrder);
return new Date(result);
}
private static Object decodeTime(byte[] bytes, int offset, SortOrder sortOrder) {
Long result = (Long) decodeLong(bytes, offset, sortOrder);
return new Time(result);
}
private static Object decodeTimestamp(byte[] bytes, int offset, SortOrder sortOrder) {
long ms = DataTypeUtils.decodeLong(bytes, offset, sortOrder);
int nanos = DataTypeUtils.decodeUnsignedInt(bytes, offset + Bytes.SIZEOF_LONG, sortOrder);
Timestamp ts = new Timestamp(ms);
ts.setNanos(ts.getNanos() + nanos);
return ts;
}
private static byte getSeparatorByte(SortOrder sortOrder) {
if (sortOrder == SortOrder.DESC) {
return (byte) (0xFF);
} else {
return (byte) 0;
}
}
public enum RowKeyEncodedType {
STRING,
HEX;
public static RowKeyEncodedType getFormatterType(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
int n = primaryKeyTypes.size();
for (int i = 0; i < n; i++) {
String primaryKeyType = primaryKeyTypes.get(i);
SortOrder sortOrder = primaryKeySortOrders.get(i);
if (!(primaryKeyType.equalsIgnoreCase("varchar") || primaryKeyType.equalsIgnoreCase("hstring"))
|| SortOrder.DESC.equals(sortOrder)) {
return RowKeyEncodedType.HEX;
}
}
return RowKeyEncodedType.STRING;
}
}
}
The RowKeyEncodedType determines how to decode the _id string:
-
HEX: used when any primary key column is not
VARCHAR/HSTRING, or when any sort order isDESC -
STRING: used only when all primary key columns are
VARCHAR/HSTRINGand all sort orders areASC