All Products
Search
Document Center

Realtime Compute for Apache Flink:Panduan cepat dengan ingesti log waktu nyata

Last Updated:Mar 19, 2026

Topik ini menjelaskan cara menggunakan Realtime Compute for Apache Flink untuk menyinkronkan data log dari ApsaraMQ for Kafka ke Hologres secara waktu nyata. Di akhir tutorial ini, Anda akan memiliki pekerjaan Flink SQL yang berjalan dan terus-menerus mengalirkan catatan pengguna ke gudang data Hologres—serta secara otomatis menyesuaikan diri ketika skema sumber berubah.

Ikhtisar arsitektur

Pipeline mengikuti alur data berikut:

[Faker Connector] → [ApsaraMQ for Kafka] → [Realtime Compute for Apache Flink] → [Hologres]

Konektor Faker menghasilkan catatan pengguna sintetis dan menuliskannya ke topik ApsaraMQ for Kafka. Realtime Compute for Apache Flink membaca dari topik tersebut, mentransformasi data menggunakan Flink SQL, lalu menuliskan hasilnya ke gudang data Hologres. Ketiga layanan tersebut harus berada dalam VPC yang sama.

Langkah-langkah dalam tutorial ini

  1. Konfigurasikan daftar putih alamat IP

  2. Siapkan data uji untuk instans ApsaraMQ for Kafka

  3. Buat katalog Hologres (hanya metode CTAS)

  4. Kembangkan dan mulai pekerjaan sinkronisasi data

  5. Lihat hasil sinkronisasi data penuh

  6. Periksa apakah perubahan skema tabel disinkronkan secara otomatis

Prasyarat

Sebelum memulai, pastikan hal-hal berikut telah tersedia:

Penting

Instans ApsaraMQ for Kafka dan Hologres Anda harus berada dalam VPC yang sama dengan ruang kerja Flink Anda. Ini merupakan persyaratan jaringan wajib—jika layanan tidak berada dalam VPC yang sama, akan terjadi error koneksi. Jika tidak berada dalam VPC yang sama, Anda harus membuat koneksi di antara layanan tersebut sebelum melanjutkan. Untuk informasi selengkapnya, lihat Bagaimana Realtime Compute for Apache Flink mengakses layanan lintas VPC? atau Bagaimana Realtime Compute for Apache Flink mengakses Internet?

Langkah 1: Konfigurasikan daftar putih alamat IP

Untuk mengizinkan ruang kerja Flink Anda mengakses instans Kafka dan Hologres, tambahkan Blok CIDR vSwitch tempat ruang kerja Flink berada ke daftar putih kedua instans tersebut.

  1. Dapatkan Blok CIDR vSwitch tempat ruang kerja Flink Anda berada.

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target dan pilih More > Workspace Details pada kolom Actions.

    3. Pada kotak dialog Workspace Details, salin CIDR block vSwitch.

    vSwitch CIDR block in Workspace Details dialog
  2. Tambahkan Blok CIDR ke daftar putih alamat IP instans Kafka Anda.

    Tambahkan Blok CIDR ke daftar putih untuk titik akhir dengan jaringan VPC.

    Kafka allowlist configuration
  3. Tambahkan Blok CIDR ke daftar putih alamat IP instans Hologres Anda.

    Hologres allowlist configuration

Langkah 2: Siapkan data uji untuk instans ApsaraMQ for Kafka

Gunakan konektor Faker Realtime Compute for Apache Flink sebagai generator data dan tulis data tersebut ke instans ApsaraMQ for Kafka Anda.

Catatan

Anda memerlukan nilai titik akhir properties.bootstrap.servers untuk langkah ini dan juga pada Langkah 4. Untuk mendapatkannya: buka konsol ApsaraMQ for Kafka, klik nama instans target, temukan bagian Endpoint Information pada halaman Instance Details, cari titik akhir dengan jaringan VPC, lalu salin nilai pada kolom Domain Name. Formatnya adalah host:port,host:port,host:port.

  1. Buat topik bernama users di konsol ApsaraMQ for Kafka.

  2. Kembangkan pekerjaan yang menulis data ke topik Kafka Anda.

    1. Masuk ke konsol manajemen Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target dan klik Console pada kolom Actions.

    3. Pada panel navigasi kiri, pilih Development > ETL. Pada halaman yang muncul, klik New.

    4. Pada kotak dialog New Draft, pilih Blank Stream Draft lalu klik Next. Konfigurasikan draft:

      Item konfigurasi

      Contoh

      Deskripsi

      Name

      kafka-data-input

      Nama draft SQL. Catatan: Nama draft harus unik dalam namespace saat ini.

      Location

      Development

      Folder tempat file kode draft disimpan. Secara default, file kode draft disimpan di folder Development. Anda juga dapat mengklik ikon di sebelah kanan folder yang sudah ada untuk membuat subfolder.

      Engine Version

      vvr-8.0.11-flink-1.17

      Pilih versi engine untuk draft dari daftar drop-down.

    5. Klik Create.

    6. Salin dan tempel potongan kode berikut ke editor SQL, lalu ganti nilai placeholder untuk properties.bootstrap.servers dengan titik akhir Kafka aktual Anda (lihat catatan di atas).

      CREATE TEMPORARY TABLE source ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, event_time TIMESTAMP ) WITH ( 'connector' = 'faker', 'number-of-rows' = '100', 'rows-per-second' = '10', 'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}', 'fields.first_name.expression' = '#{name.firstName}', 'fields.last_name.expression' = '#{name.lastName}', 'fields.address.country.expression' = '#{Address.country}', 'fields.address.state.expression' = '#{Address.state}', 'fields.address.city.expression' = '#{Address.city}', 'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}' ); CREATE TEMPORARY TABLE sink ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, `timestamp` TIMESTAMP METADATA ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json' ); INSERT INTO sink SELECT * FROM source;

      Item konfigurasi

      Contoh

      Deskripsi

      properties.bootstrap.servers

      alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

      Titik akhir broker Kafka. Format: host:port,host:port,host:port. Lihat catatan di atas untuk instruksi pengambilannya.

      topic

      users

      Nama topik Kafka.

  3. Jalankan pekerjaan.

    1. Di pojok kanan atas editor SQL, klik Deploy.

    2. Pada kotak dialog Deploy draft, klik Confirm.

    3. Konfigurasikan sumber daya untuk pekerjaan.

    4. Buka O&M > Deployments, temukan penerapan target, lalu klik Start pada kolom Actions. Untuk informasi tentang parameter yang harus dikonfigurasi saat memulai penerapan, lihat Mulai penerapan.

    5. Pada halaman Deployments, lihat status penerapan.

      Deployment state showing RUNNING then FINISHED

      Konektor Faker menyediakan aliran data terbatas, sehingga penerapan berpindah ke status FINISHED sekitar satu menit setelah mencapai status RUNNING. Saat selesai, data telah ditulis ke topik Kafka tujuan. Berikut contoh pesan berformat JSON yang ditulis ke ApsaraMQ for Kafka:

      { "id": 765, "first_name": "Barry", "last_name": "Pollich", "address": { "country": "United Arab Emirates", "state": "Nevada", "city": "Powlowskifurt" } }

Langkah 3: Buat katalog Hologres

Catatan

Langkah ini hanya diperlukan untuk metode CTAS saja. Jika Anda berencana menggunakan metode INSERT INTO pada Langkah 4, lewati langkah ini dan langsung lanjut ke Langkah 4.

Untuk sinkronisasi tabel tunggal menggunakan CTAS, buat katalog Hologres sebagai katalog tujuan di konsol pengembangan Realtime Compute for Apache Flink. Bagian ini mencakup item konfigurasi penting. Untuk detail lengkap, lihat Buat katalog Hologres.

Item konfigurasi

Deskripsi

catalog name

Masukkan nama kustom; contoh ini menggunakan holo.

endpoint

Titik akhir instans Hologres Anda.

username

ID AccessKey Akun Alibaba Cloud Anda.

password

Secret AccessKey Akun Alibaba Cloud Anda.

dbname

Masukkan nama database yang sudah ada di Hologres. Contoh ini menggunakan flink_test_db. Penting: Pastikan database flink_test_db telah dibuat di instans Hologres Anda sebelum melanjutkan. Jika tidak, akan terjadi error. Untuk informasi selengkapnya, lihat Buat database.

Langkah 4: Kembangkan dan mulai pekerjaan sinkronisasi data

Pilih metode sinkronisasi

Sebelum menulis kode, pilih metode yang sesuai dengan kasus penggunaan Anda:

CTAS

INSERT INTO

Pembuatan tabel

Flink membuat tabel Hologres secara otomatis

Anda membuat tabel secara manual di Hologres terlebih dahulu

Penanganan JSON

Kolom bersarang diekspansi otomatis (json.infer-schema.flatten-nested-columns.enable = true)

Anda dapat memetakan JSON bersarang langsung ke kolom JSONB native

Evolusi skema

Pembaruan skema Hologres dilakukan secara otomatis ketika kolom bersarang baru muncul

Pembaruan skema manual diperlukan

Memerlukan Langkah 3

Ya — katalog Hologres harus dibuat terlebih dahulu

Tidak — koneksi ditentukan langsung dalam klausa WITH

Paling cocok untuk

Pengaturan cepat; pipeline fleksibel terhadap skema

Kontrol penuh atas tipe kolom; optimalisasi JSONB

Buat pekerjaan sinkronisasi

  1. Masuk ke konsol manajemen Realtime Compute for Apache Flink.

  2. Temukan ruang kerja target dan klik Console pada kolom Actions.

  3. Pada panel navigasi kiri, pilih Development > ETL. Pada halaman yang muncul, klik New.

  4. Pada kotak dialog New Draft, pilih Blank Stream Draft lalu klik Next. Konfigurasikan draft:

    Item konfigurasi

    Contoh

    Deskripsi

    Name

    flink-quickstart-test

    Nama draft SQL. Catatan: Nama draft harus unik dalam namespace saat ini.

    Location

    Development

    Folder tempat file kode draft disimpan. Secara default, file kode draft disimpan di folder Development. Anda juga dapat mengklik ikon di sebelah kanan folder yang sudah ada untuk membuat subfolder.

    Engine Version

    vvr-8.0.11-flink-1.17

    Pilih versi engine untuk draft dari daftar drop-down.

  5. Klik Create.

  6. Salin dan tempel salah satu potongan kode berikut ke editor SQL lalu ganti nilai placeholder dengan nilai aktual Anda.

Metode 1: CTAS

Pernyataan CREATE TABLE AS (CTAS) secara otomatis membuat tabel sync_kafka_users di Hologres, sehingga tidak perlu menentukan tipe kolom JSON atau JSONB secara manual.

CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country'), PRIMARY KEY (`partition`, `offset`) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Ekspansi kolom bersarang secara otomatis. 'scan.startup.mode' = 'earliest-offset' ); CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users WITH ( 'connector' = 'hologres' ) AS TABLE kafka_users;
Catatan

Mendeklarasikan partition dan offset sebagai kunci primer mencegah data duplikat setelah failover pekerjaan. Jika data ditransmisikan ulang, Hologres hanya menyimpan satu salinan data dengan nilai partition dan offset yang sama.

Ganti nilai placeholder:

Item konfigurasi

Contoh

Deskripsi

properties.bootstrap.servers

alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

Titik akhir broker Kafka. Format: host:port,host:port,host:port. Untuk instruksi pengambilannya, lihat catatan di Langkah 2.

topic

users

Nama topik Kafka.

Metode 2: INSERT INTO

Gunakan pernyataan INSERT INTO ketika Anda ingin menyimpan data JSON bersarang sebagai kolom JSONB native di Hologres. Metode ini mengharuskan Anda membuat tabel sync_kafka_users di Hologres secara manual sebelum menjalankan pekerjaan.

CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, 'address' STRING, -- Data dalam kolom ini adalah data JSON bersarang. `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country') ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Ekspansi kolom bersarang secara otomatis. 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE holo ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT, `partition` BIGINT, `timestamp` TIMESTAMP, `date` DATE, `country` STRING ) WITH ( 'connector' = 'hologres', 'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80', 'username' = '************************', 'password' = '******************************', 'dbname' = 'flink_test_db', 'tablename' = 'sync_kafka_users' ); INSERT INTO holo SELECT * FROM kafka_users;

Ganti nilai placeholder:

Item konfigurasi

Contoh

Deskripsi

properties.bootstrap.servers

alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

Titik akhir broker Kafka. Format: host:port,host:port,host:port. Untuk instruksi pengambilannya, lihat catatan di Langkah 2.

topic

users

Nama topik Kafka.

endpoint

hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80

Titik akhir instans Hologres. Format: <ip>:<port>. Untuk mendapatkan nilai ini: buka konsol Hologres, klik nama instans Anda, lalu pada bagian Network Information temukan titik akhir untuk Select VPC.

username

************************

ID AccessKey Akun Alibaba Cloud Anda. Penting: Untuk melindungi kredensial Anda, hindari hardcoding pasangan AccessKey dalam teks biasa; gunakan variabel sebagai gantinya. Untuk informasi selengkapnya, lihat Kelola variabel.

password

******************************

Secret AccessKey Akun Alibaba Cloud Anda.

dbname

flink_test_db

Nama database Hologres.

tablename

sync_kafka_users

Nama tabel Hologres. Catatan: Jika Anda menggunakan INSERT INTO, Anda harus membuat tabel sync_kafka_users dan menentukan bidang yang diperlukan di database Hologres tujuan terlebih dahulu. Jika skema public tidak digunakan, tentukan tablename dalam format schema.tableName.

Terapkan dan jalankan pekerjaan

  1. Simpan draft.

  2. Klik Deploy.

  3. Buka O&M > Deployments, temukan penerapan target, lalu klik Start pada kolom Actions. Untuk informasi tentang parameter yang harus dikonfigurasi saat memulai penerapan, lihat Mulai penerapan.

  4. Pada halaman Deployments, lihat status dan detail penerapan.

    Deployment state on the Deployments page

Langkah 5: Lihat hasil sinkronisasi data penuh

Setelah pekerjaan sinkronisasi mencapai status RUNNING, verifikasi bahwa data telah ditulis ke Hologres.

  1. Masuk ke Konsol Hologres.

  2. Pada halaman Instances, klik nama instans target.

  3. Di pojok kanan atas halaman, klik Connect to Instance.

  4. Pada tab Metadata Management, lihat skema dan data tabel sync_kafka_users.

    sync_kafka_users table in Metadata Management
    • Skema tabel: Klik dua kali nama tabel sync_kafka_users untuk melihat skema tabel.

      sync_kafka_users table schema
      Catatan

      Deklarasikan bidang partition dan offset Kafka sebagai kunci primer untuk tabel Hologres. Jika data ditransmisikan ulang karena failover penerapan, hanya satu salinan data dengan nilai partition dan offset yang sama yang disimpan.

    • Data tabel: Di pojok kanan atas halaman tabel sync_kafka_users, klik Query table. Di editor SQL, jalankan pernyataan berikut lalu klik Running:

      SELECT * FROM public.sync_kafka_users order by partition, "offset";

      Anda seharusnya melihat 100 baris data, sesuai dengan pengaturan 'number-of-rows' = '100' dari konektor Faker.

      sync_kafka_users table data

Langkah 6: Periksa apakah perubahan skema tabel disinkronkan secara otomatis

Langkah ini menunjukkan evolusi skema: Anda mengirim pesan Kafka yang berisi kolom bersarang baru dan memverifikasi bahwa Hologres secara otomatis menambahkan kolom yang sesuai.

  1. Di konsol ApsaraMQ for Kafka, kirim pesan yang berisi kolom baru.

    1. Masuk ke Konsol ApsaraMQ for Kafka.

    2. Pada halaman Instances, klik nama instans target.

    3. Pada panel navigasi kiri, klik Topics. Temukan topik bernama users.

    4. Klik Send Message pada kolom Actions.

    5. Pada panel Start to Send and Consume Message, konfigurasikan parameter sebagai berikut.

      Send Message panel configuration

      Item konfigurasi

      Contoh

      Method of Sending

      Pilih Console.

      Message Key

      Masukkan flinktest.

      Message Content

      Salin dan tempel konten JSON berikut ke field Message Content. Catatan: house-points adalah kolom bersarang baru dalam pesan ini.

      Send to Specified Partition

      Pilih Yes.

      Partition ID

      Masukkan 0.

      { "id": 100001, "first_name": "Dennise", "last_name": "Schuppe", "address": { "country": "Isle of Man", "state": "Montana", "city": "East Coleburgh" }, "house-points": { "house": "Pukwudgie", "points": 76 } }
    6. Klik OK.

  2. Di konsol Hologres, lihat perubahan skema dan data pada tabel sync_kafka_users.

    1. Masuk ke Konsol Hologres.

    2. Pada halaman Instances, klik nama instans target.

    3. Di pojok kanan atas halaman, klik Connect to Instance.

    4. Pada tab Metadata Management, klik dua kali nama tabel sync_kafka_users.

    5. Di pojok kanan atas halaman tabel sync_kafka_users, klik Query table. Di editor SQL, jalankan pernyataan berikut lalu klik Running:

      SELECT * FROM public.sync_kafka_users order by partition, "offset";
    6. Lihat data tabel.

      sync_kafka_users table data after schema change

      Hasil menunjukkan bahwa catatan data dengan id 100001 ditulis ke Hologres. Kolom house-points.house dan house-points.points juga ditambahkan ke Hologres secara otomatis.

      Catatan

      Karena json.infer-schema.flatten-nested-columns.enable diatur ke true dalam klausa WITH untuk tabel Kafka users, Realtime Compute for Apache Flink secara otomatis mengekspansi kolom bersarang baru saat muncul. Jalur yang digunakan untuk mengakses setiap kolom menjadi nama kolomnya.

Ringkasan dan langkah selanjutnya

Anda telah berhasil membangun pipeline ingesti log waktu nyata yang mengalirkan catatan pengguna dari ApsaraMQ for Kafka ke Hologres menggunakan Realtime Compute for Apache Flink—dengan evolusi skema otomatis yang diaktifkan.

Dalam tutorial ini, Anda:

  • Mengonfigurasi daftar putih jaringan agar Flink dapat mengakses Kafka dan Hologres.

  • Menyebarluaskan generator data berbasis Faker yang menulis 100 catatan sintetis ke topik Kafka.

  • Membuat pekerjaan Flink SQL yang terus-menerus menyinkronkan data dari Kafka ke Hologres.

  • Memverifikasi bahwa perubahan skema (kolom bersarang baru) secara otomatis disebarkan ke Hologres.

Untuk melangkah lebih jauh, jelajahi topik-topik berikut:

Referensi