Topik ini menjelaskan cara membuat konektor sumber MySQL dan menggunakan DataWorks untuk menyinkronkan data dari ApsaraDB RDS for MySQL ke topik dalam instance ApsaraMQ for Kafka Anda.
Prasyarat
- Fitur konektor diaktifkan untuk instance ApsaraMQ for Kafka Anda. Untuk informasi lebih lanjut, lihat Aktifkan Fitur Konektor. Penting Pastikan bahwa instance ApsaraMQ for Kafka Anda ditempatkan di wilayah China (Shenzhen), China (Chengdu), China (Beijing), China (Zhangjiakou), China (Hangzhou), China (Shanghai), atau Singapura.
- Sebuah instance ApsaraDB RDS for MySQL telah dibuat. Untuk informasi tentang cara membuat instance ApsaraDB RDS for MySQL, lihat Buat Instance ApsaraDB RDS for MySQL.
- Sebuah database dan akun database telah dibuat dalam instance ApsaraDB RDS for MySQL. Untuk informasi tentang cara membuat database dan akun database, lihat Buat Akun dan Database.
- Sebuah tabel telah dibuat dalam database. Untuk informasi tentang pernyataan SQL umum yang digunakan dalam ApsaraDB RDS for MySQL, lihat Pernyataan SQL yang Sering Digunakan untuk MySQL.
- DataWorks diberi otorisasi untuk mengakses elastic network interfaces (ENIs) Anda terlepas dari apakah Anda menggunakan akun Alibaba Cloud atau pengguna Resource Access Management (RAM). Untuk memberikan izin kepada akun, kunjungi halaman Otorisasi Akses Sumber Daya Cloud. Penting Jika Anda menggunakan pengguna RAM, pastikan bahwa pengguna RAM tersebut diberikan izin berikut:
- AliyunDataWorksFullAccess: izin untuk mengelola semua sumber daya DataWorks dalam akun Alibaba Cloud.
- AliyunBSSOrderAccess: izin untuk membeli layanan Alibaba Cloud.
Untuk informasi tentang cara menambahkan kebijakan ke pengguna RAM, lihat Langkah 2: Berikan Izin kepada Pengguna RAM.
- Baik sumber data maupun tujuan data dibuat menggunakan akun Anda. Sumber data adalah instance ApsaraDB RDS for MySQL. Tujuan data adalah instance ApsaraMQ for Kafka Anda.
- Blok CIDR dari virtual private cloud (VPC) tempat instance ApsaraDB RDS for MySQL berada dan blok CIDR dari VPC tempat instance ApsaraMQ for Kafka berada tidak tumpang tindih. Jika blok CIDR tumpang tindih, konektor sumber MySQL tidak dapat dibuat dalam Message Queue for Apache Kafka.
Informasi latar belakang
Anda dapat membuat konektor sumber MySQL di konsol ApsaraMQ for Kafka untuk menyinkronkan data dari tabel dalam instance ApsaraDB RDS for MySQL Anda ke topik dalam instance ApsaraMQ for Kafka Anda. Konektor dibuat dan dijalankan menggunakan DataWorks, seperti yang ditunjukkan pada gambar berikut.
Setelah konektor sumber MySQL dibuat di konsol ApsaraMQ for Kafka, DataWorks Basic Edition secara otomatis diaktifkan tanpa biaya. DataWorks Basic Edition menciptakan ruang kerja DataWorks dan grup sumber daya eksklusif untuk integrasi data. Ruang kerja DataWorks gratis, sedangkan grup sumber daya eksklusif adalah layanan berbayar. Spesifikasi grup sumber daya eksklusif untuk integrasi data adalah 4 vCPU dan 8 GB memori. Grup sumber daya mendukung langganan bulanan. Secara default, grup sumber daya eksklusif untuk integrasi data diperpanjang secara otomatis setelah kedaluwarsa. Untuk informasi lebih lanjut tentang penagihan DataWorks, lihat Ikhtisar.
Selain itu, DataWorks secara otomatis menghasilkan topik tujuan dalam ApsaraMQ for Kafka berdasarkan konfigurasi konektor sumber MySQL Anda. Tabel sumber dan topik tujuan saling berkaitan berdasarkan pemetaan satu-satu. Secara default, DataWorks menghasilkan topik yang berisi enam partisi untuk setiap tabel yang memiliki kunci utama dan topik yang berisi satu partisi untuk setiap tabel yang tidak memiliki kunci utama. Pastikan bahwa jumlah topik dan partisi yang tersedia dalam instance Message Queue for Apache Kafka Anda cukup. Jika tidak, konektor sumber MySQL gagal dibuat karena DataWorks gagal membuat topik.
Nama setiap topik dalam format <Awalan yang ditentukan>_<Nama tabel sumber>. Garis bawah (_) ditambahkan secara otomatis oleh sistem. Gambar berikut memberikan contoh.

Dalam contoh ini, awalan yang ditentukan adalah mysql. Tabel sumber yang akan disinkronkan adalah table_1, table_2, ..., dan table_n. DataWorks secara otomatis menghasilkan topik untuk Anda menerima data yang disinkronkan dari tabel sumber. Topik tersebut dinamai mysql_table_1, mysql_table_2, ..., dan mysql_table_n.
Tindakan pencegahan
- Wilayah
- Sumber data dan tujuan instance Message Queue for Apache Kafka Anda mungkin tidak ditempatkan di wilayah yang sama. Dalam kasus ini, pastikan bahwa Anda memiliki instance Cloud Enterprise Network (CEN) dalam akun Alibaba Cloud Anda dan bahwa VPC dari sumber data dan instance Message Queue for Apache Kafka Anda terhubung ke instance CEN. Selain itu, pastikan bahwa bandwidth untuk koneksi lintas wilayah dikonfigurasi untuk memastikan konektivitas ujung ke ujung antara sumber data dan tujuan data.
Jika tidak, instance CEN mungkin secara otomatis dibuat, dan VPC dari instance Message Queue for Apache Kafka tujuan serta instance Elastic Compute Service (ECS) tempat grup sumber daya eksklusif Anda berada terhubung ke instance CEN untuk memastikan konektivitas ujung ke ujung. Bandwidth dari instance CEN yang dibuat secara otomatis sangat rendah karena bandwidth tidak dikonfigurasi secara manual. Ini dapat menyebabkan kesalahan konektivitas saat Anda membuat konektor sumber MySQL atau saat konektor tersebut berjalan.
- Sumber data dan tujuan instance Message Queue for Apache Kafka Anda mungkin berada di wilayah yang sama. Dalam kasus ini, saat Anda membuat konektor sumber MySQL, ENI secara otomatis dibuat dalam VPC dari sumber data atau instance Message Queue for Apache Kafka. ENI juga secara otomatis terikat ke instance ECS tempat grup sumber daya eksklusif Anda berada. Ini memastikan konektivitas ujung ke ujung antara sumber data dan tujuan data.
- Sumber data dan tujuan instance Message Queue for Apache Kafka Anda mungkin tidak ditempatkan di wilayah yang sama. Dalam kasus ini, pastikan bahwa Anda memiliki instance Cloud Enterprise Network (CEN) dalam akun Alibaba Cloud Anda dan bahwa VPC dari sumber data dan instance Message Queue for Apache Kafka Anda terhubung ke instance CEN. Selain itu, pastikan bahwa bandwidth untuk koneksi lintas wilayah dikonfigurasi untuk memastikan konektivitas ujung ke ujung antara sumber data dan tujuan data.
- Grup Sumber Daya Eksklusif dalam DataWorks
- DataWorks memungkinkan Anda menggunakan setiap grup sumber daya eksklusif untuk menjalankan hingga tiga konektor sumber MySQL. Jika DataWorks menemukan bahwa grup sumber daya yang ada telah digunakan untuk menjalankan kurang dari tiga konektor sumber MySQL saat Anda membuat konektor sumber MySQL, DataWorks menggunakan grup sumber daya ini untuk menjalankan konektor sumber MySQL yang baru dibuat.
- Setiap grup sumber daya eksklusif dalam DataWorks dapat dikaitkan dengan ENI dari maksimal dua VPC. DataWorks mungkin gagal membuat konektor sumber MySQL menggunakan grup sumber daya jika ENI yang akan dikaitkan dengan grup sumber daya dan ENI yang sudah dikaitkan dengan grup sumber daya memiliki blok CIDR yang tumpang tindih atau karena batasan teknis lainnya. Dalam kasus ini, meskipun kurang dari tiga konektor berjalan pada grup sumber daya ini, DataWorks tetap membuat grup sumber daya untuk memastikan bahwa konektor sumber MySQL dapat dibuat.
Buat dan sebarkan konektor sumber MySQL
Masuk ke konsol ApsaraMQ for Kafka.
Di bagian Resource Distribution dari halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Di panel navigasi di sebelah kiri, klik Connectors.
Di halaman Connectors, pilih instance tempat konektor milik dari daftar drop-down Select Instance dan klik Create Connector.
- Di wizard Create Connector, lakukan operasi berikut:
- Di langkah Configure Basic Information, masukkan nama konektor di bidang Name dan klik Next.
Parameter Deskripsi Contoh Name Nama konektor. Tentukan nama konektor berdasarkan konvensi penamaan berikut: - Nama konektor harus memiliki panjang 1 hingga 48 karakter dan dapat berisi digit, huruf kecil, dan tanda hubung (-). Nama tidak boleh dimulai dengan tanda hubung (-).
- Setiap nama konektor harus unik dalam sebuah instance ApsaraMQ for Kafka.
Konektor harus menggunakan grup konsumen yang dinamai dalam format connect-Nama konektor.Group Jika Anda belum membuat grup konsumen ini, Message Queue for Apache Kafka secara otomatis membuat grup konsumen untuk Anda.Group
kafka-source-mysql Instance Informasi tentang instance Message Queue for Apache Kafka. Secara default, nama dan ID instance ditampilkan. demo alikafka_post-cn-st21p8vj**** - Di langkah Configure Source Service, pilih ApsaraDB RDS for MySQL sebagai layanan sumber, konfigurasikan parameter yang dijelaskan dalam tabel berikut, lalu klik Next.
Parameter Deskripsi Contoh Region of ApsaraDB RDS for MySQL Instance ID wilayah instance ApsaraDB RDS for MySQL. Pilih ID wilayah dari daftar drop-down. China (Shenzhen) ApsaraDB RDS for MySQL Instance ID ID instance ApsaraDB RDS for MySQL dari mana Anda ingin menyinkronkan data. rm-wz91w3vk6owmz**** Database Name Nama database ApsaraDB RDS for MySQL dari mana Anda ingin menyinkronkan data. mysql-to-kafka Database Account Nama pengguna akun yang digunakan untuk mengakses database ApsaraDB RDS for MySQL. mysql_to_kafka Password of Database Account Kata sandi akun yang digunakan untuk mengakses database ApsaraDB RDS for MySQL. N/A Database Table Nama satu atau lebih tabel ApsaraDB RDS for MySQL dari mana Anda ingin menyinkronkan data. Pisahkan beberapa nama tabel dengan koma (,). Tabel sumber dan topik tujuan saling berkaitan berdasarkan pemetaan satu-satu.
mysql_tbl Tables to Automatically Add Ekspresi reguler berdasarkan sistem dapat secara otomatis menambahkan tabel lain dalam database. Saat sistem mendeteksi bahwa tabel dibuat dalam database dan sesuai dengan ekspresi reguler, sistem secara otomatis menyinkronkan data dalam tabel ke Message Queue for Apache Kafka. Anda dapat menggunakan .* untuk menentukan semua tabel dalam database.
.* Topic Prefix Awalan yang digunakan untuk menamai topik yang dibuat secara otomatis untuk konektor dalam ApsaraMQ for Kafka. Setiap nama topik terdiri dari awalan dan nama tabel sumber yang sesuai dalam database ApsaraDB RDS for MySQL. Pastikan bahwa awalan bersifat unik secara global. mysql PentingPastikan bahwa akun database ApsaraDB RDS for MySQL diberikan setidaknya izin berikut:- SELECT
- REPLICATION SLAVE
- REPLICATION CLIENT
Pernyataan berikut memberikan contoh tentang cara memberikan izin kepada akun:GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'Nama akun' @'%'; // Berikan izin SELECT, REPLICATION SLAVE, dan REPLICATION CLIENT kepada akun database. - Di langkah Configure Destination Service, tujuan instance ApsaraMQ for Kafka tempat Anda ingin menyinkronkan data muncul. Konfirmasikan informasi dan klik Create.
- Di langkah Configure Basic Information, masukkan nama konektor di bidang Name dan klik Next.
- Pergi ke halaman Connectors, temukan konektor yang Anda buat, lalu klik Deploy di kolom Actions.Di halaman Connectors, jika nilai parameter Status ditampilkan sebagai Running untuk konektor, konektor telah dibuat.Catatan Jika konektor gagal dibuat, periksa apakah semua prasyarat yang dijelaskan dalam topik ini telah dipenuhi.
Jika Anda perlu memodifikasi konfigurasi konektor sumber MySQL, klik Task Configurations di kolom Actions. Di konsol DataWorks yang Anda dialihkan, modifikasi konfigurasi konektor.
Verifikasi hasil
- Masukkan data ke dalam tabel sumber dalam database ApsaraDB RDS for MySQL.Kode sampel berikut memberikan contoh tentang cara memasukkan data ke dalam tabel sumber:
Untuk informasi lebih lanjut tentang pernyataan SQL, lihat Pernyataan SQL yang Sering Digunakan untuk MySQL.INSERT INTO mysql_tbl (mysql_title, mysql_author, submission_date) VALUES ("mysql2kafka", "tester", NOW()) - Gunakan fitur kueri pesan ApsaraMQ for Kafka untuk memeriksa apakah data tabel sumber dalam database ApsaraDB RDS for MySQL disinkronkan ke topik yang sesuai dalam instance ApsaraMQ for Kafka Anda.Untuk informasi lebih lanjut, lihat Kueri Pesan.Kode sampel berikut memberikan contoh data yang disinkronkan dari tabel sumber dalam ApsaraDB RDS for MySQL ke topik dalam ApsaraMQ for Kafka. Untuk informasi lebih lanjut tentang struktur pesan dan bidang, lihat Lampiran: Format Pesan.
{ "schema":{ "dataColumn":[ { "name":"mysql_id", "type":"LONG" }, { "name":"mysql_title", "type":"STRING" }, { "name":"mysql_author", "type":"STRING" }, { "name":"submission_date", "type":"DATE" } ], "primaryKey":[ "mysql_id" ], "source":{ "dbType":"MySQL", "dbName":"mysql_to_kafka", "tableName":"mysql_tbl" } }, "payload":{ "before":null, "after":{ "dataColumn":{ "mysql_title":"mysql2kafka", "mysql_author":"tester", "submission_date":1614700800000 } }, "sequenceId":"1614748790461000000", "timestamp":{ "eventTime":1614748870000, "systemTime":1614748870925, "checkpointTime":1614748870000 }, "op":"INSERT", "ddl":null }, "version":"0.0.1" }