全部产品
Search
文档中心

DataWorks:Kembangkan Tugas PyODPS 2

更新时间:Jul 10, 2025

DataWorks menyediakan node PyODPS 2 yang memungkinkan Anda mengembangkan tugas PyODPS menggunakan sintaksis PyODPS di DataWorks. PyODPS terintegrasi dengan Python SDK dari MaxCompute, sehingga Anda dapat menulis kode Python pada node PyODPS 2 di Konsol DataWorks untuk memproses data di MaxCompute.

Prasyarat

Sebuah node PyODPS 2 telah dibuat. Untuk informasi lebih lanjut, lihat Buat dan kelola node MaxCompute.

Informasi latar belakang

PyODPS adalah SDK MaxCompute untuk Python yang menyediakan antarmuka pemrograman Python yang sederhana dan nyaman. Dengan ini, Anda dapat menggunakan Python untuk menulis kode pekerjaan MaxCompute, menanyakan tabel dan tampilan MaxCompute, serta mengelola sumber daya MaxCompute. Untuk informasi lebih lanjut, lihat PyODPS. Di DataWorks, Anda dapat menggunakan node PyODPS untuk menjadwalkan tugas Python dan mengintegrasikannya dengan jenis tugas lainnya.

Perhatian

  • Jika paket pihak ketiga perlu dirujuk saat menjalankan node PyODPS pada grup sumber daya DataWorks, gunakan grup sumber daya tanpa server dan buat gambar kustom untuk menginstal paket pihak ketiga tersebut.

    Catatan

    Jika paket pihak ketiga perlu dirujuk dalam fungsi yang ditentukan pengguna (UDF) dalam kode node, Anda tidak dapat menggunakan metode sebelumnya untuk menginstal paket pihak ketiga. Untuk informasi tentang cara merujuk paket pihak ketiga dalam UDF, lihat Contoh: Merujuk paket pihak ketiga dalam Python UDF.

  • Untuk meningkatkan versi PyODPS dari node PyODPS 3 yang berjalan pada grup sumber daya tanpa server, Anda dapat menggunakan fitur manajemen gambar untuk menjalankan perintah /home/tops/bin/pip3 install pyodps==0.12.1 pada grup sumber daya. Untuk meningkatkan versi PyODPS dari node PyODPS 3 yang berjalan pada grup sumber daya eksklusif untuk penjadwalan, Anda dapat menggunakan fitur Asisten O&M untuk menjalankan perintah yang sama pada grup sumber daya. Saat menjalankan perintah, Anda dapat mengubah 0.12.1 ke versi yang ingin Anda tingkatkan. Untuk informasi lebih lanjut tentang fitur manajemen gambar dan fitur Asisten O&M, lihat Kelola gambar dan Gunakan fitur Asisten O&M.

  • Jika Anda ingin menggunakan node PyODPS untuk mengakses sumber data atau layanan yang diterapkan di lingkungan jaringan khusus, seperti virtual private cloud (VPC) atau pusat data, gunakan grup sumber daya tanpa server untuk menjalankan node tersebut, dan buat koneksi jaringan antara grup sumber daya dan sumber data atau layanan. Untuk informasi lebih lanjut, lihat Solusi konektivitas jaringan.

  • Untuk informasi lebih lanjut tentang sintaksis PyODPS, lihat Ikhtisar.

  • Node PyODPS diklasifikasikan menjadi node PyODPS 2 dan node PyODPS 3. Kedua jenis node PyODPS menggunakan versi Python yang berbeda di lapisan bawah. Node PyODPS 2 menggunakan Python 2, sedangkan node PyODPS 3 menggunakan Python 3. Anda dapat membuat node PyODPS berdasarkan versi Python yang digunakan.

  • Jika garis keturunan data tidak dapat dihasilkan dengan mengeksekusi pernyataan SQL dalam node PyODPS dan Peta Data tidak dapat menampilkan garis keturunan data seperti yang diharapkan, Anda dapat menyelesaikan masalah tersebut dengan mengonfigurasi parameter penjadwalan secara manual dalam kode node PyODPS. Untuk informasi lebih lanjut tentang cara melihat garis keturunan data, lihat Lihat garis keturunan data. Untuk informasi lebih lanjut tentang cara mengonfigurasi parameter, lihat Konfigurasikan parameter hints. Contoh kode berikut memberikan contoh tentang cara mendapatkan parameter yang diperlukan untuk menjalankan tugas:

    import os
    ...
    # dapatkan parameter runtime penjadwal DataWorks
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # atur hints saat mengirimkan tugas
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • Log keluaran node PyODPS bisa mencapai ukuran maksimum 4 MB. Kami merekomendasikan agar Anda tidak menyertakan sejumlah besar entri hasil data dalam log keluaran. Sebagai gantinya, kami merekomendasikan agar Anda menyediakan lebih banyak log peringatan dan log terkait kemajuan untuk mendapatkan informasi yang berharga.

Batasan

  • Karena spesifikasi sumber daya dalam grup sumber daya eksklusif untuk penjadwalan yang ingin Anda gunakan untuk menjalankan node PyODPS 2, kami merekomendasikan agar Anda menggunakan node PyODPS 2 untuk memproses tidak lebih dari 50 MB data dari mesin lokal Anda. Jika node PyODPS 2 memproses lebih dari 50 MB data dari mesin lokal Anda, pengecualian kehabisan memori (OOM) mungkin terjadi, dan sistem mungkin melaporkan Got killed. Kami merekomendasikan agar Anda tidak menulis kode pemrosesan data yang berlebihan untuk node PyODPS 2. Untuk informasi lebih lanjut, lihat Praktik terbaik untuk penggunaan efisien node PyODPS.

  • Saat menggunakan grup sumber daya tanpa server untuk menjalankan node PyODPS 2, Anda dapat mengonfigurasi jumlah CU yang sesuai untuk node PyODPS 2 berdasarkan jumlah data yang perlu diproses dalam node tersebut.

    Catatan

    Saat menjalankan tugas pada grup sumber daya tanpa server, satu tugas dapat dikonfigurasi hingga 64 CU. Namun, disarankan untuk menjaga pengaturan dalam 16 CU untuk menghindari potensi kekurangan sumber daya yang dapat memengaruhi startup tugas.

  • Jika sistem melaporkan Got killed, penggunaan memori melebihi batas, dan sistem menghentikan proses terkait. Kami merekomendasikan agar Anda tidak melakukan operasi data lokal. Namun, batasan pada penggunaan memori tidak berlaku untuk tugas SQL atau DataFrame (tidak termasuk tugas to_pandas) yang diinisiasi oleh PyODPS.

  • Anda dapat menggunakan pustaka NumPy dan pandas yang sudah diinstal sebelumnya di DataWorks untuk menjalankan fungsi selain UDF. Paket pihak ketiga yang berisi kode biner tidak didukung.

  • Untuk alasan kompatibilitas, options.tunnel.use_instance_tunnel diatur ke False secara default di DataWorks. Jika Anda ingin mengaktifkan InstanceTunnel secara global, Anda harus mengatur parameter ini ke True.

  • Versi Python dari node PyODPS 2 adalah 2.7.

  • Anda tidak dapat menjalankan beberapa tugas Python pada node PyODPS 2 secara bersamaan.

Contoh pengeditan kode sederhana

Setelah Anda membuat node PyODPS, Anda dapat menulis dan menjalankan kode. Untuk informasi tentang sintaksis PyODPS, lihat Ikhtisar.

  • Gunakan titik masuk MaxCompute

    Di DataWorks, setiap node PyODPS mencakup variabel global odps atau o, yang merupakan titik masuk MaxCompute. Oleh karena itu, Anda tidak perlu secara manual menentukan titik masuk MaxCompute.

    print(odps.exist_table('PyODPS_iris'))
  • Eksekusi pernyataan SQL

    Anda dapat mengeksekusi pernyataan SQL dalam node PyODPS. Untuk informasi lebih lanjut, lihat SQL.

    • Secara default, InstanceTunnel dinonaktifkan di DataWorks. Dalam hal ini, instance.open_reader dijalankan menggunakan antarmuka Hasil, dan maksimal 10.000 catatan data dapat dibaca. Anda dapat menggunakan reader.count untuk mendapatkan jumlah catatan data yang dibaca. Jika Anda perlu membaca semua data secara iteratif, Anda harus menghapus batasan jumlah catatan data yang akan dibaca. Anda dapat mengeksekusi pernyataan berikut untuk mengaktifkan InstanceTunnel secara global dan menghapus batasan jumlah catatan data yang akan dibaca:

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # Hapus batasan jumlah catatan data yang akan dibaca.
      
      with instance.open_reader() as reader:
        # Gunakan InstanceTunnel untuk membaca semua data.

    • Anda juga dapat menambahkan tunnel=True ke open_reader untuk mengaktifkan InstanceTunnel untuk operasi open_reader saat ini. Anda dapat menambahkan limit=False ke open_reader untuk menghapus batasan jumlah catatan data yang akan dibaca untuk operasi open_reader saat ini.

      # Operasi open_reader saat ini dilakukan menggunakan InstanceTunnel, dan semua data dapat dibaca.
      with instance.open_reader(tunnel=True, limit=False) as reader:
  • Konfigurasikan parameter waktu proses

    • Anda dapat menggunakan parameter hints untuk mengonfigurasi parameter waktu proses. Parameter hints bertipe DICT. Untuk informasi lebih lanjut tentang parameter hints, lihat Operasi SET.

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • Jika Anda mengonfigurasi parameter sql.settings untuk konfigurasi global, Anda harus mengonfigurasi parameter waktu proses setiap kali menjalankan kode node.

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # Konfigurasikan parameter hints berdasarkan konfigurasi global.

  • Dapatkan hasil query pernyataan SQL

    Anda dapat menggunakan metode open_reader untuk mendapatkan hasil query dalam skenario berikut:

    • Pernyataan SQL mengembalikan data terstruktur.

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # Proses setiap catatan.

    • Pernyataan SQL seperti DESC dieksekusi. Dalam hal ini, Anda dapat menggunakan properti reader.raw untuk mendapatkan hasil query mentah.

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      Catatan

      Jika Anda menggunakan parameter penjadwalan kustom dan menjalankan node PyODPS 3 pada tab konfigurasi node, Anda harus mengatur parameter penjadwalan ke nilai konstan untuk menentukan waktu tetap. Nilai parameter penjadwalan kustom untuk node PyODPS tidak dapat diganti secara otomatis.

  • Gunakan DataFrame untuk memproses data

    Anda dapat menggunakan DataFrame (tidak direkomendasikan) untuk memproses data.

    • Panggil operasi API DataFrame

      DataFrame operasi API tidak dipanggil secara otomatis. Operasi ini hanya dapat dipanggil jika Anda secara eksplisit memanggil metode eksekusi langsung.

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # Panggil metode eksekusi langsung untuk memproses setiap catatan data.

      Untuk memanggil metode eksekusi langsung untuk tampilan data, atur options.interactive ke True.

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # Atur options.interactive ke True di awal kode.
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # Metode segera dieksekusi setelah sistem menampilkan informasi.

    • Tampilkan Detail

      Untuk menampilkan detail, Anda harus mengatur options.verbose ke True. Secara default, parameter ini diatur ke True di DataWorks. Sistem menampilkan detail seperti URL Logview selama proses berjalan.

Contoh

Contoh berikut menjelaskan cara menggunakan node PyODPS:

  1. Siapkan dataset dan buat tabel bernama pyodps_iris. Untuk informasi lebih lanjut, lihat Pemrosesan Data DataFrame.

  2. Buat objek DataFrame. Untuk informasi lebih lanjut, lihat Buat Objek DataFrame dari Tabel MaxCompute.

  3. Masukkan kode berikut di editor kode node PyODPS dan jalankan kode:

    from odps.df import DataFrame
    
    # Buat objek DataFrame menggunakan tabel MaxCompute.
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepallength.head(5))

    Informasi berikut dikembalikan:

       sepallength
    0          4.5
    1          5.5
    2          4.9
    3          5.0
    4          6.0

Contoh pengeditan kode lanjutan

Jika Anda ingin node PyODPS dijadwalkan secara berkala, Anda harus mendefinisikan properti penjadwalan untuk node tersebut. Untuk informasi lebih lanjut, lihat Ikhtisar.

Konfigurasikan parameter penjadwalan

Di panel navigasi kanan tab konfigurasi node, klik tab Properties. Di bagian Scheduling Parameter tab Properti, konfigurasikan parameter penjadwalan untuk node tersebut. Metode konfigurasi parameter penjadwalan untuk node PyODPS berbeda dari metode konfigurasi parameter penjadwalan untuk node SQL. Untuk informasi lebih lanjut, lihat Konfigurasikan Parameter Penjadwalan untuk Berbagai Jenis Node.

Berbeda dengan node SQL di DataWorks, string seperti ${param_name} tidak diganti dalam kode node PyODPS. Sebaliknya, sebuah kamus bernama args ditambahkan ke node PyODPS sebagai variabel global sebelum kode node dijalankan. Anda dapat memperoleh parameter penjadwalan dari kamus tersebut. Dengan cara ini, kode Python tidak terpengaruh. Misalnya, jika Anda mengatur ds=${yyyymmdd} di bagian Scheduling Parameter tab Properti, Anda dapat menjalankan perintah berikut untuk mendapatkan nilai parameter:

print('ds=' + args['ds'])
ds=20161116
Catatan

Anda dapat menjalankan perintah berikut untuk mendapatkan partisi bernama ds:

o.get_table('table_name').get_partition('ds=' + args['ds'])

Untuk informasi lebih lanjut tentang cara mengembangkan tugas PyODPS dalam skenario lain, lihat topik berikut:

Apa yang Harus Dilakukan Selanjutnya

  • Tentukan Apakah Skrip Shell Kustom Berhasil Dijalankan: Logika untuk menentukan apakah skrip Python kustom berhasil dijalankan sama dengan logika untuk menentukan apakah skrip Shell kustom berhasil dijalankan. Anda dapat menggunakan metode ini untuk menentukan apakah skrip Python kustom berhasil dijalankan.

  • Deploy Node PyODPS 3: Jika Anda menggunakan ruang kerja dalam mode standar, Anda harus menerapkan node PyODPS 3 ke lingkungan produksi sebelum node PyODPS 3 dapat dijadwalkan secara berkala.

  • Lakukan O&M pada Node PyODPS 3: Setelah node PyODPS 3 diterapkan ke Pusat Operasi di lingkungan produksi, Anda dapat melakukan operasi O&M pada node tersebut di Pusat Operasi.

  • Pelajari FAQ tentang PyODPS: Anda dapat mempelajari FAQ tentang PyODPS. Dengan cara ini, Anda dapat mengidentifikasi dan menyelesaikan masalah secara efisien ketika terjadi pengecualian.