This topic describes how to create, read from, and write to Parquet-format external tables in OSS.
Scope of Use
OSS external tables do not support the cluster property.
The size of a single file cannot exceed 2 GB. You must split files that are larger than 2 GB.
MaxCompute and OSS must be in the same region.
Supported Data Types
For details about MaxCompute data types, see Data Type Version 1.0 and Data Type Version 2.0.
MaxCompute parses Parquet data files in one of two modes:
JNI mode: set odps.ext.parquet.native=false. This uses the original Java-based open-source implementation to parse Parquet data files. It supports both reading and writing.
Native mode: set odps.ext.parquet.native=true. This uses a new C++-based native implementation to parse Parquet data files. It supports reading only.
Data Type | Java Mode (Read/Write) | Native Mode (Read Only) |
TINYINT
SMALLINT
INT
BIGINT
BINARY
FLOAT
DOUBLE
DECIMAL(precision,scale)
VARCHAR(n)
CHAR(n)
STRING
DATE
DATETIME
TIMESTAMP
TIMESTAMP_NTZ
BOOLEAN
ARRAY
MAP
STRUCT
JSON
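The parser mode described above is set per session. As a minimal sketch:

```sql
-- JNI (Java) mode: supports both reading and writing.
set odps.ext.parquet.native=false;

-- Native (C++) mode: supports reading only.
set odps.ext.parquet.native=true;
```

Set the flag before running the query or write statement that should use that parser.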
Supported Compression Formats
When you read or write OSS files with compression properties, add the with serdeproperties property configuration to the table creation statement. For more information, see with serdeproperties property parameters.
Supported Parquet compression formats: ZSTD, SNAPPY, and GZIP.
Schema Evolution Support
Parquet external tables map column values by name, matching the table schema to the file columns.
The Data Compatibility Notes column in the following table describes whether an external table with a modified schema can read data that matches the new schema, and whether it can still read existing historical data that does not match the new schema.
Operation Type | Supported | Description | Data Compatibility Notes |
Add column | Supported | Parquet external tables map column values by name. New columns that are missing from historical files return NULL. | Compatible |
Delete column | Supported | Parquet external tables map column values by name. | Compatible |
Reorder columns | Supported | Parquet external tables map column values by name. | Compatible |
Change column data type | Not supported | Parquet enforces strict schema validation. Changing even a compatible type may break compatibility. | Not applicable |
Rename column | Not supported | Parquet enforces strict schema validation. Renaming a column breaks name-based mapping. | Not applicable |
Update column comment | Supported | Comments must be valid strings no longer than 1024 bytes. Otherwise, an error occurs. | Compatible |
Change column nullability | Not supported | Columns are nullable by default. | Not applicable |
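As a sketch of the supported "add column" path above, adding a column is a metadata-only change to the external table DDL. The table and column names here are hypothetical, assuming MaxCompute's ALTER TABLE ... ADD COLUMNS syntax applies to your external table:

```sql
-- Hypothetical external table; adding a column changes only the metadata.
ALTER TABLE my_oss_parquet_table ADD COLUMNS (new_col STRING);

-- Historical Parquet files that lack new_col return NULL for it;
-- files written after the change are matched by column name.
SELECT new_col FROM my_oss_parquet_table LIMIT 10;
```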
Create an External Table
Syntax
When the Parquet file schema differs from the external table schema:
Column count mismatch: If the Parquet file has fewer columns than the external table DDL, missing columns return NULL. If the Parquet file has more columns, extra columns are ignored.
Column type mismatch: If a Parquet file column type differs from the corresponding column type in the external table DDL, reading fails. For example, attempting to read INT (or STRING) data using STRING (or INT) returns the error
ODPS-0123131:User defined function exception - Traceback:xxx.
Simplified syntax structure
CREATE EXTERNAL TABLE [IF NOT EXISTS] <mc_oss_extable_name>
(
<col_name> <data_type>,
...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
STORED AS parquet
LOCATION '<oss_location>'
[tblproperties ('<tbproperty_name>'='<tbproperty_value>',...)];
Detailed syntax structure
CREATE EXTERNAL TABLE [IF NOT EXISTS] <mc_oss_extable_name>
(
<col_name> <data_type>,
...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH serdeproperties(
'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole',
'mcfed.parquet.compression'='ZSTD/SNAPPY/GZIP'
)
STORED AS parquet
LOCATION '<oss_location>'
;
Common parameters
For more information about common parameters, see Basic syntax parameters.
Unique Parameters
with serdeproperties Parameters
property_name | Use Case | Description | property_value | Default Value |
mcfed.parquet.compression | Add this property when writing Parquet data to OSS in compressed format. | Parquet compression setting. Parquet data is uncompressed by default. | ZSTD, SNAPPY, or GZIP | None |
mcfed.parquet.compression.codec.zstd.level | Add this property to set the ZSTD compression level when writing Parquet data to OSS in ZSTD format. | Higher levels increase compression ratio but reduce performance. For large-scale Parquet I/O, use low levels (3–5). | Valid values: 1–22. | 3 |
parquet.file.cache.size | Add this property to improve OSS read performance. | Cache size for OSS data files, in KB. | 1024 | None |
parquet.io.buffer.size | Add this property to improve OSS read performance. | Buffer size for OSS data files larger than 1024 KB, in KB. | 4096 | None |
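A sketch combining the read-performance properties above in one table definition. The table name, endpoint, and bucket path are placeholders, and the values shown are the starting points suggested in the table:

```sql
CREATE EXTERNAL TABLE IF NOT EXISTS mc_oss_parquet_tuned
(
    id      BIGINT,
    payload STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH serdeproperties(
    'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole',
    -- Cache size for OSS data files, in KB.
    'parquet.file.cache.size'='1024',
    -- Buffer size for OSS data files larger than 1024 KB, in KB.
    'parquet.io.buffer.size'='4096'
)
STORED AS parquet
LOCATION 'oss://<endpoint>/<bucket>/<path>/';
```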
tblproperties Parameters
property_name | Use Case | Description | property_value | Default Value |
io.compression.codecs | Add this property when OSS data files use Raw-Snappy format. | The built-in open-source data resolver supports the SNAPPY format. When this parameter is set to com.aliyun.odps.io.compress.SnappyRawCodec, MaxCompute can read Raw-Snappy compressed data. Otherwise, MaxCompute cannot read the data. | com.aliyun.odps.io.compress.SnappyRawCodec | None |
odps.external.data.output.prefix (Backward-compatible with odps.external.data.prefix) | Add this property to set a custom prefix for output files. | Must contain only letters, digits, and underscores (a–z, A–Z, 0–9, _). | A valid combination, such as mc_. | None |
odps.external.data.enable.extension | Add this property to show file extensions in output filenames. | True shows extensions. False hides them. | True or False | False |
odps.external.data.output.suffix | Add this property to set a custom suffix for output files. | Must contain only letters, digits, and underscores (a–z, A–Z, 0–9, _). | A valid combination, such as _hangzhou. | None |
odps.external.data.output.explicit.extension | Add this property to set a custom file extension. | Specifies the extension appended to output filenames. | A valid extension, such as jsonl. | None |
mcfed.parquet.compression | Add this property when writing Parquet data to OSS in compressed format. | Parquet compression setting. Parquet data is uncompressed by default. |
| None |
mcfed.parquet.block.size | Controls Parquet block size, affecting storage efficiency and read performance. | Parquet tuning setting. Block size in bytes. | Non-negative integer | 134217728 (128 MB) |
mcfed.parquet.block.row.count.limit | Limits the number of records per row group when writing to Parquet external tables to avoid out-of-memory (OOM) errors. | Parquet tuning setting. Maximum records per row group. Reduce this value if OOM occurs. | Non-negative integer | 2147483647 (Integer.MAX_VALUE) |
mcfed.parquet.page.size.row.check.min | Controls memory check frequency during writes to Parquet external tables to prevent OOM. | Parquet tuning setting. Minimum records between memory checks. Reduce this value if OOM occurs. | Non-negative integer | 100 |
mcfed.parquet.page.size.row.check.max | Controls memory check frequency during writes to Parquet external tables to prevent OOM. | Parquet tuning setting. Maximum records between memory checks. Reduce this value if OOM occurs. Because memory usage must then be computed more frequently, lowering this value may add overhead. | Non-negative integer | 1000 |
mcfed.parquet.compression.codec.zstd.level | Add this property to set the ZSTD compression level when writing Parquet data to OSS in ZSTD format. | Parquet compression setting. ZSTD compression level. Valid values: 1–22. | Non-negative integer | 3 |
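As a sketch, the OOM-related tuning properties above can be set in tblproperties when the table is created. The table name, location, and the specific values are illustrative starting points, not recommendations from the source:

```sql
CREATE EXTERNAL TABLE IF NOT EXISTS mc_oss_parquet_oom_tuned
(
    id      BIGINT,
    payload STRING
)
STORED AS parquet
LOCATION 'oss://<endpoint>/<bucket>/<path>/'
tblproperties (
    -- Cap records per row group; lower this first if writes hit OOM.
    'mcfed.parquet.block.row.count.limit'='1000000',
    -- Check memory usage more often if OOM persists.
    'mcfed.parquet.page.size.row.check.max'='500'
);
```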
Write Data
For more information about the MaxCompute write syntax, see Write syntax.
Query and Analysis
For more information about the SELECT syntax, see Query syntax.
For more information about query plan optimization, see Query optimization.
To query LOCATION files directly, see Feature: Schemaless Query.
Query Optimization: Parquet external tables support Predicate Push Down (PPD) to optimize queries. For performance results, see Predicate Push Down (Parquet PPD).
Enable PPD by adding these parameters before your SQL:
-- Use PPD only in Native mode (odps.ext.parquet.native = true).
-- Enable the Parquet native reader.
SET odps.ext.parquet.native = true;
-- Enable Parquet PPD.
SET odps.sql.parquet.use.predicate.pushdown = true;
Predicate Push Down (Parquet PPD)
Parquet external tables do not support predicate push down (PPD) by default. When you execute queries with WHERE filter conditions, MaxCompute scans all data, resulting in unnecessary I/O, resource consumption, and query latency. MaxCompute therefore provides the Parquet PPD parameters. When PPD is enabled, MaxCompute uses the metadata in the Parquet files during the data scanning phase to filter at the RowGroup level, which improves query performance and reduces resource consumption and costs.
How to Use
Enable Predicate Push Down (Parquet PPD)
Before you execute an SQL query, use the SET command to set the following two session-level parameters to enable Parquet PPD.
-- Enable the Parquet native reader.
set odps.ext.parquet.native = true;
-- Enable Parquet PPD.
set odps.sql.parquet.use.predicate.pushdown = true;
Example
Based on a 1 TB TPCDS test dataset, this example uses the tpcds_1t_store_sales Parquet external table to run a filtering query with PPD enabled. The table contains 2879987999 rows in total.
-- Create the external table tpcds_1t_store_sales.
CREATE EXTERNAL TABLE IF NOT EXISTS tpcds_1t_store_sales
(
    ss_sold_date_sk       BIGINT,
    ss_sold_time_sk       BIGINT,
    ss_item_sk            BIGINT,
    ss_customer_sk        BIGINT,
    ss_cdemo_sk           BIGINT,
    ss_hdemo_sk           BIGINT,
    ss_addr_sk            BIGINT,
    ss_store_sk           BIGINT,
    ss_promo_sk           BIGINT,
    ss_ticket_number      BIGINT,
    ss_quantity           BIGINT,
    ss_wholesale_cost     DECIMAL(7,2),
    ss_list_price         DECIMAL(7,2),
    ss_sales_price        DECIMAL(7,2),
    ss_ext_discount_amt   DECIMAL(7,2),
    ss_ext_sales_price    DECIMAL(7,2),
    ss_ext_wholesale_cost DECIMAL(7,2),
    ss_ext_list_price     DECIMAL(7,2),
    ss_ext_tax            DECIMAL(7,2),
    ss_coupon_amt         DECIMAL(7,2),
    ss_net_paid           DECIMAL(7,2),
    ss_net_paid_inc_tax   DECIMAL(7,2),
    ss_net_profit         DECIMAL(7,2)
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH serdeproperties(
    'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole',
    'mcfed.parquet.compression'='zstd'
)
STORED AS parquet
LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss_bucket_path/';

-- Load 1 TB of TPCDS test data.
INSERT OVERWRITE TABLE tpcds_1t_store_sales
SELECT
    ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk,
    ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number,
    ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price,
    ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost,
    ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid,
    ss_net_paid_inc_tax, ss_net_profit
FROM bigdata_public_dataset.tpcds_1t.store_sales;

-- Run the query.
SELECT SUM(ss_sold_date_sk)
FROM tpcds_1t_store_sales
WHERE ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;
Performance Comparison
Enabling PPD reduces data scanned, lowering query latency and resource usage.
Mode | Total Rows | Rows Scanned | Bytes Scanned | Mapper Time | Total Resource Usage | Notes |
Parquet External Table (PPD Disabled) | 2879987999 | 2879987999 (100%) | 19386793984 (100%) | 18 s | cpu 19.25 Core * Min, memory 24.07 GB * Min 100% | |
Parquet External Table (PPD Enabled) | 2879987999 | 762366649 (26.47%) | 3339386880 (17.22%) | 12 s | cpu 11.47 Core * Min, memory 14.33 GB * Min ~59.58% | Reducing scanned data significantly lowers latency and resource usage |
Internal Table (PPD Enabled) | 2879987999 | 32830000 (1.14%) | 1633880386 (8.43%) | 9 s | cpu 5.62 Core * Min, memory 7.02 GB * Min ~29.19% | Internal tables are sorted, so PPD works even better |
Test Details
Parquet External Table (PPD Disabled)
SET odps.ext.parquet.native = true;
SET odps.sql.parquet.use.predicate.pushdown = false;
SELECT SUM(ss_sold_date_sk)
FROM tpcds_1t_store_sales
WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;


Parquet External Table (PPD Enabled)
SET odps.ext.parquet.native = true;
SET odps.sql.parquet.use.predicate.pushdown = true;
SELECT SUM(ss_sold_date_sk)
FROM tpcds_1t_store_sales
WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;

Many mappers are empty and skip reading data. The job log records the actual RowGroup pruning.
Internal Table (PPD Enabled)
Internal table data is sorted, so pruning is more effective.
SELECT SUM(ss_sold_date_sk)
FROM bigdata_public_dataset.tpcds_1t.store_sales
WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;


Example Scenario
This example creates a ZSTD-compressed Parquet external table and performs read and write operations.
Prerequisites
You have created a MaxCompute project.
You have prepared an OSS bucket and OSS directory. For more information, see Create a bucket and Manage folders.
Because MaxCompute is available only in specific regions, cross-region connectivity issues may occur. We recommend using an OSS bucket in the same region as your MaxCompute project.
Authorization
You have permissions to access OSS. An Alibaba Cloud account (primary account), Resource Access Management (RAM) user, or RAM role can access OSS external tables. For authorization details, see STS-mode authorization for OSS.
You have CreateTable permissions in the MaxCompute project. For details about table operation permissions, see MaxCompute permissions.
Prepare ZSTD-formatted data files.
In the oss-mc-test bucket from the sample data, create the directory parquet_zstd_jni/dt=20230418, and place the partitioned data in the dt=20230418 directory.
Create a ZSTD-compressed Parquet external table.
CREATE EXTERNAL TABLE IF NOT EXISTS mc_oss_parquet_data_type_zstd
(
    vehicleId INT,
    recordId INT,
    patientId INT,
    calls INT,
    locationLatitute DOUBLE,
    locationLongtitue DOUBLE,
    recordTime STRING,
    direction STRING
)
PARTITIONED BY (dt STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH serdeproperties(
    'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole',
    'mcfed.parquet.compression'='zstd'
)
STORED AS parquet
LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/parquet_zstd_jni/';
If your OSS external table is partitioned, run this command to add partitions. For more options, see Add Partitions to OSS External Tables.
-- Add partition data.
MSCK REPAIR TABLE mc_oss_parquet_data_type_zstd ADD PARTITIONS;
Read data from the Parquet external table.
SELECT * FROM mc_oss_parquet_data_type_zstd WHERE dt='20230418' LIMIT 10;
Sample results:
+------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
| vehicleid  | recordid   | patientid  | calls      | locationlatitute | locationlongtitue | recordtime     | direction  | dt         |
+------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
| 1          | 12         | 76         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:10 | SW         | 20230418   |
| 1          | 1          | 51         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:00 | S          | 20230418   |
| 1          | 2          | 13         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:01 | NE         | 20230418   |
| 1          | 3          | 48         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:02 | NE         | 20230418   |
| 1          | 4          | 30         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:03 | W          | 20230418   |
| 1          | 5          | 47         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:04 | S          | 20230418   |
| 1          | 6          | 9          | 1          | 46.81006         | -92.08174         | 9/14/2014 0:05 | S          | 20230418   |
| 1          | 7          | 53         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:06 | N          | 20230418   |
| 1          | 8          | 63         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:07 | SW         | 20230418   |
| 1          | 9          | 4          | 1          | 46.81006         | -92.08174         | 9/14/2014 0:08 | NE         | 20230418   |
| 1          | 10         | 31         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:09 | N          | 20230418   |
+------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
INSERT INTO mc_oss_parquet_data_type_zstd PARTITION (dt = '20230418')
VALUES (1,16,76,1,46.81006,-92.08174,'9/14/2014 0:10','SW');
-- Query the newly inserted data.
SELECT * FROM mc_oss_parquet_data_type_zstd WHERE dt = '20230418' AND recordid=16;
Result:
+------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
| vehicleid  | recordid   | patientid  | calls      | locationlatitute | locationlongtitue | recordtime     | direction  | dt         |
+------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
| 1          | 16         | 76         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:10 | SW         | 20230418   |
+------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
FAQ
Parquet File Column Type Does Not Match External Table DDL Type
Error Message
ODPS-0123131:User defined function exception - Traceback:
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable
at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.getPrimitiveJavaObject(WritableIntObjectInspector.java:46)
Error Description
The Parquet file uses LongWritable, but the external table DDL defines the column as INT.
Solution
Change the column type in the external table DDL from INT to BIGINT.
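Because dropping an external table removes only the metadata and leaves the OSS files in place, one way to apply this fix is to recreate the table with the wider type. The table name, column name, and location below are illustrative:

```sql
-- Dropping an external table does not delete the underlying OSS data files.
DROP TABLE IF EXISTS mc_oss_parquet_example;

-- Recreate the table with BIGINT so the column matches the
-- 64-bit integers (LongWritable) stored in the Parquet files.
CREATE EXTERNAL TABLE mc_oss_parquet_example
(
    id BIGINT   -- was INT in the failing DDL
)
STORED AS parquet
LOCATION 'oss://<endpoint>/<bucket>/<path>/';
```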
Out-of-Memory Error (java.lang.OutOfMemoryError) When Writing to External Tables
Error Message
ODPS-0123131:User defined function exception - Traceback:
java.lang.OutOfMemoryError: Java heap space
at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77)
at org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:175)
at org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:173)
at org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:161)
Error Description
An OutOfMemoryError occurs when writing large amounts of data to a Parquet external table.
Solution
When creating the external table, first reduce mcfed.parquet.block.row.count.limit. If OOM persists or output files are too large, reduce mcfed.parquet.page.size.row.check.max to check memory more frequently. For details, see Unique Parameters.
Add these settings before writing to the Parquet external table:
-- Set maximum JVM heap memory for UDFs.
SET odps.sql.udf.jvm.memory=12288;
-- Control batch size on the runtime side.
SET odps.sql.executionengine.batch.rowcount=64;
-- Set memory size per Map Worker.
SET odps.stage.mapper.mem=12288;
-- Adjust input data size per Map Worker (file split size) to control the number of Workers per Map stage.
SET odps.stage.mapper.split.size=64;