PyODPS is the MaxCompute SDK for Python. You can create and run PyODPS nodes in DataWorks. This topic describes the limits of PyODPS in DataWorks and how to create a PyODPS node. This topic also provides examples on how to use PyODPS.
Limits
- Limits on usage
Each PyODPS node in DataWorks can process a maximum of 50 MB of data and can occupy a maximum of 1 GB of memory. If the system reports Got killed, the memory usage exceeds the upper limit and the system terminates the related processes. We recommend that you commit data processing tasks to MaxCompute for distributed execution instead of downloading data to a PyODPS node and processing the data in DataWorks. For more information about the comparison between the two methods, see Overview.
- Limits on packages
- Features may be limited in the following aspects due to the lack of packages, such as matplotlib:
- The use of the plot function of DataFrame is affected.
- DataFrame user-defined functions (UDFs) can be used only after they are committed to MaxCompute. You can use only pure Python libraries and the NumPy library to run UDFs based on the requirements of the Python sandbox. Other third-party libraries, such as pandas, cannot be used.
- However, you can use the NumPy and pandas libraries that are pre-installed in DataWorks to run non-UDFs. Third-party packages that contain binary code are not supported.
- PyODPS nodes in DataWorks do not support the Python package atexit. You must use try-finally to implement the related features. For a minimal example, see the sketch at the end of this section.
- Limit on the number of data records that can be read
By default, the options.tunnel.use_instance_tunnel parameter is set to False for a PyODPS node in DataWorks. This indicates that a maximum of 10,000 data records can be read. If you want to read more data records, you must manually set options.tunnel.use_instance_tunnel to True to globally enable InstanceTunnel.
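As noted in the limits on packages, the atexit package is not supported. The following is a minimal sketch of how you can use try-finally to run cleanup logic instead of registering it with atexit. The temporary table name used here is hypothetical.
# Use try-finally instead of atexit to guarantee that cleanup logic runs.
temp_table = 'pyodps_tmp_result'  # Hypothetical temporary table name.
o.create_table(temp_table, 'value string', if_not_exists=True)
try:
    # Main processing logic goes here.
    print(o.exist_table(temp_table))
finally:
    # Cleanup logic that would otherwise be registered with atexit.
    o.delete_table(temp_table, if_exists=True)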
Procedure
- Create a PyODPS node.
You can go to the Data Analytics page of DataWorks to create a PyODPS node. PyODPS nodes are classified into PyODPS 2 nodes and PyODPS 3 nodes.
- The underlying Python version of PyODPS 2 is Python 2.
- The underlying Python version of PyODPS 3 is Python 3.
- Develop code for the PyODPS node.
After you create the PyODPS node, you can learn about the main capabilities of PyODPS from the examples in the following sections:
- MaxCompute entry
- Execution of SQL statements
- DataFrame
- Obtain scheduling parameters
- Configure the hints parameter
- Configure scheduling parameters. Then, save, commit, and publish the node. This way, the node can run at regular intervals.
MaxCompute entry
Each PyODPS node in DataWorks includes the global variable odps or o, which is the MaxCompute entry. You do not need to manually specify the MaxCompute entry. Sample command:
# Check whether the pyodps_iris table exists.
print(o.exist_table('pyodps_iris'))
If True is returned, the pyodps_iris table exists.
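You can also use the entry object to obtain information about MaxCompute objects. The following is a minimal sketch that prints the schema of the pyodps_iris table, assuming that the table already exists.
# Obtain the pyodps_iris table object and print its schema.
t = o.get_table('pyodps_iris')
print(t.schema)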
Execution of SQL statements
General capabilities
- Execute SQL statements on a PyODPS node: You can execute SQL statements by using the execute_sql() or run_sql() method. Only DDL and DML SQL statements can be executed in this way.
Note Not all SQL statements can be executed by using the methods of the MaxCompute entry object. Specific SQL statements that can be executed on the MaxCompute client may fail to be executed by using the execute_sql() and run_sql() methods. You must use other methods to execute non-DDL or non-DML statements. For example, you must use the run_security_query method to execute GRANT or REVOKE statements, and use the run_xflow or execute_xflow method to call API operations.
- Read SQL execution results from a PyODPS node: For example, you can use the open_reader() method to read SQL execution results.
Limit on the number of data records that can be read
By default, InstanceTunnel is disabled for PyODPS nodes in DataWorks. In this case, a maximum of 10,000 data records can be read by using the open_reader() method. You can use one of the following methods to remove the limit on the number of data records that can be read:
- Globally remove the limit on the number of data records that can be read
You can execute the following statements to enable InstanceTunnel and globally remove the limit on the number of data records that can be read.
from odps import options
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # Remove the limit on the number of data records that can be read.
# instance is returned by o.execute_sql().
with instance.open_reader() as reader:
    # Use InstanceTunnel to read all data. You can use reader.count to obtain the number of data records.
    print(reader.count)
- Remove the limit on the number of data records that can be read for a single open_reader() call
You can also add tunnel=True to the open_reader() method to enable InstanceTunnel for the current open_reader() method, and add limit=False to the open_reader() method to remove the limit on the amount of data for the current open_reader() method.
with instance.open_reader(tunnel=True, limit=False) as reader:
    # The current open_reader() method is called by using InstanceTunnel, and all data can be read.
    for record in reader:
        print(record)
DataFrame
- Execution methods
To perform operations on DataFrames in DataWorks, you must explicitly call methods that trigger immediate execution, such as execute and persist (see the persist sketch after this list). Sample code:
# Call an immediately executed method to process each data record and display all data records whose sepalwidth is less than 3 in the pyodps_iris table.
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepalwidth < 3].execute():
    print(record)
- Display of details
By default, options.verbose is enabled in DataWorks. This indicates that the processing details of a PyODPS node in DataWorks, such as Logview, are displayed on your screen. You can manually configure this option to specify whether to display the processing details.
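The following is a minimal sketch of the persist method, which writes the results of a DataFrame operation back to a MaxCompute table. The destination table name is hypothetical.
# Write the filtered records to a MaxCompute table. The destination table name is hypothetical.
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
iris[iris.sepalwidth < 3].persist('pyodps_iris_filtered')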
Obtain scheduling parameters
- In the code of an SQL node, a scheduling parameter is directly used to replace a string such as ${param_name}.
- In the code of a PyODPS node, such strings are not directly replaced. Instead, before the code is run, a dictionary named args is added to the global variables. This prevents the impact on the code. The code uses args['param_name'] to obtain the value of a scheduling parameter, instead of replacing ${param_name} with the value of the scheduling parameter in the code.
For example, you can specify ds=${yyyymmdd} in the Parameters field in the Basic properties section. Then, you can use the following commands in the code of the node to obtain the parameter value:
- Obtain the value of the ds parameter.
print('ds=' + args['ds'])  # Obtain the time that is specified by the ds parameter, such as ds=20161116.
- Obtain the table data for the partition named ds=${yyyymmdd}.
o.get_table('table_name').get_partition('ds=' + args['ds'])  # Obtain the data of the ds partition from the table that is specified by table_name.
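You can also pass the value of a scheduling parameter into an SQL statement. The following is a minimal sketch, assuming a partitioned table named table_name:
# Query the partition that corresponds to the scheduling parameter. The table name is hypothetical.
instance = o.execute_sql("select * from table_name where ds = '%s'" % args['ds'])
with instance.open_reader() as reader:
    print(reader.count)  # Display the number of data records that are read.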
Configure the hints parameter
You can use the hints parameter to configure runtime parameters. The value of the hints parameter is of the DICT type. Sample command:
o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
You can globally configure the sql.settings parameter. The relevant runtime parameters are automatically added during each execution.
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') # The hints parameter is automatically configured based on global settings.
Appendix: Sample data
You can create a table named pyodps_iris in DataWorks and import data to the table. For more information about how to create a table and import data to the table, see Step 1 in Use a PyODPS node to query data based on specific criteria. The pyodps_iris table is used in the operation example.
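For reference, the following is a minimal sketch of one possible schema for the pyodps_iris table, based on the fields that are referenced in this topic, such as sepalwidth. The actual table definition is described in the referenced topic.
# A hypothetical schema sketch. Follow the referenced topic for the actual table definition.
o.create_table(
    'pyodps_iris',
    'sepallength double, sepalwidth double, petallength double, petalwidth double, name string',
    if_not_exists=True,
)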