Topik ini menjelaskan cara menggunakan fungsi yang ditentukan pengguna (UDF) dan pustaka Python pihak ketiga.
Gunakan UDF
DataFrame memungkinkan Anda menggunakan metode map pada objek Sequence untuk memanggil UDF di semua elemennya.
>>> iris.sepallength.map(lambda x: x + 1).head(5)
sepallength
0 6.1
1 5.9
2 5.7
3 5.6
4 6.0Saat menggunakan UDF, tipe List atau Dict tidak dapat digunakan sebagai input atau output.
Jika tipe Sequence berubah setelah metode map dijalankan, Anda perlu secara eksplisit menentukan tipe baru dari Sequence.
>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
sepallength
0 t5.1
1 t4.9
2 t4.7
3 t4.6
4 t5.0Jika suatu fungsi mengandung closure, perubahan nilai variabel closure di luar fungsi menyebabkan nilai variabel dalam fungsi tersebut berubah.
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(lambda x: x + i))Akibatnya, setiap objek SequenceExpr dalam dfs adalah df.sepal_length + 9. Untuk menyelesaikan masalah ini, Anda dapat menggunakan fungsi sebagai nilai kembalian dari fungsi lain, atau gunakan partial. Lihat dua contoh berikut.
>>> dfs = []
>>> def get_mapper(i):
>>> return lambda x: x + i
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(get_mapper(i)))>>> import functools
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))Metode map juga mendukung UDF yang sudah ada. Anda dapat memasukkan parameter bertipe str, yang mewakili nama fungsi, atau objek Fungsi. Untuk informasi lebih lanjut, lihat Fungsi.
Saat mengimplementasikan metode map untuk fungsi Python, Anda harus menggunakan MaxCompute Python UDF. Jika proyek Anda tidak mendukung Python UDF, Anda tidak dapat menggunakan metode map. Selain itu, semua batasan Python UDF berlaku.
Pustaka pihak ketiga satu-satunya yang tersedia (berisi kode dalam bahasa C) adalah NumPy. Untuk informasi lebih lanjut tentang cara menggunakan pustaka Python pihak ketiga, lihat Gunakan pustaka Python pihak ketiga.
Selain UDF, DataFrame menyediakan banyak fungsi bawaan, beberapa di antaranya diimplementasikan dengan menggunakan fungsi map. Oleh karena itu, jika proyek Anda tidak mendukung Python UDF, Anda tidak dapat menggunakan fungsi-fungsi ini. Perhatikan bahwa layanan publik Alibaba Cloud tidak mendukung Python UDF.
Karena perbedaan dalam definisi byte code, jika Anda menggunakan fitur baru yang didukung oleh Python 3 (seperti yield from), kesalahan mungkin terjadi saat kode dieksekusi pada MaxCompute Worker dari Python 2.7. Oleh karena itu, kami sarankan Anda memastikan kode Anda berjalan normal sebelum menulis kode produksi dengan menggunakan API MapReduce di Python 3.
Contoh penggunaan counter:
from odps.udf import get_execution_context
def h(x):
ctx = get_execution_context()
counters = ctx.get_counters()
counters.get_counter('df', 'add_one').increment(1)
return x + 1
df.field.map(h)Anda dapat menemukan nilai counter di JSONSummary dari LogView.
Gunakan UDF untuk satu baris
Untuk menggunakan UDF untuk satu baris, Anda dapat menggunakan metode apply. Parameter axis harus disetel ke 1 untuk menunjukkan bahwa operasi dilakukan pada baris. UDF dari metode apply mengambil satu parameter, yaitu satu baris data dari objek Collection sebelumnya. Anda dapat mengambil data di bidang tertentu dengan menggunakan atribut atau offset.
Jika
parameter reduce disetel ke True, objek Sequence dikembalikan. Jika tidak, objek Collection dikembalikan. Anda dapat menggunakan parameternamesdantypesuntuk menentukan nama bidang dan jenis dari objek Sequence atau Collection yang dikembalikan. Secara default, jika Anda tidak menentukan jenis, jenis STRING digunakan.>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3) sepaladd 0 8.6 1 7.9 2 7.9Jika
reduceadalah False dalam UDF dari metodeapply, Anda dapat menggunakan kata kunciyielduntuk mengembalikan beberapa hasil baris.>>> iris.count() 150 >>> >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count() 300Anda juga dapat memberi anotasi pada nama bidang dan jenis yang dikembalikan dalam fungsi. Anda tidak perlu menentukannya saat memanggil fungsi.
>>> from odps.df import output >>> >>> @output(['iris_add', 'iris_sub'], ['float', 'float']) >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris.apply(handle, axis=1).count() 300Anda juga dapat menggunakan
map_reducedarimap-only, yang setara dengan metodeapplydenganaxis=1.>>> iris.map_reduce(mapper=handle).count() 300Untuk menggunakan fungsi tabel yang ditentukan pengguna (UDTF) yang sudah ada di MaxCompute, tentukan nama UDTF.
>>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])Saat menggunakan metode
applyuntuk baris danreduceadalah False, Anda dapat menggunakan lateral view dengan baris yang ada untuk tujuan seperti agregasi.>>> from odps.df import output >>> >>> @output(['iris_add', 'iris_sub'], ['float', 'float']) >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris[iris.category, iris.apply(handle, axis=1)]
Gunakan agregasi kustom untuk semua kolom
Saat menggunakan metode apply, jika axis tidak ditentukan atau nilai dari axis adalah 0, Anda dapat memasukkan kelas agregasi kustom untuk menggabungkan semua objek Sequence.
class Agg(object):
def buffer(self):
return [0.0, 0]
def __call__(self, buffer, val):
buffer[0] += val
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]>>> iris.exclude('name').apply(Agg)
sepallength_aggregation sepalwidth_aggregation petallength_aggregation petalwidth_aggregation
0 5.843333 3.054 3.758667 1.198667Saat menggunakan UDF, Anda tidak dapat menggunakan tipe LIST atau DICT sebagai input atau output.
Sumber referensi
UDF juga dapat membaca sumber daya MaxCompute (seperti sumber daya tabel dan file) atau merujuk objek Collection sebagai sumber daya. Untuk melakukannya, Anda perlu menulis UDF sebagai closure atau kelas yang dapat dipanggil. Lihat dua contoh berikut.
>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
sepallength
0 Iris-setosa
1 Iris-versicolor>>> def myfunc(resources): # resources diteruskan sesuai urutan pemanggilan
>>> names = set()
>>> fileobj = resources[0] # sumber daya file direpresentasikan oleh objek mirip file
>>> for l in fileobj:
>>> names.add(l)
>>> collection = resources[1]
>>> for r in collection:
>>> names.add(r.name) # ambil nilai dengan menggunakan nama bidang atau offset
>>> def h(x):
>>> if x in names:
>>> return True
>>> else:
>>> return False
>>> return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>> df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
name isin
0 Iris-setosa True
1 Iris-versicolor True
2 Iris-virginica FalseSaat tabel partisi dibaca, bidang partisi tidak termasuk.
Jika axis adalah 1 dalam operasi baris, Anda perlu menulis penutup fungsi atau kelas yang dapat dipanggil. Untuk operasi agregasi pada kolom, Anda hanya perlu menggunakan fungsi __init__ untuk membaca sumber daya.
>>> words_df
sentence
0 Hello World
1 Hello Python
2 Life is short I use Python
>>>
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>>
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>> stop_words = set([r[0] for r in resources[0]])
>>> def h(row):
>>> return ' '.join(w for w in row[0].split() if w not in stop_words),
>>> return h
>>>
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
sentence
0 Hello World
1 Hello Python
2 Life short use PythonDalam contoh ini, stop_words adalah variabel lokal, yang dirujuk sebagai sumber daya di MaxCompute selama eksekusi.
Gunakan pustaka Python pihak ketiga
Anda dapat mengunggah paket Python pihak ketiga (dalam format whl, egg, zip, dan tar.gz) ke MaxCompute. Saat menggunakan metode global atau metode langsung, Anda perlu menentukan file paket. Pastikan semua pustaka dependen ditentukan. Jika tidak, kesalahan mungkin terjadi saat Anda mengimpor file.
Anda dapat mengunggah sumber daya dengan memanggil antarmuka unggah sumber daya PyODPS create_resource.
Ambil paket python-dateutil sebagai contoh.
Anda dapat menjalankan perintah
pip downloaduntuk mengunduh paket dan dependensinya ke jalur tertentu. Dua paket diunduh: six-1.10.0-py2.py3-none-any.whl dan python_dateutil-2.5.3-py2.py3-none-any.whl. Perhatikan bahwa paket yang diunduh harus mendukung lingkungan Linux.$ pip download python-dateutil -d /to/path/Kemudian unggah kedua file tersebut ke MaxCompute sebagai sumber daya.
>>> # Pastikan ekstensi nama file benar. >>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb')) >>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))Sekarang Anda memiliki objek DataFrame yang hanya berisi bidang STRING.
>>> df datestr 0 2016-08-26 14:03:29 1 2015-08-26 14:03:29Gunakan pustaka pihak ketiga berikut dalam konfigurasi global.
>>> from odps import options >>> >>> def get_year(t): >>> from dateutil.parser import parse >>> return parse(t).strftime('%Y') >>> >>> options.df.libraries = ['six.whl', 'python_dateutil.whl'] >>> df.datestr.map(get_year) datestr 0 2016 1 2015Atau, gunakan parameter
librariesdari metode untuk menentukan pustaka pihak ketiga.>>> def get_year(t): >>> from dateutil.parser import parse >>> return parse(t).strftime('%Y') >>> >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl']) datestr 0 2016 1 2015
Secara default, PyODPS mendukung pustaka pihak ketiga yang berisi kode Python murni tetapi tanpa operasi file. Di versi MaxCompute yang lebih baru, PyODPS juga mendukung pustaka Python yang berisi kode biner atau operasi file. Pustaka ini harus memiliki akhiran cp27-cp27m-manylinux1_x86_64 dan diunggah sebagai arsip. File .whl harus diubah namanya menjadi .zip. Anda juga perlu menyetel odps.isolation.session.enable ke True atau mengaktifkan
isolationdi proyek Anda. Contoh berikut menunjukkan cara mengunggah dan menggunakan fungsi khusus dalamscipy.>>> # paket yang berisi kode biner harus diunggah menggunakan metode arsip, dan ekstensi file .whl harus diganti dengan .zip. >>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb')) >>> >>> # jika isolation diaktifkan untuk proyek Anda, opsi berikut bersifat opsional. >>> options.sql.settings = { 'odps.isolation.session.enable': True } >>> >>> def psi(value): >>> # kami sarankan Anda mengimpor pustaka pihak ketiga di dalam fungsi untuk menghindari kesalahan yang disebabkan oleh struktur biner yang berbeda antara sistem operasi yang berbeda. >>> from scipy.special import psi >>> return float(psi(value)) >>> >>> df.float_col.map(psi).execute(libraries=['scipy.zip'])Paket biner yang hanya berisi kode sumber dapat dikemas ke dalam file Wheel dan diunggah di Linux Shell. File Wheel yang dihasilkan di Mac dan Windows tidak dapat digunakan di MaxCompute.
python setup.py bdist_wheelAnda juga dapat menggunakan Konsol MaxCompute untuk mengunggah sumber daya.
Sebagian besar paket Python disertai dengan paket .whl, termasuk paket yang berisi file biner di berbagai platform. Oleh karena itu, Anda perlu terlebih dahulu menemukan paket yang dapat berjalan di MaxCompute.
Selain itu, semua dependensi harus disertakan. Tabel berikut mencantumkan dependensi paket.
Nama paket
Dependensi
pandas
numpy, python-dateutil, pytz, six
scipy
numpy
scikit-learn
numpy, scipy
nullPaket numpy sudah disediakan. Anda hanya perlu mengunggah paket python-dateutil, pytz, pandas, SciPy, sklearn, dan six untuk menjalankan paket pandas, scipy, dan scikit-learn.
Anda dapat menemukan python-dateutil-2.6.0.zip di python-dateutil dan mengunduhnya.

Ganti nama file yang diunduh menjadi python-dateutil.zip, dan unggah file tersebut sebagai sumber daya di konsol MaxCompute.
add archive python-dateutil.zip;nullCari, unduh, dan unggah file pytz-2017.2.zip dan six-1.11.0.tar.gz dengan cara yang sama seperti Anda lakukan untuk file python-dateutil.
Untuk memastikan bahwa paket yang berisi kode dalam bahasa C, seperti Pandas, berjalan normal di MaxCompute, Anda perlu menemukan paket .whl yang namanya mengandung cp27-cp27m-manylinux1_x86_64. Oleh karena itu, Anda perlu menemukan dan mengunduh pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl, ubah ekstensinya menjadi .zip, dan jalankan
add archive pandas.zip;di Konsol MaxCompute untuk mengunggahnya. Unggah paket scipy dan scikit-learn ke MaxCompute dengan cara yang sama seperti prosedur sebelumnya.
Tabel berikut mencantumkan sumber daya yang harus diunduh untuk semua paket.
Nama paket
Nama file
Nama sumber daya untuk diunggah
python-dateutil
python-dateutil.zip
pytz
pytz.zip
six
six.tar.gz
pandas
pandas.zip
scipy
scipy.zip
scikit-learn
sklearn.zip
Tentukan pustaka Python pihak ketiga
Tentukan secara global pustaka yang akan digunakan.
>>> from odps import options >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']Dalam metode langsung, tentukan secara lokal pustaka yang akan digunakan.
>>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])