全部产品
Search
文档中心

MaxCompute:Masukkan data ke gudang data secara real-time

更新时间:Jul 02, 2025

Untuk memenuhi persyaratan bisnis terhadap data yang sensitif terhadap waktu di gudang data, MaxCompute mendukung penulisan data real-time dan pembaruan kunci utama dalam hitungan menit berdasarkan tabel Delta. Hal ini meningkatkan efisiensi pembaruan data secara signifikan.

Skenario penulisan data

Saat menggunakan database relasional tradisional atau metode analisis data offline untuk memproses log perilaku pelanggan terkait peristiwa mendadak seperti komentar, peringkat, dan suka, data biasanya hanya dapat dianalisis pada hari berikutnya. Skenario ini sering melibatkan masalah seperti konsumsi sumber daya tinggi, biaya besar, latensi data, dan kompleksitas pembaruan.

Untuk mengatasi masalah tersebut, Anda dapat menggunakan solusi penulisan data real-time ke gudang data. Solusi ini menyinkronkan data tambahan ke tabel Delta dalam hitungan menit, sehingga membatasi latensi dari penulisan hingga kueri menjadi 5 hingga 10 menit. Ini meningkatkan ketepatan waktu analisis data secara signifikan. Jika Anda menjalankan tugas produksi untuk menyinkronkan data ke tabel standar di lapisan operational data store (ODS) MaxCompute, Anda dapat menggunakan fitur UPSERT tabel Delta untuk menghindari risiko transformasi tugas produksi. Fitur ini membantu menyinkronkan data secara efektif ke tabel Delta dan mencegah penyimpanan data duplikat, meningkatkan efisiensi penyimpanan serta mengurangi biaya.

Contoh

Tulis data Flink ke tabel Delta

Topik ini menjelaskan cara menulis data ke tabel Delta MaxCompute secara real-time menggunakan konektor Flink.

image

Tabel berikut menggambarkan proses penulisan data.

No.

Deskripsi

1

Data dikelompokkan berdasarkan kunci utama dan ditulis secara bersamaan ke tabel.

Untuk meningkatkan throughput penulisan, Anda juga dapat mengelompokkan data berdasarkan kolom kunci partisi dan menulis data ke partisi tabel saat kondisi berikut terpenuhi: (1) Data perlu ditulis secara bersamaan ke sejumlah besar partisi. (2) Data didistribusikan secara merata di partisi. (3) Jumlah bucket untuk tabel kecil. Misalnya, kurang dari 10 bucket dikonfigurasikan untuk menyimpan data tabel.

2

Setelah UpsertWriterTask menerima data, ia mengurai partisi tempat data tersebut termasuk dan mengirim permintaan ke UpsertOperatorCoordinator. Kemudian, UpsertOperatorCoordinator membuat sesi upsert untuk menulis data ke partisi secara real-time.

3

UpsertOperatorCoordinator mengembalikan ID sesi upsert yang dibuat ke UpsertWriterTask.

4

UpsertWriterTask membuat Upsert Writer berdasarkan sesi upsert dan terhubung ke Tunnel Server MaxCompute untuk terus menulis data ke tabel.

Jika mode cache file diaktifkan, data pertama kali masuk ke cache disk lokal Flink selama transmisi data. Data ditransmisikan ke Tunnel Server sampai ukuran file data mencapai ambang tertentu atau proses checkpoint dimulai.

5

Setelah proses checkpoint dimulai, Upsert Writer mengirimkan semua data ke Tunnel Server, lalu mengirim permintaan ke UpsertOperatorCoordinator untuk memicu operasi commit. Setelah operasi commit berhasil, data menjadi terlihat.

6

Jika kompaksi mayor otomatis diaktifkan, UpsertOperatorCoordinator memulai operasi kompaksi mayor ke Storage Service ketika jumlah commit partisi melebihi ambang tertentu.

Catatan

Operasi ini dapat menyebabkan latensi untuk impor data real-time berdasarkan ukuran data tabel. Oleh karena itu, Anda harus menggunakan kompaksi mayor otomatis dengan hati-hati.

Untuk informasi lebih lanjut tentang cara menulis data Flink ke tabel Delta MaxCompute, lihat Gunakan Flink untuk menulis data ke tabel Delta.

Saran konfigurasi parameter untuk pernyataan UPSERT

Anda dapat menyesuaikan konfigurasi parameter untuk pernyataan UPSERT guna meningkatkan throughput sistem, kinerja penulisan data real-time, dan stabilitas sistem sesuai dengan berbagai persyaratan bisnis. Untuk informasi lebih lanjut tentang parameter untuk pernyataan UPSERT, lihat Parameter untuk pernyataan UPSERT.

Konfigurasi parameter kunci umum

  • Jumlah bucket untuk sebuah tabel memengaruhi konkurensi penulisan maksimum dan menentukan throughput total. Kami merekomendasikan Anda menghitung throughput total berdasarkan rumus: 1 MB/s × Jumlah bucket untuk sebuah tabel.

    Throughput aktual bergantung pada parameter spesifik seperti sink.parallelism. Untuk informasi lebih lanjut, lihat Format tabel dan tata kelola data.

  • Parameter sink.parallelism menentukan paralelisme node sink untuk penulisan data. Kami merekomendasikan Anda menyetel jumlah bucket untuk sebuah tabel sebagai kelipatan integral dari nilai parameter ini untuk mencapai kinerja optimal. Secara teori, kinerja optimal dicapai ketika nilai parameter sink.parallelism sama dengan jumlah bucket untuk sebuah tabel.

Konfigurasi parameter untuk peningkatan throughput tabel non-partisi

  • Jika throughput tidak meningkat setelah mengonfigurasi parameter sink.parallelism untuk meningkatkan konkurensi penulisan, kemungkinan tautan pemrosesan data upstream dari node sink tidak efisien. Kami merekomendasikan Anda mengoptimalkan tautan pemrosesan data untuk meningkatkan kinerja keseluruhan.

  • Jika jumlah bucket untuk sebuah tabel adalah kelipatan integral dari nilai parameter sink.parallelism, jumlah bucket yang ditulis oleh satu node sink dihitung berdasarkan rumus: Jumlah bucket untuk sebuah tabel / sink.parallelism. Jika jumlah bucket terlalu besar, kinerja dapat terpengaruh negatif. Kami merekomendasikan Anda menyesuaikan jumlah bucket dan nilai parameter sink.parallelism terlebih dahulu. Jika nilai parameter upsert.writer.buffer-size dibagi dengan jumlah bucket yang ditulis oleh satu node sink kurang dari ambang tertentu (misalnya, 128 KB), efisiensi transmisi jaringan mungkin berkurang. Untuk meningkatkan kinerja jaringan, kami merekomendasikan Anda meningkatkan nilai parameter upsert.writer.buffer-size.

  • Parameter upsert.flush.concurrent menentukan jumlah bucket yang mana data disiram secara bersamaan. Nilai default parameter ini adalah 2. Untuk meningkatkan throughput, Anda dapat meningkatkan nilai parameter ini dan mengamati bagaimana kinerja meningkat.

    Catatan

    Jika Anda menyetel parameter ini ke nilai yang terlalu besar, data mungkin ditulis ke sejumlah bucket yang berlebihan pada saat yang sama. Ini dapat menyebabkan kemacetan jaringan dan mengurangi throughput keseluruhan. Oleh karena itu, sesuaikan nilai parameter ini berdasarkan persyaratan bisnis Anda untuk memastikan operasi sistem yang stabil dan efisien.

Konfigurasi parameter untuk meningkatkan throughput penulisan bersamaan ke sejumlah kecil partisi

Dalam skenario ini, Anda dapat merujuk ke bagian "Konfigurasi parameter kunci umum" dan "Konfigurasi parameter untuk peningkatan throughput tabel non-partisi". Perhatikan poin-poin berikut:

  • Data ditulis ke beberapa partisi oleh satu node sink. Selama proses checkpoint, penulisan data ke setiap partisi dilakukan secara independen, sehingga memengaruhi throughput keseluruhan.

  • Memori maksimum data buffer untuk satu node sink dihitung berdasarkan rumus: upsert.writer.buffer-size × Jumlah partisi. Jika terjadi kesalahan out-of-memory (OOM), kami merekomendasikan Anda menurunkan nilai parameter upsert.writer.buffer-size untuk mencegah penggunaan memori melebihi batas atas.

  • Anda dapat meningkatkan nilai parameter upsert.commit.thread-num untuk mengurangi waktu yang diperlukan untuk operasi commit selama proses checkpoint. Nilai default parameter ini adalah 16, yang menunjukkan bahwa 16 thread digunakan untuk melakukan operasi commit secara bersamaan untuk partisi.

    Catatan

    Anda dapat meningkatkan nilai parameter upsert.commit.thread-num untuk meningkatkan kinerja sistem. Namun, untuk menghindari masalah yang disebabkan oleh konkurensi berlebihan, jangan tingkatkan nilai parameter ini menjadi lebih dari 32.

Konfigurasi parameter untuk meningkatkan throughput penulisan bersamaan ke sejumlah besar partisi (dalam mode cache file)

Dalam skenario ini, Anda dapat merujuk ke bagian "Konfigurasi parameter untuk meningkatkan throughput penulisan bersamaan ke sejumlah kecil partisi". Perhatikan poin-poin berikut:

  • Data setiap partisi disimpan dalam file lokal dan kemudian ditulis secara bersamaan ke MaxCompute selama proses checkpoint.

  • Nilai default parameter sink.file-cached.writer.num adalah 16. Anda dapat meningkatkan nilai parameter ini untuk meningkatkan jumlah partisi yang mana data ditulis secara bersamaan oleh satu node sink. Kami merekomendasikan Anda tidak menyetel parameter ini ke nilai lebih dari 32. Sesuaikan jumlah bucket yang ditulis secara bersamaan berdasarkan rumus: sink.file-cached.writer.num × upsert.flush.concurrent. Hindari menyetel parameter sink.file-cached.writer.num ke nilai yang terlalu besar, karena dapat menyebabkan kemacetan jaringan dan menurunkan throughput keseluruhan.

Catatan

Untuk informasi lebih lanjut tentang parameter untuk menulis data dalam mode cache file, lihat Parameter untuk menulis data dalam mode cache file.

Saran lainnya

Jika persyaratan throughput tidak terpenuhi atau throughput tidak stabil setelah menyesuaikan konfigurasi parameter berdasarkan saran sebelumnya, pertimbangkan faktor-faktor berikut:

  • Grup sumber daya Tunnel publik yang tersedia gratis untuk setiap proyek terbatas. Jika batas atas tercapai, data tidak dapat ditulis, mengurangi throughput keseluruhan.

  • Tautan pemrosesan data upstream dari konektor tidak efisien, menghasilkan throughput keseluruhan rendah. Kami merekomendasikan Anda mengoptimalkan tautan pemrosesan data untuk meningkatkan kinerja keseluruhan.

FAQ

Masalah terkait Flink

  • Masalah 1:

    • Deskripsi masalah: Pesan kesalahan "Checkpoint xxx expired before completing" muncul.

    • Penyebab: Proses checkpoint habis waktu. Dalam sebagian besar kasus, masalah ini terjadi karena data ditulis ke sejumlah partisi yang berlebihan selama proses checkpoint.

    • Solusi:

      • Kami merekomendasikan Anda meningkatkan interval checkpoint Flink.

      • Konfigurasikan parameter sink.file-cached.enable untuk mengaktifkan mode cache file. Untuk informasi lebih lanjut, lihat Lampiran: Parameter konektor Flink versi baru.

  • Masalah 2:

    • Deskripsi masalah: Pesan kesalahan "org.apache.flink.util.FlinkException: An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency." muncul.

    • Penyebab: Komunikasi antara JobManager dan TaskManager tidak normal. Tugas secara otomatis mencoba ulang.

    • Solusi: Kami merekomendasikan Anda meningkatkan jumlah sumber daya tugas untuk memastikan stabilitas tugas.

Masalah penulisan data

  • Masalah 1:

    • Deskripsi masalah: Offset waktu delapan jam terjadi setelah data tipe TIMESTAMP ditulis ke MaxCompute.

    • Penyebab: Data tipe TIMESTAMP Flink tidak mengandung informasi zona waktu. Selain itu, zona waktu tidak dikonversi saat data ditulis ke MaxCompute. Oleh karena itu, data dianggap sebagai data zona waktu nol. Namun, MaxCompute mengonversi data berdasarkan zona waktu proyek saat membaca data.

    • Solusi: Ganti data tipe TIMESTAMP di tabel sink MaxCompute dengan data tipe TIMESTAMP_LTZ.

Masalah terkait Tunnel

  • Masalah 1:

    • Deskripsi masalah: Kesalahan terkait Tengine terjadi saat data ditulis. Pesan kesalahan berikut muncul:

      <body>
      <h1>Terjadi kesalahan.</h1>
      <p>Maaf, halaman yang Anda cari saat ini tidak tersedia.<br/>
      Silakan coba lagi nanti.</p>
      <p>Jika Anda adalah administrator sistem dari sumber daya ini maka Anda harus memeriksa
      <a href="http://nginx.org/r/error_log">log kesalahan</a> untuk detailnya.</p>
      <p></p>
      </body>
      </html>
    • Penyebab: Layanan Tunnel sementara tidak tersedia.

    • Solusi: Tunggu hingga layanan Tunnel dipulihkan. Setelah itu, tugas dapat mencoba lagi dengan sukses.

  • Masalah 2:

    • Deskripsi masalah: Muncul pesan kesalahan "java.io.IOException: RequestId=xxxxxx, ErrorCode=SlotExceeded, ErrorMessage=Kuota slot Anda telah terlampaui."

    • Penyebab: Kuota penulisan melebihi batas atas. Anda harus mengurangi konkurensi penulisan atau meningkatkan paralelisme grup sumber daya Tunnel eksklusif.

    • Solusi:

      • Kurangi konkurensi penulisan untuk mengurangi jumlah sumber daya sistem yang perlu digunakan.

      • Tingkatkan paralelisme grup sumber daya Tunnel eksklusif untuk meningkatkan kemampuan pemrosesan guna memenuhi kebutuhan penulisan data yang lebih tinggi.