DataWorks provides the PyODPS 3 node, allowing you to write MaxCompute jobs directly in Python and configure periodic scheduling.
Overview
PyODPS is the Python Software Development Kit (SDK) for MaxCompute. Its simple programming interface lets you write jobs, query tables and views, and manage MaxCompute resources. For more information, see PyODPS. In DataWorks, you can use a PyODPS node to schedule and run Python tasks and integrate them with other jobs.
Considerations
If your code requires a third-party package, you can install the package on the Serverless resource group that runs your PyODPS node. For more information, see Custom images.
Note: This method does not support user-defined functions (UDFs) that reference a third-party package. For the correct configuration method, see UDF example: Use third-party packages in Python UDFs.
To upgrade the PyODPS version, run the `pip install pyodps==0.12.1` command, replacing `0.12.1` with the version that you want to install. On a Serverless resource group, run the command by following the instructions in Install a third-party package. On an exclusive resource group for scheduling, follow the instructions in O&M Assistant.

If your PyODPS task needs to access a specific network environment, such as a data source or service in a VPC or an IDC, use a Serverless resource group and configure network connectivity between the resource group and the target environment. For more information, see Network connectivity solutions.
For more information about PyODPS syntax, see the PyODPS documentation.
PyODPS nodes are available in two types, PyODPS 2 and PyODPS 3, which use Python 2 and Python 3, respectively. Create the node type that matches the Python version you use.
If SQL statements executed in a PyODPS node do not generate the correct data lineage in Data Map, you can manually set the runtime parameters for DataWorks scheduling in your task code. To view data lineage, see View data lineage. To set parameters, see Set runtime parameters (hints). You can use the following sample code to obtain the required runtime parameters.
```python
import os
...
# Get DataWorks scheduler runtime parameters.
skynet_hints = {}
for k, v in os.environ.items():
    if k.startswith('SKYNET_'):
        skynet_hints[k] = v
...
# Set hints when you submit a task.
o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
...
```

Note: The maximum log output for a PyODPS node is 4 MB. Avoid printing large data results directly to the logs. Instead, focus on outputting valuable information, such as alert logs and progress updates.
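Given the 4 MB log cap, one way to keep output bounded is to print a short preview of a result set rather than every row. The following is a minimal, self-contained sketch; the `records` list and preview size are illustrative stand-ins, not part of the PyODPS API.

```python
# Sketch: keep node log volume under the 4 MB cap by printing a bounded
# preview instead of a full result set. 'records' stands in for rows read
# from a table; the preview size is an arbitrary choice.
records = [{'id': i, 'value': i * 2} for i in range(100000)]

PREVIEW = 5
for rec in records[:PREVIEW]:
    print(rec)
print('... %d records total (preview truncated)' % len(records))
```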
Limitations
When you use an exclusive resource group for scheduling to run a PyODPS node, the amount of data processed locally within the node should not exceed 50 MB. This limit is determined by the specifications of the exclusive resource group for scheduling. Processing excessive local data can exceed the operating system's threshold and cause an Out of Memory (OOM) error, indicated by a `Got Killed` message. Avoid writing data-intensive processing logic in PyODPS nodes. For more information, see Best practices for using PyODPS efficiently.
When you use a Serverless resource group to run a PyODPS node, you can configure the CUs for the node based on the amount of data it needs to process.
Note: A single task can be configured with a maximum of 64 CUs, but using more than 16 CUs is not recommended because it can cause resource shortages that affect task startup. A `Got Killed` error indicates that the process was terminated because of an out-of-memory (OOM) condition, so minimize local data operations. SQL and DataFrame tasks initiated through PyODPS, except for `to_pandas`, are not subject to this limitation.

Code that is not part of a user-defined function (UDF) can use the pre-installed NumPy and pandas packages. Other third-party packages that contain binary code are not supported.
For compatibility reasons, `options.tunnel.use_instance_tunnel` is set to `False` by default in DataWorks. To globally enable instance tunnel, you must manually set this value to `True`.

The bytecode definitions differ between minor versions of Python 3, such as Python 3.7 and Python 3.8. MaxCompute currently uses Python 3.7. Using syntax from other Python 3 versions, such as the `finally` block behavior in Python 3.8, causes an execution error. We recommend that you use Python 3.7.

PyODPS 3 can run on a Serverless resource group. To purchase and use this resource, see Use serverless resource groups.
A single PyODPS node does not support the concurrent execution of multiple Python tasks.
To print logs from a PyODPS node, use the `print` function. The `logger.info` function is not currently supported.
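Since only `print` reaches the node logs, one hedged pattern is to build compact, timestamped log lines and print them. The `log_line` helper below is illustrative, not a DataWorks or PyODPS API.

```python
import datetime

def log_line(stage, message):
    """Build a compact, single-line log entry; print() sends it to node logs."""
    ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    return '[%s] %s: %s' % (ts, stage, message)

print(log_line('extract', 'read 1200 rows from source table'))
print(log_line('load', 'wrote partition ds=20240930'))
```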
Before you begin
Bind a MaxCompute compute resource to your DataWorks workspace.
Procedure
In the PyODPS 3 node editor, perform the following development steps.
PyODPS 3 code examples
After you create a PyODPS node, you can edit and run code. For more information about PyODPS syntax, see Overview. This section provides five code examples. Select the example that best suits your business needs.
ODPS entry point
DataWorks provides the global variable `odps` (or `o`) as the ODPS entry point, so you do not need to define it manually.

```python
print(odps.exist_table('PyODPS_iris'))
```

Execute SQL statements
You can execute SQL statements in a PyODPS node. For more information, see SQL.
In DataWorks, instance tunnel is disabled by default. This means that `instance.open_reader` uses the Result interface, which returns a maximum of 10,000 records. You can use `reader.count` to get the record count. To iterate through all data, you must disable the limit. You can use the following statements to enable instance tunnel globally and disable the limit.

```python
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # Disable the limit to read all data.

with instance.open_reader() as reader:
    pass  # You can read all data through the instance tunnel.
```

Alternatively, you can add `tunnel=True` to the `open_reader` call to enable instance tunnel only for that specific operation, and add `limit=False` to disable the limit for that operation.

```python
# Use the instance tunnel interface for this open_reader operation to read all data.
with instance.open_reader(tunnel=True, limit=False) as reader:
    pass
```
Set runtime parameters
You can set runtime parameters by using the `hints` parameter, which is a `dict`. For more information about hints, see SET operations.

```python
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
```

If you configure `sql.settings` globally, the specified runtime parameters are added to every execution.

```python
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from PyODPS_iris')  # Hints are added based on the global configuration.
```
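As an illustration of how a global configuration and per-call hints combine, the following standalone sketch models the expected override order with plain dict merging. This is a model of the behavior, not PyODPS internals; the key names follow the examples above, and `odps.stage.reducer.num` is used only for illustration.

```python
# Illustrative only: model global settings plus per-call hints as a dict merge.
# Per-call hints take precedence over globally configured settings.
global_settings = {'odps.sql.mapper.split.size': 16}
call_hints = {'odps.sql.mapper.split.size': 32, 'odps.stage.reducer.num': 7}

effective = dict(global_settings)
effective.update(call_hints)
print(effective)
```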
Read execution results
An instance that runs an SQL statement can directly perform an `open_reader` operation in the following two scenarios:

The SQL statement returns structured data.

```python
with o.execute_sql('select * from dual').open_reader() as reader:
    for record in reader:
        pass  # Process each record.
```

The SQL statement is a command such as `desc`. In this case, you can use the `reader.raw` attribute to get the raw SQL execution result.

```python
with o.execute_sql('desc dual').open_reader() as reader:
    print(reader.raw)
```

Note: When you run a PyODPS 3 node directly from the editor, you must hardcode custom scheduling parameter values because the node does not automatically replace placeholders.
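Because editor runs do not inject scheduling parameters, a common workaround is a hardcoded fallback. The sketch below simulates `args` as an empty dict; in a scheduled run, DataWorks provides `args` as a global variable.

```python
# Simulate the editor case, where DataWorks does not inject parameters.
# In a scheduled run, 'args' would be provided by DataWorks as a global dict.
args = {}

# Fall back to a hardcoded business date when the parameter is absent.
ds = args.get('ds', '20240930')
print('ds=' + ds)
```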
DataFrame
You can also use a DataFrame (not recommended) to process data.
Execution
In DataWorks, you must explicitly call an immediate execution method to execute a DataFrame.
```python
from odps.df import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepal_width < 3].execute():
    pass  # Call an immediate execution method to process each record.
```

If you need a `print` statement to trigger immediate execution, you must enable `options.interactive`.

```python
from odps import options
from odps.df import DataFrame

options.interactive = True  # Enable the option at the beginning.
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum())  # Immediate execution is triggered by print().
```

Print detailed information
You can set the `options.verbose` option to print detailed information. In DataWorks, this option is enabled by default and prints details such as the Logview URL during execution.
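If you need to control this behavior explicitly, a minimal configuration sketch follows; in DataWorks, `True` is already the default.

```python
from odps import options

options.verbose = True  # Print details such as the Logview URL during execution.
```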
Develop PyODPS 3 code
The following example shows how to use a PyODPS node:
Prepare a dataset. Create the pyodps_iris sample table. For instructions, see Use DataFrame to process data.
Create a DataFrame. For more information, see Create a DataFrame object from a MaxCompute table.
Enter the following code into the PyODPS node and run it.
```python
from odps.df import DataFrame

# Create a DataFrame from an ODPS table.
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepallength.head(5))
```
Run the PyODPS task
In the Run Configuration section, configure the Compute resources, Compute quota, and DataWorks resource group.
Note: To access data sources over the public network or in a VPC, you must use a resource group for scheduling that has passed the connectivity test with the data source. For more information, see Network connectivity solutions.
You can configure the Image based on the task requirements.
In the parameters dialog box in the toolbar, select the MaxCompute data source that you created and click Run.
To run the node task on a schedule, configure its scheduling properties based on your business requirements. For more information, see Node scheduling configuration.
Unlike SQL nodes in DataWorks, PyODPS nodes do not replace strings such as `${param_name}` in the code, to avoid unintended string substitution. Instead, before the code runs, DataWorks adds a `dict` named `args` to the global variables, and you can retrieve scheduling parameters from this `dict`. For example, if you set `ds=${yyyymmdd}` on the Parameters tab, you can retrieve the parameter in your code as follows.

```python
print('ds=' + args['ds'])
# ds=20240930
```

Note: To get the partition named `ds`, you can use the following method.

```python
o.get_table('table_name').get_partition('ds=' + args['ds'])
```

After you configure the node task, you must deploy it. For more information, see Node and workflow deployment.
After the task is deployed, you can view the status of the scheduled task in the Operations Center. For more information, see Getting started with Operation Center.
Run a node with an associated role
You can associate a specific RAM role to run a node task. This enables fine-grained permission control and enhanced security.
Next steps
PyODPS FAQ: Learn about common PyODPS execution issues to help you quickly troubleshoot and resolve exceptions.