All Products
Search
Document Center

MaxCompute: Example of data upload in multithreading mode

Last Updated: Aug 04, 2023

This topic uses sample code to describe how to use the TableTunnel interface to upload data in multithreading mode.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TunnelException;
 class UploadThread implements Callable<Boolean> {
         private long id;
         private RecordWriter recordWriter;
         private Record record;
         private TableSchema tableSchema;
         public UploadThread(long id, RecordWriter recordWriter, Record record,
                         TableSchema tableSchema) {
                 this.id = id;
                 this.recordWriter = recordWriter;
                 this.record = record;
                 this.tableSchema = tableSchema;
         }
         @Override
         public Boolean call() {
                 for (int i = 0; i < tableSchema.getColumns().size(); i++) {
                         Column column = tableSchema.getColumn(i);
                         switch (column.getType()) {
                         case BIGINT:
                                 record.setBigint(i, 1L);
                                 break;
                         case BOOLEAN:
                                 record.setBoolean(i, true);
                                 break;
                         case DATETIME:
                                 record.setDatetime(i, new Date());
                                 break;
                         case DOUBLE:
                                 record.setDouble(i, 0.0);
                                 break;
                         case STRING:
                                 record.setString(i, "sample");
                                 break;
                         default:
                                 throw new RuntimeException("Unknown column type: "
                                                 + column.getType());
                         }
                 }
                boolean success = true;
                try {
	            for (int i = 0; i < 10; i++) {
                         recordWriter.write(record);
	            }
                } catch (IOException e) {
 	            success = false;
                    e.printStackTrace();
                } finally {
 	            recordWriter.close();
                }
                    return success;
               
 }
 /**
  * Sample entry point: opens one upload session, writes {@code threadNum}
  * blocks in parallel (one {@code UploadThread} per block) and commits the
  * session only if every block was written successfully.
  */
 public class UploadThreadSample {
     // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations.
     // Using these credentials is high-risk; we recommend that you use a RAM user to call API
     // operations or perform routine O&M. To create a RAM user, log on to the Resource Access
     // Management (RAM) console.
     // In this example, the AccessKey ID and AccessKey secret are read from environment
     // variables. You can also keep the AccessKey pair in a configuration file.
     // Do not hard-code the AccessKey ID and AccessKey secret, to prevent AccessKey pair leaks.
     private static final String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
     private static final String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
     private static final String odpsUrl = "http://service.odps.aliyun.com/api";
     // Tunnel endpoint. By default, data is transmitted over the Internet. To transmit data over
     // an internal network, set tunnelUrl accordingly. This example uses the Tunnel endpoint of
     // the classic network in the China (Shanghai) region.
     private static final String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun-inc.com";
     private static final String project = "<your project>";
     private static final String table = "<your table name>";
     private static final String partition = "<your partition spec>";
     // Number of worker threads; also the number of blocks that are written and committed.
     private static final int threadNum = 10;

     /**
      * Runs the multithreaded upload.
      *
      * @param args unused
      */
     public static void main(String args[]) {
         Account account = new AliyunAccount(accessId, accessKey);
         Odps odps = new Odps(account);
         odps.setEndpoint(odpsUrl);
         odps.setDefaultProject(project);

         ExecutorService pool = null;
         try {
             TableTunnel tunnel = new TableTunnel(odps);
             tunnel.setEndpoint(tunnelUrl);
             PartitionSpec partitionSpec = new PartitionSpec(partition);
             UploadSession uploadSession = tunnel.createUploadSession(project,
                     table, partitionSpec);
             System.out.println("Session Status is : "
                     + uploadSession.getStatus().toString());

             pool = Executors.newFixedThreadPool(threadNum);
             List<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
             for (int i = 0; i < threadNum; i++) {
                 // One writer and one record per task; the block ID is the loop index.
                 RecordWriter recordWriter = uploadSession.openRecordWriter(i);
                 Record record = uploadSession.newRecord();
                 callers.add(new UploadThread(i, recordWriter, record,
                         uploadSession.getSchema()));
             }

             // invokeAll blocks until every task has finished. Inspect each result
             // so that a failed block is never committed as if it had succeeded.
             List<Future<Boolean>> results = pool.invokeAll(callers);
             for (int i = 0; i < results.size(); i++) {
                 // get() rethrows a task's exception wrapped in ExecutionException.
                 if (!Boolean.TRUE.equals(results.get(i).get())) {
                     System.err.println("upload failed: block " + i
                             + " was not written, session not committed");
                     return;
                 }
             }

             Long[] blockList = new Long[threadNum];
             for (int i = 0; i < threadNum; i++) {
                 blockList[i] = Long.valueOf(i);
             }
             uploadSession.commit(blockList);
             System.out.println("upload success!");
         } catch (TunnelException e) {
             e.printStackTrace();
         } catch (IOException e) {
             e.printStackTrace();
         } catch (ExecutionException e) {
             e.printStackTrace();
         } catch (InterruptedException e) {
             // Restore the interrupt flag so the interruption is not lost.
             Thread.currentThread().interrupt();
             e.printStackTrace();
         } finally {
             // Always release the worker threads, including on failure paths.
             if (pool != null) {
                 pool.shutdown();
             }
         }
     }
 }
Note

You can set the tunnelUrl parameter to a specific Tunnel endpoint or leave the parameter empty.

  • If you set the tunnelUrl parameter to a specific Tunnel endpoint, the specified endpoint is used.

  • If you leave the tunnelUrl parameter empty, a public endpoint is used by default.

  • In this example, the Tunnel endpoint of the classic network in the China (Shanghai) region is used. For more information about endpoints and regions, see Endpoints.