OSS Tables 相容 Apache Iceberg REST Catalog 協議。您可以通過 Flink 的 Iceberg Connector 對接 OSS Tables,使用 Flink SQL 進行建表、寫入和查詢操作,實現流式或批量資料入湖。本文以 Apache Flink 1.20 為例介紹接入流程。
步驟一:環境準備
下載依賴JAR包
將以下 JAR 包放入 Flink 的 $FLINK_HOME/lib 目錄,或在提交作業時通過 -C 參數指定。
JAR包 | 版本要求 | 說明 |
匹配iceberg版本 | Iceberg 的 Flink 運行時整合套件。請根據 Flink 版本選擇對應的包(如 Flink 1.20 對應 | |
匹配iceberg版本 | 提供 S3FileIO 實現及 REST Catalog SigV4 簽名認證所需的 AWS SDK。版本需與 Runtime 包一致。 | |
3.3.6 | Hadoop API 依賴(Iceberg 內部載入需要),版本可按需調整。 | |
3.3.6 | Hadoop 運行時依賴(Iceberg 內部載入需要),版本可按需調整。 |
配置環境變數
Iceberg REST Catalog 使用 SigV4 簽名認證,S3FileIO 訪問資料面也需要憑證。推薦通過環境變數統一傳遞,在啟動 Flink 之前設定以下環境變數:
環境變數名使用 AWS_ 首碼,是因為 Iceberg 的 SigV4 簽名模組和 S3FileIO 複用 AWS SDK 的標準憑證鏈。實際填入的是您阿里雲帳號的 AccessKey ID 和 AccessKey Secret。
export AWS_ACCESS_KEY_ID=<阿里雲AccessKey ID>
export AWS_SECRET_ACCESS_KEY=<阿里雲AccessKey Secret>
export AWS_REGION=<地區,例如cn-hangzhou>
export AWS_DEFAULT_REGION=<地區,例如cn-hangzhou>
# 可選,使用STS臨時憑證時配置
export AWS_SESSION_TOKEN=<阿里雲STS TOKEN>如果使用較高版本的 AWS SDK(2.20+),寫入資料時可能出現簽名錯誤:aws-chunked encoding is not supported with the specified x-amz-content-sha256 value。此時需要在 Flink 設定檔 conf/config.yaml 中添加以下 JVM 參數:
env.java.opts.all: "-Daws.requestChecksumCalculation=when_required -Daws.responseChecksumValidation=when_required"步驟二:建立Table Bucket
在開始寫入資料之前,需要建立 Table Bucket 和 Namespace。可以使用 ossutil 或 AWS CLI 建立。
方式一:使用ossutil
1. 安裝或升級 ossutil
請安裝ossutil 2.3.0以上版本,如已安裝 ossutil,可執行以下命令升級到最新版本:
ossutil update -f2. 配置憑證
執行 ossutil config 命令,按提示輸入 AccessKey ID、AccessKey Secret 和 Region。
3. 建立 Table Bucket
ossutil tables-api create-table-bucket --name {table bucket名稱} --endpoint http://{endpint} --region {region}命令執行成功後,返回結果中包含 Table Bucket ARN,請記錄該值。
4. 建立 Namespace
ossutil tables-api create-namespace --table-bucket-arn {Table Bucket ARN} --namespace {Namespace名稱} --endpoint http://{endpint}Namespace 和 Table 名稱不能包含連字號(-),可使用底線(_),這是因為名稱會用於 SQL 陳述式中的標識符。
5. 建立 Table
您可以選擇以下任一方式建立 Iceberg 表:
通過其他計算引擎建立(如 Spark)。
通過 ossutil 建立:先將表 schema 儲存為 JSON 檔案,再調用
create-table。以下樣本的 schema 檔案
schema.json定義了 3 個欄位:{ "iceberg": { "schema": { "fields": [ {"name": "event_id", "type": "string", "required": true}, {"name": "event_time", "type": "string"}, {"name": "event_type", "type": "string"} ] } } }基於 schema 檔案建立 Table:
ossutil tables-api create-table --table-bucket-arn {bucketArn} --namespace {namespace名稱} --name {表名稱} --format ICEBERG --metadata file://{檔案路徑} --endpoint --endpoint http://{endpint}
方式二:使用AWS CLI
OSS Tables 相容 S3 Tables API,也可以使用 AWS CLI 管理 Table Bucket。
1. 安裝 AWS CLI
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install2. 配置憑證
執行 aws configure 命令,按提示輸入 AccessKey ID、AccessKey Secret 和 Region。
3. 建立 Table Bucket
aws s3tables --endpoint http://{endpint} create-table-bucket --region {region} --name {table bucket名稱}命令執行成功後,返回結果中包含 Table Bucket ARN。
4. 建立 Namespace
aws s3tables --endpoint http://{endpoint} create-namespace --table-bucket-arn {Table Bucket ARN} --namespace {namespace名稱}5. 建立 Table
通過其他計算引擎(如 Spark)建立表
使用 AWS CLI 建立。使用 AWS CLI 時,先將完整的入參儲存為 JSON 檔案
create-table.json,再調用create-table。{ "tableBucketARN": "{BucketArn}", "namespace": "{namespace名稱}", "name": "{表明}", "format": "ICEBERG", "metadata": { "iceberg": { "schema": { "fields": [ {"name": "event_id", "type": "string","required": true}, {"name": "event_time", "type": "string"}, {"name": "event_type", "type": "string"} ] } } } }aws s3tables --endpoint http://{endpoint} create-table --cli-input-json file://{檔案路徑}
6. 管理後台維護任務
OSS Tables 支援自動執行 Iceberg 表的後台維護(如檔案清理、檔案合并等),通過 AWS CLI 可以查詢和配置維護任務。
查詢 Table 維護任務狀態:
aws s3tables get-table-maintenance-job-status \
--table-bucket-arn="{bucketArn}" \
--namespace="{namespace名稱}" \
--name="{表名}" 配置 Bucket 級維護策略(檔案清理):
aws s3tables put-table-bucket-maintenance-configuration \
--table-bucket-arn {tableArn} \
--type icebergUnreferencedFileRemoval \
--value '{"status":"enabled","settings":{"icebergUnreferencedFileRemoval":{"unreferencedDays":4,"nonCurrentDays":10}}}' 配置 Table 級維護策略(小檔案合并):
aws s3tables put-table-maintenance-configuration \
--table-bucket-arn {bucketArn} \
--type icebergCompaction \
--namespace {namespace名稱} \
--name {表名} \
--value='{"status":"enabled","settings":{"icebergCompaction":{"targetFileSizeMB":256}}}'步驟三:配置Flink
OSS Tables 提供 Iceberg REST Catalog 端點,Flink 通過 Iceberg Connector 的 RESTCatalog 實現串連。Endpoint格式如下:
內網:
https://{tableBucketName}.{region}-internal.oss-tables.aliyuncs.com/iceberg外網:
https://{tableBucketName}.{region}.oss-tables.aliyuncs.com/iceberg
OSS Tables 提供S3FileIO訪問OSS資料面使用的訪問端點,Spark 通過該端點訪問表資料。Endpoint格式如下:
內網:
https://{region}-internal.oss.aliyuncs.com外網:
https://{region}.oss.aliyuncs.com
3.1 配置Flink進程參數
在 flink-conf.yaml 中添加以下配置,用於 S3FileIO 的 Region 和憑證提供方式:
參數 | 是否必填 | 說明 |
| 是 | S3FileIO 使用的地區。例如 |
| 是 | 憑證提供方式。固定為 |
| 是 | Hadoop S3A 檔案系統使用的地區。例如 |
3.2 建立Catalog
在 Flink SQL 中執行以下語句建立 Catalog:
CREATE CATALOG mycatalog WITH (
'type' = 'iceberg',
'catalog-impl' = 'org.apache.iceberg.rest.RESTCatalog',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
'uri' = 'https://{tableBucketName}.{region}-internal.oss-tables.aliyuncs.com/iceberg',
'warehouse' = '<Table Bucket ARN>',
'rest.sigv4-enabled' = 'true',
'rest.signing-name' = 'osstables',
'rest.signing-region' = '<Region>',
's3.endpoint' = 'https://{region}-internal.oss.aliyuncs.com',
's3.path-style-access' = 'false'
);配置參數說明
參數 | 是否必填 | 說明 |
| 是 | 固定為 |
| 是 | 固定為 |
| 是 | 固定為 |
| 是 | REST Catalog 端點 URL。格式:
|
| 是 | Table Bucket ARN。格式: |
| 是 | 固定為 |
| 是 | 固定為 |
| 是 | SigV4 簽名地區。例如 |
| 是 | OSS 資料面端點。格式:
|
| 否 | 是否使用 Path-Style 訪問模式。預設為 |
4.4 建表示例
CREATE TABLE IF NOT EXISTS mycatalog.<Namespace>.<表名> (
event_id STRING,
event_time STRING,
event_type STRING
) WITH (
'format-version' = '2',
'write.format.default' = 'parquet',
'write.target-file-size-bytes' = '33554432', -- 32 MB(不要 128MB)
'write.parquet.row-group-size-bytes' = '8388608' -- 8 MB
);建議將 write.target-file-size-bytes 設定為 32 MB(33554432),避免產生過大的檔案影響後續維護任務的效率。
許可權配置
使用 RAM 使用者或 STS 臨時憑證訪問 OSS Tables 時,需確保對應身份具備所需的操作許可權。
資源定義
Table Bucket ARN:
acs:osstables:<Region>:<阿里雲帳號ID>:bucket/<bucket_name>Table ARN:
acs:osstables:<Region>:<阿里雲帳號ID>:bucket/<bucket_name>/table/<table_id>
Action 定義
下表列出 OSS Tables 支援的 Action,及其是否支援跨帳號授權:
分類 | Action | 跨帳號訪問 |
Table Bucket 層級 |
| 不允許 |
| 允許 | |
| 不允許 | |
| 允許 | |
| 允許 | |
| 允許 | |
| 允許 | |
| 允許 | |
| 不允許 | |
| 不允許 | |
| 不允許 | |
| 允許 | |
| 允許 | |
| 不允許 | |
| 不允許 | |
| 不允許 | |
Table 層級 |
| 允許 |
| 允許 | |
| 不允許 | |
| 不允許 | |
| 不允許 | |
| 允許 | |
| 允許 | |
| 允許 | |
| 允許 | |
| 允許 | |
| 允許 | |
| 允許 | |
| 允許 | |
| 不允許 | |
| 不允許 | |
| 允許 |
Iceberg REST操作與許可權映射
下表列出 Iceberg REST Catalog 各操作所需的 OSS Action:
Iceberg REST 操作 | 所需 OSS Action |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|