全部产品
Search
文档中心

Platform For AI:Gunakan PAIIO untuk Membaca Data dari dan Menulis Data ke Tabel MaxCompute

更新时间:Jun 22, 2025

Untuk mempermudah pembacaan dan penulisan data dari dan ke tabel MaxCompute dalam pekerjaan Deep Learning Containers (DLC), tim Platform for AI (PAI) mengembangkan modul PAIIO. Modul ini mendukung antarmuka TableRecordDataset, TableReader, dan TableWriter. Topik ini menjelaskan cara menggunakan antarmuka tersebut untuk membaca dan menulis data ke tabel MaxCompute serta memberikan contoh.

Batasan

  • PAIIO hanya tersedia untuk pekerjaan DLC yang menggunakan TensorFlow 1.12, TensorFlow 1.15, atau TensorFlow 2.0.

  • PAIIO tidak tersedia untuk pekerjaan berbasis citra kustom.

Konfigurasi Informasi Akun

Sebelum menggunakan PAIIO untuk membaca atau menulis data ke tabel MaxCompute, Anda harus mengonfigurasi informasi AccessKey yang digunakan untuk mengakses sumber daya MaxCompute. PAI memungkinkan Anda mendapatkan informasi AccessKey dari file konfigurasi. Untuk melakukannya, simpan file konfigurasi di sistem file dan rujuk informasi tersebut di kode Anda menggunakan variabel lingkungan.

  1. Buat file konfigurasi dengan konten berikut:

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    Parameter

    Deskripsi

    access_id

    ID AccessKey akun Alibaba Cloud Anda.

    access_key

    Rahasia AccessKey akun Alibaba Cloud Anda.

    end_point

    Titik akhir MaxCompute. Sebagai contoh, titik akhir untuk wilayah China (Shanghai) adalah http://service.cn-shanghai.maxcompute.aliyun.com/api. Untuk informasi lebih lanjut, lihat Titik Akhir.

  2. Gunakan sintaksis berikut untuk menentukan jalur file konfigurasi di kode:

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<jalur file konfigurasi MaxCompute Anda>'

    Ganti <jalur file konfigurasi MaxCompute Anda> dengan jalur file yang sesuai.

TableRecordDataset

Ikhtisar

TensorFlow open source merekomendasikan penggunaan TensorFlow Datasets pada TensorFlow 1.2 atau versi lebih baru untuk menggantikan antarmuka threading dan antrian asli guna membuat aliran data. Beberapa antarmuka Dataset digabungkan untuk menghasilkan data untuk komputasi, sehingga menyederhanakan kode input data.

  • Sintaksis (Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • Parameter

    Parameter

    Diperlukan

    Tipe

    Nilai Default

    Deskripsi

    filenames

    Ya

    STRING

    Tidak ada

    Nama tabel yang ingin Anda baca. Tabel harus menggunakan skema yang sama. Format nama tabel: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....

    record_defaults

    Ya

    LIST atau TUPLE

    Tidak ada

    Tipe data kolom yang ingin Anda baca. Jika kolom kosong, parameter ini menentukan tipe data default. Jika tipe data berbeda dari tipe data kolom yang Anda baca atau tipe data tidak dapat dikonversi secara otomatis, sistem akan melempar pengecualian.

    Nilai valid: FLOAT32, FLOAT64, INT32, INT64, BOOL, dan STRING. Untuk informasi tentang nilai default tipe data INT64, gunakan sintaksis np.array(0, np.int64) untuk kueri.

    selected_cols

    Tidak

    STRING

    Tidak ada

    Kolom yang ingin Anda pilih. Pisahkan beberapa kolom dengan koma (,). Jika Anda mengatur parameter ini ke nilai default None, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari parameter selected_cols dan excluded_cols.

    excluded_cols

    Tidak

    STRING

    Tidak ada

    Kolom yang ingin Anda kecualikan. Pisahkan beberapa kolom dengan koma (,). Jika Anda mengatur parameter ini ke nilai default None, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari selected_cols dan excluded_cols.

    slice_id

    Tidak

    INT

    0

    ID shard dalam mode pembacaan terdistribusi. ID shard dimulai dari 0. Dalam mode pembacaan terdistribusi, tabel dibagi menjadi beberapa shard berdasarkan nilai parameter slice_count. Sistem membaca data dari shard yang ditentukan oleh parameter slice_id.

    Jika slice_id diatur ke nilai default 0 dan slice_count diatur ke 1, seluruh tabel akan dibaca. Jika slice_id diatur ke nilai default 0 dan slice_count diatur ke nilai yang lebih besar dari 1, shard ke-0 akan dibaca.

    slice_count

    Tidak

    INT

    1

    Jumlah shard dalam mode pembacaan terdistribusi. Pada sebagian besar kasus, nilainya adalah jumlah worker. Jika Anda mengatur parameter ini ke nilai default 1, seluruh tabel akan dibaca tanpa sharding.

    num_threads

    Tidak

    INT

    0

    Jumlah thread yang diaktifkan oleh pembaca bawaan setiap tabel untuk pra-mengambil data. Thread ini independen dari thread perhitungan. Nilai valid: 1 hingga 64. Jika num_threads diatur ke 0, sistem secara otomatis menetapkan 25% dari thread perhitungan untuk pra-mengambil data.

    null

    I/O memiliki dampak yang berbeda pada kinerja komputasi keseluruhan setiap model. Akibatnya, peningkatan jumlah thread yang digunakan untuk pra-mengambil data tidak selalu meningkatkan kecepatan pelatihan model keseluruhan.

    capacity

    Tidak

    INT

    0

    Jumlah rekaman yang dipra-muat. Jika nilai yang ditentukan oleh num_threads lebih besar dari 1, setiap thread memuat capacity/num_threads rekaman data. Nilai parameter dibulatkan ke atas. Jika capacity diatur ke 0, pembaca bawaan mengonfigurasi ukuran total data yang dapat dipra-muat oleh thread berdasarkan nilai rata-rata dari N rekaman pertama dalam tabel. Nilai default N adalah 256. Akibatnya, ukuran data yang dipra-muat oleh setiap thread sekitar 64 MB.

    null

    Jika tipe data bidang dalam tabel MaxCompute adalah DOUBLE, TensorFlow akan memetakan tipe data tersebut ke np.float64.

  • Respon

    Objek Dataset dikembalikan, yang dapat digunakan sebagai input untuk membuat pipeline.

Contoh

Sebagai contoh, Anda memiliki tabel bernama test di proyek MaxCompute Anda bernama myproject. Berikut adalah sebagian isi tabel tersebut.

itemid (BIGINT)

name (STRING)

price (DOUBLE)

virtual (BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

Kode sampel berikut menunjukkan cara menggunakan antarmuka TableRecordDataset untuk membaca kolom itemid dan price dari tabel test:

import os
import tensorflow as tf
import paiio

# Tentukan jalur file konfigurasi. Ganti nilainya dengan jalur tempat file konfigurasi disimpan.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Tentukan tabel yang ingin Anda baca. Ganti ${your_projectname} dengan nama proyek MaxCompute dan ${table_name} dengan nama tabel yang ingin Anda akses.
table = ["odps://${your_projectname}/tables/${table_name}"]
# Tentukan antarmuka TableRecordDataset untuk membaca kolom itemid dan price dari tabel.
dataset = paiio.data.TableRecordDataset(table,
                                       record_defaults=[0, 0.0],
                                       selected_cols="itemid,price",
                                       num_threads=1,
                                       capacity=10)
# Tentukan epoch 2, ukuran batch 3, dan pra-muat 100 batch.
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("Akhir dataset")

TableReader

Ikhtisar

Anda dapat menggunakan antarmuka TableReader dalam SDK MaxCompute tanpa bergantung pada TensorFlow. Antarmuka ini memungkinkan Anda mengakses tabel MaxCompute dan mendapatkan hasil I/O secara real-time.

  • Membuat objek Reader dan membuka tabel

    • Sintaksis

    • reader = paiio.python_io.TableReader(table,
                           selected_cols="",
                          excluded_cols="",
                           slice_id=0,
                          slice_count=1):
    • Parameter

    • Parameter

      Diperlukan

      Tipe

      Nilai default

      Deskripsi

      table

      Ya

      STRING

      Tidak ada

      Nama tabel MaxCompute yang ingin Anda buka. Format nama tabel: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      selected_cols

      Tidak

      STRING

      String kosong ("")

      Kolom yang ingin Anda pilih. Pisahkan beberapa kolom dengan koma (,). Nilainya harus bertipe STRING. Jika parameter ini diatur ke nilai default, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari selected_cols dan excluded_cols.

      excluded_cols

      Tidak

      STRING

      String kosong ("")

      Kolom yang ingin Anda kecualikan. Pisahkan beberapa kolom dengan koma (,). Nilainya harus bertipe STRING. Jika parameter ini diatur ke nilai default, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari selected_cols dan excluded_cols.

      slice_id

      Tidak

      INT

      0

      ID shard dalam mode pembacaan terdistribusi. Nilai valid: [0, slice_count-1]. Dalam mode pembacaan terdistribusi, tabel dibagi menjadi beberapa shard berdasarkan nilai slice_count. Sistem membaca data dari shard yang ditentukan oleh slice_id. Jika Anda mengatur parameter ini ke nilai default 0, semua catatan tabel akan dibaca.

      slice_count

      Tidak

      INT

      1

      Jumlah shard dalam mode pembacaan terdistribusi. Pada sebagian besar kasus, nilainya adalah jumlah worker.

    • Respon

      Objek Reader dikembalikan.

  • Membaca rekaman data

    • Sintaksis

    • reader.read(num_records=1)
    • Parameter

      num_records menentukan jumlah rekaman data yang dibaca secara berurutan. Nilai defaultnya adalah 1, yang berarti satu rekaman dibaca. Jika parameter num_records diatur lebih besar dari jumlah rekaman yang belum dibaca, semua rekaman yang tersisa akan dikembalikan. Jika tidak ada rekaman yang tersisa, paiio.python_io.OutOfRangeException dilempar.

    • Respon

      Array n-dimensi NumPy (atau array rekaman) dikembalikan, di mana setiap elemennya merupakan tuple yang berisi rekaman tabel.

  • Mendapatkan data mulai dari rekaman tertentu

    • Sintaksis

    • reader.seek(offset=0)
    • Parameter

    • offset menentukan ID rekaman data mulai dari mana Anda ingin mendapatkan data. ID rekaman dimulai dari 0. Jika Anda menentukan slice_id dan slice_count, data diperoleh berdasarkan lokasi rekaman yang ditentukan oleh offset dalam shard yang sesuai. Jika offset diatur lebih besar dari jumlah total rekaman data dalam tabel, pengecualian out-of-range dilempar. Jika operasi seek sebelumnya mengembalikan rekaman yang tidak termasuk dalam tabel dan Anda melanjutkan dengan operasi seek lainnya, paiio.python_io.OutOfRangeException dilempar.

      null

      Jika jumlah rekaman data yang belum dibaca dalam tabel kurang dari ukuran batch yang Anda tentukan untuk operasi baca, jumlah rekaman data yang tersisa dikembalikan tanpa melempar pengecualian. Namun, jika Anda melanjutkan dengan operasi seek lainnya, pengecualian akan dilempar.

    • Respon

      Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan dalam operasi, sistem melempar pengecualian.

  • Mendapatkan jumlah total rekaman data dalam tabel

    • Sintaksis

    • reader.get_row_count()
    • Parameter

      Tidak ada

    • Respon

      Jumlah rekaman data dalam tabel dikembalikan. Jika Anda menentukan slice_id dan slice_count, jumlah rekaman data dalam shard dikembalikan.

  • Mendapatkan skema tabel

    • Sintaksis

    • reader.get_schema()
    • Parameter

      Tidak ada

    • Respon

    • Array satu dimensi dikembalikan. Setiap elemen dalam array sesuai dengan skema kolom dalam tabel. Tabel berikut menjelaskan parameter yang terkandung dalam skema.

      Parameter

      Deskripsi

      colname

      Nama kolom.

      typestr

      Nama tipe data MaxCompute.

      pytype

      Tipe data Python yang sesuai dengan nilai yang ditentukan oleh typestr.

      Tabel berikut menjelaskan pemetaan antara nilai yang dapat ditentukan oleh typestr dan pytype.

      typestr

      pytype

      BIGINT

      INT

      DOUBLE

      FLOAT

      BOOLEAN

      BOOL

      STRING

      OBJECT

      DATETIME

      INT

      MAP

      null

      Tipe data ini tidak tersedia untuk TensorFlow yang dibangun ke dalam PAI.

      OBJECT

  • Menutup tabel

    • Sintaksis

    • reader.close()
    • Parameter

      Tidak ada

    • Respon

      Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan dalam operasi, sistem melempar pengecualian.

Contoh

Sebagai contoh, Anda memiliki tabel bernama test di proyek MaxCompute Anda bernama myproject. Berikut adalah sebagian isi tabel tersebut.

uid (BIGINT)

name (STRING)

price (DOUBLE)

virtual (BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

Kode berikut menunjukkan cara menggunakan antarmuka TableReader untuk membaca data dari kolom uid, name, dan price.

    import os
    import paiio
    
    # Tentukan jalur file konfigurasi. Ganti nilainya dengan jalur tempat file konfigurasi disimpan.
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # Buka tabel dan kembalikan objek Reader. Ganti ${your_projectname} dengan nama proyek MaxCompute dan ${table_name} dengan nama tabel yang ingin Anda akses.
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # Dapatkan jumlah total rekaman data dalam tabel.
    total_records_num = reader.get_row_count() # mengembalikan 3
    
    batch_size = 2
    # Baca tabel dan kembalikan array rekaman dalam bentuk [(uid, name, price)*2].
    records = reader.read(batch_size) # Mengembalikan [(25, "Apple", 5.0), (38, "Pear", 4.5)].
    records = reader.read(batch_size) # Mengembalikan [(17, "Watermelon", 2.2)].
    # Jika Anda terus membaca, pengecualian kehabisan memori dilempar.
    
    # Tutup pembaca.
    reader.close()

TableWriter

Anda dapat menggunakan antarmuka TableWriter dalam SDK MaxCompute tanpa bergantung pada TensorFlow. Antarmuka ini memungkinkan Anda menulis data ke tabel MaxCompute.

Ikhtisar

  • Membuat objek Writer dan membuka tabel

    • Sintaksis

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      null
      • Antarmuka ini menulis data ke tabel tanpa membersihkan data yang sudah ada.

      • Data yang baru ditulis hanya dapat dibaca setelah tabel ditutup.

    • Parameter

      Parameter

      Diperlukan

      Tipe

      Nilai default

      Deskripsi

      table

      Ya

      STRING

      Tidak ada

      Nama tabel MaxCompute yang ingin Anda buka. Format nama tabel:odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....

      slice_id

      Tidak

      INT

      0

      ID shard. Dalam mode terdistribusi, data ditulis ke shard yang berbeda untuk mencegah konflik tulis. Dalam mode standalone, gunakan nilai default 0. Dalam mode terdistribusi, operasi tulis gagal jika beberapa worker, termasuk node parameter server (PS), menulis data ke shard yang sama yang ditentukan oleh slice_id.

    • Respon

      Objek Writer dikembalikan.

  • Menulis rekaman data

    • Sintaksis

      writer.write(values, indices)
    • Parameter

      Parameter

      Diperlukan

      Tipe

      Nilai default

      Deskripsi

      values

      Ya

      STRING

      Tidak ada

      Rekaman data yang ingin Anda tulis. Anda dapat menulis satu atau lebih rekaman.

      • Untuk menulis hanya satu rekaman, atur values ke tuple, daftar, atau array satu dimensi yang terdiri dari skalar. Jika values diatur ke daftar atau array satu dimensi, semua kolom rekaman memiliki tipe data yang sama.

      • Untuk menulis satu atau lebih rekaman, atur values ke daftar atau array satu dimensi. Setiap elemen dalam nilai tersebut sesuai dengan rekaman yang merupakan tuple, daftar, atau array satu dimensi.

      indices

      Ya

      INT

      Tidak ada

      Kolom rekaman data yang ingin Anda tulis. Nilainya bisa berupa tuple, daftar, atau array satu dimensi yang terdiri dari indeks integer. Setiap angka dalam nilai yang ditentukan oleh indices sesuai dengan kolom rekaman. Sebagai contoh, angka i sesuai dengan kolom i. Nomor kolom dimulai dari 0.

    • Respon

      Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan selama operasi tulis, sistem melempar pengecualian dan keluar dari proses saat ini.

  • Menutup tabel

    • Sintaksis

      writer.close()
      null

      Dalam pernyataan WITH, Anda tidak perlu secara eksplisit memanggil metode close() untuk menutup tabel.

    • Parameter

      Tidak ada

    • Respon

      Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan dalam operasi, sistem melempar pengecualian.

    • Contoh

      Gunakan TableWriter dalam pernyataan WITH:

      with paiio.python_io.TableWriter(table) as writer:
        # Persiapkan nilai untuk ditulis.
          writer.write(values, indices)
          # Tabel akan ditutup secara otomatis di luar bagian ini.

Contoh

import paiio
import os

# Tentukan jalur file konfigurasi. Ganti nilainya dengan jalur tempat file konfigurasi disimpan.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Persiapkan data.
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# Buka tabel dan kembalikan objek Writer. Ganti ${your_projectname} dengan nama proyek MaxCompute dan ${table_name} dengan nama tabel yang ingin Anda akses.
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Tulis data ke kolom 0 hingga 3 tabel.
records = writer.write(values, indices=[0, 1, 2, 3])

# Gunakan objek Writer untuk menutup tabel.
writer.close()

Apa yang Harus Dilakukan Selanjutnya

Setelah mengonfigurasi kode, Anda dapat menggunakan PAIIO untuk membaca data dari dan menulis data ke tabel MaxCompute dengan melakukan langkah-langkah berikut:

  1. Buat dataset dan unggah file konfigurasi serta kode yang telah Anda siapkan ke sumber data. Untuk informasi lebih lanjut tentang cara membuat dataset, lihat Buat dan Kelola Dataset.

  2. Buat pekerjaan DLC. Bagian berikut menjelaskan parameter utama. Untuk informasi lebih lanjut tentang parameter lainnya, lihat Kirim Pekerjaan Pelatihan.

    • Node Image: Klik Alibaba Cloud Image dan pilih citra TensorFlow 1.12, TensorFlow 1.15, atau TensorFlow 2.0.

    • Datasets: Pilih dataset yang Anda buat di Langkah 1 dan atur Mount Path ke /mnt/data/.

    • Job Command: Atur perintah ke python /mnt/data/xxx.py. Ganti xxx.py dengan nama file kode yang Anda unggah di Langkah 1.

  3. Klik OK.

    Setelah Anda mengirim pekerjaan pelatihan, Anda dapat melihat hasil yang sedang berjalan di log pekerjaan. Untuk informasi lebih lanjut, lihat bagian "Lihat Log Pekerjaan" dalam topik Lihat Pekerjaan Pelatihan.