All Products
Search
Document Center

Realtime Compute for Apache Flink:Ingesti log waktu nyata

Last Updated:Apr 29, 2026

Topik ini menjelaskan cara membangun pekerjaan sinkronisasi data dari Kafka ke Hologres secara cepat untuk mengingesti data log waktu nyata menggunakan konsol Realtime Compute for Apache Flink.

Prasyarat

  • Pastikan pengguna RAM atau peran RAM Anda memiliki izin yang diperlukan untuk mengakses konsol Realtime Compute for Apache Flink. Untuk informasi selengkapnya, lihat Izin.

  • Anda telah membuat ruang kerja Flink. Untuk informasi selengkapnya, lihat Buat ruang kerja.

  • Penyimpanan hulu dan hilir

    Catatan

    Instans ApsaraMQ for Kafka dan Hologres harus berada di Wilayah dan VPC yang sama dengan ruang kerja Realtime Compute for Apache Flink Anda. Jika tidak, Anda harus menetapkan konektivitas jaringan. Untuk informasi selengkapnya, lihat Akses layanan lintas VPC atau Akses internet.

Langkah 1: Konfigurasikan daftar putih IP

Untuk mengizinkan Flink mengakses instans Kafka dan Hologres Anda, tambahkan Blok CIDR ruang kerja Flink ke daftar putih IP Kafka dan Hologres.

  1. Dapatkan Blok CIDR VPC ruang kerja Flink Anda.

    1. Masuk ke konsol Realtime Compute for Apache Flink.

    2. Pada kolom Actions dari workspace target, pilih More > Workspace Details.

    3. Pada kotak dialog Workspace Details, temukan CIDR block VSwitch.

      Kotak dialog ini menampilkan informasi dasar ruang kerja dan daftar VSwitch. Pada kolom CIDR block, lihat blok CIDR untuk setiap zona ketersediaan. Catat blok CIDR ini untuk langkah berikutnya.

  2. Tambahkan Blok CIDR ruang kerja Flink ke daftar putih IP instans Kafka Anda.

    Anda harus mengonfigurasi daftar putih IP untuk titik akhir VPC. Untuk langkah-langkah selengkapnya, lihat Konfigurasi daftar putih IP. Di kotak dialog pengeditan daftar putih, klik Add Whitelist IP untuk menambahkan Blok CIDR.

  3. Tambahkan Blok CIDR ruang kerja Flink ke daftar putih IP instans Hologres Anda.

    Masuk ke instans Hologres Anda dan konfigurasikan daftar putih IP-nya. Untuk langkah-langkah selengkapnya, lihat daftar putih IP. Pada halaman konfigurasi daftar putih di Pusat Keamanan HoloWeb, masukkan Blok CIDR pada bidang IP Address dalam kotak dialog Edit IP Whitelist, lalu klik OK.

Langkah 2: Siapkan data uji Kafka

Gunakan konektor Faker Realtime Compute for Apache Flink untuk menghasilkan dan menulis data ke ApsaraMQ for Kafka. Ikuti langkah-langkah berikut di konsol pengembangan Realtime Compute for Apache Flink.

  1. Di konsol ApsaraMQ for Kafka, buat topik bernama users.

    Untuk informasi selengkapnya, lihat Buat topik.

  2. Buat pekerjaan untuk menulis data ke ApsaraMQ for Kafka.

    1. Masuk ke konsol pengembangan Realtime Compute for Apache Flink.

    2. Pada kolom Actions ruang kerja target, klik Console.

    3. Di panel navigasi sebelah kiri, pilih Development > ETL.

    4. Klik ikon image, lalu klik New Stream Draft. Masukkan File Name dan pilih Engine Version.

      Realtime Compute for Apache Flink juga menyediakan berbagai templat kode dan sinkronisasi data. Setiap templat mencakup kasus penggunaan spesifik, contoh kode, dan instruksi. Klik templat untuk mempelajari fitur dan sintaks Realtime Compute for Apache Flink secara cepat serta menerapkan logika bisnis Anda. Untuk informasi selengkapnya, lihat Templat kode dan Templat sinkronisasi data.

      Parameter

      Deskripsi

      Contoh

      File Name

      Nama pekerjaan.

      Catatan

      Nama pekerjaan harus unik dalam ruang kerja saat ini.

      flink-test

      Engine Version

      Versi engine Flink untuk pekerjaan saat ini.

      Pilih versi yang diberi label Recommended atau Stable untuk keandalan dan kinerja yang lebih tinggi. Untuk informasi selengkapnya tentang versi engine, lihat Catatan rilis dan Versi engine.

      vvr-8.0.8-flink-1.17

    5. Klik Create.

    6. Tulis pernyataan SQL untuk pekerjaan tersebut.

      Salin kode berikut ke editor dan modifikasi parameter sesuai lingkungan Anda.

      CREATE TEMPORARY TABLE source (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        event_time TIMESTAMP
      ) WITH (
        'connector' = 'faker',
        'number-of-rows' = '100',
        'rows-per-second' = '10',
        'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}',
        'fields.first_name.expression' = '#{name.firstName}',
        'fields.last_name.expression' = '#{name.lastName}',
        'fields.address.country.expression' = '#{Address.country}',
        'fields.address.state.expression' = '#{Address.state}',
        'fields.address.city.expression' = '#{Address.city}',
        'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}'
      );
      
      CREATE TEMPORARY TABLE sink (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        `timestamp` TIMESTAMP METADATA
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
        'topic' = 'users',
        'format' = 'json',
        'properties.enable.idempotence'='false'
      );
      
      INSERT INTO sink SELECT * FROM source;

      Tabel berikut menjelaskan parameter yang perlu Anda modifikasi.

      Parameter

      Nilai contoh

      Deskripsi

      properties.bootstrap.servers

      alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

      Titik akhir broker ApsaraMQ for Kafka.

      Daftar titik akhir broker yang dipisahkan koma dalam format 'host:port'. Anda dapat menemukan titik akhir Domain Name VPC di bagian Endpoint Information pada halaman Instance Details.

      topic

      users

      Nama topik ApsaraMQ for Kafka.

  3. Jalankan pekerjaan.

    1. Pada halaman Development > ETL, klik Deploy.

    2. Pada kotak dialog Deploy draft, klik Confirm.

    3. Konfigurasikan sumber daya untuk pekerjaan. Untuk informasi selengkapnya, lihat Konfigurasikan sumber daya untuk pekerjaan.

    4. Pada halaman O&M > Deployments, temukan penyebaran target, lalu klik Start pada kolom Actions. Untuk informasi selengkapnya mengenai konfigurasi startup, lihat Mulai penyebaran.

    5. Pada halaman Deployments, Anda dapat memantau informasi waktu proses dan status penyebaran.

      Karena sumber Faker menghasilkan aliran terbatas, status penyebaran berubah menjadi FINISHED sekitar satu menit setelah dimulai. Saat penyebaran selesai, data telah ditulis ke topik 'users'. Kode berikut menunjukkan contoh data berformat JSON.

      {
        "id": 765,
        "first_name": "Barry",
        "last_name": "Pollich",
        "address": {
          "country": "United Arab Emirates",
          "state": "Nevada",
          "city": "Powlowskifurt"
        }
      }

Langkah 3: Buat dan jalankan pekerjaan sinkronisasi data

Flink CDC

  1. Masuk ke konsol pengembangan Realtime Compute for Apache Flink untuk membuat pekerjaan sinkronisasi data.

    1. Masuk ke konsol Realtime Compute for Apache Flink.

    2. Pada kolom Actions ruang kerja target, klik Console.

    3. Di panel navigasi sebelah kiri, pilih Development > ETL.

    4. Klik ikon image lalu klik New ETL Draft. Masukkan name dan pilih engine version.

      Parameter

      Deskripsi

      Contoh

      Name

      Nama pekerjaan.

      Catatan

      Nama pekerjaan harus unik dalam proyek.

      flink-test

      Engine Version

      Versi engine Flink untuk pekerjaan.

      Pilih versi yang diberi label Recommended atau Stable. Versi ini memberikan keandalan dan kinerja yang lebih tinggi. Untuk informasi selengkapnya tentang versi engine, lihat Catatan rilis dan Versi engine.

      vvr-8.0.8-flink-1.17

    5. Klik Create.

  2. Tulis pekerjaan Flink CDC. Salin kode berikut ke editor dan perbarui parameter sesuai lingkungan Anda.

    Pekerjaan berikut menyinkronkan data tabel berformat JSON dari topik users di Kafka ke tabel users dalam skema test_schema di database flink_test_db di Hologres.

    source:
      type: kafka
      name: Kafka Source
      properties.bootstrap.servers: alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
      topic: users
      scan.startup.mode: earliest-offset
      value.format: json
      json.infer-schema.flatten-nested-columns.enable: true
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80
      dbname: flink_test_db
      username: ******
      password: **
      sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
    
    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
        
    route:
      - source-table: users
        sink-table: test_schema.users

    Tabel berikut menjelaskan parameter yang perlu dimodifikasi.

    Parameter

    Contoh

    Deskripsi

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

    Alamat broker Kafka.

    Formatnya adalah daftar entri host:port yang dipisahkan koma. Anda dapat memperoleh Domain Name Endpoint untuk jenis jaringan VPC dari bagian Network Information pada halaman instance details.

    topic

    users

    Nama topik Kafka.

    endpoint

    hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80

    Titik akhir instans Hologres.

    Formatnya adalah <ip>:<port>. Anda dapat memperoleh titik akhir VPC dari bagian Network Information pada halaman instance details di konsol Hologres.

    username

    **

    Username dan password untuk database Hologres. Masukkan ID AccessKey dan AccessKey Secret Akun Alibaba Cloud Anda.

    Penting

    Untuk mencegah kebocoran pasangan AccessKey Anda, gunakan manajemen variabel untuk menentukan ID AccessKey dan AccessKey Secret Anda. Untuk informasi selengkapnya, lihat Kelola variabel.

    password

    **

    dbname

    flink_test_db

    Nama database Hologres.

    source-table

    users

    Tabel sumber. Secara default, nama topik digunakan.

    sink-table

    test_schema.users

    Tabel tujuan. Tentukan tabel dalam format schema.table_name.

  3. Klik Save.

  4. Pada halaman Development > ETL, klik Deploy.

  5. Pada halaman O&M > Deployments, temukan penyebaran target dan klik Start pada kolom Actions. Untuk informasi selengkapnya tentang konfigurasi startup pekerjaan, lihat Jalankan pekerjaan.

    Setelah pekerjaan dimulai, Anda dapat melihat informasi waktu proses dan statusnya di halaman Deployments. Halaman ini menampilkan daftar penyebaran dengan metrik seperti Status, health score, CPU, dan memory, serta menyediakan aksi seperti Start dan Stop.

SQL

  1. Masuk ke konsol pengembangan Realtime Compute for Apache Flink untuk membuat pekerjaan sinkronisasi data.

    1. Masuk ke konsol Realtime Compute for Apache Flink.

    2. Pada kolom Actions ruang kerja target, klik Console.

    3. Di panel navigasi sebelah kiri, pilih Development > ETL, lalu klik New.

    4. Klik ikon image lalu klik New Stream Draft. Masukkan name dan pilih engine version.

      Parameter

      Deskripsi

      Contoh

      Name

      Nama pekerjaan.

      Catatan

      Nama pekerjaan harus unik dalam proyek.

      flink-test

      Engine Version

      Versi engine Flink untuk pekerjaan.

      Pilih versi yang diberi label Recommended atau Stable. Versi ini memberikan keandalan dan kinerja yang lebih tinggi. Untuk informasi selengkapnya tentang versi engine, lihat Catatan rilis dan Versi engine.

      vvr-8.0.8-flink-1.17

    5. Klik Create.

  2. Tulis pekerjaan SQL. Salin kode berikut ke editor SQL dan perbarui parameter sesuai lingkungan Anda.

    Anda dapat menggunakan pernyataan INSERT INTO untuk menyinkronkan data dari topik users di Kafka ke tabel users di database flink_test_db Hologres.

    Hologres menyediakan optimasi khusus untuk tipe data JSON dan JSONB. Anda dapat menggunakan pernyataan INSERT INTO untuk menulis data JSON bersarang ke Hologres.

    Metode ini mengharuskan Anda terlebih dahulu membuat tabel users di Hologres, lalu menjalankan pernyataan SQL berikut untuk menulis data ke tabel tersebut.

    CREATE TEMPORARY TABLE kafka_users (
      `id` INT NOT NULL,
      `address` STRING, -- Data dalam kolom ini adalah JSON bersarang.
      `offset` BIGINT NOT NULL METADATA,
      `partition` BIGINT NOT NULL METADATA,
      `timestamp` TIMESTAMP METADATA,
      `date` AS CAST(`timestamp` AS DATE),
      `country` AS JSON_VALUE(`address`, '$.country')
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
      'topic' = 'users',
      'format' = 'json',
      'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Secara otomatis memperluas kolom bersarang.
      'scan.startup.mode' = 'earliest-offset'
    );
    
    CREATE TEMPORARY TABLE holo (
      `id` INT NOT NULL,
      `address` STRING,
      `offset` BIGINT,
      `partition` BIGINT,
      `timestamp` TIMESTAMP,
      `date` DATE,
      `country` STRING
    ) WITH (
      'connector' = 'hologres',
      'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80',
      'username' = '******',
      'password' = '******',
      'dbname' = 'flink_test_db',
      'tablename' = 'users'
    );
    
    INSERT INTO holo
    SELECT * FROM kafka_users;

    Tabel berikut menjelaskan parameter yang perlu dimodifikasi.

    Parameter

    Contoh

    Deskripsi

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

    Alamat broker Kafka.

    Formatnya adalah daftar entri host:port yang dipisahkan koma. Anda dapat memperoleh Domain Name Endpoint untuk jenis jaringan VPC dari bagian Network Information pada halaman instance details.

    topic

    users

    Nama topik Kafka.

    endpoint

    hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80

    Titik akhir instans Hologres.

    Formatnya adalah <ip>:<port>. Anda dapat memperoleh titik akhir VPC dari bagian Network Information pada halaman instance details di konsol Hologres.

    username

    ******

    Username dan password untuk database Hologres. Masukkan ID AccessKey dan AccessKey Secret Akun Alibaba Cloud Anda.

    Penting

    Untuk mencegah kebocoran pasangan AccessKey Anda, gunakan manajemen variabel untuk menentukan ID AccessKey dan AccessKey Secret Anda. Untuk informasi selengkapnya, lihat Kelola variabel.

    password

    ******

    dbname

    flink_test_db

    Nama database Hologres.

    tablename

    users

    Nama tabel Hologres.

    Catatan
    • Jika Anda menggunakan pernyataan INSERT INTO untuk menyinkronkan data, Anda harus membuat tabel users beserta kolom-kolomnya di database tujuan terlebih dahulu.

    • Jika skema bukan public, Anda harus menentukan parameter tablename dalam format schema.table_name.

  3. Klik Save.

  4. Pada halaman Development > ETL, klik Deploy.

  5. Pada halaman O&M > Deployments, temukan penyebaran target dan klik Start pada kolom Actions. Untuk informasi selengkapnya tentang konfigurasi startup pekerjaan, lihat Jalankan pekerjaan.

    Setelah pekerjaan dimulai, Anda dapat melihat informasi waktu proses dan statusnya di halaman Deployments. Halaman ini menampilkan daftar penyebaran dengan metrik seperti Status, health score, CPU, dan memory, serta menyediakan aksi seperti Start dan Stop.

Langkah 4: Lihat hasil sinkronisasi lengkap

  1. Masuk ke Konsol Manajemen Hologres.

  2. Pada halaman Instances, klik nama instans target Anda.

  3. Di pojok kanan atas halaman, klik Connect to Instance.

  4. Pada tab Metadata Management, lihat skema tabel dan data tabel users yang telah disinkronkan di database flink_test_db.

    Di pohon navigasi sebelah kiri, perluas nama instans Anda > flink_test_db > test_schema > Tables. Tabel users yang telah disinkronkan akan muncul.

    Skema dan data tabel yang telah disinkronkan adalah sebagai berikut.

    • Skema tabel

      Klik ganda nama tabel users untuk melihat skema tabel.

      Skema tabel users berisi bidang-bidang berikut: id (BIGINT, primary key), first_name (TEXT), last_name (TEXT), address.country (TEXT), address.state (TEXT), dan address.city (TEXT).

      Catatan

      Selama sinkronisasi data lengkap, kami menyarankan mendefinisikan partisi dan offset metadata Kafka sebagai primary key untuk tabel Hologres. Hal ini mencegah duplikasi data jika pekerjaan gagal dan mentransmisikan ulang data.

    • Data tabel

      Di pojok kanan atas halaman tabel users, klik Query table. Masukkan perintah berikut dan klik Run.

      SELECT * FROM test_schema.users;

      Perintah tersebut mengembalikan hasil berikut.

      Kueri mengembalikan beberapa baris. Hal ini mengonfirmasi bahwa catatan berhasil disinkronkan ke tabel users. Setiap baris yang dikembalikan berisi data lengkap untuk kolom id, first_name, last_name, address.country, address.state, dan address.city.

Langkah 5: Amati sinkronisasi skema otomatis

  1. Di konsol ApsaraMQ for Kafka, kirim pesan secara manual yang berisi kolom baru.

    1. Masuk ke ApsaraMQ for Kafka console.

    2. Pada halaman Instances, klik nama instans target.

    3. Pada halaman Topics, klik nama topik target (users).

    4. Klik Send Message.

    5. Konfigurasikan pesan.

      Pada kotak dialog Start to Send and Consume Message, konfigurasikan parameter sebagai berikut.

      Parameter

      Contoh

      Method of Sending

      Pilih Console.

      Message Key

      Masukkan flinktest.

      Message Content

      Salin dan tempel konten JSON berikut ke bidang Message Content.

      {
        "id": 100001,
        "first_name": "Dennise",
        "last_name": "Schuppe",
        "address": {
          "country": "Isle of Man",
          "state": "Montana",
          "city": "East Coleburgh"
        },
        "house-points": {
          "house": "Pukwudgie",
          "points": 76
        }
      }
      Catatan

      Dalam contoh ini, house-points adalah kolom bersarang baru.

      Send to Specified Partition

      Pilih Yes.

      Partition ID

      Masukkan 0.

    6. Klik OK.

  2. Di konsol Hologres, lihat perubahan skema dan data pada tabel users.

    1. Masuk ke konsol Hologres.

    2. Pada halaman Instances, klik nama instans target.

    3. Di pojok kanan atas halaman, klik Connect to Instance.

    4. Pada tab Metadata Management, klik ganda nama tabel users.

    5. Klik Query table, masukkan pernyataan berikut, lalu klik Running.

      SELECT * FROM test_schema.users;
    6. Lihat hasil kueri.

      Kueri mengembalikan hasil berikut.

      Hasil menunjukkan bahwa catatan dengan ID 100001 berhasil ditulis ke Hologres, dan dua kolom baru, house-points.house dan house-points.points, ditambahkan ke tabel Hologres.

      Catatan

      Pesan yang dikirim ke ApsaraMQ for Kafka hanya berisi satu kolom bersarang: house-points. Namun, karena json.infer-schema.flatten-nested-columns.enable ditentukan dalam klausa WITH, Realtime Compute for Apache Flink secara otomatis meratakan kolom ini, menggunakan jalur akses bidang bersarang sebagai nama kolom baru.

Referensi