Lindorm column-oriented storage is a distributed storage service that stores and processes data by column. Compared with row-oriented storage, it reduces query response time and saves I/O resources. Use the Lindorm compute engine to create namespaces and tables, read and write data, compact files, and configure tiered storage.
Overview
Column-oriented storage is suited for large amounts of semi-structured and structured data in scenarios such as Internet of Vehicles (IoV), Internet of Things (IoT), orders, and logs. Key capabilities include:
Computing and analytics: Perform interactive analytics and online computing on column-oriented data. Rich indexing capabilities accelerate data positioning. Add, remove, modify, and query large amounts of primary key data using SQL statements.
High throughput: Supports horizontal scaling with the ability to read and write terabytes of data per minute—suited for quick IoV data import, model training dataset access, and large-scale report analysis.
Cost-effectiveness: Uses columnar high compression ratio algorithms, high-density low-cost media, hot and cold data separation, compression encoding, and cold data archive storage to reduce storage costs.
High availability: Uses erasure coding to ensure the high availability of distributed datasets and eliminate the risk of single points of failure (SPOFs).
Open source compatibility: Compatible with the Iceberg API and connects to compute engines such as Spark and Flink for seamless integration into mainstream data ecosystems.
Hot and cold data separation: Store hot and cold data on different storage media based on your business requirements to reduce performance overhead and lower storage costs.
Prerequisites
Before you begin, ensure that you have:
Read Precautions
Completed the connection setup for your job mode:
JDBC jobs: Use JDBC in application development
JAR jobs: Create a job in Java
Python jobs: Create a job in Python
(Optional) If you plan to use hot and cold data separation: Enable Capacity storage
DDL
Namespace
Table
Partition
DML
Table
Partition Management
Over time, incremental writes produce many small files. Compacting these files into larger ones reduces data and metadata redundancy and improves query performance. Use rewrite_data_files or rewrite_manifest to trigger compaction.
Both commands consume database resources. Run them during off-peak hours.
Compact all files in a table:
USE lindorm_columnar;
CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable');

Compact files in a specific partition:
USE lindorm_columnar;
CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable', where => 'city = "beijing"');

Rewrite the table manifest:
USE lindorm_columnar;
CALL lindorm_columnar.system.rewrite_manifest('mydb.mytable');

For full command references, see rewrite_data_files and rewrite_manifest.
Hot and cold data storage
Lindorm supports three-level (L1, L2, L3) hot and cold data separation. Frequently accessed data stays in high-performance storage; older data is automatically moved to lower-cost storage. The data lake service handles dumping asynchronously—queries remain available during the process, though access performance varies across storage tiers.
To enable automatic hot-cold conversion for column-oriented data, contact Lindorm technical support (DingTalk ID: s0s3eg3).
Hot and cold separation policies are based on time partitions only.
CHS parameters
Configure the cold/hot storage (CHS) attribute in TBLPROPERTIES when creating or altering a table.
| Parameter | Description | Format | Default |
|---|---|---|---|
| CHS | Time threshold (in seconds) that determines when data moves between storage levels. One value = L1/L2 split. Two values (num0, num1 where num0 < num1) = L1/L2/L3 split. | Long integer(s), unit: seconds | — |
| CHS_L1 | Storage type for the L1 (hot) tier. | 'CHS_L1'='storagetype=<storage type>' | CAPACITY_CLOUD_STORAGE |
| CHS_L2 | Storage type for the L2 tier. Required when CHS is set. | Same format as CHS_L1 | — |
| CHS_L3 | Storage type for the L3 (coldest) tier. Required when CHS has two values. | Same format as CHS_L1 | — |
| CHS_EXP | Expression that extracts the data time from a time partition field, used to compare against the CHS threshold. | toSec(col0, pattern0, col1, pattern1, ...) | — |
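The tier-assignment rule implied by the CHS thresholds can be sketched in Python. This is an illustration only; the exact boundary comparison (>= here) is an assumption, and the service defines the authoritative semantics:

```python
def storage_tier(age_seconds: int, thresholds: list) -> str:
    """Classify data by age against CHS thresholds.

    One threshold splits data into L1/L2; two thresholds
    (num0 < num1) split it into L1/L2/L3. The >= comparison
    at each boundary is an assumption for illustration.
    """
    tier = 1
    for t in sorted(thresholds):
        if age_seconds >= t:
            tier += 1
    return f"L{tier}"

# With 'CHS' = '2592000,5184000' (30-day and 60-day boundaries):
print(storage_tier(10 * 86_400, [2_592_000, 5_184_000]))  # L1: 10 days old
print(storage_tier(45 * 86_400, [2_592_000, 5_184_000]))  # L2: 45 days old
print(storage_tier(90 * 86_400, [2_592_000, 5_184_000]))  # L3: 90 days old
```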
Supported storage types
For cloud storage: CAPACITY_CLOUD_STORAGE (default), STANDARD_CLOUD_STORAGE, PERFORMANCE_CLOUD_STORAGE, CLOUD_ARCHIVE_STORAGE
Archive storage (CLOUD_ARCHIVE_STORAGE) is in internal preview. Contact Lindorm technical support (DingTalk ID: s0s3eg3) to enable it.

For local disk: CAPACITY_CLOUD_STORAGE (default), LOCAL_SSD_STORAGE, LOCAL_HDD_STORAGE, LOCAL_EBS_STORAGE
`CHS_EXP` pattern tokens
| Pattern token | Meaning |
|---|---|
| yyyy | Year |
| MM | Month |
| dd | Day |
| HH | Hour |
| mm | Minute |
The toSec() function calculates the maximum data time for the matching partition. For example, for the partition year=2023, month=10, day=2:
toSec(year, yyyy) returns 2023-12-31 23:59:59
toSec(year, yyyy, month, MM) returns 2023-10-31 23:59:59
toSec(year, yyyy, month, MM, day, dd) returns 2023-10-02 23:59:59
For a single combined date field, for example date=2023-10-02:
toSec(date, yyyy-MM-dd) returns 2023-10-02 23:59:59
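The "maximum data time" rule above can be mimicked in Python. This is a sketch of the documented behavior, not the service's implementation:

```python
from datetime import datetime, timedelta

def period_end(year, month=None, day=None, hour=None, minute=None):
    """Last second of the finest period supplied, mirroring how
    toSec() resolves a partition to its maximum data time."""
    if minute is not None:
        end = datetime(year, month, day, hour, minute) + timedelta(minutes=1)
    elif hour is not None:
        end = datetime(year, month, day, hour) + timedelta(hours=1)
    elif day is not None:
        end = datetime(year, month, day) + timedelta(days=1)
    elif month is not None:
        # first day of the next month, rolling over December
        end = datetime(year + (month == 12), month % 12 + 1, 1)
    else:
        end = datetime(year + 1, 1, 1)
    return end - timedelta(seconds=1)

print(period_end(2023))         # 2023-12-31 23:59:59
print(period_end(2023, 10))     # 2023-10-31 23:59:59
print(period_end(2023, 10, 2))  # 2023-10-02 23:59:59
```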
Examples
Example 1: Move data older than one month (2,592,000 seconds) to Capacity storage (L2):
CREATE TABLE table0 (col0 INT, year STRING, month STRING, day STRING)
PARTITIONED BY (year, month)
TBLPROPERTIES (
'CHS' = '2592000',
'CHS_L2' = 'storagetype=CAPACITY_CLOUD_STORAGE',
'CHS_EXP'= 'toSec(year,yyyy,month,MM,day,dd)'
);

Example 2: Update the policy on table0 to move data older than one month to Capacity storage, and data older than two months (5,184,000 seconds) to archive storage:
ALTER TABLE table0
SET TBLPROPERTIES (
'CHS' = '2592000,5184000',
'CHS_L2' = 'storagetype=CAPACITY_CLOUD_STORAGE',
'CHS_L3' = 'storagetype=CLOUD_ARCHIVE_STORAGE',
'CHS_EXP'= 'toSec(year,yyyy,month,MM,day,dd)'
);

Example 3: Create a table with a single dt partition field (format: yyyy/MM/dd, for example, 2020/12/1). Move data older than one month to Capacity storage and data older than two months to archive storage:
CREATE TABLE table1 (col0 INT, dt STRING)
PARTITIONED BY (dt)
TBLPROPERTIES (
'CHS' = '2592000,5184000',
'CHS_L2' = 'storagetype=CAPACITY_CLOUD_STORAGE',
'CHS_L3' = 'storagetype=CLOUD_ARCHIVE_STORAGE',
'CHS_EXP'= 'toSec(dt,yyyy/MM/dd)'
);

Usage notes
Set CHS when creating a table, or update it later with ALTER TABLE ... SET TBLPROPERTIES.
If CHS is configured incorrectly, the table is created or updated successfully, but data is not automatically moved between storage tiers.
Dumping runs asynchronously. Data remains accessible during the process, but access performance may vary depending on the storage tier.
Hot and cold separation is supported only for time-based partitions.
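The second-based thresholds above are easy to get wrong by hand. A tiny helper (illustrative only; the function name is hypothetical) renders day boundaries as the comma-separated seconds string the CHS property expects:

```python
def chs_property(*day_thresholds: int) -> str:
    """Render tier boundaries given in days as the comma-separated
    seconds string expected by the CHS table property."""
    return ",".join(str(days * 86_400) for days in sorted(day_thresholds))

print(chs_property(30))      # 2592000          -> L1/L2 split at one month
print(chs_property(30, 60))  # 2592000,5184000  -> L1/L2/L3 split
```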
Best practices
Specify a primary key for point queries
Primary key lookups are the fastest query path. For large tables, specify a primary key and query within a narrow key range for the best performance.
Sample table:
USE lindorm_columnar;
CREATE TABLE orders (
o_orderkey INT NOT NULL,
o_custkey INT,
o_orderstatus STRING,
o_totalprice DOUBLE,
o_orderdate STRING,
o_orderpriority STRING,
o_clerk STRING,
o_shippriority INT,
o_comment STRING)
PARTITIONED BY (bucket(1024, o_orderkey))
TBLPROPERTIES (
'primary-key' = 'o_orderkey');

Point lookup:
USE lindorm_columnar;
SELECT * FROM orders WHERE o_orderkey = 18394;

Range scan (narrow range performs better):
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_orderkey > 100000 AND o_orderkey < 200000;

Add partitions to isolate data
Partitions in a column-oriented storage engine are physically isolated. Queries that filter on the partition key skip unrelated partitions entirely.
Sample table with a date partition:
USE lindorm_columnar;
CREATE TABLE orders (
o_orderkey INT NOT NULL,
o_custkey INT,
o_orderstatus STRING,
o_totalprice DOUBLE,
o_orderdate STRING NOT NULL,
o_orderpriority STRING,
o_clerk STRING,
o_shippriority INT,
o_comment STRING)
PARTITIONED BY (o_orderdate, bucket(1024, o_orderkey))
TBLPROPERTIES (
'primary-key' = 'o_orderdate,o_orderkey');

Query a single day's data:
USE lindorm_columnar;
SELECT o_orderdate, count(*) FROM orders WHERE o_orderdate = '2022-01-01' GROUP BY o_orderdate;

Query a date range:
USE lindorm_columnar;
SELECT o_orderdate, count(*)
FROM orders
WHERE o_orderdate >= '2022-01-01' AND o_orderdate <= '2022-01-07'
GROUP BY o_orderdate;

Improve scan performance with data compaction
After compaction with rewrite_data_files, data is sorted by primary key within each partition, which significantly improves scan performance. To query only the compacted data and skip incremental writes, set read.scan-major-rewritten-files-only to true.
Sample table:
CREATE TABLE mydb.mytable (
id INT NOT NULL,
city STRING NOT NULL,
name STRING,
score INT)
PARTITIONED BY (city, bucket(4, id))
TBLPROPERTIES (
'primary-key' = 'id,city');

Compact the entire table:
CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable');

Compact a specific partition:
CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable', where => 'city = "beijing"');

Restrict queries to compacted data only:
ALTER TABLE mydb.mytable SET TBLPROPERTIES ('read.scan-major-rewritten-files-only' = 'true');

| Value | Behavior |
|---|---|
| true | Queries only compacted data; incremental data is excluded |
| false (default) | Queries all data, including incremental writes |
Accelerate non-primary key queries with a sort key
After compaction, data is sorted by the primary key. To accelerate queries on non-primary key columns, specify a sort key and then recompact the data.
After you specify a sort key, you must rewrite (compact) the data in a partition for the acceleration to take effect. Because the sample table below enables read.scan-major-rewritten-files-only, queries return only the rewritten data; incremental data written after compaction is excluded.
Sample table:
USE lindorm_columnar;
CREATE TABLE orders (
o_orderkey INT NOT NULL,
o_custkey INT,
o_orderstatus STRING,
o_totalprice DOUBLE,
o_orderdate STRING,
o_orderpriority STRING,
o_clerk STRING,
o_shippriority INT,
o_comment STRING)
PARTITIONED BY (bucket(1024, o_orderkey))
TBLPROPERTIES (
'primary-key' = 'o_orderkey',
'read.scan-major-rewritten-files-only' = 'true');

Set the sort key:
ALTER TABLE orders WRITE ORDERED BY o_shippriority, o_totalprice;

Compact to apply the sort order:
CALL lindorm_columnar.system.rewrite_data_files(table => 'orders');

Query using the sort key:
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_shippriority = 0;

USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_shippriority = 0 AND o_totalprice > 999.9;