Object Storage Service (OSS) is compatible with the Amazon Simple Storage Service (Amazon S3) protocol. You can use S3 table engines or S3 table functions in an E-MapReduce (EMR) ClickHouse cluster to read data from and write data to OSS. This topic describes how to import data from OSS to a ClickHouse cluster and how to export data from a ClickHouse cluster to OSS.
Prerequisites
A bucket is created in OSS. For more information, see Create buckets.
An EMR ClickHouse cluster is created. For more information, see Create a ClickHouse cluster.
Import data from OSS to a ClickHouse cluster
Step 1: Create a business table
Log on to the ClickHouse cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to start the ClickHouse client:
clickhouse-client -h core-1-1 -m

Note: In the sample command, core-1-1 is the name of the core node that you log on to. If your cluster has multiple core nodes, you can log on to any one of them.
Execute the following statements to create a database named product, a business table named orders, and a distributed table named orders_all in the product database:
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
ENGINE = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
ENGINE = Distributed(cluster_emr, product, orders, rand());

Note: In the sample code, {shard} and {replica} are macros that are automatically generated by Alibaba Cloud EMR for ClickHouse clusters and can be used directly.
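Optionally, you can confirm that the tables were created before you continue. The following query is a minimal check that uses the standard system.tables table; the database name follows the example above.

-- List the tables that were created in the product database on the current node.
SELECT database, name, engine
FROM system.tables
WHERE database = 'product';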
Step 2: Import data
Use S3 table engines to import data
S3 table engines of ClickHouse can read file data in a specific format from a specified OSS address. Syntax:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [NULL|NOT NULL] [DEFAULT|MATERIALIZED|ALIAS expr1] [compression_codec] [TTL expr1],
name2 [type2] [NULL|NOT NULL] [DEFAULT|MATERIALIZED|ALIAS expr2] [compression_codec] [TTL expr2],
...
)
ENGINE = S3(path, [access_key_id, access_key_secret,] format, [compression]);

Parameter | Description |
db | The name of the database. |
table_name | The name of the table. |
name1/name2 | The names of the columns in the table. |
type1/type2 | The data types of the columns in the table. |
path | The OSS path of the object that is stored in the OSS bucket. For information about the endpoint that is used by a ClickHouse cluster to access an OSS bucket, see Access to OSS resources from ECS instances by using an internal endpoint of OSS. You can set the path parameter to a virtual-hosted-style URL or a path-style URL. We recommend that you use a virtual-hosted-style URL. The value of the path parameter supports wildcards. |
access_key_id | The AccessKey ID of your Alibaba Cloud account. |
access_key_secret | The AccessKey secret of your Alibaba Cloud account. |
format | The format of the object that is stored in the OSS path. File formats such as CSV and XML are supported. For more information, see Formats for Input and Output Data. |
compression | The compression format. This parameter is optional. If you do not specify this parameter, the system determines the compression format based on the file name extension. The supported compression formats vary based on the version of the EMR cluster that you create. |
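For example, the following statement is a minimal sketch of an S3 table that reads a gzip-compressed CSV object with an explicit compression value. The bucket name, object path, and AccessKey pair are placeholders, and gzip is only an example compression value; check the values supported by your EMR cluster version.

-- Placeholder database; the oss database is also created in the step below.
CREATE DATABASE IF NOT EXISTS oss;

CREATE TABLE IF NOT EXISTS oss.orders_oss_gz
(
    uid UInt32,
    date DateTime,
    skuId UInt32,
    order_revenue UInt32
)
ENGINE = S3('http://<your-bucket>.oss-cn-beijing-internal.aliyuncs.com/orders.csv.gz', '<access_key_id>', '<access_key_secret>', 'CSV', 'gzip');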
Create a table to read data from OSS.
Download the sample object orders.csv and upload the object to OSS. In this example, the object is uploaded to the root directory of an OSS bucket named test.
Execute the following statements to create a database named oss and a table named orders_oss that uses the S3 table engine:
CREATE DATABASE IF NOT EXISTS oss ON CLUSTER cluster_emr;

CREATE TABLE oss.orders_oss
(
    uid UInt32,
    date DateTime,
    skuId UInt32,
    order_revenue UInt32
)
ENGINE = S3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv', '<access_key_id>', '<access_key_secret>', 'CSV');

Note: In the sample code, http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv is the path to the orders.csv object in an OSS bucket named test in the China (Beijing) region.
Execute the following statements to import data to the product.orders_all table:
INSERT INTO product.orders_all
SELECT uid, date, skuId, order_revenue
FROM oss.orders_oss;

Execute the following statements to view the data in the table and check data consistency:
Query the data in the orders_all table.
SELECT count(1) FROM product.orders_all;

Query the data in the orders_oss table.
SELECT count(1) FROM oss.orders_oss;
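If you prefer a single check, the following query compares the two counts directly. It is a small convenience sketch, not part of the original procedure.

-- Returns 1 if the row counts of the two tables match.
SELECT
    (SELECT count() FROM product.orders_all) = (SELECT count() FROM oss.orders_oss) AS counts_match;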
Use S3 table functions to import data
S3 table functions of ClickHouse can read file data from a specified OSS address and return a table with a specified schema. Syntax:
s3(path, [access_key_id, access_key_secret,] format, structure, [compression])

Parameter | Description |
path | The OSS path of the object that is stored in the OSS bucket. For information about the endpoint that is used by a ClickHouse cluster to access an OSS bucket, see Access to OSS resources from ECS instances by using an internal endpoint of OSS. You can set the path parameter to a virtual-hosted-style URL or a path-style URL. We recommend that you use a virtual-hosted-style URL. The value of the path parameter supports wildcards. |
access_key_id | The AccessKey ID of your Alibaba Cloud account. |
access_key_secret | The AccessKey secret of your Alibaba Cloud account. |
format | The format of the object that is stored in the OSS path. File formats such as CSV and XML are supported. For more information, see Formats for Input and Output Data. |
structure | The schema of the table, which lists the column names and data types. Example: 'column1 UInt32, column2 String'. |
compression | The compression format. This parameter is optional. If you do not specify this parameter, the system determines the compression format based on the file name extension. The supported compression formats vary based on the version of the EMR cluster that you create. |
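Before you import data, you can preview the object directly with the s3 table function. This is an optional sanity check; the URL and AccessKey pair are the placeholders from the example above.

-- Preview the first rows of the CSV object without creating any table.
SELECT *
FROM s3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv', '<your-access-key>', '<your-access-secret>', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
LIMIT 10;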
Use S3 table functions to import data to the ClickHouse cluster.
INSERT INTO product.orders_all
SELECT uid, date, skuId, order_revenue
FROM s3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv', '<your-access-key>', '<your-access-secret>', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');

Execute the following statements to view the data in the table and check data consistency:
Query the data in the orders_all table.
SELECT count(1) FROM product.orders_all;

Query the data in the orders_oss table.
SELECT count(1) FROM oss.orders_oss;
Export data from a ClickHouse cluster to OSS
Step 1: Create a business table
In this example, the business table schema used for the data export is the same as that used for the data import. For more information, see Step 1: Create a business table.
Step 2: Prepare data
Execute the following statement to insert data into the product.orders_all business table to prepare the data for subsequent export operations:
INSERT INTO product.orders_all VALUES
(60333391,'2021-08-04 11:26:01',49358700,89),
(38826285,'2021-08-03 10:47:29',25166907,27),
(10793515,'2021-07-31 02:10:31',95584454,68),
(70246093,'2021-08-01 00:00:08',82355887,97),
(70149691,'2021-08-02 12:35:45',68748652,1),
(87307646,'2021-08-03 19:45:23',16898681,71),
(61694574,'2021-08-04 23:23:32',79494853,35),
(61337789,'2021-08-02 07:10:42',23792355,55),
(66879038,'2021-08-01 16:13:19',95820038,89);

Optional. Specify an export method to prevent export failures that occur if the object to which you want to write data already exists. You can specify the export method for EMR clusters of V5.8.0, V3.45.0, and their later minor versions.
Incremental export
After you configure the following parameter, if the object to which you want to write data already exists in OSS, a new object is created in the related directory to store the incremental data.
SET s3_create_new_file_on_insert = 1;

Overwrite export
After you configure the following parameter, if the object to which you want to write data already exists in OSS, the data that you want to write will overwrite the existing data in the related object. Proceed with caution.
SET s3_truncate_on_insert = 1;
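A typical session-level usage of the incremental setting looks like the following sketch. The S3 table oss.orders_oss is the one created in Step 3 below, so this only illustrates where the SET statement fits in the export flow.

-- Enable incremental export for the current session, then write to the S3 table.
SET s3_create_new_file_on_insert = 1;

INSERT INTO oss.orders_oss
SELECT uid, date, skuId, order_revenue
FROM product.orders_all;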
Step 3: Export data
Use S3 table engines to export data
Execute the following statement to create an S3 table:
CREATE TABLE oss.orders_oss
(
    uid UInt32,
    date DateTime,
    skuId UInt32,
    order_revenue UInt32
)
ENGINE = S3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv', '<access_key_id>', '<access_key_secret>', 'CSV');

Execute the following statements to write data to the table:
-- In this example, the business table product.orders_all is used.
INSERT INTO oss.orders_oss
SELECT uid, date, skuId, order_revenue
FROM product.orders_all;

Note: During the data export, ClickHouse creates an object in the related path and writes data to the object. By default, if the object to which you want to write data already exists in OSS, the data export fails. You can configure parameters for EMR clusters of V5.8.0, V3.45.0, and their later minor versions to prevent the failure.
View data in the OSS console.
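As an alternative to the OSS console, you can read the exported object back through the same S3 table from within ClickHouse. This is an optional check that reuses the table defined above.

-- Read back a few rows of the exported object through the S3 table engine.
SELECT * FROM oss.orders_oss LIMIT 5;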
Use S3 table functions to export data
Execute the following statements to export data:
INSERT INTO FUNCTION s3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv', '<your-access-key>', '<your-access-secret>', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
SELECT uid, date, skuId, order_revenue
FROM product.orders_all;

Note: During the data export, ClickHouse creates an object in the related path and writes data to the object. By default, if the object to which you want to write data already exists in OSS, the data export fails. You can configure parameters for EMR clusters of V5.8.0, V3.45.0, and their later minor versions to prevent the failure.
View data in the OSS console.
Configure OSS-related parameters
profile
Supported profiles
If you use the multipart upload method to upload a file to OSS, you can use the s3_min_upload_part_size parameter to specify the minimum size of each part in the file. The default minimum size is 512 MB. The value of this parameter must be an integer of the UInt64 type.

Configuration methods
The following sample code shows how to specify the s3_min_upload_part_size parameter in a single SQL query:
INSERT INTO OSS_TABLE SELECT ... FROM ... SETTINGS s3_min_upload_part_size=1073741824;

The following sample code shows how to specify the s3_min_upload_part_size parameter in a session:
SET s3_min_upload_part_size=1073741824;
INSERT INTO OSS_TABLE SELECT ... FROM ... ;

The following sample code shows how to specify the s3_min_upload_part_size parameter for a table:
CREATE TABLE OSS_TABLE ( ... ) ENGINE = S3(...) SETTINGS s3_min_upload_part_size=1073741824;

Specify the s3_min_upload_part_size parameter for a user:
On the Configure tab of the ClickHouse service page in the EMR console, click the server-users tab. On this tab, click Add Configuration Item. In the Add Configuration Item dialog box, add a configuration item whose key is users.<YourUserName>.s3_min_upload_part_size and whose value is 1073741824.
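To verify which value is currently in effect, you can query the standard system.settings table. This is an optional check, not an EMR-specific step.

-- Show the effective value of the setting for the current session.
SELECT name, value, changed
FROM system.settings
WHERE name = 's3_min_upload_part_size';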
configuration
The following sample code shows how to specify OSS-related parameters in an EMR ClickHouse cluster:
<s3>
<endpoint-name>
<endpoint>https://oss-cn-beijing-internal.aliyuncs.com/bucket</endpoint>
<access_key_id>ACCESS_KEY_ID</access_key_id>
<secret_access_key>ACCESS_KEY_SECRET</secret_access_key>
</endpoint-name>
</s3>

The following table describes the parameters in the sample code.
Parameter | Description |
endpoint-name | A custom name for the endpoint configuration entry. |
endpoint | The endpoint that is used to access OSS. For more information, see OSS domain names. |
access_key_id | The AccessKey ID of your Alibaba Cloud account. |
secret_access_key | The AccessKey secret of your Alibaba Cloud account. |
You can also go to the Configure tab of the ClickHouse service page in the EMR console and click the server-config tab to add custom configurations by using one of the following methods.
Method | Description |
Method 1 | Add configuration items whose keys are oss.<endpoint-name>.endpoint, oss.<endpoint-name>.access_key_id, and oss.<endpoint-name>.secret_access_key and specify their values. Note: You must replace <endpoint-name> with the actual endpoint name. |
Method 2 | Add a configuration item whose key is oss and whose value is in the same format as the preceding sample code. Note: You must replace the values of the parameters with the actual endpoint, AccessKey ID, and AccessKey secret. |
After you complete the preceding configurations, you can execute the following statements to create an OSS table and use OSS table functions:
Create an OSS table
CREATE TABLE OSS_TABLE
(
    column1 UInt32,
    column2 String
    ...
)
ENGINE = S3(path, format, [compression]);

Use OSS table functions
s3(path, format, structure, [compression]);
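As a concrete illustration, assume that the endpoint configured above covers the bucket that stores orders.csv. In that case the table can be created without passing the AccessKey pair, because ClickHouse matches the URL prefix against the configured endpoint and applies its credentials. The bucket name and object path below are placeholders that must fall under the configured endpoint prefix.

-- Credentials are taken from the matching <endpoint> entry in the server configuration.
CREATE TABLE oss.orders_oss_conf
(
    uid UInt32,
    date DateTime,
    skuId UInt32,
    order_revenue UInt32
)
ENGINE = S3('https://oss-cn-beijing-internal.aliyuncs.com/bucket/orders.csv', 'CSV');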