MaxCompute menyediakan versi baru dari konektor Flink Change Data Capture (CDC). Anda dapat menggunakan konektor Flink CDC untuk menyinkronkan data dari sumber seperti MySQL ke tabel standar MaxCompute atau tabel Delta secara real-time. Topik ini menjelaskan cara menggunakan versi baru konektor Flink CDC untuk menyinkronkan data ke MaxCompute.
Informasi Latar Belakang Flink CDC
Flink CDC adalah alat open source ujung ke ujung untuk integrasi data waktu nyata. Alat ini mendefinisikan serangkaian API yang sepenuhnya berfungsi dan menyediakan kerangka kerja ETL (ekstrak, transformasi, dan muat) untuk memproses data. Anda dapat menjalankan pekerjaan Apache Flink untuk memanfaatkan fitur yang disediakan oleh Flink CDC. Untuk informasi lebih lanjut, lihat Selamat Datang di Flink CDC. Dalam integrasi mendalam dengan dan didukung oleh Apache Flink, Flink CDC menyediakan fitur inti berikut:
Menyediakan kerangka kerja integrasi data ujung ke ujung.
Menyediakan API untuk pengguna integrasi data guna membangun pekerjaan secara efisien.
Memproses beberapa tabel dalam sumber atau sink.
Mendukung sinkronisasi semua data dari database.
Mendukung evolusi skema.
Prasyarat
Proyek MaxCompute telah dibuat. Untuk informasi lebih lanjut, lihat Buat Proyek MaxCompute.
Peringatan
Konektor Flink CDC dapat membuat tabel secara otomatis. Saat menggunakan konektor untuk menyinkronkan data, lokasi dan tipe data antara tabel MaxCompute dan tabel sumber dipetakan secara otomatis. Jika tabel sumber memiliki kunci utama, tabel Delta akan dibuat secara otomatis. Jika tabel sumber tidak memiliki kunci utama, tabel standar MaxCompute akan dibuat. Untuk informasi lebih lanjut tentang pemetaan lokasi dan tipe data antara tabel sumber dan tabel MaxCompute, lihat Pemetaan Lokasi Tabel dan Pemetaan Tipe Data.
Jika data ditulis ke tabel standar MaxCompute, sistem mengabaikan operasi
DELETE. OperasiUPDATEdianggap sebagai operasiINSERT.Hanya semantik setidaknya sekali yang didukung. Penulisan idempoten dapat diimplementasikan dalam tabel Delta berdasarkan kunci utama dalam tabel Delta.
Perubahan skema tabel sumber dapat disinkronkan ke tabel MaxCompute.
Kolom baru hanya dapat ditambahkan ke tabel MaxCompute sebagai kolom terakhir.
Anda dapat mengubah tipe data kolom hanya ke tipe data yang kompatibel dengan tipe data aslinya. Untuk informasi lebih lanjut tentang konversi antara tipe data, lihat Ubah Tipe Data Kolom.
Memulai
Topik ini menjelaskan cara mengembangkan pekerjaan ETL streaming untuk menyinkronkan data perubahan dari MySQL ke MaxCompute menggunakan pipeline Flink CDC. Dalam pipeline Flink CDC, Anda dapat menyinkronkan semua data dalam database, perubahan skema tabel, dan data dari tabel dalam database sharded.
Siapkan Lingkungan
Siapkan Kluster Flink yang Diterapkan dalam Mode Mandiri
Unduh paket flink-1.18.0-bin-scala_2.12.tgz dan dekompresi paket tersebut untuk mendapatkan direktori
flink-1.18.0. Masuk ke direktoriflink-1.18.0dan jalankan perintah berikut untuk menyetel direktori instalasi flink-1.18.0 ke FLINK_HOME:export FLINK_HOME=$(pwd)Jalankan perintah
vim flink-conf.yamldi direktori$flink-1.18.0/conf, tambahkan parameter berikut ke file konfigurasi, lalu simpan file tersebut.# Aktifkan fitur checkpointing. Jalankan checkpoint setiap 3 detik. # Konfigurasi ini diberikan hanya untuk tujuan pengujian. Kami sarankan Anda menyetel interval checkpoint tidak kurang dari 30 detik untuk sebuah pekerjaan. execution.checkpointing.interval: 3000 # flink-cdc-pipeline-connector-maxcompute bergantung pada mekanisme komunikasi Flink untuk sinkronisasi data. # Tingkatkan periode timeout untuk komunikasi Flink. pekko.ask.timeout: 60sJalankan perintah berikut untuk memulai kluster Flink:
./bin/start-cluster.shJika startup berhasil, Anda dapat memasukkan http://localhost:8081/ di bilah alamat browser web untuk mengakses UI web Flink. 8081 adalah nomor port default.
Anda dapat menjalankan perintah start-cluster.sh beberapa kali untuk menjalankan beberapa TaskManagers secara paralel.
Siapkan Lingkungan MySQL
Dalam contoh ini, Docker Compose digunakan untuk menyiapkan lingkungan MySQL.
Mulai gambar Docker dan buat file bernama
docker-compose.yaml. Kode berikut menunjukkan isi file tersebut:version: '2.1' services: mysql: image: debezium/example-mysql:1.1 ports: - "3306:3306" environment: - MYSQL_ROOT_PASSWORD=123456 - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpwTabel berikut menjelaskan parameter.
Parameter
Deskripsi
version
Versi Docker.
image
Versi gambar Docker. Setel nilai menjadi debezium/example-mysql:1.1.
ports
Nomor port instance MySQL.
environment
Nama pengguna dan kata sandi instance MySQL.
Dalam contoh ini, database MySQL app_db yang berisi informasi produk digunakan dalam Docker Compose.
Jalankan perintah berikut di direktori tempat file docker-compose.yaml disimpan untuk memulai komponen yang diperlukan:
docker-compose up -dPerintah ini secara otomatis memulai semua kontainer yang didefinisikan dalam konfigurasi Docker Compose dalam mode detached. Anda dapat menjalankan perintah
docker psuntuk memeriksa apakah kontainer telah dimulai.
Siapkan Data di Database MySQL
Jalankan perintah berikut untuk mengakses kontainer MySQL:
docker-compose exec mysql mysql -uroot -p123456Buat database di MySQL dan siapkan data tabel.
Buat database.
CREATE DATABASE app_db; USE app_db;Siapkan data tabel.
Buat tabel bernama orders dan masukkan data ke dalam tabel.
CREATE TABLE `orders` ( `id` INT NOT NULL, `price` DECIMAL(10,2) NOT NULL, PRIMARY KEY (`id`) ); -- Masukkan data ke dalam tabel. INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00); INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);Buat tabel bernama shipments dan masukkan data ke dalam tabel.
CREATE TABLE `shipments` ( `id` INT NOT NULL, `city` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- Masukkan data ke dalam tabel. INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'); INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');Buat tabel bernama products dan masukkan data ke dalam tabel.
-- CREATE TABLE `products` ( `id` INT NOT NULL, `product` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- Masukkan data ke dalam tabel. INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'); INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap'); INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
Kirim File YAML Menggunakan CLI Flink CDC
Unduh paket JAR yang diperlukan:
paket flink-cdc
Pergi ke halaman flink-cdc untuk mengunduh paket biner terkompresi flink-cdc-3.1.1-bin.tar.gz dan dekompresi paket tersebut untuk mendapatkan direktori
flink-cdc-3.1.1. Direktori flink-cdc-3.1.1 berisi direktori bin, lib, log, dan conf. Kemudian, pindahkan file-file dalam direktori-direktori ini ke direktori terkait untuk flink-1.18.0.Paket konektor
Unduh paket konektor berikut dan pindahkan paket tersebut ke direktori
flink-1.18.0/lib.CatatanAnda dapat mengklik tautan unduhan untuk mengunduh hanya versi konektor yang dirilis. Jika Anda ingin merujuk versi SNAPSHOT, Anda harus mengkompilasi kode sumber versi tersebut berdasarkan cabang master atau rilis di mesin lokal Anda.
Paket driver
Unduh paket MySQL Connector Java dan teruskan paket tersebut ke CLI Flink CDC menggunakan parameter --jar, atau letakkan paket tersebut di direktori
$flink-1.18.0/libdan mulai ulang kluster Flink. Ini karena konektor CDC tidak lagi berisi driver-driver ini.
Tulis file YAML untuk konfigurasi tugas. Contoh kode berikut memberikan contoh file
mysql-to-maxcompute.yamluntuk sinkronisasi database:################################################################################ # Deskripsi: Sinkronkan semua tabel MySQL ke MaxCompute ################################################################################ source: type: mysql hostname: localhost port: 3306 username: root password: 123456 tables: app_db.\.* server-id: 5400-5404 server-time-zone: UTC # Konfigurasikan parameter accessId, accessKey, endpoint, dan project. sink: type: maxcompute name: MaxComputeSink accessId: ${your_accessId} accessKey: ${your_accessKey} endpoint: ${your_maxcompute_endpoint} project: ${your_project} bucketsNum: 8 pipeline: name: Sinkronkan Database MySQL ke MaxCompute parallelism: 1Parameter:
Untuk informasi lebih lanjut tentang parameter di bagian sumber, lihat Konektor MySQL.
Untuk informasi lebih lanjut tentang parameter di bagian sink, lihat Item Konfigurasi Konektor.
Jalankan perintah berikut untuk mengirimkan tugas ke kluster Flink yang diterapkan dalam mode mandiri:
./bin/flink-cdc.sh mysql-to-maxcompute.yamlSetelah tugas dikirimkan, hasil berikut dikembalikan:
Pipeline telah dikirimkan ke kluster. Job ID: f9f9689866946e25bf151ecc179ef46f Deskripsi Job: Sinkronkan Database MySQL ke MaxComputeDi UI web Flink, tugas bernama
Sinkronkan Database MySQL ke MaxComputesedang berjalan.Eksekusi pernyataan SQL berikut di MaxCompute untuk memeriksa apakah tabel orders, shipments, dan products telah dibuat dan data dapat ditulis ke tabel-tabel tersebut.
-- Query tabel orders. read orders; -- Hasil berikut dikembalikan: +------------+------------+ | id | price | +------------+------------+ | 1 | 4 | | 2 | 100 | +------------+------------+ -- Query tabel shipments. read shipments; -- Hasil berikut dikembalikan: +------------+------------+ | id | city | +------------+------------+ | 1 | beijing | | 2 | xian | +------------+------------+ -- Query tabel products. read products; -- Hasil berikut dikembalikan: +------------+------------+ | id | product | +------------+------------+ | 3 | Peanut | | 1 | Beer | | 2 | Cap | +------------+------------+
Sinkronkan Perubahan Secara Real-Time
Dalam contoh ini, tabel orders digunakan. Saat data di tabel sumber dalam database MySQL diubah, data di tabel tujuan MaxCompute juga diubah secara real-time.
Jalankan perintah berikut untuk mengakses kontainer MySQL:
docker-compose exec mysql mysql -uroot -p123456Masukkan catatan data ke tabel orders MySQL.
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);Jalankan perintah
read orders;di MaxCompute untuk menanyakan data di tabel orders. Hasil berikut dikembalikan:+------------+------------+ | id | price | +------------+------------+ | 3 | 100 | | 1 | 4 | | 2 | 100 | +------------+------------+Tambahkan bidang ke tabel orders MySQL.
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;Jalankan perintah
read orders;di MaxCompute untuk menanyakan data di tabel orders. Hasil berikut dikembalikan:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 4 | NULL | | 2 | 100 | NULL | +------------+------------+------------+Perbarui catatan data di tabel orders MySQL.
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;Jalankan perintah
read orders;di MaxCompute untuk menanyakan data di tabel orders. Hasil berikut dikembalikan:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 100 | 100.00 | | 2 | 100 | NULL | +------------+------------+------------+Hapus catatan data dari tabel orders MySQL.
DELETE FROM app_db.orders WHERE id=2;Jalankan perintah
read orders;di MaxCompute untuk menanyakan data di tabel orders. Hasil berikut dikembalikan:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 100 | 100.00 | +------------+------------+------------+
Setiap kali Anda melakukan operasi di MySQL, pratinjau data dilakukan di MaxCompute. Data yang ditampilkan di tabel orders di MaxCompute diperbarui secara real-time.
Routing Perubahan
Flink CDC menyediakan fitur untuk merutekan skema atau data tabel sumber ke nama tabel lain. Anda dapat menggunakan fitur ini untuk melakukan operasi seperti penggantian nama tabel atau database, dan sinkronisasi database. Contoh kode berikut memberikan contoh.
################################################################################
# Deskripsi: Sinkronkan semua tabel MySQL ke MaxCompute
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
# Konfigurasikan parameter accessId, accessKey, endpoint, dan project.
sink:
type: maxcompute
name: MaxComputeSink
accessId: ${your_accessId}
accessKey: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}
bucketsNum: 8
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: Sinkronkan Database MySQL ke MaxCompute
parallelism: 1Untuk informasi lebih lanjut tentang parameter di bagian route, lihat Route.
Konfigurasi sebelumnya di bagian route digunakan untuk menyinkronkan skema dan data tabel app_db.orders ke tabel ods_db.ods_orders. Dengan cara ini, migrasi database diimplementasikan. Anda dapat menggunakan ekspresi reguler untuk mencocokkan beberapa tabel di source-table untuk menyinkronkan data dari beberapa tabel dalam database sharded. Contoh kode:
route:
- source-table: app_db.order\.*
sink-table: ods_db.ods_ordersDalam kasus ini, data dari tabel-tabel, seperti app_db.order01, app_db.order02, dan app_db.order03, dapat digabungkan ke dalam tabel ods_db.ods_orders.
Skenario di mana nilai kunci utama duplikat ada di beberapa tabel tidak didukung dan akan didukung di versi konektor selanjutnya.
Bersihkan Lingkungan
Setelah Anda melakukan operasi sebelumnya, Anda harus membersihkan lingkungan.
Jalankan perintah berikut di direktori tempat file docker-compose.yml disimpan untuk menghentikan semua kontainer:
docker-compose downJalankan perintah berikut di direktori flink-1.18.0 tempat Flink berada untuk menghentikan kluster Flink:
./bin/stop-cluster.sh
Lampiran
Item konfigurasi konektor
Parameter | Diperlukan | Nilai Default | Tipe Data | Deskripsi |
type | Ya | none | String | Konektor yang ingin Anda gunakan. Setel nilainya menjadi |
name | Tidak | none | String | Nama sink. |
accessId | Ya | none | String | ID AccessKey akun Alibaba Cloud atau pengguna RAM Anda. Anda dapat memperoleh ID AccessKey dari halaman Pasangan AccessKey. |
accessKey | Ya | none | String | Rahasia AccessKey yang sesuai dengan ID AccessKey. |
endpoint | Ya | none | String | Titik akhir MaxCompute. Anda harus mengonfigurasi parameter ini berdasarkan wilayah dan metode koneksi jaringan yang Anda pilih saat membuat proyek MaxCompute. Untuk informasi lebih lanjut tentang titik akhir yang digunakan di berbagai wilayah dan mode koneksi jaringan yang berbeda, lihat Titik Akhir. |
project | Ya | none | String | Nama proyek MaxCompute. Untuk mendapatkan nama proyek MaxCompute, lakukan langkah-langkah berikut: Masuk ke konsol MaxCompute. Di panel navigasi sebelah kiri, pilih Workspace > Projects untuk melihat nama proyek MaxCompute. |
tunnelEndpoint | Tidak | none | String | Titik akhir layanan MaxCompute Tunnel. Dalam banyak kasus, titik akhir mendukung perutean otomatis berdasarkan wilayah tempat proyek berada. Titik akhir hanya digunakan dalam lingkungan jaringan khusus, seperti lingkungan jaringan yang menggunakan proxy. |
quotaName | Tidak | none | String | Nama grup sumber daya eksklusif untuk MaxCompute Tunnel. Jika parameter ini tidak dikonfigurasi, grup sumber daya bersama akan digunakan. |
stsToken | Tidak | none | String | Parameter ini diperlukan jika Anda menggunakan token Layanan Keamanan (STS) yang diterbitkan oleh peran RAM untuk autentikasi. |
bucketsNum | Tidak | 16 | Integer | Jumlah bucket yang digunakan untuk membuat tabel Delta MaxCompute secara otomatis. Untuk informasi lebih lanjut, lihat Gudang data waktu-nyata mendekati. |
compressAlgorithm | Tidak | zlib | String | Algoritma kompresi yang digunakan saat menulis data ke MaxCompute. Nilai valid: |
totalBatchSize | Tidak | 64MB | String | Jumlah maksimum data dalam buffer partisi atau tabel non-partisi. Buffer dari partisi atau tabel non-partisi yang berbeda independen satu sama lain. Saat data dalam buffer melebihi nilai parameter ini, data ditulis ke MaxCompute. |
bucketBatchSize | Tidak | 4MB | String | Jumlah maksimum data dalam buffer bucket. Parameter ini hanya berlaku jika Anda menulis data ke tabel Delta. Buffer dari bucket yang berbeda independen satu sama lain. Saat data dalam buffer melebihi nilai parameter ini, data ditulis ke MaxCompute. |
numCommitThreads | Tidak | 16 | Integer | Jumlah maksimum partisi atau tabel yang dapat diproses pada saat yang sama selama operasi checkpointing. |
numFlushConcurrent | Tidak | 4 | Integer | Jumlah maksimum bucket yang dapat ditulis ke MaxCompute pada saat yang sama. Parameter ini hanya berlaku jika Anda menulis data ke tabel Delta. |
retryTimes | Tidak | 3 | Integer | Jumlah maksimum percobaan ulang saat terjadi kesalahan koneksi jaringan. |
sleepMillis | Tidak | true | Long | Durasi tunggu untuk setiap percobaan ulang saat terjadi kesalahan koneksi jaringan. Unit: milidetik. |
Pemetaan Lokasi Tabel
Saat konektor Flink CDC membuat tabel secara otomatis, informasi lokasi tabel sumber dipetakan ke tabel MaxCompute berdasarkan pemetaan yang dijelaskan dalam tabel berikut.
Jika proyek MaxCompute tidak mendukung model skema, setiap tugas sinkronisasi hanya dapat menyinkronkan data dari satu database MySQL. Untuk sumber data lainnya, konektor Flink CDC mengabaikan informasi tableId.namespace.
Objek dalam Flink CDC | Lokasi di MaxCompute | Lokasi di MySQL |
Proyek dalam file konfigurasi | Proyek | tidak ada |
TableId.namespace | Skema (Konfigurasi ini hanya didukung jika proyek MaxCompute mendukung model skema. Jika proyek MaxCompute tidak mendukung model skema, konfigurasi ini diabaikan.) | Database |
TableId.tableName | Tabel | Tabel |
Pemetaan Tipe Data
Tipe data Flink | Tipe data MaxCompute |
CHAR/VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |