全部产品
Search
文档中心

DataWorks:Node PyODPS 2

更新时间:Feb 05, 2026

DataWorks menyediakan node PyODPS 2 untuk mengembangkan tugas menggunakan sintaks PyODPS. Sebagai Python SDK untuk MaxCompute, PyODPS memungkinkan Anda menulis kode Python guna mengoperasikan MaxCompute secara langsung di dalam node tersebut.

Ikhtisar

PyODPS adalah Python SDK untuk MaxCompute yang menyediakan antarmuka pemrograman untuk menulis pekerjaan, melakukan kueri terhadap tabel dan tampilan (view), serta mengelola resource MaxCompute. Untuk informasi selengkapnya, lihat PyODPS. Di DataWorks, Anda dapat menggunakan node PyODPS untuk menjalankan dan menjadwalkan tugas Python serta mengintegrasikannya dengan pekerjaan lain.

Catatan penggunaan

  • Jika kode PyODPS Anda memerlukan paket pihak ketiga, Anda dapat menginstalnya saat menjalankan node pada Serverless Resource Group. Untuk informasi selengkapnya, lihat Custom images.

    Catatan

    Jika kode Anda mencakup user-defined function (UDF) yang mereferensikan paket pihak ketiga, metode di atas tidak dapat digunakan. Untuk informasi tentang metode konfigurasi yang benar, lihat Contoh UDF: Gunakan paket pihak ketiga dalam UDF Python.

  • Jika tugas PyODPS Anda perlu mengakses lingkungan jaringan tertentu, seperti sumber data atau layanan di VPC atau IDC, gunakan Serverless Resource Group dan buat koneksi jaringan antara Serverless Resource Group dengan lingkungan target. Untuk informasi selengkapnya, lihat Solusi konektivitas jaringan.

  • Untuk informasi lebih lanjut tentang sintaks PyODPS dan detail lainnya, lihat Dokumentasi PyODPS.

  • Node PyODPS tersedia dalam dua jenis: PyODPS 2 dan PyODPS 3. Perbedaannya terletak pada versi Python yang mendasarinya. Node PyODPS 2 menggunakan Python 2, sedangkan node PyODPS 3 menggunakan Python 3. Pilih jenis node yang sesuai dengan versi Python Anda.

  • Jika pernyataan SQL dalam node PyODPS gagal menghasilkan alur data (Data Lineage) sehingga tidak ditampilkan di Peta Data (Data Map), Anda dapat mengonfigurasi manual Parameter Penjadwalan terkait untuk DataWorks dalam kode tugas Anda guna mengatasi masalah tersebut. Untuk melihat alur data, lihat Lihat alur data. Untuk informasi tentang pengaturan parameter, lihat Atur parameter waktu proses (hints). Anda dapat menggunakan kode berikut untuk mendapatkan parameter yang diperlukan saat waktu proses.

    import os
    # ...
    # Dapatkan parameter waktu proses penjadwal DataWorks.
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    # ...
    # Atur hints saat mengirimkan tugas.
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    # ...
  • Hindari mencetak data dalam jumlah besar ke log. Cukup catat informasi penting saja, seperti peringatan dan pembaruan progres.

Batasan

  • Saat menggunakan Exclusive Resource Group for Scheduling untuk menjalankan node PyODPS, kami menyarankan agar jumlah data yang diproses secara lokal di dalam node tidak melebihi 50 MB. Operasi ini dibatasi oleh spesifikasi Exclusive Resource Group for Scheduling. Memproses data lokal yang berlebihan hingga melebihi ambang batas sistem operasi dapat menyebabkan error Out-of-Memory (OOM), yang ditandai dengan pesan "Got killed". Hindari menulis kode pemrosesan data yang ekstensif di node PyODPS. Untuk informasi selengkapnya, lihat Praktik terbaik penggunaan PyODPS yang efisien.

  • Saat menggunakan Serverless Resource Group untuk menjalankan node PyODPS, Anda dapat mengonfigurasi CU untuk node tersebut berdasarkan jumlah data yang perlu diproses.

    Catatan

    Saat menjalankan tugas menggunakan Serverless Resource Group, Anda dapat mengonfigurasi maksimal 64 CU untuk satu tugas. Namun, kami menyarankan agar tidak melebihi 16 CU untuk mencegah kegagalan startup tugas akibat kekurangan resource.

  • Error Got killed menunjukkan bahwa penggunaan memori telah melebihi batas, sehingga proses dihentikan. Untuk mencegah hal ini, hindari melakukan operasi data secara lokal. Tugas SQL dan DataFrame yang diinisiasi melalui PyODPS, kecuali to_pandas, tidak terkena batasan ini.

  • Anda dapat menggunakan library Numpy dan Pandas yang telah pra-instal dalam kode Anda, tetapi tidak di dalam UDF. Paket pihak ketiga lain yang mengandung kode biner tidak didukung.

  • Untuk kompatibilitas, options.tunnel.use_instance_tunnel secara default bernilai False di DataWorks. Untuk mengaktifkan Instance Tunnel secara global, Anda harus mengatur nilai ini secara manual menjadi True.

  • Versi Python yang mendasari node PyODPS 2 adalah 2.7.

  • Menjalankan beberapa tugas Python secara konkuren dalam satu node PyODPS tidak didukung.

  • Gunakan print untuk mencetak log di node PyODPS. logger.info tidak didukung.

Sebelum memulai

Ikatkan resource komputasi MaxCompute compute resource ke ruang kerja DataWorks Anda.

Prosedur

  1. Pada halaman pengeditan node PyODPS 2, lakukan operasi pengembangan berikut.

    Contoh kode PyODPS 2

    Setelah membuat node PyODPS, Anda dapat mengedit dan menjalankan kode Anda. Untuk informasi selengkapnya tentang sintaks PyODPS, lihat Ikhtisar. Dokumen ini menyediakan lima contoh kode. Pilih salah satu yang paling sesuai dengan kebutuhan bisnis Anda.

    Titik masuk ODPS

    Node PyODPS DataWorks mencakup variabel global, odps atau o, yang berfungsi sebagai titik masuk ODPS. Anda tidak perlu mendefinisikannya secara manual.

    print(odps.exist_table('PyODPS_iris'))

    Eksekusi SQL

    Anda dapat mengeksekusi SQL di node PyODPS. Untuk informasi selengkapnya, lihat SQL.

    • Secara default, Instance Tunnel tidak diaktifkan di DataWorks, artinya instance.open_reader menggunakan Result API dan mengembalikan maksimal 10.000 record. Anda dapat menggunakan reader.count untuk mendapatkan jumlah record. Untuk mengiterasi seluruh data, Anda harus menonaktifkan pembatasan limit. Anda dapat menggunakan pernyataan berikut untuk mengaktifkan Instance Tunnel dan menonaktifkan pembatasan limit secara global.

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # Nonaktifkan limit untuk membaca seluruh data.
      
      with instance.open_reader() as reader:
        # Anda dapat membaca seluruh data melalui Instance Tunnel.
    • Anda juga dapat menambahkan tunnel=True ke open_reader untuk mengaktifkan Instance Tunnel hanya untuk operasi open_reader saat ini. Selain itu, Anda dapat menambahkan limit=False untuk menonaktifkan pembatasan limit hanya untuk operasi tersebut.

      # Operasi open_reader ini menggunakan Instance Tunnel API dan dapat membaca seluruh data.
      with instance.open_reader(tunnel=True, limit=False) as reader:

    Atur parameter waktu proses

    • Anda dapat mengatur parameter waktu proses dengan mengonfigurasi parameter hints, yang bertipe dict. Untuk informasi selengkapnya tentang hints, lihat Operasi SET.

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • Jika Anda mengonfigurasi sql.settings di pengaturan global, DataWorks akan menambahkan parameter waktu proses ini ke setiap eksekusi.

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # Menambahkan hints berdasarkan konfigurasi global.

    Baca hasil eksekusi

    Instans yang menjalankan SQL dapat langsung mengeksekusi operasi open_reader dalam dua skenario berikut:

    • SQL mengembalikan data terstruktur.

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # Proses setiap record.
    • Pernyataan SQL seperti DESC dieksekusi. Anda dapat menggunakan atribut reader.raw untuk mendapatkan hasil mentah dari eksekusi SQL.

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      Catatan

      Saat menjalankan manual node PyODPS 2 yang menggunakan Parameter Penjadwalan kustom, Anda harus menyematkan nilai waktu secara hardcoded. Node PyODPS tidak dapat langsung menggantikan parameter tersebut.

    DataFrame

    Anda juga dapat memproses data menggunakan DataFrame (tidak disarankan).

    • Eksekusi

      Di lingkungan DataWorks, mengeksekusi DataFrame memerlukan pemanggilan eksplisit terhadap metode eksekusi langsung.

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # Panggil metode eksekusi langsung untuk memproses setiap record.

      Untuk memicu eksekusi langsung dengan print, aktifkan options.interactive.

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # Aktifkan opsi ini di awal.
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # Memicu eksekusi langsung saat mencetak.
    • Cetak informasi detail

      Anda dapat mengatur opsi options.verbose. Opsi ini secara default diaktifkan di DataWorks, dan informasi detail seperti URL Logview akan dicetak selama eksekusi.

    Kembangkan kode PyODPS 2

    Contoh sederhana berikut menunjukkan cara menggunakan node PyODPS:

    1. Siapkan set data dengan membuat tabel sampel pyodps_iris. Untuk informasi selengkapnya, lihat Gunakan DataFrame untuk memproses data.

    2. Buat DataFrame. Untuk informasi selengkapnya, lihat Buat objek DataFrame dari tabel MaxCompute.

    3. Masukkan kode berikut di node PyODPS.

      from odps.df import DataFrame
      
      # Buat DataFrame dari tabel ODPS.
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepallength.head(5))

    Jalankan tugas PyODPS

    1. Pada bagian Run Configuration Resource, konfigurasikan compute engine instance, DataWorks Resource Group, dan DataWorks Resource Group.

      Catatan
      • Untuk mengakses sumber data melalui jaringan publik atau di VPC, Anda harus menggunakan Resource Group yang dapat terhubung ke sumber data tersebut. Untuk informasi selengkapnya, lihat Solusi konektivitas jaringan.

      • Anda dapat mengonfigurasi parameter Image sesuai kebutuhan tugas Anda.

    2. Pada bilah alat, klik Run untuk menjalankan tugas PyODPS.

  2. Untuk menjalankan tugas node secara terjadwal, konfigurasikan properti penjadwalannya sesuai kebutuhan bisnis Anda. Untuk informasi selengkapnya, lihat Konfigurasi penjadwalan node.

    Berbeda dengan node SQL di DataWorks, node PyODPS tidak mengganti string seperti ${param_name} dalam kode untuk menghindari modifikasi yang tidak diinginkan. Sebaliknya, sebelum kode dieksekusi, DataWorks menambahkan dict bernama args ke variabel global. Anda dapat menggunakan dict ini untuk mengambil Parameter Penjadwalan. Misalnya, jika Anda mengatur parameter Parameters menjadi ds=${yyyymmdd}, Anda dapat mengakses parameter ini dalam kode sebagai berikut.

    print('ds=' + args['ds'])
    ds=20240930
    Catatan

    Untuk mendapatkan Partition bernama ds, Anda dapat menggunakan metode berikut.

    o.get_table('table_name').get_partition('ds=' + args['ds'])
  3. Setelah mengonfigurasi tugas node, Anda harus menerbitkannya. Untuk informasi selengkapnya, lihat Penerapan node dan alur kerja.

  4. Setelah tugas diterbitkan, Anda dapat membuka Pusat Operasi (Operation Center) untuk melihat detail eksekusi Tugas Berkala (Periodic Task). Untuk informasi selengkapnya, lihat Memulai Pusat Operasi.

Menjalankan node dengan peran yang terkait

Anda dapat mengaitkan peran RAM tertentu untuk menjalankan tugas node. Hal ini memungkinkan kontrol izin detail halus dan meningkatkan keamanan.

Langkah selanjutnya

FAQ PyODPS: Anda dapat mempelajari isu umum yang mungkin terjadi selama eksekusi PyODPS untuk mempercepat pemecahan masalah dan penanganan exception.