全部产品
Search
文档中心

实时数仓Hologres:通过临时存储Stage近实时导入

更新时间:Mar 05, 2026

Hologres V4.1 起支持基于临时存储 Stage 的近实时导入,在分钟级延迟下平衡吞吐与资源。本文介绍 Stage 能力、管理操作(创建/删除/查询)、使用方式(写入 Stage、从 Stage 写入内表)及 Arrow 类型映射。

概述

Hologres 目前提供两种主要的数据导入模式,分别适用于不同业务场景:

  • Fixed Plan 实时流式写入:面向数据流式写入场景。端到端延迟低,通常在毫秒级,可满足强实时性要求,但资源开销相对较大。

  • Bulkload 离线批量导入:面向高吞吐、大规模数据加载场景,如离线数仓 T+1 数据同步。端到端延迟高,仅能满足离线业务需求。

若业务存在分钟级“近实时”需求,离线批量导入难以满足时效性;实时流式写入虽可满足,但会带来较大写入资源开销。Hologres V4.1 版本起,基于临时存储 Stage 实现“近实时”数据导入能力,可有效平衡时效性与资源开销。

Stage 介绍

Stage 是 Hologres 内部的高性能临时存储,具有以下核心特性:

  • 内部托管:由 Hologres 自动管理,无需用户操作外部存储。

  • 结构化缓存:按目标表的 Schema 组织数据,确保类型和格式一致性。

  • 事务安全:支持 ACID 语义,保障写入过程的可靠性与一致性。

  • 自动合并提交:Spark 等 Connector 支持自动将临时存储以优化批次方式写入主存储,提升效率。

通过以下流程实现 Stage 近实时导入:

  • 外部数据写入 Stage。

  • Stage 数据攒批写入内表。

  • 内表数据写入即可见。

Stage 优势

  • 平衡延迟与吞吐:数据从写入到可查控制在分钟级,显著优于离线批量导入,同时支持远高于实时流式写入的吞吐能力。

  • 简化数据链路:数据直接写入 Hologres 内部 Stage,无外部系统依赖;通过标准 SQL 的 COPY、INSERT 语法及官方 Connector 适配。

  • 提升资源效率:Stage 小文件自动合并,提升后续读写性能;支持 Serverless Computing、独立计算组实现 Stage 写入资源隔离,提升系统稳定性。

  • 适用场景广泛:Spark Connector、Flink Connector、Holo Client 均支持 Stage 近实时写入模式。

限制条件

仅支持 Arrow 格式通过 Stage 近实时导入 Hologres。Arrow 格式与 Hologres 的数据类型映射参见数据类型映射

权限说明

  1. SPM 或 SLPM

    • 创建 Stage:需具备 writer 及以上权限,或 Superuser。

    • 写入/删除/读取 Stage:需是对应 Stage 的创建者且具备 writer 及以上权限,或 Superuser。

  2. 专家权限模型

    • 创建 Stage:需具备 pg_operate_internal_stages 角色权限,或 Superuser。

    • 写入/删除/读取 Stage:需是对应 Stage 的创建者且具备 pg_operate_internal_stages 角色权限,或 Superuser。

通过如下命令授予用户 pg_operate_internal_stages 角色权限:

-- 授予指定用户 Stage 操作权限
-- 可替换变量:<user_name> 替换为实际用户名(如 RAM 子账号名)
GRANT pg_operate_internal_stages TO "<user_name>";

管理 Stage

创建 Stage

命令格式:

-- 创建 Internal Stage
-- 可替换变量:
--   <internal_stage_name>:Stage 名称,必填,最大 128 字符
--   <group_name>:分组名,可选,默认与 internal_stage_name 相同
--   <ttl_in_seconds>:生命周期(秒),可选,默认 7200,最大 864000
CALL HOLOGRES.HG_CREATE_INTERNAL_STAGE(
  '<internal_stage_name>', 
  ['<group_name>'],
  ['<ttl_in_seconds>']
);

参数说明:

参数名

是否必填

说明

internal_stage_name

必填

Internal Stage 名称。支持字母(大小写敏感)、数字、下划线、连字符,最大长度 128 字符。

group_name

非必填

Stage 所属分组,用于分类管理。默认同 internal_stage_name。

ttl_in_seconds

非必填

Stage 生命周期,单位秒,从 last_modified_time(可从系统表 hologres.hg_internal_stages 查看)起计。默认 7200(2 小时),最大 864000(10 天)。到达生命周期后系统自动异步清理。

删除 Stage

命令格式:

-- 删除指定 Internal Stage
-- 可替换变量:<internal_stage_name> 替换为要删除的 Stage 名称
CALL HOLOGRES.HG_DROP_INTERNAL_STAGE(
    '<internal_stage_name>'
);

internal_stage_name:必填。Internal Stage 名称。

查询 Stage 状态

可通过系统视图 hologres.hg_internal_stages 查询 Stage 状态:

-- 查询全部 Stage 或按名称过滤
-- 可替换变量:<internal_stage_name> 替换为 Stage 名称时仅查询该 Stage;省略 WHERE 则查询全部
SELECT * FROM hologres.hg_internal_stages 
[WHERE stage_name = '<internal_stage_name>'];

包含如下字段:

字段名

含义

stage_name

Stage 名称

group_name

Stage 所属分组

ttl_in_seconds

Stage 生命周期(秒)

create_time

Stage 创建时间

create_user

创建 Stage 的用户

create_application_name

创建 Stage 的应用程序

create_session_id

创建 Stage 的会话 ID

last_modified_time

Stage 最后一次更新时间

stage_bytes

Stage 存储大小(Byte)

file_count

Stage 文件数

查询 Stage 文件

可通过系统视图 hologres.hg_internal_stage_files 查询 Stage 文件:

-- 查询 Stage 下文件列表,可按名称模糊过滤
-- 可替换变量:<internal_stage_name> Stage 名称;<pattern%> 文件名模糊匹配(如 'batch_%')
SELECT * FROM hologres.hg_internal_stage_files 
[WHERE stage_name = '<internal_stage_name>'] 
  [AND file_name like '<pattern%>'];

包含如下字段:

字段名

含义

stage_name

Stage 名称

file_name

文件名称

file_size

文件存储大小(Byte)

last_modified_time

文件最后一次更新时间

is_complete

文件是否写入成功。True 表示写入成功,False 表示正在写入或写入失败

删除 Stage 文件

可通过系统函数 hologres.hg_remove_internal_stage_file 删除 Stage 文件:

-- 删除指定文件
-- 可替换变量:<stage_name> Stage 名称;<file_name> 要删除的文件名
SELECT hologres.hg_remove_internal_stage_file ('<stage_name>', '<file_name>');

-- 批量删除 Stage 文件
-- 可替换变量:<stage_name> Stage 名称;<glob_pattern> 文件名通配(如 '*.arrow');<pattern%> WHERE 中的文件名模糊匹配
SELECT
    stage_name,
    file_name,
    hologres.hg_remove_internal_stage_file (stage_name, file_name) AS hg_remove_internal_stage_file
FROM
    hologres.hg_list_internal_stage_files ('<stage_name>',['<glob_pattern>']) 
[WHERE file_name like '<pattern%>'];

使用 Stage

客户端写入 Stage

命令格式:

-- 将客户端数据流写入 Stage 指定文件
-- 可替换变量:<internal_stage_name> 已创建的 Stage 名称;<file_name> 写入的文件名(支持字母、数字、下划线、连字符、英文句号,最大 128 字符)
COPY EXTERNAL_FILES(
  path = 'internal_stage://<internal_stage_name>/<file_name>'
) FROM STDIN;

通过 COPY 命令写入 Stage 时不支持配置 COPY 语法中的 WITH 参数。通过 path 参数定义 Stage 文件路径,支持字母(大小写敏感)、数字、下划线、连字符、英文句号,最大长度 128 字符。

读取 Stage 写入内表

命令格式:

-- 从 Stage 读取 Arrow 文件并写入内表
-- 可替换变量:
--   <table_name> 目标内表名
--   <col_name> 列名(可选,指定时与目标表列一一对应)
--   <internal_stage_name> Stage 名称,可写多个用逗号分隔以读取多个 Stage
--   <col_type> 列类型(可选,仅在 AS 子句中指定列类型时使用)
INSERT INTO <table_name> [ ( <col_name> [ , <col_name> ... ] ) ]
SELECT *
FROM EXTERNAL_FILES(
  path = 'internal_stage://<internal_stage_name>, internal_stage://<internal_stage_name>',
  format = arrow
)
[AS ( <col_name> <col_type>[ , <col_name> <col_type> ... ] )];

读取 Stage 仅支持 Arrow 格式文件。

使用示例

通过 Holo-client 写入 Stage

以下示例演示如何使用 Holo-client 通过 Stage 进行近实时写入。

Maven 依赖:

<dependency>
  <groupId>com.alibaba.hologres</groupId>
  <artifactId>holo-client</artifactId>
  <version>2.7.0</version>
</dependency>

示例代码:

import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.Put;
import com.alibaba.hologres.client.copy.CopyUtil;
import com.alibaba.hologres.client.copy.in.CopyInStageWrapper;
import com.alibaba.hologres.client.copy.in.arrow.RecordArrowWriter;
import com.alibaba.hologres.client.model.OnConflictAction;
import com.alibaba.hologres.client.model.TableSchema;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CopyStageDemo {

    public static void main(String[] args) throws Exception {
        // 注意:jdbcUrl需使用jdbc:hologres协议
        String jdbcUrl = "jdbc:hologres://host:port/db";
        String username = "";
        String password = "";

        /*
        CREATE TABLE copy_stage_demo (id INT NOT NULL, name TEXT NOT NULL, address TEXT, PRIMARY KEY(id));
        */
        String tableName = "copy_stage_demo";

        HoloConfig config = new HoloConfig();
        config.setJdbcUrl(jdbcUrl);
        config.setUsername(username);
        config.setPassword(password);
        config.setRegion("local");

        // 创建临时Stage名称
        String stageName = "temp_stage_" + System.currentTimeMillis();

        try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
             HoloClient client = new HoloClient(config)) {

            // 创建内部Stage
            String createStageSql =
                    "call hologres.hg_create_internal_stage('"
                            + stageName
                            + "', 'default_group', 7200);";
            try (java.sql.Statement stmt = conn.createStatement()) {
                stmt.execute(createStageSql);
            }

            // 获取表结构
            TableSchema schema = client.getTableSchema(tableName);

            // 定义要写入的列
            List<String> columns = new ArrayList<>();
            columns.add("id");
            columns.add("name");
            columns.add("address");

            // 使用RecordArrowWriter和CopyInStageWrapper写入数据到Stage
            try (RecordArrowWriter arrowWriter =
                         new RecordArrowWriter(
                                 schema,
                                 columns,
                                 8192 // maxBatchSize,每1024行数据组成一个Arrow RecordBatch
                         );
                 CopyInStageWrapper<com.alibaba.hologres.client.model.Record> copyInStage =
                         new CopyInStageWrapper<>(
                                 config,
                                 stageName,
                                 "data_file", // 文件名前缀
                                 arrowWriter,
                                 64 * 1024 * 1024 // fileSizeLimit,每个文件大小64 MB
                         )) {

                // 写入10条数据
                for (int i = 0; i < 10; ++i) {
                    Put put = new Put(schema);
                    // 和CopyInStageWrapper的columns保持一致
                    put.setObject("id", i);
                    put.setObject("name", "name" + i);
                    put.setObject("address", "address" + i);

                    copyInStage.putRecord(put.getRecord());
                }
                // 程序结束之前需要调用close,保证数据完全写入
                // demo使用了try-with-resources,无需手动close
                // copyInStage.close();
            }

            // 生成从Stage写入目标表的INSERT语句
            String insertSql =
                    CopyUtil.buildInsertTableSelectFromStageSql(
                            schema,
                            columns,
                            Collections.singletonList(stageName),
                            OnConflictAction.INSERT_OR_UPDATE);

            try (java.sql.Statement stmt = conn.createStatement()) {
                stmt.execute(insertSql);
            }
            // 验证写入结果
            try (java.sql.Statement stmt = conn.createStatement()) {
                try (java.sql.ResultSet rs = stmt.executeQuery("select * from " + tableName)) {
                    while (rs.next()) {
                        System.out.println(
                                "id: "
                                        + rs.getInt(1)
                                        + ", name: "
                                        + rs.getString(2)
                                        + ", address: "
                                        + rs.getString(3));
                    }
                }
            }
        } finally {
            // 清理临时Stage,不清理的话也会根据TTL自动清理
            try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
                 java.sql.Statement stmt = conn.createStatement()) {
                String dropStageSql = "call hologres.hg_drop_internal_stage('" + stageName + "');";
                stmt.execute(dropStageSql);
            } catch (Exception e) {
                System.err.println("清理Stage时出错: " + e.getMessage());
            }
        }
    }
}

数据类型映射

通过 Stage 近实时导入时,Arrow 格式与 Hologres 数据类型对应关系如下。使用 Holo-client 进行写入时,已自动进行数据类型转换。

Hologres 数据类型

Arrow 数据类型

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

BOOLEAN

UINT8、BIT

REAL(FLOAT4)

FLOAT4

DOUBLE PRECISION(FLOAT8)

FLOAT8

DATE

DateDay

TIMETZ

FixedSizeBinary

TIME

TimeMicro

TIMESTAMP WITHOUT TIME ZONE

TimeStampMicro

TIMESTAMP WITH TIME ZONE

DateMilli

TEXT

VarChar

CHAR(n)

VARCHAR(n)

JSON

JSONB

BYTEA

VarBINARY

roaringbitmap

NUMERIC(m,n)

DECIMAL(m,n)

ARRAY(支持 INT、BIGINT、FLOAT、BOOLEAN、DOUBLE、STRING)

ARRAY<type>