All Products
Search
Document Center

Platform For AI:Baca dan tulis tabel MaxCompute dengan paiio

Last Updated:Mar 11, 2026

Tim Platform for AI (PAI) mengembangkan modul paiio untuk menyederhanakan pembacaan dan penulisan data ke tabel MaxCompute dalam tugas Deep Learning Containers (DLC). Modul paiio mendukung tiga antarmuka: TableRecordDataset, TableReader, dan TableWriter. Topik ini menjelaskan antarmuka tersebut beserta contoh penggunaannya.

Batasan

  • Modul paiio mendukung TensorFlow 1.12, 1.15, dan 2.0. Anda hanya dapat menggunakan modul ini jika memilih citra runtime yang sesuai untuk versi-versi tersebut dalam tugas DLC Anda.

  • Modul paiio tidak mendukung gambar kustom.

Persiapan: Konfigurasi informasi akun

Sebelum menggunakan modul paiio untuk membaca atau menulis data ke tabel MaxCompute, Anda harus mengonfigurasi AccessKey akun MaxCompute Anda. PAI membaca informasi konfigurasi dari sebuah file. Tempatkan file konfigurasi tersebut di sistem file yang dipasang (mounted) dan referensikan dalam kode Anda melalui variabel lingkungan.

  1. Buat file konfigurasi dengan konten berikut.

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    Parameter

    Deskripsi

    access_id

    ID AccessKey Akun Alibaba Cloud Anda.

    access_key

    Rahasia AccessKey Akun Alibaba Cloud Anda.

    end_point

    Titik akhir MaxCompute. Misalnya, titik akhir untuk wilayah Tiongkok (Shanghai) adalah http://service.cn-shanghai.maxcompute.aliyun.com/api. Untuk informasi lebih lanjut, lihat Endpoints.

  2. Tentukan path file konfigurasi dalam kode Anda sebagai berikut.

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    <your MaxCompute config file path> adalah path ke file konfigurasi.

Penggunaan TableRecordDataset

Deskripsi antarmuka

Komunitas TensorFlow merekomendasikan penggunaan antarmuka Dataset mulai TensorFlow 1.2 dan seterusnya untuk membangun aliran data. Antarmuka ini menggantikan antarmuka thread dan queue yang asli. Untuk informasi lebih lanjut, lihat Dataset. Anda dapat menggabungkan dan mentransformasi beberapa objek Dataset untuk menghasilkan data komputasi, sehingga menyederhanakan kode input data.

  • Definisi antarmuka (Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • Parameter

    Parameter

    Wajib

    Tipe

    Bawaan

    Deskripsi

    filenames

    Ya

    STRING

    N/A

    Daftar nama tabel yang akan dibaca. Skema semua tabel harus sama. Format nama tabel adalah odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....

    record_defaults

    Ya

    LIST atau TUPLE

    N/A

    Digunakan untuk konversi tipe data kolom output dan menyediakan nilai bawaan untuk kolom kosong. Jika jumlah nilai tidak sesuai dengan jumlah kolom yang dibaca, atau jika tipe data tidak dapat dikonversi secara otomatis, sistem akan melemparkan exception selama eksekusi.

    Tipe data yang didukung meliputi FLOAT32, FLOAT64, INT32, INT64, BOOL, dan STRING. Untuk nilai bawaan tipe INT64, lihat np.array(0, np.int64).

    selected_cols

    Tidak

    STRING

    None

    Kolom yang dipilih, ditentukan sebagai string nama kolom yang dipisahkan koma (,). Nilai bawaan None membaca semua kolom. Parameter ini tidak dapat digunakan bersamaan dengan excluded_cols.

    excluded_cols

    Tidak

    STRING

    None

    Kolom yang dikecualikan, ditentukan sebagai string nama kolom yang dipisahkan koma (,). Nilai bawaan None membaca semua kolom. Parameter ini tidak dapat digunakan bersamaan dengan selected_cols.

    slice_id

    Tidak

    INT

    0

    Dalam skenario pembacaan terdistribusi, ini adalah ID shard saat ini (penomoran berbasis nol). Untuk pembacaan terdistribusi, sistem membagi tabel menjadi beberapa shard berdasarkan slice_count dan membaca shard yang sesuai dengan slice_id.

    Jika slice_id adalah 0 (bawaan) dan slice_count adalah 1, seluruh tabel dibaca. Jika slice_count lebih besar dari 1, shard ke-0 dibaca.

    slice_count

    Tidak

    INT

    1

    Dalam skenario pembacaan terdistribusi, ini adalah jumlah total shard, yang biasanya merupakan jumlah worker. Nilai bawaan adalah 1, yang berarti tidak ada sharding dan seluruh tabel dibaca.

    num_threads

    Tidak

    INT

    0

    Jumlah thread yang diaktifkan oleh reader bawaan untuk setiap tabel guna melakukan pra-ambil (prefetch) data. Thread-thread ini independen dari thread komputasi. Nilainya harus bilangan bulat antara 1 hingga 64. Jika num_threads adalah 0, sistem secara otomatis mengatur jumlah thread pra-ambil menjadi seperempat dari jumlah thread dalam kolam thread komputasi.

    Catatan

    Karena I/O memengaruhi komputasi keseluruhan setiap model secara berbeda, meningkatkan jumlah thread pra-ambil tidak menjamin peningkatan kecepatan pelatihan keseluruhan model.

    capacity

    Tidak

    INT

    0

    Ukuran total pra-ambil untuk membaca tabel, dalam jumlah baris. Jika num_threads lebih besar dari 1, ukuran pra-ambil untuk setiap thread adalah capacity/num_threads baris (dibulatkan ke atas). Jika capacity adalah 0, reader bawaan secara otomatis mengonfigurasi ukuran total pra-ambil berdasarkan ukuran rata-rata N baris pertama (N=256 secara bawaan) dari tabel. Hal ini membuat data yang dipra-ambil untuk setiap thread sekitar 64 MB.

    Catatan

    Jika suatu bidang dalam tabel MaxCompute bertipe DOUBLE, Anda harus menggunakan format `np.float64` di TensorFlow sebagai tipe yang sesuai.

  • Nilai kembali

    Mengembalikan objek Dataset yang dapat digunakan sebagai input untuk membangun pipa data.

Contoh

Asumsikan sebuah tabel bernama `test` disimpan dalam proyek bernama `myproject`. Tabel berikut menunjukkan contoh datanya.

itemid (BIGINT)

name (STRING)

price (DOUBLE)

virtual (BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

Kode berikut menunjukkan cara menggunakan antarmuka TableRecordDataset untuk membaca kolom `itemid` dan `price` dari tabel `test`.

import os
import tensorflow as tf
import paiio

# Tentukan path file konfigurasi. Ganti ini dengan path aktual file.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Definisikan tabel yang akan dibaca. Anda dapat menentukan beberapa tabel. Ganti nama tabel dan nama proyek MaxCompute yang sesuai dengan milik Anda.
table = ["odps://${your_projectname}/tables/${table_name}"]
# Definisikan TableRecordDataset untuk membaca kolom itemid dan price dari tabel.
dataset = paiio.data.TableRecordDataset(table,
                                       record_defaults=[0, 0.0],
                                       selected_cols="itemid,price",
                                       num_threads=1,
                                       capacity=10)
# Atur epoch menjadi 2, ukuran batch menjadi 3, dan prefetch menjadi 100 batch.
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")

Instruksi penggunaan TableReader

Deskripsi antarmuka

TableReader diimplementasikan berdasarkan SDK MaxCompute dan tidak bergantung pada framework TensorFlow. Anda dapat menggunakannya untuk langsung mengakses tabel MaxCompute dan mendapatkan hasil I/O secara instan.

  • Buat reader dan buka tabel

    • Definisi antarmuka

    • reader = paiio.python_io.TableReader(table,
                           selected_cols="",
                          excluded_cols="",
                           slice_id=0,
                          slice_count=1):
    • Parameter

    • Parameter

      Wajib

      Tipe

      Bawaan

      Deskripsi

      table

      Ya

      STRING

      N/A

      Nama tabel MaxCompute yang akan dibuka. Formatnya adalah odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      selected_cols

      Tidak

      STRING

      String kosong ("")

      Kolom yang dipilih, ditentukan sebagai string nama kolom yang dipisahkan koma (,). Nilai bawaan, yaitu string kosong (""), membaca semua kolom. Parameter ini tidak dapat digunakan bersamaan dengan excluded_cols.

      excluded_cols

      Tidak

      STRING

      String kosong ("")

      Kolom yang dikecualikan, ditentukan sebagai string nama kolom yang dipisahkan koma (,). Nilai bawaan, yaitu string kosong (""), membaca semua kolom. Parameter ini tidak dapat digunakan bersamaan dengan selected_cols.

      slice_id

      Tidak

      INT

      0

      Dalam skenario pembacaan terdistribusi, ini adalah ID shard saat ini. Nilainya harus berada dalam rentang [0, slice_count-1]. Untuk pembacaan terdistribusi, sistem membagi tabel menjadi beberapa shard berdasarkan slice_count dan membaca shard yang sesuai dengan slice_id. Nilai bawaan adalah 0, yang berarti tidak ada sharding dan semua baris tabel dibaca.

      slice_count

      Tidak

      INT

      1

      Dalam skenario pembacaan terdistribusi, ini adalah jumlah total shard, yang biasanya merupakan jumlah worker.

    • Nilai kembali

      Objek reader

  • Baca catatan

    • Definisi antarmuka

    • reader.read(num_records=1)
    • Parameter

      num_records menentukan jumlah baris yang dibaca secara berurutan. Nilai bawaan adalah 1, yang berarti satu baris dibaca. Jika num_records melebihi jumlah baris yang belum dibaca, semua baris tersisa dikembalikan. Jika tidak ada catatan yang dapat dibaca, exception `OutOfRangeException` (paiio.python_io.OutOfRangeException) dilemparkan.

    • Nilai kembali

      Mengembalikan ndarray NumPy, juga dikenal sebagai recarray. Setiap elemen dalam array merupakan tuple yang merepresentasikan satu baris data dari tabel.

  • Lompat ke baris tertentu

    • Definisi antarmuka

    • reader.seek(offset=0)
    • Parameter

    • offset menentukan indeks berbasis nol dari baris yang akan dilompati. Operasi baca berikutnya dimulai dari baris ini. Jika slice_id dan slice_count dikonfigurasi, operasi seek relatif terhadap awal shard. Jika offset melebihi jumlah total baris dalam tabel, exception `OutOfRange` dilemparkan. Jika posisi baca sudah melewati akhir tabel, operasi seek lainnya akan melemparkan exception `OutOfRangeException` (paiio.python_io.OutOfRangeException).

      Penting

      Saat Anda membaca sejumlah data sekaligus, jika jumlah baris tersisa kurang dari ukuran batch, operasi baca hanya mengembalikan baris tersisa dan tidak melemparkan exception. Namun, jika Anda kemudian melakukan operasi seek, exception akan dilemparkan.

    • Nilai kembali

      None. Jika terjadi error selama operasi, exception dilemparkan.

  • Ambil jumlah total catatan dalam tabel

    • Definisi antarmuka

    • reader.get_row_count()
    • Parameter

      None

    • Nilai kembali

      Mengembalikan jumlah baris dalam tabel. Jika slice_id dan slice_count dikonfigurasi, ukuran shard dikembalikan sebagai gantinya.

  • Ambil skema tabel

    • Definisi antarmuka

    • reader.get_schema()
    • Parameter

      None

    • Nilai kembali

    • Mengembalikan ndarray terstruktur satu dimensi. Setiap elemen sesuai dengan skema kolom yang dipilih dalam tabel MaxCompute dan mencakup tiga bidang berikut.

      Parameter

      Deskripsi

      colname

      Nama kolom.

      typestr

      Nama tipe data MaxCompute.

      pytype

      Tipe data Python yang sesuai dengan typestr.

      Tabel berikut menjelaskan pemetaan antara typestr dan pytype.

      typestr

      pytype

      BIGINT

      INT

      DOUBLE

      FLOAT

      BOOLEAN

      BOOL

      STRING

      OBJECT

      DATETIME

      INT

      MAP

      Catatan

      PAI-TensorFlow tidak mendukung operasi pada tipe data MAP.

      OBJECT

  • Tutup tabel

    • Definisi antarmuka

    • reader.close()
    • Parameter

      None

    • Nilai kembali

      None. Jika terjadi error selama operasi, exception dilemparkan.

Contoh

Asumsikan sebuah tabel bernama `test` disimpan dalam proyek bernama `myproject`. Tabel berikut menunjukkan contoh datanya.

uid (BIGINT)

name (STRING)

price (DOUBLE)

virtual (BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

Kode berikut menunjukkan cara menggunakan antarmuka TableReader untuk membaca data dari kolom uid, name, dan price.

    import os
    import paiio
    
    # Tentukan path file konfigurasi. Ganti ini dengan path aktual.
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # Buka tabel dan kembalikan objek reader. Ganti nama tabel dan nama proyek MaxCompute yang sesuai dengan milik Anda.
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # Dapatkan jumlah total baris dalam tabel.
    total_records_num = reader.get_row_count() # return 3
    
    batch_size = 2
    # Baca tabel. Nilai kembali akan berupa recarray dalam format [(uid, name, price)*2].
    records = reader.read(batch_size) # Mengembalikan [(25, "Apple", 5.0), (38, "Pear", 4.5)]
    records = reader.read(batch_size) # Mengembalikan [(17, "Watermelon", 2.2)]
    # Membaca lebih lanjut akan melemparkan exception OutOfRange.
    
    # Tutup reader.
    reader.close()

Instruksi penggunaan TableWriter

TableWriter diimplementasikan berdasarkan SDK MaxCompute dan tidak bergantung pada framework TensorFlow. Anda dapat menggunakannya untuk langsung menulis data ke tabel MaxCompute.

Deskripsi antarmuka

  • Buat writer dan buka tabel

    • Definisi antarmuka

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      Catatan
      • Operasi ini menambahkan data ke tabel tanpa menghapus data yang sudah ada.

      • Anda harus menutup tabel sebelum dapat membaca data yang ditambahkan.

    • Parameter

      Parameter

      Wajib

      Tipe

      Bawaan

      Deskripsi

      table

      Ya

      STRING

      N/A

      Nama tabel MaxCompute yang akan dibuka. Formatnya adalah odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      slice_id

      Tidak

      INT

      0

      Dalam skenario terdistribusi, tulis data ke shard berbeda untuk menghindari konflik penulisan. Dalam skenario standalone, Anda dapat menggunakan nilai bawaan 0. Dalam skenario multi-mesin, jika beberapa worker, termasuk parameter servers (PS), menulis ke tabel menggunakan slice_id yang sama secara bersamaan, operasi penulisan gagal.

    • Nilai kembali

      Mengembalikan objek Writer.

  • Tulis catatan

    • Definisi antarmuka

      writer.write(values, indices)
    • Parameter

      Parameter

      Wajib

      Tipe

      Bawaan

      Deskripsi

      values

      Ya

      STRING

      N/A

      Data yang akan ditulis. Anda dapat menulis satu baris atau beberapa baris:

      • Untuk menulis satu baris, berikan TUPLE, LIST, atau ndarray 1D yang terdiri dari skalar ke parameter values. Jika Anda memberikan LIST atau ndarray, ini menunjukkan bahwa semua kolom yang akan ditulis memiliki tipe data yang sama.

      • Untuk menulis N baris (N>=1), Anda dapat memberikan LIST atau ndarray 1D ke parameter values. Setiap elemen dalam parameter harus sesuai dengan satu baris data, direpresentasikan sebagai TUPLE atau LIST. Data juga dapat disimpan dalam format terstruktur di dalam ndarray.

      indices

      Ya

      INT

      N/A

      Menentukan kolom tempat data ditulis. Anda dapat memberikan TUPLE, LIST, atau ndarray 1D yang terdiri dari indeks bertipe INT. Setiap angka (i) dalam indices sesuai dengan kolom ke-i dalam tabel (penomoran berbasis nol).

    • Nilai kembali

      None. Jika terjadi error selama proses penulisan, exception dilemparkan dan proses dihentikan.

  • Tutup tabel

    • Definisi antarmuka

      writer.close()
      Catatan

      Saat Anda menggunakan blok pernyataan `with`, Anda tidak perlu secara eksplisit memanggil metode `close()` untuk menutup tabel.

    • Parameter

      None

    • Nilai kembali

      None. Jika terjadi error selama operasi, exception dilemparkan.

    • Contoh

      Gunakan TableWriter dengan pernyataan `with` seperti yang ditunjukkan dalam kode berikut.

      with paiio.python_io.TableWriter(table) as writer:
        # Siapkan nilai untuk ditulis.
          writer.write(values, indices)
          # Tabel akan ditutup secara otomatis di luar bagian ini.

Contoh

import paiio
import os

# Tentukan path file konfigurasi. Ganti ini dengan path aktual.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Siapkan data.
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# Buka tabel dan kembalikan objek writer. Ganti nama tabel dan nama proyek MaxCompute yang sesuai dengan milik Anda.
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Tulis catatan ke kolom 0 hingga 3 tabel.
records = writer.write(values, indices=[0, 1, 2, 3])

# Tutup writer.
writer.close()

Langkah selanjutnya

Setelah mengonfigurasi kode, ikuti langkah-langkah berikut untuk menggunakan paiio membaca data dari dan menulis data ke tabel MaxCompute:

  1. Buat dataset dan unggah file konfigurasi serta kode Anda ke sumber data. Untuk informasi lebih lanjut, lihat Kelola Dataset.

  2. Buat tugas DLC. Pengaturan parameter utama dijelaskan sebagai berikut. Untuk informasi lebih lanjut tentang pengaturan parameter lainnya, lihat Buat tugas pelatihan.

    • Node Image: Untuk PAI Official Image, pilih citra yang sesuai dengan TensorFlow 1.12, TensorFlow 1.15, atau TensorFlow 2.0.

    • Dataset Configuration: Untuk Dataset, pilih dataset yang Anda buat pada langkah sebelumnya. Atur Mount Path ke /mnt/data/.

    • Execution Command: Atur parameter ini ke python /mnt/data/xxx.py. Ganti `xxx.py` dengan nama file kode yang Anda unggah pada langkah pertama.

  3. Klik OK.

    Setelah tugas pelatihan dikirim, Anda dapat melihat hasilnya di log instans. Untuk informasi lebih lanjut, lihat Lihat log tugas.