Untuk mempermudah pembacaan dan penulisan data dari dan ke tabel MaxCompute dalam pekerjaan Deep Learning Containers (DLC), tim Platform for AI (PAI) mengembangkan modul PAIIO. Modul ini mendukung antarmuka TableRecordDataset, TableReader, dan TableWriter. Topik ini menjelaskan cara menggunakan antarmuka tersebut untuk membaca dan menulis data ke tabel MaxCompute serta memberikan contoh.
Batasan
PAIIO hanya tersedia untuk pekerjaan DLC yang menggunakan TensorFlow 1.12, TensorFlow 1.15, atau TensorFlow 2.0.
PAIIO tidak tersedia untuk pekerjaan berbasis citra kustom.
Konfigurasi Informasi Akun
Sebelum menggunakan PAIIO untuk membaca atau menulis data ke tabel MaxCompute, Anda harus mengonfigurasi informasi AccessKey yang digunakan untuk mengakses sumber daya MaxCompute. PAI memungkinkan Anda mendapatkan informasi AccessKey dari file konfigurasi. Untuk melakukannya, simpan file konfigurasi di sistem file dan rujuk informasi tersebut di kode Anda menggunakan variabel lingkungan.
Buat file konfigurasi dengan konten berikut:
access_id=xxxx access_key=xxxx end_point=http://xxxxParameter
Deskripsi
access_id
ID AccessKey akun Alibaba Cloud Anda.
access_key
Rahasia AccessKey akun Alibaba Cloud Anda.
end_point
Titik akhir MaxCompute. Sebagai contoh, titik akhir untuk wilayah China (Shanghai) adalah
http://service.cn-shanghai.maxcompute.aliyun.com/api. Untuk informasi lebih lanjut, lihat Titik Akhir.Gunakan sintaksis berikut untuk menentukan jalur file konfigurasi di kode:
os.environ['ODPS_CONFIG_FILE_PATH'] = '<jalur file konfigurasi MaxCompute Anda>'Ganti <jalur file konfigurasi MaxCompute Anda> dengan jalur file yang sesuai.
TableRecordDataset
Ikhtisar
TensorFlow open source merekomendasikan penggunaan TensorFlow Datasets pada TensorFlow 1.2 atau versi lebih baru untuk menggantikan antarmuka threading dan antrian asli guna membuat aliran data. Beberapa antarmuka Dataset digabungkan untuk menghasilkan data untuk komputasi, sehingga menyederhanakan kode input data.
Sintaksis (Python)
class TableRecordDataset(Dataset): def __init__(self, filenames, record_defaults, selected_cols=None, excluded_cols=None, slice_id=0, slice_count=1, num_threads=0, capacity=0):Parameter
Parameter
Diperlukan
Tipe
Nilai Default
Deskripsi
filenames
Ya
STRING
Tidak ada
Nama tabel yang ingin Anda baca. Tabel harus menggunakan skema yang sama. Format nama tabel:
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....record_defaults
Ya
LIST atau TUPLE
Tidak ada
Tipe data kolom yang ingin Anda baca. Jika kolom kosong, parameter ini menentukan tipe data default. Jika tipe data berbeda dari tipe data kolom yang Anda baca atau tipe data tidak dapat dikonversi secara otomatis, sistem akan melempar pengecualian.
Nilai valid: FLOAT32, FLOAT64, INT32, INT64, BOOL, dan STRING. Untuk informasi tentang nilai default tipe data INT64, gunakan sintaksis
np.array(0, np.int64)untuk kueri.selected_cols
Tidak
STRING
Tidak ada
Kolom yang ingin Anda pilih. Pisahkan beberapa kolom dengan koma (,). Jika Anda mengatur parameter ini ke nilai default None, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari parameter selected_cols dan excluded_cols.
excluded_cols
Tidak
STRING
Tidak ada
Kolom yang ingin Anda kecualikan. Pisahkan beberapa kolom dengan koma (,). Jika Anda mengatur parameter ini ke nilai default None, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari selected_cols dan excluded_cols.
slice_id
Tidak
INT
0
ID shard dalam mode pembacaan terdistribusi. ID shard dimulai dari 0. Dalam mode pembacaan terdistribusi, tabel dibagi menjadi beberapa shard berdasarkan nilai parameter slice_count. Sistem membaca data dari shard yang ditentukan oleh parameter slice_id.
Jika slice_id diatur ke nilai default 0 dan slice_count diatur ke 1, seluruh tabel akan dibaca. Jika slice_id diatur ke nilai default 0 dan slice_count diatur ke nilai yang lebih besar dari 1, shard ke-0 akan dibaca.
slice_count
Tidak
INT
1
Jumlah shard dalam mode pembacaan terdistribusi. Pada sebagian besar kasus, nilainya adalah jumlah worker. Jika Anda mengatur parameter ini ke nilai default 1, seluruh tabel akan dibaca tanpa sharding.
num_threads
Tidak
INT
0
Jumlah thread yang diaktifkan oleh pembaca bawaan setiap tabel untuk pra-mengambil data. Thread ini independen dari thread perhitungan. Nilai valid: 1 hingga 64. Jika num_threads diatur ke 0, sistem secara otomatis menetapkan 25% dari thread perhitungan untuk pra-mengambil data.
nullI/O memiliki dampak yang berbeda pada kinerja komputasi keseluruhan setiap model. Akibatnya, peningkatan jumlah thread yang digunakan untuk pra-mengambil data tidak selalu meningkatkan kecepatan pelatihan model keseluruhan.
capacity
Tidak
INT
0
Jumlah rekaman yang dipra-muat. Jika nilai yang ditentukan oleh num_threads lebih besar dari 1, setiap thread memuat capacity/num_threads rekaman data. Nilai parameter dibulatkan ke atas. Jika capacity diatur ke 0, pembaca bawaan mengonfigurasi ukuran total data yang dapat dipra-muat oleh thread berdasarkan nilai rata-rata dari N rekaman pertama dalam tabel. Nilai default N adalah 256. Akibatnya, ukuran data yang dipra-muat oleh setiap thread sekitar 64 MB.
nullJika tipe data bidang dalam tabel MaxCompute adalah DOUBLE, TensorFlow akan memetakan tipe data tersebut ke np.float64.
Respon
Objek Dataset dikembalikan, yang dapat digunakan sebagai input untuk membuat pipeline.
Contoh
Sebagai contoh, Anda memiliki tabel bernama test di proyek MaxCompute Anda bernama myproject. Berikut adalah sebagian isi tabel tersebut.
itemid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
Kode sampel berikut menunjukkan cara menggunakan antarmuka TableRecordDataset untuk membaca kolom itemid dan price dari tabel test:
import os
import tensorflow as tf
import paiio
# Tentukan jalur file konfigurasi. Ganti nilainya dengan jalur tempat file konfigurasi disimpan.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Tentukan tabel yang ingin Anda baca. Ganti ${your_projectname} dengan nama proyek MaxCompute dan ${table_name} dengan nama tabel yang ingin Anda akses.
table = ["odps://${your_projectname}/tables/${table_name}"]
# Tentukan antarmuka TableRecordDataset untuk membaca kolom itemid dan price dari tabel.
dataset = paiio.data.TableRecordDataset(table,
record_defaults=[0, 0.0],
selected_cols="itemid,price",
num_threads=1,
capacity=10)
# Tentukan epoch 2, ukuran batch 3, dan pra-muat 100 batch.
dataset = dataset.repeat(2).batch(3).prefetch(100)
ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
with tf.compat.v1.Session() as sess:
sess.run(tf.compat.v1.global_variables_initializer())
sess.run(tf.compat.v1.local_variables_initializer())
try:
while True:
batch_ids, batch_prices = sess.run([ids, prices])
print("batch_ids:", batch_ids)
print("batch_prices:", batch_prices)
except tf.errors.OutOfRangeError:
print("Akhir dataset")TableReader
Ikhtisar
Anda dapat menggunakan antarmuka TableReader dalam SDK MaxCompute tanpa bergantung pada TensorFlow. Antarmuka ini memungkinkan Anda mengakses tabel MaxCompute dan mendapatkan hasil I/O secara real-time.
Membuat objek Reader dan membuka tabel
Sintaksis
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1):Parameter
Respon
Objek Reader dikembalikan.
Parameter | Diperlukan | Tipe | Nilai default | Deskripsi |
table | Ya | STRING | Tidak ada | Nama tabel MaxCompute yang ingin Anda buka. Format nama tabel: |
selected_cols | Tidak | STRING | String kosong ("") | Kolom yang ingin Anda pilih. Pisahkan beberapa kolom dengan koma (,). Nilainya harus bertipe STRING. Jika parameter ini diatur ke nilai default, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari selected_cols dan excluded_cols. |
excluded_cols | Tidak | STRING | String kosong ("") | Kolom yang ingin Anda kecualikan. Pisahkan beberapa kolom dengan koma (,). Nilainya harus bertipe STRING. Jika parameter ini diatur ke nilai default, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari selected_cols dan excluded_cols. |
slice_id | Tidak | INT | 0 | ID shard dalam mode pembacaan terdistribusi. Nilai valid: [0, slice_count-1]. Dalam mode pembacaan terdistribusi, tabel dibagi menjadi beberapa shard berdasarkan nilai slice_count. Sistem membaca data dari shard yang ditentukan oleh slice_id. Jika Anda mengatur parameter ini ke nilai default 0, semua catatan tabel akan dibaca. |
slice_count | Tidak | INT | 1 | Jumlah shard dalam mode pembacaan terdistribusi. Pada sebagian besar kasus, nilainya adalah jumlah worker. |
Membaca rekaman data
Sintaksis
reader.read(num_records=1)Parameter
num_records menentukan jumlah rekaman data yang dibaca secara berurutan. Nilai defaultnya adalah 1, yang berarti satu rekaman dibaca. Jika parameter num_records diatur lebih besar dari jumlah rekaman yang belum dibaca, semua rekaman yang tersisa akan dikembalikan. Jika tidak ada rekaman yang tersisa, paiio.python_io.OutOfRangeException dilempar.
Respon
Array n-dimensi NumPy (atau array rekaman) dikembalikan, di mana setiap elemennya merupakan tuple yang berisi rekaman tabel.
Mendapatkan data mulai dari rekaman tertentu
Sintaksis
reader.seek(offset=0)Parameter
offset menentukan ID rekaman data mulai dari mana Anda ingin mendapatkan data. ID rekaman dimulai dari 0. Jika Anda menentukan slice_id dan slice_count, data diperoleh berdasarkan lokasi rekaman yang ditentukan oleh offset dalam shard yang sesuai. Jika offset diatur lebih besar dari jumlah total rekaman data dalam tabel, pengecualian out-of-range dilempar. Jika operasi seek sebelumnya mengembalikan rekaman yang tidak termasuk dalam tabel dan Anda melanjutkan dengan operasi seek lainnya, paiio.python_io.OutOfRangeException dilempar.
Jika jumlah rekaman data yang belum dibaca dalam tabel kurang dari ukuran batch yang Anda tentukan untuk operasi baca, jumlah rekaman data yang tersisa dikembalikan tanpa melempar pengecualian. Namun, jika Anda melanjutkan dengan operasi seek lainnya, pengecualian akan dilempar.
Respon
Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan dalam operasi, sistem melempar pengecualian.
Mendapatkan jumlah total rekaman data dalam tabel
Sintaksis
reader.get_row_count()Parameter
Tidak ada
Respon
Jumlah rekaman data dalam tabel dikembalikan. Jika Anda menentukan slice_id dan slice_count, jumlah rekaman data dalam shard dikembalikan.
Mendapatkan skema tabel
Sintaksis
reader.get_schema()Parameter
Tidak ada
Respon
Array satu dimensi dikembalikan. Setiap elemen dalam array sesuai dengan skema kolom dalam tabel. Tabel berikut menjelaskan parameter yang terkandung dalam skema.
Parameter | Deskripsi |
colname | Nama kolom. |
typestr | Nama tipe data MaxCompute. |
pytype | Tipe data Python yang sesuai dengan nilai yang ditentukan oleh typestr. |
Tabel berikut menjelaskan pemetaan antara nilai yang dapat ditentukan oleh typestr dan pytype.
typestr | pytype |
BIGINT | INT |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
STRING | OBJECT |
DATETIME | INT |
MAP null Tipe data ini tidak tersedia untuk TensorFlow yang dibangun ke dalam PAI. | OBJECT |
Menutup tabel
Sintaksis
reader.close()Parameter
Tidak ada
Respon
Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan dalam operasi, sistem melempar pengecualian.
Contoh
Sebagai contoh, Anda memiliki tabel bernama test di proyek MaxCompute Anda bernama myproject. Berikut adalah sebagian isi tabel tersebut.
uid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
Kode berikut menunjukkan cara menggunakan antarmuka TableReader untuk membaca data dari kolom uid, name, dan price.
import os
import paiio
# Tentukan jalur file konfigurasi. Ganti nilainya dengan jalur tempat file konfigurasi disimpan.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Buka tabel dan kembalikan objek Reader. Ganti ${your_projectname} dengan nama proyek MaxCompute dan ${table_name} dengan nama tabel yang ingin Anda akses.
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# Dapatkan jumlah total rekaman data dalam tabel.
total_records_num = reader.get_row_count() # mengembalikan 3
batch_size = 2
# Baca tabel dan kembalikan array rekaman dalam bentuk [(uid, name, price)*2].
records = reader.read(batch_size) # Mengembalikan [(25, "Apple", 5.0), (38, "Pear", 4.5)].
records = reader.read(batch_size) # Mengembalikan [(17, "Watermelon", 2.2)].
# Jika Anda terus membaca, pengecualian kehabisan memori dilempar.
# Tutup pembaca.
reader.close()TableWriter
Anda dapat menggunakan antarmuka TableWriter dalam SDK MaxCompute tanpa bergantung pada TensorFlow. Antarmuka ini memungkinkan Anda menulis data ke tabel MaxCompute.
Ikhtisar
Membuat objek Writer dan membuka tabel
Sintaksis
writer = paiio.python_io.TableWriter(table, slice_id=0)nullAntarmuka ini menulis data ke tabel tanpa membersihkan data yang sudah ada.
Data yang baru ditulis hanya dapat dibaca setelah tabel ditutup.
Parameter
Parameter
Diperlukan
Tipe
Nilai default
Deskripsi
table
Ya
STRING
Tidak ada
Nama tabel MaxCompute yang ingin Anda buka. Format nama tabel:
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....slice_id
Tidak
INT
0
ID shard. Dalam mode terdistribusi, data ditulis ke shard yang berbeda untuk mencegah konflik tulis. Dalam mode standalone, gunakan nilai default 0. Dalam mode terdistribusi, operasi tulis gagal jika beberapa worker, termasuk node parameter server (PS), menulis data ke shard yang sama yang ditentukan oleh slice_id.
Respon
Objek Writer dikembalikan.
Menulis rekaman data
Sintaksis
writer.write(values, indices)Parameter
Parameter
Diperlukan
Tipe
Nilai default
Deskripsi
values
Ya
STRING
Tidak ada
Rekaman data yang ingin Anda tulis. Anda dapat menulis satu atau lebih rekaman.
Untuk menulis hanya satu rekaman, atur values ke tuple, daftar, atau array satu dimensi yang terdiri dari skalar. Jika values diatur ke daftar atau array satu dimensi, semua kolom rekaman memiliki tipe data yang sama.
Untuk menulis satu atau lebih rekaman, atur values ke daftar atau array satu dimensi. Setiap elemen dalam nilai tersebut sesuai dengan rekaman yang merupakan tuple, daftar, atau array satu dimensi.
indices
Ya
INT
Tidak ada
Kolom rekaman data yang ingin Anda tulis. Nilainya bisa berupa tuple, daftar, atau array satu dimensi yang terdiri dari indeks integer. Setiap angka dalam nilai yang ditentukan oleh indices sesuai dengan kolom rekaman. Sebagai contoh, angka i sesuai dengan kolom i. Nomor kolom dimulai dari 0.
Respon
Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan selama operasi tulis, sistem melempar pengecualian dan keluar dari proses saat ini.
Menutup tabel
Sintaksis
writer.close()nullDalam pernyataan WITH, Anda tidak perlu secara eksplisit memanggil metode close() untuk menutup tabel.
Parameter
Tidak ada
Respon
Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan dalam operasi, sistem melempar pengecualian.
Contoh
Gunakan TableWriter dalam pernyataan WITH:
with paiio.python_io.TableWriter(table) as writer: # Persiapkan nilai untuk ditulis. writer.write(values, indices) # Tabel akan ditutup secara otomatis di luar bagian ini.
Contoh
import paiio
import os
# Tentukan jalur file konfigurasi. Ganti nilainya dengan jalur tempat file konfigurasi disimpan.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Persiapkan data.
values = [(25, "Apple", 5.0, False),
(38, "Pear", 4.5, False),
(17, "Watermelon", 2.2, False)]
# Buka tabel dan kembalikan objek Writer. Ganti ${your_projectname} dengan nama proyek MaxCompute dan ${table_name} dengan nama tabel yang ingin Anda akses.
writer = paiio.python_io.TableWriter("odps://project/tables/test")
# Tulis data ke kolom 0 hingga 3 tabel.
records = writer.write(values, indices=[0, 1, 2, 3])
# Gunakan objek Writer untuk menutup tabel.
writer.close()Apa yang Harus Dilakukan Selanjutnya
Setelah mengonfigurasi kode, Anda dapat menggunakan PAIIO untuk membaca data dari dan menulis data ke tabel MaxCompute dengan melakukan langkah-langkah berikut:
Buat dataset dan unggah file konfigurasi serta kode yang telah Anda siapkan ke sumber data. Untuk informasi lebih lanjut tentang cara membuat dataset, lihat Buat dan Kelola Dataset.
Buat pekerjaan DLC. Bagian berikut menjelaskan parameter utama. Untuk informasi lebih lanjut tentang parameter lainnya, lihat Kirim Pekerjaan Pelatihan.
Node Image: Klik Alibaba Cloud Image dan pilih citra TensorFlow 1.12, TensorFlow 1.15, atau TensorFlow 2.0.
Datasets: Pilih dataset yang Anda buat di Langkah 1 dan atur Mount Path ke
/mnt/data/.Job Command: Atur perintah ke
python /mnt/data/xxx.py. Ganti xxx.py dengan nama file kode yang Anda unggah di Langkah 1.
Klik OK.
Setelah Anda mengirim pekerjaan pelatihan, Anda dapat melihat hasil yang sedang berjalan di log pekerjaan. Untuk informasi lebih lanjut, lihat bagian "Lihat Log Pekerjaan" dalam topik Lihat Pekerjaan Pelatihan.