All Products
Search
Document Center

DataWorks:Baca data dari sumber data MongoDB melalui sinkronisasi batch

Last Updated:Feb 27, 2026

DataWorks Data Integration menyediakan plugin MongoDB Reader untuk membaca data dari MongoDB dan menyinkronkannya ke sumber data lain. Topik ini memberikan contoh cara menggunakan Data Integration untuk melakukan sinkronisasi offline data dari MongoDB ke MaxCompute.

Informasi latar belakang

Dalam tutorial ini, sumber data adalah MongoDB dan tujuan sinkronisasi adalah MaxCompute. Sebelum memulai, siapkan data MongoDB untuk disinkronkan dan buat tabel MaxCompute untuk menyimpan data yang telah disinkronkan.

Prasyarat

Pastikan prasyarat berikut terpenuhi.

  • Anda telah mengaktifkan DataWorks dan membuat sumber data MaxCompute.

  • Diperlukan grup sumber daya eksklusif untuk Data Integration guna menjalankan task offline. Anda harus membeli dan mengonfigurasi grup sumber daya ini sebelum melanjutkan. Untuk informasi selengkapnya, lihat Use an exclusive resource group for Data Integration.

    Catatan

    Anda juga dapat menggunakan grup sumber daya General-purpose. Untuk informasi selengkapnya, lihat Use a Serverless resource group.

Siapkan tabel data sampel

Siapkan koleksi data MongoDB dan tabel MaxCompute untuk sinkronisasi data offline.

  1. Siapkan koleksi data MongoDB.

    Tutorial ini menggunakan ApsaraDB for MongoDB sebagai contoh. Kode berikut menunjukkan cara menyiapkan koleksi data MongoDB.

    1. Buat koleksi data bernama di_mongodb_conf_test.

      db.createCollection('di_mongodb_conf_test')
    2. Masukkan data sampel untuk tutorial ini ke dalam koleksi data.

      db.di_mongodb_conf_test.insertOne({
          'col_string':'mock string value',
          'col_int32':NumberInt("1"),
          'col_int32_min':NumberInt("-2147483648"),
          'col_int32_max':NumberInt("2147483647"),
          'col_int64':NumberLong("1234567890123456"),
          'col_int64_min':NumberLong("-9223372036854775807"),
          'col_int64_max':NumberLong("9223372036854775807"),
          'col_decimal':NumberDecimal("9999999.4999999999"),
          'col_double':9999999.99,
          'col_boolean':true,
          'col_timestamp':ISODate(),
          'col_date':new Date(),
          'col_array_to_json':['a','b'],
          'col_array_to_join':['a','b'],
          'col_doc':{
              'key_string':'mock string value',
              'key_int32':NumberInt("1"),
              'key_int32_min':NumberInt("-2147483648"),
              'key_int32_max':NumberInt("2147483647"),
              'key_int64':NumberLong("1234567890123456"),
              'key_int64_min':NumberLong("-9223372036854775807"),
              'key_int64_max':NumberLong("9223372036854775807"),
              'key_decimal':NumberDecimal("9999999.4999999999"),
              'key_double':9999999.99,
              'key_boolean':true,
              'key_timestamp':ISODate(),
              'key_date':new Date(),
              'key_array_to_json':['a','b'],
              'key_array_to_join':['a','b'],
          },
          'col_extra_1':'this is extra 1',
          'col_extra_2':'this is extra 2',
      })
    3. Kueri data yang dimasukkan ke MongoDB.

      db.getCollection("di_mongodb_conf_test").find({})

      Hasil kueri ditampilkan pada gambar berikut:mongodb

  2. Siapkan tabel MaxCompute.

    1. Buat tabel partisi bernama di_mongodb_conf_test. Bidang partisi adalah pt.

      CREATE TABLE IF NOT EXISTS di_mongodb_conf_test
      (
        `id`                 STRING
        ,`col_string`        STRING
        ,`col_int32`         INT
        ,`col_int32_min`     INT
        ,`col_int32_max`     INT
        ,`col_int64`         BIGINT
        ,`col_int64_min`     BIGINT
        ,`col_int64_max`     BIGINT
        ,`col_decimal`       DECIMAL(38,18)
        ,`col_double`        DOUBLE
        ,`col_boolean`       BOOLEAN
        ,`col_timestamp`     TIMESTAMP
        ,`col_date`          DATE
        ,`col_array_to_json` STRING
        ,`col_array_to_join` STRING
        ,`key_string`        STRING
        ,`key_int32`         INT
        ,`key_int32_min`     INT
        ,`key_int32_max`     INT
        ,`key_int64`         BIGINT
        ,`key_int64_min`     BIGINT
        ,`key_int64_max`     BIGINT
        ,`key_decimal`       DECIMAL(38,18)
        ,`key_double`        DOUBLE
        ,`key_boolean`       BOOLEAN
        ,`key_timestamp`     TIMESTAMP
        ,`key_date`          DATE
        ,`key_array_to_json` STRING
        ,`key_array_to_join` STRING
        ,`col_doc`           STRING
        ,`col_combine`       STRING
      )
      PARTITIONED BY
      (
        pt                   STRING
      )
      LIFECYCLE 36500
      ;
    2. Tambahkan partisi dengan nilai 20230202.

      alter table di_mongodb_conf_test add if not exists partition (pt='20230202');
    3. Verifikasi bahwa tabel partisi telah dibuat.

      SELECT*FROM di_mongodb_conf_test
      WHEREpt='20230202';

Konfigurasikan task offline

Langkah 1: Tambahkan sumber data MongoDB

Tambahkan sumber data MongoDB dan pastikan terdapat konektivitas jaringan antara sumber data tersebut dan grup sumber daya eksklusif untuk Data Integration. Untuk informasi selengkapnya, lihat Configure a MongoDB data source.

Langkah 2: Buat node sync offline dan konfigurasikan task sinkronisasi

Di DataWorks DataStudio, buat node sync offline. Konfigurasikan sumber, tujuan, dan parameter lainnya untuk task tersebut. Langkah-langkah konfigurasi utama dijelaskan di bawah ini. Anda dapat menggunakan nilai default untuk parameter lainnya. Untuk informasi selengkapnya, lihat Configure a task in the codeless UI.

  1. Konfigurasikan konektivitas jaringan.

    Pilih sumber data MongoDB dan MaxCompute yang telah Anda buat pada langkah sebelumnya serta grup sumber daya eksklusif untuk Data Integration yang sesuai. Kemudian, uji konektivitasnya.

  2. Konfigurasikan task: Pilih sumber data.

    Pilih koleksi data MongoDB dan tabel partisi MaxCompute yang telah Anda siapkan.

  3. Konfigurasikan task: Pemetaan bidang.

    Untuk sumber data MongoDB, Same-line Mapping digunakan secara default. Anda juga dapat mengklik ikon 图标 untuk mengedit bidang tabel sumber secara manual. Kode berikut memberikan contoh pengeditan manual.

    {"name":"_id","type":"string"}
    {"name":"col_string","type":"string"}
    {"name":"col_int32","type":"long"}
    {"name":"col_int32_min","type":"long"}
    {"name":"col_int32_max","type":"long"}
    {"name":"col_int64","type":"long"}
    {"name":"col_int64_min","type":"long"}
    {"name":"col_int64_max","type":"long"}
    {"name":"col_decimal","type":"double"}
    {"name":"col_double","type":"double"}
    {"name":"col_boolean","type":"boolean"}
    {"name":"col_timestamp","type":"date"}
    {"name":"col_date","type":"date"}
    {"name":"col_array_to_json","type":"string"}
    {"name":"col_array_to_join","type":"array","splitter":","}
    {"name":"col_doc.key_string","type":"document.string"}
    {"name":"col_doc.key_int32","type":"document.long"}
    {"name":"col_doc.key_int32_min","type":"document.long"}
    {"name":"col_doc.key_int32_max","type":"document.long"}
    {"name":"col_doc.key_int64","type":"document.long"}
    {"name":"col_doc.key_int64_min","type":"document.long"}
    {"name":"col_doc.key_int64_max","type":"document.long"}
    {"name":"col_doc.key_decimal","type":"document.double"}
    {"name":"col_doc.key_double","type":"document.double"}
    {"name":"col_doc.key_boolean","type":"document.boolean"}
    {"name":"col_doc.key_timestamp","type":"document.date"}
    {"name":"col_doc.key_date","type":"document.date"}
    {"name":"col_doc.key_array_to_json","type":"document"}
    {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","}
    {"name":"col_doc","type":"string"}
    {"name":"col_combine","type":"combine"}

    Setelah Anda mengedit bidang secara manual, antarmuka akan menampilkan pemetaan antara bidang sumber dan tujuan.

Langkah 3: Kirim dan publikasikan node sync offline

Jika Anda menggunakan ruang kerja DataWorks mode standar dan ingin menjadwalkan task sync offline agar berjalan secara berkala di lingkungan produksi, Anda harus mengirim dan mempublikasikan node tersebut. Untuk informasi selengkapnya, lihat Publish a task.

Langkah 4: Jalankan node sync offline dan lihat hasilnya

Setelah konfigurasi selesai, jalankan node sync tersebut. Setelah node berhasil dijalankan, Anda dapat melihat data yang telah disinkronkan ke tabel MaxCompute.结果数据

  • Konten bidang col_doc ditampilkan pada gambar berikut.字段信息

  • Konten bidang col_combine ditampilkan pada gambar berikut.字段2

Catatan

Untuk informasi selengkapnya mengenai output tipe decimal, lihat Appendix 2: Output of the Decimal type in a document.

Lampiran 1: Transformasi format data selama sinkronisasi

Konversi data array ke format JSON: col_array_to_json

Data MongoDB mentah

Konfigurasi pemetaan bidang

Output ke MaxCompute

{
    "col_array_to_json":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_json","type":"string"}

Saat mengonfigurasi pemetaan bidang, jika Anda mengatur type ke string, task sinkronisasi akan melakukan serialisasi data mentah ke format JSON.

[a, b]

Konversi data array menjadi string gabungan: col_array_to_join

Data MongoDB mentah

Konfigurasi pemetaan bidang

Output ke MaxCompute

{
    "col_array_to_join":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_join","type":"array","splitter":","}

Saat mengonfigurasi pemetaan bidang, jika Anda mengatur type ke array, parameter splitter wajib diisi. Task sinkronisasi akan menggabungkan elemen-elemen array data mentah menggunakan splitter dan menghasilkan output berupa string.

a,b

Baca bidang tertentu dari dokumen bersarang

Data MongoDB mentah

Konfigurasi pemetaan bidang

Output ke MaxCompute

{
    "col_doc":
    {
        "key_string": "mock string value"
    }
}
{"name":"col_doc.key_string","type":"document.string"}

name adalah path bidang yang akan disinkronkan dalam document. Task sinkronisasi membaca document berdasarkan path tersebut dan menghasilkan output datanya.

mock string value

Data dokumen sebagai output serialisasi JSON

Data MongoDB mentah

Konfigurasi pemetaan bidang

Output ke MaxCompute

{
    "col_doc":
    {
        "key_string": "mock string value",
        "key_int32": 1
    }
}
{"name":"col_doc","type":"string"}

Saat mengonfigurasi pemetaan bidang, jika Anda mengatur type ke string, task sinkronisasi akan melakukan serialisasi seluruh dokumen col_doc ke format JSON dan menghasilkan output tersebut.

{"key_string":"mockstringvalue","key_int32":1}

Serialisasi sisa bidang dokumen ke format JSON

Data MongoDB mentah

Konfigurasi pemetaan bidang

Output ke MaxCompute

{
    "col_1": "value1",
    "col_2": "value2",
    "col_3": "value3",
    "col_4": "value4"
}
{"name":"col_1","type":"string"}
{"name":"col_2","type":"string"}
{"name":"col_combine","type":"combine"}

Dokumen memiliki empat bidang. Task dikonfigurasi dengan dua bidang bertipe non-combine (`col_1` dan `col_2`). Task sinkronisasi akan melakukan serialisasi semua bidang selain `col_1` dan `col_2` ke format JSON dan menghasilkan output tersebut.

{"col_3":"value3","col_4":"value4"}

Lampiran 2: Output tipe Decimal dalam dokumen

Saat sebuah dokumen diserialisasi ke format JSON, bidang bertipe Decimal128 secara default akan menghasilkan output sebagai berikut:

{
    "key_decimal":
    {
        "finite": true,
        "high": 3471149412795809792,
        "infinite": false,
        "low": 99999994999999999,
        "naN": false,
        "negative": false
    }
}

Untuk menghasilkan output berupa tipe angka, lakukan langkah-langkah berikut:

  1. Saat mengonfigurasi task sinkronisasi offline, Anda dapat beralih ke editor kode dengan mengklik tombol Convert to Script.

  2. Ubah konfigurasi task Reader. Di bagian parameter, tambahkan parameter decimal128OutputType dan atur nilainya ke bigDecimal.decimal

  3. Jalankan ulang task sinkronisasi offline dan lihat hasilnya.

    {
        "key_decimal": "9999999.4999999999"
    }