DataWorks Data Integration menyediakan MongoDB Reader yang memungkinkan Anda membaca data dari sumber data MongoDB. Anda juga dapat menggunakan writer untuk menulis data ke sumber data lain. Topik ini memberikan contoh cara menggunakan tugas sinkronisasi batch di Data Integration untuk menyinkronkan data dari sumber data MongoDB ke sumber data MaxCompute.
Informasi latar belakang
Dalam topik ini, sumber data MongoDB digunakan sebagai sumber dan sumber data MaxCompute sebagai tujuan. Sebelum melakukan sinkronisasi data, pastikan Anda merujuk pada bagian "Persiapan Awal" untuk menyiapkan data MongoDB dan tabel MaxCompute yang akan digunakan dalam sinkronisasi.
Prasyarat
DataWorks telah diaktifkan dan sumber data MaxCompute telah ditambahkan ke ruang kerja DataWorks.
Grup sumber daya eksklusif untuk Data Integration telah dibeli dan dikonfigurasi. Grup ini digunakan untuk menjalankan tugas sinkronisasi batch dalam topik ini. Untuk informasi lebih lanjut, lihat Buat dan Gunakan Grup Sumber Daya Eksklusif untuk Data Integration.
CatatanAnda juga dapat menggunakan grup sumber daya versi baru. Untuk informasi lebih lanjut, lihat Buat dan Gunakan Grup Sumber Daya Serverless.
Persiapan awal
Dalam contoh ini, Anda perlu menyiapkan koleksi data MongoDB dan tabel MaxCompute untuk sinkronisasi data.
Siapkan Koleksi Data MongoDB.
Dalam contoh ini, ApsaraDB for MongoDB digunakan. Contoh kode berikut menunjukkan cara menyiapkan koleksi data ApsaraDB for MongoDB.
Buat koleksi data bernama
di_mongodb_conf_test.db.createCollection('di_mongodb_conf_test')Masukkan data sampel ke dalam koleksi data.
db.di_mongodb_conf_test.insertOne({ 'col_string':'mock string value', 'col_int32':NumberInt("1"), 'col_int32_min':NumberInt("-2147483648"), 'col_int32_max':NumberInt("2147483647"), 'col_int64':NumberLong("1234567890123456"), 'col_int64_min':NumberLong("-9223372036854775807"), 'col_int64_max':NumberLong("9223372036854775807"), 'col_decimal':NumberDecimal("9999999.4999999999"), 'col_double':9999999.99, 'col_boolean':true, 'col_timestamp':ISODate(), 'col_date':new Date(), 'col_array_to_json':['a','b'], 'col_array_to_join':['a','b'], 'col_doc':{ 'key_string':'mock string value', 'key_int32':NumberInt("1"), 'key_int32_min':NumberInt("-2147483648"), 'key_int32_max':NumberInt("2147483647"), 'key_int64':NumberLong("1234567890123456"), 'key_int64_min':NumberLong("-9223372036854775807"), 'key_int64_max':NumberLong("9223372036854775807"), 'key_decimal':NumberDecimal("9999999.4999999999"), 'key_double':9999999.99, 'key_boolean':true, 'key_timestamp':ISODate(), 'key_date':new Date(), 'key_array_to_json':['a','b'], 'key_array_to_join':['a','b'], }, 'col_extra_1':'this is extra 1', 'col_extra_2':'this is extra 2', })Kueri data yang dimasukkan ke dalam koleksi data MongoDB.
db.getCollection("di_mongodb_conf_test").find({})Gambar berikut menunjukkan hasil kueri.

Siapkan Tabel MaxCompute.
Buat tabel partisi bernama
di_mongodb_conf_test. Bidang partisi adalahpt.CREATE TABLE IF NOT EXISTS di_mongodb_conf_test ( `id` STRING ,`col_string` STRING ,`col_int32` INT ,`col_int32_min` INT ,`col_int32_max` INT ,`col_int64` BIGINT ,`col_int64_min` BIGINT ,`col_int64_max` BIGINT ,`col_decimal` DECIMAL(38,18) ,`col_double` DOUBLE ,`col_boolean` BOOLEAN ,`col_timestamp` TIMESTAMP ,`col_date` DATE ,`col_array_to_json` STRING ,`col_array_to_join` STRING ,`key_string` STRING ,`key_int32` INT ,`key_int32_min` INT ,`key_int32_max` INT ,`key_int64` BIGINT ,`key_int64_min` BIGINT ,`key_int64_max` BIGINT ,`key_decimal` DECIMAL(38,18) ,`key_double` DOUBLE ,`key_boolean` BOOLEAN ,`key_timestamp` TIMESTAMP ,`key_date` DATE ,`key_array_to_json` STRING ,`key_array_to_join` STRING ,`col_doc` STRING ,`col_combine` STRING ) PARTITIONED BY ( pt STRING ) LIFECYCLE 36500 ;Tambahkan nilai
20230202ke bidang partisi.alter table di_mongodb_conf_test add if not exists partition (pt='20230202');Periksa apakah tabel partisi dibuat dengan benar.
SELECT*FROM di_mongodb_conf_test WHEREpt='20230202';
Konfigurasikan tugas sinkronisasi batch
Langkah 1: Tambahkan sumber data MongoDB
Tambahkan sumber data MongoDB dan pastikan koneksi jaringan telah dibuat antara sumber data dan grup sumber daya eksklusif untuk Data Integration. Untuk informasi lebih lanjut, lihat Tambahkan Sumber Data MongoDB.
Langkah 2: Buat dan konfigurasikan tugas sinkronisasi batch
Buat tugas sinkronisasi batch di halaman DataStudio di konsol DataWorks dan konfigurasikan item-item seperti sumber dan tujuan untuk tugas sinkronisasi batch. Langkah ini hanya menjelaskan beberapa item yang harus Anda konfigurasikan. Untuk item lainnya, pertahankan nilai default. Untuk informasi lebih lanjut, lihat Konfigurasikan Tugas Sinkronisasi Batch Menggunakan UI Tanpa Kode.
Buat koneksi jaringan antara sumber data dan grup sumber daya eksklusif untuk Data Integration.
Pilih sumber data MongoDB yang Anda tambahkan di Langkah 1, sumber data MaxCompute yang Anda tambahkan, dan grup sumber daya eksklusif untuk Data Integration. Kemudian, uji konektivitas jaringan antara sumber data dan grup sumber daya.
Pilih Sumber Data.
Pilih koleksi data MongoDB dan tabel MaxCompute partisi yang Anda siapkan dalam langkah persiapan data.
Konfigurasikan Pemetaan Bidang.
Jika sumber data MongoDB ditambahkan, metode mapping fields in a row of the source to the fields in the same row of the destination digunakan secara default. Anda juga dapat mengklik ikon
untuk mengedit bidang dalam koleksi sumber secara manual. Contoh kode berikut menunjukkan cara mengedit bidang dalam koleksi sumber:{"name":"_id","type":"string"} {"name":"col_string","type":"string"} {"name":"col_int32","type":"long"} {"name":"col_int32_min","type":"long"} {"name":"col_int32_max","type":"long"} {"name":"col_int64","type":"long"} {"name":"col_int64_min","type":"long"} {"name":"col_int64_max","type":"long"} {"name":"col_decimal","type":"double"} {"name":"col_double","type":"double"} {"name":"col_boolean","type":"boolean"} {"name":"col_timestamp","type":"date"} {"name":"col_date","type":"date"} {"name":"col_array_to_json","type":"string"} {"name":"col_array_to_join","type":"array","splitter":","} {"name":"col_doc.key_string","type":"document.string"} {"name":"col_doc.key_int32","type":"document.long"} {"name":"col_doc.key_int32_min","type":"document.long"} {"name":"col_doc.key_int32_max","type":"document.long"} {"name":"col_doc.key_int64","type":"document.long"} {"name":"col_doc.key_int64_min","type":"document.long"} {"name":"col_doc.key_int64_max","type":"document.long"} {"name":"col_doc.key_decimal","type":"document.double"} {"name":"col_doc.key_double","type":"document.double"} {"name":"col_doc.key_boolean","type":"document.boolean"} {"name":"col_doc.key_timestamp","type":"document.date"} {"name":"col_doc.key_date","type":"document.date"} {"name":"col_doc.key_array_to_json","type":"document"} {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","} {"name":"col_doc","type":"string"} {"name":"col_combine","type":"combine"}Setelah Anda mengedit bidang, pemetaan baru antara bidang sumber dan bidang tujuan akan ditampilkan di tab konfigurasi tugas.
Langkah 3: Kirim dan terapkan tugas sinkronisasi batch
Jika Anda menggunakan ruang kerja dalam mode standar dan ingin menjadwalkan tugas sinkronisasi batch secara berkala di lingkungan produksi, Anda dapat mengirim dan menerapkan tugas ke lingkungan produksi. Untuk informasi lebih lanjut, lihat Terapkan Node.
Langkah 4: Jalankan tugas sinkronisasi batch dan lihat hasil sinkronisasi
Setelah Anda menyelesaikan konfigurasi sebelumnya, Anda dapat menjalankan tugas sinkronisasi batch. Setelah proses selesai, Anda dapat melihat data yang disinkronkan ke tabel MaxCompute.
Gambar berikut menunjukkan data di bidang
col_doc.
Gambar berikut menunjukkan data di bidang
col_combine.
Untuk informasi tentang masalah terkait data keluaran yang dikonversi dari data tipe DECIMAL, lihat bagian Lampiran 2: Masalah Terkait Data Keluaran yang Dikonversi dari Data Tipe DECIMAL dari topik ini.
Lampiran 1: Konversi tipe data selama sinkronisasi data
Konversi array menjadi data JSON: col_array_to_json
Data MongoDB mentah | Konfigurasi pemetaan bidang | Hasil konversi |
| Jika Anda mengatur parameter | |
Konversi array menjadi string gabungan: col_array_to_join
Data MongoDB mentah | Konfigurasi pemetaan bidang | Hasil konversi |
| Jika Anda mengatur parameter | |
Baca data dari bidang bersarang tertentu dalam dokumen MongoDB
Data MongoDB mentah | Konfigurasi pemetaan bidang | Hasil konversi |
|
| |
Konversi data dalam dokumen MongoDB menjadi data JSON
Data MongoDB mentah | Konfigurasi pemetaan bidang | Hasil konversi |
| Jika Anda mengatur parameter | |
Konversi data dalam bidang kecuali bidang yang dikonfigurasikan dalam dokumen MongoDB menjadi data JSON
Data MongoDB mentah | Konfigurasi pemetaan bidang | Hasil konversi |
| Dokumen MongoDB berisi empat bidang: col_1, col_2, col_3, dan col_4. Bidang col_1 dan col_2 bukan tipe data COMBINE dan dikonfigurasikan dalam tugas sinkronisasi batch. Bidang col_3 dan col_4 dikonversi menjadi data JSON saat tugas dijalankan. | |
Lampiran 2: Masalah terkait data keluaran yang dikonversi dari data tipe DECIMAL
Contoh kode berikut menunjukkan keluaran default setelah data tipe Decimal128 dikonversi menjadi data JSON:
{
"key_decimal":
{
"finite": true,
"high": 3471149412795809792,
"infinite": false,
"low": 99999994999999999,
"naN": false,
"negative": false
}
}Anda dapat melakukan langkah-langkah berikut jika ingin mengonversi data tipe Decimal128 menjadi angka:
Saat Anda mengonfigurasi tugas sinkronisasi batch, klik Skrip Konversi di bilah alat atas tab konfigurasi tugas untuk beralih ke mode skrip.
Ubah konfigurasi sumber dengan menambahkan parameter
decimal128OutputTypedalam parameter dan atur parameter tersebut kebigDecimalsebagai nilai tetap.
Jalankan tugas sinkronisasi batch lagi dan lihat hasilnya.
{ "key_decimal": "9999999.4999999999" }