全部产品
Search
文档中心

云原生大数据计算服务 MaxCompute:Tablestore外部表

更新时间:Dec 07, 2023

本文将进一步为您介绍如何将来自Tablestore(原OTS)的数据纳入MaxCompute上的计算生态,实现多种数据源之间的无缝连接。

背景信息

表格存储(Tablestore)是构建在阿里云飞天分布式系统之上的NoSQL数据存储服务,提供海量结构化数据的存储和实时访问,详情请参见Tablestore文档

您可以通过DataWorks配合MaxCompute对外部表进行可视化的创建、搜索、查询、配置、加工和分析。详情请参见外部表

注意事项

  • MaxCompute与Tablestore是两个独立的大数据计算和存储服务,所以两者之间的网络必须保证连通性。MaxCompute公共云服务访问Tablestore存储时,推荐您使用Tablestore私网地址,即Host名以ots-internal.aliyuncs.com作为结尾的地址,例如tablestore://odps-ots-dev.cn-shanghai.ots-internal.aliyuncs.com

  • Tablestore与MaxCompute都有其自身的类型系统。在MaxCompute处理Tablestore数据时,两者之间的数据类型对应关系如下所示。

    MaxCompute Type

    Tablestore Type

    STRING

    STRING

    BIGINT

    INTEGER

    DOUBLE

    DOUBLE

    BOOLEAN

    BOOLEAN

    BINARY

    BINARY

  • Tablestore外部表不支持cluster属性。

前提条件

创建外部表

MaxCompute通过创建外部表,把对Tablestore表数据的描述引入到MaxCompute的meta系统内部后,即可实现对Tablestore数据的处理。本节通过下述示例为您说明MaxCompute对接Tablestore的一些概念和实现。

创建外部表语句示例如下。

DROP TABLE IF EXISTS ots_table_external;
CREATE EXTERNAL TABLE IF NOT EXISTS ots_table_external
(
odps_orderkey bigint,
odps_orderdate string,
odps_custkey bigint,
odps_orderstatus string,
odps_totalprice double,
odps_createdate timestamp
)
STORED BY 'com.aliyun.odps.TableStoreStorageHandler'
WITH SERDEPROPERTIES (
'tablestore.columns.mapping'=':o_orderkey,:o_orderdate,o_custkey,o_orderstatus,o_totalprice',
'tablestore.table.name'='ots_tpch_orders',
'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole',
'tablestore.read.mode'='permissive',
'tablestore.corrupt.column'='ColumnName',
'tablestore.timestamp.ticks.unit'='seconds',
'tablestore.column.odps_createdate.timestamp.ticks.unit'='millis',
'tablestore.table.put.row'='true'
)
LOCATION 'tablestore://odps-ots-dev.cn-shanghai.ots-internal.aliyuncs.com';

上述建表语句重点参数介绍如下表所示。

参数

是否必填

说明

com.aliyun.odps.TableStoreStorageHandler

是MaxCompute内置的处理Tablestore数据的StorageHandler,定义了MaxCompute和Tablestore的交互,相关逻辑由MaxCompute实现。

tablestore.columns.mapping

描述MaxCompute将访问的Tablestore表的列,包括主键和属性列。

  • 以冒号(:)开头用于表示Tablestore主键,例如示例中的:o_orderkey:o_orderdate,其他的均为属性列。

  • Tablestore支持1~4个主键,主键类型为STRINGINTEGERBINARY,其中第一个主键为分区键。

  • 在指定映射时,您必须提供指定Tablestore表的所有主键和需要通过MaxCompute访问的属性列。

tablestore.table.name

需要访问的Tablestore表名称。本文以ots_tpch_orders为例。

odps.properties.rolearn

RAM中AliyunODPSDefaultRole的ARN信息。您可以通过RAM控制台中的RAM角色管理进行获取。

tablestore.timestamp.ticks.unit

表级别时间类型设置。用以指定该外部表中所有Integer的字段都处于同一个时间类型。取值如下:

  • seconds(秒)

  • millis(毫秒)

  • micros(微秒)

  • nanos(纳秒)

tablestore.column.<col1_name>.timestamp.ticks.unit

列级别时间类型设置。用以指定该外部表中列字段的时间类型。取值如下:

  • seconds(秒)

  • millis(毫秒)

  • micros(微秒)

  • nanos(纳秒)

说明

tablestore.timestamp.ticks.unittablestore.column.<col1_name>.timestamp.ticks.unit都配置时,tablestore.column.<col1_name>.timestamp.ticks.unit参数的优先级更高。

tablestore.table.put.row

支持指定PutRow的写入方式。取值说明如下:

  • True:打开。

  • False(默认值):关闭。

说明

可以通过设置以下Flag的参数值指定PutRow的写入方式,默认值为False。详情请参见Flag参数列表

set odps.sql.unstructured.tablestore.put.row=true;

tablestore.read.mode

当遇到脏数据时行为定义字段,取值说明如下:

  • permissive(默认值):当遇到脏数据时,可以跳过。

  • failfast:当遇到脏数据时,直接报错。

关于脏数据处理示例,详情请参见脏数据处理示例

tablestore.corrupt.column

指定脏数据写入列。

  • 仅当tablestore.read.mode参数取值permissive时,需要配置此参数。

  • 指定的列名称为MaxCompute外部表列名称,且只能是最后一列。

  • 不支持指定Tablestore主键列。

关于脏数据处理示例,详情请参见脏数据处理示例

LOCATION

用来指定Tablestore的Instance名、Endpoint等具体信息。这里的Tablestore数据的安全访问建立在前文介绍的RAM/STS授权的前提上。

说明

如果您使用公网地址报错,显示网络不同,可尝试更换为经典网地址。

您可以执行如下语句,查看创建好的外部表结构信息。

desc extended <table_name>;
说明

在返回的信息里,除了包含和内部表一样的基础信息,Extended Info还包含外部表StorageHandler、Location等信息。

查询外部表

创建完外部表后,Tablestore的数据便引入到了MaxCompute生态中,即可通过正常的MaxCompute SQL语法访问Tablestore数据,如下所示。

SELECT odps_orderkey, odps_orderdate, SUM(odps_totalprice) AS sum_total
FROM ots_table_external
WHERE odps_orderkey > 5000 AND odps_orderkey < 7000 AND odps_orderdate >= '1996-05-03' AND odps_orderdate < '1997-05-01'
GROUP BY odps_orderkey, odps_orderdate
HAVING sum_total> 400000.0;
说明

在查询表或字段时,无需区分大小写,且不支持强制转换大小写。

使用常见的MaxCompute SQL语句访问Tablestore时,所有的操作细节(例如列名的选择)是在MaxCompute内部处理完成的。上述SQL示例中,使用的列名是odps_orderkey、odps_totalprice等,而不是原始Tablestore中的主键名o_orderkey或属性列名o_totalprice。这是因为在创建外部表的DDL语句中,已经完成了对应的Mapping。您也可以在创建外部表时,按需选择保留原始的Tablestore主键/列名。

如果您需要对一份数据做多次计算,相比每次从Tablestore去远程读数据,更高效的方法是先一次性把需要的数据导入到MaxCompute内部成为一个MaxCompute(内部)表,示例如下。

CREATE TABLE internal_orders AS
SELECT odps_orderkey, odps_orderdate, odps_custkey, odps_totalprice
FROM ots_table_external
WHERE odps_orderkey > 5000 ;

现在internal_orders就是一个MaxCompute表了,也拥有所有MaxCompute内部表的特性,包括高效的压缩列存储数据格式、完整的内部宏数据以及统计信息等。同时因为表存储在MaxCompute内部,所以访问速度会比访问外部的Tablestore更快。这种方法非常适用于需要进行多次计算的热点数据。

MaxCompute导出数据到Tablestore

说明

MaxCompute不会主动创建外部的Tablestore表,所以在对Tablestore表进行数据输出之前,必须保证该表已经在Tablestore上完成创建(否则将报错)。

根据上面的操作,您已创建了外部表ots_table_external来打通MaxCompute与Tablestore数据表ots_tpch_orders的链路,同时还有一份存储在MaxCompute内部表internal_orders的数据。现在,如果您需要对internal_orders中的数据进行处理后再写回Tablestore,则可通过对外部表执行insert overwrite table操作实现,示例如下。

INSERT OVERWRITE TABLE ots_table_external
SELECT odps_orderkey, odps_orderdate, odps_custkey, CONCAT(odps_custkey, 'SHIPPED'), CEIL(odps_totalprice)
FROM internal_orders;
说明

如果MaxCompute表内数据本身有一定的顺序,例如已经按照Primary Key做过一次排序,则在写入到OTS表时,会导致压力集中在一个OTS分区上面,无法充分利用分布式写入的特点。因此,当出现这种情况时,建议您通过distribute by rand()先将数据打散,示例如下。

INSERT OVERWRITE TABLE ots_table_external
SELECT odps_orderkey, odps_orderdate, odps_custkey, CONCAT(odps_custkey, 'SHIPPED'), CEIL(odps_totalprice)
FROM (SELECT * FROM internal_orders DISTRIBUTE BY rand()) t;

对于Tablestore这种KV数据的NoSQL存储介质,MaxCompute的输出将只影响相对应主键所在的行,例如示例中只影响所有odps_orderkey + odps_orderdate这两个主键值对应行上的数据。而且在这些Tablestore行上,也只会更新在创建外部表(ots_table_external)时指定的属性列,而不会修改未在外部表中出现的数据列。

说明
  • 将MaxCompute中的数据写入OTS时一次不能超过4MB,否则需要用户剔除掉超大数据再写入。此时可能会产生报错。

    ODPS-0010000:System internal error - Output to TableStore failed with exception:
    TableStore BatchWrite request id XXXXX failed with error code OTSParameterInvalid and message:The total data size of BatchWriteRow request exceeds the limit
  • 将数据批量写入或分行写入,都算一次操作。详细描述请参考BatchWriteRow。因此如果批量写入数据量太大,也可以分行写入。

  • 将数据批量写入时请注意不要有重复行,否则可能产生如下报错。

    ErrorCode: OTSParameterInvalid, ErrorMessage: The input parameter is invalid 

    详细描述请参见使用BatchWriteRow一次提交100条数据的时候报OTSParameterInvalid错误

  • 由于Tablestore是KV存储,使用insert overwrite table写入数据到Tablestore表,不会清空目标Tablestore表的全部内容,只会覆盖目标表中Key一致的Value值。

脏数据处理示例

  1. 准备Tablestore表mf_ots_test和数据。具体操作,请参见通过控制台使用宽表模型

    默认Tablestore的表格数据如下所示。

    +----+------+------+
    | id | name | desc |
    +----+------+------+
    | 1  | 张三 | 张三的描述 |
    +----+------+------+
  2. 创建MaxCompute外部表。

    CREATE EXTERNAL TABLE IF NOT EXISTS mf_ots_external_permi
    (
     id string,
    	name bigint,
    	desc string,
    	corrupt_col string
    )
    STORED BY 'com.aliyun.odps.TableStoreStorageHandler'
    WITH SERDEPROPERTIES (
    'tablestore.columns.mapping'=':id,name,desc',
    'tablestore.table.name'='mf_ots_test',
    'tablestore.read.mode'='permissive',
    'tablestore.corrupt.column'='corrupt_col',
    'odps.properties.rolearn'='acs:ram::139699392458****:role/aliyunodpsdefaultrole'
    )
    LOCATION 'tablestore://santie-doc.cn-shanghai.ots-internal.aliyuncs.com';
  3. 执行以下代码,查询MaxCompute外部表数据。

    ---查询数据
    select * from mf_ots_external_permi;

    返回结果如下,错误字段会以JSON格式写入至corrupt_col列中。

    --
    +------------+------------+------------+-------------+
    | id         | name       | desc       | corrupt_col |
    +------------+------------+------------+-------------+
    | 1          | NULL       | 张三的描述      | {"name": "\"张三\""} |
    +------------+------------+------------+-------------+
    说明

    tablestore.read.mode未配置或配置为permissive,但未配置tablestore.corrupt.column指定脏数据存入列,查询外表时会报错“Columns not match with columns mapping and corrupt column”