全部产品
Search
文档中心

Realtime Compute for Apache Flink:Ingesti data ke gudang data secara real time

更新时间:Nov 10, 2025

Realtime Compute for Apache Flink memungkinkan Anda mengingesti data ke gudang data secara real time. Layanan ini menyediakan berbagai fitur, seperti peralihan sinkronisasi data penuh dan inkremental, penemuan metadata otomatis, sinkronisasi perubahan skema tabel, serta sinkronisasi database, untuk menyederhanakan ingesti data real-time ke gudang data dan meningkatkan efisiensi serta kenyamanan sinkronisasi data real-time. Topik ini menjelaskan cara membuat draf yang menyinkronkan data dari instans ApsaraDB RDS for MySQL ke instans Hologres di konsol pengembangan Realtime Compute for Apache Flink.

Informasi latar belakang

Sebagai contoh, sebuah instans ApsaraDB RDS for MySQL memiliki empat database bernama tpc_ds, user_db1, user_db2, dan user_db3. Database tpc_ds berisi 24 tabel bisnis dengan skema tabel yang berbeda-beda. Sharding dilakukan pada database user_db1, user_db2, dan user_db3. Masing-masing dari ketiga database tersebut berisi tiga tabel dengan skema tabel yang sama, sehingga total terdapat sembilan tabel bernama user01 hingga user09. Gambar berikut menunjukkan struktur database dan tabel pada instans ApsaraDB RDS for MySQL yang dapat Anda lihat di konsol Alibaba Cloud Database Management Service (DMS).数据库和表情况

Jika Anda ingin mengembangkan draf untuk menyinkronkan tabel dan data dari database instans ApsaraDB RDS for MySQL ke Hologres, ikuti langkah-langkah berikut. Anda dapat menggabungkan tabel bernama user01 hingga user09 dan menyinkronkan datanya ke satu tabel Hologres.

Dalam topik ini, pernyataan CREATE TABLE AS dan CREATE DATABASE AS yang didukung oleh Realtime Compute for Apache Flink digunakan untuk menyinkronkan seluruh database, menggabungkan dan menyinkronkan tabel dalam database ter-shard, melakukan sinkronisasi data penuh dan inkremental, serta menyinkronkan perubahan skema tabel secara real time.

Prasyarat

  • Jika Anda mengakses konsol pengembangan Realtime Compute for Apache Flink sebagai Pengguna Resource Access Management (RAM) atau dengan mengasumsikan Peran RAM, izin yang diperlukan telah diberikan kepada Pengguna RAM atau Peran RAM tersebut. Untuk informasi selengkapnya, lihat Manajemen izin.

  • Ruang kerja Realtime Compute for Apache Flink telah dibuat. Untuk informasi selengkapnya, lihat Aktifkan Realtime Compute for Apache Flink.

  • Instans penyimpanan hulu dan hilir telah dibuat.

    Catatan

    Instans ApsaraDB RDS for MySQL dan instans Hologres berada dalam virtual private cloud (VPC) yang sama dengan ruang kerja Realtime Compute for Apache Flink. Jika instans-instans tersebut tidak berada dalam VPC yang sama dengan ruang kerja Realtime Compute for Apache Flink, Anda harus membuat koneksi jaringan antara instans ApsaraDB RDS for MySQL dan ruang kerja Realtime Compute for Apache Flink serta antara instans Hologres dan ruang kerja Realtime Compute for Apache Flink. Untuk informasi selengkapnya, lihat bagian Bagaimana Realtime Compute for Apache Flink mengakses layanan lintas VPC? dan Bagaimana Realtime Compute for Apache Flink mengakses Internet? dalam topik "Referensi".

  • Data uji telah disiapkan dan daftar putih alamat IP MySQL serta Hologres telah dikonfigurasi. Untuk informasi selengkapnya, lihat bagian Siapkan data uji dan Konfigurasi daftar putih alamat IP dalam topik ini.

Siapkan data uji

  1. Klik tpc_ds.sql, user_db1.sql, user_db2.sql, dan user_db3.sql untuk mengunduh data uji ke komputer lokal Anda.

  2. Di konsol DMS, siapkan data uji untuk instans ApsaraDB RDS for MySQL.

    1. Masuk ke instans ApsaraDB RDS for MySQL dari konsol DMS.

    2. Di tab SQLConsole, masukkan perintah berikut dan klik Execute.

      Perintah berikut digunakan untuk membuat database tpc_ds, user_db1, user_db2, dan user_db3.

      CREATE DATABASE tpc_ds;
      CREATE DATABASE user_db1;
      CREATE DATABASE user_db2;
      CREATE DATABASE user_db3;
    3. Di bilah navigasi atas, klik Data Import. Di halaman yang muncul, klik tab Large Data Import.

    4. Di tab Large Data Import, pilih database yang datanya ingin Anda impor di kolom Database, klik File untuk mengunggah file SQL dari database yang dipilih, lalu klik Submit. Setelah file lolos pemeriksaan awal, ajukan tiket. Setelah tiket disetujui, klik Execute Change. Di kotak dialog yang muncul, klik Confirm Execution.

      Ulangi langkah ini untuk mengimpor file data ke database tpc_ds, user_db1, user_db2, dan user_db3 secara berurutan.导入数据

  3. Masuk ke konsol Hologres dan buat database my_user untuk menyimpan data hasil penggabungan tabel user01 hingga user09.

    Untuk informasi selengkapnya tentang cara membuat database, lihat Buat database.

Konfigurasi daftar putih alamat IP

Agar Realtime Compute for Apache Flink dapat mengakses instans ApsaraDB RDS for MySQL dan Hologres, Anda harus menambahkan blok CIDR dari vSwitch tempat ruang kerja Realtime Compute for Apache Flink berada ke daftar putih instans ApsaraDB RDS for MySQL dan Hologres.

  1. Dapatkan blok CIDR dari vSwitch tempat ruang kerja Realtime Compute for Apache Flink berada.

    1. Masuk ke konsol manajemen Realtime Compute for Apache Flink.

    2. Di tab Fully Managed Flink, temukan workspace Anda dan pilih More > Workspace Details di kolom Actions.

    3. Di kotak dialog Workspace Details, lihat CIDR block dari vSwitch tempat ruang kerja Realtime Compute for Apache Flink berada.

      网段信息

  2. Tambahkan blok CIDR dari vSwitch tempat instans Realtime Compute for Apache Flink berada ke daftar putih alamat IP instans ApsaraDB RDS for MySQL.

    Untuk informasi selengkapnya, lihat Konfigurasi daftar putih alamat IP.RDS白名单

  3. Tambahkan blok CIDR dari vSwitch tempat ruang kerja Realtime Compute for Apache Flink berada ke daftar putih alamat IP instans Hologres.

    Untuk mengonfigurasi daftar putih alamat IP untuk instans di konsol HoloWeb, Anda harus mengatur parameter Logon Method menjadi Password-free Logon saat menyiapkan koneksi ke instans tersebut. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih alamat IP.Holo白名单

Langkah 1: Buat katalog

Jika Anda ingin menyinkronkan seluruh database, menggabungkan dan menyinkronkan tabel dalam database ter-shard, atau menyinkronkan satu tabel saja, Anda harus membuat katalog tujuan. Anda juga harus membuat katalog sumber untuk mendapatkan daftar tabel sumber dan informasi mengenai tabel-tabel tersebut. Anda dapat membuat katalog sumber dan tujuan di konsol pengembangan Realtime Compute for Apache Flink. Dalam contoh ini, katalog sumber adalah katalog ApsaraDB RDS for MySQL dan katalog tujuan adalah katalog Hologres.

  1. Buat katalog ApsaraDB RDS for MySQL bernama mysql.

    Untuk informasi selengkapnya, lihat bagian "Kelola katalog Hologres" dalam topik Kelola katalog MySQL.mysql catalog

  2. Buat katalog Hologres bernama holo.

    Untuk informasi selengkapnya, lihat bagian "Buat katalog Hologres" dalam topik Kelola katalog Hologres.Holo Catalog

  3. Masuk ke konsol pengembangan Realtime Compute for Apache Flink. Di panel navigasi sebelah kiri, klik Catalogs. Di halaman Catalog List, periksa apakah katalog mysql dan holo telah dibuat.

Langkah 2: Kembangkan draf sinkronisasi data

  1. Masuk ke konsol pengembangan Realtime Compute for Apache Flink dan buat draf.

    1. Di panel navigasi sebelah kiri, pilih Development > ETL. Di pojok kiri atas halaman SQL Editor, klik New.

    2. Di tab SQL Scripts pada kotak dialog New Draft, klik Blank Stream Draft.

      Realtime Compute for Apache Flink menyediakan berbagai templat kode dan mendukung sinkronisasi data. Setiap templat kode menyediakan skenario tertentu, contoh kode, dan instruksi. Anda dapat mengklik templat untuk mempelajari fitur-fitur dan sintaks terkait Realtime Compute for Apache Flink serta menerapkan logika bisnis Anda. Untuk informasi selengkapnya, lihat Templat kode dan Templat sinkronisasi data.

    3. Klik Next.

    4. Di kotak dialog New Draft, konfigurasikan parameter draf. Tabel berikut menjelaskan parameter-parameter tersebut.

      Parameter

      Deskripsi

      Contoh

      Name

      Nama draf yang ingin Anda buat.

      Catatan

      Nama draf harus unik dalam proyek saat ini.

      flink-test

      Location

      Folder tempat file kode draf disimpan.

      Anda juga dapat mengklik ikon 新建文件夹 di sebelah kanan folder yang sudah ada untuk membuat subfolder.

      Draft

      Engine Version

      Anda dapat melihat versi mesin Flink yang digunakan oleh penerapan. Untuk informasi selengkapnya tentang versi mesin, pemetaan versi, dan titik waktu penting dalam siklus hidup setiap versi, lihat Versi mesin.

      vvr-6.0.4-flink-1.15

    5. Klik Create.

  2. Salin kode draf berikut ke editor kode.

    Kode contoh berikut menunjukkan cara menyinkronkan semua tabel dalam database tpc_ds ApsaraDB RDS for MySQL ke database tpc_ds Hologres, lalu menggabungkan dan menyinkronkan tabel user01 hingga user09 ke tabel my_user.users Hologres. Contoh kode:

    USE CATALOG holo;
    
    BEGIN STATEMENT SET;
    
    -- Sinkronkan semua tabel dalam database tpc_ds ApsaraDB RDS for MySQL ke database tpc_ds Hologres. 
    CREATE DATABASE IF NOT EXISTS tpc_ds
    AS DATABASE mysql.tpc_ds INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='8001-8004') */ ;
    
    -- Sinkronkan tabel user01 hingga user09 ke tabel my_user.users Hologres. 
    CREATE TABLE IF NOT EXISTS my_user.users
    AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;

    Pernyataan CREATE DATABASE AS digunakan untuk menyinkronkan semua tabel dalam database tpc_ds ke Hologres. Pernyataan CREATE TABLE AS digunakan untuk menyinkronkan tabel user01 hingga user09 ke satu tabel di Hologres. Pernyataan STATEMENT SET digunakan untuk menggabungkan dan meng-commit pernyataan CREATE DATABASE AS dan CREATE TABLE AS dalam satu penerapan. Realtime Compute for Apache Flink secara otomatis mengoptimalkan sumber dan menggunakan kembali satu node sumber untuk membaca data dari beberapa tabel ApsaraDB RDS for MySQL. Hal ini secara signifikan mengurangi jumlah koneksi ApsaraDB RDS for MySQL dan beban pembacaan data, serta meningkatkan stabilitas pembacaan.

    Catatan

    Jika Anda ingin menyinkronkan tabel tertentu dalam sebuah database, Anda dapat menambahkan INCLUDING TABLE atau EXCLUDING TABLE ke pernyataan CREATE DATABASE AS untuk menentukan tabel yang ingin Anda sinkronkan. Misalnya, INCLUDING TABLE 'web.*' berarti hanya tabel-tabel yang namanya diawali dengan web dalam database tersebut yang perlu disinkronkan.

Langkah 3: Mulai penerapan

  1. Di pojok kanan atas halaman ETL, klik Deploy. Di kotak dialog yang muncul, klik Confirm.部署

    Catatan

    Kluster sesi berlaku untuk lingkungan non-produksi, seperti lingkungan pengembangan dan pengujian. Anda dapat menerapkan atau men-debug draf dalam kluster sesi untuk meningkatkan pemanfaatan sumber daya Manajer Pekerjaan dan mempercepat startup penerapan. Kami menyarankan agar Anda tidak menerapkan draf dalam kluster sesi. Jika Anda menerapkan draf dalam kluster sesi, masalah stabilitas mungkin terjadi. Untuk informasi selengkapnya, lihat Konfigurasi lingkungan pengembangan dan pengujian (kluster sesi).

  2. Di panel navigasi sebelah kiri, pilih O&M > Deployments. Di halaman Deployments, temukan penerapan yang ingin Anda kelola dan klik Start di kolom Actions. Untuk informasi selengkapnya tentang cara mengonfigurasi parameter, lihat Mulai penerapan pekerjaan.

  3. Di kotak dialog Start Job, klik Start.

    Anda dapat melihat status dan informasi penerapan di halaman Deployments setelah penerapan dimulai.作业状态

Langkah 4: Lihat hasil sinkronisasi data penuh

  1. Masuk ke konsol Hologres.

  2. Di panel navigasi sebelah kiri, klik Go to HoloWeb. Di halaman Metadata Management HoloWeb, lihat 24 tabel dan data tabel dalam database tpc_ds instans Hologres.

    holo表数据

  3. Di halaman Metadata Management, lihat skema tabel users dalam database my_user.

    Gambar berikut menunjukkan skema tabel dan data setelah sinkronisasi data penuh.

    • Skema tabel表结构

      Dalam skema tabel users, kolom _db_name dan _table_name ditambahkan berdasarkan skema tabel sumber ApsaraDB RDS for MySQL. Kolom _db_name menunjukkan nama database sumber data, dan kolom _table_name menunjukkan nama tabel sumber data. Kedua kolom ini digunakan sebagai bagian dari kunci utama gabungan untuk memastikan keunikan data setelah tabel-tabel dalam database ter-shard digabungkan.

    • Data tabel

      Di pojok kanan atas tab users, klik Query table. Di editor SQL, masukkan perintah berikut dan klik Run:

      select * from users order by _db_name,_table_name,id;

      Gambar berikut menunjukkan data tabel.表数据

Langkah 5: Lihat hasil sinkronisasi inkremental

Setelah sinkronisasi data penuh selesai, sistem secara otomatis beralih ke fase sinkronisasi data inkremental tanpa memerlukan intervensi manual. Anda dapat menentukan fase sinkronisasi data suatu penerapan sinkronisasi data berdasarkan nilai currentEmitEventTimeLag pada titik waktu tertentu di tab Alerm di konsol pengembangan Realtime Compute for Apache Flink.

  1. Masuk ke konsol manajemen Realtime Compute for Apache Flink.

  2. Di tab Fully Managed Flink, temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.

  3. Di panel navigasi sebelah kiri, pilih O&M > Deployments. Di halaman Deployments, klik nama penerapan yang ingin Anda kelola.

  4. Klik tab Alerm.

  5. Lihat grafik currentEmitEventTimeLag untuk menentukan fase sinkronisasi data penerapan.

    数据曲线

    • Jika nilai currentEmitEventTimeLag pada suatu titik waktu adalah 0, penerapan berjalan dalam fase sinkronisasi data penuh.

    • Jika nilai currentEmitEventTimeLag pada suatu titik waktu lebih besar dari 0, penerapan memasuki fase sinkronisasi inkremental.

  6. Verifikasi sinkronisasi real-time perubahan data dan skema.

    Sumber data MySQL CDC memungkinkan Anda menyinkronkan perubahan data tabel dan skema secara real time selama sinkronisasi data inkremental. Setelah penerapan memasuki fase sinkronisasi data inkremental, Anda dapat memodifikasi skema dan data dalam tabel user01 hingga user09 instans ApsaraDB RDS for MySQL untuk memverifikasi sinkronisasi real-time perubahan data dan skema.

    1. Masuk ke instans ApsaraDB RDS for MySQL menggunakan konsol DMS.

    2. Di database user_db2, jalankan perintah berikut untuk memodifikasi skema tabel user02, memasukkan data ke tabel user02, dan memperbarui data di tabel user05:

      USE DATABASE `user_db2`;
      ALTER TABLE `user02` ADD COLUMN `age` INT;   -- Tambahkan kolom age ke tabel user02. 
      INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30); -- Masukkan data yang mencakup informasi age ke tabel user02. 
      UPDATE `user05` SET name='JARK' WHERE id=15;  -- Ubah nilai spesifik bidang name menjadi huruf kapital.

    3. Di konsol Hologres, lihat perubahan skema dan data tabel users.

      Di pojok kanan atas tab users, klik Query table. Di editor SQL, masukkan perintah berikut dan klik Run:

      select * from users order by _db_name,_table_name,id;

      Gambar berikut menunjukkan data tabel.表结构和数据变化Skema beberapa tabel dalam database ter-shard berbeda-beda. Namun, perubahan skema dan data tabel user02 serta perubahan data tabel user05 disinkronkan ke tabel tujuan secara real time. Di tabel users Hologres, kolom age ditambahkan, data age Tony dimasukkan, dan nama JARK ditampilkan dalam huruf kapital.

(Opsional) Langkah 6: Konfigurasi sumber daya untuk penerapan

Untuk memastikan kinerja penerapan optimal, kami menyarankan Anda menyesuaikan paralelisme penerapan dan konfigurasi sumber daya berbagai node berdasarkan jumlah data yang perlu diproses. Untuk menyesuaikan paralelisme penerapan dan jumlah CU secara sederhana, gunakan mode konfigurasi sumber daya dasar. Untuk menyesuaikan paralelisme penerapan dan konfigurasi sumber daya node secara lebih detail halus, gunakan mode konfigurasi sumber daya ahli.

  1. Di panel navigasi sebelah kiri, pilih O&M > Deployments. Di halaman Deployments, klik nama pengembangan yang ingin Anda kelola.

  2. Di pojok kanan atas bagian Resources di tab Configuration, klik Edit.

  3. Pilih Export untuk parameter Mode. Lalu, klik Get Plan Now.

  4. Arahkan pointer ke More dan klik Expand All.

    Anda dapat melihat topologi lengkap untuk memahami rencana sinkronisasi data penerapan. Rencana ini menunjukkan tabel-tabel dari mana data perlu disinkronkan.

  5. Konfigurasikan PARALLELISM secara manual untuk setiap node.

    Atur PARALLELISM menjadi 4 untuk semua node sink kecuali node holo.tpc_ds.store_sales. Tabel store_sales dalam database tpc_ds berisi jumlah data terbesar. Untuk meningkatkan kinerja penulisan data ke Hologres, Anda dapat mengatur PARALLELISM menjadi 8 untuk node holo.tpc_ds.store_sales. Untuk informasi selengkapnya tentang cara mengonfigurasi parameter sumber daya, lihat Konfigurasi penerapan.

  6. Di pojok kanan atas bagian Resources, klik Save.

  7. Mulai ulang penerapan.

    Setelah sumber daya dikonfigurasi untuk penerapan, Anda harus memulai ulang penerapan agar konfigurasi berlaku.

  8. Klik nama penerapan Anda. Di tab Overview, lihat efek setelah penyesuaian.

FAQ

Referensi