すべてのプロダクト
Search
ドキュメントセンター

Hologres:Spark を使用して Hologres のデータを読み書きする

最終更新日:Nov 09, 2025

Spark は、大規模なデータ処理のための統合分析エンジンです。Hologres は、オープンソースの Apache Spark および E-MapReduce (EMR) Spark と統合されており、企業が迅速にデータウェアハウスを構築するのに役立ちます。Hologres が提供する Spark コネクタを使用すると、Spark クラスターに Hologres カタログを作成できます。これにより、外部テーブルを使用して Hologres のデータをバッチで読み書きできます。Spark コネクタは、ネイティブの Java Database Connectivity (JDBC) よりも優れたパフォーマンスを提供します。

制限事項

Spark コネクタは、V1.3 以降の Hologres インスタンスでのみサポートされています。インスタンスのバージョンは、Hologres コンソール[インスタンス詳細] ページで確認できます。インスタンスのバージョンが V1.3 より前の場合は、インスタンスをスペックアップするか、Hologres DingTalk グループ (ID: 32314975) に参加してインスタンスのスペックアップをリクエストすることができます。

準備

  • spark-sql、spark-shell、または pyspark コマンドを実行できる Spark 環境をインストールします。依存関係の問題を回避し、より多くの機能にアクセスするために、Spark 3.3.0 以降を使用することをお勧めします。

    • EMR Spark を使用して、Spark 環境を迅速に構築し、Hologres インスタンスに接続できます。詳細については、「EMR における Spark の拡張機能」をご参照ください。

    • 独立した Spark 環境をセットアップすることもできます。詳細については、「Apache Spark」をご参照ください。

  • Spark を使用して Hologres からデータを読み書きするには、hologres-connector-spark-3.x コネクタ JAR パッケージが必要です。このトピックでは、バージョン 1.5.2 を例として使用します。パッケージは Maven Central Repository からダウンロードできます。コネクタのリソースはオープンソースです。詳細については、「Hologres-Connectors」をご参照ください。

  • Java で Spark ジョブを開発し、IntelliJ IDEA などのツールでローカルデバッグを実行するには、次の Maven 依存関係を pom.xml ファイルに追加します。

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.5.2</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

Hologres カタログ

Hologres カタログは、Spark コネクタ 1.5.2 以降でサポートされています。外部テーブルを使用して、Hologres のデータを簡単に読み書きできます。

Spark の各 Hologres カタログは、Hologres のデータベースにマッピングされます。Hologres カタログの各名前空間は、マッピングされたデータベースのスキーマにマッピングされます。以下のセクションでは、Spark で Hologres カタログを使用する方法について説明します。

説明

Hologres カタログはテーブル作成をサポートしていません。

このトピックでは、Hologres インスタンスで次のデータベースとテーブルを使用します:

test_db -- データベース
  public.test_table1 -- public スキーマのテーブル
  public.test_table2
  test_schema.test_table3  -- test_schema スキーマのテーブル 

Hologres カタログを初期化する

Spark クラスターで spark-sql を起動し、Hologres コネクタをロードして、カタログパラメーターを指定します。

spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db

一般的な Hologres カタログコマンド

  • Hologres カタログのロード

    Spark の Hologres カタログは Hologres データベースにマッピングされます。このマッピングはセッション中に変更できません。

    USE hologres_external_test_db;
  • すべての名前空間をクエリします。

    Spark の 名前空間 は、Hologres の スキーマ にマッピングされます。デフォルトのスキーマは public です。USE コマンドを使用して、デフォルトのスキーマを変更できます。

    -- Hologres カタログのすべての名前空間を表示します。これは Hologres データベースのすべてのスキーマに対応します。
    SHOW NAMESPACES;
  • 名前空間内のテーブルをクエリします。

    • すべてのテーブルをクエリします。

      SHOW TABLES;
    • 特定の名前空間内のテーブルをクエリします。

      USE test_schema;
      SHOW TABLES;
      
      -- または、次のステートメントを使用します。 
      SHOW TABLES IN test_schema;
  • テーブルからのデータの読み取りと書き込み

    SELECT および INSERT ステートメントを使用して、カタログ内の Hologres 外部テーブルからデータを読み書きします。

    -- テーブルからデータを読み取ります。
    SELECT * FROM public.test_table1;
    
    -- テーブルにデータを書き込みます。
    INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;

Hologres にデータをインポートする

このセクションでは、TPC-H データセットの customer テーブルのテストデータを使用します。サンプルの customer データを CSV フォーマットでダウンロードできます。次の SQL 文は、customer テーブルスキーマを作成します。

CREATE TABLE customer_holo_table
(
  c_custkey    BIGINT ,
  c_name       TEXT   ,
  c_address    TEXT   ,
  c_nationkey  INT    ,
  c_phone      TEXT   ,
  c_acctbal    DECIMAL(15,2) ,
  c_mktsegment TEXT   ,
  c_comment    TEXT
);

Spark-SQL を使用したデータのインポート

Spark-SQL を使用する場合、カタログを使用して Hologres テーブルのメタデータをロードすると便利です。一時テーブルを作成して Hologres テーブルを宣言することもできます。

説明
  • 1.5.2 より前の Spark コネクタはカタログをサポートしていません。一時テーブルを作成して Hologres テーブルを宣言することしかできません。

  • Hologres-Connector-Spark のパラメーターの詳細については、「パラメーター」をご参照ください。

  1. Hologres カタログを初期化します。

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. CSV ソーステーブルから Hologres 外部テーブルにデータをインポートします。

    説明

    Spark の INSERT INTO 構文は、column_list を使用して書き込み対象の一部の列を指定することをサポートしていません。たとえば、INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable を使用して c_custkey フィールドにのみデータを書き込むことはできません。

    特定のフィールドにデータを書き込むには、CREATE TEMPORARY VIEW 文を使用して、必要なフィールドのみを含む Hologres の一時テーブルを宣言します。

    カタログを使用したデータの書き込み

    -- Hologres カタログをロードします。
    USE hologres_external_test_db;
    
    -- CSV データソースを作成します。
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep "," -- ローカルテストの場合は、ファイルの絶対パスを使用します。
    );
    
    -- CSV テーブルから Hologres にデータを書き込みます。
    INSERT INTO public.customer_holo_table SELECT * FROM csvTable;

    一時ビューを使用したデータの書き込み

    -- CSV データソースを作成します。
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep ","
    );
    
    -- Hologres 一時テーブルを作成します。
    CREATE TEMPORARY VIEW hologresTable (
        c_custkey BIGINT,
        c_name STRING,
        c_phone STRING)
    USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        table "customer_holo_table"
    );
    
    INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;

DataFrame を使用したデータのインポート

spark-shell や pyspark などのツールを使用して Spark ジョブを開発し、DataFrame の `write` メソッドを呼び出してデータを書き込むことができます。CSV ファイルから読み取られたデータは DataFrame に変換され、Hologres インスタンスに書き込まれます。以下のセクションでは、さまざまなプログラミング言語のサンプルコードを提供します。Hologres Connector for Spark のパラメーターの詳細については、「パラメーター」をご参照ください。

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode

// CSV ソースのスキーマ。
val schema = StructType(Array(
  StructField("c_custkey", LongType),
  StructField("c_name", StringType),
  StructField("c_address", StringType),
  StructField("c_nationkey", IntegerType),
  StructField("c_phone", StringType),
  StructField("c_acctbal", DecimalType(15, 2)),
  StructField("c_mktsegment", StringType),
  StructField("c_comment", StringType)
))

// CSV ファイルからデータを DataFrame として読み取ります。
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")

// DataFrame を Hologres に書き込みます。
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;

public class SparkTest {
    public static void main(String[] args) {
        // CSV ソースのスキーマ。
        List<StructField> asList =
                Arrays.asList(
                        DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
                        DataTypes.createStructField("c_name", DataTypes.StringType, true),
                        DataTypes.createStructField("c_address", DataTypes.StringType, true),
                        DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
                        DataTypes.createStructField("c_phone", DataTypes.StringType, true),
                        DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
                        DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
                        DataTypes.createStructField("c_comment", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(asList);

        // ローカルモードで実行します。
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();

        // CSV ファイルからデータを DataFrame として読み取ります。
        // ローカルテストの場合は、customer データの絶対パスを使用します。
        Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");

        // DataFrame を Hologres に書き込みます。
        csvDf.write.format("hologres").option(
           "username", "***").option(
           "password", "***").option(
           "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
           "table", "customer_holo_table").mode(
           "append").save()
    }
}

Maven ファイルには次の構成が必要です。

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

from pyspark.sql.types import *

# CSV ソースのスキーマ。
schema = StructType([
    StructField("c_custkey", LongType()),
    StructField("c_name", StringType()),
    StructField("c_address", StringType()),
    StructField("c_nationkey", IntegerType()),
    StructField("c_phone", StringType()),
    StructField("c_acctbal", DecimalType(15, 2)),
    StructField("c_mktsegment", StringType()),
    StructField("c_comment", StringType())
])

# CSV ファイルからデータを DataFrame として読み取ります。
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')

# DataFrame を Hologres に書き込みます。
csvDf.write.format("hologres").option(
    "username", "***").option(
    "password", "***").option(
    "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
    "table", "customer_holo_table").mode(
    "append").save()

Spark を使用してさまざまなプログラミング言語でジョブを実行するには、次の手順を実行します:

  • Scala

    • サンプルコードを使用して sparktest.scala ファイルを生成し、次のコマンドを実行してジョブを実行できます。

      -- 依存関係を読み込みます。
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- ローカルテストの場合は、絶対パスを使用してファイルを読み込みます。
      scala> :load D:/sparktest.scala
    • または、依存関係がロードされた後にサンプルコードを貼り付けて実行することもできます。

  • Java

    開発ツールを使用してサンプルコードを追加し、Maven ツールを使用してパッケージ化できます。たとえば、パッケージ名が spark_test.jar の場合、次のコマンドを実行してジョブを実行します。

    -- ジョブ JAR パッケージの絶対パスを使用します。
    spark-submit --class SparkTest  --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_test.jar
  • Python

    次のコマンドが実行された後、サンプルコードを貼り付けて実行できます。

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

Hologres からデータを読み取る

  • Spark コネクタ 1.3.2 以降では、Hologres からデータを読み取ることができます。 Spark のデフォルトの jdbc-connector と比較して、spark-connector はテーブルのシャードに基づいて Hologres テーブルからデータを同時に読み取ることができます。 これにより、読み取りパフォーマンスが向上します。 読み取りの同時実行数は、テーブル内のシャード数によって決まります。 read.max_task_count パラメーターを使用して、spark-connector の読み取りの同時実行数を制限できます。 ジョブは、Min(shardCount, max_task_count) と等しい数の読み取りタスクを生成します。 スキーマ推論もサポートされています。 スキーマを指定しない場合、Spark スキーマは Hologres テーブルのスキーマから推論されます。

  • Spark コネクタ 1.5.0 以降では、Hologres テーブルからデータを読み取る際に、述語プッシュダウン、LIMIT プッシュダウン、およびフィールドプルーニングがサポートされています。また、Hologres の SELECT QUERY を使用してデータを読み取ることもできます。このバージョンではバッチ読み取りモードがサポートされており、以前のバージョンと比較して読み取りパフォーマンスが 3〜4 倍向上します。

Spark-SQL を使用したデータの読み取り

Spark-SQL を使用する場合、カタログを使用して Hologres テーブルのメタデータをロードすると便利です。一時テーブルを作成して Hologres テーブルを宣言することもできます。

説明
  • 1.5.2 より前の Spark コネクタはカタログをサポートしていません。一時テーブルを作成して Hologres テーブルを宣言することしかできません。

  • Hologres-Connector-Spark のパラメーターの詳細については、「パラメーター」をご参照ください。

  1. Hologres カタログを初期化します。

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. Hologres データを読み取ります。

    • カタログを使用したデータの読み取り

      -- Hologres カタログをロードします。
      USE hologres_external_test_db;
      
      -- Hologres テーブルからデータを読み取ります。フィールドプルーニングと述語プッシュダウンがサポートされています。
      SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
    • 一時テーブルを作成してデータを読み取ります。

      CREATE TEMPORARY VIEW (table)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.max_task_count "80", // Hologres テーブルを読み取り用に分割できるタスクの最大数。
        table "customer_holo_table"
      );
      
      -- フィールドプルーニングと述語プッシュダウンがサポートされています。
      SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;

      CREATE TEMPORARY VIEW (query)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10"
      );
      
      SELECT * FROM hologresTable LIMIT 5;

Hologres データを DataFrame として読み取る

spark-shell や pyspark などのツールを使用して Spark ジョブを開発し、Spark の read メソッドを呼び出してデータを DataFrame として読み取り、後続の処理を行うことができます。以下のセクションでは、さまざまなプログラミング言語で Hologres テーブルからデータを DataFrame として読み取る方法を示すサンプルコードを提供します。Hologres-Connector-Spark のパラメーターの詳細については、「パラメーター」をご参照ください。

Scala

val readDf = (
  spark.read
    .format("hologres")
    .option("username", "***")
    .option("password", "***")
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
    .option("table", "customer_holo_table")
    .option("read.max_task_count", "80") // Hologres テーブルを読み取り用に分割できるタスクの最大数。
    .load()
    .filter("c_custkey < 500")
)

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSelect {
    public static void main(String[] args) {
        
        // ローカルモードで実行します。
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();
                
        Dataset<Row> readDf = (
           spark.read
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .option("read.max_task_count", "80") // Hologres テーブルを読み取り用に分割できるタスクの最大数。
                .load()
                .filter("c_custkey < 500")
        );
        readDf.select("c_custkey", "c_name", "c_phone").show(10);
    }
}

Maven ファイルには次の構成が必要です。

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Spark を使用してさまざまなプログラミング言語でジョブを実行するには、次の手順を実行します:

  • Scala

    • サンプルコードを使用して sparkselect.scala ファイルを生成し、次のコマンドを実行してジョブを実行できます。

      -- 依存関係を読み込みます。
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- ローカルテストの場合は、絶対パスを使用してファイルを読み込みます。
      scala> :load D:/sparkselect.scala
    • または、依存関係がロードされた後にサンプルコードを貼り付けて実行することもできます。

  • Java

    開発ツールを使用してサンプルコードを追加し、Maven ツールを使用してパッケージ化できます。たとえば、パッケージ名が spark_select.jar の場合、次のコマンドを実行してジョブを実行します。

    -- ジョブ JAR パッケージの絶対パスを使用します。
    spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_select.jar
  • Python

    次のコマンドが実行された後、サンプルコードを貼り付けて実行できます。

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

パラメータ

一般パラメータ

パラメータ

デフォルト値

必須

説明

username

なし

はい

  • Alibaba Cloud アカウントの AccessKey ID。AccessKey ID を取得するには、[AccessKey 管理] ページに移動します。

  • アカウントの名前。アカウントを作成した後、データベース権限を付与する必要があります。詳細については、「Hologres 権限モデル」をご参照ください。権限が付与されているかどうかは、「HoloWeb に接続してクエリを実行する」の手順に従って確認できます。

password

なし

はい

  • アカウントの AccessKey Secret。AccessKey Secret の取得方法の詳細については、「AccessKey の作成」をご参照ください。

  • 作成されたアカウントのパスワード。

table

なし

はい

データの読み書きに使用する Hologres テーブルの名前。

説明

データを読み取る際に、このパラメーターを read.query パラメーターに置き換えることもできます。

jdbcurl

なし

はい

Hologres リアルタイムデータ API の JDBC URL。jdbc:postgresql://<host>:<port>/<db_name> の形式です。[Hologres コンソール] に移動します。左側のナビゲーションウィンドウで、[インスタンス] をクリックします。対象のインスタンスを見つけて、[インスタンス詳細] ページに移動します。[ネットワーク情報] セクションで、ホストとポート番号を取得します。

enable_serverless_computing

false

いいえ

サーバーレスコンピューティングリソースを使用するかどうかを指定します。このパラメーターは、データの読み取りと bulk_load モードでのデータ書き込みにのみ有効です。詳細については、「サーバーレスコンピューティングユーザーガイド」をご参照ください。

serverless_computing_query_priority

3

いいえ

サーバーレスコンピューティングの実行優先度。

statement_timeout_seconds

28800 (8 時間)

いいえ

クエリのタイムアウト期間。単位: 秒。

retry_count

3

いいえ

接続に失敗した場合の再試行回数。

direct_connect

ダイレクト接続をサポートする環境では、デフォルトでダイレクト接続が使用されます。

いいえ

バッチデータの読み書きのボトルネックは、通常、エンドポイントのネットワークスループットです。そのため、システムは現在の環境が Hologres フロントエンド (アクセスノード) に直接接続できるかどうかをテストします。ダイレクト接続がサポートされている場合、デフォルトで使用されます。このパラメーターを false に設定すると、ダイレクト接続は使用されません。

書き込みパラメーター

Hologres コネクタは、Spark の SaveMode パラメーターをサポートしています。SQL の場合、これは INSERT INTO または INSERT OVERWRITE に対応します。DataFrame の場合、データを書き込むときに SaveMode を Append または Overwrite に設定できます。Overwrite モードを使用すると、データ書き込み用の一時テーブルが作成されます。書き込み操作が成功した後、一時テーブルが元のテーブルを置き換えます。Overwrite モードは必要な場合にのみ使用してください。

パラメーター名

以前のパラメーター名

デフォルト値

必須

説明

write.mode

copy_write_mode

auto

いいえ

書き込みモード。書き込みモードの違いの詳細については、「バッチ書き込みモードの比較」をご参照ください。有効な値:

  • auto (デフォルト): コネクタは、Hologres インスタンスのバージョンと宛先テーブルのメタデータに基づいて、最適なモードを自動的に選択します。次の選択ロジックが適用されます:

    1. Hologres インスタンスが V2.2.25 より新しいバージョンで、テーブルにプライマリキーがある場合、bulk_load_on_conflict モードが使用されます。

    2. Hologres インスタンスが V2.1.0 より新しいバージョンで、テーブルにプライマリキーがない場合、bulk_load モードが使用されます。

    3. Hologres インスタンスが V1.3 より新しい場合、stream モードが使用されます。

    4. その他の場合、insert モードが使用されます。

  • stream: 固定プランでの COPY。COPY ステートメントは、Hologres V1.3 以降の固定プランでサポートされています。INSERT ステートメントと比較して、COPY ステートメントはストリームモードを使用するため、より高いスループットを提供します。COPY ステートメントは、データをバッチ処理しないため、データ遅延が低く、クライアントのメモリ消費量も削減されます。

    説明

    このモードには、Hologres コネクタ 1.3.0 以降および Hologres V1.3.34 以降が必要です。

  • bulk_load: このモードはバッチ COPY を使用します。固定プランでのストリーミング COPY と比較して、バッチ COPY は高い 1 秒あたりのレコード数 (RPS) 条件下で Hologres インスタンスへの負荷が低くなります。ただし、このモードはプライマリキーのないテーブルへのデータ書き込みのみをサポートします。

    説明

    このモードには、Hologres コネクタ 1.4.2 以降および Hologres V2.1.0 以降が必要です。

  • bulk_load_on_conflict: このモードは、バッチ COPY を使用してプライマリキーを持つテーブルにデータを書き込み、重複するプライマリキーを処理できます。デフォルトでは、プライマリキーを持つ Hologres テーブルにデータをバッチでインポートすると、テーブルロックがトリガーされます。これにより、複数の接続を使用して同時にデータを書き込む機能が制限されます。現在のコネクタでは、宛先の Hologres テーブルの分散キーに基づいてデータを再配布できます。これにより、各 Spark タスクが 1 つのシャードにのみデータを書き込むことが保証されます。これにより、元のテーブルレベルのロックがシャードレベルのロックに縮小され、同時書き込みが可能になり、書き込みパフォーマンスが向上します。各接続は少数のシャードのデータのみを維持する必要があるため、この最適化により、小さなファイルの数と Hologres のメモリ使用量も大幅に削減できます。テストによると、同時書き込みの前にデータを再パーティション化すると、ストリームモードでの書き込みと比較してシステム負荷が約 67% 削減されます。

    説明

    このモードには、Hologres コネクタ 1.4.2 以降および Hologres V2.2.25 以降が必要です。

  • insert: このモードは INSERT メソッドを使用してデータを書き込みます。

write.copy.max_buffer_size

max_cell_buffer_size

52428800 (50 MB)

いいえ

COPY モードでデータを書き込む際のローカルバッファーの最大サイズ。通常、この値を調整する必要はありません。ただし、非常に長い文字列などの大きなフィールドを書き込むときにバッファーオーバーフローが発生した場合は、この値を増やしてください。

write.copy.dirty_data_check

copy_write_dirty_data_check

false

いいえ

ダーティデータをチェックするかどうかを指定します。この機能を有効にすると、ダーティデータが見つかったときに書き込みに失敗した特定の行を正確に特定できます。ただし、これは書き込みパフォーマンスに影響します。したがって、トラブルシューティングの場合を除き、この機能を有効にしないでください。

write.on_conflict_action

write_mode

INSERT_OR_REPLACE

いいえ

INSERT ステートメントの宛先テーブルにプライマリキーがある場合に使用されるポリシー:

  • INSERT_OR_IGNORE: プライマリキーの競合が発生した場合、データは書き込まれません。

  • INSERT_OR_UPDATE: プライマリキーの競合が発生した場合、対応する列が更新されます。

  • INSERT_OR_REPLACE: プライマリキーの競合が発生した場合、すべての列が更新されます。

write.modeinsert に設定されている場合にのみ、次のパラメーターが有効になります。

パラメーター名

以前のパラメーター名

デフォルト値

必須

説明

write.insert.dynamic_partition

dynamic_partition

false

いいえ

このパラメーターは、copy_write_modeinsert に設定されている場合に有効になります。このパラメーターが true に設定されている場合、親パーティションテーブルにデータを書き込むときに、存在しないパーティションが自動的に作成されます。

write.insert.batch_size

write_batch_size

512

いいえ

各書き込みスレッドの最大バッチサイズ。マージ後の Put 操作の数が WriteBatchSize の値に達すると、バッチコミットが実行されます。

write.insert.batch_byte_size

write_batch_byte_size

2097152 (2 × 1024 × 1024)

いいえ

各書き込みスレッドの最大バッチサイズ。単位: バイト。デフォルト値: 2 MB。マージ後の Put データのバイトサイズが WriteBatchByteSize の値に達すると、バッチコミットが実行されます。

write.insert.max_interval_ms

write_max_interval_ms

10000

いいえ

最後のコミットからの経過時間が WriteMaxIntervalMs の値を超えると、バッチコミットがトリガーされます。

write.insert.thread_size

write_thread_size

1

いいえ

同時書き込みスレッドの数。各同時スレッドは 1 つのデータベース接続を占有します。

読み取りパラメーター

パラメーター名

以前のパラメーター名

(1.5.0 以前)

デフォルト値

必須

説明

read.mode

bulk_read

auto

いいえ

読み取りモード。有効な値:

  • auto (デフォルト): Hologres コネクタは、Hologres インスタンスのバージョンと宛先テーブルのメタデータに基づいて、最適なモードを自動的に選択します。次の選択ロジックが適用されます:

    1. 読み取るフィールドに JSONB 型が含まれている場合、select モードが使用されます。

    2. インスタンスのバージョンが 3.0.24 より新しい場合、bulk_read_compressed モードが使用されます。

    3. その他の場合、bulk_read モードが使用されます。

  • bulk_read: このモードは、COPY OUT を使用して arrow 形式でデータを読み取ります。このモードのパフォーマンスは、select モードの数倍です。このモードは、Hologres から JSONB 型のデータを読み取ることをサポートしていません。

  • bulk_read_compressed: このモードは、COPY OUT を使用して arrow 形式で圧縮データを読み取ります。非圧縮データを読み取る場合と比較して、このモードは約 45% の帯域幅を節約できます。

  • select: このモードは、一般的な SELECT ステートメントを使用してデータを読み取ります。

read.max_task_count

max_partition_count

80

いいえ

読み取る Hologres テーブルは複数のパーティションに分割されます。各パーティションは Spark タスクに対応します。Hologres テーブルのシャード数がこのパラメーターの値より小さい場合、パーティションの数は最大でシャード数になります。

read.copy.max_buffer_size

/

52428800 (50 MB)

いいえ

COPY モードでデータを読み取る際のローカルバッファーの最大サイズ。大きなフィールドサイズが原因で例外が発生した場合は、この値を増やしてください。

read.push_down_predicate

push_down_predicate

true

いいえ

クエリ中に適用されるフィルター条件のプッシュダウンなど、述語プッシュダウンを実行するかどうかを指定します。一般的なフィルター条件のプッシュダウンと列プルーニングがサポートされています。

read.push_down_limit

push_down_limit

true

いいえ

Limit プッシュダウンを実行するかどうかを指定します。

read.select.batch_size

scan_batch_size

256

いいえ

このパラメーターは、read_modeselect に設定されている場合に有効になります。このパラメーターは、Hologres からデータを読み取る際に、1 回のスキャン操作でフェッチする行数を指定します。

read.select.timeout_seconds

scan_timeout_seconds

60

いいえ

このパラメーターは、read_modeselect に設定されている場合に有効になります。このパラメーターは、Hologres からデータを読み取る際のスキャン操作のタイムアウト期間を指定します。

read.query

query

なし

いいえ

Hologres からデータを読み取るために使用される querytable パラメーターと read.query パラメーターのいずれか一方のみを指定できます。

説明
  • query パラメーターを使用する場合、単一のタスクのみを使用してデータを読み取ることができます。述語プッシュダウンはサポートされていません。

  • table パラメーターを使用する場合、Hologres テーブルのシャード数に基づいて、複数のタスクが同時にデータを読み取るために使用されます。

データ型マッピング

Spark 型

Hologres 型

ShortType

SMALLINT

IntegerType

INT

LongType

BIGINT

StringType

TEXT

StringType

JSON

StringType

JSONB

DecimalType

NUMERIC(38, 18)

BooleanType

BOOL

DoubleType

DOUBLE PRECISION

FloatType

FLOAT

TimestampType

TIMESTAMPTZ

DateType

DATE

BinaryType

BYTEA

BinaryType

ROARINGBITMAP

ArrayType(IntegerType)

INT4[]

ArrayType(LongType)

INT8[]

ArrayType(FloatType)

FLOAT4[]

ArrayType(DoubleType)

FLOAT8[]

ArrayType(BooleanType)

BOOLEAN[]

ArrayType(StringType)

TEXT[]

接続数の計算

Hologres connector for Spark は、読み取りおよび書き込み操作に特定の数の JDBC 接続を使用します。接続数は、次の要因によって決まります:

  • Spark の並列処理。これは、ジョブの実行中に Spark UI に表示される同時タスクの数です。

  • コネクタの各同時タスクで使用される接続数:

    • COPY モードでデータを書き込む場合、各同時タスクは 1 つの JDBC 接続を使用します。

    • INSERT モードでデータを書き込む場合、各同時タスクが使用する JDBC 接続の数は、write_thread_size パラメーターの値と同じです。

    • データを読み取る場合、各同時タスクは 1 つの JDBC 接続を使用します。

  • 他の操作で使用される接続数。ジョブが開始されると、スキーマの取得などの操作が実行され、短時間接続が確立される場合があります。

ジョブが使用する JDBC 接続の総数は、次の式を使用して計算できます。

項目

使用される接続数

カタログを使用したメタデータのクエリ

1

データの読み取り

並列処理 × 1 + 1

COPY モードでのデータの書き込み

並列処理 × 1 + 1

INSERT モードでのデータの書き込み

並列処理 × write_thread_size + 1

上記の接続数の計算は、Spark が同時に実行できるタスクの数が、ジョブによって生成されるタスクの数よりも多いことを前提としています。

Spark が同時に実行できるタスクの数は、spark.executor.instances などのパラメーター設定の影響を受ける可能性があります。また、Hadoop のファイルブロック分割ポリシーの影響を受けることもあります。詳細については、「Apache Hadoop」をご参照ください。