MaxCompute: Develop a PyODPS 3 task

Last Updated: Mar 26, 2026

DataWorks provides PyODPS 3 nodes, in which you write Python code for MaxCompute jobs directly and schedule those jobs to run on a regular basis.

Prerequisites

Before you begin, ensure that you have:

Write and run node code

PyODPS is the MaxCompute SDK for Python. In DataWorks, you can use PyODPS nodes to schedule Python tasks and integrate them with other types of tasks.

After creating a PyODPS 3 node, write and run Python code directly in the node editor. For the full PyODPS API reference, see Overview.

Use the MaxCompute entry point

In DataWorks, each PyODPS node includes the global variable odps or o, which is the MaxCompute entry point. The platform injects this variable automatically before your code runs, so you can use it directly — no manual initialization is needed.

print(odps.exist_table('PyODPS_iris'))

Execute SQL statements

Pass SQL to execute_sql directly on the entry point object.

By default, InstanceTunnel is disabled in DataWorks. In this mode, instance.open_reader uses the Result interface and reads up to 10,000 records. Use reader.count to get the number of records returned.
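As a rough sketch of the flow described above, the following helper runs a statement, reads the result through the default Result interface, and flags when the record count reaches the 10,000-record cap. The names fetch_records and RESULT_LIMIT are illustrative and not part of PyODPS; o is assumed to be the entry point that DataWorks injects into the node.

```python
RESULT_LIMIT = 10000  # record cap of the Result interface, as described above


def fetch_records(o, sql):
    """Run a statement and return (records, possibly_truncated).

    `o` is assumed to be the MaxCompute entry point injected by DataWorks.
    """
    instance = o.execute_sql(sql)  # blocks until the statement finishes
    with instance.open_reader() as reader:
        # reader.count is the number of records returned; reaching the cap
        # suggests that more rows may exist than were read.
        possibly_truncated = reader.count >= RESULT_LIMIT
        records = [record for record in reader]
    return records, possibly_truncated
```

In a node, a call such as fetch_records(o, 'select * from pyodps_iris') returns the records together with a flag. A True flag only hints at truncation, because a result with exactly 10,000 rows sets it as well.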

To read all records without the limit, enable InstanceTunnel:

Option 1: Enable globally (affects all open_reader calls in the session)

from odps import options

options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # Remove the record limit

with instance.open_reader() as reader:
    # Reads all records via InstanceTunnel
    for record in reader:
        ...

Option 2: Enable per call

# Enable InstanceTunnel and remove the record limit for this call only
with instance.open_reader(tunnel=True, limit=False) as reader:
    ...

Configure runtime parameters

Use the hints parameter (type: dict) to pass runtime parameters to a SQL statement:

o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})

To apply the same hints to all SQL statements in the session, set them globally:

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from PyODPS_iris')  # Uses global hints automatically

For a full list of supported hints, see SET operations.

Get SQL query results

Structured data (standard SELECT statements):

with o.execute_sql('select * from dual').open_reader() as reader:
    for record in reader:  # Process each record
        ...

Raw output (statements like DESC):

with o.execute_sql('desc dual').open_reader() as reader:
    print(reader.raw)

If the node uses a custom scheduling parameter and you run it from the configuration tab, set the scheduling parameter to a constant value. Custom scheduling parameter values are not automatically replaced in PyODPS nodes.

Use DataFrame

The DataFrame API (not recommended) is lazy: operations run only when you explicitly call an immediately executed method, such as execute().

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepal_width < 3].execute():  # Triggers execution
    ...

To trigger execution automatically when displaying results, set options.interactive to True:

from odps import options
from odps.df import DataFrame
options.interactive = True  # Set at the top of the script
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum())  # Executes immediately

To show verbose details (such as the Logview URL) during execution, set options.verbose to True. This is enabled by default in DataWorks.

End-to-end example

  1. Prepare a dataset and create a table named pyodps_iris. See DataFrame data processing.

  2. Create a DataFrame object from the table. See Create a DataFrame object from a MaxCompute table.

  3. Enter the following code in the node editor and run it:

    from odps.df import DataFrame
    
    # Create a DataFrame object from a MaxCompute table
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepallength.head(5))

    Expected output:

       sepallength
    0          4.5
    1          5.5
    2          4.9
    3          5.0
    4          6.0

Schedule a node

To run a PyODPS 3 node on a recurring schedule, configure scheduling properties in the node's Properties tab. For an overview of scheduling options, see Overview.

Configure scheduling parameters

In the right-side navigation pane of the node configuration tab, click Properties. In the Scheduling Parameter section, define the parameters for this node.

Important

PyODPS nodes handle scheduling parameters differently from SQL nodes. In SQL nodes, strings such as ${param_name} are replaced directly in the code. In PyODPS nodes, no string replacement occurs — DataWorks injects a global dictionary named args before the code runs. Read parameter values from args.

| Node type | Parameter handling | How to read values |
| --- | --- | --- |
| SQL node | ${param_name} replaced inline | Reference directly in SQL |
| PyODPS 3 node | No inline replacement | Read from args['param_name'] |

For example, if you set ds=${yyyymmdd} in the Scheduling Parameter section, read it in code as:

print('ds=' + args['ds'])
# Output: ds=20161116

To read a table partition using the same parameter:

o.get_table('table_name').get_partition('ds=' + args['ds'])

For differences between PyODPS and SQL node parameter configuration, see Configure scheduling parameters for different types of nodes.
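A minimal sketch of reading a scheduling parameter defensively is shown below. The helper name partition_spec is hypothetical, and a literal dictionary stands in for the args dictionary that DataWorks injects, so the snippet also runs outside a node.

```python
def partition_spec(args, name='ds'):
    """Build a partition spec such as 'ds=20161116' from the args dict."""
    if name not in args:
        # Fail early with a clear message instead of a bare KeyError later.
        raise KeyError(
            f"scheduling parameter {name!r} is not defined for this node"
        )
    return f"{name}={args[name]}"


# In a PyODPS 3 node, `args` is injected by DataWorks before the code runs.
# This stand-in dict is an assumption used only to make the sketch runnable.
example_args = {'ds': '20161116'}
print(partition_spec(example_args))  # prints "ds=20161116"
```

Inside a node, the returned string can be passed to get_partition in the same way as the concatenated value shown earlier.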

For more advanced PyODPS task development scenarios, see:

Limitations

| Constraint | Limit | What happens if exceeded | Action |
| --- | --- | --- | --- |
| On-premises data processed per node run (exclusive resource group) | 50 MB | Out of memory (OOM) error; system reports Got killed | Process data in MaxCompute using SQL or DataFrame instead of pulling it to the node |
| CUs per task (serverless resource group) | Max 64 CUs; 16 CUs recommended | Potential resource shortage at task startup | Keep CU allocation at or below 16 CUs |
| Concurrent tasks per PyODPS 3 node | 1 | N/A | Do not run multiple PyODPS 3 tasks on a single node simultaneously |
| Output log size | 4 MB | Log truncation | Include alert and progress logs rather than large volumes of data results |

Additional constraints:

  • Memory limits do not apply to SQL or DataFrame tasks (excluding to_pandas tasks) initiated by PyODPS.

  • NumPy and pandas are pre-installed in DataWorks for use in non-UDF code. Third-party packages that contain binary code are not supported.

  • options.tunnel.use_instance_tunnel is set to False by default in DataWorks. To enable InstanceTunnel globally, set it to True.

  • MaxCompute is compatible with Python 3.7. Other Python 3 subversions may fail on certain syntax — for example, Python 3.8 fails on finally block syntax. Use Python 3.7 to avoid compatibility errors.
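To illustrate the advice to process data in MaxCompute rather than pulling it to the node, the sketch below pushes a count into SQL so that only a single value reaches the node's memory. The helper name count_matching_rows is hypothetical, as are any table or column names passed to it, and o is assumed to be the injected entry point.

```python
def count_matching_rows(o, table, condition):
    """Count rows inside MaxCompute so that only one number reaches the node.

    `table` and `condition` are interpolated into the SQL text, so they are
    assumed to come from trusted code, not from user input.
    """
    sql = f"select count(*) as cnt from {table} where {condition}"
    with o.execute_sql(sql).open_reader() as reader:
        for record in reader:
            return record[0]  # the single aggregated value
    return 0  # defensive default if no record comes back
```

A call such as count_matching_rows(o, 'pyodps_iris', 'sepal_width < 3') keeps the filtering and counting on the MaxCompute side, whereas iterating over the full table in the node would count against the node's memory limit.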

Usage notes

  • Third-party packages: To use a third-party package in a PyODPS node running on a DataWorks resource group, use a serverless resource group and create a custom image to install the package.

    This method does not apply to third-party packages referenced inside user-defined functions (UDFs) in node code. For UDF-specific guidance, see Example: Reference third-party packages in Python UDFs.

  • Upgrading PyODPS: Run /home/tops/bin/pip3 install pyodps==0.12.1 on the resource group, replacing 0.12.1 with the target version.

    • Serverless resource group: use the image management feature. For more information, see Custom images.

    • Exclusive resource group for scheduling: use the O&M Assistant feature. For more information, see O&M Assistant.

  • VPC and data center access: To access a data source or service in a virtual private cloud (VPC) or data center, run the node on a serverless resource group and establish a network connection between the resource group and the target. For more information, see Network connectivity solutions.

  • Data lineage: If SQL statements in a PyODPS node cannot generate data lineages and Data Map does not display them correctly, manually configure the scheduling parameters in node code using the hints mechanism. For more information, see View data lineages and Configure the hints parameter. The following example shows how to get the parameters required for running tasks:

    import os
    ...
    # Get DataWorks scheduler runtime parameters
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # Pass hints when submitting a task
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...

  • PyODPS version: PyODPS 2 nodes use Python 2; PyODPS 3 nodes use Python 3. Choose the version that matches the Python version in your code. For the full API reference, see PyODPS.

What's next

FAQ

When I run a PyODPS 3 node in Operation Center to collect data from a third-party app (such as Lark), the node times out — even though it runs successfully on my local machine. What's wrong?

The node is running in a sandbox that blocks outbound connections to external addresses by default. In the DataWorks console, go to Management Center > Workspace > Security Settings, and add the third-party app's address to the sandbox whitelist.
