Topik ini menjelaskan metode eksekusi yang dapat digunakan untuk operasi DataFrame.
Prasyarat
Pastikan persyaratan berikut terpenuhi:
Tabel sampel bernama pyodps_iris telah disiapkan. Untuk informasi lebih lanjut, lihat Pemrosesan Data DataFrame.
Objek DataFrame telah dibuat. Untuk informasi lebih lanjut, lihat bagian "Buat Objek DataFrame dari Tabel MaxCompute" di Buat Objek DataFrame.
Eksekusi tertunda
Operasi DataFrame hanya dieksekusi ketika Anda secara eksplisit memanggil metode execute atau memanggil metode yang secara internal memanggil metode execute. Tabel berikut mencantumkan metode yang secara internal memanggil metode execute.
Metode | Deskripsi | Nilai kembali |
persist | Menyimpan hasil eksekusi ke tabel MaxCompute. | PyODPS DataFrame |
execute | Menjalankan operasi dan mengembalikan semua hasil. | ResultFrame |
head | Menjalankan operasi dan mengembalikan N baris pertama data hasil. | ResultFrame |
tail | Menjalankan operasi dan mengembalikan N baris terakhir data hasil. | ResultFrame |
to_pandas | Mengonversi objek Collection menjadi objek pandas DataFrame atau mengonversi objek Sequence menjadi objek Series. Jika parameter wrap diatur ke True, objek PyODPS DataFrame dikembalikan. |
|
plot, hist, dan boxplot | Metode pemetaan. | Tidak tersedia |
Dalam lingkungan interaktif, PyODPS DataFrame secara otomatis memanggil metode execute ketika menampilkan data hasil atau memanggil metode repr. Anda tidak perlu memanggil metode execute secara manual.
Contoh
# Dalam lingkungan non-interaktif, Anda perlu memanggil metode execute secara manual.
print(iris[iris.sepallength < 5][:5].execute())
# Dalam lingkungan interaktif, sistem secara otomatis memanggil metode execute.
print(iris[iris.sepallength < 5][:5])Hasil berikut dikembalikan:
sepallength sepalwidth petallength petalwidth name
0 4.9 3.0 1.4 0.2 Iris-setosa
1 4.7 3.2 1.3 0.2 Iris-setosa
2 4.6 3.1 1.5 0.2 Iris-setosa
3 4.6 3.4 1.4 0.3 Iris-setosa
4 4.4 2.9 1.4 0.2 Iris-setosaJika Anda perlu menonaktifkan sistem agar tidak memanggil metode execute secara otomatis dalam lingkungan interaktif, diperlukan operasi manual. Kode berikut menunjukkan contohnya:
from odps import options
options.interactive = False
print(iris[iris.sepallength < 5][:5])Hasil berikut dikembalikan:
Collection: ref_0
odps.Table
name: hudi_mc_0612.`iris3`
schema:
sepallength : double # Panjang sepal (cm)
sepalwidth : double # Lebar sepal (cm)
petallength : double # Panjang petal (cm)
petalwidth : double # Lebar petal (cm)
name : string # Jenis
Collection: ref_1
Filter[collection]
collection: ref_0
predicate:
Less[sequence(boolean)]
sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Scalar[int8]
5
Slice[collection]
collection: ref_1
stop:
Scalar[int8]
5Setelah Anda menonaktifkan panggilan otomatis, seluruh pohon sintaksis abstrak (AST) ditampilkan ketika objek repr ditampilkan. Dalam hal ini, Anda harus memanggil metode execute secara manual jika diperlukan.
Ambil hasil eksekusi
Jika ResultFrame dikembalikan setelah Anda memanggil metode execute atau head, Anda dapat mengambil hasil dari ResultFrame.
ResultFrame adalah set hasil dan tidak dapat digunakan dalam perhitungan berikutnya.
Anda dapat mengambil semua rekaman dari ResultFrame secara iteratif. Kode berikut menunjukkan contohnya:
result = iris.head(3) for r in result: print(list(r))Hasil berikut dikembalikan:
[4.9, 3.0, 1.4, 0.2, 'Iris-setosa'] [4.7, 3.2, 1.3, 0.2, 'Iris-setosa'] [4.6, 3.1, 1.5, 0.2, 'Iris-setosa']Jika pandas diinstal, ResultFrame dapat dikonversi menjadi pandas DataFrame atau PyODPS DataFrame yang menggunakan backend pandas.
# Mengembalikan pandas DataFrame. pd_df = iris.head(3).to_pandas() # Mengembalikan PyODPS DataFrame yang menggunakan backend pandas. wrapped_df = iris.head(3).to_pandas(wrap=True)
Simpan hasil ke tabel MaxCompute
Anda dapat memanggil metode
persistuntuk mengembalikan objek DataFrame baru untuk objek Collection. Metode persist menggunakan nama tabel sebagai parameter.iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris') print(iris2.head(5))Hasil berikut dikembalikan:
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 5.5 2.3 4.0 1.3 Iris-versicolor 2 4.9 2.4 3.3 1.0 Iris-versicolor 3 5.0 2.0 3.5 1.0 Iris-versicolor 4 6.0 2.2 4.0 1.0 Iris-versicolorAnda dapat menentukan parameter
partitionsdalam metodepersistuntuk membuat tabel terpartisi. Tabel dipartisi berdasarkan kolom yang ditentukan olehpartitions.iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris_test', partitions=['name']) print(iris3.data)Hasil berikut dikembalikan:
odps.Table name: odps_test_sqltask_finance.`pyodps_iris` schema: sepallength : double sepalwidth : double petallength : double petalwidth : double partitions: name : stringUntuk menulis data ke partisi tabel yang ada, Anda dapat menentukan parameter
partitiondalam metodepersist. Parameter partition menentukan partisi ke mana data ditulis. Sebagai contoh, atur parameter partition keds=******. Tabel harus berisi semua kolom objek DataFrame dan kolom-kolom tersebut harus memiliki tipe yang sama. Parameterdrop_partitiondancreate_partitionhanya valid jika parameter partition ditentukan. Parameter drop_partition menentukan apakah akan menghapus partisi yang ditentukan jika partisi tersebut ada. Parameter create_partition menentukan apakah akan membuat partisi yang ditentukan jika partisi tersebut tidak ada.print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris_partition', partition='ds=test', drop_partition=True, create_partition=True).head(5))Hasil berikut dikembalikan:
sepallength sepalwidth petallength petalwidth name ds 0 4.5 2.3 1.3 0.3 Iris-setosa test 1 5.5 2.3 4.0 1.3 Iris-versicolor test 2 4.9 2.4 3.3 1.0 Iris-versicolor test 3 5.0 2.0 3.5 1.0 Iris-versicolor test 4 6.0 2.2 4.0 1.0 Iris-versicolor testKetika Anda menulis data ke tabel, Anda dapat menentukan waktu hidup (TTL) tabel. Sebagai contoh, pernyataan berikut mengatur TTL tabel menjadi 10 hari.
print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris', lifecycle=10).head(5))Hasil berikut dikembalikan:
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 5.5 2.3 4.0 1.3 Iris-versicolor 2 4.9 2.4 3.3 1.0 Iris-versicolor 3 5.0 2.0 3.5 1.0 Iris-versicolor 4 6.0 2.2 4.0 1.0 Iris-versicolorJika sumber data tidak berisi objek MaxCompute dan hanya berisi objek pandas, Anda harus secara manual menentukan objek pintu masuk MaxCompute atau menandai objek pintu masuk sebagai objek global ketika Anda memanggil metode
persist.# Objek pintu masuk adalah o. # Tentukan objek pintu masuk. df.persist('table_name', odps=o) # Operasi alternatif: Tandai objek pintu masuk sebagai objek global. o.to_global() df.persist('table_name')
Simpan hasil ke pandas DataFrame
Anda dapat memanggil metode to_pandas untuk menyimpan hasil ke objek pandas DataFrame. Jika parameter wrap diatur ke True, objek PyODPS DataFrame dikembalikan.
Contoh 1: Panggil metode
to_pandasuntuk mengembalikan objek pandas DataFrame.print(type(iris[iris.sepalwidth < 2.5].to_pandas()))Hasil berikut dikembalikan:
<class 'pandas.core.frame.DataFrame'>Contoh 2: Atur parameter
wrapke True untuk mengembalikan objek PyODPS DataFrame.print(type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True)))Hasil berikut dikembalikan:
<class 'odps.df.core.DataFrame'>
Anda dapat memanggil metode open_reader di PyODPS dan menggunakan reader.to_pandas() untuk mengonversi hasil menjadi objek pandas DataFrame. Untuk informasi lebih lanjut, lihat Tabel.
Konfigurasikan parameter runtime
Anda dapat mengonfigurasi parameter runtime untuk metode yang dieksekusi segera, seperti execute, persist, dan to_pandas. Pengaturan ini hanya valid untuk backend SQL MaxCompute.
Konfigurasikan parameter global. Untuk informasi lebih lanjut, lihat SQL.
Tentukan parameter
hintsdalam metode-metode ini. Ini memastikan bahwa parameter runtime yang ditentukan hanya valid untuk perhitungan saat ini.print(iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16}))Hasil berikut dikembalikan:
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 4.9 2.4 3.3 1.0 Iris-versicolor
Tampilkan detail pada runtime
Untuk melihat informasi LogView instance pada runtime, Anda harus memodifikasi konfigurasi global. Kode berikut menunjukkan contohnya:
from odps import options options.verbose = True print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())Hasil berikut dikembalikan:
Sql compiled: SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` FROM odps_test_sqltask_finance.`pyodps_iris` t1 WHERE t1.`sepallength` < 5 LIMIT 5 Instance ID: Log view:http://logview sepalwidth petallength petalwidth name 0 2.3 1.3 0.3 Iris-setosa 1 2.4 3.3 1.0 Iris-versicolorAnda dapat menentukan fungsi logging. Kode berikut menunjukkan contohnya:
my_logs = [] def my_logger(x): my_logs.append(x) options.verbose_log = my_logger print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()) print(my_logs)Hasil berikut dikembalikan:
sepalwidth petallength petalwidth name 0 2.3 1.3 0.3 Iris-setosa 1 2.4 3.3 1.0 Iris-versicolor ['Sql compiled:', 'CREATE TABLE tmp_pyodps_24332bdb_4fd0_4d0d_aed4_38a443618268 LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20230815034706122gbymevg*****', ' Log view:]
Cached intermediate calculation results of Collection objects
Selama proses perhitungan DataFrame, beberapa objek Collection digunakan berkali-kali. Untuk melihat hasil eksekusi dari proses perantara, Anda dapat memanggil metode cache untuk menandai objek Collection yang ingin Anda hitung terlebih dahulu. Kode berikut menunjukkan contohnya.
Eksekusi metode cache ditunda. Perhitungan otomatis tidak langsung dipicu setelah metode cache dipanggil.
cached = iris[iris.sepalwidth < 3.5]['sepallength', 'name'].cache()
df = cached.head(3)
print(df)
# Hasil berikut dikembalikan:
sepallength name
0 4.5 Iris-setosa
1 5.5 Iris-versicolor
2 4.9 Iris-versicolor
# Anda dapat segera mengambil hasil perhitungan karena cached telah dihitung.
print(cached.head(3))
# Hasil berikut dikembalikan:
sepallength name
0 4.5 Iris-setosa
1 5.5 Iris-versicolor
2 4.9 Iris-versicolorEkeskusi asinkron dan paralel
Ekeskusi asinkron
PyODPS DataFrame mendukung eksekusi asinkron. Anda dapat menentukan parameter async untuk mengaktifkan eksekusi asinkron untuk metode yang dieksekusi segera berikut ini: execute, persist, head, tail, dan to_pandas. Parameter timeout menentukan periode timeout. Operasi asinkron mengembalikan objek Future.
future = iris[iris.sepalwidth < 10].head(10, async_=True)
print(future.result())
# Hasil berikut dikembalikan:
sepallength sepalwidth petallength petalwidth name
0 4.5 2.3 1.3 0.3 Iris-setosa
1 5.5 2.3 4.0 1.3 Iris-versicolor
2 4.9 2.4 3.3 1.0 Iris-versicolor
3 5.0 2.0 3.5 1.0 Iris-versicolor
4 6.0 2.2 4.0 1.0 Iris-versicolor
5 6.2 2.2 4.5 1.5 Iris-versicolor
6 5.5 2.4 3.8 1.1 Iris-versicolor
7 5.5 2.4 3.7 1.0 Iris-versicolor
8 6.3 2.3 4.4 1.3 Iris-versicolor
9 5.0 2.3 3.3 1.0 Iris-versicolorEkeskusi paralel
Anda dapat memanggil operasi Delay API yang baru diperkenalkan untuk menunda metode yang dieksekusi segera berikut ini: execute, persist, head, tail, dan to_pandas. Kemudian, objek Future dikembalikan. Ketika operasi Delay API dipanggil, sistem menemukan dependensi dan menjalankan metode berdasarkan konkurensi yang ditentukan. Dalam hal ini, eksekusi asinkron didukung.
from odps.df import Delay
delay = Delay() # Buat objek Delay.
df = iris[iris.sepal_width < 5].cache() # Dependensi umum dari ekspresi berikutnya.
future1 = df.sepal_width.sum().execute(delay=delay) # Mengembalikan objek Future. Eksekusi belum dimulai.
future2 = df.sepal_width.mean().execute(delay=delay)
future3 = df.sepal_length.max().execute(delay=delay)
delay.execute(n_parallel=3) # Eksekusi dimulai dengan tiga thread konkuren.
|==========================================| 1 / 1 (100.00%) 21s
print(future1.result())
# Hasil berikut dikembalikan:
25.0
print(future2.result())
# Hasil berikut dikembalikan:
2.272727272727273Dalam contoh sebelumnya, PyODPS DataFrame pertama-tama mengeksekusi objek dependensi bersama. Kemudian, PyODPS DataFrame mengatur konkurensi menjadi 3 dan mengeksekusi objek dari future1 hingga future3.
Anda dapat menentukan parameter async di delay.execute untuk menentukan apakah akan mengaktifkan eksekusi asinkron. Jika eksekusi asinkron diaktifkan, Anda juga dapat menggunakan parameter timeout untuk menentukan periode timeout.