PyODPS 2 nodes let you write Python code directly in DataWorks to process MaxCompute data using the PyODPS Python SDK.
Prerequisites
Before you begin, ensure that you have:
- A PyODPS 2 node created. See Create and manage MaxCompute nodes.
How it works
PyODPS is the Python SDK for MaxCompute. In DataWorks, each PyODPS node comes with a pre-injected global variable odps (also aliased as o) that serves as the MaxCompute entry point. You write Python code against this entry point to query tables, run SQL statements, manage resources, and process data. DataWorks also injects a global args dictionary so your code can read scheduling parameters at runtime.
PyODPS 2 nodes and PyODPS 3 nodes differ only at the Python layer: PyODPS 2 uses Python 2.7 and PyODPS 3 uses Python 3.
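Because the two node types differ in Python version, code intended to run unchanged on both must avoid version-specific behavior. A minimal sketch of two common pitfalls, integer division and the print statement, runnable under either interpreter:

```python
from __future__ import print_function  # makes the print() function form work in Python 2.7 too

# Integer division differs between versions: in Python 2.7, 7 / 2 == 3,
# while in Python 3 it is 3.5. Use // when you want floor division in
# either version, and float operands when you want true division.
print(7 // 2)   # floor division: 3 in both versions
print(7.0 / 2)  # true division: 3.5 in both versions
```

Pinning down these two operators up front avoids the most frequent surprises when moving node code between PyODPS 2 and PyODPS 3.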
Limits
| Constraint | Details |
|---|---|
| Python version | 2.7 |
| Local data processing (exclusive resource group) | Keep data processed locally under 50 MB. Exceeding this limit can trigger an out of memory (OOM) error, and the process is terminated with a Got killed message |
| CUs (serverless resource group) | Up to 64 CUs per task; stay within 16 CUs to avoid resource shortages at startup |
| Concurrent Python tasks | One task at a time per node |
| Output log size | Up to 4 MB |
| Third-party packages with binary code | Not supported |
| Pre-installed libraries | NumPy and pandas (usable outside user-defined functions (UDFs)) |
| InstanceTunnel | Disabled by default; set options.tunnel.use_instance_tunnel = True to enable globally |
The 50 MB memory limit applies only to local data operations. SQL and DataFrame tasks (excluding to_pandas) initiated by PyODPS are not subject to this limit.
Usage notes
Third-party packages
To use a third-party package in a PyODPS node, use a serverless resource group and create a custom image that includes the package.
If your UDF code needs a third-party package, the custom image approach does not apply. See Example: Reference third-party packages in Python UDFs instead.
Network access
To access a data source in a virtual private cloud (VPC) or on-premises data center, run the node on a serverless resource group and set up a network connection between the resource group and the data source. See Network connectivity solutions.
Upgrading PyODPS
- On a serverless resource group: use the image management feature to run /home/tops/bin/pip3 install pyodps==0.12.1 on a PyODPS 3 node (replace 0.12.1 with your target version). See Manage images.
- On an exclusive resource group for scheduling: use the O&M Assistant feature to run the same command on a PyODPS 3 node. See Use the O&M Assistant feature.
Output logs
Keep log output lean. Include alert logs and progress checkpoints rather than dumping large datasets to the log. The 4 MB cap applies to the entire output log of a node run.
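One way to stay under the cap is to budget log output explicitly. The helper below is a hypothetical sketch (the class name and budget handling are illustrative, not part of PyODPS or DataWorks): it prints messages until a byte budget is spent, then drops the rest with a single notice.

```python
# Hypothetical helper: cap cumulative log output so a node run stays
# under the 4 MB output log limit. The class is illustrative only.
MAX_LOG_BYTES = 4 * 1024 * 1024

class BudgetedLogger(object):
    def __init__(self, budget=MAX_LOG_BYTES):
        self.remaining = budget   # bytes still available for logging
        self.truncated = False    # set once the budget is exhausted

    def log(self, message):
        size = len((message + '\n').encode('utf-8'))
        if size <= self.remaining:
            self.remaining -= size
            print(message)
        elif not self.truncated:
            # Emit one notice instead of silently dropping everything.
            self.truncated = True
            print('... log budget exhausted, further messages dropped ...')

logger = BudgetedLogger(budget=64)  # tiny budget for demonstration
for i in range(10):
    logger.log('checkpoint %d reached' % i)
```

In a real node you would log only alert messages and progress checkpoints through such a gate, rather than dumping dataset contents.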
Data lineage
If SQL statements in a PyODPS node fail to generate data lineages in Data Map, pass the DataWorks scheduler runtime parameters as SQL hints. The following code shows how to collect these parameters and pass them when running SQL:
import os
# Collect DataWorks scheduler runtime parameters
skynet_hints = {}
for k, v in os.environ.items():
    if k.startswith('SKYNET_'):
        skynet_hints[k] = v
# Pass the parameters as hints when running SQL
o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
For more information, see View data lineages and Configure the hints parameter.
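The parameter-collection step can be exercised outside DataWorks by seeding stand-in SKYNET_ environment variables. The variable names below are placeholders for illustration; in a real run the DataWorks scheduler injects the actual variables.

```python
import os

# Stand-in scheduler variables (placeholders; in DataWorks these are
# injected into the environment by the scheduler at runtime).
os.environ['SKYNET_ID'] = '12345'
os.environ['SKYNET_BIZDATE'] = '20240101'

# Same collection pattern as in the node code above.
skynet_hints = {k: v for k, v in os.environ.items() if k.startswith('SKYNET_')}
print(skynet_hints)
```

The resulting dictionary is what you would pass as the hints argument to o.execute_sql.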
Write and run code
For the full PyODPS syntax reference, see Overview.
Use the MaxCompute entry point
Every PyODPS node pre-injects odps (and its alias o) as the MaxCompute entry point. There is no need to initialize a client manually.
print(odps.exist_table('PyODPS_iris'))
Run SQL statements
Use o.execute_sql() to run SQL statements against MaxCompute.
By default, InstanceTunnel is disabled. When reading results with instance.open_reader, the Result interface limits reads to 10,000 records. To read all records, enable InstanceTunnel globally:
from odps import options
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # Remove the record count limit
with instance.open_reader() as reader:
    for record in reader:  # Reads all records using InstanceTunnel
        ...
To enable InstanceTunnel for a single read operation without changing the global setting:
with instance.open_reader(tunnel=True, limit=False) as reader:
    for record in reader:  # Reads all records for this operation only
        ...
Read SQL query results
Use open_reader to process query results.
For SQL statements that return structured data:
with o.execute_sql('select * from dual').open_reader() as reader:
    for record in reader:  # Process each record
        ...
For DDL statements such as DESC, use reader.raw to get the raw output:
with o.execute_sql('desc dual').open_reader() as reader:
    print(reader.raw)
Configure runtime parameters
Pass runtime settings to SQL execution using the hints parameter (a dictionary):
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
To apply settings globally across all SQL executions in the node:
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from PyODPS_iris') # Uses the global settings
For more information about supported hints, see SET operations.
Use DataFrame to process data
DataFrame is not recommended. Consider using SQL or other supported approaches.
DataFrame API operations are lazy — they run only when you call an immediately executed method.
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepal_width < 3].execute():  # Triggers execution
    ...
To enable implicit execution for display methods, set options.interactive to True at the start of the node:
from odps import options
from odps.df import DataFrame
options.interactive = True # Enable at the top of the node
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum()) # Runs immediately and prints the result
By default, options.verbose is True in DataWorks, so the Logview URL and other execution details appear in the log during a run.
Example: query a MaxCompute table with DataFrame
- Prepare a dataset and create a table named pyodps_iris. See DataFrame data processing.
- Create a DataFrame object from the table. See Create a DataFrame object from a MaxCompute table.
- Enter the following code in the code editor and run the node:
from odps.df import DataFrame
# Create a DataFrame from the MaxCompute table
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepallength.head(5))
Expected output:
   sepallength
0          4.5
1          5.5
2          4.9
3          5.0
4          6.0
Configure scheduling parameters
Unlike SQL nodes, PyODPS node code does not perform ${param_name} string substitution. Instead, DataWorks injects scheduling parameters into the node as a global args dictionary before the node runs.
Step 1: Add parameters in the node properties
Open the node's Properties tab (right-side navigation pane on the configuration tab) and add entries in the Scheduling Parameter section. For the format and syntax differences between node types, see Configure scheduling parameters for different types of nodes.
Step 2: Read parameters in your code
For example, if you set ds=${yyyymmdd} in the Scheduling Parameter section, read the value in your code as follows:
print('ds=' + args['ds'])
# Output: ds=20161116
To get the partition for that date:
o.get_table('table_name').get_partition('ds=' + args['ds'])
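Outside DataWorks, the args lookup can be simulated with a plain dictionary to check partition-spec construction before deploying. The parameter name and value below match the example above; the dictionary itself is a stand-in for what DataWorks injects.

```python
# Stand-in for the global args dictionary that DataWorks injects at
# runtime when scheduling parameters are configured on the node.
args = {'ds': '20161116'}

# Build the partition spec the same way the node code does.
partition_spec = 'ds=' + args['ds']
print(partition_spec)  # ds=20161116
```

In the node itself you would pass this string to get_partition rather than printing it.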
Custom scheduling parameters for PyODPS nodes must be set to a constant value. Unlike SQL nodes, the value is not automatically substituted at runtime.
What's next
- Verify that the node ran successfully: The approach for confirming a successful Shell script run also applies to Python scripts.
- Deploy the PyODPS 2 node: In standard mode workspaces, deploy the node to the production environment before scheduling it.
- Perform O&M on the PyODPS 2 node: After deployment, manage and monitor the node from Operation Center in the production environment.
- PyODPS FAQ: Common issues and troubleshooting guidance.