
E-MapReduce:Separate hot and cold data by using HDFS

Last Updated:May 22, 2023

This topic describes how to separate hot and cold data in a ClickHouse cluster of Alibaba Cloud E-MapReduce (EMR) by using Hadoop Distributed File System (HDFS). The operations described in this topic allow the cluster to maintain hot and cold data automatically and make full use of its computing and storage resources to reduce costs, without affecting read or write performance.

Prerequisites

  • A ClickHouse cluster of EMR V5.5.0 or later is created in the EMR console. For more information, see Create a ClickHouse cluster.

  • An HDFS service that is deployed in the same virtual private cloud (VPC) as the ClickHouse cluster is available. For example, you can use the HDFS service in an EMR Hadoop cluster.

  • You are granted the read and write permissions on the HDFS service.

Limits

You can perform the operations that are described in this topic only in a ClickHouse cluster of EMR V5.5.0 or later.

Procedure

  1. Step 1: Add an HDFS disk in the EMR console

  2. Step 2: Check the configuration

  3. Step 3: Separate hot and cold data

Step 1: Add an HDFS disk in the EMR console

  1. Go to the Configure tab on the ClickHouse service page of the cluster.

    1. Log on to the EMR on ECS console.

    2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.

    3. On the EMR on ECS page, find the cluster that you want to manage and click Services in the Actions column.

    4. On the Services tab, click Configure in the ClickHouse section.

  2. On the Configure tab, click the server-metrika tab.

  3. Change the value of the storage_configuration parameter.

    1. Add an HDFS disk to disks.

      Sample code:

      <disk_hdfs>
          <type>hdfs</type>
          <endpoint>hdfs://${your-hdfs-url}</endpoint>
          <min_bytes_for_seek>1048576</min_bytes_for_seek>
          <thread_pool_size>16</thread_pool_size>
          <objects_chunk_size_to_delete>1000</objects_chunk_size_to_delete>
      </disk_hdfs>

      The following list describes the parameters that are used in the preceding code.

        • disk_hdfs (required): The name of the disk. You can specify a custom name.

        • type (required): The type of the disk. Set the value to hdfs.

        • endpoint (required): The endpoint of the HDFS service.

          Important

          In most cases, the endpoint of the HDFS service is the address of the NameNode. If the NameNode is deployed in high availability (HA) mode, the port number of the HDFS service is typically 8020. Otherwise, the port number is typically 9000.

        • min_bytes_for_seek (optional): The minimum positive seek offset that is performed as an actual seek operation. If the positive seek offset is less than the value of this parameter, a skip operation is performed instead of a seek operation. Default value: 1048576.

        • thread_pool_size (optional): The size of the thread pool that is used to process restore requests on the disk. Default value: 16.

        • objects_chunk_size_to_delete (optional): The maximum number of HDFS files that can be deleted at a time. Default value: 1000.
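
      For example, if the NameNode runs on a host named emr-header-1 in non-HA mode, the endpoint might look like the following line. The hostname and the /clickhouse directory are assumptions for illustration; the trailing slash follows ClickHouse's convention for remote disk endpoints.

      ```xml
      <endpoint>hdfs://emr-header-1:9000/clickhouse/</endpoint>
      ```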

    2. Add a storage policy to policies.

      Sample code:

      <hdfs_ttl>
          <volumes>
            <local>
              <!-- Include all disks that use the default storage policy. -->
              <disk>disk1</disk>
              <disk>disk2</disk>
              <disk>disk3</disk>
              <disk>disk4</disk>
            </local>
            <remote>
              <disk>disk_hdfs</disk>
            </remote>
          </volumes>
          <move_factor>0.2</move_factor>
      </hdfs_ttl>
      Note

      You can also add the preceding code to the default storage policy.
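
      For reference, the following sketch shows how the disk and policy snippets above fit together inside the storage_configuration parameter. The local disk names and the HDFS URL placeholder are taken from the earlier examples; your cluster's defaults may differ.

      ```xml
      <storage_configuration>
          <disks>
              <disk_hdfs>
                  <type>hdfs</type>
                  <endpoint>hdfs://${your-hdfs-url}</endpoint>
              </disk_hdfs>
          </disks>
          <policies>
              <hdfs_ttl>
                  <volumes>
                      <local>
                          <disk>disk1</disk>
                          <disk>disk2</disk>
                          <disk>disk3</disk>
                          <disk>disk4</disk>
                      </local>
                      <remote>
                          <disk>disk_hdfs</disk>
                      </remote>
                  </volumes>
                  <move_factor>0.2</move_factor>
              </hdfs_ttl>
          </policies>
      </storage_configuration>
      ```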

  4. Save the configuration.

    1. Click Save in the upper-right corner of the Service Configuration section.

    2. In the Confirm Changes dialog box, configure Description, turn on the Auto-update Configuration switch, and then click OK.

  5. Deploy the client configuration.

    1. On the Configure tab of the ClickHouse service page, click Deploy Client Configuration.

    2. In the Cluster Activities dialog box, configure Description and click OK.

    3. In the Confirm message, click OK.

Step 2: Check the configuration

  1. Log on to the ClickHouse cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to start the ClickHouse client:
    clickhouse-client -h core-1-1 -m
    Note In the sample command, core-1-1 indicates the name of the core node that you log on to. If you have multiple core nodes, you can log on to one of the nodes.
  3. Execute the following statement to view disk information:

    select * from system.disks;

    The following output is returned:

    ┌─name─────┬─path─────────────────────────────────┬───────────free_space─┬──────────total_space─┬─keep_free_space─┬─type──┐
    │ default  │ /var/lib/clickhouse/                 │          83868921856 │          84014424064 │               0 │ local │
    │ disk1    │ /mnt/disk1/clickhouse/               │          83858436096 │          84003938304 │        10485760 │ local │
    │ disk2    │ /mnt/disk2/clickhouse/               │          83928215552 │          84003938304 │        10485760 │ local │
    │ disk3    │ /mnt/disk3/clickhouse/               │          83928301568 │          84003938304 │        10485760 │ local │
    │ disk4    │ /mnt/disk4/clickhouse/               │          83928301568 │          84003938304 │        10485760 │ local │
    │ disk_hdfs│ /var/lib/clickhouse/disks/disk_hdfs/ │ 18446744073709551615 │ 18446744073709551615 │               0 │ hdfs  │
    └──────────┴──────────────────────────────────────┴──────────────────────┴──────────────────────┴─────────────────┴───────┘
                                
  4. Execute the following statement to view disk storage policies:

    select * from system.storage_policies;

    The following output is returned:

     ┌─policy_name──┬─volume_name─┬─volume_priority─┬─disks─────────────────────────────┬─volume_type─┬─max_data_part_size─┬─move_factor─┬─prefer_not_to_merge─┐
     │ default      │ single      │               1 │ ['disk1','disk2','disk3','disk4'] │ JBOD        │                  0 │           0 │                   0 │
     │ hdfs_ttl     │ local       │               1 │ ['disk1','disk2','disk3','disk4'] │ JBOD        │                  0 │         0.2 │                   0 │
     │ hdfs_ttl     │ remote      │               2 │ ['disk_hdfs']                     │ JBOD        │                  0 │         0.2 │                   0 │
     └──────────────┴─────────────┴─────────────────┴───────────────────────────────────┴─────────────┴────────────────────┴─────────────┴─────────────────────┘

    If the preceding output is returned, the HDFS disk is added.

Step 3: Separate hot and cold data

Reconstruct an existing table

  1. View the current storage policy.

    1. Run the following command on the ClickHouse client to view the disk information:

      SELECT
        storage_policy
      FROM system.tables
      WHERE database='<database_name>' AND name='<table_name>';

      In the preceding command, <database_name> indicates the database name, and <table_name> indicates the table name.

      If the query returns default, the table uses the default storage policy, which contains only local disks, as shown in the following configuration. In this case, perform the next step to add a volume:

      <default>
        <volumes>
          <single>
            <disk>disk1</disk>
            <disk>disk2</disk>
            <disk>disk3</disk>
            <disk>disk4</disk>
          </single>
        </volumes>
      </default>
  2. Add a volume.

    On the Configure tab of the ClickHouse service page in the EMR console, add a volume to volumes. Sample code:

    <default>
      <volumes>
        <single>
          <disk>disk1</disk>
          <disk>disk2</disk>
          <disk>disk3</disk>
          <disk>disk4</disk>
        </single>
        <!-- The following volume named remote is added. -->
        <remote>
          <disk>disk_hdfs</disk>
        </remote>
      </volumes>
      <!-- If you want to add multiple volumes, specify move_factor. -->
      <move_factor>0.2</move_factor>
    </default>
  3. Execute the following statement to change the time-to-live (TTL):

    ALTER TABLE <yourDataName>.<yourTableName>
      MODIFY TTL toStartOfMinute(addMinutes(t, 5)) TO VOLUME 'remote';
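
    After the TTL is modified, existing parts are moved by background merges and moves rather than immediately. If you want ClickHouse to re-evaluate the new TTL for existing parts right away, you can also run the following statement. This is a sketch; MATERIALIZE TTL is supported in recent ClickHouse versions, so check your cluster's version before relying on it.

    ```sql
    -- Force existing data parts to be re-evaluated against the new TTL.
    ALTER TABLE <yourDataName>.<yourTableName> MATERIALIZE TTL;
    ```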
  4. Execute the following statement to view the distribution of each data part:

    select partition, name, path from system.parts where database='<yourDataName>' and table='<yourTableName>' and active=1;

    The following output is returned:

    ┌─partition───────────┬─name─────────────────┬─path──────────────────────────────────────────────────────────────────────────────────────────────────┐
    │ 2022-01-12 11:30:00 │ 1641958200_1_96_3    │ /var/lib/clickhouse/disks/disk_hdfs/store/156/156008ff-41bf-460c-8848-e34fad88c25d/1641958200_1_96_3/ │
    │ 2022-01-12 11:35:00 │ 1641958500_97_124_2  │ /mnt/disk3/clickhouse/store/156/156008ff-41bf-460c-8848-e34fad88c25d/1641958500_97_124_2/             │
    │ 2022-01-12 11:35:00 │ 1641958500_125_152_2 │ /mnt/disk4/clickhouse/store/156/156008ff-41bf-460c-8848-e34fad88c25d/1641958500_125_152_2/            │
    │ 2022-01-12 11:35:00 │ 1641958500_153_180_2 │ /mnt/disk1/clickhouse/store/156/156008ff-41bf-460c-8848-e34fad88c25d/1641958500_153_180_2/            │
    │ 2022-01-12 11:35:00 │ 1641958500_181_186_1 │ /mnt/disk4/clickhouse/store/156/156008ff-41bf-460c-8848-e34fad88c25d/1641958500_181_186_1/            │
    │ 2022-01-12 11:35:00 │ 1641958500_187_192_1 │ /mnt/disk3/clickhouse/store/156/156008ff-41bf-460c-8848-e34fad88c25d/1641958500_187_192_1/            │
    └─────────────────────┴──────────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────┘
    
    6 rows in set. Elapsed: 0.002 sec.
    Note

    If the preceding output is returned, hot and cold data is separated based on the TTL. Hot data is stored in local disks, and cold data is stored in HDFS.

    /var/lib/clickhouse/disks/disk_hdfs is the path that stores the metadata of the HDFS disk. /mnt/disk{1..4}/clickhouse is the local disk path.
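
    In addition to waiting for TTL-driven background moves, you can move a specific partition to the remote volume manually. The following statement is a sketch; the partition value shown is illustrative and must match one of the partition values listed in system.parts.

    ```sql
    -- Manually move one partition of the table to the HDFS-backed volume.
    ALTER TABLE <yourDataName>.<yourTableName>
        MOVE PARTITION '2022-01-12 11:30:00' TO VOLUME 'remote';
    ```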

Create a table

  • Syntax

    CREATE TABLE <yourDataName>.<yourTableName> [ON CLUSTER cluster_emr]
    (
      column1 Type1,
      column2 Type2,
      ...
    ) Engine = MergeTree() -- You can also use an engine of the Replicated*MergeTree type. 
    PARTITION BY <yourPartitionKey>
    ORDER BY <yourPartitionKey>
    TTL <yourTtlKey> TO VOLUME 'remote'
    SETTINGS storage_policy='hdfs_ttl';
    Note

    In the preceding statement, <yourPartitionKey> indicates the partition key of the table in the ClickHouse cluster. <yourTtlKey> indicates the TTL expression that you specify.

  • Sample code

    CREATE TABLE test.test
    (
        `id` UInt32,
        `t` DateTime
    )
    ENGINE = MergeTree()
    PARTITION BY toStartOfFiveMinute(t)
    ORDER BY id
    TTL toStartOfMinute(addMinutes(t, 5)) TO VOLUME 'remote'
    SETTINGS storage_policy='hdfs_ttl';
    Note

    In this example, the table named test.test stores only data of the last 5 minutes in local disks. Five minutes after data is generated, the data is moved to the volume named remote, which is in HDFS.
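
    A quick way to observe this behavior with the sample table is shown below. The five-minute wait is approximate because parts are relocated by background moves.

    ```sql
    INSERT INTO test.test VALUES (1, now());

    -- Immediately after the insert, the new part resides on a local disk.
    -- About five minutes later, after a background move, disk_name for the
    -- part should change to disk_hdfs.
    SELECT name, disk_name, path
    FROM system.parts
    WHERE database = 'test' AND table = 'test' AND active = 1;
    ```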

Other parameters

  • server-config

    merge_tree.allow_remote_fs_zero_copy_replication: Set the value to true. This way, engines of the Replicated*MergeTree type replicate only the metadata that points to the HDFS disk, generating multiple metadata replicas for the same shard in the ClickHouse cluster while the underlying data is stored in HDFS only once.

  • server-users

    profile.${your-profile-name}.hdfs_replication: specifies the number of data replicas that are generated in HDFS.
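
    Rendered into the underlying ClickHouse XML configuration, these settings might look roughly as follows. The profile name default and the replica count of 2 are assumptions for illustration; substitute your own profile name and replication factor.

    ```xml
    <!-- server-config: enable zero-copy replication for Replicated*MergeTree over HDFS. -->
    <merge_tree>
        <allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
    </merge_tree>

    <!-- server-users: number of replicas kept in HDFS for ClickHouse data. -->
    <profiles>
        <default>
            <hdfs_replication>2</hdfs_replication>
        </default>
    </profiles>
    ```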