Topik ini menjelaskan cara menggunakan konektor MaxCompute.
Latar Belakang
MaxCompute (sebelumnya dikenal sebagai ODPS) adalah platform komputasi yang cepat dan sepenuhnya terkelola untuk gudang data berskala besar. MaxCompute mampu memproses data dalam skala exabyte serta menyediakan solusi penyimpanan dan komputasi untuk data terstruktur dalam jumlah besar di gudang data, lengkap dengan layanan analitik dan pemodelan.
Tabel berikut menjelaskan kemampuan yang didukung oleh konektor MaxCompute.
Item | Deskripsi |
Jenis yang didukung | Tabel sumber, tabel dimensi, dan tabel sink |
Mode operasi | Mode streaming dan mode batch |
Format data | N/A |
Metrik | |
Jenis API | DataStream API dan SQL API |
Pembaruan atau penghapusan data pada tabel sink | Jika MaxCompute Batch Tunnel atau MaxCompute Streaming Tunnel digunakan, data hanya dapat dimasukkan ke tabel sink. Jika MaxCompute Upsert Tunnel digunakan, data pada tabel sink dapat diperbarui atau dihapus, serta data baru dapat dimasukkan ke tabel sink. |
Prasyarat
Tabel MaxCompute telah dibuat. Untuk informasi selengkapnya tentang cara membuat tabel MaxCompute, lihat Buat tabel.
Batasan
Konektor MaxCompute hanya mendukung semantik at-least-once.
CatatanSemantik at-least-once digunakan untuk mencegah kehilangan data. Dalam kondisi tertentu, data duplikat mungkin ditulis ke MaxCompute, tergantung pada tunnel yang digunakan. Untuk informasi lebih lanjut mengenai MaxCompute Tunnel, lihat bagian "Bagaimana cara memilih tunnel data?" dalam topik FAQ tentang penyimpanan hulu dan hilir.
Secara default, sumber beroperasi dalam mode penuh dan hanya membaca data dari partisi yang ditentukan oleh opsi
partition. Setelah semua data dari partisi tersebut selesai dibaca, pekerjaan berhenti dan tidak memantau partisi baru.Untuk memantau partisi baru secara berkelanjutan, buat sumber inkremental dengan menentukan opsi
startPartitiondalam klausa WITH.CatatanSetiap kali tabel dimensi diperbarui, tabel tersebut memeriksa partisi terbaru.
Setelah tabel sumber mulai berjalan, data yang baru ditambahkan ke partisi tidak akan dibaca. Disarankan agar Anda menjalankan penerapan setelah partisi berisi data lengkap.
SQL
Konektor MaxCompute dapat digunakan sebagai tabel sumber, dimensi, atau sink dalam pekerjaan berbasis SQL.
Sintaksis
CREATE TEMPORARY TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);Opsi konektor
Umum
Opsi
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
connector
Jenis tabel.
STRING
Ya
Tidak ada nilai default
Atur nilainya ke odps.
endpoint
Titik akhir MaxCompute.
STRING
Ya
Tidak ada nilai default
Untuk informasi selengkapnya, lihat Endpoint.
tunnelEndpoint
Titik akhir MaxCompute Tunnel.
STRING
Tidak
Tidak ada nilai default
Untuk informasi selengkapnya, lihat Endpoint.
CatatanJika opsi ini tidak ditentukan, MaxCompute mengalokasikan koneksi tunnel berdasarkan layanan Server Load Balancer (SLB).
project
Nama proyek MaxCompute.
STRING
Ya
Tidak ada nilai default
N/A.
schemaName
Nama skema MaxCompute.
STRING
Tidak
Tidak ada nilai default
Opsi ini hanya wajib jika fitur skema MaxCompute diaktifkan. Dalam hal ini, Anda harus mengatur opsi ini ke nama skema tabel MaxCompute. Untuk informasi selengkapnya, lihat Operasi skema.
CatatanHanya VVR 8.0.6 atau versi lebih baru yang mendukung opsi ini.
tableName
Nama tabel MaxCompute.
STRING
Ya
Tidak ada nilai default
N/A.
accessId
ID AccessKey yang digunakan untuk mengakses MaxCompute.
STRING
Ya
Tidak ada nilai default
Untuk informasi selengkapnya, lihat Bagaimana cara melihat informasi ID AccessKey dan Rahasia AccessKey akun?
PentingUntuk melindungi pasangan AccessKey Anda, kami menyarankan Anda mengonfigurasi ID AccessKey dengan menggunakan variabel. Untuk informasi selengkapnya, lihat Kelola variabel.
accessKey
Rahasia AccessKey yang digunakan untuk mengakses MaxCompute.
STRING
Ya
Tidak ada nilai default
partition
Nama partisi dalam tabel MaxCompute.
STRING
Tidak
Tidak ada nilai default
Anda tidak perlu menentukan opsi ini untuk tabel MaxCompute non-partisi atau sumber inkremental.
CatatanUntuk informasi selengkapnya tentang cara menentukan opsi partition untuk tabel MaxCompute berpartisi, lihat bagian "Bagaimana cara mengonfigurasi opsi partition saat membaca atau menulis data ke partisi?" dalam topik FAQ tentang penyimpanan hulu dan hilir.
compressAlgorithm
Algoritma kompresi yang digunakan oleh MaxCompute Tunnel.
STRING
Tidak
SNAPPY
Nilai yang valid:
RAW (tanpa kompresi)
ZLIB
SNAPPY
Dibandingkan dengan ZLIB, SNAPPY dapat meningkatkan throughput secara signifikan. Dalam skenario pengujian, throughput meningkat sekitar 50%.
quotaName
Nama kuota untuk grup resource Tunnel eksklusif MaxCompute.
STRING
Tidak
Tidak ada nilai default
Anda dapat menentukan opsi ini untuk menggunakan grup resource Tunnel eksklusif MaxCompute.
PentingHanya VVR 8.0.3 atau versi lebih baru yang mendukung opsi ini.
Jika Anda menentukan opsi ini, Anda harus menghapus opsi tunnelEndpoint. Jika tidak, tunnel yang ditentukan oleh opsi tunnelEndpoint akan digunakan.
Khusus sumber
Opsi
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
maxPartitionCount
Jumlah maksimum partisi dari mana data dapat dibaca.
INTEGER
Tidak
100
Jika jumlah partisi dari mana data dibaca melebihi nilai opsi ini, pesan kesalahan berikut muncul:
"The number of matched partitions exceeds the default limit".PentingMembaca dari jumlah partisi yang berlebihan dapat membebani MaxCompute dan memperlambat startup pekerjaan. Jika diperlukan untuk kebutuhan bisnis Anda, sesuaikan nilai opsi ini secara manual.
useArrow
Menentukan apakah akan menggunakan format Arrow untuk membaca data.
BOOLEAN
Tidak
false
Format Arrow dapat digunakan untuk memanggil operasi API penyimpanan MaxCompute.
PentingOpsi ini hanya berlaku dalam penerapan batch.
Hanya VVR 8.0.8 atau versi lebih baru yang mendukung opsi ini.
splitSize
Ukuran data yang dapat ditarik sekaligus saat menggunakan format Arrow untuk membaca data.
MEMORYSIZE
Tidak
256 MB
Hanya VVR 8.0.8 atau versi lebih baru yang mendukung opsi ini.
PentingOpsi ini hanya berlaku dalam penerapan batch.
compressCodec
Algoritma kompresi yang digunakan saat menggunakan format Arrow untuk membaca data.
STRING
Tidak
""
Nilai yang valid:
"" (tanpa kompresi)
ZSTD
LZ4_FRAME
Dibandingkan tanpa kompresi, throughput dapat ditingkatkan jika Anda menentukan algoritma kompresi.
PentingOpsi ini hanya berlaku dalam penerapan batch.
Hanya VVR 8.0.8 atau versi lebih baru yang mendukung opsi ini.
dynamicLoadBalance
Menentukan apakah akan mengaktifkan alokasi shard secara dinamis.
BOOLEAN
Tidak
false
Nilai yang valid:
true
false
Alokasi shard secara dinamis dapat meningkatkan kinerja pemrosesan operator yang berbeda pada Realtime Compute for Apache Flink dan mengurangi waktu total yang diperlukan untuk membaca dari MaxCompute. Namun, hal ini dapat menyebabkan kesenjangan data karena jumlah total data yang dibaca oleh operator yang berbeda tidak konsisten.
PentingOpsi ini hanya berlaku dalam penerapan batch.
Hanya VVR 8.0.8 atau versi lebih baru yang mendukung opsi ini.
Opsi khusus untuk tabel sumber MaxCompute inkremental
Sumber tabel inkremental memantau partisi baru dengan melakukan polling berkala ke server MaxCompute untuk mendapatkan semua informasi partisi. Sebelum sumber mulai membaca data dari partisi baru, penulisan data ke partisi tersebut harus selesai. Untuk informasi selengkapnya, lihat bagian "Apa yang harus saya lakukan jika sumber tabel MaxCompute inkremental mendeteksi partisi baru saat data masih ditulis ke partisi tersebut?" dalam topik FAQ tentang penyimpanan hulu dan hilir. Anda dapat mengonfigurasi opsi startPartition untuk menentukan partisi awal dari mana data dibaca. Hanya data dalam partisi yang urutan abjadnya lebih besar dari atau sama dengan urutan abjad partisi yang ditentukan oleh opsi startPartition yang akan dibaca. Misalnya, urutan abjad partisi
year=2023,month=10lebih kecil daripada urutan abjad partisiyear=2023,month=9. Dalam kasus ini, Anda dapat menambahkan nol di depan angka bulan dalam nama partisi yang dideklarasikan dalam kode untuk memastikan urutan abjad partisi valid. Dengan cara ini, Anda dapat mengubah nilai opsi partition dari year=2023,month=9 menjadiyear=2023,month=09.Opsi
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
startPartition
Partisi awal dari mana data inkremental dibaca.
STRING
Ya
Tidak ada nilai default
Jika Anda menentukan opsi ini, sumber inkremental digunakan. Akibatnya, opsi partition diabaikan.
Jika tabel sumber adalah tabel berpartisi multi-level, Anda harus mengonfigurasi nilai setiap kolom partisi dalam urutan menurun berdasarkan level partisi.
CatatanUntuk informasi selengkapnya tentang cara menentukan opsi startPartition, lihat bagian "Bagaimana cara mengonfigurasi opsi startPartition untuk tabel sumber MaxCompute inkremental?" dalam topik FAQ tentang penyimpanan hulu dan hilir.
subscribeIntervalInSec
Interval polling ke MaxCompute untuk mendapatkan informasi partisi.
INTEGER
Tidak
30
Unit: detik.
modifiedTableOperation
Operasi yang dilakukan ketika data dalam partisi dimodifikasi selama pembacaan partisi.
Enum (NONE, SKIP)
Tidak
NONE
Sesi unduhan disimpan dalam checkpoint. Setiap kali Anda melanjutkan sesi dari checkpoint, Realtime Compute for Apache Flink mencoba melanjutkan progres pembacaan dari sesi tersebut. Namun, sesi tersebut tidak tersedia karena data dalam partisi dimodifikasi. Dalam kasus ini, penerapan akan terus-menerus dimulai ulang. Untuk mengatasi masalah ini, Anda dapat menentukan opsi ini. Nilai yang valid:
NONE: Jika Anda mengatur opsi ini ke NONE, Anda harus mengubah nilai opsi startPartition agar urutan abjad partisi yang ditentukan oleh opsi startPartition lebih besar dari urutan abjad partisi yang tidak tersedia, lalu memulai penerapan tanpa state.
SKIP: Jika Anda tidak ingin memulai penerapan tanpa state, Anda dapat mengatur opsi ini ke SKIP. Dalam hal ini, Realtime Compute for Apache Flink melewati partisi yang tidak tersedia saat mencoba melanjutkan sesi dari checkpoint.
PentingHanya VVR 8.0.3 atau versi lebih baru yang mendukung opsi ini.
Jika Anda mengatur opsi ini ke NONE atau SKIP, data yang telah dibaca dari partisi yang dimodifikasi dipertahankan, sedangkan data yang belum dibaca diabaikan.
Khusus sink
Opsi
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
useStreamTunnel
Menentukan apakah akan menggunakan MaxCompute Streaming Tunnel untuk mengunggah data.
BOOLEAN
Tidak
false
Nilai yang valid:
true: MaxCompute Streaming Tunnel digunakan untuk mengunggah data.
false: MaxCompute Batch Tunnel digunakan untuk mengunggah data.
CatatanUntuk informasi selengkapnya tentang cara memilih tunnel, lihat bagian "Bagaimana cara memilih tunnel data?" dalam topik FAQ tentang penyimpanan hulu dan hilir.
flushIntervalMs
Interval flush dalam buffer writer di MaxCompute Tunnel.
LONG
Tidak
30000 (30 detik)
Data dibuffer dan diflush secara batch sesuai interval yang ditentukan oleh
flushIntervalMs.Streaming Tunnel: Data yang diflush langsung tersedia di tabel MaxCompute tujuan.
Batch Tunnel: Data yang diflush hanya tersedia setelah operasi checkpointing selesai. Kami menyarankan Anda mengatur opsi ini ke 0 untuk menonaktifkan fitur flushing terjadwal.
Unit: milidetik.
CatatanOpsi ini dapat digunakan bersama opsi
batchSize. Operasi flush dipicu ketika kondisi yang ditentukan oleh opsibatchSizeatau opsiflushIntervalMsterpenuhi.batchSize
Ukuran buffer MaxCompute Tunnel.
LONG
Tidak
67108864 (64 MB)
Sink MaxCompute memasukkan data ke buffer. Kemudian, sink MaxCompute menulis data dalam buffer ke tabel MaxCompute tujuan ketika ukuran data buffer melebihi nilai yang ditentukan oleh opsi batchSize.
Unit: byte.
CatatanOpsi ini dapat digunakan bersama opsi flushIntervalMs. Operasi flush dipicu ketika kondisi yang ditentukan oleh opsi batchSize atau opsi flushIntervalMs terpenuhi.
numFlushThreads
Jumlah thread yang digunakan untuk flush data dalam buffer writer di MaxCompute Tunnel.
INTEGER
Tidak
1
Setiap sink MaxCompute membuat jumlah thread sesuai nilai opsi numFlushThreads untuk flush data. Jika nilai opsi ini lebih dari 1, data dalam partisi berbeda dapat diflush secara bersamaan. Hal ini meningkatkan efisiensi operasi flush.
slotNum
Jumlah slot Tunnel yang digunakan oleh MaxCompute untuk menerima data dari Flink.
INTEGER
Tidak
0
Untuk informasi tentang batasan jumlah slot, lihat Ikhtisar layanan transmisi data dalam dokumentasi MaxCompute.
dynamicPartitionLimit
Jumlah maksimum partisi dinamis tempat data dapat ditulis.
INTEGER
Tidak
100
Jika jumlah partisi dinamis tempat data ditulis dari sink antara dua checkpoint melebihi nilai opsi dynamicPartitionLimit, pesan kesalahan berikut muncul:
"Too many dynamic partitions".PentingJika data ditulis ke sejumlah besar partisi tabel MaxCompute, beban kerja pada layanan MaxCompute tinggi, sehingga memperlambat checkpointing dan flushing. Untuk mencegah masalah ini, Anda perlu memeriksa apakah data perlu ditulis ke banyak partisi. Jika bisnis Anda memerlukan penulisan data ke banyak partisi, tingkatkan nilai opsi dynamicPartitionLimit secara manual.
retryTimes
Jumlah maksimum retry permintaan ke server MaxCompute.
INTEGER
Tidak
3
Layanan MaxCompute mungkin tidak tersedia untuk periode singkat saat Anda membuat sesi, mengirim sesi, atau data diflush. Jika layanan MaxCompute tidak tersedia, server MaxCompute diminta berdasarkan konfigurasi opsi ini.
sleepMillis
Interval retry.
INTEGER
Tidak
1000
Satuan: milidetik.
enableUpsert
Menentukan apakah akan menggunakan MaxCompute Upsert Tunnel untuk mengunggah data.
BOOLEAN
Tidak
false
Nilai yang valid:
true: MaxCompute Upsert Tunnel digunakan untuk memproses data INSERT, UPDATE_AFTER, dan DELETE di Realtime Compute for Apache Flink.
false: MaxCompute Batch Tunnel atau MaxCompute Streaming Tunnel yang ditentukan oleh opsi useStreamTunnel digunakan untuk memproses data INSERT dan UPDATE_AFTER di Realtime Compute for Apache Flink.
PentingJika terjadi masalah seperti error, kegagalan penerapan, atau gangguan pemrosesan jangka panjang saat sink MaxCompute melakukan commit sesi dalam mode upsert, kami menyarankan Anda mengatur opsi Parallelism operator sink ke nilai kurang dari atau sama dengan 10.
Hanya VVR 8.0.6 atau versi lebih baru yang mendukung opsi ini.
upsertAsyncCommit
Menentukan apakah akan menggunakan mode asinkron saat sink MaxCompute melakukan commit sesi dalam mode upsert.
BOOLEAN
Tidak
false
Nilai yang valid:
true: Mode asinkron digunakan. Jika Anda menggunakan mode asinkron, waktu yang dibutuhkan untuk commit sesi berkurang, tetapi data yang ditulis setelah sesi dikomit tidak dapat langsung diquery.
false: Mode sinkron digunakan secara default. Saat sink MaxCompute melakukan commit sesi, sistem menunggu hingga server memproses sesi tersebut.
CatatanHanya VVR 8.0.6 atau versi lebih baru yang mendukung opsi ini.
upsertCommitTimeoutMs
Waktu timeout untuk commit sesi oleh sink MaxCompute dalam mode upsert.
INTEGER
Tidak
120000
(120 detik)
Unit: milidetik.
CatatanHanya VVR 8.0.6 atau versi lebih baru yang mendukung opsi ini.
sink.operation
Mode operasi penulisan untuk tabel Delta.
STRING
Tidak
insert
Nilai yang valid:
insert: Data ditulis ke tabel dalam mode append.
upsert: Data diperbarui.
CatatanHanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
sink.parallelism
Tingkat paralelisme saat menulis data ke tabel Delta.
INTEGER
Tidak
None
Tingkat paralelisme penulisan data. Jika Anda tidak mengonfigurasi opsi ini, paralelisme data hulu digunakan secara default.
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
PentingPastikan nilai opsi write.bucket.num merupakan kelipatan bulat dari nilai opsi sink.parallelism. Hal ini membantu memastikan kinerja penulisan optimal dan menghemat memori node sink secara efisien.
sink.file-cached.enable
Menentukan apakah akan mengaktifkan mode cache file saat menulis data ke partisi dinamis tabel Delta.
BOOLEAN
Tidak
false
Nilai yang valid:
true: Mode cache file diaktifkan.
false: Mode cache file dinonaktifkan.
Jika Anda mengaktifkan mode cache file, jumlah file kecil yang ditulis ke server berkurang. Namun, latensi penulisan menjadi lebih tinggi. Kami menyarankan Anda mengaktifkan mode cache file saat tabel sink memiliki tingkat paralelisme tinggi.
CatatanHanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
sink.file-cached.writer.num
Jumlah thread yang digunakan untuk mengunggah data secara konkuren dalam satu task dalam mode cache file.
INTEGER
Tidak
16
Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.
Kami menyarankan Anda tidak menaikkan nilai opsi ini ke angka yang terlalu besar. Jika data ditulis ke banyak partisi secara bersamaan, error out of memory (OOM) dapat terjadi.
CatatanHanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
sink.bucket.check-interval
Interval pemeriksaan ukuran file dalam mode cache file. Unit: milidetik.
INTEGER
Tidak
60000
Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
sink.file-cached.rolling.max-size
Nilai maksimum file cache tunggal dalam mode cache file.
MEMORYSIZE
Tidak
16 MB
Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.
Jika ukuran file melebihi nilai opsi ini, data file diunggah ke server.
CatatanHanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
sink.file-cached.memory
Ukuran maksimum memori off-heap yang digunakan untuk menulis data ke file dalam mode cache file.
MEMORYSIZE
Tidak
64 MB
Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
sink.file-cached.memory.segment-size
Ukuran buffer yang digunakan untuk menulis data ke file dalam mode cache file.
MEMORYSIZE
Tidak
128 KB
Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
sink.file-cached.flush.always
Menentukan apakah cache digunakan untuk menulis data ke file dalam mode cache file.
BOOLEAN
Tidak
true
Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
sink.file-cached.write.max-retries
Jumlah retry untuk mengunggah data dalam mode cache file.
INTEGER
Tidak
3
Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
upsert.writer.max-retries
Jumlah maksimum retry untuk menulis data ke bucket dalam sesi Upsert Writer.
INTEGER
Tidak
3
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
upsert.writer.buffer-size
Ukuran buffer sesi Upsert Writer di Realtime Compute for Apache Flink.
MEMORYSIZE
Tidak
64 MB
Ketika ukuran buffer total semua bucket mencapai ambang batas yang ditentukan, sistem secara otomatis memperbarui data ke server.
CatatanData dalam sesi Upsert Writer dapat ditulis ke beberapa bucket secara bersamaan. Kami menyarankan Anda menaikkan nilai opsi ini untuk meningkatkan efisiensi penulisan.
Jika data ditulis ke banyak partisi, error OOM dapat terjadi. Untuk mencegah hal ini, Anda dapat menurunkan nilai opsi ini.
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
upsert.writer.bucket.buffer-size
Ukuran buffer bucket tunggal di Realtime Compute for Apache Flink.
MEMORYSIZE
Tidak
1 MB
Jika sumber daya memori server Flink tidak mencukupi, Anda dapat menurunkan nilai opsi ini.
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
upsert.write.bucket.num
Jumlah bucket untuk tabel tempat data ditulis.
INTEGER
Ya
None
Nilai opsi ini harus sama dengan nilai opsi
write.bucket.numyang dikonfigurasi untuk tabel Delta tempat data ditulis.Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
upsert.write.slot-num
Jumlah slot Tunnel yang digunakan dalam sesi.
INTEGER
Tidak
1
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
upsert.commit.max-retries
Jumlah maksimum retry untuk commit sesi upsert.
INTEGER
Tidak
3
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
upsert.commit.thread-num
Tingkat paralelisme commit sesi upsert.
INTEGER
Tidak
16
Kami menyarankan Anda tidak menaikkan nilai opsi ini ke angka yang terlalu besar. Jika terlalu banyak commit sesi upsert dilakukan secara bersamaan, konsumsi sumber daya meningkat. Hal ini dapat menyebabkan masalah kinerja atau konsumsi sumber daya berlebihan.
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
upsert.commit.timeout
Waktu timeout untuk commit sesi upsert. Unit: detik.
INTEGER
Tidak
600
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
upsert.flush.concurrent
Jumlah maksimum bucket tempat data dalam partisi dapat ditulis secara bersamaan.
INTEGER
Tidak
2
Setiap kali data dalam bucket direfresh, satu slot Tunnel digunakan.
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
insert.commit.thread-num
Tingkat paralelisme sesi commit.
INTEGER
Tidak
16
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
insert.arrow-writer.enable
Menentukan apakah akan menggunakan format Arrow.
BOOLEAN
Tidak
false
Nilai yang valid:
true: Format Arrow digunakan.
false: Format Arrow tidak digunakan.
CatatanHanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
insert.arrow-writer.batch-size
Jumlah maksimum baris dalam batch data berformat Arrow.
INTEGER
Tidak
512
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
insert.arrow-writer.flush-interval
Interval flush writer. Unit: milidetik.
INTEGER
Tidak
100000
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
insert.writer.buffer-size
Ukuran cache untuk buffered writer.
MEMORYSIZE
Tidak
64 MB
Hanya VVR 8.0.10 atau versi lebih baru yang mendukung opsi ini.
upsert.partial-column.enable
Menentukan apakah hanya akan memperbarui data di kolom tertentu.
BOOLEAN
Tidak
false
Opsi ini hanya berlaku untuk tabel sink yang menulis data ke Tabel Delta MaxCompute. Untuk informasi selengkapnya, lihat Perbarui data di kolom tertentu dalam dokumentasi MaxCompute.
Nilai yang valid:
true
false
Perilaku pembaruan data bergantung pada apakah sink memiliki catatan dengan primary key yang sama dengan data baru.
Jika tabel sink berisi data dengan primary key yang sama, bidang yang sesuai diperbarui berdasarkan primary key tersebut. Secara spesifik, bidang yang ditentukan ditimpa dengan nilai baru jika nilainya bukan
null.Jika tabel sink tidak berisi catatan dengan primary key yang sama, catatan baru akan ditambahkan. Nilai baru akan dimasukkan untuk kolom yang ditentukan, sedangkan
nullakan dimasukkan untuk semua kolom lainnya.
CatatanHanya VVR 8.0.11 atau versi lebih baru yang mendukung opsi ini.
Khusus tabel dimensi
Saat penerapan dimulai, tabel dimensi menarik data lengkap dari partisi yang ditentukan oleh opsi partition. Opsi ini mendukung fungsi max_pt(). Jika cache dimuat ulang setelah entri cache kedaluwarsa, data dari partisi terbaru yang ditentukan oleh opsi partition akan diurai ulang. Jika opsi partition diatur ke max_two_pt(), tabel dimensi dapat menarik data dari dua partisi. Jika tidak, hanya data dari satu partisi yang dapat ditarik.
Opsi
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
cache
Kebijakan cache.
STRING
Ya
Tidak ada nilai default
Anda harus mengatur opsi cache ke
ALLuntuk tabel dimensi dan secara eksplisit mendeklarasikan pengaturan ini dalam pernyataan DDL. Jika jumlah data dalam tabel remote kecil dan terdapat banyak missing key, kami menyarankan Anda mengatur opsi ini ke ALL. Sumber dan tabel dimensi tidak dapat diasosiasikan berdasarkan klausa ON.ALL: menunjukkan bahwa semua data dalam tabel dimensi dicache. Sebelum sistem menjalankan penerapan, sistem memuat semua data dalam tabel dimensi ke cache. Dengan demikian, cache digunakan untuk semua query selanjutnya pada tabel dimensi. Jika tidak ada key yang cocok, sistem tidak dapat menemukan catatan data dalam cache. Sistem memuat ulang semua data dalam cache setelah entri cache kedaluwarsa.
CatatanJika opsi cache diatur ke ALL, Anda harus menambah memori node join karena sistem memuat data tabel dimensi secara asinkron. Kami menyarankan Anda menambah ukuran memori minimal empat kali lipat dari jumlah data dalam tabel remote. Ukuran memori terkait dengan algoritma kompresi penyimpanan MaxCompute.
Jika tabel dimensi berisi banyak data, Anda dapat menggunakan petunjuk SHUFFLE_HASH untuk mendistribusikan data secara merata ke setiap subtask. Untuk informasi selengkapnya, lihat bagian "Bagaimana cara menggunakan petunjuk SHUFFLE_HASH untuk tabel dimensi?" dalam topik FAQ tentang penyimpanan hulu dan hilir.
Jika Anda menggunakan tabel dimensi berukuran sangat besar, garbage collection (GC) JVM yang sering dapat menyebabkan exception penerapan. Untuk mengatasi masalah ini, Anda dapat menambah memori node tempat tabel dimensi di-join dengan tabel lain. Jika masalah tetap berlanjut, kami menyarankan Anda mengonversi tabel dimensi menjadi tabel dimensi key-value yang mendukung kebijakan cache least recently used (LRU). Misalnya, Anda dapat menggunakan tabel dimensi ApsaraDB for HBase sebagai tabel dimensi key-value.
cacheSize
Jumlah maksimum baris data yang dapat dicache.
LONG
Tidak
100000
Jika jumlah catatan data dalam tabel dimensi melebihi nilai opsi cacheSize, pesan kesalahan berikut muncul:
"Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit".PentingJika tabel dimensi berisi banyak catatan data, sejumlah besar memori heap JVM dikonsumsi. Dalam kasus ini, kecepatan startup penerapan dan kecepatan pembaruan tabel dimensi melambat. Untuk mencegah masalah ini, Anda perlu memeriksa apakah banyak catatan data perlu dicache. Jika bisnis Anda memerlukan caching banyak catatan data dalam tabel dimensi, tingkatkan nilai opsi ini secara manual.
cacheTTLMs
Waktu timeout cache.
LONG
Tidak
Long.MAX_VALUE
Unit: milidetik.
cacheReloadTimeBlackList
Periode waktu saat cache tidak direfresh. Cache tidak direfresh selama periode waktu yang ditentukan oleh opsi ini.
STRING
Tidak
Tidak ada nilai default
Opsi ini berlaku untuk acara promosi online berskala besar seperti jam puncak aktivitas. Anda dapat menentukan opsi ini untuk mencegah penerapan menjadi tidak stabil saat cache direfresh. Untuk informasi selengkapnya tentang cara menentukan opsi ini, lihat bagian "Bagaimana cara mengonfigurasi opsi CacheReloadTimeBlackList?" dalam topik FAQ tentang penyimpanan hulu dan hilir.
maxLoadRetries
Jumlah maksimum retry untuk refresh cache. Saat data pertama kali ditarik saat penerapan dimulai, cache direfresh. Jika jumlah retry melebihi nilai opsi ini, penerapan gagal dijalankan.
INTEGER
Tidak
10
N/A.
Pemetaan tipe data
Untuk informasi selengkapnya tentang tipe data yang didukung oleh MaxCompute, lihat Sistem tipe data MaxCompute versi 2.0.
Tipe data MaxCompute | Tipe data Realtime Compute for Apache Flink |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
TIMESTAMP_NTZ | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
Jika tabel fisik MaxCompute berisi field bertipe data komposit bersarang (ARRAY, MAP, atau STRUCT, dll.) dan field bertipe JSON, Anda harus menentukan tblproperties('columnar.nested.type'='true') saat membuat tabel fisik MaxCompute agar Realtime Compute for Apache Flink dapat membaca dan menulis data ke tabel tersebut dengan benar.
Flink CDC (pratinjau publik)
Konektor MaxCompute dapat digunakan sebagai sink ingesti data dalam pekerjaan berbasis YAML.
Persyaratan mesin VVR
VVR 11.1 atau versi lebih baru
Sintaksis
source:
type: xxx
sink:
type: maxcompute
name: MaxComputeSinkaccess-id: ${your_accessId}
access-key: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}buckets-num: 8Opsi konfigurasi
Opsi | Wajib? | Nilai default | Tipe data | Deskripsi |
type | Ya | Tidak ada nilai default. | String | Konektor yang digunakan. Atur ke |
name | Tidak | Tidak ada nilai default. | String | Nama sink. |
access-id | Ya | Tidak ada nilai default. | String | ID AccessKey Akun Alibaba Cloud atau Pengguna RAM Anda. Dapatkan di Konsol Resource Access Management. |
access-key | Ya | Tidak ada nilai default. | String | Rahasia AccessKey Anda. |
endpoint | Ya | Tidak ada nilai default. | String | Titik akhir MaxCompute Anda. Konfigurasikan berdasarkan wilayah tempat proyek MaxCompute Anda berada dan metode koneksi jaringan. Untuk informasi selengkapnya, lihat Endpoint. |
project | Ya | Tidak ada nilai default. | String | Nama proyek MaxCompute Anda. Lakukan langkah-langkah berikut untuk mendapatkannya:
|
tunnel.endpoint | Tidak | Tidak ada nilai default. | String | Titik akhir MaxCompute Tunnel. Titik akhir ini biasanya disimpulkan secara otomatis oleh MaxCompute dari pengaturan |
quota.name | Tidak | Tidak ada nilai default. | String | Nama kuota grup resource eksklusif. Jika Anda tidak menentukan opsi ini secara eksplisit, grup resource bersama digunakan. |
sts-token | Tidak | Tidak ada nilai default. | String | Token STS Peran RAM Anda. Opsi ini wajib untuk otentikasi identitas jika Anda menggunakan Peran RAM untuk mengakses MaxCompute. |
buckets-num | Tidak | 16 | Integer | Jumlah bucket untuk tabel Delta MaxCompute yang dibuat secara otomatis. Untuk informasi selengkapnya, lihat Gudang data near real-time. |
compress.algorithm | Tidak | zlib | String | Algoritma kompresi data. Nilai yang valid:
|
total.buffer-size | Tidak | 64 MB | String | Ukuran buffer dalam memori. Untuk tabel berpartisi, buffer ini berlaku di tingkat partisi. Untuk tabel non-partisi, berlaku di tingkat tabel. Buffer untuk partisi atau tabel yang berbeda bersifat independen. Saat buffer mencapai kapasitas, datanya diflush ke MaxCompute. |
bucket.buffer-size | Tidak | 4 MB | String | Ukuran buffer dalam memori untuk bucket. Opsi ini hanya berlaku saat data ditulis ke tabel Delta MaxCompute. Buffer untuk bucket yang berbeda bersifat independen. Saat buffer mencapai kapasitas, datanya diflush ke MaxCompute. |
commit.thread-num | Tidak | 16 | Integer | Jumlah maksimum partisi atau tabel yang dapat dikomit secara konkuren selama checkpointing. |
flush.concurrent-num | Tidak | 4 | Integer | Menentukan jumlah maksimum bucket tempat Flink dapat secara konkuren melakukan flush data. Opsi ini hanya berlaku saat data ditulis ke tabel Delta MaxCompute. |
Pemetaan lokasi tabel
Saat konektor memicu pembuatan tabel otomatis di MaxCompute, lokasi dipetakan sebagai berikut:
Jika fitur skema dinonaktifkan untuk proyek MaxCompute Anda, konektor akan mengabaikan tableId.namespace. Dalam hal ini, hanya satu database atau ekuivalen logisnya yang diingesti ke MaxCompute. Misalnya, saat data diingesti dari MySQL ke MaxCompute, hanya satu database MySQL yang dimasukkan.
Lokasi MySQL | Abstraksi di Flink CDC | Lokasi MaxCompute |
N/A | Proyek dalam file konfigurasi | Proyek |
Database | TableId.namespace | Skema Catatan Jika skema dinonaktifkan untuk proyek MaxCompute Anda, pengaturan ini diabaikan. |
Tabel | TableId.tableName | Tabel |
Pemetaan tipe data
Tipe Flink CDC | Tipe MaxCompute |
CHAR | STRING |
VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(Precision>3) | TIMESTAMP |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(Precision<=3) | DATETIME |
TIMESTAMP_WITH_TIME_ZONE(Precision>3) | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE(Precision<=3) | DATETIME |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |
Contoh
SQL API
Tabel sumber
Baca semua data dalam partisi tertentu
Baca semua data dalam partisi yang ditentukan oleh opsi partition.
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=201809*'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid,
COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;Baca data inkremental
Baca data mulai dari partisi yang ditentukan oleh startPartition dan pantau catatan baru secara berkelanjutan.
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Mulai membaca dari partisi 20180905.
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;Tabel sink
Tulis ke partisi statis
Tulis ke partisi yang ditentukan oleh opsi partition:
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905' -- Data ditulis ke partisi 20180905.
);
INSERT INTO odps_sink
SELECT
id, len, content
FROM datagen_source;Tulis ke partisi secara dinamis
Tulis data ke partisi secara dinamis berdasarkan opsi partition:
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR,
c TIMESTAMP
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR,
ds VARCHAR -- Tentukan secara eksplisit kolom partisi dinamis.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds' -- Jangan tentukan nilai untuk ds. Data ditulis ke partisi berbeda berdasarkan nilai field ds.
);
INSERT INTO odps_sink
SELECT
id,
len,
content,
DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;Tabel dimensi
Key bernilai tunggal
Tentukan primary key saat setiap key memiliki nilai unik:
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR,
PRIMARY KEY (k) NOT ENFORCED -- Tentukan primary key.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;Key bernilai ganda
Jangan tentukan primary key saat key dapat memiliki beberapa nilai:
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR
-- Menentukan primary key tidak diperlukan.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;DataStream API
Jika Anda ingin memanggil DataStream API untuk membaca atau menulis data, gunakan konektor DataStream jenis terkait. Untuk informasi selengkapnya tentang cara mengonfigurasi konektor DataStream, lihat Integrasikan konektor DataStream.
Untuk melindungi kekayaan intelektual, VVR 6.0.6 atau versi lebih baru mendukung debugging lokal program DataStream menggunakan konektor MaxCompute hingga 30 menit. Sesi debugging yang lebih lama akan mengakibatkan program dihentikan dengan error. Untuk informasi selengkapnya, lihat Debug konektor secara lokal.
Membaca data dari Tabel Delta MaxCompute tidak didukung. Tabel Delta adalah tabel yang dibuat dengan
primary keytertentu dan propertitransactional=true. Untuk informasi selengkapnya, lihat Istilah.
Disarankan untuk mendeklarasikan tabel MaxCompute menggunakan pernyataan SQL saat menggunakan konektor DataStream MaxCompute. Anda dapat memanggil operasi Table API untuk mengakses tabel MaxCompute atau memanggil operasi DataStream API untuk mengakses stream data.
Hubungkan ke tabel sumber
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); Hubungkan ke sink
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();XML
Dependensi Maven konektor MaxCompute mencakup kelas yang diperlukan untuk membuat sumber lengkap, sumber inkremental, sink, dan tabel dimensi. Konektor DataStream MaxCompute versi berbeda tersedia di repositori pusat Maven.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>