Lindorm: Access a search index by using the ES API - SQL tables

Last Updated: Mar 30, 2026

The Lindorm search engine is compatible with the open source Elasticsearch (ES) API, letting you access search indexes and migrate services built on the ES technology stack with minimal changes. This topic uses an SQL table in LindormTable as an example to demonstrate how to access a search index using the ES API.

Prerequisites

Before you begin, ensure that you have:

  • Enabled the search index service using the Elasticsearch-compatible version of the Lindorm search engine

  • An ES 7.10 client (to use a different version, contact Lindorm technical support at DingTalk ID: s0s3eg3)

Key concepts

The following table maps LindormTable SQL concepts to their Elasticsearch equivalents. Use this as a reference when reading ES API documentation alongside Lindorm SQL documentation.

LindormTable SQL | Elasticsearch | Description
Column | Field | The named entry that stores a single value
Row | Document | A collection of fields; each wide table row maps to one ES document
Table | Index | The target against which queries are run
Database (namespace) | Index prefix | The namespace in the ES index name format: namespace.table.index

Prepare data

Create an SQL table in LindormTable

CREATE DATABASE searchindex_db;
USE searchindex_db;

CREATE TABLE search_table (
  user_id BIGINT,
  name VARCHAR,
  age SMALLINT,
  gender VARCHAR,
  address VARCHAR,
  email VARCHAR,
  city VARCHAR,
  PRIMARY KEY (user_id, name)
);

Write sample data

UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (1, 'Mr. Zhang', 18, 'M', 'Chaoyang District, Beijing', 'a***@example.net', 'Beijing');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (6, 'Mr. Li', 32, 'M', 'Yuhang District, Hangzhou', 'a***@example.net', 'Hangzhou');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (20, 'Mr. Wang', 28, 'M', 'Binjiang District, Hangzhou', 'a***@example.net', 'Hangzhou');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (28, 'Ms. Chen', 36, 'F', 'Nanshan District, Shenzhen', 'a***@example.net', 'Shenzhen');

Create a search index

For SQL tables, create search indexes using SQL. Before writing the CREATE INDEX statement, decide how much raw data the index should store, because this choice affects both storage cost and how you retrieve primary keys from ES query results.

Storage option | How to configure | Trade-off
Store primary key columns only (recommended) | SOURCE_SETTINGS='{"includes": ["user_id", "name"]}' | Saves space; primary keys are available directly in ES results, with no need to decode _id
Store all fields | SOURCE_SETTINGS='{"enabled": true}' | Convenient for direct retrieval; uses more storage
Store nothing (default) | Omit SOURCE_SETTINGS | Minimum storage; you must decode _id to query the wide table for raw data

Important

SOURCE_SETTINGS requires LindormTable version 2.7.4.1 or later.

Recommended: store primary key columns only

Add the primary key columns (user_id, name) to the index column list and configure SOURCE_SETTINGS to store only those columns. This lets you retrieve primary keys directly from ES results without decoding _id.

CREATE INDEX idx USING SEARCH ON search_table (
  user_id,
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (
  numShards=4,
  SOURCE_SETTINGS='
    {
      "includes": ["user_id", "name"]
    }
  '
);

The non-primary-key fields (age, city, gender, and so on) remain indexed and searchable; they are just not stored in _source.

Store all fields

To retrieve any field directly from ES results without querying the wide table:

CREATE INDEX idx USING SEARCH ON search_table (
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (
  numShards=4,
  SOURCE_SETTINGS='
    {
      "enabled": true
    }
  '
);

Store nothing (default)

To minimize storage, omit SOURCE_SETTINGS:

CREATE INDEX idx USING SEARCH ON search_table (
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (numShards=4);

With this option, raw data is not stored in the search engine. To retrieve a row's full data, decode the _id field to obtain the wide table primary key, then query the wide table. See Map the index _id to the wide table primary key for details.

For more index creation options, see Manage search indexes.

Get the Elasticsearch index name

When you create a search index using SQL, the corresponding Elasticsearch index name follows the format namespace_name.table_name.search_index_name. For the example above, the name is searchindex_db.search_table.idx.

To verify the index, log on to the UI and run:

GET searchindex_db.search_table.idx
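
The naming rule can also be applied in code. The following Java sketch joins the three parts into the Elasticsearch index name and derives the _search path for that index. The class and method names (SearchIndexNames, indexName, searchPath) are illustrative and are not part of any Lindorm SDK.

```java
final class SearchIndexNames {

    private SearchIndexNames() {
    }

    /** Joins namespace, table, and search index name with dots: namespace.table.index. */
    static String indexName(String namespace, String table, String searchIndex) {
        if (namespace.isEmpty() || table.isEmpty() || searchIndex.isEmpty()) {
            throw new IllegalArgumentException("namespace, table, and index name must be non-empty");
        }
        return namespace + "." + table + "." + searchIndex;
    }

    /** Returns the path of the _search endpoint for the given Elasticsearch index name. */
    static String searchPath(String indexName) {
        return "/" + indexName + "/_search";
    }

    public static void main(String[] args) {
        String name = indexName("searchindex_db", "search_table", "idx");
        System.out.println(name);
        System.out.println(searchPath(name));
    }
}
```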

Field mapping

Field names

Field names map one-to-one between the SQL table and the Elasticsearch index. For example, after creating the index with the recommended statement above, the Elasticsearch mapping looks like this:

{
  "mappings": {
    "dynamic": "true",
    "dynamic_templates": [],
    "properties": {
      "_searchindex_id": {
        "type": "keyword",
        "index": false
      },
      "address": {
        "type": "text",
        "analyzer": "ik_max_word"
      },
      "age": {
        "type": "integer"
      },
      "city": {
        "type": "keyword"
      },
      "delete_version_l": {
        "type": "long"
      },
      "email": {
        "type": "keyword"
      },
      "gender": {
        "type": "keyword"
      },
      "update_version_l": {
        "type": "long"
      }
    }
  }
}

The index also contains built-in fields such as update_version_l and _searchindex_id. These are used for internal data synchronization and can be ignored.

Data types

The following table shows the default type mapping from SQL table columns to Elasticsearch fields.

Wide table type | Elasticsearch type
BOOLEAN/HBOOLEAN | boolean
BYTE/SHORT/HSHORT/INT/HINTEGER/UNSIGNED_BYTE/UNSIGNED_SHORT/UNSIGNED_INTEGER | integer
LONG/HLONG/UNSIGNED_LONG/TIMESTAMP | long
FLOAT/HFLOAT/UNSIGNED_FLOAT | float
DOUBLE/HDOUBLE/UNSIGNED_DOUBLE | double
STRING/HSTRING | keyword
CHAR/BINARY/VARBINARY | keyword
JSON | object

keyword fields support exact-match queries. If you need full-text search on a field, set its type explicitly to text when creating the index (for example, address(type=text, analyzer=ik)).
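
To illustrate the difference, the following Java sketch builds two request bodies in standard Elasticsearch query DSL: a term query for exact matches on the city keyword field, and a match query for analyzed full-text search on the address text field. The helper class QueryBodies and its method names are hypothetical, and the escaping handles only backslashes and double quotes.

```java
final class QueryBodies {

    private QueryBodies() {
    }

    /** Escapes backslashes and double quotes so the value can sit inside a JSON string. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Exact-match query, suitable for keyword fields such as city. */
    static String termQuery(String field, String value) {
        return "{\"query\":{\"term\":{\"" + escape(field) + "\":\"" + escape(value) + "\"}}}";
    }

    /** Full-text query, suitable for text fields such as address. */
    static String matchQuery(String field, String text) {
        return "{\"query\":{\"match\":{\"" + escape(field) + "\":\"" + escape(text) + "\"}}}";
    }

    public static void main(String[] args) {
        System.out.println(termQuery("city", "Hangzhou"));
        System.out.println(matchQuery("address", "Hangzhou"));
    }
}
```

Either string can be sent as the body of a request to the _search endpoint of the index.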

Query using the ES API

After creating the index with the recommended SOURCE_SETTINGS, query it using the ES API:

GET /searchindex_db.search_table.idx/_search

The response includes _source with the stored primary key columns:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 4,
    "successful": 4,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 4,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000006e69d8ee58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 6,
          "name": "Mr. Li"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000014e78e8be58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 20,
          "name": "Mr. Wang"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000001e5bca0e58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 1,
          "name": "Mr. Zhang"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "800000000000001ce99988e5a5b3e5a3ab00",
        "_score": 1,
        "_source": {
          "user_id": 28,
          "name": "Ms. Chen"
        }
      }
    ]
  }
}

Each _source object contains the user_id and name primary key columns. The other indexed fields (age, city, gender, and so on) remain searchable but are not returned in _source. Use the primary key values to query the wide table directly when you need the full row.
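
Fetching the full row from those primary key values can be sketched in Java with plain JDBC, as follows. The sketch assumes that you already hold a java.sql.Connection to LindormTable whose current database is searchindex_db; how that connection is obtained (driver, URL, credentials) is outside its scope. The class and method names are hypothetical.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;

final class WideTableLookup {

    /** Parameterized lookup by the full primary key (user_id, name) of search_table. */
    static final String LOOKUP_SQL =
            "SELECT user_id, name, age, gender, address, email, city "
                    + "FROM search_table WHERE user_id = ? AND name = ?";

    private WideTableLookup() {
    }

    /** Returns the row as column label to value, or an empty map if no row matches. */
    static Map<String, Object> fetchRow(Connection conn, long userId, String name) throws SQLException {
        Map<String, Object> row = new LinkedHashMap<>();
        try (PreparedStatement stmt = conn.prepareStatement(LOOKUP_SQL)) {
            // Bind the values taken from _source.user_id and _source.name.
            stmt.setLong(1, userId);
            stmt.setString(2, name);
            try (ResultSet rs = stmt.executeQuery()) {
                if (rs.next()) {
                    ResultSetMetaData meta = rs.getMetaData();
                    for (int i = 1; i <= meta.getColumnCount(); i++) {
                        row.put(meta.getColumnLabel(i), rs.getObject(i));
                    }
                }
            }
        }
        return row;
    }
}
```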

Map the index _id to the wide table primary key

Note

Skip this section if you used SOURCE_SETTINGS to store the primary key columns in the index. In that case, you can retrieve primary keys directly from _source.

Each wide table row maps one-to-one to an Elasticsearch document through index synchronization. If you did not store raw data in the index, decode the _id field to get the primary key, then query the wide table.

Note

The following example covers only the basic data types supported by wide tables.

package org.example;

import com.alibaba.lindorm.client.core.utils.DataTypeUtils;
import com.alibaba.lindorm.client.schema.DataType;
import com.alibaba.lindorm.client.schema.SortOrder;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hbase.util.Bytes;

import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

public class SearchDocIdToLindormColumnValues {

    public static void main(String[] args) throws Exception {
        List<String> primaryKeyTypes = new ArrayList<>();
        List<SortOrder> primaryKeySortOrders = new ArrayList<>();

        // The "_id" value of a document in the Elasticsearch index.
        String id = "00818001800000018000000000000001bf8ccccebff199999999999bc10b02007465737400800001992c36f418800001992c36f418800001992c36f418000000007465737420737472696e67";
        // get primary key types and sort orders
        getPrimaryKeyTypesAndSortOrders(primaryKeyTypes, primaryKeySortOrders);

        // Alternative sample: an "_id" whose primary key columns use DESC sort order (see getPrimaryKeyTypesAndSortOrdersDesc).
        // String id = "017e7ffe7ffffffe7ffffffffffffffe40733331400e6666666666643febf4feff8b9a8c8bff7ffffe66d3c90be77ffffe66d3c90be77ffffe66d3c90be7ffffffff7465737420737472696e67";
        // getPrimaryKeyTypesAndSortOrdersDesc(primaryKeyTypes, primaryKeySortOrders);

        // get rowKey encoded type from search index
        RowKeyEncodedType rowKeyEncodedType = RowKeyEncodedType.getFormatterType(primaryKeyTypes, primaryKeySortOrders);

        byte[] rowKey;
        if (RowKeyEncodedType.HEX.equals(rowKeyEncodedType)) {
            rowKey = Hex.decodeHex(id.toCharArray());
        } else {
            rowKey = Bytes.toBytes(id);
        }

        // parse rowKey in Lindorm to several primary key values
        List<Object> values = parseRowKeyToValues(rowKey, primaryKeyTypes, primaryKeySortOrders);
        for (Object value : values) {
            System.out.println(value);
        }
    }

    private static void getPrimaryKeyTypesAndSortOrders(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        // Sample only. To get the real schema of your table, run 'DESCRIBE TABLE' over a MySQL connection,
        // or simply list your primary key types here in primary key order.
        primaryKeyTypes.add("BOOLEAN");
        primaryKeyTypes.add("TINYINT");
        primaryKeyTypes.add("SMALLINT");
        primaryKeyTypes.add("INT");
        primaryKeyTypes.add("BIGINT");
        primaryKeyTypes.add("FLOAT");
        primaryKeyTypes.add("DOUBLE");
        primaryKeyTypes.add("DECIMAL(10,2)");
        primaryKeyTypes.add("VARCHAR");
        primaryKeyTypes.add("DATE");
        primaryKeyTypes.add("TIME");
        primaryKeyTypes.add("TIMESTAMP");
        primaryKeyTypes.add("VARBINARY");

        for (int i = 0; i < 13; ++i) {
            primaryKeySortOrders.add(SortOrder.ASC);
        }
    }

    private static void getPrimaryKeyTypesAndSortOrdersDesc(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        // Sample only. To get the real schema of your table, run 'DESCRIBE TABLE' over a MySQL connection,
        // or simply list your primary key types here in primary key order.
        primaryKeyTypes.add("BOOLEAN");
        primaryKeyTypes.add("TINYINT");
        primaryKeyTypes.add("SMALLINT");
        primaryKeyTypes.add("INT");
        primaryKeyTypes.add("BIGINT");
        primaryKeyTypes.add("FLOAT");
        primaryKeyTypes.add("DOUBLE");
        primaryKeyTypes.add("DECIMAL(10,2)");
        primaryKeyTypes.add("VARCHAR");
        primaryKeyTypes.add("DATE");
        primaryKeyTypes.add("TIME");
        primaryKeyTypes.add("TIMESTAMP");
        primaryKeyTypes.add("VARBINARY");

        for (int i = 0; i < 12; ++i) {
            primaryKeySortOrders.add(SortOrder.DESC);
        }
        primaryKeySortOrders.add(SortOrder.ASC);
    }

    private static List<Object> parseRowKeyToValues(byte[] rowKey, List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        List<Object> decoded = new ArrayList<>(primaryKeyTypes.size());
        int pos = 0;
        for (int i = 0; i < primaryKeyTypes.size(); i++) {
            String primaryKeyType = primaryKeyTypes.get(i).toLowerCase();
            int start = pos;
            if (primaryKeyType.equalsIgnoreCase("varchar")) {
                while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
                    ++pos;
                }
                decoded.add(decodeString(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
                ++pos;
            } else if (primaryKeyType.equalsIgnoreCase("boolean")) {
                decoded.add(decodeBoolean(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_BOOLEAN;
            } else if (primaryKeyType.equalsIgnoreCase("tinyint")) {
                decoded.add(decodeByte(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_BYTE;
            } else if (primaryKeyType.equalsIgnoreCase("smallint")) {
                decoded.add(decodeShort(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_SHORT;
            } else if (primaryKeyType.equalsIgnoreCase("int")) {
                decoded.add(decodeInt(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_INT;
            } else if (primaryKeyType.equalsIgnoreCase("bigint")) {
                decoded.add(decodeLong(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG;
            } else if (primaryKeyType.equalsIgnoreCase("float")) {
                decoded.add(decodeFloat(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_FLOAT;
            } else if (primaryKeyType.equalsIgnoreCase("double")) {
                decoded.add(decodeDouble(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_DOUBLE;
            } else if (primaryKeyType.contains("decimal")) {
                while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
                    ++pos;
                }
                decoded.add(decodeDecimal(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
                ++pos;
            } else if (primaryKeyType.contains("varbinary")) {
                decoded.add(decodeVarbinary(rowKey, start, rowKey.length - start, primaryKeySortOrders.get(i)));
                pos = rowKey.length;
            } else if (primaryKeyType.contains("timestamp")) {
                // Check "timestamp" before "time": "timestamp" contains "time", so the order matters.
                decoded.add(decodeTimestamp(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
            } else if (primaryKeyType.contains("date")) {
                decoded.add(decodeDate(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG;
            } else if (primaryKeyType.contains("time")) {
                decoded.add(decodeTime(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG;
            }
        }

        return decoded;
    }

    private static Object decodeString(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        if (SortOrder.ASC.equals(sortOrder)) {
            return new String(bytes, offset, length, StandardCharsets.UTF_8);
        } else {
            byte[] reversed = new byte[length];
            for (int i = 0; i < length; i++) {
                reversed[i] = (byte) (bytes[offset + i] ^ 0xFF);
            }
            return new String(reversed, StandardCharsets.UTF_8);
        }
    }

    private static Object decodeBoolean(byte[] bytes, int offset, SortOrder sortOrder) {
        return ((bytes[offset] == 0) == SortOrder.DESC.equals(sortOrder));
    }

    private static Object decodeByte(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeByte(bytes, offset, sortOrder);
    }

    private static Object decodeShort(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeShort(bytes, offset, sortOrder);
    }

    private static Object decodeInt(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeInt(bytes, offset, sortOrder);
    }

    private static Object decodeLong(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeLong(bytes, offset, sortOrder);
    }

    private static Object decodeFloat(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeFloat(bytes, offset, sortOrder);
    }

    private static Object decodeDouble(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeDouble(bytes, offset, sortOrder);
    }

    private static Object decodeDecimal(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        if (SortOrder.DESC.equals(sortOrder)) {
            return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder, DataType.DECIMAL_V2);
        }
        return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder);
    }

    private static Object decodeVarbinary(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        byte[] result = new byte[length];
        if (SortOrder.DESC.equals(sortOrder)) {
            DataTypeUtils.invert(bytes, offset, result, 0, length);
        } else {
            System.arraycopy(bytes, offset, result, 0, length);
        }
        return result;
    }

    private static Object decodeDate(byte[] bytes, int offset, SortOrder sortOrder) {
        Long result = (Long) decodeLong(bytes, offset, sortOrder);
        return new Date(result);
    }

    private static Object decodeTime(byte[] bytes, int offset, SortOrder sortOrder) {
        Long result = (Long) decodeLong(bytes, offset, sortOrder);
        return new Time(result);
    }

    private static Object decodeTimestamp(byte[] bytes, int offset, SortOrder sortOrder) {
        long ms = DataTypeUtils.decodeLong(bytes, offset, sortOrder);
        int nanos = DataTypeUtils.decodeUnsignedInt(bytes, offset + Bytes.SIZEOF_LONG, sortOrder);
        Timestamp ts = new Timestamp(ms);
        ts.setNanos(ts.getNanos() + nanos);
        return ts;
    }

    private static byte getSeparatorByte(SortOrder sortOrder) {
        if (sortOrder == SortOrder.DESC) {
            return (byte) (0xFF);
        } else {
            return (byte) 0;
        }
    }

    public enum RowKeyEncodedType {
        STRING,
        HEX;

        public static RowKeyEncodedType getFormatterType(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
            int n = primaryKeyTypes.size();
            for (int i = 0; i < n; i++) {
                String primaryKeyType = primaryKeyTypes.get(i);
                SortOrder sortOrder = primaryKeySortOrders.get(i);
                if (!(primaryKeyType.equalsIgnoreCase("varchar") || primaryKeyType.equalsIgnoreCase("hstring"))
                    || SortOrder.DESC.equals(sortOrder)) {
                    return RowKeyEncodedType.HEX;
                }
            }
            return RowKeyEncodedType.STRING;
        }
    }
}

The RowKeyEncodedType determines how to decode the _id string:

  • HEX: used when any primary key column is not VARCHAR/HSTRING, or when any sort order is DESC

  • STRING: used only when all primary key columns are VARCHAR/HSTRING and all sort orders are ASC
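
For the sample table in this topic, whose primary key is (user_id BIGINT, name VARCHAR) in ascending order, the _id is HEX-encoded because user_id is not a VARCHAR column. The decoding for this specific key can be sketched without the Lindorm client classes, as shown below. The byte layout is an assumption inferred from the sample _id values in the query response and from the decoder above, not from a published specification: the BIGINT is stored as 8 big-endian bytes with the sign bit flipped, and the VARCHAR as UTF-8 bytes terminated by a 0x00 separator. The class name SampleIdDecoder is hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class SampleIdDecoder {

    private SampleIdDecoder() {
    }

    /** Converts a hex string (two characters per byte) to bytes. */
    static byte[] hexToBytes(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("odd-length hex string");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    /** Decodes an ascending BIGINT: 8 big-endian bytes with the sign bit flipped (assumed layout). */
    static long decodeBigintAsc(byte[] rowKey, int offset) {
        long value = 0;
        for (int i = 0; i < Long.BYTES; i++) {
            value = (value << 8) | (rowKey[offset + i] & 0xFFL);
        }
        return value ^ Long.MIN_VALUE;
    }

    /** Decodes an ascending VARCHAR: UTF-8 bytes up to the 0x00 separator or the end of the key. */
    static String decodeVarcharAsc(byte[] rowKey, int offset) {
        int end = offset;
        while (end < rowKey.length && rowKey[end] != 0) {
            end++;
        }
        return new String(rowKey, offset, end - offset, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // _id of the first hit in the sample query response.
        byte[] rowKey = hexToBytes("8000000000000006e69d8ee58588e7949f00");
        System.out.println(decodeBigintAsc(rowKey, 0));
        System.out.println(decodeVarcharAsc(rowKey, Long.BYTES));
    }
}
```

The first value printed is 6, which matches user_id in that hit. The name bytes in the sample _id are the UTF-8 encoding of a non-ASCII string, so the printed name differs from the ASCII value shown in _source.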

Next steps