全部产品
Search
文档中心

Realtime Compute for Apache Flink:CREATE TABLE AS (CTAS)

更新时间:Jul 06, 2025

Pernyataan CREATE TABLE AS (CTAS) dapat digunakan untuk menyinkronkan data dan perubahan skema tabel dari sistem upstream ke downstream secara real-time. Pernyataan ini meningkatkan efisiensi pembuatan tabel di sistem tujuan serta menyinkronkan perubahan skema tabel. Topik ini menjelaskan cara menggunakan pernyataan CTAS beserta contoh-contoh dalam berbagai skenario.

Catatan

Disarankan untuk membuat pekerjaan menggunakan YAML untuk ingest data. Anda juga dapat mengonversi draf SQL yang ada yang berisi pernyataan CTAS atau CDAS menjadi draf YAML:

  • Pengenalan: Anda dapat mengembangkan pekerjaan dengan menggunakan YAML untuk menyinkronkan data dari sumber ke tujuan.

  • Keuntungan: Kemampuan utama dari pernyataan CTAS dan CDAS didukung, termasuk sinkronisasi basis data, tabel, skema tabel, dan kolom terhitung kustom. Selain itu, evolusi skema real-time, sinkronisasi data log biner mentah, klausa WHERE, dan pemangkasan kolom juga didukung.

Untuk informasi lebih lanjut, lihat Gunakan penerapan YAML untuk ingest data.

Fitur

Sinkronisasi Data

Fitur

Deskripsi

Menyinkronkan tabel

Menyinkronkan data lengkap dan data tambahan dari tabel sumber ke tabel sink secara real-time. (Contoh: Menyinkronkan tabel)

Menggabungkan dan menyinkronkan shard tabel

Menggunakan ekspresi reguler untuk mencocokkan nama basis data dan shard tabel. Kemudian, Anda dapat menggabungkan shard tabel ini dan menyinkronkan datanya ke tabel sink. (Contoh: Menggabungkan dan menyinkronkan shard tabel)

Catatan

Tanda caret (^) tidak dapat digunakan untuk mencocokkan awal nama tabel.

Menyinkronkan kolom terhitung kustom

Menambahkan kolom terhitung untuk mengonversi dan memproses kolom tertentu. Anda dapat menggunakan fungsi sistem atau fungsi yang ditentukan pengguna (UDF) untuk kolom terhitung dan menentukan posisi kolom terhitung yang ingin Anda tambahkan. Kolom terhitung baru akan digunakan sebagai kolom fisik di tabel sink, dan hasilnya disinkronkan ke tabel sink secara real-time. (Contoh: Menyinkronkan kolom terhitung kustom)

Menjalankan beberapa pernyataan CTAS

Evolusi Skema

Selama sinkronisasi data, pernyataan CTAS mendukung replikasi perubahan skema, termasuk pembuatan tabel dan modifikasi skema, dari tabel sumber ke tabel sink.

  • Perubahan skema yang didukung

    Perubahan skema

    Deskripsi

    Menambahkan kolom nullable

    Secara otomatis menambahkan kolom terkait ke akhir skema tabel sink, dengan data disinkronkan ke kolom yang ditambahkan. Kolom baru diatur sebagai kolom nullable secara default, dan data dalam kolom ini sebelum perubahan secara otomatis diatur ke NULL.

    Menambahkan kolom non-null

    Secara otomatis menambahkan kolom terkait ke akhir skema tabel sink dan menyinkronkan data.

    Menghapus kolom nullable

    Secara otomatis mengisi nilai null di kolom nullable tabel sink alih-alih menghapus kolom dari tabel.

    Mengganti nama kolom

    Operasi mengganti nama kolom melibatkan penambahan kolom dan penghapusan kolom. Setelah kolom diganti namanya di tabel sumber, kolom yang menggunakan nama baru ditambahkan ke akhir tabel sink dan kolom yang menggunakan nama asli diisi dengan nilai null.

    Catatan

    Sebagai contoh, jika nama kolom col_a di tabel sumber diubah menjadi col_b, kolom col_b ditambahkan ke akhir tabel sink dan kolom col_a secara otomatis diisi dengan nilai null.

    Memodifikasi tipe data kolom

    • Untuk sistem downstream yang mendukung modifikasi tipe kolom: Saat ini, Paimon adalah satu-satunya sistem downstream yang mendukung perubahan tipe. Pernyataan CTAS mendukung modifikasi tipe kolom biasa, seperti dari INT ke BIGINT.

      Kompatibilitas bergantung pada aturan spesifik sistem (lihat dokumentasi konektor).

    • Untuk sistem downstream yang tidak mendukung modifikasi tipe kolom: Saat ini, hanya Hologres yang mendukung penggunaan pelebaran tipe untuk menangani perubahan tipe kolom. Berikut mekanismenya: Saat pekerjaan dimulai, tabel Hologres dengan tipe data yang lebih lebar dibuat, dan perubahan tipe kolom didukung berdasarkan kompatibilitas sink. Untuk informasi lebih lanjut, lihat Contoh: Menyinkronkan data dalam mode normalisasi tipe.

      Penting

      Untuk mengaktifkan Hologres mendukung pelebaran tipe, aktifkan mode normalisasi tipe selama peluncuran pekerjaan pertama. Untuk pekerjaan yang sudah ada, hapus tabel Hologres dan mulai ulang tanpa status untuk menerapkan pengaturan normalisasi tipe.

    Penting

    Saat pernyataan CTAS digunakan untuk sinkronisasi, sistem hanya membandingkan perbedaan skema dan tidak mengidentifikasi jenis DDL tertentu. Sebagai contoh:

    • Jika sebuah kolom dihapus dan kemudian ditambahkan lagi tanpa perubahan data selama periode ini, sistem mendeteksi tidak ada perubahan skema.

    • Jika sebuah kolom dihapus dan kemudian ditambahkan lagi dengan perubahan data selama periode ini, sistem mendeteksi dan menyinkronkan perubahan skema.

  • Perubahan skema yang tidak didukung

    • Memodifikasi batasan, seperti kunci utama atau indeks.

    • Menghapus kolom non-nullable.

    • Mengubah dari NOT NULL menjadi NULLABLE.

    Penting

    Untuk menyinkronkan perubahan skema yang tidak didukung, hapus tabel sink secara manual dan mulai ulang pekerjaan Anda untuk menyinkronkan ulang data historis.

Proses Sinkronisasi

Bagan alir berikut menunjukkan proses menyinkronkan data dari MySQL ke Hologres dengan pernyataan CTAS.

Bagan Alir

Deskripsi

Saat mengeksekusi pernyataan CTAS, Realtime Compute for Apache Flink melakukan hal berikut:

  1. Memverifikasi keberadaan tabel sink di sistem tujuan.

    • Jika tidak ada, Realtime Compute for Apache Flink membuat tabel sink yang mencerminkan skema tabel sumber menggunakan katalog penyimpanan tujuan.

    • Jika ada, Realtime Compute for Apache Flink melewati pembuatan tabel dan memverifikasi konsistensi skema tabel sink dengan skema tabel sumber. Jika skemanya berbeda, kesalahan dilaporkan.

  2. Mengirim dan menjalankan pekerjaan sinkronisasi data.

    Realtime Compute for Apache Flink menyinkronkan data dan perubahan skema dari sumber ke sink.

Prasyarat

Katalog penyimpanan tujuan harus dibuat di ruang kerja Anda. Untuk informasi lebih lanjut, lihat Kelola katalog.

Batasan

Batasan sintaksis

  • Debugging draf SQL yang berisi pernyataan CTAS tidak didukung.

  • Pernyataan CTAS tidak dapat digunakan bersamaan dengan pernyataan INSERT INTO dalam draf SQL yang sama.

  • Data tidak dapat disinkronkan ke tabel partisi StarRocks.

  • MiniBatch tidak didukung.

    Penting

Sistem upstream dan downstream yang didukung

Tabel berikut menjelaskan penyimpanan data upstream dan downstream yang dapat digunakan dengan pernyataan CTAS.

Konektor

Tabel Sumber

Tabel Sink

Catatan

MySQL

Didukung

Tidak Didukung

  • Nama basis data dan tabel disinkronkan secara otomatis selama penggabungan dan sinkronisasi tabel dan basis data sharded.

  • Selama sinkronisasi tabel tunggal, untuk menyinkronkan nama basis data dan tabel Anda, buat katalog MySQL melalui Flink SQL dan sertakan opsi catalog.table.metadata-columns. Untuk informasi lebih lanjut, lihat Kelola katalog MySQL.

  • Tampilan tidak dapat disinkronkan.

Kafka Konektor

Didukung

Tidak Didukung

N/A

MongoDB

Didukung

Tidak Didukung

  • Penggabungan dan sinkronisasi tabel dan basis data sharded tidak didukung.

  • Metadata basis data MongoDB tidak dapat disinkronkan.

  • Tabel baru di basis data sumber tidak dapat ditangkap atau disinkronkan.

  • Data dan perubahan skema tabel dapat disinkronkan dari MongoDB ke sistem lain. Untuk informasi lebih lanjut, lihat Sinkronkan data dari tabel sumber MongoDB ke tabel Hologres.

Upsert Kafka

Tidak Didukung

Didukung

N/A

StarRocks Konektor

Tidak Didukung

Didukung

Dukungan terbatas pada StarRocks di Alibaba Cloud EMR.

Hologres Konektor

Tidak Didukung

Didukung

Saat Hologres bertindak sebagai sistem tujuan sinkronisasi data, sistem secara otomatis membuat koneksi untuk setiap tabel berdasarkan nilai opsi connectionSize. Anda dapat mengonfigurasi kumpulan koneksi yang sama untuk beberapa tabel dengan menggunakan opsi connectionPoolName.

Catatan

Jika tipe data di tabel sumber tidak didukung oleh fitur rencana tetap Hologres, gunakan pernyataan INSERT INTO untuk sinkronisasi data. Jangan gunakan pernyataan CTAS, yang memberikan kinerja penulisan lebih rendah karena rencana tetap tidak dapat digunakan.

Paimon Konektor

Tidak Didukung

Didukung

Sintaksis

CREATE TABLE IF NOT EXISTS <sink_table>
(
  [ <table_constraint> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (
  key1=val1,
  key2=val2, 
  ...
 )
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];

<sink_table>:
  [catalog_name.][db_name.]table_name

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<source_table>:
  [catalog_name.][db_name.]table_name

<column_component>:
  column_name AS computed_column_expression [COMMENT column_comment] [FIRST | AFTER column_name]

Pernyataan CTAS menggunakan sintaksis dasar dari pernyataan CREATE TABLE. Tabel berikut menjelaskan beberapa argumen:

Argumen

Deskripsi

sink_table

Nama tabel tujuan untuk sinkronisasi data. Secara opsional, gunakan nama tabel yang sepenuhnya memenuhi syarat dengan menyertakan katalog dan basis datanya.

COMMENT

Deskripsi tabel sink. Secara default, deskripsi dari source_table digunakan.

PARTITIONED BY

Menentukan kolom partisi.

Penting

Data tidak dapat disinkronkan ke tabel partisi StarRocks.

table_constraint

Kunci utama, yang merupakan pengenal unik untuk setiap catatan dalam tabel.

WITH

Opsi konektor untuk tabel sink. Untuk informasi lebih lanjut, lihat bagian "Opsi Konektor dalam Klausul WITH" di Upsert Kafka Konektor, Hologres Konektor, StarRocks Konektor, atau Paimon Konektor.

Catatan

Baik kunci maupun nilai harus bertipe STRING, seperti 'jdbcWriteBatchSize' = '1024'.

source_table

Nama tabel sumber. Secara opsional, gunakan nama yang sepenuhnya memenuhi syarat termasuk katalog dan basis data tabel.

OPTIONS

Opsi konektor untuk tabel sumber. Untuk informasi lebih lanjut, lihat "Opsi Konektor dalam Klausul WITH" di MySQL Konektor dan Kafka Konektor.

Catatan

Baik kunci maupun nilai harus bertipe STRING, seperti 'server-id' = '65500'.

ADD COLUMN

Menambahkan kolom ke tabel sink selama sinkronisasi data. Hanya kolom terhitung yang didukung.

column_component

Deskripsi kolom baru.

computed_column_expression

Deskripsi ekspresi kolom terhitung.

FIRST

Menentukan bahwa kolom baru digunakan sebagai bidang pertama di tabel sink. Secara default, kolom baru ditambahkan di akhir tabel sink.

AFTER

Menentukan bahwa kolom baru ditambahkan setelah bidang tertentu.

Catatan
  • Kata kunci IF NOT EXISTS diperlukan. Ini meminta sistem untuk memeriksa keberadaan tabel sink di penyimpanan tujuan. Jika tidak ada, sistem akan membuat tabel sink. Jika ada, pembuatan tabel dilewati.

  • Tabel sink yang dibuat menggunakan skema tabel sumber, termasuk kunci utama dan nama serta tipe bidang fisik, tetapi mengecualikan kolom terhitung, bidang metadata, dan konfigurasi watermark.

  • Realtime Compute for Apache Flink melakukan pemetaan tipe data dari tabel sumber ke tabel sink selama sinkronisasi data. Untuk informasi lebih lanjut tentang pemetaan tipe data, lihat dokumen konektor spesifik.

Contoh

Menyinkronkan Tabel

Deskripsi: Sinkronkan tabel web_sales dari MySQL ke Hologres.

Prasyarat:

  • Katalog Hologres bernama holo telah dibuat.

  • Katalog MySQL bernama mysql telah dibuat.

Kode contoh:

Pernyataan CTAS sering digunakan bersama dengan katalog sumber dan tujuan untuk mendukung sinkronisasi data lengkap dan tambahan. Katalog sumber secara otomatis mengurai skema tabel sumber dan properti tanpa DDL eksplisit.

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS web_sales   -- Sinkronkan data ke tabel web_sales di basis data default.
WITH ('jdbcWriteBatchSize' = '1024')   -- Secara opsional konfigurasikan opsi konektor untuk tabel sink.
AS TABLE mysql.tpcds.web_sales   
/*+ OPTIONS('server-id'='8001-8004') */; -- Secara opsional konfigurasikan opsi tambahan untuk tabel sumber CDC MySQL.

Menggabungkan dan Menyinkronkan Shard Tabel dan Basis Data

Deskripsi: Gabungkan tabel dan basis data sharded MySQL sebelum menyinkronkan data ke tabel Hologres.

Metode: Gunakan katalog MySQL dan ekspresi reguler untuk mencocokkan basis data dan tabel yang ingin Anda sinkronkan.

Nama basis data dan tabel ditulis ke tabel sink sebagai nilai dua bidang tambahan. Kunci utama tabel sink terdiri dari nama basis data, nama tabel, dan kunci utama asli untuk memastikan bahwa kunci utamanya unik.

Kode dan hasil:

Kode contoh

Hasil

Menggabungkan dan menyinkronkan shard tabel:

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`  
/*+ OPTIONS('server-id'='8001-8004') */;

效果

Mengubah skema tabel sumber: Tambahkan kolom baru bernama age ke tabel user02 dan rekaman baru. Perubahan data dan skema pada tabel user02 disinkronkan ke tabel sink secara real-time meskipun skema tabel sumber berbeda.

ALTER TABLE `user02` ADD COLUMN `age` INT;
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);

image

Menyinkronkan Kolom Terhitung Kustom

Deskripsi: Selama sinkronisasi tabel dan basis data sharded gabungan dari MySQL ke Hologres, tambahkan kolom terhitung kustom.

Kode dan hasil:

Kode contoh

Hasil

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN (
  `c_id` AS `id` + 10 AFTER `id`,
  `calss` AS 3  AFTER `id`
);

image

Menjalankan beberapa pernyataan CTAS dalam satu pekerjaan

Deskripsi: Sinkronkan tabel web_sales dan user dari MySQL ke Hologres dalam satu pekerjaan.

Metode: Gunakan STATEMENT SET untuk menjalankan beberapa pernyataan CTAS sebagai grup. Pendekatan ini menggunakan ulang simpul sumber untuk membaca data dari beberapa tabel, mengurangi jumlah ID server, koneksi basis data, dan beban baca keseluruhan.

Penting

Kode contoh:

USE CATALOG holo;

BEGIN STATEMENT SET;

-- Sinkronkan data dari tabel web_sales.
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;

-- Sinkronkan data dari shard tabel user.
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

Menyinkronkan data dari sumber ke beberapa sink menggunakan beberapa pernyataan CTAS

  • Tidak ada kolom terhitung yang ditambahkan ke tabel sink

    USE CATALOG `holo`;
    
    BEGIN STATEMENT SET;
    
    -- Sinkronkan data dari tabel user MySQL ke tabel user di database1 Hologres.
    CREATE TABLE IF NOT EXISTS `database1`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- Sinkronkan data dari tabel user basis data MySQL ke tabel user di database2 Hologres.
    CREATE TABLE IF NOT EXISTS `database2`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;
  • Kolom terhitung ditambahkan ke tabel sink

    -- Buat tabel sementara bernama user_with_changed_id berdasarkan tabel sumber user. Tentukan kolom computed_id berdasarkan kolom id tabel sumber.
    CREATE TEMPORARY TABLE `user_with_changed_id` (
      `computed_id` AS `id` + 1000
    ) LIKE `mysql`.`tpcds`.`user`;
    
    -- Buat tabel sementara bernama user_with_changed_age berdasarkan tabel sumber user. Tentukan kolom computed_age berdasarkan kolom age tabel sumber.
    CREATE TEMPORARY TABLE `user_with_changed_age` (
      `computed_age` AS `age` + 1
    ) LIKE `mysql`.`tpcds`.`user`;
    
    BEGIN STATEMENT SET;
    
    -- Sinkronkan data dari tabel user basis data MySQL ke tabel user_with_changed_id Hologres. Tabel user_with_changed_id berisi ID yang diperoleh dari perhitungan berdasarkan kolom id tabel sumber. ID yang diperoleh berada di kolom computed_id.
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
    AS TABLE `user_with_changed_id`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- Sinkronkan data dari tabel user basis data MySQL ke tabel user_with_changed_age Hologres. Tabel user_with_changed_age berisi nilai usia yang diperoleh dari perhitungan berdasarkan kolom age tabel sumber. Nilai usia yang diperoleh berada di kolom computed_age.
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
    AS TABLE `user_with_changed_age`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;

Menyinkronkan tabel baru menggunakan beberapa pernyataan CTAS

Deskripsi skenario: Setelah pekerjaan yang menggunakan beberapa pernyataan CTAS untuk sinkronisasi dimulai, tambahkan pernyataan CTAS untuk mereplikasi tabel baru.

Metode: Aktifkan deteksi tabel baru untuk pekerjaan, tambahkan pernyataan CTAS ke kode SQL pekerjaan, dan mulai ulang dari titik simpanan. Setelah tabel baru ditangkap, data akan direplikasi.

Batasan:

  • Deteksi tabel baru didukung untuk VVR 8.0.1 atau lebih baru.

  • Saat data disinkronkan dari tabel sumber CDC, hanya pekerjaan yang dimulai dalam mode awal yang dapat mendeteksi tabel baru.

  • Konfigurasi tabel sumber yang ditambahkan menggunakan pernyataan CTAS baru harus sama dengan konfigurasi tabel sumber asli. Dengan cara ini, simpul sumber dapat digunakan kembali.

  • Parameter konfigurasi pekerjaan sebelum dan sesudah menambahkan pernyataan CTAS harus sama. Misalnya, mode startup harus sama.

Prosedur:

  1. Di halaman Deployments, temukan penyebaran target dan klik Cancel di kolom Actions.

  2. Di dialog, perluas bagian More Strategies, pilih Stop With Savepoint, dan klik OK.

  3. Dalam draf SQL pekerjaan, aktifkan deteksi tabel baru dan tambahkan pernyataan CTAS.

    1. Tambahkan pernyataan berikut untuk mengaktifkan deteksi tabel baru.

      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
    2. Tambahkan pernyataan CTAS. Kode lengkap pekerjaan terlihat seperti ini:

      -- Aktifkan deteksi tabel baru
      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
      
      USE CATALOG holo;
      
      BEGIN STATEMENT SET;
      
      -- Sinkronkan data dari tabel web_sales.
      CREATE TABLE IF NOT EXISTS web_sales
      AS TABLE mysql.tpcds.web_sales
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- Sinkronkan data dari shard tabel user.
      CREATE TABLE IF NOT EXISTS user
      AS TABLE mysql.`wp.*`.`user[0-9]+`
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- Sinkronkan data dari tabel product.
      CREATE TABLE IF NOT EXISTS product
      AS TABLE mysql.tpcds.product
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      END;
    3. Klik Deploy.

  4. Pulihkan pekerjaan dari titik simpanan.

    1. Di halaman Deployments, klik nama deployment Anda.

    2. Di halaman detail penyebaran, klik tab State. Lalu, klik subtab History.

    3. Dalam daftar Savepoints, temukan titik simpanan yang dibuat saat pekerjaan dibatalkan.

    4. Pilih More > Start job from this savepoint di kolom Actions. Untuk informasi lebih lanjut, lihat Mulai Penyebaran.

Menyinkronkan ke tabel partisi di Hologres

Deskripsi skenario: Replikasi data dari MySQL ke tabel partisi Hologres.

Catatan Penggunaan: Jika kunci utama didefinisikan untuk tabel Hologres, kolom partisi harus disertakan dalam kunci utama.

Kode contoh:

Buat tabel MySQL:

CREATE TABLE orders (
    order_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    city VARCHAR(100) NOT NULL
    order_date DATE,
    purchaser INTEGER,
    PRIMARY KEY(order_id, product_id)
);

Bergantung pada apakah kolom partisi merupakan bagian dari kunci utama, pilih metode yang tepat:

  • Jika kunci utama sumber mencakup kolom partisi:

    • Gunakan pernyataan CTAS langsung.

      Hologres akan secara otomatis memverifikasi apakah kolom partisi disertakan dalam kunci utama.

      CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
      PARTITIONED BY (product_id)
      AS TABLE `mysql`.`tpcds`.`orders`;
  • Jika kunci utama sumber tidak mencakup kolom partisi:

    • Nyatakan kunci utama tabel sink dalam pernyataan CTAS dan sertakan kolom partisi dalam definisi kunci utama.

      Dalam hal ini, tidak mendefinisikan ulang kunci utama atau menyertakan kolom partisi di dalamnya akan menyebabkan pekerjaan gagal.

      -- Nyatakan bidang order_id, product_id, dan city sebagai kunci utama tabel partisi Hologres.
      CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`(
          CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED
      )
      PARTITIONED BY (city)
      AS TABLE `mysql`.`tpcds`.`orders`;

Perlebar tipe data selama replikasi

Deskripsi skenario: Selama sinkronisasi data, ubah presisi kolom, seperti dari VARCHAR(10) menjadi VARCHAR(20), atau ubah tipe data kolom, seperti dari SMALLINT menjadi INT.

Metode:

  • Pekerjaan baru: Aktifkan mode normalisasi tipe saat peluncuran pertama.

  • Pekerjaan yang sudah ada: Hapus tabel sink Hologres, dan mulai ulang tanpa status untuk menerapkan normalisasi tipe.

Aturan normalisasi tipe:

Jika tipe data baru dan asli dinormalisasi menjadi tipe data yang sama, tipe data dapat berhasil diubah dan pekerjaan akan berjalan normal. Jika tidak, pengecualian akan dilaporkan. Detailnya adalah sebagai berikut:

  • TINYINT, SMALLINT, INT, dan BIGINT dikonversi menjadi BIGINT.

  • CHAR, VARCHAR, dan STRING dikonversi menjadi STRING.

  • FLOAT dan DOUBLE dikonversi menjadi DOUBLE.

  • Tipe data lainnya dikonversi berdasarkan pemetaan tipe data antara bidang Hologres dan Flink. Untuk informasi lebih lanjut, lihat Pemetaan Tipe Data.

Kode contoh:

CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` 
WITH (
'connector' = 'hologres', 
'enableTypeNormalization' = 'true' -- Aktifkan mode normalisasi tipe.
) AS TABLE `mysql`.`tpcds`.`orders`;

Menyinkronkan data dari MongoDB ke Hologres

Batasan:

  • Dukungan terbatas pada VVR 8.0.6 atau lebih baru dan versi MongoDB 6.0 atau lebih baru.

  • Dalam opsi konektor untuk tabel sumber, scan.incremental.snapshot.enabled dan scan.full-changelog harus diatur ke true.

  • Fitur preimage dan postimage harus diaktifkan untuk basis data MongoDB. Untuk informasi lebih lanjut, lihat Dokumen Preimages.

  • Untuk menyinkronkan data dari beberapa koleksi MongoDB dalam satu pekerjaan, pastikan konfigurasi opsi konektor berikut identik untuk semua tabel:

    • Opsi terkait basis data MongoDB, termasuk hosts, scheme, username, password, dan connectionOptions.

    • scan.startup.mode.

Kode contoh:

BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

END;

FAQ

Kesalahan Runtime

Kinerja Pekerjaan

Sinkronisasi Data

Referensi