Topik ini menjelaskan cara menggunakan konektor Object Storage Service (OSS).
Object Storage Service (OSS) Alibaba Cloud adalah layanan penyimpanan cloud yang aman, hemat biaya, dan sangat andal untuk menyimpan data dalam jumlah besar. Layanan ini menawarkan ketahanan data hingga 99,9999999999% (dua belas angka 9) dan ketersediaan data sebesar 99,995%. OSS menyediakan berbagai kelas penyimpanan untuk membantu Anda mengoptimalkan biaya penyimpanan.
Category | Details |
Jenis yang didukung | Tabel sumber dan tabel sink |
Execution modes | Batch and stream modes |
Format data | Orc, Parquet, Avro, Csv, JSON, dan Raw |
Metrik pemantauan spesifik | Tidak ada |
Jenis API | DataStream and SQL |
Supports updating or deleting data in sink tables | You can only insert data. You cannot update or delete data in sink tables. |
Batasan
General
Hanya Ververica Runtime (VVR) versi 11 ke atas yang mendukung pembacaan file terkompresi, seperti GZIP, BZIP2, XZ, dan DEFLATE, dari OSS. VVR 8 tidak dapat memproses file terkompresi dengan benar.
Versi VVR sebelum 8.0.6 hanya mendukung pembacaan dan penulisan data ke bucket OSS yang dimiliki oleh akun yang sama. Untuk mengakses data lintas akun, gunakan VVR 8.0.6 atau versi yang lebih baru serta konfigurasikan autentikasi bucket. Untuk informasi selengkapnya, lihat Configure bucket authentication.
Pembacaan inkremental partisi baru tidak didukung.
For sink tables only
Saat menulis data ke OSS, format row-store seperti Avro, CSV, JSON, dan Raw tidak didukung. Untuk informasi selengkapnya, lihat FLINK-30635.
Sintaks
CREATE TABLE OssTable (
column_name1 INT,
column_name2 STRING,
...
datetime STRING,
`hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = '...'
);Kolom metadata
Anda dapat membaca kolom metadata pada tabel sumber untuk mengambil metadata tentang data OSS. Misalnya, jika Anda mendefinisikan kolom metadata file.path pada tabel sumber OSS, nilai kolom tersebut adalah path file yang berisi baris tersebut. Contoh berikut menunjukkan cara menggunakan kolom metadata.
CREATE TABLE MyUserTableWithFilepath (
column_name1 INT,
column_name2 STRING,
`file.path` STRING NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'json'
)Tabel berikut mencantumkan kolom metadata yang didukung oleh tabel sumber OSS.
Kunci | Tipe data | Deskripsi |
file.path | STRING NOT NULL | The path of the file that contains the row. |
file.name | STRING NOT NULL | The name of the file that contains the row. This is the last element of the file path. |
file.size | BIGINT NOT NULL | The size of the file that contains the row, in bytes. |
file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | The modification time of the file that contains the row. |
Parameter WITH
Umum
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Notes
connector
The type of table.
String
Ya
None
Nilainya harus
filesystem.path
Path sistem file.
String
Ya
None
The path must be in a URI format, such as
oss://my_bucket/my_path.CatatanFor VVR 8.0.6 and later, after you set this parameter, you must also configure bucket authentication to read data from and write data to the specified file system path. For more information, see Configure bucket authentication.
format
Format file.
String
Ya
None
Nilai valid:
csv
json
avro
parquet
orc
raw
Specific to source tables
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Notes
source.monitor-interval
The interval to monitor for new files. The value must be greater than 0.
Durasi
Tidak
None
If you do not set this parameter, the specified path is scanned only once, and the source is bounded.
Each file is identified by its path and processed once.
The processed files are stored in the state for the lifecycle of the source. The state is saved during checkpoints and savepoints. A shorter interval allows for faster discovery of new files but results in more frequent scans of the file system or object store.
Specific to sink tables
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Notes
partition.default-name
The name for a partition when the partition field is NULL or an empty string.
String
Tidak
_DEFAULT_PARTITION__
None.
sink.rolling-policy.file-size
Ukuran maksimum file sebelum digulung.
MemorySize
Tidak
128 MB
Data written to a directory is split into part files. Each subtask of the sink that receives data for a partition creates at least one part file for that partition. Based on the rolling policy, the current in-progress part file is closed, and a new one is created. The policy rolls part files based on their size and the maximum time they can remain open.
CatatanFor column-store formats:
A file is always rolled during a checkpoint, even if it does not meet the rolling policy criteria.
A file is rolled when it meets the policy criteria or when a checkpoint occurs.
For row-store formats, a file is rolled only when it meets the rolling policy criteria.
sink.rolling-policy.rollover-interval
The maximum time a part file can remain open before being rolled.
Durasi
Tidak
30min
Frekuensi pemeriksaan dikendalikan oleh properti sink.rolling-policy.check-interval.
sink.rolling-policy.check-interval
The interval to check for the time-based rolling policy.
Durasi
Tidak
1min
This property controls how often to check whether a file should be rolled based on the sink.rolling-policy.rollover-interval property.
auto-compaction
Whether to enable automatic compaction for a streaming sink table. Data is first written to temporary files. After a checkpoint is complete, the temporary files from that checkpoint are merged. Temporary files are not visible before they are merged.
Boolean
Tidak
false
If you enable file compaction, small files are merged into larger files based on the target file size. Note the following when you use file compaction in a production environment:
Only files within a checkpoint are merged. At least one file is generated for each checkpoint.
Files are not visible before compaction. The data visibility latency is the
checkpoint interval + compaction duration.Long compaction times can cause backpressure and extend the time required for checkpoints.
compaction.file-size
The target size for a compacted file.
MemorySize
Tidak
128 MB
The default value is the same as the rolling file size specified by sink.rolling-policy.file-size.
sink.partition-commit.trigger
The type of trigger to commit a partition.
String
Tidak
process-time
Untuk menulis ke tabel partisi, Flink menyediakan dua jenis pemicu commit partisi:
process-time: The partition commit trigger is based on the partition creation time and the current system time. It does not require a partition time extractor or a watermark generator. A partition is committed immediately when the current system time exceeds the sum of the partition creation system time and the value of sink.partition-commit.delay. This trigger is more general but less precise. For example, data latency or failures can lead to premature partition commits.
partition-time: This trigger is based on the extracted partition time and requires watermark generation. The job must support watermark generation, and partitions are created based on time, such as hourly or daily. A partition is committed immediately when the watermark exceeds the sum of the partition creation system time and the value of sink.partition-commit.delay.
sink.partition-commit.delay
The maximum delay before a partition is committed. This means a partition will not be committed before this delay has passed.
Durasi
Tidak
0s
If partitions are created daily, you can set this to
1 d.If partitions are created hourly, set this to
1 h.
sink.partition-commit.watermark-time-zone
The time zone used to parse a LONG watermark into a TIMESTAMP. The resulting TIMESTAMP is compared with the partition time to determine whether the partition should be committed.
String
Tidak
UTC
This parameter is valid only when sink.partition-commit.trigger is set to `partition-time`.
If this is not set correctly, for example, if the source rowtime is defined on a TIMESTAMP_LTZ column and this property is not set, you might see the partition commit only after several hours. The default value is UTC, which means the watermark is defined on a TIMESTAMP column or no watermark is defined.
If the watermark is defined on a TIMESTAMP_LTZ column, the watermark time zone must be the session time zone. Valid values for this property are either a full time zone name (such as 'America/Los_Angeles') or a custom time zone (such as 'GMT-08:00').
partition.time-extractor.kind
The time extractor that extracts time from partition fields.
String
Tidak
default
Nilai valid:
default: Secara default, Anda dapat mengonfigurasi pola atau formatter timestamp.
custom: Anda harus menentukan kelas ekstraktor.
partition.time-extractor.class
Kelas ekstraktor yang mengimplementasikan antarmuka PartitionTimeExtractor.
String
Tidak
None
None.
partition.time-extractor.timestamp-pattern
Metode konstruksi default yang memungkinkan Anda menggunakan bidang partisi untuk memperoleh pola timestamp yang valid.
String
Tidak
None
By default, the first field is extracted using the
yyyy-MM-dd hh:mm:sspattern.To extract a timestamp from a partition field 'dt', you can configure it as: `$dt`.
To extract a timestamp from multiple partition fields, such as year, month, day, and hour, you can configure it as:
$year-$month-$day $hour:00:00.To extract a timestamp from two partition fields, dt and hour, you can configure it as:
$dt $hour:00:00.
partition.time-extractor.timestamp-formatter
Formatter yang mengonversi nilai string timestamp partisi menjadi timestamp. Nilai string timestamp partisi dinyatakan oleh properti partition.time-extractor.timestamp-pattern.
String
Tidak
yyyy-MM-dd HH:mm:ss
For example, if a partition timestamp is extracted from multiple partition fields, such as year, month, and day, you can set the partition.time-extractor.timestamp-pattern property to
$year$month$dayand the partition.time-extractor.timestamp-formatter property to `yyyyMMdd`. The default formatter isyyyy-MM-dd HH:mm:ss. This timestamp formatter is compatible with Java's DateTimeFormatter.sink.partition-commit.policy.kind
Jenis kebijakan commit partisi.
String
Tidak
None
A partition commit policy notifies downstream consumers that a partition has finished writing and is ready to be read. Valid values:
success-file: Adds a `_success` file to the directory.
custom: Membuat kebijakan commit menggunakan kelas tertentu. Anda dapat menentukan beberapa kebijakan commit sekaligus.
sink.partition-commit.policy.class
Kelas kebijakan komitmen partisi yang mengimplementasikan antarmuka PartitionCommitPolicy.
String
Tidak
None
This class can be used only with the `custom` commit policy.
sink.partition-commit.success-file.name
The name of the file to use with the `success-file` partition commit policy.
String
Tidak
_SUCCESS
None.
sink.parallelism
The parallelism for writing files to the external file system.
Integer
Tidak
None
By default, the sink parallelism is the same as the parallelism of the upstream chained operator. If you configure a different parallelism, the file writing operator uses the specified sink parallelism. If file compaction is enabled, the compaction operator also uses the specified sink parallelism.
CatatanNilai ini harus lebih besar dari 0. Jika tidak, exception akan dilemparkan.
Configure bucket authentication
Only VVR 8.0.6 and later versions support configuring bucket authentication.
Setelah menentukan path sistem file, Anda juga harus mengonfigurasi autentikasi bucket untuk membaca dan menulis data ke path yang ditentukan. Untuk mengonfigurasi autentikasi bucket, tambahkan kode berikut ke bagian Additional Configurations pada tab Parameters di halaman Deployment Details di real-time computing development console.
fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxxTabel berikut menjelaskan parameter-parameter tersebut.
Item konfigurasi | Deskripsi |
fs.oss.bucket.<bucketName>.accessKeyId | Deskripsi parameter:
|
fs.oss.bucket.<bucketName>.accessKeySecret |
Tulis ke OSS-HDFS
Pertama, tambahkan konfigurasi berikut ke bagian Additional Configurations pada tab Parameters di halaman Deployment Details di real-time computing development console.
fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxxTabel berikut menjelaskan parameter-parameter tersebut.
Configuration item | Deskripsi |
fs.oss.jindo.buckets | The names of the buckets in the OSS-HDFS service to write to. You can configure multiple buckets, separated by semicolons. When Flink writes to an OSS path, if the corresponding bucket is included in `fs.oss.jindo.buckets`, the data is written to the OSS-HDFS service. |
fs.oss.jindo.accessKeyId | Gunakan AccessKey yang sudah ada atau buat yang baru. Untuk informasi selengkapnya, lihat Buat AccessKey. Catatan To reduce the risk of an AccessKey secret leak, the AccessKey secret is shown only once when you create it and cannot be viewed later. Keep it secure. |
fs.oss.jindo.accessKeySecret |
Anda juga perlu mengonfigurasi endpoint OSS-HDFS. Endpoint OSS-HDFS dapat dikonfigurasi dengan dua cara:
Konfigurasi parameter
Anda dapat menambahkan konfigurasi berikut ke bagian Additional Configurations pada tab Parameters di halaman Deployment Details di real-time computing development console.
fs.oss.jindo.endpoint: xxxKonfigurasi path
Anda dapat mengonfigurasi endpoint OSS-HDFS langsung dalam path OSS.
oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>user-defined-oss-hdfs-bucket: Nama bucket.oss-hdfs-endpoint: The OSS-HDFS endpoint.Item konfigurasi
fs.oss.jindo.bucketsharus mencakup <user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>.
Misalnya, jika nama bucket adalah `jindo-test` dan endpoint-nya adalah `cn-beijing.oss-dls.aliyuncs.com`, maka path OSS harus berupa oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>, dan item konfigurasi fs.oss.jindo.buckets harus mencakup jindo-test.cn-beijing.oss-dls.aliyuncs.com.
# OSS path
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>
# Additional Configurations
fs.oss.jindo.buckets: jindo-test,jindo-test.cn-beijing.oss-dls.aliyuncs.comSaat menulis ke file HDFS eksternal dengan path hdfs://**, Anda juga dapat menambahkan konfigurasi berikut untuk menentukan atau mengganti username akses.
Tambahkan konfigurasi berikut ke bagian Additional Configurations pada tab Parameters di halaman Deployment Details di real-time computing development console.
containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfsExamples
Tabel sumber
CREATE TEMPORARY TABLE fs_table_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) with ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;Tabel sink
Menulis ke tabel partisi
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE, ts BIGINT, -- Time in milliseconds ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define a watermark on a TIMESTAMP_LTZ column ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet', 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume the user-configured time zone is 'Asia/Shanghai' 'sink.partition-commit.policy.kind'='success-file' ); -- Streaming SQL to insert into the file system table INSERT INTO fs_table_sink SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH') FROM datagen_source;Menulis ke tabel non-partisi
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); INSERT INTO fs_table_sink SELECT * FROM datagen_source;
DataStream API
Untuk membaca dan menulis data menggunakan DataStream API, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk informasi selengkapnya tentang pengaturan konektor DataStream, lihat Use a DataStream connector.
Kode berikut menunjukkan contoh penggunaan DataStream API untuk menulis ke OSS dan OSS-HDFS.
String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
StreamingFileSink.forRowFormat(
new Path(outputPath),
(Encoder<Row>)
(element, stream) -> {
out.println(element.toString());
})
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
outputStream.addSink(sink);Untuk menulis ke OSS-HDFS, Anda juga harus mengonfigurasi parameter OSS-HDFS terkait di bagian Additional Configurations pada tab Parameters di halaman Deployment Details di real-time computing development console. Untuk informasi selengkapnya, lihat Write to OSS-HDFS.
Referensi
Untuk informasi selengkapnya tentang konektor yang didukung Flink, lihat Konektor yang didukung.
For more information about how to use the Tablestore (OTS) connector, see Tablestore (OTS).
For more information about how to use the Paimon connector for streaming data lakehouses, see Paimon for streaming data lakehouses.