全部产品
Search
文档中心

MaxCompute:Gunakan UDF dan pustaka Python pihak ketiga

更新时间:Jun 19, 2025

Topik ini menjelaskan cara menggunakan fungsi yang ditentukan pengguna (UDF) dan pustaka Python pihak ketiga.

Gunakan UDF

DataFrame memungkinkan Anda menggunakan metode map pada objek Sequence untuk memanggil UDF di semua elemennya.

>>> iris.sepallength.map(lambda x: x + 1).head(5)
   sepallength
0          6.1
1          5.9
2          5.7
3          5.6
4          6.0
null

Saat menggunakan UDF, tipe List atau Dict tidak dapat digunakan sebagai input atau output.

Jika tipe Sequence berubah setelah metode map dijalankan, Anda perlu secara eksplisit menentukan tipe baru dari Sequence.

>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
   sepallength
0         t5.1
1         t4.9
2         t4.7
3         t4.6
4         t5.0

Jika suatu fungsi mengandung closure, perubahan nilai variabel closure di luar fungsi menyebabkan nilai variabel dalam fungsi tersebut berubah.

>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(lambda x: x + i))

Akibatnya, setiap objek SequenceExpr dalam dfs adalah df.sepal_length + 9. Untuk menyelesaikan masalah ini, Anda dapat menggunakan fungsi sebagai nilai kembalian dari fungsi lain, atau gunakan partial. Lihat dua contoh berikut.

>>> dfs = []
>>> def get_mapper(i):
>>>     return lambda x: x + i
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(get_mapper(i)))
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))

Metode map juga mendukung UDF yang sudah ada. Anda dapat memasukkan parameter bertipe str, yang mewakili nama fungsi, atau objek Fungsi. Untuk informasi lebih lanjut, lihat Fungsi.

Saat mengimplementasikan metode map untuk fungsi Python, Anda harus menggunakan MaxCompute Python UDF. Jika proyek Anda tidak mendukung Python UDF, Anda tidak dapat menggunakan metode map. Selain itu, semua batasan Python UDF berlaku.

Pustaka pihak ketiga satu-satunya yang tersedia (berisi kode dalam bahasa C) adalah NumPy. Untuk informasi lebih lanjut tentang cara menggunakan pustaka Python pihak ketiga, lihat Gunakan pustaka Python pihak ketiga.

Selain UDF, DataFrame menyediakan banyak fungsi bawaan, beberapa di antaranya diimplementasikan dengan menggunakan fungsi map. Oleh karena itu, jika proyek Anda tidak mendukung Python UDF, Anda tidak dapat menggunakan fungsi-fungsi ini. Perhatikan bahwa layanan publik Alibaba Cloud tidak mendukung Python UDF.

null

Karena perbedaan dalam definisi byte code, jika Anda menggunakan fitur baru yang didukung oleh Python 3 (seperti yield from), kesalahan mungkin terjadi saat kode dieksekusi pada MaxCompute Worker dari Python 2.7. Oleh karena itu, kami sarankan Anda memastikan kode Anda berjalan normal sebelum menulis kode produksi dengan menggunakan API MapReduce di Python 3.

Contoh penggunaan counter:

from odps.udf import get_execution_context
def h(x):
    ctx = get_execution_context()
    counters = ctx.get_counters()
    counters.get_counter('df', 'add_one').increment(1)
    return x + 1
df.field.map(h)

Anda dapat menemukan nilai counter di JSONSummary dari LogView.

Gunakan UDF untuk satu baris

Untuk menggunakan UDF untuk satu baris, Anda dapat menggunakan metode apply. Parameter axis harus disetel ke 1 untuk menunjukkan bahwa operasi dilakukan pada baris. UDF dari metode apply mengambil satu parameter, yaitu satu baris data dari objek Collection sebelumnya. Anda dapat mengambil data di bidang tertentu dengan menggunakan atribut atau offset.

  • Jika parameter reduce disetel ke True, objek Sequence dikembalikan. Jika tidak, objek Collection dikembalikan. Anda dapat menggunakan parameter names dan types untuk menentukan nama bidang dan jenis dari objek Sequence atau Collection yang dikembalikan. Secara default, jika Anda tidak menentukan jenis, jenis STRING digunakan.

    >>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
       sepaladd
    0       8.6
    1       7.9
    2       7.9
  • Jika reduce adalah False dalam UDF dari metode apply, Anda dapat menggunakan kata kunci yield untuk mengembalikan beberapa hasil baris.

    >>> iris.count()
    150
    >>>
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>>
    >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
    300
  • Anda juga dapat memberi anotasi pada nama bidang dan jenis yang dikembalikan dalam fungsi. Anda tidak perlu menentukannya saat memanggil fungsi.

    >>> from odps.df import output
    >>>
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>>
    >>> iris.apply(handle, axis=1).count()
    300
  • Anda juga dapat menggunakan map_reduce dari map-only, yang setara dengan metode apply dengan axis=1.

    >>> iris.map_reduce(mapper=handle).count()
    300
  • Untuk menggunakan fungsi tabel yang ditentukan pengguna (UDTF) yang sudah ada di MaxCompute, tentukan nama UDTF.

    >>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
  • Saat menggunakan metode apply untuk baris dan reduce adalah False, Anda dapat menggunakan lateral view dengan baris yang ada untuk tujuan seperti agregasi.

    >>> from odps.df import output
    >>>
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>>
    >>> iris[iris.category, iris.apply(handle, axis=1)]

Gunakan agregasi kustom untuk semua kolom

Saat menggunakan metode apply, jika axis tidak ditentukan atau nilai dari axis adalah 0, Anda dapat memasukkan kelas agregasi kustom untuk menggabungkan semua objek Sequence.

class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
   sepallength_aggregation  sepalwidth_aggregation  petallength_aggregation  petalwidth_aggregation
0                 5.843333                   3.054                 3.758667                1.198667
null

Saat menggunakan UDF, Anda tidak dapat menggunakan tipe LIST atau DICT sebagai input atau output.

Sumber referensi

UDF juga dapat membaca sumber daya MaxCompute (seperti sumber daya tabel dan file) atau merujuk objek Collection sebagai sumber daya. Untuk melakukannya, Anda perlu menulis UDF sebagai closure atau kelas yang dapat dipanggil. Lihat dua contoh berikut.

>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
       sepallength
0      Iris-setosa
1  Iris-versicolor
>>> def myfunc(resources):  # resources diteruskan sesuai urutan pemanggilan
>>>     names = set()
>>>     fileobj = resources[0] # sumber daya file direpresentasikan oleh objek mirip file
>>>     for l in fileobj:
>>>         names.add(l)
>>>     collection = resources[1]
>>>     for r in collection:
>>>         names.add(r.name)  # ambil nilai dengan menggunakan nama bidang atau offset
>>>     def h(x):
>>>         if x in names:
>>>             return True
>>>         else:
>>>             return False
>>>     return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>>         df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
              name   isin
0      Iris-setosa   True
1  Iris-versicolor   True
2   Iris-virginica  False
null

Saat tabel partisi dibaca, bidang partisi tidak termasuk.

Jika axis adalah 1 dalam operasi baris, Anda perlu menulis penutup fungsi atau kelas yang dapat dipanggil. Untuk operasi agregasi pada kolom, Anda hanya perlu menggunakan fungsi __init__ untuk membaca sumber daya.

>>> words_df
                     sentence
0                 Hello World
1                Hello Python
2  Life is short I use Python
>>>
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>>
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>>     stop_words = set([r[0] for r in resources[0]])
>>>     def h(row):
>>>         return ' '.join(w for w in row[0].split() if w not in stop_words),
>>>     return h
>>>
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
                sentence
0            Hello World
1           Hello Python
2  Life short use Python
null

Dalam contoh ini, stop_words adalah variabel lokal, yang dirujuk sebagai sumber daya di MaxCompute selama eksekusi.

Gunakan pustaka Python pihak ketiga

Anda dapat mengunggah paket Python pihak ketiga (dalam format whl, egg, zip, dan tar.gz) ke MaxCompute. Saat menggunakan metode global atau metode langsung, Anda perlu menentukan file paket. Pastikan semua pustaka dependen ditentukan. Jika tidak, kesalahan mungkin terjadi saat Anda mengimpor file.

  • Anda dapat mengunggah sumber daya dengan memanggil antarmuka unggah sumber daya PyODPS create_resource.

    Ambil paket python-dateutil sebagai contoh.

    1. Anda dapat menjalankan perintah pip download untuk mengunduh paket dan dependensinya ke jalur tertentu. Dua paket diunduh: six-1.10.0-py2.py3-none-any.whl dan python_dateutil-2.5.3-py2.py3-none-any.whl. Perhatikan bahwa paket yang diunduh harus mendukung lingkungan Linux.

      $ pip download python-dateutil -d /to/path/
    2. Kemudian unggah kedua file tersebut ke MaxCompute sebagai sumber daya.

      >>> # Pastikan ekstensi nama file benar.
      >>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb'))
      >>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
    3. Sekarang Anda memiliki objek DataFrame yang hanya berisi bidang STRING.

      >>> df
                     datestr
      0  2016-08-26 14:03:29
      1  2015-08-26 14:03:29
    4. Gunakan pustaka pihak ketiga berikut dalam konfigurasi global.

      >>> from odps import options
      >>>
      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>>
      >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
      >>> df.datestr.map(get_year)
         datestr
      0     2016
      1     2015

      Atau, gunakan parameter libraries dari metode untuk menentukan pustaka pihak ketiga.

      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>>
      >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
         datestr
      0     2016
      1     2015

    Secara default, PyODPS mendukung pustaka pihak ketiga yang berisi kode Python murni tetapi tanpa operasi file. Di versi MaxCompute yang lebih baru, PyODPS juga mendukung pustaka Python yang berisi kode biner atau operasi file. Pustaka ini harus memiliki akhiran cp27-cp27m-manylinux1_x86_64 dan diunggah sebagai arsip. File .whl harus diubah namanya menjadi .zip. Anda juga perlu menyetel odps.isolation.session.enable ke True atau mengaktifkan isolation di proyek Anda. Contoh berikut menunjukkan cara mengunggah dan menggunakan fungsi khusus dalam scipy.

    >>> # paket yang berisi kode biner harus diunggah menggunakan metode arsip, dan ekstensi file .whl harus diganti dengan .zip.
    >>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))
    >>>
    >>> # jika isolation diaktifkan untuk proyek Anda, opsi berikut bersifat opsional.
    >>> options.sql.settings = { 'odps.isolation.session.enable': True }
    >>>
    >>> def psi(value):
    >>>     # kami sarankan Anda mengimpor pustaka pihak ketiga di dalam fungsi untuk menghindari kesalahan yang disebabkan oleh struktur biner yang berbeda antara sistem operasi yang berbeda.
    >>>     from scipy.special import psi
    >>>     return float(psi(value))
    >>>
    >>> df.float_col.map(psi).execute(libraries=['scipy.zip'])

    Paket biner yang hanya berisi kode sumber dapat dikemas ke dalam file Wheel dan diunggah di Linux Shell. File Wheel yang dihasilkan di Mac dan Windows tidak dapat digunakan di MaxCompute.

    python setup.py bdist_wheel
  • Anda juga dapat menggunakan Konsol MaxCompute untuk mengunggah sumber daya.

    1. Sebagian besar paket Python disertai dengan paket .whl, termasuk paket yang berisi file biner di berbagai platform. Oleh karena itu, Anda perlu terlebih dahulu menemukan paket yang dapat berjalan di MaxCompute.

    2. Selain itu, semua dependensi harus disertakan. Tabel berikut mencantumkan dependensi paket.

      Nama paket

      Dependensi

      pandas

      numpy, python-dateutil, pytz, six

      scipy

      numpy

      scikit-learn

      numpy, scipy

      null

      Paket numpy sudah disediakan. Anda hanya perlu mengunggah paket python-dateutil, pytz, pandas, SciPy, sklearn, dan six untuk menjalankan paket pandas, scipy, dan scikit-learn.

    3. Anda dapat menemukan python-dateutil-2.6.0.zip di python-dateutil dan mengunduhnya.

    4. Ganti nama file yang diunduh menjadi python-dateutil.zip, dan unggah file tersebut sebagai sumber daya di konsol MaxCompute.

      add archive python-dateutil.zip;
      null

      Cari, unduh, dan unggah file pytz-2017.2.zip dan six-1.11.0.tar.gz dengan cara yang sama seperti Anda lakukan untuk file python-dateutil.

    5. Untuk memastikan bahwa paket yang berisi kode dalam bahasa C, seperti Pandas, berjalan normal di MaxCompute, Anda perlu menemukan paket .whl yang namanya mengandung cp27-cp27m-manylinux1_x86_64. Oleh karena itu, Anda perlu menemukan dan mengunduh pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl, ubah ekstensinya menjadi .zip, dan jalankan add archive pandas.zip; di Konsol MaxCompute untuk mengunggahnya. Unggah paket scipy dan scikit-learn ke MaxCompute dengan cara yang sama seperti prosedur sebelumnya.

    Tabel berikut mencantumkan sumber daya yang harus diunduh untuk semua paket.

    Nama paket

    Nama file

    Nama sumber daya untuk diunggah

    python-dateutil

    python-dateutil-2.6.0.zip

    python-dateutil.zip

    pytz

    pytz-2017.2.zip

    pytz.zip

    six

    six-1.11.0.tar.gz

    six.tar.gz

    pandas

    pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.zip

    pandas.zip

    scipy

    scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.zip

    scipy.zip

    scikit-learn

    scikit_learn-0.18.1-cp27-cp27m-manylinux1_x86_64.zip

    sklearn.zip

Tentukan pustaka Python pihak ketiga

  • Tentukan secara global pustaka yang akan digunakan.

    >>> from odps import options
    >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
  • Dalam metode langsung, tentukan secara lokal pustaka yang akan digunakan.

    >>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])