Anda dapat membaca dan menulis data ke MaxCompute dalam layanan Platform for AI (PAI), termasuk Deep Learning Containers (DLC) dan Data Science Workshop (DSW), menggunakan PyODPS yang disediakan oleh MaxCompute atau PAIIO yang dikembangkan oleh tim PAI. Pilih metode pembacaan sesuai dengan skenario bisnis Anda.
Ikhtisar
PyODPS adalah SDK MaxCompute untuk Python, menyediakan antarmuka Python yang sederhana dan nyaman. Anda dapat menggunakan PyODPS untuk mengunggah dan mengunduh file, membuat tabel, serta menjalankan kueri ODPS SQL. Untuk informasi lebih lanjut, lihat PyODPS.
Modul PAIIO dikembangkan oleh tim PAI untuk memfasilitasi pembacaan dan penulisan data tabel MaxCompute dalam layanan PAI. PAIIO mendukung antarmuka berikut:
Antarmuka
Perbedaan
Deskripsi
TableRecordDataset
Bergantung pada framework TensorFlow. Kami merekomendasikan Anda menggunakan antarmuka Dataset di TensorFlow 1.2 atau yang lebih baru untuk menggantikan antarmuka threading dan queuing asli guna membuat aliran data. Untuk informasi lebih lanjut, lihat Dataset.
Baca data dari tabel MaxCompute.
TableReader
Berdasarkan MaxCompute dan tidak bergantung pada TensorFlow. Dapat langsung mengakses tabel MaxCompute dan memperoleh hasil I/O secara real-time.
Baca data dari tabel MaxCompute.
TableWriter
Berdasarkan MaxCompute dan tidak bergantung pada TensorFlow. Dapat langsung menulis data ke tabel MaxCompute dan mengembalikan hasilnya.
Tulis data ke tabel MaxCompute.
Prasyarat
Python 3.6 atau versi lebih baru telah terpasang. Disarankan untuk tidak menggunakan Python 2.7 atau versi lebih lama.
Variabel lingkungan telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan variabel lingkungan di Linux, macOS, dan Windows.
MaxCompute telah diaktifkan dan sebuah proyek telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan MaxCompute dan DataWorks dan Buat proyek MaxCompute.
Batasan
Modul PAIIO tidak mendukung gambar kustom. Anda hanya dapat menggunakan PAIIO ketika memilih gambar versi TensorFlow 1.12, 1.15, atau 2.0.
PyODPS
Anda dapat menggunakan PyODPS untuk membaca dan menulis data MaxCompute.
Jalankan perintah berikut untuk menginstal PyODPS:
pip install pyodpsJalankan perintah berikut untuk memeriksa apakah PyODPS telah terinstal: Jika tidak ada hasil yang dikembalikan dan tidak ada kesalahan yang dilaporkan, PyODPS telah terinstal.
python -c "from odps import ODPS"Jika versi Python yang digunakan bukan versi default, jalankan perintah berikut untuk beralih ke versi default setelah pip terinstal:
/home/tops/bin/python3.7 -m pip install setuptools>=3.0 #/home/tops/bin/python3.7 adalah direktori tempat Python terinstal.Gunakan PyODPS untuk membaca dan menulis data MaxCompute.
import numpy as np import pandas as pd import os from odps import ODPS from odps.df import DataFrame # Membuat koneksi. o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='proyek-default-anda', endpoint='titik-akhir-anda', ) # Buat tabel non-partisi bernama my_new_table, yang berisi bidang dengan nama dan tipe data yang ditentukan. table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True) # Masukkan data ke tabel non-partisi my_new_table. records = [[111, 'aaa'], [222, 'bbb'], [333, 'ccc'], [444, 'Chinese']] o.write_table(table, records) # Baca data dari tabel MaxCompute. sql = ''' SELECT * FROM your-default-project.<tabel> LIMIT 100 ; ''' query_job = o.execute_sql(sql) result = query_job.open_reader(tunnel=True) df=result.to_pandas (n_process=1) # Konfigurasikan n_process berdasarkan spesifikasi mesin. Jika nilainya lebih besar dari 1, percepatan multi-thread dapat diaktifkan.Catatan:
ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET: Atur variabel lingkungan ini ke ID AccessKey dan rahasia AccessKey akun Alibaba Cloud Anda.
nullDisarankan untuk tidak menggunakan ID AccessKey dan rahasia AccessKey secara langsung.
your-default-project dan your-end-point: Gantilah dengan nama proyek default dan titik akhir. Untuk informasi lebih lanjut tentang titik akhir di setiap wilayah, lihat Endpoints.
Untuk informasi lebih lanjut tentang cara menggunakan PyODPS untuk melakukan operasi lain pada tabel MaxCompute, lihat Tabel.
paiio
Persiapan: Konfigurasikan informasi akun
Sebelum menggunakan PAIIO untuk membaca atau menulis data ke tabel MaxCompute, konfigurasikan informasi AccessKey yang digunakan untuk mengakses sumber daya MaxCompute. PAI memungkinkan Anda memperoleh informasi AccessKey dari file konfigurasi. Untuk mencapai ini, simpan file konfigurasi di sistem file dan rujuk informasi tersebut dalam kode Anda menggunakan variabel lingkungan.
Buat file konfigurasi yang berisi konten berikut:
access_id=xxxx access_key=xxxx end_point=http://xxxxParameter
Deskripsi
access_id
ID AccessKey akun Alibaba Cloud.
access_key
Rahasia AccessKey akun Alibaba Cloud Anda.
end_point
Titik akhir MaxCompute. Sebagai contoh, titik akhir untuk wilayah China (Shanghai) adalah
http://service.cn-shanghai.maxcompute.aliyun.com/api. Untuk informasi lebih lanjut, lihat Titik Akhir.Gunakan sintaks berikut untuk menentukan jalur file konfigurasi dalam kode:
os.environ['ODPS_CONFIG_FILE_PATH'] = '<jalur file konfigurasi MaxCompute Anda>'Atur parameter <jalur file konfigurasi MaxCompute Anda> ke jalur file.
TableRecordDataset
Ikhtisar
TensorFlow open source merekomendasikan penggunaan TensorFlow Datasets di TensorFlow 1.2 atau versi lebih baru untuk menggantikan antarmuka threading dan queuing asli guna membuat aliran data. Beberapa antarmuka Dataset digabungkan untuk menghasilkan data untuk komputasi, menyederhanakan kode input data.
Sintaks (Python)
class TableRecordDataset(Dataset): def __init__(self, filenames, record_defaults, selected_cols=None, excluded_cols=None, slice_id=0, slice_count=1, num_threads=0, capacity=0):Parameter
Parameter
Diperlukan
Tipe
Nilai default
Deskripsi
filenames
Ya
STRING
None
Nama tabel yang ingin Anda baca. Tabel harus menggunakan skema yang sama. Format nama tabel:
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....record_defaults
Ya
LIST atau TUPLE
None
Tipe data kolom yang ingin Anda baca. Jika kolom kosong, parameter ini menunjukkan tipe data default. Jika tipe data tidak sesuai dengan tipe data kolom yang Anda baca atau tipe data tidak dapat dikonversi secara otomatis, sistem akan melempar pengecualian.
Nilai valid: FLOAT32, FLOAT64, INT32, INT64, BOOL, dan STRING. Untuk informasi tentang nilai default tipe data INT64, gunakan sintaks
np.array(0, np.int64)untuk kueri.selected_cols
Tidak
STRING
None
Kolom yang ingin Anda pilih. Pisahkan beberapa kolom dengan koma (,). Jika Anda mengatur parameter ini ke nilai default None, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari selected_cols dan excluded_cols.
excluded_cols
Tidak
STRING
None
Kolom yang ingin Anda kecualikan. Pisahkan beberapa kolom dengan koma (,). Jika Anda mengatur parameter ini ke nilai default None, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari selected_cols dan excluded_cols.
slice_id
Tidak
INT
0
ID shard dalam mode pembacaan terdistribusi. ID shard dimulai dari 0. Dalam mode pembacaan terdistribusi, tabel dibagi menjadi beberapa shard berdasarkan nilai parameter slice_count. Sistem membaca data dari shard yang ditentukan oleh parameter slice_id.
Jika slice_id diatur ke nilai default 0 dan slice_count diatur ke 1, seluruh tabel akan dibaca. Jika slice_id diatur ke nilai default 0 dan slice_count diatur ke nilai yang lebih besar dari 1, shard ke-0 akan dibaca.
slice_count
Tidak
INT
1
Jumlah shard dalam mode pembacaan terdistribusi. Dalam kebanyakan kasus, nilainya adalah jumlah pekerja. Jika Anda mengatur parameter ini ke nilai default 1, seluruh tabel akan dibaca tanpa sharding.
num_threads
Tidak
INT
0
Jumlah thread yang diaktifkan oleh pembaca bawaan setiap tabel untuk pra-mengambil data. Thread ini independen dari thread komputasi. Nilai valid: 1 hingga 64. Jika num_threads diatur ke 0, sistem secara otomatis menetapkan 25% dari thread komputasi untuk pra-mengambil data.
nullI/O memiliki dampak berbeda pada kinerja komputasi keseluruhan setiap model. Akibatnya, peningkatan jumlah thread yang digunakan untuk pra-mengambil data tidak selalu meningkatkan kecepatan pelatihan model secara keseluruhan.
capacity
Tidak
INT
0
Jumlah rekaman yang dipra-muat. Jika nilai yang ditentukan oleh num_threads lebih besar dari 1, setiap thread mempra-muat capacity/num_threads rekaman data. Nilai parameter dibulatkan ke atas. Jika capacity diatur ke 0, pembaca bawaan mengonfigurasi total ukuran data yang dapat dipra-muat oleh thread berdasarkan nilai rata-rata N rekaman pertama dalam tabel. Nilai default N adalah 256. Akibatnya, ukuran data yang dipra-muat oleh setiap thread sekitar 64 MB.
nullJika tipe data bidang dalam tabel MaxCompute adalah DOUBLE, TensorFlow memetakan tipe data ke np.float64.
Respon
Objek Dataset dikembalikan, yang dapat digunakan sebagai input untuk membuat pipeline.
Contoh
Sebagai contoh, Anda menyimpan tabel bernama test di proyek MaxCompute Anda bernama myproject. Tabel berikut mencantumkan sebagian isi tabel.
itemid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
Kode sampel berikut memberikan contoh tentang cara menggunakan antarmuka TableRecordDataset untuk membaca kolom itemid dan price dari tabel test:
import os
import tensorflow as tf
import paiio
# Tentukan jalur file konfigurasi. Ganti nilainya dengan jalur tempat file konfigurasi disimpan.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Tentukan tabel yang ingin Anda baca. Ganti ${your_projectname} dengan nama proyek MaxCompute dan ${table_name} dengan nama tabel yang ingin Anda akses.
table = ["odps://${your_projectname}/tables/${table_name}"]
# Tentukan antarmuka TableRecordDataset untuk membaca kolom itemid dan price dari tabel.
dataset = paiio.data.TableRecordDataset(table,
record_defaults=[0, 0.0],
selected_cols="itemid,price",
num_threads=1,
capacity=10)
# Tentukan epoch 2, ukuran batch 3, dan pra-muat 100 batch.
dataset = dataset.repeat(2).batch(3).prefetch(100)
ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
with tf.compat.v1.Session() as sess:
sess.run(tf.compat.v1.global_variables_initializer())
sess.run(tf.compat.v1.local_variables_initializer())
try:
while True:
batch_ids, batch_prices = sess.run([ids, prices])
print("batch_ids:", batch_ids)
print("batch_prices:", batch_prices)
except tf.errors.OutOfRangeError:
print("Akhir dataset")TableReader
Ikhtisar
Anda dapat menggunakan antarmuka TableReader dalam SDK MaxCompute tanpa bergantung pada TensorFlow. Ini memungkinkan Anda mengakses tabel MaxCompute dan memperoleh hasil I/O secara real-time.
Buat objek Reader dan buka tabel
Sintaks
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1):Parameter
Respon
Objek Reader dikembalikan.
Parameter | Diperlukan | Tipe | Nilai default | Deskripsi |
table | Ya | STRING | None | Nama tabel MaxCompute yang ingin Anda buka. Format nama tabel: |
selected_cols | Tidak | STRING | String kosong ("") | Kolom yang ingin Anda pilih. Pisahkan beberapa kolom dengan koma (,). Nilainya harus bertipe STRING. Jika parameter ini diatur ke nilai default, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari selected_cols dan excluded_cols. |
excluded_cols | Tidak | STRING | String kosong ("") | Kolom yang ingin Anda kecualikan. Pisahkan beberapa kolom dengan koma (,). Nilainya harus bertipe STRING. Jika parameter ini diatur ke nilai default, semua kolom akan dibaca. Anda hanya dapat menentukan salah satu dari selected_cols dan excluded_cols. |
slice_id | Tidak | INT | 0 | ID shard dalam mode pembacaan terdistribusi. Nilai valid: [0, slice_count-1]. Dalam mode pembacaan terdistribusi, tabel dibagi menjadi beberapa shard berdasarkan nilai slice_count. Sistem membaca data dari shard yang ditentukan oleh slice_id. Jika Anda mengatur parameter ini ke nilai default 0, semua rekaman tabel akan dibaca. |
slice_count | Tidak | INT | 1 | Jumlah shard dalam mode pembacaan terdistribusi. Dalam kebanyakan kasus, nilainya adalah jumlah pekerja. |
Baca rekaman data
Sintaks
reader.read(num_records=1)Parameter
num_records menentukan jumlah rekaman data yang dibaca secara berurutan. Nilai defaultnya adalah 1, yang menentukan bahwa satu rekaman dibaca. Jika Anda mengatur parameter num_records ke nilai yang lebih besar dari jumlah rekaman yang belum dibaca, semua rekaman yang dibaca akan dikembalikan. Jika tidak ada rekaman yang dikembalikan, PAIIO.python_io.OutOfRangeException dilempar.
Respon
Array n-dimensi numpy (atau array rekaman) dikembalikan. Setiap elemen dalam array adalah tuple yang terdiri dari rekaman tabel.
Peroleh data mulai dari rekaman data tertentu
Sintaks
reader.seek(offset=0)Parameter
offset menentukan ID rekaman data mulai dari mana Anda ingin memperoleh data. ID rekaman dimulai dari 0. Jika Anda menentukan slice_id dan slice_count, data diperoleh berdasarkan lokasi rekaman yang ditentukan oleh offset dalam shard yang sesuai. Jika offset diatur ke nilai yang lebih besar dari jumlah total rekaman data dalam tabel, pengecualian out-of-range dilempar. Jika operasi seek sebelumnya mengembalikan rekaman yang tidak termasuk dalam tabel dan Anda melanjutkan dengan operasi seek lainnya, PAIIO.python_io.OutOfRangeException dilempar.
Jika jumlah rekaman data yang belum dibaca dalam tabel kurang dari ukuran batch yang Anda tentukan untuk operasi baca, jumlah rekaman data yang belum dibaca dikembalikan dan tidak ada pengecualian yang dilempar. Jika Anda melanjutkan dengan operasi seek lainnya, pengecualian dilempar.
Respon
Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan dalam operasi, sistem melempar pengecualian.
Peroleh jumlah total rekaman data dalam tabel
Sintaks
reader.get_row_count()Parameter
Tidak ada
Respon
Jumlah rekaman data dalam tabel dikembalikan. Jika Anda menentukan slice_id dan slice_count, jumlah rekaman data dalam shard dikembalikan.
Peroleh skema tabel
Sintaks
reader.get_schema()Parameter
Tidak ada
Respon
Array satu dimensi dikembalikan. Setiap elemen dalam array sesuai dengan skema kolom dalam tabel. Tabel berikut menjelaskan parameter yang terkandung dalam skema.
Parameter | Deskripsi |
colname | Nama kolom. |
typestr | Nama tipe data MaxCompute. |
pytype | Tipe data Python yang sesuai dengan nilai yang ditentukan oleh typestr. |
Tabel berikut menjelaskan pemetaan antara nilai yang dapat ditentukan oleh typestr dan pytype.
typestr | pytype |
BIGINT | INT |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
STRING | OBJECT |
DATETIME | INT |
MAP null Tipe data ini tidak tersedia untuk TensorFlow yang dibangun ke dalam PAI. | OBJECT |
Tutup tabel
Sintaks
reader.close()Parameter
Tidak ada
Respon
Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan dalam operasi, sistem melempar pengecualian.
Contoh
Sebagai contoh, Anda menyimpan tabel bernama test di proyek MaxCompute Anda bernama myproject. Tabel berikut mencantumkan sebagian isi tabel.
uid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
Kode berikut memberikan contoh tentang cara menggunakan antarmuka TableReader untuk membaca data yang terkandung dalam kolom uid, name, dan price.
import os
import paiio
# Tentukan jalur file konfigurasi. Ganti nilainya dengan jalur tempat file konfigurasi disimpan.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Buka tabel dan kembalikan objek Reader. Ganti ${your_projectname} dengan nama proyek MaxCompute dan ${table_name} dengan nama tabel yang ingin Anda akses.
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# Peroleh jumlah total rekaman data dalam tabel.
total_records_num = reader.get_row_count() # return 3
batch_size = 2
# Baca tabel dan kembalikan array rekaman dalam format [(uid, name, price)*2].
records = reader.read(batch_size) # Return [(25, "Apple", 5.0), (38, "Pear", 4.5)].
records = reader.read(batch_size) # Return [(17, "Watermelon", 2.2)].
# Jika Anda terus membaca, pengecualian kehabisan memori dilempar.
# Tutup reader.
reader.close()TableWriter
Anda dapat menggunakan antarmuka TableWriter dalam SDK MaxCompute tanpa bergantung pada TensorFlow. Ini memungkinkan Anda mengakses tabel MaxCompute dan memperoleh hasil I/O secara real-time.
Ikhtisar
Buat objek Writer dan buka tabel
Sintaks
writer = paiio.python_io.TableWriter(table, slice_id=0)nullAntarmuka ini menulis data ke tabel tanpa membersihkan data yang ada.
Data yang baru ditulis hanya dapat dibaca setelah tabel ditutup.
Parameter
Parameter
Diperlukan
Tipe
Nilai default
Deskripsi
table
Ya
STRING
None
Nama tabel MaxCompute yang ingin Anda buka. Format nama tabel:
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....slice_id
Tidak
INT
0
ID shard. Dalam mode terdistribusi, data ditulis ke shard yang berbeda untuk mencegah konflik tulis. Dalam mode standalone, gunakan nilai default 0. Dalam mode terdistribusi, operasi tulis gagal jika beberapa pekerja, termasuk node server parameter (PS), menulis data ke shard yang sama yang ditentukan oleh slice_id.
Respon
Objek Writer dikembalikan.
Tulis rekaman data
Sintaks
writer.write(values, indices)Parameter
Parameter
Diperlukan
Tipe
Nilai default
Deskripsi
values
Ya
STRING
None
Rekaman data yang ingin Anda tulis. Anda dapat menulis satu atau lebih rekaman.
Untuk menulis hanya satu rekaman, atur values ke tupel, daftar, atau array satu dimensi yang terdiri dari skalar. Jika values diatur ke daftar atau array satu dimensi, semua kolom rekaman memiliki tipe data yang sama.
Untuk menulis satu atau lebih rekaman, atur values ke daftar atau array satu dimensi. Setiap elemen dalam nilai tersebut sesuai dengan rekaman yang merupakan tupel, daftar, atau array satu dimensi.
indices
Ya
INT
None
Kolom rekaman data yang ingin Anda tulis. Nilainya bisa berupa tupel, daftar, atau array satu dimensi yang terdiri dari indeks integer. Setiap angka dalam nilai yang ditentukan oleh indices sesuai dengan kolom rekaman. Sebagai contoh, angka i sesuai dengan kolom i. Nomor kolom dimulai dari 0.
Respon
Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan selama operasi tulis, sistem melempar pengecualian dan keluar dari proses saat ini.
Tutup tabel
Sintaks
writer.close()nullDalam pernyataan WITH, Anda tidak perlu secara eksplisit memanggil metode close() untuk menutup tabel.
Parameter
Tidak ada
Respon
Tidak ada nilai yang dikembalikan. Jika terjadi kesalahan dalam operasi, sistem melempar pengecualian.
Hasil sampel
Gunakan TableWriter dalam pernyataan WITH:
with paiio.python_io.TableWriter(table) as writer: # Persiapkan nilai untuk ditulis. writer.write(values, incides) # Tabel akan ditutup secara otomatis di luar bagian ini.
Contoh
import paiio
import os
# Tentukan jalur file konfigurasi. Ganti nilainya dengan jalur tempat file konfigurasi disimpan.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Persiapkan data.
values = [(25, "Apple", 5.0, False),
(38, "Pear", 4.5, False),
(17, "Watermelon", 2.2, False)]
# Buka tabel dan kembalikan objek Writer. Ganti ${your_projectname} dengan nama proyek MaxCompute dan ${table_name} dengan nama tabel yang ingin Anda akses.
writer = paiio.python_io.TableWriter("odps://project/tables/test")
# Tulis data ke kolom 0 hingga 3 tabel.
records = writer.write(values, indices=[0, 1, 2, 3])
# Gunakan objek Writer untuk menutup tabel.
writer.close()