全部产品
Search
文档中心

Realtime Compute for Apache Flink:Ingest data log ke gudang data secara real-time

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan Realtime Compute for Apache Flink untuk menyinkronkan data log dari Kafka ke Hologres secara real-time.

Prasyarat

Langkah 1: Konfigurasi daftar putih alamat IP

Untuk memungkinkan ruang kerja Flink mengakses instansi Kafka dan Hologres, tambahkan blok CIDR dari vSwitch tempat ruang kerja Flink berada ke daftar putih instansi Kafka dan Hologres.

  1. Dapatkan blok CIDR vSwitch tempat ruang kerja Flink berada.

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Temukan workspace target dan pilih More > Workspace Details di kolom Actions.

    3. Di kotak dialog Workspace Details, salin CIDR block dari vSwitch.

      网段信息

  2. Tambahkan blok CIDR ke daftar putih IP instansi Kafka Anda.

    Atur daftar putih untuk titik akhir dengan jaringan VPC.Kafka白名单

  3. Tambahkan blok CIDR ke daftar putih IP instansi Hologres Anda.

    Holo白名单

Langkah 2: Persiapkan data uji untuk instansi ApsaraMQ for Kafka

Gunakan Faker connector dari Realtime Compute for Apache Flink sebagai generator data dan tulis data tersebut ke instansi ApsaraMQ for Kafka. Anda dapat melakukan langkah-langkah berikut untuk menulis data ke instansi Kafka Anda di konsol pengembangan Realtime Compute for Apache Flink.

  1. Buat topik bernama users di konsol ApsaraMQ for Kafka.

  2. Kembangkan pekerjaan yang menulis data ke topik Kafka Anda.

    1. Masuk ke konsol manajemen Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target dan klik Console di kolom Actions.

    3. Di panel navigasi sisi kiri, pilih Development > ETL. Pada halaman yang muncul, klik New.

    4. Dalam kotak dialog New Draft, pilih template, seperti Blank Stream Draft. Klik Next, lalu konfigurasikan draf sesuai tabel di bawah ini:

      Item konfigurasi

      Contoh

      Deskripsi

      Name

      kafka-data-input

      Nama draf SQL.

      Catatan

      Nama draf harus unik di namespace saat ini.

      Location

      Pengembangan

      Folder tempat file kode draf disimpan. Secara default, file kode draf disimpan di folder Development.

      Anda juga bisa mengklik ikon 新建文件夹 di sebelah kanan folder yang ada untuk membuat subfolder.

      Engine Version

      vvr-8.0.11-flink-1.17

      Pilih versi mesin untuk draf dari daftar drop-down.

    5. Klik Create.

    6. Tulis kode.

      Salin dan tempel potongan kode berikut ke editor SQL, lalu buat modifikasi yang diperlukan.

      CREATE TEMPORARY TABLE source (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        event_time TIMESTAMP
      ) WITH (
        'connector' = 'faker',
        'number-of-rows' = '100',
        'rows-per-second' = '10',
        'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}',
        'fields.first_name.expression' = '#{name.firstName}',
        'fields.last_name.expression' = '#{name.lastName}',
        'fields.address.country.expression' = '#{Address.country}',
        'fields.address.state.expression' = '#{Address.state}',
        'fields.address.city.expression' = '#{Address.city}',
        'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}'
      );
      
      CREATE TEMPORARY TABLE sink (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        `timestamp` TIMESTAMP METADATA
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
        'topic' = 'users',
        'format' = 'json'
      );
      
      INSERT INTO sink SELECT * FROM source;

      Ganti nilai placeholder dalam perintah di atas dengan nilai aktual Anda:

      Item konfigurasi

      Contoh

      Deskripsi

      properties.bootstrap.servers

      alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

      Alamat IP atau titik akhir broker Kafka.

      Format: host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).

      Catatan

      Untuk mendapatkan titik akhir instansi ApsaraMQ for Kafka Anda, ikuti langkah-langkah berikut:

      1. Di konsol ApsaraMQ for Kafka, klik nama instansi target.

      2. Di bagian Endpoint Information pada halaman Instance Details yang muncul, temukan titik akhir dengan jaringan VPC.

      3. Salin nilai di kolom Domain Name.

      topic

      users

      Nama topik Kafka.

  3. Mulai pekerjaan.

    1. Di sudut kanan atas editor SQL, klik Deploy.

    2. Di kotak dialog Deploy draft, klik Confirm.

    3. Konfigurasikan sumber daya untuk pekerjaan.

    4. Pergi ke O&M > Deployments, temukan penyebaran target, dan klik Start di kolom Actions.

      Untuk informasi tentang parameter yang harus dikonfigurasi saat memulai penyebaran Anda, lihat Mulai penyebaran

    5. Di halaman Deployments, lihat status penyebaran.image

      Konektor Faker menyediakan aliran terbatas. Oleh karena itu, penyebaran menjadi FINISHED sekitar satu menit setelah penyebaran tetap RUNNING. Saat penyebaran selesai, itu menunjukkan bahwa data telah ditulis ke topik Kafka tujuan. Berikut adalah contoh pesan JSON yang ditulis ke ApsaraMQ for Kafka.

      {
        "id": 765,
        "first_name": "Barry",
        "last_name": "Pollich",
        "address": {
          "country": "Uni Emirat Arab",
          "state": "Nevada",
          "city": "Powlowskifurt"
        }
      }

Langkah 3: Buat katalog Hologres

Jika Anda ingin melakukan sinkronisasi tabel tunggal, Anda harus membuat tabel tujuan di katalog tujuan. Anda dapat membuat katalog tujuan di konsol pengembangan Realtime Compute for Apache Flink. Dalam topik ini, katalog Hologres digunakan sebagai katalog tujuan. Bagian ini secara singkat menjelaskan item konfigurasi penting saat Anda membuat katalog Hologres. Untuk informasi lebih rinci, lihat Buat Katalog Hologres.

Item konfigurasi

Deskripsi

catalog name

Masukkan nama kustom; dalam contoh ini, holo digunakan.

endpoint

Titik akhir instansi Hologres Anda.

username

ID AccessKey akun Alibaba Cloud.

password

Rahasia AccessKey akun Alibaba Cloud.

dbname

Masukkan nama database yang ada di Hologres. Contoh ini menggunakan flink_test_db.

Penting

Pastikan database flink_test_db yang ditentukan untuk bidang ini sudah dibuat di instansi Hologres Anda. Jika tidak, akan terjadi kesalahan. Untuk informasi lebih lanjut, lihat Buat database di dokumentasi Hologres.

Langkah 4: Kembangkan dan mulai pekerjaan sinkronisasi data

  1. Masuk ke konsol pengembangan Realtime Compute for Apache Flink dan kembangkan pekerjaan yang menyinkronkan data.

    1. Masuk ke konsol manajemen Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target dan klik Console di kolom Actions.

    3. Di panel navigasi sisi kiri, pilih Development > ETL. Pada halaman yang muncul, klik New.

    4. Dalam kotak dialog New Draft, pilih template, seperti Blank Stream Draft. Klik Next, lalu konfigurasikan draf sesuai tabel di bawah ini:

      Item konfigurasi

      Contoh

      Deskripsi

      Name

      flink-quickstart-test

      Nama draf SQL.

      Catatan

      Nama draf harus unik di namespace saat ini.

      Location

      Pengembangan

      Folder tempat file kode draf disimpan. Secara default, file kode draf disimpan di folder Development.

      Anda juga bisa mengklik ikon 新建文件夹 di sebelah kanan folder yang ada untuk membuat subfolder.

      Engine Version

      vvr-8.0.11-flink-1.17

      Pilih versi mesin untuk draf dari daftar drop-down.

    5. Klik Create.

  2. Tulis kode. Salin dan tempel potongan kode berikut ke editor SQL, lalu buat modifikasi yang diperlukan.

    Gunakan salah satu metode berikut untuk menyinkronkan data dari topik Kafka users ke tabel sync_kafka_users di database flink_test_db di Hologres.

    CTAS

    Menggunakan pernyataan CREATE TABLE AS (CTAS) untuk sinkronisasi data menghilangkan kebutuhan untuk secara manual membuat tabel sync_kafka_users di Hologres dan menentukan tipe kolom sebagai JSON atau JSONB:

    CREATE TEMPORARY TABLE kafka_users (
      `id` INT NOT NULL,
      `address` STRING,
      `offset` BIGINT NOT NULL METADATA,
      `partition` BIGINT NOT NULL METADATA,
      `timestamp` TIMESTAMP METADATA,
      `date` AS CAST(`timestamp` AS DATE),
      `country` AS JSON_VALUE(`address`, '$.country'),
      PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
      'topic' = 'users',
      'format' = 'json',
      'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Secara otomatis memperluas kolom bersarang. 
      'scan.startup.mode' = 'earliest-offset'
    );
    
    CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users
    WITH (
      'connector' = 'hologres'
    ) AS TABLE kafka_users;
    Catatan

    Untuk mencegah data duplikat ditulis ke Hologres setelah pekerjaan gagal, Anda dapat menambahkan kunci utama terkait ke tabel untuk secara unik mengidentifikasi data. Jika data dikirim ulang, Hologres memastikan hanya satu salinan data dengan nilai partisi dan offset yang sama yang disimpan.

    Ganti nilai placeholder dalam perintah di atas dengan nilai aktual Anda:

    Item konfigurasi

    Contoh

    Deskripsi

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

    Alamat IP atau titik akhir broker Kafka.

    Format: host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).

    Catatan

    Untuk mendapatkan titik akhir instansi ApsaraMQ for Kafka Anda, ikuti langkah-langkah berikut:

    1. Di konsol ApsaraMQ for Kafka, klik nama instansi target.

    2. Di bagian Endpoint Information pada halaman Instance Details yang muncul, temukan titik akhir dengan jaringan VPC.

    3. Salin nilai di kolom Domain Name.

    topic

    users

    Nama topik Kafka.

    INSERT INTO

    Metode khusus digunakan untuk mengoptimalkan data JSON dan JSONB di Hologres. Oleh karena itu, Anda dapat menggunakan pernyataan INSERT INTO untuk menyinkronkan data JSON bersarang ke Hologres.

    Metode ini mengharuskan Anda untuk secara manual membuat tabel bernama sync_kafka_users di Hologres sebelum menyinkronkan data dengan menggunakan perintah SQL berikut:

    CREATE TEMPORARY TABLE kafka_users (
      `id` INT NOT NULL,
      'address' STRING, -- Data di kolom ini adalah data JSON bersarang. 
      `offset` BIGINT NOT NULL METADATA,
      `partition` BIGINT NOT NULL METADATA,
      `timestamp` TIMESTAMP METADATA,
      `date` AS CAST(`timestamp` AS DATE),
      `country` AS JSON_VALUE(`address`, '$.country')
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
      'topic' = 'users',
      'format' = 'json',
      'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Secara otomatis memperluas kolom bersarang. 
      'scan.startup.mode' = 'earliest-offset'
    );
    
    CREATE TEMPORARY TABLE holo (
      `id` INT NOT NULL,
      `address` STRING,
      `offset` BIGINT,
      `partition` BIGINT,
      `timestamp` TIMESTAMP,
      `date` DATE,
      `country` STRING
    ) WITH (
      'connector' = 'hologres',
      'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80',
      'username' = '************************',
      'password' = '******************************',
      'dbname' = 'flink_test_db',
      'tablename' = 'sync_kafka_users'
    );
    
    INSERT INTO holo
    SELECT * FROM kafka_users;

    Ganti nilai placeholder dalam perintah di atas dengan nilai aktual Anda:

    Item konfigurasi

    Contoh

    Deskripsi

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

    Alamat IP atau titik akhir broker Kafka.

    Format: host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).

    Catatan

    Untuk mendapatkan titik akhir instansi ApsaraMQ for Kafka Anda, ikuti langkah-langkah berikut:

    1. Di konsol ApsaraMQ for Kafka, klik nama instansi target.

    2. Di bagian Endpoint Information pada halaman Instance Details yang muncul, temukan titik akhir dengan jaringan VPC.

    3. Salin nilai di kolom Domain Name.

    topic

    users

    Nama topik Kafka.

    endpoint

    hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80

    Titik akhir instansi Hologres.

    Format: <ip>:<port>.

    Catatan

    Untuk mendapatkan nilai titik akhir, ikuti langkah-langkah berikut:

    1. Pergi ke Konsol Hologres.

    2. Klik nama instansi Hologres Anda.

    3. Di bagian Network Information pada halaman detail instansi, temukan titik akhir yang sesuai dengan Select VPC dan salin nilai titik akhir.

    username

    ************************

    Nama pengguna dan kata sandi yang digunakan untuk mengakses database Hologres. Masukkan ID AccessKey dan rahasia akun Alibaba Cloud Anda.

    Penting

    Untuk meningkatkan keamanan kredensial Anda, hindari menuliskan pasangan AccessKey dalam teks biasa; gunakan variabel sebagai gantinya. Untuk informasi lebih lanjut, lihat Kelola variabel.

    password

    ******************************

    dbname

    flink_test_db

    Nama database Hologres yang ingin Anda akses.

    tablename

    sync_kafka_users

    Nama tabel Hologres.

    Catatan
    • Jika Anda menggunakan pernyataan INSERT INTO untuk menyinkronkan data, Anda harus membuat tabel sync_kafka_users dan mendefinisikan bidang yang diperlukan di database instansi Hologres tujuan terlebih dahulu.

    • Jika skema publik tidak digunakan, Anda harus menentukan tablename dalam format schema.tableName.

  3. Simpan draf.

  4. Klik Deploy.

  5. Pergi ke O&M > Deployments, temukan penyebaran target, dan klik Start di kolom Actions.

    Untuk informasi tentang parameter yang harus dikonfigurasi saat memulai penyebaran, lihat Mulai Penyebaran.

    Anda dapat melihat status dan informasi lainnya tentang penyebaran di halaman Deployments setelah dimulai.image

Langkah 5: Lihat hasil sinkronisasi data penuh

  1. Masuk ke Konsol Hologres.

  2. Di halaman Instances, klik nama instansi target.

  3. Di sudut kanan atas halaman, klik Connect to Instance.

  4. Di tab Metadata Management, lihat skema dan data tabel sync_kafka_users yang menerima data dari topik Kafka bernama users.

    sync_kafka_users表

    Gambar-gambar berikut menunjukkan skema dan data tabel sync_kafka_users setelah sinkronisasi data.

    • Skema Tabel

      Klik dua kali nama tabel sync_kafka_users untuk melihat skema tabel.

      表结构

      Catatan

      Saat mengembangkan pekerjaan sinkronisasi data, kami sarankan Anda mendeklarasikan bidang partisi dan offset Kafka sebagai kunci utama untuk tabel Hologres. Dengan cara ini, jika data dikirim ulang karena kegagalan penyebaran, hanya satu salinan data dengan nilai partisi dan offset yang sama yang disimpan.

    • Data Tabel

      Di sudut kanan atas halaman untuk tabel sync_kafka_users, klik Query table. Di editor SQL, salin dan tempel pernyataan berikut dan klik Running.

      SELECT * FROM public.sync_kafka_users order by partition, "offset";

      Gambar berikut menunjukkan data tabel sync_kafka_users.表数据

Langkah 6: Periksa apakah perubahan skema tabel disinkronkan secara otomatis

  1. Di konsol ApsaraMQ for Kafka, kirim pesan yang berisi kolom baru.

    1. Masuk ke ApsaraMQ for Kafka console.

    2. Di halaman Instances, klik nama instansi target.

    3. Di panel navigasi sisi kiri halaman yang muncul, klik Topics. Di halaman yang muncul, temukan topik bernama users.

    4. Klik Send Message di kolom Actions.

    5. Di panel Start to Send and Consume Message, konfigurasikan parameter sebagai berikut.

      image

      Item konfigurasi

      Contoh

      Method of Sending

      Pilih Console.

      Message Key

      Masukkan flinktest.

      Message Content

      Salin dan tempel konten JSON berikut ke bidang Isi Pesan.

      {
        "id": 100001,
        "first_name": "Dennise",
        "last_name": "Schuppe",
        "address": {
          "country": "Isle of Man",
          "state": "Montana",
          "city": "East Coleburgh"
        },
        "house-points": {
          "house": "Pukwudgie",
          "points": 76
        }
      }
      Catatan

      Dalam kode JSON di atas, house-points adalah kolom bersarang baru.

      Send to Specified Partition

      Pilih Yes.

      Partition ID

      Masukkan 0.

    6. Klik OK.

  2. Di konsol Hologres, lihat perubahan pada skema dan data tabel sync_kafka_users.

    1. Masuk ke Konsol Hologres.

    2. Di halaman Instances, klik nama instansi target.

    3. Di sudut kanan atas halaman, klik Connect to Instance.

    4. Di tab Metadata Management, klik dua kali nama tabel sync_kafka_users.

    5. Di sudut kanan atas halaman untuk tabel sync_kafka_users, klik Query table. Di editor SQL, masukkan pernyataan berikut dan klik Running.

      SELECT * FROM public.sync_kafka_users order by partition, "offset";
    6. Lihat data tabel.

      Gambar berikut menunjukkan data tabel sync_kafka_users.Hologres表结果

      Gambar tersebut menunjukkan bahwa catatan data dengan id 100001 ditulis ke Hologres. Selain itu, kolom house-points.house dan house-points.points ditambahkan ke Hologres.

      Catatan

      Hanya data di kolom bersarang house-points yang termasuk dalam data yang dimasukkan ke tabel ApsaraMQ for Kafka. Namun, json.infer-schema.flatten-nested-columns.enable ditentukan dalam klausa WITH untuk membuat tabel Kafka users. Dalam kasus ini, Realtime Compute for Apache Flink secara otomatis memperluas kolom bersarang baru. Setelah kolom diperluas, jalur untuk mengakses kolom digunakan sebagai nama kolom.

Referensi