All Products
Search
Document Center

DataWorks:Kembangkan tugas PyODPS 3

Last Updated:Feb 26, 2026

Anda dapat menggunakan node PyODPS 3 di DataWorks untuk mengembangkan tugas MaxCompute dengan Python 3 dan menjadwalkannya agar berjalan secara berkala. Topik ini menjelaskan cara membuat dan mengonfigurasi tugas PyODPS 3.

Prasyarat

Node PyODPS 3 telah dibuat. Untuk informasi selengkapnya, lihat Buat dan kelola node MaxCompute.

Informasi latar belakang

PyODPS adalah SDK Python untuk MaxCompute yang menyediakan antarmuka untuk mengembangkan tugas MaxCompute, melakukan kueri terhadap tabel dan view, serta mengelola resource. Untuk informasi selengkapnya, lihat PyODPS. Node PyODPS di DataWorks memungkinkan Anda menjadwalkan tugas Python dan mengintegrasikannya dengan tugas lainnya.

Catatan

  • Jika kode PyODPS Anda berjalan pada resource group DataWorks dan memerlukan paket pihak ketiga, Anda dapat menginstalnya pada Serverless resource group melalui Custom images.

    Catatan

    Metode ini tidak mendukung UDF yang mereferensikan paket pihak ketiga. Untuk informasi tentang cara mengonfigurasi UDF dengan dependensi pihak ketiga, lihat Contoh UDF: Gunakan paket pihak ketiga dalam UDF Python.

  • Untuk melakukan upgrade PyODPS pada Serverless resource group, jalankan /home/tops/bin/pip3 install pyodps==0.12.1 melalui Custom images (ganti 0.12.1 dengan versi yang diinginkan). Untuk exclusive resource group for scheduling, jalankan perintah yang sama melalui O&M Assistant.

  • Untuk mengakses sumber data atau layanan dalam lingkungan jaringan tertentu (seperti VPC atau IDC), gunakan Serverless resource group dan konfigurasikan konektivitas jaringan. Lihat Solusi konektivitas jaringan.

  • Untuk informasi selengkapnya tentang sintaksis PyODPS, lihat Dokumentasi PyODPS.

  • DataWorks mendukung node PyODPS 2 (Python 2) dan PyODPS 3 (Python 3). Pilih jenis node yang sesuai dengan versi Python Anda.

  • Jika pernyataan SQL yang dieksekusi dalam node PyODPS tidak menghasilkan alur data di Peta Data, Anda dapat mengonfigurasi parameter penjadwalan secara manual dalam kode untuk mengatasi masalah ini. Untuk informasi selengkapnya tentang cara melihat alur data, lihat Lihat informasi alur data. Untuk informasi selengkapnya tentang pengaturan parameter, lihat Atur parameter waktu proses (petunjuk). Anda dapat menggunakan kode berikut untuk memperoleh parameter yang diperlukan untuk eksekusi tugas:

    import os
    ...
    # get DataWorks scheduler runtime parameters
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # setting hints while submitting a task
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • Log output node PyODPS dibatasi hingga 4 MB. Hindari menampilkan dataset besar; kami menyarankan hanya menggunakan log untuk alert dan pelacakan progres.

Batasan

  • Saat menggunakan exclusive resource group for scheduling, batasi pemrosesan data lokal hingga 50 MB. Penggunaan memori yang berlebihan dapat memicu error OOM (Got Killed) akibat batasan resource. Hindari pemrosesan data dalam jumlah besar secara lokal. Untuk informasi selengkapnya, lihat Praktik terbaik untuk penggunaan PyODPS yang efisien.

  • Saat Anda menggunakan Serverless resource group untuk menjalankan node PyODPS, konfigurasikan CU untuk node PyODPS berdasarkan jumlah data yang perlu diproses.

    Catatan

    Untuk Serverless resource group, konfigurasi maksimum per tugas adalah 64CU. Kami menyarankan batas 16CU untuk mencegah konflik sumber daya, yang dapat menunda eksekusi tugas.

  • Error Got killed menunjukkan bahwa penggunaan memori melebihi batas. Hindari operasi data lokal. Tugas SQL dan DataFrame yang diinisiasi melalui PyODPS (kecuali `to_pandas`) tidak terkena batasan ini.

  • Kode selain UDF dapat menggunakan paket NumPy dan Pandas yang telah pra-instal. Paket pihak ketiga lain yang berisi kode biner tidak didukung.

  • Untuk kompatibilitas, DataWorks mengatur options.tunnel.use_instance_tunnel ke False secara default. Untuk mengaktifkan instance tunnel secara global, atur nilai ini ke True.

  • Versi minor Python 3 yang berbeda (seperti Python 3.8 dan Python 3.7) memiliki definisi bytecode yang berbeda.

    MaxCompute saat ini mendukung Python 3.7. Menggunakan sintaksis spesifik versi lain (seperti blok `finally` di Python 3.8) akan menyebabkan error. Kami menyarankan Anda menggunakan Python 3.7.

  • PyODPS 3 mendukung eksekusi pada Serverless resource group. Untuk membeli Serverless resource group, lihat Gunakan serverless resource groups.

  • Node PyODPS tidak mendukung eksekusi konkuren beberapa tugas Python.

Edit kode: Contoh sederhana

Setelah membuat node PyODPS, Anda dapat menulis dan menjalankan kode. Untuk informasi selengkapnya tentang sintaksis PyODPS, lihat Ikhtisar.

  • Titik masuk ODPS

    Node PyODPS DataWorks mencakup variabel global, odps atau o, yang berfungsi sebagai titik masuk. Anda tidak perlu mendefinisikannya secara manual.

    print(odps.exist_table('PyODPS_iris'))
  • Eksekusi SQL

    Anda dapat mengeksekusi pernyataan SQL dalam node PyODPS. Untuk informasi selengkapnya, lihat SQL.

    • Secara default, DataWorks menonaktifkan instance tunnel. Dalam mode ini, instance.open_reader menggunakan antarmuka Result, yang dibatasi hingga 10.000 record. Anda dapat menggunakan reader.count untuk mendapatkan jumlah record. Untuk membaca semua data, Anda harus menonaktifkan pembatasan limit. Gunakan pernyataan berikut untuk mengaktifkan instance tunnel secara global dan menonaktifkan pembatasan limit.

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # Disable the limit to read all data.
      
      with instance.open_reader() as reader:
        # Use the instance tunnel to read all data.
    • Alternatifnya, tambahkan tunnel=True ke open_reader untuk mengaktifkan instance tunnel untuk panggilan tertentu. Anda juga dapat menambahkan limit=False untuk menonaktifkan pembatasan limit untuk panggilan tersebut.

      # Use the instance tunnel interface for this open_reader call and read all data.
      with instance.open_reader(tunnel=True, limit=False) as reader:
  • Atur parameter waktu proses

    • Anda dapat menggunakan parameter hints (tipe: dict) untuk mengonfigurasi parameter waktu proses. Untuk informasi selengkapnya tentang hints, lihat Operasi SET.

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • Jika Anda mengonfigurasi sql.settings secara global, Anda harus menerapkan parameter waktu proses terkait untuk setiap eksekusi.

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # Add hints based on the global configuration.
  • Baca hasil eksekusi

    Instans yang dihasilkan oleh eksekusi SQL dapat langsung memanggil open_reader. Ada dua skenario:

    • Pernyataan SQL mengembalikan data terstruktur.

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # Process each record.
    • Pernyataan SQL (seperti `desc`) mengembalikan data tidak terstruktur. Anda dapat menggunakan atribut reader.raw untuk mengambil hasil eksekusi mentah.

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      Catatan

      Jika Anda menggunakan parameter penjadwalan kustom dan memicu node PyODPS 3 secara manual dari halaman konfigurasi, Anda harus mengkodekan waktu secara eksplisit karena node PyODPS tidak dapat melakukan penggantian parameter dalam mode ini.

  • DataFrame

    Anda juga dapat menggunakan DataFrame untuk memproses data.

    • Eksekusi

      Di DataWorks, Anda harus secara eksplisit memanggil metode eksekusi langsung untuk mengeksekusi DataFrame.

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # Call the immediate execution method to process each record.

      Jika Anda ingin memicu eksekusi langsung saat mencetak, Anda harus mengaktifkan options.interactive.

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # Enable the switch at the beginning.
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # Immediate execution is triggered during printing.
    • Cetak detail

      Atur opsi options.verbose ke True. Di DataWorks, opsi ini diaktifkan secara default, dan detail seperti URL Logview dicetak selama eksekusi.

Contoh

Contoh sederhana berikut menunjukkan cara menggunakan node PyODPS:

  1. Siapkan set data dan buat tabel sampel pyodps_iris. Untuk informasi selengkapnya, lihat Gunakan DataFrame untuk memproses data.

  2. Buat DataFrame. Untuk informasi selengkapnya, lihat Buat objek DataFrame dari tabel MaxCompute.

  3. Masukkan kode berikut ke dalam node PyODPS dan jalankan.

    from odps.df import DataFrame
    
    # Create a DataFrame from a MaxCompute table.
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepallength.head(5))

    Hasil pengembalian:

       sepallength
    0          4.5
    1          5.5
    2          4.9
    3          5.0
    4          6.0

Edit kode: Contoh lanjutan

Jika node memerlukan penjadwalan berkala, konfigurasikan properti penjadwalannya. Untuk informasi selengkapnya tentang konfigurasi penjadwalan, lihat Ikhtisar.

Gunakan parameter penjadwalan

Klik Properties di editor node. Di bagian Parameters, konfigurasikan parameter kustom. Mendefinisikan variabel dalam node PyODPS berbeda dari node SQL. Untuk informasi selengkapnya, lihat Konfigurasi parameter penjadwalan.

Tidak seperti node SQL, node PyODPS tidak melakukan penggantian string untuk placeholder seperti ${param_name} untuk mencegah konflik dengan kode Python. Sebagai gantinya, DataWorks menyuntikkan dictionary global args sebelum eksekusi. Anda dapat mengambil parameter dari dictionary ini. Misalnya, jika Anda mengatur ds=${yyyymmdd} di Parameters, Anda dapat menggunakan kode berikut untuk mendapatkan parameter ini:

print('ds=' + args['ds'])
ds=20161116
Catatan

Jika Anda perlu mendapatkan partisi bernama ds, gunakan metode berikut:

o.get_table('table_name').get_partition('ds=' + args['ds'])

Untuk skenario lain terkait pengembangan tugas PyODPS, lihat:

Langkah selanjutnya

  • Tentukan keberhasilan tugas skrip Shell kustom: Logika keberhasilan untuk tugas skrip Python kustom sama dengan node Shell. Anda dapat menggunakan metode ini untuk verifikasi.

  • Publikasikan tugas: Jika Anda menggunakan ruang kerja dalam mode standar, Anda harus men-deploy tugas ke lingkungan produksi melalui proses penerbitan agar penjadwalan berkala dapat diaktifkan.

  • O&M tugas berkala: Setelah tugas di-deploy ke Operation Center untuk penjadwalan, Anda dapat melakukan operasi O&M di Operation Center DataWorks.

  • FAQ PyODPS: Anda dapat mempelajari isu umum selama eksekusi PyODPS untuk mempercepat pemecahan masalah dan penyelesaian exception.

FAQ

Q: Node PyODPS 3 saya berfungsi selama pengembangan lokal tetapi timeout di Operation Center saat mengakses API pihak ketiga (misalnya, Feishu). Mengapa?

A: Anda harus mengonfigurasi daftar putih sandbox. Buka Management Center > Workspaces, lalu di bagian Security Settings, tambahkan titik akhir API pihak ketiga ke daftar putih untuk mengizinkan akses. Contohnya:

image