MaxCompute:Java SDK examples

Last Updated: Mar 12, 2025

MaxCompute allows third-party engines such as Spark on EMR, StarRocks, Presto, PAI, and Hologres to directly access MaxCompute data through the Storage API by using the Java SDK. This topic provides code examples for accessing MaxCompute data with the Java SDK.

Overview

The primary interfaces of the Java SDK for accessing MaxCompute are listed below.

  • TableReadSessionBuilder: Creates a MaxCompute table read session.

  • TableBatchReadSession: Represents a session for reading data from a MaxCompute table.

  • SplitReader: Reads a data segment contained in a read session.

For Maven users, search for odps-sdk-table-api in the Maven repository to obtain different versions of the Java SDK. The configuration details are as follows.

<dependency>
	<groupId>com.aliyun.odps</groupId>
	<artifactId>odps-sdk-table-api</artifactId>
	<version>0.48.8-public</version>
</dependency>

MaxCompute provides the Storage API. For more information, see odps-sdk-table-api.

TableReadSessionBuilder

This interface is used to create a read session for a MaxCompute table. The main interface definition is as follows. For more details, see the Java-sdk-doc.

Interface definition

public class TableReadSessionBuilder {

    public TableReadSessionBuilder table(Table table);

    public TableReadSessionBuilder identifier(TableIdentifier identifier);

    public TableReadSessionBuilder requiredDataColumns(List<String> requiredDataColumns);

    public TableReadSessionBuilder requiredPartitionColumns(List<String> requiredPartitionColumns);

    public TableReadSessionBuilder requiredPartitions(List<PartitionSpec> requiredPartitions);

    public TableReadSessionBuilder requiredBucketIds(List<Integer> requiredBucketIds);

    public TableReadSessionBuilder withSplitOptions(SplitOptions splitOptions);

    public TableReadSessionBuilder withArrowOptions(ArrowOptions arrowOptions);

    public TableReadSessionBuilder withFilterPredicate(Predicate filterPredicate);

    public TableReadSessionBuilder withSettings(EnvironmentSettings settings);

    public TableReadSessionBuilder withSessionId(String sessionId);

    public TableBatchReadSession buildBatchReadSession();
}

Usage notes

Method name

Description

table(Table table)

Specifies the table to read in the current session by using a Table object.

identifier(TableIdentifier identifier)

Specifies the table to read in the current session by using a TableIdentifier object.

requiredDataColumns(List<String> requiredDataColumns)

Reads only the specified data columns and ensures that the column order in the returned data matches the order specified by requiredDataColumns. This is applicable to column pruning scenarios.

Note

If the parameter requiredDataColumns is empty, data of all columns will be returned.

requiredPartitionColumns(List<String> requiredPartitionColumns)

Reads only the specified partition columns of the table. This is applicable to partition column pruning scenarios.

Note

If the parameter requiredPartitionColumns is empty, data of all partition columns will be returned.

requiredPartitions(List<PartitionSpec> requiredPartitions)

Reads data only from the specified partitions of the table. This is applicable to partition pruning scenarios, as shown in the usage example below.

Note

If the parameter requiredPartitions is empty, all partition data will be returned.
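
Usage example

The following snippet is a minimal sketch that combines column pruning and partition pruning. The project name, table name, partition spec, and column names are hypothetical placeholders, and the settings object is assumed to be built as described in withSettings(EnvironmentSettings settings) below.

// Hypothetical example: read only columns c1 and c2 from partition pt='20240101'
TableBatchReadSession scan = new TableReadSessionBuilder()
        .identifier(TableIdentifier.of("my_project", "my_table"))
        .requiredDataColumns(Arrays.asList("c1", "c2"))
        .requiredPartitions(Arrays.asList(new PartitionSpec("pt='20240101'")))
        .withSettings(settings)
        .buildBatchReadSession();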

requiredBucketIds(List<Integer> requiredBucketIds)

Reads data only from the specified buckets. This takes effect only for clustered tables and is applicable to bucket pruning scenarios.

Note

If the parameter requiredBucketIds is empty, all Bucket data will be returned.

withSplitOptions(SplitOptions splitOptions)

Splits table data. The SplitOptions object is defined as follows:

public class SplitOptions {

    public static SplitOptions.Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

      public SplitOptions.Builder SplitByByteSize(long splitByteSize);
  
      public SplitOptions.Builder SplitByRowOffset();
  
      public SplitOptions.Builder withCrossPartition(boolean crossPartition);
  
      public SplitOptions.Builder withMaxFileNum(int splitMaxFileNum);
  
      public SplitOptions build();
    }
}
  • SplitByByteSize(long splitByteSize): Splits data according to the specified parameter splitByteSize. The size of a single data segment returned by the server does not exceed splitByteSize (unit: bytes).

    Note
    • If you do not use SplitByByteSize(long splitByteSize) to customize the split size, the system splits data into segments of 256 × 1024 × 1024 bytes (256 MB) by default.

    • The custom split size cannot be less than 10 × 1024 × 1024 bytes (10 MB).

  • SplitByRowOffset(): Splits data by row, allowing the client to read data from a specified row index.

  • withCrossPartition(boolean crossPartition): Specifies whether a single data segment is allowed to contain multiple data partitions. The parameter crossPartition has the following values:

    • true (default): Allows a single data segment to contain multiple data partitions.

    • false: Does not allow a single data segment to contain multiple data partitions.

  • withMaxFileNum(int splitMaxFileNum): When the number of table files is large, you can specify the maximum number of physical files contained in a single data segment to generate more data segments.

    Note

    By default, there is no limit on the number of physical files contained in a single data segment.

  • build(): Creates a SplitOptions object.

Usage example

// 1. Split data by split size. Set the split size to 256 MB.
SplitOptions splitOptionsByteSize =
      SplitOptions.newBuilder().SplitByByteSize(256 * 1024L * 1024L).build();

// 2. Split data by row offset.
SplitOptions splitOptionsByRowOffset =
      SplitOptions.newBuilder().SplitByRowOffset().build();

// 3. Specify that a single split contains at most one file.
SplitOptions splitOptionsMaxFileNum =
      SplitOptions.newBuilder().SplitByRowOffset().withMaxFileNum(1).build();

withArrowOptions(ArrowOptions arrowOptions)

Specifies Arrow data options. The ArrowOptions object is defined as follows:

public class ArrowOptions {
    
    public static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withTimestampUnit(TimestampUnit unit);

        public Builder withDatetimeUnit(TimestampUnit unit);

        public ArrowOptions build();
    }

    public enum TimestampUnit {
        SECOND,
        MILLI,
        MICRO,
        NANO;
    }
}
  • TimestampUnit: Specifies the unit of Timestamp and Datetime data types. The values are as follows:

    • SECOND: second (s).

    • MILLI: millisecond (ms).

    • MICRO: microsecond (μs).

    • NANO: nanosecond (ns).

  • withTimestampUnit(TimestampUnit unit): Specifies the unit of the Timestamp data type. The default unit is NANO.

  • withDatetimeUnit(TimestampUnit unit): Specifies the unit of the Datetime data type. The default unit is MILLI.

Usage example

ArrowOptions options = ArrowOptions.newBuilder()
          .withDatetimeUnit(ArrowOptions.TimestampUnit.MILLI)
          .withTimestampUnit(ArrowOptions.TimestampUnit.NANO)
          .build();

withFilterPredicate(Predicate filterPredicate)

Specifies the Predicate Pushdown option. Predicate is defined as follows:

// 1. Binary operation

public class BinaryPredicate extends Predicate {

  public enum Operator {
    /**
     * Binary operation operator
     */
    EQUALS("="),
    NOT_EQUALS("!="),
    GREATER_THAN(">"),
    LESS_THAN("<"),
    GREATER_THAN_OR_EQUAL(">="),
    LESS_THAN_OR_EQUAL("<=");
   
  }

  public BinaryPredicate(Operator operator, Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate equals(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate notEquals(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate greaterThan(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate lessThan(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate greaterThanOrEqual(Serializable leftOperand,
                                                   Serializable rightOperand);

  public static BinaryPredicate lessThanOrEqual(Serializable leftOperand,
                                                Serializable rightOperand);
}

// 2. Unary operation
public class UnaryPredicate extends Predicate {

  public enum Operator {
    /**
     * Unary operation operator
     */
    IS_NULL("is null"),
    NOT_NULL("is not null");
  }

  public static UnaryPredicate isNull(Serializable operand);
  public static UnaryPredicate notNull(Serializable operand);
}

// 3. IN and NOT IN
public class InPredicate extends Predicate {

  public enum Operator {
    /**
     * IN and NOT IN operators for set membership check
     */
    IN("in"),
    NOT_IN("not in");
  }

  public InPredicate(Operator operator, Serializable operand, List<Serializable> set);

  public static InPredicate in(Serializable operand, List<Serializable> set);

  public static InPredicate notIn(Serializable operand, List<Serializable> set);
}

// 4. Column name
public class Attribute extends Predicate {

  public Attribute(Object value);
    
  public static Attribute of(Object value);
}

// 5. Constant
public class Constant extends Predicate {

  public Constant(Object value);

  public static Constant of(Object value);
}

// 6. Compound operation
public class CompoundPredicate extends Predicate {

  public enum Operator {
    /**
     * Compound predicate operator
     */
    AND("and"),
    OR("or"),
    NOT("not");
  }

  public CompoundPredicate(Operator logicalOperator, List<Predicate> predicates);

  public static CompoundPredicate and(Predicate... predicates);
  
  public static CompoundPredicate or(Predicate... predicates);

  public static CompoundPredicate not(Predicate predicates);

  public void addPredicate(Predicate predicate);
}

Usage example

// 1. c1 > 20000 and c2 < 100000
BinaryPredicate c1 = new BinaryPredicate(BinaryPredicate.Operator.GREATER_THAN, Attribute.of("c1"), Constant.of(20000));
BinaryPredicate c2 = new BinaryPredicate(BinaryPredicate.Operator.LESS_THAN, Attribute.of("c2"), Constant.of(100000));
CompoundPredicate compoundPredicate =
        new CompoundPredicate(CompoundPredicate.Operator.AND, ImmutableList.of(c1, c2));

// 2. c1 is not null
Predicate notNullPredicate = new UnaryPredicate(UnaryPredicate.Operator.NOT_NULL, Attribute.of("c1"));

// 3. c1 in (1, 10001)
Predicate inPredicate =
        new InPredicate(InPredicate.Operator.IN, Attribute.of("c1"), ImmutableList.of(Constant.of(1), Constant.of(10001)));
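
To apply a predicate, pass it to the read session builder. The following snippet is a minimal sketch that assumes the compoundPredicate object from example 1 above, a settings object built as described in withSettings(EnvironmentSettings settings) below, and hypothetical project and table names.

// Hypothetical example: push the predicate down to the storage layer
TableBatchReadSession scan = new TableReadSessionBuilder()
        .identifier(TableIdentifier.of("my_project", "my_table"))
        .withFilterPredicate(compoundPredicate)
        .withSettings(settings)
        .buildBatchReadSession();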

withSettings(EnvironmentSettings settings)

Specifies the runtime environment information. The EnvironmentSettings interface is defined as follows:

public class EnvironmentSettings {

    public static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withDefaultProject(String projectName);

        public Builder withDefaultSchema(String schema);

        public Builder withServiceEndpoint(String endPoint);

        public Builder withTunnelEndpoint(String tunnelEndPoint);

        public Builder withQuotaName(String quotaName);

        public Builder withCredentials(Credentials credentials);

        public Builder withRestOptions(RestOptions restOptions);

        public EnvironmentSettings build();
    }
}
  • withDefaultProject(String projectName): Sets the current project name.

    Note

    The parameter projectName is the MaxCompute project name. You can log on to the MaxCompute console, switch the region in the upper left corner, and choose Workspace > Projects in the left-side navigation pane to view the specific MaxCompute project name.

  • withDefaultSchema(String schema): Sets the current default schema.

    Note

    The parameter schema is the MaxCompute Schema name. For more information about Schema, see Schema-related operations.

  • withServiceEndpoint(String endPoint): Sets the current service connection address Endpoint.

    Note

    For Endpoint information in each region, see Endpoints.

  • withTunnelEndpoint(String tunnelEndPoint): Sets the current service connection address TunnelEndpoint.

    Note

    For TunnelEndpoint information in each region, see Endpoints.

  • withQuotaName(String quotaName): Specifies the name of the quota to use.

    MaxCompute supports access to exclusive Data Transmission Service resource group (subscription) resources. You can view the quota name on the Quotas page in the MaxCompute console.

  • withCredentials(Credentials credentials): Specifies the current authentication information. Credentials is defined as follows:

    public class Credentials {
    
        public static Builder newBuilder() {
            return new Builder();
        }
    
        public static class Builder {
    
            public Builder withAccount(Account account);
    
            public Builder withAppAccount(AppAccount appAccount);
    
            public Builder withAppStsAccount(AppStsAccount appStsAccount);
    
            public Credentials build();
        }
    
    }
    • withAccount(Account account): Specifies the Odps Account object.

    • withAppAccount(AppAccount appAccount): Specifies the Odps appAccount object.

    • withAppStsAccount(AppStsAccount appStsAccount): Specifies the Odps appStsAccount object.

  • withRestOptions(RestOptions restOptions): Specifies the current network access configuration. RestOptions is defined as follows:

    public class RestOptions implements Serializable {
    
        public static Builder newBuilder() {
            return new RestOptions.Builder();
        }
    
        public static class Builder {
            public Builder witUserAgent(String userAgent);
            public Builder withConnectTimeout(int connectTimeout);
            public Builder withReadTimeout(int readTimeout);
            public RestOptions build();
        }
    }
    • witUserAgent(String userAgent): Specifies the current userAgent information.

    • withConnectTimeout(int connectTimeout): Specifies the underlying network connection timeout. The default value is 10 seconds.

    • withReadTimeout(int readTimeout): Specifies the underlying network read timeout. The default value is 120 seconds.
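
Usage example

The following snippet is a minimal sketch of building EnvironmentSettings. The endpoint, project name, and quota name are hypothetical placeholders; a complete end-to-end version appears in the usage example at the end of this topic.

// Hypothetical example: build the runtime environment settings
Account account = new AliyunAccount(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
        System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
Credentials credentials = Credentials.newBuilder().withAccount(account).build();
EnvironmentSettings settings = EnvironmentSettings.newBuilder()
        .withCredentials(credentials)
        .withServiceEndpoint("<endpoint>")
        .withDefaultProject("<project>")
        .withQuotaName("<quotaName>")
        .build();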

withSessionId(String sessionId)

Specifies the SessionID information for reloading an existing session.

buildBatchReadSession()

Creates or obtains a table read session. If a SessionID is specified by using withSessionId, the existing session that corresponds to the SessionID is returned. Otherwise, a new table read session is created.

Note

The creation operation has a large overhead. When there are many files, it will take a long time to complete.
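
Usage example

Because creating a read session is expensive, you can store the session ID and reload the session later instead of creating a new one. The following snippet is a minimal sketch that assumes an existing scan session, a settings object, and hypothetical project and table names.

// Store the ID of an existing session
String sessionId = scan.getId();
// Reload the session by its ID instead of creating a new one
TableBatchReadSession reloaded = new TableReadSessionBuilder()
        .identifier(TableIdentifier.of("my_project", "my_table"))
        .withSettings(settings)
        .withSessionId(sessionId)
        .buildBatchReadSession();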

TableBatchReadSession

The TableBatchReadSession interface represents a session for reading from a MaxCompute table. The main interface definitions are as follows.

Interface definition

public interface TableBatchReadSession {

    String getId();

    TableIdentifier getTableIdentifier();

    SessionStatus getStatus();

    DataSchema readSchema();
    
    InputSplitAssigner getInputSplitAssigner() throws IOException;

    SplitReader<ArrayRecord> createRecordReader(InputSplit split, ReaderOptions options) throws IOException;

    SplitReader<VectorSchemaRoot> createArrowReader(InputSplit split, ReaderOptions options) throws IOException;    

}

Usage notes

Method name

Description

getId()

Obtains the current session ID. The default timeout period of a read session is 24 hours.

getTableIdentifier()

Obtains the table name in the current session.

getStatus()

Obtains the current session status. The status values are as follows:

  • INIT: The initial value set when a session is created.

  • NORMAL: The session is successfully created.

  • CRITICAL: The session creation failed.

  • EXPIRED: The session timed out.

readSchema()

Obtains the table structure information of the current session. The DataSchema is defined as follows:

public class DataSchema implements Serializable {
    
    List<Column> getColumns();

    List<String> getPartitionKeys();

    List<String> getColumnNames();

    List<TypeInfo> getColumnDataTypes();

    Optional<Column> getColumn(int columnIndex);

    Optional<Column> getColumn(String columnName);

}
  • getColumns(): Obtains the Column information of the table and partition to be read.

  • getPartitionKeys(): Obtains the names of the partition key columns of the data to be read.

  • getColumnNames(): Obtains the Column names of the table and partition to be read.

  • getColumnDataTypes(): Obtains the data types of the columns of the table and partition to be read.

  • getColumn(int columnIndex): Obtains the Column object based on the index. If the index is out of the column range, an empty Optional is returned.

  • getColumn(String columnName): Obtains the Column object based on the parameter columnName. If the columns of the current table do not contain columnName, an empty Optional is returned.
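
Usage example

The following snippet is a minimal sketch that prints the schema of the data to be read, assuming a scan session created as shown above.

DataSchema schema = scan.readSchema();
// Print the name and type of every column that the session will read
for (Column column : schema.getColumns()) {
    System.out.println(column.getName() + " : " + column.getTypeInfo().getTypeName());
}
// Print the partition key column names
System.out.println(schema.getPartitionKeys());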

getInputSplitAssigner()

Obtains the InputSplitAssigner of the current session. The InputSplitAssigner interface defines the method for assigning InputSplit instances in the current read session. Each InputSplit represents a data segment that can be processed by a single SplitReader. The InputSplitAssigner is defined as follows:

public interface InputSplitAssigner {

    int getSplitsCount();

    long getTotalRowCount();

    InputSplit getSplit(int index);

    InputSplit getSplitByRowOffset(long startIndex, long numRecord);
}
  • getSplitsCount(): Obtains the number of data segments contained in the session.

    Note

    When SplitOptions is SplitByByteSize, the return value of this interface is greater than or equal to 0.

  • getTotalRowCount(): Obtains the number of data rows contained in the session.

    Note

    When SplitOptions is SplitByRowOffset, the return value of this interface is greater than or equal to 0.

  • getSplit(int index): Obtains the corresponding InputSplit based on the specified data segment parameter Index. The parameter index value range is: [0,SplitsCount-1].

  • getSplitByRowOffset(long startIndex, long numRecord): Obtains the corresponding InputSplit. The parameter descriptions are as follows:

    • startIndex: Specifies the starting index of the data row read by InputSplit. The value range is [0,RecordCount-1].

    • numRecord: Specifies the number of data rows read by InputSplit.

Usage example

// 1. If SplitOptions is SplitByByteSize
TableBatchReadSession scan = ...;
InputSplitAssigner assigner = scan.getInputSplitAssigner();
int splitCount = assigner.getSplitsCount();
for (int k = 0; k < splitCount; k++) {
    InputSplit split = assigner.getSplit(k);
    ...
}

// 2. If SplitOptions is SplitByRowOffset
TableBatchReadSession scan = ...;
InputSplitAssigner assigner = scan.getInputSplitAssigner();
long numRecords = assigner.getTotalRowCount();
long recordsPerSplit = 10000;
for (long offset = 0; offset < numRecords; offset += recordsPerSplit) {
    recordsPerSplit = Math.min(recordsPerSplit, numRecords - offset);
    InputSplit split = assigner.getSplitByRowOffset(offset, recordsPerSplit);
    ...
}

createRecordReader(InputSplit split, ReaderOptions options)

Constructs a SplitReader<ArrayRecord> object. ReaderOptions is defined as follows:

public class ReaderOptions {
    
    public static ReaderOptions.Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withMaxBatchRowCount(int maxBatchRowCount);

        public Builder withMaxBatchRawSize(long batchRawSize);

        public Builder withCompressionCodec(CompressionCodec codec);

        public Builder withBufferAllocator(BufferAllocator allocator);

        public Builder withReuseBatch(boolean reuseBatch);

        public Builder withSettings(EnvironmentSettings settings);

        public ReaderOptions build();
    }

}
  • withMaxBatchRowCount(int maxBatchRowCount): Specifies the maximum number of rows in each batch of data returned by the server. The default value of maxBatchRowCount is 4096.

  • withMaxBatchRawSize(long batchRawSize): Specifies the maximum number of raw bytes contained in each batch of data returned by the server.

  • withCompressionCodec(CompressionCodec codec): Specifies the data compression type. Only ZSTD and LZ4_FRAME compression types are supported.

    Note
    • When transmitting a large amount of uncompressed Arrow data directly, the data transmission time may increase significantly due to network bandwidth limitations.

    • If the compression type is not specified, data compression is not performed by default.

  • withBufferAllocator(BufferAllocator allocator): Specifies the memory allocator for reading Arrow data.

  • withReuseBatch(boolean reuseBatch): Specifies whether the ArrowBatch memory can be reused. The parameter reuseBatch has the following values:

    • true (default): The ArrowBatch memory can be reused.

    • false: The ArrowBatch memory cannot be reused.

  • withSettings(EnvironmentSettings settings): Specifies the runtime environment information.
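
Usage example

The following snippet is a minimal sketch that reads records with the record reader instead of the Arrow reader. It assumes a scan session, an InputSplit named split, and a settings object obtained as shown above; exception handling is omitted.

SplitReader<ArrayRecord> recordReader =
        scan.createRecordReader(split, ReaderOptions.newBuilder()
                .withSettings(settings)
                .build());
while (recordReader.hasNext()) {
    ArrayRecord record = recordReader.get();
    // Process the record, for example, print the value of the first column
    System.out.println(record.get(0));
}
recordReader.close();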

createArrowReader(InputSplit split, ReaderOptions options)

Constructs a SplitReader<VectorSchemaRoot> object.

SplitReader

The SplitReader interface is used for reading data from MaxCompute tables.

Interface definition

public interface SplitReader<T> {

    boolean hasNext() throws IOException;

    T get();

    Metrics currentMetricsValues();

    void close() throws IOException;
}

Usage notes

Method name

Description

hasNext()

Checks whether there are more data items to read. If there is a next data item to read, it returns true. Otherwise, it returns false.

get()

Obtains the current data item. Before calling this method, you must ensure that there is a next element by using the hasNext() method.

currentMetricsValues()

Obtains the metrics related to SplitReader.

close()

Closes the connection after reading is complete.

Usage example

  1. Set up the environment to connect to the MaxCompute service.

    // AccessKey ID and AccessKey Secret of an Alibaba Cloud account or RAM user
    // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
    // In this example, the AccessKey ID and AccessKey secret are stored in the environment variables. You can also save the AccessKey pair in the configuration file based on your business requirements
    // We recommend that you do not directly specify the AccessKey ID and AccessKey secret in code to prevent AccessKey pair leaks
    private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    //Quota name used to access MaxCompute
    String quotaName = "<quotaName>";
    //MaxCompute project name
    String project = "<project>";
    //Create an Odps object to connect to MaxCompute service
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setDefaultProject(project);
    //Connection address of the MaxCompute service. Only the Alibaba Cloud VPC network is supported
    String endpoint = "<endpoint>";
    odps.setEndpoint(endpoint);
    Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount()).withAppAccount(odps.getAppAccount()).build();
    EnvironmentSettings settings = EnvironmentSettings.newBuilder().withCredentials(credentials).withServiceEndpoint(odps.getEndpoint()).withQuotaName(quotaName).build();
    Note
    • The quota name identifies an exclusive Data Transmission Service resource group (subscription) resource. You can view the quota name on the Quotas page in the MaxCompute console.

    • For job-level quota authorization, all accounts, including Alibaba Cloud accounts and RAM roles, lack permissions by default and must be granted access. For details on authorization, see Authorization.

  2. Perform a table read operation.

    1. Create a data read session to access MaxCompute data.

      //Table name corresponding to the MaxCompute project
      String tableName = "<table.name>";
      //Create a table data read session
      TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
      TableBatchReadSession scan = scanBuilder.identifier(TableIdentifier.of(project, tableName)).withSettings(settings)
              .withSplitOptions(SplitOptions.newBuilder()
                      .SplitByByteSize(256 * 1024L * 1024L)
                      .withCrossPartition(false).build())
              .requiredDataColumns(Arrays.asList("timestamp"))
              .requiredPartitionColumns(Arrays.asList("pt1"))
              .buildBatchReadSession();
      Note

      In scenarios with large data volumes, network latency, or instability, the creation of data read sessions may take a long time, leading to an automatic switch to an asynchronous process for session creation.
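
      The following polling loop is an illustrative sketch rather than part of the official example: it periodically reloads the session by its ID and waits until the status becomes NORMAL before reading. The sleep interval is arbitrary, and exception handling is omitted.

      // Hypothetical sketch: wait for an asynchronously created session to become ready
      while (scan.getStatus() == SessionStatus.INIT) {
          Thread.sleep(1000);
          // Reload the session by its ID to refresh its status
          scan = scanBuilder.identifier(TableIdentifier.of(project, tableName))
                  .withSettings(settings)
                  .withSessionId(scan.getId())
                  .buildBatchReadSession();
      }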

    2. Traverse over the MaxCompute data for each segment, using the Arrow reader to sequentially read and output the data content of each segment.

      //Traverse all input segment data and use the Arrow reader to read the data batch in each segment one by one, and finally output the content of each batch of data
      InputSplitAssigner assigner = scan.getInputSplitAssigner();
      for (InputSplit split : assigner.getAllSplits()) {
          SplitReader<VectorSchemaRoot> reader =
                  scan.createArrowReader(split, ReaderOptions.newBuilder()
                          .withSettings(settings)
                          .withCompressionCodec(CompressionCodec.ZSTD)
                          .withReuseBatch(true)
                          .build());
      
          int rowCount = 0;
          while (reader.hasNext()) {
              VectorSchemaRoot data = reader.get();
              rowCount += data.getRowCount();
              System.out.println(data.contentToTSVString());
          }
          System.out.println("Rows read from this split: " + rowCount);
          reader.close();
      }

Reference

For more information about the MaxCompute Storage API, see Overview of storage API.