Gunakan user-defined table-valued function (UDTF) Python 3 untuk membaca resource MaxCompute—termasuk file resource dan tabel resource—pada klien MaxCompute.
Prasyarat
Sebelum memulai, pastikan Anda telah:
Menginstal klien MaxCompute. Untuk informasi lebih lanjut, lihat Install and configure the MaxCompute client.
Metode kelas handler
UDTF Python 3 diimplementasikan sebagai kelas yang memperluas BaseUDTF. Metode-metode dalam kelas tersebut adalah:
| Method | Required | Description |
|---|---|---|
__init__ | No | Menginisialisasi state. Gunakan metode ini untuk memuat file dan tabel resource sekali saat startup, sebelum baris apa pun diproses. |
process | Yes | Memproses setiap baris input. Panggil forward() di dalam metode ini untuk menghasilkan baris output. |
Decorator @annotate mendefinisikan signature fungsi. Panggil forward(*args) di dalam metode process untuk menghasilkan setiap baris output—satu pemanggilan forward menghasilkan satu baris output.
Parameter dinamis
Untuk referensi lengkap mengenai format signature fungsi dan tipe data, lihat Function signatures and data types.
Tanda bintang (*) dalam signature @annotate memiliki makna berbeda tergantung posisinya:
Dalam daftar parameter: * berarti input menerima sejumlah parameter tambahan dengan tipe apa pun. Misalnya, @annotate('double,*->string') mendeklarasikan parameter pertama bertipe DOUBLE diikuti oleh sejumlah parameter lainnya. Anda harus mengompilasi kode untuk menghitung jumlah dan tipe parameter input, serta mengelolanya seperti fungsi printf dalam bahasa pemrograman C. Metode process menerima parameter tersebut sebagai *args dan harus menanganinya secara eksplisit.
Dalam nilai kembali: * berarti sejumlah nilai STRING dikembalikan, dengan jumlah ditentukan oleh jumlah alias yang diberikan saat pemanggilan. Misalnya, @annotate("bigint,string->double,*") yang dipanggil sebagai UDTF(x, y) as (a, b, c) mengembalikan tiga nilai: a bertipe DOUBLE, serta b dan c bertipe STRING. Pemanggilan forward di dalam process harus menghasilkan array dengan panjang tepat sesuai jumlah alias—ketidaksesuaian akan menyebabkan error waktu proses, bukan error waktu kompilasi.
Aturan * ini dalam nilai kembali hanya berlaku untuk UDTF. User-defined aggregate functions (UDAF) selalu mengembalikan tepat satu nilai.
Kode contoh
Baca resource dari MaxCompute
UDTF berikut membaca pemetaan halaman ke iklan dari file resource JSON (test_json.txt) dan tabel resource (table_resource1), lalu menghasilkan satu baris per ID iklan untuk setiap ID halaman input.
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
"""Read pageid and adid_list from the file get_cache_file and the table get_cache_table to generate dict.
"""
def __init__(self):
import json
cache_file = get_cache_file('test_json.txt')
self.my_dict = json.load(cache_file)
cache_file.close()
records = list(get_cache_table('table_resource1'))
for record in records:
self.my_dict[record[0]] = record[1]
"""Enter pageid and generate pageid and all adid values.
"""
def process(self, pageid):
for adid in self.my_dict[pageid]:
self.forward(pageid, adid)get_cache_file dan get_cache_table (dari odps.distcache) memuat resource berdasarkan nama yang digunakan saat resource tersebut didaftarkan dengan add file atau add table. Resource dimuat sekali dalam __init__ dan digunakan ulang di semua pemanggilan process.
Gunakan parameter dinamis
UDTF berikut mengurai string JSON dan mengekstraksi nilai berdasarkan kunci. Jumlah nilai kembali sama dengan jumlah parameter input.
from odps.udf import annotate
from odps.udf import BaseUDTF
import json
@annotate('string,*->string,*')
class JsonTuple(BaseUDTF):
def process(self, *args):
length = len(args)
result = [None] * length
try:
obj = json.loads(args[0])
for i in range(1, length):
result[i] = str(obj.get(args[i]))
except Exception as err:
result[0] = str(err)
for i in range(1, length):
result[i] = None
self.forward(*result)Argumen pertama (args[0]) adalah string JSON. Argumen tambahan adalah kunci yang akan diekstraksi. Nilai kembali pertama berisi error penguraian (jika ada); nilai-nilai berikutnya berisi konten yang diekstraksi sesuai urutan kunci.
Panggil UDTF ini dengan jumlah alias yang sesuai:
-- Number of output aliases matches number of input parameters
SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b) FROM jsons;
-- The variable-length part can have no columns
SELECT my_json_tuple(json) as exceptions FROM jsons;
-- This causes a runtime error: alias count (4) does not match input parameter count (3)
SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b, c) FROM jsons;Daftarkan dan panggil UDTF
Prosedur berikut menggunakan UDTFExample untuk menunjukkan alur kerja end-to-end. Simpan kode sebagai py_udtf_example.py di folder bin klien MaxCompute sebelum memulai.
Langkah 1: Buat tabel resource dan siapkan data
Login ke klien MaxCompute. Untuk informasi lebih lanjut, lihat Start the MaxCompute client.
Buat dan isi tabel resource table_resource1:
create table if not exists table_resource1 (pageid string, adid_list array<int>);
insert into table table_resource1 values("contact_page2",array(2,3,4)),("contact_page3",array(5,6,7));Bidang adid_list bertipe ARRAY. Untuk memungkinkan Python 3 membaca data bertipe ARRAY, jalankan set odps.sql.python.version=cp37; pada tingkat sesi sebelum melakukan kueri.
Buat dan isi tabel internal tmp1:
create table if not exists tmp1 (pageid string);
insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");Letakkan test_json.txt di folder bin klien MaxCompute. File tersebut berisi:
{"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}Langkah 2: Daftarkan resource
Tambahkan file Python, file JSON, dan tabel sebagai resource MaxCompute. Untuk informasi lebih lanjut, lihat Add resources.
add py py_udtf_example.py;
add file test_json.txt;
add table table_resource1 as table_resource1;Langkah 3: Buat UDTF
Daftarkan UDTF, sertakan semua resource yang dibutuhkan. Untuk informasi lebih lanjut, lihat Create a UDF.
create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';Langkah 4: Panggil UDTF
Terdapat tiga pola pemanggilan yang didukung. Semuanya menghasilkan output yang sama untuk contoh ini.
Pemanggilan langsung:
select my_udtf(pageid) as (pageid, adid) from tmp1;Dengan LATERAL VIEW:
select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;Dengan LATERAL VIEW dan fungsi agregat:
select adid, count(1) as cnt
from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid
group by adid;Kueri pemanggilan langsung dan LATERAL VIEW mengembalikan 9 baris yang sama:
+------------+------------+
| pageid | adid |
+------------+------------+
| front_page | 1 |
| front_page | 2 |
| front_page | 3 |
| contact_page1 | 3 |
| contact_page1 | 4 |
| contact_page1 | 5 |
| contact_page3 | 5 |
| contact_page3 | 6 |
| contact_page3 | 7 |
+------------+------------+Kueri agregat mengembalikan:
+------------+------------+
| adid | cnt |
+------------+------------+
| 1 | 1 |
| 2 | 1 |
| 3 | 2 |
| 4 | 1 |
| 5 | 2 |
| 6 | 1 |
| 7 | 1 |
+------------+------------+