All Products
Search
Document Center

DataWorks:PyODPS 3 node

Last Updated:Apr 21, 2026

DataWorks provides the PyODPS 3 node, which lets you write MaxCompute jobs directly in Python and schedule them to run periodically. This topic describes how to configure and schedule these Python jobs in DataWorks.

Introduction

PyODPS is the Python SDK for MaxCompute. It provides an easy-to-use programming interface that lets you write jobs, query tables and views, and manage MaxCompute resources by using Python. For more information, see PyODPS. In DataWorks, you can use a PyODPS node to schedule and run Python tasks and integrate them with other jobs.

Usage notes

  • When you run code in a PyODPS node on a DataWorks resource group, if the code needs to call a third-party package, you can use a serverless resource group to install the package by using a custom image.

    Note

    This method is not supported if your code includes a user-defined function (UDF) that references a third-party package. For the correct procedure, see UDF example: Use a third-party package in a Python UDF.

  • To upgrade the PyODPS version, you can use a custom image in a serverless resource group to run the /home/tops/bin/pip3 install pyodps==0.12.1 command. You can replace 0.12.1 with the target PyODPS version. For an exclusive resource group for scheduling, use O&M Assistant to run the same command.

  • If your PyODPS job needs to access a specific network environment, such as a data source or service in a VPC or an IDC network, use a serverless resource group. For more information about how to connect the serverless resource group to the target environment, see Network connectivity solutions.

  • For more information about PyODPS syntax, see the PyODPS documentation.

  • PyODPS nodes are available in two types: PyODPS 2 and PyODPS 3. They differ in their underlying Python versions. PyODPS 2 uses Python 2, and PyODPS 3 uses Python 3. Create the node type that matches the Python version you use.

  • If running SQL statements in a PyODPS node does not generate correct data lineage in Data Map, resolve this issue by manually setting the relevant DataWorks scheduling parameters in the job code. For information about how to view data lineage, see View data lineage. For information about how to set parameters, see Set runtime parameter hints. You can use the following sample code to obtain the parameters required at runtime.

    import os
    ...
    # get DataWorks scheduler runtime parameters
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # setting hints while submitting a job
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • The output log for a PyODPS node has a maximum size of 4 MB. Avoid printing large amounts of data directly to the log. Instead, focus on outputting alert and progress logs to provide more valuable information.

Limitations

  • When you run a PyODPS node on an exclusive resource group for scheduling, we recommend that the amount of data processed locally within the node does not exceed 50 MB. This operation is limited by the specifications of the exclusive resource group for scheduling. If an excessive amount of local data is processed and the operating system threshold is exceeded, an OOM (Got Killed) error may occur. Avoid writing excessive data processing code in the PyODPS node. For more information, see Best Practices for Using PyODPS Efficiently.

  • When you use a serverless resource group to run a PyODPS node, you can configure an appropriate number of CUs for the node based on the amount of data that needs to be processed.

    Note

    If you run the job on a serverless resource group, a single job can be configured with a maximum of 64 CUs. However, we recommend that you do not exceed 16 CUs to prevent resource shortages that can affect job startup.

  • A Got killed error indicates that the process was terminated because it exceeded the memory limit. Therefore, avoid local data operations. SQL and DataFrame jobs, excluding to_pandas operations, initiated through PyODPS are not subject to this limitation.

  • You can use the pre-installed Numpy and Pandas libraries in code that does not involve user-defined functions. Other third-party packages that contain binary code are not supported.

  • For compatibility reasons, options.tunnel.use_instance_tunnel is set to False by default in DataWorks. If you need to enable instance tunnel globally, you must manually set this value to True.

  • The bytecode definition differs between minor versions of Python 3, such as Python 3.8 and Python 3.7.

    MaxCompute currently uses Python 3.7. If you use syntax from other Python 3 versions, such as the finally block from Python 3.8, an error will occur during execution. We recommend using Python 3.7.

  • PyODPS 3 supports running on a serverless resource group. To purchase and use one, see Use a serverless resource group.

  • You cannot configure multiple Python jobs to run concurrently within a single PyODPS node.

  • To print logs in a PyODPS node, use print. The use of logger.info is not supported.

Prerequisites

Associate a MaxCompute compute engine with your DataWorks workspace.

Procedure

  1. Develop your code on the editor page of the PyODPS 3 node.

    PyODPS 3 code examples

    After you create a PyODPS node, you can edit and run your code. For more information about PyODPS syntax, see Basic operations. This topic provides five types of code examples. You can choose the one that fits your business needs.

    ODPS entry point

    Each PyODPS node in DataWorks includes a global ODPS entry point variable, odps or o. You do not need to define it manually.

    print(odps.exist_table('PyODPS_iris'))

    Execute SQL

    You can run SQL statements in a PyODPS node. For more information, see SQL.

    • By default, instance tunnel is disabled in DataWorks, which means instance.open_reader uses the Result interface and returns a maximum of 10,000 records. You can use reader.count to get the number of records. If you need to iterate through all data, you must disable the limit. You can use the following statements to globally enable instance tunnel and disable the limit.

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # Disable the limit to read all data.
      
      with instance.open_reader() as reader:
        # All data can be read through Instance Tunnel.
    • You can also enable instance tunnel for a single open_reader call by adding tunnel=True to the open_reader call. You can also add limit=False to disable the limit restriction for the call.

      # Use Instance Tunnel for this open_reader call and read all data.
      with instance.open_reader(tunnel=True, limit=False) as reader:

    Set runtime parameters

    • You can set runtime parameters by using the hints parameter, which is a dict type. For more information about hints, see SET operations.

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • If you set global configurations by using sql.settings, the relevant runtime parameters are added to every execution.

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # Hints are added based on the global settings.

    Read execution results

    You can call the open_reader operation directly on an SQL execution instance. This supports two scenarios:

    • The SQL returns structured data.

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # Process each record.
    • If you execute SQL statements such as desc, you can use the reader.raw property to obtain the raw results of the SQL execution.

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      Note

      If you use custom scheduling parameters and run a PyODPS 3 node directly from the UI, you must hardcode the time value because the node cannot substitute variables at runtime.

    DataFrame

    You can also process data by using DataFrame (not recommended).

    • Execution

      In the DataWorks environment, DataFrame operations require an explicit call to an immediate execution method.

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # Call an immediate execution method to process each record.

      If you need to trigger immediate execution when printing, you must enable options.interactive.

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # Enable the option at the beginning.
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # This triggers immediate execution.
    • Print detailed information

      The options.verbose option is enabled by default in DataWorks, causing detailed information such as the Logview URL to be printed during execution.

    PyODPS 3 code development

    The following simple example shows how to use a PyODPS node:

    1. Prepare a dataset by creating the pyodps_iris sample table. For more information, see Process DataFrame data.

    2. Create a DataFrame. For more information, see Create a DataFrame from a MaxCompute table.

    3. Enter the following code in the PyODPS node and run it.

      from odps.df import DataFrame
      
      # Create a DataFrame from an ODPS table.
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepallength.head(5))

    Run the PyODPS job

    1. In the Run Configuration pane, under the Compute Resource section, configure the Compute Resource, computing quota, and DataWorks Resource Group.

      Note
      • To access a data source over a public network or a VPC, you must use a scheduling resource group that has passed a connectivity test with the data source. For more information, see Network connectivity solutions.

      • You can configure the Image information based on the job requirements.

    2. In the parameters dialog box on the toolbar, select the created MaxCompute data source and click Run to run the PyODPS job.

  2. To run the node periodically, configure its scheduling properties based on your business requirements. For more information, see Configure node scheduling.

    Unlike SQL nodes in DataWorks, PyODPS nodes do not replace strings such as ${param_name} in the code. Instead, before the code is executed, a dictionary named args is added to the global variables, from which you can retrieve scheduling parameters. For example, if you set ds=${yyyymmdd} in the Parameter section, you can retrieve the parameter information in the code as follows.

    print('ds=' + args['ds'])
    ds=20240930
    Note

    If you need to obtain the partition named ds, you can use the following method.

    o.get_table('table_name').get_partition('ds=' + args['ds'])
  3. After the node is configured, you must deploy it. For more information, see Node and workflow deployment.

  4. After the job is deployed, you can view its status in Operation Center. For more information, see Get started with Operation Center.

Run a node by associating a role

You can associate a RAM role with a node to run node tasks. This provides fine-grained permission control and security management.

Next steps

FAQ about PyODPS: Learn about common issues that occur during PyODPS execution to quickly identify and resolve exceptions.