Sumber data MongoDB memungkinkan Anda membaca dari dan menulis ke MongoDB. Topik ini menjelaskan kemampuan sinkronisasi data yang disediakan DataWorks untuk MongoDB.
Versi yang didukung
MongoDB versi 4.x, 5.x, 6.x, 7.x, dan 8.0 didukung.
Catatan penggunaan
Data Integration terhubung ke database MongoDB menggunakan akun database yang sesuai. Jika Anda menggunakan ApsaraDB for MongoDB, akun root disediakan secara default. Untuk alasan keamanan, jangan gunakan akun root untuk mengakses sumber data MongoDB.
Jika database MongoDB Anda merupakan kluster sharded, konfigurasikan alamat mongos saat mengonfigurasi sumber data. Jangan konfigurasikan alamat node mongod/shard. Jika tidak, tugas sinkronisasi mungkin hanya mengekstrak data dari satu shard tertentu, bukan seluruh dataset. Untuk informasi lebih lanjut tentang mongos dan mongod, lihat mongos dan mongod.
Jika konkurensi lebih besar dari 1, semua bidang
_iddalam koleksi yang dikonfigurasi harus memiliki tipe yang sama. Misalnya, semua bidang_idharus berupa string atau ObjectId. Jika tidak, beberapa data mungkin gagal disinkronkan.CatatanKetika konkurensi lebih besar dari 1, tugas dibagi berdasarkan bidang
_id. Oleh karena itu, bidang_idtidak mendukung tipe data campuran dalam skenario ini. Jika bidang_idmemiliki beberapa tipe data, Anda dapat menyinkronkan data dengan konkurensi 1. Jangan konfigurasi parameter splitFactor, atau atur splitFactor ke 1.
Data Integration tidak mendukung tipe array. Namun, MongoDB mendukung array dan fitur pengindeksannya yang canggih. Anda dapat mengonfigurasi parameter tertentu untuk mengonversi string menjadi array di MongoDB. Setelah konversi, Anda dapat menulis data ke MongoDB secara paralel.
Database MongoDB yang dikelola sendiri tidak mendukung akses melalui jaringan publik. Akses hanya didukung melalui jaringan pribadi Alibaba Cloud.
Kluster MongoDB yang dideploy dengan Docker tidak didukung.
Data Integration tidak mendukung pembacaan data dari kolom tertentu dalam konfigurasi kueri data (parameter query).
Dalam tugas sinkronisasi offline, jika struktur bidang tidak dapat diperoleh dari MongoDB, pemetaan bidang default dengan enam bidang akan dihasilkan. Nama bidangnya adalah
col1,col2,col3,col4,col5, dancol6.Selama eksekusi tugas, perintah
splitVectordigunakan secara default untuk melakukan sharding pada tugas. Beberapa versi MongoDB tidak mendukung perintahsplitVector, sehingga menyebabkan errorno such cmd splitVector. Untuk mencegah error ini, klik ikon
pada konfigurasi tugas sinkronisasi untuk beralih ke editor kode. Kemudian, tambahkan parameter berikut ke pengaturan parameter MongoDB agar tidak menggunakan splitVector."useSplitVector" : false
Tipe bidang yang didukung
Tipe data MongoDB yang didukung oleh MongoDB Reader
Data Integration mendukung sebagian besar tipe data MongoDB. Namun, beberapa tipe data tidak didukung. Verifikasi tipe data Anda sebelum melanjutkan.
Untuk tipe data yang didukung, Data Integration membaca data sebagai berikut:
Untuk tipe data primitif, Data Integration secara otomatis membaca data dari path yang sesuai berdasarkan nama bidang baca (column) yang dikonfigurasi untuk tugas sinkronisasi. Data Integration juga secara otomatis mengonversi tipe data tersebut. Anda tidak perlu menentukan properti type untuk kolom tersebut. Untuk informasi lebih lanjut, lihat Lampiran: Contoh skrip MongoDB dan deskripsi parameter.
Type
Baca offline (MongoDB Reader)
Deskripsi
ObjectId
Didukung
Tipe object ID.
Double
Dukungan
Tipe bilangan titik mengambang 64-bit.
Integer 32-bit
Dukungan
Integer 32-bit.
Integer 64-bit
Dukungan
Integer 64-bit.
Decimal128
Dukungan
Tipe Decimal128.
CatatanJika tipe ini dikonfigurasi sebagai tipe nested atau tipe Combine, data diproses sebagai objek selama serialisasi JSON. Anda harus menambahkan parameter
decimal128OutputTypedan mengaturnya kebigDecimaluntuk mengeluarkan data sebagai desimal.String
Didukung
Tipe string.
Boolean
Dukungan
Tipe Boolean.
Timestamp
Dukungan
Tipe timestamp.
CatatanBsonTimestamp menyimpan timestamp. Anda tidak perlu mempertimbangkan dampak zona waktu. Untuk informasi lebih lanjut, lihat Masalah Zona Waktu di MongoDB.
Date
Dukungan
Tipe tanggal.
Untuk beberapa tipe data kompleks, Anda dapat menyesuaikan pemrosesan dengan mengonfigurasi properti type dari kolom tersebut.
Type
Baca offline (MongoDB Reader)
Deskripsi
Document
Dukungan
Tipe dokumen tersemat.
Jika properti type tidak dikonfigurasi, dokumen langsung diproses melalui serialisasi JSON.
Jika properti type diatur ke
document, ini merupakan tipe nested. MongoDB Reader membaca properti dokumen berdasarkan path. Untuk contoh detail, lihat Contoh tipe data 2: Mengurai dokumen nested multi-level secara rekursif.
Array
Dukungan
Tipe array.
Jika type diatur ke
array.jsonatauarrays, data langsung diproses melalui serialisasi JSON.Jika type diatur ke
arrayataudocument.array, data digabung menjadi string. Pemisah default (splitter dalam kolom) adalah koma (,).
PentingData Integration tidak mendukung tipe array. Namun, MongoDB mendukung array dan fitur pengindeksannya yang canggih. Anda dapat mengonfigurasi parameter tertentu untuk mengonversi string menjadi array di MongoDB. Setelah konversi, Anda dapat menulis data ke MongoDB secara paralel.
Tipe data khusus Data Integration: combine
Type | Baca offline (MongoDB Reader) | Deskripsi |
Combine | Dukungan | Tipe kustom Data Integration. Jika type diatur ke |
Konversi tipe data MongoDB Reader
Tabel berikut mencantumkan konversi tipe data yang dilakukan oleh MongoDB Reader.
Kategori tipe yang dikonversi | Tipe data MongoDB |
LONG | INT, LONG, document.INT, dan document.LONG |
DOUBLE | DOUBLE dan document.DOUBLE |
STRING | STRING, ARRAY, document.STRING, document.ARRAY, dan COMBINE |
DATE | DATE dan document.DATE |
BOOLEAN | BOOL dan document.BOOL |
BYTES | BYTES dan document.BYTES |
Konversi tipe data MongoDB Writer
Kategori tipe | Tipe data MongoDB |
Integer | INT dan LONG |
Bilangan titik mengambang | DOUBLE |
String | STRING dan ARRAY |
Tanggal dan waktu | DATE |
Boolean | BOOL |
Biner | BYTES |
Contoh tipe data 1: Menggunakan tipe combine
Tipe data Combine dari plugin MongoDB Reader memungkinkan Anda menggabungkan beberapa bidang dalam dokumen MongoDB menjadi satu string JSON. Misalnya, Anda ingin mengimpor bidang dari MongoDB ke MaxCompute. Tiga dokumen berikut berisi bidang di mana kunci merepresentasikan seluruh bidang dan nilainya dihilangkan. Bidang `a` dan `b` umum di semua dokumen, sedangkan `x_n` adalah bidang non-tetap.
doc1: a b x_1 x_2doc2: a b x_2 x_3 x_4doc3: a b x_5
Dalam file konfigurasi, tentukan bidang yang memerlukan pemetaan satu-ke-satu. Untuk bidang yang akan digabung, berikan nama baru yang berbeda dari bidang apa pun yang ada dalam dokumen, dan atur tipe ke COMBINE, seperti yang ditunjukkan di bawah ini.
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]Hasil akhir yang diekspor ke MaxCompute adalah sebagai berikut.
odps_column1 | odps_column2 | odps_column3 |
a | b | {x_1,x_2} |
a | b | {x_2,x_3,x_4} |
a | b | {x_5} |
Setelah menggunakan tipe COMBINE untuk menggabungkan beberapa bidang dalam dokumen MongoDB, bidang umum secara otomatis dihapus saat output dipetakan ke MaxCompute. Hanya bidang unik dari dokumen yang dipertahankan.
Misalnya, `a` dan `b` adalah bidang umum di semua dokumen. Setelah menggabungkan bidang dalam dokumen doc1: a b x_1 x_2 menggunakan tipe COMBINE, output seharusnya {a,b,x_1,x_2}. Saat hasil ini dipetakan ke MaxCompute, bidang umum `a` dan `b` dihapus. Output akhirnya adalah {x_1,x_2}.
Contoh tipe data 2: Mengurai dokumen nested multi-level secara rekursif
Ketika dokumen MongoDB memiliki beberapa level nesting, Anda dapat mengonfigurasi tipe document untuk memprosesnya secara rekursif. Berikut adalah contohnya:
Data sumber di MongoDB:
{ "name": "name1", "a": { "b": { "c": "this is value" } } }Konfigurasi kolom MongoDB:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}
Dengan konfigurasi ini, nilai bidang nested `a.b.c` dari sumber ditulis ke bidang tujuan `c`. Setelah tugas sinkronisasi dijalankan, data yang ditulis ke tujuan adalah this is value.
Tambahkan sumber data
Sebelum mengembangkan tugas sinkronisasi di DataWorks, Anda harus menambahkan sumber data yang diperlukan ke DataWorks dengan mengikuti petunjuk di Manajemen Sumber Data. Anda dapat melihat deskripsi parameter di Konsol DataWorks untuk memahami arti parameter saat menambahkan sumber data.
Kembangkan tugas sinkronisasi data
Untuk informasi tentang titik masuk dan prosedur konfigurasi tugas sinkronisasi, lihat panduan konfigurasi berikut.
Konfigurasi tugas sinkronisasi offline tabel tunggal
Untuk prosedurnya, lihat Konfigurasi tugas di Antarmuka tanpa kode dan Konfigurasi tugas di editor kode.
Untuk semua parameter dan contoh skrip konfigurasi tugas di editor kode, lihat Lampiran: Contoh skrip MongoDB dan deskripsi parameter.
Konfigurasi tugas sinkronisasi real-time tabel tunggal
Untuk prosedurnya, lihat Konfigurasi tugas sinkronisasi real-time di Data Integration dan Konfigurasi tugas sinkronisasi real-time di DataStudio.
Konfigurasi tugas sinkronisasi seluruh database
Untuk informasi tentang cara mengonfigurasi tugas sinkronisasi seluruh database, seperti sinkronisasi offline seluruh database, sinkronisasi real-time penuh dan inkremental seluruh database, dan sinkronisasi real-time seluruh database dengan sharding, lihat Tugas sinkronisasi offline seluruh database dan Konfigurasi tugas sinkronisasi real-time seluruh database.
Praktik terbaik
FAQ
Lampiran: Contoh skrip MongoDB dan deskripsi parameter
Konfigurasi tugas sinkronisasi batch menggunakan editor kode
Jika Anda ingin mengonfigurasi tugas sinkronisasi batch menggunakan editor kode, Anda harus mengonfigurasi parameter terkait dalam skrip berdasarkan persyaratan format skrip terpadu. Untuk informasi lebih lanjut, lihat Konfigurasi tugas di editor kode. Informasi berikut menjelaskan parameter yang harus Anda konfigurasi untuk sumber data saat mengonfigurasi tugas sinkronisasi batch menggunakan editor kode.
Contoh skrip Reader
Kode berikut menunjukkan cara mengonfigurasi pekerjaan untuk mengekstrak data dari MongoDB ke tujuan lokal. Untuk informasi lebih lanjut tentang parameter, lihat tabel berikut.
Saat menjalankan kode, hapus komentar.
Ekstraksi elemen tertentu dari array tidak didukung.
{
"type":"job",
"version":"2.0",// Nomor versi.
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", // Nama sumber data.
"collectionName": "tag_data", // Nama koleksi.
"query": "", // Filter kueri data.
"column": [
{
"name": "unique_id", // Nama bidang.
"type": "string" // Tipe bidang.
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"common": {
"column": {
"timeZone": "GMT+0" // Zona waktu.
}
},
"errorLimit":{
"record":"0"// Jumlah catatan error.
},
"speed":{
"throttle":true,// Jika throttle diatur ke false, parameter mbps tidak berlaku, yang berarti laju data tidak dibatasi. Jika throttle diatur ke true, laju data dibatasi.
"concurrent":1, // Jumlah pekerjaan konkuren.
"mbps":"12"// Batas laju data. 1 mbps setara dengan 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Parameter | Deskripsi |
datasource | Nama sumber data. Anda dapat menambahkan sumber data di editor kode. Nilai parameter ini harus sama dengan nama sumber data yang ditambahkan. |
collectionName | Nama koleksi MongoDB. |
hint | MongoDB mendukung parameter hint, yang memaksa pengoptimal kueri untuk menggunakan indeks tertentu dalam menyelesaikan kueri. Dalam beberapa kasus, ini dapat meningkatkan performa kueri. Untuk informasi lebih lanjut, lihat parameter hint. Berikut adalah contohnya: |
column | Nama bidang dokumen di MongoDB. Konfigurasikan parameter ini sebagai array untuk merepresentasikan beberapa bidang di MongoDB.
|
batchSize | Jumlah catatan yang diambil dalam satu batch. Parameter ini opsional. Nilai default adalah |
cursorTimeoutInMs | Periode timeout kursor. Parameter ini opsional. Nilai default adalah Catatan
|
query | Anda dapat menggunakan parameter ini untuk membatasi rentang data MongoDB yang dikembalikan. Hanya format tanggal berikut yang didukung. Penggunaan format timestamp secara langsung tidak didukung. Catatan
Berikut adalah contoh umum parameter query:
Catatan Untuk informasi lebih lanjut tentang sintaks kueri MongoDB, lihat dokumentasi resmi MongoDB. |
splitFactor | Jika terjadi kesenjangan data yang signifikan, pertimbangkan untuk meningkatkan splitFactor guna mencapai granularitas chunk yang lebih kecil tanpa meningkatkan konkurensi. |
Contoh skrip Writer
Kode berikut menunjukkan cara mengonfigurasi pekerjaan sinkronisasi data untuk menulis data ke MongoDB. Untuk informasi lebih lanjut tentang parameter, lihat tabel berikut.
{
"type": "job",
"version": "2.0",// Nomor versi.
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",// Nama plug-in.
"parameter": {
"datasource": "",// Nama sumber data.
"column": [
{
"name": "_id",// Nama kolom.
"type": "ObjectId"// Tipe data. Jika replaceKey diatur ke _id, tipe harus diatur ke ObjectId. Jika Anda mengatur tipe ke string, penggantian gagal.
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {// Mode penulisan.
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"// Nama koneksi.
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {// Jumlah catatan error.
"record": "0"
},
"speed": {
"throttle": true,// Jika throttle diatur ke false, parameter mbps tidak berlaku, yang berarti laju data tidak dibatasi. Jika throttle diatur ke true, laju data dibatasi.
"concurrent": 1,// Jumlah pekerjaan konkuren.
"mbps": "1"// Batas laju data. 1 mbps setara dengan 1 MB/s.
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Parameter skrip Writer
Parameter | Deskripsi | Wajib | Nilai default |
datasource | Nama sumber data. Anda dapat menambahkan sumber data di editor kode. Nilai parameter ini harus sama dengan nama sumber data yang ditambahkan. | Ya | Tidak ada |
collectionName | Nama koleksi MongoDB. | Ya | Tidak ada |
column | Nama bidang dokumen di MongoDB. Konfigurasikan parameter ini sebagai array untuk merepresentasikan beberapa bidang di MongoDB.
| Ya | Tidak ada |
writeMode | Menentukan apakah data akan ditimpa selama transmisi. Ini mencakup isReplace dan replaceKey:
Catatan Jika isReplace diatur ke true dan bidang selain Hal ini karena data yang ditulis mengandung ketidaksesuaian antara | Tidak | Tidak ada |
preSql | Merupakan operasi awal sebelum menulis data ke MongoDB, seperti membersihkan data historis. Jika preSql kosong, tidak ada operasi awal yang dikonfigurasi. Saat mengonfigurasi preSql, pastikan sesuai dengan persyaratan sintaks JSON. | Tidak | Tidak ada |
Saat pekerjaan Data Integration dijalankan, preSql yang dikonfigurasi dieksekusi terlebih dahulu. Fase penulisan data aktual baru dimulai setelah preSql selesai. Parameter preSql sendiri tidak memengaruhi konten data yang ditulis. Parameter preSql memungkinkan Data Integration mendukung eksekusi idempoten. Misalnya, preSql Anda dapat membersihkan data historis sebelum setiap eksekusi tugas sesuai aturan bisnis Anda. Dalam hal ini, jika tugas gagal, Anda hanya perlu menjalankan ulang pekerjaan Data Integration.
Persyaratan format untuk preSql adalah sebagai berikut:
Anda harus mengonfigurasi field type untuk menunjukkan kategori operasi awal. Nilai yang didukung adalah drop dan remove. Misalnya,
"preSql":{"type":"remove"}.drop: Menghapus koleksi dan datanya. Koleksi yang ditentukan oleh parameter collectionName adalah koleksi yang akan dihapus.
remove: Menghapus data berdasarkan kondisi.
json: Anda dapat menggunakan JSON untuk menentukan kondisi penghapusan data. Misalnya,
"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}. Di sini,${last_day}adalah parameter penjadwalan DataWorks dalam format$[yyyy-mm-dd]. Anda dapat menggunakan operator kondisional MongoDB yang didukung (seperti $gt, $lt, $gte, dan $lte), operator logika (seperti and dan or), atau fungsi (seperti max, min, sum, avg, dan ISODate) sesuai kebutuhan.Data Integration mengeksekusi operasi data Anda untuk menghapus kueri menggunakan API MongoDB standar berikut.
query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);CatatanUntuk menghapus data secara kondisional, kami menyarankan menggunakan format konfigurasi JSON.
item: Anda dapat mengonfigurasi nama kolom (name), kondisi (condition), dan nilai kolom (value) untuk penyaringan data dalam item. Misalnya,
"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}.Data Integration membuat kondisi kueri berdasarkan kondisi item yang Anda konfigurasi, lalu mengeksekusi penghapusan menggunakan API MongoDB standar. Misalnya,
col.deleteMany(query);.
Jika preSql tidak dikenali, tidak ada operasi penghapusan awal yang dilakukan.