The Platform for AI (PAI) team developed the paiio module to simplify reading data from and writing data to MaxCompute tables in Deep Learning Containers (DLC) tasks. The paiio module supports three interfaces: TableRecordDataset, TableReader, and TableWriter. This topic describes these interfaces and provides examples of how to use them.
Limits
- The paiio module supports TensorFlow 1.12, 1.15, and 2.0. You can use the paiio module only if you select the corresponding runtime images for these versions in your DLC tasks.
- The paiio module does not support custom images.
Preparations: Configure account information
Before you use the paiio module to read data from or write data to MaxCompute tables, you must configure the AccessKey for your MaxCompute account. PAI reads the configuration information from a file. You must place the configuration file in a mounted file system and reference it in your code using an environment variable.
- Create a configuration file with the following content.

  access_id=xxxx
  access_key=xxxx
  end_point=http://xxxx

  | Parameter | Description |
  | --- | --- |
  | access_id | The AccessKey ID of your Alibaba Cloud account. |
  | access_key | The AccessKey secret of your Alibaba Cloud account. |
  | end_point | The endpoint of MaxCompute. For example, the endpoint for the China (Shanghai) region is http://service.cn-shanghai.maxcompute.aliyun.com/api. For more information, see Endpoints. |

- Specify the path of the configuration file in your code as follows.

  os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

  <your MaxCompute config file path> is the path to the configuration file.
TableRecordDataset usage instructions
Interface description
The TensorFlow community recommends using the Dataset interface to build data streams in TensorFlow 1.2 and later. This interface replaces the original thread and queue interfaces. For more information, see Dataset. You can combine and transform multiple Dataset objects to generate the data used for computation, which simplifies the data input code.
- Interface definition (Python)

  class TableRecordDataset(Dataset):
      def __init__(self, filenames, record_defaults, selected_cols=None,
                   excluded_cols=None, slice_id=0, slice_count=1,
                   num_threads=0, capacity=0):

- Parameters
  | Parameter | Required | Type | Default | Description |
  | --- | --- | --- | --- | --- |
  | filenames | Yes | STRING | N/A | A list of table names to read. The schemas of all tables must be the same. The table name format is odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/.... |
  | record_defaults | Yes | LIST or TUPLE | N/A | Used for data type conversion of the output columns and to provide default values for empty columns. If the number of values does not match the number of columns read, or if a data type cannot be automatically converted, the system throws an exception during execution. Supported data types are FLOAT32, FLOAT64, INT32, INT64, BOOL, and STRING. For the default value of the INT64 type, see np.array(0, np.int64). |
  | selected_cols | No | STRING | None | The columns to select, specified as a string of column names separated by commas (,). The default value None reads all columns. This parameter cannot be used with excluded_cols. |
  | excluded_cols | No | STRING | None | The columns to exclude, specified as a string of column names separated by commas (,). The default value None reads all columns. This parameter cannot be used with selected_cols. |
  | slice_id | No | INT | 0 | In a distributed reading scenario, the ID of the current shard (zero-based numbering). For distributed reading, the system divides the table into multiple shards based on slice_count and reads the shard that corresponds to slice_id. If slice_id is 0 (the default) and slice_count is 1, the entire table is read. If slice_count is greater than 1, the 0th shard is read. |
  | slice_count | No | INT | 1 | In a distributed reading scenario, the total number of shards, which is usually the number of workers. The default value is 1, which means no sharding is performed and the entire table is read. |
  | num_threads | No | INT | 0 | The number of threads that the built-in reader starts for each table to prefetch data. These threads are independent of the computation threads. The value must be an integer from 1 to 64. If num_threads is 0, the system automatically sets the number of prefetch threads to one-fourth of the number of threads in the computation thread pool. Note: Because I/O affects the overall computation of each model differently, increasing the number of prefetch threads does not guarantee an increase in the overall training speed of the model. |
  | capacity | No | INT | 0 | The total prefetch size for reading the table, in number of rows. If num_threads is greater than 1, each thread prefetches capacity/num_threads rows (rounded up). If capacity is 0, the built-in reader automatically configures the total prefetch size based on the average size of the first N rows (N=256 by default) of the table, so that the data prefetched by each thread is about 64 MB. |

  Note: If a field in a MaxCompute table is of the DOUBLE type, you must use the `np.float64` format in TensorFlow as the corresponding type.
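  To make this note concrete, the following minimal sketch shows explicitly typed record_defaults for a table that has one BIGINT column and one DOUBLE column. The column names id and score are hypothetical, and ODPS_CONFIG_FILE_PATH is assumed to be set as described in the Preparations section.

  import numpy as np
  import paiio

  # np.array(0, np.int64) is the INT64 default value mentioned above;
  # np.array(0.0, np.float64) matches a MaxCompute DOUBLE column.
  dataset = paiio.data.TableRecordDataset(
      ["odps://${your_projectname}/tables/${table_name}"],
      record_defaults=[np.array(0, np.int64), np.array(0.0, np.float64)],
      selected_cols="id,score")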
- Return value

  Returns a Dataset object that can be used as input for building a data pipeline.
Example
Assume that a table named `test` is stored in a project named `myproject`. The following table shows a sample of the data.
| itemid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
| --- | --- | --- | --- |
| 25 | "Apple" | 5.0 | False |
| 38 | "Pear" | 4.5 | False |
| 17 | "Watermelon" | 2.2 | False |
The following code shows how to use the TableRecordDataset interface to read the `itemid` and `price` columns from the `test` table.
import os
import tensorflow as tf
import paiio

# Specify the path of the configuration file. Replace this with the actual path of the file.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"

# Define the tables to read. You can specify multiple tables. Replace the table name and the corresponding MaxCompute project name with your own.
table = ["odps://${your_projectname}/tables/${table_name}"]

# Define TableRecordDataset to read the itemid and price columns of the table.
dataset = paiio.data.TableRecordDataset(table,
                                        record_defaults=[0, 0.0],
                                        selected_cols="itemid,price",
                                        num_threads=1,
                                        capacity=10)
# Set epoch to 2, batch size to 3, and prefetch to 100 batches.
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")
TableReader usage instructions
Interface description
TableReader is implemented based on the MaxCompute SDK and does not depend on the TensorFlow framework. You can use it to directly access MaxCompute tables and obtain I/O results instantly.
Create a reader and open a table
- Interface definition

  reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1)

- Parameters

  | Parameter | Required | Type | Default | Description |
  | --- | --- | --- | --- | --- |
  | table | Yes | STRING | N/A | The name of the MaxCompute table to open. The format is odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/.... |
  | selected_cols | No | STRING | Empty string ("") | The columns to select, specified as a string of column names separated by commas (,). The default value, an empty string (""), reads all columns. This parameter cannot be used with excluded_cols. |
  | excluded_cols | No | STRING | Empty string ("") | The columns to exclude, specified as a string of column names separated by commas (,). The default value, an empty string (""), reads all columns. This parameter cannot be used with selected_cols. |
  | slice_id | No | INT | 0 | In a distributed reading scenario, the ID of the current shard. The value must be in the range [0, slice_count-1]. For distributed reading, the system divides the table into multiple shards based on slice_count and reads the shard that corresponds to slice_id. The default value is 0, which means no sharding is performed and all rows of the table are read. |
  | slice_count | No | INT | 1 | In a distributed reading scenario, the total number of shards, which is usually the number of workers. |

- Return value

  Returns a Reader object.
Read records
- Interface definition

  reader.read(num_records=1)

- Parameters

  num_records specifies the number of rows to read sequentially. The default value is 1, which means one row is read. If num_records exceeds the number of unread rows, all remaining rows are returned. If no records can be read, an `OutOfRangeException` (paiio.python_io.OutOfRangeException) is thrown.

- Return value

  Returns a NumPy record array (recarray). Each element in the array is a tuple that represents one row of data from the table.
Seek to a specific row
- Interface definition

  reader.seek(offset=0)

- Parameters

  offset specifies the zero-based index of the row to seek to. The next read operation starts from this row. If slice_id and slice_count are configured, the seek operation is relative to the start of the shard. If offset exceeds the total number of rows in the table, an `OutOfRange` exception is thrown. If the read position is already past the end of the table, another seek operation throws an `OutOfRangeException` (paiio.python_io.OutOfRangeException).

  When you read a batch of data, if the number of remaining rows is less than the batch size, the read operation returns only the remaining rows and does not throw an exception. However, if you then perform a seek operation, an exception is thrown.

- Return value

  None. If an error occurs during the operation, an exception is thrown.
Retrieve the total number of records in the table
- Interface definition

  reader.get_row_count()

- Parameters

  None

- Return value

  Returns the number of rows in the table. If slice_id and slice_count are configured, the size of the shard is returned instead.
Retrieve the table schema
- Interface definition

  reader.get_schema()

- Parameters

  None

- Return value

  Returns a one-dimensional structured ndarray. Each element corresponds to the schema of a selected column in the MaxCompute table and includes the following three fields.

  | Field | Description |
  | --- | --- |
  | colname | The column name. |
  | typestr | The MaxCompute data type name. |
  | pytype | The Python data type that corresponds to typestr. |

  The following table describes the mapping between typestr and pytype.

  | typestr | pytype |
  | --- | --- |
  | BIGINT | INT |
  | DOUBLE | FLOAT |
  | BOOLEAN | BOOL |
  | STRING | OBJECT |
  | DATETIME | INT |
  | MAP (Note: PAI-TensorFlow does not support operations on MAP data types.) | OBJECT |
Close the table
- Interface definition

  reader.close()

- Parameters

  None

- Return value

  None. If an error occurs during the operation, an exception is thrown.
Example
Assume that a table named `test` is stored in a project named `myproject`. The following table shows a sample of the data.
| uid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
| --- | --- | --- | --- |
| 25 | "Apple" | 5.0 | False |
| 38 | "Pear" | 4.5 | False |
| 17 | "Watermelon" | 2.2 | False |
The following code shows how to use the TableReader interface to read data from the uid, name, and price columns.
import os
import paiio
# Specify the path of the configuration file. Replace this with the actual path.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Open a table and return a reader object. Replace the table name and the corresponding MaxCompute project name with your own.
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# Get the total number of rows in the table.
total_records_num = reader.get_row_count()  # Returns 3
batch_size = 2
# Read the table. The return value will be a recarray in the format of [(uid, name, price)*2].
records = reader.read(batch_size) # Returns [(25, "Apple", 5.0), (38, "Pear", 4.5)]
records = reader.read(batch_size) # Returns [(17, "Watermelon", 2.2)]
# Reading further will throw an OutOfRange exception.
# Close the reader.
reader.close()
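The preceding example does not use seek() or get_schema(). The following minimal sketch, based on the interface descriptions above and the same sample table, shows how these calls can be combined.

import os
import paiio

# Specify the path of the configuration file. Replace this with the actual path.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"

reader = paiio.python_io.TableReader("odps://myproject/tables/test",
                                     selected_cols="uid,name,price")

# Print the schema of the selected columns. Each entry carries the
# colname, typestr, and pytype fields described above.
for col in reader.get_schema():
    print(col)

# Jump to the third row (zero-based offset 2) and read it.
reader.seek(offset=2)
records = reader.read(1)  # Returns [(17, "Watermelon", 2.2)]

reader.close()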
TableWriter usage instructions
TableWriter is implemented based on the MaxCompute SDK and does not depend on the TensorFlow framework. You can use it to directly write data to MaxCompute tables.
Interface description
Create a writer and open a table
- Interface definition

  writer = paiio.python_io.TableWriter(table, slice_id=0)

  Note:
  - This operation appends data to the table without clearing existing data.
  - You must close the table before you can read the appended data.

- Parameters

  | Parameter | Required | Type | Default | Description |
  | --- | --- | --- | --- | --- |
  | table | Yes | STRING | N/A | The name of the MaxCompute table to open. The format is odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/.... |
  | slice_id | No | INT | 0 | In a distributed scenario, workers write data to different shards to avoid write conflicts. In a standalone scenario, you can use the default value 0. In a multi-machine scenario, if multiple workers, including parameter servers (PS), write to the table with the same slice_id at the same time, the write operation fails. |

- Return value

  Returns a Writer object.
Write records
- Interface definition

  writer.write(values, indices)

- Parameters

  | Parameter | Required | Type | Default | Description |
  | --- | --- | --- | --- | --- |
  | values | Yes | TUPLE, LIST, or 1D-ndarray | N/A | The data to write. You can write a single row or multiple rows. To write a single row, pass a TUPLE, LIST, or 1D-ndarray composed of scalars; if you pass a LIST or ndarray, all columns to be written must have the same data type. To write N rows (N>=1), pass a LIST or 1D-ndarray in which each element represents one row of data as a TUPLE or LIST; the rows can also be stored in a structured ndarray. |
  | indices | Yes | TUPLE, LIST, or 1D-ndarray of INT | N/A | Specifies the columns to write data to. Each number i in indices corresponds to the i-th column in the table (zero-based numbering). |

- Return value

  None. If an error occurs during the write process, an exception is thrown and the process exits.
Close the table
- Interface definition

  writer.close()

  Note: When you use a `with` statement block, you do not need to explicitly call the `close()` method to close the table.

- Parameters

  None

- Return value

  None. If an error occurs during the operation, an exception is thrown.
- Example

  Use TableWriter with a `with` statement as shown in the following code.

  with paiio.python_io.TableWriter(table) as writer:
      # Prepare values for writing.
      writer.write(values, indices)
  # The table is closed automatically when the with block exits.
Example

import paiio
import os

# Specify the path of the configuration file. Replace this with the actual path.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"

# Prepare the data.
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# Open a table and return a writer object. Replace the table name and the corresponding MaxCompute project name with your own.
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write records to columns 0 to 3 of the table.
writer.write(values, indices=[0, 1, 2, 3])

# Close the writer.
writer.close()
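In a distributed job, each worker can write to its own shard by passing a distinct slice_id, as described in the parameter table above. The following minimal sketch assumes that the worker index is exposed through a hypothetical WORKER_ID environment variable; replace it with whatever your launcher actually provides.

import os
import paiio

# Specify the path of the configuration file. Replace this with the actual path.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"

# Hypothetical: obtain the worker index from the launcher.
worker_id = int(os.environ.get("WORKER_ID", "0"))

values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False)]

# Each worker writes to its own slice to avoid write conflicts. The with
# statement closes the table automatically.
with paiio.python_io.TableWriter("odps://project/tables/test", slice_id=worker_id) as writer:
    writer.write(values, indices=[0, 1, 2, 3])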
What to do next
After you configure the code, follow these steps to use paiio to read data from and write data to MaxCompute tables:
1. Create a dataset and upload your configuration and code files to the data source. For more information, see Manage Datasets.
2. Create a DLC task. The key parameter settings are described as follows. For more information about other parameter settings, see Create a training task.
   - Node Image: For PAI Official Image, select an image that corresponds to TensorFlow 1.12, TensorFlow 1.15, or TensorFlow 2.0.
   - Dataset Configuration: For Dataset, select the dataset that you created in the previous step. Set Mount Path to /mnt/data/.
   - Execution Command: Set this parameter to python /mnt/data/xxx.py. Replace `xxx.py` with the name of the code file that you uploaded in the first step.
3. Click OK.

After the training task is submitted, you can view the results in the instance logs. For more information, see View task logs.