Topik ini menjelaskan cara menggunakan MaxCompute connector.
Latar Belakang
MaxCompute (sebelumnya dikenal sebagai ODPS) adalah platform komputasi cepat dan sepenuhnya terkelola untuk gudang data skala besar. MaxCompute mampu memproses data dalam skala exabyte serta menyediakan solusi penyimpanan dan pemrosesan data terstruktur dalam jumlah besar di gudang data, lengkap dengan layanan analitik dan pemodelan.
Tabel berikut menguraikan kemampuan yang didukung oleh MaxCompute connector.
Item | Deskripsi |
Jenis yang didukung | Tabel sumber, tabel dimensi, dan Tabel sink |
Mode menjalankan | Mode streaming dan mode Batch |
Format data | T/A |
Metrik | |
Jenis API | DataStream API dan SQL API |
Pembaruan atau penghapusan data di Tabel sink | Jika MaxCompute Batch Tunnel atau MaxCompute Streaming Tunnel digunakan, data hanya dapat dimasukkan ke dalam Tabel sink. Jika MaxCompute Upsert Tunnel digunakan, data di Tabel sink dapat diperbarui atau dihapus, serta data dapat dimasukkan ke dalam Tabel sink. |
Prasyarat
Tabel MaxCompute telah dibuat. Untuk informasi lebih lanjut tentang cara membuat tabel MaxCompute, lihat Buat Tabel.
Batasan
MaxCompute connector hanya mendukung semantik setidaknya sekali.
CatatanSemantik setidaknya sekali digunakan untuk mencegah kehilangan data. Dalam kasus tertentu, data duplikat mungkin ditulis ke MaxCompute. Data duplikat dapat dihasilkan berdasarkan tunnel yang Anda gunakan. Untuk informasi lebih lanjut tentang MaxCompute Tunnel, lihat bagian "Bagaimana cara memilih data tunnel?" dari topik FAQ tentang Penyimpanan Hulu dan Hilir.
Secara default, sumber beroperasi dalam mode penuh dan hanya membaca data dari partisi yang ditentukan oleh opsi
partition. Setelah semua data dari partisi dibaca, pekerjaan selesai dan tidak memantau partisi baru.Untuk terus memantau partisi baru, buat sumber tambahan dengan menentukan opsi
startPartitiondalam klausa WITH.CatatanSetiap kali tabel dimensi diperbarui, tabel dimensi memeriksa partisi terbaru.
Setelah tabel sumber mulai berjalan, ia tidak membaca data yang baru ditambahkan ke partisi. Kami sarankan Anda menjalankan penyebaran ketika partisi berisi data lengkap.
SQL
MaxCompute connector dapat digunakan sebagai sumber, dimensi, atau tabel sink dalam pekerjaan berbasis SQL.
Sintaksis
CREATE TEMPORARY TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);Opsi konektor
Umum
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
connector
Jenis tabel.
STRING
Ya
Tidak ada nilai default
Tetapkan nilainya menjadi odps.
endpoint
Titik akhir MaxCompute.
STRING
Ya
Tidak ada nilai default
Untuk informasi lebih lanjut, lihat Titik akhir.
tunnelEndpoint
Titik akhir MaxCompute Tunnel.
STRING
Tidak
Tidak ada nilai default
Untuk informasi lebih lanjut, lihat Titik akhir.
CatatanJika opsi ini tidak ditentukan, MaxCompute mengalokasikan koneksi tunnel berdasarkan layanan Server Load Balancer (SLB).
project
Nama proyek MaxCompute.
STRING
Ya
Tidak ada nilai default
Tidak tersedia.
schemaName
Nama skema MaxCompute.
STRING
Tidak
Tidak ada nilai default
Opsi ini diperlukan hanya jika fitur skema MaxCompute diaktifkan. Dalam hal ini, Anda harus menetapkan opsi ini ke nama skema tabel MaxCompute. Untuk informasi lebih lanjut, lihat Operasi terkait skema.
CatatanHanya VVR 8.0.6 atau yang lebih baru yang mendukung opsi ini.
tableName
Nama tabel MaxCompute.
STRING
Ya
Tidak ada nilai default
Tidak tersedia.
accessId
ID AccessKey yang digunakan untuk mengakses MaxCompute.
STRING
Ya
Tidak ada nilai default
Untuk informasi lebih lanjut, lihat Bagaimana cara melihat informasi tentang ID AccessKey dan rahasia AccessKey akun?
PentingUntuk melindungi pasangan AccessKey Anda, kami sarankan Anda mengonfigurasi ID AccessKey menggunakan variabel. Untuk informasi lebih lanjut, lihat Kelola variabel.
accessKey
Rahasia AccessKey yang digunakan untuk mengakses MaxCompute.
STRING
Ya
Tidak ada nilai default
partition
Nama partisi dalam tabel MaxCompute.
STRING
Tidak
Tidak ada nilai default
Anda tidak perlu menentukan opsi ini untuk tabel MaxCompute non-partisi atau sumber inkremental.
CatatanUntuk informasi lebih lanjut tentang cara menentukan opsi partisi untuk tabel MaxCompute yang dipartisi, lihat bagian "Bagaimana cara mengonfigurasi opsi partisi saat data dibaca dari atau ditulis ke partisi?" dari topik FAQ tentang penyimpanan hulu dan hilir.
compressAlgorithm
Algoritma kompresi yang digunakan oleh MaxCompute Tunnel.
STRING
Tidak
SNAPPY
Nilai valid:
RAW (tanpa kompresi)
ZLIB
SNAPPY
Dibandingkan dengan ZLIB, SNAPPY dapat secara signifikan meningkatkan throughput. Dalam skenario uji, throughput meningkat sekitar 50%.
quotaName
Nama kuota untuk grup sumber daya eksklusif Tunnel MaxCompute.
STRING
Tidak
Tidak ada nilai default
Anda dapat menentukan opsi ini untuk menggunakan grup sumber daya eksklusif Tunnel MaxCompute.
PentingHanya VVR 8.0.3 atau lebih baru yang mendukung opsi ini.
Jika Anda menentukan opsi ini, Anda harus menghapus opsi tunnelEndpoint. Jika tidak, tunnel yang ditentukan oleh opsi tunnelEndpoint akan digunakan.
Khusus Sumber
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
maxPartitionCount
Jumlah maksimum partisi dari mana data dapat dibaca.
INTEGER
Tidak
100
Jika jumlah partisi dari mana data dibaca melebihi nilai opsi ini, pesan kesalahan ini muncul:
"Jumlah partisi yang cocok melebihi batas default".PentingMembaca dari jumlah partisi yang berlebihan dapat membebani MaxCompute dan memperlambat startup pekerjaan. Jika diperlukan untuk kebutuhan bisnis Anda, sesuaikan secara manual nilai opsi ini.
useArrow
Menentukan apakah akan menggunakan format Arrow untuk membaca data.
BOOLEAN
Tidak
false
Format Arrow dapat digunakan untuk memanggil operasi API penyimpanan MaxCompute.
PentingOpsi ini hanya berlaku dalam penyebaran batch.
Hanya VVR 8.0.8 atau yang lebih baru yang mendukung opsi ini.
splitSize
Ukuran data yang dapat ditarik sekaligus saat format Arrow digunakan untuk membaca data.
MEMORYSIZE
Tidak
256 MB
Hanya VVR 8.0.8 atau yang lebih baru yang mendukung opsi ini.
PentingOpsi ini hanya berlaku dalam penyebaran batch.
compressCodec
Algoritma kompresi yang digunakan saat format Arrow digunakan untuk membaca data.
STRING
Tidak
""
Nilai valid:
"" (tanpa kompresi)
ZSTD
LZ4_FRAME
Dibandingkan tanpa kompresi, throughput dapat ditingkatkan jika Anda menentukan algoritma kompresi.
PentingOpsi ini hanya berlaku dalam penyebaran batch.
Hanya VVR 8.0.8 atau yang lebih baru yang mendukung opsi ini.
dynamicLoadBalance
Menentukan apakah akan mengaktifkan alokasi dinamis shard.
BOOLEAN
Tidak
false
Nilai valid:
true
false
Alokasi dinamis shard dapat meningkatkan kinerja pemrosesan operator yang berbeda dari Realtime Compute for Apache Flink dan mengurangi waktu keseluruhan yang diperlukan untuk membaca dari MaxCompute. Namun, ini dapat menyebabkan skew data karena jumlah total data yang dibaca oleh operator yang berbeda tidak konsisten.
PentingOpsi ini hanya berlaku dalam penyebaran batch.
Hanya VVR 8.0.8 atau yang lebih baru yang mendukung opsi ini.
Opsi Spesifik untuk Tabel Sumber MaxCompute Inkremental
Tabel sumber inkremental memantau partisi baru dengan memeriksa secara berkala server MaxCompute untuk mendapatkan semua informasi partisi. Sebelum sumber mulai membaca data dari partisi baru, penulisan data di partisi tersebut harus selesai. Untuk informasi lebih lanjut, lihat bagian "Apa yang harus saya lakukan jika tabel sumber MaxCompute inkremental mendeteksi partisi baru saat data masih ditulis ke partisi?" dari topik FAQ tentang Penyimpanan Hulu dan Hilir. Anda dapat mengonfigurasi opsi startPartition untuk menentukan partisi awal dari mana data dibaca. Hanya data dalam partisi yang urutan alfabetisnya lebih besar dari atau sama dengan urutan alfabetis partisi yang ditentukan oleh opsi startPartition yang dibaca. Sebagai contoh, urutan alfabetis partisi
year=2023,month=10kurang dari urutan alfabetis partisiyear=2023,month=9. Dalam hal ini, Anda dapat menambahkan nol sebelum nomor bulan pada nama partisi yang dideklarasikan dalam kode untuk memastikan bahwa urutan alfabetis partisi valid. Dengan cara ini, Anda dapat mengubah nilai opsi partisi dari year=2023,month=9 menjadiyear=2023,month=09.Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
startPartition
Partisi awal dari mana data inkremental dibaca.
STRING
Ya
Tidak ada nilai default
Jika Anda menentukan opsi ini, sumber inkremental digunakan. Akibatnya, opsi partisi diabaikan.
Jika tabel sumber adalah tabel partisi multi-level, Anda harus mengonfigurasi nilai setiap kolom partisi dalam urutan menurun berdasarkan tingkat partisi.
CatatanUntuk informasi lebih lanjut tentang cara menentukan opsi startPartition, lihat bagian "Bagaimana cara mengonfigurasi opsi startPartition untuk tabel sumber MaxCompute inkremental?" dari topik FAQ tentang penyimpanan hulu dan hilir.
subscribeIntervalInSec
Interval di mana MaxCompute dipolling untuk mendapatkan informasi tentang partisi.
INTEGER
Tidak
30
Satuan: detik.
modifiedTableOperation
Operasi yang dilakukan saat data dalam partisi dimodifikasi selama pembacaan partisi.
Enum (NONE, SKIP)
Tidak
NONE
Sesi unduhan disimpan dalam checkpoint. Setiap kali Anda melanjutkan sesi dari checkpoint, Realtime Compute for Apache Flink mencoba melanjutkan kemajuan pembacaan dari sesi. Namun, sesi tidak tersedia karena data dalam partisi dimodifikasi. Dalam hal ini, penyebaran di-restart berulang kali. Untuk menyelesaikan masalah ini, Anda dapat menentukan opsi ini. Nilai valid:
NONE: Jika Anda mengatur opsi ini ke NONE, Anda harus mengubah nilai opsi startPartition agar urutan alfabetis partisi yang ditentukan oleh opsi startPartition lebih besar dari urutan alfabetis partisi yang tidak tersedia dan mulai penyebaran tanpa status.
SKIP: Jika Anda tidak ingin memulai penyebaran tanpa status, Anda dapat mengatur opsi ini ke SKIP. Dalam hal ini, Realtime Compute for Apache Flink melewati partisi yang tidak tersedia saat mencoba melanjutkan sesi dari checkpoint.
PentingHanya VVR 8.0.3 atau yang lebih baru yang mendukung opsi ini.
Jika Anda mengatur opsi ini ke NONE atau SKIP, data yang dibaca dari partisi tempat data dimodifikasi tetap disimpan, dan data yang belum dibaca diabaikan.
Khusus Sink
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
useStreamTunnel
Menentukan apakah akan menggunakan MaxCompute Streaming Tunnel untuk mengunggah data.
BOOLEAN
Tidak
false
Nilai valid:
true: MaxCompute Streaming Tunnel digunakan untuk mengunggah data.
false: MaxCompute Batch Tunnel digunakan untuk mengunggah data.
CatatanUntuk informasi lebih lanjut tentang cara memilih terowongan, lihat bagian "How do I select a data tunnel?" pada topik FAQ tentang penyimpanan hulu dan hilir.
flushIntervalMs
Interval di mana operasi flush dilakukan di buffer penulis dalam MaxCompute Tunnel.
LONG
Tidak
30000 (30 detik)
Data dibuffer dan di-flush secara batch pada interval yang ditentukan oleh
flushIntervalMs.Streaming Tunnel: Data yang di-flush langsung tersedia di tabel MaxCompute tujuan.
Batch Tunnel: Data yang di-flush hanya tersedia setelah operasi checkpointing selesai. Kami menyarankan Anda mengatur opsi ini ke 0 untuk menonaktifkan fitur flush terjadwal.
Satuan: milidetik.
CatatanOpsi ini dapat digunakan bersama dengan opsi
batchSize. Operasi flush dipicu ketika kondisi yang ditentukan oleh opsibatchSizeatau opsiflushIntervalMsterpenuhi.batchSize
Ukuran buffer MaxCompute Tunnel.
LONG
Tidak
67108864 (64 MB)
MaxCompute sink menyisipkan data ke dalam buffer. Kemudian, MaxCompute sink menulis data dalam buffer ke tabel MaxCompute tujuan ketika ukuran data buffer melebihi nilai yang ditentukan oleh opsi batchSize.
Satuan: byte.
CatatanOpsi ini dapat digunakan bersama dengan opsi flushIntervalMs. Operasi flush dipicu ketika kondisi yang ditentukan oleh opsi batchSize atau flushIntervalMs terpenuhi.
numFlushThreads
Jumlah thread yang digunakan untuk flush data dalam buffer penulis di MaxCompute Tunnel.
INTEGER
Tidak
1
Setiap MaxCompute sink membuat jumlah thread yang ditentukan oleh opsi numFlushThreads untuk flush data. Jika nilai opsi ini lebih besar dari 1, data dalam partisi yang berbeda dapat di-flush secara bersamaan. Ini meningkatkan efisiensi operasi flush.
slotNum
Jumlah slot Tunnel yang digunakan oleh MaxCompute untuk menerima data dari Flink.
INTEGER
Tidak
0
Untuk informasi tentang batasan jumlah slot, lihat Ikhtisar layanan transmisi data dalam dokumentasi MaxCompute.
dynamicPartitionLimit
Jumlah maksimum partisi dinamis ke mana data dapat ditulis.
INTEGER
Tidak
100
Jika jumlah partisi dinamis ke mana data ditulis dari sink antara dua checkpoint melebihi nilai opsi dynamicPartitionLimit, pesan kesalahan ini muncul:
"Terlalu banyak partisi dinamis".PentingJika data ditulis ke sejumlah besar partisi tabel MaxCompute, beban kerja pada layanan MaxCompute tinggi, memperlambat checkpointing dan flushing. Untuk mencegah masalah ini, Anda perlu memeriksa apakah data perlu ditulis ke sejumlah besar partisi. Jika bisnis Anda memerlukan data untuk ditulis ke sejumlah besar partisi, secara manual tingkatkan nilai opsi dynamicPartitionLimit.
retryTimes
Jumlah maksimum percobaan ulang yang dapat dilakukan untuk permintaan di server MaxCompute.
INTEGER
Tidak
3
Layanan MaxCompute mungkin tidak tersedia untuk sementara waktu saat Anda membuat sesi, mengirimkan sesi, atau data di-flush. Jika layanan MaxCompute menjadi tidak tersedia, server MaxCompute diminta berdasarkan konfigurasi opsi ini.
sleepMillis
Interval percobaan ulang.
INTEGER
Tidak
1000
Satuan: milidetik.
enableUpsert
Menentukan apakah akan menggunakan MaxCompute Upsert Tunnel untuk mengunggah data.
BOOLEAN
Tidak
false
Nilai valid:
true: MaxCompute Upsert Tunnel digunakan untuk memproses data INSERT, UPDATE_AFTER, dan DELETE di Realtime Compute for Apache Flink.
false: MaxCompute Batch Tunnel atau MaxCompute Streaming Tunnel yang ditentukan oleh opsi useStreamTunnel digunakan untuk memproses data INSERT dan UPDATE_AFTER di Realtime Compute for Apache Flink.
PentingJika masalah seperti kesalahan, kegagalan penyebaran, atau gangguan pemrosesan jangka panjang terjadi saat MaxCompute sink melakukan commit sesi dalam mode upsert, kami sarankan Anda mengatur opsi Paralelisme operator sink ke nilai yang kurang dari atau sama dengan 10.
Hanya VVR 8.0.6 atau yang lebih baru yang mendukung opsi ini.
upsertAsyncCommit
Menentukan apakah akan menggunakan mode asinkron saat MaxCompute sink melakukan commit sesi dalam mode upsert.
BOOLEAN
Tidak
false
Nilai valid:
true: Mode asinkron digunakan. Jika Anda menggunakan mode asinkron, waktu yang dikonsumsi untuk melakukan commit sesi berkurang tetapi data yang ditulis setelah sesi di-commit tidak dapat segera di-query.
false: Mode sinkron digunakan secara default. Saat MaxCompute sink melakukan commit sesi, sistem menunggu hingga server memproses sesi.
CatatanHanya VVR 8.0.6 atau yang lebih baru yang mendukung opsi ini.
upsertCommitTimeoutMs
Batas waktu untuk MaxCompute sink melakukan commit sesi dalam mode upsert.
INTEGER
Tidak
120000
(120 detik)
Unit: milidetik.
CatatanHanya VVR 8.0.6 atau yang lebih baru yang mendukung opsi ini.
sink.operation
Mode operasi tulis untuk tabel Delta.
STRING
Tidak
insert
Nilai valid:
insert: Data ditulis ke tabel dalam mode tambahan.
upsert: Data diperbarui.
CatatanHanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
sink.parallelism
Tingkat paralelisme saat data ditulis ke tabel Delta.
INTEGER
Tidak
Tidak ada
Tingkat paralelisme penulisan data. Jika Anda tidak mengonfigurasi opsi ini, paralelisme data upstream digunakan secara default.
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
PentingPastikan nilai opsi write.bucket.num adalah kelipatan integral dari nilai opsi sink.parallelism. Ini membantu memastikan kinerja tulis optimal dan secara efisien menghemat memori node sink.
sink.file-cached.enable
Menentukan apakah akan mengaktifkan mode cache file saat data ditulis ke partisi dinamis tabel Delta.
BOOLEAN
Tidak
false
Nilai valid:
true: Mode cache file diaktifkan.
false: Mode cache file dinonaktifkan.
Jika Anda mengaktifkan mode cache file, jumlah file kecil yang ditulis ke server berkurang. Namun, latensi penulisan yang lebih tinggi ada. Kami sarankan Anda mengaktifkan mode cache file jika tabel sink memiliki tingkat paralelisme tinggi.
CatatanHanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
sink.file-cached.writer.num
Jumlah thread yang digunakan untuk mengunggah data secara bersamaan dalam tugas dalam mode cache file.
INTEGER
Tidak
16
Opsi ini berlaku hanya jika opsi sink.file-cached.enable diatur ke true.
Kami sarankan Anda tidak menaikkan nilai opsi ini ke nilai besar. Jika data ditulis ke sejumlah besar partisi secara bersamaan, kesalahan memori habis (OOM) mungkin terjadi.
CatatanHanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
sink.bucket.check-interval
Interval di mana ukuran file diperiksa dalam mode cache file. Satuan: milidetik.
INTEGER
Tidak
60000
Opsi ini berlaku hanya jika opsi sink.file-cached.enable diatur ke true.
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
sink.file-cached.rolling.max-size
Nilai maksimum file cache tunggal dalam mode cache file.
MEMORYSIZE
Tidak
16 MB
Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.
Jika ukuran file melebihi nilai dari opsi ini, data file akan diunggah ke server.
CatatanHanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
sink.file-cached.memory
Ukuran maksimum memori di luar heap yang digunakan untuk menulis data ke file dalam mode cache file.
MEMORYSIZE
Tidak
64 MB
Opsi ini hanya berlaku jika opsi sink.file-cached.enable disetel ke true.
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
sink.file-cached.memory.segment-size
Ukuran buffer yang digunakan untuk menulis data ke file dalam mode cache file.
UKURANMEMORI
Tidak
128 KB
Opsi ini hanya berlaku jika opsi sink.file-cached.enable diatur ke true.
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
sink.file-cached.flush.always
Menentukan apakah cache digunakan untuk menulis data ke file dalam mode cache.
BOOLEAN
Tidak
true
Opsi ini berlaku hanya jika opsi sink.file-cached.enable diatur ke true.
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
sink.file-cached.write.max-retries
Jumlah percobaan ulang maksimum untuk mengunggah data dalam mode cache file.
INTEGER
Tidak
3
Opsi ini berlaku hanya jika opsi sink.file-cached.enable diatur ke true.
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
upsert.writer.max-retries
Jumlah percobaan ulang maksimum untuk menulis data ke bucket dalam sesi Upsert Writer.
INTEGER
Tidak
3
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
upsert.writer.buffer-size
Ukuran buffer sesi Upsert Writer di Realtime Compute for Apache Flink.
MEMORYSIZE
Tidak
64 MB
Saat ukuran buffer total semua bucket mencapai ambang batas tertentu, sistem secara otomatis memperbarui data ke server.
CatatanData dalam sesi Upsert Writer dapat ditulis ke beberapa bucket secara bersamaan. Kami sarankan Anda meningkatkan nilai opsi ini untuk meningkatkan efisiensi penulisan.
Jika data ditulis ke sejumlah besar partisi, kesalahan OOM mungkin terjadi. Untuk mencegah masalah ini, Anda dapat menurunkan nilai opsi ini.
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
upsert.writer.bucket.buffer-size
Ukuran buffer bucket tunggal di Realtime Compute for Apache Flink.
MEMORYSIZE
Tidak
1 MB
Jika sumber daya memori server Flink tidak cukup, Anda dapat menurunkan nilai opsi ini.
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
upsert.write.bucket.num
Jumlah bucket untuk tabel tempat data ditulis.
INTEGER
Ya
Tidak ada
Nilai opsi ini harus sama dengan nilai opsi
write.bucket.numyang dikonfigurasikan untuk tabel Delta tempat data ditulis.Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
upsert.write.slot-num
Jumlah slot Tunnel yang digunakan dalam sesi.
INTEGER
Tidak
1
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
upsert.commit.max-retries
Jumlah percobaan ulang maksimum untuk commit sesi upsert.
INTEGER
Tidak
3
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
upsert.commit.thread-num
Tingkat paralelisme commit sesi upsert.
INTEGER
Tidak
16
Kami sarankan Anda tidak menaikkan nilai opsi ini ke nilai besar. Jika terlalu banyak commit sesi upsert dilakukan secara bersamaan, konsumsi sumber daya meningkat. Ini dapat menyebabkan masalah kinerja atau konsumsi sumber daya berlebihan.
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
upsert.commit.timeout
Periode timeout untuk commit sesi upsert. Unit: detik.
BILANGAN BULAT
Tidak
600
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
upsert.flush.concurrent
Jumlah maksimum bucket ke mana data dalam partisi dapat ditulis secara bersamaan.
BILANGAN BULAT
Tidak
2
Setiap kali data dalam bucket diperbarui, satu slot Tunnel akan digunakan.
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
insert.commit.thread-num
Tingkat paralelisme sesi commit.
INTEGER
Tidak
16
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
insert.arrow-writer.enable
Menentukan apakah akan menggunakan format Arrow.
BOOLEAN
Tidak
false
Nilai valid:
true: Format Arrow digunakan.
false: Format Arrow tidak digunakan.
CatatanHanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
insert.arrow-writer.batch-size
Jumlah maksimum baris dalam batch data berformat Arrow.
INTEGER
Tidak
512
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
insert.arrow-writer.flush-interval
Interval di mana penulis melakukan flush data. Satuan: milidetik.
INTEGER
Tidak
100000
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
insert.writer.buffer-size
Ukuran cache untuk penulis yang dibuffer.
MEMORYSIZE
Tidak
64 MB
Hanya VVR 8.0.10 atau yang lebih baru yang mendukung opsi ini.
upsert.partial-column.enable
Menentukan apakah hanya memperbarui data di kolom tertentu.
BOOLEAN
Tidak
false
Opsi ini hanya berlaku untuk tabel sink yang menulis data ke tabel MaxCompute Delta. Untuk informasi lebih lanjut, lihat Memperbarui data di kolom tertentu dalam dokumentasi MaxCompute.
Nilai valid:
true
false
Perilaku pembaruan data bergantung pada apakah sink memiliki catatan dengan kunci utama yang sama dengan data baru.
Jika tabel sink berisi data dengan kunci utama yang sama, bidang yang sesuai diperbarui berdasarkan kunci utama. Secara spesifik, bidang yang ditentukan ditimpa dengan nilai baru jika nilainya bukan
null.Jika tabel sink tidak berisi catatan dengan kunci utama yang sama, catatan baru akan ditambahkan. Nilai baru akan dimasukkan untuk kolom yang ditentukan sementara
nullakan dimasukkan untuk semua kolom lainnya.
CatatanHanya VVR 8.0.11 atau yang lebih baru yang mendukung opsi ini.
Spesifik Tabel Dimensi
Saat penyebaran dimulai, tabel dimensi menarik data penuh dari partisi yang ditentukan oleh opsi partisi. Opsi ini mendukung fungsi max_pt(). Jika cache dimuat ulang setelah entri cache kedaluwarsa, data partisi terbaru yang ditentukan oleh opsi partisi dianalisis ulang. Jika opsi partisi diatur ke max_two_pt(), tabel dimensi dapat menarik data dari dua partisi. Jika opsi partisi tidak diatur ke max_two_pt(), data hanya dari satu partisi dapat ditarik.
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
cache
Kebijakan cache.
STRING
Ya
Tidak ada nilai default
Anda harus mengatur opsi cache ke
ALLuntuk tabel dimensi dan secara eksplisit mendeklarasikan pengaturan dalam pernyataan DDL. Jika jumlah data dalam tabel remote kecil dan sejumlah besar kunci yang hilang ada, kami sarankan Anda mengatur opsi ini ke ALL. Tabel sumber dan tabel dimensi tidak dapat diasosiasikan berdasarkan klausa ON.ALL: menunjukkan bahwa semua data dalam tabel dimensi disimpan dalam cache. Sebelum sistem menjalankan penyebaran, sistem memuat semua data dalam tabel dimensi ke cache. Dengan cara ini, cache dicari untuk semua query berikutnya dalam tabel dimensi. Jika tidak ada kunci yang ada, sistem tidak dapat menemukan catatan data dalam cache. Sistem memuat ulang semua data dalam cache setelah entri cache kedaluwarsa.
CatatanJika opsi cache diatur ke ALL, Anda harus meningkatkan memori node join karena sistem secara asinkron memuat data tabel dimensi. Kami sarankan Anda meningkatkan ukuran memori setidaknya empat kali jumlah data dalam tabel remote. Ukuran memori terkait dengan algoritma kompresi penyimpanan MaxCompute.
Jika tabel dimensi berisi sejumlah besar data, Anda dapat menggunakan petunjuk SHUFFLE_HASH untuk mendistribusikan data secara merata ke setiap sub-tugas. Untuk informasi lebih lanjut, lihat bagian "Bagaimana cara menggunakan petunjuk SHUFFLE_HASH untuk tabel dimensi?" dari topik FAQ tentang penyimpanan hulu dan hilir.
Jika Anda menggunakan tabel dimensi ultra-besar, pengumpulan sampah (GC) Java virtual machine (JVM) yang sering dapat menyebabkan pengecualian penyebaran. Untuk menyelesaikan masalah ini, Anda dapat meningkatkan memori node tempat tabel dimensi digabungkan dengan tabel lain. Jika masalah tetap ada, kami sarankan Anda mengubah tabel dimensi menjadi tabel dimensi key-value yang mendukung kebijakan cache least recently used (LRU). Sebagai contoh, Anda dapat menggunakan tabel dimensi ApsaraDB for HBase sebagai tabel dimensi key-value.
cacheSize
Jumlah maksimum baris data yang dapat disimpan dalam cache.
LONG
Tidak
100000
Jika jumlah catatan data dalam tabel dimensi melebihi nilai opsi cacheSize, pesan kesalahan ini muncul:
"Jumlah baris tabel <table-name> partisi <partition-name> melebihi batas maxRowCount".PentingJika sejumlah besar catatan data ada dalam tabel dimensi, sejumlah besar memori heap JVM dikonsumsi. Dalam hal ini, kecepatan startup penyebaran dan kecepatan pembaruan tabel dimensi melambat. Untuk mencegah masalah ini, Anda perlu memeriksa apakah sejumlah besar catatan data perlu disimpan dalam cache. Jika bisnis Anda memerlukan sejumlah besar catatan data disimpan dalam cache tabel dimensi, secara manual tingkatkan nilai opsi ini.
cacheTTLMs
Batas waktu cache.
LONG
Tidak
Long.MAX_VALUE
Satuan: milidetik.
cacheReloadTimeBlackList
Periode waktu selama cache tidak diperbarui. Cache tidak diperbarui selama periode waktu yang ditentukan oleh opsi ini.
STRING
Tidak
Tidak ada nilai default
Opsi ini berlaku untuk acara promosi online skala besar seperti jam sibuk aktivitas. Anda dapat menentukan opsi ini untuk mencegah penyebaran menjadi tidak stabil saat cache diperbarui. Untuk informasi lebih lanjut tentang cara menentukan opsi, lihat bagian "Bagaimana cara mengonfigurasi opsi CacheReloadTimeBlackList?" dari topik FAQ tentang penyimpanan hulu dan hilir.
maxLoadRetries
Jumlah maksimum percobaan ulang yang dapat dilakukan untuk memperbarui cache. Saat data pertama kali ditarik ketika penyebaran dimulai, cache diperbarui. Jika jumlah percobaan ulang melebihi nilai opsi ini, penyebaran gagal dijalankan.
INTEGER
Tidak
10
Tidak tersedia.
Pemetaan tipe data
Untuk informasi lebih lanjut tentang tipe data yang didukung oleh MaxCompute, lihat Edisi Tipe Data MaxCompute V2.0.
Tipe data MaxCompute | Tipe data Realtime Compute for Apache Flink |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(presisi, skala) | DECIMAL(presisi, skala) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
TIMESTAMP_NTZ | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
Jika tabel fisik MaxCompute berisi bidang tipe data komposit bersarang (ARRAY, MAP, atau STRUCT, dll) dan bidang tipe JSON, Anda harus menentukan tblproperties('columnar.nested.type'='true') saat membuat tabel fisik MaxCompute untuk memungkinkan Realtime Compute for Apache Flink membaca data dari dan menulis data ke tabel fisik dengan benar.
Ingesti data melalui YAML
MaxCompute connector dapat digunakan sebagai sink ingest data dalam pekerjaan berbasis YAML.
Persyaratan mesin VVR
VVR 11.1 atau lebih baru
Sintaksis
source:
type: xxx
sink:
type: maxcompute
name: MaxComputeSinkaccess-id: ${your_accessId}
access-key: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}buckets-num: 8Opsi konfigurasi
Opsi | Diperlukan? | Nilai default | Tipe data | Deskripsi |
type | Ya | Tidak ada nilai default. | String | Konektor yang akan digunakan. Atur ke |
name | Tidak | Tidak ada nilai default. | String | Nama sink. |
access-id | Ya | Tidak ada nilai default. | String | ID AccessKey akun Alibaba Cloud atau pengguna RAM Anda. Dapatkan di Konsol Manajemen Akses Sumber Daya. |
access-key | Ya | Tidak ada nilai default. | String | Rahasia AccessKey Anda. |
endpoint | Ya | Tidak ada nilai default. | String | Titik akhir MaxCompute Anda. Konfigurasikan berdasarkan wilayah tempat proyek MaxCompute Anda berada dan metode koneksi jaringan. Untuk informasi lebih lanjut, lihat Titik akhir. |
project | Ya | Tidak ada nilai default. | String | Nama proyek MaxCompute Anda. Lakukan hal berikut untuk mendapatkannya:
|
tunnel.endpoint | Tidak | Tidak ada nilai default. | String | Titik akhir Tunnel MaxCompute. Titik akhir ini biasanya disimpulkan secara otomatis oleh MaxCompute dari pengaturan |
quota.name | Tidak | Tidak ada nilai default. | String | Nama kuota dari grup sumber daya eksklusif. Jika Anda tidak secara eksplisit menentukan opsi ini, grup sumber daya bersama akan digunakan. |
sts-token | Tidak | Tidak ada nilai default. | String | Token STS dari Peran RAM Anda. Opsi ini diperlukan untuk otentikasi identitas jika Anda menggunakan Peran RAM untuk mengakses MaxCompute. |
buckets-num | Tidak | 16 | Integer | Jumlah bucket untuk tabel Delta MaxCompute yang dibuat secara otomatis. Untuk informasi lebih lanjut, lihat Gudang data hampir real-time. |
compress.algorithm | Tidak | zlib | String | Algoritma kompresi data. Nilai valid:
|
total.buffer-size | Tidak | 64 MB | String | Ukuran buffer dalam memori. Untuk tabel yang dipartisi, buffer ini berlaku pada tingkat partisi. Untuk tabel non-partisi, berlaku pada tingkat tabel. Buffer untuk partisi atau tabel yang berbeda bersifat independen. Saat buffer mencapai kapasitas, datanya di-flush ke MaxCompute. |
bucket.buffer-size | Tidak | 4 MB | String | Ukuran buffer dalam memori untuk bucket. Opsi ini hanya berlaku saat data ditulis ke tabel Delta MaxCompute. Buffer untuk bucket yang berbeda bersifat independen. Saat buffer mencapai kapasitas, datanya di-flush ke MaxCompute. |
commit.thread-num | Tidak | 16 | Integer | Jumlah maksimum partisi atau tabel yang dapat dicommit secara bersamaan selama checkpointing. |
flush.concurrent-num | Tidak | 4 | Integer | Menentukan jumlah maksimum bucket ke mana Flink dapat secara bersamaan flush data. Opsi ini hanya berlaku saat data ditulis ke tabel Delta MaxCompute. |
Pemetaan lokasi tabel
Saat konektor memicu pembuatan tabel otomatis di MaxCompute, lokasi dipetakan sebagai berikut:
Jika fitur skema dinonaktifkan untuk proyek MaxCompute Anda, konektor akan mengabaikan tableId.namespace. Dalam hal ini, hanya satu database atau setara logisnya yang dimasukkan ke dalam MaxCompute. Sebagai contoh, ketika data di-ingest dari MySQL ke MaxCompute, hanya satu database MySQL yang di-inject.
Lokasi MySQL | Abstraksi di Flink CDC | Lokasi MaxCompute |
Tidak tersedia | Proyek dalam file konfigurasi | Proyek |
Database | TableId.namespace | Skema Catatan Jika skema dinonaktifkan untuk proyek MaxCompute Anda, pengaturan ini diabaikan. |
Tabel | TableId.tableName | Tabel |
Pemetaan tipe data
Tipe Flink CDC | Tipe MaxCompute |
CHAR | STRING |
VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |
Contoh
API SQL
Tabel sumber
Membaca semua data dalam partisi tertentu
Baca semua data dalam partisi yang ditentukan oleh opsi partition.
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=201809*'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid,
COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;Membaca data inkremental
Baca data mulai dari partisi yang ditentukan oleh startPartition dan terus memantau catatan baru.
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Mulai membaca dari partisi 20180905.
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;Tabel sink
Menulis ke partisi statis
Tulis ke partisi yang ditentukan oleh opsi partition:
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905' -- Data ditulis ke partisi 20180905.
);
INSERT INTO odps_sink
SELECT
id, len, content
FROM datagen_source;Menulis ke partisi secara dinamis
Tulis data ke partisi secara dinamis berdasarkan opsi partition:
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR,
c TIMESTAMP
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR,
ds VARCHAR -- Secara eksplisit tentukan kolom partisi dinamis.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds' -- Jangan tentukan nilai untuk ds. Data ditulis ke partisi yang berbeda berdasarkan nilai bidang ds.
);
INSERT INTO odps_sink
SELECT
id,
len,
content,
DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;Tabel dimensi
Kunci nilai tunggal
Tentukan kunci utama saat setiap kunci memiliki nilai unik:
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR,
PRIMARY KEY (k) NOT ENFORCED -- Tentukan kunci utama.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;Kunci multi-nilai
Tidak perlu menentukan kunci utama saat sebuah kunci dapat memiliki beberapa nilai:
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR
-- Menentukan kunci utama tidak diperlukan.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;API DataStream
Jika Anda ingin memanggil API DataStream untuk membaca atau menulis data, Anda harus menggunakan konektor DataStream tipe terkait. Untuk informasi lebih lanjut tentang cara mengonfigurasi konektor DataStream, lihat Pengaturan Konektor DataStream.
Untuk melindungi kekayaan intelektual, VVR 6.0.6 atau lebih baru mendukung debugging lokal program DataStream yang mencakup konektor MaxCompute hingga 30 menit. Setiap sesi debugging yang lebih lama akan mengakibatkan program dihentikan dengan kesalahan. Untuk informasi lebih lanjut, lihat Debug Lokal Program Flink yang Mencakup Konektor.
Pembacaan data dari tabel Delta MaxCompute tidak didukung. Tabel Delta adalah tabel yang dibuat dengan
primary keyyang ditentukan dan propertitransactional=true. Untuk informasi lebih lanjut, lihat Konsep Dasar.
Kami sarankan Anda mendeklarasikan tabel MaxCompute menggunakan pernyataan SQL saat menggunakan konektor DataStream MaxCompute. Anda dapat memanggil operasi API Table untuk mengakses tabel MaxCompute atau memanggil operasi API DataStream untuk mengakses aliran data.
Hubungkan ke tabel sumber
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); Menyambungkan ke sink
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();XML
Dependensi Maven dari konektor MaxCompute berisi kelas-kelas yang diperlukan untuk membuat sumber penuh, sumber tambahan, sink, dan tabel dimensi. MaxCompute DataStream Connectors dari berbagai versi disimpan di repositori pusat Maven.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>