
Realtime Compute for Apache Flink:Object Storage Service (OSS)

Last Updated: Dec 19, 2025

This topic describes how to use the Object Storage Service (OSS) connector.

Alibaba Cloud Object Storage Service (OSS) is a highly reliable, secure, and cost-effective cloud storage service that provides massive storage capacity. It is designed to deliver 99.9999999999% (twelve 9s) data durability and 99.995% data availability. You can select from multiple storage classes to optimize your storage costs.

Supported types: Source and sink tables
Running modes: Batch and streaming modes
Data formats: ORC, Parquet, Avro, CSV, JSON, and Raw
Specific monitoring metrics: None
API types: DataStream and SQL
Update or delete data in sink tables: Not supported. You can only insert data.

Limits

  • General

    • Only Ververica Runtime (VVR) 11 and later versions support reading compressed files (GZIP, BZIP2, XZ, and DEFLATE) from OSS. VVR 8 cannot correctly process compressed files.

    • Flink compute engine versions earlier than VVR 8.0.6 can read from or write to OSS only within the same account. To read from or write to OSS in other accounts, you must use a Flink compute engine of VVR 8.0.6 or later and configure bucket authentication information. For more information, see Configure bucket authentication information.

  • Sink tables only

    • Writing row-based formats such as Avro, CSV, JSON, and Raw to OSS is not supported. For more information, see FLINK-30635.

Syntax

CREATE TABLE OssTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  datetime STRING,
  `hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = '...'
);

Metadata columns

You can read metadata columns in a source table to obtain metadata about the OSS data. For example, if you define a metadata column file.path in an OSS source table, the value of this column is the path of the file that contains the row data. The following example shows how to use metadata columns.

CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
);

The following table lists the metadata columns that OSS source tables support.

file.path (STRING NOT NULL): The path of the file where the row data is located.
file.name (STRING NOT NULL): The name of the file where the row data is located, that is, the element farthest from the root of the file path.
file.size (BIGINT NOT NULL): The size, in bytes, of the file where the row data is located.
file.modification-time (TIMESTAMP_LTZ(3) NOT NULL): The modification time of the file where the row data is located.
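
As a minimal sketch based on the MyUserTableWithFilepath table defined above, a metadata column can be queried like any regular column, for example to count the rows that are read from each file:

-- `file.path` is populated by the connector for every row that is read.
SELECT `file.path`, COUNT(*) AS row_cnt
FROM MyUserTableWithFilepath
GROUP BY `file.path`;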

WITH parameters

  • General

    connector
      Description: The table type.
      Type: String. Required: Yes. Default value: None.
      Remarks: The value must be filesystem.

    path
      Description: The file system path.
      Type: String. Required: Yes. Default value: None.
      Remarks: The path must be in URI format. Example: oss://my_bucket/my_path.
      Note: For VVR 8.0.6 and later, after you set this parameter, you must also configure bucket authentication information to read data from and write data to the specified path. For more information, see Configure bucket authentication information.

    format
      Description: The file format.
      Type: String. Required: Yes. Default value: None.
      Remarks: Valid values: csv, json, avro, parquet, orc, and raw.

  • Source tables only

    source.monitor-interval
      Description: The interval at which the path is monitored for new files. The value must be greater than 0.
      Type: Duration. Required: No. Default value: None.
      Remarks: If this option is not set, the specified path is scanned only once and the source is bounded. Each file is uniquely identified by its path and is processed once when it is discovered. The set of processed files is kept in state for the entire lifetime of the source, so this state is saved in checkpoints and savepoints. A shorter interval lets new files be discovered sooner, but also causes the file system or OSS to be traversed more frequently.
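
    The following minimal sketch (with a hypothetical table name, and a bucket path and schema of your own) uses source.monitor-interval to turn the bounded scan into a continuously monitored source:

    CREATE TEMPORARY TABLE fs_source_monitored (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'oss://<bucket>/path',
      'format' = 'json',
      -- Scan the path for new files every minute. Without this option, the path is read only once.
      'source.monitor-interval' = '1min'
    );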

  • Sink tables only

    partition.default-name
      Description: The partition name that is used when the partition field value is NULL or an empty string.
      Type: String. Required: No. Default value: __DEFAULT_PARTITION__.
      Remarks: None.

    sink.rolling-policy.file-size
      Description: The maximum size that a part file can reach before it is rolled.
      Type: MemorySize. Required: No. Default value: 128 MB.
      Remarks: Data written to a directory is split into part files. Each subtask of the sink that receives data for a partition generates at least one part file for that partition. Based on the configurable rolling policy, the current in-progress part file is closed and a new one is created. The policy rolls part files based on their size and on the maximum duration that a file can remain open.
      Note: For columnar formats, the in-progress file is also rolled whenever a checkpoint is performed, even if it does not yet meet the configured rolling policy. In other words, a file is rolled when it meets the rolling policy or when a checkpoint is performed. For row formats, a file is rolled only when it meets the configured rolling policy.

    sink.rolling-policy.rollover-interval
      Description: The maximum duration that a part file can remain open before it is rolled.
      Type: Duration. Required: No. Default value: 30min.
      Remarks: The check frequency is controlled by the sink.rolling-policy.check-interval option.

    sink.rolling-policy.check-interval
      Description: The interval at which the time-based rolling policy is checked.
      Type: Duration. Required: No. Default value: 1min.
      Remarks: This option controls how often files are checked against the sink.rolling-policy.rollover-interval option to decide whether they must be rolled.

    auto-compaction
      Description: Specifies whether to enable automatic compaction in a streaming sink table. Data is first written to temporary files. After a checkpoint is completed, the temporary files generated for that checkpoint are merged. Temporary files are not visible before they are merged.
      Type: Boolean. Required: No. Default value: false.
      Remarks: If you enable file compaction, multiple small files are merged into larger files based on the target file size. Note the following points when you use file compaction in a production environment:
      • Only files within the same checkpoint are merged, so at least as many files as checkpoints are generated.
      • Files are not visible before they are merged, so the file visibility time is the checkpoint interval plus the merge duration.
      • A long merge can cause backpressure and extend the time required for a checkpoint.

    compaction.file-size
      Description: The target file size for compaction.
      Type: MemorySize. Required: No. Default value: 128 MB.
      Remarks: The default value is the same as the rolling file size that is set by sink.rolling-policy.file-size.

    sink.partition-commit.trigger
      Description: The type of partition commit trigger.
      Type: String. Required: No. Default value: process-time.
      Remarks: For writes to partitioned tables, Flink provides two types of partition commit triggers:
      • process-time: Based on the partition creation time and the current system time. This trigger requires neither a partition time extractor nor watermark generation. A partition is committed as soon as the current system time passes the partition creation time plus sink.partition-commit.delay. This trigger is more general but less precise. For example, data latency or failovers can cause partitions to be committed too early.
      • partition-time: Based on the extracted partition time, and requires watermark generation. The job must generate watermarks, and partitions must be divided by time, such as by hour or by day. A partition is committed as soon as the watermark passes the extracted partition time plus sink.partition-commit.delay.

    sink.partition-commit.delay
      Description: The delay before a partition is committed. A partition is not committed until this delay has passed.
      Type: Duration. Required: No. Default value: 0s.
      Remarks:
      • If partitions are created by day, set this option to 1 d.
      • If partitions are created by hour, set this option to 1 h.

    sink.partition-commit.watermark-time-zone
      Description: The time zone that is used to parse a LONG watermark value into a TIMESTAMP value. The resulting TIMESTAMP is compared with the partition time to determine whether the partition should be committed.
      Type: String. Required: No. Default value: UTC.
      Remarks: This option takes effect only when sink.partition-commit.trigger is set to partition-time.
      • If this option is not set correctly, for example, if the source rowtime is defined on a TIMESTAMP_LTZ column but this option is not set, partitions may not be committed for several hours. The default value UTC is suitable only when the watermark is defined on a TIMESTAMP column or no watermark is defined.
      • If the watermark is defined on a TIMESTAMP_LTZ column, this option must be set to the session time zone. The value can be a full time zone name such as 'America/Los_Angeles' or a custom time zone such as 'GMT-08:00'.

    partition.time-extractor.kind
      Description: The kind of time extractor that extracts time from the partition fields.
      Type: String. Required: No. Default value: default.
      Remarks: Valid values:
      • default: Use the default extractor. You can configure a timestamp pattern or formatter.
      • custom: Use a custom extractor. You must specify an extractor class.

    partition.time-extractor.class
      Description: The extractor class that implements the PartitionTimeExtractor interface.
      Type: String. Required: No. Default value: None.
      Remarks: None.

    partition.time-extractor.timestamp-pattern
      Description: The pattern that the default extractor uses to construct a valid timestamp from the partition fields.
      Type: String. Required: No. Default value: None.
      Remarks: By default, the timestamp is extracted from the first partition field based on the yyyy-MM-dd hh:mm:ss pattern.
      • To extract the timestamp from a single partition field dt, configure: $dt.
      • To extract the timestamp from multiple partition fields, such as year, month, day, and hour, configure: $year-$month-$day $hour:00:00.
      • To extract the timestamp from the two partition fields dt and hour, configure: $dt $hour:00:00.

    partition.time-extractor.timestamp-formatter
      Description: The formatter that converts the partition timestamp string into a timestamp. The partition timestamp string is defined by the partition.time-extractor.timestamp-pattern option.
      Type: String. Required: No. Default value: yyyy-MM-dd HH:mm:ss.
      Remarks: For example, if the partition timestamp is extracted from multiple partition fields, such as year, month, and day, you can set partition.time-extractor.timestamp-pattern to $year$month$day and partition.time-extractor.timestamp-formatter to yyyyMMdd. The default formatter is yyyy-MM-dd HH:mm:ss. The formatter is compatible with Java's DateTimeFormatter.

    sink.partition-commit.policy.kind
      Description: The type of partition commit policy.
      Type: String. Required: No. Default value: None.
      Remarks: The partition commit policy notifies downstream consumers that a partition has been fully written and is ready to be read. Valid values:
      • success-file: Adds a _SUCCESS file to the partition directory.
      • custom: Creates the commit policy from a specified class.
      You can specify multiple commit policies at the same time.

    sink.partition-commit.policy.class
      Description: The partition commit policy class that implements the PartitionCommitPolicy interface.
      Type: String. Required: No. Default value: None.
      Remarks: This class can be used only with the custom commit policy.

    sink.partition-commit.success-file.name
      Description: The name of the file that is written by the success-file partition commit policy.
      Type: String. Required: No. Default value: _SUCCESS.
      Remarks: None.

    sink.parallelism
      Description: The parallelism for writing files to the external file system.
      Type: Integer. Required: No. Default value: None.
      Remarks: By default, the sink parallelism is the same as the parallelism of the upstream chained operator. If you configure a parallelism that differs from the parallelism of the upstream chained operator, the operator that writes files uses the configured parallelism. If file compaction is enabled, the compaction operator also uses the configured parallelism.
      Note: The value must be greater than 0. Otherwise, an exception is thrown.
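
    The rolling-policy and compaction options above can be combined on a single sink table. The following minimal sketch (with a hypothetical table name, and a bucket path and schema of your own) shows one possible combination:

    CREATE TEMPORARY TABLE fs_sink_compacted (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'oss://<bucket>/path',
      'format' = 'parquet',
      -- Roll the in-progress part file at 256 MB or after 15 minutes, checked once per minute.
      'sink.rolling-policy.file-size' = '256MB',
      'sink.rolling-policy.rollover-interval' = '15min',
      'sink.rolling-policy.check-interval' = '1min',
      -- After each checkpoint, merge the small files of that checkpoint into files of about 256 MB.
      'auto-compaction' = 'true',
      'compaction.file-size' = '256MB'
    );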

Configure bucket authentication information

Note

Bucket authentication information can be configured only when the Flink compute engine is VVR 8.0.6 or later.

After you specify the file system path, you must also configure bucket authentication information to read data from and write data to that path. To do this, add the following code to the Other Configuration section on the Deployment Details tab of the development console.

fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx

The following describes the configuration items.

fs.oss.bucket.<bucketName>.accessKeyId
fs.oss.bucket.<bucketName>.accessKeySecret

  • <bucketName>: Replace this with the bucket name that you specified in the file system path parameter.

  • Use an existing AccessKey or create a new one. For more information, see Create an AccessKey.

    Note: To reduce the risk of an AccessKey secret leak, the AccessKey secret is displayed only when it is created and cannot be viewed later. Make sure that you store it securely.

Write to OSS-HDFS

First, add the following configuration to the Other Configuration section on the Deployment Details tab of the development console:

fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx

The following describes the configuration items.

fs.oss.jindo.buckets

  The names of the buckets in the OSS-HDFS service to which data is written. You can configure multiple bucket names, separated by semicolons. When Flink writes to an OSS path whose bucket is included in fs.oss.jindo.buckets, the data is written to the OSS-HDFS service.

fs.oss.jindo.accessKeyId
fs.oss.jindo.accessKeySecret

  Use an existing AccessKey or create a new one. For more information, see Create an AccessKey.

  Note: To reduce the risk of an AccessKey secret leak, the AccessKey secret is displayed only when it is created and cannot be viewed later. Make sure that you store it securely.

In addition, you must configure the endpoint for OSS-HDFS. You can configure the OSS-HDFS endpoint in one of two ways:

Parameter configuration

In the Real-time Compute for Apache Flink development console, on the Deployment Details tab, add the following configuration to the Additional Configurations section of the Runtime Parameter Settings area:

fs.oss.jindo.endpoint: xxx

Path configuration

Configure the OSS-HDFS endpoint in the OSS path.

oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>

  • user-defined-oss-hdfs-bucket: The name of the bucket.

  • oss-hdfs-endpoint: The endpoint of the OSS-HDFS service.

  • The fs.oss.jindo.buckets configuration item must include <user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>.

For example, if the bucket name is jindo-test and the endpoint is cn-beijing.oss-dls.aliyuncs.com, the OSS path must be oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>, and the fs.oss.jindo.buckets configuration item must include jindo-test.cn-beijing.oss-dls.aliyuncs.com.

# OSS path 
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>
# Other configuration 
fs.oss.jindo.buckets: jindo-test;jindo-test.cn-beijing.oss-dls.aliyuncs.com

Note

When you write to external HDFS files where the file path is hdfs://**, you can also add the following configuration to specify or change the username for access.

Add the following configuration to the Other Configuration section on the Deployment Details tab of the development console:

containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfs

Usage examples

  • Source table

    CREATE TEMPORARY TABLE fs_table_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector'='filesystem',
      'path'='oss://<bucket>/path',
      'format'='parquet'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM fs_table_source;
  • Sink table

    • Write to a partitioned table

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE,
        ts BIGINT, -- Time in milliseconds
        ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
        WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define a watermark on the TIMESTAMP_LTZ column
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE,
        dt STRING,
        `hour` STRING
      ) PARTITIONED BY (dt, `hour`) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet',
        'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
        'sink.partition-commit.delay'='1 h',
        'sink.partition-commit.trigger'='partition-time',
        'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume that the user-configured time zone is 'Asia/Shanghai'
        'sink.partition-commit.policy.kind'='success-file'
      );
      
      -- Streaming SQL to insert into the file system table
      INSERT INTO fs_table_sink 
      SELECT 
        user_id, 
        order_amount, 
        DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
        DATE_FORMAT(ts_ltz, 'HH') 
      FROM datagen_source;
    • Write to a non-partitioned table

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet'
      );
      
      INSERT INTO fs_table_sink SELECT * FROM datagen_source;
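
    • Write to a partitioned table by using the process-time commit trigger

      The following is a minimal sketch that is not part of the original examples. It reuses the datagen_source table from the preceding example, uses a hypothetical sink table name and a bucket path of your own, and shows the process-time trigger, which requires neither a watermark nor a partition time extractor.

      CREATE TEMPORARY TABLE fs_table_sink_proctime (
        user_id STRING,
        order_amount DOUBLE,
        dt STRING
      ) PARTITIONED BY (dt) WITH (
        'connector' = 'filesystem',
        'path' = 'oss://<bucket>/path',
        'format' = 'parquet',
        -- Commit a partition based on the system time and the partition creation time.
        'sink.partition-commit.trigger' = 'process-time',
        'sink.partition-commit.delay' = '1 d',
        'sink.partition-commit.policy.kind' = 'success-file'
      );
      
      INSERT INTO fs_table_sink_proctime
      SELECT
        user_id,
        order_amount,
        DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd')
      FROM datagen_source;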

DataStream API

Important

To read or write data using the DataStream API, you must use the corresponding DataStream connector to connect to Flink. For more information, see How to use DataStream connectors.

The following example shows how to use the DataStream API to write to OSS and OSS-HDFS:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.types.Row;

String outputPath = "oss://<bucket>/path";

final StreamingFileSink<Row> sink =
        StreamingFileSink.forRowFormat(
                        new Path(outputPath),
                        // Encode each Row as one line of UTF-8 text.
                        (Encoder<Row>) (element, stream) ->
                                stream.write((element.toString() + "\n").getBytes(StandardCharsets.UTF_8)))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

// outputStream is the DataStream<Row> that you want to write out.
outputStream.addSink(sink);

To write to OSS-HDFS, you must also configure the relevant OSS-HDFS parameters in the Other Configuration field. This field is in the Running Parameter Configuration area on the Deployment Details tab of the Real-time Compute for Apache Flink development console. For more information, see Write to OSS-HDFS.

References