DataWorks Data Integration menyediakan plugin MongoDB Reader untuk membaca data dari MongoDB dan menyinkronkannya ke sumber data lain. Topik ini memberikan contoh cara menggunakan Data Integration untuk melakukan sinkronisasi offline data dari MongoDB ke MaxCompute.
Informasi latar belakang
Dalam tutorial ini, sumber data adalah MongoDB dan tujuan sinkronisasi adalah MaxCompute. Sebelum memulai, siapkan data MongoDB untuk disinkronkan dan buat tabel MaxCompute untuk menyimpan data yang telah disinkronkan.
Prasyarat
Pastikan prasyarat berikut terpenuhi.
Anda telah mengaktifkan DataWorks dan membuat sumber data MaxCompute.
Diperlukan grup sumber daya eksklusif untuk Data Integration guna menjalankan task offline. Anda harus membeli dan mengonfigurasi grup sumber daya ini sebelum melanjutkan. Untuk informasi selengkapnya, lihat Use an exclusive resource group for Data Integration.
CatatanAnda juga dapat menggunakan grup sumber daya General-purpose. Untuk informasi selengkapnya, lihat Use a Serverless resource group.
Siapkan tabel data sampel
Siapkan koleksi data MongoDB dan tabel MaxCompute untuk sinkronisasi data offline.
Siapkan koleksi data MongoDB.
Tutorial ini menggunakan ApsaraDB for MongoDB sebagai contoh. Kode berikut menunjukkan cara menyiapkan koleksi data MongoDB.
Buat koleksi data bernama
di_mongodb_conf_test.db.createCollection('di_mongodb_conf_test')Masukkan data sampel untuk tutorial ini ke dalam koleksi data.
db.di_mongodb_conf_test.insertOne({ 'col_string':'mock string value', 'col_int32':NumberInt("1"), 'col_int32_min':NumberInt("-2147483648"), 'col_int32_max':NumberInt("2147483647"), 'col_int64':NumberLong("1234567890123456"), 'col_int64_min':NumberLong("-9223372036854775807"), 'col_int64_max':NumberLong("9223372036854775807"), 'col_decimal':NumberDecimal("9999999.4999999999"), 'col_double':9999999.99, 'col_boolean':true, 'col_timestamp':ISODate(), 'col_date':new Date(), 'col_array_to_json':['a','b'], 'col_array_to_join':['a','b'], 'col_doc':{ 'key_string':'mock string value', 'key_int32':NumberInt("1"), 'key_int32_min':NumberInt("-2147483648"), 'key_int32_max':NumberInt("2147483647"), 'key_int64':NumberLong("1234567890123456"), 'key_int64_min':NumberLong("-9223372036854775807"), 'key_int64_max':NumberLong("9223372036854775807"), 'key_decimal':NumberDecimal("9999999.4999999999"), 'key_double':9999999.99, 'key_boolean':true, 'key_timestamp':ISODate(), 'key_date':new Date(), 'key_array_to_json':['a','b'], 'key_array_to_join':['a','b'], }, 'col_extra_1':'this is extra 1', 'col_extra_2':'this is extra 2', })Kueri data yang dimasukkan ke MongoDB.
db.getCollection("di_mongodb_conf_test").find({})Hasil kueri ditampilkan pada gambar berikut:

Siapkan tabel MaxCompute.
Buat tabel partisi bernama
di_mongodb_conf_test. Bidang partisi adalahpt.CREATE TABLE IF NOT EXISTS di_mongodb_conf_test ( `id` STRING ,`col_string` STRING ,`col_int32` INT ,`col_int32_min` INT ,`col_int32_max` INT ,`col_int64` BIGINT ,`col_int64_min` BIGINT ,`col_int64_max` BIGINT ,`col_decimal` DECIMAL(38,18) ,`col_double` DOUBLE ,`col_boolean` BOOLEAN ,`col_timestamp` TIMESTAMP ,`col_date` DATE ,`col_array_to_json` STRING ,`col_array_to_join` STRING ,`key_string` STRING ,`key_int32` INT ,`key_int32_min` INT ,`key_int32_max` INT ,`key_int64` BIGINT ,`key_int64_min` BIGINT ,`key_int64_max` BIGINT ,`key_decimal` DECIMAL(38,18) ,`key_double` DOUBLE ,`key_boolean` BOOLEAN ,`key_timestamp` TIMESTAMP ,`key_date` DATE ,`key_array_to_json` STRING ,`key_array_to_join` STRING ,`col_doc` STRING ,`col_combine` STRING ) PARTITIONED BY ( pt STRING ) LIFECYCLE 36500 ;Tambahkan partisi dengan nilai
20230202.alter table di_mongodb_conf_test add if not exists partition (pt='20230202');Verifikasi bahwa tabel partisi telah dibuat.
SELECT*FROM di_mongodb_conf_test WHEREpt='20230202';
Konfigurasikan task offline
Langkah 1: Tambahkan sumber data MongoDB
Tambahkan sumber data MongoDB dan pastikan terdapat konektivitas jaringan antara sumber data tersebut dan grup sumber daya eksklusif untuk Data Integration. Untuk informasi selengkapnya, lihat Configure a MongoDB data source.
Langkah 2: Buat node sync offline dan konfigurasikan task sinkronisasi
Di DataWorks DataStudio, buat node sync offline. Konfigurasikan sumber, tujuan, dan parameter lainnya untuk task tersebut. Langkah-langkah konfigurasi utama dijelaskan di bawah ini. Anda dapat menggunakan nilai default untuk parameter lainnya. Untuk informasi selengkapnya, lihat Configure a task in the codeless UI.
Konfigurasikan konektivitas jaringan.
Pilih sumber data MongoDB dan MaxCompute yang telah Anda buat pada langkah sebelumnya serta grup sumber daya eksklusif untuk Data Integration yang sesuai. Kemudian, uji konektivitasnya.
Konfigurasikan task: Pilih sumber data.
Pilih koleksi data MongoDB dan tabel partisi MaxCompute yang telah Anda siapkan.
Konfigurasikan task: Pemetaan bidang.
Untuk sumber data MongoDB, Same-line Mapping digunakan secara default. Anda juga dapat mengklik ikon
untuk mengedit bidang tabel sumber secara manual. Kode berikut memberikan contoh pengeditan manual.{"name":"_id","type":"string"} {"name":"col_string","type":"string"} {"name":"col_int32","type":"long"} {"name":"col_int32_min","type":"long"} {"name":"col_int32_max","type":"long"} {"name":"col_int64","type":"long"} {"name":"col_int64_min","type":"long"} {"name":"col_int64_max","type":"long"} {"name":"col_decimal","type":"double"} {"name":"col_double","type":"double"} {"name":"col_boolean","type":"boolean"} {"name":"col_timestamp","type":"date"} {"name":"col_date","type":"date"} {"name":"col_array_to_json","type":"string"} {"name":"col_array_to_join","type":"array","splitter":","} {"name":"col_doc.key_string","type":"document.string"} {"name":"col_doc.key_int32","type":"document.long"} {"name":"col_doc.key_int32_min","type":"document.long"} {"name":"col_doc.key_int32_max","type":"document.long"} {"name":"col_doc.key_int64","type":"document.long"} {"name":"col_doc.key_int64_min","type":"document.long"} {"name":"col_doc.key_int64_max","type":"document.long"} {"name":"col_doc.key_decimal","type":"document.double"} {"name":"col_doc.key_double","type":"document.double"} {"name":"col_doc.key_boolean","type":"document.boolean"} {"name":"col_doc.key_timestamp","type":"document.date"} {"name":"col_doc.key_date","type":"document.date"} {"name":"col_doc.key_array_to_json","type":"document"} {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","} {"name":"col_doc","type":"string"} {"name":"col_combine","type":"combine"}Setelah Anda mengedit bidang secara manual, antarmuka akan menampilkan pemetaan antara bidang sumber dan tujuan.
Langkah 3: Kirim dan publikasikan node sync offline
Jika Anda menggunakan ruang kerja DataWorks mode standar dan ingin menjadwalkan task sync offline agar berjalan secara berkala di lingkungan produksi, Anda harus mengirim dan mempublikasikan node tersebut. Untuk informasi selengkapnya, lihat Publish a task.
Langkah 4: Jalankan node sync offline dan lihat hasilnya
Setelah konfigurasi selesai, jalankan node sync tersebut. Setelah node berhasil dijalankan, Anda dapat melihat data yang telah disinkronkan ke tabel MaxCompute.
Konten bidang
col_docditampilkan pada gambar berikut.
Konten bidang
col_combineditampilkan pada gambar berikut.
Untuk informasi selengkapnya mengenai output tipe decimal, lihat Appendix 2: Output of the Decimal type in a document.
Lampiran 1: Transformasi format data selama sinkronisasi
Konversi data array ke format JSON: col_array_to_json
Data MongoDB mentah | Konfigurasi pemetaan bidang | Output ke MaxCompute |
| Saat mengonfigurasi pemetaan bidang, jika Anda mengatur | |
Konversi data array menjadi string gabungan: col_array_to_join
Data MongoDB mentah | Konfigurasi pemetaan bidang | Output ke MaxCompute |
| Saat mengonfigurasi pemetaan bidang, jika Anda mengatur | |
Baca bidang tertentu dari dokumen bersarang
Data MongoDB mentah | Konfigurasi pemetaan bidang | Output ke MaxCompute |
|
| |
Data dokumen sebagai output serialisasi JSON
Data MongoDB mentah | Konfigurasi pemetaan bidang | Output ke MaxCompute |
| Saat mengonfigurasi pemetaan bidang, jika Anda mengatur | |
Serialisasi sisa bidang dokumen ke format JSON
Data MongoDB mentah | Konfigurasi pemetaan bidang | Output ke MaxCompute |
| Dokumen memiliki empat bidang. Task dikonfigurasi dengan dua bidang bertipe non-combine (`col_1` dan `col_2`). Task sinkronisasi akan melakukan serialisasi semua bidang selain `col_1` dan `col_2` ke format JSON dan menghasilkan output tersebut. | |
Lampiran 2: Output tipe Decimal dalam dokumen
Saat sebuah dokumen diserialisasi ke format JSON, bidang bertipe Decimal128 secara default akan menghasilkan output sebagai berikut:
{
"key_decimal":
{
"finite": true,
"high": 3471149412795809792,
"infinite": false,
"low": 99999994999999999,
"naN": false,
"negative": false
}
}Untuk menghasilkan output berupa tipe angka, lakukan langkah-langkah berikut:
Saat mengonfigurasi task sinkronisasi offline, Anda dapat beralih ke editor kode dengan mengklik tombol Convert to Script.
Ubah konfigurasi task Reader. Di bagian parameter, tambahkan parameter
decimal128OutputTypedan atur nilainya kebigDecimal.
Jalankan ulang task sinkronisasi offline dan lihat hasilnya.
{ "key_decimal": "9999999.4999999999" }