全部產品
Search
文件中心

Lindorm:使用ES API訪問搜尋索引 - SQL表

更新時間:Dec 23, 2025

Lindorm搜尋引擎(相容Elasticsearch API版本)支援通過開源ES介面訪問搜尋索引,便於已採用ES技術棧的業務平滑遷移。本文以寬表引擎的SQL表情境為例,示範通過ES API訪問搜尋索引的最佳實務。

前提條件

  • 開通搜尋索引服務,要求開通Lindorm搜尋引擎的Elasticsearch相容版本。

  • 推薦使用ES 7.10版本的用戶端訪問;若有其他版本用戶端的訪問需求,可聯絡Lindorm支援人員(DingTalk號:s0s3eg3)諮詢。

資料準備

準備SQL表並寫入範例資料。

寬表引擎建立SQL表

CREATE DATABASE searchindex_db;
USE searchindex_db;

CREATE TABLE search_table (
  user_id BIGINT,
  name VARCHAR,
  age SMALLINT,
  gender VARCHAR,
  address VARCHAR,
  email VARCHAR,
  city VARCHAR,
  PRIMARY KEY (user_id, name)
);

寫入範例資料

UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (1, '張先生', 18, 'M', '北京市朝陽區', 'a***@example.net', '北京');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (6, '李先生', 32, 'M', '杭州市餘杭區', 'a***@example.net', '杭州');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (20, '王先生', 28, 'M', '杭州市濱江區', 'a***@example.net', '杭州');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (28, '陳女士', 36, 'F', '深圳市南山區', 'a***@example.net', '深圳');

建立搜尋索引

LindormSQL表應通過SQL方式建立搜尋索引,本文僅提供簡單樣本,更詳細的建立方式可參考管理搜尋索引

建立索引

CREATE INDEX idx USING SEARCH ON search_table (
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (numShards=4);

開啟索引source

預設情況下,通過SQL表建立的搜尋索引不在搜尋引擎儲存未經處理資料,以節約儲存空間。如需通過ES API直接擷取未經處理資料,請在建立索引時指定SOURCE_SETTINGS屬性。

重要

若需指定SOURCE_SETTINGS,要求寬表引擎版本不低於2.7.4.1。

CREATE INDEX idx USING SEARCH ON search_table (
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (
  numShards=4,
  SOURCE_SETTINGS='
    {
      "enabled": true
    }
  '
);

擷取Elasticsearch索引名

通過SQL方式建立搜尋索引時,Elasticsearch中對應的索引名稱為寬表namespace名.寬表Table名.搜尋索引名。以上樣本對應的Elasticsearch索引名為searchindex_db.search_table.idx登入UI介面並執行下述查詢,可以查到對應的索引資訊。

GET searchindex_db.search_table.idx

寬表與Elasticsearch索引欄位的映射關係

預設情況下,SQL表和Elasticsearch索引的欄位名是一一對應的關係。比如上述樣本中,最終的Elasticsearch索引mappings結構如下:

{
  "mappings": {
    "dynamic": "true",
    "dynamic_templates": [],
    "properties": {
      "_searchindex_id": {
        "type": "keyword",
        "index": false
      },
      "address": {
        "type": "text",
        "analyzer": "ik_max_word"
      },
      "age": {
        "type": "integer"
      },
      "city": {
        "type": "keyword"
      },
      "delete_version_l": {
        "type": "long"
      },
      "email": {
        "type": "keyword"
      },
      "gender": {
        "type": "keyword"
      },
      "update_version_l": {
        "type": "long"
      }
    }
  }
}

SQL表中的age列對應索引中的age欄位,gender列對應索引中的gender欄位,以此類推。索引中存在部分自動添加的內建欄位(如update_version_l),這些欄位用於內部的資料同步等用途,不影響正常使用,無需關注。

寬表與Elasticsearch索引欄位的預設類型映射關係

寬表和搜尋的類型映射關係,預設如下:

寬表類型

Elasticsearch類型

BOOLEAN/HBOOLEAN

bool

BYTE/SHORT/HSHORT/INT/HINTEGER/UNSIGNED_BYTE/UNSIGNED_SHORT/UNSIGNED_INTEGER

integer

LONG/HLONG/UNSIGNED_LONG/TIMESTAMP

long

FLOAT/HFLOAT/UNSIGNED_FLOAT

float

DOUBLE/HDOUBLE/UNSIGNED_DOUBLE

double

STRING/HSTRING

keyword

CHAR/BINARY/VARBINARY

keyword

JSON

object

寬表與Elasticsearch索引的主鍵映射關係

推薦使用方式

對SQL表而言,索引_id列映射回寬表主鍵的邏輯相對複雜,若無節約儲存空間需求,可直接開啟索引source,無需從寬表回查。

若需兼顧節約空間和使用便捷的需求,推薦將寬表主鍵列加入索引列列表中,指定僅儲存主鍵列的未經處理資料,可直接從ES查詢結果擷取寬表的主鍵列。例如以下SQL將寬表的主鍵列user_id, name都加入索引列中,source可以指定對user_id, name兩個列開啟,即可通過ES API擷取寬表的主鍵列。

CREATE INDEX idx USING SEARCH ON search_table (
  user_id,
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (
  numShards=4,
  SOURCE_SETTINGS='
    {
      "includes": ["user_id", "name"]
    }
  '
);

上述建立索引語句,通過ES API查詢:

GET /searchindex_db.search_table.idx/_search

返回如下:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 4,
    "successful": 4,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 4,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000006e69d8ee58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 6,
          "name": "李先生"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000014e78e8be58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 20,
          "name": "王先生"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000001e5bca0e58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 1,
          "name": "張先生"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "800000000000001ce99988e5a5b3e5a3ab00",
        "_score": 1,
        "_source": {
          "user_id": 28,
          "name": "陳女士"
        }
      }
    ]
  }
}

索引_id和寬表主鍵映射關係

如果您不使用上述推薦方式,請參考本章節完成_id列與寬表主鍵的轉換操作。

寬表主鍵與Elasticsearch索引的_id列通過索引同步建立一一映射:每行寬表資料對應唯一索引文檔。若建立索引時設定source=false(不儲存未經處理資料),則需根據主鍵回查寬表擷取未經處理資料。以下程式碼範例展示如何將Elasticsearch索引的_id列映射為寬表主鍵列。

說明

以下程式碼範例僅考慮寬表已支援的基礎資料類型

package org.example;

import com.alibaba.lindorm.client.core.utils.DataTypeUtils;
import com.alibaba.lindorm.client.schema.DataType;
import com.alibaba.lindorm.client.schema.SortOrder;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hbase.util.Bytes;

import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

public class SearchDocIdToLindormColumnValues {

    public static void main(String[] args) throws Exception {
        List<String> primaryKeyTypes = new ArrayList<>();
        List<SortOrder> primaryKeySortOrders = new ArrayList<>();

        // this is "_id" value in Elasticsearch index
        String id = "00818001800000018000000000000001bf8ccccebff199999999999bc10b02007465737400800001992c36f418800001992c36f418800001992c36f41800000000e6b58be8af95e5ad97e7aca6e4b8b2";
        // get primary key types and sort orders
        getPrimaryKeyTypesAndSortOrders(primaryKeyTypes, primaryKeySortOrders);

        // primary keys in desc type
        // String id = "017e7ffe7ffffffe7ffffffffffffffe40733331400e6666666666643febf4feff8b9a8c8bff7ffffe66d3c90be77ffffe66d3c90be77ffffe66d3c90be7ffffffffe6b58be8af95e5ad97e7aca6e4b8b2";
        // getPrimaryKeyTypesAndSortOrdersDesc(primaryKeyTypes, primaryKeySortOrders);

        // get rowKey encoded type from search index
        RowKeyEncodedType rowKeyEncodedType = RowKeyEncodedType.getFormatterType(primaryKeyTypes, primaryKeySortOrders);

        byte[] rowKey;
        if (RowKeyEncodedType.HEX.equals(rowKeyEncodedType)) {
            rowKey = Hex.decodeHex(id.toCharArray());
        } else {
            rowKey = Bytes.toBytes(id);
        }

        // parse rowKey in Lindorm to several primary key values
        List<Object> values = parseRowKeyToValues(rowKey, primaryKeyTypes, primaryKeySortOrders);
        for (Object value : values) {
            System.out.println(value);
        }
    }

    private static void getPrimaryKeyTypesAndSortOrders(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        // sample here, you can get this through mysql connection by 'DESCRIBE TABLE' and get real schema from your table
        // OR you can simply write down your primary types in order here
        primaryKeyTypes.add("BOOLEAN");
        primaryKeyTypes.add("TINYINT");
        primaryKeyTypes.add("SMALLINT");
        primaryKeyTypes.add("INT");
        primaryKeyTypes.add("BIGINT");
        primaryKeyTypes.add("FLOAT");
        primaryKeyTypes.add("DOUBLE");
        primaryKeyTypes.add("DECIMAL(10,2)");
        primaryKeyTypes.add("VARCHAR");
        primaryKeyTypes.add("DATE");
        primaryKeyTypes.add("TIME");
        primaryKeyTypes.add("TIMESTAMP");
        primaryKeyTypes.add("VARBINARY");

        for (int i = 0; i < 13; ++i) {
            primaryKeySortOrders.add(SortOrder.ASC);
        }
    }

    private static void getPrimaryKeyTypesAndSortOrdersDesc(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        // sample here, you can get this through mysql connection by 'DESCRIBE TABLE' and get real schema from your table
        // OR you can simply write down your primary types in order here
        primaryKeyTypes.add("BOOLEAN");
        primaryKeyTypes.add("TINYINT");
        primaryKeyTypes.add("SMALLINT");
        primaryKeyTypes.add("INT");
        primaryKeyTypes.add("BIGINT");
        primaryKeyTypes.add("FLOAT");
        primaryKeyTypes.add("DOUBLE");
        primaryKeyTypes.add("DECIMAL(10,2)");
        primaryKeyTypes.add("VARCHAR");
        primaryKeyTypes.add("DATE");
        primaryKeyTypes.add("TIME");
        primaryKeyTypes.add("TIMESTAMP");
        primaryKeyTypes.add("VARBINARY");

        for (int i = 0; i < 12; ++i) {
            primaryKeySortOrders.add(SortOrder.DESC);
        }
        primaryKeySortOrders.add(SortOrder.ASC);
    }

    private static List<Object> parseRowKeyToValues(byte[] rowKey, List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        List<Object> decoded = new ArrayList<>(primaryKeyTypes.size());
        int pos = 0;
        for (int i = 0; i < primaryKeyTypes.size(); i++) {
            String primaryKeyType = primaryKeyTypes.get(i).toLowerCase();
            int start = pos;
            if (primaryKeyType.equalsIgnoreCase("varchar")) {
                while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
                    ++pos;
                }
                decoded.add(decodeString(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
                ++pos;
            } else if (primaryKeyType.equalsIgnoreCase("boolean")) {
                decoded.add(decodeBoolean(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_BOOLEAN;
            } else if (primaryKeyType.equalsIgnoreCase("tinyint")) {
                decoded.add(decodeByte(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_BYTE;
            } else if (primaryKeyType.equalsIgnoreCase("smallint")) {
                decoded.add(decodeShort(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_SHORT;
            } else if (primaryKeyType.equalsIgnoreCase("int")) {
                decoded.add(decodeInt(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_INT;
            } else if (primaryKeyType.equalsIgnoreCase("bigint")) {
                decoded.add(decodeLong(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG;
            } else if (primaryKeyType.equalsIgnoreCase("float")) {
                decoded.add(decodeFloat(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_FLOAT;
            } else if (primaryKeyType.equalsIgnoreCase("double")) {
                decoded.add(decodeDouble(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_DOUBLE;
            } else if (primaryKeyType.contains("decimal")) {
                while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
                    ++pos;
                }
                decoded.add(decodeDecimal(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
                ++pos;
            } else if (primaryKeyType.contains("varbinary")) {
                decoded.add(decodeVarbinary(rowKey, start, rowKey.length - start, primaryKeySortOrders.get(i)));
                pos = rowKey.length;
            } else if (primaryKeyType.contains("date")) {
                decoded.add(decodeDate(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG;
            } else if (primaryKeyType.contains("time")) {
                decoded.add(decodeTime(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG;
            } else if (primaryKeyType.contains("timestamp")) {
                decoded.add(decodeTimestamp(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
            }
        }

        return decoded;
    }

    private static Object decodeString(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        if (SortOrder.ASC.equals(sortOrder)) {
            return new String(bytes, offset, length, StandardCharsets.UTF_8);
        } else {
            byte[] reversed = new byte[length];
            for (int i = 0; i < length; i++) {
                reversed[i] = (byte) (bytes[offset + i] ^ 0xFF);
            }
            return new String(reversed, StandardCharsets.UTF_8);
        }
    }

    private static Object decodeBoolean(byte[] bytes, int offset, SortOrder sortOrder) {
        return ((bytes[offset] == 0) == SortOrder.DESC.equals(sortOrder));
    }

    private static Object decodeByte(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeByte(bytes, offset, sortOrder);
    }

    private static Object decodeShort(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeShort(bytes, offset, sortOrder);
    }

    private static Object decodeInt(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeInt(bytes, offset, sortOrder);
    }

    private static Object decodeLong(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeLong(bytes, offset, sortOrder);
    }

    private static Object decodeFloat(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeFloat(bytes, offset, sortOrder);
    }

    private static Object decodeDouble(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeDouble(bytes, offset, sortOrder);
    }

    private static Object decodeDecimal(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        if (SortOrder.DESC.equals(sortOrder)) {
            return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder, DataType.DECIMAL_V2);
        }
        return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder);
    }

    private static Object decodeVarbinary(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        byte[] result = new byte[length];
        if (SortOrder.DESC.equals(sortOrder)) {
            DataTypeUtils.invert(bytes, offset, result, 0, length);
        } else {
            System.arraycopy(bytes, offset, result, 0, length);
        }
        return result;
    }

    private static Object decodeDate(byte[] bytes, int offset, SortOrder sortOrder) {
        Long result = (Long) decodeLong(bytes, offset, sortOrder);
        return new Date(result);
    }

    private static Object decodeTime(byte[] bytes, int offset, SortOrder sortOrder) {
        Long result = (Long) decodeLong(bytes, offset, sortOrder);
        return new Time(result);
    }

    private static Object decodeTimestamp(byte[] bytes, int offset, SortOrder sortOrder) {
        long ms = DataTypeUtils.decodeLong(bytes, offset, sortOrder);
        int nanos = DataTypeUtils.decodeUnsignedInt(bytes, offset + Bytes.SIZEOF_LONG, sortOrder);
        Timestamp ts = new Timestamp(ms);
        ts.setNanos(ts.getNanos() + nanos);
        return ts;
    }

    private static byte getSeparatorByte(SortOrder sortOrder) {
        if (sortOrder == SortOrder.DESC) {
            return (byte) (0xFF);
        } else {
            return (byte) 0;
        }
    }

    public enum RowKeyEncodedType {
        STRING,
        HEX;

        public static RowKeyEncodedType getFormatterType(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
            int n = primaryKeyTypes.size();
            for (int i = 0; i < n; i++) {
                String primaryKeyType = primaryKeyTypes.get(i);
                SortOrder sortOrder = primaryKeySortOrders.get(i);
                if (!(primaryKeyType.equalsIgnoreCase("varchar") || primaryKeyType.equalsIgnoreCase("hstring"))
                    || SortOrder.DESC.equals(sortOrder)) {
                    return RowKeyEncodedType.HEX;
                }
            }
            return RowKeyEncodedType.STRING;
        }
    }
}