全部产品
Search
文档中心

Realtime Compute for Apache Flink:CREATE TABLE AS (CTAS)

更新时间:Dec 25, 2025

Anda dapat mengeksekusi pernyataan CREATE TABLE AS (CTAS) untuk menyinkronkan data dan perubahan skema tabel dari sistem hulu ke hilir secara real time. Hal ini meningkatkan efisiensi pembuatan tabel di sistem tujuan serta sinkronisasi perubahan skema tabel. Topik ini menjelaskan cara menggunakan pernyataan CTAS beserta contoh pada berbagai skenario.

Catatan

Kami menyarankan Anda membuat pekerjaan menggunakan YAML untuk ingesti data. Anda dapat mengonversi draf SQL yang ada berisi pernyataan CTAS atau CDAS menjadi draf YAML:

  • Pengenalan: Anda dapat mengembangkan pekerjaan menggunakan YAML untuk menyinkronkan data dari sumber ke tujuan.

  • Keunggulan: Kemampuan utama pernyataan CTAS dan CDAS didukung, termasuk sinkronisasi database, tabel, skema tabel, dan kolom terhitung kustom. Selain itu, evolusi skema real time, sinkronisasi data log biner mentah, klausa WHERE, dan pemangkasan kolom juga didukung.

Untuk informasi selengkapnya, lihat Ingesti data dengan Flink CDC.

Fitur

Sinkronisasi data

Fitur

Deskripsi

Menyinkronkan tabel

Menyinkronkan data penuh dan data inkremental dari tabel sumber ke tabel sink secara real time. (Contoh: Menyinkronkan tabel)

Menggabungkan dan menyinkronkan shard tabel

Menggunakan regular expressions untuk mencocokkan nama database dan shard tabel. Kemudian, Anda dapat menggabungkan shard tabel tersebut dan menyinkronkan datanya ke tabel sink. (Contoh: Menggabungkan dan menyinkronkan shard tabel)

Catatan

Tanda sisipan (^) tidak dapat digunakan untuk mencocokkan awal nama tabel.

Menyinkronkan kolom terhitung kustom

Menambahkan kolom terhitung untuk mengonversi dan memproses kolom tertentu. Anda dapat menggunakan fungsi sistem atau user-defined functions (UDFs) untuk kolom terhitung dan menentukan posisi kolom terhitung yang ingin ditambahkan. Kolom terhitung yang baru ditambahkan akan digunakan sebagai kolom fisik di tabel sink, dan hasilnya disinkronkan ke tabel sink secara real time. (Contoh: Menyinkronkan kolom terhitung kustom)

Menjalankan beberapa pernyataan CTAS

Evolusi skema

Selama sinkronisasi data, pernyataan CTAS mendukung replikasi perubahan skema, termasuk pembuatan tabel dan modifikasi skema, dari tabel sumber ke tabel sink.

  • Perubahan skema yang didukung

    Perubahan skema

    Deskripsi

    Menambahkan kolom nullable

    Secara otomatis menambahkan kolom terkait ke akhir skema tabel sink, dengan data yang disinkronkan ke kolom yang ditambahkan. Kolom baru secara default diatur sebagai kolom nullable, dan data pada kolom ini sebelum perubahan secara otomatis diatur ke NULL.

    Menambahkan kolom non-null

    Secara otomatis menambahkan kolom yang sesuai ke akhir skema tabel sink dan menyinkronkan data.

    Menghapus kolom nullable

    Secara otomatis mengisi nilai null pada kolom nullable di tabel sink alih-alih menghapus kolom tersebut dari tabel.

    Mengganti nama kolom

    Operasi penggantian nama kolom melibatkan penambahan kolom dan penghapusan kolom. Setelah nama kolom diubah di tabel sumber, kolom dengan nama baru ditambahkan ke akhir tabel sink dan kolom dengan nama asli diisi dengan nilai null.

    Catatan

    Sebagai contoh, jika nama kolom col_a di tabel sumber diubah menjadi col_b, maka kolom col_b ditambahkan ke akhir tabel sink dan kolom col_a secara otomatis diisi dengan nilai null.

    Memodifikasi tipe data kolom

    • Untuk sistem hilir yang mendukung modifikasi tipe kolom: Saat ini, hanya Paimon yang mendukung perubahan tipe. Pernyataan CTAS mendukung modifikasi tipe kolom biasa, misalnya dari INT ke BIGINT.

      Kompatibilitas bergantung pada aturan spesifik sistem (lihat dokumentasi konektor).

    • Untuk sistem hilir yang tidak mendukung modifikasi tipe kolom: Saat ini, hanya Hologres yang mendukung penggunaan pelebaran tipe (type widening) untuk menangani perubahan tipe kolom. Mekanismenya sebagai berikut: Saat startup pekerjaan, tabel Hologres dengan tipe data yang lebih luas dibuat, dan perubahan tipe kolom didukung berdasarkan kompatibilitas sink. Untuk informasi selengkapnya, lihat Contoh: Menyinkronkan data dalam mode normalisasi tipe.

      Penting

      Untuk mengaktifkan dukungan pelebaran tipe pada Hologres, aktifkan normalisasi tipe saat peluncuran pekerjaan awal. Untuk pekerjaan yang sudah ada, hapus tabel Hologres dan restart tanpa state untuk menerapkan pengaturan normalisasi tipe.

    Penting

    Saat pernyataan CTAS digunakan untuk sinkronisasi, sistem hanya membandingkan perbedaan skema dan tidak mengidentifikasi jenis DDL spesifik. Sebagai contoh:

    • Jika sebuah kolom dihapus lalu ditambahkan kembali tanpa perubahan data selama periode tersebut, sistem tidak mendeteksi adanya perubahan skema.

    • Jika sebuah kolom dihapus lalu ditambahkan kembali dengan perubahan data selama periode tersebut, sistem mendeteksi dan menyinkronkan perubahan skema.

  • Perubahan skema yang tidak didukung

    • Memodifikasi constraint, seperti primary key atau indeks.

    • Menghapus kolom non-nullable.

    • Mengubah dari NOT NULL menjadi NULLABLE.

    Penting

    Untuk menyinkronkan perubahan skema yang tidak didukung, hapus secara manual tabel sink dan restart pekerjaan Anda untuk menyinkronkan ulang data historis.

Proses sinkronisasi

Bagan alir berikut menunjukkan proses penyinkronan data dari MySQL ke Hologres dengan pernyataan CTAS.

Bagan alir

Deskripsi

image

Saat mengeksekusi pernyataan CTAS, Realtime Compute for Apache Flink melakukan hal berikut:

  1. Memverifikasi keberadaan tabel sink di sistem tujuan.

    • Jika tidak ada, Realtime Compute for Apache Flink membuat tabel sink yang mencerminkan skema tabel sumber menggunakan katalog penyimpanan tujuan.

    • Jika ada, Realtime Compute for Apache Flink melewati pembuatan tabel dan memverifikasi konsistensi skema tabel sink dengan skema tabel sumber. Jika skemanya berbeda, kesalahan dilaporkan.

  2. Mengirimkan dan menjalankan pekerjaan sinkronisasi data.

    Realtime Compute for Apache Flink menyinkronkan data dan perubahan skema dari sumber ke sink.

Prasyarat

Katalog penyimpanan tujuan telah dibuat di ruang kerja Anda. Untuk informasi selengkapnya, lihat Katalog.

Batasan

Batasan sintaksis

  • Debugging draf SQL yang berisi pernyataan CTAS tidak didukung.

  • Pernyataan CTAS tidak dapat digunakan bersamaan dengan pernyataan INSERT INTO dalam draf SQL yang sama.

  • Data tidak dapat disinkronkan ke tabel partisi StarRocks.

  • MiniBatch tidak didukung.

    Penting

Sistem hulu dan hilir yang didukung

Tabel berikut menjelaskan penyimpanan data hulu dan hilir yang dapat digunakan dengan pernyataan CTAS.

Konektor

Tabel sumber

Tabel sink

Catatan

MySQL

Didukung

Tidak didukung

  • Nama database dan tabel secara otomatis disinkronkan selama penggabungan dan sinkronisasi tabel dan database yang di-shard.

  • Selama sinkronisasi tabel tunggal, untuk menyinkronkan nama database dan tabel Anda, buat katalog MySQL melalui Flink SQL dan sertakan opsi catalog.table.metadata-columns. Untuk informasi selengkapnya, lihat Kelola katalog MySQL.

  • View tidak dapat disinkronkan.

ApsaraMQ for Kafka

Didukung

Tidak didukung

N/A

MongoDB

Didukung

Tidak didukung

  • Penggabungan dan sinkronisasi tabel dan database yang di-shard tidak didukung.

  • Metadata database MongoDB tidak dapat disinkronkan.

  • Tabel baru di database sumber tidak dapat ditangkap atau disinkronkan.

  • Data dan perubahan skema tabel dapat disinkronkan dari MongoDB ke sistem lain. Untuk informasi selengkapnya, lihat Sinkronisasi data dari tabel sumber MongoDB ke tabel Hologres.

Upsert Kafka

Tidak didukung

Didukung

N/A

StarRocks

Tidak didukung

Didukung

Dukungan terbatas pada StarRocks di Alibaba Cloud EMR.

Hologres

Tidak didukung

Didukung

Saat Hologres berfungsi sebagai sistem tujuan sinkronisasi data, sistem secara otomatis membuat koneksi untuk setiap tabel berdasarkan nilai opsi connectionSize. Anda dapat mengonfigurasi kolam koneksi yang sama untuk beberapa tabel dengan menggunakan opsi connectionPoolName.

Catatan

Jika tipe data di tabel sumber tidak didukung oleh fitur fixed plan Hologres, gunakan pernyataan INSERT INTO untuk sinkronisasi data. Jangan gunakan pernyataan CTAS karena memberikan performa penulisan yang lebih rendah akibat ketidakmampuan menggunakan fixed plan.

Streaming data lakehouse Paimon

Tidak didukung

Didukung

Sintaksis

CREATE TABLE IF NOT EXISTS <sink_table>
(
  [ <table_constraint> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (
  key1=val1,
  key2=val2, 
  ...
 )
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];

<sink_table>:
  [catalog_name.][db_name.]table_name

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<source_table>:
  [catalog_name.][db_name.]table_name

<column_component>:
  column_name AS computed_column_expression [COMMENT column_comment] [FIRST | AFTER column_name]

Pernyataan CTAS menggunakan sintaks dasar pernyataan CREATE TABLE. Tabel berikut menjelaskan beberapa argumen:

Argumen

Deskripsi

sink_table

Nama tabel target untuk sinkronisasi data. Secara opsional, gunakan nama lengkap tabel dengan menyertakan katalog dan database-nya.

COMMENT

Deskripsi tabel sink. Secara default, deskripsi source_table digunakan.

PARTITIONED BY

Menentukan kolom partisi.

Penting

Data tidak dapat disinkronkan ke tabel partisi StarRocks.

table_constraint

Primary key, yang merupakan pengenal unik untuk setiap catatan dalam tabel.

WITH

Opsi konektor untuk tabel sink. Untuk informasi selengkapnya, lihat bagian "Opsi konektor dalam klausa WITH" di Konektor Upsert Kafka, Konektor Hologres, StarRocks, atau Konektor Paimon.

Catatan

Baik kunci maupun nilai harus bertipe STRING, seperti 'jdbcWriteBatchSize' = '1024'.

source_table

Nama tabel sumber. Secara opsional, gunakan nama lengkap dengan menyertakan katalog dan database-nya.

OPTIONS

Opsi konektor untuk tabel sumber. Untuk informasi selengkapnya, lihat "Opsi konektor dalam klausa WITH" di Konektor MySQL dan Konektor Kafka.

Catatan

Baik kunci maupun nilai harus bertipe STRING, seperti 'server-id' = '65500'.

ADD COLUMN

Menambahkan kolom terhitung ke tabel sink atau mengganti nama kolom sumber.

Penting

Pemetaan kolom murni (misalnya, col AS new_col) dapat dioptimalkan. Untuk menjamin pemetaan tersebut, gunakan ekspresi tanpa komputasi, seperti col AS new_col + INTERVAL '0' SECOND.

column_component

Deskripsi kolom baru.

computed_column_expression

Deskripsi ekspresi kolom terhitung.

FIRST

Menentukan bahwa kolom baru digunakan sebagai bidang pertama di tabel sink. Secara default, kolom baru ditambahkan di akhir tabel sink.

AFTER

Menentukan bahwa kolom baru ditambahkan setelah bidang tertentu.

Catatan
  • Kata kunci IF NOT EXISTS wajib digunakan. Kata kunci ini meminta sistem untuk memeriksa keberadaan tabel sink di penyimpanan tujuan. Jika tidak ada, sistem akan membuat tabel sink. Jika sudah ada, pembuatan tabel dilewati.

  • Tabel sink yang dibuat berbagi skema tabel sumber, termasuk primary key dan nama serta tipe bidang fisik, tetapi tidak termasuk kolom terhitung, bidang metadata, dan konfigurasi watermark.

  • Realtime Compute for Apache Flink melakukan pemetaan tipe data dari tabel sumber ke tabel sink selama sinkronisasi data. Untuk informasi selengkapnya tentang pemetaan tipe data, lihat dokumen konektor spesifik.

Contoh

Menyinkronkan tabel

Deskripsi: Menyinkronkan tabel web_sales dari MySQL ke Hologres.

Prasyarat:

  • Katalog Hologres bernama holo telah dibuat.

  • Katalog MySQL bernama mysql telah dibuat.

Kode contoh:

Pernyataan CTAS sering digunakan bersama katalog sumber dan tujuan untuk mendukung sinkronisasi data penuh dan inkremental. Katalog sumber secara otomatis mengurai skema dan properti tabel sumber tanpa DDL eksplisit.

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS web_sales   -- Menyinkronkan data ke tabel web_sales di database default.
WITH ('jdbcWriteBatchSize' = '1024')   -- Secara opsional konfigurasikan opsi konektor untuk tabel sink.
AS TABLE mysql.tpcds.web_sales   
/*+ OPTIONS('server-id'='8001-8004') */; -- Secara opsional konfigurasikan opsi tambahan untuk tabel sumber CDC MySQL.

Menggabungkan dan menyinkronkan shard tabel dan database

Deskripsi: Menggabungkan tabel dan database MySQL yang di-shard sebelum menyinkronkan data ke tabel Hologres.

Metode: Gunakan katalog MySQL dan regular expressions untuk mencocokkan database dan tabel yang ingin disinkronkan.

Nama database dan tabel ditulis ke tabel sink sebagai nilai dua bidang tambahan. Primary key tabel sink terdiri dari nama database, nama tabel, dan primary key asli untuk memastikan keunikan primary key.

Kode dan hasil:

Kode contoh

Hasil

Menggabungkan dan menyinkronkan shard tabel:

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`  
/*+ OPTIONS('server-id'='8001-8004') */;

效果

Mengubah skema tabel sumber: Tambahkan kolom baru bernama age ke tabel user02 dan tambahkan catatan baru. Data dan perubahan skema pada tabel user02 disinkronkan ke tabel sink secara real time meskipun skema tabel sumber berbeda.

ALTER TABLE `user02` ADD COLUMN `age` INT;
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);

image

Menyinkronkan kolom terhitung kustom

Deskripsi: Selama sinkronisasi gabungan shard tabel dan database dari MySQL ke Hologres, tambahkan kolom terhitung kustom.

Kode dan hasil:

Kode contoh

Hasil

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN (
  `c_id` AS `id` + 10 AFTER `id`,
  `calss` AS 3  AFTER `id`
);

image

Menjalankan beberapa pernyataan CTAS dalam satu pekerjaan

Deskripsi: Menyinkronkan shard tabel web_sales dan user dari MySQL ke Hologres dalam satu pekerjaan.

Metode: Gunakan STATEMENT SET untuk mengeksekusi beberapa pernyataan CTAS sebagai satu kelompok. Pendekatan ini menggunakan kembali vertex sumber untuk membaca data dari beberapa tabel, sehingga mengurangi jumlah server ID, koneksi database, dan beban baca secara keseluruhan.

Penting

Kode contoh:

USE CATALOG holo;

BEGIN STATEMENT SET;

-- Menyinkronkan data dari tabel web_sales.
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;

-- Menyinkronkan data dari shard tabel user.
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

Menyinkronkan data dari sumber ke beberapa sink menggunakan beberapa pernyataan CTAS

  • Tidak ada kolom terhitung yang ditambahkan ke tabel sink

    USE CATALOG `holo`;
    
    BEGIN STATEMENT SET;
    
    -- Menyinkronkan data dari tabel MySQL user ke tabel user di database1 Hologres.
    CREATE TABLE IF NOT EXISTS `database1`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- Menyinkronkan data dari tabel database MySQL user ke tabel user di database2 Hologres.
    CREATE TABLE IF NOT EXISTS `database2`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;
  • Kolom terhitung ditambahkan ke tabel sink

    -- Buat tabel temporary bernama user_with_changed_id berdasarkan tabel sumber user. Definisikan kolom computed_id berdasarkan kolom id tabel sumber.
    CREATE TEMPORARY TABLE `user_with_changed_id` (
      `computed_id` AS `id` + 1000
    ) LIKE `mysql`.`tpcds`.`user`;
    
    -- Buat tabel temporary bernama user_with_changed_age berdasarkan tabel sumber user. Definisikan kolom computed_age berdasarkan kolom age tabel sumber.
    CREATE TEMPORARY TABLE `user_with_changed_age` (
      `computed_age` AS `age` + 1
    ) LIKE `mysql`.`tpcds`.`user`;
    
    BEGIN STATEMENT SET;
    
    -- Menyinkronkan data dari tabel database MySQL user ke tabel user_with_changed_id Hologres. Tabel user_with_changed_id berisi ID yang diperoleh dari perhitungan berdasarkan kolom id tabel sumber. ID tersebut berada di kolom computed_id.
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
    AS TABLE `user_with_changed_id`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- Menyinkronkan data dari tabel database MySQL user ke tabel user_with_changed_age Hologres. Tabel user_with_changed_age berisi nilai usia yang diperoleh dari perhitungan berdasarkan kolom age tabel sumber. Nilai usia tersebut berada di kolom computed_age.
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
    AS TABLE `user_with_changed_age`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;

Menyinkronkan tabel baru menggunakan beberapa pernyataan CTAS

Deskripsi skenario: Setelah pekerjaan yang menggunakan beberapa pernyataan CTAS untuk sinkronisasi dimulai, tambahkan pernyataan CTAS untuk mereplikasi tabel baru.

Metode: Aktifkan deteksi tabel baru untuk pekerjaan, tambahkan pernyataan CTAS ke kode SQL pekerjaan, dan restart dari titik simpan. Setelah tabel baru ditangkap, data akan direplikasi.

Batasan:

  • Deteksi tabel baru didukung untuk VVR 8.0.1 atau versi yang lebih baru.

  • Saat data disinkronkan dari tabel sumber CDC, hanya pekerjaan yang dimulai dalam mode initial yang dapat mendeteksi tabel baru.

  • Konfigurasi tabel sumber yang ditambahkan menggunakan pernyataan CTAS baru harus sama dengan konfigurasi tabel sumber asli. Dengan demikian, vertex sumber dapat digunakan kembali.

  • Parameter konfigurasi pekerjaan sebelum dan sesudah penambahan pernyataan CTAS harus sama. Misalnya, mode startup harus sama.

Prosedur:

  1. Pada halaman Deployments, temukan deployment target dan klik Cancel di kolom Actions.

  2. Pada dialog, perluas bagian More Strategies, pilih Stop With Savepoint, lalu klik OK.

  3. Pada draf SQL pekerjaan, aktifkan deteksi tabel baru dan tambahkan pernyataan CTAS.

    1. Tambahkan pernyataan berikut untuk mengaktifkan deteksi tabel baru.

      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
    2. Tambahkan pernyataan CTAS. Kode lengkap pekerjaan terlihat seperti berikut:

      -- Aktifkan deteksi tabel baru
      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
      
      USE CATALOG holo;
      
      BEGIN STATEMENT SET;
      
      -- Menyinkronkan data dari tabel web_sales.
      CREATE TABLE IF NOT EXISTS web_sales
      AS TABLE mysql.tpcds.web_sales
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- Menyinkronkan data dari shard tabel user.
      CREATE TABLE IF NOT EXISTS user
      AS TABLE mysql.`wp.*`.`user[0-9]+`
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- Menyinkronkan data dari tabel product.
      CREATE TABLE IF NOT EXISTS product
      AS TABLE mysql.tpcds.product
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      END;
    3. Klik Deploy.

  4. Pulihkan pekerjaan dari titik simpan.

    1. Pada halaman Deployments, klik nama deployment Anda.

    2. Pada halaman detail deployment, klik tab State. Lalu, klik subtab History.

    3. Pada daftar Savepoints, temukan titik simpan yang dibuat saat pekerjaan dibatalkan.

    4. Pilih More > Start job from this savepoint di kolom Actions. Untuk informasi selengkapnya, lihat Memulai penerapan Pekerjaan.

Menyinkronkan ke tabel partisi di Hologres

Deskripsi skenario: Mereplikasi data dari MySQL ke tabel partisi Hologres.

Catatan penggunaan: Jika primary key didefinisikan untuk tabel Hologres, kolom partisi harus disertakan dalam primary key.

Kode contoh:

Buat tabel MySQL:

CREATE TABLE orders (
    order_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    city VARCHAR(100) NOT NULL
    order_date DATE,
    purchaser INTEGER,
    PRIMARY KEY(order_id, product_id)
);

Bergantung pada apakah kolom partisi termasuk dalam primary key, pilih metode yang tepat:

  • Jika primary key sumber berisi kolom partisi:

    • Gunakan pernyataan CTAS secara langsung.

      Hologres akan secara otomatis memverifikasi apakah kolom partisi disertakan dalam primary key.

      CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
      PARTITIONED BY (product_id)
      AS TABLE `mysql`.`tpcds`.`orders`;
  • Jika primary key sumber tidak mencakup kolom partisi:

    • Deklarasikan primary key tabel sink dalam pernyataan CTAS dan sertakan kolom partisi dalam definisi primary key.

      Dalam kasus ini, tidak mendeklarasikan ulang primary key atau tidak menyertakan kolom partisi di dalamnya akan menyebabkan pekerjaan gagal.

      -- Deklarasikan bidang order_id, product_id, dan city sebagai primary key tabel partisi Hologres.
      CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`(
          CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED
      )
      PARTITIONED BY (city)
      AS TABLE `mysql`.`tpcds`.`orders`;

Melebarkan tipe data selama replikasi

Deskripsi skenario: Selama sinkronisasi data, ubah presisi kolom, misalnya dari VARCHAR(10) ke VARCHAR(20), atau ubah tipe data kolom, seperti dari SMALLINT ke INT.

Metode:

  • Pekerjaan baru: Aktifkan mode normalisasi tipe saat peluncuran pertama.

  • Pekerjaan yang sudah ada: Hapus tabel sink Hologres, lalu restart tanpa state untuk menerapkan normalisasi tipe.

Aturan normalisasi tipe:

Jika tipe data baru dan asli dinormalisasi ke tipe data yang sama, perubahan tipe data berhasil dan pekerjaan berjalan normal. Jika tidak, pengecualian akan dilaporkan. Rinciannya sebagai berikut:

  • TINYINT, SMALLINT, INT, dan BIGINT dikonversi menjadi BIGINT.

  • CHAR, VARCHAR, dan STRING dikonversi menjadi STRING.

  • FLOAT dan DOUBLE dikonversi menjadi DOUBLE.

  • Tipe data lain dikonversi berdasarkan pemetaan tipe data antara bidang Hologres dan Flink. Untuk informasi selengkapnya, lihat Pemetaan tipe data.

Kode contoh:

CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` 
WITH (
'connector' = 'hologres', 
'enableTypeNormalization' = 'true' -- Aktifkan mode normalisasi tipe.
) AS TABLE `mysql`.`tpcds`.`orders`;

Menyinkronkan data dari MongoDB ke Hologres

Batasan:

  • Dukungan terbatas pada VVR 8.0.6 atau versi yang lebih baru dan MongoDB versi 6.0 atau lebih baru.

  • Pada opsi konektor untuk tabel sumber, scan.incremental.snapshot.enabled dan scan.full-changelog harus diatur ke true.

  • Fitur preimage dan postimage harus diaktifkan untuk database MongoDB. Untuk informasi selengkapnya, lihat Document Preimages.

  • Untuk menyinkronkan data dari beberapa koleksi MongoDB dalam satu pekerjaan, pastikan konfigurasi opsi konektor berikut identik untuk semua tabel:

    • Opsi terkait database MongoDB, termasuk hosts, scheme, username, password, dan connectionOptions

    • scan.startup.mode

Kode contoh:

BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

END;

FAQ

Kesalahan runtime

Performa pekerjaan

Sinkronisasi data

Referensi