Platform for AI: Read and write MaxCompute tables with paiio

Last Updated: Mar 10, 2026

The Platform for AI (PAI) team developed the paiio module to simplify reading data from and writing data to MaxCompute tables in Deep Learning Containers (DLC) tasks. The paiio module supports three interfaces: TableRecordDataset, TableReader, and TableWriter. This topic describes these interfaces and provides examples of how to use them.

Limits

  • The paiio module supports TensorFlow 1.12, 1.15, and 2.0. To use paiio, you must select a DLC runtime image that corresponds to one of these versions.

  • The paiio module does not support custom images.

Preparations: Configure account information

Before you use the paiio module to read data from or write data to MaxCompute tables, you must configure the AccessKey for your MaxCompute account. PAI reads the configuration information from a file. You must place the configuration file in a mounted file system and reference it in your code using an environment variable.

  1. Create a configuration file with the following content.

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    access_id: The AccessKey ID of your Alibaba Cloud account.

    access_key: The AccessKey secret of your Alibaba Cloud account.

    end_point: The endpoint of MaxCompute. For example, the endpoint for the China (Shanghai) region is http://service.cn-shanghai.maxcompute.aliyun.com/api. For more information, see Endpoints.

  2. Specify the path of the configuration file in your code as follows.

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    <your MaxCompute config file path> is the path to the configuration file.
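The key=value format above can be generated and sanity-checked with a few lines of plain Python. This is an optional convenience sketch, not part of paiio; the temporary directory and the parsing loop are illustrative (paiio reads the file itself through ODPS_CONFIG_FILE_PATH):

```python
import os
import tempfile

# Write a config file in the key=value format shown above.
# The directory here is temporary; in a real DLC task the file lives in a
# mounted file system such as /mnt/data/.
path = os.path.join(tempfile.mkdtemp(), "odps_config.ini")
with open(path, "w") as f:
    f.write("access_id=xxxx\naccess_key=xxxx\nend_point=http://xxxx\n")

# Point paiio at the file, as step 2 describes.
os.environ['ODPS_CONFIG_FILE_PATH'] = path

# Parse the key=value pairs back to verify that the file is well formed.
config = {}
with open(path) as f:
    for line in f:
        line = line.strip()
        if line and "=" in line:
            key, _, value = line.partition("=")
            config[key.strip()] = value.strip()

print(sorted(config))  # ['access_id', 'access_key', 'end_point']
```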

TableRecordDataset usage instructions

Interface description

The TensorFlow community recommends that you use the Dataset interface in TensorFlow 1.2 and later to build data streams. This interface replaces the original thread and queue interfaces. For more information, see Dataset. You can combine and transform multiple Dataset objects to generate the data for computation, which simplifies the input pipeline code.

  • Interface definition (Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • Parameters

    filenames (required, STRING): A list of table names to read. The schemas of all tables must be the same. The table name format is odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....

    record_defaults (required, LIST or TUPLE): Used for data type conversion of the output columns and to provide default values for empty columns. If the number of values does not match the number of columns read, or if a data type cannot be automatically converted, the system throws an exception during execution. Supported data types are FLOAT32, FLOAT64, INT32, INT64, BOOL, and STRING. For an INT64 default value, use np.array(0, np.int64).

    selected_cols (optional, STRING, default None): The columns to select, specified as a string of column names separated by commas (,). The default value None reads all columns. This parameter cannot be used with excluded_cols.

    excluded_cols (optional, STRING, default None): The columns to exclude, specified as a string of column names separated by commas (,). The default value None reads all columns. This parameter cannot be used with selected_cols.

    slice_id (optional, INT, default 0): In a distributed reading scenario, the ID of the current shard (zero-based numbering). The system divides the table into slice_count shards and reads the shard that corresponds to slice_id. If slice_id is 0 (the default) and slice_count is 1, the entire table is read. If slice_count is greater than 1, the 0th shard is read.

    slice_count (optional, INT, default 1): In a distributed reading scenario, the total number of shards, which is usually the number of workers. The default value 1 means that no sharding is performed and the entire table is read.

    num_threads (optional, INT, default 0): The number of threads that the built-in reader starts for each table to prefetch data. These threads are independent of the computation threads. The value must be an integer from 1 to 64. If num_threads is 0, the system sets the number of prefetch threads to one quarter of the number of threads in the computation thread pool. Note: Because I/O affects the overall computation of each model differently, increasing the number of prefetch threads does not guarantee faster overall training.

    capacity (optional, INT, default 0): The total prefetch size for reading the table, in rows. If num_threads is greater than 1, each thread prefetches capacity/num_threads rows (rounded up). If capacity is 0, the built-in reader sets the total prefetch size based on the average size of the first N rows of the table (N=256 by default), so that each thread prefetches about 64 MB of data.

    Note: If a field in a MaxCompute table is of the DOUBLE type, map it to the np.float64 type in TensorFlow.

  • Return value

    Returns a Dataset object that can be used as input for building a data pipeline.
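To build intuition for slice_id and slice_count, the following plain-Python sketch shows one way a table could be split evenly into shards, together with the per-thread prefetch arithmetic described for capacity. The exact partitioning that paiio performs internally is not documented here, so treat shard_range as an illustration only:

```python
import math

def shard_range(total_rows, slice_count, slice_id):
    """Row range [start, end) of one shard under an even split.
    Illustrative only: the exact split paiio performs is internal."""
    base, extra = divmod(total_rows, slice_count)
    start = slice_id * base + min(slice_id, extra)
    size = base + (1 if slice_id < extra else 0)
    return (start, start + size)

# A 10-row table read by 3 workers (slice_count=3, slice_id=0..2):
print([shard_range(10, 3, i) for i in range(3)])  # [(0, 4), (4, 7), (7, 10)]

# Rows prefetched per thread when capacity > 0, rounded up as described:
capacity, num_threads = 10, 4
per_thread = math.ceil(capacity / num_threads)
print(per_thread)  # 3
```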

Example

Assume that a table named `test` is stored in a project named `myproject`. The following table shows a sample of the data.

itemid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL)
25              | "Apple"       | 5.0            | False
38              | "Pear"        | 4.5            | False
17              | "Watermelon"  | 2.2            | False

The following code shows how to use the TableRecordDataset interface to read the `itemid` and `price` columns from the `test` table.

import os
import tensorflow as tf
import paiio

# Specify the path of the configuration file. Replace this with the actual path of the file.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Define the tables to read. You can specify multiple tables. Replace the table name and the corresponding MaxCompute project name with your own.
table = ["odps://${your_projectname}/tables/${table_name}"]
# Define TableRecordDataset to read the itemid and price columns of the table.
dataset = paiio.data.TableRecordDataset(table,
                                        record_defaults=[0, 0.0],
                                        selected_cols="itemid,price",
                                        num_threads=1,
                                        capacity=10)
# Set epoch to 2, batch size to 3, and prefetch to 100 batches.
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")
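For the pipeline above, repeat(2) turns the 3 sample rows into 6, and batch(3) groups them into 2 batches. The transformations can be mimicked with plain Python lists (an illustration of the Dataset operations, not paiio code):

```python
# (itemid, price) rows from the 3-row sample table above.
rows = [(25, 5.0), (38, 4.5), (17, 2.2)]

def repeat(data, count):
    # Mimics dataset.repeat(count): the data is traversed count times.
    return data * count

def batch(data, size):
    # Mimics dataset.batch(size): consecutive rows are grouped into batches.
    return [data[i:i + size] for i in range(0, len(data), size)]

pipeline = batch(repeat(rows, 2), 3)
print(len(pipeline))  # 2
print(pipeline[0])    # [(25, 5.0), (38, 4.5), (17, 2.2)]
```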

TableReader usage instructions

Interface description

TableReader is implemented based on the MaxCompute SDK and does not depend on the TensorFlow framework. You can use it to access MaxCompute tables directly and obtain I/O results immediately (synchronously).

  • Create a reader and open a table

    • Interface definition

    • reader = paiio.python_io.TableReader(table,
                                           selected_cols="",
                                           excluded_cols="",
                                           slice_id=0,
                                           slice_count=1)
    • Parameters

    • table (required, STRING): The name of the MaxCompute table to open. The format is odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....

      selected_cols (optional, STRING, default ""): The columns to select, specified as a string of column names separated by commas (,). The default value, an empty string (""), reads all columns. This parameter cannot be used with excluded_cols.

      excluded_cols (optional, STRING, default ""): The columns to exclude, specified as a string of column names separated by commas (,). The default value, an empty string (""), reads all columns. This parameter cannot be used with selected_cols.

      slice_id (optional, INT, default 0): In a distributed reading scenario, the ID of the current shard. The value must be in the range [0, slice_count-1]. The system divides the table into slice_count shards and reads the shard that corresponds to slice_id. The default value 0, together with the default slice_count of 1, means that no sharding is performed and all rows of the table are read.

      slice_count (optional, INT, default 1): In a distributed reading scenario, the total number of shards, which is usually the number of workers.

    • Return value

      Reader object

  • Read records

    • Interface definition

    • reader.read(num_records=1)
    • Parameters

      num_records specifies the number of rows to read sequentially. The default value is 1, which means one row is read. If num_records exceeds the number of unread rows, all remaining rows are returned. If no records can be read, an `OutOfRangeException` (paiio.python_io.OutOfRangeException) is thrown.

    • Return value

      Returns a structured NumPy ndarray (recarray). Each element of the array is a tuple that represents one row of data from the table.
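As an illustration of the structured ndarray that read() returns, the following snippet builds a comparable array with NumPy directly. The field names and dtypes are assumptions for a sample table; paiio derives them from the actual table schema:

```python
import numpy as np

# A structured array comparable to what reader.read(2) could return for a
# table with BIGINT, STRING, and DOUBLE columns. The field names
# (uid, name, price) are illustrative.
batch = np.array(
    [(25, "Apple", 5.0), (38, "Pear", 4.5)],
    dtype=[("uid", np.int64), ("name", object), ("price", np.float64)],
)

print(batch.shape)                  # (2,)
print(float(batch["price"].sum())) # 9.5
# Each element behaves like a tuple that holds one row:
first_row = tuple(batch[0])
```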

  • Seek to a specific row

    • Interface definition

    • reader.seek(offset=0)
    • Parameters

    • offset specifies the zero-based index of the row to seek to. The next read operation starts from this row. If slice_id and slice_count are configured, the seek operation is relative to the start of the shard. If offset exceeds the total number of rows in the table, or if the read position is already past the end of the table, an OutOfRangeException (paiio.python_io.OutOfRangeException) is thrown.

      Important

      When you read a batch of data, if the number of remaining rows is less than the batch size, the read operation returns only the remaining rows and does not throw an exception. However, if you then perform a seek operation, an exception is thrown.

    • Return value

      None. If an error occurs during the operation, an exception is thrown.

  • Retrieve the total number of records in the table

    • Interface definition

    • reader.get_row_count()
    • Parameters

      None

    • Return value

      Returns the number of rows in the table. If slice_id and slice_count are configured, the size of the shard is returned instead.

  • Retrieve the table schema

    • Interface definition

    • reader.get_schema()
    • Parameters

      None

    • Return value

    • Returns a one-dimensional structured ndarray. Each element corresponds to the schema of a selected column in the MaxCompute table and includes the following three fields.

      Parameter

      Description

      colname

      The column name.

      typestr

      The MaxCompute data type name.

      pytype

      The Python data type that corresponds to typestr.

      The following table describes the mapping between typestr and pytype.

      typestr      pytype
      BIGINT       INT
      DOUBLE       FLOAT
      BOOLEAN      BOOL
      STRING       OBJECT
      DATETIME     INT
      MAP          OBJECT

      Note

      PAI-TensorFlow does not support operations on MAP data types.
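The mapping above can be written down as a plain dictionary for use in your own pre- or post-processing code. The dictionary name and the choice of Python built-in types are illustrative, not part of the paiio API:

```python
# The typestr-to-pytype mapping as a plain dictionary. Python built-ins
# stand in for the pytype names in the table above.
TYPESTR_TO_PYTYPE = {
    "BIGINT": int,
    "DOUBLE": float,
    "BOOLEAN": bool,
    "STRING": object,
    "DATETIME": int,
    "MAP": object,  # PAI-TensorFlow does not support operations on MAP.
}

print(TYPESTR_TO_PYTYPE["BIGINT"] is int)  # True
```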

  • Close the table

    • Interface definition

    • reader.close()
    • Parameters

      None

    • Return value

      None. If an error occurs during the operation, an exception is thrown.

Example

Assume that a table named `test` is stored in a project named `myproject`. The following table shows a sample of the data.

uid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL)
25           | "Apple"       | 5.0            | False
38           | "Pear"        | 4.5            | False
17           | "Watermelon"  | 2.2            | False

The following code shows how to use the TableReader interface to read data from the uid, name, and price columns.

    import os
    import paiio
    
    # Specify the path of the configuration file. Replace this with the actual path.
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # Open a table and return a reader object. Replace the table name and the corresponding MaxCompute project name with your own.
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # Get the total number of rows in the table.
    total_records_num = reader.get_row_count()  # Returns 3.
    
    batch_size = 2
    # Read the table. The return value will be a recarray in the format of [(uid, name, price)*2].
    records = reader.read(batch_size) # Returns [(25, "Apple", 5.0), (38, "Pear", 4.5)]
    records = reader.read(batch_size) # Returns [(17, "Watermelon", 2.2)]
    # Reading further will throw an OutOfRange exception.
    
    # Close the reader.
    reader.close()

TableWriter usage instructions

TableWriter is implemented based on the MaxCompute SDK and does not depend on the TensorFlow framework. You can use it to directly write data to MaxCompute tables.

Interface description

  • Create a writer and open a table

    • Interface definition

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      Note
      • This operation appends data to the table without clearing existing data.

      • You must close the table before you can read the appended data.

    • Parameters

      table (required, STRING): The name of the MaxCompute table to open. The format is odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....

      slice_id (optional, INT, default 0): In a distributed scenario, write data to different shards to avoid write conflicts. In a standalone scenario, use the default value 0. In a multi-machine scenario, if multiple workers, including parameter servers (PS), write to the table with the same slice_id at the same time, the write operation fails.

    • Return value

      Returns a Writer object.

  • Write records

    • Interface definition

      writer.write(values, indices)
    • Parameters

      values (required; TUPLE, LIST, or 1D-ndarray): The data to write. You can write a single row or multiple rows:

      • To write a single row, pass a TUPLE, LIST, or 1D-ndarray composed of scalars. If you pass a LIST or ndarray, all columns to be written must have the same data type.

      • To write N rows (N>=1), pass a LIST or 1D-ndarray in which each element corresponds to a single row of data, represented as a TUPLE or LIST, or stored in a structured format within the ndarray.

      indices (required; TUPLE, LIST, or 1D-ndarray of INT): The columns to write data to. Each number i in indices corresponds to the i-th column in the table (zero-based numbering).

    • Return value

      None. If an error occurs during the write process, an exception is thrown and the process exits.
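To see how values and indices line up, the following plain-Python sketch (not paiio code) maps each field of every row tuple to the table column named by the corresponding index. The column names are hypothetical:

```python
# Columns of a hypothetical 4-column table, in schema order.
table_columns = ["itemid", "name", "price", "virtual"]

values = [(25, 5.0), (38, 4.5)]  # two rows, two fields per row
indices = [0, 2]                 # field 0 -> column 0, field 1 -> column 2

rows = []
for row in values:
    # Pair each field with the table column its index points to.
    rows.append({table_columns[col]: field for field, col in zip(row, indices)})

print(rows[0])  # {'itemid': 25, 'price': 5.0}
```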

  • Close the table

    • Interface definition

      writer.close()
      Note

      When you use a `with` statement block, you do not need to explicitly call the `close()` method to close the table.

    • Parameters

      None

    • Return value

      None. If an error occurs during the operation, an exception is thrown.

    • Example

      Use TableWriter with a `with` statement as shown in the following code.

      with paiio.python_io.TableWriter(table) as writer:
          # Prepare values for writing.
          writer.write(values, indices)
          # The table is closed automatically when the with block exits.

Example

import paiio
import os

# Specify the path of the configuration file. Replace this with the actual path.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Prepare the data.
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# Open a table and return a writer object. Replace the table name and the corresponding MaxCompute project name with your own.
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write records to columns 0 to 3 of the table.
writer.write(values, indices=[0, 1, 2, 3])

# Close the writer.
writer.close()

What to do next

After you configure the code, follow these steps to use paiio to read data from and write data to MaxCompute tables:

  1. Create a dataset and upload your configuration and code files to the data source. For more information, see Manage Datasets.

  2. Create a DLC task. The key parameter settings are described as follows. For more information about other parameter settings, see Create a training task.

    • Node Image: For PAI Official Image, select an image that corresponds to TensorFlow 1.12, TensorFlow 1.15, or TensorFlow 2.0.

    • Dataset Configuration: For Dataset, select the dataset that you created in the previous step. Set Mount Path to /mnt/data/.

    • Execution Command: Set this parameter to python /mnt/data/xxx.py. Replace `xxx.py` with the name of the code file that you uploaded in the first step.

  3. Click OK.

    After the training task is submitted, you can view the results in the instance logs. For more information, see View task logs.