All Products
Search
Document Center

MaxCompute:Unggah data dalam mode multi-threaded

Last Updated:Mar 27, 2026

Unggah data ke MaxCompute secara paralel menggunakan antarmuka TableTunnel bersama ExecutorService dari Java. Setiap thread memiliki satu blok eksklusif dan RecordWriter—blok tidak boleh dibagikan antar thread.

Prasyarat

Sebelum memulai, pastikan Anda telah menyiapkan:

  • Proyek MaxCompute dengan tabel partisi yang akan ditulis

  • MaxCompute Java SDK yang telah ditambahkan ke dependensi proyek Anda

  • ID AccessKey dan Rahasia AccessKey yang disimpan sebagai variabel lingkungan:

    • ALIBABA_CLOUD_ACCESS_KEY_ID

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET

  • (Disarankan) Pengguna RAM dengan izin minimum yang diperlukan, bukan akun root Alibaba Cloud. Kredensial akun root berisiko tinggi karena pasangan AccessKey memiliki izin atas semua operasi API. Buat pengguna RAM di Konsol Resource Access Management (RAM).

Cara kerja

Alur pengunggahan mencakup empat langkah:

  1. Buat satu UploadSession untuk partisi tabel target.

  2. Untuk setiap thread, buka RecordWriter khusus dengan memanggil uploadSession.openRecordWriter(blockId). ID blok secara unik mengidentifikasi segmen data thread tersebut—tidak ada dua thread yang berbagi blok yang sama.

  3. Setiap thread mengisi catatan dan menuliskannya melalui RecordWriter-nya sendiri, lalu menutup writer tersebut.

  4. Setelah semua thread selesai, komit sesi pengunggahan dengan daftar lengkap ID blok.

Unggah data dengan beberapa thread

Contoh berikut membuat 10 thread pengunggahan, masing-masing menulis 10 catatan ke bloknya sendiri.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;

// Setiap thread memiliki satu blok (RecordWriter) dan menulis 10 catatan ke dalamnya.
class UploadThread implements Callable<Boolean> {
    private long id;
    private RecordWriter recordWriter;
    private Record record;
    private TableSchema tableSchema;

    public UploadThread(long id, RecordWriter recordWriter, Record record,
                    TableSchema tableSchema) {
        this.id = id;
        this.recordWriter = recordWriter;
        this.record = record;
        this.tableSchema = tableSchema;
    }

    @Override
    public Boolean call() {
        // Isi setiap kolom berdasarkan tipe datanya
        for (int i = 0; i < tableSchema.getColumns().size(); i++) {
            Column column = tableSchema.getColumn(i);
            switch (column.getType()) {
            case BIGINT:
                record.setBigint(i, 1L);
                break;
            case BOOLEAN:
                record.setBoolean(i, true);
                break;
            case DATETIME:
                record.setDatetime(i, new Date());
                break;
            case DOUBLE:
                record.setDouble(i, 0.0);
                break;
            case STRING:
                record.setString(i, "sample");
                break;
            default:
                throw new RuntimeException("Unknown column type: "
                                + column.getType());
            }
        }

        boolean success = true;
        try {
            for (int i = 0; i < 10; i++) {
                recordWriter.write(record);
            }
        } catch (IOException e) {
            success = false;
            e.printStackTrace();
        } finally {
            recordWriter.close();
        }
        return success;
    }
}

public class UploadThreadSample {
    // Muat kredensial dari variabel lingkungan—jangan pernah menyematkannya langsung di kode sumber.
    private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    private static String odpsUrl = "<http://service.odps.aliyun.com/api>";

    // Atur tunnelUrl ke titik akhir Tunnel tertentu, atau biarkan kosong untuk menggunakan titik akhir publik.
    // Contoh di bawah ini menggunakan titik akhir Tunnel jaringan klasik di wilayah China (Shanghai).
    private static String tunnelUrl = "<http://dt.cn-shanghai.maxcompute.aliyun-inc.com>";

    private static String project = "<your project>";
    private static String table = "<your table name>";
    private static String partition = "<your partition spec>";
    private static int threadNum = 10;

    public static void main(String args[]) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);

        try {
            TableTunnel tunnel = new TableTunnel(odps);
            tunnel.setEndpoint(tunnelUrl);

            PartitionSpec partitionSpec = new PartitionSpec(partition);

            // Buat satu sesi pengunggahan untuk partisi target
            UploadSession uploadSession = tunnel.createUploadSession(project,
                            table, partitionSpec);
            System.out.println("Session Status is : " + uploadSession.getStatus().toString());

            // Tetapkan satu RecordWriter (blok) per thread
            ExecutorService pool = Executors.newFixedThreadPool(threadNum);
            ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
            for (int i = 0; i < threadNum; i++) {
                RecordWriter recordWriter = uploadSession.openRecordWriter(i);
                Record record = uploadSession.newRecord();
                callers.add(new UploadThread(i, recordWriter, record,
                                uploadSession.getSchema()));
            }

            // Jalankan semua thread dan tunggu hingga selesai
            pool.invokeAll(callers);
            pool.shutdown();

            // Komit semua blok
            Long[] blockList = new Long[threadNum];
            for (int i = 0; i < threadNum; i++)
                blockList[i] = Long.valueOf(i);
            uploadSession.commit(blockList);
            System.out.println("upload success!");

        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Ganti placeholder berikut sebelum menjalankan kode:

Placeholder Deskripsi Contoh
<your project> Nama proyek MaxCompute my_project
<your table name> Nama tabel target my_table
<your partition spec> Spesifikasi partisi ds=20240101

Konfigurasi titik akhir Tunnel

Atur tunnelUrl berdasarkan jenis jaringan Anda:

Skenario tunnelUrl
Biarkan kosong Menggunakan titik akhir publik secara default
Jaringan internal (jaringan klasik, China (Shanghai)) http://dt.cn-shanghai.maxcompute.aliyun-inc.com
Wilayah atau jenis jaringan lainnya Lihat Endpoints

Verifikasi pengunggahan

Setelah program mencetak upload success!, verifikasi data di MaxCompute dengan menjalankan:

SELECT COUNT(*) FROM <your table name> WHERE <your partition spec>;

Jumlah catatan harus sama dengan total catatan yang ditulis oleh seluruh thread. Pada contoh ini: 10 thread × 10 catatan = 100 catatan.

Keamanan

Simpan kredensial AccessKey dalam variabel lingkungan, bukan di dalam kode sumber. Untuk beban kerja produksi, gunakan pengguna RAM dengan izin minimum yang diperlukan, bukan kredensial akun root. Untuk membuat pengguna RAM, buka Konsol Resource Access Management (RAM).

Langkah berikutnya

  • Endpoints — temukan titik akhir Tunnel untuk wilayah dan jenis jaringan Anda