すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:マルチスレッドのダウンロード

最終更新日:Jan 08, 2025

このトピックでは、TableTunnelインターフェイスを使用してマルチスレッドモードでデータをダウンロードする方法を、サンプルコードを用いて説明します。 マルチスレッドのダウンロードをサポートしているのは、TableTunnelインターフェイスのみです。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;

/**
 * Task that drains one {@link RecordReader}, prints every record to standard
 * output and reports how many records it read.
 *
 * <p>Each line of output is prefixed with {@code "Thread <id>\t"} followed by
 * the tab-separated column values of the record.
 */
class DownloadThread implements Callable<Long> {
    private final long id;
    private final RecordReader recordReader;
    private final TableSchema tableSchema;

    /**
     * @param id           identifier printed in front of every record this task reads
     * @param recordReader reader to drain; it is closed by {@link #call()}
     * @param tableSchema  schema used to decode the columns of each record
     */
    public DownloadThread(int id, RecordReader recordReader, TableSchema tableSchema) {
        this.id = id;
        this.recordReader = recordReader;
        this.tableSchema = tableSchema;
    }

    /**
     * Reads records until the reader returns {@code null}, printing each one.
     *
     * <p>An {@link IOException} from reading or closing is printed and the number
     * of records read so far is returned. The reader is closed even when reading
     * or printing a record fails.
     *
     * @return the number of records read by this task
     */
    @Override
    public Long call() {
        // Primitive counter: avoids boxing a new Long on every increment.
        long recordNum = 0L;
        try {
            try {
                Record record;
                while ((record = recordReader.read()) != null) {
                    recordNum++;
                    System.out.print("Thread " + id + "\t");
                    consumeRecord(record, tableSchema);
                }
            } finally {
                // Close on every path so a failed read() or an unsupported column
                // type does not leave the reader open.
                recordReader.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return recordNum;
    }

    /**
     * Prints one record as a tab-separated line; SQL NULL values are printed as
     * the text {@code null}.
     *
     * @param record the record to print
     * @param schema schema describing the columns of {@code record}
     * @throws RuntimeException if a column has a type other than BIGINT, BOOLEAN,
     *                          DATETIME, DOUBLE or STRING
     */
    private static void consumeRecord(Record record, TableSchema schema) {
        // This method is static, so the column count must come from the schema
        // parameter; there is no instance field to read here.
        int columnCount = schema.getColumns().size();
        for (int i = 0; i < columnCount; i++) {
            Column column = schema.getColumn(i);
            String colValue = null;
            switch (column.getType()) {
                case BIGINT: {
                    Long v = record.getBigint(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case BOOLEAN: {
                    Boolean v = record.getBoolean(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case DATETIME: {
                    Date v = record.getDatetime(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case DOUBLE: {
                    Double v = record.getDouble(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case STRING: {
                    colValue = record.getString(i);
                    break;
                }
                default:
                    throw new RuntimeException("Unknown column type: " + column.getType());
            }
            System.out.print(colValue == null ? "null" : colValue);
            if (i != columnCount - 1) {
                System.out.print("\t");
            }
        }
        System.out.println();
    }
}

/**
 * Sample that downloads one table partition through {@link TableTunnel} using a
 * fixed pool of worker threads.
 *
 * <p>The record range of the download session is split into {@code threadNum}
 * contiguous slices. Each slice is read and printed by one
 * {@link DownloadThread}, and the per-thread record counts are summed and
 * printed at the end.
 */
public class DownloadThreadSample {
    // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the Resource Access Management (RAM) console.
    // In this example, the AccessKey ID and AccessKey secret are configured as environment variables. You can also save your AccessKey pair in the configuration file based on your business requirements.
    // We recommend that you do not directly specify the AccessKey ID and AccessKey secret in code to prevent AccessKey pair leaks.
    private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    private static String odpsUrl = "http://service.odps.aliyun.com/api";
    // Configure the tunnelUrl parameter. By default, data is transmitted on the Internet. If you need to transmit data on an internal network, you must configure the tunnelUrl parameter based on your business needs. In this example, the Tunnel endpoint of the cloud product interconnection network in the China (Shanghai) region is used.
    private static String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun-inc.com";
    private static String project = "<your project>";
    private static String table = "<your table name>";
    private static String partition = "<your partition spec>";
    private static int threadNum = 10;

    /**
     * Creates a download session for the configured partition, reads it with
     * {@code threadNum} parallel tasks and prints the total number of records
     * read. Failures are printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);
        TableTunnel tunnel = new TableTunnel(odps);
        // Only override the Tunnel endpoint when one is configured, so that
        // tunnelUrl can be left empty.
        if (tunnelUrl != null && !tunnelUrl.isEmpty()) {
            tunnel.setEndpoint(tunnelUrl);
        }
        PartitionSpec partitionSpec = new PartitionSpec(partition);
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        try {
            DownloadSession downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);
            System.out.println("Session Status is : " + downloadSession.getStatus().toString());
            long count = downloadSession.getRecordCount();
            System.out.println("RecordCount is: " + count);
            TableSchema schema = downloadSession.getSchema();
            List<Callable<Long>> callers = new ArrayList<Callable<Long>>(threadNum);
            long step = count / threadNum;
            for (int i = 0; i < threadNum; i++) {
                long start = step * i;
                // The last slice also takes the remainder of count / threadNum.
                long length = (i == threadNum - 1) ? count - start : step;
                if (length <= 0) {
                    // Happens when count < threadNum (step == 0) or the table is
                    // empty; no reader is opened for a slice without records.
                    continue;
                }
                RecordReader recordReader = downloadSession.openRecordReader(start, length);
                callers.add(new DownloadThread(i, recordReader, schema));
            }
            long downloadNum = 0L;
            List<Future<Long>> recordNum = pool.invokeAll(callers);
            for (Future<Long> num : recordNum) {
                downloadNum += num.get();
            }
            System.out.println("Record Count is: " + downloadNum);
        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Always release the worker threads; they are non-daemon and would
            // otherwise keep the JVM running after a failure.
            pool.shutdown();
        }
    }
}
説明

tunnelUrlパラメーターを特定のTunnelエンドポイントに設定するか、パラメーターを空のままにすることができます。

  • tunnelUrlパラメーターを特定のTunnelエンドポイントに設定すると、指定されたエンドポイントが使用されます。

  • tunnelUrlパラメーターを空のままにすると、デフォルトでパブリックエンドポイントが使用されます。

  • この例では、中国 (上海) リージョンのクラウド製品相互接続ネットワークのTunnelエンドポイントが使用されます。 他のリージョンのTunnelエンドポイントの詳細については、「エンドポイント」をご参照ください。