DataWorks menyediakan node PyODPS 2 untuk mengembangkan tugas menggunakan sintaks PyODPS. Sebagai Python SDK untuk MaxCompute, PyODPS memungkinkan Anda menulis kode Python guna mengoperasikan MaxCompute secara langsung di dalam node tersebut.
Ikhtisar
PyODPS adalah Python SDK untuk MaxCompute yang menyediakan antarmuka pemrograman untuk menulis pekerjaan, melakukan kueri terhadap tabel dan tampilan (view), serta mengelola resource MaxCompute. Untuk informasi selengkapnya, lihat PyODPS. Di DataWorks, Anda dapat menggunakan node PyODPS untuk menjalankan dan menjadwalkan tugas Python serta mengintegrasikannya dengan pekerjaan lain.
Catatan penggunaan
Jika kode PyODPS Anda memerlukan paket pihak ketiga, Anda dapat menginstalnya saat menjalankan node pada Serverless Resource Group. Untuk informasi selengkapnya, lihat Custom images.
CatatanJika kode Anda mencakup user-defined function (UDF) yang mereferensikan paket pihak ketiga, metode di atas tidak dapat digunakan. Untuk informasi tentang metode konfigurasi yang benar, lihat Contoh UDF: Gunakan paket pihak ketiga dalam UDF Python.
Jika tugas PyODPS Anda perlu mengakses lingkungan jaringan tertentu, seperti sumber data atau layanan di VPC atau IDC, gunakan Serverless Resource Group dan buat koneksi jaringan antara Serverless Resource Group dengan lingkungan target. Untuk informasi selengkapnya, lihat Solusi konektivitas jaringan.
Untuk informasi lebih lanjut tentang sintaks PyODPS dan detail lainnya, lihat Dokumentasi PyODPS.
Node PyODPS tersedia dalam dua jenis: PyODPS 2 dan PyODPS 3. Perbedaannya terletak pada versi Python yang mendasarinya. Node PyODPS 2 menggunakan Python 2, sedangkan node PyODPS 3 menggunakan Python 3. Pilih jenis node yang sesuai dengan versi Python Anda.
Jika pernyataan SQL dalam node PyODPS gagal menghasilkan alur data (Data Lineage) sehingga tidak ditampilkan di Peta Data (Data Map), Anda dapat mengonfigurasi manual Parameter Penjadwalan terkait untuk DataWorks dalam kode tugas Anda guna mengatasi masalah tersebut. Untuk melihat alur data, lihat Lihat alur data. Untuk informasi tentang pengaturan parameter, lihat Atur parameter waktu proses (hints). Anda dapat menggunakan kode berikut untuk mendapatkan parameter yang diperlukan saat waktu proses.
import os # ... # Dapatkan parameter waktu proses penjadwal DataWorks. skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v # ... # Atur hints saat mengirimkan tugas. o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) # ...Hindari mencetak data dalam jumlah besar ke log. Cukup catat informasi penting saja, seperti peringatan dan pembaruan progres.
Batasan
Saat menggunakan Exclusive Resource Group for Scheduling untuk menjalankan node PyODPS, kami menyarankan agar jumlah data yang diproses secara lokal di dalam node tidak melebihi 50 MB. Operasi ini dibatasi oleh spesifikasi Exclusive Resource Group for Scheduling. Memproses data lokal yang berlebihan hingga melebihi ambang batas sistem operasi dapat menyebabkan error Out-of-Memory (OOM), yang ditandai dengan pesan "Got killed". Hindari menulis kode pemrosesan data yang ekstensif di node PyODPS. Untuk informasi selengkapnya, lihat Praktik terbaik penggunaan PyODPS yang efisien.
Saat menggunakan Serverless Resource Group untuk menjalankan node PyODPS, Anda dapat mengonfigurasi CU untuk node tersebut berdasarkan jumlah data yang perlu diproses.
CatatanSaat menjalankan tugas menggunakan Serverless Resource Group, Anda dapat mengonfigurasi maksimal
64 CUuntuk satu tugas. Namun, kami menyarankan agar tidak melebihi16 CUuntuk mencegah kegagalan startup tugas akibat kekurangan resource.Error Got killed menunjukkan bahwa penggunaan memori telah melebihi batas, sehingga proses dihentikan. Untuk mencegah hal ini, hindari melakukan operasi data secara lokal. Tugas SQL dan DataFrame yang diinisiasi melalui PyODPS, kecuali
to_pandas, tidak terkena batasan ini.Anda dapat menggunakan library Numpy dan Pandas yang telah pra-instal dalam kode Anda, tetapi tidak di dalam UDF. Paket pihak ketiga lain yang mengandung kode biner tidak didukung.
Untuk kompatibilitas,
options.tunnel.use_instance_tunnelsecara default bernilaiFalsedi DataWorks. Untuk mengaktifkan Instance Tunnel secara global, Anda harus mengatur nilai ini secara manual menjadiTrue.Versi Python yang mendasari node PyODPS 2 adalah 2.7.
Menjalankan beberapa tugas Python secara konkuren dalam satu node PyODPS tidak didukung.
Gunakan
printuntuk mencetak log di node PyODPS.logger.infotidak didukung.
Sebelum memulai
Ikatkan resource komputasi MaxCompute compute resource ke ruang kerja DataWorks Anda.
Prosedur
Pada halaman pengeditan node PyODPS 2, lakukan operasi pengembangan berikut.
Contoh kode PyODPS 2
Setelah membuat node PyODPS, Anda dapat mengedit dan menjalankan kode Anda. Untuk informasi selengkapnya tentang sintaks PyODPS, lihat Ikhtisar. Dokumen ini menyediakan lima contoh kode. Pilih salah satu yang paling sesuai dengan kebutuhan bisnis Anda.
Titik masuk ODPS
Node PyODPS DataWorks mencakup variabel global,
odpsatauo, yang berfungsi sebagai titik masuk ODPS. Anda tidak perlu mendefinisikannya secara manual.print(odps.exist_table('PyODPS_iris'))Eksekusi SQL
Anda dapat mengeksekusi SQL di node PyODPS. Untuk informasi selengkapnya, lihat SQL.
Secara default,
Instance Tunneltidak diaktifkan di DataWorks, artinyainstance.open_readermenggunakan Result API dan mengembalikan maksimal 10.000 record. Anda dapat menggunakanreader.countuntuk mendapatkan jumlah record. Untuk mengiterasi seluruh data, Anda harus menonaktifkan pembatasanlimit. Anda dapat menggunakan pernyataan berikut untuk mengaktifkanInstance Tunneldan menonaktifkan pembatasanlimitsecara global.options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # Nonaktifkan limit untuk membaca seluruh data. with instance.open_reader() as reader: # Anda dapat membaca seluruh data melalui Instance Tunnel.Anda juga dapat menambahkan
tunnel=Truekeopen_readeruntuk mengaktifkanInstance Tunnelhanya untuk operasiopen_readersaat ini. Selain itu, Anda dapat menambahkanlimit=Falseuntuk menonaktifkan pembatasanlimithanya untuk operasi tersebut.# Operasi open_reader ini menggunakan Instance Tunnel API dan dapat membaca seluruh data. with instance.open_reader(tunnel=True, limit=False) as reader:
Atur parameter waktu proses
Anda dapat mengatur parameter waktu proses dengan mengonfigurasi parameter
hints, yang bertipedict. Untuk informasi selengkapnya tentang hints, lihat Operasi SET.o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})Jika Anda mengonfigurasi
sql.settingsdi pengaturan global, DataWorks akan menambahkan parameter waktu proses ini ke setiap eksekusi.from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # Menambahkan hints berdasarkan konfigurasi global.
Baca hasil eksekusi
Instans yang menjalankan SQL dapat langsung mengeksekusi operasi
open_readerdalam dua skenario berikut:SQL mengembalikan data terstruktur.
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # Proses setiap record.Pernyataan SQL seperti
DESCdieksekusi. Anda dapat menggunakan atributreader.rawuntuk mendapatkan hasil mentah dari eksekusi SQL.with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)CatatanSaat menjalankan manual node PyODPS 2 yang menggunakan Parameter Penjadwalan kustom, Anda harus menyematkan nilai waktu secara hardcoded. Node PyODPS tidak dapat langsung menggantikan parameter tersebut.
DataFrame
Anda juga dapat memproses data menggunakan DataFrame (tidak disarankan).
Eksekusi
Di lingkungan DataWorks, mengeksekusi DataFrame memerlukan pemanggilan eksplisit terhadap metode eksekusi langsung.
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # Panggil metode eksekusi langsung untuk memproses setiap record.Untuk memicu eksekusi langsung dengan
print, aktifkanoptions.interactive.from odps import options from odps.df import DataFrame options.interactive = True # Aktifkan opsi ini di awal. iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Memicu eksekusi langsung saat mencetak.Cetak informasi detail
Anda dapat mengatur opsi
options.verbose. Opsi ini secara default diaktifkan di DataWorks, dan informasi detail seperti URL Logview akan dicetak selama eksekusi.
Kembangkan kode PyODPS 2
Contoh sederhana berikut menunjukkan cara menggunakan node PyODPS:
Siapkan set data dengan membuat tabel sampel pyodps_iris. Untuk informasi selengkapnya, lihat Gunakan DataFrame untuk memproses data.
Buat DataFrame. Untuk informasi selengkapnya, lihat Buat objek DataFrame dari tabel MaxCompute.
Masukkan kode berikut di node PyODPS.
from odps.df import DataFrame # Buat DataFrame dari tabel ODPS. iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))
Jalankan tugas PyODPS
Pada bagian Run Configuration Resource, konfigurasikan compute engine instance, DataWorks Resource Group, dan DataWorks Resource Group.
CatatanUntuk mengakses sumber data melalui jaringan publik atau di VPC, Anda harus menggunakan Resource Group yang dapat terhubung ke sumber data tersebut. Untuk informasi selengkapnya, lihat Solusi konektivitas jaringan.
Anda dapat mengonfigurasi parameter Image sesuai kebutuhan tugas Anda.
Pada bilah alat, klik Run untuk menjalankan tugas PyODPS.
Untuk menjalankan tugas node secara terjadwal, konfigurasikan properti penjadwalannya sesuai kebutuhan bisnis Anda. Untuk informasi selengkapnya, lihat Konfigurasi penjadwalan node.
Berbeda dengan node SQL di DataWorks, node PyODPS tidak mengganti string seperti ${param_name} dalam kode untuk menghindari modifikasi yang tidak diinginkan. Sebaliknya, sebelum kode dieksekusi, DataWorks menambahkan
dictbernamaargske variabel global. Anda dapat menggunakandictini untuk mengambil Parameter Penjadwalan. Misalnya, jika Anda mengatur parameter Parameters menjadids=${yyyymmdd}, Anda dapat mengakses parameter ini dalam kode sebagai berikut.print('ds=' + args['ds']) ds=20240930CatatanUntuk mendapatkan
Partitionbernamads, Anda dapat menggunakan metode berikut.o.get_table('table_name').get_partition('ds=' + args['ds'])Setelah mengonfigurasi tugas node, Anda harus menerbitkannya. Untuk informasi selengkapnya, lihat Penerapan node dan alur kerja.
Setelah tugas diterbitkan, Anda dapat membuka Pusat Operasi (Operation Center) untuk melihat detail eksekusi Tugas Berkala (Periodic Task). Untuk informasi selengkapnya, lihat Memulai Pusat Operasi.
Menjalankan node dengan peran yang terkait
Anda dapat mengaitkan peran RAM tertentu untuk menjalankan tugas node. Hal ini memungkinkan kontrol izin detail halus dan meningkatkan keamanan.
Langkah selanjutnya
FAQ PyODPS: Anda dapat mempelajari isu umum yang mungkin terjadi selama eksekusi PyODPS untuk mempercepat pemecahan masalah dan penanganan exception.