本文介紹Parquet格式的OSS外部表格的建立、讀取及寫入方法。
適用範圍
OSS外部表格不支援cluster屬性。
單個檔案大小不能超過2GB,如果檔案過大,建議拆分。
MaxCompute需要與OSS部署在同一地區。
支援資料類型
MaxCompute資料類型詳情請參見1.0資料類型版本、2.0資料類型版本。
JNI模式:
set odps.ext.parquet.native=false,表示讀外部表格解析Parquet資料檔案時,使用原有基於Java的開源社區實現,支援讀和寫。Native模式:
set odps.ext.parquet.native=true,表示讀外部表格解析Parquet資料檔案時,使用新的基於C++的Native實現,僅支援讀。模式
Java模式(讀寫)
Native模式(唯讀)
TINYINT


SMALLINT


INT


BIGINT


BINARY


FLOAT


DOUBLE


DECIMAL(precision,scale)


VARCHAR(n)


CHAR(n)


STRING


DATE


DATETIME


TIMESTAMP


TIMESTAMP_NTZ


BOOLEAN


ARRAY


MAP


STRUCT


JSON


支援壓縮格式
當讀寫壓縮屬性的OSS檔案時,需要在建表語句中添加
with serdeproperties屬性配置,詳情請參見with serdeproperties屬性參數。支援讀寫的資料檔案格式:以ZSTD、SNAPPY、GZIP方式壓縮的Parquet。
建立外部表格
文法結構
當Parquet檔案中的Schema與外表Schema不一致時:
列數不一致:如果Parquet檔案中的列數小於外表DDL的列數,則讀取Parquet資料時,系統會將缺少的列值補充為NULL。反之(大於時),會丟棄超出的列資料。
列類型不一致:如果Parquet檔案中的列類型與外表DDL中對應的列類型不一致,則讀取Parquet資料時會報錯。例如:使用STRING(或INT)類型接收Parquet檔案中INT(或STRING)類型的資料,報錯
ODPS-0123131:User defined function exception - Traceback:xxx。
精簡文法結構
CREATE EXTERNAL TABLE [IF NOT EXISTS] <mc_oss_extable_name>
(
<col_name> <data_type>,
...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
STORED AS parquet
LOCATION '<oss_location>'
[tblproperties ('<tbproperty_name>'='<tbproperty_value>',...)];詳細文法結構
CREATE EXTERNAL TABLE [IF NOT EXISTS] <mc_oss_extable_name>
(
<col_name> <data_type>,
...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH serdeproperties(
'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole',
'mcfed.parquet.compression'='ZSTD/SNAPPY/GZIP'
)
STORED AS parquet
LOCATION '<oss_location>'
;公用參數
公用參數說明請參見基礎文法參數說明。
獨有參數
with serdeproperties屬性參數
property_name | 使用情境 | 說明 | property_value | 預設值 |
mcfed.parquet.compression | 當需要將Parquet資料以壓縮方式寫入OSS時,請添加該屬性。 | Parquet壓縮屬性。Parquet資料預設不壓縮。 |
| 無 |
mcfed.parquet.compression.codec.zstd.level | 當 | level值越大,壓縮比越高,實測取值高時,寫出資料的減少量非常有限,但時間和資源消耗快速增加,性價比明顯降低,因此對於巨量資料讀寫壓縮Parquet檔案的情境,低level(level3~level5)的zstd壓縮效果最好。例如: | 取值範圍為1~22。 | 3 |
parquet.file.cache.size | 在處理Parquet資料情境中,如果需要提升讀OSS資料檔案效能,請添加該屬性。 | 指定讀OSS資料檔案時,可快取的資料量,單位為KB。 | 1024 | 無 |
parquet.io.buffer.size | 在處理Parquet資料情境中,如果需要提升讀OSS資料檔案效能,請添加該屬性。 | 指定OSS資料檔案大小超過1024 KB時,可快取的資料量,單位為KB。 | 4096 | 無 |
tblproperties屬性參數
property_name | 使用情境 | 說明 | property_value | 預設值 |
io.compression.codecs | 當OSS資料檔案為Raw-Snappy格式時,請添加該屬性。 | 內建的開來源資料解析器SNAPPY格式情境。 配置該參數值為True時,MaxCompute才可以正常讀取壓縮資料,否則MaxCompute無法成功讀取資料。 | com.aliyun.odps.io.compress.SnappyRawCodec。 | 無 |
odps.external.data.output.prefix (相容odps.external.data.prefix) | 當需要添加輸出檔案的自訂首碼名時,請添加該屬性。 |
| 合格字元組合,例如'mc_'。 | 無 |
odps.external.data.enable.extension | 當需要顯示輸出檔案的副檔名時,請添加該屬性。 | True表示顯示輸出檔案的副檔名,反之不顯示副檔名。 |
| False |
odps.external.data.output.suffix | 當需要添加輸出檔案的自訂尾碼名時,請添加該屬性。 | 僅包含數字,字母,底線(a-z, A-Z, 0-9, _)。 | 合格字元組合,例如'_hangzhou'。 | 無 |
odps.external.data.output.explicit.extension | 當需要添加輸出檔案的自訂副檔名時,請添加該屬性。 |
| 合格字元組合,例如"jsonl"。 | 無 |
mcfed.parquet.compression | 當需要將Parquet資料以壓縮方式寫入OSS時,請添加該屬性。 | Parquet壓縮屬性。Parquet資料預設不壓縮。 |
| 無 |
mcfed.parquet.block.size | 控制Parquet檔案的塊大小,影響儲存效率和讀取效能。 | Parquet調優屬性。定義Parquet塊大小,以位元組為單位。 | 非負整數 | 134217728 (128MB) |
mcfed.parquet.block.row.count.limit | 當向Parquet外部表格寫入資料時,限制每個行組中的記錄數,避免記憶體溢出。 | Parquet調優屬性。控制每行組(row group)的最大記錄數。如果出現OOM的情況,可將參數適當調小。 參數使用建議:
| 非負整數 | 2147483647 (Integer.MAX_VALUE) |
mcfed.parquet.page.size.row.check.min | 當向Parquet外部表格寫入資料時,控制記憶體檢查的頻率,防止記憶體溢出。 | Parquet調優屬性。限制記憶體檢查之間的最小記錄數。如果出現OOM的情況,可將參數適當調小。 | 非負整數 | 100 |
mcfed.parquet.page.size.row.check.max | 當向Parquet外部表格寫入資料時,控制記憶體檢查的頻率,防止記憶體溢出。 | Parquet調優屬性。限制記憶體檢查之間的最小記錄數。如果出現OOM的情況,可將參數適當調小。 由於需要對記憶體使用量情況進行頻繁計算,該參數調整可能會帶來額外開銷。 參數使用建議:
| 非負整數 | 1000 |
mcfed.parquet.compression.codec.zstd.level | 當需要將Parquet資料以ZSTD的壓縮形式寫入OSS,指定ZSTD壓縮演算法的壓縮層級時,請添加該屬性。 | Parquet壓縮屬性。指ZSTD壓縮演算法的壓縮層級。取值範圍:1~22。 | 非負整數 | 3 |
寫入資料
MaxCompute寫入文法詳情,請參見寫入文法說明。
查詢分析
SELECT文法詳情,請參見查詢文法說明。
最佳化查詢計劃詳情,請參見查詢最佳化。
若需要直讀LOCATION檔案,請參見特色功能:Schemaless Query。
查詢最佳化:Parquet外部表格支援通過開啟PPD(即Predicate Push Down)實現查詢最佳化。最佳化效能結果參見支援Predicate Push Down(Parquet PPD)。
在SQL前添加如下參數開啟PPD:
-- PPD參數需在Native模式下使用,即Native開關需為true。 -- 開啟parquet native reader。 SET odps.ext.parquet.native = true; -- 開啟parquet ppd。 SET odps.sql.parquet.use.predicate.pushdown = true;
支援Predicate Push Down(Parquet PPD)
Parquet外部表格本身不支援Predicate Push Down(Parquet PPD),執行帶有WHERE過濾條件的查詢時,MaxCompute預設會掃描所有資料,導致不必要的I/O、資源消耗和查詢延遲。因此,MaxCompute新增了ParquetPDD參數,通過開啟Parquet PDD,可在資料掃描階段利用Parquet檔案自身的中繼資料特性實現Parquet RowGroup層級的過濾,從而提升查詢效能、降低資源消耗與成本。
使用方式
開啟Predicate Push Down(Parquet PPD)
執行SQL查詢前,通過
set命令設定以下兩個Session層級的參數以開啟Parquet PDD。-- 開啟parquet native reader。 set odps.ext.parquet.native = true; -- 開啟parquet ppd。 set odps.sql.parquet.use.predicate.pushdown = true;使用樣本
基於1TB的TPCDS測試資料集,以
tpcds_1t_store_salesParquet外部表格為例,開啟PPD並執行過濾查詢,總資料量為2879987999。-- 建立外部表格tpcds_1t_store_sales。 CREATE EXTERNAL TABLE IF NOT EXISTS tpcds_1t_store_sales ( ss_sold_date_sk BIGINT, ss_sold_time_sk BIGINT, ss_item_sk BIGINT, ss_customer_sk BIGINT, ss_cdemo_sk BIGINT, ss_hdemo_sk BIGINT, ss_addr_sk BIGINT, ss_store_sk BIGINT, ss_promo_sk BIGINT, ss_ticket_number BIGINT, ss_quantity BIGINT, ss_wholesale_cost DECIMAL(7,2), ss_list_price DECIMAL(7,2), ss_sales_price DECIMAL(7,2), ss_ext_discount_amt DECIMAL(7,2), ss_ext_sales_price DECIMAL(7,2), ss_ext_wholesale_cost DECIMAL(7,2), ss_ext_list_price DECIMAL(7,2), ss_ext_tax DECIMAL(7,2), ss_coupon_amt DECIMAL(7,2), ss_net_paid DECIMAL(7,2), ss_net_paid_inc_tax DECIMAL(7,2), ss_net_profit DECIMAL(7,2) ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' WITH serdeproperties( 'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole', 'mcfed.parquet.compression'='zstd' ) STORED AS parquet LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss_bucket_path/'; -- 使用1TB的TPCDS測試資料集。 INSERT OVERWRITE TABLE tpcds_1t_store_sales SELECT ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price, ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, ss_net_profit FROM bigdata_public_dataset.tpcds_1t.store_sales; -- 執行查詢。 SELECT SUM(ss_sold_date_sk) FROM tpcds_1t_store_sales WHERE ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;
效能對比結果
開啟PPD功能可減少資料掃描量,從而降低查詢延遲和資源消耗。
模式 | 表總資料量 | 掃描條數 | 掃描Bytes | Mapper耗時 | 總資源消耗 | 說明 |
Parquet外表+未開啟PPD | 2879987999 | 2879987999(100%) | 19386793984(100%) | 18s | cpu 19.25 Core * Min, memory 24.07 GB * Min 100% | |
Parquet外表+開啟PPD | 2879987999 | 762366649(26.47%) | 3339386880(17.22%) | 12s | cpu 11.47 Core * Min, memory 14.33 GB * Min ~59.58% | 降低掃描資料量後,延遲和資源消耗降低很明顯 |
內表+開啟PPD | 2879987999 | 32830000(1.14%) | 1633880386(8.43%) | 9s | cpu 5.62 Core * Min, memory 7.02 GB * Min ~29.19% | 內表資料有排序,因此PPD效果更加極致 |
測試詳情
Parquet外表+未開啟PPD
SET odps.ext.parquet.native = true; SET odps.sql.parquet.use.predicate.pushdown = false; SELECT SUM(ss_sold_date_sk) FROM tpcds_1t_store_sales WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;


Parquet外表+開啟PPD
SET odps.ext.parquet.native = true; SET odps.sql.parquet.use.predicate.pushdown = true; SELECT SUM(ss_sold_date_sk) FROM tpcds_1t_store_sales WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;

存在很多空Mapper,無需讀資料:

實際裁減RowGroup的日誌:

內表+開啟PPD
內表資料有排序屬性,因此裁減效果更明顯。
SELECT SUM(ss_sold_date_sk) FROM bigdata_public_dataset.tpcds_1t.store_sales WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;


情境樣本
本樣本將建立以ZSTD壓縮的Parquet格式外表,並進行讀取和寫入操作。
前置準備
已準備好OSS儲存空間(Bucket)、OSS目錄。具體操作請參見建立儲存空間、管理目錄。
由於MaxCompute只在部分地區部署,跨地區的資料連通性可能存在問題,因此建議Bucket與MaxCompute專案所在地區保持一致。
授權
具備訪問OSS的許可權。阿里雲帳號(主帳號)、RAM使用者或RAMRole身份可以訪問OSS外部表格,授權資訊請參見OSS的STS模式授權。
已具備在MaxCompute專案中建立表(CreateTable)的許可權。表操作的許可權資訊請參見MaxCompute許可權。
準備ZSTD格式資料檔案。
在樣本資料的
oss-mc-testBucket中建立parquet_zstd_jni/dt=20230418目錄層級,並將存放在分區目錄dt=20230418下。建立ZSTD壓縮格式的Parquet外部表格。
CREATE EXTERNAL TABLE IF NOT EXISTS mc_oss_parquet_data_type_zstd ( vehicleId INT, recordId INT, patientId INT, calls INT, locationLatitute DOUBLE, locationLongtitue DOUBLE, recordTime STRING, direction STRING ) PARTITIONED BY (dt STRING ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' WITH serdeproperties( 'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole', 'mcfed.parquet.compression'='zstd' ) STORED AS parquet LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/parquet_zstd_jni/';引入分區資料。當建立的OSS外部表格為分區表時,需要額外執行引入分區資料的操作,更多操作請參見補全OSS外部表格分區資料文法。
-- 引入分區資料。 MSCK REPAIR TABLE mc_oss_parquet_data_type_zstd ADD PARTITIONS;讀取Parquet外表資料。
SELECT * FROM mc_oss_parquet_data_type_zstd WHERE dt='20230418' LIMIT 10;部分返回結果如下:
+------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+ | vehicleid | recordid | patientid | calls | locationlatitute | locationlongtitue | recordtime | direction | dt | +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+ | 1 | 12 | 76 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:10 | SW | 20230418 | | 1 | 1 | 51 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:00 | S | 20230418 | | 1 | 2 | 13 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:01 | NE | 20230418 | | 1 | 3 | 48 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:02 | NE | 20230418 | | 1 | 4 | 30 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:03 | W | 20230418 | | 1 | 5 | 47 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:04 | S | 20230418 | | 1 | 6 | 9 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:05 | S | 20230418 | | 1 | 7 | 53 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:06 | N | 20230418 | | 1 | 8 | 63 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:07 | SW | 20230418 | | 1 | 9 | 4 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:08 | NE | 20230418 | | 1 | 10 | 31 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:09 | N | 20230418 | +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+寫入資料至Parquet外表。
INSERT INTO mc_oss_parquet_data_type_zstd PARTITION ( dt = '20230418') VALUES (1,16,76,1,46.81006,-92.08174,'9/14/2014 0:10','SW'); -- 查詢新寫入的資料 SELECT * FROM mc_oss_parquet_data_type_zstd WHERE dt = '20230418' AND recordid=16;返回結果如下:
+------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+ | vehicleid | recordid | patientid | calls | locationlatitute | locationlongtitue | recordtime | direction | dt | +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+ | 1 | 16 | 76 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:10 | SW | 20230418 | +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
常見問題
Parquet檔案列類型與外表DDL類型不一致
報錯資訊
ODPS-0123131:User defined function exception - Traceback: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.getPrimitiveJavaObject(WritableIntObjectInspector.java:46)錯誤描述
Parquet檔案的LongWritable欄位類型與外表DDL的INT類型不一致。
解決方案
外部表格DDL中的INT類型需要改為BIGINT類型。
寫外部表格時報錯java.lang.OutOfMemoryError
報錯資訊
ODPS-0123131:User defined function exception - Traceback: java.lang.OutOfMemoryError: Java heap space at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77) at org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:175) at org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:173) at org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:161)錯誤描述
大量資料寫入Parquet外表時出現OOM報錯。
解決方案
建議建立外部表格時,先調小
mcfed.parquet.block.row.count.limit參數,如果還會發生OOM,或者輸出檔案太大,可以調小mcfed.parquet.page.size.row.check.max參數,更加頻繁地檢查記憶體。詳情請參見專屬參數。在向Parquet外表寫入資料前,加上如下參數。
-- 設定UDF JVM Heap使用的最大記憶體。 SET odps.sql.udf.jvm.memory=12288; -- 控制Runtime側batch size。 SET odps.sql.executionengine.batch.rowcount =64; -- 設定每個Map Worker的記憶體大小。 SET odps.stage.mapper.mem=12288; -- 修改每個Map Worker的輸入資料量,即輸入檔案的分區大小,從而間接控制每個Map階段下Worker的數量。 SET odps.stage.mapper.split.size=64;