To enable reading from and writing to MaxCompute tables in Deep Learning Containers (DLC) jobs, the Platform for AI (PAI) team developed the PAIIO module. PAIIO provides three types of interfaces: TableRecordDataset, TableReader, and TableWriter. This topic explains how to use these interfaces to read data from and write data to MaxCompute tables, and provides code examples.
Limitations
- PAIIO can be used only with DLC jobs that use TensorFlow 1.12, 1.15, or 2.0 images.
- PAIIO does not support custom images.
Configure account information
Before using the paiio module to read from or write to MaxCompute tables, you must configure the AccessKey for your MaxCompute account. PAI reads the configuration from a file. You can place this file in a mounted file system and reference it in your code using an environment variable.
- Create a configuration file that contains the following content:
```
access_id=xxxx
access_key=xxxx
end_point=http://xxxx
```
| Parameter | Description |
|---|---|
| access_id | The AccessKey ID of your Alibaba Cloud account. |
| access_key | The AccessKey secret of your Alibaba Cloud account. |
| end_point | The endpoint for MaxCompute. For example, the endpoint for the China (Shanghai) region is http://service.cn-shanghai.maxcompute.aliyun.com/api. For more information, see Endpoints. |
- In your code, specify the path to the configuration file as follows:
os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'
Replace <your MaxCompute config file path> with the actual path to your configuration file.
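As a rough sketch, the following Python code writes a configuration file in the format above and points ODPS_CONFIG_FILE_PATH at it. The helper name write_odps_config, the temporary file location, and the placeholder credentials are assumptions for illustration; in a DLC job you would normally upload the file to a mounted dataset rather than generate it.

```python
import os
import tempfile


def write_odps_config(path, access_id, access_key, end_point):
    """Write a PAIIO account configuration file with one key=value pair per line."""
    lines = [
        "access_id={}".format(access_id),
        "access_key={}".format(access_key),
        "end_point={}".format(end_point),
    ]
    with open(path, "w") as f:
        f.write("\n".join(lines) + "\n")
    return path


# Hypothetical location; in a DLC job this is usually a file on a mounted dataset.
config_path = os.path.join(tempfile.gettempdir(), "odps_config.ini")
write_odps_config(
    config_path,
    access_id="<your AccessKey ID>",
    access_key="<your AccessKey secret>",
    end_point="http://service.cn-shanghai.maxcompute.aliyun.com/api",
)

# PAIIO reads the account information from the file named by this variable.
os.environ["ODPS_CONFIG_FILE_PATH"] = config_path
```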
TableRecordDataset
API
Starting with TensorFlow 1.2, the TensorFlow community recommends building input pipelines with the Dataset interface instead of the legacy thread and queue interfaces. Because multiple Dataset objects can be combined and transformed to produce the data for computation, this approach simplifies data input code.
Python definition
```python
class TableRecordDataset(Dataset):
    def __init__(self, filenames, record_defaults, selected_cols=None,
                 excluded_cols=None, slice_id=0, slice_count=1,
                 num_threads=0, capacity=0):
```
Parameters
| Parameter | Required | Type | Default | Description |
|---|---|---|---|---|
| filenames | Yes | STRING | N/A | A list of the tables to read. All tables must have the same schema. Each table name must be in the format odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/... |
| record_defaults | Yes | LIST or TUPLE | N/A | The data type and default value of each column to read. An exception is thrown if the number of elements does not match the number of columns to read, or if a value cannot be converted to the specified data type. Supported data types: FLOAT32, FLOAT64, INT32, INT64, BOOL, and STRING. For an INT64 default value, use np.array(0, np.int64). |
| selected_cols | No | STRING | None | A comma-separated string of the column names to read. If this parameter is None, all columns are read. This parameter cannot be used together with excluded_cols. |
| excluded_cols | No | STRING | None | A comma-separated string of the column names to exclude. If this parameter is None, no columns are excluded. This parameter cannot be used together with selected_cols. |
| slice_id | No | INT | 0 | For distributed reading, the 0-based index of the shard to read. The table is divided into slice_count shards, and the shard whose index equals slice_id is read. With the defaults (slice_id=0 and slice_count=1), the entire table is read. If slice_count is greater than 1 and slice_id is 0, only the first shard is read. |
| slice_count | No | INT | 1 | For distributed reading, the total number of shards, which is typically set to the number of workers. The default value 1 means that the table is not sharded and the entire table is read. |
| num_threads | No | INT | 0 | The number of threads that the built-in reader uses to prefetch data for each table. These threads are independent of the computation threads. Valid values: 1 to 64, or 0. If this parameter is 0, the number of prefetch threads is automatically set to one-fourth of the number of computation threads. Note: More prefetch threads do not necessarily speed up training, because the effect of I/O on performance varies by model. |
| capacity | No | INT | 0 | The total number of rows to prefetch from a table. If num_threads is greater than 1, each thread prefetches capacity/num_threads rows, rounded up. If this parameter is 0, the built-in reader sets the total prefetch capacity based on the average size of the first N rows of the table (N defaults to 256), so that each thread prefetches about 64 MB of data. |

Note: If a field in a MaxCompute table is of the DOUBLE data type, you must map it to np.float64 in TensorFlow.
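The following sketch shows one way to build record_defaults from MaxCompute column types while following the rules above: BIGINT defaults are given as np.array(0, np.int64), and DOUBLE defaults as np.float64. The mapping dictionary and the make_record_defaults helper are hypothetical conveniences, not part of the PAIIO API.

```python
import numpy as np

# Assumed default value for each MaxCompute column type, based on the rules above.
DEFAULTS_BY_MAXCOMPUTE_TYPE = {
    "BIGINT": np.array(0, np.int64),      # INT64: pass an explicit int64 value
    "DOUBLE": np.array(0.0, np.float64),  # DOUBLE must map to np.float64
    "BOOLEAN": False,                     # BOOL
    "STRING": "",                         # STRING
}


def make_record_defaults(column_types):
    """Return one default value per column, in the order of the columns to read."""
    return [DEFAULTS_BY_MAXCOMPUTE_TYPE[t] for t in column_types]


# Columns of a table such as: itemid BIGINT, name STRING, price DOUBLE, virtual BOOLEAN.
record_defaults = make_record_defaults(["BIGINT", "STRING", "DOUBLE", "BOOLEAN"])
```

The resulting list has one element per selected column, which matches the requirement that the number of elements equals the number of columns to read.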
Return value
Returns a Dataset object that can be used to build a data pipeline.
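The prefetch rules described for the num_threads and capacity parameters can be written out as arithmetic. Both helper names are hypothetical. The first function restates the documented per-thread rule (capacity/num_threads, rounded up); the second only approximates the automatic sizing used when capacity is 0 (about 64 MB per thread, estimated from an average row size), and PAIIO's internal calculation may differ.

```python
import math

TARGET_BYTES_PER_THREAD = 64 * 1024 * 1024  # about 64 MB per prefetch thread


def rows_per_thread(capacity, num_threads):
    """Per-thread prefetch capacity: capacity / num_threads, rounded up."""
    if num_threads <= 1:
        return capacity
    return int(math.ceil(capacity / float(num_threads)))


def approx_auto_capacity(avg_row_bytes, num_threads):
    """Rough estimate of the total capacity chosen when capacity=0 (an assumption)."""
    rows_per_thread_estimate = max(1, TARGET_BYTES_PER_THREAD // avg_row_bytes)
    return rows_per_thread_estimate * max(1, num_threads)
```

For example, capacity=10 with num_threads=3 gives each thread 4 rows, and rows averaging 1 KB with 2 threads give an estimated total of 131072 rows.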
Example
Assume that the myproject project contains a table named test with the following partial content.
| itemid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
|---|---|---|---|
| 25 | "Apple" | 5.0 | False |
| 38 | "Pear" | 4.5 | False |
| 17 | "Watermelon" | 2.2 | False |
The following code shows how to use the TableRecordDataset interface to read the itemid and price columns from the test table.
```python
import os

import numpy as np
import tensorflow as tf
import paiio

# Specify the path to the configuration file. Replace this with the actual file path.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"

# Define the table(s) to read. Replace myproject and test with your project and table names.
table = ["odps://myproject/tables/test"]

# Read the 'itemid' (BIGINT) and 'price' (DOUBLE) columns.
# BIGINT maps to INT64 and DOUBLE must map to np.float64, so the defaults use explicit dtypes.
dataset = paiio.data.TableRecordDataset(table,
                                        record_defaults=[np.array(0, np.int64),
                                                         np.array(0.0, np.float64)],
                                        selected_cols="itemid,price",
                                        num_threads=1,
                                        capacity=10)

# Repeat for 2 epochs, use a batch size of 3, and prefetch up to 100 batches.
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")
```
TableReader
API reference
TableReader is built on the MaxCompute SDK and operates independently of the TensorFlow framework. It allows you to directly access MaxCompute tables and retrieve I/O results in real time.
Create a reader and open a table
Syntax
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1)
Parameters
| Parameter | Required | Type | Default | Description |
|---|---|---|---|---|
| table | Yes | STRING | N/A | The name of the MaxCompute table to open. The table name must be in the format odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/... |
| selected_cols | No | STRING | Empty string ("") | A comma-separated string of the column names to select. If this parameter is an empty string (""), all columns are read. This parameter cannot be used together with excluded_cols. |
| excluded_cols | No | STRING | Empty string ("") | A comma-separated string of the column names to exclude. If this parameter is an empty string (""), all columns are read. This parameter cannot be used together with selected_cols. |
| slice_id | No | INT | 0 | For distributed reading, the index of the shard to read. Valid values: 0 to slice_count-1. The table is divided into slice_count shards, and the shard specified by slice_id is read. With the defaults (slice_id=0 and slice_count=1), the table is not sharded and all rows are read. |
| slice_count | No | INT | 1 | For distributed reading, the total number of shards, which is typically the number of workers. |
Return value
Returns a Reader object.
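In a distributed job, each worker typically passes its own index as slice_id and the number of workers as slice_count. The sketch below derives both values from the standard TensorFlow TF_CONFIG environment variable. It assumes that your DLC job sets TF_CONFIG with a "worker" task list, which you should verify for your job type; clusters that use a separate chief task would need to adjust it. The function name is hypothetical.

```python
import json
import os


def reader_slice_from_tf_config(env=None):
    """Return (slice_id, slice_count) for the current worker task.

    Assumes TF_CONFIG has the standard TensorFlow layout, for example:
    {"cluster": {"worker": [...], "ps": [...]}, "task": {"type": "worker", "index": 0}}
    """
    env = os.environ if env is None else env
    raw = env.get("TF_CONFIG")
    if not raw:
        # No cluster information: standalone mode, so read the whole table.
        return 0, 1
    config = json.loads(raw)
    task = config["task"]
    if task["type"] != "worker":
        raise ValueError("only worker tasks read a shard in this sketch, got %r" % task["type"])
    slice_count = len(config["cluster"]["worker"])
    slice_id = int(task["index"])
    return slice_id, slice_count
```

A worker would then open its shard with, for example, TableReader(table, slice_id=slice_id, slice_count=slice_count). The same two values apply to the slice_id and slice_count parameters of TableRecordDataset.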
Read records
Syntax
reader.read(num_records=1)
Parameters
num_records specifies the number of rows to read sequentially. The default value 1 reads one row. If num_records exceeds the number of unread rows, all remaining rows are returned. If no unread rows remain, a paiio.python_io.OutOfRangeException exception is thrown.
Return value
Returns a NumPy ndarray (or recarray) in which each element is a tuple that represents one table row.
Seek to a specific row
Syntax
reader.seek(offset=0)
Parameters
offset specifies the 0-based index of the row to seek to. The next read operation starts from this row. If slice_id and slice_count are configured, offset is relative to the start of the shard. If offset exceeds the total number of rows, a paiio.python_io.OutOfRangeException is thrown. If the read position is already past the end of the table, any further seek operation also throws a paiio.python_io.OutOfRangeException.
When you read in batches and fewer than batch_size rows remain, the read operation returns the remaining rows without throwing an exception. A subsequent seek operation, however, throws an exception.
Return value
None. An exception is thrown if an error occurs.
Get the total row count
Syntax
reader.get_row_count()
Parameters
None
Return value
Returns the number of rows in the table. If slice_id and slice_count are configured, returns the number of rows in the shard.
Get the table schema
Syntax
reader.get_schema()
Parameters
None
Return value
Returns a 1D structured ndarray. Each element describes one selected column of the MaxCompute table and contains the following three fields.
| Field | Description |
|---|---|
| colname | The column name. |
| typestr | The name of the MaxCompute data type. |
| pytype | The Python data type that corresponds to typestr. |
The following table describes the mapping between typestr and pytype.
| typestr | pytype |
|---|---|
| BIGINT | INT |
| DOUBLE | FLOAT |
| BOOLEAN | BOOL |
| STRING | OBJECT |
| DATETIME | INT |
| MAP (Note: PAI-TensorFlow does not support MAP data.) | OBJECT |
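To check column types before reading, you can walk the array returned by get_schema(). In the sketch below, the schema array is a stand-in built with NumPy that mimics the three documented fields (colname, typestr, pytype). The field dtypes, the check_schema helper, and the choice to reject MAP columns up front are assumptions for illustration.

```python
import numpy as np


def check_schema(schema):
    """Return {colname: typestr}; raise if a column cannot be used with PAI-TensorFlow."""
    columns = {}
    for row in schema:
        colname, typestr = str(row["colname"]), str(row["typestr"])
        if typestr.upper().startswith("MAP"):
            raise ValueError("column %s is a MAP, which PAI-TensorFlow does not support" % colname)
        columns[colname] = typestr
    return columns


# Stand-in for reader.get_schema() on the sample test table (field dtypes assumed).
schema = np.array(
    [("uid", "BIGINT", "INT"), ("name", "STRING", "OBJECT"), ("price", "DOUBLE", "FLOAT")],
    dtype=[("colname", object), ("typestr", object), ("pytype", object)],
)
```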
Close the table
Syntax
reader.close()
Parameters
None
Return value
None. An exception is thrown if an error occurs.
Example
This example uses a table named test in the myproject project with the following data.
| uid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
|---|---|---|---|
| 25 | "Apple" | 5.0 | False |
| 38 | "Pear" | 4.5 | False |
| 17 | "Watermelon" | 2.2 | False |
The following code shows how to use TableReader to read data from the uid, name, and price columns.
```python
import os
import paiio

# Specify the path of the configuration file. Replace the value with the actual path.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"

# Open a table. Replace myproject and test with your project and table names.
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")

# Get the total number of rows in the table.
total_records_num = reader.get_row_count()  # Returns 3

batch_size = 2
# Read the table. Each call returns a recarray of up to batch_size (uid, name, price) tuples.
records = reader.read(batch_size)  # Returns [(25, "Apple", 5.0), (38, "Pear", 4.5)]
records = reader.read(batch_size)  # Returns [(17, "Watermelon", 2.2)]
# Another read() call would throw paiio.python_io.OutOfRangeException because no rows remain.

# Close the reader.
reader.close()
```
TableWriter usage
TableWriter is based on the MaxCompute SDK and does not depend on the TensorFlow framework, allowing you to write data directly to MaxCompute tables.
API
Create a writer and open a table
Syntax
writer = paiio.python_io.TableWriter(table, slice_id=0)
Note:
- This operation appends data to the table and does not clear existing data.
- Newly written data can be read only after the table is closed.
Parameters
| Parameter | Required | Type | Default | Description |
|---|---|---|---|---|
| table | Yes | STRING | N/A | The name of the MaxCompute table to open. The name must be in the format odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/... |
| slice_id | No | INT | 0 | The ID of the shard to write to. In standalone mode, you can use the default value 0. In distributed mode, each task should write to a different shard to prevent write conflicts: the write operation fails if multiple workers, including parameter server (PS) nodes, use the same slice_id. |
Return value
Returns a Writer object.
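Because writing fails when two tasks, including PS nodes, share a slice_id, each task needs a distinct value. One way to obtain one is to number every task in the cluster. The sketch below does this from the standard TensorFlow TF_CONFIG environment variable; it assumes that your job sets TF_CONFIG, and the numbering scheme (task types sorted by name, then task index) is an arbitrary choice for illustration, not something PAIIO prescribes. The function name is hypothetical.

```python
import json
import os


def unique_writer_slice_id(env=None):
    """Return a slice_id that differs for every task listed in TF_CONFIG.

    Task types are ordered alphabetically (for example "ps" before "worker"), and the
    tasks of each type are numbered consecutively, so no two tasks share an ID.
    Returns 0 (standalone mode) if TF_CONFIG is not set.
    """
    env = os.environ if env is None else env
    raw = env.get("TF_CONFIG")
    if not raw:
        return 0
    config = json.loads(raw)
    cluster = config["cluster"]
    task_type = config["task"]["type"]
    task_index = int(config["task"]["index"])
    offset = 0
    for name in sorted(cluster):
        if name == task_type:
            return offset + task_index
        offset += len(cluster[name])
    raise ValueError("task type %r not found in cluster" % task_type)
```

Each task would then open the table with, for example, TableWriter(table, slice_id=unique_writer_slice_id()).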
Write records
Syntax
writer.write(values, indices)
Parameters
| Parameter | Required | Type | Default | Description |
|---|---|---|---|---|
| values | Yes | TUPLE, LIST, or ndarray | N/A | The data to write, as a single record or multiple records. To write a single record, pass a TUPLE, LIST, or 1D ndarray of scalars; if you pass a LIST or ndarray, all columns in the record must have the same data type. To write multiple records, pass a LIST or 1D ndarray in which each element is a TUPLE, LIST, or structured ndarray element that represents one record. |
| indices | Yes | TUPLE, LIST, or ndarray of INT | N/A | The zero-based numbers of the columns to write to, specified as a TUPLE, LIST, or 1D ndarray of integers. |
Return value
The number of records that were written. If the operation fails, an exception is thrown.
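The values parameter also accepts a 1D structured ndarray with one element per record. The sketch below builds such an array for the four columns of the sample test table with NumPy. The field names and dtypes (including object dtype for the STRING column) are assumptions chosen to match BIGINT, STRING, DOUBLE, and BOOLEAN; the writer.write() call appears only as a comment because it requires a PAIIO environment.

```python
import numpy as np

# One field per target column; dtypes chosen to match BIGINT, STRING, DOUBLE, BOOLEAN.
record_dtype = np.dtype([
    ("itemid", np.int64),
    ("name", object),
    ("price", np.float64),
    ("virtual", np.bool_),
])

values = np.array(
    [(25, "Apple", 5.0, False),
     (38, "Pear", 4.5, False),
     (17, "Watermelon", 2.2, False)],
    dtype=record_dtype,
)

# Zero-based column numbers in the MaxCompute table, in the same order as the fields.
indices = [0, 1, 2, 3]

# In a DLC job (not run here):
# with paiio.python_io.TableWriter("odps://myproject/tables/test") as writer:
#     writer.write(values, indices)
```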
Close the table
Syntax
writer.close()
Note: You do not need to call close() explicitly when you use a with statement.
Parameters
None
Return value
None. If an error occurs, an exception is thrown.
Example
The following code shows how to use TableWriter with a with statement.
```python
with paiio.python_io.TableWriter(table) as writer:
    # Prepare values and indices before writing.
    writer.write(values, indices)
# The writer is closed automatically when the with block exits.
```
Example
```python
import os
import paiio

# Specify the path of the configuration file. Replace the value with the actual path.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"

# Prepare the data.
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# Open a table to get a writer object. Replace the project and table names with your actual values.
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write records to columns 0 to 3 of the table.
records = writer.write(values, indices=[0, 1, 2, 3])

# Close the writer.
writer.close()
```
Next steps
After you configure the code, follow these steps to use PAIIO to read from and write to MaxCompute tables:
1. Create a dataset and upload your configuration file and code files to its data source. For more information, see Create and manage datasets.
2. Create a DLC job. The key parameters are described below. For other parameters, see Create a training job.
   - Node Image: Under Alibaba Cloud Images, select a TensorFlow 1.12, TensorFlow 1.15, or TensorFlow 2.0 image.
   - Dataset Configuration: For Dataset, select the dataset that you created in step 1, and set Mount Path to /mnt/data/.
   - Job Command: Enter python /mnt/data/xxx.py. Replace xxx.py with the name of the code file that you uploaded in step 1.
3. Click Confirm.
After you submit the training job, you can view the results in the job logs. For more information, see View job logs.