DataWorks menyediakan sinkronisasi data dua arah dengan MongoDB. Topik ini menjelaskan kemampuan sinkronisasi data yang disediakan DataWorks untuk MongoDB.
Versi yang didukung
DataWorks mendukung MongoDB versi 4.x, 5.x, 6.x, 7.x, dan 8.0.
Catatan penggunaan
Lakukan koneksi ke database MongoDB menggunakan akun yang dibuat khusus untuk database tersebut. Jika Anda menggunakan sumber data ApsaraDB for MongoDB, akun root dibuat secara default. Untuk alasan keamanan, kami menyarankan agar Anda tidak menggunakan akun root saat menambahkan sumber data MongoDB.
Jika Anda menggunakan kluster sharded MongoDB, Anda harus mengonfigurasi alamat node mongos untuk sumber data tersebut. Jangan mengonfigurasi alamat node mongod/shard. Jika tidak, task sinkronisasi mungkin hanya mengkueri data dari shard tertentu, bukan seluruh dataset. Untuk informasi lebih lanjut tentang mongos dan mongod, lihat dokumentasi mongos dan mongod.
Kluster primary/secondary MongoDB tidak didukung.
Jika concurrency lebih besar dari 1, semua field
_iddalam koleksi yang dikonfigurasi untuk task sinkronisasi harus memiliki tipe data yang sama. Misalnya, semua field_idharus bertipe string atau ObjectId. Jika tidak, sebagian data mungkin tidak tersinkronisasi.CatatanSaat concurrency lebih besar dari 1, task dibagi berdasarkan field
_id. Oleh karena itu, tipe data campuran tidak didukung untuk field_iddalam skenario ini. Jika field_idberisi beberapa tipe data, atur concurrency menjadi 1 untuk sinkronisasi data. Untuk melakukannya, jangan konfigurasi parameter splitFactor, atau atur parameter splitFactor ke 1.
Data Integration tidak mendukung tipe array. Namun, MongoDB mendukung tipe array dan menyediakan fitur pengindeksan yang kuat. Anda dapat mengonfigurasi parameter tertentu untuk mengonversi string menjadi array MongoDB. Setelah konversi, Anda dapat menulis data ke MongoDB secara paralel.
Database MongoDB yang dikelola sendiri tidak mendukung akses jaringan publik. Database tersebut hanya dapat diakses melalui jaringan internal Alibaba Cloud.
Kluster MongoDB yang dideploy menggunakan Docker tidak didukung.
Data Integration tidak mendukung pembacaan data dari kolom tertentu menggunakan parameter query.
Dalam task sinkronisasi batch, jika Data Integration tidak dapat mengambil struktur field dari MongoDB, Data Integration akan menghasilkan pemetaan field untuk enam field secara default. Nama field tersebut adalah
col1,col2,col3,col4,col5, dancol6.Selama eksekusi task, perintah
splitVectordigunakan secara default untuk melakukan sharding pada task. Beberapa versi MongoDB tidak mendukung perintahsplitVector, yang dapat menyebabkan errorno such cmd splitVector. Untuk mencegah error ini, klik ikon
pada konfigurasi task, beralih ke editor kode, lalu tambahkan parameter berikut ke konfigurasi parameter MongoDB untuk mencegah penggunaan splitVector."useSplitVector" : false
Tipe field yang didukung
Tipe data MongoDB yang didukung oleh MongoDB Reader
Data Integration mendukung sebagian besar, namun tidak semua, tipe data MongoDB. Pastikan tipe data Anda didukung.
Saat Data Integration membaca tipe data yang didukung, operasi berikut dilakukan:
Untuk tipe data primitif, Data Integration secara otomatis membaca data dari path yang sesuai berdasarkan nama field yang dikonfigurasi dalam parameter column. Untuk informasi lebih lanjut, lihat Lampiran: Contoh skrip dan deskripsi parameter untuk MongoDB. Data Integration juga secara otomatis mengonversi tipe data tersebut. Anda tidak perlu menentukan properti type untuk kolom tersebut.
Type
Baca batch (MongoDB Reader)
Deskripsi
ObjectId
Didukung
Tipe object ID.
Double
Didukung
Tipe bilangan titik mengambang 64-bit.
Integer 32-bit
Didukung
Integer 32-bit.
Integer 64-bit
Didukung
Integer 64-bit.
Decimal128
Didukung
Tipe Decimal128.
CatatanJika suatu field dikonfigurasi sebagai tipe nested atau combine, field tersebut diproses sebagai objek selama serialisasi JSON. Anda harus menambahkan parameter
decimal128OutputTypedan mengaturnya kebigDecimaluntuk mengeluarkan data sebagai desimal.String
Didukung
Tipe string.
Boolean
Didukung
Tipe Boolean.
Timestamp
Didukung
Tipe timestamp.
CatatanBsonTimestamp menyimpan stempel waktu. Anda tidak perlu mempertimbangkan dampak zona waktu. Untuk informasi lebih lanjut, lihat Masalah zona waktu di MongoDB.
Date
Dukungan
Tipe tanggal.
Untuk beberapa tipe data kompleks, Anda dapat mengonfigurasi properti type untuk kolom guna melakukan pemrosesan kustom.
Type
Baca batch (MongoDB Reader)
Deskripsi
Document
Didukung
Tipe dokumen tersemat.
Jika properti type tidak dikonfigurasi, Document langsung dikonversi menggunakan serialisasi JSON.
Jika properti type diatur ke
document, field tersebut merupakan tipe nested. MongoDB Reader membaca properti Document berdasarkan path. Untuk contoh lengkap, lihat Contoh 2: Mengurai secara rekursif Document bersarang multi-level di bawah.
Array
Didukung
Tipe array.
Jika type diatur ke
array.jsonatauarrays, data langsung diproses menggunakan serialisasi JSON.Jika type diatur ke
arrayataudocument.array, elemen-elemennya digabung menjadi string. Pemisah, yang ditentukan dalam properti splitter kolom, adalah koma (,) secara default.
PentingData Integration tidak mendukung tipe array. Namun, MongoDB mendukung tipe array dan menyediakan fitur pengindeksan yang kuat. Anda dapat mengonfigurasi parameter tertentu untuk mengonversi string menjadi array MongoDB. Setelah konversi, Anda dapat menulis data ke MongoDB secara paralel.
Tipe data khusus Data Integration: combine
Type | Baca batch (MongoDB Reader) | Deskripsi |
Combine | Didukung | Tipe data kustom dalam Data Integration. Jika type diatur ke |
Pemetaan tipe data MongoDB Reader
Tabel berikut mencantumkan pemetaan antara tipe data MongoDB dan tipe data Data Integration untuk MongoDB Reader.
Kategori tipe yang dikonversi | Tipe data MongoDB |
LONG | INT, LONG, document.INT, dan document.LONG |
DOUBLE | DOUBLE dan document.DOUBLE |
STRING | STRING, ARRAY, document.STRING, document.ARRAY, dan COMBINE |
DATE | DATE dan document.DATE |
BOOLEAN | BOOL dan document.BOOL |
BYTES | BYTES dan document.BYTES |
Pemetaan tipe data MongoDB Writer
Kategori tipe | Tipe data MongoDB |
Integer | INT dan LONG |
Bilangan titik mengambang | DOUBLE |
String | STRING dan ARRAY |
Tanggal dan waktu | DATE |
Boolean | BOOL |
Biner | BYTES |
Contoh 1: Menggunakan tipe combine
Tipe data combine dari plugin MongoDB Reader memungkinkan Anda menggabungkan beberapa field dalam dokumen MongoDB menjadi satu string JSON. Misalnya, asumsikan Anda ingin mengimpor field dari tiga dokumen MongoDB ke MaxCompute. Dalam contoh berikut, field direpresentasikan oleh kunci, bukan pasangan kunci-nilai. Field a dan b umum di ketiga dokumen tersebut, sedangkan x_n adalah field variabel.
doc1: a b x_1 x_2doc2: a b x_2 x_3 x_4doc3: a b x_5
Dalam file konfigurasi, Anda harus secara eksplisit menentukan field yang memerlukan pemetaan satu-ke-satu. Untuk field yang ingin digabung, berikan nama baru yang berbeda dari nama field apa pun yang sudah ada dalam dokumen dan atur type-nya ke COMBINE. Kode berikut memberikan contohnya.
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]Tabel berikut menunjukkan output akhir di MaxCompute.
odps_column1 | odps_column2 | odps_column3 |
a | b | {x_1,x_2} |
a | b | {x_2,x_3,x_4} |
a | b | {x_5} |
Setelah Anda menggunakan tipe combine untuk menggabungkan beberapa field dalam dokumen MongoDB, field umum secara otomatis dihapus saat output dipetakan ke MaxCompute. Hanya field unik dokumen yang dipertahankan.
Misalnya, a dan b adalah field umum di semua dokumen. Setelah field dalam dokumen doc1: a b x_1 x_2 digabung menggunakan tipe combine, output-nya adalah {a,b,x_1,x_2}. Saat hasil ini dipetakan ke MaxCompute, field umum a dan b dihapus. Output akhirnya adalah {x_1,x_2}.
Contoh 2: Mengurai secara rekursif Document bersarang multi-level
Jika sebuah dokumen di MongoDB memiliki beberapa level bersarang, Anda dapat mengonfigurasi tipe document untuk menguraikannya secara rekursif. Kode berikut memberikan contohnya.
Data sumber di MongoDB:
{ "name": "name1", "a": { "b": { "c": "this is value" } } }Konfigurasi kolom MongoDB:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}
Dengan konfigurasi di atas, nilai field sumber bersarang a.b.c ditulis ke field tujuan c. Setelah task sinkronisasi dijalankan, data yang ditulis ke tujuan adalah this is value.
Tambahkan sumber data
Sebelum mengembangkan task sinkronisasi di DataWorks, Anda harus menambahkan sumber data yang diperlukan ke DataWorks dengan mengikuti petunjuk di Manajemen sumber data. Anda dapat melihat infotips parameter di Konsol DataWorks untuk memahami arti parameter saat menambahkan sumber data.
Kembangkan task sinkronisasi data
Untuk informasi tentang titik masuk dan prosedur konfigurasi task sinkronisasi, lihat panduan konfigurasi berikut.
Konfigurasikan task sinkronisasi batch untuk satu tabel
Untuk informasi lebih lanjut tentang prosedurnya, lihat Konfigurasikan task di Antarmuka tanpa kode dan Konfigurasikan task di editor kode.
Untuk informasi lebih lanjut tentang semua parameter dan contoh skrip untuk editor kode, lihat Lampiran: Contoh skrip dan deskripsi parameter untuk MongoDB.
Konfigurasikan task sinkronisasi real-time untuk satu tabel
Untuk informasi lebih lanjut tentang prosedurnya, lihat Konfigurasikan task sinkronisasi real-time di Data Integration dan Konfigurasikan task sinkronisasi real-time di DataStudio.
Konfigurasikan task sinkronisasi untuk seluruh database
Anda dapat mengonfigurasi task untuk sinkronisasi batch, sinkronisasi real-time penuh dan inkremental, atau sinkronisasi real-time dari database sharded untuk seluruh database. Untuk informasi lebih lanjut, lihat Task sinkronisasi batch untuk seluruh database dan Konfigurasikan task sinkronisasi real-time untuk seluruh database.
Praktik terbaik
FAQ
Lampiran: Contoh skrip dan deskripsi parameter untuk MongoDB
Konfigurasikan task sinkronisasi batch menggunakan editor kode
Jika Anda ingin mengonfigurasi task sinkronisasi batch menggunakan editor kode, Anda harus mengonfigurasi parameter terkait dalam skrip berdasarkan persyaratan format skrip terpadu. Untuk informasi lebih lanjut, lihat Konfigurasikan task di editor kode. Informasi berikut menjelaskan parameter yang harus Anda konfigurasi untuk sumber data saat mengonfigurasi task sinkronisasi batch menggunakan editor kode.
Contoh skrip Reader
Skrip berikut adalah contoh job yang dikonfigurasi untuk mengekstrak data dari MongoDB ke lingkungan lokal. Untuk informasi lebih lanjut tentang parameter, lihat deskripsi parameter berikut.
Sebelum menjalankan kode, hapus komentar.
Anda tidak dapat mengekstrak elemen tertentu dari array.
{
"type":"job",
"version":"2.0",// Nomor versi.
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", // Nama sumber data.
"collectionName": "tag_data", // Nama koleksi.
"query": "", // Kueri penyaringan data.
"column": [
{
"name": "unique_id", // Nama field.
"type": "string" // Tipe field.
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"common": {
"column": {
"timeZone": "GMT+0" // Zona waktu.
}
},
"errorLimit":{
"record":"0"// Jumlah catatan error.
},
"speed":{
"throttle":true,// Menentukan apakah pengendalian aliran diaktifkan. Jika parameter ini diatur ke false, pengendalian aliran dinonaktifkan dan parameter mbps tidak berlaku. Jika diatur ke true, pengendalian aliran diaktifkan.
"concurrent":1, // Jumlah job konkuren.
"mbps":"12"// Laju pengendalian aliran. 1 mbps = 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Parameter | Deskripsi |
datasource | Nama sumber data. Di editor kode, nilai parameter ini harus sama dengan nama sumber data yang ditambahkan. |
collectionName | Nama koleksi MongoDB. |
hint | MongoDB mendukung parameter hint, yang memaksa pengoptimal kueri untuk menggunakan indeks tertentu guna menyelesaikan kueri. Dalam beberapa kasus, hal ini dapat meningkatkan performa kueri. Untuk informasi lebih lanjut, lihat parameter hint. Kode berikut memberikan contohnya: |
column | Nama field dokumen di MongoDB. Konfigurasikan sebagai array untuk merepresentasikan beberapa field.
|
batchSize | Jumlah catatan yang diambil dalam satu batch. Parameter ini opsional. Nilai default: |
cursorTimeoutInMs | Periode timeout kursor. Parameter ini opsional. Nilai default: Catatan
|
query | Anda dapat menggunakan parameter ini untuk menyaring data MongoDB yang dikembalikan. Hanya format waktu berikut yang didukung. Format Stempel waktu UNIX tidak didukung secara langsung. Catatan
Kode berikut memberikan contoh umum untuk parameter query:
Catatan Untuk informasi lebih lanjut tentang sintaks kueri MongoDB, lihat dokumentasi resmi MongoDB. |
splitFactor | Jika terjadi kesenjangan data yang parah, pertimbangkan untuk meningkatkan splitFactor guna mencapai sharding yang lebih granular tanpa meningkatkan concurrency. |
Contoh skrip Writer
Skrip berikut adalah contoh job sinkronisasi data yang dikonfigurasi untuk menulis data ke MongoDB. Untuk informasi lebih lanjut tentang parameter, lihat deskripsi parameter berikut.
{
"type": "job",
"version": "2.0",// Nomor versi.
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",// Nama plugin.
"parameter": {
"datasource": "",// Nama sumber data.
"column": [
{
"name": "_id",// Nama kolom.
"type": "ObjectId"// Tipe data. Jika replaceKey adalah _id, Anda harus mengatur type ke ObjectId. Jika Anda mengatur type ke string, penggantian gagal.
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {// Mode penulisan.
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"// Nama koleksi.
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {// Jumlah catatan error.
"record": "0"
},
"speed": {
"throttle": true,// Menentukan apakah pengendalian aliran diaktifkan. Jika parameter ini diatur ke false, pengendalian aliran dinonaktifkan dan parameter mbps tidak berlaku. Jika diatur ke true, pengendalian aliran diaktifkan.
"concurrent": 1,// Jumlah job konkuren.
"mbps": "1"// Laju pengendalian aliran. 1 mbps = 1 MB/s.
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Parameter skrip Writer
Parameter | Deskripsi | Wajib | Nilai default |
datasource | Nama sumber data. Di editor kode, nilai parameter ini harus sama dengan nama sumber data yang ditambahkan. | Ya | Tidak ada |
collectionName | Nama koleksi MongoDB. | Ya | Tidak ada |
column | Nama field dokumen di MongoDB. Konfigurasikan sebagai array untuk merepresentasikan beberapa field.
| Ya | Tidak ada |
writeMode | Menentukan apakah data akan ditimpa selama transmisi. Parameter ini mencakup isReplace dan replaceKey:
Catatan Jika isReplace diatur ke true dan field selain field Hal ini karena data yang akan ditulis berisi catatan di mana | Tidak | Tidak ada |
preSql | Operasi awal yang dieksekusi sebelum menulis data ke MongoDB, seperti membersihkan data historis. Jika preSql kosong, tidak ada operasi awal yang dikonfigurasi. Saat Anda mengonfigurasi preSql, pastikan nilainya mematuhi sintaks JSON. | Tidak | Tidak ada |
Saat Anda menjalankan job Data Integration, preSql yang dikonfigurasi dieksekusi terlebih dahulu. Fase penulisan data aktual baru dimulai setelah eksekusi preSql selesai. Parameter preSql tidak memengaruhi konten data yang ditulis. Parameter preSql menyediakan eksekusi idempoten untuk Data Integration. Misalnya, preSql Anda dapat digunakan untuk membersihkan data historis sebelum setiap eksekusi task berdasarkan aturan bisnis Anda. Dalam kasus ini, jika task gagal, Anda cukup menjalankan ulang job Data Integration.
Persyaratan format untuk preSql adalah sebagai berikut:
Anda harus mengonfigurasi field type untuk menentukan jenis operasi awal. Nilai yang didukung adalah drop dan remove. Contoh:
"preSql":{"type":"remove"}.drop: Menghapus koleksi dan datanya. Koleksi yang akan dihapus ditentukan oleh parameter collectionName.
remove: Menghapus data berdasarkan kondisi.
json: Anda dapat menggunakan objek JSON untuk menentukan kondisi penghapusan data. Contoh:
"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}. Dalam contoh ini,${last_day}adalah parameter penjadwalan DataWorks dalam format$[yyyy-mm-dd]. Anda juga dapat menggunakan operator kondisional MongoDB yang didukung (seperti $gt, $lt, $gte, dan $lte), operator logika (seperti and dan or), atau fungsi (seperti max, min, sum, avg, dan ISODate) sesuai kebutuhan.Data Integration mengeksekusi kueri penghapusan data menggunakan API MongoDB standar berikut.
query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);CatatanUntuk menghapus data berdasarkan kondisi, kami menyarankan agar Anda menggunakan konfigurasi JSON.
item: Anda dapat mengonfigurasi nama kolom (name), kondisi (condition), dan nilai kolom (value) untuk penyaringan data dalam item. Contoh:
"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}.Data Integration membuat kondisi kueri berdasarkan kondisi item yang dikonfigurasi, lalu mengeksekusi penghapusan menggunakan API MongoDB standar. Misalnya:
col.deleteMany(query);.
Jika preSql tidak dikenali, tidak ada operasi pra-penghapusan yang dilakukan.