すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:OSS と ClickHouse 間のデータのインポートとエクスポート

最終更新日:Jan 11, 2025

オブジェクトストレージサービス(OSS)は、Amazon Simple Storage Service(Amazon S3)プロトコルと互換性があります。 E-MapReduce(EMR)ClickHouse クラスターで S3 テーブルエンジンまたは S3 テーブル関数を使用して、OSS からデータを読み書きできます。 このトピックでは、OSS から ClickHouse クラスターにデータをインポートする方法と、ClickHouse クラスターから OSS にデータをエクスポートする方法について説明します。

前提条件

  • OSS でバケットが作成されていること。 詳細については、「バケットの作成」をご参照ください。

  • EMR ClickHouse クラスターが作成されていること。 詳細については、「ClickHouse クラスターの作成」をご参照ください。

OSS から ClickHouse クラスターへのデータのインポート

手順 1:ビジネス テーブルを作成する

  1. SSH モードで ClickHouse クラスターにログオンします。 詳細については、「クラスターへのログオン」をご参照ください。

  2. 次のコマンドを実行して、ClickHouse クライアントを起動します。

    clickhouse-client -h core-1-1 -m
    説明

    サンプルコマンドでは、core-1-1 はログオンするコアノードの名前を示しています。 クラスターに複数のコアノードがある場合は、いずれかのコアノードにログオンできます。

  3. 次のステートメントを実行して、product という名前のデータベースと、product データベースに orders という名前のビジネス テーブルを作成します。

    CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
    CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
    (
        `uid` UInt32,
        `date` DateTime,
        `skuId` UInt32,
        `order_revenue` UInt32
    )
    Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
    PARTITION BY toYYYYMMDD(date)
    ORDER BY toYYYYMMDD(date);
    CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
    (
        `uid` UInt32,
        `date` DateTime,
        `skuId` UInt32,
        `order_revenue` UInt32
    )
    Engine = Distributed(cluster_emr, product, orders, rand());
    説明

    サンプルコードでは、{shard}{replica} は、Alibaba Cloud EMR によって ClickHouse クラスター用に自動的に生成されるマクロであり、直接使用できます。

手順 2:データをインポートする

S3 テーブルエンジンを使用してデータをインポートする

ClickHouse の Hadoop Distributed File System(HDFS)テーブルエンジンは、指定された OSS アドレスから特定の形式のファイルデータを読み取ることができます。 構文:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
  name1 [type1] [NULL|NOT NULL] [DEFAULT|MATERIALIZED|ALIAS expr1] [compression_codec] [TTL expr1],
  name2 [type2] [NULL|NOT NULL] [DEFAULT|MATERIALIZED|ALIAS expr2] [compression_codec] [TTL expr2],
  ...
)
ENGINE = S3(path, [access_key_id, access_key_secret,] format, [compression]);

パラメーター

説明

db

データベースの名前。

table_name

テーブルの名前。

name1/name2

テーブルの列の名前。

tyep1/type2

テーブルの列のデータ型。

path

OSS バケットに格納されているオブジェクトの OSS パス。

ClickHouse クラスターが OSS バケットにアクセスするために使用するエンドポイントについては、「OSS の内部エンドポイントを使用して ECS インスタンスから OSS リソースにアクセスする」をご参照ください。

path パラメーターは、仮想ホスト型 URL またはパススタイル URL に設定できます。 このパラメーターを仮想ホスト型 URL に設定することをお勧めします。

path パラメーターの値は、次のワイルドカードをサポートしています。

  • * は、スラッシュ(/)以外の任意の文字を示します。 空の文字列もサポートされています。

  • ? は、単一の文字を示します。

  • {str1,str2,...,strn} は、str1、str2、...、strn からの文字列を指定します。

  • {N..M} は、N から M までの値の範囲の数を指定します。N と M には、{001..099} などの先行ゼロを含めることができます。

    重要

    上記のワイルドカードは、テーブルの作成ではなく、SELECT ステートメントでどのファイルを指定するかを決定するために使用されます。 OSS にデータを書き込む場合は、ワイルドカードを使用しないでください。

access_key_id

Alibaba Cloud アカウントの AccessKey ID。

access_key_secret

Alibaba Cloud アカウントの AccessKey シークレット。

format

OSS path に格納されているオブジェクトの形式。CSV や XML などのファイル形式がサポートされています。 詳細については、「入力データと出力データの形式」をご参照ください。

compression

圧縮形式。

このパラメーターはオプションです。 このパラメーターを指定しない場合、システムはファイル名の拡張子に基づいて圧縮形式を使用します。

作成する EMR クラスターのバージョンに基づいて、このパラメーターを指定できます。

  • EMR V3.X シリーズ: nonegzip/gzbrotli/bdeflate、または auto

  • EMR V5.X シリーズ: nonegzip/gzbrotli/blzma(xz)auto、または zstd/zst

  1. OSS からデータを読み取るテーブルを作成します。

    1. サンプルオブジェクト orders.csv をダウンロードし、OSS にアップロードします。 この例では、オブジェクトは test という名前の OSS バケットのルートディレクトリにアップロードされます。

    2. 次のステートメントを実行して、OSS テーブルを作成し、S3 テーブルエンジンを使用します。

      CREATE DATABASE IF NOT EXISTS oss ON CLUSTER cluster_emr;
      CREATE TABLE oss.orders_oss
      (
        uid UInt32,
        date DateTime,
        skuId UInt32,
        order_revenue UInt32
      ) ENGINE = S3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv', '<access_key_id>', '<access_key_secret>', 'CSV');
      説明

      サンプルコードでは、http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv は、中国(北京)リージョンの test という名前の OSS バケットにある orders.csv オブジェクトへのパスです。

  2. 次のステートメントを実行して、product.orders_all テーブルにデータをインポートします。

    INSERT INTO product.orders_all
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      oss.orders_oss;
  3. 次のステートメントを実行して、テーブル内のデータを表示し、データの整合性を確認します。

    • orders_all テーブルのデータをクエリします。

      SELECT count(1) FROM product.orders_all;
    • orders_oss テーブルのデータをクエリします。

      SELECT count(1) FROM oss.orders_oss;

S3 テーブル関数を使用してデータをインポートする

ClickHouse の S3 テーブル関数は、指定された HDFS アドレスからファイルデータを読み取り、指定されたスキーマを持つテーブルを返します。 構文:

s3(path, [access_key_id, access_key_secret,] format, structure, [compression])

パラメーター

説明

path

OSS バケットに格納されているオブジェクトの OSS パス。

ClickHouse クラスターが OSS バケットにアクセスするために使用するエンドポイントについては、「OSS の内部エンドポイントを使用して ECS インスタンスから OSS リソースにアクセスする」をご参照ください。

path パラメーターは、仮想ホスト型 URL またはパススタイル URL に設定できます。 このパラメーターを仮想ホスト型 URL に設定することをお勧めします。

path パラメーターの値は、次のワイルドカードをサポートしています。

  • * は、スラッシュ(/)以外の任意の文字を示します。 空の文字列もサポートされています。

  • ? は、単一の文字を示します。

  • {str1,str2,...,strn} は、str1、str2、...、strn からの文字列を指定します。

  • {N..M} は、N から M までの値の範囲の数を指定します。N と M には、{001..099} などの先行ゼロを含めることができます。

    重要

    上記のワイルドカードは、テーブルの作成ではなく、SELECT ステートメントでどのファイルを指定するかを決定するために使用されます。 OSS にデータを書き込む場合は、ワイルドカードを使用しないでください。

access_key_id

Alibaba Cloud アカウントの AccessKey ID。

access_key_secret

Alibaba Cloud アカウントの AccessKey シークレット。

format

OSS path に格納されているオブジェクトの形式。CSV や XML などのファイル形式がサポートされています。 詳細については、「入力データと出力データの形式」をご参照ください。

structure

テーブルのフィールドのデータ型。 例:column1 UInt32、column2 String。

compression

圧縮形式。

このパラメーターはオプションです。 このパラメーターを指定しない場合、システムはファイル名の拡張子に基づいて圧縮形式を使用します。

作成する EMR クラスターのバージョンに基づいて、このパラメーターを指定できます。

  • EMR V3.X シリーズ: nonegzip/gzbrotli/bdeflate、または auto

  • EMR V5.X シリーズ: nonegzip/gzbrotli/blzma(xz)auto、または zstd/zst

  1. S3 テーブル関数を使用して、ClickHouse クラスターにデータをインポートします。

    INSERT INTO product.orders_all
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      s3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv',
          '<your-access-key>',
          '<your-access-secret>',
          'CSV',
          'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');
  2. 次のステートメントを実行して、テーブル内のデータを表示し、データの整合性を確認します。

    • orders_all テーブルのデータをクエリします。

      SELECT count(1) FROM product.orders_all;
    • orders_oss テーブルのデータをクエリします。

      SELECT count(1) FROM oss.orders_oss;

ClickHouse クラスターから OSS にデータをエクスポートする

手順 1:ビジネス テーブルを作成する

この例では、データのエクスポートに使用するビジネス テーブルスキーマは、データのインポートに使用するものと同じです。 詳細については、「手順 1:ビジネス テーブルを作成する」をご参照ください。

手順 2:データを準備する

  1. 次のステートメントを実行して、product.orders_all ビジネス テーブルにデータを挿入し、後続のエクスポート操作のためのデータを準備します。

    INSERT INTO product.orders_all VALUES 
      (60333391,'2021-08-04 11:26:01',49358700,89) 
      (38826285,'2021-08-03 10:47:29',25166907,27) 
      (10793515,'2021-07-31 02:10:31',95584454,68) 
      (70246093,'2021-08-01 00:00:08',82355887,97) 
      (70149691,'2021-08-02 12:35:45',68748652,1)  
      (87307646,'2021-08-03 19:45:23',16898681,71) 
      (61694574,'2021-08-04 23:23:32',79494853,35) 
      (61337789,'2021-08-02 07:10:42',23792355,55) 
      (66879038,'2021-08-01 16:13:19',95820038,89);
  2. オプション。 データを書き込むオブジェクトが既に存在する場合に発生するエクスポートエラーを防ぐために、エクスポート方法を指定します。 V5.8.0、V3.45.0 以降のマイナーバージョンの EMR クラスターのエクスポート方法を指定できます。

    増分エクスポート

    次のパラメーターを設定した後、データを書き込むオブジェクトが OSS に既に存在する場合、関連ディレクトリに新しいオブジェクトが作成され、増分データが格納されます。

    set s3_create_new_file_on_insert=1

    上書きエクスポート

    次のパラメーターを設定した後、データを書き込むオブジェクトが OSS に既に存在する場合、書き込むデータは関連オブジェクトの既存のデータを上書きします。 注意して進めてください。

    set s3_truncate_on_insert=1

手順 3:データをエクスポートする

S3 テーブルエンジンを使用してデータをエクスポートする

  1. 次のステートメントを実行して、S3 テーブルを作成します。

    CREATE TABLE oss.orders_oss
    (
      uid UInt32,
      date DateTime,
      skuId UInt32,
      order_revenue UInt32
    ) ENGINE = S3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv', '<access_key_id>', '<access_key_secret>', 'CSV');
  2. 次のステートメントを実行して、テーブルにデータを書き込みます。

    -- この例では、ビジネス テーブル product.orders_all が使用されます。
    INSERT INTO oss.orders_oss
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      product.orders_all;
    説明

    データのエクスポート中、ClickHouse は関連パスにオブジェクトを作成し、オブジェクトにデータを書き込みます。 デフォルトでは、データを書き込むオブジェクトが OSS に既に存在する場合、データのエクスポートは失敗します。 V5.8.0、V3.45.0 以降のマイナーバージョンの EMR クラスターの パラメーターを設定 して、エラーを防ぐことができます。

  3. OSS コンソール でデータを表示します。

HDFS テーブル関数を使用してデータをエクスポートする

  1. 次のステートメントを実行して、データをエクスポートします。

    INSERT INTO FUNCTION
    s3('http://test.oss-cn-beijing-internal.aliyuncs.com/orders.csv',
          '<your-access-key>',
          '<your-access-secret>',
          'CSV',
          'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      product.orders_all;
    説明

    データのエクスポート中、ClickHouse は関連パスにオブジェクトを作成し、オブジェクトにデータを書き込みます。 デフォルトでは、データを書き込むオブジェクトが OSS に既に存在する場合、データのエクスポートは失敗します。 V5.8.0、V3.45.0 以降のマイナーバージョンの EMR クラスターの パラメーターを設定 して、エラーを防ぐことができます。

  2. OSS コンソール でデータを表示します。

OSS 関連のパラメーターを設定する

profile

  1. サポートされているプロファイル

    マルチパートアップロード方式を使用して OSS にファイルをアップロードする場合、s3_min_upload_part_size パラメーターを使用して、ファイルの各パートの最小サイズを指定できます。 デフォルトの最小サイズは 512 MB です。 このパラメーターの値は、UInt64 値型の整数である必要があります。

  2. 設定方法

    • 次のサンプルコードは、単一の SQL クエリで s3_min_upload_part_size パラメーターを指定する方法を示しています。

      INSERT INTO OSS_TABLE
      SELECT
        ...
      FROM
        ...
      SETTINGS
        s3_min_upload_part_size=1073741824;
    • 次のサンプルコードは、セッションで s3_min_upload_part_size パラメーターを指定する方法を示しています。

      SET s3_min_upload_part_size=1073741824;
      INSERT INTO OSS_TABLE
      SELECT
        ...
      FROM
        ...
      ;
    • 次のサンプルコードは、テーブルの s3_min_upload_part_size パラメーターを指定する方法を示しています。

      CREATE TABLE OSS_TABLE
      (
        ...
      ) ENGINE = s3(...)
      SETTINGS
        s3_min_upload_part_size=1073741824;
    • ユーザーの s3_min_upload_part_size パラメーターを指定します。

      EMR コンソールの ClickHouse サービスページの [設定] タブで、[サーバーユーザー] タブをクリックします。 このタブで、[設定項目の追加] をクリックします。 [設定項目の追加] ダイアログボックスで、キーが users.<YourUserName>.s3_min_upload_part_size で、値が 1073741824 の設定項目を追加します。

configuration

次のサンプルコードは、EMR ClickHouse クラスターで OSS 関連のパラメーターを指定する方法を示しています。

<s3>
    <endpoint-name>
        <endpoint>https://oss-cn-beijing-internal.aliyuncs.com/bucket</endpoint>
        <access_key_id>ACCESS_KEY_ID</access_key_id>
        <secret_access_key>ACCESS_KEY_SECRET</secret_access_key>
    </endpoint-name>
</s3>

次の表は、サンプルコードのパラメーターについて説明しています。

パラメーター

説明

endpoint-name

エンドポイントに関する情報。

endpoint

OSS にアクセスするために使用されるエンドポイント。 詳細については、「OSS ドメイン名」をご参照ください。

access_key_id

Alibaba Cloud アカウントの AccessKey ID。

secret_access_key

Alibaba Cloud アカウントの AccessKey シークレット。

EMR コンソールの ClickHouse サービスページの [設定] タブに移動し、[サーバー設定] タブをクリックして、次のいずれかの方法でカスタム設定を追加することもできます。

方法

説明

方法 1

キーが oss.<endpoint-name>.endpointoss.<endpoint-name>.access_key_idoss.<endpoint-name>.secret_access_key の設定項目を追加し、それらの値を指定します。

説明

キーの <endpoint-name> は、OSS のエンドポイントに置き換える必要があります。

方法 2

キーが oss で、値が次の形式の設定項目を追加します。

<endpoint-name>
  <endpoint>https://oss-cn-beijing-internal.aliyuncs.com/bucket</endpoint>
  <access_key_id>ACCESS_KEY_ID</access_key_id>
  <secret_access_key>ACCESS_KEY_SECRET</secret_access_key>
</endpoint-name>
説明

パラメーターの値は、実際のエンドポイント、AccessKey ID、AccessKey シークレットに置き換える必要があります。

上記の設定を完了した後、次のステートメントを実行して OSS テーブルを作成し、OSS テーブル関数を使用できます。

  • OSS テーブルを作成する

    CREATE TABLE OSS_TABLE
    (
      column1 UInt32,
      column2 String
      ...
    )
    ENGINE = S3(path, format, [compression]);
  • OSS テーブル関数を使用する

    s3(path, format, structure, [compression]);