Topik ini menjelaskan cara menggunakan fungsi tabel bernilai pengguna Java (UDTF) untuk membaca sumber daya dari MaxCompute berdasarkan MaxCompute Studio.
Prasyarat
MaxCompute Studio telah diinstal dan terhubung ke proyek MaxCompute, serta modul Java MaxCompute telah dibuat. Untuk informasi lebih lanjut, lihat Instal MaxCompute Studio, Kelola koneksi proyek, dan Buat modul Java MaxCompute.
Alat pengembangan IDEA 2024 dan JDK versi 1.8 telah diinstal.
Contoh kode UDTF
Kode sampel berikut adalah Java UDTF.
Kategori Parameter | Tipe Parameter | Deskripsi |
Parameter Masukan | String | Parameter masukan pertama. |
String | Parameter masukan kedua. | |
Parameter Keluaran | String | Nilai parameter masukan pertama. |
Bigint | Panjang string parameter masukan kedua. | |
String | Nilai gabungan dari jumlah baris dari file_resource.txt, jumlah baris dari tabel table_resource1, dan jumlah baris dari tabel table_resource2. |
package com.aliyun.odps.examples.udf;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
/**
* project: example_project
* table: wc_in2
* partitions: p1=2,p2=1
* columns: cola,colc
*/
@Resolve("string,string->string,bigint,string")
public class UDTFResource extends UDTF {
ExecutionContext ctx;
long fileResourceLineCount;
long tableResource1RecordCount;
long tableResource2RecordCount;
@Override
public void setup(ExecutionContext ctx) throws UDFException {
this.ctx = ctx;
try {
InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line;
fileResourceLineCount = 0;
while ((line = br.readLine()) != null) {
fileResourceLineCount++;
}
br.close();
Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
tableResource1RecordCount = 0;
while (iterator.hasNext()) {
tableResource1RecordCount++;
iterator.next();
}
iterator = ctx.readResourceTable("table_resource2").iterator();
tableResource2RecordCount = 0;
while (iterator.hasNext()) {
tableResource2RecordCount++;
iterator.next();
}
} catch (IOException e) {
throw new UDFException(e);
}
}
@Override
public void process(Object[] args) throws UDFException {
String a = (String) args[0];
long b = args[1] == null ? 0 : ((String) args[1]).length();
forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="
+ tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
}
}Kode berikut menunjukkan dependensi yang diperlukan dalam file pom.xml untuk pengujian lokal.
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-udf-local</artifactId>
<version>0.48.0-public</version>
</dependency>Prosedur
Pengujian Lokal
Buat program Java baru tipe UDTF di MaxCompute Studio. Sebagai contoh, beri nama Class Java
UDTFResourcedan gunakan kode program dari contoh kode UDTF.Konfigurasikan parameter waktu proses berdasarkan sumber daya warehouse di Modul Java.
CatatanParameter masukan adalah nilai kolom pertama dan ketiga dari setiap baris di partisi p1=2, p2=1 dari tabel wc_in2 di sumber daya lokal.
Eksekusi kode mengambil data dari sumber daya lokal
file_resource.txt, data daritable_resource1yang sesuai dengan tabelwc_in1, dan data daritable_resource2yang sesuai dengan tabelwc_in2dengan parameterp1=2danp2=1.

Klik kanan pada kelas UDTFResource dan pilih Run untuk mengeksekusi program. Hasilnya akan ditampilkan.


Pengujian Klien
Klik
Project Explorer di pojok kiri atas IDEA, dan pilih
Add Resource.
Tambahkan file file_resource.txt berdasarkan informasi instans MaxCompute.

Buat tabel data sampel wc_in1 dan wc_in2 di proyek MaxCompute dan sisipkan data.
CREATE TABLE wc_in1 ( col1 STRING, col2 STRING, col3 STRING, col4 STRING ); INSERT INTO wc_in1 VALUES ('A1','A2','A3','A4'), ('A1','A2','A3','A4'), ('A1','A2','A3','A4'), ('A1','A2','A3','A4'); CREATE TABLE wc_in2 ( cola STRING, colb STRING, colc STRING ) PARTITIONED BY (p1 STRING, p2 STRING); ALTER TABLE wc_in2 ADD PARTITION (p1='2',p2='1'); INSERT INTO wc_in2 PARTITION (p1='2',p2='1') VALUES ('three1','three2','three3'), ('three1','three2','three3'), ('three1','three2','three3');Petakan tabel wc_in1 dan wc_in2 yang dibuat di MaxCompute ke table_resource1 dan table_resource2 di Resource.
Tambahkan sumber daya wc_in1.

Tambahkan sumber daya wc_in2.

Kemas UDTF yang dibuat ke dalam file JAR, unggah ke proyek MaxCompute, dan daftarkan fungsi tersebut. Sebagai contoh, nama fungsinya adalah
my_udtf. Klik kanan pada kelas UDTFResource dan pilih Deploy to Server... untuk masuk ke antarmuka pengemasan dan pengunggahan. Sertakan sumber daya yang diperlukan sepertifile_resource.txt,table_resource1, dantable_resource2di bagian Extra resources.
Klik
Project Explorer di pojok kiri atas IDEA, klik kanan proyek MaxCompute target, dan pilih Open Console untuk memulai klien MaxCompute dan menjalankan perintah SQL untuk memanggil UDTF yang baru dibuat. Hasilnya akan ditampilkan.

Contoh perintah SQL:
SELECT my_udtf("10","20") AS (a, b, fileResourceLineCount);