Anda dapat mengeksekusi pernyataan CREATE TABLE AS (CTAS) untuk menyinkronkan data dan perubahan skema tabel dari sistem hulu ke hilir secara real time. Hal ini meningkatkan efisiensi pembuatan tabel di sistem tujuan serta sinkronisasi perubahan skema tabel. Topik ini menjelaskan cara menggunakan pernyataan CTAS beserta contoh pada berbagai skenario.
Kami menyarankan Anda membuat pekerjaan menggunakan YAML untuk ingesti data. Anda dapat mengonversi draf SQL yang ada berisi pernyataan CTAS atau CDAS menjadi draf YAML:
Pengenalan: Anda dapat mengembangkan pekerjaan menggunakan YAML untuk menyinkronkan data dari sumber ke tujuan.
Keunggulan: Kemampuan utama pernyataan CTAS dan CDAS didukung, termasuk sinkronisasi database, tabel, skema tabel, dan kolom terhitung kustom. Selain itu, evolusi skema real time, sinkronisasi data log biner mentah, klausa WHERE, dan pemangkasan kolom juga didukung.
Untuk informasi selengkapnya, lihat Ingesti data dengan Flink CDC.
Fitur
Sinkronisasi data
Fitur | Deskripsi |
Menyinkronkan tabel | Menyinkronkan data penuh dan data inkremental dari tabel sumber ke tabel sink secara real time. (Contoh: Menyinkronkan tabel) |
Menggabungkan dan menyinkronkan shard tabel | Menggunakan regular expressions untuk mencocokkan nama database dan shard tabel. Kemudian, Anda dapat menggabungkan shard tabel tersebut dan menyinkronkan datanya ke tabel sink. (Contoh: Menggabungkan dan menyinkronkan shard tabel) Catatan Tanda sisipan (^) tidak dapat digunakan untuk mencocokkan awal nama tabel. |
Menyinkronkan kolom terhitung kustom | Menambahkan kolom terhitung untuk mengonversi dan memproses kolom tertentu. Anda dapat menggunakan fungsi sistem atau user-defined functions (UDFs) untuk kolom terhitung dan menentukan posisi kolom terhitung yang ingin ditambahkan. Kolom terhitung yang baru ditambahkan akan digunakan sebagai kolom fisik di tabel sink, dan hasilnya disinkronkan ke tabel sink secara real time. (Contoh: Menyinkronkan kolom terhitung kustom) |
Menjalankan beberapa pernyataan CTAS |
|
Evolusi skema
Selama sinkronisasi data, pernyataan CTAS mendukung replikasi perubahan skema, termasuk pembuatan tabel dan modifikasi skema, dari tabel sumber ke tabel sink.
Perubahan skema yang didukung
Perubahan skema
Deskripsi
Menambahkan kolom nullable
Secara otomatis menambahkan kolom terkait ke akhir skema tabel sink, dengan data yang disinkronkan ke kolom yang ditambahkan. Kolom baru secara default diatur sebagai kolom nullable, dan data pada kolom ini sebelum perubahan secara otomatis diatur ke NULL.
Menambahkan kolom non-null
Secara otomatis menambahkan kolom yang sesuai ke akhir skema tabel sink dan menyinkronkan data.
Menghapus kolom nullable
Secara otomatis mengisi nilai null pada kolom nullable di tabel sink alih-alih menghapus kolom tersebut dari tabel.
Mengganti nama kolom
Operasi penggantian nama kolom melibatkan penambahan kolom dan penghapusan kolom. Setelah nama kolom diubah di tabel sumber, kolom dengan nama baru ditambahkan ke akhir tabel sink dan kolom dengan nama asli diisi dengan nilai null.
CatatanSebagai contoh, jika nama kolom
col_adi tabel sumber diubah menjadicol_b, maka kolomcol_bditambahkan ke akhir tabel sink dan kolomcol_asecara otomatis diisi dengan nilai null.Memodifikasi tipe data kolom
Untuk sistem hilir yang mendukung modifikasi tipe kolom: Saat ini, hanya Paimon yang mendukung perubahan tipe. Pernyataan CTAS mendukung modifikasi tipe kolom biasa, misalnya dari INT ke BIGINT.
Kompatibilitas bergantung pada aturan spesifik sistem (lihat dokumentasi konektor).
Untuk sistem hilir yang tidak mendukung modifikasi tipe kolom: Saat ini, hanya Hologres yang mendukung penggunaan pelebaran tipe (type widening) untuk menangani perubahan tipe kolom. Mekanismenya sebagai berikut: Saat startup pekerjaan, tabel Hologres dengan tipe data yang lebih luas dibuat, dan perubahan tipe kolom didukung berdasarkan kompatibilitas sink. Untuk informasi selengkapnya, lihat Contoh: Menyinkronkan data dalam mode normalisasi tipe.
PentingUntuk mengaktifkan dukungan pelebaran tipe pada Hologres, aktifkan normalisasi tipe saat peluncuran pekerjaan awal. Untuk pekerjaan yang sudah ada, hapus tabel Hologres dan restart tanpa state untuk menerapkan pengaturan normalisasi tipe.
PentingSaat pernyataan CTAS digunakan untuk sinkronisasi, sistem hanya membandingkan perbedaan skema dan tidak mengidentifikasi jenis DDL spesifik. Sebagai contoh:
Jika sebuah kolom dihapus lalu ditambahkan kembali tanpa perubahan data selama periode tersebut, sistem tidak mendeteksi adanya perubahan skema.
Jika sebuah kolom dihapus lalu ditambahkan kembali dengan perubahan data selama periode tersebut, sistem mendeteksi dan menyinkronkan perubahan skema.
Perubahan skema yang tidak didukung
Memodifikasi constraint, seperti primary key atau indeks.
Menghapus kolom non-nullable.
Mengubah dari NOT NULL menjadi NULLABLE.
PentingUntuk menyinkronkan perubahan skema yang tidak didukung, hapus secara manual tabel sink dan restart pekerjaan Anda untuk menyinkronkan ulang data historis.
Proses sinkronisasi
Bagan alir berikut menunjukkan proses penyinkronan data dari MySQL ke Hologres dengan pernyataan CTAS.
Bagan alir | Deskripsi |
Saat mengeksekusi pernyataan CTAS, Realtime Compute for Apache Flink melakukan hal berikut:
|
Prasyarat
Katalog penyimpanan tujuan telah dibuat di ruang kerja Anda. Untuk informasi selengkapnya, lihat Katalog.
Batasan
Batasan sintaksis
Debugging draf SQL yang berisi pernyataan CTAS tidak didukung.
Pernyataan CTAS tidak dapat digunakan bersamaan dengan pernyataan INSERT INTO dalam draf SQL yang sama.
Data tidak dapat disinkronkan ke tabel partisi StarRocks.
MiniBatch tidak didukung.
PentingPastikan konfigurasi MiniBatch telah dihapus sebelum membuat draf SQL yang berisi pernyataan CTAS atau CDAS. Lakukan langkah berikut:
Buka .
Pilih tab Deployment Defaults.
Pada bagian Other Configuration, verifikasi bahwa konfigurasi MiniBatch telah dihapus.
Jika muncul kesalahan saat membuat deployment dari draf SQL atau saat memulai deployment, lihat Bagaimana cara memperbaiki kesalahan "Currently does not support merge StreamExecMiniBatchAssigner type ExecNode in CTAS/CDAS syntax"?.
Sistem hulu dan hilir yang didukung
Tabel berikut menjelaskan penyimpanan data hulu dan hilir yang dapat digunakan dengan pernyataan CTAS.
Konektor | Tabel sumber | Tabel sink | Catatan |
Didukung | Tidak didukung |
| |
Didukung | Tidak didukung | N/A | |
Didukung | Tidak didukung |
| |
Tidak didukung | Didukung | N/A | |
Tidak didukung | Didukung | Dukungan terbatas pada StarRocks di Alibaba Cloud EMR. | |
Tidak didukung | Didukung | Saat Hologres berfungsi sebagai sistem tujuan sinkronisasi data, sistem secara otomatis membuat koneksi untuk setiap tabel berdasarkan nilai opsi Catatan Jika tipe data di tabel sumber tidak didukung oleh fitur fixed plan Hologres, gunakan pernyataan INSERT INTO untuk sinkronisasi data. Jangan gunakan pernyataan CTAS karena memberikan performa penulisan yang lebih rendah akibat ketidakmampuan menggunakan fixed plan. | |
Tidak didukung | Didukung |
Sintaksis
CREATE TABLE IF NOT EXISTS <sink_table>
(
[ <table_constraint> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (
key1=val1,
key2=val2,
...
)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];
<sink_table>:
[catalog_name.][db_name.]table_name
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<source_table>:
[catalog_name.][db_name.]table_name
<column_component>:
column_name AS computed_column_expression [COMMENT column_comment] [FIRST | AFTER column_name]Pernyataan CTAS menggunakan sintaks dasar pernyataan CREATE TABLE. Tabel berikut menjelaskan beberapa argumen:
Argumen | Deskripsi |
| Nama tabel target untuk sinkronisasi data. Secara opsional, gunakan nama lengkap tabel dengan menyertakan katalog dan database-nya. |
| Deskripsi tabel sink. Secara default, deskripsi source_table digunakan. |
| Menentukan kolom partisi. Penting Data tidak dapat disinkronkan ke tabel partisi StarRocks. |
| Primary key, yang merupakan pengenal unik untuk setiap catatan dalam tabel. |
| Opsi konektor untuk tabel sink. Untuk informasi selengkapnya, lihat bagian "Opsi konektor dalam klausa WITH" di Konektor Upsert Kafka, Konektor Hologres, StarRocks, atau Konektor Paimon. Catatan Baik kunci maupun nilai harus bertipe STRING, seperti |
| Nama tabel sumber. Secara opsional, gunakan nama lengkap dengan menyertakan katalog dan database-nya. |
| Opsi konektor untuk tabel sumber. Untuk informasi selengkapnya, lihat "Opsi konektor dalam klausa WITH" di Konektor MySQL dan Konektor Kafka. Catatan Baik kunci maupun nilai harus bertipe STRING, seperti 'server-id' = '65500'. |
| Menambahkan kolom terhitung ke tabel sink atau mengganti nama kolom sumber. Penting Pemetaan kolom murni (misalnya, |
| Deskripsi kolom baru. |
| Deskripsi ekspresi kolom terhitung. |
| Menentukan bahwa kolom baru digunakan sebagai bidang pertama di tabel sink. Secara default, kolom baru ditambahkan di akhir tabel sink. |
| Menentukan bahwa kolom baru ditambahkan setelah bidang tertentu. |
Kata kunci
IF NOT EXISTSwajib digunakan. Kata kunci ini meminta sistem untuk memeriksa keberadaan tabel sink di penyimpanan tujuan. Jika tidak ada, sistem akan membuat tabel sink. Jika sudah ada, pembuatan tabel dilewati.Tabel sink yang dibuat berbagi skema tabel sumber, termasuk primary key dan nama serta tipe bidang fisik, tetapi tidak termasuk kolom terhitung, bidang metadata, dan konfigurasi watermark.
Realtime Compute for Apache Flink melakukan pemetaan tipe data dari tabel sumber ke tabel sink selama sinkronisasi data. Untuk informasi selengkapnya tentang pemetaan tipe data, lihat dokumen konektor spesifik.
Contoh
Menyinkronkan tabel
Deskripsi: Menyinkronkan tabel web_sales dari MySQL ke Hologres.
Prasyarat:
Katalog Hologres bernama
holotelah dibuat.Katalog MySQL bernama
mysqltelah dibuat.
Kode contoh:
Pernyataan CTAS sering digunakan bersama katalog sumber dan tujuan untuk mendukung sinkronisasi data penuh dan inkremental. Katalog sumber secara otomatis mengurai skema dan properti tabel sumber tanpa DDL eksplisit.
USE CATALOG holo;
CREATE TABLE IF NOT EXISTS web_sales -- Menyinkronkan data ke tabel web_sales di database default.
WITH ('jdbcWriteBatchSize' = '1024') -- Secara opsional konfigurasikan opsi konektor untuk tabel sink.
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */; -- Secara opsional konfigurasikan opsi tambahan untuk tabel sumber CDC MySQL.Menggabungkan dan menyinkronkan shard tabel dan database
Deskripsi: Menggabungkan tabel dan database MySQL yang di-shard sebelum menyinkronkan data ke tabel Hologres.
Metode: Gunakan katalog MySQL dan regular expressions untuk mencocokkan database dan tabel yang ingin disinkronkan.
Nama database dan tabel ditulis ke tabel sink sebagai nilai dua bidang tambahan. Primary key tabel sink terdiri dari nama database, nama tabel, dan primary key asli untuk memastikan keunikan primary key.
Kode dan hasil:
Kode contoh | Hasil |
Menggabungkan dan menyinkronkan shard tabel: |
|
Mengubah skema tabel sumber: Tambahkan kolom baru bernama |
|
Menyinkronkan kolom terhitung kustom
Deskripsi: Selama sinkronisasi gabungan shard tabel dan database dari MySQL ke Hologres, tambahkan kolom terhitung kustom.
Kode dan hasil:
Kode contoh | Hasil |
|
|
Menjalankan beberapa pernyataan CTAS dalam satu pekerjaan
Deskripsi: Menyinkronkan shard tabel web_sales dan user dari MySQL ke Hologres dalam satu pekerjaan.
Metode: Gunakan STATEMENT SET untuk mengeksekusi beberapa pernyataan CTAS sebagai satu kelompok. Pendekatan ini menggunakan kembali vertex sumber untuk membaca data dari beberapa tabel, sehingga mengurangi jumlah server ID, koneksi database, dan beban baca secara keseluruhan.
Untuk menggunakan kembali sumber dan mengoptimalkan performa, pastikan opsi konektor untuk setiap tabel sumber identik.
Untuk informasi tentang konfigurasi server ID, lihat Tetapkan server ID berbeda untuk setiap client.
Kode contoh:
USE CATALOG holo;
BEGIN STATEMENT SET;
-- Menyinkronkan data dari tabel web_sales.
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;
-- Menyinkronkan data dari shard tabel user.
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
END;Menyinkronkan data dari sumber ke beberapa sink menggunakan beberapa pernyataan CTAS
Tidak ada kolom terhitung yang ditambahkan ke tabel sink
USE CATALOG `holo`; BEGIN STATEMENT SET; -- Menyinkronkan data dari tabel MySQL user ke tabel user di database1 Hologres. CREATE TABLE IF NOT EXISTS `database1`.`user` AS TABLE `mysql`.`tpcds`.`user` /*+ OPTIONS('server-id'='8001-8004') */; -- Menyinkronkan data dari tabel database MySQL user ke tabel user di database2 Hologres. CREATE TABLE IF NOT EXISTS `database2`.`user` AS TABLE `mysql`.`tpcds`.`user` /*+ OPTIONS('server-id'='8001-8004') */; END;Kolom terhitung ditambahkan ke tabel sink
-- Buat tabel temporary bernama user_with_changed_id berdasarkan tabel sumber user. Definisikan kolom computed_id berdasarkan kolom id tabel sumber. CREATE TEMPORARY TABLE `user_with_changed_id` ( `computed_id` AS `id` + 1000 ) LIKE `mysql`.`tpcds`.`user`; -- Buat tabel temporary bernama user_with_changed_age berdasarkan tabel sumber user. Definisikan kolom computed_age berdasarkan kolom age tabel sumber. CREATE TEMPORARY TABLE `user_with_changed_age` ( `computed_age` AS `age` + 1 ) LIKE `mysql`.`tpcds`.`user`; BEGIN STATEMENT SET; -- Menyinkronkan data dari tabel database MySQL user ke tabel user_with_changed_id Hologres. Tabel user_with_changed_id berisi ID yang diperoleh dari perhitungan berdasarkan kolom id tabel sumber. ID tersebut berada di kolom computed_id. CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id` AS TABLE `user_with_changed_id` /*+ OPTIONS('server-id'='8001-8004') */; -- Menyinkronkan data dari tabel database MySQL user ke tabel user_with_changed_age Hologres. Tabel user_with_changed_age berisi nilai usia yang diperoleh dari perhitungan berdasarkan kolom age tabel sumber. Nilai usia tersebut berada di kolom computed_age. CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age` AS TABLE `user_with_changed_age` /*+ OPTIONS('server-id'='8001-8004') */; END;
Menyinkronkan tabel baru menggunakan beberapa pernyataan CTAS
Deskripsi skenario: Setelah pekerjaan yang menggunakan beberapa pernyataan CTAS untuk sinkronisasi dimulai, tambahkan pernyataan CTAS untuk mereplikasi tabel baru.
Metode: Aktifkan deteksi tabel baru untuk pekerjaan, tambahkan pernyataan CTAS ke kode SQL pekerjaan, dan restart dari titik simpan. Setelah tabel baru ditangkap, data akan direplikasi.
Batasan:
Deteksi tabel baru didukung untuk VVR 8.0.1 atau versi yang lebih baru.
Saat data disinkronkan dari tabel sumber CDC, hanya pekerjaan yang dimulai dalam mode initial yang dapat mendeteksi tabel baru.
Konfigurasi tabel sumber yang ditambahkan menggunakan pernyataan CTAS baru harus sama dengan konfigurasi tabel sumber asli. Dengan demikian, vertex sumber dapat digunakan kembali.
Parameter konfigurasi pekerjaan sebelum dan sesudah penambahan pernyataan CTAS harus sama. Misalnya, mode startup harus sama.
Prosedur:
Pada halaman Deployments, temukan deployment target dan klik Cancel di kolom Actions.
Pada dialog, perluas bagian More Strategies, pilih Stop With Savepoint, lalu klik OK.
Pada draf SQL pekerjaan, aktifkan deteksi tabel baru dan tambahkan pernyataan CTAS.
Tambahkan pernyataan berikut untuk mengaktifkan deteksi tabel baru.
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';Tambahkan pernyataan CTAS. Kode lengkap pekerjaan terlihat seperti berikut:
-- Aktifkan deteksi tabel baru SET 'table.cdas.scan.newly-added-table.enabled' = 'true'; USE CATALOG holo; BEGIN STATEMENT SET; -- Menyinkronkan data dari tabel web_sales. CREATE TABLE IF NOT EXISTS web_sales AS TABLE mysql.tpcds.web_sales /*+ OPTIONS('server-id'='8001-8004') */; -- Menyinkronkan data dari shard tabel user. CREATE TABLE IF NOT EXISTS user AS TABLE mysql.`wp.*`.`user[0-9]+` /*+ OPTIONS('server-id'='8001-8004') */; -- Menyinkronkan data dari tabel product. CREATE TABLE IF NOT EXISTS product AS TABLE mysql.tpcds.product /*+ OPTIONS('server-id'='8001-8004') */; END;Klik Deploy.
Pulihkan pekerjaan dari titik simpan.
Pada halaman Deployments, klik nama deployment Anda.
Pada halaman detail deployment, klik tab State. Lalu, klik subtab History.
Pada daftar Savepoints, temukan titik simpan yang dibuat saat pekerjaan dibatalkan.
Pilih di kolom Actions. Untuk informasi selengkapnya, lihat Memulai penerapan Pekerjaan.
Menyinkronkan ke tabel partisi di Hologres
Deskripsi skenario: Mereplikasi data dari MySQL ke tabel partisi Hologres.
Catatan penggunaan: Jika primary key didefinisikan untuk tabel Hologres, kolom partisi harus disertakan dalam primary key.
Kode contoh:
Buat tabel MySQL:
CREATE TABLE orders (
order_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
city VARCHAR(100) NOT NULL
order_date DATE,
purchaser INTEGER,
PRIMARY KEY(order_id, product_id)
);Bergantung pada apakah kolom partisi termasuk dalam primary key, pilih metode yang tepat:
Jika primary key sumber berisi kolom partisi:
Gunakan pernyataan CTAS secara langsung.
Hologres akan secara otomatis memverifikasi apakah kolom partisi disertakan dalam primary key.
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` PARTITIONED BY (product_id) AS TABLE `mysql`.`tpcds`.`orders`;
Jika primary key sumber tidak mencakup kolom partisi:
Deklarasikan primary key tabel sink dalam pernyataan CTAS dan sertakan kolom partisi dalam definisi primary key.
Dalam kasus ini, tidak mendeklarasikan ulang primary key atau tidak menyertakan kolom partisi di dalamnya akan menyebabkan pekerjaan gagal.
-- Deklarasikan bidang order_id, product_id, dan city sebagai primary key tabel partisi Hologres. CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`( CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED ) PARTITIONED BY (city) AS TABLE `mysql`.`tpcds`.`orders`;
Melebarkan tipe data selama replikasi
Deskripsi skenario: Selama sinkronisasi data, ubah presisi kolom, misalnya dari VARCHAR(10) ke VARCHAR(20), atau ubah tipe data kolom, seperti dari SMALLINT ke INT.
Metode:
Pekerjaan baru: Aktifkan mode normalisasi tipe saat peluncuran pertama.
Pekerjaan yang sudah ada: Hapus tabel sink Hologres, lalu restart tanpa state untuk menerapkan normalisasi tipe.
Aturan normalisasi tipe:
Jika tipe data baru dan asli dinormalisasi ke tipe data yang sama, perubahan tipe data berhasil dan pekerjaan berjalan normal. Jika tidak, pengecualian akan dilaporkan. Rinciannya sebagai berikut:
TINYINT, SMALLINT, INT, dan BIGINT dikonversi menjadi BIGINT.
CHAR, VARCHAR, dan STRING dikonversi menjadi STRING.
FLOAT dan DOUBLE dikonversi menjadi DOUBLE.
Tipe data lain dikonversi berdasarkan pemetaan tipe data antara bidang Hologres dan Flink. Untuk informasi selengkapnya, lihat Pemetaan tipe data.
Kode contoh:
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
WITH (
'connector' = 'hologres',
'enableTypeNormalization' = 'true' -- Aktifkan mode normalisasi tipe.
) AS TABLE `mysql`.`tpcds`.`orders`;Menyinkronkan data dari MongoDB ke Hologres
Batasan:
Dukungan terbatas pada VVR 8.0.6 atau versi yang lebih baru dan MongoDB versi 6.0 atau lebih baru.
Pada opsi konektor untuk tabel sumber, scan.incremental.snapshot.enabled dan scan.full-changelog harus diatur ke
true.Fitur preimage dan postimage harus diaktifkan untuk database MongoDB. Untuk informasi selengkapnya, lihat Document Preimages.
Untuk menyinkronkan data dari beberapa koleksi MongoDB dalam satu pekerjaan, pastikan konfigurasi opsi konektor berikut identik untuk semua tabel:
Opsi terkait database MongoDB, termasuk
hosts,scheme,username,password, danconnectionOptionsscan.startup.mode
Kode contoh:
BEGIN STATEMENT SET;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
END;FAQ
Kesalahan runtime
Apa yang harus saya lakukan jika pekerjaan tidak dapat dimulai?
Apa yang harus saya lakukan jika pekerjaan di-restart setelah dijalankan?
Apa yang harus saya lakukan jika muncul pesan kesalahan "akka.pattern.AskTimeoutException"?
Performa pekerjaan
Bagaimana cara mengatasi masalah tekanan balik (backpressure)?
Apa yang harus saya lakukan jika konsumsi data hulu tidak stabil?
Sinkronisasi data
Referensi
Pernyataan CTAS sering digunakan dengan katalog, yang menawarkan manajemen metadata persisten untuk tabel dan memungkinkan akses data lintas pekerjaan: Katalog populer:
Praktik terbaik penggunaan pernyataan CTAS dan CDAS:
Untuk informasi tentang sinkronisasi data dari semua tabel dalam database, penggabungan dan sinkronisasi data tabel dalam database yang di-shard, atau sinkronisasi data dari tabel baru di database sumber, lihat CREATE DATABASE AS (CDAS).
Untuk mengurangi beban baca data pada database MySQL selama sinkronisasi database, Anda dapat menyinkronkan data dalam database ke Kafka. Lihat Sinkronisasi database dari MySQL ke Kafka dengan Flink CDC.
Untuk informasi tentang cara menggunakan pernyataan CTAS dan CDAS untuk melakukan sinkronisasi data, lihat Ingesti data ke gudang data secara real time, Bangun gudang data real-time dengan Flink dan Hologres, atau Bangun danau data streaming dengan Flink, Paimon, dan StarRocks.
Ingesti data melalui YAML:


