Lindorm: Access column-oriented data

Last Updated: Mar 28, 2026

Lindorm column-oriented storage is a distributed storage service that stores and processes data by column. Compared with row-oriented storage, it reduces query response time and saves I/O resources. You can use the Lindorm compute engine to create namespaces and tables, read and write data, compact files, and configure tiered storage.

Overview

Column-oriented storage is suited for large amounts of semi-structured and structured data in scenarios such as Internet of Vehicles (IoV), Internet of Things (IoT), orders, and logs. Key capabilities include:

  • Computing and analytics: Perform interactive analytics and online computing on column-oriented data. Rich indexing capabilities accelerate data positioning. Add, remove, modify, and query large amounts of primary key data using SQL statements.

  • High throughput: Supports horizontal scaling with the ability to read and write terabytes of data per minute—suited for quick IoV data import, model training dataset access, and large-scale report analysis.

  • Cost-effectiveness: Uses columnar high compression ratio algorithms, high-density low-cost media, hot and cold data separation, compression encoding, and cold data archive storage to reduce storage costs.

  • High availability: Uses erasure coding to ensure the high availability of distributed datasets and eliminate the risk of single points of failure (SPOFs).

  • Open source compatibility: Compatible with the Iceberg API and connects to compute engines such as Spark and Flink for seamless integration into mainstream data ecosystems.

  • Hot and cold data separation: Store hot and cold data on different storage media based on your business requirements to reduce performance overhead and lower storage costs.

Prerequisites

Before you begin, make sure that the prerequisites for using the Lindorm compute engine are met.

DDL

Namespace

Create a namespace (database).

USE lindorm_columnar;
CREATE NAMESPACE mydb;

Delete a namespace (database).

USE lindorm_columnar;
DROP NAMESPACE mydb;

Table

Create a table.

The following example creates a primary key table with a regular partition and a bucket partition:

USE lindorm_columnar;
CREATE TABLE mydb.mytable (
  id   INT    NOT NULL,
  city STRING NOT NULL,
  name STRING,
  score INT)
PARTITIONED BY (city, bucket(128, id))
TBLPROPERTIES (
  'primary-key' = 'id,city');

Primary key vs. non-primary key tables

| Item | Primary key table | Non-primary key table |
| --- | --- | --- |
| How to create | Specify 'primary-key' in TBLPROPERTIES | Omit 'primary-key' from TBLPROPERTIES |
| Data uniqueness | Primary key is unique; duplicate writes overwrite existing data | No uniqueness guarantee; duplicate rows may exist |
| Partitioning | Required; partition expression fields must be primary key fields; the last-level partition must be a bucket partition | Not required |
| Supported primary key types | BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING, BINARY | N/A |

Partition expressions

Use PARTITIONED BY to specify how data is distributed across partitions. The syntax is:

PARTITIONED BY ([regular partition expression, ...] {bucket(bucketNum, bucketCol)})

The following table describes the available partition transform functions:

| Function | Description | Supported types | Usage notes |
| --- | --- | --- | --- |
| bucket(N, col) | Distributes data into N shards by hashing the column value | BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING, BINARY | bucketNum directly affects write and scan concurrency; for primary key tables, the last-level partition must be a bucket partition |
| col_name (identity) | Creates one physical partition per distinct value | Any | Use concentrated values such as date, city, or gender; avoid discrete values such as timestamps, which generate excessive metadata |

Sizing `bucketNum`: Evaluate the total data volume before creating a table and choose bucketNum so that each bucket partition holds 50–512 MB of data. The bucket index is computed as bucket_index = hash(bucketCol) % bucketNum.

Avoiding data skew: Choose a bucketCol whose values are discrete (high cardinality), so that hashing distributes rows evenly across buckets.
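The sizing rule and bucket-index formula above can be sketched as follows. This is an illustrative Python sketch, not a Lindorm API; the function names are hypothetical.

```python
# Illustrative sketch of the bucket-index formula and the bucketNum sizing
# rule described above. These are not Lindorm APIs.

def bucket_index(bucket_col_value, bucket_num: int) -> int:
    """bucket_index = hash(bucketCol) % bucketNum."""
    return hash(bucket_col_value) % bucket_num

def suggest_bucket_num(total_bytes: int, target_bucket_bytes: int = 256 * 2**20) -> int:
    """Pick a bucketNum so each bucket holds roughly 50-512 MB of data.

    This sketch aims for ~256 MB per bucket, the middle of the recommended range.
    """
    return max(1, -(-total_bytes // target_bucket_bytes))  # ceiling division

# 1 TB of data at ~256 MB per bucket -> 4096 buckets.
print(suggest_bucket_num(2**40))
```

In practice you would round the suggestion to a convenient value (for example a power of two, as in the examples below) and keep in mind that bucketNum also caps write and scan concurrency.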

Bucket-only partition examples

Example 1: Single-column primary key, bucket partition only:

USE lindorm_columnar;
CREATE TABLE mydb.mytable (
  id    INT    NOT NULL,
  city  STRING,
  name  STRING,
  score DOUBLE)
PARTITIONED BY (bucket(1024, id))
TBLPROPERTIES (
  'primary-key' = 'id');

Example 2: Composite primary key, bucket partition only:

USE lindorm_columnar;
CREATE TABLE mydb.mytable (
  id        INT  NOT NULL,
  timestamp LONG NOT NULL,
  city      STRING,
  name      STRING,
  score     DOUBLE)
PARTITIONED BY (bucket(512, timestamp))
TBLPROPERTIES (
  'primary-key' = 'id,timestamp');

Regular partition + bucket partition examples

Example 1: Date-based regular partition with bucket:

USE lindorm_columnar;
CREATE TABLE mydb.mytable (
  id    INT    NOT NULL,
  year  STRING NOT NULL,
  month STRING NOT NULL,
  day   STRING NOT NULL,
  city  STRING,
  name  STRING,
  score DOUBLE)
PARTITIONED BY (year, month, day, bucket(1024, id))
TBLPROPERTIES (
  'primary-key' = 'id,year,month,day');

Example 2: Multi-column regular partition with bucket:

USE lindorm_columnar;
CREATE TABLE mydb.mytable (
  id    INT    NOT NULL,
  date  STRING NOT NULL,
  city  STRING NOT NULL,
  name  STRING,
  score DOUBLE)
PARTITIONED BY (date, city, bucket(1024, id))
TBLPROPERTIES (
  'primary-key' = 'id,date,city');

View tables in the current namespace.

USE lindorm_columnar;
USE mydb;
SHOW TABLES;

View the schema of a table.

You can execute the following SQL statements to view the table schema.

USE lindorm_columnar;
SHOW CREATE TABLE mydb.mytable;
DESC mydb.mytable;

Delete a specific table.

USE lindorm_columnar;
-- Delete the table but keep data files.
DROP TABLE mydb.mytable;
-- Delete the table and all data files.
DROP TABLE mydb.mytable PURGE;

Clear the data in a table.

USE lindorm_columnar;
TRUNCATE TABLE mydb.mytable;

Partition

Delete a partition.

Use DELETE FROM with a WHERE clause to delete a partition:

USE lindorm_columnar;
DELETE FROM mydb.mytable WHERE city = 'beijing';

DML

Table

Insert data into a table.

  • Example 1:

    USE lindorm_columnar;
    INSERT INTO mydb.mytable VALUES (0, 'beijing', 'zhang3', 99);
  • Example 2:

    USE lindorm_columnar;
    INSERT INTO mydb.mytable SELECT id, city, name, score FROM another_table;

Query data in a table.

  • Example 1:

    USE lindorm_columnar;
    SELECT * FROM mydb.mytable WHERE id = 0;
  • Example 2:

    USE lindorm_columnar;
    SELECT count(1), sum(score) FROM mydb.mytable WHERE city = 'beijing';

Data compaction

Over time, incremental writes produce many small files. Compacting these files into larger ones reduces data and metadata redundancy and improves query performance. Use rewrite_data_files or rewrite_manifest to trigger compaction.

Important

Both commands consume database resources. Run them during off-peak hours.

Compact all files in a table:

USE lindorm_columnar;
CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable');

Compact files in a specific partition:

USE lindorm_columnar;
CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable', where => 'city=\"beijing\"');

Rewrite the table manifest:

USE lindorm_columnar;
CALL lindorm_columnar.system.rewrite_manifest('mydb.mytable');

For full command references, see rewrite_data_files and rewrite_manifest.

Hot and cold data storage

Lindorm supports three-level (L1, L2, L3) hot and cold data separation. Frequently accessed data stays in high-performance storage; older data is automatically moved to lower-cost storage. The data lake service handles dumping asynchronously—queries remain available during the process, though access performance varies across storage tiers.

Important

To enable automatic hot-cold conversion for column-oriented data, contact Lindorm technical support (DingTalk ID: s0s3eg3).

Hot and cold separation policies are based on time partitions only.

CHS parameters

Configure the cold/hot storage (CHS) attribute in TBLPROPERTIES when creating or altering a table.

| Parameter | Description | Format | Default |
| --- | --- | --- | --- |
| CHS | Time threshold (in seconds) that determines when data moves between storage levels. One value = L1/L2 split. Two values (num0, num1 where num0 < num1) = L1/L2/L3 split. | Long integer(s), unit: seconds | None |
| CHS_L1 | Storage type for the L1 (hot) tier | 'CHS_L1'='storagetype=&lt;storage type&gt;' | CAPACITY_CLOUD_STORAGE |
| CHS_L2 | Storage type for the L2 tier. Required when CHS is set. | Same format as CHS_L1 | None |
| CHS_L3 | Storage type for the L3 (coldest) tier. Required when CHS has two values. | Same format as CHS_L1 | None |
| CHS_EXP | Expression that extracts the data time from a time partition field, used to compare against the CHS threshold. | toSec(col0, pattern0, col1, pattern1, ...) | None |
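As a reading aid, the tier split implied by the CHS thresholds can be sketched in Python. Lindorm performs this mapping internally; the function below is illustrative only.

```python
# Illustrative sketch of how CHS thresholds map data age to a storage tier:
# one threshold yields an L1/L2 split, two thresholds an L1/L2/L3 split.
# This is a reading aid, not a Lindorm API.

def storage_tier(age_seconds: int, chs_thresholds: list[int]) -> str:
    """Return the tier ("L1", "L2", "L3") for data of a given age."""
    for tier, threshold in enumerate(sorted(chs_thresholds), start=1):
        if age_seconds < threshold:
            return f"L{tier}"
    return f"L{len(chs_thresholds) + 1}"

# 'CHS' = '2592000,5184000': under 30 days -> L1, 30-60 days -> L2, older -> L3.
print(storage_tier(86400, [2592000, 5184000]))    # L1
print(storage_tier(3000000, [2592000, 5184000]))  # L2
print(storage_tier(6000000, [2592000, 5184000]))  # L3
```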

Supported storage types

For cloud storage: CAPACITY_CLOUD_STORAGE (default), STANDARD_CLOUD_STORAGE, PERFORMANCE_CLOUD_STORAGE, CLOUD_ARCHIVE_STORAGE

Archive storage (CLOUD_ARCHIVE_STORAGE) is in internal preview. Contact Lindorm technical support (DingTalk ID: s0s3eg3) to enable it.

For local disk: CAPACITY_CLOUD_STORAGE (default), LOCAL_SSD_STORAGE, LOCAL_HDD_STORAGE, LOCAL_EBS_STORAGE

`CHS_EXP` pattern tokens

| Pattern token | Meaning |
| --- | --- |
| yyyy | Year |
| MM | Month |
| dd | Day |
| HH | Hour |
| mm | Minute |

The toSec() function calculates the maximum data time for the matching partition. For example, for the partition year=2023, month=10, day=2:

  • toSec(year, yyyy) returns 2023-12-31 23:59:59

  • toSec(year, yyyy, month, MM) returns 2023-10-31 23:59:59

  • toSec(year, yyyy, month, MM, day, dd) returns 2023-10-02 23:59:59

For a single combined date field, for example date=2023-10-02:

  • toSec(date, yyyy-MM-dd) returns 2023-10-02 23:59:59

Examples

Example 1: Move data older than one month (2,592,000 seconds) to Capacity storage (L2):

CREATE TABLE table0 (col0 INT, year STRING, month STRING, day STRING)
PARTITIONED BY (year, month, day)
TBLPROPERTIES (
  'CHS'    = '2592000',
  'CHS_L2' = 'storagetype=CAPACITY_CLOUD_STORAGE',
  'CHS_EXP'= 'toSec(year,yyyy,month,MM,day,dd)'
);

Example 2: Update the policy on table0: move data older than one month to Capacity storage, and data older than two months (5,184,000 seconds) to archive storage:

ALTER TABLE table0
SET TBLPROPERTIES (
  'CHS'    = '2592000,5184000',
  'CHS_L2' = 'storagetype=CAPACITY_CLOUD_STORAGE',
  'CHS_L3' = 'storagetype=CLOUD_ARCHIVE_STORAGE',
  'CHS_EXP'= 'toSec(year,yyyy,month,MM,day,dd)'
);

Example 3: Create a table with a single dt partition field (format: yyyy/MM/dd, e.g., 2020/12/1). Move data older than one month to Capacity storage and data older than two months to archive storage:

CREATE TABLE table1 (col0 INT, dt STRING)
PARTITIONED BY (dt)
TBLPROPERTIES (
  'CHS'    = '2592000,5184000',
  'CHS_L2' = 'storagetype=CAPACITY_CLOUD_STORAGE',
  'CHS_L3' = 'storagetype=CLOUD_ARCHIVE_STORAGE',
  'CHS_EXP'= 'toSec(dt,yyyy/MM/dd)'
);

Usage notes

  • Set CHS when creating a table, or update it later with ALTER TABLE ... SET TBLPROPERTIES.

  • If CHS is configured incorrectly, the table is created or updated successfully, but data is not automatically moved between storage tiers.

  • Dumping runs asynchronously. Data remains accessible during the process, but access performance may vary depending on the storage tier.

  • Hot and cold separation is supported only for time-based partitions.

Best practices

Specify a primary key for point queries

Primary key lookups are the fastest query path. For large tables, specify a primary key and query within a narrow key range for the best performance.

Sample table:

USE lindorm_columnar;
CREATE TABLE orders (
  o_orderkey      INT    NOT NULL,
  o_custkey       INT,
  o_orderstatus   STRING,
  o_totalprice    DOUBLE,
  o_orderdate     STRING,
  o_orderpriority STRING,
  o_clerk         STRING,
  o_shippriority  INT,
  o_comment       STRING)
PARTITIONED BY (bucket(1024, o_orderkey))
TBLPROPERTIES (
  'primary-key' = 'o_orderkey');

Point lookup:

USE lindorm_columnar;
SELECT * FROM orders WHERE o_orderkey = 18394;

Range scan (narrow range performs better):

USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_orderkey > 100000 AND o_orderkey < 200000;

Add partitions to isolate data

Partitions in a column-oriented storage engine are physically isolated. Queries that filter on the partition key skip unrelated partitions entirely.

Sample table with a date partition:

USE lindorm_columnar;
CREATE TABLE orders (
  o_orderkey      INT    NOT NULL,
  o_custkey       INT,
  o_orderstatus   STRING,
  o_totalprice    DOUBLE,
  o_orderdate     STRING  NOT NULL,
  o_orderpriority STRING,
  o_clerk         STRING,
  o_shippriority  INT,
  o_comment       STRING)
PARTITIONED BY (o_orderdate, bucket(1024, o_orderkey))
TBLPROPERTIES (
  'primary-key' = 'o_orderdate,o_orderkey');

Query a single day's data:

USE lindorm_columnar;
SELECT o_orderdate, count(*) FROM orders WHERE o_orderdate = '2022-01-01' GROUP BY o_orderdate;

Query a date range:

USE lindorm_columnar;
SELECT o_orderdate, count(*)
FROM orders
WHERE o_orderdate >= '2022-01-01' AND o_orderdate <= '2022-01-07'
GROUP BY o_orderdate;

Improve scan performance with data compaction

After compaction with rewrite_data_files, data is sorted by primary key within each partition, which significantly improves scan performance. To query only the compacted data and skip incremental writes, set read.scan-major-rewritten-files-only to true.

Sample table:

CREATE TABLE mydb.mytable (
  id    INT    NOT NULL,
  city  STRING NOT NULL,
  name  STRING,
  score INT)
PARTITIONED BY (city, bucket(4, id))
TBLPROPERTIES (
  'primary-key' = 'id,city');

Compact the entire table:

CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable');

Compact a specific partition:

CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable', where => 'city=\"beijing\"');

Restrict queries to compacted data only:

ALTER TABLE mydb.mytable SET TBLPROPERTIES ('read.scan-major-rewritten-files-only' = 'true');

| Value | Behavior |
| --- | --- |
| true | Queries return only compacted data; incremental data is excluded |
| false (default) | Queries return all data, including incremental writes |

Accelerate non-primary key queries with a sort key

After compaction, data is sorted by the primary key. To accelerate queries on non-primary key columns, specify a sort key and then recompact the data.

Important

After you specify a sort key, you must recompact (rewrite) the data in a partition for the acceleration to take effect. When read.scan-major-rewritten-files-only is set to true, queries return only the rewritten data; incremental data is excluded.

Sample table:

USE lindorm_columnar;
CREATE TABLE orders (
  o_orderkey      INT    NOT NULL,
  o_custkey       INT,
  o_orderstatus   STRING,
  o_totalprice    DOUBLE,
  o_orderdate     STRING,
  o_orderpriority STRING,
  o_clerk         STRING,
  o_shippriority  INT,
  o_comment       STRING)
PARTITIONED BY (bucket(1024, o_orderkey))
TBLPROPERTIES (
  'primary-key'                          = 'o_orderkey',
  'read.scan-major-rewritten-files-only' = 'true');

Set the sort key:

ALTER TABLE orders WRITE ORDERED BY o_shippriority, o_totalprice;

Compact to apply the sort order:

CALL lindorm_columnar.system.rewrite_data_files(table => 'orders');

Query using the sort key:

USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_shippriority = 0;
SELECT count(*) FROM orders WHERE o_shippriority = 0 AND o_totalprice > 999.9;