DataWorks menyediakan node PyODPS 3 yang memungkinkan Anda menulis pekerjaan MaxCompute langsung dalam Python dan mengonfigurasi penjadwalan berkala.
Ikhtisar
PyODPS adalah Python Software Development Kit (SDK) untuk MaxCompute. Antarmuka pemrogramannya yang sederhana memungkinkan Anda menulis pekerjaan, melakukan kueri terhadap tabel dan tampilan (views), serta mengelola resource MaxCompute. Untuk informasi selengkapnya, lihat PyODPS. Di DataWorks, Anda dapat menggunakan node PyODPS untuk menjadwalkan dan menjalankan tugas Python serta mengintegrasikannya dengan pekerjaan lain.
Pertimbangan
Saat menjalankan node PyODPS pada kelompok sumber daya DataWorks, Anda dapat menginstal paket pihak ketiga pada Serverless resource group jika kode Anda memerlukannya. Untuk informasi selengkapnya, lihat Custom images.
CatatanMetode ini tidak mendukung User-Defined Functions (UDFs) yang mereferensikan paket pihak ketiga. Untuk metode konfigurasi yang benar, lihat Contoh UDF: Gunakan paket pihak ketiga dalam UDF Python.
Untuk meningkatkan versi PyODPS, jalankan perintah yang dijelaskan di Custom images. Anda dapat mengganti
0.12.1dengan versi yang ingin Anda tingkatkan. Pada Serverless resource group, jalankan perintah tersebut sesuai petunjuk diInstall a third-party package. Pada exclusive resource group for scheduling, ikuti petunjuk di O&M Assistant.Jika tugas PyODPS Anda perlu mengakses lingkungan jaringan tertentu, seperti sumber data atau layanan di jaringan VPC atau IDC, gunakan Serverless resource group. Konfigurasikan konektivitas jaringan antara Serverless resource group dan lingkungan target. Untuk informasi selengkapnya, lihat Solusi konektivitas jaringan.
Untuk informasi lebih lanjut tentang sintaksis PyODPS, lihat Dokumentasi PyODPS.
Node PyODPS tersedia dalam dua jenis, yaitu PyODPS 2 dan PyODPS 3, yang masing-masing menggunakan Python 2 dan Python 3. Buat jenis node yang sesuai dengan versi Python yang Anda gunakan.
Jika pernyataan SQL yang dieksekusi dalam node PyODPS tidak menghasilkan alur data yang benar di Data Map, Anda dapat mengatur parameter waktu proses (runtime parameters) untuk penjadwalan DataWorks secara manual dalam kode tugas Anda. Untuk melihat alur data, lihat Lihat alur data. Untuk mengatur parameter, lihat Atur parameter waktu proses (hints). Anda dapat menggunakan contoh kode berikut untuk mendapatkan parameter waktu proses yang diperlukan.
import os ... # Dapatkan parameter waktu proses penjadwal DataWorks skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # Atur hints saat Anda mengirimkan tugas o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...Batas maksimum output log untuk node PyODPS adalah 4 MB. Hindari mencetak hasil data besar secara langsung ke log. Sebagai gantinya, fokuslah pada output informasi bernilai, seperti log peringatan dan pembaruan progres.
Batasan
Saat menggunakan exclusive resource group for scheduling untuk menjalankan node PyODPS, jumlah data yang diproses secara lokal dalam node tersebut tidak boleh melebihi 50 MB. Batasan ini ditentukan oleh spesifikasi exclusive resource group for scheduling. Memproses data lokal yang berlebihan dapat melebihi ambang batas sistem operasi dan menyebabkan error Out of Memory (OOM), yang ditandai dengan pesan `Got Killed`. Hindari menulis logika pemrosesan data intensif dalam node PyODPS. Untuk informasi selengkapnya, lihat Praktik terbaik penggunaan PyODPS secara efisien.
Saat menggunakan Serverless resource group untuk menjalankan node PyODPS, Anda dapat mengonfigurasi CUs untuk node tersebut berdasarkan jumlah data yang perlu diproses.
CatatanSatu tugas dapat dikonfigurasi hingga maksimal
64CUs, tetapi penggunaan lebih dari16CUs tidak disarankan karena dapat menyebabkan kekurangan resource yang memengaruhi startup tugas.Error Got Killed menunjukkan bahwa proses dihentikan karena kondisi Out of Memory (OOM). Oleh karena itu, usahakan meminimalkan operasi data lokal. Tugas SQL dan DataFrame yang diinisiasi melalui PyODPS, kecuali
to_pandas, tidak terkena batasan ini.Kode yang bukan bagian dari User-Defined Function (UDF) dapat menggunakan paket Numpy dan Pandas yang telah dipra-instal. Paket pihak ketiga lain yang berisi kode biner tidak didukung.
Karena alasan kompatibilitas,
options.tunnel.use_instance_tunnelsecara default diatur keFalsedi DataWorks. Untuk mengaktifkaninstance tunnelsecara global, Anda harus mengatur nilai ini secara manual menjadiTrue.Definisi bytecode berbeda antar versi minor Python 3, seperti Python 3.8 dan Python 3.7.
MaxCompute saat ini menggunakan Python 3.7. Penggunaan sintaksis dari versi Python 3 lain, seperti blok
finallydi Python 3.8, akan menyebabkan error eksekusi. Kami menyarankan Anda menggunakan Python 3.7.PyODPS 3 dapat dijalankan pada Serverless resource group. Untuk membeli dan menggunakan resource ini, lihat Gunakan serverless resource groups.
Satu node PyODPS tidak mendukung eksekusi konkuren beberapa tugas Python.
Untuk mencetak log dari node PyODPS, gunakan fungsi
print. Fungsilogger.infosaat ini tidak didukung.
Sebelum memulai
Ikatkan resource komputasi MaxCompute compute resource ke ruang kerja DataWorks Anda.
Prosedur
Di editor node PyODPS 3, lakukan langkah-langkah pengembangan berikut.
Contoh kode PyODPS 3
Setelah membuat node PyODPS, Anda dapat mengedit dan menjalankan kode. Untuk informasi lebih lanjut tentang sintaksis PyODPS, lihat Ikhtisar. Bagian ini menyediakan lima contoh kode. Pilih contoh yang paling sesuai dengan kebutuhan bisnis Anda.
Titik masuk ODPS
DataWorks menyediakan variabel global
odps(atauo) sebagai titik masuk ODPS, sehingga Anda tidak perlu mendefinisikannya secara manual.print(odps.exist_table('PyODPS_iris'))Eksekusi pernyataan SQL
Anda dapat mengeksekusi pernyataan SQL dalam node PyODPS. Untuk informasi selengkapnya, lihat SQL.
Di DataWorks,
instance tunneldinonaktifkan secara default. Artinya,instance.open_readermenggunakan antarmuka Result, yang mengembalikan maksimal 10.000 record. Anda dapat menggunakanreader.countuntuk mendapatkan jumlah record. Untuk mengiterasi seluruh data, Anda harus menonaktifkanlimit. Anda dapat menggunakan pernyataan berikut untuk mengaktifkaninstance tunnelsecara global dan menonaktifkanlimit.options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # Nonaktifkan limit untuk membaca semua data. with instance.open_reader() as reader: # Anda dapat membaca semua data melalui Instance Tunnel.Alternatifnya, Anda dapat menambahkan
tunnel=Trueke pemanggilanopen_readeruntuk mengaktifkaninstance tunnelhanya untuk operasi tersebut. Anda juga dapat menambahkanlimit=Falseuntuk menonaktifkanlimituntuk operasi tersebut.# Gunakan antarmuka Instance Tunnel untuk operasi open_reader ini agar membaca semua data. with instance.open_reader(tunnel=True, limit=False) as reader:
Atur parameter waktu proses
Anda dapat mengatur parameter waktu proses menggunakan parameter
hints, yang bertipedict. Untuk informasi selengkapnya tentang hints, lihat Operasi SET.o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})Jika Anda mengonfigurasi
sql.settingssecara global, parameter waktu proses yang ditentukan akan ditambahkan ke setiap eksekusi.from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # Tambahkan hints berdasarkan konfigurasi global.
Membaca hasil eksekusi
Instans yang menjalankan pernyataan SQL dapat langsung melakukan operasi
open_readerdalam dua skenario berikut:Pernyataan SQL mengembalikan data terstruktur.
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # Proses setiap record.Pernyataan SQL merupakan perintah seperti
desc. Anda dapat menggunakan atributreader.rawuntuk mendapatkan hasil eksekusi SQL mentah.with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)CatatanSaat menjalankan node PyODPS 3 langsung dari editor, Anda harus menetapkan nilai parameter penjadwalan kustom secara eksplisit karena node tidak secara otomatis mengganti placeholder.
DataFrame
Anda juga dapat menggunakan DataFrame (tidak disarankan) untuk memproses data.
Eksekusi
Di DataWorks, Anda harus secara eksplisit memanggil metode eksekusi langsung untuk mengeksekusi DataFrame.
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # Panggil metode eksekusi langsung untuk memproses setiap Record.Jika Anda perlu memicu eksekusi langsung dengan pernyataan
print, Anda harus mengaktifkanoptions.interactive.from odps import options from odps.df import DataFrame options.interactive = True # Aktifkan opsi di awal. iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Eksekusi langsung dipicu oleh print().Mencetak informasi detail
Anda dapat mengatur opsi
options.verboseuntuk mencetak informasi detail. Di DataWorks, opsi ini diaktifkan secara default dan mencetak detail seperti URL Logview selama eksekusi.
Mengembangkan kode PyODPS 3
Contoh berikut menunjukkan cara menggunakan node PyODPS:
Siapkan set data. Buat tabel sampel pyodps_iris. Untuk petunjuk, lihat Gunakan DataFrame untuk memproses data.
Buat DataFrame. Untuk informasi selengkapnya, lihat Buat DataFrame dari tabel MaxCompute.
Masukkan kode berikut ke dalam node PyODPS dan jalankan.
from odps.df import DataFrame # Buat DataFrame dari tabel ODPS. iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))
Jalankan tugas PyODPS
Pada bagian Run Configuration, konfigurasikan Compute resources, Compute quota, dan DataWorks resource group.
CatatanUntuk mengakses sumber data di jaringan publik atau jaringan VPC, Anda harus menggunakan resource group for scheduling yang telah lulus uji konektivitas dengan sumber data tersebut. Untuk informasi selengkapnya, lihat Solusi konektivitas jaringan.
Anda dapat mengonfigurasi Image berdasarkan kebutuhan tugas.
Di kotak dialog parameter pada toolbar, pilih sumber data MaxCompute yang telah Anda buat dan klik Run.
Untuk menjalankan tugas node sesuai jadwal, konfigurasikan properti penjadwalannya sesuai kebutuhan bisnis Anda. Untuk informasi selengkapnya, lihat Konfigurasi penjadwalan node.
Berbeda dengan node SQL di DataWorks, node PyODPS tidak mengganti string seperti `${param_name}` dalam kode untuk menghindari substitusi string yang tidak diinginkan. Sebagai gantinya, sebelum kode dieksekusi, sebuah
dictbernamaargsditambahkan ke variabel global. Anda dapat mengambil parameter penjadwalan daridictini. Misalnya, jika Anda mengatur ds=${yyyymmdd} di tabParameters, Anda dapat mengambil parameter ini dalam kode sebagai berikut.print('ds=' + args['ds']) ds=20240930CatatanUntuk mendapatkan partisi bernama
ds, Anda dapat menggunakan metode berikut.o.get_table('table_name').get_partition('ds=' + args['ds'])Setelah mengonfigurasi tugas node, Anda harus menerapkannya. Untuk informasi selengkapnya, lihat Penerapan node dan alur kerja.
Setelah tugas diterapkan, Anda dapat melihat status tugas terjadwal di Pusat Operasi. Untuk informasi selengkapnya, lihat Memulai Pusat Operasi.
Menjalankan node dengan peran yang terkait
Anda dapat mengaitkan role RAM tertentu untuk menjalankan tugas node. Hal ini memungkinkan kontrol izin detail halus dan keamanan yang lebih baik.
Langkah selanjutnya
FAQ PyODPS: Pelajari isu umum eksekusi PyODPS untuk membantu Anda memecahkan dan menyelesaikan exception dengan cepat.