全部产品
Search
文档中心

DataWorks:Gunakan tugas sinkronisasi batch untuk membaca data dari sumber data MongoDB

更新时间:Jul 06, 2025

DataWorks Data Integration menyediakan MongoDB Reader yang memungkinkan Anda membaca data dari sumber data MongoDB. Anda juga dapat menggunakan writer untuk menulis data ke sumber data lain. Topik ini memberikan contoh cara menggunakan tugas sinkronisasi batch di Data Integration untuk menyinkronkan data dari sumber data MongoDB ke sumber data MaxCompute.

Informasi latar belakang

Dalam topik ini, sumber data MongoDB digunakan sebagai sumber dan sumber data MaxCompute sebagai tujuan. Sebelum melakukan sinkronisasi data, pastikan Anda merujuk pada bagian "Persiapan Awal" untuk menyiapkan data MongoDB dan tabel MaxCompute yang akan digunakan dalam sinkronisasi.

Prasyarat

DataWorks telah diaktifkan dan sumber data MaxCompute telah ditambahkan ke ruang kerja DataWorks.

Persiapan awal

Dalam contoh ini, Anda perlu menyiapkan koleksi data MongoDB dan tabel MaxCompute untuk sinkronisasi data.

  1. Siapkan Koleksi Data MongoDB.

    Dalam contoh ini, ApsaraDB for MongoDB digunakan. Contoh kode berikut menunjukkan cara menyiapkan koleksi data ApsaraDB for MongoDB.

    1. Buat koleksi data bernama di_mongodb_conf_test.

      db.createCollection('di_mongodb_conf_test')
    2. Masukkan data sampel ke dalam koleksi data.

      db.di_mongodb_conf_test.insertOne({
          'col_string':'mock string value',
          'col_int32':NumberInt("1"),
          'col_int32_min':NumberInt("-2147483648"),
          'col_int32_max':NumberInt("2147483647"),
          'col_int64':NumberLong("1234567890123456"),
          'col_int64_min':NumberLong("-9223372036854775807"),
          'col_int64_max':NumberLong("9223372036854775807"),
          'col_decimal':NumberDecimal("9999999.4999999999"),
          'col_double':9999999.99,
          'col_boolean':true,
          'col_timestamp':ISODate(),
          'col_date':new Date(),
          'col_array_to_json':['a','b'],
          'col_array_to_join':['a','b'],
          'col_doc':{
              'key_string':'mock string value',
              'key_int32':NumberInt("1"),
              'key_int32_min':NumberInt("-2147483648"),
              'key_int32_max':NumberInt("2147483647"),
              'key_int64':NumberLong("1234567890123456"),
              'key_int64_min':NumberLong("-9223372036854775807"),
              'key_int64_max':NumberLong("9223372036854775807"),
              'key_decimal':NumberDecimal("9999999.4999999999"),
              'key_double':9999999.99,
              'key_boolean':true,
              'key_timestamp':ISODate(),
              'key_date':new Date(),
              'key_array_to_json':['a','b'],
              'key_array_to_join':['a','b'],
          },
          'col_extra_1':'this is extra 1',
          'col_extra_2':'this is extra 2',
      })
    3. Kueri data yang dimasukkan ke dalam koleksi data MongoDB.

      db.getCollection("di_mongodb_conf_test").find({})

      Gambar berikut menunjukkan hasil kueri.mongodb

  2. Siapkan Tabel MaxCompute.

    1. Buat tabel partisi bernama di_mongodb_conf_test. Bidang partisi adalah pt.

      CREATE TABLE IF NOT EXISTS di_mongodb_conf_test
      (
        `id`                 STRING
        ,`col_string`        STRING
        ,`col_int32`         INT
        ,`col_int32_min`     INT
        ,`col_int32_max`     INT
        ,`col_int64`         BIGINT
        ,`col_int64_min`     BIGINT
        ,`col_int64_max`     BIGINT
        ,`col_decimal`       DECIMAL(38,18)
        ,`col_double`        DOUBLE
        ,`col_boolean`       BOOLEAN
        ,`col_timestamp`     TIMESTAMP
        ,`col_date`          DATE
        ,`col_array_to_json` STRING
        ,`col_array_to_join` STRING
        ,`key_string`        STRING
        ,`key_int32`         INT
        ,`key_int32_min`     INT
        ,`key_int32_max`     INT
        ,`key_int64`         BIGINT
        ,`key_int64_min`     BIGINT
        ,`key_int64_max`     BIGINT
        ,`key_decimal`       DECIMAL(38,18)
        ,`key_double`        DOUBLE
        ,`key_boolean`       BOOLEAN
        ,`key_timestamp`     TIMESTAMP
        ,`key_date`          DATE
        ,`key_array_to_json` STRING
        ,`key_array_to_join` STRING
        ,`col_doc`           STRING
        ,`col_combine`       STRING
      )
      PARTITIONED BY
      (
        pt                   STRING
      )
      LIFECYCLE 36500
      ;
    2. Tambahkan nilai 20230202 ke bidang partisi.

      alter table di_mongodb_conf_test add if not exists partition (pt='20230202');
    3. Periksa apakah tabel partisi dibuat dengan benar.

      SELECT*FROM di_mongodb_conf_test
      WHEREpt='20230202';

Konfigurasikan tugas sinkronisasi batch

Langkah 1: Tambahkan sumber data MongoDB

Tambahkan sumber data MongoDB dan pastikan koneksi jaringan telah dibuat antara sumber data dan grup sumber daya eksklusif untuk Data Integration. Untuk informasi lebih lanjut, lihat Tambahkan Sumber Data MongoDB.

Langkah 2: Buat dan konfigurasikan tugas sinkronisasi batch

Buat tugas sinkronisasi batch di halaman DataStudio di konsol DataWorks dan konfigurasikan item-item seperti sumber dan tujuan untuk tugas sinkronisasi batch. Langkah ini hanya menjelaskan beberapa item yang harus Anda konfigurasikan. Untuk item lainnya, pertahankan nilai default. Untuk informasi lebih lanjut, lihat Konfigurasikan Tugas Sinkronisasi Batch Menggunakan UI Tanpa Kode.

  1. Buat koneksi jaringan antara sumber data dan grup sumber daya eksklusif untuk Data Integration.

    Pilih sumber data MongoDB yang Anda tambahkan di Langkah 1, sumber data MaxCompute yang Anda tambahkan, dan grup sumber daya eksklusif untuk Data Integration. Kemudian, uji konektivitas jaringan antara sumber data dan grup sumber daya.

  2. Pilih Sumber Data.

    Pilih koleksi data MongoDB dan tabel MaxCompute partisi yang Anda siapkan dalam langkah persiapan data.

  3. Konfigurasikan Pemetaan Bidang.

    Jika sumber data MongoDB ditambahkan, metode mapping fields in a row of the source to the fields in the same row of the destination digunakan secara default. Anda juga dapat mengklik ikon 图标 untuk mengedit bidang dalam koleksi sumber secara manual. Contoh kode berikut menunjukkan cara mengedit bidang dalam koleksi sumber:

    {"name":"_id","type":"string"}
    {"name":"col_string","type":"string"}
    {"name":"col_int32","type":"long"}
    {"name":"col_int32_min","type":"long"}
    {"name":"col_int32_max","type":"long"}
    {"name":"col_int64","type":"long"}
    {"name":"col_int64_min","type":"long"}
    {"name":"col_int64_max","type":"long"}
    {"name":"col_decimal","type":"double"}
    {"name":"col_double","type":"double"}
    {"name":"col_boolean","type":"boolean"}
    {"name":"col_timestamp","type":"date"}
    {"name":"col_date","type":"date"}
    {"name":"col_array_to_json","type":"string"}
    {"name":"col_array_to_join","type":"array","splitter":","}
    {"name":"col_doc.key_string","type":"document.string"}
    {"name":"col_doc.key_int32","type":"document.long"}
    {"name":"col_doc.key_int32_min","type":"document.long"}
    {"name":"col_doc.key_int32_max","type":"document.long"}
    {"name":"col_doc.key_int64","type":"document.long"}
    {"name":"col_doc.key_int64_min","type":"document.long"}
    {"name":"col_doc.key_int64_max","type":"document.long"}
    {"name":"col_doc.key_decimal","type":"document.double"}
    {"name":"col_doc.key_double","type":"document.double"}
    {"name":"col_doc.key_boolean","type":"document.boolean"}
    {"name":"col_doc.key_timestamp","type":"document.date"}
    {"name":"col_doc.key_date","type":"document.date"}
    {"name":"col_doc.key_array_to_json","type":"document"}
    {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","}
    {"name":"col_doc","type":"string"}
    {"name":"col_combine","type":"combine"}

    Setelah Anda mengedit bidang, pemetaan baru antara bidang sumber dan bidang tujuan akan ditampilkan di tab konfigurasi tugas.

Langkah 3: Kirim dan terapkan tugas sinkronisasi batch

Jika Anda menggunakan ruang kerja dalam mode standar dan ingin menjadwalkan tugas sinkronisasi batch secara berkala di lingkungan produksi, Anda dapat mengirim dan menerapkan tugas ke lingkungan produksi. Untuk informasi lebih lanjut, lihat Terapkan Node.

Langkah 4: Jalankan tugas sinkronisasi batch dan lihat hasil sinkronisasi

Setelah Anda menyelesaikan konfigurasi sebelumnya, Anda dapat menjalankan tugas sinkronisasi batch. Setelah proses selesai, Anda dapat melihat data yang disinkronkan ke tabel MaxCompute.结果数据

  • Gambar berikut menunjukkan data di bidang col_doc.字段信息

  • Gambar berikut menunjukkan data di bidang col_combine.字段2

Catatan

Untuk informasi tentang masalah terkait data keluaran yang dikonversi dari data tipe DECIMAL, lihat bagian Lampiran 2: Masalah Terkait Data Keluaran yang Dikonversi dari Data Tipe DECIMAL dari topik ini.

Lampiran 1: Konversi tipe data selama sinkronisasi data

Konversi array menjadi data JSON: col_array_to_json

Data MongoDB mentah

Konfigurasi pemetaan bidang

Hasil konversi

{
    "col_array_to_json":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_json","type":"string"}

Jika Anda mengatur parameter type dari koleksi data MongoDB ke string saat Anda mengonfigurasi pemetaan bidang untuk tugas sinkronisasi batch, data mentah akan diserialisasi menjadi data JSON saat tugas dijalankan.

[a, b]

Konversi array menjadi string gabungan: col_array_to_join

Data MongoDB mentah

Konfigurasi pemetaan bidang

Hasil konversi

{
    "col_array_to_join":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_join","type":"array","splitter":","}

Jika Anda mengatur parameter type dari koleksi data MongoDB ke array saat Anda mengonfigurasi pemetaan bidang untuk tugas sinkronisasi batch, Anda harus mengonfigurasi parameter splitter. Array mentah dikonversi menjadi string gabungan berdasarkan pengaturan parameter splitter saat tugas sinkronisasi batch dijalankan.

a,b

Baca data dari bidang bersarang tertentu dalam dokumen MongoDB

Data MongoDB mentah

Konfigurasi pemetaan bidang

Hasil konversi

{
    "col_doc":
    {
        "key_string": "mock string value"
    }
}
{"name":"col_doc.key_string","type":"document.string"}

name menentukan path dari bidang dalam dokumen MongoDB dari mana Anda ingin membaca data saat tugas dijalankan.

mock string value

Konversi data dalam dokumen MongoDB menjadi data JSON

Data MongoDB mentah

Konfigurasi pemetaan bidang

Hasil konversi

{
    "col_doc":
    {
        "key_string": "mock string value",
        "key_int32": 1
    }
}
{"name":"col_doc","type":"string"}

Jika Anda mengatur parameter type dari koleksi data MongoDB ke string saat Anda mengonfigurasi pemetaan bidang untuk tugas sinkronisasi batch, data dalam bidang col_doc dikonversi menjadi data JSON saat tugas dijalankan.

{"key_string":"mockstringvalue","key_int32":1}

Konversi data dalam bidang kecuali bidang yang dikonfigurasikan dalam dokumen MongoDB menjadi data JSON

Data MongoDB mentah

Konfigurasi pemetaan bidang

Hasil konversi

{
    "col_1": "value1",
    "col_2": "value2",
    "col_3": "value3",
    "col_4": "value4"
}
{"name":"col_1","type":"string"}
{"name":"col_2","type":"string"}
{"name":"col_combine","type":"combine"}

Dokumen MongoDB berisi empat bidang: col_1, col_2, col_3, dan col_4. Bidang col_1 dan col_2 bukan tipe data COMBINE dan dikonfigurasikan dalam tugas sinkronisasi batch. Bidang col_3 dan col_4 dikonversi menjadi data JSON saat tugas dijalankan.

{"col_3":"value3","col_4":"value4"}

Lampiran 2: Masalah terkait data keluaran yang dikonversi dari data tipe DECIMAL

Contoh kode berikut menunjukkan keluaran default setelah data tipe Decimal128 dikonversi menjadi data JSON:

{
    "key_decimal":
    {
        "finite": true,
        "high": 3471149412795809792,
        "infinite": false,
        "low": 99999994999999999,
        "naN": false,
        "negative": false
    }
}

Anda dapat melakukan langkah-langkah berikut jika ingin mengonversi data tipe Decimal128 menjadi angka:

  1. Saat Anda mengonfigurasi tugas sinkronisasi batch, klik Skrip Konversi di bilah alat atas tab konfigurasi tugas untuk beralih ke mode skrip.

  2. Ubah konfigurasi sumber dengan menambahkan parameter decimal128OutputType dalam parameter dan atur parameter tersebut ke bigDecimal sebagai nilai tetap.decimal

  3. Jalankan tugas sinkronisasi batch lagi dan lihat hasilnya.

    {
        "key_decimal": "9999999.4999999999"
    }