The MaxCompute Storage API lets third-party engines — Spark on EMR, StarRocks, Presto, PAI, and Hologres — read MaxCompute table data directly. This topic shows you how to read MaxCompute table data using the Java SDK.
The SDK exposes three primary interfaces:
| Interface | Purpose |
|---|---|
| TableReadSessionBuilder | Builds a read session for a MaxCompute table |
| TableBatchReadSession | Represents an active read session; provides splits and readers |
| SplitReader | Reads a single split from a session |
Prerequisites
Before you begin, ensure that you have:
- A MaxCompute project with at least one table
- An Alibaba Cloud AccessKey ID and AccessKey Secret with permissions to access the project
- (Optional) A quota name for an exclusive Data Transmission Service (DTS) resource group
Only Alibaba Cloud VPC network connections are supported.
Add the Maven dependency
Search for odps-sdk-table-api in the Maven repository to find available versions. Add the following dependency to your pom.xml:
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-table-api</artifactId>
    <version>0.48.8-public</version>
</dependency>
For the SDK source code, see odps-sdk-table-api on GitHub.
Read table data
The read flow follows three steps: configure credentials and environment settings, create a read session, then iterate over splits to read data.
Step 1: Configure credentials and environment settings
Store your AccessKey credentials in environment variables rather than hardcoding them in your source code.
// Read credentials from environment variables
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

// Replace <project> with your MaxCompute project name
// Replace <endpoint> with the VPC endpoint for your region
// Replace <quotaName> with your quota name
String project = "<project>";
String endpoint = "<endpoint>";
String quotaName = "<quotaName>";

Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setDefaultProject(project);
odps.setEndpoint(endpoint);

Credentials credentials = Credentials.newBuilder()
        .withAccount(odps.getAccount())
        .withAppAccount(odps.getAppAccount())
        .build();

EnvironmentSettings settings = EnvironmentSettings.newBuilder()
        .withCredentials(credentials)
        .withServiceEndpoint(odps.getEndpoint())
        .withQuotaName(quotaName)
        .build();
To find the project name, log in to the MaxCompute console, switch to the target region, and go to Workspace > Projects.
To find the quota name:
- Exclusive DTS resource group: Go to Workspace > Quotas in the MaxCompute console. For details, see Manage quotas for computing resources in the MaxCompute console.
- Storage API: Go to Tenants > Tenant Property and enable Storage API Switch.
Job-level quota authorization is not granted by default to any account, including Alibaba Cloud accounts and roles. Authorize access before using the API. For details, see Authorization.
For endpoint values by region, see Endpoints.
Step 2: Create a read session
Build a TableBatchReadSession to specify the table, columns, partitions, and split strategy for your read job.
// Replace <table-name> with your table name
String tableName = "<table-name>";

TableBatchReadSession scan = new TableReadSessionBuilder()
        .identifier(TableIdentifier.of(project, tableName))
        .withSettings(settings)
        .withSplitOptions(SplitOptions.newBuilder()
                .SplitByByteSize(256 * 1024L * 1024L) // 256 MB per split
                .withCrossPartition(false)
                .build())
        .requiredDataColumns(Arrays.asList("timestamp"))
        .requiredPartitionColumns(Arrays.asList("pt1"))
        .buildBatchReadSession();
For large data volumes, or when the network is slow or unstable, creating a read session can take a long time. In that case, the SDK automatically switches to creating the session asynchronously.
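If creation switches to the asynchronous path, the session may not be immediately ready. A minimal polling sketch, assuming the SessionStatus values listed in the TableBatchReadSession reference below (INIT while creating, NORMAL when ready):

while (scan.getStatus() == SessionStatus.INIT) {
    Thread.sleep(1000); // poll until asynchronous creation completes
}
if (scan.getStatus() != SessionStatus.NORMAL) {
    throw new IOException("Read session is not ready: " + scan.getStatus());
}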
Step 3: Iterate splits and read data
Get the InputSplitAssigner from the session, iterate over each split, and read data using an Arrow reader.
InputSplitAssigner assigner = scan.getInputSplitAssigner();

for (InputSplit split : assigner.getAllSplits()) {
    SplitReader<VectorSchemaRoot> reader = scan.createArrowReader(
            split,
            ReaderOptions.newBuilder()
                    .withSettings(settings)
                    .withCompressionCodec(CompressionCodec.ZSTD)
                    .withReuseBatch(true)
                    .build()
    );
    while (reader.hasNext()) {
        VectorSchemaRoot batch = reader.get();
        // batch.getRowCount() returns the number of rows in this batch
        // batch.contentToTSVString() prints the batch as tab-separated values
        System.out.println(batch.contentToTSVString());
    }
    reader.close();
}
When transmitting large amounts of Arrow data, enabling ZSTD or LZ4 compression reduces transfer time caused by network bandwidth limits. If no compression codec is specified, data is sent uncompressed.
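To use LZ4 instead, swap the codec when building ReaderOptions; per the ReaderOptions reference below, LZ4_FRAME is the other supported value. A sketch, reusing the settings object from Step 1:

ReaderOptions lz4Options = ReaderOptions.newBuilder()
        .withSettings(settings)
        .withCompressionCodec(CompressionCodec.LZ4_FRAME)
        .build();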
Common read patterns
The following examples show minimal, self-contained patterns for the most common operations. All examples build on the settings object from Step 1.
Read specific columns (column pruning)
Use requiredDataColumns to read only the columns you need. The returned data preserves the column order you specify.
TableBatchReadSession scan = new TableReadSessionBuilder()
        .identifier(TableIdentifier.of(project, tableName))
        .withSettings(settings)
        .requiredDataColumns(Arrays.asList("user_id", "event_time", "action"))
        .buildBatchReadSession();
If requiredDataColumns is empty, all data columns are returned.
Read specific partitions (partition pruning)
Use requiredPartitions to limit the read to specific partitions. If requiredPartitions is empty, all partitions are read.
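A minimal sketch, assuming a table partitioned by pt1; the partition value is illustrative, and PartitionSpec is the MaxCompute SDK partition descriptor:

// Read only the pt1='20240101' partition (illustrative value)
TableBatchReadSession scan = new TableReadSessionBuilder()
        .identifier(TableIdentifier.of(project, tableName))
        .withSettings(settings)
        .requiredPartitions(Arrays.asList(new PartitionSpec("pt1='20240101'")))
        .buildBatchReadSession();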
Push down filter predicates
Use withFilterPredicate to push filter conditions to the server, reducing the amount of data transferred.
// Filter: c1 > 20000 AND c2 < 100000
BinaryPredicate c1 = new BinaryPredicate(
        BinaryPredicate.Operator.GREATER_THAN,
        Attribute.of("c1"),
        Constant.of(20000)
);
BinaryPredicate c2 = new BinaryPredicate(
        BinaryPredicate.Operator.LESS_THAN,
        Attribute.of("c2"),
        Constant.of(100000)
);
CompoundPredicate predicate = new CompoundPredicate(
        CompoundPredicate.Operator.AND,
        ImmutableList.of(c1, c2)
);

TableBatchReadSession scan = new TableReadSessionBuilder()
        .identifier(TableIdentifier.of(project, tableName))
        .withSettings(settings)
        .withFilterPredicate(predicate)
        .buildBatchReadSession();
Read by row offset
Use SplitByRowOffset when you need precise control over which rows each split covers — for example, to distribute work evenly across parallel readers.
TableBatchReadSession scan = new TableReadSessionBuilder()
        .identifier(TableIdentifier.of(project, tableName))
        .withSettings(settings)
        .withSplitOptions(SplitOptions.newBuilder()
                .SplitByRowOffset()
                .build())
        .buildBatchReadSession();

InputSplitAssigner assigner = scan.getInputSplitAssigner();
long totalRows = assigner.getTotalRowCount();
long recordsPerSplit = 10000;
for (long offset = 0; offset < totalRows; offset += recordsPerSplit) {
    long count = Math.min(recordsPerSplit, totalRows - offset);
    InputSplit split = assigner.getSplitByRowOffset(offset, count);
    // Create a reader and process the split
}
API reference
TableReadSessionBuilder
Builds a TableBatchReadSession. For the full Javadoc, see odps-sdk-table-api Javadoc.
| Method | Description |
|---|---|
| table(Table table) | Sets the target table for the session. |
| identifier(TableIdentifier identifier) | Sets the target table by project and table name. |
| requiredDataColumns(List<String> columns) | Reads only the specified data columns, in the order listed. If empty, all data columns are returned. |
| requiredPartitionColumns(List<String> columns) | Reads only the specified partition columns. If empty, all partition columns are returned. |
| requiredPartitions(List<PartitionSpec> partitions) | Reads only the specified partitions. If empty, all partitions are read. |
| requiredBucketIds(List<Integer> bucketIds) | Reads only the specified buckets. Applies to clustered tables only. If empty, all buckets are read. |
| withSplitOptions(SplitOptions options) | Configures the split strategy. See SplitOptions. |
| withArrowOptions(ArrowOptions options) | Configures Arrow timestamp and datetime units. See ArrowOptions. |
| withFilterPredicate(Predicate predicate) | Pushes a filter predicate to the server to reduce data transfer. See Predicate. |
| withSettings(EnvironmentSettings settings) | Sets the runtime environment, including credentials, endpoint, and quota. See EnvironmentSettings. |
| withSessionId(String sessionId) | Reloads an existing session by ID instead of creating a new one. |
| buildBatchReadSession() | Creates a new session or reloads an existing one if a session ID was provided. Session creation has significant overhead when the table has many files. |
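A sketch of reloading a session with withSessionId, using the ID from an earlier getId() call on an existing session (sessions expire after 24 hours):

// Reload an existing session instead of creating a new one
TableBatchReadSession reloaded = new TableReadSessionBuilder()
        .identifier(TableIdentifier.of(project, tableName))
        .withSettings(settings)
        .withSessionId(scan.getId())
        .buildBatchReadSession();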
SplitOptions
SplitOptions controls how table data is divided into splits.
public class SplitOptions {
    public static SplitOptions.Builder newBuilder();

    public static class Builder {
        public Builder SplitByByteSize(long splitByteSize);
        public Builder SplitByRowOffset();
        public Builder withCrossPartition(boolean crossPartition);
        public Builder withMaxFileNum(int splitMaxFileNum);
        public SplitOptions build();
    }
}
| Method | Description | Default |
|---|---|---|
| SplitByByteSize(long bytes) | Splits data so each split does not exceed bytes. Minimum value: 10 MB (10×1024×1024). | 256 MB (256×1024×1024) |
| SplitByRowOffset() | Splits data by row count, enabling reads from a specified row index. | — |
| withCrossPartition(boolean) | If true, a single split may span multiple partitions. | true |
| withMaxFileNum(int) | Sets the maximum number of physical files per split, producing more splits for large tables. | No limit |
Example:
// Split by byte size
SplitOptions bySize = SplitOptions.newBuilder()
        .SplitByByteSize(256 * 1024L * 1024L)
        .build();

// Split by row offset
SplitOptions byRow = SplitOptions.newBuilder()
        .SplitByRowOffset()
        .build();

// Split by row offset, max 1 file per split
SplitOptions byRowOneFile = SplitOptions.newBuilder()
        .SplitByRowOffset()
        .withMaxFileNum(1)
        .build();
ArrowOptions
ArrowOptions controls the time unit for Timestamp and Datetime columns in Arrow output.
public class ArrowOptions {
    public static Builder newBuilder();

    public static class Builder {
        public Builder withTimestampUnit(TimestampUnit unit);
        public Builder withDatetimeUnit(TimestampUnit unit);
        public ArrowOptions build();
    }

    public enum TimestampUnit { SECOND, MILLI, MICRO, NANO }
}
| Method | Description | Default |
|---|---|---|
| withTimestampUnit(TimestampUnit) | Sets the unit for Timestamp columns: SECOND, MILLI, MICRO, or NANO. | NANO |
| withDatetimeUnit(TimestampUnit) | Sets the unit for Datetime columns: SECOND, MILLI, MICRO, or NANO. | MILLI |
Example:
ArrowOptions options = ArrowOptions.newBuilder()
        .withDatetimeUnit(ArrowOptions.TimestampUnit.MILLI)
        .withTimestampUnit(ArrowOptions.TimestampUnit.NANO)
        .build();
Predicate
Predicate defines server-side filter expressions for predicate pushdown. Combine predicates to express complex filter conditions.
| Type | Class | Operators |
|---|---|---|
| Binary comparison | BinaryPredicate | =, !=, >, <, >=, <= |
| Null check | UnaryPredicate | IS NULL, IS NOT NULL |
| Set membership | InPredicate | IN, NOT IN |
| Column reference | Attribute | — |
| Literal value | Constant | — |
| Compound | CompoundPredicate | AND, OR, NOT |
Examples:
// c1 > 20000 AND c2 < 100000
BinaryPredicate c1 = new BinaryPredicate(BinaryPredicate.Operator.GREATER_THAN, Attribute.of("c1"), Constant.of(20000));
BinaryPredicate c2 = new BinaryPredicate(BinaryPredicate.Operator.LESS_THAN, Attribute.of("c2"), Constant.of(100000));
CompoundPredicate predicate = new CompoundPredicate(CompoundPredicate.Operator.AND, ImmutableList.of(c1, c2));
// c1 IS NOT NULL
Predicate notNull = new UnaryPredicate(UnaryPredicate.Operator.NOT_NULL, Attribute.of("c1"));
// c1 IN (1, 10001)
Predicate inList = new InPredicate(InPredicate.Operator.IN, Attribute.of("c1"),
        ImmutableList.of(Constant.of(1), Constant.of(10001)));
EnvironmentSettings
EnvironmentSettings carries the connection configuration passed to each builder.
public class EnvironmentSettings {
    public static Builder newBuilder();

    public static class Builder {
        public Builder withDefaultProject(String projectName);
        public Builder withDefaultSchema(String schema);
        public Builder withServiceEndpoint(String endpoint);
        public Builder withTunnelEndpoint(String tunnelEndpoint);
        public Builder withQuotaName(String quotaName);
        public Builder withCredentials(Credentials credentials);
        public Builder withRestOptions(RestOptions restOptions);
        public EnvironmentSettings build();
    }
}
| Method | Description |
|---|---|
| withDefaultProject(String) | Sets the MaxCompute project name. Find it in the MaxCompute console under Workspace > Projects. |
| withDefaultSchema(String) | Sets the default schema. For details, see Schema-related operations. |
| withServiceEndpoint(String) | Sets the service endpoint. For values by region, see Endpoints. |
| withTunnelEndpoint(String) | Sets the Tunnel endpoint. For values by region, see Endpoints. |
| withQuotaName(String) | Sets the quota name for the DTS resource group or Storage API. |
| withCredentials(Credentials) | Sets authentication credentials. Supports Account, AppAccount, and AppStsAccount. |
| withRestOptions(RestOptions) | Configures network options (user agent, connect timeout, read timeout). Default connect timeout: 10 seconds. Default read timeout: 120 seconds. |
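A sketch combining these builder methods, reusing the credentials and placeholder values from Step 1; the schema name here is illustrative:

EnvironmentSettings settings = EnvironmentSettings.newBuilder()
        .withCredentials(credentials)
        .withDefaultProject(project)
        .withDefaultSchema("default") // illustrative schema name
        .withServiceEndpoint(endpoint)
        .withQuotaName(quotaName)
        .build();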
TableBatchReadSession
TableBatchReadSession represents an active read session. Retrieve it from TableReadSessionBuilder.buildBatchReadSession().
public interface TableBatchReadSession {
    String getId();
    TableIdentifier getTableIdentifier();
    SessionStatus getStatus();
    DataSchema readSchema();
    InputSplitAssigner getInputSplitAssigner() throws IOException;
    SplitReader<ArrayRecord> createRecordReader(InputSplit split, ReaderOptions options) throws IOException;
    SplitReader<VectorSchemaRoot> createArrowReader(InputSplit split, ReaderOptions options) throws IOException;
}
| Method | Description |
|---|---|
| getId() | Returns the session ID. Sessions expire after 24 hours. |
| getTableIdentifier() | Returns the table identifier (project and table name) for the session. |
| getStatus() | Returns the current session status: INIT (creating), NORMAL (ready), CRITICAL (failed), or EXPIRED (timed out). |
| readSchema() | Returns the DataSchema, which lists columns, partition keys, column names, and data types. |
| getInputSplitAssigner() | Returns the InputSplitAssigner for distributing splits across readers. |
| createRecordReader(split, options) | Creates a SplitReader<ArrayRecord> for record-format reads. |
| createArrowReader(split, options) | Creates a SplitReader<VectorSchemaRoot> for Apache Arrow format reads. |
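A sketch of a record-format read with createRecordReader, assuming a split obtained from the InputSplitAssigner; reading the first column by index is illustrative:

SplitReader<ArrayRecord> reader = scan.createRecordReader(
        split,
        ReaderOptions.newBuilder().withSettings(settings).build()
);
while (reader.hasNext()) {
    ArrayRecord record = reader.get();
    System.out.println(record.get(0)); // value of the first column in this row
}
reader.close();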
InputSplitAssigner
InputSplitAssigner distributes splits across parallel readers.
| Method | Description |
|---|---|
| getSplitsCount() | Returns the number of splits. When using SplitByByteSize, the value is >= 0. |
| getTotalRowCount() | Returns the total row count. When using SplitByRowOffset, the value is >= 0. |
| getSplit(int index) | Returns the split at the given index. Index range: [0, SplitsCount - 1]. |
| getSplitByRowOffset(long startIndex, long numRecord) | Returns a split covering numRecord rows starting at startIndex. startIndex range: [0, RecordCount - 1]. |
Example:
// Distribute by byte-size splits
InputSplitAssigner assigner = scan.getInputSplitAssigner();
int splitCount = assigner.getSplitsCount();
for (int i = 0; i < splitCount; i++) {
    InputSplit split = assigner.getSplit(i);
    // process split
}

// Distribute by row offset
long totalRows = assigner.getTotalRowCount();
long recordsPerSplit = 10000;
for (long offset = 0; offset < totalRows; offset += recordsPerSplit) {
    long count = Math.min(recordsPerSplit, totalRows - offset);
    InputSplit split = assigner.getSplitByRowOffset(offset, count);
    // process split
}
ReaderOptions
ReaderOptions controls batching and compression for a reader.
public class ReaderOptions {
    public static ReaderOptions.Builder newBuilder();

    public static class Builder {
        public Builder withMaxBatchRowCount(int maxBatchRowCount);
        public Builder withMaxBatchRawSize(long batchRawSize);
        public Builder withCompressionCodec(CompressionCodec codec);
        public Builder withBufferAllocator(BufferAllocator allocator);
        public Builder withReuseBatch(boolean reuseBatch);
        public Builder withSettings(EnvironmentSettings settings);
        public ReaderOptions build();
    }
}
| Method | Description | Default |
|---|---|---|
| withMaxBatchRowCount(int) | Maximum rows per batch returned by the server. | 4096 |
| withMaxBatchRawSize(long) | Maximum raw bytes per batch returned by the server. | — |
| withCompressionCodec(CompressionCodec) | Compression codec for data transfer. Supported values: ZSTD, LZ4_FRAME. | No compression |
| withBufferAllocator(BufferAllocator) | Memory allocator for Arrow data reads. | — |
| withReuseBatch(boolean) | If true, the Arrow batch buffer is reused across reads. | true |
| withSettings(EnvironmentSettings) | Runtime environment for the reader. | — |
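A sketch combining these options, reusing the settings object from Step 1; the row count mirrors the default and is illustrative:

ReaderOptions options = ReaderOptions.newBuilder()
        .withSettings(settings)
        .withMaxBatchRowCount(4096)                  // rows per server batch
        .withCompressionCodec(CompressionCodec.ZSTD) // compress data in transit
        .withReuseBatch(true)                        // reuse the Arrow buffer across reads
        .build();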
SplitReader
SplitReader<T> reads data from a single split. Use SplitReader<ArrayRecord> for record format or SplitReader<VectorSchemaRoot> for Arrow format.
public interface SplitReader<T> {
    boolean hasNext() throws IOException;
    T get();
    Metrics currentMetricsValues();
    void close() throws IOException;
}
| Method | Description |
|---|---|
| hasNext() | Returns true if more data is available; false when the split is exhausted. |
| get() | Returns the current batch. Call hasNext() first to confirm data is available. |
| currentMetricsValues() | Returns read metrics for the current reader. |
| close() | Closes the reader and releases resources. Always call close() when done. |
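Because close() must run even if reading fails, a try/finally sketch around the read loop, assuming a split and ReaderOptions from earlier steps; processBatch is a hypothetical placeholder for your per-batch logic:

SplitReader<VectorSchemaRoot> reader = scan.createArrowReader(split, options);
try {
    while (reader.hasNext()) {
        VectorSchemaRoot batch = reader.get();
        processBatch(batch); // hypothetical per-batch handler
    }
} finally {
    reader.close(); // always release reader resources
}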
What's next
- Overview of the Storage API — architecture, concepts, and quota configuration
- Schema-related operations — working with MaxCompute schemas
- Endpoints — VPC endpoint addresses by region