全部产品
Search
文档中心

对象存储 OSS:通过Flink访问OSS Tables

更新时间:May 26, 2026

OSS Tables 兼容 Apache Iceberg REST Catalog 协议。您可以通过 Flink 的 Iceberg Connector 对接 OSS Tables,使用 Flink SQL 进行建表、写入和查询操作,实现流式或批量数据入湖。本文以 Apache Flink 1.20 为例介绍接入流程。

步骤二:创建Table Bucket

在开始写入数据之前,需要创建 Table Bucket 和 Namespace。可以使用 ossutil 或 AWS CLI 创建。

方式一:使用ossutil

1. 安装或升级 ossutil

安装ossutil 2.3.0以上版本,如已安装 ossutil,可执行以下命令升级到最新版本:

ossutil update -f

2. 配置凭证

执行 ossutil config 命令,按提示输入 AccessKey ID、AccessKey Secret 和 Region。

3. 创建 Table Bucket

ossutil tables-api create-table-bucket --name {table bucket名称} --endpoint http://{endpint} --region {region}

命令执行成功后,返回结果中包含 Table Bucket ARN,请记录该值。

4. 创建 Namespace

ossutil tables-api create-namespace --table-bucket-arn {Table Bucket ARN} --namespace {Namespace名称} --endpoint http://{endpint}
重要

Namespace 和 Table 名称不能包含连字符(-),可使用下划线(_),这是因为名称会用于 SQL 语句中的标识符。

5. 创建 Table

您可以选择以下任一方式创建 Iceberg 表:

  • 通过其他计算引擎创建(如 Spark)。

  • 通过 ossutil 创建:先将表 schema 保存为 JSON 文件,再调用 create-table

    以下示例的 schema 文件 schema.json 定义了 3 个字段:

    {
      "iceberg": {
        "schema": {
          "fields": [
            {"name": "event_id", "type": "string", "required": true},
            {"name": "event_time", "type": "string"},
            {"name": "event_type", "type": "string"}
          ]
        }
      }
    }

    基于 schema 文件创建 Table:

    ossutil tables-api create-table --table-bucket-arn {bucketArn} --namespace {namespace名称} --name  {表名称} --format ICEBERG --metadata file://{文件路径} --endpoint --endpoint http://{endpint}

方式二:使用AWS CLI

OSS Tables 兼容 S3 Tables API,也可以使用 AWS CLI 管理 Table Bucket。

1. 安装 AWS CLI

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

2. 配置凭证

执行 aws configure 命令,按提示输入 AccessKey ID、AccessKey Secret 和 Region。

3. 创建 Table Bucket

aws s3tables --endpoint http://{endpint} create-table-bucket --region {region} --name {table bucket名称}

命令执行成功后,返回结果中包含 Table Bucket ARN。

4. 创建 Namespace

aws s3tables --endpoint http://{endpoint} create-namespace --table-bucket-arn {Table Bucket ARN} --namespace {namespace名称}

5. 创建 Table

  • 通过其他计算引擎(如 Spark)创建表

  • 使用 AWS CLI 创建。使用 AWS CLI 时,先将完整的入参保存为 JSON 文件 create-table.json,再调用 create-table

    {
        "tableBucketARN": "{BucketArn}",
        "namespace": "{namespace名称}",
        "name": "{表明}",
        "format": "ICEBERG",
        "metadata": {
            "iceberg": {
                "schema": {
                    "fields": [
                         {"name": "event_id", "type": "string","required": true},
                         {"name": "event_time", "type": "string"},
                         {"name": "event_type", "type": "string"}
                    ]
                }
            }
        }
    }
    aws s3tables --endpoint http://{endpoint} create-table --cli-input-json file://{文件路径}

6. 管理后台维护任务

OSS Tables 支持自动执行 Iceberg 表的后台维护(如文件清理、文件合并等),通过 AWS CLI 可以查询和配置维护任务。

查询 Table 维护任务状态:

aws s3tables get-table-maintenance-job-status \
   --table-bucket-arn="{bucketArn}" \
   --namespace="{namespace名称}" \
   --name="{表名}" 

配置 Bucket 级维护策略(文件清理):

aws s3tables put-table-bucket-maintenance-configuration \
   --table-bucket-arn {tableArn} \
   --type icebergUnreferencedFileRemoval \
   --value '{"status":"enabled","settings":{"icebergUnreferencedFileRemoval":{"unreferencedDays":4,"nonCurrentDays":10}}}' 

配置 Table 级维护策略(小文件合并):

aws s3tables put-table-maintenance-configuration \
   --table-bucket-arn {bucketArn} \
   --type icebergCompaction \
   --namespace {namespace名称} \
   --name {表名} \
   --value='{"status":"enabled","settings":{"icebergCompaction":{"targetFileSizeMB":256}}}'