全部产品
Search
文档中心

Realtime Compute for Apache Flink:CREATE DATABASE AS (CDAS)

更新时间:Jul 06, 2025

Pernyataan CREATE DATABASE AS (CDAS) mendukung sinkronisasi real-time skema tabel dan data pada tingkat database, termasuk sinkronisasi perubahan skema. Topik ini menjelaskan penggunaan pernyataan CDAS dan skenario terkait.

Catatan

Disarankan untuk membuat pekerjaan menggunakan YAML untuk pengambilan data. Anda juga dapat mengonversi draf SQL yang ada berisi pernyataan CTAS atau CDAS menjadi draf YAML:

  • Pendahuluan: Anda dapat mengembangkan pekerjaan menggunakan YAML untuk menyinkronkan data dari sumber ke tujuan.

  • Keuntungan: Pernyataan CTAS dan CDAS mendukung kemampuan utama seperti sinkronisasi database, tabel, skema tabel, serta kolom komputasi kustom. Selain itu, fitur seperti evolusi skema real-time, sinkronisasi data log biner mentah, klausa WHERE, dan pemangkasan kolom juga didukung.

Untuk informasi lebih lanjut, lihat Gunakan penerapan YAML untuk mengambil data.

Informasi latar belakang

Sebagai gula sintaksis dari CREATE TABLE AS (CTAS), pernyataan CDAS digunakan untuk menyinkronkan data dari beberapa atau semua tabel dalam database, ideal untuk skenario integrasi data otomatis. Biasanya digunakan dengan katalog sumber dan tujuan, yang menyediakan manajemen metadata persisten untuk tabel. Pernyataan CDAS membantu menerapkan replikasi data penuh dan bertahap serta menyinkronkan data dan perubahan skema tanpa perlu membuat tabel target terlebih dahulu.

Pernyataan CDAS menawarkan keuntungan berikut:

  • Sintaksis yang disederhanakan

    Realtime Compute for Apache Flink secara otomatis mengonversi pernyataan CDAS menjadi pernyataan CTAS, satu pernyataan CTAS per tabel. Pernyataan CDAS mewarisi kemampuan pernyataan CTAS untuk sinkronisasi data dan evolusi skema.

  • Optimasi Sumber Daya

    Realtime Compute for Apache Flink mengoptimalkan tabel sumber dengan menggunakan satu simpul sumber untuk membaca dari beberapa tabel bisnis. Hal ini sangat bermanfaat untuk sumber MySQL CDC, mengurangi koneksi database dan mencegah penarikan log biner yang berlebihan, sehingga mengurangi beban baca keseluruhan pada database MySQL.

Kemampuan inti

Sinkronisasi data

Fitur

Deskripsi

Sinkronkan database

Menjalankan sinkronisasi data penuh dan bertahap dari beberapa tabel (atau semua tabel) dalam database ke setiap tabel sink terkait.

Konsolidasikan dan sinkronkan shard database

Cocokkan nama tabel sumber di seluruh shard database dengan menggunakan ekspresi reguler, konsolidasikan tabel-tabel tersebut, dan sinkronkan ke sink yang sesuai.

Sinkronkan tabel baru

Sinkronkan tabel yang baru ditambahkan dengan memulai ulang pekerjaan Anda dari titik simpanan.

Jalankan beberapa pernyataan CDAS dan CTAS

Mengizinkan Anda menggunakan pernyataan STATEMENT SET untuk mengirimkan beberapa pernyataan CDAS dan CTAS sebagai satu pekerjaan. Anda juga dapat menggabungkan dan menggunakan kembali data operator tabel sumber untuk mengurangi beban baca pada sumber data.

Evolusi skema

Selama sinkronisasi data database, perubahan skema (seperti menambahkan kolom) dapat disebarkan ke sink. Kebijakan ini konsisten dengan pernyataan CTAS. Untuk informasi lebih lanjut, lihat Evolusi Skema.

Proses startup

Tabel berikut menunjukkan proses menyinkronkan data dari MySQL ke Hologres menggunakan pernyataan CDAS.

Diagram alir

Deskripsi

Saat menjalankan pernyataan CDAS, Realtime Compute for Apache Flink melakukan hal berikut:

  1. Memverifikasi keberadaan database tujuan dan tabel sink.

    • Jika database tujuan tidak ditemukan, Realtime Compute for Apache Flink membuat database melalui katalog tujuan.

    • Jika database tujuan ditemukan, Realtime Compute for Apache Flink melewati pembuatan database dan memeriksa tabel sink di database.

      • Jika tabel sink tidak ditemukan, Realtime Compute for Apache Flink membuat tabel sink yang mencerminkan nama tabel sumber dan skema.

      • Jika tabel sink ditemukan, Realtime Compute for Apache Flink melewati pembuatan tabel.

  2. Mengirimkan dan menjalankan pekerjaan sinkronisasi data.

    Data dan perubahan skema direplikasi dari database sumber ke tabel di database tujuan.

Prasyarat

Katalog tujuan harus terdaftar di ruang kerja. Untuk informasi lebih lanjut, lihat Kelola Katalog.

Batasan

Batasan sintaksis

Sistem hulu dan hilir yang didukung

Tabel berikut mencantumkan sistem hulu dan hilir yang didukung oleh pernyataan CDAS:

Connector

Tabel sumber

Tabel sink

Catatan

Konektor MySQL

×

Tampilan tidak dapat disinkronkan.

Konektor Kafka

×

Konektor MongoDB

×

  • Konsolidasi dan sinkronisasi tabel dan database yang di-shard tidak didukung.

  • Metadata MongoDB tidak dapat disinkronkan.

  • Untuk menyinkronkan data dan perubahan skema dari MongoDB ke penyimpanan tujuan melalui pernyataan CDAS, lihat Kelola katalog MongoDB.

Konektor Upsert Kafka

×

Konektor Hologres

×

Saat Hologres berfungsi sebagai sistem tujuan sinkronisasi data, sistem secara otomatis membuat koneksi untuk setiap tabel berdasarkan nilai opsi connectionSize. Anda dapat mengonfigurasi kumpulan koneksi yang sama untuk beberapa tabel dengan menggunakan opsi connectionPoolName.

Catatan
  • Jika tipe data di tabel sumber tidak didukung oleh fitur rencana tetap Hologres, gunakan pernyataan INSERT INTO untuk sinkronisasi data. Jangan gunakan pernyataan CTAS, yang memberikan kinerja penulisan lebih rendah karena rencana tetap tidak dapat digunakan.

  • Realtime Compute for Apache Flink dapat membaca dan menulis ke instance Hologres eksklusif. Instance kluster bersama Hologres tidak didukung.

Konektor StarRocks

×

Dukungan terbatas pada StarRocks di Alibaba Cloud EMR.

Konektor Paimon

×

N/A.

Catatan penggunaan

  • Sinkronisasi Tabel Baru

    • VVR 8.0.6 atau lebih baru: Setelah tabel ditambahkan, buat titik simpanan dan mulai ulang pekerjaan dari titik simpanan untuk menangkap dan menyinkronkan tabel baru. Untuk informasi lebih lanjut, lihat Sinkronkan Tabel Baru.

    • VVR 8.0.5 atau lebih lama: Tabel baru tidak ditangkap atau disinkronkan melalui restart pekerjaan. Gunakan salah satu metode berikut:

      Metode

      Deskripsi

      Buat pekerjaan baru untuk menyinkronkan tabel baru

      Biarkan pekerjaan yang ada tetap berjalan. Buat pekerjaan baru untuk menyinkronkan tabel baru. Contoh kode:

      -- Buat pekerjaan untuk menyinkronkan data dari tabel baru bernama new_table
      CREATE TABLE IF NOT EXISTS new_table
      AS TABLE mysql.tpcds.new_table 
      /*+ OPTIONS('server-id'='8008-8010') */;

      Bersihkan data yang disinkronkan dan mulai ulang pekerjaan Anda

      Lakukan langkah-langkah berikut:

      1. Batalkan pekerjaan yang ada.

      2. Bersihkan data yang disinkronkan di sink.

      3. Mulai ulang pekerjaan tanpa status untuk menyinkronkan data lagi.

  • Akses Baca/Tulis ke Sistem Eksternal

    Untuk memastikan operasi berhasil, berikan izin baca/tulis yang diperlukan ke akun Anda saat:

    • Mengakses sumber daya eksternal lintas akun;

    • Mengakses sumber daya eksternal sebagai pengguna RAM atau Peran RAM.

Sintaksis

CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]

<target_database>:
  [catalog_name.]db_name

<source_database>:
  [catalog_name.]db_name

Pernyataan CDAS menggunakan sintaksis dasar pernyataan CREATE DATABASE. Tabel berikut menjelaskan parameter:

Parameter

Deskripsi

target_database

Nama database tujuan. Secara opsional sertakan nama katalog tujuan.

COMMENT

Deskripsi database tujuan. Deskripsi source_database digunakan secara otomatis.

WITH

Opsi untuk database tujuan. Untuk informasi lebih lanjut, lihat dokumen masing-masing di bawah Kelola katalog.

Catatan

Baik kunci maupun nilai harus bertipe string, seperti 'sink.parallelism' = '4'.

source_database

Nama database sumber. Secara opsional sertakan nama katalog sumber.

INCLUDING ALL TABLES

Menentukan bahwa semua tabel di database sumber disinkronkan.

INCLUDING TABLE

Menentukan tabel untuk sinkronisasi. Pisahkan beberapa tabel dengan batang vertikal (|). Anda dapat menggunakan ekspresi reguler untuk menyertakan semua tabel berdasarkan pola penamaan tertentu. Misalnya, INCLUDING TABLE 'web.*' menyinkronkan semua tabel dengan nama yang dimulai dengan web di database sumber.

EXCLUDING TABLE

Menentukan tabel yang dikecualikan dari sinkronisasi. Pisahkan beberapa tabel dengan batang vertikal (|). Anda dapat menggunakan ekspresi reguler untuk menyertakan semua tabel berdasarkan pola penamaan tertentu. Misalnya, INCLUDING TABLE 'web.*' menyinkronkan semua tabel dengan nama yang dimulai dengan web di database sumber.

OPTIONS

Opsi konektor untuk tabel sumber. Untuk informasi lebih lanjut, lihat dokumen masing-masing di bawah Konektor yang didukung.

Catatan

Baik kunci maupun nilai harus bertipe string, seperti 'server-id' = '65500'.

Catatan
  • Kata Kunci IF NOT EXISTS Diperlukan. Ini meminta sistem untuk memeriksa keberadaan tabel sink di penyimpanan tujuan. Jika tidak ada, sistem akan membuat tabel sink. Jika sudah ada, pembuatan tabel dilewati.

  • Tabel sink yang dibuat membagikan skema tabel sumber, termasuk kunci utama dan nama serta tipe kolom fisik, tetapi mengecualikan kolom komputasi, bidang metadata, dan konfigurasi watermark.

  • Realtime Compute for Apache Flink melakukan pemetaan tipe data dari tabel sumber ke tabel sink selama sinkronisasi data. Untuk informasi lebih lanjut tentang pemetaan tipe data, lihat dokumen konektor spesifik.

Contoh

Sinkronkan database

Deskripsi: Sinkronkan semua tabel dari database MySQL tpcds ke Hologres.

Prasyarat: Katalog berikut dibuat di ruang kerja Anda:

  • Katalog Hologres bernama holo.

  • Katalog MySQL bernama mysql.

Contoh Kode:

USE CATALOG holo;

CREATE DATABASE IF NOT EXISTS holo_tpcds  -- Buat database bernama holo_tpcds di Hologres.
WITH ('sink.parallelism' = '4') -- Secara opsional konfigurasikan opsi untuk database tujuan. Secara default, paralelisme sink untuk Hologres diatur ke 4.
AS DATABASE mysql.tpcds INCLUDING ALL TABLES  -- Sinkronkan semua tabel.
/*+ OPTIONS('server-id'='8001-8004') */ ; -- Secara opsional konfigurasikan opsi untuk tabel sumber CDC MySQL.
Catatan

Opsi yang dikonfigurasikan untuk database tujuan dalam klausa WITH hanya berlaku untuk pekerjaan saat ini untuk mengontrol perilaku penulisan. Mereka tidak disimpan secara permanen di katalog Hologres. Untuk informasi tentang opsi konektor yang didukung, lihat Konektor Hologres.

Sinkronkan data lintas shard database

Deskripsi: Instance MySQL memiliki beberapa shard database bernama dari order_db01 hingga order_db99. Setiap shard database berisi beberapa tabel, seperti order dan order_detail. Pernyataan CDAS dapat digunakan untuk menyinkronkan semua tabel, bersama dengan data dan perubahan skema, di shard database ini ke Hologres.

Solusi:

Gunakan ekspresi reguler untuk nama database (`order_db[0-9]+`) untuk mencocokkan semua shard database (order_db01 hingga order_db99) untuk sinkronisasi. Nama database dan tabel ditambahkan ke setiap tabel sink sebagai dua bidang tambahan.

Kunci utama tabel Hologres mencakup nama database, nama tabel, dan kolom kunci utama tabel sumber untuk memastikan keunikan.

Tidak perlu membuat tabel target terlebih dahulu.

Contoh Kode dan Hasil:

Tabel dengan nama identik di seluruh shard database digabungkan sebelum disinkronkan ke dalam satu tabel Hologres.

Contoh kode

Hasil

USE CATALOG holo;

CREATE DATABASE IF NOT EXISTS holo_order  -- Buat database bernama holo_order di Hologres. Database ini berisi semua tabel di shard database order_db dari instance MySQL.
WITH('sink.parallelism'='4')        -- Tentukan parameter database tujuan. Secara default, paralelisme setiap sink Hologres adalah 4. Pengaturan ini opsional.
AS DATABASE mysql.`order_db[0-9]+` INCLUDING ALL TABLES -- Sinkronkan data dari semua tabel di shard database order_db dari instance MySQL.
/*+OPTIONS('server-id'='8001-8004')*/;  -- Tentukan parameter tambahan untuk tabel sumber CDC MySQL. Pengaturan ini opsional.

order1

Sinkronkan tabel baru

Deskripsi: Setelah pekerjaan yang menyinkronkan data melalui pernyataan CDAS dimulai, tabel baru ditambahkan dan perlu disinkronkan.

Solusi: Aktifkan deteksi tabel baru untuk pekerjaan dan mulai ulang dari titik simpanan untuk menangkap dan menyinkronkan tabel yang baru ditambahkan.

Batasan: Deteksi tabel baru didukung di VVR 8.0.6 atau lebih baru. Untuk mengaktifkan fitur ini, pastikan mode startup tabel sumber diatur ke initial.

Prosedur:

  1. Di halaman Deployments, temukan penyebaran target dan klik Cancel di kolom Actions.

  2. Dalam dialog, perluas bagian More Strategies, pilih opsi Stop With Savepoint, lalu klik OK.

  3. Dalam draf SQL pekerjaan, tambahkan pernyataan berikut untuk mengaktifkan deteksi tabel baru:

    SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
  4. Klik Deploy.

  5. Pulihkan pekerjaan dari titik simpanan.

    1. Di halaman Deployments, klik nama deployment Anda.

    2. Di halaman detail penyebaran, klik tab State. Lalu, klik subtab History.

    3. Di daftar Savepoints, temukan titik simpanan yang dibuat saat pekerjaan dibatalkan.

    4. Pilih More > Start job from this savepoint di kolom Actions. Untuk informasi lebih lanjut, lihat Mulai Penyebaran.

Jalankan beberapa pernyataan CDAS dan CTAS

Deskripsi: Sinkronkan data dari database tpcds, database tpch, dan database shard user_db01 hingga user_db99 ke Hologres dalam satu pekerjaan.

Solusi: Gunakan pernyataan STATEMENT SET untuk mengelompokkan beberapa pernyataan CDAS dan CTAS. Solusi ini menggunakan kembali simpul sumber untuk membaca data dari tabel yang diperlukan. Ini sangat bermanfaat dengan sumber data CDC MySQL, mengurangi jumlah ID server, koneksi database, dan beban baca keseluruhan pada database MySQL.

Penting
  • Untuk menggunakan kembali sumber dan mengoptimalkan kinerja, pastikan opsi konektor untuk setiap tabel sumber identik.

  • Untuk informasi tentang konfigurasi ID server, lihat Atur ID Server Berbeda untuk Setiap Klien.

Contoh Kode:

USE CATALOG holo;

BEGIN STATEMENT SET;

-- Sinkronkan data dari tabel pengguna di seluruh shard database.
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

-- Sinkronkan data dari database tpcds.
CREATE DATABASE IF NOT EXISTS holo_tpcds
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- Sinkronkan data dari database tpch.
CREATE DATABASE IF NOT EXISTS holo_tpch
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

END;

Sinkronkan data di database ke Kafka melalui beberapa pernyataan CDAS

Deskripsi: Sinkronkan data di tabel dari beberapa database MySQL (tpcds dan tpch, dll.) ke Kafka.

Solusi: Saat menggunakan beberapa pernyataan CDAS untuk menyinkronkan data di database ke Kafka, tabel dengan nama identik mungkin ada di database yang berbeda. Untuk mencegah konflik topik, konfigurasikan opsi cdas.topic.pattern untuk menentukan pola nama topik. Anda dapat menggunakan placeholder {table-name}. Misalnya, menentukan 'cdas.topic.pattern'='dbname-{table-name}' menghasilkan data yang direplikasi dari tabel table1 di database db1 ke topik Kafka dbname-table1.

Contoh Kode:

USE CATALOG kafkaCatalog;

BEGIN STATEMENT SET;

-- Sinkronkan data dari database tpcds.
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpcds-{table-name}')
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

-- Sinkronkan data dari database tpch.
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpch-{table-name}')
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

END;

Dengan memperkenalkan Kafka sebagai lapisan perantara antara MySQL dan Flink, Anda dapat mengurangi beban pada MySQL. Untuk informasi lebih lanjut, lihat Sinkronkan Data dari Semua Tabel di Database MySQL ke Kafka.

FAQ

Kesalahan runtime

Kinerja pekerjaan

Sinkronisasi data

Referensi