Object Storage Service: Write data to OSS Tables using Kafka Connect

Last Updated: May 19, 2026

OSS Tables is compatible with the Apache Iceberg REST Catalog protocol, so you can use the Iceberg Sink Connector for Kafka Connect to write Kafka messages to tables in OSS Tables in real time and stream data into your data lake.

Step 1: Prepare the environment

Download dependency JAR packages

Place the following JAR packages into the plugin directory of Kafka Connect, which is the path specified by plugin.path.

| JAR package | Version requirements | Description |
| --- | --- | --- |
| iceberg-aws-bundle-1.10.1.jar | Must match the Iceberg version | Provides the S3FileIO implementation and AWS SDK needed for SigV4 signature authentication with the Iceberg REST Catalog. |
| iceberg-aws-1.10.1.jar | Must match the Iceberg version | Provides SigV4 signing and the S3FileIO implementation. The version must match the iceberg-aws-bundle package. |
| iceberg-parquet-1.10.1.jar | Must match the Iceberg version | Supports writing data in the Parquet file format. |
| hadoop-client-runtime-3.3.6.jar | 3.3.6 | A Hadoop runtime dependency required by Iceberg for internal loading. You can adjust the version as needed. |
| hadoop-client-api-3.3.6.jar | 3.3.6 | A Hadoop API dependency required by Iceberg for internal loading. You can adjust the version as needed. |
| failsafe-3.3.2.jar | 3.3.2 | Required by the Iceberg SnapshotProducer at runtime. If it is missing, a ClassNotFoundException is thrown. |
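Before starting Kafka Connect, it can help to confirm that every JAR from the table is actually present under the plugin directory. The following is a minimal sketch, not an official tool: the function name missing_jars is hypothetical, and it assumes the JARs keep the exact file names listed above.

```python
from pathlib import Path

# JAR file names copied from the table above.
REQUIRED_JARS = [
    "iceberg-aws-bundle-1.10.1.jar",
    "iceberg-aws-1.10.1.jar",
    "iceberg-parquet-1.10.1.jar",
    "hadoop-client-runtime-3.3.6.jar",
    "hadoop-client-api-3.3.6.jar",
    "failsafe-3.3.2.jar",
]


def missing_jars(plugin_dir, required=REQUIRED_JARS):
    """Return the required JAR names that are not found anywhere under plugin_dir."""
    root = Path(plugin_dir)
    # Search subdirectories too, because plugins are often unpacked into their own folder.
    present = {p.name for p in root.rglob("*.jar")} if root.is_dir() else set()
    return [name for name in required if name not in present]
```

Call missing_jars with the directory configured in plugin.path. An empty list means all six files were found. If you adjusted the Hadoop versions, pass your own list as the second argument.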

Step 2: Create a Table Bucket

To write data, you must first create a Table Bucket and a Namespace. You can use ossutil or the AWS CLI to perform these actions.

Method 1: Using ossutil

1. Install or upgrade ossutil

Install ossutil 2.3.0 or later. If you have already installed ossutil, run the following command to upgrade it to the latest version:

ossutil update -f

2. Configure credentials

Run the ossutil config command and follow the prompts to enter your AccessKey ID, AccessKey Secret, and region.

3. Create a Table Bucket

ossutil tables-api create-table-bucket --name <table_bucket_name> --endpoint http://<endpoint> --region <region>

After the command runs successfully, the output contains the Table Bucket ARN. Save this value.

4. Create a Namespace

ossutil tables-api create-namespace --table-bucket-arn <table_bucket_arn> --namespace <namespace_name> --endpoint http://<endpoint>

Important

Namespace and table names cannot contain hyphens (-) because these names are used as identifiers in SQL statements. Use underscores (_) instead.
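The hyphen rule is easy to check before you call the CLI. This is a small sketch with hypothetical helper names. It checks only the hyphen restriction stated above, not any other naming rule the service may enforce.

```python
def has_hyphen(name):
    """Return True if the name contains a hyphen, which the rule above forbids."""
    return "-" in name


def to_sql_safe_name(name):
    """Replace hyphens with underscores, the substitution suggested above."""
    return name.replace("-", "_")
```

For example, a Kafka topic called click-events could map to a table named click_events.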

5. Create a table

You can create an Iceberg table in one of two ways:

  • Create a table using another compute engine, such as Spark.

  • To create a table using ossutil, save the table schema to a JSON file and then run the create-table command.

    In the following example, the schema file schema.json defines three fields:

    {
      "iceberg": {
        "schema": {
          "fields": [
            {"name": "event_id", "type": "string", "required": true},
            {"name": "event_time", "type": "string"},
            {"name": "event_type", "type": "string"}
          ]
        }
      }
    }

    Create a table based on the schema file:

    ossutil tables-api create-table --table-bucket-arn <table_bucket_arn> --namespace <namespace_name> --name <table_name> --format ICEBERG --metadata file://schema.json --endpoint http://<region>-internal.oss-tables.aliyuncs.com
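If the schema is defined elsewhere, for example alongside the producer code, the metadata file can be generated instead of written by hand. The sketch below reproduces the three-field example. The helper name iceberg_metadata is hypothetical, and only the JSON keys shown in the example above are used.

```python
import json


def iceberg_metadata(fields):
    """Wrap (name, type, required) tuples in the metadata layout shown above."""
    out = []
    for name, type_, required in fields:
        field = {"name": name, "type": type_}
        if required:
            # The example above only sets "required" on mandatory fields.
            field["required"] = True
        out.append(field)
    return {"iceberg": {"schema": {"fields": out}}}


metadata = iceberg_metadata([
    ("event_id", "string", True),
    ("event_time", "string", False),
    ("event_type", "string", False),
])
schema_json = json.dumps(metadata, indent=2)
print(schema_json)
```

Redirect the output to schema.json and pass it to the create-table command with --metadata file://schema.json.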

Method 2: Using the AWS CLI

OSS Tables is compatible with the S3 Tables API. You can also use the AWS CLI to manage Table Buckets.

1. Install the AWS CLI

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

2. Configure credentials

Run the aws configure command and follow the prompts to enter your AccessKey ID, AccessKey Secret, and region.

3. Create a Table Bucket

aws s3tables --endpoint http://<region>.oss-tables.aliyuncs.com create-table-bucket --region <region> --name <table_bucket_name>

After the command runs successfully, the result contains the Table Bucket ARN.

4. Create a Namespace

aws s3tables --endpoint http://<region>.oss-tables.aliyuncs.com create-namespace --table-bucket-arn <table_bucket_arn> --namespace <namespace_name>

5. Create a table

  • Create a table by using another compute engine, such as Spark.

  • To create a table using the AWS CLI, save the full input parameters to a JSON file named create-table.json and then run the create-table command.

    {
      "tableBucketARN": "<table_bucket_arn>",
      "namespace": "<namespace_name>",
      "name": "<table_name>",
      "format": "ICEBERG",
      "metadata": {
        "iceberg": {
          "schema": {
            "fields": [
              {"name": "event_id", "type": "string", "required": true},
              {"name": "event_time", "type": "string"},
              {"name": "event_type", "type": "string"}
            ]
          }
        }
      }
    }

    Create the table from the input file:

    aws s3tables --endpoint http://<region>.oss-tables.aliyuncs.com create-table --cli-input-json file://create-table.json

6. Manage background maintenance tasks

OSS Tables supports automatic background maintenance for Iceberg tables, such as file cleanup and file compaction. You can use the AWS CLI to query and configure these maintenance tasks.

Query the status of a table maintenance task:

aws s3tables --endpoint http://<region>.oss-tables.aliyuncs.com get-table-maintenance-job-status \
   --table-bucket-arn <table_bucket_arn> \
   --namespace <namespace_name> \
   --name <table_name>

Configure bucket-level maintenance (unreferenced file cleanup):

aws s3tables --endpoint http://<region>.oss-tables.aliyuncs.com put-table-bucket-maintenance-configuration \
   --table-bucket-arn <table_bucket_arn> \
   --type icebergUnreferencedFileRemoval \
   --value '{"status":"enabled","settings":{"icebergUnreferencedFileRemoval":{"unreferencedDays":4,"nonCurrentDays":10}}}'

Configure table-level maintenance (small file compaction):

aws s3tables --endpoint http://<region>.oss-tables.aliyuncs.com put-table-maintenance-configuration \
   --table-bucket-arn <table_bucket_arn> \
   --type icebergCompaction \
   --namespace <namespace_name> \
   --name <table_name> \
   --value '{"status":"enabled","settings":{"icebergCompaction":{"targetFileSizeMB":256}}}'
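The --value argument is a JSON document nested inside a shell string, which is easy to misquote. As a rough sketch, the payloads used above can be built and shell-quoted programmatically. The helper name maintenance_value is hypothetical, and the settings keys are the ones from the two commands above.

```python
import json
import shlex


def maintenance_value(config_type, settings, enabled=True):
    """Build the compact JSON string passed to --value for one maintenance type."""
    payload = {
        "status": "enabled" if enabled else "disabled",
        "settings": {config_type: settings},
    }
    # Compact separators keep the string free of spaces, as in the commands above.
    return json.dumps(payload, separators=(",", ":"))


compaction = maintenance_value("icebergCompaction", {"targetFileSizeMB": 256})
cleanup = maintenance_value(
    "icebergUnreferencedFileRemoval",
    {"unreferencedDays": 4, "nonCurrentDays": 10},
)

# shlex.quote wraps the JSON in single quotes so a POSIX shell passes it through unchanged.
print("--value", shlex.quote(compaction))
print("--value", shlex.quote(cleanup))
```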

Step 3: Configure Kafka Connect

OSS Tables provides an Iceberg REST Catalog endpoint. Kafka Connect uses the Iceberg Sink Connector to connect to this endpoint and write data.

  • Public endpoint:

    https://<region>.oss-tables.aliyuncs.com/iceberg

  • Internal endpoint:

    https://<region>-internal.oss-tables.aliyuncs.com/iceberg
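The two endpoint forms differ only in the -internal suffix after the region ID. A minimal sketch that derives the catalog URI from a region follows. The function name catalog_uri is hypothetical, and cn-hangzhou in the usage note is only an example region ID.

```python
def catalog_uri(region, internal=True):
    """Return the Iceberg REST Catalog URI for a region, following the formats above."""
    suffix = "-internal" if internal else ""
    return f"https://{region}{suffix}.oss-tables.aliyuncs.com/iceberg"
```

For example, catalog_uri("cn-hangzhou") yields the internal endpoint for that region, and catalog_uri("cn-hangzhou", internal=False) yields the public one.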

Connector configuration

When you create an Iceberg Sink Connector, set the connector class to org.apache.iceberg.connect.IcebergSinkConnector and configure the following properties:

# --- Iceberg Catalog (REST) ---
iceberg.catalog.type: rest
iceberg.catalog.uri: https://<region>-internal.oss-tables.aliyuncs.com/iceberg
iceberg.catalog.rest.sigv4-enabled: true
iceberg.catalog.rest.signing-region: <region>
iceberg.catalog.warehouse: <table_bucket_arn>
iceberg.catalog.rest.signing-name: osstables
iceberg.catalog.rest.access-key-id: <access_key_id>
iceberg.catalog.rest.secret-access-key: <access_key_secret>


# --- Force S3FileIO (catalog returns oss:// but storage is S3-compatible) ---
iceberg.catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO

# --- S3FileIO storage configuration ---
iceberg.catalog.s3.endpoint: https://oss-<region>-internal.aliyuncs.com
iceberg.catalog.s3.access-key-id: <access_key_id>
iceberg.catalog.s3.secret-access-key: <access_key_secret>
iceberg.catalog.s3.path-style-access: true
iceberg.catalog.client.region: <region>

# --- Data format conversion ---
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false

Important

With a newer version of the AWS SDK (2.20+), requests may fail with the following signature error: aws-chunked encoding is not supported with the specified x-amz-content-sha256 value. If this occurs, add the following JVM options to the Java startup parameters for Kafka Connect:

-Daws.requestChecksumCalculation=when_required
-Daws.responseChecksumValidation=when_required
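How the JVM options are passed depends on how Kafka Connect is launched. One common route, assumed here, is the KAFKA_OPTS environment variable read by the standard Kafka start scripts. The sketch below merges the two flags into an existing options string without duplicating them. The helper name with_checksum_flags is hypothetical.

```python
# The two JVM system properties listed above.
CHECKSUM_FLAGS = [
    "-Daws.requestChecksumCalculation=when_required",
    "-Daws.responseChecksumValidation=when_required",
]


def with_checksum_flags(existing_opts=""):
    """Append the checksum flags to a JVM options string, skipping flags already present."""
    opts = existing_opts.split()
    for flag in CHECKSUM_FLAGS:
        if flag not in opts:
            opts.append(flag)
    return " ".join(opts)
```

Export the returned string as KAFKA_OPTS before starting the worker, or add the same flags wherever your deployment defines JVM arguments.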

Parameters

| Parameter | Required | Description |
| --- | --- | --- |
| iceberg.catalog.type | Yes | Must be set to rest to use the Iceberg REST Catalog. |
| iceberg.catalog.uri | Yes | The URL of the REST Catalog endpoint. Formats: https://<region>-internal.oss-tables.aliyuncs.com/iceberg (internal), https://<region>.oss-tables.aliyuncs.com/iceberg (public) |
| iceberg.catalog.warehouse | Yes | The Table Bucket ARN. Format: acs:osstables:<region>:<alibaba_cloud_account_id>:bucket/<table_bucket_name> |
| iceberg.catalog.rest.sigv4-enabled | Yes | Must be set to true to enable SigV4 signature authentication. |
| iceberg.catalog.rest.signing-name | Yes | Must be set to osstables, the SigV4 signing service name for the OSS Tables service endpoint. |
| iceberg.catalog.io-impl | Yes | Must be set to org.apache.iceberg.aws.s3.S3FileIO to access the OSS data plane using the S3 protocol. |
| iceberg.catalog.s3.endpoint | Yes | The OSS data plane endpoint. Formats: https://oss-<region>-internal.aliyuncs.com (internal), https://oss-<region>.aliyuncs.com (public) |
| iceberg.catalog.s3.path-style-access | Yes | Must be set to true to use path-style access. |
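To see how these properties fit together, the sketch below assembles them into the JSON body that the Kafka Connect REST API accepts when creating a connector. Several things here are assumptions not taken from this page: the helper name build_connector_payload, the connector name, and the topics and iceberg.tables properties, which select the source topics and the destination table. Check the last two against the documentation for your Iceberg Sink Connector version.

```python
import json


def build_connector_payload(name, region, table_bucket_arn, access_key_id,
                            access_key_secret, topics, tables, internal=True):
    """Assemble a Kafka Connect connector definition from the properties above."""
    suffix = "-internal" if internal else ""
    config = {
        "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
        # Assumption: standard sink properties that are not listed on this page.
        "topics": topics,
        "iceberg.tables": tables,
        # Iceberg REST Catalog with SigV4 signing.
        "iceberg.catalog.type": "rest",
        "iceberg.catalog.uri": f"https://{region}{suffix}.oss-tables.aliyuncs.com/iceberg",
        "iceberg.catalog.warehouse": table_bucket_arn,
        "iceberg.catalog.rest.sigv4-enabled": "true",
        "iceberg.catalog.rest.signing-region": region,
        "iceberg.catalog.rest.signing-name": "osstables",
        "iceberg.catalog.rest.access-key-id": access_key_id,
        "iceberg.catalog.rest.secret-access-key": access_key_secret,
        # S3FileIO against the OSS data plane.
        "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "iceberg.catalog.s3.endpoint": f"https://oss-{region}{suffix}.aliyuncs.com",
        "iceberg.catalog.s3.access-key-id": access_key_id,
        "iceberg.catalog.s3.secret-access-key": access_key_secret,
        "iceberg.catalog.s3.path-style-access": "true",
        "iceberg.catalog.client.region": region,
        # Schemaless JSON messages.
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
    }
    return {"name": name, "config": config}


payload = build_connector_payload(
    name="oss-tables-sink",            # hypothetical connector name
    region="<region>",
    table_bucket_arn="<table_bucket_arn>",
    access_key_id="<access_key_id>",
    access_key_secret="<access_key_secret>",
    topics="<topic_name>",
    tables="<namespace_name>.<table_name>",
)
print(json.dumps(payload, indent=2))
```

All values are strings because Kafka Connect treats connector configuration as string key-value pairs. In practice, load the AccessKey pair from a secret store rather than hard-coding it.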

Permissions configuration

If you use a RAM user or STS temporary credentials to access OSS Tables, ensure the identity has the required permissions.

Resource definitions

  • Table Bucket ARN: acs:osstables:<region>:<alibaba_cloud_account_id>:bucket/<bucket_name>

  • Table ARN: acs:osstables:<region>:<alibaba_cloud_account_id>:bucket/<bucket_name>/table/<table_id>
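When writing policies or debugging access errors, it can be useful to split an ARN into its parts. The sketch below parses the two ARN forms listed above. The function name parse_osstables_arn is hypothetical, and the values in the usage note are made-up examples.

```python
def parse_osstables_arn(arn):
    """Split a Table Bucket ARN or Table ARN into region, account ID, bucket, and table ID."""
    parts = arn.split(":", 4)
    if len(parts) != 5 or parts[0] != "acs" or parts[1] != "osstables":
        raise ValueError(f"not an OSS Tables ARN: {arn!r}")
    _, _, region, account_id, resource = parts
    segments = resource.split("/")
    if len(segments) == 2 and segments[0] == "bucket":
        # Table Bucket ARN: bucket/<bucket_name>
        bucket, table_id = segments[1], None
    elif len(segments) == 4 and segments[0] == "bucket" and segments[2] == "table":
        # Table ARN: bucket/<bucket_name>/table/<table_id>
        bucket, table_id = segments[1], segments[3]
    else:
        raise ValueError(f"unexpected resource part: {resource!r}")
    return {"region": region, "account_id": account_id,
            "bucket": bucket, "table_id": table_id}
```

For a Table Bucket ARN such as acs:osstables:cn-hangzhou:1234567890123456:bucket/my_bucket, the result has table_id set to None.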

Action definitions

This table lists the Actions that OSS Tables supports and indicates whether each Action allows cross-account access.

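| Category | Action | Cross-account access |
| --- | --- | --- |
| Table Bucket level | oss:CreateTableBucket | Not allowed |
| Table Bucket level | oss:GetTableBucket | Allowed |
| Table Bucket level | oss:ListTableBuckets | Not allowed |
| Table Bucket level | oss:CreateNamespace | Allowed |
| Table Bucket level | oss:GetNamespace | Allowed |
| Table Bucket level | oss:ListNamespaces | Allowed |
| Table Bucket level | oss:DeleteNamespace | Allowed |
| Table Bucket level | oss:DeleteTableBucket | Allowed |
| Table Bucket level | oss:PutTableBucketPolicy | Not allowed |
| Table Bucket level | oss:GetTableBucketPolicy | Not allowed |
| Table Bucket level | oss:DeleteTableBucketPolicy | Not allowed |
| Table Bucket level | oss:GetTableBucketMaintenanceConfiguration | Allowed |
| Table Bucket level | oss:PutTableBucketMaintenanceConfiguration | Allowed |
| Table Bucket level | oss:PutTableBucketEncryption | Not allowed |
| Table Bucket level | oss:GetTableBucketEncryption | Not allowed |
| Table Bucket level | oss:DeleteTableBucketEncryption | Not allowed |
| Table level | oss:GetTableMaintenanceConfiguration | Allowed |
| Table level | oss:PutTableMaintenanceConfiguration | Allowed |
| Table level | oss:PutTablePolicy | Not allowed |
| Table level | oss:GetTablePolicy | Not allowed |
| Table level | oss:DeleteTablePolicy | Not allowed |
| Table level | oss:CreateTable | Allowed |
| Table level | oss:GetTable | Allowed |
| Table level | oss:GetTableMetadataLocation | Allowed |
| Table level | oss:ListTables | Allowed |
| Table level | oss:RenameTable | Allowed |
| Table level | oss:UpdateTableMetadataLocation | Allowed |
| Table level | oss:GetTableData | Allowed |
| Table level | oss:PutTableData | Allowed |
| Table level | oss:GetTableEncryption | Not allowed |
| Table level | oss:PutTableEncryption | Not allowed |
| Table level | oss:DeleteTable | Allowed |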

Iceberg REST operation to permission mapping

This table maps each Iceberg REST Catalog operation to the required OSS Actions.

| Iceberg REST operation | Required OSS action |
| --- | --- |
| getConfig | oss:GetTableBucket |
| listNamespaces | oss:ListNamespaces |
| createNamespace | oss:CreateNamespace |
| loadNamespaceMetadata | oss:GetNamespace |
| dropNamespace | oss:DeleteNamespace |
| listTables | oss:ListTables |
| createTable | oss:CreateTable, oss:PutTableData |
| loadTable | oss:GetTableMetadataLocation, oss:GetTableData |
| updateTable | oss:UpdateTableMetadataLocation, oss:PutTableData, oss:GetTableData |
| dropTable | oss:DeleteTable |
| renameTable | oss:RenameTable |
| tableExists | oss:GetTable |
| namespaceExists | oss:GetNamespace |
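The mapping above can be turned into a least-privilege policy by collecting the actions for only the operations a client performs. The sketch below does this under several assumptions: the helper names are hypothetical, the set of operations a sink connector calls (getConfig, loadTable, updateTable) is a guess to verify against your connector's behavior, the statement layout follows the general RAM policy structure, and the table/* wildcard resource is illustrative.

```python
import json

# Operation-to-action mapping copied from the table above.
OPERATION_ACTIONS = {
    "getConfig": ["oss:GetTableBucket"],
    "listNamespaces": ["oss:ListNamespaces"],
    "createNamespace": ["oss:CreateNamespace"],
    "loadNamespaceMetadata": ["oss:GetNamespace"],
    "dropNamespace": ["oss:DeleteNamespace"],
    "listTables": ["oss:ListTables"],
    "createTable": ["oss:CreateTable", "oss:PutTableData"],
    "loadTable": ["oss:GetTableMetadataLocation", "oss:GetTableData"],
    "updateTable": ["oss:UpdateTableMetadataLocation", "oss:PutTableData", "oss:GetTableData"],
    "dropTable": ["oss:DeleteTable"],
    "renameTable": ["oss:RenameTable"],
    "tableExists": ["oss:GetTable"],
    "namespaceExists": ["oss:GetNamespace"],
}


def required_actions(operations):
    """Return the de-duplicated OSS actions needed for the given REST operations."""
    actions = []
    for op in operations:
        for action in OPERATION_ACTIONS[op]:
            if action not in actions:
                actions.append(action)
    return actions


def build_policy(operations, resources):
    """Build a single-statement Allow policy for the given operations and resources."""
    return {
        "Version": "1",
        "Statement": [{
            "Effect": "Allow",
            "Action": required_actions(operations),
            "Resource": resources,
        }],
    }


# Assumption: operations a sink connector writing to an existing table would need.
sink_policy = build_policy(
    ["getConfig", "loadTable", "updateTable"],
    [
        "acs:osstables:<region>:<alibaba_cloud_account_id>:bucket/<bucket_name>",
        "acs:osstables:<region>:<alibaba_cloud_account_id>:bucket/<bucket_name>/table/*",
    ],
)
print(json.dumps(sink_policy, indent=2))
```

If the connector is also expected to create tables or check for their existence, add createTable or tableExists to the operation list and regenerate the policy.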