全部产品
Search
文档中心

DataWorks:Node PyODPS 3

更新时间:Feb 05, 2026

DataWorks menyediakan node PyODPS 3 yang memungkinkan Anda menulis pekerjaan MaxCompute langsung dalam Python dan mengonfigurasi penjadwalan berkala.

Ikhtisar

PyODPS adalah Python Software Development Kit (SDK) untuk MaxCompute. Antarmuka pemrogramannya yang sederhana memungkinkan Anda menulis pekerjaan, melakukan kueri terhadap tabel dan tampilan (views), serta mengelola resource MaxCompute. Untuk informasi selengkapnya, lihat PyODPS. Di DataWorks, Anda dapat menggunakan node PyODPS untuk menjadwalkan dan menjalankan tugas Python serta mengintegrasikannya dengan pekerjaan lain.

Pertimbangan

  • Saat menjalankan node PyODPS pada kelompok sumber daya DataWorks, Anda dapat menginstal paket pihak ketiga pada Serverless resource group jika kode Anda memerlukannya. Untuk informasi selengkapnya, lihat Custom images.

    Catatan

    Metode ini tidak mendukung User-Defined Functions (UDFs) yang mereferensikan paket pihak ketiga. Untuk metode konfigurasi yang benar, lihat Contoh UDF: Gunakan paket pihak ketiga dalam UDF Python.

  • Untuk meningkatkan versi PyODPS, jalankan perintah yang dijelaskan di Custom images. Anda dapat mengganti 0.12.1 dengan versi yang ingin Anda tingkatkan. Pada Serverless resource group, jalankan perintah tersebut sesuai petunjuk di Install a third-party package. Pada exclusive resource group for scheduling, ikuti petunjuk di O&M Assistant.

  • Jika tugas PyODPS Anda perlu mengakses lingkungan jaringan tertentu, seperti sumber data atau layanan di jaringan VPC atau IDC, gunakan Serverless resource group. Konfigurasikan konektivitas jaringan antara Serverless resource group dan lingkungan target. Untuk informasi selengkapnya, lihat Solusi konektivitas jaringan.

  • Untuk informasi lebih lanjut tentang sintaksis PyODPS, lihat Dokumentasi PyODPS.

  • Node PyODPS tersedia dalam dua jenis, yaitu PyODPS 2 dan PyODPS 3, yang masing-masing menggunakan Python 2 dan Python 3. Buat jenis node yang sesuai dengan versi Python yang Anda gunakan.

  • Jika pernyataan SQL yang dieksekusi dalam node PyODPS tidak menghasilkan alur data yang benar di Data Map, Anda dapat mengatur parameter waktu proses (runtime parameters) untuk penjadwalan DataWorks secara manual dalam kode tugas Anda. Untuk melihat alur data, lihat Lihat alur data. Untuk mengatur parameter, lihat Atur parameter waktu proses (hints). Anda dapat menggunakan contoh kode berikut untuk mendapatkan parameter waktu proses yang diperlukan.

    import os
    ...
    # Dapatkan parameter waktu proses penjadwal DataWorks
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # Atur hints saat Anda mengirimkan tugas
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • Batas maksimum output log untuk node PyODPS adalah 4 MB. Hindari mencetak hasil data besar secara langsung ke log. Sebagai gantinya, fokuslah pada output informasi bernilai, seperti log peringatan dan pembaruan progres.

Batasan

  • Saat menggunakan exclusive resource group for scheduling untuk menjalankan node PyODPS, jumlah data yang diproses secara lokal dalam node tersebut tidak boleh melebihi 50 MB. Batasan ini ditentukan oleh spesifikasi exclusive resource group for scheduling. Memproses data lokal yang berlebihan dapat melebihi ambang batas sistem operasi dan menyebabkan error Out of Memory (OOM), yang ditandai dengan pesan `Got Killed`. Hindari menulis logika pemrosesan data intensif dalam node PyODPS. Untuk informasi selengkapnya, lihat Praktik terbaik penggunaan PyODPS secara efisien.

  • Saat menggunakan Serverless resource group untuk menjalankan node PyODPS, Anda dapat mengonfigurasi CUs untuk node tersebut berdasarkan jumlah data yang perlu diproses.

    Catatan

    Satu tugas dapat dikonfigurasi hingga maksimal 64 CUs, tetapi penggunaan lebih dari 16 CUs tidak disarankan karena dapat menyebabkan kekurangan resource yang memengaruhi startup tugas.

  • Error Got Killed menunjukkan bahwa proses dihentikan karena kondisi Out of Memory (OOM). Oleh karena itu, usahakan meminimalkan operasi data lokal. Tugas SQL dan DataFrame yang diinisiasi melalui PyODPS, kecuali to_pandas, tidak terkena batasan ini.

  • Kode yang bukan bagian dari User-Defined Function (UDF) dapat menggunakan paket Numpy dan Pandas yang telah dipra-instal. Paket pihak ketiga lain yang berisi kode biner tidak didukung.

  • Karena alasan kompatibilitas, options.tunnel.use_instance_tunnel secara default diatur ke False di DataWorks. Untuk mengaktifkan instance tunnel secara global, Anda harus mengatur nilai ini secara manual menjadi True.

  • Definisi bytecode berbeda antar versi minor Python 3, seperti Python 3.8 dan Python 3.7.

    MaxCompute saat ini menggunakan Python 3.7. Penggunaan sintaksis dari versi Python 3 lain, seperti blok finally di Python 3.8, akan menyebabkan error eksekusi. Kami menyarankan Anda menggunakan Python 3.7.

  • PyODPS 3 dapat dijalankan pada Serverless resource group. Untuk membeli dan menggunakan resource ini, lihat Gunakan serverless resource groups.

  • Satu node PyODPS tidak mendukung eksekusi konkuren beberapa tugas Python.

  • Untuk mencetak log dari node PyODPS, gunakan fungsi print. Fungsi logger.info saat ini tidak didukung.

Sebelum memulai

Ikatkan resource komputasi MaxCompute compute resource ke ruang kerja DataWorks Anda.

Prosedur

  1. Di editor node PyODPS 3, lakukan langkah-langkah pengembangan berikut.

    Contoh kode PyODPS 3

    Setelah membuat node PyODPS, Anda dapat mengedit dan menjalankan kode. Untuk informasi lebih lanjut tentang sintaksis PyODPS, lihat Ikhtisar. Bagian ini menyediakan lima contoh kode. Pilih contoh yang paling sesuai dengan kebutuhan bisnis Anda.

    Titik masuk ODPS

    DataWorks menyediakan variabel global odps (atau o) sebagai titik masuk ODPS, sehingga Anda tidak perlu mendefinisikannya secara manual.

    print(odps.exist_table('PyODPS_iris'))

    Eksekusi pernyataan SQL

    Anda dapat mengeksekusi pernyataan SQL dalam node PyODPS. Untuk informasi selengkapnya, lihat SQL.

    • Di DataWorks, instance tunnel dinonaktifkan secara default. Artinya, instance.open_reader menggunakan antarmuka Result, yang mengembalikan maksimal 10.000 record. Anda dapat menggunakan reader.count untuk mendapatkan jumlah record. Untuk mengiterasi seluruh data, Anda harus menonaktifkan limit. Anda dapat menggunakan pernyataan berikut untuk mengaktifkan instance tunnel secara global dan menonaktifkan limit.

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # Nonaktifkan limit untuk membaca semua data.
      
      with instance.open_reader() as reader:
        # Anda dapat membaca semua data melalui Instance Tunnel.
    • Alternatifnya, Anda dapat menambahkan tunnel=True ke pemanggilan open_reader untuk mengaktifkan instance tunnel hanya untuk operasi tersebut. Anda juga dapat menambahkan limit=False untuk menonaktifkan limit untuk operasi tersebut.

      # Gunakan antarmuka Instance Tunnel untuk operasi open_reader ini agar membaca semua data.
      with instance.open_reader(tunnel=True, limit=False) as reader:

    Atur parameter waktu proses

    • Anda dapat mengatur parameter waktu proses menggunakan parameter hints, yang bertipe dict. Untuk informasi selengkapnya tentang hints, lihat Operasi SET.

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • Jika Anda mengonfigurasi sql.settings secara global, parameter waktu proses yang ditentukan akan ditambahkan ke setiap eksekusi.

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # Tambahkan hints berdasarkan konfigurasi global.

    Membaca hasil eksekusi

    Instans yang menjalankan pernyataan SQL dapat langsung melakukan operasi open_reader dalam dua skenario berikut:

    • Pernyataan SQL mengembalikan data terstruktur.

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # Proses setiap record.
    • Pernyataan SQL merupakan perintah seperti desc. Anda dapat menggunakan atribut reader.raw untuk mendapatkan hasil eksekusi SQL mentah.

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      Catatan

      Saat menjalankan node PyODPS 3 langsung dari editor, Anda harus menetapkan nilai parameter penjadwalan kustom secara eksplisit karena node tidak secara otomatis mengganti placeholder.

    DataFrame

    Anda juga dapat menggunakan DataFrame (tidak disarankan) untuk memproses data.

    • Eksekusi

      Di DataWorks, Anda harus secara eksplisit memanggil metode eksekusi langsung untuk mengeksekusi DataFrame.

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # Panggil metode eksekusi langsung untuk memproses setiap Record.

      Jika Anda perlu memicu eksekusi langsung dengan pernyataan print, Anda harus mengaktifkan options.interactive.

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # Aktifkan opsi di awal.
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # Eksekusi langsung dipicu oleh print().
    • Mencetak informasi detail

      Anda dapat mengatur opsi options.verbose untuk mencetak informasi detail. Di DataWorks, opsi ini diaktifkan secara default dan mencetak detail seperti URL Logview selama eksekusi.

    Mengembangkan kode PyODPS 3

    Contoh berikut menunjukkan cara menggunakan node PyODPS:

    1. Siapkan set data. Buat tabel sampel pyodps_iris. Untuk petunjuk, lihat Gunakan DataFrame untuk memproses data.

    2. Buat DataFrame. Untuk informasi selengkapnya, lihat Buat DataFrame dari tabel MaxCompute.

    3. Masukkan kode berikut ke dalam node PyODPS dan jalankan.

      from odps.df import DataFrame
      
      # Buat DataFrame dari tabel ODPS.
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepallength.head(5))

    Jalankan tugas PyODPS

    1. Pada bagian Run Configuration, konfigurasikan Compute resources, Compute quota, dan DataWorks resource group.

      Catatan
      • Untuk mengakses sumber data di jaringan publik atau jaringan VPC, Anda harus menggunakan resource group for scheduling yang telah lulus uji konektivitas dengan sumber data tersebut. Untuk informasi selengkapnya, lihat Solusi konektivitas jaringan.

      • Anda dapat mengonfigurasi Image berdasarkan kebutuhan tugas.

    2. Di kotak dialog parameter pada toolbar, pilih sumber data MaxCompute yang telah Anda buat dan klik Run.

  2. Untuk menjalankan tugas node sesuai jadwal, konfigurasikan properti penjadwalannya sesuai kebutuhan bisnis Anda. Untuk informasi selengkapnya, lihat Konfigurasi penjadwalan node.

    Berbeda dengan node SQL di DataWorks, node PyODPS tidak mengganti string seperti `${param_name}` dalam kode untuk menghindari substitusi string yang tidak diinginkan. Sebagai gantinya, sebelum kode dieksekusi, sebuah dict bernama args ditambahkan ke variabel global. Anda dapat mengambil parameter penjadwalan dari dict ini. Misalnya, jika Anda mengatur ds=${yyyymmdd} di tab Parameters, Anda dapat mengambil parameter ini dalam kode sebagai berikut.

    print('ds=' + args['ds'])
    ds=20240930
    Catatan

    Untuk mendapatkan partisi bernama ds, Anda dapat menggunakan metode berikut.

    o.get_table('table_name').get_partition('ds=' + args['ds'])
  3. Setelah mengonfigurasi tugas node, Anda harus menerapkannya. Untuk informasi selengkapnya, lihat Penerapan node dan alur kerja.

  4. Setelah tugas diterapkan, Anda dapat melihat status tugas terjadwal di Pusat Operasi. Untuk informasi selengkapnya, lihat Memulai Pusat Operasi.

Menjalankan node dengan peran yang terkait

Anda dapat mengaitkan role RAM tertentu untuk menjalankan tugas node. Hal ini memungkinkan kontrol izin detail halus dan keamanan yang lebih baik.

Langkah selanjutnya

FAQ PyODPS: Pelajari isu umum eksekusi PyODPS untuk membantu Anda memecahkan dan menyelesaikan exception dengan cepat.