Topik ini menjelaskan cara melakukan operasi sequence dan eksekusi di PyODPS.
Prosedur
Anda telah membuat proyek MaxCompute.
Pastikan Anda telah membuat ruang kerja DataWorks. Topik ini menggunakan ruang kerja yang berada dalam pratinjau publik untuk DataStudio sebagai contoh.
Buat tabel bernama
pyodps_irisdi DataWorks.Masuk ke Konsol DataWorks dan pilih Wilayah di pojok kiri atas.
Pada halaman Workspaces, temukan ruang kerja target dan pada kolom Actions, pilih .
Pada halaman Debugging Configurations, pilih Computing Resource dan Resource Group.
Jika tidak ada resource group yang ditampilkan, klik Create Resource Group dan tunggu beberapa menit hingga resource group selesai dibuat. Pada halaman Resource Groups, sambungkan ruang kerja ke resource group tersebut.
Jalankan pernyataan berikut di node MaxCompute SQL untuk membuat tabel
pyodps_iris.CREATE TABLE if not exists pyodps_iris ( sepallength DOUBLE comment 'Sepal length (cm)', sepalwidth DOUBLE comment 'Sepal width (cm)', petallength DOUBLE comment 'Petal length (cm)', petalwidth DOUBLE comment 'Petal width (cm)', name STRING comment 'Species' );
Unduh set data uji dan impor ke MaxCompute.
Unduh dan ekstrak dataset Iris flower. Ubah nama file
iris.datamenjadiiris.csv.Masuk ke Konsol DataWorks dan pilih Wilayah di pojok kiri atas.
Pada panel navigasi di sebelah kiri, pilih .
Klik Go to Data Upload and Download.
Di panel navigasi sebelah kiri, klik ikon upload
, lalu klik Upload Data.
Pada halaman DataStudio, buat node MaxCompute PyODPS 2 baru. Masukkan kode contoh berikut dan klik Run.
from odps import DataFrame iris = DataFrame(o.get_table('iristable_new')) # Dapatkan kolom. print iris.sepallength.head(5) print iris['sepallength'].head(5) # Lihat tipe data suatu kolom. print iris.sepallength.dtype # Ubah tipe data suatu kolom. iris.sepallength.astype('int') # Lakukan perhitungan. print iris.groupby('name').sepallength.max().head(5) print iris.sepallength.max() # Ubah nama kolom. print iris.sepalwidth.rename('speal_width').head(5) # Lakukan operasi kolom sederhana. print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)Buat dan jalankan node PyODPS bernama PyExecute dengan kode berikut:
from odps import options from odps import DataFrame # Lihat URL Logview dari instans waktu proses. options.verbose = True iris = DataFrame(o.get_table('pyodps_iris')) iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() my_logs = [] def my_loggers(x): my_logs.append(x) options.verbose_log = my_loggers iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() print(my_logs) # Cache hasil Collection antara. cached = iris[iris.sepalwidth < 3.5].cache() print cached.head(3) # Lakukan eksekusi asinkron dan paralel. from odps.df import Delay delay = Delay() # Buat objek Delay. df = iris[iris.sepalwidth < 5].cache() # Terdapat dependensi umum. future1 = df.sepalwidth.sum().execute(delay=delay) # Sistem langsung mengembalikan objek future, tetapi eksekusi belum dimulai. future2 = df.sepalwidth.mean().execute(delay=delay) future3 = df.sepalwidth.max().execute(delay=delay) delay.execute(n_parallel=3) print future1.result() print future2.result() print future3.result()