表格存储提供了BatchWriteRow、BatchGetRow、GetRange等多行操作的接口。

前提条件

  • 已初始化Client,详情请参见初始化
  • 已创建数据表并写入数据。

批量写(BatchWriteRow)

批量写接口用于在一次请求中进行批量的写入操作,也支持一次对多个数据表进行写入。BatchWriteRow操作由多个PutRow、UpdateRow、DeleteRow子操作组成,构造子操作的过程与使用PutRow接口、UpdateRow接口和DeleteRow接口时相同,也支持使用条件更新。

BatchWriteRow的各个子操作独立执行,表格存储会分别返回各个子操作的执行结果。

由于批量写入可能存在部分行失败的情况,失败行的Index及错误信息在返回的BatchWriteRowResponse中,但并不抛出异常。因此调用BatchWriteRow接口时,需要检查返回值,判断每行的状态是否成功;如果不检查返回值,则可能会忽略掉部分操作的失败。

当服务端检查到某些操作出现参数错误时,BatchWriteRow接口可能会抛出参数错误的异常,此时该请求中所有的操作都未执行。

接口

"""
说明:批量修改多行数据。
request = MiltiTableInBatchWriteRowItem()
request.add(TableInBatchWriteRowItem(table0, row_items))
request.add(TableInBatchWriteRowItem(table1, row_items))
response = client.batch_write_row(request)
``response``为返回的结果,类型为tablestore.metadata.BatchWriteRowResponse。
"""
def batch_write_row(self, request):                   

参数

详细参数说明请参见单行数据操作

示例

批量写数据。

put_row_items = []
# 增加PutRow的行。
for i in range(0, 10):
    primary_key = [('gid',i), ('uid',i+1)]
    attribute_columns = [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]
    row = Row(primary_key, attribute_columns)
    condition = Condition(RowExistenceExpectation.IGNORE)
    item = PutRowItem(row, condition)
    put_row_items.append(item)

# 增加UpdateRow的行。
for i in range(10, 20):
    primary_key = [('gid',i), ('uid',i+1)]
    attribute_columns = {'put': [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]}
    row = Row(primary_key, attribute_columns)
    condition = Condition(RowExistenceExpectation.IGNORE, SingleColumnCondition("age", i, ComparatorType.EQUAL))
    item = UpdateRowItem(row, condition)
    put_row_items.append(item)

# 增加DeleteRow的行。
delete_row_items = []
for i in range(10, 20):
    primary_key = [('gid',i), ('uid',i+1)]
    row = Row(primary_key)
    condition = Condition(RowExistenceExpectation.IGNORE)
    item = DeleteRowItem(row, condition)
    delete_row_items.append(item)

# 构造批量写请求。
request = BatchWriteRowRequest()
request.add(TableInBatchWriteRowItem(table_name, put_row_items))
request.add(TableInBatchWriteRowItem('notExistTable', delete_row_items))

# 调用batch_write_row方法执行批量写,如果请求参数等错误会抛异常;如果部分行失败,则不会抛异常,但是内部的Item会失败。
try:
    result = client.batch_write_row(request)
    print ('Result status: %s'%(result.is_all_succeed()))

    # 检查Put行的结果。
    print ('check first table\'s put results:')
    succ, fail = result.get_put()
    for item in succ:
        print ('Put succeed, consume %s write cu.' % item.consumed.write)
    for item in fail:
        print ('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))

    # 检查Update行的结果。
    print ('check first table\'s update results:')
    succ, fail = result.get_update()
    for item in succ:
        print ('Update succeed, consume %s write cu.' % item.consumed.write)
    for item in fail:
        print ('Update failed, error code: %s, error message: %s' % (item.error_code, item.error_message))

    # 检查Delete行的结果。
    print ('check second table\'s delete results:')
    succ, fail = result.get_delete()
    for item in succ:
        print ('Delete succeed, consume %s write cu.' % item.consumed.write)
    for item in fail:
        print ('Delete failed, error code: %s, error message: %s' % (item.error_code, item.error_message)) 
# 客户端异常,一般为参数错误或者网络异常。
except OTSClientError as e:
    print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
# 服务端异常,一般为参数错误或者流控错误。
except OTSServiceError as e:
    print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())                    

详细代码请参见BatchWriteRow@GitHub

批量读(BatchGetRow)

批量读接口用于一次请求读取多行数据,也支持一次对多个数据表进行读取。BatchGetRow由多个GetRow子操作组成。构造子操作的过程与使用GetRow接口时相同,也支持使用过滤器。

批量读取的所有行采用相同的参数条件,例如ColumnsToGet=[colA],则要读取的所有行都只读取colA列。

BatchGetRow的各个子操作独立执行,表格存储会分别返回各个子操作的执行结果。

由于批量读取可能存在部分行失败的情况,失败行的错误信息在返回的BatchGetRowResponse中,但并不抛出异常。因此调用BatchGetRow接口时,需要检查返回值,判断每行的状态是否成功。

接口

"""
说明:批量获取多行数据。
request = BatchGetRowRequest()
request.add(TableInBatchGetRowItem(myTable0, primary_keys, column_to_get=None, column_filter=None))
request.add(TableInBatchGetRowItem(myTable1, primary_keys, column_to_get=None, column_filter=None))
request.add(TableInBatchGetRowItem(myTable2, primary_keys, column_to_get=None, column_filter=None))
request.add(TableInBatchGetRowItem(myTable3, primary_keys, column_to_get=None, column_filter=None))
response = client.batch_get_row(request)
``response``为返回的结果,类型为tablestore.metadata.BatchGetRowResponse。
"""
def batch_get_row(self, request):                    

参数

详细参数说明请参见单行数据操作

示例

批量一次读取3行数据。

# 设置需要返回的列。
columns_to_get = ['name', 'mobile', 'address', 'age']

# 读取3行。
rows_to_get = [] 
for i in range(0, 3):
    primary_key = [('gid',i), ('uid',i+1)]
    rows_to_get.append(primary_key)

# 过滤条件为name等于John,且address等于China。
cond = CompositeColumnCondition(LogicalOperator.AND)
cond.add_sub_condition(SingleColumnCondition("name", "John", ComparatorType.EQUAL))
cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))

# 构造批量读请求。
request = BatchGetRowRequest()

# 增加表table_name中需要读取的行,最后一个参数1表示读取最新的一个版本。
request.add(TableInBatchGetRowItem(table_name, rows_to_get, columns_to_get, cond, 1))

# 增加表notExistTable中需要读取的行。
request.add(TableInBatchGetRowItem('notExistTable', rows_to_get, columns_to_get, cond, 1))

try:
       result = client.batch_get_row(request) 
    print ('Result status: %s'%(result.is_all_succeed()))

    table_result_0 = result.get_result_by_table(table_name)
    table_result_1 = result.get_result_by_table('notExistTable')
    print ('Check first table\'s result:')
    for item in table_result_0: 
        if item.is_ok:
            print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
        else:
            print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
    print ('Check second table\'s result:')
    for item in table_result_1:
        if item.is_ok:
            print ('Read succeed, PrimaryKey: %s, Attributes: %s' % (item.row.primary_key, item.row.attribute_columns))
        else: 
            print ('Read failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
# 客户端异常,一般为参数错误或者网络异常。
except OTSClientError as e:
    print "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
# 服务端异常,一般为参数错误或者流控错误。
except OTSServiceError as e:
    print "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())                   

详细代码请参见BatchGetRow@GitHub

范围读(GetRange)

范围读接口用于读取一个主键范围内的数据。

范围读接口支持按照确定范围进行正序读取和逆序读取,可以设置要读取的行数。如果范围较大,已扫描的行数或者数据量超过一定限制,会停止扫描,并返回已获取的行和下一个主键信息。您可以根据返回的下一个主键信息,继续发起请求,获取范围内剩余的行。

GetRange操作可能在如下情况停止执行并返回数据。
  • 扫描的行数据大小之和达到4 MB。
  • 扫描的行数等于5000。
  • 返回的行数等于最大返回行数。
  • 当前剩余的预留读吞吐量已全部使用,余量不足以读取下一条数据。
说明 表格存储表中的行默认是按照主键排序的,而主键是由全部主键列按照顺序组成的,所以不能理解为表格存储会按照某列主键排序,这是常见的误区。

接口

"""
说明:根据范围条件获取多行数据。
``table_name``是数据表名称。
``direction``表示范围读取的读取方向,字符串格式,取值包括'FORWARD'和'BACKWARD'。
``inclusive_start_primary_key``表示范围的起始主键(在范围内)。
``exclusive_end_primary_key``表示范围的结束主键(不在范围内)。
``columns_to_get``是可选参数,表示要获取的列的名称列表,类型为list;如果不填,表示获取所有列。
``limit``是可选参数,表示最多读取多少行;如果不填,则没有限制。
``column_filter``是可选参数,表示读取指定条件的行。
``max_version``是可选参数,表示返回的最大版本数目,max_version与time_range必须至少存在一个。
``time_range``是可选参数,表示返回的版本的范围,max_version与time_range必须至少存在一个。
``start_column``是可选参数,用于宽行读取,表示本次读取的起始列。
``end_column``是可选参数,用于宽行读取,表示本次读取的结束列。
``token``是可选参数,用于宽行读取,表示本次读取的起始列位置,内容被二进制编码,来源于上次请求的返回结果中。

返回:符合条件的结果列表。
``consumed``表示本次操作消耗的CapacityUnit,是tablestore.metadata.CapacityUnit类的实例。
``next_start_primary_key``表示下次get_range操作的起始点的主健列,类型为dict。
``row_list``表示本次操作返回的行数据列表,格式为:[Row, ...]。  
"""
def get_range(self, table_name, direction,
             inclusive_start_primary_key,
             exclusive_end_primary_key,
             columns_to_get=None,
             limit=None,
             column_filter=None,
             max_version=None,
             time_range=None,
             start_column=None,
             end_column=None,
             token = None):                   

参数

参数 说明
table_name 数据表名称。
direction 读取方向。
  • 如果值为正序(FORWARD),则起始主键必须小于结束主键,返回的行按照主键由小到大的顺序进行排列。
  • 如果值为逆序(BACKWARD),则起始主键必须大于结束主键,返回的行按照主键由大到小的顺序进行排列。

例如同一表中有两个主键A和B,A<B。如正序读取[A, B),则按从A至B的顺序返回主键大于等于A、小于B的行;逆序读取[B, A),则按从B至A的顺序返回大于A、小于等于B的数据。

inclusive_start_primary_key 本次范围读的起始主键和结束主键,起始主键和结束主键需要是有效的主键或者是由INF_MIN和INF_MAX类型组成的虚拟点,虚拟点的列数必须与主键相同。

其中INF_MIN表示无限小,任何类型的值都比它大;INF_MAX表示无限大,任何类型的值都比它小。

  • inclusive_start_primary_key表示起始主键,如果该行存在,则返回结果中一定会包含此行。
  • exclusive_end_primary_key表示结束主键,无论该行是否存在,返回结果中都不会包含此行。

数据表中的行按主键从小到大排序,读取范围是一个左闭右开的区间,正序读取时,返回的是大于等于起始主键且小于结束主键的所有的行。

exclusive_end_primary_key
limit 数据的最大返回行数,此值必须大于 0。

表格存储按照正序或者逆序返回指定的最大返回行数后即结束该操作的执行,即使该区间内仍有未返回的数据。此时可以通过返回结果中的next_start_primary_key记录本次读取到的位置,用于下一次读取。

columns_to_get 读取的列集合,列名可以是主键列或属性列。

如果不设置返回的列名,则返回整行数据。

说明
  • 查询一行数据时,默认返回此行所有列的数据。如果需要只返回特定列,可以通过设置columns_to_get参数限制。如果将col0和col1加入到columns_to_get中,则只返回col0和col1列的值。
  • 如果某行数据的主键属于读取范围,但是该行数据不包含指定返回的列,那么返回结果中不包含该行数据。
  • 当columns_to_get和column_filter同时使用时,执行顺序是先获取columns_to_get指定的列,再在返回的列中进行条件过滤。
max_version 最多读取的版本数。
说明 max_version与time_range必须至少设置一个。
  • 如果仅设置max_version,则最多返回所有版本中从新到旧指定数量版本的数据。
  • 如果仅设置time_range,则返回该范围内所有数据或指定版本数据。
  • 如果同时设置max_version和time_range,则最多返回版本号范围内从新到旧指定数量版本的数据。
time_range 读取版本号范围或特定版本号的数据。更多信息,请参见TimeRange
说明 max_version与time_range必须至少设置一个。
  • 如果仅设置max_version,则最多返回所有版本中从新到旧指定数量版本的数据。
  • 如果仅设置time_range,则返回该范围内所有数据或指定版本数据。
  • 如果同时设置max_version和time_range,则最多返回版本号范围内从新到旧指定数量版本的数据。
  • 如果查询一个范围的数据,则需要设置start_time和end_time。start_time和end_time分别表示起始时间戳和结束时间戳,范围为前闭后开区间,即[start_time, end_time)
  • 如果查询特定版本号的数据,则需要设置specific_time。specific_time表示特定的时间戳。

specific_time和[start_time, end_time)中只需要设置一个。

时间戳的单位为毫秒,最小值为0,最大值为INT64.MAX。

column_filter 使用过滤器,在服务端对读取结果再进行一次过滤,只返回符合过滤器中条件的数据行。更多信息,请参见过滤器
说明 当columns_to_get和column_filter同时使用时,执行顺序是先获取columns_to_get指定的列,再在返回的列中进行条件过滤。
next_start_primary_key 根据返回结果中的next_start_primary_key判断数据是否全部读取。
  • 当返回结果中next_start_primary_key不为空时,可以使用此返回值作为下一次GetRange操作的起始点继续读取数据。
  • 当返回结果中next_start_primary_key为空时,表示读取范围内的数据全部返回。

示例

按照范围读取数据。

# 设置范围读的起始主键。
inclusive_start_primary_key = [('uid', INF_MIN), ('gid', INF_MIN)]

# 设置范围读的结束主键。
exclusive_end_primary_key = [('uid', INF_MAX), ('gid', INF_MAX)]

# 查询所有列。
columns_to_get = []

# 每次最多返回90行,如果总共有100个结果,首次查询时指定limit=90,则第一次最多返回90,最少可能返回0个结果,但是next_start_primary_key不为None。
limit = 90

# 设置过滤器。
cond = CompositeColumnCondition(LogicalOperator.AND)
cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL))
cond.add_sub_condition(SingleColumnCondition("age", 50, ComparatorType.LESS_THAN))

try:
    # 调用get_range接口。
    consumed, next_start_primary_key, row_list, next_token = client.get_range(
        table_name, Direction.FORWARD,
        inclusive_start_primary_key, exclusive_end_primary_key,
        columns_to_get,
        limit,
        column_filter=cond,
        max_version=1,
        time_range = (1557125059000, 1557129059000)  # start_time大于等于1557125059000,end_time小于1557129059000。
    )

    all_rows = []
    all_rows.extend(row_list)

    # 当next_start_primary_key不为空时,则继续读取。
    while next_start_primary_key is not None:
        inclusive_start_primary_key = next_start_primary_key
        consumed, next_start_primary_key, row_list, next_token = client.get_range(
            table_name, Direction.FORWARD,
            inclusive_start_primary_key, exclusive_end_primary_key,
            columns_to_get, limit,
            column_filter=cond,
            max_version=1
        )
        all_rows.extend(row_list)

    # 打印主键和属性列。
    for row in all_rows:
        print(row.primary_key, row.attribute_columns)
    print('Total rows: ', len(all_rows))
# 客户端异常,一般为参数错误或者网络异常。
except OTSClientError as e:
    print
    "get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message())
# 服务端异常,一般为参数错误或者流控错误。
except OTSServiceError as e:
    print
    "get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id())

详细代码请参见GetRange@GitHub