Anda dapat menggunakan node PyODPS 3 di DataWorks untuk mengembangkan tugas MaxCompute dengan Python 3 dan menjadwalkannya agar berjalan secara berkala. Topik ini menjelaskan cara membuat dan mengonfigurasi tugas PyODPS 3.
Prasyarat
Node PyODPS 3 telah dibuat. Untuk informasi selengkapnya, lihat Buat dan kelola node MaxCompute.
Informasi latar belakang
PyODPS adalah SDK Python untuk MaxCompute yang menyediakan antarmuka untuk mengembangkan tugas MaxCompute, melakukan kueri terhadap tabel dan view, serta mengelola resource. Untuk informasi selengkapnya, lihat PyODPS. Node PyODPS di DataWorks memungkinkan Anda menjadwalkan tugas Python dan mengintegrasikannya dengan tugas lainnya.
Catatan
Jika kode PyODPS Anda berjalan pada resource group DataWorks dan memerlukan paket pihak ketiga, Anda dapat menginstalnya pada Serverless resource group melalui Custom images.
CatatanMetode ini tidak mendukung UDF yang mereferensikan paket pihak ketiga. Untuk informasi tentang cara mengonfigurasi UDF dengan dependensi pihak ketiga, lihat Contoh UDF: Gunakan paket pihak ketiga dalam UDF Python.
Untuk melakukan upgrade PyODPS pada Serverless resource group, jalankan
/home/tops/bin/pip3 install pyodps==0.12.1melalui Custom images (ganti0.12.1dengan versi yang diinginkan). Untuk exclusive resource group for scheduling, jalankan perintah yang sama melalui O&M Assistant.
Untuk mengakses sumber data atau layanan dalam lingkungan jaringan tertentu (seperti VPC atau IDC), gunakan Serverless resource group dan konfigurasikan konektivitas jaringan. Lihat Solusi konektivitas jaringan.
Untuk informasi selengkapnya tentang sintaksis PyODPS, lihat Dokumentasi PyODPS.
DataWorks mendukung node PyODPS 2 (Python 2) dan PyODPS 3 (Python 3). Pilih jenis node yang sesuai dengan versi Python Anda.
Jika pernyataan SQL yang dieksekusi dalam node PyODPS tidak menghasilkan alur data di Peta Data, Anda dapat mengonfigurasi parameter penjadwalan secara manual dalam kode untuk mengatasi masalah ini. Untuk informasi selengkapnya tentang cara melihat alur data, lihat Lihat informasi alur data. Untuk informasi selengkapnya tentang pengaturan parameter, lihat Atur parameter waktu proses (petunjuk). Anda dapat menggunakan kode berikut untuk memperoleh parameter yang diperlukan untuk eksekusi tugas:
import os ... # get DataWorks scheduler runtime parameters skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # setting hints while submitting a task o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...
Log output node PyODPS dibatasi hingga 4 MB. Hindari menampilkan dataset besar; kami menyarankan hanya menggunakan log untuk alert dan pelacakan progres.
Batasan
Saat menggunakan exclusive resource group for scheduling, batasi pemrosesan data lokal hingga 50 MB. Penggunaan memori yang berlebihan dapat memicu error OOM (Got Killed) akibat batasan resource. Hindari pemrosesan data dalam jumlah besar secara lokal. Untuk informasi selengkapnya, lihat Praktik terbaik untuk penggunaan PyODPS yang efisien.
Saat Anda menggunakan Serverless resource group untuk menjalankan node PyODPS, konfigurasikan CU untuk node PyODPS berdasarkan jumlah data yang perlu diproses.
CatatanUntuk Serverless resource group, konfigurasi maksimum per tugas adalah
64CU. Kami menyarankan batas16CUuntuk mencegah konflik sumber daya, yang dapat menunda eksekusi tugas.Error Got killed menunjukkan bahwa penggunaan memori melebihi batas. Hindari operasi data lokal. Tugas SQL dan DataFrame yang diinisiasi melalui PyODPS (kecuali `to_pandas`) tidak terkena batasan ini.
Kode selain UDF dapat menggunakan paket NumPy dan Pandas yang telah pra-instal. Paket pihak ketiga lain yang berisi kode biner tidak didukung.
Untuk kompatibilitas, DataWorks mengatur options.tunnel.use_instance_tunnel ke False secara default. Untuk mengaktifkan instance tunnel secara global, atur nilai ini ke True.
Versi minor Python 3 yang berbeda (seperti Python 3.8 dan Python 3.7) memiliki definisi bytecode yang berbeda.
MaxCompute saat ini mendukung Python 3.7. Menggunakan sintaksis spesifik versi lain (seperti blok `finally` di Python 3.8) akan menyebabkan error. Kami menyarankan Anda menggunakan Python 3.7.
PyODPS 3 mendukung eksekusi pada Serverless resource group. Untuk membeli Serverless resource group, lihat Gunakan serverless resource groups.
Node PyODPS tidak mendukung eksekusi konkuren beberapa tugas Python.
Edit kode: Contoh sederhana
Setelah membuat node PyODPS, Anda dapat menulis dan menjalankan kode. Untuk informasi selengkapnya tentang sintaksis PyODPS, lihat Ikhtisar.
Titik masuk ODPS
Node PyODPS DataWorks mencakup variabel global, odps atau o, yang berfungsi sebagai titik masuk. Anda tidak perlu mendefinisikannya secara manual.
print(odps.exist_table('PyODPS_iris'))Eksekusi SQL
Anda dapat mengeksekusi pernyataan SQL dalam node PyODPS. Untuk informasi selengkapnya, lihat SQL.
Secara default, DataWorks menonaktifkan instance tunnel. Dalam mode ini, instance.open_reader menggunakan antarmuka Result, yang dibatasi hingga 10.000 record. Anda dapat menggunakan reader.count untuk mendapatkan jumlah record. Untuk membaca semua data, Anda harus menonaktifkan pembatasan
limit. Gunakan pernyataan berikut untuk mengaktifkan instance tunnel secara global dan menonaktifkan pembatasanlimit.options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # Disable the limit to read all data. with instance.open_reader() as reader: # Use the instance tunnel to read all data.Alternatifnya, tambahkan
tunnel=Trueke untuk mengaktifkan instance tunnel untuk panggilan tertentu. Anda juga dapat menambahkanlimit=Falseuntuk menonaktifkan pembatasanlimituntuk panggilan tersebut.# Use the instance tunnel interface for this open_reader call and read all data. with instance.open_reader(tunnel=True, limit=False) as reader:
Atur parameter waktu proses
Anda dapat menggunakan parameter hints (tipe: dict) untuk mengonfigurasi parameter waktu proses. Untuk informasi selengkapnya tentang hints, lihat Operasi SET.
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})Jika Anda mengonfigurasi sql.settings secara global, Anda harus menerapkan parameter waktu proses terkait untuk setiap eksekusi.
from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # Add hints based on the global configuration.
Baca hasil eksekusi
Instans yang dihasilkan oleh eksekusi SQL dapat langsung memanggil open_reader. Ada dua skenario:
Pernyataan SQL mengembalikan data terstruktur.
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # Process each record.Pernyataan SQL (seperti `desc`) mengembalikan data tidak terstruktur. Anda dapat menggunakan atribut reader.raw untuk mengambil hasil eksekusi mentah.
with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)CatatanJika Anda menggunakan parameter penjadwalan kustom dan memicu node PyODPS 3 secara manual dari halaman konfigurasi, Anda harus mengkodekan waktu secara eksplisit karena node PyODPS tidak dapat melakukan penggantian parameter dalam mode ini.
DataFrame
Anda juga dapat menggunakan DataFrame untuk memproses data.
Eksekusi
Di DataWorks, Anda harus secara eksplisit memanggil metode eksekusi langsung untuk mengeksekusi DataFrame.
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # Call the immediate execution method to process each record.Jika Anda ingin memicu eksekusi langsung saat mencetak, Anda harus mengaktifkan
options.interactive.from odps import options from odps.df import DataFrame options.interactive = True # Enable the switch at the beginning. iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Immediate execution is triggered during printing.Cetak detail
Atur opsi
options.verboseke True. Di DataWorks, opsi ini diaktifkan secara default, dan detail seperti URL Logview dicetak selama eksekusi.
Contoh
Contoh sederhana berikut menunjukkan cara menggunakan node PyODPS:
Siapkan set data dan buat tabel sampel pyodps_iris. Untuk informasi selengkapnya, lihat Gunakan DataFrame untuk memproses data.
Buat DataFrame. Untuk informasi selengkapnya, lihat Buat objek DataFrame dari tabel MaxCompute.
Masukkan kode berikut ke dalam node PyODPS dan jalankan.
from odps.df import DataFrame # Create a DataFrame from a MaxCompute table. iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))Hasil pengembalian:
sepallength 0 4.5 1 5.5 2 4.9 3 5.0 4 6.0
Edit kode: Contoh lanjutan
Jika node memerlukan penjadwalan berkala, konfigurasikan properti penjadwalannya. Untuk informasi selengkapnya tentang konfigurasi penjadwalan, lihat Ikhtisar.
Gunakan parameter penjadwalan
Klik Properties di editor node. Di bagian Parameters, konfigurasikan parameter kustom. Mendefinisikan variabel dalam node PyODPS berbeda dari node SQL. Untuk informasi selengkapnya, lihat Konfigurasi parameter penjadwalan.
Tidak seperti node SQL, node PyODPS tidak melakukan penggantian string untuk placeholder seperti ${param_name} untuk mencegah konflik dengan kode Python. Sebagai gantinya, DataWorks menyuntikkan dictionary global args sebelum eksekusi. Anda dapat mengambil parameter dari dictionary ini. Misalnya, jika Anda mengatur ds=${yyyymmdd} di Parameters, Anda dapat menggunakan kode berikut untuk mendapatkan parameter ini:
print('ds=' + args['ds'])
ds=20161116Jika Anda perlu mendapatkan partisi bernama ds, gunakan metode berikut:
o.get_table('table_name').get_partition('ds=' + args['ds'])Untuk skenario lain terkait pengembangan tugas PyODPS, lihat:
Langkah selanjutnya
Tentukan keberhasilan tugas skrip Shell kustom: Logika keberhasilan untuk tugas skrip Python kustom sama dengan node Shell. Anda dapat menggunakan metode ini untuk verifikasi.
Publikasikan tugas: Jika Anda menggunakan ruang kerja dalam mode standar, Anda harus men-deploy tugas ke lingkungan produksi melalui proses penerbitan agar penjadwalan berkala dapat diaktifkan.
O&M tugas berkala: Setelah tugas di-deploy ke Operation Center untuk penjadwalan, Anda dapat melakukan operasi O&M di Operation Center DataWorks.
FAQ PyODPS: Anda dapat mempelajari isu umum selama eksekusi PyODPS untuk mempercepat pemecahan masalah dan penyelesaian exception.
FAQ
Q: Node PyODPS 3 saya berfungsi selama pengembangan lokal tetapi timeout di Operation Center saat mengakses API pihak ketiga (misalnya, Feishu). Mengapa?
A: Anda harus mengonfigurasi daftar putih sandbox. Buka , lalu di bagian Security Settings, tambahkan titik akhir API pihak ketiga ke daftar putih untuk mengizinkan akses. Contohnya:
