Topik ini menjelaskan cara membangun pekerjaan sinkronisasi data dari Kafka ke Hologres secara cepat untuk mengingesti data log waktu nyata menggunakan konsol Realtime Compute for Apache Flink.
Prasyarat
-
Pastikan pengguna RAM atau peran RAM Anda memiliki izin yang diperlukan untuk mengakses konsol Realtime Compute for Apache Flink. Untuk informasi selengkapnya, lihat Izin.
-
Anda telah membuat ruang kerja Flink. Untuk informasi selengkapnya, lihat Buat ruang kerja.
-
Penyimpanan hulu dan hilir
-
Anda telah membuat instans ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Langkah 2: Beli dan terapkan instans.
-
Anda telah membuat instans Hologres. Untuk informasi selengkapnya, lihat Beli instans Hologres.
CatatanInstans ApsaraMQ for Kafka dan Hologres harus berada di Wilayah dan VPC yang sama dengan ruang kerja Realtime Compute for Apache Flink Anda. Jika tidak, Anda harus menetapkan konektivitas jaringan. Untuk informasi selengkapnya, lihat Akses layanan lintas VPC atau Akses internet.
-
Langkah 1: Konfigurasikan daftar putih IP
Untuk mengizinkan Flink mengakses instans Kafka dan Hologres Anda, tambahkan Blok CIDR ruang kerja Flink ke daftar putih IP Kafka dan Hologres.
-
Dapatkan Blok CIDR VPC ruang kerja Flink Anda.
-
Masuk ke konsol Realtime Compute for Apache Flink.
-
Pada kolom Actions dari workspace target, pilih .
-
Pada kotak dialog Workspace Details, temukan CIDR block VSwitch.
Kotak dialog ini menampilkan informasi dasar ruang kerja dan daftar VSwitch. Pada kolom CIDR block, lihat blok CIDR untuk setiap zona ketersediaan. Catat blok CIDR ini untuk langkah berikutnya.
-
-
Tambahkan Blok CIDR ruang kerja Flink ke daftar putih IP instans Kafka Anda.
Anda harus mengonfigurasi daftar putih IP untuk titik akhir VPC. Untuk langkah-langkah selengkapnya, lihat Konfigurasi daftar putih IP. Di kotak dialog pengeditan daftar putih, klik Add Whitelist IP untuk menambahkan Blok CIDR.
-
Tambahkan Blok CIDR ruang kerja Flink ke daftar putih IP instans Hologres Anda.
Masuk ke instans Hologres Anda dan konfigurasikan daftar putih IP-nya. Untuk langkah-langkah selengkapnya, lihat daftar putih IP. Pada halaman konfigurasi daftar putih di Pusat Keamanan HoloWeb, masukkan Blok CIDR pada bidang IP Address dalam kotak dialog Edit IP Whitelist, lalu klik OK.
Langkah 2: Siapkan data uji Kafka
Gunakan konektor Faker Realtime Compute for Apache Flink untuk menghasilkan dan menulis data ke ApsaraMQ for Kafka. Ikuti langkah-langkah berikut di konsol pengembangan Realtime Compute for Apache Flink.
-
Di konsol ApsaraMQ for Kafka, buat topik bernama users.
Untuk informasi selengkapnya, lihat Buat topik.
-
Buat pekerjaan untuk menulis data ke ApsaraMQ for Kafka.
-
Masuk ke konsol pengembangan Realtime Compute for Apache Flink.
-
Pada kolom Actions ruang kerja target, klik Console.
-
Di panel navigasi sebelah kiri, pilih .
-
Klik ikon
, lalu klik New Stream Draft. Masukkan File Name dan pilih Engine Version.Realtime Compute for Apache Flink juga menyediakan berbagai templat kode dan sinkronisasi data. Setiap templat mencakup kasus penggunaan spesifik, contoh kode, dan instruksi. Klik templat untuk mempelajari fitur dan sintaks Realtime Compute for Apache Flink secara cepat serta menerapkan logika bisnis Anda. Untuk informasi selengkapnya, lihat Templat kode dan Templat sinkronisasi data.
Parameter
Deskripsi
Contoh
File Name
Nama pekerjaan.
CatatanNama pekerjaan harus unik dalam ruang kerja saat ini.
flink-test
Engine Version
Versi engine Flink untuk pekerjaan saat ini.
Pilih versi yang diberi label Recommended atau Stable untuk keandalan dan kinerja yang lebih tinggi. Untuk informasi selengkapnya tentang versi engine, lihat Catatan rilis dan Versi engine.
vvr-8.0.8-flink-1.17
-
Klik Create.
-
Tulis pernyataan SQL untuk pekerjaan tersebut.
Salin kode berikut ke editor dan modifikasi parameter sesuai lingkungan Anda.
CREATE TEMPORARY TABLE source ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, event_time TIMESTAMP ) WITH ( 'connector' = 'faker', 'number-of-rows' = '100', 'rows-per-second' = '10', 'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}', 'fields.first_name.expression' = '#{name.firstName}', 'fields.last_name.expression' = '#{name.lastName}', 'fields.address.country.expression' = '#{Address.country}', 'fields.address.state.expression' = '#{Address.state}', 'fields.address.city.expression' = '#{Address.city}', 'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}' ); CREATE TEMPORARY TABLE sink ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, `timestamp` TIMESTAMP METADATA ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json', 'properties.enable.idempotence'='false' ); INSERT INTO sink SELECT * FROM source;Tabel berikut menjelaskan parameter yang perlu Anda modifikasi.
Parameter
Nilai contoh
Deskripsi
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
Titik akhir broker ApsaraMQ for Kafka.
Daftar titik akhir broker yang dipisahkan koma dalam format 'host:port'. Anda dapat menemukan titik akhir Domain Name VPC di bagian Endpoint Information pada halaman Instance Details.
topic
users
Nama topik ApsaraMQ for Kafka.
-
-
Jalankan pekerjaan.
-
Pada halaman , klik Deploy.
-
Pada kotak dialog Deploy draft, klik Confirm.
-
Konfigurasikan sumber daya untuk pekerjaan. Untuk informasi selengkapnya, lihat Konfigurasikan sumber daya untuk pekerjaan.
-
Pada halaman , temukan penyebaran target, lalu klik Start pada kolom Actions. Untuk informasi selengkapnya mengenai konfigurasi startup, lihat Mulai penyebaran.
-
Pada halaman Deployments, Anda dapat memantau informasi waktu proses dan status penyebaran.
Karena sumber Faker menghasilkan aliran terbatas, status penyebaran berubah menjadi FINISHED sekitar satu menit setelah dimulai. Saat penyebaran selesai, data telah ditulis ke topik 'users'. Kode berikut menunjukkan contoh data berformat JSON.
{ "id": 765, "first_name": "Barry", "last_name": "Pollich", "address": { "country": "United Arab Emirates", "state": "Nevada", "city": "Powlowskifurt" } }
-
Langkah 3: Buat dan jalankan pekerjaan sinkronisasi data
Flink CDC
-
Masuk ke konsol pengembangan Realtime Compute for Apache Flink untuk membuat pekerjaan sinkronisasi data.
-
Masuk ke konsol Realtime Compute for Apache Flink.
-
Pada kolom Actions ruang kerja target, klik Console.
-
Di panel navigasi sebelah kiri, pilih .
-
Klik ikon
lalu klik New ETL Draft. Masukkan name dan pilih engine version.Parameter
Deskripsi
Contoh
Name
Nama pekerjaan.
CatatanNama pekerjaan harus unik dalam proyek.
flink-test
Engine Version
Versi engine Flink untuk pekerjaan.
Pilih versi yang diberi label Recommended atau Stable. Versi ini memberikan keandalan dan kinerja yang lebih tinggi. Untuk informasi selengkapnya tentang versi engine, lihat Catatan rilis dan Versi engine.
vvr-8.0.8-flink-1.17
-
Klik Create.
-
-
Tulis pekerjaan Flink CDC. Salin kode berikut ke editor dan perbarui parameter sesuai lingkungan Anda.
Pekerjaan berikut menyinkronkan data tabel berformat JSON dari topik users di Kafka ke tabel users dalam skema test_schema di database flink_test_db di Hologres.
source: type: kafka name: Kafka Source properties.bootstrap.servers: alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092 topic: users scan.startup.mode: earliest-offset value.format: json json.infer-schema.flatten-nested-columns.enable: true sink: type: hologres name: Hologres Sink endpoint: hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80 dbname: flink_test_db username: ****** password: ** sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT transform: - source-table: \.*.\.* projection: \* primary-keys: id route: - source-table: users sink-table: test_schema.usersTabel berikut menjelaskan parameter yang perlu dimodifikasi.
Parameter
Contoh
Deskripsi
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
Alamat broker Kafka.
Formatnya adalah daftar entri host:port yang dipisahkan koma. Anda dapat memperoleh Domain Name Endpoint untuk jenis jaringan VPC dari bagian Network Information pada halaman instance details.
topic
users
Nama topik Kafka.
endpoint
hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80
Titik akhir instans Hologres.
Formatnya adalah <ip>:<port>. Anda dapat memperoleh titik akhir VPC dari bagian Network Information pada halaman instance details di konsol Hologres.
username
**
Username dan password untuk database Hologres. Masukkan ID AccessKey dan AccessKey Secret Akun Alibaba Cloud Anda.
PentingUntuk mencegah kebocoran pasangan AccessKey Anda, gunakan manajemen variabel untuk menentukan ID AccessKey dan AccessKey Secret Anda. Untuk informasi selengkapnya, lihat Kelola variabel.
password
**
dbname
flink_test_db
Nama database Hologres.
source-table
users
Tabel sumber. Secara default, nama topik digunakan.
sink-table
test_schema.users
Tabel tujuan. Tentukan tabel dalam format
schema.table_name. -
Klik Save.
-
Pada halaman , klik Deploy.
-
Pada halaman , temukan penyebaran target dan klik Start pada kolom Actions. Untuk informasi selengkapnya tentang konfigurasi startup pekerjaan, lihat Jalankan pekerjaan.
Setelah pekerjaan dimulai, Anda dapat melihat informasi waktu proses dan statusnya di halaman Deployments. Halaman ini menampilkan daftar penyebaran dengan metrik seperti Status, health score, CPU, dan memory, serta menyediakan aksi seperti Start dan Stop.
SQL
-
Masuk ke konsol pengembangan Realtime Compute for Apache Flink untuk membuat pekerjaan sinkronisasi data.
-
Masuk ke konsol Realtime Compute for Apache Flink.
-
Pada kolom Actions ruang kerja target, klik Console.
-
Di panel navigasi sebelah kiri, pilih , lalu klik New.
-
Klik ikon
lalu klik New Stream Draft. Masukkan name dan pilih engine version.Parameter
Deskripsi
Contoh
Name
Nama pekerjaan.
CatatanNama pekerjaan harus unik dalam proyek.
flink-test
Engine Version
Versi engine Flink untuk pekerjaan.
Pilih versi yang diberi label Recommended atau Stable. Versi ini memberikan keandalan dan kinerja yang lebih tinggi. Untuk informasi selengkapnya tentang versi engine, lihat Catatan rilis dan Versi engine.
vvr-8.0.8-flink-1.17
-
Klik Create.
-
-
Tulis pekerjaan SQL. Salin kode berikut ke editor SQL dan perbarui parameter sesuai lingkungan Anda.
Anda dapat menggunakan pernyataan INSERT INTO untuk menyinkronkan data dari topik users di Kafka ke tabel users di database flink_test_db Hologres.
Hologres menyediakan optimasi khusus untuk tipe data JSON dan JSONB. Anda dapat menggunakan pernyataan INSERT INTO untuk menulis data JSON bersarang ke Hologres.
Metode ini mengharuskan Anda terlebih dahulu membuat tabel users di Hologres, lalu menjalankan pernyataan SQL berikut untuk menulis data ke tabel tersebut.
CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, `address` STRING, -- Data dalam kolom ini adalah JSON bersarang. `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country') ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Secara otomatis memperluas kolom bersarang. 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE holo ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT, `partition` BIGINT, `timestamp` TIMESTAMP, `date` DATE, `country` STRING ) WITH ( 'connector' = 'hologres', 'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80', 'username' = '******', 'password' = '******', 'dbname' = 'flink_test_db', 'tablename' = 'users' ); INSERT INTO holo SELECT * FROM kafka_users;Tabel berikut menjelaskan parameter yang perlu dimodifikasi.
Parameter
Contoh
Deskripsi
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
Alamat broker Kafka.
Formatnya adalah daftar entri host:port yang dipisahkan koma. Anda dapat memperoleh Domain Name Endpoint untuk jenis jaringan VPC dari bagian Network Information pada halaman instance details.
topic
users
Nama topik Kafka.
endpoint
hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80
Titik akhir instans Hologres.
Formatnya adalah <ip>:<port>. Anda dapat memperoleh titik akhir VPC dari bagian Network Information pada halaman instance details di konsol Hologres.
username
******
Username dan password untuk database Hologres. Masukkan ID AccessKey dan AccessKey Secret Akun Alibaba Cloud Anda.
PentingUntuk mencegah kebocoran pasangan AccessKey Anda, gunakan manajemen variabel untuk menentukan ID AccessKey dan AccessKey Secret Anda. Untuk informasi selengkapnya, lihat Kelola variabel.
password
******
dbname
flink_test_db
Nama database Hologres.
tablename
users
Nama tabel Hologres.
Catatan-
Jika Anda menggunakan pernyataan INSERT INTO untuk menyinkronkan data, Anda harus membuat tabel users beserta kolom-kolomnya di database tujuan terlebih dahulu.
-
Jika skema bukan public, Anda harus menentukan parameter tablename dalam format
schema.table_name.
-
-
Klik Save.
-
Pada halaman , klik Deploy.
-
Pada halaman , temukan penyebaran target dan klik Start pada kolom Actions. Untuk informasi selengkapnya tentang konfigurasi startup pekerjaan, lihat Jalankan pekerjaan.
Setelah pekerjaan dimulai, Anda dapat melihat informasi waktu proses dan statusnya di halaman Deployments. Halaman ini menampilkan daftar penyebaran dengan metrik seperti Status, health score, CPU, dan memory, serta menyediakan aksi seperti Start dan Stop.
Langkah 4: Lihat hasil sinkronisasi lengkap
Masuk ke Konsol Manajemen Hologres.
-
Pada halaman Instances, klik nama instans target Anda.
-
Di pojok kanan atas halaman, klik Connect to Instance.
-
Pada tab Metadata Management, lihat skema tabel dan data tabel users yang telah disinkronkan di database flink_test_db.
Di pohon navigasi sebelah kiri, perluas nama instans Anda > flink_test_db > test_schema > Tables. Tabel users yang telah disinkronkan akan muncul.
Skema dan data tabel yang telah disinkronkan adalah sebagai berikut.
-
Skema tabel
Klik ganda nama tabel users untuk melihat skema tabel.
Skema tabel users berisi bidang-bidang berikut: id (BIGINT, primary key), first_name (TEXT), last_name (TEXT), address.country (TEXT), address.state (TEXT), dan address.city (TEXT).
CatatanSelama sinkronisasi data lengkap, kami menyarankan mendefinisikan partisi dan offset metadata Kafka sebagai primary key untuk tabel Hologres. Hal ini mencegah duplikasi data jika pekerjaan gagal dan mentransmisikan ulang data.
-
Data tabel
Di pojok kanan atas halaman tabel users, klik Query table. Masukkan perintah berikut dan klik Run.
SELECT * FROM test_schema.users;Perintah tersebut mengembalikan hasil berikut.
Kueri mengembalikan beberapa baris. Hal ini mengonfirmasi bahwa catatan berhasil disinkronkan ke tabel users. Setiap baris yang dikembalikan berisi data lengkap untuk kolom id, first_name, last_name, address.country, address.state, dan address.city.
-
Langkah 5: Amati sinkronisasi skema otomatis
-
Di konsol ApsaraMQ for Kafka, kirim pesan secara manual yang berisi kolom baru.
-
Masuk ke ApsaraMQ for Kafka console.
-
Pada halaman Instances, klik nama instans target.
-
Pada halaman Topics, klik nama topik target (users).
-
Klik Send Message.
-
Konfigurasikan pesan.
Pada kotak dialog Start to Send and Consume Message, konfigurasikan parameter sebagai berikut.
Parameter
Contoh
Method of Sending
Pilih Console.
Message Key
Masukkan flinktest.
Message Content
Salin dan tempel konten JSON berikut ke bidang Message Content.
{ "id": 100001, "first_name": "Dennise", "last_name": "Schuppe", "address": { "country": "Isle of Man", "state": "Montana", "city": "East Coleburgh" }, "house-points": { "house": "Pukwudgie", "points": 76 } }CatatanDalam contoh ini, house-points adalah kolom bersarang baru.
Send to Specified Partition
Pilih Yes.
Partition ID
Masukkan 0.
-
Klik OK.
-
-
Di konsol Hologres, lihat perubahan skema dan data pada tabel users.
-
Masuk ke konsol Hologres.
-
Pada halaman Instances, klik nama instans target.
-
Di pojok kanan atas halaman, klik Connect to Instance.
-
Pada tab Metadata Management, klik ganda nama tabel users.
-
Klik Query table, masukkan pernyataan berikut, lalu klik Running.
SELECT * FROM test_schema.users; -
Lihat hasil kueri.
Kueri mengembalikan hasil berikut.
Hasil menunjukkan bahwa catatan dengan ID 100001 berhasil ditulis ke Hologres, dan dua kolom baru, house-points.house dan house-points.points, ditambahkan ke tabel Hologres.
CatatanPesan yang dikirim ke ApsaraMQ for Kafka hanya berisi satu kolom bersarang: house-points. Namun, karena json.infer-schema.flatten-nested-columns.enable ditentukan dalam klausa WITH, Realtime Compute for Apache Flink secara otomatis meratakan kolom ini, menggunakan jalur akses bidang bersarang sebagai nama kolom baru.
-
Referensi
-
Untuk informasi tentang pernyataan CREATE TABLE AS (CTAS), lihat Pernyataan CREATE TABLE AS (CTAS).
-
Untuk informasi tentang penggunaan Message Queue for Apache Kafka sebagai tabel sumber atau hasil, lihat Message Queue for Apache Kafka.
-
Untuk meningkatkan kinerja pekerjaan dengan menyesuaikan paralelisme node dan sumber daya, lihat Konfigurasikan penyebaran pekerjaan.