全部产品
Search
文档中心

Realtime Compute for Apache Flink:Memulai dengan pemrosesan batch Realtime Compute for Apache Flink

更新时间:Jul 02, 2025

Sebagai kerangka komputasi yang mendukung pemrosesan aliran dan batch terpadu, Flink dapat memproses data aliran dengan latensi rendah serta data batch dengan throughput tinggi. Realtime Compute for Apache Flink menyediakan fitur seperti pengembangan draf, penyebaran O&M, alur kerja, manajemen antrian, dan profil data untuk mendukung pemrosesan batch. Anda dapat menggunakan kemampuan ini untuk memenuhi kebutuhan bisnis Anda. Topik ini menjelaskan cara menggunakan fitur utama Realtime Compute for Apache Flink dalam pemrosesan batch.

Fitur

Berikut adalah fitur utama dari Realtime Compute for Apache Flink yang mendukung pemrosesan batch:

  • Pengembangan Draf SQL: Di tab Draft halaman Editor SQL, Anda dapat membuat draf batch. Draf tersebut dapat disebarkan sebagai penyebaran batch dan dijalankan. Untuk informasi lebih lanjut, lihat Mengembangkan Draf SQL.

  • Manajemen Penyebaran: Di halaman Deployments, Anda dapat langsung menyebarkan draf batch JAR atau Python sebagai penyebaran batch. Untuk informasi lebih lanjut, lihat Manajemen Penyebaran. Pilih BATCH dari daftar drop-down jenis penyebaran. Perluas penyebaran batch yang diinginkan untuk melihat tugas-tugasnya. Biasanya, tugas-tugas berbeda dari penyebaran batch memiliki logika pemrosesan data yang sama tetapi parameter yang berbeda, seperti tanggal pemrosesan data.

  • Skrip: Di tab Scripts halaman Editor SQL, Anda dapat mengeksekusi pernyataan DDL atau kueri pendek untuk mengelola data secara cepat dan melakukan profil data. Untuk informasi lebih lanjut tentang pembuatan skrip, lihat Skrip. Kueri pendek dieksekusi dalam sesi sebelumnya di Realtime Compute for Apache Flink, memungkinkan Anda melakukan kueri sederhana dengan latensi rendah dengan menggunakan kembali sumber daya.

  • Katalog: Di halaman Catalogs, Anda dapat membuat dan melihat katalog yang berisi informasi tentang database dan tabel. Untuk informasi lebih lanjut, lihat Kelola Katalog. Anda juga dapat melihat katalog di tab Katalog halaman Editor SQL, meningkatkan efisiensi pengembangan.

  • Alur Kerja: Di halaman Workflows, Anda dapat membuat alur kerja dan mengonfigurasi dependensi antar tugas secara visual. Tugas-tugas ini terkait dengan penyebaran batch. Untuk informasi lebih lanjut, lihat Alur Kerja (pratinjau publik). Penyebaran batch dijalankan berdasarkan dependensi yang dikonfigurasikan. Anda dapat menjalankan tugas secara manual atau periodik dalam alur kerja yang dibuat.

  • Manajemen Antrian: Di halaman Queue Management, Anda dapat membagi sumber daya dalam ruang kerja untuk mencegah persaingan sumber daya antara penyebaran aliran, batch, dan prioritas lainnya. Untuk informasi lebih lanjut, lihat Kelola Antrian.

Perhatian

  • Ruang kerja telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.

  • Object Storage Service (OSS) telah diaktifkan. Untuk informasi lebih lanjut, lihat Memulai dengan Menggunakan Konsol OSS. Kelas penyimpanan Bucket OSS harus Standar. Untuk informasi lebih lanjut, lihat Kelas Penyimpanan.

  • Contoh dalam topik ini menggunakan Apache Paimon untuk menyimpan data dan hanya berlaku untuk Realtime Compute for Apache Flink dengan Ververica Runtime (VVR) 8.0.5 atau versi lebih baru.

Contoh

Contoh dalam topik ini memproses data bisnis platform e-commerce dan menyimpannya dalam format lakehouse menggunakan Apache Paimon. Struktur gudang data disimulasikan, mencakup lapisan operational data store (ODS), data warehouse detail (DWD), dan data warehouse service (DWS). Kemampuan pemrosesan batch dari Realtime Compute for Apache Flink memungkinkan Anda memproses, membersihkan, dan menulis data ke tabel Apache Paimon, sehingga membangun struktur penyimpanan data berlapis.

image

Persiapan

  1. Buat skrip.

    Di tab Scripts pada halaman SQL Editor, Anda dapat membuat katalog, mendefinisikan basis data dan tabel di dalam katalog, serta menyisipkan data simulasi ke dalam tabel tersebut.

  2. Buat katalog Apache Paimon.

    1. Di editor skrip pada tab Scripts, masukkan pernyataan SQL berikut:

      CREATE CATALOG `my_catalog` WITH (
        'type' = 'paimon',
        'metastore' = 'filesystem',
        'warehouse' = '<warehouse>',
        'fs.oss.endpoint' = '<fs.oss.endpoint>',
        'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
        'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
      );

      Tabel berikut menjelaskan parameter dalam kode contoh.

      Parameter

      Deskripsi

      Diperlukan

      Catatan

      type

      Jenis katalog.

      Ya

      Atur nilainya menjadi Paimon.

      metastore

      Jenis penyimpanan metadata.

      Ya

      Nilainya diatur menjadi filesystem dalam kode contoh. Untuk informasi lebih lanjut tentang jenis lainnya, lihat Kelola katalog Apache Paimon.

      warehouse

      Direktori gudang data yang ditentukan di OSS.

      Ya

      Formatnya adalah oss://<bucket>/<object>. Parameter dalam direktori:

      • bucket: menunjukkan nama Bucket OSS yang Anda buat.

      • object: menunjukkan jalur tempat data Anda disimpan.

      Anda dapat melihat nama bucket dan nama objek di konsol OSS.

      fs.oss.endpoint

      Titik akhir OSS.

      Tidak

      Parameter ini diperlukan jika Bucket OSS yang ditentukan oleh parameter warehouse tidak berada di wilayah yang sama dengan ruang kerja Realtime Compute for Apache Flink atau Bucket OSS dalam akun Alibaba Cloud lain digunakan.

      Untuk informasi lebih lanjut, lihat Wilayah dan titik akhir.

      fs.oss.accessKeyId

      ID AccessKey akun Alibaba Cloud atau pengguna RAM yang memiliki izin baca dan tulis pada OSS.

      Tidak

      Parameter ini diperlukan jika Bucket OSS yang ditentukan oleh parameter warehouse tidak berada di wilayah yang sama dengan ruang kerja Realtime Compute for Apache Flink atau Bucket OSS dalam akun Alibaba Cloud lain digunakan. Untuk informasi lebih lanjut tentang cara mendapatkan pasangan AccessKey, lihat Buat pasangan AccessKey.

      fs.oss.accessKeySecret

      Rahasia AccessKey akun Alibaba Cloud atau pengguna RAM yang memiliki izin baca dan tulis pada OSS.

      Tidak

    2. Pilih kode sebelumnya dan klik Jalankan di sisi kiri editor skrip.

      Jika pesan Pernyataan berikut telah berhasil dieksekusi! muncul, katalog telah dibuat. Anda dapat melihat katalog yang dibuat di halaman Catalogs atau di tab Katalog halaman Editor SQL.

      image.png

Prosedur

Langkah 1: Buat tabel di lapisan ODS dan sisipkan data uji ke dalam tabel

Catatan

Data uji langsung dimasukkan ke dalam tabel di lapisan ODS untuk menyederhanakan prosedur dalam contoh ini. Di lingkungan produksi nyata, Realtime Compute for Apache Flink menggunakan kemampuan pemrosesan aliran untuk membaca data dari sumber eksternal dan menulisnya ke danau data sebagai data di lapisan ODS. Untuk informasi lebih lanjut, lihat Memulai dengan Fitur Dasar Apache Paimon.

  1. Di editor skrip tab Scripts, masukkan dan pilih pernyataan SQL berikut, lalu klik Run di sisi kiri editor skrip.

    CREATE DATABASE `my_catalog`.`order_dw`;
    
    USE `my_catalog`.`order_dw`;
    
    CREATE TABLE orders (
      order_id BIGINT,
      user_id STRING,
      shop_id BIGINT,
      product_id BIGINT,
      buy_fee BIGINT,   
      create_time TIMESTAMP,
      update_time TIMESTAMP,
      state INT
    );
    
    CREATE TABLE orders_pay (
      pay_id BIGINT,
      order_id BIGINT,
      pay_platform INT, 
      create_time TIMESTAMP
    );
    
    CREATE TABLE product_catalog (
      product_id BIGINT,
      catalog_name STRING
    );
    
    -- Masukkan data uji ke dalam tabel.
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, TO_TIMESTAMP('2023-02-15 16:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100002, 'user_002', 12346, 2, 4000, TO_TIMESTAMP('2023-02-15 15:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100003, 'user_003', 12347, 3, 3000, TO_TIMESTAMP('2023-02-15 14:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100004, 'user_001', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 13:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100005, 'user_002', 12348, 5, 1000, TO_TIMESTAMP('2023-02-15 12:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100006, 'user_001', 12348, 1, 1000, TO_TIMESTAMP('2023-02-15 11:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100007, 'user_003', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 10:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2002, 100002, 1, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2003, 100003, 0, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2004, 100004, 0, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2005, 100005, 0, TO_TIMESTAMP('2023-02-15 18:40:56')),
    (2006, 100006, 0, TO_TIMESTAMP('2023-02-15 18:40:56')),
    (2007, 100007, 0, TO_TIMESTAMP('2023-02-15 18:40:56'));
    
    INSERT INTO product_catalog VALUES
      (1, 'phone_aaa'),
      (2, 'phone_bbb'),
      (3, 'phone_ccc'),
      (4, 'phone_ddd'),
      (5, 'phone_eee');
    Catatan

    Dalam contoh ini, tabel Apache Paimon append-only tanpa kunci utama dibuat. Tabel-tabel ini memiliki performa penulisan batch yang lebih baik daripada tabel Apache Paimon dengan kunci utama, tetapi tidak mendukung pembaruan data berdasarkan kunci utama.

    Hasil eksekusi berisi beberapa sub-tab. Jika pesan Pernyataan berikut telah berhasil dieksekusi! muncul, eksekusi pernyataan DDL berhasil.

    Jika ID pekerjaan dikembalikan, pernyataan DML, seperti pernyataan INSERT, dieksekusi. Dalam hal ini, penyebaran Realtime Compute for Apache Flink dibuat dan dijalankan dalam sesi Realtime Compute for Apache Flink. Anda dapat mengklik Flink UI di sisi kiri tab Hasil untuk melihat status eksekusi pernyataan. Tunggu beberapa detik agar eksekusi pernyataan selesai.

  2. Lakukan profil data di tabel lapisan ODS.

    Di editor skrip tab Scripts, masukkan dan pilih pernyataan SQL berikut, lalu klik Run di sisi kiri editor skrip.

    SELECT count(*) as order_count FROM `my_catalog`.`order_dw`.`orders`;
    SELECT count(*) as pay_count FROM `my_catalog`.`order_dw`.`orders_pay`;
    SELECT * FROM `my_catalog`.`order_dw`.`product_catalog`;

    Pernyataan SQL ini juga dieksekusi dalam sesi Realtime Compute for Apache Flink. Anda dapat melihat hasil eksekusi di tab Hasil dari masing-masing tiga kueri.

    image.png image.png image.png

Langkah 2: Buat tabel di lapisan DWD dan DWS

Di editor skrip tab Scripts, masukkan dan pilih pernyataan SQL berikut, lalu klik Run di sisi kiri editor skrip.

USE `my_catalog`.`order_dw`;

CREATE TABLE dwd_orders (
    order_id BIGINT,
    order_user_id STRING,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_product_catalog_name STRING,
    order_fee BIGINT,
    order_create_time TIMESTAMP,
    order_update_time TIMESTAMP,
    order_state INT,
    pay_id BIGINT,
    pay_platform INT COMMENT 'platform 0: phone, 1: pc',
    pay_create_time TIMESTAMP
) WITH (
    'sink.parallelism' = '2'
);

CREATE TABLE dws_users (
    user_id STRING,
    ds STRING,
    total_fee BIGINT COMMENT 'Total jumlah pembayaran yang lengkap pada hari saat ini'
) WITH (
    'sink.parallelism' = '2'
);

CREATE TABLE dws_shops (
    shop_id BIGINT,
    ds STRING,
    total_fee BIGINT COMMENT 'Total jumlah pembayaran yang lengkap pada hari saat ini'
) WITH (
    'sink.parallelism' = '2'
);
Catatan

Tabel Apache Paimon append-only dibuat dalam langkah ini. Jika Anda menggunakan tabel Apache Paimon sebagai sink Flink, inferensi paralelisme otomatis tidak didukung. Anda harus secara eksplisit mengonfigurasi paralelisme untuk tabel Apache Paimon. Jika tidak, kesalahan mungkin terjadi.

Langkah 3: Buat draf di lapisan DWD dan DWS dan sebarkan draf sebagai penyebaran

  1. Buat draf di lapisan DWD dan sebarkan draf sebagai penyebaran.

    1. Buat draf pembaruan untuk tabel di lapisan DWD.

      Di halaman Development > ETL, buat draf batch kosong bernama dwd_orders dan salin pernyataan SQL berikut ke editor skrip. Kami merekomendasikan Anda memilih versi VVR dengan label RECOMMENDED. Secara default, pernyataan Flink SQL digunakan dan dialek SQL tidak tersedia untuk mesin versi sebelumnya. Pernyataan INSERT OVERWRITE digunakan untuk menimpa tabel di lapisan DWD karena tabel di lapisan DWD adalah tabel Apache Paimon append-only.

      INSERT OVERWRITE my_catalog.order_dw.dwd_orders
      SELECT 
          o.order_id,
          o.user_id,
          o.shop_id,
          o.product_id,
          c.catalog_name,
          o.buy_fee,
          o.create_time,
          o.update_time,
          o.state,
          p.pay_id,
          p.pay_platform,
          p.create_time
      FROM 
          my_catalog.order_dw.orders as o, 
          my_catalog.order_dw.product_catalog as c, 
          my_catalog.order_dw.orders_pay as p
      WHERE o.product_id = c.product_id AND o.order_id = p.order_id
    2. Di sudut kanan atas halaman Editor SQL, klik Deploy. Lalu, klik OK untuk menyebarkan draf dwd_orders sebagai penyebaran.

  2. Buat draf di lapisan DWS dan sebarkan draf sebagai penyebaran.

    1. Buat draf pembaruan untuk tabel di lapisan DWS.

      Buat dua draf batch kosong bernama dws_shops dan dws_users dengan merujuk ke Buat draf pembaruan untuk tabel di lapisan DWD. Lalu, salin salah satu pernyataan SQL berikut ke editor skrip draf terkait:

      INSERT OVERWRITE my_catalog.order_dw.dws_shops
      SELECT 
          order_shop_id,
          DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds,
          SUM(order_fee) as total_fee
      FROM my_catalog.order_dw.dwd_orders
      WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
      GROUP BY order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
      INSERT OVERWRITE my_catalog.order_dw.dws_users
      SELECT 
          order_user_id,
          DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds,
          SUM(order_fee) as total_fee
      FROM my_catalog.order_dw.dwd_orders
      WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
      GROUP BY order_user_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
    2. Di sudut kanan atas halaman Editor SQL, klik Deploy. Lalu, klik OK untuk menyebarkan draf dws_shops dan dws_users sebagai penyebaran.

Langkah 4: Mulai dan lihat penyebaran di lapisan DWD dan DWS

  • Mulai dan lihat penyebaran di lapisan DWD.

    1. Di halaman O&M > Deployments, pilih BATCH dari daftar drop-down jenis penyebaran. Temukan penyebaran dwd_orders dan klik Start di kolom Actions.

      Pekerjaan batch dalam status STARTING dibuat, seperti yang ditunjukkan pada gambar berikut.

      image.png

      Jika status pekerjaan berubah menjadi FINISHED, pemrosesan data selesai.

    2. Lihat hasil profil data.

      Di editor skrip tab Scripts, masukkan dan pilih pernyataan SQL berikut, lalu klik Run di sisi kiri editor skrip untuk menanyakan data di tabel lapisan DWD:

      SELECT * FROM `my_catalog`.`order_dw`.`dwd_orders`;

      Gambar berikut menunjukkan hasil kueri.

      image

  • Mulai dan lihat penyebaran di lapisan DWS.

    1. Di halaman O&M > Deployments, pilih BATCH dari daftar drop-down jenis penyebaran. Temukan penyebaran dws_shops dan dws_users dan klik Start di kolom Actions setiap penyebaran.

    2. Di editor skrip tab Scripts, masukkan dan pilih pernyataan SQL berikut, lalu klik Run di sisi kiri editor skrip untuk menanyakan data di tabel lapisan DWS:

      SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`;
      SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;

      Gambar berikut menunjukkan hasil kueri.

      image.png image.png

Langkah 5: Orkestrasi penyebaran sebagai alur kerja pemrosesan batch

Bab ini menjelaskan cara mengorkestrasi penyebaran yang dibuat di bagian sebelumnya sebagai alur kerja, sehingga Anda dapat menjalankan penyebaran secara seragam berdasarkan urutan tertentu.

  1. Buat alur kerja.

    1. Di panel navigasi sisi kiri konsol Realtime Compute for Apache Flink, klik O&M > Workflows. Di halaman yang muncul, klik Create Workflow.

    2. Di panel Buat Alur Kerja, masukkan wf_orders di bidang Nama. Pertahankan nilai parameter Jenis Penjadwalan pada pengaturan default (Manual Scheduling). Pilih default-queue untuk parameter Resource Queue, lalu klik Create. Halaman pengeditan alur kerja akan muncul.

    3. Edit alur kerja.

      1. Klik tugas awal, beri nama tugas v_dwd_orders, dan pilih penyebaran dwd_orders untuk tugas tersebut.

      2. Klik Tambah Tugas untuk membuat tugas bernama v_dws_shops. Pilih penyebaran dws_shops untuk tugas tersebut dan konfigurasikan tugas v_dwd_orders sebagai tugas hulu.

      3. Klik Tambah Tugas lagi untuk membuat tugas bernama v_dws_users. Pilih penyebaran dws_users untuk tugas tersebut dan konfigurasikan tugas v_dwd_orders sebagai tugas hulu.

      4. Di halaman pengeditan alur kerja, klik Save di sudut kanan atas, lalu klik OK.

        Gambar berikut menunjukkan alur kerja yang telah Anda buat:

        image.png

  2. Jalankan alur kerja secara manual.

    Catatan

    Anda dapat mengubah jenis penjadwalan alur kerja menjadi Penjadwalan Periodik dengan mengklik Edit Alur Kerja di kolom Tindakan alur kerja pada halaman Workflows. Untuk informasi lebih lanjut, lihat Alur Kerja (pratinjau publik).

    1. Sebelum menjalankan alur kerja, sisipkan beberapa data ke dalam tabel di lapisan ODS untuk memverifikasi hasil eksekusi alur kerja.

      Di editor skrip tab Scripts, masukkan pernyataan SQL berikut, pilih pernyataan tersebut, lalu klik Run di sisi kiri editor skrip.

      USE `my_catalog`.`order_dw`;
      
      INSERT INTO orders VALUES
      (100008, 'user_001', 12346, 1, 10000, TO_TIMESTAMP('2023-02-15 17:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
      (100009, 'user_002', 12347, 2, 20000, TO_TIMESTAMP('2023-02-15 18:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
      (100010, 'user_003', 12348, 3, 30000, TO_TIMESTAMP('2023-02-15 19:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1);
      
      INSERT INTO orders_pay VALUES
      (2008, 100008, 1, TO_TIMESTAMP('2023-02-15 20:40:56')),
      (2009, 100009, 1, TO_TIMESTAMP('2023-02-15 20:40:56')),
      (2010, 100010, 1, TO_TIMESTAMP('2023-02-15 20:40:56'));

      Di sisi kiri tab Hasil, klik Flink UI untuk melihat status penyebaran.

    2. Di halaman O&M > Workflows, temukan alur kerja yang telah Anda buat dan klik Execute di kolom Actions. Di kotak dialog yang muncul, klik OK untuk menjalankan alur kerja.

      image.png

      Klik nama alur kerja untuk masuk ke halaman detail alur kerja. Di tab Overview, lihat daftar instance tugas.

      image.png

      Klik ID instance yang sedang berjalan di alur kerja untuk masuk ke halaman detail eksekusi instance. Di halaman yang muncul, lihat status eksekusi setiap tugas. Tunggu hingga eksekusi alur kerja selesai.

      image.png

  3. Lihat hasil eksekusi alur kerja.

    1. Di editor skrip tab Scripts, masukkan pernyataan SQL berikut, pilih pernyataan tersebut, lalu klik Run di sisi kiri editor skrip.

      SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`;
      SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;
    2. Lihat hasil eksekusi alur kerja.

      Data baru di lapisan ODS diproses dan ditulis ke tabel di lapisan DWS.

      image.png image.png

Referensi