全部产品
Search
文档中心

MaxCompute:Pembaruan Kolom Parsial Hampir Real-Time ke Tabel Delta dengan Flink

更新时间:Jul 02, 2025

Topik ini menjelaskan skenario dan konfigurasi parameter untuk pembaruan kolom parsial dalam tabel Delta, serta dua mode pembaruan yang dirancang untuk Flink Connector dan konfigurasinya.

Informasi latar belakang

  • Operasi UPSERT: Fitur database yang menggabungkan operasi INSERT dan UPDATE. Operasi ini memastikan efisiensi dengan mensyaratkan bahwa setiap rekaman (atau baris) yang diproses oleh UPSERT mencakup kolom kunci utama.

  • Perilaku UPSERT: Bergantung pada apakah data dengan kunci utama tertentu ada di tabel.

    • Semantik insert: Jika data dengan kunci utama tertentu tidak ada di tabel, UPSERT melakukan operasi insert untuk menambahkan rekaman baru.

    • Semantik update: Jika rekaman dengan kunci utama tertentu sudah ada di tabel, UPSERT melakukan operasi update untuk memperbarui data yang ada dengan data baru.

  • Skenario UPSERT: Dalam pemrosesan aliran dengan beberapa penggabungan tabel, pembaruan dari dua aliran data berbeda memengaruhi kolom berbeda di tabel yang sama.

    • Aliran data StreamA bertanggung jawab memperbarui kolom ColumnX.

    • Aliran data StreamB bertanggung jawab memperbarui kolom ColumnY.

  • Perbandingan Bentuk UPSERT:

    • UPSERT tradisional: Pembaruan dari StreamB dapat menimpa modifikasi yang dibuat oleh StreamA, menyebabkan ketidaksesuaian data.

    • Fitur Pembaruan Kolom Parsial: Memastikan tidak ada konflik antar aliran selama pembaruan bersamaan. Setiap aliran hanya memperbarui kolom yang menjadi tanggung jawabnya sambil mempertahankan hasil pembaruan dari semua aliran di baris yang sama.

Skenario

Skenario 1: Perbarui kolom berbeda di baris yang sama tanpa gangguan

Misalkan terdapat sistem manajemen informasi pengguna yang perlu memproses dan memperbarui data pengguna secara real-time. Data diproses oleh dua aliran layanan independen yang menerima informasi dari sumber data berbeda.

  • Aliran data StreamA bertanggung jawab memproses informasi pribadi pengguna seperti nama, usia, dan jenis kelamin.

  • Aliran data StreamB bertanggung jawab memproses informasi kontak pengguna seperti email dan nomor telepon.

Dalam operasi bisnis nyata, informasi pribadi dan kontak pengguna mungkin berubah hampir secara bersamaan. Kita perlu memastikan bahwa pembaruan ini segera tercermin dalam sistem manajemen informasi pengguna tanpa saling menimpa.

Proses operasi

  1. Seorang pengguna memperbarui nama dan nomor telepon mereka di platform berbeda. StreamA menerima pembaruan nama, dan StreamB menerima pembaruan nomor telepon.

  2. Kedua StreamA dan StreamB mengirimkan pembaruan ke sistem manajemen informasi pengguna.

Hasil akhir

  • Tanpa Pembaruan Kolom Parsial: Jika pembaruan dari StreamB tiba dan diproses setelah StreamA, itu akan menimpa informasi nama yang baru saja diperbarui oleh StreamA (jika StreamB melakukan pembaruan baris penuh), menyebabkan nama kembali ke nilai lamanya.

  • Dengan Pembaruan Kolom Parsial:

    • Ketika StreamA melakukan pembaruan, itu hanya beroperasi pada kolom nama tanpa memengaruhi kolom informasi kontak.

    • Ketika StreamB melakukan pembaruan, itu hanya beroperasi pada kolom nomor telepon tanpa memengaruhi kolom informasi pribadi.

    Hasil akhirnya adalah nama pengguna diperbarui ke informasi terbaru, dan nomor telepon juga diperbarui ke informasi terbaru. Pembaruan ini dilakukan secara independen tanpa gangguan, memastikan integritas dan akurasi informasi pengguna.

Dalam aplikasi praktis, fitur pembaruan kolom parsial sangat penting untuk memproses data seperti informasi pengguna. Fitur ini tidak hanya memastikan pembaruan data secara real-time tetapi juga secara efektif mencegah masalah ketidaksesuaian data.

Skenario 2: Perbarui bidang parsial dalam baris sambil menjaga yang lain tetap tidak berubah

Misalkan terdapat sistem manajemen informasi pengguna yang perlu memproses dan memperbarui data pengguna secara real-time. Data diproses oleh dua aliran layanan independen yang menerima informasi dari sumber data berbeda.

  • StreamA bertanggung jawab memperbarui informasi pribadi pengguna seperti nama, usia, jenis kelamin, dan informasi kontak pengguna seperti email dan nomor telepon.

  • StreamB bertanggung jawab memperbarui informasi pribadi pengguna seperti nama, usia, jenis kelamin, dan informasi kontak pengguna seperti email dan nomor telepon. Tugasnya identik dengan StreamA.

Proses operasi

  1. StreamA hanya ingin memperbarui usia pengguna, dengan perintah seperti: INSERT INTO table (pk, age) VALUES (1, 3);.

  2. Pada saat yang sama, StreamB hanya ingin memperbarui jenis kelamin pengguna, dengan perintah seperti: INSERT INTO table (pk, sex) VALUES (1, 'male');.

Hasil akhir

  • Tanpa Pembaruan Kolom Parsial: Jika rekaman dengan kunci utama 1 menerima perintah pembaruan di atas, semua bidang lain selain yang sedang diperbarui akan disetel ke NULL. Ini akan mengakibatkan hilangnya data valid asli.

  • Dengan Pembaruan Dinamis:

    • Ketika operasi insert dipicu, sistem mengidentifikasi bahwa hanya beberapa bidang yang berisi data.

    • Mekanisme pembaruan kolom parsial memastikan bahwa hanya bidang-bidang dengan data yang diperbarui.

    • Pada saat yang sama, bidang yang tidak memiliki data yang diberikan dalam operasi insert akan mempertahankan nilai aslinya.

Dengan menerapkan strategi identifikasi dan pembaruan otomatis ini, sistem manajemen informasi pengguna dapat memperbarui secara tepat hanya bidang yang perlu diubah tanpa kehilangan data valid yang ada. Ini secara signifikan meningkatkan fleksibilitas dan akurasi pengelolaan data, memberikan perlindungan kuat untuk menjaga integritas data.

Ikhtisar mode Flink Connector

Berdasarkan skenario penggunaan pembaruan kolom parsial dalam tabel Delta, dua mode pembaruan kolom parsial dirancang untuk Flink Connector untuk memenuhi persyaratan pembaruan data yang berbeda.

Mode statis

Dalam mode statis, pengguna perlu menentukan sebelumnya kolom mana yang akan diperbarui oleh aliran data. Kolom yang ditentukan ini akan mengikuti logika UPSERT normal:

  • Jika kunci utama ada, perbarui data.

  • Jika kunci utama tidak ada, sisipkan data baru.

Pada saat yang sama, kolom yang tidak ditentukan untuk pembaruan akan mempertahankan nilai yang ada. Mode ini cocok untuk kolom yang diharapkan sering berubah.

Mode dinamis

Mode dinamis memberikan sistem kecerdasan dan adaptabilitas yang lebih tinggi. Dalam mode ini, sistem dapat secara otomatis mendeteksi kolom mana dalam aliran data yang berisi nilai non-NULL dan hanya memperbarui kolom-kolom tersebut dengan nilai. Ini berarti bahwa kolom dalam aliran data tanpa nilai (dengan nilai NULL) akan tetap tidak berubah. Mode dinamis sangat cocok untuk situasi di mana tidak mungkin menentukan sebelumnya kolom mana yang akan berubah, memastikan akurasi dan efisiensi setiap pembaruan aliran data.

Dengan memperkenalkan kedua mode pembaruan ini, Flink Connector memberikan pengguna kemampuan pemrosesan data yang lebih fleksibel dan kuat, memungkinkan mereka memilih strategi pembaruan data yang paling sesuai berdasarkan situasi nyata, sehingga memastikan akurasi dan integritas data.

Tabel berikut menunjukkan hasil setelah memperbarui data yang sama menggunakan mode berbeda.

Catatan

Dalam contoh ini, kolom pertama a adalah kunci utama. Dalam mode statis, kolom kunci utama dipilih secara default.

Mode

Data awal

Langkah 1: Data setelah pembaruan (a, b, c)

Langkah 2: Data setelah pembaruan (a, d, null)

Langkah 3: Data akhir setelah pembaruan (a, null, e)

Mode reguler

(null, null, null)

(a, b, c)

(a, d, null)

(a, null, e)

Mode dinamis

(null, null, null)

(a, b, c)

(a, d, c)

(a, d, e)

Mode statis (kolom kedua ditentukan untuk pembaruan)

(null, null, null)

(a, b, null)

(a, d, null)

(a, null, null)

Metode penggunaan

Buat tabel Delta dan aktifkan pembaruan kolom parsial

Metode spesifik adalah mengonfigurasi parameter acid.partial.fields.update.enable=true dalam tblproperties. Untuk informasi lebih lanjut, lihat Parameter untuk Tabel Delta.

Contoh sintaks:

CREATE TABLE IF NOT EXISTS partial_upsert_test
  (pk INT NOT NULL, 
   c1 STRING, 
   c2 STRING, 
   c3 STRING, 
   primary key(pk)) 
TBLPROPERTIES('transactional'='true', 'acid.partial.fields.update.enable'='true');

Contoh konfigurasi Flink Connector

Parameter

Untuk mengonfigurasi mode pembaruan kolom parsial, MaxCompute telah memperkenalkan dua parameter konfigurasi berikut:

Parameter

Deskripsi

upsert.partial-column.enable

Parameter ini digunakan untuk mengaktifkan fitur pembaruan kolom parsial. Jika nama kolom tidak ditentukan (dengan parameter upsert.partial-column.name dibiarkan kosong), sistem akan menggunakan mode dinamis (memperbarui bidang non-NULL) untuk pembaruan.

upsert.partial-column.name

Parameter ini digunakan untuk menentukan kolom yang perlu diperbarui. Jika parameter ini disetel, sistem hanya akan memperbarui bidang yang terdaftar, sementara bidang lainnya akan mempertahankan nilai aslinya.

Catatan

Kolom kunci utama dipilih secara default. Nama kolom kunci partisi tidak dapat ditambahkan ke parameter ini.

Contoh konfigurasi pembaruan kolom parsial dinamis

Buat tabel dengan pembaruan kolom parsial dinamis diaktifkan. Contoh:

CREATE TABLE partialtable (
  pk INT,
  c1 STRING, 
  c2 STRING, 
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //Konektivitas jaringan VPC
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true', //Dukung model tiga lapis.
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true', 
  'odps.access.id' = 'yourAccessId',
  'odps.access.key' = 'yourAccessKey'
);

Contoh operasi berikutnya pada tabel dan hasilnya:

  1. Masukkan data [1,a, b, c] ke dalam tabel, di mana kolom pertama adalah kunci utama. Data awal adalah [1, a, b, c].

    INSERT INTO partialtable VALUES (1, 'a', 'b', 'c'); 
  2. Perbarui hanya kolom kedua c2 menjadi d untuk rekaman dengan kunci utama 1. Data setelah pembaruan adalah [1, a, d, c].

    INSERT INTO partialtable(pk, c2) VALUES (1, 'd'); 
  3. Perbarui hanya kolom ketiga c3 menjadi e untuk rekaman dengan kunci utama 1. Data setelah pembaruan adalah [1, a, d, e].

    INSERT INTO partialtable(pk, c3) VALUES (1, 'e'); 

Contoh konfigurasi pembaruan kolom parsial statis

Buat tabel yang hanya memperbarui kolom c2. Operasi berikutnya pada tabel ini hanya akan memengaruhi kolom c2, sementara kolom lainnya akan tetap tidak berubah. Contoh:

CREATE TABLE PartialTable2 (
  pk INT,
  c1 STRING, 
  c2 STRING, 
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //Konektivitas jaringan VPC
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true', //Dukung model tiga lapis.
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true', 
  'upsert.partial-column.name' = 'c2', // Tentukan untuk hanya memperbarui kolom c2.
  'odps.access.id' = 'yourAccessId',
  'odps.access.key' = 'yourAccessKey'
);
Catatan

Saat mengonfigurasi parameter upsert.partial-column.name, Anda harus menggunakan nama kolom yang sesuai dengan tabel di MaxCompute, bukan nama kolom di tabel internal Flink. Ini memastikan bahwa Flink dapat dengan benar mengidentifikasi dan memperbarui kolom yang sesuai di sistem penyimpanan.

Referensi