全部产品
Search
文档中心

MaxCompute:Flink CDC Open-source untuk Penulisan Waktu-Nyata Mendekati ke Tabel Delta

更新时间:Jul 02, 2025

MaxCompute menyediakan versi baru dari konektor Flink Change Data Capture (CDC). Anda dapat menggunakan konektor Flink CDC untuk menyinkronkan data dari sumber seperti MySQL ke tabel standar MaxCompute atau tabel Delta secara real-time. Topik ini menjelaskan cara menggunakan versi baru konektor Flink CDC untuk menyinkronkan data ke MaxCompute.

Informasi Latar Belakang Flink CDC

Flink CDC adalah alat open source ujung ke ujung untuk integrasi data waktu nyata. Alat ini mendefinisikan serangkaian API yang sepenuhnya berfungsi dan menyediakan kerangka kerja ETL (ekstrak, transformasi, dan muat) untuk memproses data. Anda dapat menjalankan pekerjaan Apache Flink untuk memanfaatkan fitur yang disediakan oleh Flink CDC. Untuk informasi lebih lanjut, lihat Selamat Datang di Flink CDC. Dalam integrasi mendalam dengan dan didukung oleh Apache Flink, Flink CDC menyediakan fitur inti berikut:

  • Menyediakan kerangka kerja integrasi data ujung ke ujung.

  • Menyediakan API untuk pengguna integrasi data guna membangun pekerjaan secara efisien.

  • Memproses beberapa tabel dalam sumber atau sink.

  • Mendukung sinkronisasi semua data dari database.

  • Mendukung evolusi skema.

Prasyarat

Proyek MaxCompute telah dibuat. Untuk informasi lebih lanjut, lihat Buat Proyek MaxCompute.

Peringatan

  • Konektor Flink CDC dapat membuat tabel secara otomatis. Saat menggunakan konektor untuk menyinkronkan data, lokasi dan tipe data antara tabel MaxCompute dan tabel sumber dipetakan secara otomatis. Jika tabel sumber memiliki kunci utama, tabel Delta akan dibuat secara otomatis. Jika tabel sumber tidak memiliki kunci utama, tabel standar MaxCompute akan dibuat. Untuk informasi lebih lanjut tentang pemetaan lokasi dan tipe data antara tabel sumber dan tabel MaxCompute, lihat Pemetaan Lokasi Tabel dan Pemetaan Tipe Data.

  • Jika data ditulis ke tabel standar MaxCompute, sistem mengabaikan operasi DELETE. Operasi UPDATE dianggap sebagai operasi INSERT.

  • Hanya semantik setidaknya sekali yang didukung. Penulisan idempoten dapat diimplementasikan dalam tabel Delta berdasarkan kunci utama dalam tabel Delta.

  • Perubahan skema tabel sumber dapat disinkronkan ke tabel MaxCompute.

  • Kolom baru hanya dapat ditambahkan ke tabel MaxCompute sebagai kolom terakhir.

  • Anda dapat mengubah tipe data kolom hanya ke tipe data yang kompatibel dengan tipe data aslinya. Untuk informasi lebih lanjut tentang konversi antara tipe data, lihat Ubah Tipe Data Kolom.

Memulai

Topik ini menjelaskan cara mengembangkan pekerjaan ETL streaming untuk menyinkronkan data perubahan dari MySQL ke MaxCompute menggunakan pipeline Flink CDC. Dalam pipeline Flink CDC, Anda dapat menyinkronkan semua data dalam database, perubahan skema tabel, dan data dari tabel dalam database sharded.

Siapkan Lingkungan

Siapkan Kluster Flink yang Diterapkan dalam Mode Mandiri

  1. Unduh paket flink-1.18.0-bin-scala_2.12.tgz dan dekompresi paket tersebut untuk mendapatkan direktori flink-1.18.0. Masuk ke direktori flink-1.18.0 dan jalankan perintah berikut untuk menyetel direktori instalasi flink-1.18.0 ke FLINK_HOME:

    export FLINK_HOME=$(pwd)
  2. Jalankan perintah vim flink-conf.yaml di direktori $flink-1.18.0/conf, tambahkan parameter berikut ke file konfigurasi, lalu simpan file tersebut.

    # Aktifkan fitur checkpointing. Jalankan checkpoint setiap 3 detik.
    # Konfigurasi ini diberikan hanya untuk tujuan pengujian. Kami sarankan Anda menyetel interval checkpoint tidak kurang dari 30 detik untuk sebuah pekerjaan.
    execution.checkpointing.interval: 3000
    
    # flink-cdc-pipeline-connector-maxcompute bergantung pada mekanisme komunikasi Flink untuk sinkronisasi data.
    # Tingkatkan periode timeout untuk komunikasi Flink.
    pekko.ask.timeout: 60s
  3. Jalankan perintah berikut untuk memulai kluster Flink:

    ./bin/start-cluster.sh

    Jika startup berhasil, Anda dapat memasukkan http://localhost:8081/ di bilah alamat browser web untuk mengakses UI web Flink. 8081 adalah nomor port default.

    Anda dapat menjalankan perintah start-cluster.sh beberapa kali untuk menjalankan beberapa TaskManagers secara paralel.

Siapkan Lingkungan MySQL

Dalam contoh ini, Docker Compose digunakan untuk menyiapkan lingkungan MySQL.

  1. Mulai gambar Docker dan buat file bernama docker-compose.yaml. Kode berikut menunjukkan isi file tersebut:

    version: '2.1'
    services:
      mysql:
        image: debezium/example-mysql:1.1
        ports:
          - "3306:3306"
        environment:
          - MYSQL_ROOT_PASSWORD=123456
          - MYSQL_USER=mysqluser
          - MYSQL_PASSWORD=mysqlpw

    Tabel berikut menjelaskan parameter.

    Parameter

    Deskripsi

    version

    Versi Docker.

    image

    Versi gambar Docker. Setel nilai menjadi debezium/example-mysql:1.1.

    ports

    Nomor port instance MySQL.

    environment

    Nama pengguna dan kata sandi instance MySQL.

    Dalam contoh ini, database MySQL app_db yang berisi informasi produk digunakan dalam Docker Compose.

  2. Jalankan perintah berikut di direktori tempat file docker-compose.yaml disimpan untuk memulai komponen yang diperlukan:

    docker-compose up -d

    Perintah ini secara otomatis memulai semua kontainer yang didefinisikan dalam konfigurasi Docker Compose dalam mode detached. Anda dapat menjalankan perintah docker ps untuk memeriksa apakah kontainer telah dimulai.

Siapkan Data di Database MySQL

  1. Jalankan perintah berikut untuk mengakses kontainer MySQL:

    docker-compose exec mysql mysql -uroot -p123456
  2. Buat database di MySQL dan siapkan data tabel.

    1. Buat database.

      CREATE DATABASE app_db;
      USE app_db;
    2. Siapkan data tabel.

      • Buat tabel bernama orders dan masukkan data ke dalam tabel.

        CREATE TABLE `orders` (
        `id` INT NOT NULL,
        `price` DECIMAL(10,2) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- Masukkan data ke dalam tabel.
        INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
        INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
      • Buat tabel bernama shipments dan masukkan data ke dalam tabel.

        CREATE TABLE `shipments` (
        `id` INT NOT NULL,
        `city` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- Masukkan data ke dalam tabel.
        INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
        INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
      • Buat tabel bernama products dan masukkan data ke dalam tabel.

        -- 
        CREATE TABLE `products` (
        `id` INT NOT NULL,
        `product` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- Masukkan data ke dalam tabel.
        INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
        INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
        INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

Kirim File YAML Menggunakan CLI Flink CDC

  1. Unduh paket JAR yang diperlukan:

    • paket flink-cdc

      Pergi ke halaman flink-cdc untuk mengunduh paket biner terkompresi flink-cdc-3.1.1-bin.tar.gz dan dekompresi paket tersebut untuk mendapatkan direktori flink-cdc-3.1.1. Direktori flink-cdc-3.1.1 berisi direktori bin, lib, log, dan conf. Kemudian, pindahkan file-file dalam direktori-direktori ini ke direktori terkait untuk flink-1.18.0.

    • Paket konektor

      Unduh paket konektor berikut dan pindahkan paket tersebut ke direktori flink-1.18.0/lib.

      Catatan

      Anda dapat mengklik tautan unduhan untuk mengunduh hanya versi konektor yang dirilis. Jika Anda ingin merujuk versi SNAPSHOT, Anda harus mengkompilasi kode sumber versi tersebut berdasarkan cabang master atau rilis di mesin lokal Anda.

    • Paket driver

      Unduh paket MySQL Connector Java dan teruskan paket tersebut ke CLI Flink CDC menggunakan parameter --jar, atau letakkan paket tersebut di direktori $flink-1.18.0/lib dan mulai ulang kluster Flink. Ini karena konektor CDC tidak lagi berisi driver-driver ini.

  2. Tulis file YAML untuk konfigurasi tugas. Contoh kode berikut memberikan contoh file mysql-to-maxcompute.yaml untuk sinkronisasi database:

    ################################################################################
    # Deskripsi: Sinkronkan semua tabel MySQL ke MaxCompute
    ################################################################################
    source:
      type: mysql
      hostname: localhost
      port: 3306
      username: root
      password: 123456
      tables: app_db.\.*
      server-id: 5400-5404
      server-time-zone: UTC
    
    # Konfigurasikan parameter accessId, accessKey, endpoint, dan project.
    sink:
       type: maxcompute
       name: MaxComputeSink
       accessId: ${your_accessId}
       accessKey: ${your_accessKey}
       endpoint: ${your_maxcompute_endpoint}
       project: ${your_project}
       bucketsNum: 8
    
    pipeline:
      name: Sinkronkan Database MySQL ke MaxCompute
      parallelism: 1
    

    Parameter:

  3. Jalankan perintah berikut untuk mengirimkan tugas ke kluster Flink yang diterapkan dalam mode mandiri:

    ./bin/flink-cdc.sh mysql-to-maxcompute.yaml

    Setelah tugas dikirimkan, hasil berikut dikembalikan:

    Pipeline telah dikirimkan ke kluster.
    Job ID: f9f9689866946e25bf151ecc179ef46f
    Deskripsi Job: Sinkronkan Database MySQL ke MaxCompute

    Di UI web Flink, tugas bernama Sinkronkan Database MySQL ke MaxCompute sedang berjalan.

  4. Eksekusi pernyataan SQL berikut di MaxCompute untuk memeriksa apakah tabel orders, shipments, dan products telah dibuat dan data dapat ditulis ke tabel-tabel tersebut.

    -- Query tabel orders.
    read orders;
    
    -- Hasil berikut dikembalikan:
    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
    
    -- Query tabel shipments.
    read shipments;
    
    -- Hasil berikut dikembalikan:
    +------------+------------+
    | id         | city       |
    +------------+------------+
    | 1          | beijing    |
    | 2          | xian       |
    +------------+------------+
    
    -- Query tabel products.
    read products;
    
    -- Hasil berikut dikembalikan:
    +------------+------------+
    | id         | product    |
    +------------+------------+
    | 3          | Peanut     |
    | 1          | Beer       |
    | 2          | Cap        |
    +------------+------------+

Sinkronkan Perubahan Secara Real-Time

Dalam contoh ini, tabel orders digunakan. Saat data di tabel sumber dalam database MySQL diubah, data di tabel tujuan MaxCompute juga diubah secara real-time.

  1. Jalankan perintah berikut untuk mengakses kontainer MySQL:

    docker-compose exec mysql mysql -uroot -p123456
  2. Masukkan catatan data ke tabel orders MySQL.

    INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);

    Jalankan perintah read orders; di MaxCompute untuk menanyakan data di tabel orders. Hasil berikut dikembalikan:

    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 3          | 100        |
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
  3. Tambahkan bidang ke tabel orders MySQL.

    ALTER TABLE app_db.orders ADD amount varchar(100) NULL;

    Jalankan perintah read orders; di MaxCompute untuk menanyakan data di tabel orders. Hasil berikut dikembalikan:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 4          | NULL       |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  4. Perbarui catatan data di tabel orders MySQL.

    UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;

    Jalankan perintah read orders; di MaxCompute untuk menanyakan data di tabel orders. Hasil berikut dikembalikan:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  5. Hapus catatan data dari tabel orders MySQL.

    DELETE FROM app_db.orders WHERE id=2;

    Jalankan perintah read orders; di MaxCompute untuk menanyakan data di tabel orders. Hasil berikut dikembalikan:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    +------------+------------+------------+

Setiap kali Anda melakukan operasi di MySQL, pratinjau data dilakukan di MaxCompute. Data yang ditampilkan di tabel orders di MaxCompute diperbarui secara real-time.

Routing Perubahan

Flink CDC menyediakan fitur untuk merutekan skema atau data tabel sumber ke nama tabel lain. Anda dapat menggunakan fitur ini untuk melakukan operasi seperti penggantian nama tabel atau database, dan sinkronisasi database. Contoh kode berikut memberikan contoh.

################################################################################
# Deskripsi: Sinkronkan semua tabel MySQL ke MaxCompute
################################################################################
source:
   type: mysql
   hostname: localhost
   port: 3306
   username: root
   password: 123456
   tables: app_db.\.*
   server-id: 5400-5404
   server-time-zone: UTC

# Konfigurasikan parameter accessId, accessKey, endpoint, dan project.
sink:
   type: maxcompute
   name: MaxComputeSink
   accessId: ${your_accessId}
   accessKey: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}
   bucketsNum: 8

route:
   - source-table: app_db.orders
     sink-table: ods_db.ods_orders
   - source-table: app_db.shipments
     sink-table: ods_db.ods_shipments
   - source-table: app_db.products
     sink-table: ods_db.ods_products

pipeline:
   name: Sinkronkan Database MySQL ke MaxCompute
   parallelism: 1

Untuk informasi lebih lanjut tentang parameter di bagian route, lihat Route.

Konfigurasi sebelumnya di bagian route digunakan untuk menyinkronkan skema dan data tabel app_db.orders ke tabel ods_db.ods_orders. Dengan cara ini, migrasi database diimplementasikan. Anda dapat menggunakan ekspresi reguler untuk mencocokkan beberapa tabel di source-table untuk menyinkronkan data dari beberapa tabel dalam database sharded. Contoh kode:

route:
  - source-table: app_db.order\.*
    sink-table: ods_db.ods_orders

Dalam kasus ini, data dari tabel-tabel, seperti app_db.order01, app_db.order02, dan app_db.order03, dapat digabungkan ke dalam tabel ods_db.ods_orders.

Catatan

Skenario di mana nilai kunci utama duplikat ada di beberapa tabel tidak didukung dan akan didukung di versi konektor selanjutnya.

Bersihkan Lingkungan

Setelah Anda melakukan operasi sebelumnya, Anda harus membersihkan lingkungan.

  1. Jalankan perintah berikut di direktori tempat file docker-compose.yml disimpan untuk menghentikan semua kontainer:

    docker-compose down
  2. Jalankan perintah berikut di direktori flink-1.18.0 tempat Flink berada untuk menghentikan kluster Flink:

    ./bin/stop-cluster.sh

Lampiran

Item konfigurasi konektor

Parameter

Diperlukan

Nilai Default

Tipe Data

Deskripsi

type

Ya

none

String

Konektor yang ingin Anda gunakan. Setel nilainya menjadi maxcompute.

name

Tidak

none

String

Nama sink.

accessId

Ya

none

String

ID AccessKey akun Alibaba Cloud atau pengguna RAM Anda. Anda dapat memperoleh ID AccessKey dari halaman Pasangan AccessKey.

accessKey

Ya

none

String

Rahasia AccessKey yang sesuai dengan ID AccessKey.

endpoint

Ya

none

String

Titik akhir MaxCompute. Anda harus mengonfigurasi parameter ini berdasarkan wilayah dan metode koneksi jaringan yang Anda pilih saat membuat proyek MaxCompute. Untuk informasi lebih lanjut tentang titik akhir yang digunakan di berbagai wilayah dan mode koneksi jaringan yang berbeda, lihat Titik Akhir.

project

Ya

none

String

Nama proyek MaxCompute. Untuk mendapatkan nama proyek MaxCompute, lakukan langkah-langkah berikut: Masuk ke konsol MaxCompute. Di panel navigasi sebelah kiri, pilih Workspace > Projects untuk melihat nama proyek MaxCompute.

tunnelEndpoint

Tidak

none

String

Titik akhir layanan MaxCompute Tunnel. Dalam banyak kasus, titik akhir mendukung perutean otomatis berdasarkan wilayah tempat proyek berada. Titik akhir hanya digunakan dalam lingkungan jaringan khusus, seperti lingkungan jaringan yang menggunakan proxy.

quotaName

Tidak

none

String

Nama grup sumber daya eksklusif untuk MaxCompute Tunnel. Jika parameter ini tidak dikonfigurasi, grup sumber daya bersama akan digunakan.

stsToken

Tidak

none

String

Parameter ini diperlukan jika Anda menggunakan token Layanan Keamanan (STS) yang diterbitkan oleh peran RAM untuk autentikasi.

bucketsNum

Tidak

16

Integer

Jumlah bucket yang digunakan untuk membuat tabel Delta MaxCompute secara otomatis. Untuk informasi lebih lanjut, lihat Gudang data waktu-nyata mendekati.

compressAlgorithm

Tidak

zlib

String

Algoritma kompresi yang digunakan saat menulis data ke MaxCompute. Nilai valid: raw, zlib, dan snappy. Nilai raw menunjukkan bahwa data tidak dikompresi.

totalBatchSize

Tidak

64MB

String

Jumlah maksimum data dalam buffer partisi atau tabel non-partisi. Buffer dari partisi atau tabel non-partisi yang berbeda independen satu sama lain. Saat data dalam buffer melebihi nilai parameter ini, data ditulis ke MaxCompute.

bucketBatchSize

Tidak

4MB

String

Jumlah maksimum data dalam buffer bucket. Parameter ini hanya berlaku jika Anda menulis data ke tabel Delta. Buffer dari bucket yang berbeda independen satu sama lain. Saat data dalam buffer melebihi nilai parameter ini, data ditulis ke MaxCompute.

numCommitThreads

Tidak

16

Integer

Jumlah maksimum partisi atau tabel yang dapat diproses pada saat yang sama selama operasi checkpointing.

numFlushConcurrent

Tidak

4

Integer

Jumlah maksimum bucket yang dapat ditulis ke MaxCompute pada saat yang sama. Parameter ini hanya berlaku jika Anda menulis data ke tabel Delta.

retryTimes

Tidak

3

Integer

Jumlah maksimum percobaan ulang saat terjadi kesalahan koneksi jaringan.

sleepMillis

Tidak

true

Long

Durasi tunggu untuk setiap percobaan ulang saat terjadi kesalahan koneksi jaringan. Unit: milidetik.

Pemetaan Lokasi Tabel

Saat konektor Flink CDC membuat tabel secara otomatis, informasi lokasi tabel sumber dipetakan ke tabel MaxCompute berdasarkan pemetaan yang dijelaskan dalam tabel berikut.

Penting

Jika proyek MaxCompute tidak mendukung model skema, setiap tugas sinkronisasi hanya dapat menyinkronkan data dari satu database MySQL. Untuk sumber data lainnya, konektor Flink CDC mengabaikan informasi tableId.namespace.

Objek dalam Flink CDC

Lokasi di MaxCompute

Lokasi di MySQL

Proyek dalam file konfigurasi

Proyek

tidak ada

TableId.namespace

Skema (Konfigurasi ini hanya didukung jika proyek MaxCompute mendukung model skema. Jika proyek MaxCompute tidak mendukung model skema, konfigurasi ini diabaikan.)

Database

TableId.tableName

Tabel

Tabel

Pemetaan Tipe Data

Tipe data Flink

Tipe data MaxCompute

CHAR/VARCHAR

STRING

BOOLEAN

BOOLEAN

BINARY/VARBINARY

BINARY

DECIMAL

DECIMAL

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

TIME_WITHOUT_TIME_ZONE

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_NTZ

TIMESTAMP_WITH_LOCAL_TIME_ZONE

TIMESTAMP

TIMESTAMP_WITH_TIME_ZONE

TIMESTAMP

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT