Realtime Compute for Apache Flink supports reading from and writing to Object Storage Service (OSS) using the filesystem connector. OSS provides 99.9999999999% (twelve 9s) data durability and 99.995% availability, making it a reliable store for large-scale Flink pipelines.
| Category | Details |
|---|---|
| Supported table types | Source and sink tables |
| Execution modes | Batch and stream |
| Data formats | ORC, Parquet, Avro, CSV, JSON, and raw |
| Specific monitoring metrics | None |
| API types | DataStream API and SQL |
| Updating or deleting data in sink tables | Insert only. Update and delete are not supported. |
Limitations
General:
- Only Ververica Runtime (VVR) 11 and later support reading compressed files (GZIP, BZIP2, XZ, and DEFLATE) from OSS. VVR 8 cannot process compressed files correctly.
- VVR versions earlier than 8.0.6 support only OSS buckets in the same account. To access buckets across accounts, use VVR 8.0.6 or later and configure bucket authentication. For details, see Configure bucket authentication.
- Incremental reading of new partitions is not supported.
Sink tables only:
- Row formats (Avro, CSV, JSON, and raw) are not supported when writing to OSS. See FLINK-30635 for details.
Syntax
CREATE TABLE OssTable (
column_name1 INT,
column_name2 STRING,
...
datetime STRING,
`hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
'connector' = 'filesystem', -- required: must be 'filesystem'
'path' = 'oss://<bucket>/path', -- required: URI of the OSS path
'format' = '...', -- required: orc, parquet, avro, csv, json, or raw
'partition.default-name' = '...', -- optional: partition name when partition field is NULL or empty
'source.monitor-interval' = '...', -- optional (source only): interval to scan for new files
'auto-compaction' = '...' -- optional (sink only): enable automatic compaction after each checkpoint
);
Metadata columns
Source tables support metadata columns that expose file-level information about each row. Define a metadata column in your DDL by adding METADATA after the data type:
CREATE TABLE MyUserTableWithFilepath (
column_name1 INT,
column_name2 STRING,
`file.path` STRING NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'json'
);
The following metadata columns are available:
| Key | Data type | Description |
|---|---|---|
| file.path | STRING NOT NULL | Full path of the file containing the row. |
| file.name | STRING NOT NULL | File name (last element of the path). |
| file.size | BIGINT NOT NULL | File size, in bytes. |
| file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | Last modification time of the file. |
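Declared metadata columns can then be queried like ordinary columns. A minimal sketch against the MyUserTableWithFilepath table defined above:

```sql
-- `file.path` was declared with METADATA in the DDL above, so it can be
-- selected and filtered like any other column.
SELECT
  column_name1,
  column_name2,
  `file.path` AS source_file   -- full OSS path of the file that produced the row
FROM MyUserTableWithFilepath
WHERE `file.path` LIKE '%.json';
```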
WITH parameters
General parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| connector | Yes | — | Must be filesystem. |
| path | Yes | — | OSS path in URI format, such as oss://my_bucket/my_path. For VVR 8.0.6 and later, configure bucket authentication after setting this parameter. See Configure bucket authentication. |
| format | Yes | — | File format: csv, json, avro, parquet, orc, or raw. |
Source table parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| source.monitor-interval | No | — | Interval to scan for new files. Must be greater than 0. If not set, the path is scanned once and the source is bounded. Each file is identified by its path and processed exactly once. Processed file paths are stored in state and persisted across checkpoints and savepoints. A shorter interval speeds up file discovery but increases scan frequency. |
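For illustration, the following sketch turns a bounded OSS scan into a continuously monitored streaming source (the bucket path is a placeholder):

```sql
CREATE TEMPORARY TABLE oss_streaming_source (
  `id`   INT,
  `name` STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  -- Scan the path for new files every 60 seconds. Without this option,
  -- the path is scanned once and the source is bounded.
  'source.monitor-interval' = '60s'
);
```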
Sink table parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| partition.default-name | No | __DEFAULT_PARTITION__ | Partition name used when a partition field is NULL or an empty string. |
| sink.rolling-policy.file-size | No | 128 MB | Maximum part file size before rolling. Each sink subtask creates at least one part file per partition. See Rolling policy behavior for how this interacts with file formats. |
| sink.rolling-policy.rollover-interval | No | 30min | Maximum time a part file can stay open before rolling. Check frequency is controlled by sink.rolling-policy.check-interval. |
| sink.rolling-policy.check-interval | No | 1min | How often to check whether a part file should be rolled based on sink.rolling-policy.rollover-interval. |
| auto-compaction | No | false | Whether to enable automatic compaction. Data is first written to temporary files. After each checkpoint, temporary files from that checkpoint are merged into larger files. Temporary files are not visible before merging. When enabled: only files within a checkpoint are merged (at least one file per checkpoint); data visibility latency equals checkpoint interval + compaction duration; long compaction runs can cause backpressure and delay checkpoints. |
| compaction.file-size | No | 128 MB | Target file size for compacted output. Defaults to the same value as sink.rolling-policy.file-size. |
| sink.partition-commit.trigger | No | process-time | When to commit a partition. See Partition commit triggers. |
| sink.partition-commit.delay | No | 0s | Minimum delay before committing a partition. Set to 1 d for daily partitions, 1 h for hourly partitions. |
| sink.partition-commit.watermark-time-zone | No | UTC | Time zone for parsing a LONG watermark into a TIMESTAMP for partition commit comparison. Only applies when sink.partition-commit.trigger is partition-time. Use the session time zone when the watermark is defined on a TIMESTAMP_LTZ column (for example, Asia/Shanghai). Accepts full time zone names (such as America/Los_Angeles) or custom offsets (such as GMT-08:00). If not configured correctly, partition commits may be delayed by several hours. |
| partition.time-extractor.kind | No | default | How to extract time from partition fields. default: configure a timestamp pattern or formatter. custom: specify an extractor class. |
| partition.time-extractor.class | No | — | Class that implements the PartitionTimeExtractor interface. Required when partition.time-extractor.kind is custom. |
| partition.time-extractor.timestamp-pattern | No | — | Pattern for constructing a timestamp from partition fields. By default, the first field is extracted using yyyy-MM-dd hh:mm:ss. Examples: $dt (single field), $year-$month-$day $hour:00:00 (multiple fields), $dt $hour:00:00 (two fields). |
| partition.time-extractor.timestamp-formatter | No | yyyy-MM-dd HH:mm:ss | Formatter for converting the timestamp string (as expressed by partition.time-extractor.timestamp-pattern) to a timestamp. For example, if partition.time-extractor.timestamp-pattern is $year$month$day, set this to yyyyMMdd. Compatible with Java's DateTimeFormatter. |
| sink.partition-commit.policy.kind | No | — | How to notify downstream consumers that a partition is ready. success-file: writes a _SUCCESS file to the partition directory. custom: uses a class implementing PartitionCommitPolicy. Multiple policies can be combined. |
| sink.partition-commit.policy.class | No | — | Class that implements PartitionCommitPolicy. Required when sink.partition-commit.policy.kind is custom. |
| sink.partition-commit.success-file.name | No | _SUCCESS | Name of the success file written by the success-file commit policy. |
| sink.parallelism | No | — | Parallelism for the file-writing operator. Defaults to the upstream operator's parallelism. Must be greater than 0. When auto-compaction is enabled, the compaction operator also uses this parallelism. |
Rolling policy behavior
Rolling behavior differs by file format:
For columnar formats (Parquet, ORC, Avro), a part file is always rolled at checkpoint time, even if the rolling policy criteria are not met. File size and rollover interval apply as additional triggers between checkpoints.
For row formats (CSV, JSON, raw), a part file is rolled only when the rolling policy criteria (sink.rolling-policy.file-size or sink.rolling-policy.rollover-interval) are met. If you need low-latency file visibility, tune sink.rolling-policy.rollover-interval together with your checkpoint interval.
Row formats are not supported for OSS sink tables due to FLINK-30635. The above behavior applies if row format support is added in a future version.
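To show how the rolling and compaction options combine in practice, here is a sketch of a Parquet sink with a custom rolling policy and automatic compaction (the bucket path and size values are placeholders):

```sql
CREATE TEMPORARY TABLE oss_compacting_sink (
  `id`   INT,
  `name` STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  -- Roll a part file once it reaches 256 MB or has been open for 15 minutes.
  -- Parquet files are additionally rolled at every checkpoint.
  'sink.rolling-policy.file-size' = '256MB',
  'sink.rolling-policy.rollover-interval' = '15min',
  -- After each checkpoint, merge that checkpoint's small files
  -- into files of roughly compaction.file-size.
  'auto-compaction' = 'true',
  'compaction.file-size' = '256MB'
);
```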
Partition commit triggers
Two trigger types are available for sink.partition-commit.trigger:
- `process-time` (default): Commits a partition when the current system time exceeds the partition creation time plus sink.partition-commit.delay. Does not require a watermark or partition time extractor. More general but less precise: data delays or failures can cause premature commits.
- `partition-time`: Commits a partition when the watermark exceeds the partition creation time plus sink.partition-commit.delay. Requires watermark generation and time-based partitions (hourly, daily, and so on).
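A process-time commit needs no watermark or time extractor, which keeps the DDL short. A sketch with placeholder values:

```sql
CREATE TEMPORARY TABLE oss_processtime_sink (
  user_id      STRING,
  order_amount DOUBLE,
  dt           STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  -- Commit a partition once system time passes its creation time + 1 hour.
  -- No watermark or partition time extractor is required.
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file'
);
```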
Configure bucket authentication
Only VVR 8.0.6 and later support bucket authentication.
After setting the path parameter, configure bucket authentication so Flink can read from and write to the specified OSS path. Add the following to the Additional Configurations section on the Parameters tab of the Deployment Details page in the Realtime Compute development console:
fs.oss.bucket.<bucketName>.accessKeyId: <your-access-key-id>
fs.oss.bucket.<bucketName>.accessKeySecret: <your-access-key-secret>
Replace <bucketName> with the bucket name used in the path parameter.
| Configuration item | Description |
|---|---|
| fs.oss.bucket.&lt;bucketName&gt;.accessKeyId | AccessKey ID for the bucket. Use an existing AccessKey or create one. See Create an AccessKey. |
| fs.oss.bucket.&lt;bucketName&gt;.accessKeySecret | AccessKey Secret for the bucket. |
The AccessKey Secret is shown only once when created. Store it securely.
Write to OSS-HDFS
Add the following configuration to the Additional Configurations section on the Parameters tab of the Deployment Details page in the Realtime Compute development console:
fs.oss.jindo.buckets: <bucket-names>
fs.oss.jindo.accessKeyId: <your-access-key-id>
fs.oss.jindo.accessKeySecret: <your-access-key-secret>
| Configuration item | Description |
|---|---|
| fs.oss.jindo.buckets | OSS-HDFS bucket names, separated by semicolons. When Flink writes to an OSS path, if the corresponding bucket is listed here, data is written to the OSS-HDFS service. |
| fs.oss.jindo.accessKeyId | AccessKey ID. See Create an AccessKey. |
| fs.oss.jindo.accessKeySecret | AccessKey Secret. |
The AccessKey Secret is shown only once when created. Store it securely.
Configure the OSS-HDFS endpoint using one of the following methods:
Parameter configuration
Add the endpoint to Additional Configurations:
fs.oss.jindo.endpoint: <oss-hdfs-endpoint>
Path configuration
Embed the endpoint directly in the OSS path:
oss://<bucket-name>.<oss-hdfs-endpoint>/<directory>
When using this method, fs.oss.jindo.buckets must include <bucket-name>.<oss-hdfs-endpoint>.
For example, if the bucket name is jindo-test and the endpoint is cn-beijing.oss-dls.aliyuncs.com:
# OSS path
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<directory>
# Additional Configurations
fs.oss.jindo.buckets: jindo-test;jindo-test.cn-beijing.oss-dls.aliyuncs.com
Writing to an external Hadoop Distributed File System (HDFS)
For paths using the hdfs:// scheme, add the following to specify or switch the access username:
containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfs
Examples
Read from OSS (source table)
CREATE TEMPORARY TABLE fs_table_source (
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'parquet'
);
CREATE TEMPORARY TABLE blackhole_sink (
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT * FROM fs_table_source;
Write to OSS (sink table)
Write to a partitioned table
This example streams data from a datagen source, partitions it by date and hour, and commits partitions using the partition-time trigger:
CREATE TABLE datagen_source (
user_id STRING,
order_amount DOUBLE,
ts BIGINT, -- Timestamp in milliseconds
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Watermark on TIMESTAMP_LTZ column
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE fs_table_sink (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'parquet',
'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
'sink.partition-commit.delay' = '1 h',
'sink.partition-commit.trigger' = 'partition-time',
'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
'sink.partition-commit.policy.kind' = 'success-file'
);
INSERT INTO fs_table_sink
SELECT
user_id,
order_amount,
DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
DATE_FORMAT(ts_ltz, 'HH')
FROM datagen_source;
Write to a non-partitioned table
CREATE TABLE datagen_source (
user_id STRING,
order_amount DOUBLE
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE fs_table_sink (
user_id STRING,
order_amount DOUBLE
) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'parquet'
);
INSERT INTO fs_table_sink SELECT * FROM datagen_source;
DataStream API
To use the DataStream API, set up the DataStream connector first. See Use a DataStream connector.
The following example uses StreamingFileSink with OnCheckpointRollingPolicy to write to OSS. Part files are rolled on every checkpoint.
String outputPath = "oss://<bucket>/path";
final StreamingFileSink<Row> sink =
    StreamingFileSink.forRowFormat(
            new Path(outputPath),
            (Encoder<Row>) (element, stream) -> {
                // Write each row to the part file's output stream as a UTF-8 line.
                stream.write((element.toString() + "\n").getBytes(StandardCharsets.UTF_8));
            })
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();
outputStream.addSink(sink);
To write to OSS-HDFS, also configure the OSS-HDFS parameters in Additional Configurations. See Write to OSS-HDFS.