Topik ini menjelaskan cara menggunakan Realtime Compute for Apache Flink untuk memproses informasi pesanan dan bayi dari MySQL secara real time, membangun tabel lebar, lalu menulis hasilnya ke Elasticsearch. Anda kemudian dapat menggunakan Kibana untuk melakukan pengelompokan dan agregasi serta menampilkan data pada sebuah dasbor guna mengungkap hubungan potensial antara volume pesanan dan kelahiran bayi.
Informasi latar belakang
Dengan diterapkannya kebijakan 'dua anak universal' dan pertumbuhan pendapatan yang dapat dibelanjakan secara stabil, pasar konsumsi ibu dan bayi di Tiongkok sedang memasuki masa keemasan. Pada saat yang sama, peningkatan konsumsi nasional dan munculnya orang tua yang lahir pada 1990-an mendorong perubahan mendalam dalam permintaan dan filosofi konsumsi. Menurut laporan terbaru oleh Roland Berger, industri ibu dan bayi diproyeksikan mencapai ukuran pasar sebesar CNY 3,6 triliun pada tahun 2020, dengan tingkat pertumbuhan tahunan majemuk (CAGR) sebesar 17% dari tahun 2016 hingga 2020. Hal ini menunjukkan prospek pertumbuhan yang sangat luas. Dalam konteks ini, seperti apa perilaku konsumsi populasi ibu dan bayi? Barang-barang apa yang mendominasi pengeluaran mereka?
Dalam skenario ini, informasi pesanan dan bayi disimpan dalam database MySQL. Untuk menyederhanakan analisis, tabel pesanan digabungkan dengan informasi bayi untuk membangun tabel lebar yang terperinci. Kemudian, Realtime Compute for Apache Flink menulis data ke Elasticsearch secara real time. Setelah itu, Anda dapat menggunakan Kibana untuk pengelompokan, agregasi, dan visualisasi dasbor dinamis guna mengungkap hubungan potensial antara volume pesanan dan kelahiran bayi.
Prasyarat
Anda telah membuat ruang kerja Flink. Untuk informasi selengkapnya, lihat Aktifkan Realtime Compute for Apache Flink.
Anda telah membuat instans ApsaraDB RDS for MySQL, database, dan akun. Untuk informasi selengkapnya, lihat Buat instans ApsaraDB RDS for MySQL dengan cepat dan Buat database dan akun.
Anda telah membuat instans Elasticsearch. Topik ini menggunakan versi 8.17.0 sebagai contoh. Untuk informasi selengkapnya, lihat Buat instans Alibaba Cloud Elasticsearch.
PentingRuang kerja Flink, instans ApsaraDB RDS for MySQL, dan instans Elasticsearch harus berada dalam virtual private cloud (VPC) yang sama. Anda juga harus mengonfigurasi daftar putih alamat IP untuk instans ApsaraDB RDS for MySQL dan Elasticsearch. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih untuk instans ApsaraDB RDS for MySQL dan Konfigurasi daftar putih untuk instans Elasticsearch.
Langkah 1: Buat tabel RDS MySQL dan impor data
Dalam contoh ini, Anda akan membuat tiga tabel data. Tabel orders_dataset_tmp adalah tabel sementara untuk impor data. Dua tabel lainnya adalah tabel sumber untuk kueri real-time pesanan ibu dan bayi Taobao.
Buka halaman Instances. Di bilah navigasi atas, pilih wilayah tempat instans RDS berada. Lalu, temukan instans RDS tersebut dan klik ID instansnya.
Klik Log On To Database di bagian atas halaman. Di halaman logon DMS, masukkan akun dan kata sandi database, lalu klik Log On.
Di panel navigasi sebelah kiri, klik Database Instance. Di daftar Logged In Instances, klik ganda nama database target.
Di SQL Console di sebelah kanan, masukkan pernyataan berikut dan klik Execute untuk membuat tabel.
create table orders_dataset_tmp( user_id bigint comment 'Informasi ID pengguna', auction_id bigint comment 'ID perilaku pembelian', cat_id bigint comment 'Nomor seri kategori produk', cat1 bigint comment 'Nomor seri produk (kategori akar)', property TEXT comment 'Properti produk', buy_mount int comment 'Jumlah yang dibeli', day TEXT comment 'Tanggal pembelian' ); create table orders_dataset( order_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY comment 'ID pesanan', user_id bigint comment 'Informasi ID pengguna', auction_id bigint comment 'ID perilaku pembelian', cat_id bigint comment 'Nomor seri kategori produk', cat1 bigint comment 'Nomor seri produk (kategori akar)', property TEXT comment 'Properti produk', buy_mount int comment 'Jumlah yang dibeli', day TEXT comment 'Tanggal pembelian' ); create table baby_dataset( user_id bigint NOT NULL PRIMARY KEY, birthday text comment 'Tanggal lahir bayi', gender int comment '0 menunjukkan perempuan, 1 menunjukkan laki-laki, 2 menunjukkan tidak diketahui' );Impor data.
Impor file E-commerce Infant Users ke tabel orders_dataset_tmp dan file Infant Information ke tabel baby_dataset.
Di bilah menu atas, klik Data Import.
Konfigurasikan pengaturan impor.
Item Konfigurasi
Deskripsi
Database
Lakukan pencarian fuzzy untuk nama database dan klik instans MySQL target.
Pengkodean File
Otomatis terdeteksi.
Mode Impor
Express Mode.
Jenis File
Format CSV.
Tabel Target
orders_dataset_tmp atau baby_dataset.
Lokasi Data
Baris pertama berisi properti.
Mode Penulisan
INSERT.
Lampiran
Klik Upload File dan impor file yang sesuai untuk tabel tersebut.
Klik Submit Request. Di Langkah 4, klik Execute Change. Di jendela pengaturan tugas, pilih Execute Now dan klik Confirm And Execute.
Setelah impor selesai, eksekusi pernyataan SQL berikut untuk mengimpor data pesanan ke tabel sumber orders_dataset.
insert into orders_dataset(user_id,auction_id,cat_id,cat1,property,buy_mount,day) select * from orders_dataset_tmp;
Langkah 2: Konfigurasi pembuatan indeks otomatis di Elasticsearch
Masuk ke Konsol Alibaba Cloud Elasticsearch. Di bilah menu atas, pilih kelompok sumber daya dan wilayah.
Di bagian Elasticsearch Instances, klik ID instans target.
Di halaman Basic Information, klik .

Di pojok kanan atas, klik Modify Configuration. Pilih Allow Auto Index Creation dan klik OK.
PentingOperasi ini akan me-restart instans. Pastikan Anda ingin melanjutkan.
Langkah 3: Buat pekerjaan streaming Flink SQL
Masuk ke Konsol Realtime Compute for Apache Flink. Untuk ruang kerja target, klik Console di kolom Actions.
Di panel navigasi sebelah kiri, klik .
Klik
dan pilih New Streaming Job. Masukkan File Name, pilih Engine Version, lalu klik Create.
Parameter Pekerjaan
Deskripsi
Contoh
File Name
Nama pekerjaan.
CatatanNama pekerjaan harus unik dalam proyek saat ini.
flink-test
Engine Version
Versi mesin Flink yang digunakan oleh pekerjaan saat ini. Untuk informasi selengkapnya tentang nomor versi mesin, pemetaan versi, dan tanggal siklus hidup penting, lihat Versi mesin.
vvr-8.0.11-flink-1.17
Edit kode pekerjaan streaming Flink SQL.
Salin kode SQL berikut ke editor SQL dan ganti nilai parameter dengan nilai aktual Anda.
Kode ini mendefinisikan dua tabel MySQL (
orders_datasetdanbaby_dataset) sebagai sumber data. Keduanya menyimpan informasi pesanan dan pengguna, masing-masing. Data ditulis ke satu indeks (enriched_orders_view) melalui dua tabel sink Elasticsearch (es_sink1danes_sink2). Dengan mengatursink.delete-strategymenjadiNON_PK_FIELD_TO_NULL, kode ini menggunakan kemampuan pembaruan parsial Elasticsearch. Ketika kunci primer identik, hanya bidang non-kunci primer yang diperbarui, sehingga memastikan konsistensi data.CREATE TEMPORARY TABLE orders_dataset ( `order_id` BIGINT, `user_id` bigint, `auction_id` bigint, `cat_id` bigint, `cat1` bigint, `property` varchar, `buy_mount` int, `day` varchar , PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'flinkrds***', 'password' = 'Flink***@1', 'database-name' = 'ecommerce', 'table-name' = 'orders_dataset' ); CREATE TEMPORARY TABLE baby_dataset ( `user_id` bigint, `birthday` varchar, `gender` int, PRIMARY KEY(user_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'flinkrds***', 'password' = 'Flink***@1', 'database-name' = 'ecommerce', 'table-name' = 'baby_dataset' ); CREATE TEMPORARY TABLE es_sink1( `order_id` BIGINT, `user_id` BIGINT, `buy_mount` INT, `day` VARCHAR, PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-8', 'hosts' = 'http://192.xx.xx.252:9200', 'index' = 'enriched_orders_view', 'username' ='elastic', 'password' ='Flink***@1', 'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL' ); CREATE TEMPORARY TABLE es_sink2( `user_id` BIGINT, `birthday` VARCHAR, `gender` INT, PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-8', 'hosts' = 'http://192.xx.xx.252:9200', 'index' = 'enriched_orders_view', 'username' ='elastic', 'password' ='Flink***@1', 'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL' ); BEGIN STATEMENT SET; INSERT INTO es_sink1 SELECT `order_id`, `user_id`, `buy_mount`, `day` FROM orders_dataset; INSERT INTO es_sink2 SELECT `user_id`, `birthday`, `gender` FROM baby_dataset; END;Kelas Penyimpanan
Parameter
Wajib
Deskripsi
MySQL
connector
Ya
Jenis tabel. Nilainya tetap
mysql.hostname
Ya
Alamat IP atau hostname database MySQL. Gunakan alamat VPC.
port
Tidak
Nomor port layanan database MySQL.
username
Ya
Nama pengguna untuk layanan database MySQL.
password
Ya
Kata sandi untuk layanan database MySQL.
database-name
Ya
Nama database MySQL.
table-name
Ya
Nama tabel MySQL.
Elasticsearch
connector
Ya
Jenis tabel sink.
hosts
Ya
Titik akhir Elasticsearch.
Formatnya adalah
http://host_name:port.index
Ya
Nama indeks.
Dalam contoh ini, nilainya adalah enriched_orders_view.
Klik Deploy.
Di halaman Job O&M, pilih Stateless Start dan klik Start.
Langkah 4: Lihat hasil data di konsol Elasticsearch
Setelah indeks enriched_orders_view dibuat di Elasticsearch, ikuti langkah-langkah berikut untuk melihat data yang ditulis.
1. Persiapan
Restart instans Elasticsearch.
Di halaman yang muncul, pilih . Di bagian Kibana, klik Internet Endpoint dan masukkan nama pengguna serta kata sandi Anda.
Nama pengguna default untuk konsol Kibana adalah elastic. Kata sandinya adalah yang Anda atur saat membuat instans Alibaba Cloud Elasticsearch.

Proses tipe data bidang data.
Untuk menggunakan histogram nanti, Anda harus mengubah tipe data bidang
daydari teks menjadi tanggal. Anda dapat menjalankan perintah berikut di .Buat indeks baru, seperti
enriched_orders_view_new, dan definisikan pemetaannya.Atur tipe bidang
daymenjadidatedan pertahankan struktur pemetaan untuk bidang lainnya.PUT enriched_orders_view_new { "mappings": { "properties": { "birthday": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }, "fielddata": true }, "buy_mount": { "type": "long" }, "day": { "type": "date", "format": "yyyy-MM-dd" // Tentukan format tanggal agar konsisten dengan data mentah. }, "gender": { "type": "long" }, "order_id": { "type": "long" }, "user_id": { "type": "long" } } } }Gunakan API
_reindexuntuk menyalin data dari indeks asli ke indeks baru. Selama proses ini, ubah nilai bidangdayke format tanggal.POST _reindex { "source": { "index": "enriched_orders_view" }, "dest": { "index": "enriched_orders_view_new" }, "script": { "source": """ if (ctx._source['day'] != null) { // Ubah tanggal dari format 'yyyyMMdd' menjadi 'yyyy-MM-dd'. def originalDate = ctx._source['day']; if (originalDate.length() == 8) { ctx._source['day'] = originalDate.substring(0, 4) + '-' + originalDate.substring(4, 6) + '-' + originalDate.substring(6, 8); } else { ctx.op = 'noop'; // Jika format salah, lewati dokumen. } } """ } }Verifikasi bahwa bidang day di indeks baru telah diubah ke format data yang benar, seperti
yyyy-MM-dd.GET enriched_orders_view_new/_search { "size": 10 }
Buat tampilan data.
Di panel navigasi sebelah kiri, klik Discover.

Klik Create Data View. Masukkan nama. Atur Index pattern menjadi
enriched_orders_view_newdan Timestamp field menjadi day. Klik Save Data View To Kibana.
2. Lihat status penulisan data
Di pojok kiri atas halaman, klik .
Beralih ke tampilan data yang baru saja Anda buat.
Klik Search Entire Time Range.

Lihat status penulisan data.

3. Konfigurasi grafik visualisasi
Klik bidang day lalu Visualize.

Di sisi kanan halaman, konfigurasikan sumbu horizontal dan vertikal untuk grafik Vertical bar.
Setelah mengonfigurasi satu sumbu, klik Close dan konfigurasikan sumbu lainnya.
Item Konfigurasi
Deskripsi Konfigurasi
Gambar
Sumbu horizontal
Atur Function menjadi Date Histogram
Atur Kolom ke hari
Atur Name menjadi year_month

Sumbu vertikal
Atur Function menjadi Sum
Atur Field menjadi buy_mount
Atur Name menjadi buy_num
Atur Side menjadi Left

Di sisi kanan halaman, konfigurasikan sumbu horizontal dan vertikal untuk grafik Line.
Di pojok kanan bawah, klik Add Layer. Pilih Line sebagai jenis visualisasi. Lalu, konfigurasikan sumbu horizontal dan vertikal. Konfigurasikan satu sumbu, klik Close, lalu konfigurasikan sumbu lainnya.
Item Konfigurasi
Deskripsi Konfigurasi
Gambar
Sumbu horizontal
Atur Function menjadi Date Histogram
Atur Field menjadi day
Atur Name menjadi year_month

Sumbu vertikal
Atur Function menjadi Count
Atur Field menjadi birthday
Atur Name menjadi baby_num
Atur Side menjadi Right

4. Simpan dan lihat hasil visualisasi
Untuk menyimpan grafik gabungan garis dan kolom, klik Save di pojok kanan atas halaman.

Referensi
Untuk informasi selengkapnya tentang sintaksis, parameter WITH, dan contoh penggunaan konektor Elasticsearch, lihat Elasticsearch.
Untuk informasi selengkapnya tentang sintaksis, parameter WITH, dan contoh penggunaan konektor ApsaraDB RDS for MySQL, lihat ApsaraDB RDS for MySQL.