DataWorks provides the PyODPS 2 node type for developing PyODPS tasks using PyODPS syntax. As the Python SDK for MaxCompute, PyODPS lets you write and edit Python code on a PyODPS 2 node to interact directly with MaxCompute.
Overview
PyODPS is the Python SDK for MaxCompute. It provides a concise programming interface that enables you to use Python to write jobs, query tables and views, and manage MaxCompute resources. For more information, see PyODPS. In DataWorks, you can use a PyODPS node to schedule and run Python tasks and integrate them with other jobs.
Notes
If your code requires third-party packages when you run a PyODPS node on a DataWorks resource group, use a serverless resource group and a custom image to install the packages.
Note: This method is not supported if your code uses a UDF that references a third-party package. For information about how to handle this scenario, see UDF example: Use third-party packages in Python UDFs.
If your PyODPS task needs to access a specific network environment, such as a data source or service in a VPC or an on-premises data center, use a serverless resource group and refer to Network connectivity solutions to establish the required network connection.
For more information about PyODPS syntax, see the PyODPS documentation.
PyODPS nodes are available in two types: PyODPS 2 and PyODPS 3. They differ in their underlying Python version. PyODPS 2 nodes use Python 2, and PyODPS 3 nodes use Python 3. Create the node type that corresponds to the Python version you use.
SQL executed in a PyODPS node may fail to generate data lineage, in which case the lineage is not displayed in Data Map. To resolve this, pass the DataWorks scheduling runtime parameters manually in the task code. To view data lineage, see View data lineage. For information about parameter settings, see Set runtime parameter hints. The following sample code obtains the required runtime parameters.
```python
import os

# ...
# Get the DataWorks scheduler runtime parameters.
skynet_hints = {}
for k, v in os.environ.items():
    if k.startswith('SKYNET_'):
        skynet_hints[k] = v
# ...
# Set the hints when you submit the task.
o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
# ...
```

The output log of a PyODPS node can be at most 4 MB. Do not print large volumes of data directly to the log. Print only essential warnings and progress updates.
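The loop in the sample above can be wrapped in a small reusable function. The following is a minimal sketch. collect_skynet_hints is a hypothetical helper name, and it accepts an optional mapping so that you can test it outside DataWorks. It assumes only what the sample shows: the scheduler exposes its runtime parameters as environment variables prefixed with SKYNET_. The code avoids Python 3-only syntax so that it also runs on the Python 2.7 interpreter of PyODPS 2 nodes.

```python
import os


def collect_skynet_hints(environ=None):
    """Return the DataWorks scheduler runtime parameters as a hints dict.

    collect_skynet_hints is a hypothetical helper: it keeps every variable
    whose name starts with 'SKYNET_'. Pass a dict as environ to test it
    without a DataWorks environment; by default it reads os.environ.
    """
    if environ is None:
        environ = os.environ
    return dict((k, v) for k, v in environ.items() if k.startswith('SKYNET_'))


# Hypothetical usage in a PyODPS node (o is the global ODPS entry):
# o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***',
#               hints=collect_skynet_hints())
```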
Limitations
When you run a PyODPS node on an exclusive resource group for scheduling, keep the data processed locally in the node under 50 MB. Local processing is limited by the specifications of the exclusive resource group for scheduling. If the local data exceeds the operating system threshold, the process may be terminated with an OOM (Got killed) error. Avoid writing heavy data processing code in PyODPS nodes. For more information, see Best practices for efficient PyODPS usage.
When you run a PyODPS node on a serverless resource group, you can configure the CU allocation for the PyODPS node based on the volume of data to be processed.
Note: When you run this task on a serverless resource group, a single task can be configured with up to 64 CUs. However, we recommend that you do not exceed 16 CUs. An excessive CU allocation may cause insufficient resources and prevent the task from starting. A Got killed error indicates that memory usage exceeded the limit and the process was terminated, so minimize local data operations. SQL and DataFrame tasks that PyODPS submits to MaxCompute (except to_pandas) are not subject to this limitation.
Non-UDF code can use the pre-installed NumPy and Pandas packages. Other third-party packages that contain binary code are not supported.
For compatibility reasons, options.tunnel.use_instance_tunnel is set to False by default in DataWorks. To enable Instance Tunnel globally, you must manually set this option to True.
The underlying Python version for PyODPS 2 nodes is 2.7.
Running multiple Python tasks concurrently within a PyODPS node is not supported.
To print logs in a PyODPS node, use print. logger.info is not supported.
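Taken together, the local-data limit, the 4 MB log limit, and the print-only logging rule suggest a pattern: process records as a stream and print only periodic progress lines. Avoid collecting all data in memory or printing every record. The following is a minimal sketch of that pattern. iter_batches, process_in_batches, and the default batch sizes are hypothetical. records can be any iterable, such as the reader returned by open_reader.

```python
import itertools


def iter_batches(records, batch_size):
    """Yield lists of at most batch_size records without loading everything into memory."""
    it = iter(records)
    while True:
        batch = list(itertools.islice(it, batch_size))
        if not batch:
            return
        yield batch


def process_in_batches(records, handle_batch, batch_size=1000, log_every=10):
    """Apply handle_batch to each batch and print progress every log_every batches.

    Uses print (logger.info is not supported in PyODPS nodes) and prints
    counts rather than data, to keep the node log well under 4 MB.
    """
    total = 0
    for i, batch in enumerate(iter_batches(records, batch_size), 1):
        handle_batch(batch)
        total += len(batch)
        if i % log_every == 0:
            print('processed %d records in %d batches' % (total, i))
    print('done: %d records' % total)
    return total
```

For example, inside a with ... open_reader() as reader: block, you could call process_in_batches(reader, handle_batch), where handle_batch is your own function.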
Prerequisites
Associate a MaxCompute compute engine with your DataWorks workspace.
Procedure
On the PyODPS 2 node editing page, perform the following development operations.
PyODPS 2 code examples
After you create a PyODPS node, you can edit and run code. For more information about PyODPS syntax, see the PyODPS documentation. This topic describes the following five code examples. Select the examples that match your business needs.
ODPS entry
In DataWorks PyODPS nodes, the global variable odps or o is provided as the ODPS entry, so you do not need to define the ODPS entry manually.

```python
print(odps.exist_table('PyODPS_iris'))
```

Execute SQL
You can execute SQL in a PyODPS node. For more information, see Execute SQL statements.
By default, Instance Tunnel is not enabled in DataWorks. As a result, instance.open_reader uses the Result interface and can read at most 10,000 records. You can use reader.count to obtain the number of records. To iterate over all data, you must disable the limit restriction. The following statements enable Instance Tunnel globally and disable the limit restriction.

```python
from odps import options

options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # Disable the limit restriction to read all data.

with instance.open_reader() as reader:  # Uses Instance Tunnel to read all data.
    for record in reader:
        pass  # Process each record.
```

You can also add tunnel=True to open_reader to enable Instance Tunnel only for the current open_reader call. Similarly, add limit=False to disable the limit restriction only for the current call.

```python
# This open_reader call uses the Instance Tunnel interface to read all data.
with instance.open_reader(tunnel=True, limit=False) as reader:
    for record in reader:
        pass  # Process each record.
```
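The following minimal sketch builds on the per-call tunnel=True and limit=False options above. It reads every record of an SQL instance and accumulates one numeric column locally, without building a list of records. sum_column is a hypothetical helper. It assumes that records support lookup by column name (record[column]), that the column holds numbers, and that NULL values arrive as None. Keep the 50 MB local-data guidance in mind. For large tables, an aggregation written in SQL keeps the work in MaxCompute.

```python
def sum_column(instance, column):
    """Sum one numeric column over all result records of an SQL instance.

    tunnel=True and limit=False make this single open_reader call use
    Instance Tunnel and read all records instead of at most 10,000.
    """
    total = 0
    with instance.open_reader(tunnel=True, limit=False) as reader:
        for record in reader:
            value = record[column]
            if value is not None:  # Skip NULL values.
                total += value
    return total
```

For example, total = sum_column(o.execute_sql(sql), 'your_column'), where sql and your_column are placeholders.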
Set runtime parameters
You can set runtime parameters by configuring the hints parameter, which is of the dict type. For more information about the hints parameter, see SET operations.

```python
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
```

If you configure sql.settings globally, the specified runtime parameters are added every time you execute SQL.

```python
from odps import options

options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from PyODPS_iris')  # Hints are added based on the global configuration.
```
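A single call may need several groups of runtime parameters. For example, you might combine the scheduler hints from the data lineage sample with SQL settings. You can merge them into one dict before passing it as hints. The following is a minimal sketch. build_hints is a hypothetical helper, and in this sketch a later dict overrides an earlier one on key conflicts. That order is a choice of the sketch, not a statement about how PyODPS combines hints with options.sql.settings.

```python
def build_hints(*hint_dicts):
    """Merge hint dicts into a new dict; later dicts override earlier ones.

    Empty or None entries are skipped, so optional groups can be passed as-is.
    The input dicts are not modified.
    """
    merged = {}
    for hints in hint_dicts:
        if hints:
            merged.update(hints)
    return merged


# Hypothetical usage in a PyODPS node (o is the global ODPS entry):
# hints = build_hints(skynet_hints, {'odps.sql.mapper.split.size': 16})
# o.execute_sql('select * from PyODPS_iris', hints=hints)
```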
Read execution results
You can call open_reader directly on an instance that runs SQL. Two cases are possible:

The SQL statement returns structured data.

```python
with o.execute_sql('select * from dual').open_reader() as reader:
    for record in reader:
        pass  # Process each record.
```

The SQL statement is a command such as desc. In this case, use the reader.raw property to obtain the raw SQL execution result.

```python
with o.execute_sql('desc dual').open_reader() as reader:
    print(reader.raw)
```

Note: A PyODPS node does not replace scheduling parameters directly. If you use custom scheduling parameters and trigger a PyODPS 2 node directly on the page, you must hardcode the time values.
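One way to follow this note is to read scheduling parameters from the global args dict, which PyODPS nodes provide before running the code, and fall back to a hardcoded value when a parameter is missing. The following is a minimal sketch. get_param and the default value 20240930 are hypothetical. Whether args is defined, and whether it contains your parameter during a manual run, depends on your node configuration, so verify this in your environment.

```python
def get_param(name, default, params=None):
    """Return a scheduling parameter, or default if it is missing or empty.

    params defaults to the global args dict that PyODPS nodes provide.
    If args is not defined (for example, when testing outside DataWorks),
    an empty dict is used, so the hardcoded default is returned.
    """
    if params is None:
        params = globals().get('args') or {}
    value = params.get(name)
    return value if value else default


# Hardcoded fallback for manual runs; replace the example value with your own.
ds = get_param('ds', '20240930')
partition_spec = 'ds=' + ds
```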
DataFrame
You can also use DataFrame to process data.
Execution
In the DataWorks environment, DataFrame execution requires explicit calls to methods that trigger immediate execution.
```python
from odps.df import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepal_width < 3].execute():  # Executes immediately.
    pass  # Process each record.
```

To trigger immediate execution when you print a result, enable options.interactive.

```python
from odps import options
from odps.df import DataFrame

options.interactive = True  # Enable the switch at the beginning of the code.
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum())  # Executes immediately when printed.
```

Print detailed information
You can configure the options.verbose option. In DataWorks, this option is enabled by default, so detailed information such as the Logview URL is printed during execution.
PyODPS 2 code development
The following simple example shows how to use a PyODPS node:
Prepare a dataset and create the pyodps_iris sample table. For more information, see Create a DataFrame.
Create a DataFrame. For more information, see Create a DataFrame.
Enter the following code in the PyODPS node.
```python
from odps.df import DataFrame

# Create a DataFrame from a MaxCompute table.
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepallength.head(5))
```
Run a PyODPS task
In Run Configuration, configure Compute Resource, Compute Resource and Quota, and DataWorks Resource Group.
Note: To access data sources in a public network or VPC network environment, use a scheduling resource group that has passed the connectivity test for the data source. For more information, see Network connectivity solutions.
You can configure the Image information based on the task requirements.
On the toolbar, click Run to run the PyODPS task.
If you need to run the node task on a periodic basis, configure the schedule settings based on your business needs. For more information, see Configure schedule settings.
Unlike SQL nodes in DataWorks, PyODPS nodes do not replace strings such as ${param_name} in the code, so that the code is not unintentionally modified. Instead, before the code is executed, a dict named args is added to the global variables, and you can retrieve scheduling parameters from it. For example, if you set ds=${yyyymmdd} in Parameter, you can retrieve the parameter in your code as follows.

```python
print('ds=' + args['ds'])
```

Sample output: ds=20240930

Note: To obtain the partition named ds, use the following method.

```python
o.get_table('table_name').get_partition('ds=' + args['ds'])
```

After you complete the node task configuration, deploy the node. For more information, see Deploy a node.
After you deploy the task, you can view the running status of the scheduled task in Operation Center. For more information, see View scheduled tasks.
Run a node by using an associated role
You can associate a RAM role to run a node, which allows you to run node tasks with a specific RAM role for fine-grained permission control and security management.
Next step
PyODPS FAQ: You can learn about common issues during PyODPS execution for quick troubleshooting.