本文将进一步为您介绍如何将来自Tablestore(原OTS)的数据纳入MaxCompute上的计算生态,实现多种数据源之间的无缝连接。
表格存储(Tablestore)是构建在阿里云飞天分布式系统之上的NoSQL数据存储服务,提供海量结构化数据的存储和实时访问,详情请参见Tablestore文档。
您可以通过DataWorks配合MaxCompute对外部表进行可视化的创建、搜索、查询、配置、加工和分析。详情请参见外部表。
MaxCompute与Tablestore是两个独立的大数据计算和存储服务,所以两者之间的网络必须保证连通性。MaxCompute公共云服务访问Tablestore存储时,推荐您使用Tablestore私网地址,即Host名以ots-internal.aliyuncs.com作为结尾的地址,例如tablestore://odps-ots-dev.cn-shanghai.ots-internal.aliyuncs.com。
MaxCompute Type | Tablestore Type |
---|---|
STRING | STRING |
BIGINT | INTEGER |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
BINARY | BINARY |
STS模式授权
MaxCompute计算服务访问Tablestore数据需要有一个安全的授权通道。因此,MaxCompute结合了阿里云的访问控制服务(RAM)和令牌服务(STS)实现对数据的安全访问。
- 当MaxCompute和Tablestore的Owner是同一个账号时,在登录阿里云账号后,需要完成一键授权。详情请参见一键授权。
- 自定义授权
- 首先在RAM控制台中授予MaxCompute访问Tablestore的权限。
登录RAM控制台(若MaxCompute和Tablestore不是同一个账号,此处需由Tablestore账号登录进行授权),创建角色,例如角色AliyunODPSDefaultRole、AliyunODPSRoleForOtherUser。
- 修改策略内容设置。
--当MaxCompute和Tablestore的Owner是同一个账号时,进行如下设置。 { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "odps.aliyuncs.com" ] } } ], "Version": "1" } --当MaxCompute和Tablestore的Owner不是同一个账号时,进行如下设置。 { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "MaxCompute的Owner云账号UID@odps.aliyuncs.com" ] } } ], "Version": "1" }
您可以单击右上角的头像查看云账号的UID。 - 编辑该角色的授权策略AliyunODPSRolePolicy。
{ "Version": "1", "Statement": [ { "Action": [ "ots:ListTable", "ots:DescribeTable", "ots:GetRow", "ots:PutRow", "ots:UpdateRow", "ots:DeleteRow", "ots:GetRange", "ots:BatchGetRow", "ots:BatchWriteRow", "ots:ComputeSplitPointsBySize" ], "Resource": "*", "Effect": "Allow" } ] } --还可自定义其它权限。
- 将权限AliyunODPSRolePolicy授权给该角色。
- 首先在RAM控制台中授予MaxCompute访问Tablestore的权限。
创建外部表
MaxCompute通过创建外部表,把对Tablestore表数据的描述引入到MaxCompute的meta系统内部后,即可实现对Tablestore数据的处理。本节通过下述示例为您说明MaxCompute对接Tablestore的一些概念和实现。
DROP TABLE IF EXISTS ots_table_external;
CREATE EXTERNAL TABLE IF NOT EXISTS ots_table_external
(
odps_orderkey bigint,
odps_orderdate string,
odps_custkey bigint,
odps_orderstatus string,
odps_totalprice double,
odps_createdate timestamp
)
STORED BY 'com.aliyun.odps.TableStoreStorageHandler'
WITH SERDEPROPERTIES (
'tablestore.columns.mapping'=':o_orderkey,:o_orderdate,o_custkey,o_orderstatus,o_totalprice',
'tablestore.table.name'='ots_tpch_orders',
'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole',
'tablestore.timestamp.ticks.unit'='seconds',
'tablestore.column.odps_createdate.timestamp.ticks.unit'='millis',
'tablestore.table.put.row'='true'
)
LOCATION 'tablestore://odps-ots-dev.cn-shanghai.ots-internal.aliyuncs.com';
com.aliyun.odps.TableStoreStorageHandler
是MaxCompute内置的处理Tablestore数据的StorageHandler,定义了MaxCompute和Tablestore的交互,相关逻辑由MaxCompute实现。SERDEPROPERITES
是提供参数选项的接口,在使用TableStoreStorageHandler时,有三个必选项和两个可选项。- 三个必选项为:
- tablestore.columns.mapping:用于描述MaxCompute将访问的Tablestore表的列,包括主键和属性列。
- 以冒号(:)开头用于表示Tablestore主键,例如示例中的
:o_orderkey
和:o_orderdate
,其它的均为属性列。 - Tablestore支持1~4个主键,主键类型为STRING、INTEGER和BINARY,其中第一个主键为分区键。
- 在指定映射时,您必须提供指定Tablestore表的所有主键,无需提供全部的属性列。只需提供需要通过MaxCompute访问的属性列,提供的属性列必须是Tablestore表的列,否则即使外部表可以创建成功,查询时也会报错。
- 以冒号(:)开头用于表示Tablestore主键,例如示例中的
- tablestore.table.name:需要访问的Tablestore表名。如果指定的Tablestore表名错误(不存在),则会报错,MaxCompute不会主动创建Tablestore表。
- odps.properties.rolearn:RAM中AliyunODPSDefaultRole的ARN信息。您可以通过RAM控制台中的RAM角色管理进行获取。
- tablestore.columns.mapping:用于描述MaxCompute将访问的Tablestore表的列,包括主键和属性列。
- 三个可选项为:
- tablestore.timestamp.ticks.unit:表级别时间类型设置。用以指定该外部表中所有Integer的字段都处于同一个时间类型。选项值为seconds(秒)、millis(毫秒)、micros(微秒)、nanos(纳秒)。
- tablestore.column.<col1_name>.timestamp.ticks.unit:列级别时间类型设置。用以指定该外部表中列字段的时间类型,选项值为seconds(秒)、millis(毫秒)、micros(微秒)、nanos(纳秒)。
- tablestore.table.put.row:支持指定PutRow的写入方式。选项值为True(打开)、False(关闭),默认值为False。
说明- 前两个选项主要是将Tablestore中字段类型为Integer的字段映射为MaxCompute中的timestamp类型。两个选项同时存在时,列级别优先级比表级别的高。
- 可以通过设置以下Flag的参数值指定PutRow的写入方式,默认值为False。详情请参见Flag参数列表。
set odps.sql.unstructured.tablestore.put.row=true;
- 三个必选项为:
- LOCATION :用来指定Tablestore的Instance名、Endpoint等具体信息。这里的Tablestore数据的安全访问建立在前文介绍的RAM/STS授权的前提上。说明 如果您使用公网地址报错,显示网络不同,可尝试更换为经典网地址。
desc extended <table_name>;
在返回的信息里,除了包含和内部表一样的基础信息,Extended Info还包含外部表StorageHandler、Location等信息。
查询外部表
SELECT odps_orderkey, odps_orderdate, SUM(odps_totalprice) AS sum_total
FROM ots_table_external
WHERE odps_orderkey > 5000 AND odps_orderkey < 7000 AND odps_orderdate >= '1996-05-03' AND odps_orderdate < '1997-05-01'
GROUP BY odps_orderkey, odps_orderdate
HAVING sum_total> 400000.0;
使用常见的MaxCompute SQL语句访问Tablestore时,所有的操作细节(例如列名的选择)是在MaxCompute内部处理完成的。上述SQL示例中,使用的列名是odps_orderkey、odps_totalprice等,而不是原始Tablestore中的主键名o_orderkey或属性列名o_totalprice。这是因为在创建外部表的DDL语句中,已经完成了对应的Mapping。您也可以在创建外部表时,按需选择保留原始的Tablestore主键/列名。
CREATE TABLE internal_orders AS
SELECT odps_orderkey, odps_orderdate, odps_custkey, odps_totalprice
FROM ots_table_external
WHERE odps_orderkey > 5000 ;
现在internal_orders就是一个MaxCompute表了,也拥有所有MaxCompute内部表的特性,包括高效的压缩列存储数据格式、完整的内部宏数据以及统计信息等。同时因为表存储在MaxCompute内部,所以访问速度会比访问外部的Tablestore更快。这种方法非常适用于需要进行多次计算的热点数据。
MaxCompute导出数据到Tablestore
insert overwrite table
操作实现,示例如下。 INSERT OVERWRITE TABLE ots_table_external
SELECT odps_orderkey, odps_orderdate, odps_custkey, CONCAT(odps_custkey, 'SHIPPED'), CEIL(odps_totalprice)
FROM internal_orders;
distribute by rand()
先将数据打散,示例如下。 INSERT OVERWRITE TABLE ots_table_external
SELECT odps_orderkey, odps_orderdate, odps_custkey, CONCAT(odps_custkey, 'SHIPPED'), CEIL(odps_totalprice)
FROM (SELECT * FROM internal_orders DISTRIBUTE BY rand()) t;
对于Tablestore这种KV数据的NoSQL存储介质,MaxCompute的输出将只影响相对应主键所在的行,例如示例中只影响所有odps_orderkey + odps_orderdate这两个主键值对应行上的数据。而且在这些Tablestore行上,也只会更新在创建外部表(ots_table_external)时指定的属性列,而不会修改未在外部表中出现的数据列。
- 将MaxCompute中的数据写入OTS时一次不能超过4MB,否则需要用户剔除掉超大数据再写入。此时可能会产生报错。
ODPS-0010000:System internal error - Output to TableStore failed with exception: TableStore BatchWrite request id XXXXX failed with error code OTSParameterInvalid and message:The total data size of BatchWriteRow request exceeds the limit
- 将数据批量写入或分行写入,都算一次操作。详细描述请参考BatchWriteRow。因此如果批量写入数据量太大,也可以分行写入。
- 将数据批量写入时请注意不要有重复行,否则可能产生如下报错。
ErrorCode: OTSParameterInvalid, ErrorMessage: The input parameter is invalid