PyODPS nodes let you run Python scripts that interact with MaxCompute directly in DataWorks. Each node comes with a pre-initialized MaxCompute client, so you can query tables, execute SQL, and manage resources without any setup code.
PyODPS 2 nodes run Python 2.7. If your codebase targets Python 3, use a PyODPS 3 node instead.
Prerequisites
Before you begin, ensure that you have:
- A PyODPS 2 node created in DataWorks. See Create and manage MaxCompute nodes.
How it works
PyODPS is the MaxCompute SDK for Python. In DataWorks, each PyODPS node exposes a pre-initialized MaxCompute entry point — the global variable odps (also aliased as o). Write Python code against this object to query tables, run SQL, and manage MaxCompute resources.
When the node runs, DataWorks injects an args dictionary containing any scheduling parameters you have configured. Unlike SQL nodes, DataWorks does not substitute ${param_name} placeholders in PyODPS node code — all parameter access goes through args.
For full PyODPS API documentation, see PyODPS.
Write and run code
Use the MaxCompute entry point
The global variable odps (or o) is available in every PyODPS node. No client initialization is required.
print(odps.exist_table('PyODPS_iris'))
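A few more entry-point operations, shown as a sketch. The table name pyodps_iris is illustrative; substitute a table that exists in your project:

```python
# Operations on the pre-initialized MaxCompute client.
# 'pyodps_iris' is an example table name -- replace it with your own.
t = o.get_table('pyodps_iris')   # Fetch a table object
print(t.schema)                  # Inspect the table schema
for table in o.list_tables():    # Iterate over tables in the project
    print(table.name)
```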
Execute SQL statements
Call execute_sql to run SQL statements. For details about supported syntax, see SQL.
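For example, a minimal sketch of a synchronous query (pyodps_iris is a placeholder table name):

```python
# execute_sql blocks until the MaxCompute instance completes.
# 'pyodps_iris' is an example table name.
instance = o.execute_sql('select * from pyodps_iris limit 10')
print(instance.id)  # ID of the completed MaxCompute instance
```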
Reading results: the 10,000-record default limit
By default, InstanceTunnel is disabled in DataWorks. When disabled, instance.open_reader uses the Result interface, which returns at most 10,000 records. Use reader.count to check how many records were returned.
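A sketch of checking for possible truncation; pyodps_iris is a placeholder table name and process is a hypothetical handler:

```python
instance = o.execute_sql('select * from pyodps_iris')
with instance.open_reader() as reader:
    # With InstanceTunnel disabled, a count of exactly 10,000 may mean
    # the result set was truncated by the Result interface.
    print(reader.count)
    for record in reader:
        process(record)  # hypothetical handler; replace with your own logic
```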
To read beyond 10,000 records, enable InstanceTunnel. Choose one of the following options:
| Option | Scope | When to use |
|---|---|---|
| Option 1: Set globally | All open_reader calls in the node | Most nodes; avoids repeating the flag on every call |
| Option 2: Set per call | Current open_reader call only | When different calls in the same node need different limits |
Option 1: Enable globally
from odps import options

options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # Remove the 10,000-record limit
with instance.open_reader() as reader:
    # Reads all data via InstanceTunnel
    for record in reader:
        print(record)
Option 2: Enable for a single call
with instance.open_reader(tunnel=True, limit=False) as reader:
    # Reads all data for this call only
    for record in reader:
        print(record)
Retrieve SQL query results
For SQL statements that return structured data:
with o.execute_sql('select * from dual').open_reader() as reader:
    for record in reader:
        # Process each record
        print(record)
For statements like DESC that return raw text, use reader.raw:
with o.execute_sql('desc dual').open_reader() as reader:
print(reader.raw)
If you use custom scheduling parameters and run the node on the configuration tab, set each parameter to a constant value for the test run. Scheduling parameter values are not automatically substituted in PyODPS node code.
Configure runtime parameters for SQL
Pass runtime parameters to SQL using the hints parameter (type: dict). For supported hint keys, see SET operations.
Per-query hints:
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
Global hints (applied to all subsequent `execute_sql` calls):
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from PyODPS_iris')
Use DataFrame to process data
DataFrame is not recommended for new development. Use SQL directly for most data processing tasks.
DataFrame API operations are lazy — they only execute when you call an immediately executed method.
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepal_width < 3].execute():
    # Process each record
    print(record)
To trigger execution on display (for example, in a notebook-style flow), set options.interactive = True at the top of your script:
from odps import options
from odps.df import DataFrame
options.interactive = True
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum()) # Executes immediately when printed
By default, options.verbose is True in DataWorks, so the Logview URL and other progress details are printed automatically during execution.
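If you want quieter logs, you can turn verbose output off at the top of the script. A sketch using the PyODPS options object:

```python
from odps import options

# Suppress automatic progress output such as the Logview URL
options.verbose = False
```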
End-to-end example
This example loads the pyodps_iris table and prints the first five values in the sepallength column.
- Prepare the dataset. See DataFrame data processing for how to create the pyodps_iris table.
- Create a DataFrame object. See Create a DataFrame object from a MaxCompute table.
- Paste the following code into the node editor and run it:

from odps.df import DataFrame

# Create a DataFrame object from a MaxCompute table
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepallength.head(5))

Expected output:

   sepallength
0          4.5
1          5.5
2          4.9
3          5.0
4          6.0
Configure scheduling parameters
For nodes that run on a schedule, configure scheduling parameters on the Properties tab of the node. In the Scheduling Parameter section, add parameters in the format key=${expression}.
Unlike SQL nodes, DataWorks does not substitute ${param_name} placeholders directly in PyODPS node code. Instead, DataWorks injects the evaluated values into a global args dictionary before the node runs. Retrieve them using args['key'].
For example, if you set ds=${yyyymmdd}:
print('ds=' + args['ds'])
# Output: ds=20161116
To get a partition by the scheduling date:
o.get_table('table_name').get_partition('ds=' + args['ds'])
For more on scheduling, see Scheduling overview and Configure scheduling parameters for different types of nodes.
Limitations
| Limitation | Detail | Workaround |
|---|---|---|
| Python version | PyODPS 2 nodes run Python 2.7. | Use a PyODPS 3 node for Python 3. |
| Concurrency | Only one Python task can run on a PyODPS 2 node at a time. | Use separate nodes for concurrent workloads. |
| Local data (exclusive resource group) | Process no more than 50 MB of data loaded from your local machine. Exceeding this limit may cause an out of memory (OOM) error ("Got killed"). Note: the memory limit does not apply to SQL or DataFrame tasks (excluding to_pandas tasks) that are initiated by PyODPS. | Push heavy processing into MaxCompute SQL or DataFrame tasks. See Best practices for efficient use of PyODPS nodes. |
| Local data (serverless resource group) | A single task supports up to 64 CUs. | Stay at or below 16 CUs to avoid resource contention at startup. |
| Pre-installed libraries | NumPy and pandas are pre-installed and available for non-UDF code. Third-party packages that contain binary code are not supported. | For other packages, use a serverless resource group with a custom image (see Install third-party packages). |
| InstanceTunnel | options.tunnel.use_instance_tunnel defaults to False in DataWorks, limiting reads to 10,000 records. | Set it to True to enable InstanceTunnel globally (see Execute SQL statements). |
| Output log size | Logs are capped at 4 MB. | Keep logs focused on alerts and progress; avoid printing large result sets. |
Usage notes
Install third-party packages
To use a third-party package in a PyODPS node:
- Use a serverless resource group to run the node.
- Create a custom image that includes the package.
This method does not apply to third-party packages referenced inside user-defined functions (UDFs). For UDF-specific instructions, see Example: Reference third-party packages in Python UDFs.
Upgrade the PyODPS version
- Serverless resource group: Use the image management feature to run the following command on a PyODPS 3 node: /home/tops/bin/pip3 install pyodps==0.12.1. Replace 0.12.1 with the target version. See Custom images.
- Exclusive resource group for scheduling: Use the O&M Assistant feature to run the same command on a PyODPS 3 node. See O&M Assistant.
Access resources in a VPC or private network
To connect a PyODPS node to a data source in a virtual private cloud (VPC) or private data center, use a serverless resource group and establish a network connection between the resource group and the target. See Network connectivity solutions.
Fix missing data lineages
If SQL statements run in a PyODPS node do not generate data lineages in Data Map, pass DataWorks scheduler runtime parameters as hints when executing SQL:
import os
# Collect DataWorks scheduler runtime parameters
skynet_hints = {}
for k, v in os.environ.items():
if k.startswith('SKYNET_'):
skynet_hints[k] = v
# Pass hints when submitting a task
o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
For more on viewing data lineages, see View data lineages. For the hints parameter, see Configure the hints parameter.
What's next
- Use a PyODPS node to download data to a local directory for processing or to process data online.
- Determine whether a custom Python script ran successfully. The logic is the same as for Shell scripts.
- Deploy the PyODPS 3 node: required if you use a workspace in standard mode and want the node scheduled in production.
- Perform O&M on the PyODPS 3 node: manage deployed nodes in Operation Center.
- PyODPS FAQ: troubleshoot common issues.