MaxCompute mendukung mesin pihak ketiga seperti Spark on EMR, StarRocks, Presto, PAI, dan Hologres, memungkinkan akses langsung ke data MaxCompute melalui Storage API menggunakan Java SDK. Topik ini menyediakan contoh kode untuk mengakses MaxCompute dengan Java SDK.
Ikhtisar
Antarmuka utama untuk akses Java SDK ke MaxCompute tercantum di bawah ini.
Antarmuka utama | Deskripsi |
Digunakan untuk membuat sesi pembacaan tabel MaxCompute. | |
Mewakili sesi untuk membaca data dari tabel MaxCompute. | |
Digunakan untuk membaca segmen data yang terkandung dalam sesi data. |
Untuk pengguna Maven, cari odps-sdk-table-api di repositori Maven untuk mendapatkan versi berbeda dari Java SDK. Detail konfigurasi adalah sebagai berikut.
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-table-api</artifactId>
<version>0.48.8-public</version>
</dependency>MaxCompute menawarkan API penyimpanan. Untuk informasi lebih lanjut, lihat odps-sdk-table-api.
TableReadSessionBuilder
Antarmuka ini dirancang untuk membuat sesi pembacaan untuk tabel MaxCompute. Definisi antarmuka utama adalah sebagai berikut. Untuk detail lebih lanjut, lihat Java-sdk-doc.
Definisi Antarmuka
public class TableReadSessionBuilder {
public TableReadSessionBuilder table(Table table);
public TableReadSessionBuilder identifier(TableIdentifier identifier);
public TableReadSessionBuilder requiredDataColumns(List<String> requiredDataColumns);
public TableReadSessionBuilder requiredPartitionColumns(List<String> requiredPartitionColumns);
public TableReadSessionBuilder requiredPartitions(List<PartitionSpec> requiredPartitions);
public TableReadSessionBuilder requiredBucketIds(List<Integer> requiredBucketIds);
public TableReadSessionBuilder withSplitOptions(SplitOptions splitOptions);
public TableReadSessionBuilder withArrowOptions(ArrowOptions arrowOptions);
public TableReadSessionBuilder withFilterPredicate(Predicate filterPredicate);
public TableReadSessionBuilder withSettings(EnvironmentSettings settings);
public TableReadSessionBuilder withSessionId(String sessionId);
public TableBatchReadSession buildBatchReadSession();
}Catatan Penggunaan
Nama Metode | Deskripsi |
| Mendefinisikan parameter Table yang dilewatkan sebagai tabel target dalam sesi saat ini. |
| Mendefinisikan parameter TableIdentifier yang dilewatkan sebagai tabel target dalam sesi saat ini. |
| Membaca data dari bidang yang ditentukan dan memastikan bahwa urutan bidang dalam data yang dikembalikan konsisten dengan urutan yang ditentukan oleh parameter Catatan Jika parameter |
| Membaca data dari kolom tertentu dalam partisi tertentu dari tabel yang diberikan. Ini berlaku dalam skenario di mana pemangkasan partisi dilakukan. Catatan Jika parameter |
| Membaca data dari partisi tertentu dari tabel tertentu. Ini berlaku untuk skenario pemotongan partisi. Catatan Jika parameter |
| Membaca data dari Bucket tertentu. Ini hanya efektif untuk tabel terkluster dan berlaku untuk skenario pemotongan Bucket. Catatan Jika parameter |
| Membagi data tabel. Objek SplitOptions didefinisikan sebagai berikut:
Contoh penggunaan |
| Menentukan opsi data Arrow. Objek
Contoh penggunaan |
| Menentukan opsi Predicate Pushdown. Predicate didefinisikan sebagai berikut: Contoh penggunaan |
| Menentukan informasi lingkungan runtime. Antarmuka EnvironmentSettings didefinisikan sebagai berikut:
|
| Menentukan informasi SessionID untuk memuat ulang sesi yang ada. |
| Membuat atau mendapatkan sesi pembacaan tabel. Jika parameter input SessionID diberikan, Session yang dibuat akan dikembalikan berdasarkan SessionID. Jika tidak ada parameter input yang diberikan, sesi pembacaan tabel baru akan dibuat. Catatan Operasi pembuatan memiliki overhead yang besar. Ketika ada banyak file, akan memakan waktu lama untuk menyelesaikannya. |
TableBatchReadSession
Antarmuka TableBatchReadSession mewakili sesi untuk membaca dari tabel MaxCompute. Definisi antarmuka utama adalah sebagai berikut.
Definisi Antarmuka
public interface TableBatchReadSession {
String getId();
TableIdentifier getTableIdentifier();
SessionStatus getStatus();
DataSchema readSchema();
InputSplitAssigner getInputSplitAssigner() throws IOException;
SplitReader<ArrayRecord> createRecordReader(InputSplit split, ReaderOptions options) throws IOException;
SplitReader<VectorSchemaRoot> createArrowReader(InputSplit split, ReaderOptions options) throws IOException;
}Catatan Penggunaan
Nama Metode | Deskripsi |
| Mendapatkan ID sesi saat ini. Waktu habis bacaan ID sesi default adalah 24 jam (h). |
| Mendapatkan nama tabel dalam sesi saat ini. |
| Mendapatkan status sesi saat ini.Nilai status adalah sebagai berikut:
|
| Mendapatkan informasi struktur tabel dari sesi saat ini. DataSchema didefinisikan sebagai berikut:
|
| Mendapatkan InputSplitAssigner dari sesi saat ini. Antarmuka InputSplitAssigner mendefinisikan metode untuk menetapkan instance InputSplit dalam sesi pembacaan saat ini. Setiap InputSplit mewakili segmen data yang dapat diproses oleh satu SplitReader. InputSplitAssigner didefinisikan sebagai berikut:
|
| Membangun objek
|
| Membangun objek |
SplitReader
Antarmuka SplitReader digunakan untuk membaca data dari tabel MaxCompute.
Definisi Antarmuka
public interface SplitReader<T> {
boolean hasNext() throws IOException;
T get();
Metrics currentMetricsValues();
void close() throws IOException;
}Catatan Penggunaan
Nama Metode | Deskripsi |
| Memeriksa apakah ada item data lain untuk dibaca. Jika ada item data berikutnya untuk dibaca, maka mengembalikan true. Jika tidak, mengembalikan false. |
| Mendapatkan item data saat ini. Sebelum memanggil metode ini, Anda harus memastikan bahwa ada elemen berikutnya dengan menggunakan metode |
| Mendapatkan metrik terkait SplitReader. |
| Menutup koneksi setelah pembacaan selesai. |
Contoh Penggunaan
Siapkan lingkungan untuk terhubung ke layanan MaxCompute.
// AccessKey ID dan AccessKey Secret dari akun Alibaba Cloud atau RAM user // Pasangan AccessKey dari akun Alibaba Cloud memiliki izin pada semua operasi API. Menggunakan kredensial ini untuk melakukan operasi merupakan operasi berisiko tinggi. Kami merekomendasikan Anda menggunakan RAM user untuk memanggil operasi API atau melakukan pemeliharaan rutin. Untuk membuat RAM user, masuk ke konsol RAM // Dalam contoh ini, AccessKey ID dan AccessKey secret disimpan dalam variabel lingkungan. Anda juga dapat menyimpan pasangan AccessKey dalam file konfigurasi sesuai dengan kebutuhan bisnis Anda // Kami merekomendasikan Anda untuk tidak secara langsung menentukan AccessKey ID dan AccessKey secret dalam kode untuk mencegah kebocoran pasangan AccessKey private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); //Nama Quota yang digunakan untuk mengakses MaxCompute String quotaName = "<quotaName>"; //Nama proyek MaxCompute String project = "<project>"; //Buat objek Odps untuk terhubung ke layanan MaxCompute Account account = new AliyunAccount(accessId, accessKey); Odps odps = new Odps(account); odps.setDefaultProject(project); //Alamat koneksi layanan MaxCompute. Hanya jaringan VPC Alibaba Cloud yang didukung odps.setEndpoint(endpoint); Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount()).withAppAccount(odps.getAppAccount()).build(); EnvironmentSettings settings = EnvironmentSettings.newBuilder().withCredentials(credentials).withServiceEndpoint(odps.getEndpoint()).withQuotaName(quotaName).build();CatatanUntuk mendapatkan nama kuota untuk grup sumber daya Data Transmission Service eksklusif (langganan) , ikuti langkah-langkah berikut:
Grup sumber daya Data Transmission Service eksklusif: Masuk ke konsol MaxCompute. Lalu, beralih wilayah di pojok kiri atas dan pilih Workspace > Quotas dari panel navigasi sebelah kiri untuk melihat daftar kuota yang tersedia. Untuk instruksi rinci, lihat Kelola kuota untuk sumber daya komputasi di konsol MaxCompute.
API Penyimpanan: Masuk ke konsol MaxCompute, pilih Tenants > Tenant Property di panel navigasi sebelah kiri, dan aktifkan Storage API Switch.
Untuk otorisasi kuota tingkat pekerjaan, secara default, semua akun, termasuk akun Alibaba Cloud dan peran, tidak memiliki izin. Otorisasi diperlukan. Untuk detail tentang otorisasi, lihat Otorisasi.
Lakukan operasi pembacaan tabel.
Buat sesi pembacaan data untuk mengakses data MaxCompute.
//Nama tabel yang sesuai dengan proyek MaxCompute String tableName = "<table.name>"; //Buat sesi pembacaan data tabel TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder(); TableBatchReadSession scan = scanBuilder.identifier(TableIdentifier.of(project, tableName)).withSettings(settings) .withSplitOptions(SplitOptions.newBuilder() .SplitByByteSize(256 * 1024L * 1024L) .withCrossPartition(false).build()) .requiredDataColumns(Arrays.asList("timestamp")) .requiredPartitionColumns(Arrays.asList("pt1")) .buildBatchReadSession();CatatanDalam skenario dengan volume data besar, latensi jaringan, atau ketidakstabilan, pembuatan sesi pembacaan data mungkin memakan waktu lama, sehingga proses otomatis beralih ke mode asinkron untuk pembuatan sesi.
Telusuri data MaxCompute untuk setiap segmen, gunakan pembaca Arrow untuk membaca dan menampilkan isi data setiap segmen secara berurutan.
//Telusuri semua data segmen input dan gunakan pembaca Arrow untuk membaca batch data di setiap segmen satu per satu, dan akhirnya menampilkan isi setiap batch data InputSplitAssigner assigner = scan.getInputSplitAssigner(); for (InputSplit split : assigner.getAllSplits()) { SplitReader<VectorSchemaRoot> reader = scan.createArrowReader(split, ReaderOptions.newBuilder() .withSettings(settings) .withCompressionCodec(CompressionCodec.ZSTD) .withReuseBatch(true) .build()); int rowCount = 0; List<VectorSchemaRoot> batchList = new ArrayList<>(); while (reader.hasNext()) { VectorSchemaRoot data = reader.get(); rowCount += data.getRowCount(); System.out.println(data.contentToTSVString()); } reader.close(); }
Referensi
Untuk informasi lebih lanjut tentang pengenalan API Penyimpanan MaxCompute , lihat Ikhtisar API Penyimpanan.