OSS Tables 兼容 Apache Iceberg REST Catalog 协议。您可以通过 Flink 的 Iceberg Connector 对接 OSS Tables,使用 Flink SQL 进行建表、写入和查询操作,实现流式或批量数据入湖。本文以 Apache Flink 1.20 为例介绍接入流程。
步骤一:环境准备
下载依赖JAR包
将以下 JAR 包放入 Flink 的 $FLINK_HOME/lib 目录,或在提交作业时通过 -C 参数指定。
JAR包 | 版本要求 | 说明 |
匹配iceberg版本 | Iceberg 的 Flink 运行时集成包。请根据 Flink 版本选择对应的包(如 Flink 1.20 对应 | |
匹配iceberg版本 | 提供 S3FileIO 实现及 REST Catalog SigV4 签名认证所需的 AWS SDK。版本需与 Runtime 包一致。 | |
3.3.6 | Hadoop API 依赖(Iceberg 内部加载需要),版本可按需调整。 | |
3.3.6 | Hadoop 运行时依赖(Iceberg 内部加载需要),版本可按需调整。 |
配置环境变量
Iceberg REST Catalog 使用 SigV4 签名认证,S3FileIO 访问数据面也需要凭证。推荐通过环境变量统一传递,在启动 Flink 之前设置以下环境变量:
环境变量名使用 AWS_ 前缀,是因为 Iceberg 的 SigV4 签名模块和 S3FileIO 复用 AWS SDK 的标准凭证链。实际填入的是您阿里云账号的 AccessKey ID 和 AccessKey Secret。
export AWS_ACCESS_KEY_ID=<阿里云AccessKey ID>
export AWS_SECRET_ACCESS_KEY=<阿里云AccessKey Secret>
export AWS_REGION=<地域,例如cn-hangzhou>
export AWS_DEFAULT_REGION=<地域,例如cn-hangzhou>
# 可选,使用STS临时凭证时配置
export AWS_SESSION_TOKEN=<阿里云STS TOKEN>如果使用较高版本的 AWS SDK(2.20+),写入数据时可能出现签名错误:aws-chunked encoding is not supported with the specified x-amz-content-sha256 value。此时需要在 Flink 配置文件 conf/config.yaml 中添加以下 JVM 参数:
env.java.opts.all: "-Daws.requestChecksumCalculation=when_required -Daws.responseChecksumValidation=when_required"步骤二:创建Table Bucket
在开始写入数据之前,需要创建 Table Bucket 和 Namespace。可以使用 ossutil 或 AWS CLI 创建。
方式一:使用ossutil
1. 安装或升级 ossutil
请安装ossutil 2.3.0以上版本,如已安装 ossutil,可执行以下命令升级到最新版本:
ossutil update -f2. 配置凭证
执行 ossutil config 命令,按提示输入 AccessKey ID、AccessKey Secret 和 Region。
3. 创建 Table Bucket
ossutil tables-api create-table-bucket --name {table bucket名称} --endpoint http://{endpint} --region {region}命令执行成功后,返回结果中包含 Table Bucket ARN,请记录该值。
4. 创建 Namespace
ossutil tables-api create-namespace --table-bucket-arn {Table Bucket ARN} --namespace {Namespace名称} --endpoint http://{endpint}Namespace 和 Table 名称不能包含连字符(-),可使用下划线(_),这是因为名称会用于 SQL 语句中的标识符。
5. 创建 Table
您可以选择以下任一方式创建 Iceberg 表:
通过其他计算引擎创建(如 Spark)。
通过 ossutil 创建:先将表 schema 保存为 JSON 文件,再调用
create-table。以下示例的 schema 文件
schema.json定义了 3 个字段:{ "iceberg": { "schema": { "fields": [ {"name": "event_id", "type": "string", "required": true}, {"name": "event_time", "type": "string"}, {"name": "event_type", "type": "string"} ] } } }基于 schema 文件创建 Table:
ossutil tables-api create-table --table-bucket-arn {bucketArn} --namespace {namespace名称} --name {表名称} --format ICEBERG --metadata file://{文件路径} --endpoint --endpoint http://{endpint}
方式二:使用AWS CLI
OSS Tables 兼容 S3 Tables API,也可以使用 AWS CLI 管理 Table Bucket。
1. 安装 AWS CLI
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install2. 配置凭证
执行 aws configure 命令,按提示输入 AccessKey ID、AccessKey Secret 和 Region。
3. 创建 Table Bucket
aws s3tables --endpoint http://{endpint} create-table-bucket --region {region} --name {table bucket名称}命令执行成功后,返回结果中包含 Table Bucket ARN。
4. 创建 Namespace
aws s3tables --endpoint http://{endpoint} create-namespace --table-bucket-arn {Table Bucket ARN} --namespace {namespace名称}5. 创建 Table
通过其他计算引擎(如 Spark)创建表
使用 AWS CLI 创建。使用 AWS CLI 时,先将完整的入参保存为 JSON 文件
create-table.json,再调用create-table。{ "tableBucketARN": "{BucketArn}", "namespace": "{namespace名称}", "name": "{表明}", "format": "ICEBERG", "metadata": { "iceberg": { "schema": { "fields": [ {"name": "event_id", "type": "string","required": true}, {"name": "event_time", "type": "string"}, {"name": "event_type", "type": "string"} ] } } } }aws s3tables --endpoint http://{endpoint} create-table --cli-input-json file://{文件路径}
6. 管理后台维护任务
OSS Tables 支持自动执行 Iceberg 表的后台维护(如文件清理、文件合并等),通过 AWS CLI 可以查询和配置维护任务。
查询 Table 维护任务状态:
aws s3tables get-table-maintenance-job-status \
--table-bucket-arn="{bucketArn}" \
--namespace="{namespace名称}" \
--name="{表名}" 配置 Bucket 级维护策略(文件清理):
aws s3tables put-table-bucket-maintenance-configuration \
--table-bucket-arn {tableArn} \
--type icebergUnreferencedFileRemoval \
--value '{"status":"enabled","settings":{"icebergUnreferencedFileRemoval":{"unreferencedDays":4,"nonCurrentDays":10}}}' 配置 Table 级维护策略(小文件合并):
aws s3tables put-table-maintenance-configuration \
--table-bucket-arn {bucketArn} \
--type icebergCompaction \
--namespace {namespace名称} \
--name {表名} \
--value='{"status":"enabled","settings":{"icebergCompaction":{"targetFileSizeMB":256}}}'步骤三:配置Flink
OSS Tables 提供 Iceberg REST Catalog 端点,Flink 通过 Iceberg Connector 的 RESTCatalog 实现连接。Endpoint格式如下:
内网:
https://{tableBucketName}.{region}-internal.oss-tables.aliyuncs.com/iceberg外网:
https://{tableBucketName}.{region}.oss-tables.aliyuncs.com/iceberg
OSS Tables 提供S3FileIO访问OSS数据面使用的访问端点,Spark 通过该端点访问表数据。Endpoint格式如下:
内网:
https://{region}-internal.oss.aliyuncs.com外网:
https://{region}.oss.aliyuncs.com
3.1 配置Flink进程参数
在 flink-conf.yaml 中添加以下配置,用于 S3FileIO 的 Region 和凭证提供方式:
参数 | 是否必填 | 说明 |
| 是 | S3FileIO 使用的地域。例如 |
| 是 | 凭证提供方式。固定为 |
| 是 | Hadoop S3A 文件系统使用的地域。例如 |
3.2 创建Catalog
在 Flink SQL 中执行以下语句创建 Catalog:
CREATE CATALOG mycatalog WITH (
'type' = 'iceberg',
'catalog-impl' = 'org.apache.iceberg.rest.RESTCatalog',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
'uri' = 'https://{tableBucketName}.{region}-internal.oss-tables.aliyuncs.com/iceberg',
'warehouse' = '<Table Bucket ARN>',
'rest.sigv4-enabled' = 'true',
'rest.signing-name' = 'osstables',
'rest.signing-region' = '<Region>',
's3.endpoint' = 'https://{region}-internal.oss.aliyuncs.com',
's3.path-style-access' = 'false'
);配置参数说明
参数 | 是否必填 | 说明 |
| 是 | 固定为 |
| 是 | 固定为 |
| 是 | 固定为 |
| 是 | REST Catalog 端点 URL。格式:
|
| 是 | Table Bucket ARN。格式: |
| 是 | 固定为 |
| 是 | 固定为 |
| 是 | SigV4 签名地域。例如 |
| 是 | OSS 数据面端点。格式:
|
| 否 | 是否使用 Path-Style 访问模式。默认为 |
4.4 建表示例
CREATE TABLE IF NOT EXISTS mycatalog.<Namespace>.<表名> (
event_id STRING,
event_time STRING,
event_type STRING
) WITH (
'format-version' = '2',
'write.format.default' = 'parquet',
'write.target-file-size-bytes' = '33554432', -- 32 MB(不要 128MB)
'write.parquet.row-group-size-bytes' = '8388608' -- 8 MB
);建议将 write.target-file-size-bytes 设置为 32 MB(33554432),避免产生过大的文件影响后续维护任务的效率。
权限配置
使用 RAM 用户或 STS 临时凭证访问 OSS Tables 时,需确保对应身份具备所需的操作权限。
资源定义
Table Bucket ARN:
acs:osstables:<Region>:<阿里云账号ID>:bucket/<bucket_name>Table ARN:
acs:osstables:<Region>:<阿里云账号ID>:bucket/<bucket_name>/table/<table_id>
Action 定义
下表列出 OSS Tables 支持的 Action,及其是否支持跨账号授权:
分类 | Action | 跨账号访问 |
Table Bucket 级别 |
| 不允许 |
| 允许 | |
| 不允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 不允许 | |
| 不允许 | |
| 不允许 | |
| 允许 | |
| 允许 | |
| 不允许 | |
| 不允许 | |
| 不允许 | |
Table 级别 |
| 允许 |
| 允许 | |
| 不允许 | |
| 不允许 | |
| 不允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 不允许 | |
| 不允许 | |
| 允许 |
Iceberg REST操作与权限映射
下表列出 Iceberg REST Catalog 各操作所需的 OSS Action:
Iceberg REST 操作 | 所需 OSS Action |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|