This topic describes how to call TableRecordDataset to read data from MaxCompute tables by row and create data streams.

If you use TensorFlow 1.2 or later, the TensorFlow open source community recommends Dataset interfaces rather than threads or queues to create data streams. Dataset interfaces can be chained to build a data stream, which simplifies data input code.

Description

TableRecordDataset provided by PAI-TensorFlow is similar to RecordDataset provided by native TensorFlow. Both of them provide data sources for Dataset interfaces that are used for data transformation. The following code defines TableRecordDataset:
class TableRecordDataset(Dataset):
  def __init__(self,
               filenames,
               record_defaults,
               selected_cols=None,
               excluded_cols=None,
               slice_id=0,
               slice_count=1,
               num_threads=0,
               capacity=0):
Parameter Description
filenames The list of names of the tables that you want to read. Data in a table can be read repeatedly.
record_defaults The data types of the columns that you want to read, given as one default value per column. If a column is empty, its default value is used. If a data type is inconsistent with the data type of the column that you read and cannot be automatically converted, the system reports an exception. Valid values: FLOAT32, FLOAT64, INT32, INT64, BOOL, and STRING.
selected_cols The columns that you want to read. Separate multiple columns with commas (,). The value must be of the STRING type.
excluded_cols The columns that you want to exclude. Separate multiple columns with commas (,). The value must be of the STRING type. Do not use excluded_cols and selected_cols at the same time.
slice_id The ID of the slice to read. In distributed read mode, the table is divided into slice_count slices, and the system reads data from the slice specified by slice_id.
slice_count The number of slices in distributed read mode. The value is the number of workers.
num_threads The number of threads that the built-in reader of each table uses to prefetch data. These threads are independent of the computing threads. Valid values: 1 to 64. If num_threads is set to 0, the system automatically sets the number of prefetch threads to 1/4 of the number of computing threads.
Note I/O has different impacts on the overall computing performance of each model. As a result, the increase in the number of threads used to prefetch data does not necessarily improve the training speed of the overall model.
capacity The amount of data that is prefetched. Unit: rows. If the value of num_threads is greater than 1, each thread prefetches capacity/num_threads rows of data, rounded up. If capacity is set to 0, the built-in reader configures the total amount of data that the threads can prefetch based on the average size of the first N rows in the table. The default value of N is 256. As a result, each thread prefetches about 64 MB of data.
Note If you manually configure the size of the data that the threads can prefetch and a thread prefetches more than 1 GB of data, the system displays alert information for you to check the configuration, but does not stop the program.
Note If the data type of a column in a MaxCompute table is DOUBLE, TensorFlow maps it to np.float.
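
The arithmetic behind slice_id, slice_count, capacity, and num_threads can be sketched in plain Python. This is only an illustration: both helper names are hypothetical and are not part of PAI-TensorFlow, and the contiguous-range split is an assumption, because the exact scheme that the built-in reader uses to divide a table is not described here.

```python
import math

def rows_per_thread(capacity, num_threads):
    """Rows each prefetch thread holds: capacity / num_threads, rounded up."""
    return math.ceil(capacity / num_threads)

def slice_range(total_rows, slice_id, slice_count):
    """Hypothetical contiguous split of total_rows into slice_count slices.

    Returns the half-open row range [start, end) for slice_id. The real
    reader may divide the table differently; this only shows the idea.
    """
    if not 0 <= slice_id < slice_count:
        raise ValueError("slice_id must be in [0, slice_count)")
    base, extra = divmod(total_rows, slice_count)
    # The first `extra` slices each take one additional row.
    start = slice_id * base + min(slice_id, extra)
    end = start + base + (1 if slice_id < extra else 0)
    return start, end

print(rows_per_thread(1000, 3))
print([slice_range(10, i, 3) for i in range(3)])
```

In a distributed job, each worker would typically pass its own task index as slice_id and the total number of workers as slice_count, so that every row is read by exactly one worker.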

Return value

TableRecordDataset returns a new dataset object, which can be used as the input to create pipeline workflows.
### Other definition code is omitted here.

# Suppose an odps table named 'sample_table' exists in the
# 'test' project and contains 5 columns:
#   (itemid bigint, name string, price double,
#    virtual bool, tags string)

# The table name is normally passed in from the run command.
tables = ["odps://test/tables/sample_table"]

# First, define a TableRecordDataset that reads itemid and price.
dataset = tf.data.TableRecordDataset(tables,
                                     record_defaults = (0, 0.0),
                                     selected_cols = "itemid, price")
# Group the rows into batches of 128.
dataset = dataset.batch(128)
# Read the data for 10 epochs.
dataset = dataset.repeat(10)
# Finally, get a batch of ids and prices.
[ids, prices] = dataset.make_one_shot_iterator().get_next()

### Then build the rest of the graph and run it.
When a session runs the tensors returned by get_next(), the system reads 128 rows of data from the table and parses each column into a tensor of the type specified by record_defaults. The output_types of the returned dataset must match the types in record_defaults, and the number of elements in output_shapes must equal the number of elements in record_defaults.
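
The effect of batch(128) followed by repeat(10) in the example above can be imitated without TensorFlow. The sketch below is a plain-Python stand-in, not the PAI implementation: the batch and to_columns helpers are hypothetical, and the 300-row table is made-up data used only to show the shapes.

```python
def batch(rows, batch_size):
    """Yield lists of up to batch_size rows, similar to Dataset.batch()."""
    buf = []
    for row in rows:
        buf.append(row)
        if len(buf) == batch_size:
            yield buf
            buf = []
    if buf:  # the last batch of a pass may be smaller than batch_size
        yield buf

def to_columns(rows, record_defaults):
    """Turn a batch of rows into one list per column.

    Each value is cast to the Python type of the matching default, and an
    empty (None) value is replaced by that default.
    """
    columns = []
    for default, values in zip(record_defaults, zip(*rows)):
        cast = type(default)
        columns.append([default if v is None else cast(v) for v in values])
    return columns

# Made-up stand-in for the (itemid, price) columns of sample_table.
table = [(i, i * 0.5) for i in range(300)]
record_defaults = (0, 0.0)

batch_sizes = []
for epoch in range(10):            # repeat(10), applied after batch(128)
    for rows in batch(table, 128):
        ids, prices = to_columns(rows, record_defaults)
        batch_sizes.append(len(ids))

print(batch_sizes[:3])
```

Because batching is applied before repeating, every pass over the table ends with a partial batch when the row count is not a multiple of 128. Calling repeat() before batch(), as the training example later in this topic does, lets batches span epoch boundaries instead.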

Parameters in the console

  • If you use a table as the input, you must use -Dtables to configure the name of the table that you want to access when you submit the task.
    pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample;
  • If you want to read multiple tables, separate the table names with commas (,).
    pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample,odps://algo_platform_dev/tables/sample2;
  • If you access partitioned tables, you must add partitions after the table names.
    pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample/pt=1;

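Inside a script, the value passed with -Dtables arrives as a single string. The following sketch shows one way to split that string into individual table URLs and inspect their parts. The parse_tables helper is hypothetical and only illustrates the URL layout shown in the commands above; TableRecordDataset itself takes the odps:// strings directly.

```python
def parse_tables(tables_flag):
    """Split a -Dtables value into (project, table, partition) tuples."""
    prefix = "odps://"
    parsed = []
    for url in tables_flag.split(","):
        url = url.strip().rstrip(";")
        if not url.startswith(prefix):
            raise ValueError("expected an odps:// URL, got %r" % url)
        parts = url[len(prefix):].split("/")
        # Expected layout: <project>/tables/<table>[/<partition>]
        if len(parts) < 3 or parts[1] != "tables":
            raise ValueError("unexpected table URL layout: %r" % url)
        project, table = parts[0], parts[2]
        partition = "/".join(parts[3:]) or None
        parsed.append((project, table, partition))
    return parsed

print(parse_tables("odps://algo_platform_dev/tables/sample/pt=1"))
```
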
Examples

This example uses logistic regression to show how to use TableRecordDataset to read table data and train a model.
  1. Prepare data.

    With TableRecordReader, each row of data is imported into a MaxCompute table as a single string, which is then read and parsed. With TableRecordDataset, we recommend that you store data in the MaxCompute table by column, so that a Dataset interface can return the table data as tensors of the specified types.

    1. Create a table.
      Create a table that contains four columns in MaxCompute.
      odps@ algo_platform_dev>create table sample (col1 double, col2 double, col3 double, col4 double);
      Data Health Manager:Your health synthesize score is 5, so, your job priority is 7
      
      ID = 201803050245351****6Vgsxo2
      OK
      odps@ algo_platform_dev>read sample;
      +------------+------------+------------+------------+
      | col1       | col2       | col3       | col4       |
      +------------+------------+------------+------------+
      +------------+------------+------------+------------+
    2. Import data.
      Download test data and run the required MaxCompute Tunnel command to import the data into the MaxCompute table.
      # View the downloaded test data.
      $head -n 3 sample.csv
      0,0,0.017179100152531324,1
      0,1,0.823381420409002,1
      0,2,1.6488850495540865,1
      # Import the data to the MaxCompute table.
      odps@ algo_platform_dev>tunnel upload /tmp/data/sample.csv sample -fd=,;
      Upload session: 20180305135640c8cc650a0000****
      Start upload:sample.csv
      Using \n to split records
      Upload in strict schema mode: true
      Total bytes:260093   Split input to 1 blocks
      2018-03-05 13:56:40 scan block: '1'
      2018-03-05 13:56:40 scan block complete, blockid=1
      2018-03-05 13:56:40 upload block: '1'
      2018-03-05 13:56:41 upload block complete, blockid=1
      upload complete, average speed is 254 KB/s
      OK
      odps@ algo_platform_dev>read sample 3;
      +------------+------------+------------+------------+
      | col1       | col2       | col3       | col4       |
      +------------+------------+------------+------------+
      | 0.0        | 0.0        | 0.017179100152531324 | 1.0        |
      | 0.0        | 1.0        | 0.823381420409002 | 1.0        |
      | 0.0        | 2.0        | 1.6488850495540865 | 1.0        |
      +------------+------------+------------+------------+
      Note The fields in each row of the test data are separated by commas (,). You must use -fd=, to configure the comma (,) as the field delimiter. This way, the test data is imported into the four columns of the MaxCompute table.
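
To see how the comma delimiter maps each line of sample.csv onto four DOUBLE columns, the three rows shown above can be parsed locally with the Python standard library. This is a local sketch only; it does not call MaxCompute Tunnel, and the variable names are illustrative.

```python
import csv
import io

# The first three lines of sample.csv, as shown by `head -n 3`.
sample_csv = """0,0,0.017179100152531324,1
0,1,0.823381420409002,1
0,2,1.6488850495540865,1
"""

# Split each line on commas and convert every field to a float,
# mirroring the four double columns (col1..col4) of the table.
rows = [[float(field) for field in record]
        for record in csv.reader(io.StringIO(sample_csv), delimiter=",")]

for row in rows:
    print(row)
```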
  2. Define input data and models.
    You can use the following sample code to define the input data. Compared with the code for TableRecordReader, the only difference is that the following code does not need tf.train.Coordinator or start_queue_runners.
    #define the input
    def input_fn():
        dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128)
        v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next()
        labels = tf.reshape(tf.cast(v4, tf.int32), [128])
        features = tf.stack([v1, v2, v3], axis=1)
        return features, labels
    Sample code for lr_dataset.py:
    import tensorflow as tf
    
    tf.app.flags.DEFINE_string("tables", "", "tables info")
    FLAGS = tf.app.flags.FLAGS
    
    #define the input
    def input_fn():
        dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128)
        v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next()
        labels = tf.reshape(tf.cast(v4, tf.int32), [128])
        features = tf.stack([v1, v2, v3], axis=1)
        return features, labels
    
    #construct the model
    def model_fn(features, labels):
        W = tf.Variable(tf.zeros([3, 2]))
        b = tf.Variable(tf.zeros([2]))
        pred = tf.matmul(features, W) + b
    
        loss =  tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=pred,labels=labels))
    
        # Gradient Descent
        optimizer = tf.train.GradientDescentOptimizer(0.05).minimize(loss)
        return loss, optimizer
    
    features, labels = input_fn()
    loss, optimizer = model_fn(features, labels)
    
    init = tf.global_variables_initializer()
    local_init = tf.local_variables_initializer()
    
    sess = tf.Session()
    sess.run(init)
    sess.run(local_init)
    
    for step in range(10000):
        _, c = sess.run([optimizer, loss])
        if step % 2000 == 0:
            print("loss," , c)
  3. Submit the task.
    odps@ algo_platform_dev>pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample -Dscript=file:///tmp/lr_dataset.py;
  4. View the results.
    Click the Logview link returned after you submit the task to view the results.
    start launching tensorflow job
    ('loss,', 0.6931472)
    ('loss,', 0.007929571)
    ('loss,', 0.0016527221)
    ('loss,', 0.0023481336)
    ('loss,', 0.0011788738)
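
The first logged loss can be checked by hand. In lr_dataset.py, W and b start at zero, so both logits are 0 for every row, the softmax assigns probability 1/2 to each of the two classes, and the cross-entropy is ln 2. The plain-Python sketch below reproduces that calculation for the three sample rows; the function name is illustrative and is not the TensorFlow implementation.

```python
import math

def sparse_softmax_cross_entropy(logits, label):
    """Cross-entropy for one example: -log(softmax(logits)[label])."""
    m = max(logits)  # subtract the max for numerical stability
    log_sum_exp = m + math.log(sum(math.exp(x - m) for x in logits))
    return log_sum_exp - logits[label]

# Features (col1..col3) and labels (col4) of the three sample rows.
features = [[0.0, 0.0, 0.017179100152531324],
            [0.0, 1.0, 0.823381420409002],
            [0.0, 2.0, 1.6488850495540865]]
labels = [1, 1, 1]

# Zero-initialized parameters, as in model_fn.
W = [[0.0, 0.0] for _ in range(3)]
b = [0.0, 0.0]

logits = [[sum(x[i] * W[i][j] for i in range(3)) + b[j] for j in range(2)]
          for x in features]
initial_loss = sum(sparse_softmax_cross_entropy(l, y)
                   for l, y in zip(logits, labels)) / len(labels)
print(initial_loss)
```

The result equals ln 2, which rounds to the 0.6931472 reported at step 0 in the log above.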