Topik ini menjelaskan cara menggunakan fungsi tabel yang ditentukan pengguna (UDTF) Python 3 untuk membaca sumber daya dari MaxCompute di klien MaxCompute.
Prasyarat
Anda telah menginstal klien MaxCompute.
Parameter dinamis UDTF
Untuk informasi lebih lanjut tentang format tanda tangan fungsi Python UDTF, lihat Tanda Tangan Fungsi dan Tipe Data.
- Anda dapat menggunakan asterisk (
*) dalam daftar parameter untuk menunjukkan bahwa parameter input dapat memiliki panjang dan tipe apa pun. Sebagai contoh,@annotate('double,*->string')menunjukkan daftar parameter di mana parameter pertama adalah tipe data DOUBLE dan diikuti oleh parameter dengan panjang dan tipe apa pun. Dalam hal ini, Anda harus menyusun kode untuk menghitung jumlah dan tipe parameter input serta mengelolanya berdasarkan fungsiprintfdalam bahasa pemrograman C.CatatanAsterisk (*)dalam nilai balikan menunjukkan arti yang berbeda. Asterisk (*)dapat digunakan dalam nilai balikan UDTF untuk menunjukkan bahwa sejumlah nilai dari tipe data STRING dapat dikembalikan. Jumlah nilai balikan didasarkan pada jumlah alias yang dikonfigurasi saat memanggil fungsi. Sebagai contoh, metode panggilan@annotate("bigint,string->double,*")adalahUDTF(x, y) as (a, b, c). Dalam contoh ini, aliasa,b, dancdikonfigurasi setelahas. Editor mengidentifikasi bahwaaadalah tipe DOUBLE danbdancadalah tipe STRING. Tipe data kolom pertama yang dikembalikan dalam anotasi diberikan. Tiga nilai balikan disediakan dalam contoh ini. Oleh karena itu, metodeforwardyang dipanggil oleh UDTF harusforwardarray dengan tiga elemen. Jika tidak, kesalahan akan dikembalikan.Catatan Namun, kesalahan tidak dikembalikan selama kompilasi. Oleh karena itu, pemanggil UDTF harus mengonfigurasi jumlah alias dalam SQL berdasarkan aturan yang didefinisikan dalam UDTF. Jumlah nilai balikan fungsi agregat tetap menjadi 1. Oleh karena itu, aturan ini tidak berpengaruh pada fungsi agregat yang ditentukan pengguna (UDAF).
Kode contoh
- Membaca sumber daya dari MaxCompute
from odps.udf import annotate from odps.udf import BaseUDTF from odps.distcache import get_cache_file from odps.distcache import get_cache_table @annotate('string -> string, bigint') class UDTFExample(BaseUDTF): """Membaca pageid dan adid_list dari file get_cache_file dan tabel get_cache_table untuk menghasilkan dict. """ def __init__(self): import json cache_file = get_cache_file('test_json.txt') self.my_dict = json.load(cache_file) cache_file.close() records = list(get_cache_table('table_resource1')) for record in records: self.my_dict[record[0]] = record[1] """Masukkan pageid dan hasilkan pageid dan semua nilai adid. """ def process(self, pageid): for adid in self.my_dict[pageid]: self.forward(pageid, adid) - Konfigurasikan parameter dinamis
from odps.udf import annotate from odps.udf import BaseUDTF import json @annotate('string,*->string,*') class JsonTuple(BaseUDTF): def process(self, *args): length = len(args) result = [None] * length try: obj = json.loads(args[0]) for i in range(1, length): result[i] = str(obj.get(args[i])) except Exception as err: result[0] = str(err) for i in range(1, length): result[i] = None self.forward(*result)Dalam contoh ini, jumlah nilai balikan didasarkan pada jumlah parameter input. Parameter output pertama menunjukkan file JSON, dan parameter output lainnya diurai berdasarkan kunci dari file JSON. Nilai balikan pertama adalah pesan kesalahan selama penguraian file JSON. Jika tidak ada kesalahan, konten yang dihasilkan selama penguraian file JSON diberikan berdasarkan urutan kunci input. Pernyataan contoh:-- Konfigurasikan jumlah alias output berdasarkan jumlah parameter input. SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b) FROM jsons; -- Bagian variabel-length dapat tidak memiliki kolom. SELECT my_json_tuple(json) as exceptions FROM jsons; -- Kesalahan terjadi saat pernyataan SQL berikut dieksekusi karena jumlah alias tidak sesuai dengan jumlah aktual. -- Kesalahan ini tidak dikembalikan selama kompilasi. SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b, c) FROM jsons;
Prosedur
- Simpan kode contoh sebagai file py_udtf_example.py dan tempatkan file tersebut di folder bin klien MaxCompute.
- Masuk ke klien MaxCompute untuk membuat tabel sumber daya bernama table_resource1 dan tabel internal bernama tmp1, lalu masukkan data ke dalam tabel. Siapkan file sumber daya test_json.txt dan tempatkan file tersebut di folder bin klien MaxCompute. Tabel tmp1 adalah tabel tempat Anda mengeksekusi pernyataan DML untuk menulis data dalam operasi selanjutnya.Untuk informasi lebih lanjut tentang cara masuk ke klien MaxCompute, lihat Instal dan Mulai Klien MaxCompute. Contoh kode berikut menunjukkan operasi yang dapat Anda lakukan dalam langkah ini:
- Buat tabel sumber daya bernama table_resource1 dan masukkan data ke dalam tabel.
create table if not exists table_resource1 (pageid string, adid_list array<int>); insert into table table_resource1 values("contact_page2",array(2,3,4)),("contact_page3",array(5,6,7));Catatan Bidang adid_list di table_resource1 adalah tipe ARRAY. Eksekusi pernyataanset odps.sql.python.version=cp37;pada tingkat sesi untuk mengaktifkan Python 3 agar dapat membaca data tipe ARRAY. - Buat tabel internal bernama tmp1 dan masukkan data ke dalam tabel.
create table if not exists tmp1 (pageid string); insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3"); - Lihat isi file sumber daya test_json.txt.
{"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}
- Buat tabel sumber daya bernama table_resource1 dan masukkan data ke dalam tabel.
Buat UDTF bernama my_udtf di klien MaxCompute.
Untuk informasi lebih lanjut tentang cara membuat UDTF, lihat Buat UDF. Pernyataan contoh:
create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';- Eksekusi pernyataan SQL di klien MaxCompute untuk memanggil UDTF yang telah dibuat.Pernyataan contoh:
- Contoh 1: Gunakan hanya UDTF untuk mengeksekusi pernyataan SQL.
Hasil berikut dikembalikan:select my_udtf(pageid) as (pageid, adid) from tmp1;+------------+------------+ | pageid | adid | +------------+------------+ | front_page | 1 | | front_page | 2 | | front_page | 3 | | contact_page1 | 3 | | contact_page1 | 4 | | contact_page1 | 5 | | contact_page3 | 5 | | contact_page3 | 6 | | contact_page3 | 7 | +------------+------------+ - Contoh 2: Tulis ulang pernyataan dalam Contoh 1 dan gunakan klausa LATERAL VIEW dan UDTF untuk mengeksekusi pernyataan SQL.
Hasil berikut dikembalikan:select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;+--------+------------+ | pageid | adid | +--------+------------+ | front_page | 1 | | front_page | 2 | | front_page | 3 | | contact_page1 | 3 | | contact_page1 | 4 | | contact_page1 | 5 | | contact_page3 | 5 | | contact_page3 | 6 | | contact_page3 | 7 | +--------+------------+ - Contoh 3: Gunakan fungsi agregat, klausa LATERAL VIEW, dan UDTF untuk mengeksekusi pernyataan SQL.
Hasil berikut dikembalikan:select adid, count(1) as cnt from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid group by adid;+------------+------------+ | adid | cnt | +------------+------------+ | 1 | 1 | | 2 | 1 | | 3 | 2 | | 4 | 1 | | 5 | 2 | | 6 | 1 | | 7 | 1 | +------------+------------+
- Contoh 1: Gunakan hanya UDTF untuk mengeksekusi pernyataan SQL.