DataWorks:Develop a PyODPS 3 job

Last Updated:Jun 20, 2026

DataWorks provides a PyODPS 3 node for writing and periodically running MaxCompute jobs in Python. This topic describes how to configure and schedule Python jobs using DataWorks.

Prerequisites

A PyODPS 3 node is created. For more information, see Create and manage MaxCompute nodes.

Background information

PyODPS is the Python SDK for MaxCompute. It provides a Python programming interface to write MaxCompute jobs, query tables and views, and manage resources. For more information, see PyODPS. In DataWorks, you can use a PyODPS node to schedule and run Python jobs and integrate them with other types of jobs.

Usage notes

  • If your PyODPS code requires third-party packages, you can install them by using a serverless resource group and a custom image.

    Note

    If your code includes a User-Defined Function (UDF) that references a third-party package, this method is not supported. For the correct configuration, see UDF example: Use third-party packages in Python UDFs.

  • To upgrade the PyODPS version, run the /home/tops/bin/pip3 install pyodps==0.12.1 command (replace 0.12.1 with the target PyODPS version). For a serverless resource group, run the command in a custom image. For an exclusive resource group for scheduling, run the command by using O&M Assistant.

  • If your PyODPS job needs to access a special network environment, such as a data source or service in a VPC or an on-premises data center (IDC), use a serverless resource group and establish a network connection between the resource group and the target environment. For more information, see Network connectivity solutions.

  • For more information about the PyODPS syntax, see the PyODPS documentation.

  • PyODPS nodes are available in two types: PyODPS 2 and PyODPS 3. They use different underlying Python versions: PyODPS 2 nodes use Python 2, and PyODPS 3 nodes use Python 3. Ensure you create the node type that matches your Python version.

  • If SQL executed in a PyODPS node does not generate correct data lineage, the lineage does not appear in Data Map. To resolve this, manually pass the DataWorks scheduling runtime parameters as hints in your code. To learn how to view data lineage, see View data lineage. For parameter settings, see Set runtime parameter hints. You can obtain the required runtime parameters using the following sample code:

    import os
    ...
    # Get the DataWorks scheduler runtime parameters.
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # Set the hints when submitting a task.
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • The maximum size for the output log of a PyODPS node is 4 MB. Avoid printing large data results to the log. Instead, output only essential alert and progress information.
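The data lineage note above collects the SKYNET_ variables inline. As a minimal sketch, the same logic can be wrapped in a reusable helper that also merges per-statement hints. The name build_hints is hypothetical and not part of PyODPS, and the environ parameter is an assumption added only so that the function can be exercised outside DataWorks.

```python
import os


def build_hints(extra_hints=None, environ=None):
    """Collect DataWorks scheduler variables (SKYNET_*) and merge extra hints.

    build_hints is a hypothetical helper, not a PyODPS API.
    """
    environ = os.environ if environ is None else environ
    # Keep only the scheduler runtime parameters exposed by DataWorks.
    hints = {k: v for k, v in environ.items() if k.startswith('SKYNET_')}
    # Per-statement hints are merged last so that they are not dropped.
    hints.update(extra_hints or {})
    return hints
```

In a node, the result could be passed as o.execute_sql(sql, hints=build_hints({'odps.sql.mapper.split.size': 16})).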

Limitations

  • When you run a PyODPS node on an exclusive resource group for scheduling, do not process more than 50 MB of local data in the node. This limit comes from the resource specifications of the exclusive resource group. If local data processing exceeds the operating system's memory threshold, an out-of-memory (OOM) error may occur, indicated by a Got killed message. Avoid writing extensive data processing code directly in the PyODPS node. For more information, see Best practices for efficient use of PyODPS.

  • When you run a PyODPS node by using a serverless resource group, you can configure the CUs for the node based on the amount of data it needs to process.

    Note

    A single task in a serverless resource group supports up to 64 CUs. We recommend that you configure no more than 16 CUs, because an excessive CU value can cause resource shortages that delay task startup.

  • A Got killed error indicates that memory usage exceeded the limit, causing the process to terminate. To prevent this, avoid local data operations. This limitation does not apply to SQL or DataFrame jobs (except for to_pandas) initiated through PyODPS.

  • You can use the pre-installed NumPy and pandas libraries in code that does not involve custom functions. Other third-party packages that contain binary code are not supported.

  • For compatibility reasons, options.tunnel.use_instance_tunnel is set to False by default in DataWorks. If you need to enable the instance tunnel globally, you must manually set this value to True.

  • The definition of bytecode differs between minor versions of Python 3, such as Python 3.8 and Python 3.7.

    MaxCompute currently uses Python 3.7. An execution error will occur if you use syntax from other Python 3 versions, such as the finally block in Python 3.8. We recommend that you use Python 3.7.

  • PyODPS 3 supports running on a serverless resource group. To purchase and use one, see Use serverless resource groups.

  • Running multiple Python jobs concurrently within a single PyODPS node is not supported.
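Because the bytecode limitation above depends on the interpreter version, a node can log a warning when it is not running on Python 3.7. The following is a minimal sketch under that assumption. The name python_version_warning is hypothetical, and the expected version (3, 7) is taken from this topic and may need to be adjusted if the MaxCompute runtime changes.

```python
import sys

# Assumption: 3.7 is the version stated in this topic; adjust if it changes.
EXPECTED_PYTHON = (3, 7)


def python_version_warning(version_info=None):
    """Return a warning string if the interpreter is not Python 3.7, else None."""
    major, minor = (version_info or sys.version_info)[:2]
    if (major, minor) == EXPECTED_PYTHON:
        return None
    return ('Running Python %d.%d; MaxCompute expects %d.%d, so functions '
            'shipped to MaxCompute may fail.' % ((major, minor) + EXPECTED_PYTHON))
```

Calling python_version_warning() at the top of a node and printing any non-empty result keeps the check to a single short log line.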

Edit code: Basic example

After you create a PyODPS node, you can edit and run your code. For more information about the PyODPS syntax, see Overview of basic operations.

  • ODPS entry point

    A DataWorks PyODPS node provides a global variable, named odps or o, as the ODPS entry point. You do not need to manually define it.

    print(odps.exist_table('PyODPS_iris'))
  • Execute SQL

    You can execute SQL statements in a PyODPS node. For more information, see SQL.

    • By default, the instance tunnel is disabled in DataWorks. This means instance.open_reader uses the Result interface, which reads a maximum of 10,000 records. You can use reader.count to get the number of records. To iterate through all data, you must disable the limit. Use the following statements to enable the instance tunnel globally and disable the limit.

      from odps import options
      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # Disable the limit to read all data.
      with instance.open_reader() as reader:
          # All data can be read through the instance tunnel.
          for record in reader:
              pass  # Process each record here.
    • You can also add tunnel=True to open_reader to enable the instance tunnel for the current open_reader call. Similarly, you can add limit=False to disable the limit restriction for the current call.

      # Use the Instance Tunnel interface for the current open_reader operation to read all data.
      with instance.open_reader(tunnel=True, limit=False) as reader:
          for record in reader:
              pass  # Process each record here.
  • Runtime parameters

    • Set runtime parameters using the hints parameter, which is a dict. For more information about hints, see SET operations.

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • If you set sql.settings in the global configuration, these runtime parameters are added to every execution.

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # This call includes hints from the global configuration.
  • Execution results

    An SQL execution instance can directly execute the open_reader operation in the following two scenarios:

    • The SQL statement returns structured data.

      with o.execute_sql('select * from dual').open_reader() as reader:
          for record in reader:  # Process each record.
              print(record)
    • When executing statements like desc, you can retrieve the raw SQL execution result by using the reader.raw property.

      with o.execute_sql('desc dual').open_reader() as reader:
          print(reader.raw)
      Note

      If you use custom scheduling parameters and run a PyODPS 3 node directly from the page, you must hardcode the time value in your code. The PyODPS node does not substitute the parameter value in this case.

  • DataFrame

    You can also process data using a DataFrame (not recommended).

    • Execution

      In the DataWorks environment, DataFrame operations must be explicitly triggered by calling an immediately executed method.

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # Call an immediately executed method to process each record.
          print(record)

      If you need to trigger an immediate execution when printing, you must enable options.interactive.

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # Enable the switch at the beginning.
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # An immediate execution is triggered when printing.
    • Print detailed information

      Set the options.verbose option. This option is enabled by default in DataWorks and prints detailed information, such as the Logview URL, during execution.
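The Execute SQL, Runtime parameters, and Execution results items above can be combined into one small helper. The following is a minimal sketch: iter_records is a hypothetical name and not a PyODPS API, and the entry object is passed in as a parameter (in a DataWorks node, this would be the global o) so that the function does not depend on global state. It yields records one at a time instead of building a list, which keeps local memory usage low.

```python
def iter_records(odps_entry, sql, hints=None):
    """Run a SQL statement and yield every record of its result.

    odps_entry is the ODPS entry object (the global `o` in a DataWorks node).
    iter_records is a hypothetical helper, not a PyODPS API.
    """
    instance = odps_entry.execute_sql(sql, hints=hints)
    # tunnel=True and limit=False apply to this open_reader call only,
    # so the global options stay unchanged.
    with instance.open_reader(tunnel=True, limit=False) as reader:
        for record in reader:
            yield record
```

In a node, this could be used as for record in iter_records(o, 'select * from pyodps_iris'), with the loop body limited to lightweight work such as counting or logging progress.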

Example

The following example shows how to use a PyODPS node:

  1. Prepare the dataset and create the pyodps_iris sample table. For details, see Process data by using DataFrame.

  2. Create a DataFrame. For details, see Create a DataFrame from a MaxCompute table.

  3. Enter and run the following code in the PyODPS node.

    from odps.df import DataFrame
    # Create a DataFrame from an ODPS table.
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepallength.head(5))

    The following result is returned:

       sepallength
    0          4.5
    1          5.5
    2          4.9
    3          5.0
    4          6.0

Edit code: Advanced example

If the node needs to run periodically, you must define its scheduling properties. For more information, see Configure scheduling properties for a node.

Scheduling parameters

In the right-side pane of the node editor, click Scheduling Settings. In the Parameter section, configure custom parameters. The way variables are defined in a PyODPS node is different from how they are defined in an SQL node. For more information, see Configure scheduling parameters.

Unlike SQL nodes in DataWorks, PyODPS nodes do not replace strings such as ${param_name} in the code. Instead, a dict named args is added to the global variables before the code is executed. You can retrieve scheduling parameters from this dict. For example, if you set ds=${yyyymmdd} in Parameter, you can use the following method to retrieve this parameter in your code.

print('ds=' + args['ds'])
# Output: ds=20161116
Note

If you need to obtain the partition named ds, you can use the following method.

o.get_table('table_name').get_partition('ds=' + args['ds'])
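Because a PyODPS node does not substitute ${...} strings, a misconfigured parameter reaches the code unchanged. As a minimal sketch, the partition string can be built by a helper that validates the value first. The name partition_spec is hypothetical, and the yyyymmdd date format is an assumption based on the ds=${yyyymmdd} example above.

```python
from datetime import datetime


def partition_spec(args, key='ds', date_format='%Y%m%d'):
    """Build a partition spec such as 'ds=20161116' from scheduling parameters.

    partition_spec is a hypothetical helper; `args` is the dict that DataWorks
    adds to the node's global variables.
    """
    if key not in args:
        raise KeyError('Scheduling parameter %r is not configured' % key)
    value = args[key]
    # Fail early (ValueError) if the value is not a real date in the expected format.
    datetime.strptime(value, date_format)
    return '%s=%s' % (key, value)
```

In a node, the result could be passed to get_partition, for example o.get_table('table_name').get_partition(partition_spec(args)).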

For more information about developing PyODPS jobs for other use cases, see the following topics:

Next steps

FAQ

Q: I am using a PyODPS 3 node to collect data from a third-party API, such as Lark, and import it into DataWorks. The code runs without issues in my local development environment, but it reports a response timeout error after it is submitted to the production environment and run in Operation Center. Why?

A: Go to Management Center > Security Settings > Workspace. In the sandbox whitelist, add the domain name of the third-party API to grant the PyODPS 3 job access. For example:

Add the Lark API domain name open.feishu.cn and set the port to 443.
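To confirm whether the whitelist entry has taken effect, a node can test plain TCP connectivity before it calls the API. The following is a minimal sketch that uses only the Python standard library. The name can_connect is hypothetical, and the connector parameter is an assumption added so that the function can be exercised without network access.

```python
import socket


def can_connect(host, port=443, timeout=5.0, connector=socket.create_connection):
    """Return True if a TCP connection to host:port opens within timeout seconds."""
    try:
        conn = connector((host, port), timeout)
    except OSError:
        # Covers timeouts, refused connections, and DNS failures.
        return False
    conn.close()
    return True
```

For example, print(can_connect('open.feishu.cn', 443)) prints a single short line to the node log, which can be compared before and after the whitelist change.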