在金融、物流與物聯網等業務情境中,系統會產生海量的時序資料,例如交易流水、軌跡資料和監控日誌。對這些TB層級的資料進行即時分析,往往面臨效能挑戰。PolarDB PostgreSQL版憑藉分區表、冷熱分層等特性,為儲存海量時序資料提供了高性價比的解決方案。在此基礎上,您可以通過列存索引(IMCI)功能,無需進行複雜的資料預先處理,即可實現對海量時序資料的即時、高效能分析,有效挖掘資料價值。
方案介紹
操作流程
資料寫入:業務應用將時序資料(例如交易流水)寫入PolarDB PostgreSQL版叢集。
列存索引:在基表上建立列存索引。PolarDB PostgreSQL版會自動為維護表中列存資料。相比行存,列存以列為單位組織資料,具備較高的壓縮比,在執行彙總查詢時僅需讀取相關列,從而減少I/O消耗。
查詢加速:分析查詢(如K線彙總)通過最佳化器或
Hint指定,優先使用列存索引。查詢引擎利用列式儲存和平行處理能力,完成對資料的掃描和彙總計算,最終返回結果。
方案優勢
使用簡單:無需改造業務或進行複雜的ETL,僅需為基表建立列存索引,即可透明加速分析查詢。
功能豐富:原生支援分區表,並內建
time_bucket、first、last等豐富的時序分析函數,簡化您的SQL開發。
效果展示
資料量:1 億條資料,時間跨度2天,每天約5000萬條。
K線彙總查詢:含指定時間周期內最高值、最低值、開盤價、收盤價、交易總量等5個指標。
列存索引並行度:8。
耗時如下(單位:秒):
情境
秒級K線彙總
分鐘級K線彙總
小時級K線彙總
天級K線彙總
全量資料彙總(1億條)
3.41
0.95
0.93
0.91
1天資料彙總(約5000萬條)
1.88
0.82
0.81
0.76
12小時資料彙總(約2500萬條)
0.89
0.55
0.53
無
1小時資料彙總(約600萬條)
0.41
0.39
0.37
無
實施步驟
步驟一:環境準備
請確認您的叢集版本與配置是否滿足以下條件:
叢集版本:
PostgreSQL 16(核心小版本為2.0.16.8.3.0及以上)
PostgreSQL 14(核心小版本為2.0.14.10.20.0及以上)
原表必須包含主鍵,且在建立列存索引時需要將主鍵列加入列存索引中。
wal_level參數的值需設定為logical,即在預寫式日誌WAL(Write-Ahead Logging)中增加支援邏輯編碼所需的資訊。說明您可以通過控制台設定wal_level參數。修改該參數後叢集將會重啟,請在修改參數前做好業務安排,謹慎操作。
開啟列存索引功能。
對於不同的PolarDB PostgreSQL版核心版本,開啟列存索引的方式不同:
步驟二:資料準備
本方案將準備一張交易流水表,類比產生1億條交易資料,時間範圍約為2天。在此期間,交易時間為每天的8:00至16:00,預計每日資料量約為4000萬條。
-- 交易流水表 CREATE TABLE market_trades ( trade_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- 自增主鍵 trade_ts TIMESTAMP, -- 交易時間 market_id VARCHAR, -- 市場編號 price DECIMAL, -- 交易價格 amount DECIMAL, -- 交易數量 insert_ts TIMESTAMP -- 系統的寫入時間 ); INSERT INTO market_trades(trade_ts, market_id, price, amount, insert_ts) SELECT trade_ts, market_id, price, amount, trade_ts + (random() * 500)::INT * INTERVAL '1 millisecond' AS insert_ts FROM ( -- ======================== -- 1. 第一天高峰:2025-06-01 8:00 - 16:00,4000萬條 -- ======================== SELECT '2025-06-01 08:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' + -- 28800秒 = 8小時 (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts, CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id, CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price, random() * 10 + 0.1 AS amount FROM generate_series(1, 40000000) UNION ALL -- ======================== -- 2. 第一天非高峰:2025-06-01 16:00 - 2025-06-02 08:00,1000萬條 -- ======================== SELECT CASE WHEN random() < 0.5 THEN -- 16:00 - 24:00 '2025-06-01 16:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' ELSE -- 00:00 - 08:00(第二天淩晨) '2025-06-02 00:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' END + (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts, CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id, CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price, random() * 10 + 0.1 AS amount FROM generate_series(1, 10000000) UNION ALL -- ======================== -- 3. 第二天高峰:2025-06-02 8:00 - 16:00,4000萬條 -- ======================== SELECT '2025-06-02 08:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' + (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts, CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id, CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price, random() * 10 + 0.1 AS amount FROM generate_series(1, 40000000) UNION ALL -- ======================== -- 4. 第二天非高峰:2025-06-02 16:00 - 2025-06-03 08:00,1000萬條 -- ======================== SELECT CASE WHEN random() < 0.5 THEN -- 16:00 - 24:00 '2025-06-02 16:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' ELSE -- 00:00 - 08:00(第三天淩晨) '2025-06-03 00:00:00'::TIMESTAMP + (random() * 28800)::INT * INTERVAL '1 second' END + (random() * 1000)::INT * INTERVAL '1 millisecond' AS trade_ts, CASE WHEN random() < 0.6 THEN 'BTC-USDT' ELSE 'ETH-USDT' END AS market_id, CASE WHEN random() < 0.6 THEN 30000 + (random() * 1000) ELSE 2000 + (random() * 100) END AS price, random() * 10 + 0.1 AS amount FROM generate_series(1, 10000000) ) AS data;為交易流水表建立列存索引。
CREATE INDEX idx_csi_market_trades ON market_trades USING CSI;
步驟三:執行K線彙總查詢
使用情境:計算每個固定時間視窗的K線。
應用舉例:計算每秒鐘的股價最高值、最低值、開盤價、收盤價、交易總量。
以下樣本分別計算每秒、每分鐘、每小時、每天的K線資料。
秒級K線彙總
-- 秒級K線彙總
/*+ SET (polar_csi.enable_query on) */
SELECT
time_bucket('1 second', trade_ts) AS candle_ts, -- 1秒內的資料
market_id,
MIN(price) AS low, -- 1秒內最低價
MAX(price) AS high, -- 1秒內最高價
FIRST(price ORDER BY trade_ts) AS open, -- 1秒內開盤價
LAST(price ORDER BY trade_ts) AS close, -- 1秒內收盤價
SUM(amount) AS vol -- 1秒內交易總量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;分鐘級K線彙總
-- 分鐘級K線彙總
/*+ SET (polar_csi.enable_query on) */
SELECT
time_bucket('1 minute', trade_ts) AS candle_ts, -- 1分鐘內的資料
market_id,
MIN(price) AS low, -- 1分鐘內最低價
MAX(price) AS high, -- 1分鐘內最高價
FIRST(price ORDER BY trade_ts) AS open, -- 1分鐘內開盤價
LAST(price ORDER BY trade_ts) AS close, -- 1分鐘內收盤價
SUM(amount) AS vol -- 1分鐘內交易總量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;小時級K線彙總
-- 小時級K線彙總
/*+ SET (polar_csi.enable_query on) */
SELECT
time_bucket('1 hour', trade_ts) AS candle_ts, -- 1小時內的資料
market_id,
MIN(price) AS low, -- 1小時內最低價
MAX(price) AS high, -- 1小時內最高價
FIRST(price ORDER BY trade_ts) AS open, -- 1小時內開盤價
LAST(price ORDER BY trade_ts) AS close, -- 1小時內收盤價
SUM(amount) AS vol -- 1小時內交易總量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;天級K線彙總
-- 天級K線彙總
/*+ SET (polar_csi.enable_query on) */
SELECT
time_bucket('1 day', trade_ts) AS candle_ts, -- 1天內的資料
market_id,
MIN(price) AS low, -- 1天內最低價
MAX(price) AS high, -- 1天內最高價
FIRST(price ORDER BY trade_ts) AS open, -- 1天內開盤價
LAST(price ORDER BY trade_ts) AS close, -- 1天內收盤價
SUM(amount) AS vol -- 1天內交易總量
FROM market_trades
WHERE trade_ts >= '2025-06-01 00:00:00' AND trade_ts <= '2025-06-02 00:00:00'
GROUP BY candle_ts, market_id
ORDER BY candle_ts, market_id;SQL說明
/*+ SET (polar_csi.enable_query on) */:用於強制查詢使用列存索引執行計畫。在某些情境下,最佳化器可能誤判行存更優,此時可使用此Hint確保查詢走列存路徑。time_bucket(bucket_width, ts):時序資料庫提供的函數,用於將時間戳記ts按指定的時間間隔bucket_width進行分組。





