The MaxCompute Storage API lets third-party engines — including Spark on EMR, StarRocks, Presto, PAI, and Hologres — read MaxCompute table data directly at high throughput. This page walks through Python code examples for connecting to MaxCompute and reading table data using the Storage API.
For the full example repository, see Python SDK examples.
Prerequisites
Before you begin, ensure that you have:
- PyODPS installed. For detailed instructions, see Install PyODPS.
- A MaxCompute project with a VPC endpoint. The Storage API endpoint currently supports only Alibaba Cloud VPC networks.
PyODPS is also available in managed environments:
- DataWorks: The PyODPS node comes with PyODPS pre-installed, so you can develop and schedule PyODPS tasks on it directly. See Use PyODPS in DataWorks.
- PAI: All built-in PAI images include PyODPS, so you can use it immediately, for example in the custom Python widget of PAI-Designer. Using PyODPS in PAI Notebooks works largely the same as using it locally. See Overview of basic operations and DataFrame (not recommended).
PyODPS is the Python SDK for MaxCompute. For an overview, see PyODPS.
How it works
Reading table data through the Storage API follows four steps:

1. Set up the connection with an ODPS client and an Arrow-format Storage API client.
2. Create a read session that specifies the target partition and split strategy.
3. Poll the session until its status is NORMAL.
4. Read data in Apache Arrow record batches using the session ID.

The session_id produced in step 2 is the input to steps 3 and 4, so the examples below build on each other in sequence.
Read table data
Step 1: Set up the connection
Create an ODPS client and an Apache Arrow-format Storage API client. Save this snippet as util.py: the later steps import the get_arrow_client() helper defined here with from util import *.
```python
import os

from odps import ODPS
from odps.apis.storage_api import *

# Load AccessKey credentials from environment variables.
# Do not hardcode credentials in your source code.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your-project>',
    endpoint='<your-endpoint>'
)

# Name of the MaxCompute table to read
table = '<table-name>'
# Quota name for the Storage API exclusive resource group (subscription)
quota_name = '<quota-name>'


def get_arrow_client():
    odps_table = o.get_table(table)
    client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)
    return client
```
Replace the following placeholders:
| Placeholder | Description | Example |
|---|---|---|
| `<your-project>` | Name of your MaxCompute project | my_project |
| `<your-endpoint>` | VPC endpoint for the MaxCompute service | http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api |
| `<table-name>` | Name of the table to read | my_table |
| `<quota-name>` | Quota name for the Storage API exclusive resource group | my_quota |
To find your quota name, log on to the MaxCompute console, switch to your region, and go to Workspace > Quotas in the left-side navigation pane. See Manage quotas for computing resources in the MaxCompute console for details.
To enable the Storage API, go to Tenants > Tenant Property in the left-side navigation pane and turn on Storage API Switch.
Step 2: Create a read session
Submit a TableBatchScanRequest to start a read session. The split mode controls how MaxCompute partitions the data for parallel reads: SIZE splits by data volume; ROW_OFFSET splits by number of rows.
```python
import logging
import sys

from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)


def create_read_session(mode):
    client = get_arrow_client()
    # Partition to scan; replace with a partition of your own table
    req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])
    if mode == 'size':
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
    elif mode == 'row':
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)

    resp = client.create_read_session(req)
    if resp.status != Status.OK:
        logger.info('Create read session failed')
        return
    logger.info('Read session id: ' + resp.session_id)


if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
        level=logging.INFO
    )
    if len(sys.argv) != 2:
        raise ValueError('Please provide split mode: size|row')
    mode = sys.argv[1]
    if mode not in ('row', 'size'):
        raise ValueError('Please provide split mode: size|row')
    create_read_session(mode)
```
Copy the session_id from the log output — you need it in the next two steps.
Step 3: Verify the session status
Session creation is asynchronous and may take some time. Poll get_read_session until the status is SessionStatus.NORMAL before reading data.
```python
import logging
import sys

from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)


def check_session_status(session_id):
    client = get_arrow_client()
    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)
    if resp.status != Status.OK:
        logger.info('Get read session failed')
        return
    if resp.session_status == SessionStatus.NORMAL:
        logger.info('Read session id: ' + resp.session_id)
    else:
        logger.info('Session status is not expected')


if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
        level=logging.INFO
    )
    if len(sys.argv) != 2:
        raise ValueError('Please provide session id')
    session_id = sys.argv[1]
    check_session_status(session_id)
```
When the log shows the session ID, the session is ready.
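If you are scripting the whole flow, you can poll in a loop instead of re-running the check by hand. Below is a minimal, generic polling helper; the wait_until name and the timeout and interval defaults are illustrative, not part of the SDK, and the trailing comment sketches how it could wrap the get_read_session call from the snippet above.

```python
import time


def wait_until(check, timeout=60.0, interval=2.0):
    """Call check() repeatedly until it returns True; give up after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        # Session is still initializing; retry after a short delay.
        time.sleep(interval)


# Possible wiring to Step 3, using the names from the earlier snippets:
#
#   client = get_arrow_client()
#   req = SessionRequest(session_id=session_id)
#   ready = wait_until(
#       lambda: client.get_read_session(req).session_status == SessionStatus.NORMAL
#   )
```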
Step 4: Read data rows
Use the session ID to fetch data in Apache Arrow record batches. The script counts total rows as a basic verification.
```python
import logging
import sys

from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)


def read_rows(session_id):
    client = get_arrow_client()
    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)
    if resp.status != Status.OK and resp.status != Status.WAIT:
        logger.info('Get read session failed')
        return

    req = ReadRowsRequest(session_id=session_id)
    if resp.split_count == -1:
        # No splits: read all rows in a single range
        req.row_index = 0
        req.row_count = resp.record_count
    else:
        # Splits available: read from split 0
        req.split_index = 0

    reader = client.read_rows_arrow(req)
    total_line = 0
    while True:
        record_batch = reader.read()
        if record_batch is None:
            break
        total_line += record_batch.num_rows

    if reader.get_status() != Status.OK:
        logger.info('Read rows failed')
        return
    logger.info('Total line is: ' + str(total_line))


if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
        level=logging.INFO
    )
    if len(sys.argv) != 2:
        raise ValueError('Please provide session id')
    session_id = sys.argv[1]
    read_rows(session_id)
```
When the read succeeds, the log prints the total number of rows read from the table.
The example above reads from split_index = 0. When split_count is greater than 0, the session contains multiple splits that can be distributed across threads or worker nodes. Iterate over all split indices (0 through split_count - 1) and assign each to a separate reader for parallel ingestion.
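The parallel pattern can be sketched with a thread pool. The read_all_splits helper below is illustrative, not part of the SDK; it handles only the fan-out, and the commented read_split shows how a per-split reader could look using the names from Step 4. Whether a single StorageApiArrowClient is safe to share across threads is an assumption here; create one client per worker if in doubt.

```python
from concurrent.futures import ThreadPoolExecutor


def read_all_splits(read_split, split_count, max_workers=4):
    """Run one read_split(i) call per split index in a thread pool and sum the row counts."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return sum(pool.map(read_split, range(split_count)))


# A per-split reader using the Storage API names from Step 4:
#
#   def read_split(split_index):
#       req = ReadRowsRequest(session_id=session_id)
#       req.split_index = split_index
#       reader = client.read_rows_arrow(req)
#       rows = 0
#       while True:
#           batch = reader.read()
#           if batch is None:
#               break
#           rows += batch.num_rows
#       if reader.get_status() != Status.OK:
#           raise RuntimeError('Read split %d failed' % split_index)
#       return rows
#
#   total = read_all_splits(read_split, resp.split_count)
```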
Complete example
The following script combines all four steps into a single runnable program. Replace the placeholders in the connection section with your own values before running.
```python
import logging
import os
import sys

from odps import ODPS
from odps.apis.storage_api import *

logging.basicConfig(
    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

# Step 1: Set up the connection
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your-project>',
    endpoint='<your-endpoint>'
)
table = '<table-name>'
quota_name = '<quota-name>'


def get_arrow_client():
    odps_table = o.get_table(table)
    return StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)


# Step 2: Create a read session
def create_read_session(mode):
    client = get_arrow_client()
    req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])
    if mode == 'size':
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
    elif mode == 'row':
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)
    resp = client.create_read_session(req)
    if resp.status != Status.OK:
        raise RuntimeError('Create read session failed')
    logger.info('Read session id: ' + resp.session_id)
    return resp.session_id


# Step 3: Verify the session status
def check_session_status(session_id):
    client = get_arrow_client()
    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)
    if resp.status != Status.OK:
        raise RuntimeError('Get read session failed')
    if resp.session_status == SessionStatus.NORMAL:
        logger.info('Session is ready: ' + resp.session_id)
    else:
        logger.info('Session status is not NORMAL yet: ' + str(resp.session_status))
    return resp.session_status == SessionStatus.NORMAL


# Step 4: Read data rows
def read_rows(session_id):
    client = get_arrow_client()
    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)
    if resp.status != Status.OK and resp.status != Status.WAIT:
        raise RuntimeError('Get read session failed')

    req = ReadRowsRequest(session_id=session_id)
    if resp.split_count == -1:
        # No splits: read all rows in a single range
        req.row_index = 0
        req.row_count = resp.record_count
    else:
        # Splits available: read from split 0
        req.split_index = 0

    reader = client.read_rows_arrow(req)
    total_line = 0
    while True:
        record_batch = reader.read()
        if record_batch is None:
            break
        total_line += record_batch.num_rows

    if reader.get_status() != Status.OK:
        raise RuntimeError('Read rows failed')
    logger.info('Total rows read: ' + str(total_line))
    return total_line


if __name__ == '__main__':
    if len(sys.argv) != 2:
        raise ValueError('Usage: python example.py <split-mode> (size|row)')
    mode = sys.argv[1]
    if mode not in ('row', 'size'):
        raise ValueError('Split mode must be "size" or "row"')
    session_id = create_read_session(mode)
    ready = check_session_status(session_id)
    if not ready:
        logger.info('Session not ready; poll check_session_status until it returns True')
        sys.exit(1)
    read_rows(session_id)
```
Run the script with:
```shell
export ALIBABA_CLOUD_ACCESS_KEY_ID=<your-access-key-id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your-access-key-secret>
python example.py size
```
What's next
- Storage API overview: architecture, quota configuration, and API reference
- Storage API GitHub repository: full API source