All Products
Search
Document Center

E-MapReduce:Import and export data between OSS and ClickHouse

Last Updated:Oct 09, 2023

Object Storage Service (OSS) is compatible with the Amazon Simple Storage Service (Amazon S3) protocol. You can use S3 table engines or S3 table functions in an E-MapReduce (EMR) ClickHouse cluster to read data from and write data to OSS. This topic describes how to import data from OSS to a ClickHouse cluster and how to export data from a ClickHouse cluster to OSS.

Prerequisites

Import data from OSS to a ClickHouse cluster

Step 1: Create a business table

  1. Log on to the ClickHouse cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to start the ClickHouse client:

    clickhouse-client -h core-1-1 -m
    Note

    In the sample command, core-1-1 indicates the name of the core node that you log on to. If your cluster has multiple core nodes, you can log on to one of the core nodes.

  3. Execute the following statements to create a database named product and a business table named orders in the product database:

    CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
    CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
    (
        `uid` UInt32,
        `date` DateTime,
        `skuId` UInt32,
        `order_revenue` UInt32
    )
    Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
    PARTITION BY toYYYYMMDD(date)
    ORDER BY toYYYYMMDD(date);
    CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
    (
        `uid` UInt32,
        `date` DateTime,
        `skuId` UInt32,
        `order_revenue` UInt32
    )
    Engine = Distributed(cluster_emr, product, orders, rand());
    Note

    In the sample code, {shard} and {replica} are macros that are automatically generated by Alibaba Cloud EMR for ClickHouse clusters and can be directly used.

Step 2: Import data

Use S3 table engines to import data

Hadoop Distributed File System (HDFS) table engines of ClickHouse can read file data in a specific format from a specified OSS address. Syntax:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
  name1 [type1] [NULL|NOT NULL] [DEFAULT|MATERIALIZED|ALIAS expr1] [compression_codec] [TTL expr1],
  name2 [type2] [NULL|NOT NULL] [DEFAULT|MATERIALIZED|ALIAS expr2] [compression_codec] [TTL expr2],
  ...
)
ENGINE = S3(path, [access_key_id, access_key_secret,] format, [compression]);

Parameter

Description

db

The name of the database.

table_name

The name of the table.

name1/name2

The names of the columns in the table.

tyep1/type2

The data types of the columns in the table.

path

The OSS path of the object that is stored in the OSS bucket.

For information about the endpoint that is used by a ClickHouse cluster to access an OSS bucket, see Access to OSS resources from ECS instances by using an internal endpoint of OSS.

You can set the path parameter to a virtual-hosted URL or a path-style URL. We recommend that you set this parameter to a virtual-hosted URL.

The value of the path parameter supports the following wildcards:

  • * indicates any characters except a forward slash (/). Empty strings are also supported.

  • ? indicates a single character.

  • {str1,str2,...,strn} specifies a string from str1, str2, ... and strn.

  • {N..M} specifies a number in the value range from N to M. N and M can contain leading zeros, such as {001..099}.

    Important

    The preceding wildcards are used to determine which files are specified in a SELECT statement, rather than for creating a table. If you want to write data to OSS, do not use wildcards.

access_key_id

The AccessKey ID of your Alibaba Cloud account.

access_key_secret

The AccessKey secret of your Alibaba Cloud account.

format

The format of the object that is stored in the OSS path. File formats such as CSV and XML are supported. For more information, see Formats for Input and Output Data.

compression

The compression format.

This parameter is optional. If you do not specify this parameter, the system uses a compression format based on the file name extension.

You can specify this parameter based on the version of the EMR cluster that you create:

  • EMR V3.X series: none, gzip/gz, brotli/b, deflate, or auto.

  • EMR V5.X series: none, gzip/gz, brotli/b, lzma(xz), auto, or zstd/zst.

  1. Create a table to read data from OSS.

    1. Download the sample object orders.csv and upload the object to OSS. In this example, the object is uploaded to the root directory of an OSS bucket named test.

    2. Execute the following statements to create an OSS table and use the S3 table engine:

      CREATE DATABASE IF NOT EXISTS oss ON CLUSTER cluster_emr;
      CREATE TABLE oss.orders_oss
      (
        uid UInt32,
        date DateTime,
        skuId UInt32,
        order_revenue UInt32
      ) ENGINE = S3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv', '<access_key_id>', '<access_key_secret>', 'CSV');
      Note

      In the sample code, http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv is the path to the orders.csv object in an OSS bucket named test in the China (Beijing) region.

  2. Execute the following statements to import data to the product.orders_all table:

    INSERT INTO product.orders_all
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      oss.orders_oss;
  3. Execute the following statements to view the data in the table and check data consistency:

    • Query the data in the orders_all table.

      SELECT count(1) FROM product.orders_all;
    • Query the data in the orders_oss table.

      SELECT count(1) FROM oss.orders_oss;

Use S3 table functions to import data

S3 table functions of ClickHouse can read file data from a specified HDFS address and return a table with a specified schema. Syntax:

s3(path, [access_key_id, access_key_secret,] format, structure, [compression])

Parameter

Description

path

The OSS path of the object that is stored in the OSS bucket.

For information about the endpoint that is used by a ClickHouse cluster to access an OSS bucket, see Access to OSS resources from ECS instances by using an internal endpoint of OSS.

You can set the path parameter to a virtual-hosted URL or a path-style URL. We recommend that you set this parameter to a virtual-hosted URL.

The value of the path parameter supports the following wildcards:

  • * indicates any characters except a forward slash (/). Empty strings are also supported.

  • ? indicates a single character.

  • {str1,str2,...,strn} specifies a string from str1, str2, ... and strn.

  • {N..M} specifies a number in the value range from N to M. N and M can contain leading zeros, such as {001..099}.

    Important

    The preceding wildcards are used to determine which files are specified in a SELECT statement, rather than for creating a table. If you want to write data to OSS, do not use wildcards.

access_key_id

The AccessKey ID of your Alibaba Cloud account.

access_key_secret

The AccessKey secret of your Alibaba Cloud account.

format

The format of the object that is stored in the OSS path. File formats such as CSV and XML are supported. For more information, see Formats for Input and Output Data.

structure

The data types of the fields in the table. Example: column1 UInt32 and column2 String.

compression

The compression format.

This parameter is optional. If you do not specify this parameter, the system uses a compression format based on the file name extension.

You can specify this parameter based on the version of the EMR cluster that you create:

  • EMR V3.X series: none, gzip/gz, brotli/b, deflate, or auto.

  • EMR V5.X series: none, gzip/gz, brotli/b, lzma(xz), auto, or zstd/zst.

  1. Use S3 table functions to import data to the ClickHouse cluster.

    INSERT INTO product.orders_all
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      s3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv',
          '<your-access-key>',
          '<your-access-secret>',
          'CSV',
          'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');
  2. Execute the following statements to view the data in the table and check data consistency:

    • Query the data in the orders_all table.

      SELECT count(1) FROM product.orders_all;
    • Query the data in the orders_oss table.

      SELECT count(1) FROM oss.orders_oss;

Export data from a ClickHouse cluster to OSS

Step 1: Create a business table

In this example, the business table schema used for the data export is the same as that used for the data import. For more information, see Step 1: Create a business table.

Step 2: Prepare data

  1. Execute the following statement to insert data into the product.orders_all business table to prepare the data for subsequent export operations:

    INSERT INTO product.orders_all VALUES 
      (60333391,'2021-08-04 11:26:01',49358700,89) 
      (38826285,'2021-08-03 10:47:29',25166907,27) 
      (10793515,'2021-07-31 02:10:31',95584454,68) 
      (70246093,'2021-08-01 00:00:08',82355887,97) 
      (70149691,'2021-08-02 12:35:45',68748652,1)  
      (87307646,'2021-08-03 19:45:23',16898681,71) 
      (61694574,'2021-08-04 23:23:32',79494853,35) 
      (61337789,'2021-08-02 07:10:42',23792355,55) 
      (66879038,'2021-08-01 16:13:19',95820038,89);
  2. Optional. Specify an export method to prevent export failures that occur if the object to which you want to write data already exists. You can specify the export method for EMR clusters of V5.8.0, V3.45.0, and their later minor versions.

    Incremental export

    After you configure the following parameter, if the object to which you want to write data already exists in OSS, a new object is created in the related directory to store the incremental data.

    set s3_create_new_file_on_insert=1

    Overwrite export

    After you configure the following parameter, if the object to which you want to write data already exists in OSS, the data that you want to write will overwrite the existing data in the related object. Proceed with caution.

    set s3_truncate_on_insert=1

Step 3: Export data

Use S3 table engines to export data

  1. Execute the following statement to create an S3 table:

    CREATE TABLE oss.orders_oss
    (
      uid UInt32,
      date DateTime,
      skuId UInt32,
      order_revenue UInt32
    ) ENGINE = S3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv', '<access_key_id>', '<access_key_secret>', 'CSV');
  2. Execute the following statements to write data to the table:

    -- In this example, the business table product.orders_all is used.
    INSERT INTO oss.orders_oss
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      product.orders_all;
    Note

    During the data export, ClickHouse creates an object in the related path and writes data to the object. By default, if the object to which you want to write data already exists in OSS, the data export fails. You can configure parameters for EMR clusters of V5.8.0, V3.45.0, and their later minor versions to prevent the failure.

  3. View data in the OSS console.

Use HDFS table functions to export data

  1. Execute the following statements to export data:

    INSERT INTO FUNCTION
    s3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv',
          '<your-access-key>',
          '<your-access-secret>',
          'CSV',
          'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      product.orders_all;
    Note

    During the data export, ClickHouse creates an object in the related path and writes data to the object. By default, if the object to which you want to write data already exists in OSS, the data export fails. You can configure parameters for EMR clusters of V5.8.0, V3.45.0, and their later minor versions to prevent the failure.

  2. View data in the OSS console.

Configure OSS-related parameters

profile

  1. Supported profiles

    If you use the multipart upload method to upload a file to OSS, you can use the s3_min_upload_part_size parameter to specify the minimum size of each part in the file. The default minimum size is 512 MB. The value of this parameter must be an integer of the UInt64 value type.

  2. Configuration methods

    • The following sample code shows how to specify the s3_min_upload_part_size parameter in a single SQL query:

      INSERT INTO OSS_TABLE
      SELECT
        ...
      FROM
        ...
      SETTINGS
        s3_min_upload_part_size=1073741824;
    • The following sample code shows how to specify the s3_min_upload_part_size parameter in a session:

      SET s3_min_upload_part_size=1073741824;
      INSERT INTO OSS_TABLE
      SELECT
        ...
      FROM
        ...
      ;
    • The following sample code shows how to specify the s3_min_upload_part_size parameter for a table:

      CREATE TABLE OSS_TABLE
      (
        ...
      ) ENGINE = s3(...)
      SETTINGS
        s3_min_upload_part_size=1073741824;
    • Specify the s3_min_upload_part_size parameter for a user:

      On the Configure tab of the ClickHouse service page in the EMR console, click the server-users tab. On this tab, click Add Configuration Item. In the Add Configuration Item dialog box, add a configuration item whose key is users.<YourUserName>.s3_min_upload_part_size and whose value is 1073741824.

configuration

The following sample code shows how to specify OSS-related parameters in an EMR ClickHouse cluster:

<s3>
    <endpoint-name>
        <endpoint>https://oss-cn-beijing-internal.aliyuncs.com/bucket</endpoint>
        <access_key_id>ACCESS_KEY_ID</access_key_id>
        <secret_access_key>ACCESS_KEY_SECRET</secret_access_key>
    </endpoint-name>
</s3>

The following table describes the parameters in the sample code.

Parameter

Description

endpoint-name

The information about the endpoint.

endpoint

The endpoint that is used to access OSS. For more information, see OSS domain names.

access_key_id

The AccessKey ID of your Alibaba Cloud account.

secret_access_key

The AccessKey secret of your Alibaba Cloud account.

You can also go to the Configure tab of the ClickHouse service page in the EMR console and click the server-config tab to add custom configurations by using one of the following methods.

Method

Description

Method 1

Add configuration items whose keys are oss.<endpoint-name>.endpoint, oss.<endpoint-name>.access_key_id, and oss.<endpoint-name>.secret_access_key and specify their values.

Note

You must replace <endpoint-name> in the keys with the endpoint of OSS.

Method 2

Add a configuration item whose key is oss and whose value is in the following format:

<endpoint-name>
  <endpoint>https://oss-cn-beijing-internal.aliyuncs.com/bucket</endpoint>
  <access_key_id>ACCESS_KEY_ID</access_key_id>
  <secret_access_key>ACCESS_KEY_SECRET</secret_access_key>
</endpoint-name>
Note

You must replace the values of the parameters with the actual endpoint, AccessKey ID, and AccessKey secret.

After you complete the preceding configurations, you can execute the following statements to create an OSS table and use OSS table functions:

  • Create an OSS table

    CREATE TABLE OSS_TABLE
    (
      column1 UInt32,
      column2 String
      ...
    )
    ENGINE = S3(path, format, [compression]);
  • Use OSS table functions

    s3(path, format, structure, [compression]);