Topik ini menjelaskan cara mengimpor data dari Apache Flink sumber terbuka ke kluster AnalyticDB for MySQL Data Warehouse Edition.
Prasyarat
Driver Apache Flink telah diunduh dan diterapkan ke direktori ${Direktori penerapan Flink}/lib di semua node Apache Flink. Anda dapat mengunduh driver sesuai dengan versi Apache Flink yang digunakan. Berikut adalah tautan unduhan untuk paket driver yang sesuai:
Apache Flink 1.11: flink-connector-jdbc_2.11-1.11.0.jar
Apache Flink 1.12: flink-connector-jdbc_2.11-1.12.0.jar
Apache Flink 1.13: flink-connector-jdbc_2.11-1.13.0.jar
Untuk mengunduh driver untuk versi lainnya, kunjungi halaman JDBC SQL Connector.
Driver MySQL telah diunduh dan diterapkan ke direktori ${Direktori penerapan Flink}/lib di semua node Apache Flink.
CatatanVersi driver MySQL harus 5.1.40 atau lebih baru. Untuk mengunduh driver MySQL, kunjungi halaman mysql/mysql-connector-java.
Kluster Apache Flink telah di-restart setelah semua paket JAR diterapkan. Untuk informasi lebih lanjut tentang memulai kluster Apache Flink, lihat Langkah 2: Mulai Kluster.
Database dan tabel telah dibuat di kluster AnalyticDB for MySQL untuk menyimpan data yang ingin ditulis. Untuk informasi tentang pembuatan database dan tabel, lihat CREATE DATABASE dan CREATE TABLE.
CatatanDalam contoh ini, database bernama
tpchdibuat menggunakan pernyataan berikut:CREATE DATABASE IF NOT EXISTS tpch;Dalam contoh ini, tabel bernama
persondibuat menggunakan pernyataan berikut:CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);
Jika kluster AnalyticDB for MySQL dalam mode elastis, aktifkan ENI di bagian Network Information pada halaman Cluster Information.
PentingSaat mengaktifkan atau menonaktifkan ENI, koneksi database mungkin terputus selama sekitar 2 menit. Selama periode ini, operasi baca atau tulis tidak dapat dilakukan. Harap berhati-hati saat mengaktifkan atau menonaktifkan ENI.
Catatan penggunaan
Topik ini hanya menjelaskan penggunaan Apache Flink SQL untuk membuat tabel dan menulis data ke AnalyticDB for MySQL. Untuk informasi tentang penggunaan API Java Database Connectivity (JDBC) Apache Flink untuk menulis data, lihat JDBC Connector.
Metode yang dijelaskan dalam topik ini hanya berlaku untuk Apache Flink 1.11 dan yang lebih baru. Untuk menulis data dari versi Apache Flink lainnya ke kluster AnalyticDB for MySQL, gunakan salah satu metode berikut:
Untuk informasi tentang menulis data dari Apache Flink 1.9 dan 1.10, lihat Flink v1.10.
Untuk informasi tentang menulis data dari Apache Flink 1.8 dan yang lebih lama, lihat Flink v1.8.
Prosedur
Dalam contoh ini, file CSV digunakan sebagai data sumber yang akan ditulis.
Langkah | Deskripsi |
Buat file CSV, tulis data ke file tersebut, lalu terapkan file tersebut ke direktori /root di semua node Apache Flink. | |
Eksekusi pernyataan SQL untuk membuat tabel sumber dan tabel hasil di Apache Flink dan gunakan tabel-tabel tersebut untuk menulis data sumber ke kluster AnalyticDB for MySQL. | |
Masuk ke database AnalyticDB for MySQL untuk memeriksa apakah data sumber telah ditulis. |
Langkah 1: Persiapkan data
Di direktori root node Apache Flink, jalankan perintah
vim /root/data.csvuntuk membuat file CSV bernama data.csv.File CSV berisi data berikut. Anda dapat menyalin baris tambahan data untuk meningkatkan jumlah data yang ingin ditulis.
0,json00,20 1,json01,21 2,json02,22 3,json03,23 4,json04,24 5,json05,25 6,json06,26 7,json07,27 8,json08,28 9,json09,29Setelah file CSV dibuat, terapkan file tersebut ke direktori /root di node Apache Flink lainnya.
Langkah 2: Tulis data
Mulai dan jalankan aplikasi Apache Flink SQL. Untuk informasi lebih lanjut, lihat Memulai CLI Klien SQL.
Eksekusi pernyataan berikut untuk membuat tabel sumber bernama
csv_person:CREATE TABLE if not exists csv_person ( `user_id` STRING, `user_name` STRING, `age` INT ) WITH ( 'connector' = 'filesystem', 'path' = 'file:///root/data.csv', 'format' = 'csv', 'csv.ignore-parse-errors' = 'true', 'csv.allow-comments' = 'true' );CatatanNama kolom dan tipe data tabel sumber harus sama dengan nama kolom dan tipe data tabel tujuan di kluster AnalyticDB for MySQL.
Bidang
pathdalam pernyataan di atas menentukan direktori lokal file data.csv. Pastikan direktori file di semua node Apache Flink sama. Jika file data.csv tidak disimpan di perangkat lokal Anda, tentukan direktori aktual file tersebut.Untuk informasi tentang parameter lainnya dalam pernyataan di atas, lihat FileSystem SQL Connector.
Eksekusi pernyataan berikut untuk membuat tabel hasil bernama
mysql_person:CREATE TABLE mysql_person ( user_id String, user_name String, age INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true', 'table-name' = '<table_name>', 'username' = '<username>', 'password' = '<password>', 'sink.buffer-flush.max-rows' = '10', 'sink.buffer-flush.interval' = '1s' );CatatanNama kolom dan tipe data tabel hasil harus sama dengan nama kolom dan tipe data tabel tujuan di kluster AnalyticDB for MySQL.
Tabel berikut menjelaskan parameter yang diperlukan untuk terhubung ke kluster AnalyticDB for MySQL. Untuk informasi tentang parameter opsional, lihat bagian "Opsi konektor" dari topik JDBC SQL Connector.
Parameter
Deskripsi
connectorJenis konektor yang digunakan oleh Apache Flink. Atur parameter ini ke
jdbc.urlURL JDBC kluster AnalyticDB for MySQL.
Format:
jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true'.endpoint: endpoint kluster AnalyticDB for MySQL.CatatanJika Anda ingin menggunakan endpoint publik untuk terhubung ke kluster AnalyticDB for MySQL, Anda harus terlebih dahulu mengajukan endpoint publik.
db_name: nama database tujuan di kluster AnalyticDB for MySQL.useServerPrepStmts=false&rewriteBatchedStatements=true: konfigurasi yang diperlukan untuk menulis data secara batch ke kluster AnalyticDB for MySQL. Konfigurasi ini digunakan untuk meningkatkan kinerja penulisan dan mengurangi beban pada kluster AnalyticDB for MySQL.
Contoh:
jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true.table-nameNama tabel tujuan di kluster AnalyticDB for MySQL yang digunakan untuk menyimpan data yang ingin Anda tulis. Dalam contoh ini, nama tabel tujuan adalah
person.usernameNama akun database AnalyticDB for MySQL yang memiliki izin menulis.
CatatanAnda dapat mengeksekusi pernyataan SHOW GRANTS untuk memeriksa izin akun saat ini.
Anda dapat mengeksekusi pernyataan GRANT untuk memberikan izin kepada akun.
passwordKata sandi akun database AnalyticDB for MySQL yang memiliki izin menulis.
sink.buffer-flush.max-rowsJumlah maksimum baris yang dapat ditulis dari Apache Flink ke kluster AnalyticDB for MySQL dalam satu waktu. Apache Flink menerima data secara real-time. Saat jumlah baris data yang diterima Apache Flink mencapai nilai parameter ini, baris data tersebut ditulis secara batch ke kluster AnalyticDB for MySQL. Nilai valid:
0: Apache Flink menulis data secara batch hanya ketika interval waktu maksimum yang ditentukan oleh parameter
sink.buffer-flush.intervaltercapai.Nilai selain 0 yang menentukan jumlah baris tertentu. Contoh: 1000 atau 2000.
CatatanKami merekomendasikan agar Anda tidak mengatur parameter ini ke 0. Jika Anda mengatur parameter ini ke 0, kinerja penulisan menurun dan beban pada kluster AnalyticDB for MySQL meningkat selama kueri bersamaan.
Jika Anda mengatur baik parameter
sink.buffer-flush.max-rowsmaupunsink.buffer-flush.intervalke nilai selain 0, aturan penulisan batch berikut berlaku:Jika jumlah baris data yang diterima Apache Flink mencapai nilai parameter
sink.buffer-flush.max-rowstetapi interval waktu maksimum belum mencapai nilai parametersink.buffer-flush.interval, Apache Flink menulis data secara batch ke kluster AnalyticDB for MySQL tanpa perlu menunggu interval waktu maksimum habis.Jika jumlah baris data yang diterima Apache Flink tidak mencapai nilai parameter
sink.buffer-flush.max-rowstetapi interval waktu maksimum mencapai nilai parametersink.buffer-flush.interval, Apache Flink menulis data secara batch ke kluster AnalyticDB for MySQL terlepas dari jumlah data yang diterima Apache Flink.
sink.buffer-flush.intervalInterval waktu maksimum untuk Apache Flink menulis data secara batch ke kluster AnalyticDB for MySQL, yang juga merupakan jumlah waktu maksimum yang diperlukan sebelum operasi penulisan batch berikutnya. Nilai valid:
0: Apache Flink menulis data secara batch hanya ketika jumlah baris data maksimum yang ditentukan oleh parameter
sink.buffer-flush.max-rowstercapai.Nilai selain 0 yang menentukan interval waktu tertentu. Contoh: 1d, 1h, 1min, 1s, atau 1ms.
CatatanKami merekomendasikan agar Anda tidak mengatur parameter ini ke 0 untuk memastikan bahwa data ditulis tepat waktu ketika jumlah data sumber kecil selama jam-jam sepi.
Eksekusi pernyataan
INSERT INTOuntuk mengimpor data. Jika kunci utama memiliki nilai duplikat, data tidak dimasukkan ulang, dan pernyataan INSERT INTO setara dengan pernyataanINSERT IGNORE INTO. Untuk informasi lebih lanjut, lihat INSERT INTO.INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;
Langkah 3: Verifikasi data
Setelah data ditulis, masuk ke database tpch di kluster AnalyticDB for MySQL dan eksekusi pernyataan berikut untuk memeriksa apakah data sumber telah ditulis ke tabel person:
SELECT * FROM person;