PyODPS 3 nodes let you write Python 3 code that reads and writes MaxCompute data, then schedule that code as part of a DataWorks pipeline. This topic walks you through writing, running, and scheduling a PyODPS 3 node.
Prerequisites
Before you begin, make sure you have:
- A PyODPS 3 node created in DataWorks. For details, see Create and manage MaxCompute nodes.
- (Optional) A Serverless resource group, if your job needs to access VPC or IDC resources, or requires third-party packages.
How it works
PyODPS is the Python SDK for MaxCompute. DataWorks wraps PyODPS into a schedulable node type so you can:
- Query tables and run SQL against MaxCompute
- Process data with the PyODPS DataFrame API
- Schedule jobs to run on a recurring cadence and integrate them with other node types
When a PyODPS 3 node runs, DataWorks injects two global variables — odps and o — that serve as the MaxCompute entry point. No initialization code is needed.
# Check whether a table exists — odps and o are available without any setup
print(odps.exist_table('PyODPS_iris'))
Usage notes
Third-party packages
- To use third-party packages on a Serverless resource group, install them via custom images.
- This installation method does not support user-defined functions (UDFs) that reference third-party packages. For UDF dependencies, see UDF example: Use third-party packages in Python UDFs.
- To upgrade PyODPS on a Serverless resource group, run the following command via custom images (replace 0.12.1 with the target version):
  /home/tops/bin/pip3 install pyodps==0.12.1
  For an exclusive resource group for scheduling, run the same command via O&M (Operations and Maintenance) Assistant.
Network access
To access data sources in a VPC or IDC, use a Serverless resource group and configure network connectivity. See Network connectivity solution.
To access an external API (such as a third-party webhook), add the API endpoint to the sandbox whitelist: go to Management Center > Workspaces > Security Settings, then add the endpoint.
Node types
DataWorks supports both PyODPS 2 (Python 2) and PyODPS 3 (Python 3) nodes. Select the node type that matches your Python version.
Data lineage
SQL statements run inside a PyODPS node may not automatically generate data lineage in Data Map. To fix this, pass the SKYNET_ scheduling environment variables as hints when executing SQL:
import os
# Collect DataWorks scheduling runtime parameters
skynet_hints = {}
for k, v in os.environ.items():
    if k.startswith('SKYNET_'):
        skynet_hints[k] = v
# Pass them as hints so Data Map can track lineage
o.execute_sql('INSERT OVERWRITE TABLE target_table SELECT * FROM source_table WHERE dt=***', hints=skynet_hints)
For more on viewing lineage, see View lineage information. For the full list of hint settings, see Set runtime parameters (hints).
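If a statement also needs its own runtime hints, the lineage variables and the custom hints have to travel in the same dictionary. The following is a minimal sketch of one way to merge them. The helper name build_sql_hints and its parameters are hypothetical and not part of PyODPS; the environ parameter exists only so the function can be tried outside DataWorks.

```python
import os


def build_sql_hints(extra_hints=None, environ=None):
    """Merge SKYNET_* scheduling variables with caller-supplied hints.

    Hypothetical helper for illustration; it is not part of PyODPS.
    """
    env = os.environ if environ is None else environ
    # Start with the scheduling variables used for lineage tracking
    hints = {k: v for k, v in env.items() if k.startswith('SKYNET_')}
    # Add the caller's hints last so they are always present in the result
    hints.update(extra_hints or {})
    return hints
```

Inside a node, the result could be passed straight through, for example o.execute_sql(sql, hints=build_sql_hints({'odps.sql.mapper.split.size': 16})).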
Logs
Output logs are capped at 4 MB. Use logs for alerts and progress tracking only — do not log large datasets.
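One way to stay well under the cap is to print a progress line every N records instead of printing each record. The sketch below assumes nothing about PyODPS itself; process_with_progress, handle, and every are illustrative names, and records can be any iterable, such as a reader.

```python
def process_with_progress(records, handle, every=10000):
    """Apply `handle` to each record and print one progress line per `every` records.

    Illustrative helper; the names and the default interval are assumptions.
    """
    count = 0
    for record in records:
        handle(record)
        count += 1
        # Log only at fixed intervals so log volume stays small
        if count % every == 0:
            print('processed %d records' % count)
    print('done: %d records' % count)
    return count
```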
Limitations
| Area | Limit | Details |
|---|---|---|
| Local data processing | 50 MB (exclusive resource group) | Exceeding this causes an out-of-memory (OOM) "Got killed" error. Run data-heavy operations via SQL or DataFrame instead of loading data locally. See Best practices for efficient PyODPS usage. |
| CU allocation (Serverless resource group) | Max 64 CU per task; 16 CU recommended | A compute unit (CU) is DataWorks' unit of processing capacity. Staying at 16 CU helps prevent resource contention that delays task execution. |
| Pre-installed packages | NumPy, Pandas | Non-UDF code can use these packages. Other binary third-party packages are not supported unless installed via custom images on a Serverless resource group. |
| Python version | Python 3.7 | MaxCompute currently supports Python 3.7. Different minor versions have different bytecode definitions — using Python 3.8-specific syntax (such as certain finally block behavior) causes errors. |
| Instance tunnel | Disabled by default | By default, instance.open_reader uses the Result interface, which returns at most 10,000 records. See Read execution results for how to enable the instance tunnel. |
| Concurrent tasks | Not supported | A single PyODPS node cannot run multiple Python tasks concurrently. |
| Serverless resource group | Supported for PyODPS 3 | PyODPS 3 supports running on Serverless resource groups. To purchase one, see Use Serverless resource groups. |
Write and run code
Execute SQL
Run SQL statements with o.execute_sql. By default, the instance tunnel is disabled and instance.open_reader returns at most 10,000 records.
To read all records, enable the instance tunnel globally:
from odps import options
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # Disable the 10,000-record cap
instance = o.execute_sql('select * from PyODPS_iris')
with instance.open_reader() as reader:
    # Reads all records via the instance tunnel
    for record in reader:
        ...
Or enable it for a single open_reader call:
# Enable the instance tunnel and remove the record cap for this call only
with instance.open_reader(tunnel=True, limit=False) as reader:
    ...
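Removing the record cap makes it easy to pull far more data into the node than its memory allows, so it usually helps to consume the reader as a stream rather than collect it into a list. The following sketch counts matching records one at a time. The function count_matching and its predicate argument are hypothetical; the only PyODPS call it relies on is instance.open_reader(tunnel=True, limit=False), as shown above.

```python
def count_matching(instance, predicate):
    """Count records that satisfy `predicate` without holding them all in memory.

    Hypothetical helper; `instance` is expected to be the object
    returned by o.execute_sql(...).
    """
    matched = 0
    # Same per-call settings as above: instance tunnel on, record cap off
    with instance.open_reader(tunnel=True, limit=False) as reader:
        for record in reader:
            if predicate(record):
                matched += 1
    return matched
```

For anything heavier than a simple count or check, pushing the work into SQL or DataFrame remains the safer choice.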
Set runtime parameters (hints)
Pass a hints dict to configure MaxCompute runtime settings for a specific execution:
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
To apply hints globally across all executions, set options.sql.settings:
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from PyODPS_iris') # Applies the global hints automatically
For available hint keys, see SET operations.
Read execution results
An instance returned by o.execute_sql can call open_reader directly:
- For SQL that returns structured data (such as SELECT):
  with o.execute_sql('select * from dual').open_reader() as reader:
      for record in reader:
          ...  # Process each record
- For SQL that returns unstructured output (such as DESC), use reader.raw:
  with o.execute_sql('desc dual').open_reader() as reader:
      print(reader.raw)
If you use custom scheduling parameters and manually trigger the node from the configuration page, hardcode any time values directly in the code. The node cannot perform parameter substitution in this mode.
Use DataFrame
In DataWorks, DataFrame operations are lazy: nothing runs until you explicitly call an immediate execution method such as execute or head:
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepal_width < 3].execute():  # .execute() triggers computation
    ...
To trigger execution automatically when printing, enable options.interactive at the start of your script:
from odps import options
from odps.df import DataFrame
options.interactive = True # Execution is triggered during print
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum())
options.verbose is enabled by default in DataWorks, so execution details such as the Logview URL are printed automatically.
End-to-end example
The following example reads five rows from the pyodps_iris sample table:
- Create the pyodps_iris sample table. See Use DataFrame to process data.
- Create a DataFrame object from the table. See Create a DataFrame object from a MaxCompute table.
- Paste the following code into the PyODPS node editor and run it.
  from odps.df import DataFrame
  # Create a DataFrame from a MaxCompute table
  iris = DataFrame(o.get_table('pyodps_iris'))
  print(iris.sepallength.head(5))
  Expected output:
     sepallength
  0          4.5
  1          5.5
  2          4.9
  3          5.0
  4          6.0
Configure scheduling parameters
For periodic scheduling, click Properties in the node editor and configure parameters in the Parameters section.
Unlike SQL nodes, PyODPS nodes do not perform ${param_name} placeholder substitution — this is intentional, to avoid conflicts with Python string syntax. Instead, DataWorks injects a global args dictionary before execution. Retrieve parameter values from args:
# If you set ds=${yyyymmdd} in the Parameters section:
print('ds=' + args['ds'])
# Output: ds=20161116
To use a scheduling parameter as a partition value:
o.get_table('table_name').get_partition('ds=' + args['ds'])
For the full parameter configuration reference, see Scheduling parameter configuration.
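Because a manually triggered run does not substitute scheduling parameters, a script that reads args['ds'] directly can end up with a missing or unsubstituted value. The sketch below shows one defensive pattern: accept the value only if it looks like yyyymmdd, and otherwise fall back to a date you supply. The helpers resolve_ds and partition_spec are hypothetical, and what args actually contains during a manual run is an assumption here, not documented behavior.

```python
import re


def resolve_ds(args, fallback):
    """Return args['ds'] if it looks like yyyymmdd, otherwise `fallback`.

    Hypothetical helper; `args` is the dictionary DataWorks injects.
    """
    value = str(args.get('ds', ''))
    # Accept only an 8-digit value; anything else is treated as unsubstituted
    if re.fullmatch(r'\d{8}', value):
        return value
    return fallback


def partition_spec(args, fallback):
    """Build a partition spec such as 'ds=20161116'."""
    return 'ds=' + resolve_ds(args, fallback)
```

The result could then be used as o.get_table('table_name').get_partition(partition_spec(args, '20161116')).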
What's next
- Determine task success: The success logic for PyODPS script tasks follows the same rules as Shell nodes.
- Publish tasks: In standard mode, deploy your node to the production environment to enable periodic scheduling.
- Periodic task O&M: Monitor and operate scheduled tasks in Operation Center.
- PyODPS FAQ: Common issues and how to resolve them.
FAQ
My PyODPS 3 node works locally but times out in Operation Center when calling a third-party API (such as Feishu). Why?
The node runs inside a sandbox that blocks outbound requests by default. Go to Management Center > Workspaces > Security Settings, and add the API endpoint to the sandbox whitelist.