Realtime Compute for Apache Flink: Object Storage Service (OSS)

Last updated: Dec 27, 2025

This topic describes how to use the Object Storage Service (OSS) connector.

Alibaba Cloud Object Storage Service (OSS) is a secure, cost-effective, and highly reliable cloud storage service for storing large amounts of data. It offers data durability of up to 99.9999999999% (twelve 9's) and data availability of 99.995%. OSS provides multiple storage classes to help you optimize storage costs.

Supported types: Source tables and sink tables

Execution modes: Batch and streaming modes

Data formats: ORC, Parquet, Avro, CSV, JSON, and Raw

Connector-specific monitoring metrics: None

API types: DataStream and SQL

Support for updating or deleting data in sink tables: Not supported. You can only insert data; you cannot update or delete data in sink tables.

Limitations

  • General

    • Only Ververica Runtime (VVR) 11 or later supports reading compressed files, such as GZIP, BZIP2, XZ, and DEFLATE files, from OSS. VVR 8 cannot correctly process compressed files.

    • VVR versions earlier than 8.0.6 can read data from and write data to only OSS buckets that belong to the same account. To access data across accounts, use VVR 8.0.6 or later and configure bucket authentication. For more information, see Configure bucket authentication.

    • Incremental reading of new partitions is not supported.

  • For sink tables only

    When data is written to OSS, row-store formats such as Avro, CSV, JSON, and Raw are not supported. For more information, see FLINK-30635.

Syntax

CREATE TABLE OssTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  datetime STRING,
  `hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = '...'
);

Metadata columns

You can define metadata columns in a source table to read metadata about the OSS data. For example, if you define a metadata column named file.path in an OSS source table, the value of that column is the path of the file that contains the row. The following example shows how to use metadata columns.

CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
)

OSS source tables support the following metadata columns.

  • file.path (STRING NOT NULL): The path of the file that contains the row.

  • file.name (STRING NOT NULL): The name of the file that contains the row. This is the last element of the file path.

  • file.size (BIGINT NOT NULL): The size of the file that contains the row, in bytes.

  • file.modification-time (TIMESTAMP_LTZ(3) NOT NULL): The modification time of the file that contains the row.
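
For illustration, the following minimal sketch (the table name oss_source_with_metadata and the data columns are placeholders, not taken from this page) declares all four metadata columns in a single source table and reads them together with the data columns:

CREATE TEMPORARY TABLE oss_source_with_metadata (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA,
  `file.name` STRING NOT NULL METADATA,
  `file.size` BIGINT NOT NULL METADATA,
  `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
);

-- Query the metadata columns together with the regular columns.
SELECT `file.name`, `file.size`, column_name1, column_name2
FROM oss_source_with_metadata;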

WITH parameters

  • General

    connector
      Description: The type of the table.
      Data type: String
      Required: Yes
      Default value: None
      Notes: The value must be filesystem.

    path
      Description: The file system path.
      Data type: String
      Required: Yes
      Default value: None
      Notes: The path must be in URI format, such as oss://my_bucket/my_path.
        Note: In VVR 8.0.6 and later, after you set this parameter, you must also configure bucket authentication to read data from and write data to the specified file system path. For more information, see Configure bucket authentication.

    format
      Description: The file format.
      Data type: String
      Required: Yes
      Default value: None
      Notes: Valid values:
        • csv
        • json
        • avro
        • parquet
        • orc
        • raw

  • Specific to source tables

    source.monitor-interval
      Description: The interval at which the source monitors the path for new files. The value must be greater than 0.
      Data type: Duration
      Required: No
      Default value: None
      Notes: If you do not set this parameter, the specified path is scanned only once and the source is bounded. Each file is uniquely identified by its path and is processed once. The set of processed files is kept in state for the lifetime of the source, so this state is saved in checkpoints and savepoints. A shorter interval lets new files be discovered faster but causes the file system or object store to be scanned more frequently. For a usage sketch, see the examples after this section.

  • Specific to sink tables

    partition.default-name
      Description: The partition name to use when a partition field is NULL or an empty string.
      Data type: String
      Required: No
      Default value: __DEFAULT_PARTITION__
      Notes: None.

    sink.rolling-policy.file-size
      Description: The maximum size of a file before it is rolled.
      Data type: MemorySize
      Required: No
      Default value: 128 MB
      Notes: Data written to a directory is split into part files. Each subtask of the sink that receives data for a partition creates at least one part file for that partition. Based on the rolling policy, the current in-progress part file is closed and a new one is created. The policy rolls part files based on their size and the maximum time they can remain open.
        Note: For column-store formats, a file is rolled when it meets the rolling policy criteria or when a checkpoint occurs; a file is always rolled at a checkpoint, even if it does not meet the rolling policy criteria. For row-store formats, a file is rolled only when it meets the rolling policy criteria.

    sink.rolling-policy.rollover-interval
      Description: The maximum time a part file can remain open before it is rolled.
      Data type: Duration
      Required: No
      Default value: 30min
      Notes: The check frequency is controlled by the sink.rolling-policy.check-interval property.

    sink.rolling-policy.check-interval
      Description: The interval at which the time-based rolling policy is checked.
      Data type: Duration
      Required: No
      Default value: 1min
      Notes: This property controls how often to check whether a file should be rolled based on the sink.rolling-policy.rollover-interval property.

    auto-compaction
      Description: Specifies whether to enable automatic compaction for a streaming sink table. Data is first written to temporary files. After a checkpoint is complete, the temporary files generated by that checkpoint are merged. Temporary files are not visible before they are merged.
      Data type: Boolean
      Required: No
      Default value: false
      Notes: If you enable file compaction, small files are merged into larger files based on the target file size. Note the following when you use file compaction in a production environment:
        • Only files within a single checkpoint are merged. At least one file is generated for each checkpoint.
        • Files are not visible before compaction. The data visibility latency is the checkpoint interval plus the compaction time.
        • Long compaction times can cause backpressure and extend the time required for checkpoints.
      A configuration sketch is shown in the examples after this section.

    compaction.file-size
      Description: The target size of a compacted file.
      Data type: MemorySize
      Required: No
      Default value: 128 MB
      Notes: The default value is the same as the rolling file size specified by sink.rolling-policy.file-size.

    sink.partition-commit.trigger
      Description: The type of trigger used to commit a partition.
      Data type: String
      Required: No
      Default value: process-time
      Notes: For writes to partitioned tables, Flink provides two types of partition commit triggers:
        • process-time: The trigger is based on the partition creation time and the current system time. It requires neither a partition time extractor nor a watermark generator. A partition is committed as soon as the current system time exceeds the partition creation system time plus the value of sink.partition-commit.delay. This trigger is more general but less precise. For example, data latency or failures can lead to premature partition commits.
        • partition-time: The trigger is based on the extracted partition time and requires watermark generation. The job must support watermark generation, and partitions must be created based on time, such as hourly or daily partitions. A partition is committed as soon as the watermark exceeds the partition time extracted from the partition values plus the value of sink.partition-commit.delay.

    sink.partition-commit.delay
      Description: The delay before a partition is committed. A partition is not committed before this delay has passed.
      Data type: Duration
      Required: No
      Default value: 0s
      Notes:
        • If partitions are created daily, set this parameter to 1 d.
        • If partitions are created hourly, set this parameter to 1 h.

    sink.partition-commit.watermark-time-zone
      Description: The time zone used to parse a LONG watermark value into a TIMESTAMP. The resulting TIMESTAMP is compared with the partition time to determine whether the partition should be committed.
      Data type: String
      Required: No
      Default value: UTC
      Notes: This parameter takes effect only when sink.partition-commit.trigger is set to partition-time.
        • If this parameter is not set correctly, for example, if the source rowtime is defined on a TIMESTAMP_LTZ column but this parameter is not set, partitions might be committed only after several hours. The default value UTC means that the watermark is defined on a TIMESTAMP column or that no watermark is defined.
        • If the watermark is defined on a TIMESTAMP_LTZ column, this parameter must be set to the session time zone. Valid values are full time zone names, such as 'America/Los_Angeles', or custom time zone IDs, such as 'GMT-08:00'.

    partition.time-extractor.kind
      Description: The time extractor that extracts time from partition fields.
      Data type: String
      Required: No
      Default value: default
      Notes: Valid values:
        • default: By default, you can configure a timestamp pattern or a timestamp formatter.
        • custom: You must specify an extractor class.

    partition.time-extractor.class
      Description: The extractor class, which must implement the PartitionTimeExtractor interface.
      Data type: String
      Required: No
      Default value: None
      Notes: None.

    partition.time-extractor.timestamp-pattern
      Description: The pattern that the default extractor uses to build a valid timestamp from the partition fields.
      Data type: String
      Required: No
      Default value: None
      Notes: By default, the first field is extracted using the yyyy-MM-dd hh:mm:ss pattern.
        • To extract the timestamp from a single partition field dt, set this parameter to $dt.
        • To extract the timestamp from multiple partition fields, such as year, month, day, and hour, set this parameter to $year-$month-$day $hour:00:00.
        • To extract the timestamp from two partition fields, dt and hour, set this parameter to $dt $hour:00:00.

    partition.time-extractor.timestamp-formatter
      Description: The formatter that converts the partition timestamp string into a timestamp. The partition timestamp string is defined by the partition.time-extractor.timestamp-pattern property.
      Data type: String
      Required: No
      Default value: yyyy-MM-dd HH:mm:ss
      Notes: For example, if the partition timestamp is extracted from multiple partition fields, such as year, month, and day, you can set partition.time-extractor.timestamp-pattern to $year$month$day and partition.time-extractor.timestamp-formatter to yyyyMMdd. The default formatter is yyyy-MM-dd HH:mm:ss. The timestamp formatter is compatible with Java's DateTimeFormatter. For a usage sketch, see the examples after this section.

    sink.partition-commit.policy.kind
      Description: The type of partition commit policy.
      Data type: String
      Required: No
      Default value: None
      Notes: A partition commit policy notifies downstream consumers that a partition has finished writing and is ready to be read. Valid values:
        • success-file: Adds a success file (named by sink.partition-commit.success-file.name, _SUCCESS by default) to the directory.
        • custom: Creates a commit policy based on a specified class. You can specify multiple commit policies at the same time.

    sink.partition-commit.policy.class
      Description: The partition commit policy class, which must implement the PartitionCommitPolicy interface.
      Data type: String
      Required: No
      Default value: None
      Notes: This class can be used only with the custom commit policy.

    sink.partition-commit.success-file.name
      Description: The name of the file used by the success-file partition commit policy.
      Data type: String
      Required: No
      Default value: _SUCCESS
      Notes: None.

    sink.parallelism
      Description: The parallelism for writing files to the external file system.
      Data type: Integer
      Required: No
      Default value: None
      Notes: By default, the sink parallelism is the same as the parallelism of the upstream chained operator. If you configure a different parallelism, the file writing operator uses the specified parallelism. If file compaction is enabled, the compaction operator also uses the specified parallelism.
        Note: The value must be greater than 0. Otherwise, an exception is thrown.
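
The behavior described for source.monitor-interval is easiest to see in a table definition. The following is a minimal sketch; the table name and the 60-second interval are illustrative choices, not values from this page:

CREATE TEMPORARY TABLE oss_streaming_source (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  -- Rescan the path every 60 seconds; already processed files are tracked in state.
  'source.monitor-interval' = '60s'
);

Without the source.monitor-interval option, the same path would be scanned only once and the source would be bounded.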
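
The rolling-policy and compaction parameters are usually set together on a sink table. The following minimal sketch restates the default rolling values explicitly and enables compaction; the table name and the chosen values are illustrative:

CREATE TEMPORARY TABLE oss_compacted_sink (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  -- Roll the in-progress part file at 128 MB or after 30 minutes, checked every minute.
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '30min',
  'sink.rolling-policy.check-interval' = '1min',
  -- Merge the small files of each checkpoint into files of about 128 MB.
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB'
);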
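
The timestamp pattern and formatter parameters work as a pair. The following sketch assumes partition columns named `year`, `month`, and `day` purely for illustration and shows the combination mentioned in the notes above, where the partition time is assembled from three fields and parsed with a compact formatter:

CREATE TEMPORARY TABLE oss_daily_sink (
  user_id STRING,
  order_amount DOUBLE,
  `year` STRING,
  `month` STRING,
  `day` STRING
) PARTITIONED BY (`year`, `month`, `day`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  -- Assemble the partition timestamp string from the three partition fields ...
  'partition.time-extractor.timestamp-pattern' = '$year$month$day',
  -- ... and parse the assembled string with a matching formatter.
  'partition.time-extractor.timestamp-formatter' = 'yyyyMMdd',
  'sink.partition-commit.policy.kind' = 'success-file'
);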

Configure bucket authentication

Note

Only VVR 8.0.6 and later support configuring bucket authentication.

After you specify the file system path, you must also configure bucket authentication to read data from and write data to the specified path. To configure bucket authentication, add the following configuration to the Additional Configurations section of the Parameters tab on the Deployment Details page in the real-time computing development console.

fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx

The two configuration items are described as follows.

fs.oss.bucket.<bucketName>.accessKeyId and fs.oss.bucket.<bucketName>.accessKeySecret:

  The AccessKey ID and AccessKey secret used to access the bucket.

  • <bucketName>: Replace this with the name of the bucket that you specified in the file system path parameter.

  • Use an existing AccessKey pair or create a new one. For more information, see Create an AccessKey.

    Note

    To reduce the risk of an AccessKey secret leak, the AccessKey secret is displayed only once when you create it and cannot be viewed again later. Keep it secure.
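
For example, if the path parameter points to a bucket named my-test-bucket (a hypothetical name used only for illustration), the additional configuration would look like the following, with the placeholders replaced by the AccessKey pair of the account that owns the bucket:

fs.oss.bucket.my-test-bucket.accessKeyId: <yourAccessKeyId>
fs.oss.bucket.my-test-bucket.accessKeySecret: <yourAccessKeySecret>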

Write to OSS-HDFS

First, add the following configuration to the Additional Configurations section of the Parameters tab on the Deployment Details page in the real-time computing development console.

fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx

The configuration items are described as follows.

fs.oss.jindo.buckets:

  The names of the buckets in the OSS-HDFS service to write to. You can configure multiple buckets, separated by semicolons. When Flink writes to an OSS path, if the corresponding bucket is included in fs.oss.jindo.buckets, the data is written to the OSS-HDFS service.

fs.oss.jindo.accessKeyId and fs.oss.jindo.accessKeySecret:

  Use an existing AccessKey pair or create a new one. For more information, see Create an AccessKey.

  Note

  To reduce the risk of an AccessKey secret leak, the AccessKey secret is displayed only once when you create it and cannot be viewed again later. Keep it secure.

You must also configure the OSS-HDFS endpoint. You can configure the endpoint in either of the following ways:

Parameter configuration

Add the following configuration to the Additional Configurations section of the Parameters tab on the Deployment Details page in the real-time computing development console.

fs.oss.jindo.endpoint: xxx

Path configuration

You can configure the OSS-HDFS endpoint directly in the OSS path.

oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>

  • user-defined-oss-hdfs-bucket: The bucket name.

  • oss-hdfs-endpoint: The OSS-HDFS endpoint.

  • The fs.oss.jindo.buckets configuration item must include <user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>.

For example, if the bucket name is jindo-test and the endpoint is cn-beijing.oss-dls.aliyuncs.com, the OSS path must be oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>, and the fs.oss.jindo.buckets configuration item must include jindo-test.cn-beijing.oss-dls.aliyuncs.com.

# OSS path 
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>
# Additional Configurations 
fs.oss.jindo.buckets: jindo-test,jindo-test.cn-beijing.oss-dls.aliyuncs.com
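
As a hedged illustration of the path-based configuration, the following DDL (the table name and directory are made up for this example) writes Parquet files to the jindo-test bucket through OSS-HDFS by embedding the endpoint in the path:

CREATE TEMPORARY TABLE oss_hdfs_sink (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'filesystem',
  -- The endpoint is part of the path, so the bucket resolves to the OSS-HDFS service.
  'path' = 'oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>',
  'format' = 'parquet'
);
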
Note

When you write to an external HDFS file with an hdfs:// path, you can also add the following configuration to specify or override the access user name.

Add the following configuration to the Additional Configurations section of the Parameters tab on the Deployment Details page in the real-time computing development console.

containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfs

Examples

  • Source table

    CREATE TEMPORARY TABLE fs_table_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector'='filesystem',
      'path'='oss://<bucket>/path',
      'format'='parquet'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM fs_table_source;
  • Sink table

    • Write to a partitioned table

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE,
        ts BIGINT, -- Time in milliseconds
        ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
        WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define a watermark on a TIMESTAMP_LTZ column
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE,
        dt STRING,
        `hour` STRING
      ) PARTITIONED BY (dt, `hour`) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet',
        'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
        'sink.partition-commit.delay'='1 h',
        'sink.partition-commit.trigger'='partition-time',
        'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume the user-configured time zone is 'Asia/Shanghai'
        'sink.partition-commit.policy.kind'='success-file'
      );
      
      -- Streaming SQL to insert into the file system table
      INSERT INTO fs_table_sink 
      SELECT 
        user_id, 
        order_amount, 
        DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
        DATE_FORMAT(ts_ltz, 'HH') 
      FROM datagen_source;
    • Write to a non-partitioned table

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet'
      );
      
      INSERT INTO fs_table_sink SELECT * FROM datagen_source;

DataStream API

Important

To read and write data by using the DataStream API, you must use the corresponding DataStream connector to connect to Flink. For more information about how to configure a DataStream connector, see Use a DataStream connector.

The following sample code shows how to use the DataStream API to write data to OSS or OSS-HDFS.

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.types.Row;
import java.nio.charset.StandardCharsets;

String outputPath = "oss://<bucket>/path";
final StreamingFileSink<Row> sink =
        StreamingFileSink.forRowFormat(
                        new Path(outputPath),
                        // Encode each row as a UTF-8 text line and write it to the part file stream.
                        (Encoder<Row>)
                                (element, stream) -> {
                                    stream.write(element.toString().getBytes(StandardCharsets.UTF_8));
                                    stream.write('\n');
                                })
                        // Roll the in-progress part file on every checkpoint.
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

// outputStream is an existing DataStream<Row>.
outputStream.addSink(sink);

To write data to OSS-HDFS, you must also configure the related OSS-HDFS parameters in the Additional Configurations section of the Parameters tab on the Deployment Details page in the real-time computing development console. For more information, see Write to OSS-HDFS.

References