全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor OSS

更新时间:Nov 10, 2025

Topik ini menjelaskan cara menggunakan konektor Object Storage Service (OSS).

Alibaba Cloud OSS adalah layanan penyimpanan objek yang aman dan hemat biaya dengan daya tahan data sebesar 99,9999999999% (dua belas angka sembilan) dan ketersediaan data sebesar 99,995%. OSS menyediakan beberapa kelas penyimpanan untuk membantu Anda mengelola dan mengurangi biaya penyimpanan. Tabel berikut menjelaskan kemampuan yang didukung oleh konektor OSS.

Item

Deskripsi

Tipe tabel

Sumber dan sink.

Mode operasi

Mode batch dan mode streaming.

Format data

ORC, PARQUET, AVRO, CSV, JSON, dan RAW.

Metrik

Tidak ada.

Tipe API

DataStream API dan SQL API.

Pembaruan atau penghapusan data dalam tabel sink

Data dalam tabel sink tidak dapat diperbarui atau dihapus. Data hanya dapat dimasukkan ke dalam tabel sink.

Batasan

  • Umum

    • Hanya Ververica Runtime (VVR) 11 atau versi yang lebih baru yang mendukung pembacaan data dari file terkompresi (GZIP, BZIP2, XZ, DEFLATE) di OSS.

    • Hanya VVR 8.0.6 atau versi yang lebih lama yang dapat membaca atau menulis data hanya ke bucket OSS dalam Akun Alibaba Cloud Anda.

      Catatan

      Jika Anda ingin membaca data dari dan menulis data ke Bucket OSS dalam akun Alibaba Cloud lainnya, gunakan Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau lebih baru serta konfigurasikan informasi autentikasi Bucket OSS. Untuk informasi lebih lanjut, lihat bagian Konfigurasi Informasi Autentikasi Bucket OSS dari topik ini.

  • Khusus sink

    • Data dalam format penyimpanan berorientasi baris seperti AVRO, CSV, JSON, dan RAW tidak dapat ditulis ke OSS. Untuk informasi lebih lanjut, lihat FLINK-30635.

Sintaks

CREATE TABLE OssTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  datetime STRING,
  `hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = '...'
);

Kolom metadata

Anda dapat menentukan kolom metadata dalam tabel sumber OSS untuk memperoleh metadata terkait OSS. Misalnya, jika Anda menentukan kolom metadata bernama file.path dalam tabel sumber OSS, nilai kolom tersebut adalah path file tempat setiap baris data disimpan. Contoh kode:

CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
)

Tabel berikut menjelaskan kolom metadata yang didukung oleh tabel sumber OSS.

Kunci

Tipe data

Deskripsi

file.path

STRING NOT NULL

Jalur file tempat setiap baris data disimpan.

file.name

STRING NOT NULL

Nama file tempat setiap baris data disimpan. Nama file adalah elemen terjauh dari jalur root file.

file.size

BIGINT NOT NULL

Jumlah byte dalam file tempat setiap baris data disimpan.

file.modification-time

TIMESTAMP_LTZ(3) NOT NULL

Waktu ketika file tempat setiap baris data disimpan dimodifikasi.

Opsi konektor

  • Umum

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    connector

    Tipe tabel.

    String

    Ya

    Tidak ada nilai default

    Atur nilainya menjadi filesystem.

    path

    Jalur sistem file.

    String

    Ya

    Tidak ada nilai default

    Path dalam format Uniform Resource Identifier (URI). Contoh: oss://my_bucket/my_path.

    Catatan

    Setelah Anda mengonfigurasi opsi ini di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau versi yang lebih baru, Anda harus mengonfigurasi informasi autentikasi bucket OSS untuk membaca atau menulis data ke path sistem file yang ditentukan. Untuk informasi selengkapnya, lihat bagian Konfigurasi informasi autentikasi bucket OSS dalam topik ini.

    format

    Format file.

    String

    Ya

    Tidak ada nilai default

    Nilai valid:

    • csv

    • json

    • avro

    • parquet

    • orc

    • raw

  • Khusus sumber

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    source.monitor-interval

    Interval pemantauan pembuatan file baru oleh tabel sumber. Nilai opsi ini harus lebih besar dari 0.

    Durasi

    Tidak

    Tidak ada nilai default

    Jika Anda tidak mengonfigurasi opsi ini, path yang ditentukan hanya dipindai sekali. Dalam hal ini, sumber data bersifat terbatas.

    Setiap file diidentifikasi secara unik oleh jalur file. Setiap kali file baru terdeteksi, file tersebut diproses.

    File yang telah diproses disimpan dalam status selama siklus hidup sumber data. Oleh karena itu, status sumber data disimpan saat checkpoint dan titik simpan dibuat. Jika Anda mengatur opsi ini ke nilai kecil, file baru dapat terdeteksi dengan cepat tetapi sistem file atau OSS sering dilalui.

  • Khusus sink

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    partition.default-name

    Nama partisi ketika nilai bidang partisi adalah null atau string kosong.

    String

    Tidak

    _DEFAULT_PARTITION__

    Tidak tersedia

    sink.rolling-policy.file-size

    Ukuran maksimum file sebelum file digulung.

    MemorySize

    Tidak

    128 MB

    Data yang ditulis ke direktori yang ditentukan dibagi dan disimpan dalam file PART. Setiap sub-tugas di mana operator sink sebuah partisi menerima data partisi menghasilkan setidaknya satu file PART. File digulung berdasarkan kebijakan gulungan yang dikonfigurasi. Jika file PART saat ini yang berada dalam keadaan in-progress akan ditutup, file PART baru dihasilkan. File PART digulung berdasarkan ukuran file dan periode maksimum waktu file yang ditentukan dapat dibuka.

    Catatan

    Jika penyimpanan berorientasi kolom digunakan untuk file,

    file digulung ketika checkpointing dilakukan meskipun file tidak memenuhi persyaratan kebijakan gulungan.

    Dalam hal ini, jika file memenuhi persyaratan kebijakan gulungan atau jika checkpointing dilakukan, file selalu digulung.

    Jika penyimpanan berorientasi baris digunakan untuk file, file hanya digulung ketika file memenuhi persyaratan kebijakan gulungan.

    sink.rolling-policy.rollover-interval

    Periode maksimum waktu file PART dibuka sebelum file digulung.

    Durasi

    Tidak

    30 menit

    Frekuensi pemeriksaan ditentukan oleh opsi sink.rolling-policy.check-interval.

    sink.rolling-policy.check-interval

    Interval waktu Realtime Compute for Apache Flink memeriksa apakah file perlu digulung berdasarkan kebijakan gulungan.

    Durasi

    Tidak

    1 menit

    Opsi ini menentukan apakah file yang ditentukan perlu digulung berdasarkan nilai opsi sink.rolling-policy.rollover-interval.

    auto-compaction

    Menentukan apakah fitur penggabungan file otomatis diaktifkan untuk tabel sink streaming. Jika fitur ini diaktifkan, data pertama-tama ditulis ke file sementara. Setelah operasi checkpointing selesai, file sementara yang dihasilkan selama checkpointing digabungkan. File sementara tidak terlihat sebelum file tersebut digabungkan.

    Boolean

    Tidak

    false

    Jika fitur penggabungan file otomatis diaktifkan, beberapa file kecil digabungkan menjadi file besar berdasarkan ukuran file tujuan yang ditentukan. Saat menggunakan fitur ini di lingkungan produksi, perhatikan poin-poin berikut:

    • Hanya file yang dihasilkan selama checkpointing yang dapat digabungkan. Jumlah file yang dihasilkan lebih besar dari atau sama dengan jumlah checkpoint.

    • File sementara tidak terlihat sebelum file tersebut digabungkan. File menjadi terlihat setelah interval checkpointing dan durasi penggabungan berlalu.

    • Jika durasi penggabungan terlalu lama, tekanan balik mungkin terjadi. Ini memperpanjang waktu yang diperlukan untuk checkpointing.

    compaction.file-size

    Ukuran file tujuan ke dalam file sementara digabungkan.

    MemorySize

    Tidak

    128 MB

    Nilai default sama dengan nilai opsi sink.rolling-policy.file-size.

    sink.partition-commit.trigger

    Tipe pemicu komitmen partisi.

    String

    Tidak

    process-time

    Realtime Compute for Apache Flink menyediakan tipe pemicu komitmen partisi berikut yang dapat digunakan untuk menulis data ke tabel partisi:

    • process-time: Pemicu komitmen partisi ini digunakan berdasarkan waktu pembuatan partisi dan waktu sistem saat ini serta tidak memerlukan ekstraktor waktu partisi atau generator watermark. Jika waktu sistem saat ini melebihi jumlah waktu pembuatan partisi dan waktu yang ditentukan oleh opsi sink.partition-commit.delay, pemicu process-time segera melakukan komitmen partisi. Pemicu ini cocok untuk keperluan umum tetapi tidak menjamin akurasi. Misalnya, jika terjadi penundaan data atau kegagalan, pemicu ini mungkin melakukan komitmen partisi lebih awal dari yang diharapkan.

    • partition-time: Pemicu komitmen partisi ini digunakan berdasarkan waktu pembuatan partisi yang diekstraksi dan memerlukan generator watermark. Jika Anda ingin menggunakan pemicu ini untuk penerapan, pastikan penerapan tersebut mendukung pembuatan watermark dan data dipartisi berdasarkan waktu seperti jam atau hari. Jika waktu yang ditentukan oleh watermark melebihi jumlah waktu pembuatan partisi dan waktu yang ditentukan oleh opsi sink.partition-commit.delay, pemicu partition-time segera melakukan komitmen partisi.

    sink.partition-commit.delay

    Penundaan maksimum yang diizinkan sebelum partisi dapat dikomit. Partisi tidak dikomit sampai waktu penundaan tercapai.

    Durasi

    Tidak

    0 detik

    • Jika data dipartisi berdasarkan hari, Anda perlu mengatur opsi ini menjadi 1 d.

    • Jika data dipartisi berdasarkan jam, Anda perlu mengatur opsi ini menjadi 1 h.

    sink.partition-commit.watermark-time-zone

    Zona waktu yang digunakan ketika watermark tipe LONG dikonversi menjadi watermark tipe TIMESTAMP. Setelah konversi, Realtime Compute for Apache Flink membandingkan watermark tipe TIMESTAMP dengan waktu pembuatan partisi untuk menentukan apakah partisi perlu dikomit.

    String

    Tidak

    UTC

    Opsi ini hanya berlaku ketika opsi sink.partition-commit.trigger diatur ke partition-time.

    • Konfigurasi opsi sink.partition-commit.watermark-time-zone berdampak pada komitmen partisi. Misalnya, jika Anda menentukan ROWTIME untuk kolom TIMESTAMP_LTZ dalam tabel sumber dan tidak mengonfigurasi opsi ini, partisi mungkin dikomitmen beberapa jam lebih lambat dari waktu yang diharapkan. Nilai default UTC menunjukkan bahwa watermark didefinisikan pada kolom TIMESTAMP atau tidak ada watermark yang didefinisikan.

    • Jika watermark didefinisikan pada kolom TIMESTAMP_LTZ, zona waktu watermark harus sesuai dengan zona waktu sesi. Nilai yang valid untuk opsi ini dapat berupa nama lengkap zona waktu seperti 'America/Los_Angeles' atau zona waktu kustom seperti 'GMT-08:00'.

    partition.time-extractor.kind

    Ekstraktor waktu yang mengekstrak waktu dari bidang partisi.

    String

    Tidak

    default

    Nilai valid:

    • default: Secara default, Anda dapat mengonfigurasi pola timestamp atau formatter. Ini adalah nilai default.

    • custom: Kelas ekstraktor harus ditentukan.

    partition.time-extractor.class

    Kelas ekstraktor yang mengimplementasikan antarmuka PartitionTimeExtractor.

    String

    Tidak

    Tidak ada nilai default

    Tidak tersedia

    partition.time-extractor.timestamp-pattern

    Metode konstruksi default yang memungkinkan Anda menggunakan bidang partisi untuk mendapatkan pola timestamp yang valid.

    String

    Tidak

    Tidak ada nilai default

    Secara default, bidang pertama diekstraksi dalam format yyyy-MM-dd hh:mm:ss.

    • Jika Anda ingin mengekstraksi timestamp partisi dari bidang partisi dt, Anda dapat mengatur opsi ini menjadi $dt.

    • Jika Anda ingin mengekstraksi timestamp partisi dari beberapa bidang partisi, seperti tahun, bulan, hari, dan jam, Anda dapat mengatur opsi ini menjadi $year-$month-$day $hour:00:00.

    • Jika Anda ingin mengekstraksi timestamp partisi dari bidang partisi dt dan hour, Anda dapat mengatur opsi ini menjadi $dt $hour:00:00.

    partition.time-extractor.timestamp-formatter

    Formatter yang digunakan untuk mengonversi nilai string timestamp partisi menjadi timestamp. Nilai string timestamp partisi ditentukan oleh opsi partition.time-extractor.timestamp-pattern.

    String

    Tidak

    yyyy-MM-dd HH:mm:ss

    Sebagai contoh, jika Anda ingin mengekstraksi timestamp partisi dari beberapa bidang partisi, seperti tahun, bulan, dan hari, Anda dapat mengatur opsi partition.time-extractor.timestamp-pattern menjadi $year$month$day dan opsi partition.time-extractor.timestamp-formatter menjadi yyyyMMdd. Nilai default opsi ini adalah yyyy-MM-dd HH:mm:ss. Formatter timestamp yang ditentukan oleh opsi ini kompatibel dengan DateTimeFormatter Java.

    sink.partition-commit.policy.kind

    Tipe kebijakan komitmen partisi.

    String

    Tidak

    Tidak ada nilai default

    Kebijakan komitmen partisi memungkinkan Realtime Compute for Apache Flink memberi tahu aplikasi hilir bahwa penulisan data ke partisi selesai dan data dapat dibaca dari partisi. Nilai valid:

    • success-file: File _success ditambahkan ke direktori yang ditentukan.

    • custom: Kebijakan komitmen dibuat berdasarkan kelas yang ditentukan. Anda dapat menentukan beberapa kebijakan komitmen secara bersamaan.

    sink.partition-commit.policy.class

    Kelas kebijakan komitmen partisi yang mengimplementasikan antarmuka PartitionCommitPolicy.

    String

    Tidak

    Tidak ada nilai default

    Kelas ini tersedia hanya ketika opsi sink.partition-commit.policy.kind diatur ke custom.

    sink.partition-commit.success-file.name

    Nama file yang digunakan jika opsi sink.partition-commit.policy.kind diatur ke success-file.

    String

    Tidak

    _SUCCESS

    Tidak tersedia

    sink.parallelism

    Tingkat paralelisme untuk menulis file ke sistem file.

    Integer

    Tidak

    Tidak ada nilai default

    Secara default, nilai opsi sink.parallelism sama dengan tingkat paralelisme operator rantai upstream. Jika nilai opsi sink.parallelism berbeda dari tingkat paralelisme operator rantai upstream, operator yang menulis file menggunakan nilai opsi ini. Jika penggabungan file diaktifkan, operator yang menggabungkan file juga menggunakan nilai opsi ini.

    Catatan

    Nilai harus lebih besar dari 0. Jika tidak, terjadi kesalahan.

Konfigurasi informasi autentikasi Bucket OSS

Catatan

Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau lebih baru yang memungkinkan Anda mengonfigurasi informasi autentikasi Bucket OSS.

Setelah menentukan jalur sistem file, Anda harus mengonfigurasi informasi autentikasi Bucket OSS agar dapat membaca data dari dan menulis data ke jalur tertentu dari sistem file. Untuk mengonfigurasi informasi autentikasi Bucket OSS, ikuti langkah-langkah berikut: Masuk ke konsol pengembangan Realtime Compute for Apache Flink. Pada tab Configuration halaman Deployments, klik Edit di sudut kanan atas bagian Parameters dan tambahkan konfigurasi berikut ke bidang Other Configuration:

fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx

Tabel berikut menjelaskan parameter dalam konfigurasi di atas.

Parameter

Deskripsi

fs.oss.bucket.<bucketName>.accessKeyId

Parameter:

  • <bucketName>: Ganti parameter ini dengan nama Bucket OSS yang Anda tentukan dalam URI sistem file tujuan.

  • accessKeyId: Masukkan ID AccessKey akun Alibaba Cloud Anda yang digunakan untuk mengakses Bucket OSS. Untuk informasi lebih lanjut tentang cara mendapatkan ID AccessKey akun Alibaba Cloud, lihat Lihat informasi tentang pasangan AccessKey pengguna RAM.

  • accessKeySecret: Masukkan Rahasia AccessKey akun Alibaba Cloud Anda yang digunakan untuk mengakses Bucket OSS. Untuk informasi lebih lanjut tentang cara mendapatkan Rahasia AccessKey akun Alibaba Cloud, lihat Lihat informasi tentang pasangan AccessKey pengguna RAM.

fs.oss.bucket.<bucketName>.accessKeySecret

Menulis data ke OSS-HDFS

Masuk ke Konsol Pengembangan Realtime Compute for Apache Flink. Di tab Configuration halaman Deployments, klik Edit di pojok kanan atas bagian Parameters dan tambahkan konfigurasi berikut ke kolom Other Configuration:

fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx

Tabel berikut menjelaskan parameter dalam konfigurasi di atas.

Parameter

Deskripsi

fs.oss.jindo.buckets

Nama bucket layanan OSS-HDFS tempat data ditulis. Anda dapat menentukan beberapa nama bucket. Pisahkan nama bucket dengan titik koma (;). Ketika Flink menulis data ke jalur OSS, data ditulis ke layanan OSS-HDFS jika nama bucket terkait termasuk dalam nilai parameter fs.oss.jindo.buckets.

fs.oss.jindo.accessKeyId

ID AccessKey akun Alibaba Cloud Anda. Untuk informasi lebih lanjut tentang cara mendapatkan Rahasia AccessKey akun Alibaba Cloud, lihat Lihat informasi tentang pasangan AccessKey pengguna RAM.

fs.oss.jindo.accessKeySecret

Rahasia AccessKey akun Alibaba Cloud Anda. Untuk informasi lebih lanjut tentang cara mendapatkan Rahasia AccessKey akun Alibaba Cloud, lihat Lihat informasi tentang pasangan AccessKey pengguna RAM.

Anda juga harus mengonfigurasi titik akhir layanan OSS-HDFS menggunakan salah satu metode berikut:

Konfigurasi parameter

Di tab Configuration halaman detail penerapan Anda, klik Edit di pojok kanan atas bagian Parameters dan tambahkan konfigurasi berikut ke kolom Other Configuration:

fs.oss.jindo.endpoint: xxx

Konfigurasi path

Konfigurasikan titik akhir layanan OSS-HDFS dalam path OSS.

oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>
  • user-defined-oss-hdfs-bucket: nama bucket Anda.

  • oss-hdfs-endpoint: titik akhir bucket OSS-HDFS Anda.

  • Catatan: fs.oss.jindo.buckets memerlukan <user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>.

Sebagai contoh, jika nama bucket adalah jindo-test dan titik akhir adalah cn-beijing.oss-dls.aliyuncs.com, path OSS harus berupa oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>, dan opsi fs.oss.jindo.buckets harus berisi jindo-test.cn-beijing.oss-dls.aliyuncs.com.

# URI OSS 
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>
# Konfigurasi lain 
fs.oss.jindo.buckets: jindo-test,jindo-test.cn-beijing.oss-dls.aliyuncs.com
Catatan

Saat menulis ke HDFS eksternal (path:hdfs://**), tambahkan konfigurasi berikut ke bagian Parameters di tab Configuration halaman detail penerapan Anda untuk menentukan atau mengubah pengguna.

containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfs

Contoh kode

  • Contoh Kode untuk Tabel Sumber

    CREATE TEMPORARY TABLE fs_table_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector'='filesystem',
      'path'='oss://<bucket>/path',
      'format'='parquet'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) with (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;
  • Kode contoh untuk tabel sink

    • Menulis Data ke Tabel Partisi

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE,
        ts BIGINT, -- Gunakan waktu dalam milidetik.
        ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
        WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Tentukan watermark dalam kolom TIMESTAMP_LTZ.
      ) WITH (
        'connector' = 'datagen'
      );
      
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE,
        dt STRING,
        `hour` STRING
      ) PARTITIONED BY (dt, `hour`) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet',
        'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
        'sink.partition-commit.delay'='1 h',
        'sink.partition-commit.trigger'='partition-time',
        'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Zona waktu adalah 'Asia/Shanghai'.
        'sink.partition-commit.policy.kind'='success-file'
      );
      
      
      -- Jalankan pernyataan SQL streaming berikut untuk menyisipkan data ke tabel sistem file.
      INSERT INTO fs_table_sink 
      SELECT 
        user_id, 
        order_amount, 
        DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
        DATE_FORMAT(ts_ltz, 'HH') 
      FROM datagen_source;
    • Menulis Data ke Tabel Non-Partisi

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet'
      );
      
      INSERT INTO fs_table_sink SELECT * FROM datagen_source;

DataStream API

Penting

Jika Anda ingin memanggil DataStream API untuk membaca atau menulis data, Anda harus menggunakan DataStream connector tipe terkait untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi lebih lanjut tentang cara mengonfigurasi DataStream connector, lihat Pengaturan DataStream Connector.

Contoh kode berikut menunjukkan cara menggunakan DataStream API untuk menulis data ke OSS atau OSS-HDFS.

String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
                StreamingFileSink.forRowFormat(
                                new Path(outputPath),
                                (Encoder<Row>)
                                        (element, stream) -> {
                                            out.println(element.toString());
                                        })
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

outputStream.addSink(sink);

Jika Anda ingin menulis data ke OSS-HDFS, jalankan kode di atas dan lakukan langkah-langkah berikut: Masuk ke development console of Realtime Compute for Apache Flink. Pada tab Configuration halaman Deployments, klik Edit di sudut kanan atas bagian Parameters dan tambahkan konfigurasi terkait OSS-HDFS ke bidang Other Configuration. Untuk informasi lebih lanjut, lihat bagian Menulis Data ke OSS-HDFS dari topik ini.

Referensi

  • Untuk informasi lebih lanjut tentang konektor yang didukung oleh Realtime Compute for Apache Flink, lihat Konektor yang Didukung.

  • Untuk informasi lebih lanjut tentang cara menggunakan konektor Tablestore, lihat Konektor Tablestore.

  • Untuk informasi lebih lanjut tentang cara menggunakan konektor Apache Paimon, lihat Konektor Apache Paimon.