全部产品
Search
文档中心

MaxCompute:Operasi sequence dan eksekusi PyODPS

更新时间:Dec 05, 2025

Topik ini menjelaskan cara melakukan operasi sequence dan eksekusi di PyODPS.

Prosedur

  1. Anda telah membuat proyek MaxCompute.

  2. Pastikan Anda telah membuat ruang kerja DataWorks. Topik ini menggunakan ruang kerja yang berada dalam pratinjau publik untuk DataStudio sebagai contoh.

  3. Buat tabel bernama pyodps_iris di DataWorks.

    1. Masuk ke Konsol DataWorks dan pilih Wilayah di pojok kiri atas.

    2. Pada halaman Workspaces, temukan ruang kerja target dan pada kolom Actions, pilih Shortcuts > DataStudio.

    3. Pada halaman Debugging Configurations, pilih Computing Resource dan Resource Group.

      Jika tidak ada resource group yang ditampilkan, klik Create Resource Group dan tunggu beberapa menit hingga resource group selesai dibuat. Pada halaman Resource Groups, sambungkan ruang kerja ke resource group tersebut.

    4. Jalankan pernyataan berikut di node MaxCompute SQL untuk membuat tabel pyodps_iris.

      CREATE TABLE if not exists pyodps_iris
      (
      sepallength  DOUBLE comment 'Sepal length (cm)',
      sepalwidth   DOUBLE comment 'Sepal width (cm)',
      petallength  DOUBLE comment 'Petal length (cm)',
      petalwidth   DOUBLE comment 'Petal width (cm)',
      name         STRING comment 'Species'
      );
  4. Unduh set data uji dan impor ke MaxCompute.

    1. Unduh dan ekstrak dataset Iris flower. Ubah nama file iris.data menjadi iris.csv.

    2. Masuk ke Konsol DataWorks dan pilih Wilayah di pojok kiri atas.

    3. Pada panel navigasi di sebelah kiri, pilih Data Integration > Data Upload and Download.

    4. Klik Go to Data Upload and Download.

    5. Di panel navigasi sebelah kiri, klik ikon upload image, lalu klik Upload Data.

  5. Pada halaman DataStudio, buat node MaxCompute PyODPS 2 baru. Masukkan kode contoh berikut dan klik Run.

    from odps import DataFrame
    iris = DataFrame(o.get_table('iristable_new'))
    
    # Dapatkan kolom.
    print iris.sepallength.head(5)
    
    print iris['sepallength'].head(5)
    
    # Lihat tipe data suatu kolom.
    print iris.sepallength.dtype
    
    # Ubah tipe data suatu kolom.
    iris.sepallength.astype('int')
    
    # Lakukan perhitungan.
    print iris.groupby('name').sepallength.max().head(5)
    
    print iris.sepallength.max()
    
    # Ubah nama kolom.
    print iris.sepalwidth.rename('speal_width').head(5)
    
    # Lakukan operasi kolom sederhana.
    print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)
  6. Buat dan jalankan node PyODPS bernama PyExecute dengan kode berikut:

    from odps import options
    from odps import DataFrame
    
    # Lihat URL Logview dari instans waktu proses.
    options.verbose = True
    iris = DataFrame(o.get_table('pyodps_iris'))
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    my_logs = []
    def my_loggers(x):
      my_logs.append(x)
    
    options.verbose_log = my_loggers
    
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    print(my_logs)
    
    # Cache hasil Collection antara.
    cached = iris[iris.sepalwidth < 3.5].cache()
    print cached.head(3)
    
    # Lakukan eksekusi asinkron dan paralel.
    from odps.df import Delay
    delay = Delay() # Buat objek Delay.
    df = iris[iris.sepalwidth < 5].cache()  # Terdapat dependensi umum.
    future1 = df.sepalwidth.sum().execute(delay=delay) # Sistem langsung mengembalikan objek future, tetapi eksekusi belum dimulai.
    future2 = df.sepalwidth.mean().execute(delay=delay)
    future3 = df.sepalwidth.max().execute(delay=delay)
    
    delay.execute(n_parallel=3)
    
    print future1.result()
    print future2.result()
    print future3.result()