全部产品
Search
文档中心

MaxCompute:Gunakan Flink untuk menulis data ke MaxCompute

更新时间:Nov 10, 2025

MaxCompute menyediakan konektor Flink baru. Anda dapat menggunakannya untuk menulis data dari Flink ke tabel standar maupun Delta di MaxCompute. Topik ini menjelaskan kemampuan konektor Flink baru serta prosedur utama untuk menulis data ke MaxCompute.

Informasi latar belakang

  • Mode penulisan yang didukung

    Konektor Flink baru mendukung penulisan data ke MaxCompute dalam mode upsert atau insert. Dalam mode upsert, data dapat ditulis dengan salah satu dari dua cara berikut:

    • Dikelompokkan berdasarkan kunci primer

    • Dikelompokkan berdasarkan bidang partisi

      Jika tabel memiliki banyak partisi, Anda dapat mengelompokkan data berdasarkan bidang partisi. Perhatikan bahwa metode ini dapat menyebabkan kesenjangan data.

  • Untuk informasi lebih lanjut tentang proses penulisan data dan konfigurasi parameter yang direkomendasikan untuk konektor Flink dalam mode upsert, lihat Ingesti Data Real-time ke gudang data.

  • Saat mengonfigurasi pekerjaan Flink untuk menulis data ke MaxCompute, Anda dapat mengatur parameter konektor Flink untuk menentukan mode penulisan. Untuk daftar lengkap parameter konektor, lihat Lampiran: Semua parameter konektor Flink baru.

  • Atur interval checkpoint untuk pekerjaan upsert Flink menjadi lebih dari 3 menit. Jika tidak, efisiensi penulisan mungkin rendah dan banyak file kecil akan dihasilkan.

  • Tabel berikut memetakan tipe data bidang antara MaxCompute dan Realtime Compute for Apache Flink.

    Tipe data Flink

    Tipe data MaxCompute

    CHAR(p)

    CHAR(p)

    VARCHAR(p)

    VARCHAR(p)

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    INT

    INT

    BIGINT

    LONG

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DECIMAL(p, s)

    DECIMAL(p, s)

    DATE

    DATE

    TIMESTAMP(9) WITHOUT TIME ZONE, TIMESTAMP_LTZ(9)

    TIMESTAMP

    TIMESTAMP(3) WITHOUT TIME ZONE, TIMESTAMP_LTZ(3)

    DATETIME

    BYTES

    BINARY

    ARRAY<T>

    LIST<T>

    MAP<K, V>

    MAP<K, V>

    ROW

    STRUCT

    Catatan

    Tipe data TIMESTAMP Flink tidak mencakup zona waktu, sedangkan tipe data TIMESTAMP MaxCompute mencakup zona waktu. Perbedaan ini menyebabkan selisih waktu 8 jam. Gunakan TIMESTAMP_LTZ(9) untuk menyelaraskan timestamp.

    -- FlinkSQL
    CREATE TEMPORARY TABLE odps_source(
      id BIGINT NOT NULL COMMENT 'id',
      created_time TIMESTAMP NOT NULL COMMENT 'Waktu pembuatan',
      updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT 'Waktu pembaruan',
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'maxcompute',
    ...
    );

Menulis data dari kluster Flink open source yang dikelola sendiri ke MaxCompute

  1. Persiapan: Buat tabel MaxCompute.

    Pertama, buat tabel MaxCompute tempat Anda dapat menulis data Flink. Contoh berikut menunjukkan cara membuat tabel Delta non-partisi dan tabel Delta terpartisi untuk mendemonstrasikan proses utama penulisan data Flink ke MaxCompute. Untuk informasi tentang properti tabel, lihat Parameter tabel Delta.

    -- Buat tabel Delta non-partisi.
    CREATE TABLE mf_flink_tt (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, primary key (id)
    )
    tblproperties ("transactional"="true", 
                   "write.bucket.num" = "64", 
                   "acid.data.retain.hours"="12") ;
    
    -- Buat tabel Delta partisi.
    CREATE TABLE mf_flink_tt_part (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, 
      primary key (id)
    )
      partitioned by (dd string, hh string) 
      tblproperties ("transactional"="true", 
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
    
  2. Buat kluster open source Flink. Versi Flink 1.13, 1.15, 1.16, dan 1.17 didukung. Pilih konektor Flink yang sesuai dengan versi Flink Anda:

    Catatan
    • Konektor Flink untuk versi 1.16 dapat digunakan untuk Flink 1.17.

    • Topik ini menggunakan Konektor Flink 1.13 sebagai contoh. Unduh paket ke lingkungan lokal Anda dan ekstrak.

  3. Unduh konektor Flink dan tambahkan ke paket kluster Flink.

    1. Unduh paket JAR konektor Flink ke lingkungan lokal Anda.

    2. Tambahkan paket JAR konektor Flink ke direktori lib dari paket instalasi Flink yang telah diekstrak.

      mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
  4. Mulai layanan Flink.

    cd $FLINK_HOME/bin
    ./start-cluster.sh
  5. Mulai klien SQL Flink.

    cd $FLINK_HOME/bin
    ./sql-client.sh
  6. Buat tabel Flink dan konfigurasikan parameter konektor Flink.

    Anda dapat membuat tabel Flink dan mengonfigurasi parameter menggunakan Flink SQL atau API DataStream Flink. Bagian berikut memberikan contoh inti untuk kedua metode tersebut.

    Gunakan Flink SQL

    1. Buka editor Flink SQL dan jalankan perintah berikut untuk membuat tabel serta mengonfigurasi parameter.

      -- Daftarkan tabel non-partisi yang sesuai di Flink SQL.
      CREATE TABLE mf_flink (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI****************',
        'odps.access.key'='********************',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj'
      );
      
      -- Daftarkan tabel terpartisi yang sesuai di Flink SQL.
      CREATE TABLE mf_flink_part (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        dd STRING,
        hh STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) PARTITIONED BY (`dd`,`hh`)
      WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt_part',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI****************',
        'odps.access.key'='********************',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj'
      );
    2. Tulis data ke tabel Flink dan kueri tabel MaxCompute untuk memverifikasi bahwa data berhasil ditulis.

      -- Masukkan data ke tabel non-partisi di klien Flink SQL.
      INSERT INTO mf_flink VALUES (1,'Danny',27, false);
      
      -- Kueri data di MaxCompute dan periksa hasilnya.
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 27   | false  |
      +------------+------+------+--------+
      
      -- Masukkan data ke tabel non-partisi di klien Flink SQL.
      INSERT INTO mf_flink VALUES (1,'Danny',28, false);
      -- Kueri data di MaxCompute dan periksa hasilnya.
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 28   | false  |
      +------------+------+------+--------+
      
      -- Masukkan data ke tabel terpartisi di klien Flink SQL.
      INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01');
      -- Kueri data di MaxCompute dan periksa hasilnya.
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 27   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+
      
      -- Masukkan data ke tabel terpartisi di klien Flink SQL.
      INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01');
      -- Kueri data di MaxCompute dan periksa hasilnya.
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 30   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+

    Gunakan DataStream API

    1. Saat menggunakan API DataStream, pertama-tama tambahkan dependensi berikut.

      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>flink-connector-maxcompute</artifactId>
                  <version>xxx</version>
                  <scope>system</scope>
                  <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
      </dependency>
      Catatan

      Ganti xxx dengan nomor versi sebenarnya.

    2. Kode contoh berikut menunjukkan cara membuat tabel dan mengonfigurasi parameter.

      package com.aliyun.odps.flink.examples;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.odps.table.OdpsOptions;
      import org.apache.flink.odps.util.OdpsConf;
      import org.apache.flink.odps.util.OdpsPipeline;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.data.RowData;
      
      public class Examples {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(120 * 1000);
      
              StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
      
              Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
              DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);
      
              Configuration config = new Configuration();
              config.set(OdpsOptions.SINK_OPERATION, "upsert");
              config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
              config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);
      
              OdpsConf odpsConfig = new OdpsConf("accessid",
                      "accesskey",
                      "endpoint",
                      "project",
                      "tunnel endpoint");
      
              OdpsPipeline.Builder builder = OdpsPipeline.builder();
              builder.projectName("sql2_isolation_2a")
                      .tableName("user_ledger_portfolio")
                      .partition("")
                      .configuration(config)
                      .odpsConf(odpsConfig)
                      .sink(input, false);
              env.execute();
          }
      }

Menulis data dari Flink yang sepenuhnya dikelola di Alibaba Cloud ke MaxCompute

  1. Persiapan: Buat tabel MaxCompute.

    Pertama, buat tabel MaxCompute tempat Anda dapat menulis data Flink. Contoh berikut menunjukkan cara membuat tabel Delta.

    SET odps.sql.type.system.odps2=true;
    DROP TABLE mf_flink_upsert;
    CREATE TABLE mf_flink_upsert (
      c1 int not null, 
      c2 string, 
      gt timestamp,
      primary key (c1)
    ) 
      PARTITIONED BY (ds string)
      tblproperties ("transactional"="true",
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
  2. Masuk ke Konsol Realtime Compute for Apache Flink dan lihat informasi konektor Flink. Konektor Flink telah dimuat sebelumnya pada Platform Ververica (VVP) untuk Flink yang sepenuhnya dikelola di Alibaba Cloud.

  3. Gunakan pekerjaan Flink SQL untuk membuat tabel Flink dan menghasilkan data Flink real-time. Setelah Anda mengembangkan pekerjaan tersebut, Anda dapat menerapkannya.

    Di halaman pengembangan pekerjaan Flink, buat dan edit pekerjaan Flink SQL. Contoh berikut membuat tabel sumber Flink dan tabel sink Flink sementara. Contoh ini juga mencakup logika untuk secara otomatis menghasilkan data real-time dan menuliskannya ke tabel sumber. Logika pekerjaan kemudian menulis data dari tabel sumber ke tabel sink sementara. Untuk informasi lebih lanjut tentang pengembangan pekerjaan SQL, lihat Peta pengembangan pekerjaan.

    -- Buat tabel sumber Flink.
    CREATE TEMPORARY TABLE fake_src_table
    (
        c1 int,
        c2 VARCHAR,
        gt AS CURRENT_TIMESTAMP
    ) WITH (
      'connector' = 'faker',
      'fields.c2.expression' = '#{superhero.name}',
      'rows-per-second' = '100',
      'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
    );
    
    -- Buat tabel sink Flink sementara.
    CREATE TEMPORARY TABLE test_c_d_g 
    (
        c1 int,
        c2 VARCHAR,
        gt TIMESTAMP,
        ds varchar,
        PRIMARY KEY(c1) NOT ENFORCED
     ) PARTITIONED BY(ds)
     WITH (
        		'connector' = 'maxcompute',
        		'table.name' = 'mf_flink_upsert',
        		'sink.operation' = 'upsert',
        		'odps.access.id'='LTAI****************',
        		'odps.access.key'='********************',
        		'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        		'odps.project.name'='mf_mc_bj',
        		'upsert.write.bucket.num'='64'
    );
    
    -- Logika komputasi Flink
    INSERT INTO test_c_d_g
    SELECT  c1 AS c1,
            c2 AS c2,
            gt AS gt,
            date_format(gt, 'yyyyMMddHH') AS ds
    FROM    fake_src_table;

    Di mana:

    odps.end.point: Gunakan Titik akhir jaringan internal untuk wilayah yang sesuai.

    upsert.write.bucket.num: Nilai parameter ini harus sama dengan nilai properti write.bucket.num dari tabel Delta di MaxCompute.

  4. Kueri data di MaxCompute untuk memverifikasi bahwa data Flink berhasil ditulis.

    SELECT * FROM mf_flink_upsert WHERE ds=2023061517;
    
    -- Hasil: Karena data Flink dihasilkan secara acak, hasil kueri Anda di MaxCompute mungkin berbeda dari contoh.
    +------+----+------+----+
    | c1   | c2 | gt   | ds |
    +------+----+------+----+
    | 0    | Skaar | 2023-06-16 01:59:41.116 | 2023061517 |
    | 21   | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 |
    | 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
    | 126  | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
    

Lampiran: Semua parameter konektor Flink baru

  • Parameter Dasar

    Parameter

    Diperlukan

    Nilai default

    Deskripsi

    connector

    Ya

    Tidak ada

    Jenis konektor. Atur ke MaxCompute.

    odps.project.name

    Ya

    Tidak ada

    Nama proyek MaxCompute.

    odps.access.id

    Ya

    Tidak ada

    ID AccessKey Akun Alibaba Cloud Anda. Anda dapat melihat informasi ini di halaman Pasangan AccessKey.

    odps.access.key

    Ya

    Tidak ada

    Rahasia AccessKey Akun Alibaba Cloud Anda. Anda dapat melihat informasi ini di halaman Pasangan AccessKey.

    odps.end.point

    Ya

    Tidak ada

    Titik akhir MaxCompute. Untuk Titik akhir MaxCompute di berbagai Wilayah, lihat Titik akhir.

    odps.tunnel.end.point

    Tidak

    Tidak ada

    Titik akhir publik layanan Tunnel. Jika Anda tidak mengonfigurasi Titik akhir Tunnel, lalu lintas akan secara otomatis diarahkan ke Titik akhir Tunnel yang sesuai dengan jaringan tempat layanan MaxCompute berada. Jika Anda mengonfigurasi Titik akhir Tunnel, lalu lintas akan diarahkan ke Titik akhir yang ditentukan dan pengarahan otomatis dinonaktifkan.

    Untuk Titik akhir Tunnel di berbagai Wilayah dan jenis jaringan, lihat Titik akhir.

    odps.tunnel.quota.name

    Tidak

    Tidak ada

    Nama kuota Tunnel yang digunakan untuk mengakses MaxCompute.

    table.name

    Ya

    Tidak ada

    Nama tabel MaxCompute. Formatnya adalah [project.][schema.]table.

    odps.namespace.schema

    Tidak

    false

    Menentukan apakah akan menggunakan model tiga lapisan. Untuk informasi lebih lanjut tentang model tiga lapisan, lihat Operasi skema.

    sink.operation

    Ya

    insert

    Mode penulisan. Nilai yang valid: insert dan upsert.

    Catatan

    Hanya tabel Delta MaxCompute yang mendukung penulisan upsert.

    sink.parallelism

    Tidak

    Tidak ada

    Tingkat paralelisme untuk penulisan. Jika tidak diatur, tingkat paralelisme data masukan digunakan secara default.

    Catatan

    Pastikan bahwa nilai properti tabel write.bucket.num merupakan kelipatan bilangan bulat dari nilai parameter ini. Hal ini memastikan kinerja penulisan optimal dan menghemat memori paling banyak pada node sink.

    sink.meta.cache.time

    Tidak

    400

    Ukuran cache metadata.

    sink.meta.cache.expire.time

    Tidak

    1200

    Periode timeout untuk cache metadata dalam satuan detik (s).

    sink.coordinator.enable

    Tidak

    Ya.

    Menentukan apakah akan mengaktifkan mode koordinator.

  • Parameter Partisi

    Parameter

    Diperlukan

    Nilai default

    Deskripsi

    sink.partition

    Tidak

    Tidak ada

    Nama partisi yang akan ditulis.

    Jika Anda menggunakan partisi dinamis, ini adalah nama partisi induk dari partisi dinamis.

    sink.partition.default-value

    Tidak

    __DEFAULT_PARTITION__

    Nama partisi default yang digunakan untuk partisi dinamis.

    sink.dynamic-partition.limit

    Tidak

    100

    Saat menulis ke partisi dinamis, ini adalah jumlah maksimum partisi yang dapat diimpor secara bersamaan dalam satu checkpoint.

    Catatan

    Jangan meningkatkan nilai ini secara signifikan. Menulis ke terlalu banyak partisi secara bersamaan dapat menyebabkan error kehabisan memori (OOM) pada node sink. Jika jumlah partisi konkuren untuk penulisan melebihi ambang batas, pekerjaan penulisan gagal.

    sink.group-partition.enable

    Tidak

    false

    Saat menulis ke partisi dinamis, menentukan apakah akan mengelompokkan data berdasarkan partisi.

    sink.partition.assigner.class

    Tidak

    Tidak ada

    Kelas implementasi PartitionAssigner.

  • Parameter untuk menulis dalam mode FileCached

    Jika Anda memiliki banyak partisi dinamis, Anda dapat menggunakan mode cache file. Anda dapat menggunakan parameter berikut untuk mengonfigurasi informasi file cache untuk penulisan data.

    Parameter

    Diperlukan

    Nilai default

    Deskripsi

    sink.file-cached.enable

    Tidak

    false

    Menentukan apakah akan mengaktifkan penulisan dalam mode FileCached. Nilai yang valid:

    • false: Menonaktifkan mode.

    • true: Mengaktifkan mode.

      Catatan

      Gunakan mode cache file saat terdapat banyak partisi dinamis.

    sink.file-cached.tmp.dirs

    Tidak

    ./local

    Direktori default untuk file cache dalam mode cache file.

    sink.file-cached.writer.num

    Tidak

    16

    Jumlah thread konkuren untuk satu tugas mengunggah data dalam mode cache file.

    Catatan

    Jangan meningkatkan nilai ini secara signifikan. Menulis ke terlalu banyak partisi secara bersamaan dapat menyebabkan error OOM.

    sink.bucket.check-interval

    Tidak

    60000

    Interval untuk memeriksa ukuran file dalam mode cache file. Satuan: milidetik (ms).

    sink.file-cached.rolling.max-size

    Tidak

    16 M

    Ukuran maksimum satu file cache dalam mode cache file.

    Jika file melebihi ukuran ini, datanya diunggah ke server.

    sink.file-cached.memory

    Tidak

    64 M

    Ukuran maksimum memori off-heap yang digunakan untuk menulis file dalam mode cache file.

    sink.file-cached.memory.segment-size

    Tidak

    128 KB

    Ukuran buffer yang digunakan untuk menulis file dalam mode cache file.

    sink.file-cached.flush.always

    Tidak

    true

    Dalam mode cache file, menentukan apakah akan menggunakan cache saat menulis file.

    sink.file-cached.write.max-retries

    Tidak

    3

    Jumlah percobaan ulang untuk mengunggah data dalam mode cache file.

  • Parameter untuk penulisan insert atau upsert

    Parameter penulisan Upsert

    Parameter

    Diperlukan

    Nilai default

    Deskripsi

    upsert.writer.max-retries

    Tidak

    3

    Jumlah percobaan ulang jika Upsert Writer gagal menulis ke bucket.

    upsert.writer.buffer-size

    Tidak

    64 m

    Ukuran cache untuk satu Upsert Writer di Flink.

    Catatan
    • Saat ukuran buffer total semua bucket mencapai ambang batas, sistem secara otomatis mengosongkan data ke server.

    • Satu Upsert Writer dapat menulis ke beberapa bucket secara bersamaan. Tingkatkan nilai ini untuk meningkatkan efisiensi penulisan.

    • Jika Anda menulis ke banyak partisi, terdapat risiko error OOM. Dalam kasus ini, pertimbangkan untuk mengurangi nilai ini.

    upsert.writer.bucket.buffer-size

    Tidak

    1 m

    Ukuran cache untuk satu bucket di Flink. Jika server Flink kekurangan sumber daya memori, Anda dapat mengurangi nilai ini.

    upsert.write.bucket.num

    Ya

    Tidak ada

    Jumlah bucket untuk tabel sink. Nilai ini harus sama dengan nilai properti write.bucket.num dari tabel sink.

    upsert.write.slot-num

    Tidak

    1

    Jumlah slot Tunnel yang digunakan oleh satu sesi.

    upsert.commit.max-retries

    Tidak

    3

    Jumlah percobaan ulang untuk commit sesi upsert.

    upsert.commit.thread-num

    Tidak

    16

    Tingkat paralelisme untuk commit sesi upsert.

    Jangan mengatur parameter ini ke nilai yang besar. Jumlah commit konkuren yang tinggi meningkatkan konsumsi sumber daya, yang dapat menyebabkan masalah kinerja atau penggunaan sumber daya berlebihan.

    upsert.major-compact.min-commits

    Tidak

    100

    Jumlah minimum commit yang diperlukan untuk memicu kompaksi mayor.

    upsert.commit.timeout

    Tidak

    600

    Periode timeout untuk commit sesi upsert menunggu. Satuan: detik (s).

    upsert.major-compact.enable

    Tidak

    false

    Menentukan apakah akan mengaktifkan pemadatan utama.

    upsert.flush.concurrent

    Tidak

    2

    Jumlah maksimum bucket yang dapat ditulis secara bersamaan dalam satu partisi.

    Catatan

    Setiap kali data dalam bucket dikosongkan, satu slot Tunnel digunakan.

    Catatan

    Untuk informasi lebih lanjut tentang konfigurasi parameter yang direkomendasikan untuk penulisan upsert, lihat Konfigurasi parameter yang direkomendasikan untuk penulisan upsert.

    Parameter penulisan Insert

    Parameter

    Diperlukan

    Nilai default

    Deskripsi

    insert.commit.thread-num

    Tidak

    16

    Tingkat paralelisme untuk sesi commit.

    insert.arrow-writer.enable

    Tidak

    false

    Menentukan apakah akan menggunakan format Arrow.

    insert.arrow-writer.batch-size

    Tidak

    512

    Jumlah maksimum baris dalam satu batch Arrow.

    insert.arrow-writer.flush-interval

    Tidak

    100000

    Interval di mana writer mengosongkan data. Satuan: milidetik (ms).

    insert.writer.buffer-size

    Tidak

    64 M

    Ukuran cache untuk penulis yang di-buffer.