全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor

更新时间:Nov 10, 2025

Topik ini menjawab pertanyaan umum (FAQ) mengenai konektor untuk Realtime Compute for Apache Flink.

Bagaimana Flink mendapatkan data JSON dari Kafka?

  • Untuk mengambil data JSON biasa, lihat Format JSON.

  • Untuk mengambil data JSON bersarang, Anda dapat mendefinisikan objek JSON menggunakan format ROW dalam DDL tabel sumber. Dalam DDL tabel sink, Anda dapat mendefinisikan kunci yang sesuai dengan data JSON yang ingin Anda ambil. Dalam pernyataan DML, Anda dapat menentukan cara mengambil kunci tersebut. Hal ini memungkinkan Anda mengambil nilai kunci bersarang yang sesuai. Kode berikut memberikan contoh:

    • Data contoh

      {
          "a":"abc",
          "b":1,
          "c":{
              "e":["1","2","3","4"],
              "f":{"m":"567"}
          }
      }
    • DDL tabel sumber

      CREATE TEMPORARY TABLE `kafka_table` (
        `a` VARCHAR,
         b int,
        `c` ROW<e ARRAY<VARCHAR>,f ROW<m VARCHAR>>  -- c adalah objek JSON yang sesuai dengan ROW di Flink. e adalah daftar JSON yang sesuai dengan ARRAY.
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'xxx',
        'properties.bootstrap.servers' = 'xxx',
        'properties.group.id' = 'xxx',
        'format' = 'json',
        'scan.startup.mode' = 'xxx'
      );
    • DDL tabel sink

      CREATE TEMPORARY TABLE `sink` (
       `a` VARCHAR,
        b INT,
        e VARCHAR,
        `m` varchar
      ) WITH (
        'connector' = 'print',
        'logger' = 'true'
      );
    • Pernyataan DML

      INSERT INTO `sink`
        SELECT 
        `a`,
        b,
        c.e[1], -- Flink melakukan traversal array mulai dari indeks 1. Contoh ini mengambil elemen pada indeks 1 array. Untuk mengambil seluruh array, hapus [1].
        c.f.m
      FROM `kafka_table`;
    • Hasil pengujian测试结果

Apa yang harus saya lakukan jika Flink memiliki konektivitas jaringan dengan Kafka tetapi tidak dapat mengonsumsi atau menulis data?

  • Penyebab

    Jika proxy, pemetaan port, atau mekanisme penerusan lain digunakan antara Flink dan Kafka, alamat jaringan yang ditarik oleh klien Kafka dari server Kafka adalah alamat server Kafka, bukan alamat proxy. Dalam kasus ini, Flink tidak dapat mengonsumsi atau menulis data meskipun terdapat konektivitas jaringan antara Flink dan Kafka.

    Koneksi antara Flink dan klien Kafka (konektor Kafka Flink) dibentuk dalam dua langkah:

    1. Klien Kafka menarik metadata dari server Kafka (broker). Metadata mencakup alamat jaringan semua broker di server Kafka.

    2. Flink menggunakan klien Kafka untuk menarik alamat jaringan dari server Kafka guna mengonsumsi atau menulis data.

  • Pemecahan Masalah

    Anda dapat melakukan langkah-langkah berikut untuk memeriksa apakah proxy, pemetaan port, atau mekanisme penerusan lain digunakan antara Flink dan Kafka:

    1. Gunakan antarmuka baris perintah ZooKeeper (zkCli.sh atau zookeeper-shell.sh) untuk masuk ke kluster ZooKeeper yang digunakan oleh kluster Kafka Anda.

    2. Jalankan perintah yang sesuai untuk kluster Anda guna mengambil metadata broker Kafka Anda.

      Anda biasanya dapat menggunakan perintah get /brokers/ids/0 untuk mengambil metadata broker Kafka. Alamat koneksi Kafka berada di field endpoints.endpoint

    3. Gunakan perintah seperti ping atau telnet untuk menguji konektivitas jaringan antara Flink dan endpoint yang ditampilkan dalam metadata.

      Jika koneksi gagal, berarti proxy, pemetaan port, atau mekanisme penerusan lain digunakan antara Flink dan Kafka.

  • Solusi

    • Jangan gunakan proxy, pemetaan port, atau mekanisme penerusan lain. Bangun koneksi jaringan langsung antara Flink dan Kafka sehingga Flink dapat langsung terhubung ke endpoint yang ditampilkan dalam metadata Kafka.

    • Hubungi insinyur operasi dan pemeliharaan (O&M) Kafka untuk menambahkan alamat penerusan ke parameter advertised.listeners pada broker Kafka. Hal ini memungkinkan klien Kafka menarik metadata server Kafka yang mencakup alamat penerusan.

      Catatan

      Hanya Kafka versi 0.10.2.0 dan yang lebih baru yang mendukung penambahan alamat proxy ke listener broker Kafka.

    Untuk informasi lebih lanjut mengenai prinsip masalah ini, lihat KIP-103: Separation of Internal and External traffic dan Penjelasan Masalah Konektivitas Kafka.

Mengapa tabel sumber Kafka tidak menghasilkan data setelah jendela berbasis waktu peristiwa?

  • Detail

    Tabel sumber Kafka tidak menghasilkan data setelah jendela berbasis waktu peristiwa.

  • Penyebab

    Jika partisi di Kafka tidak memiliki data, pembuatan watermark terpengaruh. Akibatnya, tabel sumber Kafka tidak dapat menghasilkan data setelah jendela berbasis waktu peristiwa.

  • Solusi

    1. Pastikan semua partisi berisi data.

    2. Anda dapat mengaktifkan fitur deteksi sumber menganggur dengan menambahkan kode berikut di bagian Additional Configurations dan menyimpannya. Untuk informasi lebih lanjut, lihat Bagaimana cara mengonfigurasi parameter runtime pekerjaan kustom?.

      table.exec.source.idle-timeout: 5

      Untuk informasi lebih lanjut mengenai parameter table.exec.source.idle-timeout, lihat Konfigurasi.

Apa tujuan dari offset commit di Kafka?

Offset commit Kafka digunakan untuk memastikan konsistensi dan keandalan pemrosesan data streaming. Dengan mencatat posisi data yang telah diproses, offset commit mencegah duplikasi atau kehilangan data. Offset commit biasanya digunakan bersama antrian pesan seperti Kafka untuk mengontrol progres konsumsi data dalam pemrosesan data streaming. Flink hanya melakukan commit offset bacaan saat ini ke Kafka ketika checkpoint berhasil. Hal ini mencatat posisi data yang telah diproses. Jika checkpoint tidak diaktifkan atau interval checkpoint terlalu lama, offset bacaan saat ini mungkin tidak ditemukan di sisi Kafka. Hal ini dapat menyebabkan duplikasi atau kehilangan data.

Bagaimana cara mengurai data JSON bersarang menggunakan konektor Kafka?

Sebagai contoh, jika Anda mengurai data JSON berikut secara langsung menggunakan format JSON, data tersebut diurai menjadi field ARRAY<ROW<cola VARCHAR, colb VARCHAR>>. Field ini merupakan array tipe ROW, dan tipe ROW berisi dua field VARCHAR. Kemudian, Anda dapat mengurai data tersebut menggunakan fungsi bernilai tabel yang ditentukan pengguna (UDTF).

{"data":[{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"},{"cola":"test1","colb":"test2"}]}

Bagaimana cara menghubungkan ke kluster Kafka yang telah dikonfigurasi informasi keamanannya?

  1. Dalam parameter WITH DDL Kafka, Anda dapat menambahkan konfigurasi keamanan terkait enkripsi dan autentikasi. Untuk informasi lebih lanjut mengenai konfigurasi keamanan, lihat KEAMANAN. Kode berikut memberikan contoh.

    Penting

    Anda harus menambahkan awalan properties. ke konfigurasi keamanan.

    • Contoh berikut menunjukkan cara mengonfigurasi tabel Kafka untuk menggunakan PLAIN sebagai mekanisme SASL dan menyediakan konfigurasi JAAS.

      CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
      ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'PLAIN',
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
      );
    • Contoh berikut menunjukkan cara menggunakan SASL_SSL sebagai protokol keamanan dan SCRAM-SHA-256 sebagai mekanisme SASL.

      CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
      ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_SSL',
        /* Konfigurasi SSL */
        /* Konfigurasikan path truststore (sertifikat CA) yang disediakan oleh server. */
        'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
        'properties.ssl.truststore.password' = 'test1234',
        /* Jika autentikasi klien diperlukan, konfigurasikan path keystore (kunci privat). */
        'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
        'properties.ssl.keystore.password' = 'test1234',
        /* Konfigurasi SASL */
        /* Konfigurasikan mekanisme SASL sebagai SCRAM-SHA-256. */
        'properties.sasl.mechanism' = 'SCRAM-SHA-256',
        /* Konfigurasikan JAAS. */
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
      );
      Catatan
      • Jika properties.sasl.mechanism adalah SCRAM-SHA-256, gunakan org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule untuk properties.sasl.jaas.config.

      • Jika properties.sasl.mechanism adalah PLAINTEXT, gunakan org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule untuk properties.sasl.jaas.config.

  2. Di bagian Additional Dependencies untuk pekerjaan, Anda dapat mengunggah file yang diperlukan, seperti sertifikat, kunci publik, atau kunci privat.

    Setelah file diunggah, file tersebut disimpan di direktori /flink/usrlib. Untuk informasi lebih lanjut mengenai cara mengunggah file di bagian Dependensi Tambahan, lihat Men-deploy pekerjaan.

    Penting

    Jika mekanisme autentikasi untuk nama pengguna dan kata sandi pada broker Kafka Anda adalah SASL_SSL, tetapi mekanisme autentikasi pada klien adalah SASL_PLAINTEXT, pengecualian OutOfMemory dilaporkan selama validasi pekerjaan. Dalam kasus ini, Anda harus memodifikasi mekanisme autentikasi klien.

Bagaimana cara menyelesaikan konflik penamaan field?

  • Gejala

    Pesan dari sumber data Kafka diserialisasi menjadi dua string JSON. Dalam kasus ini, baik kunci maupun nilai berisi field dengan nama yang sama, misalnya field id dalam kode contoh. Jika Anda mengurai data ini langsung ke tabel Flink untuk diproses, terjadi konflik penamaan field.

    • kunci

      {
         "id": 1
      }

    • nilai

      {
         "id": 100,
         "name": "flink"
      }
  • Solusi

    Anda dapat menghindari masalah ini menggunakan properti key.fields-prefix. Pernyataan SQL berikut mendefinisikan tabel Flink.

    CREATE TABLE kafka_table (
      -- Definisikan field di kunci dan nilai di sini.
      key_id INT,
      value_id INT,
      name STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_topic',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'json',
      'json.ignore-parse-errors' = 'true',
      -- Tentukan field di kunci dan tipe datanya yang sesuai.
      'key.format' = 'json',
      'key.fields' = 'id',
      'value.format' = 'json',
      'value.fields' = 'id, name',
      -- Tetapkan awalan untuk field di kunci.
      'key.fields-prefix' = 'key_'
    );

    Saat Anda membuat tabel Flink, properti key.fields-prefix ditentukan sebagai key_. Artinya, saat Flink memproses data dari Kafka, field di kunci, yaitu field id dalam kasus ini, diberi awalan key_. Oleh karena itu, nama field di tabel Flink menjadi key_id, yang membedakannya dari value_id.

    Jika Anda menjalankan kueri SELECT * FROM kafka_table; , output berikut dikembalikan.

    key_id: 1,
    value_id: 100,
    name: flink

Apa yang harus saya lakukan jika membaca dari tabel sumber Kafka menunjukkan latensi bisnis yang tidak terduga?

  • Detail

    Saat Anda membaca data dari tabel sumber Kafka, currentEmitEventTimeLag lebih dari 50 tahun, seperti yang ditunjukkan pada gambar berikut.延迟

  • Pemecahan Masalah

    1. Pertama, Anda perlu menentukan apakah pekerjaan tersebut merupakan pekerjaan JAR atau pekerjaan SQL.

      Jika pekerjaan tersebut adalah pekerjaan JAR, Anda juga perlu mengonfirmasi apakah dependensi Kafka yang digunakan oleh Pom telah dibangun ke dalam Realtime Compute for Apache Flink. Dependensi open-source tidak melaporkan kurva.

    2. Tentukan apakah data real-time masuk ke semua partisi topik Kafka hulu.

    3. Tentukan apakah timestamp metadata pada pesan Kafka adalah 0 atau null.

      Latensi sumber Kafka dihitung dengan mengurangkan timestamp pada pesan Kafka dari waktu saat ini. Jika pesan tidak memiliki timestamp, latensi ditampilkan lebih dari 50 tahun. Anda dapat menggunakan metode berikut untuk menentukan timestamp:

      • Untuk pekerjaan SQL, Anda dapat mengambil timestamp pesan dengan mendefinisikan kolom metadata. Untuk informasi lebih lanjut, lihat Tabel sumber Kafka.

        CREATE TEMPORARY TABLE sk_flink_src_user_praise_rt (
            `timestamp` BIGINT ,
            `timestamp` TIMESTAMP METADATA,  -- Timestamp metadata.
            ts as to_timestamp (
              from_unixtime (`timestamp`, 'yyyy-MM-dd HH:mm:ss')
            ),
            watermark for ts as ts - interval '5' second
          ) WITH (
            'connector' = 'kafka',
            'topic' = '',
            'properties.bootstrap.servers' = '',
            'properties.group.id' = '',
            'format' = 'json',
            'scan.startup.mode' = 'latest-offset',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true'
          );
      • Anda dapat menulis program Java sederhana untuk membaca pesan dengan KafkaConsumer untuk pengujian.

Kesalahan: Tabel 'upsert-kafka' memerlukan PRIMARY KEY

  • Rincian

    image.png

  • Penyebab

    Kunci primer tidak ditentukan saat DDL didefinisikan. Jika upsert-kafka digunakan sebagai tabel sink, konektor dapat mengonsumsi aliran changelog yang dihasilkan oleh logika komputasi hulu. Konektor menulis data INSERT atau UPDATE_AFTER ke Kafka dan menulis data DELETE sebagai pesan dengan nilai kosong. Hal ini menunjukkan bahwa pesan untuk kunci yang sesuai dihapus. Flink mempartisi data berdasarkan nilai kolom kunci primer. Hal ini memastikan bahwa pesan pada kunci primer yang sama diurutkan dan pesan pembaruan atau penghapusan untuk kunci primer yang sama masuk ke partisi yang sama.

  • Solusi

    Anda harus menentukan kunci primer saat mendefinisikan DDL.

Bagaimana cara memulihkan pekerjaan Flink yang gagal setelah topik DataHub di-split atau diskalakan?

Jika topik yang sedang dibaca Flink di-split atau diskalakan, pekerjaan terus melaporkan kesalahan dan tidak dapat pulih sendiri. Dalam kasus ini, Anda harus memulai ulang pekerjaan dengan menghentikan lalu menjalankannya kembali untuk mengembalikannya ke kondisi normal.

Apakah saya dapat menghapus topik DataHub yang sedang dikonsumsi?

Tidak, Anda tidak dapat. Anda tidak dapat menghapus atau membuat ulang topik DataHub yang sedang dikonsumsi.

Apa itu parameter endPoint dan tunnelEndpoint, serta apa yang terjadi jika dikonfigurasi secara salah?

Untuk deskripsi parameter endPoint dan tunnelEndpoint, lihat Endpoint. Di lingkungan VPC, konfigurasi salah kedua parameter ini dapat menyebabkan pengecualian pekerjaan, seperti berikut:

  • Jika parameter endPoint dikonfigurasi secara salah, penyebaran pekerjaan berhenti pada progres 91%.

  • Jika parameter tunnelEndpoint dikonfigurasi secara salah, pekerjaan gagal dijalankan.

Bagaimana tabel sumber MaxCompute penuh dan tambahan membaca data dari MaxCompute?

Tabel sumber MaxCompute penuh dan inkremental membaca data dari MaxCompute melalui terowongan. Kecepatan baca dibatasi oleh bandwidth terowongan MaxCompute.

Jika MaxCompute digunakan sebagai sumber data dan data ditambahkan ke partisi atau tabel yang sudah ada setelah pekerjaan dimulai, apakah data baru tersebut dapat dibaca oleh tabel sumber MaxCompute penuh atau inkremental?

Tidak, tidak dapat. Setelah pekerjaan Flink dimulai, jika data baru ditambahkan ke tabel atau partisi yang sedang dibaca atau telah dibaca oleh sumber, data ini tidak dibaca. Hal ini juga dapat menyebabkan pekerjaan gagal.

Kedua tabel sumber MaxCompute penuh dan inkremental menggunakan ODPS DOWNLOAD SESSION untuk membaca data tabel atau data partisi. Saat DOWNLOAD SESSION baru dibuat, server membuat file indeks. Hal ini setara dengan membuat pemetaan data pada saat DOWNLOAD SESSION dibuat. Pembacaan data selanjutnya berdasarkan pemetaan ini. Oleh karena itu, setelah DOWNLOAD SESSION baru dibuat, data yang ditambahkan ke tabel atau partisi MaxCompute tidak dibaca dalam proses normal. Namun, jika data baru ditulis ke tabel sumber MaxCompute, dua jenis pengecualian dapat terjadi:

  • Kesalahan dilaporkan: Jika data baru ditulis saat terowongan membaca data, kesalahan berikut dilaporkan: ErrorCode=TableModified,ErrorMessage=The specified table has been modified since the download initiated..

  • Keakuratan data tidak dapat dijamin: Jika data baru ditulis setelah terowongan ditutup, data ini tidak dibaca. Namun, jika pekerjaan gagal atau dilanjutkan setelah dijeda, data yang telah dibaca mungkin dibaca ulang, dan data yang baru ditulis mungkin tidak dibaca secara lengkap.

Dapatkah saya menjeda pekerjaan yang menggunakan tabel sumber MaxCompute penuh atau inkremental, mengubah konkurensi, lalu melanjutkan pekerjaan tersebut?

Ya, Anda dapat. Untuk tabel sumber MaxCompute dengan opsi useNewApi diaktifkan, yang merupakan pengaturan default, Anda dapat menjeda pekerjaan, mengubah konkurensi, lalu melanjutkan pekerjaan dalam mode streaming. Tabel sumber MaxCompute membaca beberapa partisi yang cocok secara berurutan. Saat membaca partisi saat ini, berbagai rentang data di partisi tersebut dialokasikan ke setiap thread konkuren. Mengubah konkurensi tidak mengubah metode alokasi konkurensi untuk partisi yang sedang dibaca sebelum pekerjaan dijeda. Saat partisi berikutnya dibaca, rentang baca untuk setiap thread konkuren dialokasikan berdasarkan konkurensi baru. Oleh karena itu, saat Anda membaca satu partisi besar, dimungkinkan bahwa setelah Anda meningkatkan konkurensi dan memulai ulang pekerjaan, hanya beberapa operator MaxCompute yang membaca data.

Untuk pekerjaan dengan useNewApi diatur ke false dan untuk pekerjaan batch, Anda tidak dapat mengubah konkurensi.

Mengapa partisi sebelum offset awal (2019-10-11 00:00:00) juga dibaca oleh tabel sumber MaxCompute penuh?

Menetapkan offset awal hanya efektif untuk sumber data tipe antrian pesan, seperti DataHub. Hal ini tidak efektif untuk tabel sumber MaxCompute. Rentang data yang dibaca setelah pekerjaan Flink dimulai adalah sebagai berikut:

  • Tabel terpartisi: Semua partisi saat ini dibaca.

  • Tabel tidak terpartisi: Data yang ada saat ini dibaca.

Apa yang harus saya lakukan jika tabel sumber MaxCompute inkremental mendeteksi partisi baru sebelum semua data ditulis ke dalamnya?

Saat ini, tidak ada mekanisme yang tersedia untuk menandai apakah data di partisi sudah lengkap. Selama partisi baru terdeteksi, partisi tersebut dibaca. Saat Anda menggunakan tabel sumber MaxCompute inkremental untuk membaca tabel terpartisi MaxCompute T dengan kolom kunci partisi ds, kami menyarankan Anda menggunakan metode penulisan berikut: Jangan membuat partisi. Pertama, jalankan pernyataan Insert overwrite table T partition (ds='20191010') .... Setelah pekerjaan berhasil diselesaikan, partisi dan data terlihat secara bersamaan.

Penting

Metode penulisan berikut tidak diperbolehkan: Pertama, buat partisi, misalnya ds=20191010, lalu tulis data ke partisi tersebut. Tabel sumber MaxCompute inkremental mendeteksi partisi baru ds=20191010 dan segera membaca partisi tersebut. Jika data belum ditulis ke partisi pada saat ini, data akan terlewat.

Kesalahan runtime konektor MaxCompute: ErrorMessage=Authorization Failed [4019], You have NO privilege

  • Detail kesalahan

    Selama eksekusi pekerjaan, kesalahan dilaporkan di halaman Failover atau di TaskManager.log. Pesan kesalahan adalah sebagai berikut.

    ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'
  • Penyebab

    Informasi identitas pengguna yang ditentukan dalam definisi DDL MaxCompute tidak dapat digunakan untuk mengakses MaxCompute.

  • Solusi

    Anda dapat mengautentikasi identitas pengguna menggunakan akun Alibaba Cloud, pengguna RAM, atau peran RAM. Untuk informasi lebih lanjut, lihat Autentikasi pengguna.

Bagaimana cara menentukan parameter startPartition untuk tabel sumber MaxCompute inkremental?

Anda dapat mengikuti langkah-langkah berikut untuk menentukan parameter startPartition.

Langkah

Deskripsi

Contoh

1

Hubungkan setiap nama kolom kunci partisi dan nilai partisi yang sesuai dengan tanda sama dengan (=). Nilai partisi harus berupa field statis.

Kolom kunci partisi adalah dt. Anda perlu membaca data mulai dari nilai partisi 20220901. Hasilnya adalah dt=20220901.

2

Urutkan hasil dari langkah 1 secara menaik berdasarkan level partisi, lalu hubungkan dengan koma (,). Tidak boleh ada spasi di antaranya. Hasil langkah ini adalah nilai parameter startPartition.

Catatan

Anda hanya dapat menentukan beberapa level partisi pertama.

  • Hanya ada satu partisi hash, dt. Untuk membaca data mulai dari dt=20220901, tentukan 'startPartition' = 'dt=20220901'.

  • Terdapat tiga level partisi: partisi hash adalah dt, subpartisi pertama adalah hh, dan subpartisi kedua adalah mm. Untuk membaca data mulai dari dt=20220901, hh=08, dan mm=10, tentukan 'startPartition' = 'dt=20220901,hh=08,mm=10'.

  • Terdapat tiga level partisi: partisi hash adalah dt, subpartisi pertama adalah hh, dan subpartisi kedua adalah mm. Untuk membaca data mulai dari dt=20220901 dan hh=08, tentukan 'startPartition' = 'dt=20220901,hh=08'.

Saat sistem memuat daftar partisi, sistem membandingkan semua partisi dengan startPartition dalam urutan leksikografis. Kemudian, sistem memuat partisi yang lebih besar dari atau sama dengan startPartition. Sebagai contoh, tabel terpartisi MaxCompute inkremental memiliki dua kolom kunci partisi: partisi hash ds dan subpartisi type. Misalnya, asumsikan tabel memiliki enam partisi berikut:

  • ds=20191201,type=a

  • ds=20191201,type=b

  • ds=20191202,type=a

  • ds=20191202,type=b

  • ds=20191202,type=c

  • ds=20191203,type=a

Jika nilai startPartition adalah ds=20191202, empat partisi ds=20191202,type=a, ds=20191202,type=b, ds=20191202,type=c, dan ds=20191203,type=a dibaca. Jika nilai startPartition adalah ds=20191202,type=b, tiga partisi ds=20191202,type=b, ds=20191202,type=c, dan ds=20191203,type=a dibaca.

Catatan

Partisi yang ditentukan oleh startPartition tidak perlu ada. Partisi apa pun yang secara leksikografis lebih besar dari atau sama dengan startPartition dibaca.

Mengapa pekerjaan dengan tabel sumber MaxCompute inkremental tidak mulai membaca data dalam waktu lama setelah dimulai?

Masalah ini terjadi karena terlalu banyak partisi yang ada secara leksikografis lebih besar dari atau sama dengan startPartition, atau terlalu banyak file kecil di partisi-partisi tersebut. Tabel sumber MaxCompute inkremental harus terlebih dahulu mengatur informasi partisi yang memenuhi syarat sebelum mulai membaca data. Oleh karena itu, kami menyarankan Anda melakukan operasi berikut:

  • Jangan membaca data historis dalam jumlah berlebihan.

    Catatan

    Untuk memproses data historis, Anda dapat menjalankan pekerjaan batch yang memiliki tabel sumber MaxCompute.

  • Kurangi jumlah file kecil dalam data historis.

Bagaimana cara menentukan parameter partisi saat membaca dari atau menulis ke partisi?

Baca partisi

  • Baca partisi tetap

    Saat tabel sumber atau tabel dimensi perlu membaca partisi tetap, Anda dapat mengikuti langkah-langkah berikut untuk menentukan parameter partisi.

    Langkah

    Deskripsi

    Contoh

    1

    • Untuk tabel dimensi, hubungkan setiap nama kolom kunci partisi dan nilai partisi yang sesuai dengan tanda sama dengan (=). Nilai partisi adalah field statis.

    • Untuk tabel sumber, hubungkan setiap nama kolom kunci partisi dan nilai partisi yang sesuai dengan tanda sama dengan (=). Nilai partisi dapat berupa field statis atau nilai yang mengandung karakter wildcard (*). Karakter wildcard dapat mencocokkan string apa pun (termasuk string kosong).

    • Kolom kunci partisi adalah dt. Untuk membaca data dengan nilai partisi 20220901, hasilnya adalah dt=20220901.

    • Kolom kunci partisi adalah dt. Untuk membaca data dengan nilai partisi yang dimulai dengan 202209, hasilnya adalah dt=202209* (hanya untuk tabel sumber).

    • Kolom kunci partisi adalah dt. Untuk membaca data dengan nilai partisi yang dimulai dengan 2022 dan diakhiri dengan 01, hasilnya adalah dt=2022*01 (hanya untuk tabel sumber).

    • Kolom kunci partisi adalah dt. Untuk membaca data dari semua partisi, hasilnya adalah dt=* (hanya untuk tabel sumber).

    2

    Urutkan hasil dari langkah 1 secara menaik berdasarkan level partisi, lalu hubungkan dengan koma (,). Tidak boleh ada spasi di antaranya. Hasil langkah ini adalah nilai parameter partisi.

    Anda hanya dapat menentukan beberapa level partisi pertama.

    • Hanya ada satu partisi hash, dt. Untuk membaca data dari dt=20220901, tentukan 'partition' = 'dt=20220901'.

    • Terdapat tiga level partisi: partisi hash adalah dt, subpartisi pertama adalah hh, dan subpartisi kedua adalah mm. Untuk membaca data dari dt=20220901, hh=08, dan mm=10, tentukan 'partition' = 'dt=20220901,hh=08,mm=10'.

    • Terdapat tiga level partisi: partisi hash adalah dt, subpartisi pertama adalah hh, dan subpartisi kedua adalah mm. Untuk membaca data dari dt=20220901 dan hh apa pun, tentukan 'partition' = 'dt=20220901,hh=08' atau 'partition' = 'dt=20220901,hh=08,mm=*'.

    • Terdapat tiga level partisi: partisi hash adalah dt, subpartisi pertama adalah hh, dan subpartisi kedua adalah mm. Untuk membaca data dari dt=20220901, hh apa pun, dan mm=10, tentukan 'partition' = 'dt=20220901,hh=*,mm=10'.

    Jika langkah-langkah di atas tidak dapat memenuhi persyaratan untuk memfilter partisi, Anda juga dapat menulis kondisi filter di klausa WHERE pernyataan SQL dan menggunakan fitur pushdown partisi pengoptimal SQL untuk memfilter partisi. Terdapat dua level partisi: partisi hash adalah dt dan subpartisi adalah hh. Untuk membaca partisi di mana dt lebih besar dari atau sama dengan 20220901 dan kurang dari atau sama dengan 20220903, dan hh lebih besar dari atau sama dengan 09 dan kurang dari atau sama dengan 17, Anda dapat menggunakan kode SQL berikut.

    CREATE TABLE maxcompute_table (
      content VARCHAR,
      dt VARCHAR,
      hh VARCHAR
    ) PARTITIONED BY (dt, hh) WITH ( 
       -- Anda perlu menentukan kolom kunci partisi dengan PARTITIONED BY. Jika tidak, fitur pushdown partisi pengoptimal SQL tidak dapat diaktifkan, yang memengaruhi efisiensi eksekusi.
      'connector' = 'odps',
      ... -- Tentukan parameter yang diperlukan seperti accessId. Parameter partisi dapat dibiarkan tidak ditentukan, karena pengoptimal SQL akan melakukan pemfilteran.
    );
    
    SELECT content, dt, hh FROM maxcompute_table
    WHERE dt >= '20220901' AND dt <= '20220903' AND hh >= '09' AND hh <= '17'; -- Tentukan kondisi filter partisi di klausa WHERE.
  • Baca partisi terbesar secara leksikografis

    • Jika tabel sumber atau tabel dimensi perlu membaca partisi terbesar secara leksikografis, Anda harus mengatur parameter partisi menjadi 'partition' = 'max_pt()'.

    • Jika tabel sumber atau tabel dimensi perlu membaca dua partisi terbesar secara leksikografis, Anda harus mengatur parameter partisi menjadi 'partition' = 'max_two_pt()'.

    • Jika tabel sumber atau tabel dimensi perlu membaca partisi terbesar secara leksikografis yang disertai file .done, Anda harus mengatur parameter partisi menjadi 'partition' = 'max_pt_with_done()'.

    Dalam sebagian besar skenario, partisi terbesar secara leksikografis juga merupakan partisi yang paling baru dihasilkan. Dalam beberapa kasus, data di partisi terbaru belum siap, dan Anda ingin tabel dimensi sementara membaca data dari partisi yang lebih lama. Dalam kasus ini, Anda dapat menggunakan nilai parameter partisi max_pt_with_done().

    Saat data di partisi siap, Anda harus membuat partisi kosong secara bersamaan. Nama partisi ini adalah nama partisi data yang sesuai ditambahkan akhiran .done. Misalnya, saat data di partisi dt=20220901 siap, Anda harus membuat partisi kosong dt=20220901.done secara bersamaan. Setelah Anda mengatur nilai parameter partisi max_pt_with_done(), tabel dimensi hanya membaca partisi yang memiliki partisi data dan partisi .done. Partisi data tanpa partisi .done tidak dibaca. Untuk informasi lebih lanjut, lihat Apa perbedaan antara max_pt() dan max_pt_with_done()?.

    Catatan

    Tabel sumber hanya mengambil partisi terbesar secara leksikografis saat pekerjaan dimulai. Pekerjaan berakhir setelah semua data dibaca dan tidak memantau partisi baru. Untuk terus membaca partisi baru, Anda dapat menggunakan mode tabel sumber inkremental. Tabel dimensi memeriksa partisi terbaru dan membaca data terbaru pada setiap pembaruan.

Tulis ke partisi

  • Tulis ke partisi tetap

    Saat tabel sink perlu menulis data ke partisi tetap, Anda dapat menentukan parameter partisi dengan cara yang sama seperti saat membaca partisi tetap.

    Penting

    Parameter partisi untuk tabel sink tidak mendukung karakter wildcard (*).

  • Tulis ke partisi dinamis

    Saat tabel sink perlu menulis data ke partisi yang sesuai berdasarkan nilai spesifik kolom kunci partisi dalam data yang ditulis, Anda harus mengurutkan nama kolom kunci partisi secara menaik berdasarkan level partisi lalu menghubungkannya dengan koma (,). Tidak boleh ada spasi di antaranya. Hasilnya adalah nilai parameter partisi. Misalnya, jika terdapat tiga level partisi, partisi hash adalah dt, subpartisi pertama adalah hh, dan subpartisi kedua adalah mm, Anda dapat menentukan 'partition' = 'dt,hh,mm'.

Mengapa pekerjaan dengan tabel sumber MaxCompute tetap dalam status memulai atau menghasilkan data lama setelah dimulai?

Alasannya adalah sebagai berikut:

  • Terlalu banyak file kecil di tabel MaxCompute.

  • Kluster penyimpanan MaxCompute dan kluster komputasi Flink tidak berada di wilayah yang sama. Hal ini mengakibatkan waktu komunikasi jaringan yang lama. Kami menyarankan Anda menggunakan wilayah yang sama untuk kluster penyimpanan dan kluster komputasi lalu coba lagi.

  • Izin MaxCompute tidak dikonfigurasi dengan benar. Membaca tabel sumber memerlukan izin unduh untuk tabel MaxCompute.

Bagaimana cara memilih terowongan data?

MaxCompute menyediakan dua terowongan data: Batch Tunnel dan Streaming Tunnel. Anda dapat memilih terowongan data yang berbeda berdasarkan kebutuhan konsistensi dan kinerja Anda. Perbedaan antara kedua terowongan data tersebut adalah sebagai berikut.

Kebutuhan

Batch Tunnel

Streaming Tunnel

Konsistensi

Dibandingkan dengan Streaming Tunnel, metode ini dapat menulis data ke tabel MaxCompute lebih stabil dalam sebagian besar kasus, memastikan data tidak hilang (semantik at-least-once).

Data duplikat mungkin dihasilkan di beberapa partisi hanya ketika terjadi pengecualian selama proses checkpoint dan pekerjaan menulis ke beberapa partisi secara bersamaan.

Memastikan data tidak hilang (semantik at-least-once). Saat terjadi pengecualian dalam pekerjaan dalam kondisi apa pun, data duplikat mungkin dihasilkan.

Kinerja

Karena operasi seperti mengirimkan data selama proses checkpoint dan membuat file di server, efisiensi keseluruhan lebih rendah daripada Streaming Tunnel.

Tidak perlu melakukan commit data selama proses checkpoint. Jika Anda menggunakan Streaming Tunnel dan mengatur nilai numFlushThreads lebih besar dari 1, Anda dapat terus menerima data hulu selama proses flushing data. Efisiensi keseluruhan lebih tinggi daripada Batch Tunnel.

Catatan

Untuk pekerjaan yang saat ini menggunakan MaxCompute Batch Tunnel, jika proses checkpoint sangat lambat atau bahkan timeout, dan Anda mengonfirmasi bahwa downstream dapat menerima data duplikat, Anda dapat mempertimbangkan untuk menggunakan MaxCompute Stream Tunnel.

Bagaimana cara menangani data duplikat saat menulis ke tabel sink MaxCompute?

Jika data duplikat terjadi saat pekerjaan Flink menulis data ke MaxCompute melalui konektor MaxCompute, Anda dapat memecahkan masalah tersebut dari aspek berikut:

  • Periksa logika pekerjaan. Bahkan jika kendala PRIMARY KEY dideklarasikan di tabel sink MaxCompute, Flink tidak melakukan pemeriksaan keunikan kunci primer saat menulis data ke penyimpanan eksternal, dan tabel non-transaksional di MaxCompute tidak mendukung kendala PRIMARY KEY. Jika logika pekerjaan Flink menghitung data duplikat, data duplikat tetap muncul di tabel MaxCompute.

  • Periksa apakah beberapa pekerjaan Flink menulis ke tabel MaxCompute yang sama secara bersamaan. Seperti yang disebutkan, MaxCompute tidak mendukung kendala PRIMARY KEY. Jika beberapa pekerjaan Flink menghitung hasil yang sama, hasil tersebut diduplikasi di tabel MaxCompute.

  • Saat Anda menggunakan Batch Tunnel, pekerjaan Flink gagal selama checkpoint. Jika checkpoint gagal, tabel sink MaxCompute mungkin telah mengirimkan data ke server. Oleh karena itu, saat pekerjaan dilanjutkan dari checkpoint sebelumnya, data antara dua checkpoint mungkin diduplikasi.

  • Saat Anda menggunakan Stream Tunnel, terjadi failover pekerjaan Flink. Saat Anda menulis data ke MaxCompute dengan Stream Tunnel diaktifkan, data dikirimkan ke server MaxCompute antara checkpoint. Oleh karena itu, saat pekerjaan gagal dan dilanjutkan dari checkpoint terbaru, data antara penyelesaian checkpoint terbaru dan failover pekerjaan mungkin diduplikasi. Untuk informasi lebih lanjut, lihat Bagaimana cara memilih terowongan data?. Dalam kasus ini, Anda dapat beralih ke mode Batch Tunnel untuk menghindari data duplikat yang dihasilkan dalam situasi ini.

  • Saat Anda menggunakan Batch Tunnel, pekerjaan Flink gagal atau dimulai setelah dibatalkan, misalnya dipicu oleh penyetelan autopilot. Di versi sebelum vvr-6.0.7-flink-1.15, tabel sink MaxCompute mengirimkan data saat ditutup. Oleh karena itu, saat pekerjaan Flink berhenti dan dilanjutkan dari checkpoint sebelumnya, data antara checkpoint dan penghentian pekerjaan mungkin diduplikasi. Anda dapat meningkatkan versi Flink ke vvr-6.0.7-flink-1.15 atau yang lebih baru untuk menyelesaikan masalah ini.

Pekerjaan dengan tabel sink MaxCompute melaporkan kesalahan "Invalid partition spec" selama runtime. Apa yang harus saya lakukan?

  • Penyebab: Nilai kolom kunci partisi dalam data yang ditulis ke MaxCompute tidak valid. Nilai yang tidak valid meliputi string kosong, nilai null, dan nilai yang mengandung tanda sama dengan (=), koma (,), atau garis miring (/).

  • Solusi: Anda dapat memeriksa data yang tidak valid.

Pekerjaan dengan tabel sink MaxCompute melaporkan kesalahan "No more available blockId" selama runtime. Apa yang harus saya lakukan?

  • Penyebab: Jumlah blok yang ditulis ke tabel sink MaxCompute melebihi batas. Hal ini menunjukkan bahwa jumlah data yang di-flush setiap kali terlalu kecil dan flushing terlalu sering.

  • Solusi: Kami menyarankan Anda menyesuaikan nilai parameter batchSize dan flushIntervalMs.

Bagaimana cara menggunakan hint SHUFFLE_HASH untuk tabel dimensi?

Secara default, setiap thread konkuren menyimpan informasi seluruh tabel dimensi. Jika tabel dimensi memiliki banyak data, Anda dapat menggunakan hint SHUFFLE_HASH untuk mendistribusikan data tabel dimensi secara merata ke setiap thread konkuren. Hal ini mengurangi konsumsi memori heap JVM. Dalam contoh berikut, data tabel dimensi dim_1 dan dim_3 didistribusikan ke setiap thread konkuren, sedangkan data tabel dimensi dim_2 tetap di-cache sepenuhnya di setiap thread konkuren.

-- Buat tabel sumber dan tiga tabel dimensi.
CREATE TABLE source_table (k VARCHAR, v VARCHAR) WITH ( ... );
CREATE TABLE dim_1 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_2 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_3 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );

-- Tulis nama tabel dimensi yang datanya perlu didistribusikan di hint SHUFFLE_HASH.
SELECT /*+ SHUFFLE_HASH(dim_1), SHUFFLE_HASH(dim_3) */
k, s.v, d1.v, d2.v, d3.v
FROM source_table AS s
INNER JOIN dim_1 FOR SYSTEM_TIME AS OF PROCTIME() AS d1 ON s.k = d1.k
LEFT JOIN dim_2 FOR SYSTEM_TIME AS OF PROCTIME() AS d2 ON s.k = d2.k
LEFT JOIN dim_3 FOR SYSTEM_TIME AS OF PROCTIME() AS d3 ON s.k = d3.k;

Bagaimana cara menentukan parameter CacheReloadTimeBlackList?

Periode di mana pembaruan tabel dimensi dilarang pada waktu tetap setiap hari.

  • Tipe data: String

  • Gunakan -> untuk menghubungkan waktu mulai dan waktu selesai.

  • Gunakan , untuk memisahkan beberapa periode waktu.

  • Format waktu: YYYY-MM-DD HH:mm (Jika hanya jam dan menit yang ditentukan, berlaku setiap hari).

'cacheReloadTimeBlackList' = '14:00 -> 15:00,23:00 -> 01:00'

Skenario contoh

Nilai contoh

Periode waktu tunggal

14:00 -> 15:00

Beberapa periode waktu

14:00 -> 15:00,23:00 -> 01:00

Periode waktu khusus

14:00 -> 15:00, 23:00 -> 01:00,2025-10-01 22:00 -> 2025-10-01 23:00

Kesalahan: java.io.EOFException: SSL peer shut down incorrectly

  • Detail kesalahan

    Caused by: java.io.EOFException: SSL peer shut down incorrectly
        at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:239) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190) ~[?:1.8.0_302]
        at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1392) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1300) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435) ~[?:1.8.0_302]
        at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) ~[?:?]
        at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:194) ~[?:?]
        at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:308) ~[?:?]
        at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:204) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1369) ~[?:?]
        at com.mysql.cj.NativeSession.connect(NativeSession.java:133) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:949) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:819) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[?:?]
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:128) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ~[?:?]
        ... 14 more
  • Penyebab

    Kesalahan ini biasanya terjadi saat protokol SSL diaktifkan untuk database MySQL, tetapi klien tidak dikonfigurasi dengan benar untuk koneksi SSL. Misalnya, saat versi driver MySQL adalah 8.0.27, database MySQL memiliki protokol SSL diaktifkan, tetapi metode akses default tidak terhubung ke database melalui SSL. Hal ini menyebabkan kesalahan.

  • Solusi

    Kami menyarankan Anda mengatur konektor ke RDS di parameter WITH dan menambahkan characterEncoding=utf-8&useSSL=false ke parameter URL tabel dimensi MySQL. Misalnya:

    'url'='jdbc:mysql://***.***.***.***:3306/test?characterEncoding=utf-8&useSSL=false'

Mengapa kunci primer tabel MySQL berubah dari BIGINT UNSIGNED menjadi DECIMAL saat tabel tersebut didaftarkan dalam katalog Flink, lalu berubah menjadi TEXT setelah disinkronkan ke Hologres menggunakan pernyataan CTAS?

Masalah ini terjadi karena Flink tidak mendukung BIGINT UNSIGNED. Flink mengidentifikasi kunci primer BIGINT UNSIGNED MySQL sebagai tipe DECIMAL dengan mempertimbangkan rentang nilai. Saat data disinkronkan ke Hologres, sistem secara otomatis mengonversi kunci primer ke tipe TEXT karena Hologres tidak mendukung BIGINT UNSIGNED dan tidak mendukung penggunaan tipe DECIMAL sebagai kunci primer.

Kami menyarankan Anda melakukan penyesuaian berdasarkan spesifikasi ini selama pengembangan dan desain. Jika Anda ingin kolom ini tetap berupa tipe DECIMAL, Anda dapat membuat tabel di Hologres secara manual terlebih dahulu dan mengatur field lain sebagai kunci primer atau tidak mengatur kunci primer. Namun, hal ini dapat menyebabkan masalah duplikasi data karena kunci primer yang berbeda atau hilang memengaruhi keunikan data. Oleh karena itu, Anda perlu menyelesaikan masalah ini di lapisan aplikasi, misalnya dengan mentolerir tingkat duplikasi data tertentu atau menambahkan logika deduplikasi.

Saat Flink menulis data ke tabel RDS, apakah catatan diperbarui berdasarkan kunci primer atau menyisipkan catatan baru?

Jika kunci primer didefinisikan dalam DDL, metode INSERT INTO tablename(field1,field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1,field2=value2, field3=value3, ...; digunakan untuk memperbarui catatan. Untuk field kunci primer yang tidak ada, catatan langsung disisipkan. Untuk field kunci primer yang ada, nilai yang sesuai diperbarui. Jika PRIMARY KEY tidak dideklarasikan dalam DDL, metode insert into digunakan untuk menyisipkan catatan dan menambahkan data.

Apa yang perlu saya ketahui saat menggunakan indeks unik dari tabel RDS dalam klausa GROUP BY?

  • Anda perlu mendeklarasikan indeks unik dalam klausa GROUP BY di pekerjaan.

  • Jika hanya ada satu kunci primer auto-increment di RDS, Anda tidak dapat mendeklarasikannya sebagai PRIMARY KEY di pekerjaan Flink.

Mengapa tipe field INT UNSIGNED dalam tabel fisik MySQL (termasuk RDS for MySQL dan AnalyticDB for MySQL) dideklarasikan sebagai tipe berbeda dalam Flink SQL?

Masalah ini terjadi karena driver JDBC MySQL menggunakan tipe data berbeda untuk membawa data karena masalah presisi. Secara khusus, untuk tipe INT UNSIGNED di MySQL, tipe LONG digunakan di Java untuk membawa data, yang sesuai dengan BIGINT di Flink SQL. Untuk tipe BIGINT UNSIGNED di MySQL, tipe BIGINTEGER digunakan di Java untuk membawa data, yang sesuai dengan DECIMAL(20, 0) di Flink SQL.

Kesalahan: Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1

  • Detail kesalahan

    Caused by: java.sql.BatchUpdateException: Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1
    at sun.reflect.GeneratedConstructorAccessor59.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
    at com.mysql.cj.util.Util.getInstance(Util.java:167)
    at com.mysql.cj.util.Util.getInstance(Util.java:174)
    at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
    at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
    at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeBatch(DruidPooledPreparedStatement.java:565)
    at com.alibaba.ververica.connectors.rds.sink.RdsOutputFormat.executeSql(RdsOutputFormat.java:488)
    ... 15 more
  • Penyebab

    Data berisi karakter khusus atau format encoding yang menyebabkan encoding database tidak diurai dengan benar.

  • Solusi

    Saat Anda menggunakan JDBC untuk terhubung ke database MySQL, Anda dapat menambahkan UTF-8 di akhir alamat URL, misalnya, jdbc:mysql://<internal_endpoint>/<databaseName>?characterEncoding=UTF-8.

Apa yang harus saya lakukan jika terjadi deadlock saat menulis data ke MySQL (TDDL/RDS)?

  • Rincian

    Deadlock terjadi saat Anda menulis data ke MySQL (TDDL/RDS).

    Penting

    Di Realtime Compute for Apache Flink, jika database downstream adalah database relasional seperti MySQL, yang konektornya sesuai adalah TDDL/RDS, deadlock mungkin terjadi jika Realtime Compute for Apache Flink sering menulis ke tabel atau sumber daya.

    Contoh berikut menunjukkan bagaimana deadlock terbentuk:

    Asumsikan operasi INSERT memerlukan dua kunci, (A,B), yang harus diperoleh secara berurutan. A adalah kunci rentang. Terdapat dua transaksi, (T1,T2), dan skema tabel adalah (id(kunci primer auto-increment), nid(kunci unik)). T1 berisi dua pernyataan `insert`, insert(null,2),(null,1), dan T2 berisi satu pernyataan `insert`, insert(null,2).

    1. Pada waktu t, INSERT pertama T1 dieksekusi. Saat ini, T1 memegang dua kunci (A,B).

    2. Pada waktu t+1, T2 mulai menyisipkan. T2 perlu menunggu kunci A untuk mengunci (-inf,2]. Saat ini, A dipegang oleh T1 dan telah mengunci (-inf,2]. Interval memiliki hubungan inklusi, sehingga T2 bergantung pada T1 untuk melepaskan A.

    3. Pada waktu t+2, INSERT kedua T1 dieksekusi. T1 memerlukan A untuk mengunci (-inf,1]. Interval ini termasuk dalam (-inf,2], sehingga perlu menunggu giliran T2 melepaskan kunci. Oleh karena itu, T1 bergantung pada T2 untuk melepaskan A.

    Deadlock terbentuk karena T1 dan T2 saling bergantung dan menunggu satu sama lain.

  • Perbedaan antara kunci mesin database RDS/TDDL dan Tablestore

    • RDS/TDDL: Di InnoDB, kunci baris ditempatkan pada indeks, bukan pada catatan individual. Oleh karena itu, jika baris berbeda diakses menggunakan kunci indeks yang sama, terjadi konflik kunci, mencegah pembaruan data di seluruh area.

    • OTS: Menggunakan kunci baris tunggal, yang tidak memengaruhi pembaruan data lainnya.

  • Solusi untuk deadlock

    Untuk kueri per detik (QPS) tinggi, transaksi per detik (TPS) tinggi, atau skenario penulisan konkurensi tinggi, kami menyarankan Anda menggunakan Tablestore sebagai tabel sink untuk menyelesaikan masalah deadlock. Secara umum, kami tidak menyarankan Anda menggunakan TDDL atau RDS sebagai tabel sink untuk pekerjaan Flink.

    Jika Anda harus menggunakan database relasional seperti MySQL sebagai node sink, kami menyarankan Anda mempertimbangkan saran berikut:

    • Pastikan tidak ada gangguan dari pihak bisnis baca atau tulis lainnya.

    • Jika volume data pekerjaan tidak besar, Anda dapat mencoba penulisan konkurensi tunggal. Namun, dalam situasi QPS/TPS tinggi dan konkurensi tinggi, kinerja penulisan berkurang.

    • Cobalah untuk tidak menggunakan kunci unik. Menulis ke tabel yang memiliki kunci unik dapat menyebabkan deadlock. Jika bisnis Anda mengharuskan tabel berisi kunci unik, Anda dapat mendefinisikan kunci unik dengan mengatur field dalam urutan menurun berdasarkan kemampuan pembedanya. Hal ini dapat sangat mengurangi kemungkinan deadlock. Misalnya, Anda dapat meletakkan fungsi MD5 sebelum day_time(20171010) untuk mendefinisikan kunci unik dengan mengatur field dalam urutan menurun berdasarkan kemampuan pembedanya. Hal ini membantu menyelesaikan masalah deadlock.

    • Anda dapat melakukan sharding berdasarkan karakteristik bisnis untuk menghindari penulisan tabel tunggal sebanyak mungkin. Untuk detail implementasi, hubungi administrator database yang sesuai.

Mengapa skema tabel downstream tidak berubah setelah skema tabel MySQL yang sesuai diperbarui?

Sinkronisasi perubahan skema tidak mengenali DDL tertentu. Sebaliknya, sinkronisasi menangkap perubahan skema antara dua catatan data berturut-turut. Jika hanya terjadi perubahan DDL, tetapi tidak ada data baru atau perubahan data di hulu, perubahan data di downstream tidak dipicu. Untuk informasi lebih lanjut, lihat Kebijakan sinkronisasi untuk perubahan skema.

Apa penyebab pengecualian "finish split response timeout" di sumber?

Pengecualian ini disebabkan oleh penggunaan CPU tugas yang tinggi, yang mencegah tugas merespons permintaan RPC Koordinator tepat waktu. Dalam kasus ini, Anda perlu meningkatkan sumber daya CPU TaskManager di halaman konfigurasi sumber daya.

Apa dampak perubahan skema selama fase sinkronisasi data penuh MySQL CDC?

Jika perubahan skema terjadi selama fase sinkronisasi data penuh, pekerjaan mungkin melaporkan kesalahan atau gagal menyinkronkan perubahan skema. Dalam kasus ini, Anda harus menghentikan pekerjaan, menghapus tabel downstream yang disinkronkan, dan memulai pekerjaan tanpa status.

Apa yang harus saya lakukan jika terjadi perubahan skema yang tidak didukung selama sinkronisasi CTAS/CDAS dan menyebabkan pekerjaan gagal?

Anda perlu menyinkronkan ulang data tabel tersebut. Untuk melakukannya, Anda dapat menghentikan Pekerjaan, menghapus tabel hilir, dan memulai ulang pekerjaan sinkronisasi tanpa state. Hindari modifikasi yang tidak kompatibel ini. Jika tidak, Pekerjaan akan tetap melaporkan kesalahan dan gagal melakukan sinkronisasi setelah dimulai ulang. Untuk informasi selengkapnya tentang dukungan untuk perubahan skema, lihat Pernyataan CREATE TABLE AS (CTAS).

Apakah tabel sink ClickHouse mendukung penarikan kembali dan pembaruan data?

Ya, mendukung. Saat Primary Key ditentukan dalam DDL tabel sink Flink dan parameter ignoreDelete diatur ke false, penarikan kembali dan pembaruan data didukung, tetapi kinerjanya berkurang secara signifikan.

Hal ini karena ClickHouse adalah DBMS berorientasi kolom untuk pemrosesan analitik online (OLAP) dan dukungannya untuk UPDATE dan DELETE tidak sempurna. Jika Primary Key ditentukan dalam DDL Flink, Flink mencoba menggunakan ALTER TABLE UPDATE dan ALTER TABLE DELETE untuk memperbarui dan menghapus data, yang sangat tidak efisien.

Kapan saya dapat melihat data yang ditulis ke tabel sink di ClickHouse?

  • Untuk tabel sink ClickHouse tanpa semantik exactly-once diaktifkan, yang merupakan pengaturan default, selama jumlah catatan data di cache mencapai nilai parameter batchSize, atau waktu tunggu melebihi flushIntervalMs, sistem secara otomatis menulis data di cache ke tabel ClickHouse. Pada saat ini, Anda dapat melihat data yang ditulis ke tabel sink di ClickHouse. Anda tidak perlu menunggu checkpoint berhasil.

  • Untuk tabel sink ClickHouse dengan semantik exactly-once diaktifkan, Anda perlu menunggu checkpoint berhasil sebelum dapat melihat data yang ditulis ke tabel sink di ClickHouse.

Bagaimana cara melihat hasil data print di konsol?

Anda dapat menggunakan salah satu dari dua metode berikut untuk melihat hasil data print:

  • Lihat hasil di konsol pengembangan Realtime Compute:

    1. Di panel navigasi sebelah kiri konsol pengembangan Realtime Compute, pilih Operation Center > Job O&M.

    2. Klik nama pekerjaan target.

    3. Klik Job Logs.

    4. Di tab Runtime Logs, dari daftar drop-down Job, pilih pekerjaan yang sedang berjalan.

      查看启动和运行日志2.jpg

    5. Klik Path, ID di tab Running Task Managers.

      修改运行作业2.jpg

    6. Klik Logs untuk melihat output print.

  • Lihat hasil di UI Flink:

    1. Di panel navigasi kiri konsol pengembangan Realtime Compute, pilih Operation Center > Job O&M.

    2. Klik nama pekerjaan target.

    3. Klik Flink UI di tab Status Overview pekerjaan target.

      上下游存储.jpg

    4. Klik Task Managers.

    5. Klik Path, ID.

    6. Di tab logs, Anda dapat melihat hasil print.

Apa yang harus saya lakukan jika operasi JOIN pada tabel dimensi tidak mengembalikan data?

Anda dapat memeriksa apakah tipe skema dan nama dalam pernyataan DDL dan tabel fisik konsisten.

Apa perbedaan antara max_pt() dan max_pt_with_done()?

max_pt() memilih partisi terbesar secara leksikografis di antara semua partisi. max_pt_with_done() memilih partisi terbesar secara leksikografis di antara semua partisi yang disertai partisi .done. Jika kolom partisi direpresentasikan sebagai berikut:

  • ds=20190101

  • ds=20190101.done

  • ds=20190102

  • ds=20190102.done

  • ds=20190103

Perbedaan antara max_pt() dan max_pt_with_done() adalah sebagai berikut:

  • `partition`='max_pt_with_done()' cocok dengan partisi ds=20190102.

  • `partition`='max_pt()' cocok dengan partisi ds=20190103.

Pekerjaan yang menulis data ke Paimon melaporkan kesalahan "Heartbeat of TaskManager timed out". Apa yang harus saya lakukan?

Penyebab paling mungkin dari kesalahan ini adalah memori heap TaskManager yang tidak mencukupi. Paimon menggunakan memori heap terutama dengan cara berikut:

  • Setiap thread konkuren operator penulis tabel kunci primer Paimon memiliki buffer memori untuk pengurutan. Ukuran buffer ini dikontrol oleh parameter tabel write-buffer-size, dengan nilai default 256 MB.

  • Paimon menggunakan format file ORC secara default, sehingga diperlukan buffer memori lain untuk mengonversi data di memori ke format penyimpanan kolom secara batch. Ukuran buffer ini dikontrol oleh parameter tabel orc.write.batch-size, yang memiliki nilai default 1024. Artinya, buffer menyimpan 1.024 baris data secara default.

  • Setiap bucket yang dimodifikasi memiliki objek penulis khusus untuk menangani data tulis untuk bucket tersebut.

Berdasarkan cara penggunaan memori heap, kemungkinan penyebab memori heap tidak mencukupi dan solusi yang sesuai adalah sebagai berikut:

  • Nilai write-buffer-size terlalu besar.

    Anda dapat mengurangi parameter ini secara tepat. Namun, buffer yang terlalu kecil menyebabkan penulisan ke disk terlalu sering, dan frekuensi penggabungan file kecil juga meningkat. Hal ini memengaruhi kinerja penulisan.

  • Ukuran data tunggal terlalu besar.

    Misalnya, jika data berisi field JSON sebesar 4 MB, ukuran buffer ORC mencapai 4 MB × 1.024 = 4 GB, yang menempati banyak memori heap. Anda dapat menggunakan salah satu solusi berikut:

    • Kurangi nilai orc.write.batch-size.

    • Jika Anda tidak perlu melakukan kueri ad hoc (OLAP) pada tabel sink Paimon, tetapi hanya perlu melakukan konsumsi batch atau streaming, Anda dapat menggunakan format AVRO dan menonaktifkan pengumpulan informasi statistik dengan mengatur parameter tabel 'file.format' = 'avro' dan 'metadata.stats-mode' = 'none' saat membuat tabel.

      Catatan

      Parameter hanya dapat diatur saat tabel dibuat dan tidak dapat dimodifikasi menggunakan pernyataan ALTER TABLE atau SQL Hint setelah tabel dibuat.

  • Jumlah partisi yang ditulis secara bersamaan terlalu besar, atau jumlah bucket di setiap partisi terlalu besar. Hal ini mengakibatkan terlalu banyak objek penulis yang dibuat.

    Anda dapat memeriksa apakah pengaturan kolom kunci partisi masuk akal, apakah data lain ditulis ke kolom kunci partisi karena penulisan SQL yang salah, dan apakah jumlah bucket masuk akal. Kami menyarankan volume data total di setiap bucket sekitar 2 GB, dengan maksimum tidak lebih dari 5 GB. Untuk informasi tentang cara menyesuaikan jumlah bucket, lihat Menyesuaikan jumlah bucket untuk tabel bucket tetap.

Pekerjaan yang menulis data ke Paimon melaporkan kesalahan "Sink materializer must not be used with Paimon sink". Apa yang harus saya lakukan?

Operator sink materializer awalnya digunakan untuk menyelesaikan masalah ketidakteraturan data yang disebabkan oleh JOIN bertingkat dalam pekerjaan streaming. Namun, saat Anda menulis data ke tabel Paimon, operator ini tidak hanya memperkenalkan overhead tambahan tetapi juga dapat menyebabkan hasil perhitungan yang salah saat Anda menggunakan mekanisme penggabungan data Agregasi. Oleh karena itu, operator sink materializer tidak dapat digunakan dalam pekerjaan yang menulis ke tabel Paimon.

Anda dapat menonaktifkan operator sink materializer dengan mengatur parameter table.exec.sink.upsert-materialize ke false menggunakan pernyataan SET atau dengan mengonfigurasi parameter runtime. Jika Anda juga perlu menyelesaikan masalah ketidakteraturan data yang disebabkan oleh JOIN bertingkat, lihat Menangani data tidak teratur.

Pekerjaan yang menulis data ke Paimon melaporkan kesalahan "File deletion conflicts detected" atau "LSM conflicts detected". Apa yang harus saya lakukan?

Kemungkinan penyebab kesalahan ini adalah sebagai berikut:

  • Beberapa pekerjaan menulis ke partisi yang sama dari tabel Paimon yang sama secara bersamaan. Dalam kasus ini, tabel Paimon perlu menyelesaikan konflik dengan gagal dan memulai ulang, yang merupakan fenomena normal. Jika kesalahan tidak berulang, tidak diperlukan tindakan.

  • Pekerjaan dilanjutkan dari status lama yang sudah ada. Dalam kasus ini, kesalahan berulang. Anda perlu melanjutkan pekerjaan dari status terbaru atau memulai pekerjaan tanpa status.

  • Dalam pekerjaan yang sama, beberapa pernyataan INSERT digunakan untuk menulis ke tabel Paimon yang sama. Paimon saat ini tidak mendukung penulisan terpisah melalui beberapa pernyataan INSERT dalam pekerjaan yang sama. Anda dapat menggunakan pernyataan UNION ALL untuk menulis beberapa aliran data ke tabel Paimon.

  • Konkurensi node Global Committer atau node Compaction Coordinator saat Anda menulis data ke tabel Append Scalable lebih besar dari 1. Konkurensi kedua node ini harus 1. Jika tidak, konsistensi data tidak dapat dijamin.

Pekerjaan yang membaca data dari Paimon melaporkan kesalahan "File xxx not found, Possible causes". Apa yang harus saya lakukan?

Konsumsi tabel Paimon bergantung pada file snapshot. Jika waktu kedaluwarsa snapshot terlalu singkat atau pekerjaan konsumsi tidak efisien, file snapshot yang sedang dikonsumsi dihapus karena kedaluwarsa, dan pekerjaan konsumsi melaporkan kesalahan.

Anda dapat mempertimbangkan menyesuaikan waktu kedaluwarsa file snapshot, menentukan ID Konsumen, atau mengoptimalkan pekerjaan konsumsi. Untuk mengkueri file snapshot yang tersedia saat ini dan waktu pembuatan setiap file snapshot, lihat Tabel sistem snapshot.

Pekerjaan Paimon melaporkan kesalahan "No space left on device". Apa yang harus saya lakukan?

  • Jika pekerjaan melibatkan proses kueri Paimon, seperti menggunakan tabel Paimon sebagai tabel dimensi atau menggunakan changelog-producer=lookup saat menulis ke tabel Paimon, Anda dapat menggunakan Petunjuk SQL untuk mengatur parameter berikut. Hal ini membatasi ruang disk maksimum yang ditempati selama proses kueri dan waktu kedaluwarsa cache untuk menghindari ruang disk yang tidak mencukupi akibat terlalu banyak file cache.

    • lookup.cache-max-disk-size: Ruang disk lokal maksimum yang dapat digunakan cache kueri. Kami menyarankan Anda menentukan nilai seperti 256 MB, 512 MB, atau 1 GB.

    • lookup.cache-file-retention: Waktu kedaluwarsa cache kueri. Kami menyarankan Anda menentukan waktu 30 menit, 15 menit, atau kurang.

  • Untuk pekerjaan yang menulis ke tabel Paimon, Anda dapat menggunakan Petunjuk SQL untuk mengatur parameter berikut guna membatasi ukuran total file sementara lokal selama proses penulisan. Hal ini membantu menghindari ruang disk yang tidak mencukupi akibat terlalu banyak file sementara.

    • write-buffer-spillable: Sakelar global untuk membuang cache data tulis ke disk. Jika Anda mengatur parameter ini ke false, Anda dapat mencegah seluruh ruang disk ditempati oleh cache ini.

    • write-buffer-spill.max-disk-size: Ruang maksimum yang dapat digunakan cache data tulis untuk membuang ke disk. Kami menyarankan Anda menentukan nilai seperti 256 MB, 512 MB, atau 1 GB.

Apa yang harus saya lakukan jika terdapat banyak file Paimon di OSS?

  • Paimon menyimpan beberapa file data historis untuk mendukung akses ke status historis tabel. Anda dapat menyesuaikan kebijakan retensi data untuk file data historis. Untuk informasi lebih lanjut, lihat Membersihkan data kedaluwarsa.

  • Pengaturan kolom kunci partisi yang tidak masuk akal atau terlalu banyak bucket juga dapat menyebabkan fenomena ini. Kami menyarankan volume data total di setiap bucket sekitar 2 GB, dengan maksimum tidak lebih dari 5 GB. Untuk informasi lebih lanjut, lihat Metode bucketing.

  • Secara default, file data disimpan dalam format ORC. Anda dapat mengatur parameter tabel 'file.compression' = 'zstd' saat membuat tabel untuk menggunakan format kompresi ZSTD guna mengurangi ukuran total file data.

    Catatan

    Parameter hanya dapat diatur saat tabel dibuat dan tidak dapat dimodifikasi menggunakan pernyataan ALTER TABLE atau SQL Hint setelah tabel dibuat.

Apakah ada hubungan antara visibilitas data konektor Paimon dan interval Checkpoint?

Ya, ada. Semantik exactly-once Paimon bergantung pada checkpoint sistem untuk memastikan jaminan. Paimon hanya melakukan commit data pada setiap checkpoint, yang membuat data ini terlihat oleh konsumen downstream. Sebelum melakukan commit, data di buffer lokal di-flush ke sistem file jarak jauh, tetapi konsumen downstream tidak diberi tahu bahwa mereka dapat membaca data ini.

Mengapa penggunaan memori pekerjaan yang menulis data ke Paimon meningkat perlahan dalam jangka waktu lama?

  • Jika RPS (permintaan per detik) pekerjaan juga meningkat perlahan, wajar jika penggunaan memori pekerjaan juga meningkat.

  • Jika Anda menggunakan Katalog FileSystem Paimon untuk membaca dan menulis ke OSS, pastikan Anda telah mengonfigurasi fs.oss.endpoint, fs.oss.accessKeyId, dan fs.oss.accessKeySecret di parameter Katalog. Jika tidak, jika pekerjaan Flink berjalan lama, mungkin terjadi masalah kebocoran memori lambat yang diketahui di komunitas.

Kesalahan: IllegalArgumentException: timeout value is negative

  • Detail kesalahan报错

  • Penyebab

    Jika tidak ada pesan MQ baru yang dikonsumsi dalam periode waktu tertentu, thread MetaQSource tidur. Durasi tidur adalah nilai yang diatur oleh parameter pullIntervalMs. Namun, nilai default parameter pullIntervalMs adalah -1. Jika -1 digunakan sebagai durasi tidur, pekerjaan melaporkan kesalahan.

  • Solusi

    Anda dapat mengatur parameter pullIntervalMs ke angka non-negatif.

Bagaimana RocketMQ mendeteksi perubahan jumlah partisi topik selama penskalaan topik?

  • Untuk versi mesin komputasi real-time Flink VVR yang lebih awal dari 6.0.2, implementasinya adalah mengambil jumlah partisi saat ini setiap 5 hingga 10 menit. Jika jumlah partisi berbeda dari jumlah partisi asli selama tiga kali berturut-turut, failover dipicu. Oleh karena itu, sumber dapat mendeteksi perubahan jumlah partisi 10 hingga 30 menit setelah perubahan terjadi, dan failover terjadi. Setelah pekerjaan dimulai ulang, pekerjaan membaca data berdasarkan jumlah partisi baru.

  • Untuk versi mesin komputasi real-time Flink VVR 6.0.2 dan yang lebih baru, implementasinya adalah mengambil jumlah partisi saat ini setiap 5 menit secara default. Saat partisi baru ditemukan, partisi tersebut langsung diserahkan ke operator sumber TM untuk membaca data partisi baru, tanpa memerlukan failover pekerjaan. Oleh karena itu, sumber dapat mendeteksi perubahan jumlah partisi dalam 1 hingga 5 menit.

Kesalahan: BackPressure Exceed reject Limit

  • Detail kesalahan报错详情

  • Penyebab

    Tekanan tulis pada Hologres relatif tinggi.

  • Solusi

    Anda dapat memberikan informasi instans kepada dukungan teknis Hologres untuk operasi peningkatan.

Error: Sisa slot koneksi dicadangkan untuk koneksi superuser non-replikasi

  • Detail kesalahan

    Caused by: com.alibaba.hologres.client.exception.HoloClientWithDetailsException: failed records 1, first:Record{schema=org.postgresql.model.TableSchema@188365, values=[f06b41455c694d24a18d0552b8b0****, com.chot.tpfymnq.meta, 2022-04-02 19:46:40.0, 28, 1, null], bitSet={0, 1, 2, 3, 4}},first err:[106]FATAL: remaining connection slots are reserved for non-replication superuser connections
        at com.alibaba.hologres.client.impl.Worker.handlePutAction(Worker.java:406) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.run(Worker.java:118) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_302]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_302]
        ... 1 more
    Caused by: com.alibaba.hologres.org.postgresql.util.PSQLException: FATAL: remaining connection slots are reserved for non-replication superuser connections
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.readStartupMessages(QueryExecutorImpl.java:2665) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.QueryExecutorImpl.<init>(QueryExecutorImpl.java:147) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:273) ~[?:?]
        at com.alibaba.hologres.org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:51) ~[?:?]
        at com.alibaba.hologres.org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:240) ~[?:?]
        at com.alibaba.hologres.org.postgresql.Driver.makeConnection(Driver.java:478) ~[?:?]
        at com.alibaba.hologres.org.postgresql.Driver.connect(Driver.java:277) ~[?:?]
        at java.sql.DriverManager.getConnection(DriverManager.java:674) ~[?:1.8.0_302]
        at java.sql.DriverManager.getConnection(DriverManager.java:217) ~[?:1.8.0_302]
        at com.alibaba.hologres.client.impl.ConnectionHolder.buildConnection(ConnectionHolder.java:122) ~[?:?]
        at com.alibaba.hologres.client.impl.ConnectionHolder.retryExecute(ConnectionHolder.java:195) ~[?:?]
        at com.alibaba.hologres.client.impl.ConnectionHolder.retryExecute(ConnectionHolder.java:184) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.doHandlePutAction(Worker.java:460) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.handlePutAction(Worker.java:389) ~[?:?]
        at com.alibaba.hologres.client.impl.Worker.run(Worker.java:118) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_302]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_302]
        ... 1 more
  • Penyebab

    Jumlah koneksi telah melebihi batas.

  • Solusi

    • Lihat app_name setiap node Frontend (FE) yang terhubung untuk memeriksa jumlah koneksi Hologres Client yang digunakan oleh flink-connector.

    • Periksa apakah pekerjaan lain terhubung ke Hologres.

    • Lepaskan koneksi. Untuk informasi lebih lanjut, lihat Manajemen koneksi.

Error: tidak ada tabel yang didefinisikan dalam publication

  • Detail kesalahan

    Menghapus dan membuat ulang tabel dengan nama yang sama dapat menyebabkan pekerjaan melaporkan kesalahan no table is defined in publication.

  • Penyebab

    Saat tabel dihapus, publikasi yang terikat ke tabel tidak dihapus.

  • Solusi

    1. Di Hologres, jalankan perintah select * from pg_publication where pubname not in (select pubname from pg_publication_tables); untuk mengkueri informasi publikasi yang tidak dihapus saat tabel dihapus.

    2. Jalankan pernyataan drop publication xx; untuk menghapus publikasi yang tersisa.

    3. Mulai ulang pekerjaan.

Apa hubungan antara interval checkpoint konektor sink Flink Hologres dan visibilitas akhir data di Hologres?

Tidak ada hubungan langsung antara interval checkpoint (CP) konektor sink Flink Hologres dan visibilitas akhir data di Hologres. Interval CP memengaruhi perjanjian tingkat layanan (SLA) untuk pemulihan data, tetapi tidak menentukan waktu akhir saat data terlihat di Hologres.

Konektor Hologres tidak mendukung transaksi. Konektor hanya secara berkala mem-flush data ke database. Interval CP hanya memastikan bahwa data di-flush ke database pada setiap CP, tetapi tidak berarti menunggu interval ini paling lama. Faktanya, jika buffer memenuhi kondisi tertentu, data di-flush ke downstream lebih awal. Untuk informasi lebih lanjut, lihat Hologres, Hologres, dan Hologres. Secara umum, gudang data tidak perlu memastikan konsistensi transaksional. Konektor secara asinkron mem-flush data di latar belakang lalu memaksa flush pada waktu CP untuk mempersiapkan pemulihan pengecualian.

Pekerjaan yang dipublikasikan melaporkan pengecualian permission denied for database. Apa yang harus saya lakukan?

  • Penyebab

    Mulai dari VVR 8.0.4 mesin komputasi real-time, jika konektor menemukan bahwa pengguna menggunakan instans Hologres versi lebih baru dari 2.0, konektor memaksa penggunaan mode JDBC untuk mengonsumsi log biner. Jika instans Hologres versi 2.0 atau lebih baru dan pengguna bukan Superuser, konfigurasi izin khusus diperlukan untuk mengonsumsi log biner dalam mode JDBC.

  • Solusi

    Jika pengguna bukan Superuser, konfigurasi izin diperlukan untuk mengonsumsi log biner dalam mode JDBC.

    user_name adalah ID akun Alibaba Cloud atau nama pengguna RAM. Untuk informasi lebih lanjut, lihat Ikhtisar akun.

    -- Berikan izin CREATE kepada pengguna di bawah model otorisasi PostgreSQL standar, dan berikan izin Peran Replikasi kepada pengguna untuk instans.
    GRANT CREATE ON DATABASE <db_name> TO <user_name>;
    alter role <user_name> replication;
    
    -- Jika database memiliki model izin sederhana (SLMP) diaktifkan, Anda tidak dapat menjalankan pernyataan GRANT. Gunakan spm_grant untuk memberikan izin Admin kepada pengguna untuk DB. Anda juga dapat memberikan izin langsung di Holoweb.
    call spm_grant('<db_name>_admin', '<user_name>');
    alter role <user_name> replication;

Pekerjaan melaporkan pengecualian table id parsed from checkpoint is different from the current table id saat dilanjutkan dari checkpoint. Apa yang harus saya lakukan?

  • Penyebab

    Masalah ini terjadi karena di versi mesin komputasi real-time VVR 8.0.5 hingga 8.0.8, saat tabel sumber log biner Hologres dilanjutkan dari checkpoint, tabel tersebut memaksa memeriksa ID tabel Hologres. Jika ID tabel saat ini tidak konsisten dengan yang disimpan di checkpoint, pekerjaan tidak dapat dilanjutkan dari checkpoint. Pengecualian ini menunjukkan bahwa pengguna melakukan operasi TRUNCATE atau pembuatan ulang tabel lainnya pada tabel sumber selama eksekusi pekerjaan.

  • Solusi

    Kami menyarankan Anda meningkatkan ke VVR-8.0.9 atau versi yang lebih baru untuk memulai pekerjaan. Mengingat kompleksitas skenario bisnis, pemeriksaan wajib ID tabel dibatalkan di VVR 8.0.9. Namun, tetap tidak disarankan untuk melakukan operasi pembuatan ulang tabel pada tabel sumber log biner. Saat tabel dibuat ulang, log biner historis tabel asli sepenuhnya dihapus. Jika Flink menggunakan offset konsumsi tabel lama untuk mengonsumsi data tabel baru, hal ini dapat menyebabkan inkonsistensi data dan situasi tak terduga lainnya.

Apa yang harus saya lakukan jika presisi data tidak sesuai harapan saat mengonsumsi log biner dalam mode JDBC?

  • Penyebab

    Di versi mesin komputasi real-time VVR 8.0.10 dan sebelumnya, jika presisi tipe DECIMAL dari tabel sumber log biner yang dideklarasikan dalam DDL Flink tidak konsisten dengan yang ada di Hologres, presisi tidak sesuai harapan.

  • Solusi

    Di VVR 8.0.11, masalah ini telah diperbaiki. Namun, Anda tetap harus memastikan bahwa presisi tipe DECIMAL di Flink dan Hologres konsisten untuk menghindari kehilangan data akibat kehilangan presisi.

Mengapa menghapus dan membuat ulang tabel dengan nama yang sama dapat menyebabkan pengecualian no table is defined in publication atau The table xxx has no slot named xxx?

  • Penyebab

    Saat tabel dihapus, publikasi yang terikat ke tabel tidak dihapus.

  • Solusi

    Solusi 1: Anda dapat menjalankan pernyataan select * from pg_publication where pubname not in (select pubname from pg_publication_tables); di Hologres untuk mengkueri publikasi yang tidak dihapus saat tabel dihapus, lalu menjalankan pernyataan drop publication xx; untuk menghapus publikasi yang tersisa. Setelah itu, Anda dapat memulai ulang pekerjaan.

    Solusi 2: Anda dapat memilih VVR 8.0.5 atau versi yang lebih baru. Konektor secara otomatis melakukan operasi pembersihan.

Kesalahan: Caused by: com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException: the logs is 8785684 bytes which is larger than MAX_BATCH_SIZE_IN_BYTES 8388608

  • Detail kesalahan

    Caused by: com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException: the logs is 8785684 bytes which is larger than MAX_BATCH_SIZE_IN_BYTES 8388608
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.ensureValidLogSize(LogAccumulator.java:249)
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.doAppend(LogAccumulator.java:103)
    at com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator.append(LogAccumulator.java:84)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:385)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:308)
    at com.aliyun.openservices.aliyun.log.producer.LogProducer.send(LogProducer.java:211)
    at com.alibaba.ververica.connectors.sls.sink.SLSOutputFormat.writeRecord(SLSOutputFo
    rmat.java:100)
  • Penyebab

    Log tunggal yang ditulis ke SLS melebihi 8 MB, dan tidak ada data lagi yang dapat ditulis.

  • Solusi

    Anda dapat mengubah offset awal untuk melewati data abnormal yang terlalu besar. Untuk informasi lebih lanjut, lihat Memulai pekerjaan.

Saat program Flink yang gagal dilanjutkan, terjadi OOM TaskManager dan tabel sumber melaporkan kesalahan "java.lang.OutOfMemoryError: Java heap space". Apa yang harus saya lakukan?

  • Penyebab

    Masalah ini biasanya disebabkan oleh isi pesan SLS yang terlalu besar. Konektor SLS meminta data secara batch. LogGroup dikontrol oleh parameter batchGetSize, yang default-nya 100. Oleh karena itu, hingga 100 LogGroup diterima setiap kali. Selama operasi normal, Flink mengonsumsi data tepat waktu, dan umumnya tidak menerima 100 LogGroup. Namun, selama failover, sejumlah besar data yang belum dikonsumsi menumpuk. Jika memori yang ditempati oleh satu LogGroup dikalikan 100 lebih besar dari memori JVM yang tersedia, TaskManager mengalami kesalahan OOM.

  • Solusi

    Anda dapat mengurangi nilai parameter batchGetSize.

Bagaimana cara mengatur offset konsumen untuk tabel sumber Paimon?

Anda dapat mengatur offset konsumen untuk tabel sumber Paimon menggunakan parameter scan.mode. Nilai dan perilaku parameter scan.mode adalah sebagai berikut.

Nilai parameter

Perilaku pembacaan batch

Perilaku baca streaming

default

Nilai default. Perilaku aktual ditentukan oleh parameter lain.

  • Jika scan.timestamp-millis diatur, perilakunya sama dengan nilai parameter from-timestamp.

  • Jika scan.snapshot-id diatur, perilakunya sama dengan nilai parameter from-snapshot.

Jika kedua parameter di atas tidak diatur, perilakunya sama dengan nilai parameter latest-full.

latest-full

Menghasilkan snapshot terbaru dari tabel.

Saat pekerjaan dimulai, pekerjaan pertama kali menghasilkan snapshot terbaru dari tabel, lalu terus menghasilkan data inkremental.

compacted-full

Menghasilkan snapshot tabel setelah kompaksi terakhir.

Saat pekerjaan dimulai, pekerjaan pertama kali menghasilkan snapshot tabel setelah kompaksi terakhir, lalu terus menghasilkan data inkremental.

latest

Sama dengan latest-full.

Saat pekerjaan dimulai, pekerjaan tidak menghasilkan snapshot terbaru dari tabel, lalu terus menghasilkan data inkremental.

from-timestamp

Menghasilkan snapshot terbaru dari tabel sebelum atau pada waktu yang ditentukan oleh scan.timestamp-millis.

Saat pekerjaan dimulai, pekerjaan tidak menghasilkan snapshot, lalu terus menghasilkan data inkremental mulai dari waktu yang ditentukan oleh scan.timestamp-millis.

from-snapshot

Menghasilkan snapshot tabel. Nomor snapshot ditentukan oleh scan.snapshot-id.

Saat pekerjaan dimulai, pekerjaan tidak menghasilkan snapshot, lalu terus menghasilkan data inkremental mulai dari snapshot yang ditentukan oleh scan.snapshot-id.

from-snapshot-full

Sama dengan from-snapshot.

Saat pekerjaan dimulai, pekerjaan menghasilkan snapshot tabel. Nomor snapshot ditentukan oleh scan.snapshot-id. Kemudian, pekerjaan terus menghasilkan data inkremental mulai dari setelah snapshot yang ditentukan oleh scan.snapshot-id.

Bagaimana cara mengonfigurasi kadaluarsa partisi otomatis?

Tabel Paimon mendukung penghapusan otomatis partisi yang masa hidupnya lebih besar dari waktu kedaluwarsa partisi untuk menghemat biaya penyimpanan. Detailnya adalah sebagai berikut:

  • Masa hidup: Waktu sistem saat ini dikurangi timestamp yang dikonversi dari nilai partisi. Timestamp yang dikonversi dari nilai partisi diperoleh dengan urutan berikut:

    1. Konversi nilai partisi ke string waktu menggunakan string format parameter partition.timestamp-pattern.

      Dalam string format ini, kolom kunci partisi direpresentasikan oleh tanda dolar ($) diikuti nama kolom. Misalnya, asumsikan kolom kunci partisi terdiri dari empat kolom: year, month, day, dan hour. String format $year-$month-$day $hour:00:00 mengonversi partisi year=2023,month=04,day=21,hour=17 ke string 2023-04-21 17:00:00.

    2. Konversi string waktu ke timestamp menggunakan string format parameter partition.timestamp-formatter.

      Jika parameter ini tidak diatur, string format yyyy-MM-dd HH:mm:ss dan yyyy-MM-dd dicoba secara default. String format apa pun yang kompatibel dengan DateTimeFormatter Java dapat digunakan.

  • Waktu kedaluwarsa partisi: Nilai yang Anda atur untuk parameter partition.expiration-time.

Apa yang harus saya lakukan jika tidak dapat menemukan data di penyimpanan?

  • Jika data belum di-flush ke disk, hal ini merupakan fenomena normal karena penulis Flink mem-flush data ke disk berdasarkan kebijakan berikut:

    • Bucket tertentu terakumulasi hingga ukuran tertentu di memori. Ukuran default adalah 64 MB.

    • Ukuran buffer total terakumulasi hingga ukuran tertentu. Ukuran default adalah 1 GB.

    • Checkpoint dipicu, dan semua data di memori di-flush keluar.

  • Jika ini adalah penulisan streaming, pastikan checkpoint diaktifkan.

Apa yang harus saya lakukan jika terdapat data duplikat?

  • Jika Anda menggunakan penulisan Copy On Write (COW), Anda perlu mengaktifkan parameter write.insert.drop.duplicates.

    Untuk penulisan COW, file pertama setiap bucket tidak dideduplikasi secara default. Hanya data inkremental yang dideduplikasi. Deduplikasi global memerlukan pengaktifan parameter ini. Penulisan Merge On Read (MOR) tidak memerlukan pengaktifan parameter apa pun. Setelah kunci primer didefinisikan, deduplikasi global diaktifkan secara default.

    Catatan

    Mulai dari Hudi 0.10.0, properti ini telah diganti namanya menjadi write.precombine, dan nilai default-nya adalah true.

  • Untuk mendeduplikasi beberapa partisi, Anda perlu mengatur parameter index.global.enabled ke true.

    Catatan
    • Mulai dari Hudi 0.10.0, properti ini default-nya true.

    • Saat index.type=bucket, mengatur parameter index.global.enabled ke true tidak valid karena indeks Bucket tidak mendukung perubahan lintas partisi. Oleh karena itu, meskipun indeks global diaktifkan, deduplikasi beberapa partisi tidak dapat dicapai.

  • Untuk pembaruan jangka panjang, seperti memperbarui data dari sebulan yang lalu, Anda perlu meningkatkan nilai index.state.ttl (dalam hari).

    Indeks adalah struktur data inti untuk menentukan duplikasi data. index.state.ttl mengatur waktu retensi indeks, yang default-nya 1,5 hari. Nilai kurang dari 0 berarti retensi permanen.

    Catatan

    Mulai dari Hudi 0.10.0, properti ini default-nya 0.

Mengapa Merge On Read hanya memiliki file log?

  • Penyebab: Hudi hanya menghasilkan file Parquet setelah melakukan kompaksi. Jika tidak, hanya file log yang ada. Merge On Read mengaktifkan kompaksi asinkron secara default, dan kebijakannya adalah melakukan kompaksi sekali setiap 5 commit. Tugas kompaksi hanya dipicu saat kondisi terpenuhi.

  • Solusi: Anda dapat memicu tugas kompaksi lebih cepat dengan menyesuaikan parameter interval kompaksi compaction.delta_commits.

Kesalahan: Ditemukan multi-statement

  • Detail

    Saat pekerjaan Flink menulis data ke AnalyticDB for MySQL (ADB), pekerjaan dimulai ulang secara abnormal dan melaporkan kesalahan Caused by: java.sql.SQLSyntaxErrorException: [13000, 2024101216171419216823505703151806929] multi-statement be found.

    image

  • Penyebab

    Saat konfigurasi ALLOW_MULTI_QUERIES=true diaktifkan untuk database AnalyticDB for MySQL (ADB) dan digunakan dengan MySQL JDBC Driver 8.x, terjadi masalah kompatibilitas.

  • Solusi

    1. Hubungi dukungan teknis untuk mendapatkan konektor kustom ADB 3.0 untuk MySQL JDBC Driver versi 5.1.46, lalu terapkan ke pekerjaan Flink Anda. Untuk informasi tentang cara menggunakan konektor kustom, lihat Mengelola konektor kustom.

    2. Konfigurasikan parameter allowMultiQueries=true pada URI tabel ADB, misalnya, jdbc:mysql://xxxxx.ads.aliyuncs.com:3306/xxx?allowMultiQueries=true'.

Kesalahan: No suitable driver found for

  • Penyebab

    Konektor kustom tidak dapat menemukan driver yang sesuai.

  • Solusi