Topik ini menjelaskan cara menggunakan Realtime Compute for Apache Flink untuk menyinkronkan data log dari ApsaraMQ for Kafka ke Hologres secara waktu nyata. Di akhir tutorial ini, Anda akan memiliki pekerjaan Flink SQL yang berjalan dan terus-menerus mengalirkan catatan pengguna ke gudang data Hologres—serta secara otomatis menyesuaikan diri ketika skema sumber berubah.
Ikhtisar arsitektur
Pipeline mengikuti alur data berikut:
[Faker Connector] → [ApsaraMQ for Kafka] → [Realtime Compute for Apache Flink] → [Hologres]
Konektor Faker menghasilkan catatan pengguna sintetis dan menuliskannya ke topik ApsaraMQ for Kafka. Realtime Compute for Apache Flink membaca dari topik tersebut, mentransformasi data menggunakan Flink SQL, lalu menuliskan hasilnya ke gudang data Hologres. Ketiga layanan tersebut harus berada dalam VPC yang sama.
Langkah-langkah dalam tutorial ini
Konfigurasikan daftar putih alamat IP
Siapkan data uji untuk instans ApsaraMQ for Kafka
Buat katalog Hologres (hanya metode CTAS)
Kembangkan dan mulai pekerjaan sinkronisasi data
Lihat hasil sinkronisasi data penuh
Periksa apakah perubahan skema tabel disinkronkan secara otomatis
Prasyarat
Sebelum memulai, pastikan hal-hal berikut telah tersedia:
-
Pengguna Resource Access Management (RAM) atau Peran RAM yang Anda gunakan untuk mengakses konsol pengembangan Realtime Compute for Apache Flink memiliki izin yang diperlukan. Untuk informasi selengkapnya, lihat Manajemen izin.
-
Ruang kerja Flink telah dibuat. Untuk informasi selengkapnya, lihat Aktifkan Realtime Compute for Apache Flink.
-
Instans penyimpanan hulu dan hilir telah dibuat:
Instans ApsaraMQ for Kafka dan Hologres Anda harus berada dalam VPC yang sama dengan ruang kerja Flink Anda. Ini merupakan persyaratan jaringan wajib—jika layanan tidak berada dalam VPC yang sama, akan terjadi error koneksi. Jika tidak berada dalam VPC yang sama, Anda harus membuat koneksi di antara layanan tersebut sebelum melanjutkan. Untuk informasi selengkapnya, lihat Bagaimana Realtime Compute for Apache Flink mengakses layanan lintas VPC? atau Bagaimana Realtime Compute for Apache Flink mengakses Internet?
Langkah 1: Konfigurasikan daftar putih alamat IP
Untuk mengizinkan ruang kerja Flink Anda mengakses instans Kafka dan Hologres, tambahkan Blok CIDR vSwitch tempat ruang kerja Flink berada ke daftar putih kedua instans tersebut.
-
Dapatkan Blok CIDR vSwitch tempat ruang kerja Flink Anda berada.
-
Masuk ke Konsol Realtime Compute for Apache Flink.
-
Temukan ruang kerja target dan pilih
More >Workspace Details pada kolomActions . -
Pada kotak dialog
Workspace Details , salinCIDR block vSwitch.
-
-
Tambahkan Blok CIDR ke daftar putih alamat IP instans Kafka Anda.
Tambahkan Blok CIDR ke daftar putih untuk titik akhir dengan jaringan
VPC .
-
Tambahkan Blok CIDR ke daftar putih alamat IP instans Hologres Anda.
Langkah 2: Siapkan data uji untuk instans ApsaraMQ for Kafka
Gunakan konektor Faker Realtime Compute for Apache Flink sebagai generator data dan tulis data tersebut ke instans ApsaraMQ for Kafka Anda.
Anda memerlukan nilai titik akhir
-
Buat topik bernama
users di konsol ApsaraMQ for Kafka. -
Kembangkan pekerjaan yang menulis data ke topik Kafka Anda.
-
Masuk ke konsol manajemen Realtime Compute for Apache Flink.
-
Temukan ruang kerja target dan klik
Console pada kolomActions . -
Pada panel navigasi kiri, pilih
Development >ETL . Pada halaman yang muncul, klikNew . -
Pada kotak dialog
New Draft , pilihBlank Stream Draft lalu klikNext . Konfigurasikan draft:Item konfigurasi
Contoh
Deskripsi
Name kafka-data-input Nama draft SQL. Catatan: Nama draft harus unik dalam namespace saat ini.
Location Development Folder tempat file kode draft disimpan. Secara default, file kode draft disimpan di folder
Development . Anda juga dapat mengklik ikon di sebelah kanan folder yang sudah ada untuk membuat subfolder.Engine Version vvr-8.0.11-flink-1.17 Pilih versi engine untuk draft dari daftar drop-down.
-
Klik
Create . -
Salin dan tempel potongan kode berikut ke editor SQL, lalu ganti nilai placeholder untuk
properties.bootstrap.servers dengan titik akhir Kafka aktual Anda (lihat catatan di atas).CREATE TEMPORARY TABLE source ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, event_time TIMESTAMP ) WITH ( 'connector' = 'faker', 'number-of-rows' = '100', 'rows-per-second' = '10', 'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}', 'fields.first_name.expression' = '#{name.firstName}', 'fields.last_name.expression' = '#{name.lastName}', 'fields.address.country.expression' = '#{Address.country}', 'fields.address.state.expression' = '#{Address.state}', 'fields.address.city.expression' = '#{Address.city}', 'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}' ); CREATE TEMPORARY TABLE sink ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, `timestamp` TIMESTAMP METADATA ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json' ); INSERT INTO sink SELECT * FROM source; Item konfigurasi
Contoh
Deskripsi
properties.bootstrap.servers alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092 Titik akhir broker Kafka. Format:
host:port,host:port,host:port . Lihat catatan di atas untuk instruksi pengambilannya.topic users Nama topik Kafka.
-
-
Jalankan pekerjaan.
-
Di pojok kanan atas editor SQL, klik
Deploy . -
Pada kotak dialog
Deploy draft , klikConfirm . -
Buka
O&M >Deployments , temukan penerapan target, lalu klikStart pada kolomActions . Untuk informasi tentang parameter yang harus dikonfigurasi saat memulai penerapan, lihat Mulai penerapan. -
Pada halaman
Deployments , lihat status penerapan.
Konektor Faker menyediakan aliran data terbatas, sehingga penerapan berpindah ke status
FINISHED sekitar satu menit setelah mencapai statusRUNNING . Saat selesai, data telah ditulis ke topik Kafka tujuan. Berikut contoh pesan berformat JSON yang ditulis ke ApsaraMQ for Kafka:{ "id": 765, "first_name": "Barry", "last_name": "Pollich", "address": { "country": "United Arab Emirates", "state": "Nevada", "city": "Powlowskifurt" } }
-
Langkah 3: Buat katalog Hologres
Langkah ini hanya diperlukan untuk
Untuk sinkronisasi tabel tunggal menggunakan CTAS, buat katalog Hologres sebagai katalog tujuan di konsol pengembangan Realtime Compute for Apache Flink. Bagian ini mencakup item konfigurasi penting. Untuk detail lengkap, lihat Buat katalog Hologres.
|
Item konfigurasi |
Deskripsi |
|---|---|
|
|
Masukkan nama kustom; contoh ini menggunakan |
|
|
Titik akhir instans Hologres Anda. |
|
|
ID AccessKey Akun Alibaba Cloud Anda. |
|
|
Secret AccessKey Akun Alibaba Cloud Anda. |
|
|
Masukkan nama database yang sudah ada di Hologres. Contoh ini menggunakan |
Langkah 4: Kembangkan dan mulai pekerjaan sinkronisasi data
Pilih metode sinkronisasi
Sebelum menulis kode, pilih metode yang sesuai dengan kasus penggunaan Anda:
|
CTAS |
INSERT INTO |
|
|---|---|---|
|
Pembuatan tabel |
Flink membuat tabel Hologres secara otomatis |
Anda membuat tabel secara manual di Hologres terlebih dahulu |
|
Penanganan JSON |
Kolom bersarang diekspansi otomatis ( |
Anda dapat memetakan JSON bersarang langsung ke kolom JSONB native |
|
Evolusi skema |
Pembaruan skema Hologres dilakukan secara otomatis ketika kolom bersarang baru muncul |
Pembaruan skema manual diperlukan |
|
Memerlukan Langkah 3 |
Ya — katalog Hologres harus dibuat terlebih dahulu |
Tidak — koneksi ditentukan langsung dalam klausa WITH |
|
Paling cocok untuk |
Pengaturan cepat; pipeline fleksibel terhadap skema |
Kontrol penuh atas tipe kolom; optimalisasi JSONB |
Buat pekerjaan sinkronisasi
-
Masuk ke konsol manajemen Realtime Compute for Apache Flink.
-
Temukan ruang kerja target dan klik
Console pada kolomActions . -
Pada panel navigasi kiri, pilih
Development >ETL . Pada halaman yang muncul, klikNew . -
Pada kotak dialog
New Draft , pilihBlank Stream Draft lalu klikNext . Konfigurasikan draft:Item konfigurasi
Contoh
Deskripsi
Name flink-quickstart-test Nama draft SQL. Catatan: Nama draft harus unik dalam namespace saat ini.
Location Development Folder tempat file kode draft disimpan. Secara default, file kode draft disimpan di folder
Development . Anda juga dapat mengklik ikon di sebelah kanan folder yang sudah ada untuk membuat subfolder.Engine Version vvr-8.0.11-flink-1.17 Pilih versi engine untuk draft dari daftar drop-down.
-
Klik
Create . -
Salin dan tempel salah satu potongan kode berikut ke editor SQL lalu ganti nilai placeholder dengan nilai aktual Anda.
Metode 1: CTAS
Pernyataan CREATE TABLE AS (CTAS) secara otomatis membuat tabel
Mendeklarasikan
Ganti nilai placeholder:
|
Item konfigurasi |
Contoh |
Deskripsi |
|---|---|---|
|
|
|
Titik akhir broker Kafka. Format: |
|
|
|
Nama topik Kafka. |
Metode 2: INSERT INTO
Gunakan pernyataan INSERT INTO ketika Anda ingin menyimpan data JSON bersarang sebagai kolom JSONB native di Hologres. Metode ini mengharuskan Anda membuat tabel
Ganti nilai placeholder:
|
Item konfigurasi |
Contoh |
Deskripsi |
|---|---|---|
|
|
|
Titik akhir broker Kafka. Format: |
|
|
|
Nama topik Kafka. |
|
|
|
Titik akhir instans Hologres. Format: |
|
|
|
ID AccessKey Akun Alibaba Cloud Anda. Penting: Untuk melindungi kredensial Anda, hindari hardcoding pasangan AccessKey dalam teks biasa; gunakan variabel sebagai gantinya. Untuk informasi selengkapnya, lihat Kelola variabel. |
|
|
|
Secret AccessKey Akun Alibaba Cloud Anda. |
|
|
|
Nama database Hologres. |
|
|
|
Nama tabel Hologres. Catatan: Jika Anda menggunakan INSERT INTO, Anda harus membuat tabel |
Terapkan dan jalankan pekerjaan
-
Simpan draft.
-
Klik
Deploy . -
Buka
O&M >Deployments , temukan penerapan target, lalu klikStart pada kolomActions . Untuk informasi tentang parameter yang harus dikonfigurasi saat memulai penerapan, lihat Mulai penerapan. -
Pada halaman
Deployments , lihat status dan detail penerapan.
Langkah 5: Lihat hasil sinkronisasi data penuh
Setelah pekerjaan sinkronisasi mencapai status
-
Masuk ke Konsol Hologres.
-
Pada halaman
Instances , klik nama instans target. -
Di pojok kanan atas halaman, klik
Connect to Instance . -
Pada tab
Metadata Management , lihat skema dan data tabelsync_kafka_users .
-
Skema tabel: Klik dua kali nama tabel
sync_kafka_users untuk melihat skema tabel.
CatatanDeklarasikan bidang
partition danoffset Kafka sebagai kunci primer untuk tabel Hologres. Jika data ditransmisikan ulang karena failover penerapan, hanya satu salinan data dengan nilaipartition danoffset yang sama yang disimpan. -
Data tabel: Di pojok kanan atas halaman tabel
sync_kafka_users , klikQuery table . Di editor SQL, jalankan pernyataan berikut lalu klikRunning :SELECT * FROM public.sync_kafka_users order by partition, "offset"; Anda seharusnya melihat 100 baris data, sesuai dengan pengaturan
'number-of-rows' = '100' dari konektor Faker.
-
Langkah 6: Periksa apakah perubahan skema tabel disinkronkan secara otomatis
Langkah ini menunjukkan evolusi skema: Anda mengirim pesan Kafka yang berisi kolom bersarang baru dan memverifikasi bahwa Hologres secara otomatis menambahkan kolom yang sesuai.
-
Di konsol ApsaraMQ for Kafka, kirim pesan yang berisi kolom baru.
-
Masuk ke Konsol ApsaraMQ for Kafka.
-
Pada halaman
Instances , klik nama instans target. -
Pada panel navigasi kiri, klik
Topics . Temukan topik bernamausers . -
Klik
Send Message pada kolomActions . -
Pada panel
Start to Send and Consume Message , konfigurasikan parameter sebagai berikut.
Item konfigurasi
Contoh
Method of Sending Pilih
Console .Message Key Masukkan
flinktest .Message Content Salin dan tempel konten JSON berikut ke field
Message Content . Catatan:house-points adalah kolom bersarang baru dalam pesan ini.Send to Specified Partition Pilih
Yes .Partition ID Masukkan
0 .{ "id": 100001, "first_name": "Dennise", "last_name": "Schuppe", "address": { "country": "Isle of Man", "state": "Montana", "city": "East Coleburgh" }, "house-points": { "house": "Pukwudgie", "points": 76 } } -
Klik
OK .
-
-
Di konsol Hologres, lihat perubahan skema dan data pada tabel
sync_kafka_users .-
Masuk ke Konsol Hologres.
-
Pada halaman
Instances , klik nama instans target. -
Di pojok kanan atas halaman, klik
Connect to Instance . -
Pada tab
Metadata Management , klik dua kali nama tabelsync_kafka_users . -
Di pojok kanan atas halaman tabel
sync_kafka_users , klikQuery table . Di editor SQL, jalankan pernyataan berikut lalu klikRunning :SELECT * FROM public.sync_kafka_users order by partition, "offset"; -
Lihat data tabel.
Hasil menunjukkan bahwa catatan data dengan
id 100001 ditulis ke Hologres. Kolomhouse-points.house danhouse-points.points juga ditambahkan ke Hologres secara otomatis.CatatanKarena
json.infer-schema.flatten-nested-columns.enable diatur ketrue dalam klausa WITH untuk tabel Kafkausers , Realtime Compute for Apache Flink secara otomatis mengekspansi kolom bersarang baru saat muncul. Jalur yang digunakan untuk mengakses setiap kolom menjadi nama kolomnya.
-
Ringkasan dan langkah selanjutnya
Anda telah berhasil membangun pipeline ingesti log waktu nyata yang mengalirkan catatan pengguna dari ApsaraMQ for Kafka ke Hologres menggunakan Realtime Compute for Apache Flink—dengan evolusi skema otomatis yang diaktifkan.
Dalam tutorial ini, Anda:
Mengonfigurasi daftar putih jaringan agar Flink dapat mengakses Kafka dan Hologres.
Menyebarluaskan generator data berbasis Faker yang menulis 100 catatan sintetis ke topik Kafka.
Membuat pekerjaan Flink SQL yang terus-menerus menyinkronkan data dari Kafka ke Hologres.
Memverifikasi bahwa perubahan skema (kolom bersarang baru) secara otomatis disebarkan ke Hologres.
Untuk melangkah lebih jauh, jelajahi topik-topik berikut:
Pernyataan CREATE TABLE AS (CTAS) — Pelajari opsi dan perilaku CTAS tingkat lanjut.
Konektor Kafka — Jelajahi opsi konfigurasi konektor lengkap untuk beban kerja produksi.
Konfigurasikan penerapan — Sesuaikan alokasi sumber daya, checkpointing, dan pengaturan penerapan lainnya untuk penggunaan produksi.
Referensi
-
Untuk informasi tentang peningkatan performa pekerjaan dengan memodifikasi paralelisme node dan sumber daya, lihat Konfigurasikan penerapan.