Topik ini menjelaskan cara menggunakan Realtime Compute for Apache Flink untuk menyinkronkan data log dari Kafka ke Hologres secara real-time.
Prasyarat
Pengguna Resource Access Management (RAM) atau Peran RAM yang digunakan untuk mengakses konsol pengembangan Realtime Compute for Apache Flink memiliki izin yang diperlukan. Untuk informasi lebih lanjut, lihat Manajemen Izin.
Sebuah ruang kerja Flink telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.
Instansi penyimpanan hulu dan hilir telah dibuat.
CatatanInstansi ApsaraMQ for Kafka dan Hologres harus berada di VPC yang sama dengan ruang kerja Flink Anda. Jika tidak berada di VPC yang sama, Anda perlu membuat koneksi di antara mereka. Untuk informasi lebih lanjut, lihat Bagaimana Realtime Compute for Apache Flink mengakses layanan lintas VPC? atau Bagaimana Realtime Compute for Apache Flink mengakses Internet?
Langkah 1: Konfigurasi daftar putih alamat IP
Untuk memungkinkan ruang kerja Flink mengakses instansi Kafka dan Hologres, tambahkan blok CIDR dari vSwitch tempat ruang kerja Flink berada ke daftar putih instansi Kafka dan Hologres.
Dapatkan blok CIDR vSwitch tempat ruang kerja Flink berada.
Masuk ke Konsol Realtime Compute for Apache Flink.
Temukan workspace target dan pilih di kolom Actions.
Di kotak dialog Workspace Details, salin CIDR block dari vSwitch.

Tambahkan blok CIDR ke daftar putih IP instansi Kafka Anda.
Atur daftar putih untuk titik akhir dengan jaringan VPC.

Tambahkan blok CIDR ke daftar putih IP instansi Hologres Anda.

Langkah 2: Persiapkan data uji untuk instansi ApsaraMQ for Kafka
Gunakan Faker connector dari Realtime Compute for Apache Flink sebagai generator data dan tulis data tersebut ke instansi ApsaraMQ for Kafka. Anda dapat melakukan langkah-langkah berikut untuk menulis data ke instansi Kafka Anda di konsol pengembangan Realtime Compute for Apache Flink.
Buat topik bernama users di konsol ApsaraMQ for Kafka.
Kembangkan pekerjaan yang menulis data ke topik Kafka Anda.
Masuk ke konsol manajemen Realtime Compute for Apache Flink.
Temukan ruang kerja target dan klik Console di kolom Actions.
Di panel navigasi sisi kiri, pilih . Pada halaman yang muncul, klik New.
Dalam kotak dialog New Draft, pilih template, seperti Blank Stream Draft. Klik Next, lalu konfigurasikan draf sesuai tabel di bawah ini:
Item konfigurasi
Contoh
Deskripsi
Name
kafka-data-input
Nama draf SQL.
CatatanNama draf harus unik di namespace saat ini.
Location
Pengembangan
Folder tempat file kode draf disimpan. Secara default, file kode draf disimpan di folder Development.
Anda juga bisa mengklik ikon
di sebelah kanan folder yang ada untuk membuat subfolder. Engine Version
vvr-8.0.11-flink-1.17
Pilih versi mesin untuk draf dari daftar drop-down.
Klik Create.
Tulis kode.
Salin dan tempel potongan kode berikut ke editor SQL, lalu buat modifikasi yang diperlukan.
CREATE TEMPORARY TABLE source ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, event_time TIMESTAMP ) WITH ( 'connector' = 'faker', 'number-of-rows' = '100', 'rows-per-second' = '10', 'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}', 'fields.first_name.expression' = '#{name.firstName}', 'fields.last_name.expression' = '#{name.lastName}', 'fields.address.country.expression' = '#{Address.country}', 'fields.address.state.expression' = '#{Address.state}', 'fields.address.city.expression' = '#{Address.city}', 'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}' ); CREATE TEMPORARY TABLE sink ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, `timestamp` TIMESTAMP METADATA ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json' ); INSERT INTO sink SELECT * FROM source;Ganti nilai placeholder dalam perintah di atas dengan nilai aktual Anda:
Item konfigurasi
Contoh
Deskripsi
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
Alamat IP atau titik akhir broker Kafka.
Format: host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).
CatatanUntuk mendapatkan titik akhir instansi ApsaraMQ for Kafka Anda, ikuti langkah-langkah berikut:
Di konsol ApsaraMQ for Kafka, klik nama instansi target.
Di bagian Endpoint Information pada halaman Instance Details yang muncul, temukan titik akhir dengan jaringan VPC.
Salin nilai di kolom Domain Name.
topic
users
Nama topik Kafka.
Mulai pekerjaan.
Di sudut kanan atas editor SQL, klik Deploy.
Di kotak dialog Deploy draft, klik Confirm.
Pergi ke , temukan penyebaran target, dan klik Start di kolom Actions.
Untuk informasi tentang parameter yang harus dikonfigurasi saat memulai penyebaran Anda, lihat Mulai penyebaran
Di halaman Deployments, lihat status penyebaran.

Konektor Faker menyediakan aliran terbatas. Oleh karena itu, penyebaran menjadi FINISHED sekitar satu menit setelah penyebaran tetap RUNNING. Saat penyebaran selesai, itu menunjukkan bahwa data telah ditulis ke topik Kafka tujuan. Berikut adalah contoh pesan JSON yang ditulis ke ApsaraMQ for Kafka.
{ "id": 765, "first_name": "Barry", "last_name": "Pollich", "address": { "country": "Uni Emirat Arab", "state": "Nevada", "city": "Powlowskifurt" } }
Langkah 3: Buat katalog Hologres
Jika Anda ingin melakukan sinkronisasi tabel tunggal, Anda harus membuat tabel tujuan di katalog tujuan. Anda dapat membuat katalog tujuan di konsol pengembangan Realtime Compute for Apache Flink. Dalam topik ini, katalog Hologres digunakan sebagai katalog tujuan. Bagian ini secara singkat menjelaskan item konfigurasi penting saat Anda membuat katalog Hologres. Untuk informasi lebih rinci, lihat Buat Katalog Hologres.
Item konfigurasi | Deskripsi |
catalog name | Masukkan nama kustom; dalam contoh ini, holo digunakan. |
endpoint | Titik akhir instansi Hologres Anda. |
username | ID AccessKey akun Alibaba Cloud. |
password | Rahasia AccessKey akun Alibaba Cloud. |
dbname | Masukkan nama database yang ada di Hologres. Contoh ini menggunakan flink_test_db. Penting Pastikan database flink_test_db yang ditentukan untuk bidang ini sudah dibuat di instansi Hologres Anda. Jika tidak, akan terjadi kesalahan. Untuk informasi lebih lanjut, lihat Buat database di dokumentasi Hologres. |
Langkah 4: Kembangkan dan mulai pekerjaan sinkronisasi data
Masuk ke konsol pengembangan Realtime Compute for Apache Flink dan kembangkan pekerjaan yang menyinkronkan data.
Masuk ke konsol manajemen Realtime Compute for Apache Flink.
Temukan ruang kerja target dan klik Console di kolom Actions.
Di panel navigasi sisi kiri, pilih . Pada halaman yang muncul, klik New.
Dalam kotak dialog New Draft, pilih template, seperti Blank Stream Draft. Klik Next, lalu konfigurasikan draf sesuai tabel di bawah ini:
Item konfigurasi
Contoh
Deskripsi
Name
flink-quickstart-test
Nama draf SQL.
CatatanNama draf harus unik di namespace saat ini.
Location
Pengembangan
Folder tempat file kode draf disimpan. Secara default, file kode draf disimpan di folder Development.
Anda juga bisa mengklik ikon
di sebelah kanan folder yang ada untuk membuat subfolder. Engine Version
vvr-8.0.11-flink-1.17
Pilih versi mesin untuk draf dari daftar drop-down.
Klik Create.
Tulis kode. Salin dan tempel potongan kode berikut ke editor SQL, lalu buat modifikasi yang diperlukan.
Gunakan salah satu metode berikut untuk menyinkronkan data dari topik Kafka users ke tabel sync_kafka_users di database flink_test_db di Hologres.
CTAS
Menggunakan pernyataan CREATE TABLE AS (CTAS) untuk sinkronisasi data menghilangkan kebutuhan untuk secara manual membuat tabel sync_kafka_users di Hologres dan menentukan tipe kolom sebagai JSON atau JSONB:
CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country'), PRIMARY KEY (`partition`, `offset`) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Secara otomatis memperluas kolom bersarang. 'scan.startup.mode' = 'earliest-offset' ); CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users WITH ( 'connector' = 'hologres' ) AS TABLE kafka_users;CatatanUntuk mencegah data duplikat ditulis ke Hologres setelah pekerjaan gagal, Anda dapat menambahkan kunci utama terkait ke tabel untuk secara unik mengidentifikasi data. Jika data dikirim ulang, Hologres memastikan hanya satu salinan data dengan nilai partisi dan offset yang sama yang disimpan.
Ganti nilai placeholder dalam perintah di atas dengan nilai aktual Anda:
Item konfigurasi
Contoh
Deskripsi
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
Alamat IP atau titik akhir broker Kafka.
Format: host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).
CatatanUntuk mendapatkan titik akhir instansi ApsaraMQ for Kafka Anda, ikuti langkah-langkah berikut:
Di konsol ApsaraMQ for Kafka, klik nama instansi target.
Di bagian Endpoint Information pada halaman Instance Details yang muncul, temukan titik akhir dengan jaringan VPC.
Salin nilai di kolom Domain Name.
topic
users
Nama topik Kafka.
INSERT INTO
Metode khusus digunakan untuk mengoptimalkan data JSON dan JSONB di Hologres. Oleh karena itu, Anda dapat menggunakan pernyataan INSERT INTO untuk menyinkronkan data JSON bersarang ke Hologres.
Metode ini mengharuskan Anda untuk secara manual membuat tabel bernama sync_kafka_users di Hologres sebelum menyinkronkan data dengan menggunakan perintah SQL berikut:
CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, 'address' STRING, -- Data di kolom ini adalah data JSON bersarang. `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country') ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Secara otomatis memperluas kolom bersarang. 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE holo ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT, `partition` BIGINT, `timestamp` TIMESTAMP, `date` DATE, `country` STRING ) WITH ( 'connector' = 'hologres', 'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80', 'username' = '************************', 'password' = '******************************', 'dbname' = 'flink_test_db', 'tablename' = 'sync_kafka_users' ); INSERT INTO holo SELECT * FROM kafka_users;Ganti nilai placeholder dalam perintah di atas dengan nilai aktual Anda:
Item konfigurasi
Contoh
Deskripsi
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
Alamat IP atau titik akhir broker Kafka.
Format: host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).
CatatanUntuk mendapatkan titik akhir instansi ApsaraMQ for Kafka Anda, ikuti langkah-langkah berikut:
Di konsol ApsaraMQ for Kafka, klik nama instansi target.
Di bagian Endpoint Information pada halaman Instance Details yang muncul, temukan titik akhir dengan jaringan VPC.
Salin nilai di kolom Domain Name.
topic
users
Nama topik Kafka.
endpoint
hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80
Titik akhir instansi Hologres.
Format: <ip>:<port>.
CatatanUntuk mendapatkan nilai titik akhir, ikuti langkah-langkah berikut:
Pergi ke Konsol Hologres.
Klik nama instansi Hologres Anda.
Di bagian Network Information pada halaman detail instansi, temukan titik akhir yang sesuai dengan Select VPC dan salin nilai titik akhir.
username
************************
Nama pengguna dan kata sandi yang digunakan untuk mengakses database Hologres. Masukkan ID AccessKey dan rahasia akun Alibaba Cloud Anda.
PentingUntuk meningkatkan keamanan kredensial Anda, hindari menuliskan pasangan AccessKey dalam teks biasa; gunakan variabel sebagai gantinya. Untuk informasi lebih lanjut, lihat Kelola variabel.
password
******************************
dbname
flink_test_db
Nama database Hologres yang ingin Anda akses.
tablename
sync_kafka_users
Nama tabel Hologres.
CatatanJika Anda menggunakan pernyataan INSERT INTO untuk menyinkronkan data, Anda harus membuat tabel sync_kafka_users dan mendefinisikan bidang yang diperlukan di database instansi Hologres tujuan terlebih dahulu.
Jika skema publik tidak digunakan, Anda harus menentukan tablename dalam format schema.tableName.
Simpan draf.
Klik Deploy.
Pergi ke , temukan penyebaran target, dan klik Start di kolom Actions.
Untuk informasi tentang parameter yang harus dikonfigurasi saat memulai penyebaran, lihat Mulai Penyebaran.
Anda dapat melihat status dan informasi lainnya tentang penyebaran di halaman Deployments setelah dimulai.

Langkah 5: Lihat hasil sinkronisasi data penuh
Masuk ke Konsol Hologres.
Di halaman Instances, klik nama instansi target.
Di sudut kanan atas halaman, klik Connect to Instance.
Di tab Metadata Management, lihat skema dan data tabel sync_kafka_users yang menerima data dari topik Kafka bernama users.

Gambar-gambar berikut menunjukkan skema dan data tabel sync_kafka_users setelah sinkronisasi data.
Skema Tabel
Klik dua kali nama tabel sync_kafka_users untuk melihat skema tabel.
CatatanSaat mengembangkan pekerjaan sinkronisasi data, kami sarankan Anda mendeklarasikan bidang partisi dan offset Kafka sebagai kunci utama untuk tabel Hologres. Dengan cara ini, jika data dikirim ulang karena kegagalan penyebaran, hanya satu salinan data dengan nilai partisi dan offset yang sama yang disimpan.
Data Tabel
Di sudut kanan atas halaman untuk tabel sync_kafka_users, klik Query table. Di editor SQL, salin dan tempel pernyataan berikut dan klik Running.
SELECT * FROM public.sync_kafka_users order by partition, "offset";Gambar berikut menunjukkan data tabel sync_kafka_users.

Langkah 6: Periksa apakah perubahan skema tabel disinkronkan secara otomatis
Di konsol ApsaraMQ for Kafka, kirim pesan yang berisi kolom baru.
Masuk ke ApsaraMQ for Kafka console.
Di halaman Instances, klik nama instansi target.
Di panel navigasi sisi kiri halaman yang muncul, klik Topics. Di halaman yang muncul, temukan topik bernama users.
Klik Send Message di kolom Actions.
Di panel Start to Send and Consume Message, konfigurasikan parameter sebagai berikut.

Item konfigurasi
Contoh
Method of Sending
Pilih Console.
Message Key
Masukkan flinktest.
Message Content
Salin dan tempel konten JSON berikut ke bidang Isi Pesan.
{ "id": 100001, "first_name": "Dennise", "last_name": "Schuppe", "address": { "country": "Isle of Man", "state": "Montana", "city": "East Coleburgh" }, "house-points": { "house": "Pukwudgie", "points": 76 } }CatatanDalam kode JSON di atas, house-points adalah kolom bersarang baru.
Send to Specified Partition
Pilih Yes.
Partition ID
Masukkan 0.
Klik OK.
Di konsol Hologres, lihat perubahan pada skema dan data tabel sync_kafka_users.
Masuk ke Konsol Hologres.
Di halaman Instances, klik nama instansi target.
Di sudut kanan atas halaman, klik Connect to Instance.
Di tab Metadata Management, klik dua kali nama tabel sync_kafka_users.
Di sudut kanan atas halaman untuk tabel sync_kafka_users, klik Query table. Di editor SQL, masukkan pernyataan berikut dan klik Running.
SELECT * FROM public.sync_kafka_users order by partition, "offset";Lihat data tabel.
Gambar berikut menunjukkan data tabel sync_kafka_users.

Gambar tersebut menunjukkan bahwa catatan data dengan id 100001 ditulis ke Hologres. Selain itu, kolom house-points.house dan house-points.points ditambahkan ke Hologres.
CatatanHanya data di kolom bersarang house-points yang termasuk dalam data yang dimasukkan ke tabel ApsaraMQ for Kafka. Namun, json.infer-schema.flatten-nested-columns.enable ditentukan dalam klausa WITH untuk membuat tabel Kafka users. Dalam kasus ini, Realtime Compute for Apache Flink secara otomatis memperluas kolom bersarang baru. Setelah kolom diperluas, jalur untuk mengakses kolom digunakan sebagai nama kolom.
Referensi
Untuk informasi tentang meningkatkan kinerja pekerjaan dengan memodifikasi paralelisme node dan sumber daya, lihat Konfigurasikan Penyebaran.