
Platform for AI: Use PAIIO to read data from and write data to MaxCompute tables

Last Updated: Apr 12, 2024

To facilitate data reads and writes between Deep Learning Containers (DLC) jobs and MaxCompute tables, the Platform for AI (PAI) team provides the PAIIO module. PAIIO supports the TableRecordDataset, TableReader, and TableWriter interfaces. This topic describes how to use these interfaces to read data from and write data to MaxCompute tables and provides examples.

Limits

  • PAIIO is available only for DLC jobs that run TensorFlow 1.12, TensorFlow 1.15, or TensorFlow 2.0.

  • PAIIO is unavailable for jobs that are created based on a custom image.

Configure account information

Before you use PAIIO to read data from or write data to MaxCompute tables, you must configure the AccessKey information that is used to access MaxCompute resources. PAI can obtain the AccessKey information from a configuration file. To use this method, store the configuration file in a file system and reference its path in your code by using an environment variable.

  1. Create a configuration file that contains the following content:

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    • access_id: The AccessKey ID of your Alibaba Cloud account.

    • access_key: The AccessKey secret of your Alibaba Cloud account.

    • end_point: The endpoint of MaxCompute. For example, the endpoint for the China (Shanghai) region is http://service.cn-shanghai.maxcompute.aliyun.com/api. For more information, see Endpoints.

  2. Use the following syntax to specify the path of the configuration file in the code:

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    Replace <your MaxCompute config file path> with the file path.
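
    A minimal sketch of this setup, assuming the configuration file is mounted at /mnt/data/odps_config.ini (replace the path with your own), sets the environment variable and fails early if the file is missing:

        import os

        # Path where the configuration file is mounted. Replace it with your own path.
        config_path = "/mnt/data/odps_config.ini"
        os.environ['ODPS_CONFIG_FILE_PATH'] = config_path

        # Fail early with a clear message if the configuration file cannot be found.
        if not os.path.exists(config_path):
            raise IOError("MaxCompute config file not found: %s" % config_path)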

TableRecordDataset

Overview

Open source TensorFlow recommends that you use the Dataset API in TensorFlow 1.2 or later to replace the original threading and queuing interfaces for building input data streams. You can chain multiple Dataset interfaces to generate the data that is fed to computation, which simplifies the data input code.

  • Syntax (Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • Parameters

    • filenames (required; type: STRING; default: None)
      The names of the tables that you want to read. The tables must use the same schema. Table name format: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....

    • record_defaults (required; type: LIST or TUPLE; default: None)
      The data types of the columns that you want to read and the default values that are used when a cell is empty. If a data type is different from the data type of the corresponding column and cannot be automatically converted, the system throws an exception. Valid values: FLOAT32, FLOAT64, INT32, INT64, BOOL, and STRING. For the INT64 data type, specify the default value by using the np.array(0, np.int64) syntax.

    • selected_cols (optional; type: STRING; default: None)
      The columns that you want to select. Separate multiple columns with commas (,). If you use the default value None, all columns are read. You can specify only one of the selected_cols and excluded_cols parameters.

    • excluded_cols (optional; type: STRING; default: None)
      The columns that you want to exclude. Separate multiple columns with commas (,). If you use the default value None, all columns are read. You can specify only one of the selected_cols and excluded_cols parameters.

    • slice_id (optional; type: INT; default: 0)
      The ID of the shard in distributed read mode. Shard IDs start from 0. In distributed read mode, the table is divided into multiple shards based on the value of the slice_count parameter, and the system reads data from the shard specified by slice_id. If slice_id is set to the default value 0 and slice_count is set to 1, the entire table is read. If slice_id is set to the default value 0 and slice_count is set to a value greater than 1, the 0th shard is read.

    • slice_count (optional; type: INT; default: 1)
      The number of shards in distributed read mode. In most cases, this is the number of workers. If you use the default value 1, the entire table is read without sharding.

    • num_threads (optional; type: INT; default: 0)
      The number of threads that the built-in reader of each table uses to prefetch data. These threads are independent of the computing threads. Valid values: 1 to 64. If num_threads is set to 0, the system automatically assigns 25% of the computing threads to prefetch data.
      Note: I/O affects the overall computing performance of each model differently. Therefore, increasing the number of prefetch threads does not necessarily improve the overall training speed of the model.

    • capacity (optional; type: INT; default: 0)
      The number of records to prefetch. If num_threads is greater than 1, each thread prefetches capacity/num_threads records, and the value is rounded up. If capacity is set to 0, the built-in reader configures the total amount of data that the threads prefetch based on the average size of the first N records in the table (N is 256 by default), so that each thread prefetches approximately 64 MB of data.

    Note: If the data type of a field in a MaxCompute table is DOUBLE, TensorFlow maps the data type to np.float64.

  • Responses

    A Dataset object is returned, which can be used as the input to create pipelines.
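
For a column that maps to INT64 (for example, a BIGINT column), the record_defaults entry can be built with NumPy, as noted in the parameter description above. The following is a minimal sketch; the project path and column names are placeholders:

    import numpy as np
    import paiio

    # itemid maps to INT64, so its default value is an np.int64 scalar;
    # price maps to FLOAT64 and can use a plain Python float.
    dataset = paiio.data.TableRecordDataset(
        ["odps://${your_projectname}/tables/${table_name}"],
        record_defaults=[np.array(0, np.int64), 0.0],
        selected_cols="itemid,price")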

Examples

Assume that a table named test is stored in your MaxCompute project named myproject. The following table shows part of the table content.

itemid (BIGINT)    name (STRING)      price (DOUBLE)    virtual (BOOL)
25                 "Apple"            5.0               False
38                 "Pear"             4.5               False
17                 "Watermelon"       2.2               False

The following sample code provides an example of how to use the TableRecordDataset interface to read the itemid and price columns from the test table:

import os
import tensorflow as tf
import paiio

# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Specify the tables that you want to read. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access. 
table = ["odps://${your_projectname}/tables/${table_name}"]
# Use the TableRecordDataset interface to read the itemid and price columns of the table. 
dataset = paiio.data.TableRecordDataset(table,
                                       record_defaults=[0, 0.0],
                                       selected_cols="itemid,price",
                                       num_threads=1,
                                       capacity=10)
# Repeat the dataset for 2 epochs, use a batch size of 3, and prefetch 100 batches. 
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")

TableReader

Overview

You can use the TableReader interface with the MaxCompute SDK without the need to rely on TensorFlow. This allows you to access MaxCompute tables and obtain real-time I/O results.

  • Create a Reader object and open a table

    • Syntax

    • reader = paiio.python_io.TableReader(table,
                                           selected_cols="",
                                           excluded_cols="",
                                           slice_id=0,
                                           slice_count=1)
    • Parameters

      • table (required; type: STRING; default: None)
        The name of the MaxCompute table that you want to open. Table name format: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....

      • selected_cols (optional; type: STRING; default: empty string "")
        The columns that you want to select. Separate multiple columns with commas (,). The value must be of the STRING type. If you use the default value, all columns are read. You can specify only one of the selected_cols and excluded_cols parameters.

      • excluded_cols (optional; type: STRING; default: empty string "")
        The columns that you want to exclude. Separate multiple columns with commas (,). The value must be of the STRING type. If you use the default value, all columns are read. You can specify only one of the selected_cols and excluded_cols parameters.

      • slice_id (optional; type: INT; default: 0)
        The ID of the shard in distributed read mode. Valid values: [0, slice_count-1]. In distributed read mode, the table is divided into multiple shards based on the value of slice_count, and the system reads data from the shard specified by slice_id. If you set this parameter to the default value 0, all table records are read.

      • slice_count (optional; type: INT; default: 1)
        The number of shards in distributed read mode. In most cases, this is the number of workers.

    • Responses

      A Reader object is returned.

  • Read data records

    • Syntax

    • reader.read(num_records=1)
    • Parameters

      num_records specifies the number of data records that are sequentially read. The default value is 1, which specifies that a single record is read. If you set num_records to a value that is greater than the number of unread records, all remaining records are returned. If no records can be returned, paiio.python_io.OutOfRangeException is thrown.

    • Responses

      A numpy n-dimensional array (or record array) is returned. Each element in the array is a tuple that corresponds to a table record.

  • Obtain data starting from a specific data record

    • Syntax

    • reader.seek(offset=0)
    • Parameters

    • offset specifies the ID of the data record from which you want to start reading. Record IDs start from 0. If you specify slice_id and slice_count, the offset is relative to the corresponding shard. If offset is set to a value that is greater than the total number of data records in the table, an out-of-range exception is thrown. If a previous read operation has already reached the end of the table and you continue to read, paiio.python_io.OutOfRangeException is thrown.

      Important

      If the number of unread data records in the table is less than the batch size that you specify for a read operation, the remaining records are returned and no exception is thrown. If you continue to read, an exception is thrown.

    • Responses

      No value is returned. If an error occurs in an operation, the system throws an exception.

  • Obtain the total number of data records in the table

    • Syntax

    • reader.get_row_count()
    • Parameters

      None

    • Responses

      The number of data records in the table is returned. If you specify slice_id and slice_count, the number of data records in the shard is returned.

  • Obtain the schema of the table

    • Syntax

    • reader.get_schema()
    • Parameters

      None

    • Responses

    • A one-dimensional array is returned. Each element in the array corresponds to the schema of a column in the table. The following table describes the parameters contained in a schema.

      • colname: The name of the column.

      • typestr: The name of the MaxCompute data type.

      • pytype: The Python data type that corresponds to the value specified by typestr.

      The following table describes the mappings between the values of typestr and pytype.

      typestr     pytype
      BIGINT      INT
      DOUBLE      FLOAT
      BOOLEAN     BOOL
      STRING      OBJECT
      DATETIME    INT
      MAP         OBJECT

      Note: The MAP data type is unavailable for TensorFlow that is built into PAI.

  • Close the table

    • Syntax

    • reader.close()
    • Parameters

      None

    • Responses

      No value is returned. If an error occurs in an operation, the system throws an exception.

Examples

Assume that a table named test is stored in your MaxCompute project named myproject. The following table shows part of the table content.

uid (BIGINT)    name (STRING)      price (DOUBLE)    virtual (BOOL)
25              "Apple"            5.0               False
38              "Pear"             4.5               False
17              "Watermelon"       2.2               False

The following code provides an example of how to use the TableReader interface to read the data contained in the uid, name, and price columns.

    import os
    import paiio
    
    # Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # Open a table and return a Reader object. Replace myproject with the name of your MaxCompute project and test with the name of the table that you want to access. 
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # Obtain the total number of data records in the table. 
    total_records_num = reader.get_row_count() # Returns 3.
    
    batch_size = 2
    # Read the table and return a record array in the [(uid, name, price)*2] form. 
    records = reader.read(batch_size) # Returns [(25, "Apple", 5.0), (38, "Pear", 4.5)].
    records = reader.read(batch_size) # Returns [(17, "Watermelon", 2.2)].
    # If you continue to read, an out-of-range exception is thrown. 
    
    # Close the reader.
    reader.close()
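
The example above covers basic batched reads. The following sketch, assuming the same myproject.test table, also inspects the table schema, seeks to a specific record, and reads until the end of the table while handling the out-of-range exception:

    import os
    import paiio

    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")

    # Inspect the schema: each element describes one column (colname, typestr, pytype).
    for col in reader.get_schema():
        print(col)

    # Start reading from the second record (record IDs start from 0).
    reader.seek(offset=1)

    # Read until the end of the table and handle the out-of-range exception.
    batch_size = 2
    while True:
        try:
            records = reader.read(batch_size)
        except paiio.python_io.OutOfRangeException:
            break
        for record in records:
            print(record)

    reader.close()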

TableWriter

You can use the TableWriter interface with the MaxCompute SDK without the need to rely on TensorFlow. This allows you to access MaxCompute tables and obtain real-time I/O results.

Overview

  • Create a Writer object and open a table

    • Syntax

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      Note
      • This interface writes data to a table without clearing existing data.

      • Newly written data can be read only after the table is closed.

    • Parameters

      • table (required; type: STRING; default: None)
        The name of the MaxCompute table that you want to open. Table name format: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/....

      • slice_id (optional; type: INT; default: 0)
        The ID of the shard. In distributed mode, data is written to different shards to prevent write conflicts. In standalone mode, use the default value 0. In distributed mode, the write operation fails if multiple workers, including parameter server (PS) nodes, write data to the same shard specified by slice_id.

    • Responses

      A Writer object is returned.

  • Write data records

    • Syntax

      writer.write(values, indices)
    • Parameters

      • values (required; default: None)
        The data records that you want to write. You can write one or more records.
        • To write a single record, set values to a tuple, list, or one-dimensional array that consists of scalars. If values is a list or one-dimensional array, all columns of the record must be of the same data type.
        • To write one or more records, set values to a list or one-dimensional array in which each element corresponds to a record, and each record is a tuple, list, or one-dimensional array.

      • indices (required; type: INT; default: None)
        The columns of the data records that you want to write. The value can be a tuple, list, or one-dimensional array that consists of integer indexes. Each number in indices corresponds to a column of the record. For example, the number i corresponds to column i of the table. Column numbers start from 0.

    • Responses

      No value is returned. If an error occurs during the write operation, the system throws an exception and exits the current process.

  • Close the table

    • Syntax

      writer.close()
      Note

      If you use TableWriter in a with statement, you do not need to explicitly call the close() method to close the table.

    • Parameters

      None

    • Responses

      No value is returned. If an error occurs in an operation, the system throws an exception.

    • Examples

      Use TableWriter in the WITH statement:

      with paiio.python_io.TableWriter(table) as writer:
          # Prepare the values that you want to write.
          writer.write(values, indices)
          # The table is closed automatically when the with block exits.

Examples

import paiio
import os

# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Prepare data. 
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# Open a table and return a Writer object. Replace project with the name of your MaxCompute project and test with the name of the table that you want to access. 
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write data to columns 0 to 3 of the table. 
writer.write(values, indices=[0, 1, 2, 3])

# Use the Writer object to close the table. 
writer.close()
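
In a distributed job, each worker can write to its own shard to avoid write conflicts, as described for the slice_id parameter. The following is a minimal sketch; the WORKER_INDEX environment variable is a hypothetical placeholder for however your job exposes the index of the current worker:

    import os
    import paiio

    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"

    # Hypothetical: the index of the current worker, provided by the job environment.
    worker_index = int(os.environ.get("WORKER_INDEX", "0"))

    values = [(25, "Apple", 5.0, False),
              (38, "Pear", 4.5, False)]

    # Each worker writes to the shard that matches its own index.
    with paiio.python_io.TableWriter("odps://project/tables/test", slice_id=worker_index) as writer:
        writer.write(values, indices=[0, 1, 2, 3])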

What to do next

After you prepare the configuration file and code, perform the following operations to use PAIIO to read data from and write data to MaxCompute tables:

  1. Create a dataset and upload the configuration and code files that you prepared to the data source. For more information about how to create a dataset, see Create and manage datasets.

  2. Create a DLC job. The following section describes the key parameters. For more information about other parameters, see Submit training jobs.

    • Node Image: Click Alibaba Cloud Image and select a TensorFlow 1.12, TensorFlow 1.15, or TensorFlow 2.0 image.

    • Datasets: Select the dataset that you created in Step 1 and set Mount Path to /mnt/data/.

    • Job Command: Set the command to python /mnt/data/xxx.py. Replace xxx.py with the name of the code file that you uploaded in Step 1.

  3. Click OK.

    After you submit a training job, you can view the running result in the job logs. For more information, see the "View the job logs" section in the View training jobs topic.