
MaxCompute:Python SDK examples

Last Updated:Mar 26, 2026

The MaxCompute Storage API lets third-party engines — including Spark on EMR, StarRocks, Presto, PAI, and Hologres — read MaxCompute table data directly at high throughput. This page walks through Python code examples for connecting to MaxCompute and reading table data using the Storage API.

For the full example repository, see Python SDK examples.

Prerequisites

Before you begin, ensure that you have:

  • PyODPS installed. For detailed instructions, see Install PyODPS.

  • A MaxCompute project with a VPC endpoint. The Storage API endpoint currently supports only Alibaba Cloud VPC networks.

PyODPS is also available in managed environments:

  • DataWorks: The PyODPS node comes with PyODPS pre-installed. Develop and schedule PyODPS tasks directly on the PyODPS node. See Use PyODPS in DataWorks.

  • PAI: All built-in PAI images include PyODPS, so you can use it immediately, for example in the custom Python widget of PAI-Designer. Using PyODPS in PAI Notebooks is largely the same as using it in any other Python environment. See Overview of basic operations and DataFrame (not recommended).

PyODPS is the Python SDK for MaxCompute. For an overview, see PyODPS.

How it works

Reading table data through the Storage API follows four steps:

  1. Set up an ODPS client and an Apache Arrow-format Storage API client.

  2. Create a read session to specify the target partition and split strategy.

  3. Poll the session until its status is NORMAL.

  4. Read data in Apache Arrow record batches using the session ID.

Steps 2 through 4 pass a session_id from one to the next, so the examples below build on each other in sequence.

Read table data

Step 1: Set up the connection

Create an ODPS client and an Apache Arrow-format Storage API client. All subsequent read operations use the get_arrow_client() function defined here. Save this file as util.py so that the scripts in the later steps can import it with from util import *.

import os
from odps import ODPS
from odps.apis.storage_api import *

# Load AccessKey credentials from environment variables.
# Do not hardcode credentials in your source code.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your-project>',
    endpoint='<your-endpoint>'
)

# Name of the MaxCompute table to read
table = '<table-name>'
# Quota name for the Storage API exclusive resource group (subscription)
quota_name = '<quota-name>'

def get_arrow_client():
    odps_table = o.get_table(table)
    client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)
    return client

Replace the following placeholders:

  • <your-project>: Name of your MaxCompute project. Example: my_project

  • <your-endpoint>: VPC endpoint for the MaxCompute service. Example: http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api

  • <table-name>: Name of the table to read. Example: my_table

  • <quota-name>: Quota name for the Storage API exclusive resource group. Example: my_quota

To find your quota name, log on to the MaxCompute console, switch to your region, and go to Workspace > Quotas in the left-side navigation pane. See Manage quotas for computing resources in the MaxCompute console for details.

To enable the Storage API, go to Tenants > Tenant Property in the left-side navigation pane and turn on Storage API Switch.

Step 2: Create a read session

Submit a TableBatchScanRequest to start a read session. The split mode controls how MaxCompute partitions the data for parallel reads: SIZE splits by data volume; ROW_OFFSET splits by number of rows.

import logging
import sys
from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)

def create_read_session(mode):
    client = get_arrow_client()
    # Replace 'pt=test_write_1' with the partition spec you want to read
    req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])

    if mode == 'size':
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
    elif mode == 'row':
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)

    resp = client.create_read_session(req)

    if resp.status != Status.OK:
        logger.info('Create read session failed')
        return

    logger.info('Read session id: ' + resp.session_id)

if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
        level=logging.INFO
    )
    if len(sys.argv) != 2:
        raise ValueError('Please provide split mode: size|row')

    mode = sys.argv[1]
    if mode not in ('row', 'size'):
        raise ValueError('Please provide split mode: size|row')

    create_read_session(mode)

Copy the session_id from the log output — you need it in the next two steps.

Step 3: Verify the session status

Session creation is asynchronous and may take some time. Poll get_read_session until the status is SessionStatus.NORMAL before reading data.

import logging
import sys
from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)

def check_session_status(session_id):
    client = get_arrow_client()
    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)

    if resp.status != Status.OK:
        logger.info('Get read session failed')
        return

    if resp.session_status == SessionStatus.NORMAL:
        logger.info('Read session id: ' + resp.session_id)
    else:
        logger.info('Session is not ready yet: ' + str(resp.session_status))

if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
        level=logging.INFO
    )
    if len(sys.argv) != 2:
        raise ValueError('Please provide session id')

    session_id = sys.argv[1]
    check_session_status(session_id)

When the log shows the session ID, the session is ready.
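The script above checks the status only once. To poll as described, you can wrap the check in a retry loop. A minimal sketch of a generic polling helper (wait_until is a hypothetical name, not part of PyODPS):

```python
import time

def wait_until(check, timeout=300, interval=5):
    # Call check() repeatedly until it returns True or the timeout expires.
    # Returns True if the condition was met, False on timeout.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if check():
            return True
        time.sleep(interval)
    return False
```

For example, wait_until(lambda: check_session_status(session_id)) would poll for up to five minutes, given a variant of check_session_status() that returns True once the session status is NORMAL.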

Step 4: Read data rows

Use the session ID to fetch data in Apache Arrow record batches. The script counts total rows as a basic verification.

import logging
import sys
from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)

def read_rows(session_id):
    client = get_arrow_client()
    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)

    if resp.status != Status.OK and resp.status != Status.WAIT:
        logger.info('Get read session failed')
        return

    req = ReadRowsRequest(session_id=session_id)
    if resp.split_count == -1:
        # No splits: read all rows in a single range
        req.row_index = 0
        req.row_count = resp.record_count
    else:
        # Splits available: read from split 0
        req.split_index = 0

    reader = client.read_rows_arrow(req)
    total_line = 0
    while True:
        record_batch = reader.read()
        if record_batch is None:
            break
        total_line += record_batch.num_rows

    if reader.get_status() != Status.OK:
        logger.info('Read rows failed')
        return

    logger.info('Total line is: ' + str(total_line))

if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
        level=logging.INFO
    )
    if len(sys.argv) != 2:
        raise ValueError('Please provide session id')

    session_id = sys.argv[1]
    read_rows(session_id)

When the read succeeds, the log prints the total number of rows read from the table.

The example above reads from split_index = 0. When split_count is greater than 0, the session contains multiple splits that can be distributed across threads or worker nodes. Iterate over all split indices (0 through split_count - 1) and assign each to a separate reader for parallel ingestion.
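A sketch of that fan-out using a thread pool. read_split() mirrors the read_rows() pattern above for a single split; read_all_splits() only orchestrates, so read_one can be any callable that reads split i and returns its row count (both function names are hypothetical, not part of PyODPS):

```python
from concurrent.futures import ThreadPoolExecutor

def read_split(client, session_id, split_index):
    # Read one split and return its row count (same pattern as read_rows above).
    # Imported lazily so this sketch stays importable without pyodps installed.
    from odps.apis.storage_api import ReadRowsRequest, Status
    req = ReadRowsRequest(session_id=session_id)
    req.split_index = split_index
    reader = client.read_rows_arrow(req)
    total = 0
    while True:
        batch = reader.read()
        if batch is None:
            break
        total += batch.num_rows
    if reader.get_status() != Status.OK:
        raise RuntimeError('Read split %d failed' % split_index)
    return total

def read_all_splits(split_count, read_one, max_workers=4):
    # Fan splits 0..split_count-1 out across a thread pool and sum the counts.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return sum(pool.map(read_one, range(split_count)))
```

For example, read_all_splits(resp.split_count, lambda i: read_split(client, session_id, i)) reads every split concurrently and returns the total row count.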

Complete example

The following script combines all four steps into a single runnable program. Replace the placeholders in the connection section with your own values before running.

import logging
import os
import sys
from odps import ODPS
from odps.apis.storage_api import *

logging.basicConfig(
    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

# Step 1: Set up the connection
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your-project>',
    endpoint='<your-endpoint>'
)

table = '<table-name>'
quota_name = '<quota-name>'

def get_arrow_client():
    odps_table = o.get_table(table)
    return StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)

# Step 2: Create a read session
def create_read_session(mode):
    client = get_arrow_client()
    # Replace 'pt=test_write_1' with the partition spec you want to read
    req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])

    if mode == 'size':
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
    elif mode == 'row':
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)

    resp = client.create_read_session(req)
    if resp.status != Status.OK:
        raise RuntimeError('Create read session failed')

    logger.info('Read session id: ' + resp.session_id)
    return resp.session_id

# Step 3: Verify the session status
def check_session_status(session_id):
    client = get_arrow_client()
    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)

    if resp.status != Status.OK:
        raise RuntimeError('Get read session failed')

    if resp.session_status == SessionStatus.NORMAL:
        logger.info('Session is ready: ' + resp.session_id)
    else:
        logger.info('Session status is not NORMAL yet: ' + str(resp.session_status))

    return resp.session_status == SessionStatus.NORMAL

# Step 4: Read data rows
def read_rows(session_id):
    client = get_arrow_client()
    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)

    if resp.status != Status.OK and resp.status != Status.WAIT:
        raise RuntimeError('Get read session failed')

    req = ReadRowsRequest(session_id=session_id)
    if resp.split_count == -1:
        req.row_index = 0
        req.row_count = resp.record_count
    else:
        req.split_index = 0

    reader = client.read_rows_arrow(req)
    total_line = 0
    while True:
        record_batch = reader.read()
        if record_batch is None:
            break
        total_line += record_batch.num_rows

    if reader.get_status() != Status.OK:
        raise RuntimeError('Read rows failed')

    logger.info('Total rows read: ' + str(total_line))
    return total_line

if __name__ == '__main__':
    if len(sys.argv) != 2:
        raise ValueError('Usage: python example.py <split-mode>  (size|row)')

    mode = sys.argv[1]
    if mode not in ('row', 'size'):
        raise ValueError('Split mode must be "size" or "row"')

    session_id = create_read_session(mode)
    ready = check_session_status(session_id)
    if not ready:
        logger.info('Session not ready — poll check_session_status until it returns True')
        sys.exit(1)
    read_rows(session_id)

Run the script with:

export ALIBABA_CLOUD_ACCESS_KEY_ID=<your-access-key-id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your-access-key-secret>
python example.py size

What's next