This topic describes how to use a PyODPS node to perform sequence operations.
Prerequisites
The following operations are completed:
- MaxCompute is activated. For more information, see Activate MaxCompute.
- DataWorks is activated. For more information, see Activate DataWorks.
- A workflow is created in the DataWorks console. In this example, a workflow is created
for a DataWorks workspace in basic mode. For more information, see Create a workflow.
Procedure
- Create a table and import data to it.
- Download the Iris dataset named iris.data and rename it iris.csv.
- Create a table named pyodps_iris and upload the iris.csv dataset. For more information, see Create tables and import data.
In this example, execute the following statement to create the pyodps_iris table:
CREATE TABLE IF NOT EXISTS pyodps_iris
(
    sepallength DOUBLE COMMENT 'sepal length (cm)',
    sepalwidth  DOUBLE COMMENT 'sepal width (cm)',
    petallength DOUBLE COMMENT 'petal length (cm)',
    petalwidth  DOUBLE COMMENT 'petal width (cm)',
    name        STRING COMMENT 'type'
);
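If you prefer to prepare the upload records in code rather than through the console importer, the CSV rows can be parsed into lists that match the table schema. The following is a local sketch: the sample values and the `parse_iris` helper are illustrative, and the `o.write_table` call mentioned in the comment assumes an initialized PyODPS entry object `o` such as the one available in a DataWorks PyODPS node.

```python
import csv
import io

# A few sample rows in the iris.data layout (assumed column order:
# sepallength,sepalwidth,petallength,petalwidth,name).
sample = """5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
"""

def parse_iris(fileobj):
    """Convert CSV rows into records matching the pyodps_iris schema."""
    records = []
    for row in csv.reader(fileobj):
        if not row:
            continue
        records.append([float(row[0]), float(row[1]),
                        float(row[2]), float(row[3]), row[4]])
    return records

records = parse_iris(io.StringIO(sample))
print(records[0])
# In a PyODPS script, the records could then be written with:
# o.write_table('pyodps_iris', records)  # requires an initialized ODPS entry object
```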
- Log on to the DataWorks console.
- In the left-side navigation pane, click Workspaces.
- On the page that appears, find the workspace that you want to manage and click Data Analytics in the Actions column.
- On the DataStudio page, right-click the workflow and choose Create > MaxCompute > PyODPS 2.
- In the New node dialog box, specify the Node name parameter and click Submit.
- On the configuration tab of the PyODPS 2 node, enter the code of the node in the code
editor. In this example, enter the following code:
from odps.df import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))
# Obtain data from the specified column.
print(iris.sepallength.head(5))
print(iris['sepallength'].head(5))
# View the data type of the column.
print(iris.sepallength.dtype)
# Change the data type of the column. astype() returns a new
# sequence and does not modify the original column in place.
iris.sepallength.astype('int')
# Group values by the name column and obtain the maximum sepallength in each group.
print(iris.groupby('name').sepallength.max().head(5))
print(iris.sepallength.max())
# Rename the specified column.
print(iris.sepalwidth.rename('speal_width').head(5))
# Sum the values of two columns into a new column and rename the new column.
print((iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5))
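To make the semantics of `groupby('name').sepallength.max()` concrete, here is a pure-Python sketch of the same aggregation on a tiny in-memory sample. The values are illustrative stand-ins, not the actual dataset, and this is plain Python rather than PyODPS code.

```python
# Illustrative (name, sepallength) pairs; not the real Iris data.
rows = [
    ('Iris-setosa', 4.9),
    ('Iris-setosa', 5.8),
    ('Iris-versicolor', 7.0),
    ('Iris-virginica', 7.9),
]

# Group by name and keep the maximum sepallength per group,
# which is what groupby('name').sepallength.max() computes on MaxCompute.
maxes = {}
for name, sepallength in rows:
    maxes[name] = max(maxes.get(name, float('-inf')), sepallength)

print(maxes)
```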
- Click the Run icon in the toolbar.
- View the running result of the PyODPS 2 node on the Run Log tab.
In this example, the following information appears on the Run Log tab:
Executing user script with PyODPS 0.8.0
Try to fetch data from tunnel
sepallength
0 4.9
1 4.7
2 4.6
3 5.0
4 5.4
Try to fetch data from tunnel
sepallength
0 4.9
1 4.7
2 4.6
3 5.0
4 5.4
FLOAT64
Sql compiled:
CREATE TABLE tmp_pyodps_ed78e3ba_f13c_4a49_812d_2790d57c25dd LIFECYCLE 1 AS
SELECT MAX(t1.`sepallength`) AS `sepallength_max`
FROM data_service_fr.`pyodps_iris` t1
GROUP BY t1.`name`
sepallength_max
0 5.8
1 7.0
2 7.9
Collection: ref_0
odps.Table
name: data_service_fr.`pyodps_iris`
schema:
sepallength : double # Sepal length (cm)
sepalwidth : double # Sepal width (cm)
petallength : double # Petal length (cm)
petalwidth : double # Petal width (cm)
name : string # Type
max = Max[float64]
sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Try to fetch data from tunnel
speal_width
0 3.0
1 3.2
2 3.1
3 3.6
4 3.9
Sql compiled:
CREATE TABLE tmp_pyodps_28120275_8d0f_4683_8318_302fa21459ac LIFECYCLE 1 AS
SELECT t1.`sepallength` + t1.`sepalwidth` AS `sum_sepal`
FROM data_service_fr.`pyodps_iris` t1
sum_sepal
0 7.9
1 7.9
2 7.7
3 8.6
4 9.3
2019-08-13 10:48:13 INFO =================================================================
2019-08-13 10:48:13 INFO Exit code of the Shell command 0
2019-08-13 10:48:13 INFO --- Invocation of Shell command completed ---
2019-08-13 10:48:13 INFO Shell run successfully!
- Use the same method to create and run another PyODPS node named PyExecute.
On the configuration tab of the PyExecute node, enter the following code in the code
editor:
from odps import options
from odps.df import DataFrame
from odps.df import Delay

# Print verbose execution information, including the Logview URL of each instance.
options.verbose = True
iris = DataFrame(o.get_table('pyodps_iris'))
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
# Redirect verbose output to a custom logger.
my_logs = []
def my_loggers(x):
    my_logs.append(x)
options.verbose_log = my_loggers
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
print(my_logs)
# Cache the intermediate collection so that it is computed only once.
cached = iris[iris.sepalwidth < 3.5].cache()
print(cached.head(3))
# Concurrently execute methods in asynchronous mode.
delay = Delay()  # Create a Delay object.
df = iris[iris.sepalwidth < 5].cache()  # cache() lets sum(), mean(), and max() share one computed result.
future1 = df.sepalwidth.sum().execute(delay=delay)  # Returns a future immediately; execution is deferred.
future2 = df.sepalwidth.mean().execute(delay=delay)
future3 = df.sepalwidth.max().execute(delay=delay)
delay.execute(n_parallel=3)  # Run the three deferred computations with three parallel threads.
print(future1.result())
print(future2.result())
print(future3.result())
In this example, the following logs are recorded after the PyExecute node is run:
Executing user script with PyODPS 0.8.0
Sql compiled:
CREATE TABLE tmp_pyodps_4a204590_0510_4e9c_823b_5b837a437840 LIFECYCLE 1 AS
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM data_service_fr.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
Instance ID: 20190813025233386g04djssa
Log view: http://logview.odps.aliyun.com/logview/XXX
['Sql compiled:', 'CREATE TABLE tmp_pyodps_03b92c55_8442_4e61_8978_656495487b8a LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM data_service_fr.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20190813025236282gcsna5pr2', u'Log view: http://logview.odps.aliyun.com/logview/?h=http://service.odps.aliyun.com/api&XXX']
sepallength sepalwidth petallength petalwidth name
0 4.9 3.0 1.4 0.2 Iris-setosa
1 4.7 3.2 1.3 0.2 Iris-setosa
2 4.6 3.1 1.5 0.2 Iris-setosa
454.6
3.05100671141
4.4
2019-08-13 10:52:48 INFO =================================================================
2019-08-13 10:52:48 INFO Exit code of the Shell command 0
2019-08-13 10:52:48 INFO --- Invocation of Shell command completed ---
2019-08-13 10:52:48 INFO Shell run successfully!
2019-08-13 10:52:48 INFO Current task status: FINISH
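The Delay pattern used in the PyExecute node batches deferred executions and then runs them together. Conceptually it resembles the standard library's concurrent.futures, shown here as a local analogy: the aggregations are plain-Python stand-ins rather than PyODPS calls, and the sample values are illustrative.

```python
from concurrent.futures import ThreadPoolExecutor

values = [3.0, 3.2, 3.1, 3.6, 3.9]  # illustrative sepalwidth values

# Submit three independent aggregations, then collect their results.
# This mirrors delay.execute(n_parallel=3) followed by future.result().
with ThreadPoolExecutor(max_workers=3) as pool:
    f1 = pool.submit(sum, values)
    f2 = pool.submit(lambda v: sum(v) / len(v), values)
    f3 = pool.submit(max, values)
    print(round(f1.result(), 1), round(f2.result(), 2), f3.result())
```

Unlike this local analogy, the PyODPS futures trigger no computation until `delay.execute()` is called, which is what allows the three aggregations to share the cached intermediate collection.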