Hologres V4.1 起支持基于临时存储 Stage 的近实时导入,在分钟级延迟下平衡吞吐与资源。本文介绍 Stage 能力、管理操作(创建/删除/查询)、使用方式(写入 Stage、从 Stage 写入内表)及 Arrow 类型映射。
概述
Hologres 目前提供两种主要的数据导入模式,分别适用于不同业务场景:
Fixed Plan 实时流式写入:面向数据流式写入场景。端到端延迟低,通常在毫秒级,可满足强实时性要求,但资源开销相对较大。
Bulkload 离线批量导入:面向高吞吐、大规模数据加载场景,如离线数仓 T+1 数据同步。端到端延迟高,仅能满足离线业务需求。
若业务存在分钟级“近实时”需求,离线批量导入难以满足时效性;实时流式写入虽可满足,但会带来较大写入资源开销。Hologres V4.1 版本起,基于临时存储 Stage 实现“近实时”数据导入能力,可有效平衡时效性与资源开销。
Stage 介绍
Stage 是 Hologres 内部的高性能临时存储,具有以下核心特性:
内部托管:由 Hologres 自动管理,无需用户操作外部存储。
结构化缓存:按目标表的 Schema 组织数据,确保类型和格式一致性。
事务安全:支持 ACID 语义,保障写入过程的可靠性与一致性。
自动合并提交:Spark 等 Connector 支持自动将临时存储以优化批次方式写入主存储,提升效率。
通过以下流程实现 Stage 近实时导入:
外部数据写入 Stage。
Stage 数据攒批写入内表。
内表数据写入即可见。
Stage 优势
平衡延迟与吞吐:数据从写入到可查控制在分钟级,显著优于离线批量导入,同时支持远高于实时流式写入的吞吐能力。
简化数据链路:数据直接写入 Hologres 内部 Stage,无外部系统依赖;通过标准 SQL 的 COPY、INSERT 语法及官方 Connector 适配。
提升资源效率:Stage 小文件自动合并,提升后续读写性能;支持 Serverless Computing、独立计算组实现 Stage 写入资源隔离,提升系统稳定性。
适用场景广泛:Spark Connector、Flink Connector、Holo Client 均支持 Stage 近实时写入模式。
限制条件
仅支持 Arrow 格式通过 Stage 近实时导入 Hologres。Arrow 格式与 Hologres 的数据类型映射参见数据类型映射。
权限说明
SPM 或 SLPM:
创建 Stage:需具备 writer 及以上权限,或 Superuser。
写入/删除/读取 Stage:需是对应 Stage 的创建者且具备 writer 及以上权限,或 Superuser。
专家权限模型:
创建 Stage:需具备
pg_operate_internal_stages角色权限,或 Superuser。写入/删除/读取 Stage:需是对应 Stage 的创建者且具备
pg_operate_internal_stages角色权限,或 Superuser。
通过如下命令授予用户 pg_operate_internal_stages 角色权限:
-- 授予指定用户 Stage 操作权限
-- 可替换变量:<user_name> 替换为实际用户名(如 RAM 子账号名)
GRANT pg_operate_internal_stages TO "<user_name>";管理 Stage
创建 Stage
命令格式:
-- 创建 Internal Stage
-- 可替换变量:
-- <internal_stage_name>:Stage 名称,必填,最大 128 字符
-- <group_name>:分组名,可选,默认与 internal_stage_name 相同
-- <ttl_in_seconds>:生命周期(秒),可选,默认 7200,最大 864000
CALL HOLOGRES.HG_CREATE_INTERNAL_STAGE(
'<internal_stage_name>',
['<group_name>'],
['<ttl_in_seconds>']
);参数说明:
参数名 | 是否必填 | 说明 |
internal_stage_name | 必填 | Internal Stage 名称。支持字母(大小写敏感)、数字、下划线、连字符,最大长度 128 字符。 |
group_name | 非必填 | Stage 所属分组,用于分类管理。默认同 internal_stage_name。 |
ttl_in_seconds | 非必填 | Stage 生命周期,单位秒,从 last_modified_time(可从系统表 |
删除 Stage
命令格式:
-- 删除指定 Internal Stage
-- 可替换变量:<internal_stage_name> 替换为要删除的 Stage 名称
CALL HOLOGRES.HG_DROP_INTERNAL_STAGE(
'<internal_stage_name>'
);internal_stage_name:必填。Internal Stage 名称。
查询 Stage 状态
可通过系统视图 hologres.hg_internal_stages 查询 Stage 状态:
-- 查询全部 Stage 或按名称过滤
-- 可替换变量:<internal_stage_name> 替换为 Stage 名称时仅查询该 Stage;省略 WHERE 则查询全部
SELECT * FROM hologres.hg_internal_stages
[WHERE stage_name = '<internal_stage_name>'];包含如下字段:
字段名 | 含义 |
stage_name | Stage 名称 |
group_name | Stage 所属分组 |
ttl_in_seconds | Stage 生命周期(秒) |
create_time | Stage 创建时间 |
create_user | 创建 Stage 的用户 |
create_application_name | 创建 Stage 的应用程序 |
create_session_id | 创建 Stage 的会话 ID |
last_modified_time | Stage 最后一次更新时间 |
stage_bytes | Stage 存储大小(Byte) |
file_count | Stage 文件数 |
查询 Stage 文件
可通过系统视图 hologres.hg_internal_stage_files 查询 Stage 文件:
-- 查询 Stage 下文件列表,可按名称模糊过滤
-- 可替换变量:<internal_stage_name> Stage 名称;<pattern%> 文件名模糊匹配(如 'batch_%')
SELECT * FROM hologres.hg_internal_stage_files
[WHERE stage_name = '<internal_stage_name>']
[AND file_name like '<pattern%>'];包含如下字段:
字段名 | 含义 |
stage_name | Stage 名称 |
file_name | 文件名称 |
file_size | 文件存储大小(Byte) |
last_modified_time | 文件最后一次更新时间 |
is_complete | 文件是否写入成功。True 表示写入成功,False 表示正在写入或写入失败 |
删除 Stage 文件
可通过系统函数 hologres.hg_remove_internal_stage_file 删除 Stage 文件:
-- 删除指定文件
-- 可替换变量:<stage_name> Stage 名称;<file_name> 要删除的文件名
SELECT hologres.hg_remove_internal_stage_file ('<stage_name>', '<file_name>');
-- 批量删除 Stage 文件
-- 可替换变量:<stage_name> Stage 名称;<glob_pattern> 文件名通配(如 '*.arrow');<pattern%> WHERE 中的文件名模糊匹配
SELECT
stage_name,
file_name,
hologres.hg_remove_internal_stage_file (stage_name, file_name) AS hg_remove_internal_stage_file
FROM
hologres.hg_list_internal_stage_files ('<stage_name>',['<glob_pattern>'])
[WHERE file_name like '<pattern%>'];使用 Stage
客户端写入 Stage
命令格式:
-- 将客户端数据流写入 Stage 指定文件
-- 可替换变量:<internal_stage_name> 已创建的 Stage 名称;<file_name> 写入的文件名(支持字母、数字、下划线、连字符、英文句号,最大 128 字符)
COPY EXTERNAL_FILES(
path = 'internal_stage://<internal_stage_name>/<file_name>'
) FROM STDIN;通过 COPY 命令写入 Stage 时不支持配置 COPY 语法中的 WITH 参数。通过 path 参数定义 Stage 文件路径,支持字母(大小写敏感)、数字、下划线、连字符、英文句号,最大长度 128 字符。
读取 Stage 写入内表
命令格式:
-- 从 Stage 读取 Arrow 文件并写入内表
-- 可替换变量:
-- <table_name> 目标内表名
-- <col_name> 列名(可选,指定时与目标表列一一对应)
-- <internal_stage_name> Stage 名称,可写多个用逗号分隔以读取多个 Stage
-- <col_type> 列类型(可选,仅在 AS 子句中指定列类型时使用)
INSERT INTO <table_name> [ ( <col_name> [ , <col_name> ... ] ) ]
SELECT *
FROM EXTERNAL_FILES(
path = 'internal_stage://<internal_stage_name>, internal_stage://<internal_stage_name>',
format = arrow
)
[AS ( <col_name> <col_type>[ , <col_name> <col_type> ... ] )];读取 Stage 仅支持 Arrow 格式文件。
使用示例
通过 Holo-client 写入 Stage
以下示例演示如何使用 Holo-client 通过 Stage 进行近实时写入。
Maven 依赖:
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>holo-client</artifactId>
<version>2.7.0</version>
</dependency>示例代码:
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.Put;
import com.alibaba.hologres.client.copy.CopyUtil;
import com.alibaba.hologres.client.copy.in.CopyInStageWrapper;
import com.alibaba.hologres.client.copy.in.arrow.RecordArrowWriter;
import com.alibaba.hologres.client.model.OnConflictAction;
import com.alibaba.hologres.client.model.TableSchema;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class CopyStageDemo {
public static void main(String[] args) throws Exception {
// 注意:jdbcUrl需使用jdbc:hologres协议
String jdbcUrl = "jdbc:hologres://host:port/db";
String username = "";
String password = "";
/*
CREATE TABLE copy_stage_demo (id INT NOT NULL, name TEXT NOT NULL, address TEXT, PRIMARY KEY(id));
*/
String tableName = "copy_stage_demo";
HoloConfig config = new HoloConfig();
config.setJdbcUrl(jdbcUrl);
config.setUsername(username);
config.setPassword(password);
config.setRegion("local");
// 创建临时Stage名称
String stageName = "temp_stage_" + System.currentTimeMillis();
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
HoloClient client = new HoloClient(config)) {
// 创建内部Stage
String createStageSql =
"call hologres.hg_create_internal_stage('"
+ stageName
+ "', 'default_group', 7200);";
try (java.sql.Statement stmt = conn.createStatement()) {
stmt.execute(createStageSql);
}
// 获取表结构
TableSchema schema = client.getTableSchema(tableName);
// 定义要写入的列
List<String> columns = new ArrayList<>();
columns.add("id");
columns.add("name");
columns.add("address");
// 使用RecordArrowWriter和CopyInStageWrapper写入数据到Stage
try (RecordArrowWriter arrowWriter =
new RecordArrowWriter(
schema,
columns,
8192 // maxBatchSize,每1024行数据组成一个Arrow RecordBatch
);
CopyInStageWrapper<com.alibaba.hologres.client.model.Record> copyInStage =
new CopyInStageWrapper<>(
config,
stageName,
"data_file", // 文件名前缀
arrowWriter,
64 * 1024 * 1024 // fileSizeLimit,每个文件大小64 MB
)) {
// 写入10条数据
for (int i = 0; i < 10; ++i) {
Put put = new Put(schema);
// 和CopyInStageWrapper的columns保持一致
put.setObject("id", i);
put.setObject("name", "name" + i);
put.setObject("address", "address" + i);
copyInStage.putRecord(put.getRecord());
}
// 程序结束之前需要调用close,保证数据完全写入
// demo使用了try-with-resources,无需手动close
// copyInStage.close();
}
// 生成从Stage写入目标表的INSERT语句
String insertSql =
CopyUtil.buildInsertTableSelectFromStageSql(
schema,
columns,
Collections.singletonList(stageName),
OnConflictAction.INSERT_OR_UPDATE);
try (java.sql.Statement stmt = conn.createStatement()) {
stmt.execute(insertSql);
}
// 验证写入结果
try (java.sql.Statement stmt = conn.createStatement()) {
try (java.sql.ResultSet rs = stmt.executeQuery("select * from " + tableName)) {
while (rs.next()) {
System.out.println(
"id: "
+ rs.getInt(1)
+ ", name: "
+ rs.getString(2)
+ ", address: "
+ rs.getString(3));
}
}
}
} finally {
// 清理临时Stage,不清理的话也会根据TTL自动清理
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
java.sql.Statement stmt = conn.createStatement()) {
String dropStageSql = "call hologres.hg_drop_internal_stage('" + stageName + "');";
stmt.execute(dropStageSql);
} catch (Exception e) {
System.err.println("清理Stage时出错: " + e.getMessage());
}
}
}
}数据类型映射
通过 Stage 近实时导入时,Arrow 格式与 Hologres 数据类型对应关系如下。使用 Holo-client 进行写入时,已自动进行数据类型转换。
Hologres 数据类型 | Arrow 数据类型 |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
BOOLEAN | UINT8、BIT |
REAL(FLOAT4) | FLOAT4 |
DOUBLE PRECISION(FLOAT8) | FLOAT8 |
DATE | DateDay |
TIMETZ | FixedSizeBinary |
TIME | TimeMicro |
TIMESTAMP WITHOUT TIME ZONE | TimeStampMicro |
TIMESTAMP WITH TIME ZONE | DateMilli |
TEXT | VarChar |
CHAR(n) | — |
VARCHAR(n) | — |
JSON | — |
JSONB | — |
BYTEA | VarBINARY |
roaringbitmap | — |
NUMERIC(m,n) | DECIMAL(m,n) |
ARRAY(支持 INT、BIGINT、FLOAT、BOOLEAN、DOUBLE、STRING) | ARRAY<type> |