All Products
Search
Document Center

MaxCompute:Example of multithreading uploads

Last Updated:Aug 08, 2024

This topic describes how to call the StreamUploadSession and StreamRecordPack operations to implement multithreading uploads.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;

class StreamUploadThread implements Callable<Boolean> {
  private String project;
  private String table;
  private String partition;
  private TableTunnel tunnel;
  public StreamUploadThread(String project, String table, String partition, TableTunnel tunnel) {
    this.project = project;
    this.table = table;
    this.partition = partition;
    this.tunnel = tunnel;
  }
  @Override
  public Boolean call() {
    try {
      PartitionSpec partitionSpec = new PartitionSpec(partition);
      TableTunnel.StreamUploadSession uploadSession = tunnel.buildStreamUploadSession(project, table).setPartitionSpec(partitionSpec).build();
      TableSchema schema = uploadSession.getSchema();
      TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
      Record record = uploadSession.newRecord();
      for (int i = 0; i < schema.getColumns().size(); i++) {
        Column column = schema.getColumn(i);
        switch (column.getType()) {
          case BIGINT:
            record.setBigint(i, 1L);
            break;
          case BOOLEAN:
            record.setBoolean(i, true);
            break;
          case DATETIME:
            record.setDatetime(i, new Date());
            break;
          case DOUBLE:
            record.setDouble(i, 0.0);
            break;
          case STRING:
            record.setString(i, "sample");
            break;
          default:
            throw new RuntimeException("Unknown column type: "
                    + column.getType());
        }
      }
      for (int i = 0; i < 10; i++) {
        pack.append(record);
      }

      int retry = 0;
      while (retry < 3) {
        try {
          // A flush succeeds, the data write operation is complete, and the data can be viewed. 
          // A flush succeeds, and the StreamRecordPack object can be reused to avoid memory reclamation caused by frequent memory requests. 
          // A flush fails, and you can retry the operation. 
          // A flush fails, and the StreamRecordPack object cannot be reused. You must create another StreamRecordPack object. 
          String traceId = pack.flush();
          System.out.println("flush success:" + traceId);
          break;
        } catch (IOException e) {
          retry++;
          e.printStackTrace();
          Thread.sleep(500);
        }
      }

      System.out.println("upload success!");
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (IOException | InterruptedException e) {
      e.printStackTrace();
    }
    return true;
  }
}

public class StreamUploadThreadSample {
  // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the Resource Access Management (RAM) console.
  // In this example, the AccessKey ID and AccessKey secret are configured as environment variables. You can also save your AccessKey pair in the configuration file based on your business requirements.
  // We recommend that you do not directly specify the AccessKey ID and AccessKey secret in code to prevent AccessKey pair leaks.
  private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
  private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
   
  private static String odpsEndpoint = "<endpoint>";
   
  private static String tunnelEndpoint = "<tunnel_endpoint>";
  // The name of the MaxCompute project. 
  private static String project = "<your_project>";
  // The name of the table in the MaxCompute project. 
  private static String table = "<your_table_name>";
  // The partition information of the table in the MaxCompute project. 
  private static String partition = "<your_partition_spec>";
  private static int threadNum = 10;
  public static void main(String args[]) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsEndpoint);
    odps.setDefaultProject(project);
    try {
      TableTunnel tunnel = new TableTunnel(odps);
      // tunnel.setEndpoint(tunnelEndpoint);
      ExecutorService pool = Executors.newFixedThreadPool(threadNum);
      ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
      for (int i = 0; i < threadNum; i++) {
        callers.add(new StreamUploadThread(project, table, partition, tunnel));
      }
      pool.invokeAll(callers);
      pool.shutdown();
      System.out.println("upload success!");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}