Pernyataan CREATE DATABASE AS (CDAS) mendukung sinkronisasi real-time skema tabel dan data pada tingkat database, termasuk sinkronisasi perubahan skema. Topik ini menjelaskan penggunaan pernyataan CDAS dan skenario terkait.
Disarankan untuk membuat pekerjaan menggunakan YAML untuk pengambilan data. Anda juga dapat mengonversi draf SQL yang ada berisi pernyataan CTAS atau CDAS menjadi draf YAML:
Pendahuluan: Anda dapat mengembangkan pekerjaan menggunakan YAML untuk menyinkronkan data dari sumber ke tujuan.
Keuntungan: Pernyataan CTAS dan CDAS mendukung kemampuan utama seperti sinkronisasi database, tabel, skema tabel, serta kolom komputasi kustom. Selain itu, fitur seperti evolusi skema real-time, sinkronisasi data log biner mentah, klausa WHERE, dan pemangkasan kolom juga didukung.
Untuk informasi lebih lanjut, lihat Gunakan penerapan YAML untuk mengambil data.
Informasi latar belakang
Sebagai gula sintaksis dari CREATE TABLE AS (CTAS), pernyataan CDAS digunakan untuk menyinkronkan data dari beberapa atau semua tabel dalam database, ideal untuk skenario integrasi data otomatis. Biasanya digunakan dengan katalog sumber dan tujuan, yang menyediakan manajemen metadata persisten untuk tabel. Pernyataan CDAS membantu menerapkan replikasi data penuh dan bertahap serta menyinkronkan data dan perubahan skema tanpa perlu membuat tabel target terlebih dahulu.
Pernyataan CDAS menawarkan keuntungan berikut:
Sintaksis yang disederhanakan
Realtime Compute for Apache Flink secara otomatis mengonversi pernyataan CDAS menjadi pernyataan CTAS, satu pernyataan CTAS per tabel. Pernyataan CDAS mewarisi kemampuan pernyataan CTAS untuk sinkronisasi data dan evolusi skema.
Optimasi Sumber Daya
Realtime Compute for Apache Flink mengoptimalkan tabel sumber dengan menggunakan satu simpul sumber untuk membaca dari beberapa tabel bisnis. Hal ini sangat bermanfaat untuk sumber MySQL CDC, mengurangi koneksi database dan mencegah penarikan log biner yang berlebihan, sehingga mengurangi beban baca keseluruhan pada database MySQL.
Kemampuan inti
Sinkronisasi data
Fitur | Deskripsi |
Menjalankan sinkronisasi data penuh dan bertahap dari beberapa tabel (atau semua tabel) dalam database ke setiap tabel sink terkait. | |
Cocokkan nama tabel sumber di seluruh shard database dengan menggunakan ekspresi reguler, konsolidasikan tabel-tabel tersebut, dan sinkronkan ke sink yang sesuai. | |
Sinkronkan tabel yang baru ditambahkan dengan memulai ulang pekerjaan Anda dari titik simpanan. | |
Mengizinkan Anda menggunakan pernyataan STATEMENT SET untuk mengirimkan beberapa pernyataan CDAS dan CTAS sebagai satu pekerjaan. Anda juga dapat menggabungkan dan menggunakan kembali data operator tabel sumber untuk mengurangi beban baca pada sumber data. |
Evolusi skema
Selama sinkronisasi data database, perubahan skema (seperti menambahkan kolom) dapat disebarkan ke sink. Kebijakan ini konsisten dengan pernyataan CTAS. Untuk informasi lebih lanjut, lihat Evolusi Skema.
Proses startup
Tabel berikut menunjukkan proses menyinkronkan data dari MySQL ke Hologres menggunakan pernyataan CDAS.
Diagram alir | Deskripsi |
Saat menjalankan pernyataan CDAS, Realtime Compute for Apache Flink melakukan hal berikut:
|
Prasyarat
Katalog tujuan harus terdaftar di ruang kerja. Untuk informasi lebih lanjut, lihat Kelola Katalog.
Batasan
Batasan sintaksis
Debugging draf SQL yang berisi pernyataan CDAS tidak didukung.
MiniBatch tidak didukung.
PentingPastikan konfigurasi MiniBatch telah dihapus sebelum membuat draf SQL yang berisi pernyataan CTAS atau CDAS. Lakukan langkah-langkah berikut:
Buka .
Pilih tab Deployment Defaults.
Di bagian Other Configuration, verifikasi bahwa konfigurasi MiniBatch telah dihapus.
Jika kesalahan dilaporkan saat membuat atau memulai penyebaran dari draf SQL, lihat Bagaimana cara memperbaiki kesalahan "Saat ini tidak mendukung penggabungan StreamExecMiniBatchAssigner type ExecNode dalam sintaksis CTAS/CDAS"?.
Sistem hulu dan hilir yang didukung
Tabel berikut mencantumkan sistem hulu dan hilir yang didukung oleh pernyataan CDAS:
Connector | Tabel sumber | Tabel sink | Catatan |
√ | × | Tampilan tidak dapat disinkronkan. | |
√ | × | ||
√ | × |
| |
× | √ | ||
× | √ | Saat Hologres berfungsi sebagai sistem tujuan sinkronisasi data, sistem secara otomatis membuat koneksi untuk setiap tabel berdasarkan nilai opsi Catatan
| |
× | √ | Dukungan terbatas pada StarRocks di Alibaba Cloud EMR. | |
× | √ | N/A. |
Catatan penggunaan
Sinkronisasi Tabel Baru
VVR 8.0.6 atau lebih baru: Setelah tabel ditambahkan, buat titik simpanan dan mulai ulang pekerjaan dari titik simpanan untuk menangkap dan menyinkronkan tabel baru. Untuk informasi lebih lanjut, lihat Sinkronkan Tabel Baru.
VVR 8.0.5 atau lebih lama: Tabel baru tidak ditangkap atau disinkronkan melalui restart pekerjaan. Gunakan salah satu metode berikut:
Metode
Deskripsi
Buat pekerjaan baru untuk menyinkronkan tabel baru
Biarkan pekerjaan yang ada tetap berjalan. Buat pekerjaan baru untuk menyinkronkan tabel baru. Contoh kode:
-- Buat pekerjaan untuk menyinkronkan data dari tabel baru bernama new_table CREATE TABLE IF NOT EXISTS new_table AS TABLE mysql.tpcds.new_table /*+ OPTIONS('server-id'='8008-8010') */;Bersihkan data yang disinkronkan dan mulai ulang pekerjaan Anda
Lakukan langkah-langkah berikut:
Batalkan pekerjaan yang ada.
Bersihkan data yang disinkronkan di sink.
Mulai ulang pekerjaan tanpa status untuk menyinkronkan data lagi.
Akses Baca/Tulis ke Sistem Eksternal
Untuk memastikan operasi berhasil, berikan izin baca/tulis yang diperlukan ke akun Anda saat:
Mengakses sumber daya eksternal lintas akun;
Mengakses sumber daya eksternal sebagai pengguna RAM atau Peran RAM.
Sintaksis
CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
<target_database>:
[catalog_name.]db_name
<source_database>:
[catalog_name.]db_namePernyataan CDAS menggunakan sintaksis dasar pernyataan CREATE DATABASE. Tabel berikut menjelaskan parameter:
Parameter | Deskripsi |
target_database | Nama database tujuan. Secara opsional sertakan nama katalog tujuan. |
COMMENT | Deskripsi database tujuan. Deskripsi source_database digunakan secara otomatis. |
WITH | Opsi untuk database tujuan. Untuk informasi lebih lanjut, lihat dokumen masing-masing di bawah Kelola katalog. Catatan Baik kunci maupun nilai harus bertipe string, seperti 'sink.parallelism' = '4'. |
source_database | Nama database sumber. Secara opsional sertakan nama katalog sumber. |
INCLUDING ALL TABLES | Menentukan bahwa semua tabel di database sumber disinkronkan. |
INCLUDING TABLE | Menentukan tabel untuk sinkronisasi. Pisahkan beberapa tabel dengan batang vertikal (|). Anda dapat menggunakan ekspresi reguler untuk menyertakan semua tabel berdasarkan pola penamaan tertentu. Misalnya, INCLUDING TABLE 'web.*' menyinkronkan semua tabel dengan nama yang dimulai dengan |
EXCLUDING TABLE | Menentukan tabel yang dikecualikan dari sinkronisasi. Pisahkan beberapa tabel dengan batang vertikal (|). Anda dapat menggunakan ekspresi reguler untuk menyertakan semua tabel berdasarkan pola penamaan tertentu. Misalnya, INCLUDING TABLE 'web.*' menyinkronkan semua tabel dengan nama yang dimulai dengan |
OPTIONS | Opsi konektor untuk tabel sumber. Untuk informasi lebih lanjut, lihat dokumen masing-masing di bawah Konektor yang didukung. Catatan Baik kunci maupun nilai harus bertipe string, seperti 'server-id' = '65500'. |
Kata Kunci
IF NOT EXISTSDiperlukan. Ini meminta sistem untuk memeriksa keberadaan tabel sink di penyimpanan tujuan. Jika tidak ada, sistem akan membuat tabel sink. Jika sudah ada, pembuatan tabel dilewati.Tabel sink yang dibuat membagikan skema tabel sumber, termasuk kunci utama dan nama serta tipe kolom fisik, tetapi mengecualikan kolom komputasi, bidang metadata, dan konfigurasi watermark.
Realtime Compute for Apache Flink melakukan pemetaan tipe data dari tabel sumber ke tabel sink selama sinkronisasi data. Untuk informasi lebih lanjut tentang pemetaan tipe data, lihat dokumen konektor spesifik.
Contoh
Sinkronkan database
Deskripsi: Sinkronkan semua tabel dari database MySQL tpcds ke Hologres.
Prasyarat: Katalog berikut dibuat di ruang kerja Anda:
Katalog Hologres bernama
holo.Katalog MySQL bernama
mysql.
Contoh Kode:
USE CATALOG holo;
CREATE DATABASE IF NOT EXISTS holo_tpcds -- Buat database bernama holo_tpcds di Hologres.
WITH ('sink.parallelism' = '4') -- Secara opsional konfigurasikan opsi untuk database tujuan. Secara default, paralelisme sink untuk Hologres diatur ke 4.
AS DATABASE mysql.tpcds INCLUDING ALL TABLES -- Sinkronkan semua tabel.
/*+ OPTIONS('server-id'='8001-8004') */ ; -- Secara opsional konfigurasikan opsi untuk tabel sumber CDC MySQL.Opsi yang dikonfigurasikan untuk database tujuan dalam klausa WITH hanya berlaku untuk pekerjaan saat ini untuk mengontrol perilaku penulisan. Mereka tidak disimpan secara permanen di katalog Hologres. Untuk informasi tentang opsi konektor yang didukung, lihat Konektor Hologres.
Sinkronkan data lintas shard database
Deskripsi: Instance MySQL memiliki beberapa shard database bernama dari order_db01 hingga order_db99. Setiap shard database berisi beberapa tabel, seperti order dan order_detail. Pernyataan CDAS dapat digunakan untuk menyinkronkan semua tabel, bersama dengan data dan perubahan skema, di shard database ini ke Hologres.
Solusi:
Gunakan ekspresi reguler untuk nama database (`order_db[0-9]+`) untuk mencocokkan semua shard database (order_db01 hingga order_db99) untuk sinkronisasi. Nama database dan tabel ditambahkan ke setiap tabel sink sebagai dua bidang tambahan.
Kunci utama tabel Hologres mencakup nama database, nama tabel, dan kolom kunci utama tabel sumber untuk memastikan keunikan.
Tidak perlu membuat tabel target terlebih dahulu.
Contoh Kode dan Hasil:
Tabel dengan nama identik di seluruh shard database digabungkan sebelum disinkronkan ke dalam satu tabel Hologres.
Contoh kode | Hasil |
|
|
Sinkronkan tabel baru
Deskripsi: Setelah pekerjaan yang menyinkronkan data melalui pernyataan CDAS dimulai, tabel baru ditambahkan dan perlu disinkronkan.
Solusi: Aktifkan deteksi tabel baru untuk pekerjaan dan mulai ulang dari titik simpanan untuk menangkap dan menyinkronkan tabel yang baru ditambahkan.
Batasan: Deteksi tabel baru didukung di VVR 8.0.6 atau lebih baru. Untuk mengaktifkan fitur ini, pastikan mode startup tabel sumber diatur ke initial.
Prosedur:
Di halaman Deployments, temukan penyebaran target dan klik Cancel di kolom Actions.
Dalam dialog, perluas bagian More Strategies, pilih opsi Stop With Savepoint, lalu klik OK.
Dalam draf SQL pekerjaan, tambahkan pernyataan berikut untuk mengaktifkan deteksi tabel baru:
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';Klik Deploy.
Pulihkan pekerjaan dari titik simpanan.
Di halaman Deployments, klik nama deployment Anda.
Di halaman detail penyebaran, klik tab State. Lalu, klik subtab History.
Di daftar Savepoints, temukan titik simpanan yang dibuat saat pekerjaan dibatalkan.
Pilih di kolom Actions. Untuk informasi lebih lanjut, lihat Mulai Penyebaran.
Jalankan beberapa pernyataan CDAS dan CTAS
Deskripsi: Sinkronkan data dari database tpcds, database tpch, dan database shard user_db01 hingga user_db99 ke Hologres dalam satu pekerjaan.
Solusi: Gunakan pernyataan STATEMENT SET untuk mengelompokkan beberapa pernyataan CDAS dan CTAS. Solusi ini menggunakan kembali simpul sumber untuk membaca data dari tabel yang diperlukan. Ini sangat bermanfaat dengan sumber data CDC MySQL, mengurangi jumlah ID server, koneksi database, dan beban baca keseluruhan pada database MySQL.
Untuk menggunakan kembali sumber dan mengoptimalkan kinerja, pastikan opsi konektor untuk setiap tabel sumber identik.
Untuk informasi tentang konfigurasi ID server, lihat Atur ID Server Berbeda untuk Setiap Klien.
Contoh Kode:
USE CATALOG holo;
BEGIN STATEMENT SET;
-- Sinkronkan data dari tabel pengguna di seluruh shard database.
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
-- Sinkronkan data dari database tpcds.
CREATE DATABASE IF NOT EXISTS holo_tpcds
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- Sinkronkan data dari database tpch.
CREATE DATABASE IF NOT EXISTS holo_tpch
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
END;Sinkronkan data di database ke Kafka melalui beberapa pernyataan CDAS
Deskripsi: Sinkronkan data di tabel dari beberapa database MySQL (tpcds dan tpch, dll.) ke Kafka.
Solusi: Saat menggunakan beberapa pernyataan CDAS untuk menyinkronkan data di database ke Kafka, tabel dengan nama identik mungkin ada di database yang berbeda. Untuk mencegah konflik topik, konfigurasikan opsi cdas.topic.pattern untuk menentukan pola nama topik. Anda dapat menggunakan placeholder {table-name}. Misalnya, menentukan 'cdas.topic.pattern'='dbname-{table-name}' menghasilkan data yang direplikasi dari tabel table1 di database db1 ke topik Kafka dbname-table1.
Contoh Kode:
USE CATALOG kafkaCatalog;
BEGIN STATEMENT SET;
-- Sinkronkan data dari database tpcds.
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpcds-{table-name}')
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- Sinkronkan data dari database tpch.
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpch-{table-name}')
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
END;Dengan memperkenalkan Kafka sebagai lapisan perantara antara MySQL dan Flink, Anda dapat mengurangi beban pada MySQL. Untuk informasi lebih lanjut, lihat Sinkronkan Data dari Semua Tabel di Database MySQL ke Kafka.
FAQ
Kesalahan runtime
Apa yang harus saya lakukan jika penyebaran tidak dapat dimulai?
Apa yang harus saya lakukan jika penyebaran dimulai ulang setelah penyebaran dijalankan?
Apa yang harus saya lakukan jika pesan kesalahan "akka.pattern.AskTimeoutException" muncul?
Kinerja pekerjaan
Sinkronisasi data
Referensi
Katalog populer yang digunakan dengan pernyataan CDAS dan CTAS:
Praktik Terbaik:
Pengambilan Data melalui YAML:
