Tim Platform for AI (PAI) mengembangkan modul paiio untuk menyederhanakan pembacaan dan penulisan data ke tabel MaxCompute dalam tugas Deep Learning Containers (DLC). Modul paiio mendukung tiga antarmuka: TableRecordDataset, TableReader, dan TableWriter. Topik ini menjelaskan antarmuka tersebut beserta contoh penggunaannya.
Batasan
-
Modul paiio mendukung TensorFlow 1.12, 1.15, dan 2.0. Anda hanya dapat menggunakan modul ini jika memilih citra runtime yang sesuai untuk versi-versi tersebut dalam tugas DLC Anda.
-
Modul paiio tidak mendukung gambar kustom.
Persiapan: Konfigurasi informasi akun
Sebelum menggunakan modul paiio untuk membaca atau menulis data ke tabel MaxCompute, Anda harus mengonfigurasi AccessKey akun MaxCompute Anda. PAI membaca informasi konfigurasi dari sebuah file. Tempatkan file konfigurasi tersebut di sistem file yang dipasang (mounted) dan referensikan dalam kode Anda melalui variabel lingkungan.
-
Buat file konfigurasi dengan konten berikut.
access_id=xxxx access_key=xxxx end_point=http://xxxxParameter
Deskripsi
access_id
ID AccessKey Akun Alibaba Cloud Anda.
access_key
Rahasia AccessKey Akun Alibaba Cloud Anda.
end_point
Titik akhir MaxCompute. Misalnya, titik akhir untuk wilayah Tiongkok (Shanghai) adalah
http://service.cn-shanghai.maxcompute.aliyun.com/api. Untuk informasi lebih lanjut, lihat Endpoints. -
Tentukan path file konfigurasi dalam kode Anda sebagai berikut.
os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'<your MaxCompute config file path> adalah path ke file konfigurasi.
Penggunaan TableRecordDataset
Deskripsi antarmuka
Komunitas TensorFlow merekomendasikan penggunaan antarmuka Dataset mulai TensorFlow 1.2 dan seterusnya untuk membangun aliran data. Antarmuka ini menggantikan antarmuka thread dan queue yang asli. Untuk informasi lebih lanjut, lihat Dataset. Anda dapat menggabungkan dan mentransformasi beberapa objek Dataset untuk menghasilkan data komputasi, sehingga menyederhanakan kode input data.
-
Definisi antarmuka (Python)
class TableRecordDataset(Dataset): def __init__(self, filenames, record_defaults, selected_cols=None, excluded_cols=None, slice_id=0, slice_count=1, num_threads=0, capacity=0): -
Parameter
Parameter
Wajib
Tipe
Bawaan
Deskripsi
filenames
Ya
STRING
N/A
Daftar nama tabel yang akan dibaca. Skema semua tabel harus sama. Format nama tabel adalah
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....record_defaults
Ya
LIST atau TUPLE
N/A
Digunakan untuk konversi tipe data kolom output dan menyediakan nilai bawaan untuk kolom kosong. Jika jumlah nilai tidak sesuai dengan jumlah kolom yang dibaca, atau jika tipe data tidak dapat dikonversi secara otomatis, sistem akan melemparkan exception selama eksekusi.
Tipe data yang didukung meliputi FLOAT32, FLOAT64, INT32, INT64, BOOL, dan STRING. Untuk nilai bawaan tipe INT64, lihat
np.array(0, np.int64).selected_cols
Tidak
STRING
None
Kolom yang dipilih, ditentukan sebagai string nama kolom yang dipisahkan koma (,). Nilai bawaan None membaca semua kolom. Parameter ini tidak dapat digunakan bersamaan dengan excluded_cols.
excluded_cols
Tidak
STRING
None
Kolom yang dikecualikan, ditentukan sebagai string nama kolom yang dipisahkan koma (,). Nilai bawaan None membaca semua kolom. Parameter ini tidak dapat digunakan bersamaan dengan selected_cols.
slice_id
Tidak
INT
0
Dalam skenario pembacaan terdistribusi, ini adalah ID shard saat ini (penomoran berbasis nol). Untuk pembacaan terdistribusi, sistem membagi tabel menjadi beberapa shard berdasarkan slice_count dan membaca shard yang sesuai dengan slice_id.
Jika slice_id adalah 0 (bawaan) dan slice_count adalah 1, seluruh tabel dibaca. Jika slice_count lebih besar dari 1, shard ke-0 dibaca.
slice_count
Tidak
INT
1
Dalam skenario pembacaan terdistribusi, ini adalah jumlah total shard, yang biasanya merupakan jumlah worker. Nilai bawaan adalah 1, yang berarti tidak ada sharding dan seluruh tabel dibaca.
num_threads
Tidak
INT
0
Jumlah thread yang diaktifkan oleh reader bawaan untuk setiap tabel guna melakukan pra-ambil (prefetch) data. Thread-thread ini independen dari thread komputasi. Nilainya harus bilangan bulat antara 1 hingga 64. Jika num_threads adalah 0, sistem secara otomatis mengatur jumlah thread pra-ambil menjadi seperempat dari jumlah thread dalam kolam thread komputasi.
CatatanKarena I/O memengaruhi komputasi keseluruhan setiap model secara berbeda, meningkatkan jumlah thread pra-ambil tidak menjamin peningkatan kecepatan pelatihan keseluruhan model.
capacity
Tidak
INT
0
Ukuran total pra-ambil untuk membaca tabel, dalam jumlah baris. Jika num_threads lebih besar dari 1, ukuran pra-ambil untuk setiap thread adalah capacity/num_threads baris (dibulatkan ke atas). Jika capacity adalah 0, reader bawaan secara otomatis mengonfigurasi ukuran total pra-ambil berdasarkan ukuran rata-rata N baris pertama (N=256 secara bawaan) dari tabel. Hal ini membuat data yang dipra-ambil untuk setiap thread sekitar 64 MB.
CatatanJika suatu bidang dalam tabel MaxCompute bertipe DOUBLE, Anda harus menggunakan format `np.float64` di TensorFlow sebagai tipe yang sesuai.
-
Nilai kembali
Mengembalikan objek Dataset yang dapat digunakan sebagai input untuk membangun pipa data.
Contoh
Asumsikan sebuah tabel bernama `test` disimpan dalam proyek bernama `myproject`. Tabel berikut menunjukkan contoh datanya.
|
itemid (BIGINT) |
name (STRING) |
price (DOUBLE) |
virtual (BOOL) |
|
25 |
"Apple" |
5.0 |
False |
|
38 |
"Pear" |
4.5 |
False |
|
17 |
"Watermelon" |
2.2 |
False |
Kode berikut menunjukkan cara menggunakan antarmuka TableRecordDataset untuk membaca kolom `itemid` dan `price` dari tabel `test`.
import os
import tensorflow as tf
import paiio
# Tentukan path file konfigurasi. Ganti ini dengan path aktual file.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Definisikan tabel yang akan dibaca. Anda dapat menentukan beberapa tabel. Ganti nama tabel dan nama proyek MaxCompute yang sesuai dengan milik Anda.
table = ["odps://${your_projectname}/tables/${table_name}"]
# Definisikan TableRecordDataset untuk membaca kolom itemid dan price dari tabel.
dataset = paiio.data.TableRecordDataset(table,
record_defaults=[0, 0.0],
selected_cols="itemid,price",
num_threads=1,
capacity=10)
# Atur epoch menjadi 2, ukuran batch menjadi 3, dan prefetch menjadi 100 batch.
dataset = dataset.repeat(2).batch(3).prefetch(100)
ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
with tf.compat.v1.Session() as sess:
sess.run(tf.compat.v1.global_variables_initializer())
sess.run(tf.compat.v1.local_variables_initializer())
try:
while True:
batch_ids, batch_prices = sess.run([ids, prices])
print("batch_ids:", batch_ids)
print("batch_prices:", batch_prices)
except tf.errors.OutOfRangeError:
print("End of dataset")
Instruksi penggunaan TableReader
Deskripsi antarmuka
TableReader diimplementasikan berdasarkan SDK MaxCompute dan tidak bergantung pada framework TensorFlow. Anda dapat menggunakannya untuk langsung mengakses tabel MaxCompute dan mendapatkan hasil I/O secara instan.
-
Buat reader dan buka tabel
-
Definisi antarmuka
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1): -
-
Parameter
-
Nilai kembali
Objek reader
|
Parameter |
Wajib |
Tipe |
Bawaan |
Deskripsi |
|
table |
Ya |
STRING |
N/A |
Nama tabel MaxCompute yang akan dibuka. Formatnya adalah |
|
selected_cols |
Tidak |
STRING |
String kosong ("") |
Kolom yang dipilih, ditentukan sebagai string nama kolom yang dipisahkan koma (,). Nilai bawaan, yaitu string kosong (""), membaca semua kolom. Parameter ini tidak dapat digunakan bersamaan dengan excluded_cols. |
|
excluded_cols |
Tidak |
STRING |
String kosong ("") |
Kolom yang dikecualikan, ditentukan sebagai string nama kolom yang dipisahkan koma (,). Nilai bawaan, yaitu string kosong (""), membaca semua kolom. Parameter ini tidak dapat digunakan bersamaan dengan selected_cols. |
|
slice_id |
Tidak |
INT |
0 |
Dalam skenario pembacaan terdistribusi, ini adalah ID shard saat ini. Nilainya harus berada dalam rentang [0, slice_count-1]. Untuk pembacaan terdistribusi, sistem membagi tabel menjadi beberapa shard berdasarkan slice_count dan membaca shard yang sesuai dengan slice_id. Nilai bawaan adalah 0, yang berarti tidak ada sharding dan semua baris tabel dibaca. |
|
slice_count |
Tidak |
INT |
1 |
Dalam skenario pembacaan terdistribusi, ini adalah jumlah total shard, yang biasanya merupakan jumlah worker. |
Baca catatan
-
Definisi antarmuka
reader.read(num_records=1)
Parameter
num_records menentukan jumlah baris yang dibaca secara berurutan. Nilai bawaan adalah 1, yang berarti satu baris dibaca. Jika num_records melebihi jumlah baris yang belum dibaca, semua baris tersisa dikembalikan. Jika tidak ada catatan yang dapat dibaca, exception `OutOfRangeException` (paiio.python_io.OutOfRangeException) dilemparkan.
Nilai kembali
Mengembalikan ndarray NumPy, juga dikenal sebagai recarray. Setiap elemen dalam array merupakan tuple yang merepresentasikan satu baris data dari tabel.
Lompat ke baris tertentu
-
Definisi antarmuka
reader.seek(offset=0)
Parameter
offset menentukan indeks berbasis nol dari baris yang akan dilompati. Operasi baca berikutnya dimulai dari baris ini. Jika slice_id dan slice_count dikonfigurasi, operasi seek relatif terhadap awal shard. Jika offset melebihi jumlah total baris dalam tabel, exception `OutOfRange` dilemparkan. Jika posisi baca sudah melewati akhir tabel, operasi seek lainnya akan melemparkan exception `OutOfRangeException` (paiio.python_io.OutOfRangeException).
Saat Anda membaca sejumlah data sekaligus, jika jumlah baris tersisa kurang dari ukuran batch, operasi baca hanya mengembalikan baris tersisa dan tidak melemparkan exception. Namun, jika Anda kemudian melakukan operasi seek, exception akan dilemparkan.
Nilai kembali
None. Jika terjadi error selama operasi, exception dilemparkan.
Ambil jumlah total catatan dalam tabel
-
Definisi antarmuka
reader.get_row_count()
Parameter
None
Nilai kembali
Mengembalikan jumlah baris dalam tabel. Jika slice_id dan slice_count dikonfigurasi, ukuran shard dikembalikan sebagai gantinya.
Ambil skema tabel
-
Definisi antarmuka
reader.get_schema()
Parameter
None
Nilai kembali
Mengembalikan ndarray terstruktur satu dimensi. Setiap elemen sesuai dengan skema kolom yang dipilih dalam tabel MaxCompute dan mencakup tiga bidang berikut.
|
Parameter |
Deskripsi |
|
colname |
Nama kolom. |
|
typestr |
Nama tipe data MaxCompute. |
|
pytype |
Tipe data Python yang sesuai dengan typestr. |
Tabel berikut menjelaskan pemetaan antara typestr dan pytype.
|
typestr |
pytype |
|
BIGINT |
INT |
|
DOUBLE |
FLOAT |
|
BOOLEAN |
BOOL |
|
STRING |
OBJECT |
|
DATETIME |
INT |
|
MAP Catatan
PAI-TensorFlow tidak mendukung operasi pada tipe data MAP. |
OBJECT |
Tutup tabel
-
Definisi antarmuka
reader.close()
Parameter
None
Nilai kembali
None. Jika terjadi error selama operasi, exception dilemparkan.
Contoh
Asumsikan sebuah tabel bernama `test` disimpan dalam proyek bernama `myproject`. Tabel berikut menunjukkan contoh datanya.
|
uid (BIGINT) |
name (STRING) |
price (DOUBLE) |
virtual (BOOL) |
|
25 |
"Apple" |
5.0 |
False |
|
38 |
"Pear" |
4.5 |
False |
|
17 |
"Watermelon" |
2.2 |
False |
Kode berikut menunjukkan cara menggunakan antarmuka TableReader untuk membaca data dari kolom uid, name, dan price.
import os
import paiio
# Tentukan path file konfigurasi. Ganti ini dengan path aktual.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Buka tabel dan kembalikan objek reader. Ganti nama tabel dan nama proyek MaxCompute yang sesuai dengan milik Anda.
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# Dapatkan jumlah total baris dalam tabel.
total_records_num = reader.get_row_count() # return 3
batch_size = 2
# Baca tabel. Nilai kembali akan berupa recarray dalam format [(uid, name, price)*2].
records = reader.read(batch_size) # Mengembalikan [(25, "Apple", 5.0), (38, "Pear", 4.5)]
records = reader.read(batch_size) # Mengembalikan [(17, "Watermelon", 2.2)]
# Membaca lebih lanjut akan melemparkan exception OutOfRange.
# Tutup reader.
reader.close()
Instruksi penggunaan TableWriter
TableWriter diimplementasikan berdasarkan SDK MaxCompute dan tidak bergantung pada framework TensorFlow. Anda dapat menggunakannya untuk langsung menulis data ke tabel MaxCompute.
Deskripsi antarmuka
-
Buat writer dan buka tabel
-
Definisi antarmuka
writer = paiio.python_io.TableWriter(table, slice_id=0)Catatan-
Operasi ini menambahkan data ke tabel tanpa menghapus data yang sudah ada.
-
Anda harus menutup tabel sebelum dapat membaca data yang ditambahkan.
-
-
Parameter
Parameter
Wajib
Tipe
Bawaan
Deskripsi
table
Ya
STRING
N/A
Nama tabel MaxCompute yang akan dibuka. Formatnya adalah
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...slice_id
Tidak
INT
0
Dalam skenario terdistribusi, tulis data ke shard berbeda untuk menghindari konflik penulisan. Dalam skenario standalone, Anda dapat menggunakan nilai bawaan 0. Dalam skenario multi-mesin, jika beberapa worker, termasuk parameter servers (PS), menulis ke tabel menggunakan slice_id yang sama secara bersamaan, operasi penulisan gagal.
-
Nilai kembali
Mengembalikan objek Writer.
-
-
Tulis catatan
-
Definisi antarmuka
writer.write(values, indices) -
Parameter
Parameter
Wajib
Tipe
Bawaan
Deskripsi
values
Ya
STRING
N/A
Data yang akan ditulis. Anda dapat menulis satu baris atau beberapa baris:
-
Untuk menulis satu baris, berikan TUPLE, LIST, atau ndarray 1D yang terdiri dari skalar ke parameter values. Jika Anda memberikan LIST atau ndarray, ini menunjukkan bahwa semua kolom yang akan ditulis memiliki tipe data yang sama.
-
Untuk menulis N baris (N>=1), Anda dapat memberikan LIST atau ndarray 1D ke parameter values. Setiap elemen dalam parameter harus sesuai dengan satu baris data, direpresentasikan sebagai TUPLE atau LIST. Data juga dapat disimpan dalam format terstruktur di dalam ndarray.
indices
Ya
INT
N/A
Menentukan kolom tempat data ditulis. Anda dapat memberikan TUPLE, LIST, atau ndarray 1D yang terdiri dari indeks bertipe INT. Setiap angka (i) dalam indices sesuai dengan kolom ke-i dalam tabel (penomoran berbasis nol).
-
-
Nilai kembali
None. Jika terjadi error selama proses penulisan, exception dilemparkan dan proses dihentikan.
-
-
Tutup tabel
-
Definisi antarmuka
writer.close()CatatanSaat Anda menggunakan blok pernyataan `with`, Anda tidak perlu secara eksplisit memanggil metode `close()` untuk menutup tabel.
-
Parameter
None
-
Nilai kembali
None. Jika terjadi error selama operasi, exception dilemparkan.
-
Contoh
Gunakan TableWriter dengan pernyataan `with` seperti yang ditunjukkan dalam kode berikut.
with paiio.python_io.TableWriter(table) as writer: # Siapkan nilai untuk ditulis. writer.write(values, indices) # Tabel akan ditutup secara otomatis di luar bagian ini.
-
Contoh
import paiio
import os
# Tentukan path file konfigurasi. Ganti ini dengan path aktual.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Siapkan data.
values = [(25, "Apple", 5.0, False),
(38, "Pear", 4.5, False),
(17, "Watermelon", 2.2, False)]
# Buka tabel dan kembalikan objek writer. Ganti nama tabel dan nama proyek MaxCompute yang sesuai dengan milik Anda.
writer = paiio.python_io.TableWriter("odps://project/tables/test")
# Tulis catatan ke kolom 0 hingga 3 tabel.
records = writer.write(values, indices=[0, 1, 2, 3])
# Tutup writer.
writer.close()
Langkah selanjutnya
Setelah mengonfigurasi kode, ikuti langkah-langkah berikut untuk menggunakan paiio membaca data dari dan menulis data ke tabel MaxCompute:
-
Buat dataset dan unggah file konfigurasi serta kode Anda ke sumber data. Untuk informasi lebih lanjut, lihat Kelola Dataset.
-
Buat tugas DLC. Pengaturan parameter utama dijelaskan sebagai berikut. Untuk informasi lebih lanjut tentang pengaturan parameter lainnya, lihat Buat tugas pelatihan.
-
Node Image: Untuk PAI Official Image, pilih citra yang sesuai dengan TensorFlow 1.12, TensorFlow 1.15, atau TensorFlow 2.0.
-
Dataset Configuration: Untuk Dataset, pilih dataset yang Anda buat pada langkah sebelumnya. Atur Mount Path ke
/mnt/data/. -
Execution Command: Atur parameter ini ke
python /mnt/data/xxx.py. Ganti `xxx.py` dengan nama file kode yang Anda unggah pada langkah pertama.
-
-
Klik OK.
Setelah tugas pelatihan dikirim, Anda dapat melihat hasilnya di log instans. Untuk informasi lebih lanjut, lihat Lihat log tugas.