This topic describes how to use the Object Storage Service (OSS) connector.
Alibaba Cloud Object Storage Service (OSS) is a highly reliable, secure, and cost-effective cloud storage service that provides massive storage capacity. It is designed to deliver 99.9999999999% (twelve 9s) data durability and 99.995% data availability. You can select from multiple storage classes to optimize your storage costs.
| Category | Details |
| --- | --- |
| Supported types | Source and sink tables |
| Running modes | Batch and streaming modes |
| Data formats | ORC, Parquet, Avro, CSV, JSON, and Raw |
| Specific monitoring metrics | None |
| API types | DataStream and SQL |
| Update or delete data in sink tables | Updating and deleting data in sink tables is not supported. You can only insert data. |
Limits
General
Only Ververica Runtime (VVR) 11 and later versions support reading compressed files (GZIP, BZIP2, XZ, and DEFLATE) from OSS. VVR 8 cannot correctly process compressed files.
Flink compute engine versions earlier than VVR 8.0.6 can read from or write to OSS only within the same account. To read from or write to OSS in other accounts, you must use a Flink compute engine of VVR 8.0.6 or later and configure bucket authentication information. For more information, see Configure bucket authentication information.
Sink tables only
When writing to OSS, row-based formats such as Avro, CSV, JSON, and Raw are not supported. For more information, see FLINK-30635.
Syntax
CREATE TABLE OssTable (
column_name1 INT,
column_name2 STRING,
...
datetime STRING,
`hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = '...'
);
Metadata columns
You can read metadata columns in a source table to obtain metadata about the OSS data. For example, if you define a metadata column file.path in an OSS source table, the value of this column is the path of the file that contains the row data. The following example shows how to use metadata columns.
CREATE TABLE MyUserTableWithFilepath (
column_name1 INT,
column_name2 STRING,
`file.path` STRING NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'json'
);
The following table lists the metadata columns that OSS source tables support.
| Key | Data type | Description |
| --- | --- | --- |
| file.path | STRING NOT NULL | The path of the file where the row data is located. |
| file.name | STRING NOT NULL | The name of the file where the row data is located, that is, the element farthest from the file's root path. |
| file.size | BIGINT NOT NULL | The size, in bytes, of the file where the row data is located. |
| file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | The modification time of the file where the row data is located. |
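The following sketch declares all of the metadata columns listed above in a single source table and then aggregates rows by file, which can be useful for checking which files the data comes from. The table name, physical columns, and bucket path are placeholders for illustration.
CREATE TEMPORARY TABLE oss_source_with_metadata (
  id INT,
  content STRING,
  `file.path` STRING NOT NULL METADATA,
  `file.name` STRING NOT NULL METADATA,
  `file.size` BIGINT NOT NULL METADATA,
  `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
);

-- Count the rows that were read from each file.
SELECT `file.name`, `file.size`, COUNT(*) AS row_cnt
FROM oss_source_with_metadata
GROUP BY `file.name`, `file.size`;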
WITH parameters
General
| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The table type. | String | Yes | None | The value must be filesystem. |
| path | The file system path. | String | Yes | None | The path must be in URI format, for example, oss://my_bucket/my_path. Note: For VVR 8.0.6 and later, after you set this parameter, you must also configure bucket authentication information to read data from and write data to the specified file system path. For more information, see Configure bucket authentication information. |
| format | The file format. | String | Yes | None | Valid values: csv, json, avro, parquet, orc, and raw. |
Source tables only
| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| source.monitor-interval | The interval at which the source monitors the path for new files. The value must be greater than 0. | Duration | No | None | If this parameter is not set, the provided path is scanned only once and the source is bounded. Each file is uniquely identified by its path; when a new file is discovered, it is processed once. Processed files are stored in state throughout the lifecycle of the source, so the source state is saved during checkpoints and savepoints. A shorter interval allows new files to be discovered faster but also results in more frequent traversal of the file system or OSS. |
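For example, the following sketch makes the source unbounded by setting source.monitor-interval. The table name and path are placeholders, and the 60-second interval is only illustrative.
CREATE TEMPORARY TABLE oss_streaming_source (
  id INT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'csv',
  -- Scan the path for new files every 60 seconds. Without this option, the path is scanned once and the source is bounded.
  'source.monitor-interval' = '60s'
);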
Sink tables only
| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| partition.default-name | The partition name that is used when a partition field value is NULL or an empty string. | String | No | __DEFAULT_PARTITION__ | None. |
| sink.rolling-policy.file-size | The maximum size that a part file can reach before it is rolled. | MemorySize | No | 128 MB | Data written to a partition directory is split into part files. Each subtask of the sink that receives data for a partition generates at least one part file for that partition. Based on the configurable rolling policy, the current in-progress part file is closed and a new one is created. The policy rolls part files based on file size and the maximum duration for which a file can remain open. Note: For column-store formats, a file is always rolled when a checkpoint is performed, even if it does not meet the configured rolling policy; that is, a file is rolled when it meets the rolling policy or when a checkpoint is performed. For row-store formats, a file is rolled only when it meets the configured rolling policy. |
| sink.rolling-policy.rollover-interval | The maximum duration that a part file can remain open before it is rolled. | Duration | No | 30min | The check frequency is controlled by the sink.rolling-policy.check-interval property. |
| sink.rolling-policy.check-interval | The interval at which the time-based rolling policy is checked. | Duration | No | 1min | This property controls the check that determines whether a file should be rolled based on the sink.rolling-policy.rollover-interval property. |
| auto-compaction | Specifies whether to enable automatic compaction in a streaming sink table. Data is first written to temporary files. After a checkpoint is completed, the temporary files generated for that checkpoint are merged. Temporary files are not visible before they are merged. | Boolean | No | false | If file compaction is enabled, multiple small files are merged into larger files based on the target file size. Note the following points when using file compaction in a production environment: Only files within a single checkpoint are merged, so at least as many files as the number of checkpoints are generated. Files are not visible before they are merged; the file visibility time equals the checkpoint interval plus the merge duration. A long merge duration can cause backpressure and extend the time required for a checkpoint. |
| compaction.file-size | The target file size for compaction. | MemorySize | No | 128 MB | The default value is the same as the rolling file size specified by sink.rolling-policy.file-size. |
| sink.partition-commit.trigger | The type of partition commit trigger. | String | No | process-time | For writing to partitioned tables, Flink provides two types of partition commit triggers. process-time: based on the partition creation time and the current system time; it requires neither a partition time extractor nor a watermark generator. A partition is committed as soon as the current system time exceeds the partition creation system time plus sink.partition-commit.delay. This trigger is more general but less precise; for example, data latency or failures can cause partitions to be committed too early. partition-time: based on the extracted partition time and requires watermark generation. The job must generate watermarks, and partitions must be divided by time, such as by hour or by day. A partition is committed as soon as the watermark exceeds the time extracted from the partition plus sink.partition-commit.delay. |
| sink.partition-commit.delay | The maximum delay before a partition is committed. The partition is not committed before this delay has passed. | Duration | No | 0s | If partitions are created by day, set this to 1 d. If partitions are created by hour, set this to 1 h. |
| sink.partition-commit.watermark-time-zone | The time zone used to parse a LONG watermark value into a TIMESTAMP value. The resulting TIMESTAMP of the watermark is compared with the partition time to determine whether the partition should be committed. | String | No | UTC | This parameter takes effect only when sink.partition-commit.trigger is set to partition-time. If it is not set correctly, for example, if a source rowtime is defined on a TIMESTAMP_LTZ column but this property is not set, you may not see partitions committed for several hours. The default value UTC means that the watermark is defined on a TIMESTAMP column or that no watermark is defined. If the watermark is defined on a TIMESTAMP_LTZ column, the watermark time zone must be the session time zone. The value can be a full time zone name such as 'America/Los_Angeles' or a custom time zone such as 'GMT-08:00'. |
| partition.time-extractor.kind | The time extractor that extracts time from partition fields. | String | No | default | Valid values: default: the default extractor, which can be configured with a timestamp pattern or formatter. custom: a custom extractor, for which you must specify an extractor class. |
| partition.time-extractor.class | The extractor class that implements the PartitionTimeExtractor interface. | String | No | None | None. |
| partition.time-extractor.timestamp-pattern | The pattern that the default extractor uses to construct a valid timestamp from partition fields. | String | No | None | By default, the timestamp is extracted from the first field based on the yyyy-MM-dd hh:mm:ss pattern. To extract a timestamp from a single partition field dt, configure $dt. To extract a timestamp from multiple partition fields, such as year, month, day, and hour, configure $year-$month-$day $hour:00:00. To extract a timestamp from the two partition fields dt and hour, configure $dt $hour:00:00. |
| partition.time-extractor.timestamp-formatter | The formatter that converts a partition timestamp string value to a timestamp. The partition timestamp string value is expressed by the partition.time-extractor.timestamp-pattern property. | String | No | yyyy-MM-dd HH:mm:ss | For example, if the partition timestamp is extracted from multiple partition fields, such as year, month, and day, you can set partition.time-extractor.timestamp-pattern to $year$month$day and partition.time-extractor.timestamp-formatter to yyyyMMdd. The default formatter is yyyy-MM-dd HH:mm:ss. The timestamp formatter is compatible with Java's DateTimeFormatter. |
| sink.partition-commit.policy.kind | The type of partition commit policy. | String | No | None | The partition commit policy notifies a downstream consumer that a partition has been fully written and is ready to be read. Valid values: success-file: adds a success file (named _SUCCESS by default) to the partition directory. custom: creates a commit policy from a specified class. You can specify multiple commit policies at the same time. |
| sink.partition-commit.policy.class | The partition commit policy class that implements the PartitionCommitPolicy interface. | String | No | None | This class can be used only with the custom commit policy. |
| sink.partition-commit.success-file.name | The name of the file written by the success-file partition commit policy. | String | No | _SUCCESS | None. |
| sink.parallelism | The parallelism for writing files to the external file system. | Integer | No | None | By default, the sink parallelism is the same as the parallelism of the upstream chained operator. If you configure a parallelism that differs from that of the upstream chained operator, the operator that writes files uses the specified sink parallelism. If file compaction is enabled, the compaction operator also uses the specified sink parallelism. Note: The value must be greater than 0; otherwise, an exception is thrown. |
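As a quick illustration of the rolling and compaction parameters above, the following sketch writes a non-partitioned Parquet table with a tuned rolling policy and automatic compaction enabled. The table name, path, sizes, and intervals are placeholders rather than recommended values.
CREATE TEMPORARY TABLE oss_compacted_sink (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  -- Roll the in-progress part file when it reaches 256 MB or has been open for 15 minutes.
  'sink.rolling-policy.file-size' = '256MB',
  'sink.rolling-policy.rollover-interval' = '15min',
  -- Merge the small files produced within each checkpoint into files of roughly 256 MB.
  'auto-compaction' = 'true',
  'compaction.file-size' = '256MB'
);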
Configure bucket authentication information
Bucket authentication information can be configured only for the real-time computing engine VVR 8.0.6 or later.
After you specify the file system path, you must also configure bucket authentication information to read data from and write data to that path. To do this, add the following code to the Other Configuration section on the Deployment Details tab of the development console.
fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx
The following table describes the parameters.
| Configuration item | Description |
| --- | --- |
| fs.oss.bucket.<bucketName>.accessKeyId | The AccessKey ID used to access the bucket named <bucketName>. |
| fs.oss.bucket.<bucketName>.accessKeySecret | The AccessKey secret used to access the bucket named <bucketName>. |
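To make the relationship between these keys and the table path concrete, the following sketch assumes a hypothetical bucket named my-bucket. The configuration lines, shown here as comments, go into the Other Configuration section, and the bucket name in the path must match the <bucketName> part of the configuration keys.
-- Other Configuration (deployment level):
--   fs.oss.bucket.my-bucket.accessKeyId: <yourAccessKeyId>
--   fs.oss.bucket.my-bucket.accessKeySecret: <yourAccessKeySecret>
CREATE TEMPORARY TABLE cross_account_source (
  id INT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  -- The bucket in the path corresponds to <bucketName> in the configuration keys above.
  'path' = 'oss://my-bucket/path',
  'format' = 'json'
);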
Write to OSS-HDFS
First, add the following configuration to the Other Configuration section on the Deployment Details tab of the development console:
fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx
The following table describes the parameters.
| Configuration item | Description |
| --- | --- |
| fs.oss.jindo.buckets | The name of the bucket in the OSS-HDFS service to which data is written. You can configure multiple bucket names, separated by semicolons. When Flink writes to an OSS path, the data is written to the OSS-HDFS service if the corresponding bucket is included in fs.oss.jindo.buckets. |
| fs.oss.jindo.accessKeyId | The AccessKey ID used to access the OSS-HDFS service. Use an existing AccessKey or create a new one. For more information, see Create an AccessKey. Note: To reduce the risk of an AccessKey secret leak, the AccessKey secret is displayed only when it is created and cannot be viewed later. Make sure to store it securely. |
| fs.oss.jindo.accessKeySecret | The AccessKey secret that is paired with the AccessKey ID above. |
In addition, you must configure the endpoint for OSS-HDFS. You can configure the OSS-HDFS endpoint in one of two ways:
Parameter configuration
In the Real-time Compute for Apache Flink development console, on the Deployment Details tab, add the following configuration to the Additional Configurations section of the Runtime Parameter Settings area:
fs.oss.jindo.endpoint: xxx
Path configuration
Configure the OSS-HDFS endpoint in the OSS path.
oss://<user-defined-oss-hdfs-bucket>.<oss-hdfs-endpoint>/<user-defined-dir>
user-defined-oss-hdfs-bucket: The name of the bucket.
oss-hdfs-endpoint: The endpoint of the OSS-HDFS service.
The fs.oss.jindo.buckets configuration item must include <user-defined-oss-hdfs-bucket>.<oss-hdfs-endpoint>.
For example, if the bucket name is jindo-test and the endpoint is cn-beijing.oss-dls.aliyuncs.com, the OSS path must be oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>, and the fs.oss.jindo.buckets configuration item must include jindo-test.cn-beijing.oss-dls.aliyuncs.com.
# OSS path
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>
# Other configuration
fs.oss.jindo.buckets: jindo-test,jindo-test.cn-beijing.oss-dls.aliyuncs.com
When you write to an external HDFS file system, that is, when the file path is of the form hdfs://**, you can also add the following configuration to specify or change the username used for access.
Add the following configuration to the Other Configuration section on the Deployment Details tab of the development console:
containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfs
Usage examples
Source table
CREATE TEMPORARY TABLE fs_table_source (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet'
);

CREATE TEMPORARY TABLE blackhole_sink (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT * FROM fs_table_source;
Sink table
Write to a partitioned table
CREATE TABLE datagen_source (
  user_id STRING,
  order_amount DOUBLE,
  ts BIGINT, -- Time in milliseconds
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define a watermark on the TIMESTAMP_LTZ column
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE fs_table_sink (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai', -- Assume that the user-configured time zone is 'Asia/Shanghai'
  'sink.partition-commit.policy.kind' = 'success-file'
);

-- Streaming SQL to insert into the file system table
INSERT INTO fs_table_sink
SELECT
  user_id,
  order_amount,
  DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
  DATE_FORMAT(ts_ltz, 'HH')
FROM datagen_source;
Write to a non-partitioned table
CREATE TABLE datagen_source (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE fs_table_sink (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet'
);

INSERT INTO fs_table_sink
SELECT * FROM datagen_source;
DataStream API
To read or write data using the DataStream API, you must use the corresponding DataStream connector to connect to Flink. For more information, see How to use DataStream connectors.
The following example shows how to use the DataStream API to write to OSS and OSS-HDFS:
String outputPath = "oss://<bucket>/path";
final StreamingFileSink<Row> sink =
    StreamingFileSink.forRowFormat(
            new Path(outputPath),
            (Encoder<Row>)
                (element, stream) -> {
                    // Write each Row as one line of text to the part file.
                    PrintStream out = new PrintStream(stream);
                    out.println(element.toString());
                })
        // Roll the in-progress part file on every checkpoint.
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();
outputStream.addSink(sink);
To write to OSS-HDFS, you must also configure the relevant OSS-HDFS parameters in the Other Configuration field. This field is in the Running Parameter Configuration area on the Deployment Details tab of the Real-time Compute for Apache Flink development console. For more information, see Write to OSS-HDFS.
References
For more information about the connectors that Flink supports, see Supported connectors.
For more information about the Tablestore (OTS) connector, see Tablestore (OTS) connector.
For more information about the Paimon connector for streaming data lakes, see Paimon connector for streaming data lakes.