This topic describes how to call TableRecordDataset to read data from MaxCompute tables by row and create data streams.
If you use TensorFlow 1.2 or later, the TensorFlow open source community recommends Dataset interfaces rather than threads or queues to create data streams. Multiple Dataset interfaces are combined to generate data, which simplifies data input code.
GPU-accelerated servers will be phased out. You can submit TensorFlow tasks that run on CPU servers. If you want to use GPU-accelerated instances for model training, go to Deep Learning Containers (DLC) to submit jobs. For more information, see Submit training jobs.
Description
TableRecordDataset provided by PAI-TensorFlow is similar to RecordDataset provided by native TensorFlow. Both of them provide data sources for Dataset interfaces that are used for data transformation. The following code defines TableRecordDataset.
class TableRecordDataset(Dataset):
    def __init__(self,
                 filenames,
                 record_defaults,
                 selected_cols=None,
                 excluded_cols=None,
                 slice_id=0,
                 slice_count=1,
                 num_threads=0,
                 capacity=0):
| Parameter | Description |
| --- | --- |
| filenames | The names of the tables to read. The same table can be read multiple times. |
| record_defaults | The data type of each column to read, and the default value to use if a cell is empty. If a specified type is inconsistent with the actual column type and cannot be automatically converted, the system throws an exception. Valid types: FLOAT32, FLOAT64, INT32, INT64, BOOL, and STRING. |
| selected_cols | The columns to select, as a STRING. Separate multiple columns with commas (,). |
| excluded_cols | The columns to exclude, as a STRING. Separate multiple columns with commas (,). Do not use excluded_cols and selected_cols at the same time. |
| slice_id | The ID of the slice in distributed read mode. The table is divided into slice_count slices, and the system reads data from the slice specified by slice_id. |
| slice_count | The number of slices in distributed read mode, which is usually the number of workers. |
| num_threads | The number of threads that the built-in reader of each table uses to prefetch data. These threads are independent of the compute threads. Valid values: 1 to 64. If num_threads is set to 0, the system automatically sets the number of prefetch threads to 1/4 of the number of compute threads. Note: I/O affects the overall computing performance of each model differently, so increasing the number of prefetch threads does not necessarily improve the overall training speed. |
| capacity | The number of records to prefetch. If num_threads is greater than 1, each thread prefetches capacity/num_threads records, rounded up. If capacity is set to 0, the built-in reader estimates the total prefetch size based on the average size of the first N records of the table (N is 256 by default), so that each thread prefetches about 64 MB of data. Note: If you configure the prefetch size manually and a single thread prefetches more than 1 GB of data, the system displays a warning for you to check the configuration, but does not stop the program. |
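The split of capacity across prefetch threads described above can be sketched in plain Python. This is only an illustration of the documented rounding-up behavior, not PAI's internal code:

```python
def per_thread_capacity(capacity, num_threads):
    """Records each prefetch thread handles: capacity/num_threads, rounded up.

    Mirrors the behavior described in the table above; illustration only.
    """
    if capacity <= 0 or num_threads <= 0:
        raise ValueError("both values must be positive for this sketch")
    # Ceiling division without importing math.
    return -(-capacity // num_threads)

print(per_thread_capacity(1000, 8))  # 125
print(per_thread_capacity(1000, 3))  # 334 (1000/3 rounded up)
```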
Note: If the data type of a MaxCompute table column is DOUBLE, TensorFlow maps it to np.float64.
Return value
TableRecordDataset returns a new dataset object, which can be used as the input to create pipeline workflows.
### Other definition code is omitted here.

# Suppose an ODPS table named 'sample_table' exists in the
# 'test' project and includes 5 columns:
# (itemid bigint, name string, price double,
#  virtual bool, tags string)
# The table name is passed in from the run command.
tables = ["odps://test/tables/sample_table"]

# First, define a TableRecordDataset that reads itemid and price.
dataset = tf.data.TableRecordDataset(tables,
                                     record_defaults=(0, 0.0),
                                     selected_cols="itemid,price")
# Batch the data into batches of 128 records.
dataset = dataset.batch(128)
# Repeat the data for 10 epochs.
dataset = dataset.repeat(10)
# Finally, get a batch of ids and prices.
[ids, prices] = dataset.make_one_shot_iterator().get_next()

### Then construct the rest of the graph and run it.
When get_next() is run in a session, it reads 128 rows of data from the table and parses each column into a tensor of the type specified by record_defaults. In the return value, output_types must match the types in record_defaults, and the number of output tensors in output_shapes must match the number of elements in record_defaults.
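The one-to-one relationship between record_defaults and the output tensors can be sketched in plain Python. The dtype mapping below is an assumption for illustration (PAI's actual type inference may differ); the point is that each element of record_defaults fixes both the default value and the type of exactly one output tensor:

```python
def inferred_output_types(record_defaults):
    """Return one assumed dtype name per element of record_defaults.

    Illustration only: the INT64/FLOAT64 mapping is an assumption,
    not PAI's documented inference rule.
    """
    mapping = {bool: "BOOL", int: "INT64", float: "FLOAT64", str: "STRING"}
    return [mapping[type(v)] for v in record_defaults]

# (itemid, price) defaults from the example above: two output tensors.
print(inferred_output_types((0, 0.0)))  # ['INT64', 'FLOAT64']
```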
Parameters in the console
If you use a table as the input, you must use -Dtables to configure the name of the table that you want to access when you submit the task.
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample;
If you want to read multiple tables, separate the table names with commas (,).
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample,odps://algo_platform_dev/tables/sample2
If you access partitioned tables, you must add partitions after the table names.
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample/pt=1;
Examples
In this example, logistic regression is used to describe how to use TableRecordDataset to read table data and perform model training.
Prepare data.
With TableRecordReader, each entire row is written to the MaxCompute table as a single string, which is then read back and parsed. If you use TableRecordDataset, we recommend that you store data in the MaxCompute table by column, because the Dataset interface can return the table data directly as tensors of the specified types.
Create a table.
Create a table that contains four columns in MaxCompute.
odps@ algo_platform_dev>create table sample (col1 double, col2 double, col3 double, col4 double);
Data Health Manager:Your health synthesize score is 5, so, your job priority is 7
ID = 201803050245351****6Vgsxo2
OK
odps@ algo_platform_dev>read sample;
+------------+------------+------------+------------+
| col1       | col2       | col3       | col4       |
+------------+------------+------------+------------+
+------------+------------+------------+------------+
Import data.
Download test data and run the required MaxCompute Tunnel command to import the data into the MaxCompute table.
# View the downloaded test data.
$head -n 3 sample.csv
0,0,0.017179100152531324,1
0,1,0.823381420409002,1
0,2,1.6488850495540865,1
# Import the data to the MaxCompute table.
odps@ algo_platform_dev>tunnel upload /tmp/data/sample.csv sample -fd=,;
Upload session: 20180305135640c8cc650a0000****
Start upload:sample.csv
Using \n to split records
Upload in strict schema mode: true
Total bytes:260093
Split input to 1 blocks
2018-03-05 13:56:40 scan block: '1'
2018-03-05 13:56:40 scan block complete, blockid=1
2018-03-05 13:56:40 upload block: '1'
2018-03-05 13:56:41 upload block complete, blockid=1
upload complete, average speed is 254 KB/s
OK
odps@ algo_platform_dev>read sample 3;
+------------+------------+------------+------------+
| col1       | col2       | col3       | col4       |
+------------+------------+------------+------------+
| 0.0        | 0.0        | 0.017179100152531324 | 1.0 |
| 0.0        | 1.0        | 0.823381420409002 | 1.0 |
| 0.0        | 2.0        | 1.6488850495540865 | 1.0 |
+------------+------------+------------+------------+
Note: Each row of the test data uses commas (,) as separators. You must use -fd=, to configure the comma as the delimiter so that the test data is imported into the four columns of the MaxCompute table.
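Before running tunnel upload, you can sanity-check locally that each line of the CSV splits into exactly four comma-separated fields. This is a quick optional check, using the three sample lines shown above rather than reading the real file:

```python
# Verify the delimiter produces 4 fields per row, matching the 4 columns
# of the sample table. Lines copied from the head of sample.csv above.
lines = [
    "0,0,0.017179100152531324,1",
    "0,1,0.823381420409002,1",
    "0,2,1.6488850495540865,1",
]
for line in lines:
    fields = line.split(",")
    assert len(fields) == 4, "row does not match the 4-column table schema"
    print([float(f) for f in fields])  # all 4 columns are DOUBLE
```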
Define input data and models.
You can use the following sample code to define the input data. Compared with the code for TableRecordReader, the only difference is that the following code does not need to define tf.train.Coordinator or call start_queue_runners.
# Define the input.
def input_fn():
    dataset = tf.data.TableRecordDataset([FLAGS.tables],
                                         record_defaults=[1.0] * 4).repeat().batch(128)
    v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next()
    labels = tf.reshape(tf.cast(v4, tf.int32), [128])
    features = tf.stack([v1, v2, v3], axis=1)
    return features, labels
Sample code for lr_dataset.py:
import tensorflow as tf

tf.app.flags.DEFINE_string("tables", "", "tables info")
FLAGS = tf.app.flags.FLAGS

# Define the input.
def input_fn():
    dataset = tf.data.TableRecordDataset([FLAGS.tables],
                                         record_defaults=[1.0] * 4).repeat().batch(128)
    v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next()
    labels = tf.reshape(tf.cast(v4, tf.int32), [128])
    features = tf.stack([v1, v2, v3], axis=1)
    return features, labels

# Construct the model.
def model_fn(features, labels):
    W = tf.Variable(tf.zeros([3, 2]))
    b = tf.Variable(tf.zeros([2]))
    pred = tf.matmul(features, W) + b
    loss = tf.reduce_mean(
        tf.nn.sparse_softmax_cross_entropy_with_logits(logits=pred, labels=labels))
    # Gradient descent.
    optimizer = tf.train.GradientDescentOptimizer(0.05).minimize(loss)
    return loss, optimizer

features, labels = input_fn()
loss, optimizer = model_fn(features, labels)
init = tf.global_variables_initializer()
local_init = tf.local_variables_initializer()
sess = tf.Session()
sess.run(init)
sess.run(local_init)
for step in range(10000):
    _, c = sess.run([optimizer, loss])
    if step % 2000 == 0:
        print("loss,", c)
Submit the task.
odps@ algo_platform_dev>pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample -Dscript=file:///tmp/lr_dataset.py;
View the results.
Click the LogView link returned after you submit the task to view the results.
start launching tensorflow job
('loss,', 0.6931472)
('loss,', 0.007929571)
('loss,', 0.0016527221)
('loss,', 0.0023481336)
('loss,', 0.0011788738)