全部产品
Search
文档中心

Realtime Compute for Apache Flink:Kueri real-time dan dasbor untuk pesanan ibu dan bayi Taobao

更新时间:Nov 11, 2025

Topik ini menjelaskan cara menggunakan Realtime Compute for Apache Flink untuk memproses informasi pesanan dan bayi dari MySQL secara real time, membangun tabel lebar, lalu menulis hasilnya ke Elasticsearch. Anda kemudian dapat menggunakan Kibana untuk melakukan pengelompokan dan agregasi serta menampilkan data pada sebuah dasbor guna mengungkap hubungan potensial antara volume pesanan dan kelahiran bayi.

Informasi latar belakang

Dengan diterapkannya kebijakan 'dua anak universal' dan pertumbuhan pendapatan yang dapat dibelanjakan secara stabil, pasar konsumsi ibu dan bayi di Tiongkok sedang memasuki masa keemasan. Pada saat yang sama, peningkatan konsumsi nasional dan munculnya orang tua yang lahir pada 1990-an mendorong perubahan mendalam dalam permintaan dan filosofi konsumsi. Menurut laporan terbaru oleh Roland Berger, industri ibu dan bayi diproyeksikan mencapai ukuran pasar sebesar CNY 3,6 triliun pada tahun 2020, dengan tingkat pertumbuhan tahunan majemuk (CAGR) sebesar 17% dari tahun 2016 hingga 2020. Hal ini menunjukkan prospek pertumbuhan yang sangat luas. Dalam konteks ini, seperti apa perilaku konsumsi populasi ibu dan bayi? Barang-barang apa yang mendominasi pengeluaran mereka?

Dalam skenario ini, informasi pesanan dan bayi disimpan dalam database MySQL. Untuk menyederhanakan analisis, tabel pesanan digabungkan dengan informasi bayi untuk membangun tabel lebar yang terperinci. Kemudian, Realtime Compute for Apache Flink menulis data ke Elasticsearch secara real time. Setelah itu, Anda dapat menggunakan Kibana untuk pengelompokan, agregasi, dan visualisasi dasbor dinamis guna mengungkap hubungan potensial antara volume pesanan dan kelahiran bayi.

Prasyarat

Langkah 1: Buat tabel RDS MySQL dan impor data

Dalam contoh ini, Anda akan membuat tiga tabel data. Tabel orders_dataset_tmp adalah tabel sementara untuk impor data. Dua tabel lainnya adalah tabel sumber untuk kueri real-time pesanan ibu dan bayi Taobao.

  1. Buka halaman Instances. Di bilah navigasi atas, pilih wilayah tempat instans RDS berada. Lalu, temukan instans RDS tersebut dan klik ID instansnya.

  2. Klik Log On To Database di bagian atas halaman. Di halaman logon DMS, masukkan akun dan kata sandi database, lalu klik Log On.

  3. Di panel navigasi sebelah kiri, klik Database Instance. Di daftar Logged In Instances, klik ganda nama database target.

  4. Di SQL Console di sebelah kanan, masukkan pernyataan berikut dan klik Execute untuk membuat tabel.

    create table orders_dataset_tmp(
        user_id bigint comment 'Informasi ID pengguna',            
        auction_id bigint comment 'ID perilaku pembelian',        
        cat_id bigint comment 'Nomor seri kategori produk',            
        cat1 bigint comment 'Nomor seri produk (kategori akar)',                
        property TEXT comment 'Properti produk',            
        buy_mount int comment 'Jumlah yang dibeli',            
        day TEXT comment 'Tanggal pembelian'                
    );
    
    create table orders_dataset(
        order_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY comment 'ID pesanan',
        user_id bigint comment 'Informasi ID pengguna',            
        auction_id bigint comment 'ID perilaku pembelian',        
        cat_id bigint comment 'Nomor seri kategori produk',            
        cat1 bigint comment 'Nomor seri produk (kategori akar)',                
        property TEXT comment 'Properti produk',            
        buy_mount int comment 'Jumlah yang dibeli',            
        day TEXT comment 'Tanggal pembelian'                
    );
    
    
    create table baby_dataset(
        user_id bigint NOT NULL PRIMARY KEY,    
        birthday text comment 'Tanggal lahir bayi',
        gender int comment '0 menunjukkan perempuan, 1 menunjukkan laki-laki, 2 menunjukkan tidak diketahui'
    );
  5. Impor data.

    Impor file E-commerce Infant Users ke tabel orders_dataset_tmp dan file Infant Information ke tabel baby_dataset.

    1. Di bilah menu atas, klik Data Import.

    2. Konfigurasikan pengaturan impor.

      Item Konfigurasi

      Deskripsi

      Database

      Lakukan pencarian fuzzy untuk nama database dan klik instans MySQL target.

      Pengkodean File

      Otomatis terdeteksi.

      Mode Impor

      Express Mode.

      Jenis File

      Format CSV.

      Tabel Target

      orders_dataset_tmp atau baby_dataset.

      Lokasi Data

      Baris pertama berisi properti.

      Mode Penulisan

      INSERT.

      Lampiran

      Klik Upload File dan impor file yang sesuai untuk tabel tersebut.

    3. Klik Submit Request. Di Langkah 4, klik Execute Change. Di jendela pengaturan tugas, pilih Execute Now dan klik Confirm And Execute.

  6. Setelah impor selesai, eksekusi pernyataan SQL berikut untuk mengimpor data pesanan ke tabel sumber orders_dataset.

    insert into orders_dataset(user_id,auction_id,cat_id,cat1,property,buy_mount,day)
    select * from orders_dataset_tmp;

Langkah 2: Konfigurasi pembuatan indeks otomatis di Elasticsearch

  1. Masuk ke Konsol Alibaba Cloud Elasticsearch. Di bilah menu atas, pilih kelompok sumber daya dan wilayah.

  2. Di bagian Elasticsearch Instances, klik ID instans target.

  3. Di halaman Basic Information, klik Configuration and Management > ES Cluster Configuration.

    image

  4. Di pojok kanan atas, klik Modify Configuration. Pilih Allow Auto Index Creation dan klik OK.

    image

    Penting

    Operasi ini akan me-restart instans. Pastikan Anda ingin melanjutkan.

Langkah 3: Buat pekerjaan streaming Flink SQL

  1. Masuk ke Konsol Realtime Compute for Apache Flink. Untuk ruang kerja target, klik Console di kolom Actions.

  2. Di panel navigasi sebelah kiri, klik Data Development > ETL.

  3. Klik image dan pilih New Streaming Job. Masukkan File Name, pilih Engine Version, lalu klik Create.

    image

    Parameter Pekerjaan

    Deskripsi

    Contoh

    File Name

    Nama pekerjaan.

    Catatan

    Nama pekerjaan harus unik dalam proyek saat ini.

    flink-test

    Engine Version

    Versi mesin Flink yang digunakan oleh pekerjaan saat ini. Untuk informasi selengkapnya tentang nomor versi mesin, pemetaan versi, dan tanggal siklus hidup penting, lihat Versi mesin.

    vvr-8.0.11-flink-1.17

  4. Edit kode pekerjaan streaming Flink SQL.

    Salin kode SQL berikut ke editor SQL dan ganti nilai parameter dengan nilai aktual Anda.

    Kode ini mendefinisikan dua tabel MySQL (orders_dataset dan baby_dataset) sebagai sumber data. Keduanya menyimpan informasi pesanan dan pengguna, masing-masing. Data ditulis ke satu indeks (enriched_orders_view) melalui dua tabel sink Elasticsearch (es_sink1 dan es_sink2). Dengan mengatur sink.delete-strategy menjadi NON_PK_FIELD_TO_NULL, kode ini menggunakan kemampuan pembaruan parsial Elasticsearch. Ketika kunci primer identik, hanya bidang non-kunci primer yang diperbarui, sehingga memastikan konsistensi data.

    CREATE TEMPORARY TABLE orders_dataset (
      `order_id` BIGINT,
      `user_id` bigint,            
      `auction_id` bigint,        
      `cat_id` bigint,            
      `cat1` bigint,                
      `property` varchar,            
      `buy_mount` int,            
      `day` varchar    ,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'flinkrds***',
      'password' = 'Flink***@1',
      'database-name' = 'ecommerce',
      'table-name' = 'orders_dataset'
    );
    
    CREATE TEMPORARY TABLE baby_dataset (
      `user_id` bigint,
      `birthday` varchar,
      `gender` int,
      PRIMARY KEY(user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'flinkrds***',
      'password' = 'Flink***@1',
      'database-name' = 'ecommerce',
      'table-name' = 'baby_dataset'
    );
    
    
    CREATE TEMPORARY TABLE es_sink1(
      `order_id` BIGINT,
      `user_id` BIGINT,
      `buy_mount` INT,
      `day` VARCHAR,
      PRIMARY KEY(`user_id`) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-8',
      'hosts' = 'http://192.xx.xx.252:9200',
      'index' = 'enriched_orders_view',
      'username' ='elastic',
      'password' ='Flink***@1',
      'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL'
    );
    
    
    CREATE TEMPORARY TABLE es_sink2(
      `user_id` BIGINT,
      `birthday` VARCHAR,
      `gender` INT,
      PRIMARY KEY(`user_id`) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-8',
      'hosts' = 'http://192.xx.xx.252:9200',
      'index' = 'enriched_orders_view',
      'username' ='elastic',
      'password' ='Flink***@1',
      'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL'
    );
    
    BEGIN STATEMENT SET;   
    INSERT INTO es_sink1
    SELECT 
        `order_id`,
        `user_id`,
        `buy_mount`,
        `day`
    FROM orders_dataset;
    
    
    INSERT INTO es_sink2
    SELECT 
        `user_id`,
        `birthday`,
        `gender`
    FROM baby_dataset;
    END;     

    Kelas Penyimpanan

    Parameter

    Wajib

    Deskripsi

    MySQL

    connector

    Ya

    Jenis tabel. Nilainya tetap mysql.

    hostname

    Ya

    Alamat IP atau hostname database MySQL. Gunakan alamat VPC.

    port

    Tidak

    Nomor port layanan database MySQL.

    username

    Ya

    Nama pengguna untuk layanan database MySQL.

    password

    Ya

    Kata sandi untuk layanan database MySQL.

    database-name

    Ya

    Nama database MySQL.

    table-name

    Ya

    Nama tabel MySQL.

    Elasticsearch

    connector

    Ya

    Jenis tabel sink.

    hosts

    Ya

    Titik akhir Elasticsearch.

    Formatnya adalah http://host_name:port.

    index

    Ya

    Nama indeks.

    Dalam contoh ini, nilainya adalah enriched_orders_view.

  5. Klik Deploy.

  6. Di halaman Job O&M, pilih Stateless Start dan klik Start.

Langkah 4: Lihat hasil data di konsol Elasticsearch

Setelah indeks enriched_orders_view dibuat di Elasticsearch, ikuti langkah-langkah berikut untuk melihat data yang ditulis.

1. Persiapan

  1. Hubungkan ke kluster menggunakan Kibana.

  2. Restart instans Elasticsearch.

  3. Di halaman yang muncul, pilih Configuration and Management > Visualization Control. Di bagian Kibana, klik Internet Endpoint dan masukkan nama pengguna serta kata sandi Anda.

    Nama pengguna default untuk konsol Kibana adalah elastic. Kata sandinya adalah yang Anda atur saat membuat instans Alibaba Cloud Elasticsearch.

    image

  4. Proses tipe data bidang data.

    Untuk menggunakan histogram nanti, Anda harus mengubah tipe data bidang day dari teks menjadi tanggal. Anda dapat menjalankan perintah berikut di Management > Dev Tools.

    1. Buat indeks baru, seperti enriched_orders_view_new, dan definisikan pemetaannya.

      Atur tipe bidang day menjadi date dan pertahankan struktur pemetaan untuk bidang lainnya.

      PUT enriched_orders_view_new
      {
        "mappings": {
          "properties": {
            "birthday": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              },
              "fielddata": true
            },
            "buy_mount": {
              "type": "long"
            },
            "day": {
              "type": "date",
              "format": "yyyy-MM-dd" // Tentukan format tanggal agar konsisten dengan data mentah.
            },
            "gender": {
              "type": "long"
            },
            "order_id": {
              "type": "long"
            },
            "user_id": {
              "type": "long"
            }
          }
        }
      }
      
    2. Gunakan API _reindex untuk menyalin data dari indeks asli ke indeks baru. Selama proses ini, ubah nilai bidang day ke format tanggal.

      POST _reindex
      {
        "source": {
          "index": "enriched_orders_view"
        },
        "dest": {
          "index": "enriched_orders_view_new"
        },
        "script": {
          "source": """
            if (ctx._source['day'] != null) {
              // Ubah tanggal dari format 'yyyyMMdd' menjadi 'yyyy-MM-dd'.
              def originalDate = ctx._source['day'];
              if (originalDate.length() == 8) {
                ctx._source['day'] = originalDate.substring(0, 4) + '-' + originalDate.substring(4, 6) + '-' + originalDate.substring(6, 8);
              } else {
                ctx.op = 'noop'; // Jika format salah, lewati dokumen.
              }
            }
          """
        }
      }
      
      
    3. Verifikasi bahwa bidang day di indeks baru telah diubah ke format data yang benar, seperti yyyy-MM-dd.

      GET enriched_orders_view_new/_search
      {
        "size": 10
      }
  5. Buat tampilan data.

    1. Di panel navigasi sebelah kiri, klik Discover.

      image

    2. Klik Create Data View. Masukkan nama. Atur Index pattern menjadi enriched_orders_view_new dan Timestamp field menjadi day. Klik Save Data View To Kibana.

      image

2. Lihat status penulisan data

  1. Di pojok kiri atas halaman, klik Analytics > Discover.

  2. Beralih ke tampilan data yang baru saja Anda buat.

  3. Klik Search Entire Time Range.

    image

  4. Lihat status penulisan data.

    image

3. Konfigurasi grafik visualisasi

  1. Klik bidang day lalu Visualize.

    image

  2. Di sisi kanan halaman, konfigurasikan sumbu horizontal dan vertikal untuk grafik Vertical bar.

    Setelah mengonfigurasi satu sumbu, klik Close dan konfigurasikan sumbu lainnya.

    Item Konfigurasi

    Deskripsi Konfigurasi

    Gambar

    Sumbu horizontal

    • Atur Function menjadi Date Histogram

    • Atur Kolom ke hari

    • Atur Name menjadi year_month

    image

    Sumbu vertikal

    • Atur Function menjadi Sum

    • Atur Field menjadi buy_mount

    • Atur Name menjadi buy_num

    • Atur Side menjadi Left

    image

  3. Di sisi kanan halaman, konfigurasikan sumbu horizontal dan vertikal untuk grafik Line.

    Di pojok kanan bawah, klik Add Layer. Pilih Line sebagai jenis visualisasi. Lalu, konfigurasikan sumbu horizontal dan vertikal. Konfigurasikan satu sumbu, klik Close, lalu konfigurasikan sumbu lainnya.

    Item Konfigurasi

    Deskripsi Konfigurasi

    Gambar

    Sumbu horizontal

    • Atur Function menjadi Date Histogram

    • Atur Field menjadi day

    • Atur Name menjadi year_month

    image

    Sumbu vertikal

    • Atur Function menjadi Count

    • Atur Field menjadi birthday

    • Atur Name menjadi baby_num

    • Atur Side menjadi Right

    image

4. Simpan dan lihat hasil visualisasi

Untuk menyimpan grafik gabungan garis dan kolom, klik Save di pojok kanan atas halaman.

image

Referensi

  • Untuk informasi selengkapnya tentang sintaksis, parameter WITH, dan contoh penggunaan konektor Elasticsearch, lihat Elasticsearch.

  • Untuk informasi selengkapnya tentang sintaksis, parameter WITH, dan contoh penggunaan konektor ApsaraDB RDS for MySQL, lihat ApsaraDB RDS for MySQL.