All Products
Search
Document Center

Platform For AI:Read and write MaxCompute tables with PAIIO

Last Updated:Apr 17, 2026

To enable reading from and writing to MaxCompute tables in Deep Learning Containers (DLC) jobs, the Platform for AI (PAI) team developed the PAIIO module. PAIIO provides three types of interfaces: TableRecordDataset, TableReader, and TableWriter. This topic explains how to use these interfaces to read data from and write data to MaxCompute tables, and provides code examples.

Limitations

  • PAIIO can only be used with DLC jobs that use TensorFlow 1.12, 1.15, or 2.0 images.

  • PAIIO does not support custom images.

Configure account information

Before using the paiio module to read from or write to MaxCompute tables, you must configure the AccessKey for your MaxCompute account. PAI reads the configuration from a file. You can place this file in a mounted file system and reference it in your code using an environment variable.

  1. Create a configuration file that contains the following content:

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    Parameter

    Description

    access_id

    The AccessKey ID of your Alibaba Cloud account.

    access_key

    The AccessKey secret of your Alibaba Cloud account.

    end_point

    The endpoint for MaxCompute. For example, the endpoint for the China (Shanghai) region is http://service.cn-shanghai.maxcompute.aliyun.com/api. For more information, see Endpoints.

  2. In your code, specify the path to the configuration file as follows:

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    Replace <your MaxCompute config file path> with the actual path to your configuration file.

TableRecordDataset

API

The TensorFlow community recommends using the Dataset interface in TensorFlow 1.2 and later to build input pipelines, which replaces the legacy thread and queue interfaces. You can combine and transform multiple Dataset objects to generate data for computation, simplifying data input code.

  • Python definition

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • Parameters

    Parameter

    Required

    Type

    Default

    Description

    filenames

    Yes

    STRING

    -

    A list of tables to read. All tables must have the same schema. The table name must be in the format odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....

    record_defaults

    Yes

    LIST or TUPLE

    -

    A list or tuple that specifies the data type and default value for each column to be read. The method throws an exception if the number of elements does not match the number of columns to be read, or if a data type cannot be converted.

    Supported data types include FLOAT32, FLOAT64, INT32, INT64, BOOL, and STRING. For an INT64 default value, use np.array(0, np.int64).

    selected_cols

    No

    STRING

    None

    A comma-separated string of column names to read. If this parameter is None, all columns are read. This parameter cannot be used with excluded_cols.

    excluded_cols

    No

    STRING

    None

    A comma-separated string of column names to exclude. If this parameter is None, no columns are excluded. This parameter cannot be used with selected_cols.

    slice_id

    No

    INT

    0

    For distributed reading, this parameter specifies the 0-based index of the data shard to read. The system divides the table into the number of shards specified by slice_count and reads the shard corresponding to this slice_id.

    If slice_id is 0 (the default) and slice_count is 1, the entire table is read. If slice_count is greater than 1, only the first shard (index 0) is read.

    slice_count

    No

    INT

    1

    For distributed reading, this parameter specifies the total number of shards to divide the data into. This value is typically set to the number of workers. The default value of 1 means the table is not sharded and the reader reads the entire table.

    num_threads

    No

    INT

    0

    Specifies the number of parallel threads the reader uses to prefetch data for each table. These threads operate independently of computation threads. The value must be an integer from 1 to 64. If num_threads is set to 0, the system automatically sets the number of prefetch threads to one-fourth of the number of computation threads.

    Note

    Increasing the number of prefetch threads does not guarantee faster model training, as the I/O impact varies by model.

    capacity

    No

    INT

    0

    Specifies the total number of rows to prefetch from a table. If num_threads is greater than 1, the prefetch capacity for each thread is capacity/num_threads rows, rounded up. If capacity is set to 0, the built-in Reader automatically configures the total prefetch capacity based on the average size of the first N rows of the table, where N defaults to 256. This ensures that the amount of prefetched data for each thread is approximately 64 MB.

    Note

    If a field in a MaxCompute table is of the DOUBLE data type, you must map it to np.float64 in TensorFlow.

  • Return value

    Returns a Dataset object that can be used to build a data pipeline.

Example

Assume you have a table named test in your myproject project with the following partial content.

itemid (BIGINT)

name (STRING)

price (DOUBLE)

virtual (BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

The following code shows how to use the TableRecordDataset interface to read the itemid and price columns from the test table.

import os
import tensorflow as tf
import paiio

# Specify the path to the configuration file. Replace this with the actual file path.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Define the table(s) to read. Replace with your actual project and table names.
table = ["odps://${your_projectname}/tables/${table_name}"]
# Define the TableRecordDataset to read the 'itemid' and 'price' columns.
dataset = paiio.data.TableRecordDataset(table,
                                       record_defaults=[0, 0.0],
                                       selected_cols="itemid,price",
                                       num_threads=1,
                                       capacity=10)
# Set 2 epochs, a batch size of 3, and prefetch 100 batches.
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")

TableReader

API reference

TableReader is built on the MaxCompute SDK and operates independently of the TensorFlow framework. It allows you to directly access MaxCompute tables and retrieve I/O results in real time.

  • Create a reader and open a table

    • Syntax

    • reader = paiio.python_io.TableReader(table,
                           selected_cols="",
                          excluded_cols="",
                           slice_id=0,
                          slice_count=1):
    • Parameters

    • Parameter

      Required

      Type

      Default

      Description

      table

      Yes

      STRING

      N/A

      The name of the MaxCompute table to open. The table name must be in the format: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      selected_cols

      No

      STRING

      Empty string ("")

      A comma-separated string of column names to select. If an empty string ("") is provided, all columns are read. This parameter cannot be used with excluded_cols.

      excluded_cols

      No

      STRING

      Empty string ("")

      A comma-separated string of column names to exclude. If an empty string ("") is provided, all columns are read. This parameter cannot be used with selected_cols.

      slice_id

      No

      INT

      0

      In a distributed read scenario, this parameter specifies the index of the current shard. The value can range from [0, slice_count-1]. When reading in distributed mode, the system divides the table into multiple shards based on slice_count and reads the shard specified by slice_id. The default value 0 indicates that the table is not sharded and all rows are read.

      slice_count

      No

      INT

      1

      In a distributed read scenario, this parameter specifies the total number of shards, which is typically the number of workers.

    • Return value

      Returns a Reader object.

  • Read records

    • Syntax

    • reader.read(num_records=1)
    • Parameters

      num_records specifies the number of rows to read sequentially. The default value is 1, which reads one row. If num_records exceeds the number of unread rows, all remaining rows are returned. If no records are read, a paiio.python_io.OutOfRangeException exception is thrown.

    • Return value

      Returns a NumPy ndarray (or recarray). Each element in the array is a tuple that represents a table row.

  • Seek to a specific row

    • Syntax

    • reader.seek(offset=0)
    • Parameters

    • offset specifies the row to seek to (row indexing starts from 0). The next read operation starts from this row. If slice_id and slice_count are configured, the seek is relative to the position within the shard. If offset exceeds the total number of rows in the table, an OutOfRangeException is thrown. If the read position is already past the end of the table, attempting to seek again will also throw a paiio.python_io.OutOfRangeException.

      Important

      When you read a batch, if the number of remaining rows is less than the batch_size, the read operation returns the remaining rows without throwing an exception. In this case, attempting another seek operation throws an exception.

    • Return value

      None. An exception is thrown if an error occurs.

  • Get the total row count

    • Syntax

    • reader.get_row_count()
    • Parameters

      None

    • Return value

      Returns the number of rows in the table. If slice_id and slice_count are configured, it returns the size of the shard.

  • Get the table schema

    • Syntax

    • reader.get_schema()
    • Parameters

      None

    • Return value

    • Returns a 1D structured ndarray. Each element describes a selected column from the MaxCompute table and contains the following three fields.

      Parameter

      Description

      colname

      The column name.

      typestr

      The name of the MaxCompute data type.

      pytype

      The Python data type corresponding to typestr.

      The following table describes the mapping between typestr and pytype.

      typestr

      pytype

      BIGINT

      INT

      DOUBLE

      FLOAT

      BOOLEAN

      BOOL

      STRING

      OBJECT

      DATETIME

      INT

      MAP

      Note

      PAI-TensorFlow does not support MAP data.

      OBJECT

  • Close the table

    • Syntax

    • reader.close()
    • Parameters

      None

    • Return value

      None. An exception is thrown if an error occurs.

Example

This example uses a table named test in the myproject project with the following data.

uid (BIGINT)

name (STRING)

price (DOUBLE)

virtual (BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

The following code shows how to use TableReader to read data from the uid, name, and price columns.

    import os
    import paiio
    
    # Specify the path of the configuration file. Replace the value with the actual path.
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # Open a table. Replace myproject and test with your project and table names.
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # Get the total number of rows in the table.
    total_records_num = reader.get_row_count() # return 3
    
    batch_size = 2
    # Read the table. The return value is a recarray in the format [(uid, name, price)*2].
    records = reader.read(batch_size) # Returns [(25, "Apple", 5.0), (38, "Pear", 4.5)]
    records = reader.read(batch_size) # Returns [(17, "Watermelon", 2.2)]
    # Reading again throws an OutOfRangeException.
    
    # Close the reader.
    reader.close()

TableWriter usage

TableWriter is based on the MaxCompute SDK and does not depend on the TensorFlow framework, allowing you to write data directly to MaxCompute tables.

API

  • Create a writer and open a table

    • Syntax

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      Note
      • This operation appends data to a table and does not clear existing data.

      • You can read the newly written data only after the table is closed.

    • Parameters

      Parameter

      Required

      Type

      Default

      Description

      table

      Yes

      STRING

      None

      The name of the MaxCompute table to open. The name must be in the following format: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      slice_id

      No

      INT

      0

      The ID of the shard to write to. In distributed mode, writing to different shards prevents write conflicts. In standalone mode, you can use the default value 0. In distributed mode, the write operation will fail if multiple workers, including parameter server (PS) nodes, write to the same shard using the same slice_id.

    • Return value

      Returns a Writer object.

  • Write records

    • Syntax

      writer.write(values, indices)
    • Parameters

      Parameter

      Required

      Type

      Default

      Description

      values

      Yes

      STRING

      None

      The data to write, specified as a single record or multiple records:

      • To write a single record, pass a TUPLE, LIST, or 1D-ndarray of scalars to the values parameter. If you pass a LIST or ndarray, all columns in the record must have the same data type.

      • To write one or more records, pass a LIST or 1D-ndarray to the values parameter. Each element must be a TUPLE, LIST, or structured ndarray element that represents a single record.

      indices

      Yes

      INT

      None

      The indices of the columns to write to. This can be a TUPLE, LIST, or 1D-ndarray of integers. Each index in indices is the zero-based column number.

    • Return value

      The number of records successfully written. If the operation fails, an exception is thrown.

  • Close the table

    • Syntax

      writer.close()
      Note

      You do not need to explicitly call the close() method when you use a with statement.

    • Parameters

      None

    • Return value

      None. If an error occurs, an exception is thrown.

    • Example

      The following code shows how to use TableWriter with a with statement.

      with paiio.python_io.TableWriter(table) as writer:
        # Prepare values for writing.
          writer.write(values, indices)
          # The writer is closed automatically when the 'with' block is exited.

Example

import paiio
import os

# Specify the path of the configuration file. Replace the value with the actual path.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Prepare the data.
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# Open a table to get a writer object. Replace the project and table names with your actual values.
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write records to columns 0 to 3 of the table.
records = writer.write(values, indices=[0, 1, 2, 3])

# Close the writer.
writer.close()

Next steps

After configuring the code, follow these steps to use PAIIO to read from and write to MaxCompute tables:

  1. Create a dataset and upload your configuration and code files to the data source. For more information, see Create and manage datasets.

  2. Create a DLC job. The key parameters are described below. For other parameters, see Create a training job.

    • Node Image: Under Alibaba Cloud Images, select an image for TensorFlow 1.12, TensorFlow 1.15, or TensorFlow 2.0.

    • Dataset Configuration: For Dataset, select the dataset you created in step 1 and set Mount Path to /mnt/data/.

    • Job Command: Enter python /mnt/data/xxx.py. Replace xxx.py with the name of the code file you uploaded in step 1.

  3. Click Confirm.

    After submitting the training job, you can view the results in the job logs. For more information, see View job logs.