MaxCompute supports third-party engines such as Spark on EMR, StarRocks, Presto, PAI, and Hologres, enabling direct access to MaxCompute data through the Storage API by using the Java SDK. This topic provides code examples for accessing MaxCompute with the Java SDK.
Overview
The primary interfaces for Java SDK access to MaxCompute are listed below.
| Main interface | Description |
| --- | --- |
| TableReadSessionBuilder | Creates a read session for a MaxCompute table. |
| TableBatchReadSession | Represents a session for reading data from a MaxCompute table. |
| SplitReader | Reads a data segment (split) contained in a read session. |
For Maven users, search for odps-sdk-table-api in the Maven repository to obtain different versions of the Java SDK. The configuration details are as follows.
```xml
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-table-api</artifactId>
    <version>0.48.8-public</version>
</dependency>
```

MaxCompute provides the Storage API through this SDK. For more information, see odps-sdk-table-api.
TableReadSessionBuilder
This class is used to create a read session for a MaxCompute table. The main method definitions are as follows. For more details, see Java-sdk-doc.
Interface definition
```java
public class TableReadSessionBuilder {
    public TableReadSessionBuilder table(Table table);
    public TableReadSessionBuilder identifier(TableIdentifier identifier);
    public TableReadSessionBuilder requiredDataColumns(List<String> requiredDataColumns);
    public TableReadSessionBuilder requiredPartitionColumns(List<String> requiredPartitionColumns);
    public TableReadSessionBuilder requiredPartitions(List<PartitionSpec> requiredPartitions);
    public TableReadSessionBuilder requiredBucketIds(List<Integer> requiredBucketIds);
    public TableReadSessionBuilder withSplitOptions(SplitOptions splitOptions);
    public TableReadSessionBuilder withArrowOptions(ArrowOptions arrowOptions);
    public TableReadSessionBuilder withFilterPredicate(Predicate filterPredicate);
    public TableReadSessionBuilder withSettings(EnvironmentSettings settings);
    public TableReadSessionBuilder withSessionId(String sessionId);
    public TableBatchReadSession buildBatchReadSession();
}
```

Usage notes
| Method name | Description |
| --- | --- |
| table | Sets the passed Table object as the target table of the current session. |
| identifier | Sets the passed TableIdentifier as the target table of the current session. |
| requiredDataColumns | Specifies the data columns to read. The column order in the returned data matches the order specified by the parameter. |
| requiredPartitionColumns | Specifies the partition columns to read from the table. The column order in the returned data matches the order specified by the parameter. |
| requiredPartitions | Specifies the partitions of the table to read. Applicable to partition-pruning scenarios. |
| requiredBucketIds | Specifies the buckets to read. Takes effect only for clustered tables. Applicable to bucket-pruning scenarios. |
| withSplitOptions | Specifies how the table data is split into input splits (SplitOptions object). |
| withArrowOptions | Specifies Arrow data options (ArrowOptions object). |
| withFilterPredicate | Specifies a filter predicate for predicate pushdown (Predicate object). |
| withSettings | Specifies runtime environment information (EnvironmentSettings object). |
| withSessionId | Specifies a session ID, which is used to reload an existing session. |
| buildBatchReadSession | Creates or obtains a table read session. If a session ID has been provided, the existing session is reloaded based on that ID; otherwise, a new table read session is created. Note: Creating a session has significant overhead and can take a long time when the table contains many files. |
TableBatchReadSession
The TableBatchReadSession interface represents a session for reading data from a MaxCompute table. The main interface definitions are as follows.
Interface definition
```java
public interface TableBatchReadSession {
    String getId();
    TableIdentifier getTableIdentifier();
    SessionStatus getStatus();
    DataSchema readSchema();
    InputSplitAssigner getInputSplitAssigner() throws IOException;
    SplitReader<ArrayRecord> createRecordReader(InputSplit split, ReaderOptions options) throws IOException;
    SplitReader<VectorSchemaRoot> createArrowReader(InputSplit split, ReaderOptions options) throws IOException;
}
```

Usage notes
| Method name | Description |
| --- | --- |
| getId | Returns the ID of the current session. A session ID is valid for 24 hours by default. |
| getTableIdentifier | Returns the identifier of the table in the current session. |
| getStatus | Returns the current session status. |
| readSchema | Returns the schema (DataSchema) of the table in the current session. |
| getInputSplitAssigner | Returns the InputSplitAssigner of the current session. The InputSplitAssigner assigns the InputSplit instances of the read session; each InputSplit represents a data segment that can be processed by a single SplitReader. |
| createRecordReader | Constructs a SplitReader&lt;ArrayRecord&gt; that reads the given InputSplit record by record. |
| createArrowReader | Constructs a SplitReader&lt;VectorSchemaRoot&gt; that reads the given InputSplit in Arrow batch format. |
SplitReader
The SplitReader interface is used for reading data from MaxCompute tables.
Interface definition
```java
public interface SplitReader<T> {
    boolean hasNext() throws IOException;
    T get();
    Metrics currentMetricsValues();
    void close() throws IOException;
}
```

Usage notes
| Method name | Description |
| --- | --- |
| hasNext | Checks whether there are more data items to read. Returns true if there is a next data item; otherwise, returns false. |
| get | Returns the current data item. Before calling this method, use hasNext to confirm that a next element exists. |
| currentMetricsValues | Returns the metrics of the SplitReader. |
| close | Closes the reader and releases the connection after reading is complete. |
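The hasNext()/get()/close() protocol in the table above can be illustrated with a small self-contained mock. Note that SimpleSplitReader, fromList, and SplitReaderDemo below are hypothetical stand-ins for illustration only, not part of the MaxCompute SDK; the real SplitReader also exposes currentMetricsValues().

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified version of the SplitReader contract for illustration.
interface SimpleSplitReader<T> extends AutoCloseable {
    boolean hasNext() throws IOException;
    T get();
    void close() throws IOException;
}

public class SplitReaderDemo {
    // Wraps an in-memory list so the read loop can be demonstrated without a service.
    static SimpleSplitReader<String> fromList(List<String> rows) {
        Iterator<String> it = rows.iterator();
        return new SimpleSplitReader<String>() {
            public boolean hasNext() { return it.hasNext(); }
            public String get() { return it.next(); }
            public void close() { /* release resources here */ }
        };
    }

    public static void main(String[] args) throws Exception {
        try (SimpleSplitReader<String> reader = fromList(List.of("a", "b", "c"))) {
            // Always check hasNext() before get(), as the contract requires.
            while (reader.hasNext()) {
                System.out.println(reader.get());
            }
        }
    }
}
```

The same loop shape (hasNext, get, close) is what the real SDK readers expect, as shown in the full usage example below.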
Usage example
Set up the environment to connect to the MaxCompute service.
```java
// The AccessKey ID and AccessKey secret of an Alibaba Cloud account or a RAM user.
// The AccessKey pair of an Alibaba Cloud account has permissions on all API operations.
// Using these credentials is high-risk; we recommend that you use a RAM user to call
// API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
// In this example, the AccessKey pair is read from environment variables. You can also
// store it in a configuration file. Do not hard-code the AccessKey ID and AccessKey
// secret in your code to prevent credential leaks.
private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Quota name used to access MaxCompute.
String quotaName = "<quotaName>";
// MaxCompute project name.
String project = "<project>";
// Create an Odps object to connect to the MaxCompute service.
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setDefaultProject(project);
// Endpoint of the MaxCompute service. Only Alibaba Cloud VPC networks are supported.
odps.setEndpoint(endpoint);
Credentials credentials = Credentials.newBuilder()
        .withAccount(odps.getAccount())
        .withAppAccount(odps.getAppAccount())
        .build();
EnvironmentSettings settings = EnvironmentSettings.newBuilder()
        .withCredentials(credentials)
        .withServiceEndpoint(odps.getEndpoint())
        .withQuotaName(quotaName)
        .build();
```

Note: To obtain the quota name of an exclusive Data Transmission Service resource group (subscription), perform the following steps:
Exclusive Data Transmission Service resource group: Log on to the MaxCompute console, switch the region in the upper-left corner, and choose Workspace > Quotas in the left-side navigation pane to view the list of available quotas. For detailed instructions, see Manage quotas for computing resources in the MaxCompute console.
Storage API: Log on to the MaxCompute console, choose Tenants > Tenant Property in the left-side navigation pane, and turn on Storage API Switch.
For job-level quotas, no account, including Alibaba Cloud accounts and RAM roles, has permissions by default; you must grant them explicitly. For details, see Authorization.
Perform a table read operation.
Create a data read session to access MaxCompute data.
```java
// Name of the table in the MaxCompute project.
String tableName = "<table.name>";
// Create a table data read session.
TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
TableBatchReadSession scan = scanBuilder
        .identifier(TableIdentifier.of(project, tableName))
        .withSettings(settings)
        .withSplitOptions(SplitOptions.newBuilder()
                .SplitByByteSize(256 * 1024L * 1024L)
                .withCrossPartition(false)
                .build())
        .requiredDataColumns(Arrays.asList("timestamp"))
        .requiredPartitionColumns(Arrays.asList("pt1"))
        .buildBatchReadSession();
```

Note: With large data volumes, network latency, or network instability, creating a data read session may take a long time. In that case, session creation automatically switches to an asynchronous process.
Traverse the MaxCompute data split by split, using the Arrow reader to sequentially read and print the data batches of each split.

```java
// Traverse all input splits and use the Arrow reader to read the data batches
// in each split one by one, printing the content of each batch.
InputSplitAssigner assigner = scan.getInputSplitAssigner();
for (InputSplit split : assigner.getAllSplits()) {
    SplitReader<VectorSchemaRoot> reader = scan.createArrowReader(split, ReaderOptions.newBuilder()
            .withSettings(settings)
            .withCompressionCodec(CompressionCodec.ZSTD)
            .withReuseBatch(true)
            .build());
    int rowCount = 0;
    while (reader.hasNext()) {
        VectorSchemaRoot data = reader.get();
        rowCount += data.getRowCount();
        System.out.println(data.contentToTSVString());
    }
    reader.close();
}
```
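For row-oriented processing, createRecordReader can be used instead of the Arrow reader. The following is a minimal sketch only, not a tested implementation: it assumes the scan session and settings built in the steps above, and the per-record accessors on ArrayRecord are illustrative.

```java
// Sketch: read each split record by record instead of in Arrow batches.
// Assumes `scan` and `settings` from the preceding steps.
InputSplitAssigner assigner = scan.getInputSplitAssigner();
for (InputSplit split : assigner.getAllSplits()) {
    SplitReader<ArrayRecord> reader = scan.createRecordReader(split, ReaderOptions.newBuilder()
            .withSettings(settings)
            .build());
    while (reader.hasNext()) {
        ArrayRecord record = reader.get();
        // Process one record at a time, e.g. access a column by index: record.get(0).
        System.out.println(record);
    }
    reader.close();
}
```

Record-by-record reading is simpler to integrate with row-based code, while the Arrow reader is generally preferable for columnar, batch-oriented processing.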
Reference
For more information about the MaxCompute Storage API, see Overview of the Storage API.