
Realtime Compute for Apache Flink:Object Storage Service

Last Updated: Mar 26, 2026

Realtime Compute for Apache Flink supports reading from and writing to Object Storage Service (OSS) using the filesystem connector. OSS provides 99.9999999999% (twelve 9s) data durability and 99.995% availability, making it a reliable store for large-scale Flink pipelines.

Category Details
Supported table types Source and sink tables
Execution modes Batch and stream
Data formats ORC, Parquet, Avro, CSV, JSON, and raw
Specific monitoring metrics None
API types DataStream API and SQL
Updating or deleting data in sink tables Insert only. Update and delete are not supported.

Limitations

General:

  • Only Ververica Runtime (VVR) 11 and later support reading compressed files (GZIP, BZIP2, XZ, DEFLATE) from OSS. VVR 8 cannot process compressed files correctly.

  • VVR versions earlier than 8.0.6 only support OSS buckets in the same account. To access buckets across accounts, use VVR 8.0.6 or later and configure bucket authentication. For details, see Configure bucket authentication.

  • Incremental reading of new partitions is not supported.

Sink tables only:

Row formats — Avro, CSV, JSON, and raw — are not supported when writing to OSS. See FLINK-30635 for details.

Syntax

CREATE TABLE OssTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  datetime STRING,
  `hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
  'connector' = 'filesystem',           -- required: must be 'filesystem'
  'path' = 'oss://<bucket>/path',       -- required: URI of the OSS path
  'format' = '...',                     -- required: orc, parquet, avro, csv, json, or raw
  'partition.default-name' = '...',     -- optional: partition name when partition field is NULL or empty
  'source.monitor-interval' = '...',    -- optional (source only): interval to scan for new files
  'auto-compaction' = '...'            -- optional (sink only): enable automatic compaction after each checkpoint
);

Metadata columns

Source tables support metadata columns that expose file-level information about each row. Define a metadata column in your DDL by adding METADATA after the data type:

CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
);

The following metadata columns are available:

Key Data type Description
file.path STRING NOT NULL Full path of the file containing the row.
file.name STRING NOT NULL File name (last element of the path).
file.size BIGINT NOT NULL File size, in bytes.
file.modification-time TIMESTAMP_LTZ(3) NOT NULL Last modification time of the file.

WITH parameters

General parameters

Parameter Required Default Description
connector Yes Must be filesystem.
path Yes OSS path in URI format, such as oss://my_bucket/my_path. For VVR 8.0.6 and later, bucket authentication is required after setting this parameter. See Configure bucket authentication.
format Yes File format: csv, json, avro, parquet, orc, or raw.

Source table parameters

Parameter Required Default Description
source.monitor-interval No Interval to scan for new files. Must be greater than 0. If not set, the path is scanned once and the source is bounded. Each file is identified by its path and processed exactly once. Processed file paths are stored in state and persisted across checkpoints and savepoints. A shorter interval speeds up file discovery but increases scan frequency.

Sink table parameters

Parameter Required Default Description
partition.default-name No _DEFAULT_PARTITION__ Partition name used when a partition field is NULL or an empty string.
sink.rolling-policy.file-size No 128 MB Maximum part file size before rolling. Each sink subtask creates at least one part file per partition. See Rolling policy behavior for how this interacts with file formats.
sink.rolling-policy.rollover-interval No 30min Maximum time a part file can stay open before rolling. Check frequency is controlled by sink.rolling-policy.check-interval.
sink.rolling-policy.check-interval No 1min How often to check whether a part file should be rolled based on sink.rolling-policy.rollover-interval.
auto-compaction No false Whether to enable automatic compaction. Data is first written to temporary files. After each checkpoint, temporary files from that checkpoint are merged into larger files. Temporary files are not visible before merging. When enabled: only files within a checkpoint are merged (at least one file per checkpoint); data visibility latency equals checkpoint interval + compaction duration; long compaction runs can cause backpressure and delay checkpoints.
compaction.file-size No 128 MB Target file size for compacted output. Defaults to the same value as sink.rolling-policy.file-size.
sink.partition-commit.trigger No process-time When to commit a partition. See Partition commit triggers.
sink.partition-commit.delay No 0s Minimum delay before committing a partition. Set to 1 d for daily partitions, 1 h for hourly partitions.
sink.partition-commit.watermark-time-zone No UTC Time zone for parsing a LONG watermark into a TIMESTAMP for partition commit comparison. Only applies when sink.partition-commit.trigger is partition-time. Use the session time zone when the watermark is defined on a TIMESTAMP_LTZ column (for example, Asia/Shanghai). Accepts full time zone names (such as America/Los_Angeles) or custom offsets (such as GMT-08:00). If not configured correctly, partition commits may be delayed by several hours.
partition.time-extractor.kind No default How to extract time from partition fields. default: configure a timestamp pattern or formatter. custom: specify an extractor class.
partition.time-extractor.class No Class that implements the PartitionTimeExtractor interface. Required when partition.time-extractor.kind is custom.
partition.time-extractor.timestamp-pattern No Pattern for constructing a timestamp from partition fields. By default, the first field is extracted using yyyy-MM-dd hh:mm:ss. Examples: $dt (single field), $year-$month-$day $hour:00:00 (multiple fields), $dt $hour:00:00 (two fields).
partition.time-extractor.timestamp-formatter No yyyy-MM-dd HH:mm:ss Formatter for converting the timestamp string (as expressed by partition.time-extractor.timestamp-pattern) to a timestamp. For example, if partition.time-extractor.timestamp-pattern is $year$month$day, set this to yyyyMMdd. Compatible with Java's DateTimeFormatter.
sink.partition-commit.policy.kind No How to notify downstream consumers that a partition is ready. success-file: writes a _SUCCESS file to the partition directory. custom: uses a class implementing PartitionCommitPolicy. Multiple policies can be combined.
sink.partition-commit.policy.class No Class that implements PartitionCommitPolicy. Required when sink.partition-commit.policy.kind is custom.
sink.partition-commit.success-file.name No _SUCCESS Name of the success file written by the success-file commit policy.
sink.parallelism No Parallelism for the file-writing operator. Defaults to the upstream operator's parallelism. Must be greater than 0. When auto-compaction is enabled, the compaction operator also uses this parallelism.
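To illustrate how partition.time-extractor.timestamp-pattern and partition.time-extractor.timestamp-formatter relate, the following sketch (plain Java, not using Flink classes; names are illustrative) reconstructs a timestamp string from partition fields the way a $year$month$day pattern would, then parses it with the matching yyyyMMdd formatter:

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionTimeDemo {
    // Conceptually mirrors the default extractor: substitute partition
    // values into the timestamp pattern, then parse the resulting string
    // with the configured timestamp formatter.
    static LocalDateTime extract(String year, String month, String day) {
        // timestamp-pattern: $year$month$day  ->  e.g. "20240315"
        String timestampString = year + month + day;
        // timestamp-formatter: yyyyMMdd (must match the pattern's shape)
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
        return LocalDate.parse(timestampString, formatter).atStartOfDay();
    }

    public static void main(String[] args) {
        System.out.println(extract("2024", "03", "15")); // 2024-03-15T00:00
    }
}
```

If the formatter does not match the shape produced by the pattern, partition time extraction fails, so change the two settings together.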

Rolling policy behavior

Rolling behavior differs by file format:

For columnar formats (Parquet, ORC, Avro), a part file is always rolled at checkpoint time, even if the rolling policy criteria are not met. File size and rollover interval apply as additional triggers between checkpoints.
For row formats (CSV, JSON, raw), a part file is rolled only when the rolling policy criteria (sink.rolling-policy.file-size or sink.rolling-policy.rollover-interval) are met. If you need low-latency file visibility, tune sink.rolling-policy.rollover-interval together with your checkpoint interval.
Because row formats are not supported for OSS sink tables (see FLINK-30635), the row-format behavior above is described for completeness and applies only if row format support is added in a future version.
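As a rough sketch (illustrative only, not Flink's internal code), the rolling decision combines the two thresholds, and is evaluated at the configured check interval:

```java
import java.time.Duration;

public class RollingPolicyDemo {
    static final long MAX_PART_SIZE = 128L * 1024 * 1024;    // sink.rolling-policy.file-size = 128 MB
    static final Duration ROLLOVER = Duration.ofMinutes(30); // sink.rolling-policy.rollover-interval

    // Roll the part file when either threshold is reached; this check runs
    // every sink.rolling-policy.check-interval.
    static boolean shouldRoll(long partFileBytes, Duration openFor) {
        return partFileBytes >= MAX_PART_SIZE || openFor.compareTo(ROLLOVER) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldRoll(64L * 1024 * 1024, Duration.ofMinutes(10)));  // false: under both limits
        System.out.println(shouldRoll(200L * 1024 * 1024, Duration.ofMinutes(1)));  // true: size limit reached
    }
}
```

For columnar formats, checkpoint completion acts as an additional, unconditional roll trigger on top of this check.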

Partition commit triggers

Two trigger types are available for sink.partition-commit.trigger:

  • `process-time` (default): Commits a partition when the current system time exceeds the partition creation time plus sink.partition-commit.delay. Does not require a watermark or partition time extractor. More general but less precise — data delays or failures can cause premature commits.

  • `partition-time`: Commits a partition when the watermark exceeds the partition creation time plus sink.partition-commit.delay. Requires watermark generation and time-based partitions (hourly, daily, and so on).
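The partition-time decision above reduces to a simple comparison, sketched here in plain Java (illustrative only, not Flink's implementation):

```java
import java.time.Duration;
import java.time.Instant;

public class CommitTriggerDemo {
    // partition-time trigger: commit once the watermark passes
    // partition time + sink.partition-commit.delay.
    static boolean readyToCommit(Instant watermark, Instant partitionTime, Duration delay) {
        return watermark.isAfter(partitionTime.plus(delay));
    }

    public static void main(String[] args) {
        Instant partition = Instant.parse("2024-03-15T10:00:00Z"); // hourly partition 10:00
        Duration delay = Duration.ofHours(1);                      // 'sink.partition-commit.delay' = '1 h'
        // Watermark at 10:59 -> partition 10:00 is not committed yet.
        System.out.println(readyToCommit(Instant.parse("2024-03-15T10:59:00Z"), partition, delay)); // false
        // Watermark past 11:00 -> partition 10:00 commits.
        System.out.println(readyToCommit(Instant.parse("2024-03-15T11:00:01Z"), partition, delay)); // true
    }
}
```

The process-time trigger uses the same comparison but substitutes the current system time for the watermark, which is why data delays or failover can commit a partition prematurely.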

Configure bucket authentication

Only VVR 8.0.6 and later support bucket authentication.

After setting the path parameter, configure bucket authentication so Flink can read from and write to the specified OSS path. Add the following to the Additional Configurations section on the Parameters tab of the Deployment Details page in the Realtime Compute development console:

fs.oss.bucket.<bucketName>.accessKeyId: <your-access-key-id>
fs.oss.bucket.<bucketName>.accessKeySecret: <your-access-key-secret>

Replace <bucketName> with the bucket name used in the path parameter.

Configuration item Description
fs.oss.bucket.<bucketName>.accessKeyId AccessKey ID for the bucket. Use an existing AccessKey or create one. See Create an AccessKey.
fs.oss.bucket.<bucketName>.accessKeySecret AccessKey Secret for the bucket.
Important

The AccessKey Secret is shown only once when created. Store it securely.

Write to OSS-HDFS

Add the following configuration to the Additional Configurations section on the Parameters tab of the Deployment Details page in the Realtime Compute development console:

fs.oss.jindo.buckets: <bucket-names>
fs.oss.jindo.accessKeyId: <your-access-key-id>
fs.oss.jindo.accessKeySecret: <your-access-key-secret>
Configuration item Description
fs.oss.jindo.buckets OSS-HDFS bucket names, separated by semicolons. When Flink writes to an OSS path, if the corresponding bucket is listed here, data is written to the OSS-HDFS service.
fs.oss.jindo.accessKeyId AccessKey ID. See Create an AccessKey.
fs.oss.jindo.accessKeySecret AccessKey Secret.
Important

The AccessKey Secret is shown only once when created. Store it securely.

Configure the OSS-HDFS endpoint using one of the following methods:

Parameter configuration

Add the endpoint to Additional Configurations:

fs.oss.jindo.endpoint: <oss-hdfs-endpoint>

Path configuration

Embed the endpoint directly in the OSS path:

oss://<bucket-name>.<oss-hdfs-endpoint>/<directory>

When using this method, fs.oss.jindo.buckets must include <bucket-name>.<oss-hdfs-endpoint>.

For example, if the bucket name is jindo-test and the endpoint is cn-beijing.oss-dls.aliyuncs.com:

# OSS path
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<directory>

# Additional Configurations
fs.oss.jindo.buckets: jindo-test;jindo-test.cn-beijing.oss-dls.aliyuncs.com

Writing to an external Hadoop Distributed File System (HDFS)

For paths using the hdfs:// scheme, add the following to specify or switch the access username:

containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfs

Examples

Read from OSS (source table)

CREATE TEMPORARY TABLE fs_table_source (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet'
);

CREATE TEMPORARY TABLE blackhole_sink (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink SELECT * FROM fs_table_source;

Write to OSS (sink table)

Write to a partitioned table

This example streams data from a datagen source, partitions it by date and hour, and commits partitions using the partition-time trigger:

CREATE TABLE datagen_source (
  user_id STRING,
  order_amount DOUBLE,
  ts BIGINT,                                               -- Timestamp in milliseconds
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND    -- Watermark on TIMESTAMP_LTZ column
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE fs_table_sink (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
  'sink.partition-commit.policy.kind' = 'success-file'
);

INSERT INTO fs_table_sink
SELECT
  user_id,
  order_amount,
  DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
  DATE_FORMAT(ts_ltz, 'HH')
FROM datagen_source;

Write to a non-partitioned table

CREATE TABLE datagen_source (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE fs_table_sink (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet'
);

INSERT INTO fs_table_sink SELECT * FROM datagen_source;

DataStream API

Important

To use the DataStream API, set up the DataStream connector first. See Use a DataStream connector.

The following example uses StreamingFileSink with OnCheckpointRollingPolicy to write to OSS. Part files are rolled on every checkpoint.

import java.io.PrintStream;

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.types.Row;

String outputPath = "oss://<bucket>/path";

// Write each Row as a line of text; part files roll on every checkpoint.
final StreamingFileSink<Row> sink =
    StreamingFileSink.forRowFormat(
            new Path(outputPath),
            (Encoder<Row>) (element, stream) -> {
                PrintStream out = new PrintStream(stream);
                out.println(element.toString());
            })
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();

outputStream.addSink(sink);

To write to OSS-HDFS, also configure the OSS-HDFS parameters in Additional Configurations. See Write to OSS-HDFS.
