Topik ini menjelaskan cara menggunakan konektor Object Storage Service (OSS).
Alibaba Cloud OSS adalah layanan penyimpanan objek yang aman dan hemat biaya dengan daya tahan data sebesar 99,9999999999% (dua belas angka sembilan) dan ketersediaan data sebesar 99,995%. OSS menyediakan beberapa kelas penyimpanan untuk membantu Anda mengelola dan mengurangi biaya penyimpanan. Tabel berikut menjelaskan kemampuan yang didukung oleh konektor OSS.
Item | Deskripsi |
Tipe tabel | Sumber dan sink. |
Mode operasi | Mode batch dan mode streaming. |
Format data | ORC, PARQUET, AVRO, CSV, JSON, dan RAW. |
Metrik | Tidak ada. |
Tipe API | DataStream API dan SQL API. |
Pembaruan atau penghapusan data dalam tabel sink | Data dalam tabel sink tidak dapat diperbarui atau dihapus. Data hanya dapat dimasukkan ke dalam tabel sink. |
Batasan
Umum
Hanya Ververica Runtime (VVR) 11 atau versi yang lebih baru yang mendukung pembacaan data dari file terkompresi (GZIP, BZIP2, XZ, DEFLATE) di OSS.
Hanya VVR 8.0.6 atau versi yang lebih lama yang dapat membaca atau menulis data hanya ke bucket OSS dalam Akun Alibaba Cloud Anda.
CatatanJika Anda ingin membaca data dari dan menulis data ke Bucket OSS dalam akun Alibaba Cloud lainnya, gunakan Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau lebih baru serta konfigurasikan informasi autentikasi Bucket OSS. Untuk informasi lebih lanjut, lihat bagian Konfigurasi Informasi Autentikasi Bucket OSS dari topik ini.
Khusus sink
Data dalam format penyimpanan berorientasi baris seperti AVRO, CSV, JSON, dan RAW tidak dapat ditulis ke OSS. Untuk informasi lebih lanjut, lihat FLINK-30635.
Sintaks
CREATE TABLE OssTable (
column_name1 INT,
column_name2 STRING,
...
datetime STRING,
`hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = '...'
);Kolom metadata
Anda dapat menentukan kolom metadata dalam tabel sumber OSS untuk memperoleh metadata terkait OSS. Misalnya, jika Anda menentukan kolom metadata bernama file.path dalam tabel sumber OSS, nilai kolom tersebut adalah path file tempat setiap baris data disimpan. Contoh kode:
CREATE TABLE MyUserTableWithFilepath (
column_name1 INT,
column_name2 STRING,
`file.path` STRING NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'json'
)Tabel berikut menjelaskan kolom metadata yang didukung oleh tabel sumber OSS.
Kunci | Tipe data | Deskripsi |
file.path | STRING NOT NULL | Jalur file tempat setiap baris data disimpan. |
file.name | STRING NOT NULL | Nama file tempat setiap baris data disimpan. Nama file adalah elemen terjauh dari jalur root file. |
file.size | BIGINT NOT NULL | Jumlah byte dalam file tempat setiap baris data disimpan. |
file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | Waktu ketika file tempat setiap baris data disimpan dimodifikasi. |
Opsi konektor
Umum
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
connector
Tipe tabel.
String
Ya
Tidak ada nilai default
Atur nilainya menjadi
filesystem.path
Jalur sistem file.
String
Ya
Tidak ada nilai default
Path dalam format Uniform Resource Identifier (URI). Contoh:
oss://my_bucket/my_path.CatatanSetelah Anda mengonfigurasi opsi ini di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau versi yang lebih baru, Anda harus mengonfigurasi informasi autentikasi bucket OSS untuk membaca atau menulis data ke path sistem file yang ditentukan. Untuk informasi selengkapnya, lihat bagian Konfigurasi informasi autentikasi bucket OSS dalam topik ini.
format
Format file.
String
Ya
Tidak ada nilai default
Nilai valid:
csv
json
avro
parquet
orc
raw
Khusus sumber
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
source.monitor-interval
Interval pemantauan pembuatan file baru oleh tabel sumber. Nilai opsi ini harus lebih besar dari 0.
Durasi
Tidak
Tidak ada nilai default
Jika Anda tidak mengonfigurasi opsi ini, path yang ditentukan hanya dipindai sekali. Dalam hal ini, sumber data bersifat terbatas.
Setiap file diidentifikasi secara unik oleh jalur file. Setiap kali file baru terdeteksi, file tersebut diproses.
File yang telah diproses disimpan dalam status selama siklus hidup sumber data. Oleh karena itu, status sumber data disimpan saat checkpoint dan titik simpan dibuat. Jika Anda mengatur opsi ini ke nilai kecil, file baru dapat terdeteksi dengan cepat tetapi sistem file atau OSS sering dilalui.
Khusus sink
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
partition.default-name
Nama partisi ketika nilai bidang partisi adalah null atau string kosong.
String
Tidak
_DEFAULT_PARTITION__
Tidak tersedia
sink.rolling-policy.file-size
Ukuran maksimum file sebelum file digulung.
MemorySize
Tidak
128 MB
Data yang ditulis ke direktori yang ditentukan dibagi dan disimpan dalam file PART. Setiap sub-tugas di mana operator sink sebuah partisi menerima data partisi menghasilkan setidaknya satu file PART. File digulung berdasarkan kebijakan gulungan yang dikonfigurasi. Jika file PART saat ini yang berada dalam keadaan in-progress akan ditutup, file PART baru dihasilkan. File PART digulung berdasarkan ukuran file dan periode maksimum waktu file yang ditentukan dapat dibuka.
CatatanJika penyimpanan berorientasi kolom digunakan untuk file,
file digulung ketika checkpointing dilakukan meskipun file tidak memenuhi persyaratan kebijakan gulungan.
Dalam hal ini, jika file memenuhi persyaratan kebijakan gulungan atau jika checkpointing dilakukan, file selalu digulung.
Jika penyimpanan berorientasi baris digunakan untuk file, file hanya digulung ketika file memenuhi persyaratan kebijakan gulungan.
sink.rolling-policy.rollover-interval
Periode maksimum waktu file PART dibuka sebelum file digulung.
Durasi
Tidak
30 menit
Frekuensi pemeriksaan ditentukan oleh opsi sink.rolling-policy.check-interval.
sink.rolling-policy.check-interval
Interval waktu Realtime Compute for Apache Flink memeriksa apakah file perlu digulung berdasarkan kebijakan gulungan.
Durasi
Tidak
1 menit
Opsi ini menentukan apakah file yang ditentukan perlu digulung berdasarkan nilai opsi sink.rolling-policy.rollover-interval.
auto-compaction
Menentukan apakah fitur penggabungan file otomatis diaktifkan untuk tabel sink streaming. Jika fitur ini diaktifkan, data pertama-tama ditulis ke file sementara. Setelah operasi checkpointing selesai, file sementara yang dihasilkan selama checkpointing digabungkan. File sementara tidak terlihat sebelum file tersebut digabungkan.
Boolean
Tidak
false
Jika fitur penggabungan file otomatis diaktifkan, beberapa file kecil digabungkan menjadi file besar berdasarkan ukuran file tujuan yang ditentukan. Saat menggunakan fitur ini di lingkungan produksi, perhatikan poin-poin berikut:
Hanya file yang dihasilkan selama checkpointing yang dapat digabungkan. Jumlah file yang dihasilkan lebih besar dari atau sama dengan jumlah checkpoint.
File sementara tidak terlihat sebelum file tersebut digabungkan. File menjadi terlihat setelah
interval checkpointing dan durasi penggabunganberlalu.Jika durasi penggabungan terlalu lama, tekanan balik mungkin terjadi. Ini memperpanjang waktu yang diperlukan untuk checkpointing.
compaction.file-size
Ukuran file tujuan ke dalam file sementara digabungkan.
MemorySize
Tidak
128 MB
Nilai default sama dengan nilai opsi sink.rolling-policy.file-size.
sink.partition-commit.trigger
Tipe pemicu komitmen partisi.
String
Tidak
process-time
Realtime Compute for Apache Flink menyediakan tipe pemicu komitmen partisi berikut yang dapat digunakan untuk menulis data ke tabel partisi:
process-time: Pemicu komitmen partisi ini digunakan berdasarkan waktu pembuatan partisi dan waktu sistem saat ini serta tidak memerlukan ekstraktor waktu partisi atau generator watermark. Jika waktu sistem saat ini melebihi jumlah waktu pembuatan partisi dan waktu yang ditentukan oleh opsi sink.partition-commit.delay, pemicu process-time segera melakukan komitmen partisi. Pemicu ini cocok untuk keperluan umum tetapi tidak menjamin akurasi. Misalnya, jika terjadi penundaan data atau kegagalan, pemicu ini mungkin melakukan komitmen partisi lebih awal dari yang diharapkan.
partition-time: Pemicu komitmen partisi ini digunakan berdasarkan waktu pembuatan partisi yang diekstraksi dan memerlukan generator watermark. Jika Anda ingin menggunakan pemicu ini untuk penerapan, pastikan penerapan tersebut mendukung pembuatan watermark dan data dipartisi berdasarkan waktu seperti jam atau hari. Jika waktu yang ditentukan oleh watermark melebihi jumlah waktu pembuatan partisi dan waktu yang ditentukan oleh opsi sink.partition-commit.delay, pemicu partition-time segera melakukan komitmen partisi.
sink.partition-commit.delay
Penundaan maksimum yang diizinkan sebelum partisi dapat dikomit. Partisi tidak dikomit sampai waktu penundaan tercapai.
Durasi
Tidak
0 detik
Jika data dipartisi berdasarkan hari, Anda perlu mengatur opsi ini menjadi
1 d.Jika data dipartisi berdasarkan jam, Anda perlu mengatur opsi ini menjadi
1 h.
sink.partition-commit.watermark-time-zone
Zona waktu yang digunakan ketika watermark tipe LONG dikonversi menjadi watermark tipe TIMESTAMP. Setelah konversi, Realtime Compute for Apache Flink membandingkan watermark tipe TIMESTAMP dengan waktu pembuatan partisi untuk menentukan apakah partisi perlu dikomit.
String
Tidak
UTC
Opsi ini hanya berlaku ketika opsi sink.partition-commit.trigger diatur ke partition-time.
Konfigurasi opsi sink.partition-commit.watermark-time-zone berdampak pada komitmen partisi. Misalnya, jika Anda menentukan ROWTIME untuk kolom TIMESTAMP_LTZ dalam tabel sumber dan tidak mengonfigurasi opsi ini, partisi mungkin dikomitmen beberapa jam lebih lambat dari waktu yang diharapkan. Nilai default UTC menunjukkan bahwa watermark didefinisikan pada kolom TIMESTAMP atau tidak ada watermark yang didefinisikan.
Jika watermark didefinisikan pada kolom TIMESTAMP_LTZ, zona waktu watermark harus sesuai dengan zona waktu sesi. Nilai yang valid untuk opsi ini dapat berupa nama lengkap zona waktu seperti 'America/Los_Angeles' atau zona waktu kustom seperti 'GMT-08:00'.
partition.time-extractor.kind
Ekstraktor waktu yang mengekstrak waktu dari bidang partisi.
String
Tidak
default
Nilai valid:
default: Secara default, Anda dapat mengonfigurasi pola timestamp atau formatter. Ini adalah nilai default.
custom: Kelas ekstraktor harus ditentukan.
partition.time-extractor.class
Kelas ekstraktor yang mengimplementasikan antarmuka PartitionTimeExtractor.
String
Tidak
Tidak ada nilai default
Tidak tersedia
partition.time-extractor.timestamp-pattern
Metode konstruksi default yang memungkinkan Anda menggunakan bidang partisi untuk mendapatkan pola timestamp yang valid.
String
Tidak
Tidak ada nilai default
Secara default, bidang pertama diekstraksi dalam format
yyyy-MM-dd hh:mm:ss.Jika Anda ingin mengekstraksi timestamp partisi dari bidang partisi dt, Anda dapat mengatur opsi ini menjadi $dt.
Jika Anda ingin mengekstraksi timestamp partisi dari beberapa bidang partisi, seperti tahun, bulan, hari, dan jam, Anda dapat mengatur opsi ini menjadi
$year-$month-$day $hour:00:00.Jika Anda ingin mengekstraksi timestamp partisi dari bidang partisi dt dan hour, Anda dapat mengatur opsi ini menjadi
$dt $hour:00:00.
partition.time-extractor.timestamp-formatter
Formatter yang digunakan untuk mengonversi nilai string timestamp partisi menjadi timestamp. Nilai string timestamp partisi ditentukan oleh opsi partition.time-extractor.timestamp-pattern.
String
Tidak
yyyy-MM-dd HH:mm:ss
Sebagai contoh, jika Anda ingin mengekstraksi timestamp partisi dari beberapa bidang partisi, seperti tahun, bulan, dan hari, Anda dapat mengatur opsi partition.time-extractor.timestamp-pattern menjadi
$year$month$daydan opsi partition.time-extractor.timestamp-formatter menjadi yyyyMMdd. Nilai default opsi ini adalahyyyy-MM-dd HH:mm:ss. Formatter timestamp yang ditentukan oleh opsi ini kompatibel dengan DateTimeFormatter Java.sink.partition-commit.policy.kind
Tipe kebijakan komitmen partisi.
String
Tidak
Tidak ada nilai default
Kebijakan komitmen partisi memungkinkan Realtime Compute for Apache Flink memberi tahu aplikasi hilir bahwa penulisan data ke partisi selesai dan data dapat dibaca dari partisi. Nilai valid:
success-file: File _success ditambahkan ke direktori yang ditentukan.
custom: Kebijakan komitmen dibuat berdasarkan kelas yang ditentukan. Anda dapat menentukan beberapa kebijakan komitmen secara bersamaan.
sink.partition-commit.policy.class
Kelas kebijakan komitmen partisi yang mengimplementasikan antarmuka PartitionCommitPolicy.
String
Tidak
Tidak ada nilai default
Kelas ini tersedia hanya ketika opsi sink.partition-commit.policy.kind diatur ke custom.
sink.partition-commit.success-file.name
Nama file yang digunakan jika opsi sink.partition-commit.policy.kind diatur ke success-file.
String
Tidak
_SUCCESS
Tidak tersedia
sink.parallelism
Tingkat paralelisme untuk menulis file ke sistem file.
Integer
Tidak
Tidak ada nilai default
Secara default, nilai opsi sink.parallelism sama dengan tingkat paralelisme operator rantai upstream. Jika nilai opsi sink.parallelism berbeda dari tingkat paralelisme operator rantai upstream, operator yang menulis file menggunakan nilai opsi ini. Jika penggabungan file diaktifkan, operator yang menggabungkan file juga menggunakan nilai opsi ini.
CatatanNilai harus lebih besar dari 0. Jika tidak, terjadi kesalahan.
Konfigurasi informasi autentikasi Bucket OSS
Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau lebih baru yang memungkinkan Anda mengonfigurasi informasi autentikasi Bucket OSS.
Setelah menentukan jalur sistem file, Anda harus mengonfigurasi informasi autentikasi Bucket OSS agar dapat membaca data dari dan menulis data ke jalur tertentu dari sistem file. Untuk mengonfigurasi informasi autentikasi Bucket OSS, ikuti langkah-langkah berikut: Masuk ke konsol pengembangan Realtime Compute for Apache Flink. Pada tab Configuration halaman Deployments, klik Edit di sudut kanan atas bagian Parameters dan tambahkan konfigurasi berikut ke bidang Other Configuration:
fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxxTabel berikut menjelaskan parameter dalam konfigurasi di atas.
Parameter | Deskripsi |
fs.oss.bucket.<bucketName>.accessKeyId | Parameter:
|
fs.oss.bucket.<bucketName>.accessKeySecret |
Menulis data ke OSS-HDFS
Masuk ke Konsol Pengembangan Realtime Compute for Apache Flink. Di tab Configuration halaman Deployments, klik Edit di pojok kanan atas bagian Parameters dan tambahkan konfigurasi berikut ke kolom Other Configuration:
fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxxTabel berikut menjelaskan parameter dalam konfigurasi di atas.
Parameter | Deskripsi |
fs.oss.jindo.buckets | Nama bucket layanan OSS-HDFS tempat data ditulis. Anda dapat menentukan beberapa nama bucket. Pisahkan nama bucket dengan titik koma (;). Ketika Flink menulis data ke jalur OSS, data ditulis ke layanan OSS-HDFS jika nama bucket terkait termasuk dalam nilai parameter fs.oss.jindo.buckets. |
fs.oss.jindo.accessKeyId | ID AccessKey akun Alibaba Cloud Anda. Untuk informasi lebih lanjut tentang cara mendapatkan Rahasia AccessKey akun Alibaba Cloud, lihat Lihat informasi tentang pasangan AccessKey pengguna RAM. |
fs.oss.jindo.accessKeySecret | Rahasia AccessKey akun Alibaba Cloud Anda. Untuk informasi lebih lanjut tentang cara mendapatkan Rahasia AccessKey akun Alibaba Cloud, lihat Lihat informasi tentang pasangan AccessKey pengguna RAM. |
Anda juga harus mengonfigurasi titik akhir layanan OSS-HDFS menggunakan salah satu metode berikut:
Konfigurasi parameter
Di tab Configuration halaman detail penerapan Anda, klik Edit di pojok kanan atas bagian Parameters dan tambahkan konfigurasi berikut ke kolom Other Configuration:
fs.oss.jindo.endpoint: xxxKonfigurasi path
Konfigurasikan titik akhir layanan OSS-HDFS dalam path OSS.
oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>user-defined-oss-hdfs-bucket: nama bucket Anda.oss-hdfs-endpoint: titik akhir bucket OSS-HDFS Anda.Catatan:
fs.oss.jindo.bucketsmemerlukan<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>.
Sebagai contoh, jika nama bucket adalah jindo-test dan titik akhir adalah cn-beijing.oss-dls.aliyuncs.com, path OSS harus berupa oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>, dan opsi fs.oss.jindo.buckets harus berisi jindo-test.cn-beijing.oss-dls.aliyuncs.com.
# URI OSS
oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>
# Konfigurasi lain
fs.oss.jindo.buckets: jindo-test,jindo-test.cn-beijing.oss-dls.aliyuncs.comSaat menulis ke HDFS eksternal (path:hdfs://**), tambahkan konfigurasi berikut ke bagian Parameters di tab Configuration halaman detail penerapan Anda untuk menentukan atau mengubah pengguna.
containerized.taskmanager.env.HADOOP_USER_NAME: hdfs
containerized.master.env.HADOOP_USER_NAME: hdfsContoh kode
Contoh Kode untuk Tabel Sumber
CREATE TEMPORARY TABLE fs_table_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) with ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;Kode contoh untuk tabel sink
Menulis Data ke Tabel Partisi
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE, ts BIGINT, -- Gunakan waktu dalam milidetik. ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Tentukan watermark dalam kolom TIMESTAMP_LTZ. ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet', 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Zona waktu adalah 'Asia/Shanghai'. 'sink.partition-commit.policy.kind'='success-file' ); -- Jalankan pernyataan SQL streaming berikut untuk menyisipkan data ke tabel sistem file. INSERT INTO fs_table_sink SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH') FROM datagen_source;Menulis Data ke Tabel Non-Partisi
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); INSERT INTO fs_table_sink SELECT * FROM datagen_source;
DataStream API
Jika Anda ingin memanggil DataStream API untuk membaca atau menulis data, Anda harus menggunakan DataStream connector tipe terkait untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi lebih lanjut tentang cara mengonfigurasi DataStream connector, lihat Pengaturan DataStream Connector.
Contoh kode berikut menunjukkan cara menggunakan DataStream API untuk menulis data ke OSS atau OSS-HDFS.
String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
StreamingFileSink.forRowFormat(
new Path(outputPath),
(Encoder<Row>)
(element, stream) -> {
out.println(element.toString());
})
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
outputStream.addSink(sink);Jika Anda ingin menulis data ke OSS-HDFS, jalankan kode di atas dan lakukan langkah-langkah berikut: Masuk ke development console of Realtime Compute for Apache Flink. Pada tab Configuration halaman Deployments, klik Edit di sudut kanan atas bagian Parameters dan tambahkan konfigurasi terkait OSS-HDFS ke bidang Other Configuration. Untuk informasi lebih lanjut, lihat bagian Menulis Data ke OSS-HDFS dari topik ini.
Referensi
Untuk informasi lebih lanjut tentang konektor yang didukung oleh Realtime Compute for Apache Flink, lihat Konektor yang Didukung.
Untuk informasi lebih lanjut tentang cara menggunakan konektor Tablestore, lihat Konektor Tablestore.
Untuk informasi lebih lanjut tentang cara menggunakan konektor Apache Paimon, lihat Konektor Apache Paimon.