This topic describes how to create a PyODPS 3 node and the limits on the usage of PyODPS 3 nodes in DataWorks.
Background information
- DataWorks allows you to create Python resources in a visualized manner. If you want to use a PyODPS node to reference a third-party package, use an exclusive resource group for scheduling and install the package on the O&M Assistant page of the resource group.
- A third-party package that you install on the O&M Assistant page can be referenced only when you run a PyODPS node on an exclusive resource group for scheduling. For information about how to reference third-party packages in MaxCompute Python user-defined functions (UDFs), see Reference third-party packages in Python UDFs.
- If you want to use a PyODPS node to access a data source or service that is deployed in a special network environment, such as a virtual private cloud (VPC) or data center, use an exclusive resource group for scheduling to run the node, and establish a network connection between the resource group and the data source or service. For more information, see Establish a network connection between a resource group and a data source.
- For more information about the PyODPS syntax, see PyODPS documentation.
- PyODPS nodes are classified into PyODPS 2 nodes and PyODPS 3 nodes. The two types of PyODPS nodes use different Python versions at the underlying layer. PyODPS 2 nodes use Python 2, and PyODPS 3 nodes use Python 3. You can create a PyODPS node based on the Python version in use.
Limits
- Due to the resource specifications of the resource group (shared resource group for scheduling or exclusive resource group for scheduling) that you use to run a PyODPS node, we recommend that the node process no more than 50 MB of on-premises data. If the node processes more than 50 MB of on-premises data, an out-of-memory (OOM) exception may occur, and the system may report Got killed. Therefore, we recommend that you do not write code that processes large amounts of data on premises in a PyODPS node. For more information, see Best practices for efficient use of PyODPS nodes.
- If the system reports Got killed, the memory usage of the node exceeds the limit and the system terminates the related processes. Therefore, we recommend that you do not perform data processing operations on premises. The limits on memory usage and CPU utilization do not apply to SQL or DataFrame tasks that PyODPS initiates and that run in MaxCompute, except for to_pandas tasks, which pull data back to the node. For a sketch that keeps computation on the MaxCompute side, see the example after this list.
- You can use the NumPy and pandas libraries that are pre-installed in DataWorks to run functions other than UDFs. Third-party packages that contain binary code are not supported.
- For compatibility reasons, options.tunnel.use_instance_tunnel is set to False in DataWorks by default. If you want to globally enable InstanceTunnel, you must set this parameter to True.
- Python 3 defines bytecode differently in its subversions, such as Python 3.7 and Python 3.8. MaxCompute is compatible with Python 3.7. A MaxCompute client that uses another subversion of Python 3 may return an error when code that uses specific syntax is run. For example, a MaxCompute client that uses Python 3.8 returns an error when code that contains a finally block is run. We recommend that you use Python 3.7.
- PyODPS 3 nodes can run on a shared resource group or an exclusive resource group for scheduling that is purchased after April 2020. If your exclusive resource group for scheduling is purchased before April 2020, you can join the DataWorks DingTalk group and contact the technical personnel on-duty to upgrade your resource group.
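The following is a minimal sketch of how to keep heavy computation on the MaxCompute side instead of processing data on premises. The table name pyodps_iris is only an assumed example.
from odps.df import DataFrame

# Filtering and aggregation are pushed down to MaxCompute, so only a small
# result is returned to the resource group that runs the node.
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris[iris.sepal_width < 3].count().execute())

# By contrast, to_pandas() downloads the data set to the node and is subject
# to the on-premises memory limit described above.
# local_df = iris.to_pandas()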
Procedure
For more information about the PyODPS syntax, see PyODPS documentation. The following table describes the configurations that this topic covers.

| Configuration | Description |
| --- | --- |
| Create a PyODPS 3 node | DataWorks provides PyODPS 3 nodes. PyODPS integrates with MaxCompute SDK for Python. You can create a PyODPS 3 node and edit Python code for the node. |
| Use the MaxCompute entry point | In DataWorks, each PyODPS 3 node includes the global variable odps or o, which is the MaxCompute entry point. Therefore, you do not need to manually specify the entry point. |
| Execute SQL statements | PyODPS 3 nodes allow you to execute SQL statements to query data. |
| Configure runtime parameters | You can use the hints parameter to configure runtime parameters. The hints parameter is of the DICT type. |
| Obtain query results of SQL statements | You can obtain the query results of SQL statements. |
| Use DataFrame to process data | You can use DataFrame to process data. |
| Configure scheduling properties | If you want the system to periodically run a PyODPS node, you must configure scheduling properties for the node. |
| Commit a node | After you commit a node in a workspace in standard mode, the node takes effect only in the development environment, and the system does not automatically schedule the node in that environment. You must deploy the node to the production environment before it can be scheduled to run periodically. |
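The following minimal sketch combines several of the configurations listed above in a single PyODPS 3 node. The table name pyodps_iris is an assumption used for illustration; the later sections describe each step in detail.
from odps.df import DataFrame

# o is the MaxCompute entry point that DataWorks provides.
print(o.exist_table('pyodps_iris'))

# Execute an SQL statement and read the query results.
with o.execute_sql('select * from pyodps_iris').open_reader() as reader:
    for record in reader:
        print(record)

# Process the same table with DataFrame.
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.head(5))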
Create a PyODPS 3 node
- Go to the DataStudio page.
- Log on to the DataWorks console.
- In the left-side navigation pane, click Workspaces.
- In the top navigation bar, select the region in which the workspace that you want to manage resides. Find the workspace and click DataStudio in the Actions column.
- Move the pointer over the Create icon and choose MaxCompute > PyODPS 3.
Alternatively, you can click the name of the desired workflow in the Business Flow section, right-click MaxCompute, and then choose Create Node > PyODPS 3. For more information about how to create a workflow, see Create a workflow.
- In the Create Node dialog box, configure the Name and Path parameters. Note The node name must be 1 to 128 characters in length and can contain letters, digits, underscores (_), and periods (.).
- Click Commit.
Use the MaxCompute entry point
In DataWorks, each PyODPS 3 node includes the global variable odps or o, which is the MaxCompute entry point. You can use the entry point directly without specifying it manually. For example:
print(odps.exist_table('PyODPS_iris'))
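The o variable can be used in the same way. The following sketch, which assumes that the table PyODPS_iris exists, checks for the table and prints its schema.
# o and odps refer to the same MaxCompute entry point.
if o.exist_table('PyODPS_iris'):
    table = o.get_table('PyODPS_iris')
    print(table.schema)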
Execute SQL statements
You can execute SQL statements in the PyODPS 3 node. For more information, see SQL.
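For example, the following sketch runs a query and reads the results. The table name PyODPS_iris follows the other examples in this topic.
instance = o.execute_sql('select * from PyODPS_iris')  # Run the SQL statement and wait until it is complete.
with instance.open_reader() as reader:
    for record in reader:
        print(record)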
By default, InstanceTunnel is disabled in DataWorks. To globally enable InstanceTunnel and remove the limit on the number of data records to read, configure the following options:
from odps import options
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # Remove the limit on the number of data records to read.
with instance.open_reader() as reader:
    # Use InstanceTunnel to read all data.
    for record in reader:
        print(record)
You can also add tunnel=True to open_reader to enable InstanceTunnel for the current open_reader operation, and add limit=False to open_reader to remove the limit on the number of data records to read for the current operation.
with instance.open_reader(tunnel=True, limit=False) as reader:
    # The current open_reader operation uses InstanceTunnel, and all data can be read.
    for record in reader:
        print(record)
Configure runtime parameters
You can use the hints parameter to configure runtime parameters. The hints parameter is of the DICT type. For more information about the hints parameter, see SET operations.
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
If you want to configure runtime parameters globally, use the sql.settings option:
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from PyODPS_iris') # Configure the hints parameter globally.
Obtain query results of SQL statements
You can use the open_reader method to obtain query results in the following scenarios. A sketch that shows how to access field values follows this list.
- The SQL statements return structured data.
with o.execute_sql('select * from dual').open_reader() as reader:
    for record in reader:
        # Process each record.
        print(record)
- SQL statements such as DESC are executed. In this case, you can use the reader.raw property to obtain raw query results.
with o.execute_sql('desc dual').open_reader() as reader:
    print(reader.raw)
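Within the loop, you can access the values of each record by column name or by position. The following sketch assumes a table named PyODPS_iris that contains a column named sepal_width.
with o.execute_sql('select * from PyODPS_iris').open_reader() as reader:
    for record in reader:
        print(record['sepal_width'])  # Access a value by column name.
        print(record[0])              # Access a value by position.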
Note: If you use custom scheduling parameters and directly run the PyODPS 3 node on the configuration tab of the node, you must assign constant values to the scheduling parameters to specify a fixed time, because the values of custom scheduling parameters for PyODPS nodes are not automatically replaced in this case.
Use DataFrame to process data
- Call a DataFrame API operation
DataFrame operations are not automatically executed. They are run only when you explicitly call an immediately executed method, such as execute.
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepal_width < 3].execute():
    # Call an immediately executed method to process each data record.
    print(record)
To call an immediately executed method for data display, set options.interactive to True.
from odps import options
from odps.df import DataFrame
options.interactive = True  # Set options.interactive to True at the beginning of the code.
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum())  # The expression is immediately executed and the result is displayed.
- Display details
To display details, set options.verbose to True. By default, this parameter is set to True in DataWorks, and details such as the Logview URL are displayed while the node runs, as shown in the sketch below.
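The following is a minimal sketch of this setting. The table name pyodps_iris is an assumed example.
from odps import options
from odps.df import DataFrame

options.verbose = True  # Display details such as the Logview URL. This is the default in DataWorks.
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum().execute())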
Configure scheduling properties
In the right-side navigation pane of the configuration tab of the node, click the Properties tab. In the Parameters section of the Properties tab, configure scheduling parameters for the node. The method of configuring scheduling parameters for PyODPS nodes is different from that of configuring scheduling parameters for SQL nodes. For more information, see Configure scheduling parameters for different types of nodes.
Different from SQL nodes in DataWorks, strings such as ${param_name} are not replaced in the code of a PyODPS node. Instead, a dictionary named args is added to the PyODPS node as a global variable before the node code is run. You can obtain the scheduling parameters from this dictionary so that the Python code itself is not affected. For example, if you set ds=${yyyymmdd} in the Parameters section of the Properties tab, you can run the following command to obtain the parameter value:
print('ds=' + args['ds'])
The following output is returned:
ds=20161116
To obtain the partition named ds=${yyyymmdd}, run the following command:
o.get_table('table_name').get_partition('ds=' + args['ds'])
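For example, the following sketch uses the obtained value in an SQL statement. The table name table_name and the partition column ds are placeholders, as in the example above.
# Query the partition that corresponds to the scheduling parameter value.
o.execute_sql("select * from table_name where ds = '%s'" % args['ds'])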
Commit a node
- Click the Submit icon in the top toolbar.
- In the Submit dialog box, enter information in the Change description field.
- Click Confirm.
For more information about node O&M, see Basic O&M operations for auto triggered nodes.
FAQ: How do I determine whether a custom Python script is successfully run?
The logic for determining whether a custom Python script is successfully run is the same as the logic for determining whether a custom Shell script is successfully run. For more information, see Create a Shell node.
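The following is a minimal sketch under the assumption that, as with Shell nodes, a zero exit code marks the node instance as successful and a non-zero exit code marks it as failed.
import sys

try:
    o.execute_sql('select * from PyODPS_iris')  # Replace with your own logic.
except Exception:
    sys.exit(1)  # A non-zero exit code marks the node as failed.
# Exiting normally (exit code 0) marks the node as successful.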