DataWorks provides the PyODPS 3 node, which lets you write MaxCompute jobs directly in Python and schedule them to run periodically. This topic describes how to configure and schedule these Python jobs in DataWorks.
Introduction
PyODPS is the Python SDK for MaxCompute. It provides an easy-to-use programming interface that lets you write jobs, query tables and views, and manage MaxCompute resources by using Python. For more information, see PyODPS. In DataWorks, you can use a PyODPS node to schedule and run Python tasks and integrate them with other jobs.
Usage notes
When you run code in a PyODPS node on a DataWorks resource group, if the code needs to call a third-party package, you can use a serverless resource group to install the package by using a custom image.
Note: This method is not supported if your code includes a user-defined function (UDF) that references a third-party package. For the correct procedure, see UDF example: Use a third-party package in a Python UDF.
To upgrade the PyODPS version, use a custom image in a serverless resource group to run the following command. Replace 0.12.1 with the target PyODPS version. For an exclusive resource group for scheduling, use O&M Assistant to run the same command.
    /home/tops/bin/pip3 install pyodps==0.12.1
If your PyODPS job needs to access a specific network environment, such as a data source or service in a VPC or an IDC network, use a serverless resource group. For more information about how to connect the serverless resource group to the target environment, see Network connectivity solutions.
For more information about PyODPS syntax, see the PyODPS documentation.
PyODPS nodes are available in two types: PyODPS 2 and PyODPS 3. They differ in their underlying Python versions. PyODPS 2 uses Python 2, and PyODPS 3 uses Python 3. Create the node type that matches the Python version you use.
If running SQL statements in a PyODPS node does not generate correct data lineage in Data Map, resolve this issue by manually setting the relevant DataWorks scheduling parameters in the job code. For information about how to view data lineage, see View data lineage. For information about how to set parameters, see Set runtime parameter hints. You can use the following sample code to obtain the parameters required at runtime.
    import os
    ...
    # Get DataWorks scheduler runtime parameters.
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # Set hints when submitting a job.
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
The output log for a PyODPS node has a maximum size of 4 MB. Avoid printing large amounts of data directly to the log. Instead, print alerts and progress messages, which are more useful.
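The advice about keeping the output log small can be sketched as follows. This is a minimal illustration, not part of PyODPS or DataWorks: log_progress is a hypothetical helper, and the interval of 1,000 records is an arbitrary assumption. It prints one short progress line at intervals instead of printing the records themselves.

```python
def log_progress(processed, total, every=1000):
    """Print a one-line progress message every `every` records and at the end.

    Hypothetical helper for illustration; returns the message that was printed,
    or None when nothing was printed.
    """
    if processed % every == 0 or processed == total:
        message = "processed {}/{} records".format(processed, total)
        print(message)
        return message
    return None


# Usage: report progress for 2,500 stand-in records without printing any record.
messages = []
total = 2500
for i in range(1, total + 1):
    msg = log_progress(i, total)
    if msg is not None:
        messages.append(msg)
```

With these stand-in values, only a few progress lines reach the log no matter how large each record is.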
Limitations
When you run a PyODPS node on an exclusive resource group for scheduling, we recommend that the amount of data processed locally within the node does not exceed 50 MB. This limit comes from the specifications of the exclusive resource group for scheduling. If the node processes too much data locally and exceeds the operating system threshold, an OOM (Got killed) error may occur. Avoid writing code that processes large amounts of data locally in the PyODPS node. For more information, see Best Practices for Using PyODPS Efficiently.
When you use a serverless resource group to run a PyODPS node, you can configure an appropriate number of CUs for the node based on the amount of data that needs to be processed.
Note: If you run the job on a serverless resource group, a single job can be configured with a maximum of 64 CUs. However, we recommend that you do not exceed 16 CUs, to prevent resource shortages that can affect job startup.
A Got killed error indicates that the process was terminated because it exceeded the memory limit. Therefore, avoid local data operations. SQL and DataFrame jobs initiated through PyODPS, excluding to_pandas operations, are not subject to this limitation.
You can use the pre-installed NumPy and pandas libraries in code that does not involve user-defined functions. Other third-party packages that contain binary code are not supported.
For compatibility reasons, options.tunnel.use_instance_tunnel is set to False by default in DataWorks. If you need to enable instance tunnel globally, you must manually set this value to True.
The bytecode definition differs between minor versions of Python 3, such as Python 3.8 and Python 3.7.
MaxCompute currently uses Python 3.7. If you use syntax from other Python 3 versions, such as finally block syntax that was introduced in Python 3.8 (for example, a continue statement inside a finally clause), an error occurs during execution. We recommend using Python 3.7.
PyODPS 3 supports running on a serverless resource group. To purchase and use one, see Use a serverless resource group.
You cannot configure multiple Python jobs to run concurrently within a single PyODPS node.
To print logs in a PyODPS node, use print. The use of logger.info is not supported.
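When a small amount of local processing cannot be avoided, one way to keep memory use bounded is to handle records in fixed-size batches instead of loading everything at once. The following is a minimal sketch under stated assumptions: iter_batches is a hypothetical helper, the batch size is arbitrary, and range(10) stands in for a real record source. It does not raise the limits above, and heavy processing should still run in MaxCompute through SQL or DataFrame jobs.

```python
from itertools import islice


def iter_batches(records, batch_size):
    """Yield lists of at most `batch_size` items from any iterable.

    Hypothetical helper for illustration; only one batch is held in memory
    at a time.
    """
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Usage with stand-in data. Progress is reported with print, as required in a node.
batch_sizes = []
for batch in iter_batches(range(10), batch_size=4):
    batch_sizes.append(len(batch))
    print("handled a batch of", len(batch), "records")
```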
Prerequisites
Associate a MaxCompute compute engine with your DataWorks workspace.
Procedure
Develop your code on the editor page of the PyODPS 3 node.
PyODPS 3 code examples
After you create a PyODPS node, you can edit and run your code. For more information about PyODPS syntax, see Basic operations. This topic provides five types of code examples. You can choose the one that fits your business needs.
ODPS entry point
Each PyODPS node in DataWorks includes a global ODPS entry point variable, odps or o. You do not need to define it manually.
    print(odps.exist_table('PyODPS_iris'))
Execute SQL
You can run SQL statements in a PyODPS node. For more information, see SQL.
By default, instance tunnel is disabled in DataWorks, which means instance.open_reader uses the Result interface and returns a maximum of 10,000 records. You can use reader.count to get the number of records. If you need to iterate through all data, you must disable the limit. You can use the following statements to globally enable instance tunnel and disable the limit.
    from odps import options

    options.tunnel.use_instance_tunnel = True
    options.tunnel.limit_instance_tunnel = False  # Disable the limit to read all data.
    with instance.open_reader() as reader:
        # All data can be read through Instance Tunnel.
        print(reader.count)
You can also enable instance tunnel for a single open_reader call by adding tunnel=True to the call. You can also add limit=False to disable the limit for that call.
    # Use Instance Tunnel for this open_reader call and read all data.
    with instance.open_reader(tunnel=True, limit=False) as reader:
        print(reader.count)
Set runtime parameters
You can set runtime parameters by using the hints parameter, which is a dict. For more information about hints, see SET operations.
    o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
If you set global configurations by using sql.settings, the relevant runtime parameters are added to every execution.
    from odps import options

    options.sql.settings = {'odps.sql.mapper.split.size': 16}
    o.execute_sql('select * from PyODPS_iris')  # Hints are added based on the global settings.
Read execution results
You can call the open_reader operation directly on an SQL execution instance. This supports two scenarios:
The SQL returns structured data.
    with o.execute_sql('select * from dual').open_reader() as reader:
        for record in reader:
            # Process each record.
            print(record)
If you execute SQL statements such as desc, you can use the reader.raw property to obtain the raw results of the SQL execution.
    with o.execute_sql('desc dual').open_reader() as reader:
        print(reader.raw)
Note: If you use custom scheduling parameters and run a PyODPS 3 node directly from the UI, you must hardcode the time value because the node cannot substitute variables at runtime.
DataFrame
You can also process data by using DataFrame (not recommended).
Execution
In the DataWorks environment, DataFrame operations require an explicit call to an immediate execution method.
    from odps.df import DataFrame

    iris = DataFrame(o.get_table('pyodps_iris'))
    for record in iris[iris.sepal_width < 3].execute():  # Call an immediate execution method.
        print(record)  # Process each record.
If you need to trigger immediate execution when printing, you must enable options.interactive.
    from odps import options
    from odps.df import DataFrame

    options.interactive = True  # Enable the option at the beginning.
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepal_width.sum())  # This triggers immediate execution.
Print detailed information
The options.verbose option is enabled by default in DataWorks, so detailed information such as the Logview URL is printed during execution.
PyODPS 3 code development
The following simple example shows how to use a PyODPS node:
Prepare a dataset by creating the pyodps_iris sample table. For more information, see Process DataFrame data.
Create a DataFrame. For more information, see Create a DataFrame from a MaxCompute table.
Enter the following code in the PyODPS node and run it.
    from odps.df import DataFrame

    # Create a DataFrame from an ODPS table.
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepallength.head(5))
Run the PyODPS job
In the Run Configuration pane, under the Compute Resource section, configure the Compute Resource, computing quota, and DataWorks Resource Group.
Note: To access a data source over a public network or a VPC, you must use a scheduling resource group that has passed a connectivity test with the data source. For more information, see Network connectivity solutions.
You can configure the Image information based on the job requirements.
In the parameters dialog box on the toolbar, select the created MaxCompute data source and click Run to run the PyODPS job.
To run the node periodically, configure its scheduling properties based on your business requirements. For more information, see Configure node scheduling.
Unlike SQL nodes in DataWorks, PyODPS nodes do not replace strings such as ${param_name} in the code. Instead, before the code is executed, a dictionary named args is added to the global variables, from which you can retrieve scheduling parameters. For example, if you set ds=${yyyymmdd} in the Parameter section, you can retrieve the parameter information in the code as follows.
    print('ds=' + args['ds'])
The output is similar to the following:
    ds=20240930
Note: If you need to obtain the partition named ds, you can use the following method.
    o.get_table('table_name').get_partition('ds=' + args['ds'])
After the node is configured, you must deploy it. For more information, see Node and workflow deployment.
After the job is deployed, you can view its status in Operation Center. For more information, see Get started with Operation Center.
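The scheduling-parameter lookup described in the steps above can be wrapped in a small helper that builds the partition string and fails early when a parameter is missing. The following is a minimal sketch under stated assumptions: partition_spec is a hypothetical helper, sample_args stands in for the args dictionary that DataWorks injects at run time, and the comma-separated form for multi-level partitions is assumed to match what get_partition accepts.

```python
def partition_spec(args, keys):
    """Build a partition spec such as 'ds=20240930' from scheduling parameters.

    Hypothetical helper for illustration. Raises KeyError when a requested
    parameter is not present, instead of failing later inside the job.
    """
    missing = [k for k in keys if k not in args]
    if missing:
        raise KeyError("missing scheduling parameters: " + ", ".join(missing))
    return ",".join("{}={}".format(k, args[k]) for k in keys)


# Usage with a stand-in dictionary; in a node, pass the injected `args` instead.
sample_args = {"ds": "20240930"}
spec = partition_spec(sample_args, ["ds"])
print(spec)
```

In a node, the resulting string could be passed to o.get_table('table_name').get_partition(spec).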
Run a node by associating a role
You can associate a RAM role with a node to run node tasks. This provides fine-grained permission control and security management.
Next steps
FAQ about PyODPS: Learn about common issues that occur during PyODPS execution to quickly identify and resolve exceptions.