Upload data to MaxCompute in parallel by using the TableTunnel interface with Java's ExecutorService. Each thread owns an exclusive block and RecordWriter; blocks must not be shared across threads.
Prerequisites
Before you begin, make sure you have:
- A MaxCompute project with a partitioned table to write to
- The MaxCompute Java SDK added to your project dependencies
- An AccessKey ID and AccessKey secret stored as environment variables:
  - `ALIBABA_CLOUD_ACCESS_KEY_ID`
  - `ALIBABA_CLOUD_ACCESS_KEY_SECRET`
- (Recommended) A RAM user with the minimum required permissions, rather than an Alibaba Cloud root account. Root account credentials are high-risk because the AccessKey pair has permissions on all API operations. Create a RAM user in the Resource Access Management (RAM) console.
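Because the example reads both variables at startup, it can be useful to fail fast when either is missing. The sketch below is illustrative only; the `missingCredentials` helper is not part of the MaxCompute SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CredentialCheck {
    // Returns the names of required credential variables that are absent or
    // empty in the given environment map.
    public static List<String> missingCredentials(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String name : new String[] {
                "ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET"}) {
            String value = env.get(name);
            if (value == null || value.isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missingCredentials(System.getenv());
        if (!missing.isEmpty()) {
            System.err.println("Set these environment variables first: " + missing);
            System.exit(1);
        }
    }
}
```

Taking the environment as a `Map` parameter keeps the check testable without touching the real process environment.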
How it works
The upload flow involves four steps:
1. Create a single `UploadSession` for the target table partition.
2. For each thread, open a dedicated `RecordWriter` by calling `uploadSession.openRecordWriter(blockId)`. The block ID uniquely identifies that thread's data segment; no two threads share the same block.
3. Each thread fills records and writes them through its own `RecordWriter`, then closes the writer.
4. After all threads finish, commit the upload session with the complete list of block IDs.
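The concurrency pattern in these steps can be sketched with standard-library stand-ins before touching the real SDK: each worker owns an exclusive list that plays the role of its RecordWriter block. The class and method names here (`BlockUploadSketch`, `runUpload`) are illustrative, not MaxCompute APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockUploadSketch {
    // Mirrors the Tunnel flow: one exclusive "block" (here a plain List)
    // per worker, all workers run via invokeAll, then totals are combined
    // where the real code would call uploadSession.commit(blockIds).
    public static long runUpload(int threadNum, int recordsPerBlock) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        List<Callable<Long>> workers = new ArrayList<>();
        for (int blockId = 0; blockId < threadNum; blockId++) {
            List<String> block = new ArrayList<>(); // exclusive to this worker
            workers.add(() -> {
                for (int i = 0; i < recordsPerBlock; i++) {
                    block.add("record"); // stands in for recordWriter.write(record)
                }
                return (long) block.size();
            });
        }
        long total = 0;
        for (Future<Long> result : pool.invokeAll(workers)) { // run all, wait for completion
            total += result.get();
        }
        pool.shutdown();
        return total;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runUpload(10, 10)); // prints 100
    }
}
```

The key property carried over from the real flow is exclusivity: no two workers ever touch the same block, so no synchronization is needed around writes.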
Upload data with multiple threads
The following example creates 10 upload threads, each writing 10 records to its own block.
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TunnelException;

// Each thread owns one block (RecordWriter) and writes 10 records to it.
class UploadThread implements Callable<Boolean> {
    private long id;
    private RecordWriter recordWriter;
    private Record record;
    private TableSchema tableSchema;

    public UploadThread(long id, RecordWriter recordWriter, Record record,
            TableSchema tableSchema) {
        this.id = id;
        this.recordWriter = recordWriter;
        this.record = record;
        this.tableSchema = tableSchema;
    }

    @Override
    public Boolean call() {
        // Populate each column based on its data type
        for (int i = 0; i < tableSchema.getColumns().size(); i++) {
            Column column = tableSchema.getColumn(i);
            switch (column.getType()) {
                case BIGINT:
                    record.setBigint(i, 1L);
                    break;
                case BOOLEAN:
                    record.setBoolean(i, true);
                    break;
                case DATETIME:
                    record.setDatetime(i, new Date());
                    break;
                case DOUBLE:
                    record.setDouble(i, 0.0);
                    break;
                case STRING:
                    record.setString(i, "sample");
                    break;
                default:
                    throw new RuntimeException("Unknown column type: " + column.getType());
            }
        }
        boolean success = true;
        try {
            for (int i = 0; i < 10; i++) {
                recordWriter.write(record);
            }
        } catch (IOException e) {
            success = false;
            e.printStackTrace();
        } finally {
            // RecordWriter.close() also declares IOException, so it needs its own try/catch.
            try {
                recordWriter.close();
            } catch (IOException e) {
                success = false;
                e.printStackTrace();
            }
        }
        return success;
    }
}

public class UploadThreadSample {
    // Load credentials from environment variables; never hardcode them in source code.
    private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    private static String odpsUrl = "http://service.odps.aliyun.com/api";
    // Set tunnelUrl to a specific Tunnel endpoint, or leave it blank to use the public endpoint.
    // The example below uses the classic network Tunnel endpoint in the China (Shanghai) region.
    private static String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun-inc.com";
    private static String project = "<your project>";
    private static String table = "<your table name>";
    private static String partition = "<your partition spec>";
    private static int threadNum = 10;

    public static void main(String[] args) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);
        try {
            TableTunnel tunnel = new TableTunnel(odps);
            tunnel.setEndpoint(tunnelUrl);
            PartitionSpec partitionSpec = new PartitionSpec(partition);
            // Create a single upload session for the target partition
            UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec);
            System.out.println("Session Status is : " + uploadSession.getStatus().toString());
            // Assign one RecordWriter (block) per thread
            ExecutorService pool = Executors.newFixedThreadPool(threadNum);
            ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
            for (int i = 0; i < threadNum; i++) {
                RecordWriter recordWriter = uploadSession.openRecordWriter(i);
                Record record = uploadSession.newRecord();
                callers.add(new UploadThread(i, recordWriter, record, uploadSession.getSchema()));
            }
            // Run all threads and wait for them to finish
            pool.invokeAll(callers);
            pool.shutdown();
            // Commit all blocks
            Long[] blockList = new Long[threadNum];
            for (int i = 0; i < threadNum; i++) {
                blockList[i] = Long.valueOf(i);
            }
            uploadSession.commit(blockList);
            System.out.println("upload success!");
        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
```
Replace the following placeholders before running the code:
| Placeholder | Description | Example |
|---|---|---|
| `<your project>` | MaxCompute project name | `my_project` |
| `<your table name>` | Target table name | `my_table` |
| `<your partition spec>` | Partition specification | `ds=20240101` |
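With the example values above filled in, the configuration fields of `UploadThreadSample` would look like this (the values are hypothetical, taken from the table's Example column):

```java
private static String project = "my_project";
private static String table = "my_table";
private static String partition = "ds=20240101";
```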
Configure the Tunnel endpoint
Set `tunnelUrl` based on your network type:
| Scenario | tunnelUrl |
|---|---|
| Leave blank | Public endpoint used by default |
| Internal network (classic network, China (Shanghai)) | http://dt.cn-shanghai.maxcompute.aliyun-inc.com |
| Other regions or network types | See Endpoints |
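The "leave blank" row in the table can be made explicit in code: call `tunnel.setEndpoint(...)` only when an endpoint is actually configured. A minimal sketch; the `chooseTunnelEndpoint` helper is illustrative, not part of the SDK.

```java
public class TunnelEndpointChoice {
    // Returns the endpoint to pass to tunnel.setEndpoint(), or null to skip
    // the call entirely so the SDK falls back to the public endpoint.
    public static String chooseTunnelEndpoint(String tunnelUrl) {
        if (tunnelUrl == null || tunnelUrl.trim().isEmpty()) {
            return null;
        }
        return tunnelUrl.trim();
    }

    public static void main(String[] args) {
        // Blank -> public endpoint; otherwise the configured internal endpoint is used.
        System.out.println(chooseTunnelEndpoint(""));
        System.out.println(chooseTunnelEndpoint("http://dt.cn-shanghai.maxcompute.aliyun-inc.com"));
    }
}
```

In the main sample, this translates to wrapping `tunnel.setEndpoint(tunnelUrl)` in a null/blank check instead of calling it unconditionally.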
Verify the upload
After the program prints `upload success!`, verify the data in MaxCompute by running:

```sql
SELECT COUNT(*) FROM <your table name> WHERE <your partition spec>;
```
The count should equal the total number of records written across all threads. In this example: 10 threads x 10 records = 100 records.
Security
Store AccessKey credentials in environment variables, not in source code. For production workloads, use a RAM user with minimum required permissions rather than root account credentials. To create a RAM user, go to the Resource Access Management (RAM) console.
What's next
-
Endpoints — find the Tunnel endpoint for your region and network type