本文为您介绍MaxCompute如何通过非结构化框架处理存储在OSS的各种流行开源数据格式(ORC、PARQUET、SEQUENCEFILE、RCFILE、AVRO和TEXTFILE)。

您可以通过DataWorks配合MaxCompute对外部表进行可视化的创建、搜索、查询、配置、加工和分析。详情请参见外部表

开源格式数据,非结构框架会直接调用开源社区的实现来进行开源数据格式解析,并且与MaxCompute系统无缝对接。
说明 处理OSS的开源格式数据前,需要首先对OSS进行STS模式授权,详情请参见 STS模式授权

创建外部表语法说明

MaxCompute非结构化数据框架通过外部表与各种数据的关联。关联OSS上开源数据外部表的语法示例如下。
DROP TABLE [IF EXISTS] <external_table>;
CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table>
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
[ROW FORMAT SERDE '<serde class>'
  [WITH SERDEPROPERTIES ('odps.properties.rolearn'='${roleran}' [,'name2'='value2',...])]
]
STORED AS <file format>
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
该语法格式与Hive的语法接近,但需要注意以下问题:
  • column schemas:外部表的列结构。必须与具体OSS上存储数据的列结构一致。
  • ROW FORMAT SERDE:非必选项,仅在使用一些特殊的格式(例如TEXTFILE)时才需要使用。
  • WITH SERDEPROPERTIES:当关联OSS使用STS模式授权时,需要该参数指定odps.properties.rolearn属性,属性值为RAM中具体使用的Role的Arn的信息。您可以在配置STORED AS <file format>的同时通过<serde class>说明file format文件格式。以ORC文件格式为例,如下所示。
    CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table>
    (<column schemas>)
    [PARTITIONED BY (partition column schemas)]
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
    WITH SERDEPROPERTIES ('odps.properties.rolearn'='${roleran}'
    STORED AS ORC
    LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/'
    说明 如果关联OSS不使用STS模式授权,则无需指定 odps.properties.rolearn属性,直接在Location传入明文 AccessKeyIdAccessKeySecret
  • STORED AS关键字:不是创建普通非结构化外部表时用的STORED BY关键字,这是目前在读取开源兼容数据时独有的。

    STORED AS后面接的是文件格式名字,例如ORC、PARQUET、RCFILE、SEQUENCEFILE或TEXTFILE等。STORED AS单个文件大小不能超过3 GB,如果文件过大,建议拆分。

    不同 file format对应的 serde class如下:
    • SEQUENCEFILE:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
    • TEXTFILE:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
    • RCFILE:org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
    • ORC:org.apache.hadoop.hive.ql.io.orc.OrcSerde
    • ORCFILE:org.apache.hadoop.hive.ql.io.orc.OrcSerde
    • PARQUET:org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
    • AVRO:org.apache.hadoop.hive.serde2.avro.AvroSerDe
  • Location:如果关联OSS,需要使用明文AK,写法如下所示。
    LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'

    访问OSS外部表,目前不支持使用外网Endpoint。

关联OSS的PARQUET数据

假设有一些文件存放在一个OSS路径中,每个文件都是PARQUET格式,存放的Schema为16列(4列BIGINT、4列DOUBLE和8列STRING)的数据,建表DDL语句示例如下所示。
CREATE EXTERNAL TABLE tpch_lineitem_parquet
(
  l_orderkey bigint,
  l_partkey bigint,
  l_suppkey bigint,
  l_linenumber bigint,
  l_quantity double,
  l_extendedprice double,
  l_discount double,
  l_tax double,
  l_returnflag string,
  l_linestatus string,
  l_shipdate string,
  l_commitdate string,
  l_receiptdate string,
  l_shipinstruct string,
  l_shipmode string,
  l_comment string
)
STORED AS PARQUET
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/parquet_data/';
默认PARQUET数据不压缩,如果您需要在MaxCompute上压缩PARQUET数据,打开 set odps.sql.hive.compatible=true;开关,压缩格式如下:
  • 设置TBLPROPERTIES'mcfed.parquet.compression'='SNAPPY',指定PARQUET的压缩格式为SNAPPY
  • 设置TBLPROPERTIES'mcfed.parquet.compression'='GZIP',指定PARQUET的压缩格式为GZIP

关联OSS的TEXT数据

  • 如果数据为JSON格式,存储为TEXTFILE文件,同时多个TEXTFILE文件存放在OSS的多个目录中,并以统一存储和命名方式组织,则可以使用MaxCompute分区表和数据进行关联。创建分区表的DDL语句示例如下。
    CREATE EXTERNAL TABLE tpch_lineitem_textfile
    (
      l_orderkey bigint,
      l_partkey bigint,
      l_suppkey bigint,
      l_linenumber bigint,
      l_quantity double,
      l_extendedprice double,
      l_discount double,
      l_tax double,
      l_returnflag string,
      l_linestatus string,
      l_shipdate string,
      l_commitdate string,
      l_receiptdate string,
      l_shipinstruct string,
      l_shipmode string,
      l_comment string
    )
    PARTITIONED BY (ds string)
    ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
    STORED AS TEXTFILE
    LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/';
  • 如果OSS表目录下面的分区目录是以Partition Name方式组织,示例如下。
    oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170102/'
    oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170103/'
    ...
    这种情况下,可以使用以下DDL语句ADD PARTITION。
    ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102");
    ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103");
  • 如果OSS分区目录不是按Partition Name方式组织,或者根本不在表目录下,示例如下。
    oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/;
    oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/;
    ...
    这种情况下,可以使用以下DDL语句ADD PARTITION。
    ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102")
    LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/';
    ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103")
    LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/';
    ...
  • TEXT数据建表不支持自定义ROW FORMAT字符。ROW FORMAT默认值如下。
    FIELDS TERMINATED BY :'\001'
    ESCAPED BY :'\'
    COLLECTION ITEMS TERMINATED BY :'\002'
    MAP KEYS TERMINATED BY :'\003'
    LINES TERMINATED BY :'\n'
    NULL DEFINED AS :'\N'

关联OSS的CSV数据

DDL语句格式如下所示。
CREATE EXTERNAL TABLE [IF NOT EXISTS] 
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
  WITH SERDEPROPERTIES
    ('separatorChar'=',', 'quoteChar'='"', 'escapeChar'='\\')
STORED AS TEXTFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
从上述语句可以看出,CSV数据DDL语句支持SERDEPROPERTIES(key:默认值)。
separatorChar:','
quoteChar:'"'
escapeChar:'\'
说明 Hive OpenCSVSerde只支持STRING类型。Hive OpenCSVSerde当前不属于Builtin Serde,执行DML语句时,您需要打开 set odps.sql.hive.compatible=true;开关。

关联OSS的JSON数据

DDL语句格式如下所示,支持SERDEPROPERTIES。
CREATE EXTERNAL TABLE [IF NOT EXISTS] 
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';

关联OSS的ORC数据

DDL语句格式如下所示。
CREATE EXTERNAL TABLE [IF NOT EXISTS] 
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS ORC
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';

关联OSS的AVRO数据

DDL语句格式如下所示。
CREATE EXTERNAL TABLE [IF NOT EXISTS] 
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS AVRO
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';

关联OSS的SEQUENCEFILE数据

DDL语句格式如下所示。
CREATE EXTERNAL TABLE [IF NOT EXISTS] 
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS SEQUENCEFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';

读取以及处理OSS的开源格式数据

基于上述创建外部表示例,可以看出对于不同文件类型,您只需要简单修改STORED AS后的格式名。在下述示例中只集中描述对上述PARQUET数据对应外表(tpch_lineitem_parquet)的处理。如果要处理不同的文件类型,只要在DDL创建外表时指定是PARQUET、ORC、TEXTFILE或RCFILE即可,处理数据的语句一样。

  • 直接读取以及处理OSS的开源数据
    创建数据外表进行关联后,直接对外表进行与普通MaxCompute表一样的操作,如下所示。
    SELECT l_returnflag, l_linestatus,
    SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,
    AVG(l_quantity) AS avg_qty,
    COUNT(*) AS count_order
    FROM tpch_lineitem_parquet
    WHERE l_shipdate <= '1998-09-02'
    GROUP BY l_returnflag, l_linestatus;

    外表tpch_lineitem_parquet被当作一个普通的内部表一样使用,不同之处在于,MaxCompute内部的计算引擎是直接从OSS读取对应的PARQUET数据进行处理的。

    对于关联TEXTFILE的外部分区表tpch_lineitem_textfile,因为使用了 ROW FORMAT+ STORED ASodps.sql.hive.compatible默认为FALSE,所以需要手动设置flag set odps.sql.hive.compatible=true;再读取数据,否则会报错。
    SELECT * FROM tpch_lineitem_textfile LIMIT 1;
    FAILED: ODPS-0123131:User defined function exception - Traceback:
    com.aliyun.odps.udf.UDFException: java.lang.ClassNotFoundException: com.aliyun.odps.hive.wrapper.HiveStorageHandlerWrapper
    //需要手动设置Hive兼容flag。
    set odps.sql.hive.compatible=true;
    SELECT * FROM tpch_lineitem_textfile LIMIT 1;
    +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
    | l_orderkey | l_partkey  | l_suppkey  | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax      | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment |
    +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
    | 5640000001 | 174458698  | 9458733    | 1            | 14.0       | 23071.58        | 0.08       | 0.06       | N            | O            | 1998-01-26 | 1997-11-16   | 1998-02-18    | TAKE BACK RETURN | SHIP       | cuses nag silently. quick |
    +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
    说明
    • 直接使用外表,每次读取数据都需要涉及外部OSS的I/O操作,且MaxCompute系统本身针对内部存储做的许多高性能优化都无法应用,因此性能上就会有所损失。所以,如果您需要对数据进行反复计算或对计算的高效性比较敏感,推荐先将数据导入MaxCompute内部再进行计算。
    • SQL(CREATE、SELECT或INSERT等操作)中涉及到复杂数据类型,需在SQL语句前添加语句set odps.sql.type.system.odps2=true;,执行时set语句和SQL语句一起提交执行。详情请参见数据类型版本说明
  • 将OSS的开源数据导入MaxCompute再进行计算
    首先创建一个与外部表Schema一样的内部表tpch_lineitem_internal,然后将OSS上的开源数据导入MaxCompute内部表,以MaxCompute内部数据存储格式进行存储。
    CREATE TABLE tpch_lineitem_internal LIKE tpch_lineitem_parquet;
    INSERT OVERWRITE TABLE tpch_lineitem_internal;
    SELECT * FROM tpch_lineitem_parquet;

    然后在内部表上执行基于外部表的复杂查询语句,可以获得更高的计算性能。

    SELECT l_returnflag, l_linestatus,
    SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,
    AVG(l_quantity) AS avg_qty,
    COUNT(*) AS count_order
    FROM tpch_lineitem_internal
    WHERE l_shipdate <= '1998-09-02'
    GROUP BY l_returnflag, l_linestatus;

处理OSS数据常见问题

作业报错Inline data exceeds the maximun allowed size

问题原因:OSS Store对于每一个小文件有一个大小限制,如果超过3 GB则报错。

处理方法:针对该问题,您可以通过调整以下两个flag值进行处理。其原理是通过flag调整执行计划,控制每个Reducer写入外部表OSS的数据大小,使得OSS Store文件不超过3 GB的限制。

set odps.sql.mapper.split.size=256; #调整每个Mapper读取table数据的大小,单位是MB。
set odps.sql.reducer.instances=100; #调整执行计划的Reducer数量。