MaxCompute supports third-party engines such as Spark on EMR, StarRocks, Presto, PAI, and Hologres. These engines can directly access MaxCompute data through the SDK by invoking the Storage API. This topic provides code examples for accessing MaxCompute with the Python SDK.
MaxCompute provides the Storage API through the Python SDK. For more information, see aliyun-odps-python-sdk.
Prerequisites
The code examples in this topic use PyODPS. To run the code locally, you'll need to have PyODPS installed. For detailed instructions, see Install PyODPS.
PyODPS is also supported in DataWorks and PAI Notebooks:
In DataWorks, the PyODPS node comes with PyODPS pre-installed. You can develop and schedule PyODPS tasks directly on the PyODPS node. For details, see Use PyODPS in DataWorks.
PyODPS can be installed and run in PAI's Python environment. All built-in PAI images include PyODPS, so you can use it immediately, for example in the custom Python widget of PAI-Designer. Using PyODPS in PAI Notebooks is essentially the same as using it in a local environment. For more details, see Overview of Basic Operations and DataFrame (not recommended).
PyODPS is the Python version of the MaxCompute SDK. For more details, see PyODPS.
Usage examples
For a code example of how to access MaxCompute by using the Python SDK, see Python SDK Examples.
Set up the environment to connect to the MaxCompute service.
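The setup code below reads the credentials from environment variables. One way to provide them locally, assuming a POSIX shell (the values shown are placeholders to replace with your actual AccessKey pair):

```shell
# Placeholder values - replace with your actual AccessKey pair before running.
export ALIBABA_CLOUD_ACCESS_KEY_ID="<your-access-key-id>"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="<your-access-key-secret>"
```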
```python
import os

from odps import ODPS
from odps.apis.storage_api import *

# Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_ID to the AccessKey ID of your Alibaba Cloud account.
# Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_SECRET to the AccessKey secret of your Alibaba Cloud account.
# We recommend that you do not hard-code your AccessKey ID and AccessKey secret in plaintext.
# The endpoint is the connection address of the MaxCompute service. Currently, only Alibaba Cloud VPC networks are supported.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)

# Name of the MaxCompute table
table = "<table to access>"
# Name of the quota used to access MaxCompute
quota_name = "<quota name>"

# Connect to the Alibaba Cloud MaxCompute service and create a Storage API client based on the Arrow format.
def get_arrow_client():
    odps_table = o.get_table(table)
    client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)
    return client
```

Note: To obtain the quota name for the Data Transmission Service exclusive resource group (subscription), follow these steps:
Log on to the MaxCompute console. Once you've switched the region in the upper left corner, choose Workspace > Quotas in the left-side navigation pane to see a list of available quotas. For detailed instructions, see Manage quotas for computing resources in the MaxCompute console.
Storage API: Log on to the MaxCompute console. In the left-side navigation pane, choose Tenants > Tenant Property, then enable Storage API Switch.
Perform table read operations.
Initiate a data reading session to read MaxCompute data.
```python
import logging
import sys

from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)

# Define the create_read_session function. The mode parameter specifies the sharding
# strategy used when scanning data: "size" shards by data size, "row" shards by row count.
def create_read_session(mode):
    client = get_arrow_client()

    req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])
    if mode == "size":
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
    elif mode == "row":
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)

    resp = client.create_read_session(req)
    if resp.status != Status.OK:
        logger.info("Create read session failed")
        return

    logger.info("Read session id: " + resp.session_id)

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)

    if len(sys.argv) != 2:
        raise ValueError("Please provide split mode: size|row")

    mode = sys.argv[1]
    if mode != "row" and mode != "size":
        raise ValueError("Please provide split mode: size|row")

    create_read_session(mode)
```

Check the status of the read session to verify that it is ready for reading.
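Session creation can take some time, so a single status check may find the session still initializing; callers often poll until the status reaches NORMAL before reading. A minimal polling sketch, where the `get_status` callable stands in for the real `client.get_read_session` call (all names here are illustrative):

```python
import time

def wait_until_normal(get_status, timeout=60.0, interval=1.0):
    """Poll get_status() until it returns "NORMAL" or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_status() == "NORMAL":
            return True
        time.sleep(interval)
    return False

# Stub that reports NORMAL on the third poll, simulating a session warming up.
state = {"polls": 0}
def fake_status():
    state["polls"] += 1
    return "NORMAL" if state["polls"] >= 3 else "INIT"

print(wait_until_normal(fake_status, timeout=5.0, interval=0.01))  # True
```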
```python
import logging
import sys
import time

from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)

# Ensure that the read session is successfully created and is in the ready state
# before performing data reading operations.
def check_session_status(session_id):
    client = get_arrow_client()

    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)
    if resp.status != Status.OK:
        logger.info("Get read session failed")
        return

    # The session creation process may take a long time. You need to wait until
    # the session status is NORMAL before reading data.
    if resp.session_status == SessionStatus.NORMAL:
        logger.info("Read session id: " + resp.session_id)
    else:
        logger.info("Session status is not expected")

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)

    if len(sys.argv) != 2:
        raise ValueError("Please provide session id")

    session_id = sys.argv[1]
    check_session_status(session_id)
```

Read the data from MaxCompute.
```python
# Read data rows from MaxCompute using the specified session_id and count the
# total number of data rows read.
import logging
import sys

from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)

def read_rows(session_id):
    client = get_arrow_client()

    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)
    if resp.status != Status.OK and resp.status != Status.WAIT:
        logger.info("Get read session failed")
        return

    req = ReadRowsRequest(session_id=session_id)
    # If the session is not split (split_count is -1), address the data by row range;
    # otherwise, read the data of one split by its index.
    if resp.split_count == -1:
        req.row_index = 0
        req.row_count = resp.record_count
    else:
        req.split_index = 0

    reader = client.read_rows_arrow(req)

    total_line = 0
    while True:
        record_batch = reader.read()
        if record_batch is None:
            break
        total_line += record_batch.num_rows

    if reader.get_status() != Status.OK:
        logger.info("Read rows failed")
        return

    logger.info("Total line is: " + str(total_line))

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)

    if len(sys.argv) != 2:
        raise ValueError("Please provide session id")

    session_id = sys.argv[1]
    read_rows(session_id)
```
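The branch in read_rows above generalizes: when split_count is -1 the session is addressed by row ranges, otherwise each split_index from 0 to split_count - 1 can be read independently, which makes it easy to fan the work out across threads. A sketch of that dispatch, with stubbed per-task counters standing in for real read_rows_arrow calls (all names and the chunk size are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor

ROWS_PER_RANGE = 4  # illustrative chunk size for row-range reads

def count_rows_in_range(row_index, row_count):
    # Stub: real code would set row_index/row_count on a ReadRowsRequest,
    # call client.read_rows_arrow(req), and sum record_batch.num_rows.
    return row_count

def count_rows_in_split(split_index):
    # Stub: real code would set split_index on the request instead.
    return 100

def read_table(split_count, record_count, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        if split_count == -1:
            # Unsplit session: cover all rows with (row_index, row_count) ranges.
            starts = range(0, record_count, ROWS_PER_RANGE)
            counts = [min(ROWS_PER_RANGE, record_count - s) for s in starts]
            per_task = list(pool.map(count_rows_in_range, starts, counts))
        else:
            # Split session: one task per split_index.
            per_task = list(pool.map(count_rows_in_split, range(split_count)))
    return sum(per_task)

print(read_table(-1, 10))  # row-range path: 4 + 4 + 2 = 10
print(read_table(3, 10))   # split path: 3 splits x 100 = 300
```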
Reference
For more information about the MaxCompute Storage API, see Overview of storage API.