本文為您介紹如何使用StreamUploadSession和StreamRecordPack介面實現多線程上傳。
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
class StreamUploadThread implements Callable<Boolean> {
private String project;
private String table;
private String partition;
private TableTunnel tunnel;
public StreamUploadThread(String project, String table, String partition, TableTunnel tunnel) {
this.project = project;
this.table = table;
this.partition = partition;
this.tunnel = tunnel;
}
@Override
public Boolean call() {
try {
PartitionSpec partitionSpec = new PartitionSpec(partition);
TableTunnel.StreamUploadSession uploadSession = tunnel.buildStreamUploadSession(project, table).setPartitionSpec(partitionSpec).build();
TableSchema schema = uploadSession.getSchema();
TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
Record record = uploadSession.newRecord();
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
switch (column.getType()) {
case BIGINT:
record.setBigint(i, 1L);
break;
case BOOLEAN:
record.setBoolean(i, true);
break;
case DATETIME:
record.setDatetime(i, new Date());
break;
case DOUBLE:
record.setDouble(i, 0.0);
break;
case STRING:
record.setString(i, "sample");
break;
default:
throw new RuntimeException("Unknown column type: "
+ column.getType());
}
}
for (int i = 0; i < 10; i++) {
pack.append(record);
}
int retry = 0;
while (retry < 3) {
try {
// flush成功表示資料寫入成功,寫入成功後資料立即可見。
// flush成功後pack對象可以複用,避免頻繁申請記憶體導致記憶體回收。
// flush失敗可以直接重試。
// flush失敗後pack對象不可重用,需要重新建立新的StreamRecordPack對象。
String traceId = pack.flush();
System.out.println("flush success:" + traceId);
break;
} catch (IOException e) {
retry++;
e.printStackTrace();
Thread.sleep(500);
}
}
System.out.println("upload success!");
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
return true;
}
}
public class StreamUploadThreadSample {
// 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運,請登入RAM控制台建立RAM使用者
// 此處以把AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。您也可以根據業務需要,儲存到設定檔裡
// 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險
private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// MaxCompute專案的Endpoint資訊,詳情請參見Endpoint。
private static String odpsEndpoint = "<endpoint>";
// MaxCompute專案的Tunnel Endpoint資訊,詳情請參見Endpoint。
private static String tunnelEndpoint = "<tunnel_endpoint>";
// MaxCompute專案的名稱。
private static String project = "<your_project>";
// MaxCompute專案中的表名稱。
private static String table = "<your_table_name>";
// MaxCompute專案中的表的分區資訊。
private static String partition = "<your_partition_spec>";
private static int threadNum = 10;
public static void main(String args[]) {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(odpsEndpoint);
odps.setDefaultProject(project);
try {
TableTunnel tunnel = new TableTunnel(odps);
// tunnel.setEndpoint(tunnelEndpoint);
ExecutorService pool = Executors.newFixedThreadPool(threadNum);
ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
for (int i = 0; i < threadNum; i++) {
callers.add(new StreamUploadThread(project, table, partition, tunnel));
}
pool.invokeAll(callers);
pool.shutdown();
System.out.println("upload success!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}