
DataWorks:PyODPS 2 node

Last Updated: Mar 26, 2026

PyODPS is the Python SDK for MaxCompute. The PyODPS 2 node in DataWorks lets you write Python 2 code to query tables, run SQL, and manage MaxCompute resources—all within a scheduled workflow. For more information about PyODPS, see PyODPS. For full PyODPS API details, see PyODPS documentation.

Prerequisites

Before you begin, ensure that you have:

Limitations

  • Local data size (Exclusive Resource Group for Scheduling): Keep local data under 50 MB. Exceeding the OS memory threshold causes an out-of-memory (OOM) error, shown as "Got killed". To prevent this, avoid performing data operations locally. For guidance, see Best practices for efficient PyODPS usage.

  • Compute Units (CUs) per task (Serverless Resource Group): Maximum 64 CUs per task. Use no more than 16 CUs to avoid task startup failures caused by insufficient resources.

  • Pre-installed libraries: NumPy and pandas are pre-installed. Other third-party packages that contain binary code are not supported. Pre-installed libraries cannot be used inside a user-defined function (UDF).

  • Concurrent Python tasks: Running multiple Python tasks concurrently within a single PyODPS node is not supported.

  • Logging: Use print to write logs. logger.info is not supported.

  • Instance Tunnel: options.tunnel.use_instance_tunnel is False by default. instance.open_reader uses the Result API and returns up to 10,000 records.

  • Python version: Python 2.7

SQL and DataFrame tasks submitted through PyODPS (except to_pandas) are not subject to the local data size limitation.

Usage notes

  • Third-party packages: Install third-party packages based on your resource group type.

  • Network access: To access a data source in a virtual private cloud (VPC) or an on-premises data center, use a Serverless Resource Group and establish a network connection between the resource group and the target environment. For details, see Network connectivity solutions.

  • Node types: PyODPS nodes come in two types—PyODPS 2 (Python 2.7) and PyODPS 3 (Python 3). Create the type that matches your Python version.

  • Log volume: Log only essential information such as warnings and progress updates. Avoid printing large amounts of data to the logs.

  • Data lineage: If an SQL statement fails to generate data lineage and the lineage is not visible in Data Map, manually configure the scheduling parameters (hints) in your task code. See View data lineage and Set runtime parameters (hints). Use the following code to retrieve the required runtime parameters:

    import os
    # Get DataWorks scheduler runtime parameters.
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    # Set hints when submitting a task.
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
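Because print is the only supported logging channel in a PyODPS node (logger.info is not available, per the limitations above), a small timestamped helper keeps log output readable. This is an illustrative sketch, not a DataWorks API:

```python
import datetime

def log_line(msg):
    # Format a timestamped log line; print is the only supported log channel.
    ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    return '[%s] %s' % (ts, msg)

print(log_line('start processing'))
```

Per the log-volume note above, keep such output to warnings and progress markers only.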

Develop a PyODPS 2 node

ODPS entry point

Every PyODPS node includes a global variable odps (also aliased as o) that serves as the ODPS entry point. No initialization is needed.

print(odps.exist_table('PyODPS_iris'))

Execute SQL

For full SQL details, see SQL.

By default, Instance Tunnel is disabled. instance.open_reader uses the Result API, which returns up to 10,000 records. To read all records, enable Instance Tunnel and disable the limit restriction globally:

from odps import options

options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # Disable the limit to read all data.

with instance.open_reader() as reader:
    # Read all data through Instance Tunnel.
    for record in reader:
        ...

To enable Instance Tunnel only for a single open_reader call, pass tunnel=True and limit=False directly:

# This call uses the Instance Tunnel API and can read all data.
with instance.open_reader(tunnel=True, limit=False) as reader:
    ...

Set runtime parameters

Pass runtime parameters using the hints parameter (a dict). For available hints, see SET operations.

o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})

To apply hints to every SQL execution in the node, set options.sql.settings globally:

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from PyODPS_iris')  # Hints are added from the global configuration.

Read execution results

An instance that runs SQL supports open_reader directly in two cases:

  • Structured data: Iterate over records returned by a SELECT statement.

    with o.execute_sql('select * from dual').open_reader() as reader:
        for record in reader:  # Process each record.
            ...
  • Raw output: Use reader.raw to get the raw result of a DESC or similar statement.

    with o.execute_sql('desc dual').open_reader() as reader:
        print(reader.raw)
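Each record yielded by the reader supports access by field name or by position. The stand-in class below only mimics that interface so the pattern can be shown without a live MaxCompute connection; FakeRecord is illustrative and not part of PyODPS:

```python
class FakeRecord(object):
    """Stand-in that mimics by-name and by-index access on a result record."""
    def __init__(self, names, values):
        self._names = list(names)
        self._values = list(values)

    def __getitem__(self, key):
        if isinstance(key, int):
            return self._values[key]                  # access by position
        return self._values[self._names.index(key)]   # access by field name

record = FakeRecord(['sepallength', 'sepalwidth'], [5.1, 3.5])
print(record['sepallength'])  # 5.1
print(record[1])              # 3.5
```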

When you manually run a PyODPS 2 node that uses custom scheduling parameters, hardcode the time values in your code. The PyODPS node does not substitute parameters automatically.

DataFrame (not recommended)

DataWorks supports DataFrame, but it is not recommended. If you use it, call an immediate execution method explicitly—DataFrame does not execute automatically in DataWorks.

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepal_width < 3].execute():  # Call an immediate execution method.
    ...

To trigger execution when printing, enable options.interactive:

from odps import options
from odps.df import DataFrame
options.interactive = True  # Enable at the start of the script.
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum())  # Triggers immediate execution.

options.verbose is enabled by default in DataWorks, so execution details such as the Logview URL are printed automatically.

Code example: end-to-end workflow

The following example demonstrates a complete PyODPS workflow using the pyodps_iris sample table.

  1. Prepare the dataset by creating the pyodps_iris sample table. For instructions, see Use DataFrame to process data.

  2. Create a DataFrame object from the table. For instructions, see Create a DataFrame object from a MaxCompute table.

  3. Enter the following code in the PyODPS node:

    from odps.df import DataFrame
    
    # Create a DataFrame from a MaxCompute table.
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepallength.head(5))

Run a PyODPS task

  1. In the Run Configuration > Resource section, configure the compute engine instance, compute quota, and DataWorks Resource Group.

    To access a data source over the public network or in a VPC, use a Resource Group with network connectivity to that data source. For details, see Network connectivity solutions. Configure the Image parameter based on your task requirements.
  2. Click Run in the toolbar.

Schedule and access parameters

To run the node on a schedule, configure its scheduling properties. For details, see Node scheduling configuration.

How PyODPS nodes receive scheduling parameters: Unlike SQL nodes, PyODPS nodes do not substitute ${param_name} strings in code. Before execution, DataWorks injects a dict named args into the global scope. Read parameters from args directly.

For example, if Parameters is set to ds=${yyyymmdd}, retrieve the value as follows:

print('ds=' + args['ds'])
# ds=20240930

To use the parameter as a partition key:

o.get_table('table_name').get_partition('ds=' + args['ds'])
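If a run does not carry the parameter (for example, a manual run, as noted earlier), reading args directly raises a KeyError. A hedged sketch of a fallback follows; the stand-in args dict below replaces the one DataWorks injects:

```python
# Stand-in for the dict that DataWorks injects before execution;
# in a real node, `args` already exists in the global scope.
args = {}

# Fall back to a hardcoded date when the parameter is absent (e.g. a manual run).
ds = args.get('ds', '20240930')
print('ds=' + ds)  # ds=20240930
```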

Publish and monitor

After configuring the node, publish it before it can run on schedule. For details, see Node and workflow deployment.

After publishing, go to Operation Center to monitor Periodic Task runs. For details, see Getting started with Operation Center.

Run a node with an associated role

Associate a RAM role with a node to enable fine-grained permission control.

What's next

PyODPS FAQ: Common issues and solutions for PyODPS execution errors.