すべてのプロダクト
Search
ドキュメントセンター

Hologres:Spark を使用して Hologres の読み書きを行う

最終更新日:Feb 04, 2026

Spark は、大規模なデータ処理のための統合分析エンジンです。Hologres は、コミュニティ版の Spark および EMR Spark と緊密に統合されており、データウェアハウスを迅速に構築するのに役立ちます。Hologres が提供する Spark コネクタは、Spark クラスターでの Hologres カタログの作成をサポートし、外部テーブルを使用した高性能なバッチ読み取りとインポートを可能にします。このアプローチは、ネイティブの Java Database Connectivity (JDBC) よりも優れたパフォーマンスを提供します。

制限事項

バージョン 1.3 以降の Hologres インスタンスのみが Spark コネクタをサポートします。現在のインスタンスバージョンは、Hologres 管理コンソールInstance Details ページで確認できます。お使いのインスタンスのバージョンが 1.3 より前の場合、インスタンスのスペックアップ を使用するか、DingTalk グループ番号 32314975 を検索して Hologres 通信グループに参加し、スペックアップをリクエストしてください。

事前準備

  • `spark-sql`、`spark-shell`、または `pyspark` コマンドを実行できる互換性のある Spark 環境をインストールします。依存関係の問題を回避し、より多くの機能にアクセスするために、Spark 3.3.0 以降を使用してください。

    • Alibaba Cloud EMR Spark を使用して Spark 環境を迅速に構築し、Hologres インスタンスに接続します。詳細については、「EMR Spark の特徴」をご参照ください。

    • また、お好みの設定で Spark 環境を構築することもできます。詳細については、「Apache Spark」をご参照ください。

  • Spark を使用して Hologres の読み書きを行うには、コネクタの JAR パッケージ hologres-connector-spark-3.x を参照します。このトピックでは、Spark コネクタのバージョン 1.5.2 を使用します。Maven Central Repository からダウンロードできます。すべてのコネクタリソースはオープンソースです。詳細については、「Hologres-Connectors」をご参照ください。

  • Java で Spark ジョブを開発し、IntelliJ IDEA などのツールでローカルでデバッグする場合は、pom.xml ファイルに次の Maven 依存関係を追加します。

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.5.2</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

Hologres カタログ

バージョン 1.5.2 以降、Spark コネクタは Hologres カタログをサポートしており、外部テーブルを使用して Hologres の読み書きを便利に行うことができます。

Spark では、各 Hologres カタログは Hologres のデータベースに対応し、Hologres カタログ内の各名前空間はそのデータベースのスキーマに対応します。次のセクションでは、Spark で Hologres カタログを使用する方法を示します。

説明

Hologres カタログはテーブルの作成をサポートしていません。

このトピックでは、次のデータベース名とテーブル名を持つ Hologres インスタンスを使用します。

test_db --データベース
  public.test_table1 --public スキーマ内のテーブル
  public.test_table2
  test_schema.test_table3  -- test_schema スキーマ内のテーブル

Hologres カタログの初期化

Spark クラスターで spark-sql を起動し、Hologres コネクタをロードして、カタログパラメーターを指定します。

spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db

Hologres カタログの共通コマンド

  • Hologres カタログのロード

    Spark の Hologres カタログは Hologres データベースに正確に対応しており、使用中に変更することはできません。

    USE hologres_external_test_db;
  • すべての名前空間のクエリ

    Spark の名前空間は Hologres のスキーマに対応します。デフォルトのスキーマは `public` です。USE 命令を使用してデフォルトのスキーマを変更できます。

    -- Hologres カタログ内のすべての名前空間を表示します。これは Hologres 内のすべてのスキーマに対応します。
    SHOW NAMESPACES;
  • 名前空間内のテーブルのクエリ

    • すべてのテーブルのクエリ

      SHOW TABLES;
    • 特定の名前空間内のテーブルのクエリ

      USE test_schema;
      SHOW TABLES;
      
      -- または
      SHOW TABLES IN test_schema;
  • テーブルの読み書き

    SELECT 文と INSERT 文を使用して、カタログ内の Hologres 外部テーブルの読み書きを行います。

    -- テーブルから読み取ります。
    SELECT * FROM public.test_table1;
    
    -- テーブルに書き込みます。
    INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;

Hologres へのデータのインポート

このセクションでは、TPC-H データセットの customer テーブルを Hologres のソースデータとして使用します。Spark は Hologres テーブルのデータを CSV 形式で読み取ることができます。customer データをダウンロードします。customer テーブルスキーマを作成するための SQL 文は次のとおりです。

CREATE TABLE customer_holo_table
(
  c_custkey    BIGINT ,
  c_name       TEXT   ,
  c_address    TEXT   ,
  c_nationkey  INT    ,
  c_phone      TEXT   ,
  c_acctbal    DECIMAL(15,2) ,
  c_mktsegment TEXT   ,
  c_comment    TEXT
);

Spark-SQL を使用したデータのインポート

Spark-SQL を使用する場合、カタログを使用して Hologres テーブルのメタデータをロードする方が便利です。一時テーブルを作成して Hologres テーブルを宣言することもできます。

説明
  • 1.5.2 より前のバージョンの Hologres-Connector-Spark はカタログをサポートしていません。一時テーブルを作成して Hologres テーブルを宣言する必要があります。

  • Hologres-Connector-Spark のパラメーターの詳細については、「パラメーターの説明」をご参照ください。

  1. Hologres カタログを初期化します。

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. ソース CSV テーブルから Hologres 外部テーブルにデータをインポートします。

    説明

    Spark の INSERT INTO 構文は、column_list を使用して書き込む列の一部リストを指定することはサポートしていません。たとえば、INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable を使用して `c_custkey` フィールドのみを書き込むことはできません。

    特定のフィールドのみを書き込みたい場合は、CREATE TEMPORARY VIEW を使用して、それらのフィールドのみを持つ Hologres 一時テーブルを宣言します。

    CATALOG を使用した書き込み

    -- Hologres カタログをロードします。
    USE hologres_external_test_db;
    
    -- CSV データソースを作成します。
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep "," -- ローカルテストの場合は、ファイルの絶対パスを使用します。
    );
    
    -- CSV テーブルから Hologres にデータを書き込みます。
    INSERT INTO public.customer_holo_table SELECT * FROM csvTable;

    TEMPORARY VIEW を使用した書き込み

    -- CSV データソースを作成します。
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep ","
    );
    
    -- Hologres 一時テーブルを作成します。
    CREATE TEMPORARY VIEW hologresTable (
        c_custkey BIGINT,
        c_name STRING,
        c_phone STRING)
    USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        table "customer_holo_table"
    );
    
    INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;

DataFrame を使用したデータのインポート

spark-shell、pyspark、またはその他のツールを使用して Spark ジョブを開発する場合、DataFrame の `write` インターフェイスを呼び出してデータを書き込むこともできます。異なる開発言語は、CSV ファイルから読み取ったデータを DataFrame に変換し、それを Hologres インスタンスに書き込みます。以下はサンプルコードです。Hologres-Connector-Spark のパラメーターの詳細については、「パラメーターの説明」をご参照ください。

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode

// CSV ソースのスキーマ。
val schema = StructType(Array(
  StructField("c_custkey", LongType),
  StructField("c_name", StringType),
  StructField("c_address", StringType),
  StructField("c_nationkey", IntegerType),
  StructField("c_phone", StringType),
  StructField("c_acctbal", DecimalType(15, 2)),
  StructField("c_mktsegment", StringType),
  StructField("c_comment", StringType)
))

// CSV ファイルから DataFrame にデータを読み込みます。
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")

// DataFrame を Hologres に書き込みます。
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;

public class SparkTest {
    public static void main(String[] args) {
        // CSV ソースのスキーマ。
        List<StructField> asList =
                Arrays.asList(
                        DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
                        DataTypes.createStructField("c_name", DataTypes.StringType, true),
                        DataTypes.createStructField("c_address", DataTypes.StringType, true),
                        DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
                        DataTypes.createStructField("c_phone", DataTypes.StringType, true),
                        DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
                        DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
                        DataTypes.createStructField("c_comment", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(asList);

        // ローカルモードで実行します。
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();

        // CSV ファイルから DataFrame にデータを読み込みます。
        // ローカルテストの場合は、customer データの絶対パスを使用します。
        Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");

        // DataFrame を Hologres に書き込みます。
        csvDf.write.format("hologres").option(
           "username", "***").option(
           "password", "***").option(
           "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
           "table", "customer_holo_table").mode(
           "append").save()
    }
}

Maven ファイルには次の構成が必要です。

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

from pyspark.sql.types import *

# CSV ソースのスキーマ。
schema = StructType([
    StructField("c_custkey", LongType()),
    StructField("c_name", StringType()),
    StructField("c_address", StringType()),
    StructField("c_nationkey", IntegerType()),
    StructField("c_phone", StringType()),
    StructField("c_acctbal", DecimalType(15, 2)),
    StructField("c_mktsegment", StringType()),
    StructField("c_comment", StringType())
])

# CSV ファイルから DataFrame にデータを読み込みます。
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')

# DataFrame を Hologres に書き込みます。
csvDf.write.format("hologres").option(
    "username", "***").option(
    "password", "***").option(
    "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
    "table", "customer_holo_table").mode(
    "append").save()

異なる言語で Spark ジョブを実行するには、次の操作を実行します。

  • Scala

    • サンプルコードを使用して sparktest.scala ファイルを生成し、次のようにジョブを実行できます。

      -- 依存関係をロードします。
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- ローカルテストの場合は、絶対パスを使用してファイルをロードします。
      scala> :load D:/sparktest.scala
    • 依存関係をロードした後にサンプルコードを直接貼り付けて実行することもできます。

  • Java

    サンプルコードを開発ツールにインポートし、Maven を使用してパッケージ化できます。たとえば、パッケージ名を spark_test.jar とすることができます。次のコードでジョブを実行します。

    -- ジョブの JAR パッケージには絶対パスを使用します。
    spark-submit --class SparkTest  --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_test.jar
  • Python

    次のコードを実行した後、サンプルコードを直接貼り付けて実行できます。

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

Hologres からのデータの読み取り

  • spark-connector バージョン 1.3.2 以降、Hologres からデータを読み取ることができます。Spark のデフォルトの jdbc-connector と比較して、spark-connector は Hologres テーブルのシャードに基づいてデータを並行して読み取ることができるため、パフォーマンスが向上します。読み取りの並列度はテーブル内のシャード数に依存します。spark-connectorread.max_task_count パラメーターによって制限できます。ジョブは最終的に Min(shardCount, max_task_count) 個の読み取りタスクを生成します。また、スキーマ推論もサポートしています。スキーマを提供しない場合、コネクタは Hologres テーブルスキーマから Spark 側のスキーマを推論します。

  • spark-connector バージョン 1.5.0 以降、Hologres テーブルからの読み取りは、述語プッシュダウン、LIMIT プッシュダウン、およびカラムプルーニングをサポートします。また、Hologres の SELECT QUERY を渡してデータを読み取ることもサポートしています。このバージョンではバッチモードの読み取りが導入され、以前のバージョンと比較して読み取りパフォーマンスが 3〜4 倍向上します。

Spark-SQL を使用したデータの読み取り

Spark-SQL を使用する場合、カタログを使用して Hologres テーブルのメタデータをロードする方が便利です。一時テーブルを作成して Hologres テーブルを宣言することもできます。

説明
  • 1.5.2 より前のバージョンの Hologres-Connector-Spark はカタログをサポートしていません。一時テーブルを作成して Hologres テーブルを宣言する必要があります。

  • Hologres-Connector-Spark のパラメーターの詳細については、「パラメーターの説明」をご参照ください。

  1. Hologres カタログを初期化します。

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. Hologres からデータを読み取ります。

    • カタログを使用してデータを読み取ります。

      -- Hologres カタログをロードします。
      USE hologres_external_test_db;
      
      -- Hologres テーブルから読み取ります。カラムプルーニングと述語プッシュダウンがサポートされています。
      SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
    • 一時テーブルを作成してデータを読み取ります。

      CREATE TEMPORARY VIEW(table)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.max_task_count "80", // Hologres テーブルの読み取りに使用するタスクの最大数。
        table "customer_holo_table"
      );
      
      -- カラムプルーニングと述語プッシュダウンがサポートされています。
      SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;

      CREATE TEMPORARY VIEW(query)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10"
      );
      
      SELECT * FROM hologresTable LIMIT 5;

Hologres データを DataFrame に読み込む

spark-shell、pyspark、またはその他のツールを使用して Spark ジョブを開発する場合、Spark の Read インターフェイスを呼び出してデータを DataFrame に読み込み、後続の計算を行うことができます。以下は、異なる言語で Hologres テーブルを DataFrame に読み込む例です。Hologres-Connector-Spark のパラメーターの詳細については、「パラメーターの説明」をご参照ください。

Scala

val readDf = (
  spark.read
    .format("hologres")
    .option("username", "***")
    .option("password", "***")
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
    .option("table", "customer_holo_table")
    .option("read.max_task_count", "80") // Hologres テーブルの読み取りに使用するタスクの最大数。
    .load()
    .filter("c_custkey < 500")
)

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSelect {
    public static void main(String[] args) {
        
        // ローカルモードで実行します。
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();
                
        Dataset<Row> readDf = (
           spark.read
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .option("read.max_task_count", "80") // Hologres テーブルの読み取りに使用するタスクの最大数。
                .load()
                .filter("c_custkey < 500")
        );
        readDf.select("c_custkey", "c_name", "c_phone").show(10);
    }
}

Maven ファイルには次の構成が必要です。

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()

readDf.select("c_custkey", "c_name", "c_phone").show(10)

異なる言語で Spark ジョブを実行するには、次の操作を実行します。

  • Scala

    • サンプルコードを使用して sparkselect.scala ファイルを生成し、次のようにジョブを実行できます。

      -- 依存関係をロードします。
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- ローカルテストの場合は、絶対パスを使用してファイルをロードします。
      scala> :load D:/sparkselect.scala
    • 依存関係をロードした後にサンプルコードを直接貼り付けて実行することもできます。

  • Java

    サンプルコードを開発ツールにインポートし、Maven を使用してパッケージ化できます。たとえば、パッケージ名を spark_select.jar とすることができます。次のコードでジョブを実行します。

    -- ジョブの JAR パッケージには絶対パスを使用します。
    spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_select.jar
  • Python

    次のコードを実行した後、サンプルコードを直接貼り付けて実行できます。

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

パラメーターの説明

一般パラメーター

パラメーター

デフォルト値

必須

説明

username

なし

はい

  • アカウントの AccessKey ID。取得するには、AccessKey 管理に移動します。

  • 作成したユーザーアカウントの名前。ユーザーを作成した後、データベースに対する権限をユーザーに付与する必要があります。詳細については、「Hologres 権限モデル」および「HoloWeb に接続してクエリを実行する」を参照して権限を確認してください。

password

なし

はい

  • アカウントの AccessKey Secret。詳細については、「AccessKey の作成」をご参照ください。

  • 作成したユーザーアカウントのパスワード。

table

なし

はい

読み書きする Hologres テーブルの名前。

説明

データを読み取る際、代わりに read.query パラメーターを使用することもできます。

jdbcurl

なし

はい

Hologres リアルタイムデータ API の JDBC URL。フォーマットは jdbc:postgresql://<host>:<port>/<db_name> です。Hologres コンソールに移動し、左側のナビゲーションウィンドウで Instances をクリックし、対象のインスタンスを選択して、Instance Details ページの Network Information セクションからホストとポート番号を取得します。

enable_serverless_computing

false

いいえ

サーバーレスリソースを使用するかどうかを指定します。このパラメーターは、読み取り操作と bulk_load モードでの書き込み操作にのみ有効です。詳細については、「サーバーレスコンピューティングガイド」をご参照ください。

serverless_computing_query_priority

3

いいえ

サーバーレスコンピューティングの実行優先度。

statement_timeout_seconds

28800 (8 時間)

いいえ

クエリ実行のタイムアウト期間 (秒単位)。

retry_count

3

いいえ

接続が失敗した場合のリトライ回数。

direct_connect

直接接続をサポートする環境では、デフォルトで直接接続が使用されます。

いいえ

バッチデータの読み書きのボトルネックは、多くの場合、エンドポイントのネットワークスループットです。そのため、システムは現在の環境が Hologres フロントエンド (アクセスノード) に直接接続できるかどうかをテストします。直接接続がサポートされている場合、デフォルトで使用されます。このパラメーターを false に設定すると、直接接続が無効になります。

書き込みパラメーター

Hologres コネクタは Spark の SaveMode パラメーターをサポートしています。SQL の場合、これは INSERT INTO または INSERT OVERWRITE を意味します。DataFrame の場合、これは書き込み時に SaveMode を Append または Overwrite に設定することを意味します。Overwrite は書き込み用の一時テーブルを作成し、書き込みが成功した後に元のテーブルを置き換えます。注意して使用してください。

パラメーター名

以前のパラメーター名

デフォルト値

必須

説明

write.mode

copy_write_mode

auto

いいえ

書き込みモード。有効な値は次のとおりです。書き込みモードの比較については、「バッチ書き込みモードの比較」をご参照ください。

  • auto (デフォルト)。コネクタは、バージョンと宛先テーブルのメタデータに基づいて最適なモードを自動的に選択します。選択ロジックは次のとおりです。

    1. Hologres インスタンスのバージョンが V2.2.25 以降で、テーブルにプライマリキーがある場合、bulk_load_on_conflict モードが選択されます。

    2. Hologres インスタンスのバージョンが V2.1.0 以降で、テーブルにプライマリキーがない場合、bulk_load モードが選択されます。

    3. Hologres インスタンスのバージョンが V1.3 以降の場合、stream モードが選択されます。

    4. その他の場合、insert モードが選択されます。

  • stream。これは SQL 実行を高速化するための固定プラン を使用します。固定プランでは、COPY は Hologres V1.3 で導入された機能です。INSERT メソッドと比較して、COPY メソッドはストリーミングモードを使用するためスループットが高く、データ遅延が低く、バッチを蓄積しないためクライアントのメモリ消費量が少なくなります。

    説明

    Hologres Connector 1.3.0 以降および Hologres V1.3.34 以降が必要です。

  • bulk_load。これはバッチ COPY です。固定プランのストリーミング COPY と比較して、バッチ COPY は同じ 1 秒あたりのレコード数 (RPS) 条件下で Hologres インスタンスへの負荷が低くなります。ただし、プライマリキーのないテーブルへの書き込みのみをサポートします。

    説明

    Hologres Connector 1.4.2 以降および Hologres V2.1.0 以降が必要です。

  • bulk_load_on_conflict。バッチ COPY を使用してプライマリキーを持つテーブルに書き込む場合、このモードはプライマリキーの競合を処理します。デフォルトでは、プライマリキーを持つ Hologres テーブルへのバッチデータインポートはテーブルロックをトリガーし、複数の接続が同時に書き込む能力を制限します。コネクタは、宛先 Hologres テーブルの分散キーに基づいてデータを再配布することをサポートします。これにより、各 Spark タスクは 1 つのシャードにのみデータを書き込むことができ、ロックレベルがテーブルからシャードに減少します。これにより、同時書き込みが可能になり、書き込みパフォーマンスが向上します。各接続は少数のシャードのデータのみを維持する必要があるため、この最適化は小規模ファイルの数を大幅に削減し、Hologres のメモリ使用量を低減することもできます。テストによると、同時書き込みの前にデータを再パーティション化すると、ストリームモードでの書き込みと比較してシステム負荷が約 67% 削減されることが示されています。

    説明

    Hologres Connector 1.4.2 以降および Hologres V2.2.25 以降が必要です。

  • insert。INSERT メソッドを使用してデータを書き込みます。

write.copy.max_buffer_size

max_cell_buffer_size

52428800 (50 MB)

いいえ

COPY モードで書き込む場合、通常、ローカルバッファーの最大長を調整する必要はありません。ただし、非常に長い文字列などの大きなフィールドを書き込むときにバッファーオーバーフローが発生した場合は、この値を増やすことができます。

write.copy.dirty_data_check

copy_write_dirty_data_check

false

いいえ

ダーティデータの検証を実行するかどうかを指定します。この機能を有効にすると、ダーティデータが見つかった場合に書き込みに失敗した特定の行を特定できます。ただし、これは書き込みパフォーマンスに影響します。トラブルシューティングの場合を除き、この機能を有効にしないでください。

write.on_conflict_action

write_mode

INSERT_OR_REPLACE

いいえ

プライマリキーを持つテーブルに挿入するときのポリシー:

  • INSERT_OR_IGNORE:プライマリキーの競合が発生した場合、データは書き込まれません。

  • INSERT_OR_UPDATE:プライマリキーの競合が発生した場合、対応する列が更新されます。

  • INSERT_OR_REPLACE:プライマリキーの競合が発生した場合、すべての列が更新されます。

次のパラメーターは、write.modeinsert に設定されている場合にのみ有効です。

パラメーター名

以前のパラメーター名

デフォルト値

必須

説明

write.insert.dynamic_partition

dynamic_partition

false

いいえ

このパラメーターは、copy_write_modeinsert に設定されている場合にのみ有効です。true に設定すると、パーティションテーブルの親テーブルにデータを書き込むときに、存在しないパーティションが自動的に作成されます。

write.insert.batch_size

write_batch_size

512

いいえ

各書き込みスレッドの最大バッチサイズ。WriteMode によってマージされた後の Put 操作の数がこの値に達すると、バッチがコミットされます。

write.insert.batch_byte_size

write_batch_byte_size

2097152 (2 * 1024 * 1024)

いいえ

各書き込みスレッドの最大バッチサイズ (バイト単位)。デフォルト値は 2 MB です。WriteMode によってマージされた後の Put データのバイトサイズがこの値に達すると、バッチがコミットされます。

write.insert.max_interval_ms

write_max_interval_ms

10000

いいえ

最後のコミットからの時間がこの値を超えると、バッチがコミットされます。

write.insert.thread_size

write_thread_size

1

いいえ

同時書き込みスレッドの数。各同時スレッドは 1 つのデータベース接続を占有します。

読み取りパラメーター

パラメーター名

以前のパラメーター名

(バージョン 1.5.0 以前)

デフォルト値

必須

説明

read.mode

bulk_read

auto

いいえ

読み取りモード。有効な値:

  • auto (デフォルト)。Hologres コネクタは、バージョンと宛先テーブルのメタデータに基づいて最適なモードを自動的に選択します。選択ロジックは次のとおりです。

    1. 読み取るフィールドに JSONB 型が含まれている場合、`select` モードが選択されます。

    2. インスタンスのバージョンが 3.0.24 以降の場合、`bulk_read_compressed` モードが選択されます。

    3. その他の場合、`bulk_read` モードが選択されます。

  • bulk_read。COPY OUT を使用して arrow 形式でデータを読み取ります。パフォーマンスは select モードの数倍です。Hologres の JSONB 型の読み取りはサポートされていません。

  • bulk_read_compressed。COPY OUT を使用して圧縮された arrow 形式のデータを読み取ります。これにより、非圧縮データと比較して約 45% の帯域幅を節約できます。

  • select。標準の SELECT メソッドを使用してデータを読み取ります。

read.max_task_count

max_partition_count

80

いいえ

読み取る Hologres テーブルを複数のパーティションに分割します。各パーティションは 1 つの Spark タスクに対応します。Hologres テーブルの `ShardCount` がこのパラメーターより小さい場合、パーティションの数は最大で `ShardCount` になります。

read.copy.max_buffer_size

/

52428800 (50 MB)

いいえ

COPY モードで読み取る場合、これはローカルバッファーの最大長です。大きなフィールドで例外が発生した場合は、この値を増やしてください。

read.push_down_predicate

push_down_predicate

true

いいえ

述語プッシュダウンを有効にするかどうかを指定します。たとえば、クエリ中にフィルター条件を適用します。この機能は、一般的なフィルター条件のプッシュダウンとカラムプルーニングをサポートします。

read.push_down_limit

push_down_limit

true

いいえ

LIMIT プッシュダウンを有効にするかどうかを指定します。

read.select.batch_size

scan_batch_size

256

いいえ

このパラメーターは、read_modeselect に設定されている場合にのみ有効です。Hologres から読み取る際のスキャン操作で一度にフェッチする行数。

read.select.timeout_seconds

scan_timeout_seconds

60

いいえ

このパラメーターは、read_modeselect に設定されている場合にのみ有効です。Hologres から読み取る際のスキャン操作のタイムアウト期間。

read.query

query

なし

いいえ

指定された query を使用して Hologres からデータを読み取ります。このパラメーターまたは table パラメーターのいずれかを設定できますが、両方を設定することはできません。

説明
  • query メソッドを使用してデータを読み取る場合、読み取りには単一のタスクしか使用できません。述語プッシュダウンはサポートされていません。

  • table メソッドを使用してデータを読み取る場合、読み取り操作は Hologres テーブルの `ShardCount` に基づいて複数のタスクに分割され、並行して読み取られます。

データ型マッピング

Spark 型

Hologres 型

ShortType

SMALLINT

IntegerType

INT

LongType

BIGINT

StringType

TEXT

StringType

JSON

StringType

JSONB

DecimalType

NUMERIC(38, 18)

BooleanType

BOOL

DoubleType

DOUBLE PRECISION

FloatType

FLOAT

TimestampType

TIMESTAMPTZ

DateType

DATE

BinaryType

BYTEA

BinaryType

ROARINGBITMAP

ArrayType(IntegerType)

INT4[]

ArrayType(LongType)

INT8[]

ArrayType(FloatType)

FLOAT4[]

ArrayType(DoubleType)

FLOAT8[]

ArrayType(BooleanType)

BOOLEAN[]

ArrayType(StringType)

TEXT[]

接続数の計算

Hologres-Connector-Spark がデータを読み書きする際、一定数の JDBC 接続を使用します。この数は次の要因に影響される可能性があります。

  • Spark の並列度。これは同時に実行されるタスクの数で、ジョブ実行中に Spark UI で確認できます。

  • 各同時タスクでコネクタが使用する接続数:

    • COPY モードでの書き込みの場合、各同時タスクは 1 つの JDBC 接続のみを使用します。

    • INSERT モードでの書き込みの場合、各同時タスクは write_thread_size 個の JDBC 接続を使用します。

    • 読み取りの場合、各同時タスクは 1 つの JDBC 接続を使用します。

  • その他の側面で使用される接続:ジョブが開始されると、スキーマの取得などの操作が実行され、一時的に接続が確立される場合があります。

したがって、ジョブが使用する合計接続数は、次の数式を使用して計算できます。

作業項目

使用される接続数

カタログからメタデータをクエリ

1

データの読み取り

並列度 × 1 + 1

COPY モードでの書き込み

並列度 × 1 + 1

INSERT モードでの書き込み

並列度 × write_thread_size + 1

上記の接続数計算は、Spark が同時に実行できるタスクの数が、ジョブによって生成されるタスクの数よりも多いことを前提としています。

Spark が実行できる同時タスクの数は、spark.executor.instances などのユーザー設定パラメーターに影響される可能性があり、また Hadoop のファイル分割ポリシーにも影響される可能性があります。詳細については、「Apache Hadoop」をご参照ください。