全部产品
Search
文档中心

Realtime Compute for Apache Flink:Panduan Cepat untuk Pekerjaan Ingesti Data Flink CDC

更新时间:Jan 27, 2026

Realtime Compute for Apache Flink menggunakan Flink Change Data Capture (CDC) untuk mengingesti data dari sumber ke tujuan. Anda dapat mengembangkan pekerjaan YAML untuk menyinkronkan data. Topik ini menjelaskan cara membuat pekerjaan ingesti data Flink CDC guna menyinkronkan seluruh data dari database MySQL ke StarRocks.

Prasyarat

Informasi latar belakang

Asumsikan bahwa sebuah instans MySQL memiliki database bernama order_dw_mysql. Database ini berisi tiga tabel bisnis: orders, orders_pay, dan product_catalog. Untuk mengembangkan pekerjaan ingesti data Flink CDC yang menyinkronkan tabel-tabel tersebut beserta datanya ke database order_dw_sr di StarRocks, ikuti langkah-langkah berikut:

  1. Langkah 1: Siapkan data uji RDS MySQL

  2. Langkah 2: Kembangkan pekerjaan ingesti data Flink CDC

  3. Langkah 3: Jalankan pekerjaan ingesti data Flink CDC

  4. Step 4: View the synchronization result in StarRocks

Langkah 1: Siapkan data uji RDS MySQL

  1. Buat database dan akun.

    Pada instans RDS MySQL, buat database bernama order_dw_mysql dan akun standar yang memiliki izin baca dan tulis pada database tersebut. Untuk informasi selengkapnya, lihat Buat database dan akun dan Kelola database.

  2. Masuk ke instans RDS MySQL menggunakan DMS.

    Untuk informasi selengkapnya, lihat Masuk ke instans RDS MySQL menggunakan DMS.

  3. Pada SQL Console, masukkan perintah berikut dan klik Execute untuk membuat tiga tabel bisnis dan memasukkan data.

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null 
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- Siapkan data
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

Langkah 2: Kembangkan pekerjaan ingesti data Flink CDC

  1. Masuk ke Konsol Realtime Compute for Apache Flink.

  2. Klik Console untuk menavigasi ke ruang kerja yang diperlukan.

  3. Pada panel navigasi di sebelah kiri, pilih Data Studio > Data Ingestion.

  4. Klik ikon image, klik New from Template, pilih MySQL to StarRocks Data Synchronization, lalu klik Next.

    image

  5. Masukkan Job Name, tentukan Storage Location, pilih Engine Version, lalu klik OK.

  6. Konfigurasikan kode pekerjaan YAML.

    Kode berikut memberikan contoh cara menyinkronkan semua tabel dari database order_dw_mysql di MySQL ke database order_dw_sr di StarRocks.

    source:
      type: mysql
      hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: order_dw_mysql.\.*
      server-id: 8601-8604
      # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental.
      scan.binlog.newly-added-table.enabled: true
      # (Opsional) Sinkronkan komentar tabel dan bidang.
      include-comments.enabled: true
      # (Opsional) Utamakan distribusi chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
      scan.only.deserialize.captured.tables.changelog.enabled: true 
    
    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
      load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
      username: ${secret_values.starrocksusername}
      password: ${secret_values.starrockspassword}
      table.create.properties.replication_num: 1
      sink.buffer-flush.interval-ms: 5000 # Flush data setiap 5 detik.
      
    route:
      - source-table: order_dw_mysql.\.*
        sink-table: order_dw_sr.<>
        replace-symbol: <>
        description: route all tables in source_db to sink_db
    
    pipeline:
      name: Sync MySQL Database to StarRocks

    Tabel berikut menjelaskan informasi konfigurasi yang diperlukan untuk contoh ini. Untuk informasi selengkapnya tentang parameter ingesti data, lihat MySQL dan StarRocks.

    Catatan

    Pekerjaan YAML hanya mendukung variabel proyek. Anda dapat menggunakan variabel untuk menghindari pengeksposan password teks biasa dan informasi sensitif lainnya. Untuk informasi selengkapnya, lihat Manajemen Variabel.

    Category

    Parameter

    Description

    Nilai contoh

    source

    hostname

    Alamat IP atau hostname database MySQL.

    Kami menyarankan Anda menggunakan alamat VPC.

    rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com

    port

    Nomor port layanan database MySQL.

    3306

    username

    Username dan password untuk layanan database MySQL. Gunakan informasi akun yang Anda buat di Langkah 1: Siapkan data uji RDS MySQL.

    ${secret_values.mysqlusername}

    password

    ${secret_values.mysqlpassword}

    tables

    Nama tabel MySQL. Ekspresi reguler didukung untuk membaca data dari beberapa tabel.

    Topik ini menyinkronkan semua tabel dan data dalam database order_dw_mysql.

    order_dw_mysql.\.*

    server-id

    ID numerik untuk client database.

    5405-5415

    sink

    jdbc-url

    URL Java Database Connectivity (JDBC).

    Tentukan alamat IP dan port kueri frontend (FE) dalam format jdbc:mysql://ip:port.

    Pada tab Instance Details di Konsol E-MapReduce, Anda dapat melihat Internal Network Address dan Query Port FE dari instans target.

    jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030

    load-url

    URL layanan HTTP yang digunakan untuk menghubungkan ke node FE.

    Anda dapat melihat Internal Endpoint dan HTTP Port FE dari instans target pada tab Instance Details di Konsol E-MapReduce.

    fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030

    username

    Username dan password untuk koneksi StarRocks.

    Gunakan username dan password yang Anda tentukan saat membuat instans StarRocks.

    Catatan

    Contoh ini menggunakan variabel untuk menghindari pengeksposan password teks biasa dan informasi sensitif lainnya. Untuk informasi selengkapnya, lihat Manajemen Variabel.

    ${secret_values.starrocksusername}

    password

    ${secret_values.starrockspassword}

    sink.buffer-flush.interval-ms

    Interval refresh buffer internal.

    Karena volume data dalam contoh ini kecil, interval pendek (5 detik) ditetapkan agar hasil dapat diamati lebih cepat.

    5000

    route

    source-table

    Menentukan tabel leluhur yang akan dirutekan.

    Anda dapat menggunakan ekspresi reguler untuk mencocokkan beberapa tabel. Misalnya, order_dw_mysql.\.* merutekan semua tabel dalam database order_dw_mysql.

    order_dw_mysql.\.*

    sink-table

    Menentukan tujuan routing data.

    Anda dapat menggunakan simbol dari replace-symbol sebagai placeholder untuk setiap nama tabel leluhur guna menerapkan perutean banyak-ke-banyak.

    Untuk informasi selengkapnya tentang aturan routing, lihat Modul Route.

    order_dw_sr.<>

    replace-symbol

    String yang merepresentasikan nama tabel leluhur saat Anda menggunakan fitur pencocokan pola.

    <>

  7. Klik Deploy.

Langkah 3: Jalankan pekerjaan ingesti data Flink CDC

  1. Pada halaman Data Ingestion, klik Deploy. Pada kotak dialog yang muncul, klik OK.

  2. Pada halaman Operation Center > Job O&M, temukan pekerjaan YAML target dan klik Start pada kolom Actions.

  3. Klik Start.

    Dalam contoh ini, Stateless Start dipilih. Untuk informasi selengkapnya tentang pengaturan parameter, lihat Jalankan pekerjaan. Setelah pekerjaan dimulai, Anda dapat memantau informasi waktu proses dan statusnya pada halaman Job O&M.

Step 4: View the synchronization result in StarRocks

Setelah pekerjaan YAML memasuki status Running, Anda dapat melihat hasil sinkronisasi data di StarRocks.

  1. Hubungkan ke instans StarRocks menggunakan EMR StarRocks Manager.

  2. Pada panel navigasi di sebelah kiri, klik SQL Editor. Pada tab Databases, klik ikon image.

    Database bernama order_dw_sr muncul di bawah default_catalog.

  3. Pada tab Query List, klik +File untuk membuat query script. Kemudian, masukkan pernyataan SQL berikut dan klik Run.

    SELECT * FROM default_catalog.order_dw_sr.orders order by order_id;
    SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id;
    SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;
  4. Lihat hasil sinkronisasi di bawah perintah.

    Anda dapat melihat bahwa tabel dan data dari database MySQL kini tersedia di StarRocks.

    image

Referensi