DataX adalah alat open source untuk sinkronisasi data offline yang disediakan oleh Alibaba Group. Alat ini mendukung pembacaan dari database relasional seperti MySQL dan Oracle, Hadoop Distributed File System (HDFS), Hive, MaxCompute, HBase, FTP, dan lainnya. Gunakan DataX Doris Writer untuk menulis data ke ApsaraDB for SelectDB.
Alur data:
Sumber (misalnya, MySQL) -> mysqlreader -> saluran DataX -> doriswriter -> ApsaraDB for SelectDBPrasyarat
Sebelum memulai, pastikan Anda telah:
Maven terinstal
Python 3.6 atau versi lebih baru terinstal
Instans ApsaraDB for SelectDB
Impor data dari MySQL ke ApsaraDB for SelectDB
Contoh berikut menjelaskan proses impor lengkap di Linux dengan menggunakan database MySQL sebagai sumber.
Langkah 1: Siapkan lingkungan DataX
Unduh paket DataX dari ekstensi Doris di GitHub.
Jalankan skrip inisialisasi:
sh init-env.shKompilasi proyek DataX:
Jika Anda memerlukan plugin
hdfsreader,hdfswriter,ossreader, atauosswriter, diperlukan paket JAR tambahan. Untuk melewati plugin tersebut, hapus modulnya dariDataX/pom.xmlsebelum mengompilasi.cd DataX/ mvn package assembly:assembly -Dmaven.test.skip=trueHasil kompilasi disimpan di direktori
target/datax/datax/.Kompilasi plugin
mysqlreaderdandoriswritersecara terpisah:mvn clean install -pl plugin-rdbms-util,mysqlreader,doriswriter -DskipTests
Langkah 2: Siapkan data uji di MySQL
Buat tabel uji di MySQL:
CREATE TABLE `employees` ( `emp_no` int NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;Hasilkan data uji menggunakan Data Management (DMS).
Langkah 3: Buat tabel tujuan di ApsaraDB for SelectDB
Hubungkan ke instans ApsaraDB for SelectDB Anda melalui protokol MySQL. Untuk detailnya, lihat Hubungkan ke instans.
Buat database dan tabel tujuan:
CREATE DATABASE test_db; USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
Langkah 4: Jalankan pekerjaan impor DataX
Ajukan titik akhir publik untuk instans ApsaraDB for SelectDB Anda. Untuk detailnya, lihat Ajukan atau rilis titik akhir publik.
Tambahkan alamat IP publik host DataX ke daftar putih alamat IP instans. Untuk detailnya, lihat Konfigurasi daftar putih alamat IP.
Buat file konfigurasi pekerjaan
mysqlToSelectDB.json:{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "where": "emp_no>0", "connection": [ { "jdbcUrl": [ "jdbc:mysql://host:port/test_db?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" ], "table": ["employees"] } ], "password": "123456", "splitPk": "emp_no", "username": "admin" } }, "writer": { "name": "doriswriter", "parameter": { "loadUrl": [ "selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080" ], "loadProps": { "format": "json", "strip_outer_array": "true" }, "column": [ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "username": "admin", "password": "123456", "postSql": [], "preSql": [], "connection": [ { "jdbcUrl": "jdbc:mysql://selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:9030/test_db", "table": ["employees"], "selectedDatabase": "test_db" } ], "maxBatchRows": 1000000, "batchSize": 536870912000 } } } ], "setting": { "errorLimit": { "percentage": 0.02, "record": 0 }, "speed": { "channel": 5 } } } }Kirimkan pekerjaan:
cd target/datax/datax/bin python datax.py ../mysqlToSelectDB.json
Parameter
Tabel berikut menjelaskan parameter doriswriter. Parameter jdbcUrl dan loadUrl menerima titik akhir virtual private cloud (VPC) atau titik akhir publik instans ApsaraDB for SelectDB. Gunakan titik akhir VPC jika host DataX dan instans berada dalam VPC yang sama. Jika tidak, gunakan titik akhir publik.
Untuk menemukan titik akhir, masuk ke konsol ApsaraDB for SelectDB, buka halaman Instance Details, lalu periksa bagian Network Information pada tab Basic Information.
| Parameter | Wajib | Nilai default | Deskripsi |
|---|---|---|---|
jdbcUrl | Ya | — | URL Java Database Connectivity (JDBC) untuk instans. Format: jdbc:mysql://<endpoint>:<MySQL-port>/<database>. Contoh: jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030/test_db |
loadUrl | Ya | — | Titik akhir HTTP untuk Stream Load. Format: <endpoint>:<HTTP-port>. Contoh: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080 |
username | Ya | — | Username untuk instans. |
password | Ya | — | Password untuk username tersebut. |
connection.selectedDatabase | Ya | — | Nama database tujuan. |
connection.table | Ya | — | Nama tabel tujuan. |
column | Ya | — | Kolom tempat data ditulis. Pisahkan beberapa kolom dengan koma. Contoh: ["id", "name", "age"]. |
preSql | Tidak | — | Pernyataan SQL yang dijalankan sebelum menulis data. |
postSql | Tidak | — | Pernyataan SQL yang dijalankan setelah menulis data. |
maxBatchRows | Tidak | 500000 | Jumlah maksimum baris per batch. Batch akan dikirim saat salah satu dari maxBatchRows atau batchSize tercapai lebih dulu. |
batchSize | Tidak | 104857600 (100 MB) | Ukuran data maksimum per batch. Batch akan dikirim saat salah satu dari batchSize atau maxBatchRows tercapai lebih dulu. |
maxRetries | Tidak | 3 | Jumlah maksimum percobaan ulang setelah impor batch gagal. |
labelPrefix | Tidak | datax_doris_writer_ | Awalan untuk label impor. Setiap batch menggunakan label unik yang terdiri dari awalan ini dan UUID untuk mencegah impor duplikat. |
loadProps | Tidak | — | Parameter Stream Load tambahan. Untuk daftar lengkap parameter yang didukung, lihat bagian Parameter dalam "Impor data menggunakan Stream Load". |
flushInterval | Tidak | 30000 ms | Interval antar penulisan batch. |
Konfigurasi format data
Secara default, DataX Doris Writer mengonversi data ke format CSV, menggunakan \t sebagai pemisah kolom dan \n sebagai pemisah baris.
Gunakan pemisah kustom
Untuk mengubah pemisah, atur di loadProps. Gunakan karakter non-printable seperti \x01 dan \x02 untuk menghindari konflik dengan data yang mengandung tab atau baris baru:
"loadProps": {
"format": "csv",
"column_separator": "\\x01",
"line_delimiter": "\\x02"
}Gunakan format JSON
Untuk mengimpor data dalam format JSON:
"loadProps": {
"format": "json",
"strip_outer_array": true
}