すべてのプロダクト
Search
ドキュメントセンター

:Spark を使用してデータをインポートする

最終更新日:Jan 15, 2025

このトピックでは、Spark を使用して複数のタイプのソースからデータを読み取り、Hologres に書き込む方法について説明します。

背景情報

Spark は、大量のデータを一元的に処理する分析エンジンです。 Hologres は、Apache Spark と E-MapReduce(EMR)Spark の両方に統合されており、データ ウェアハウスを効率的に構築できます。 Hologres は、バッチ モードで Hologres にデータを書き込むために使用できる Spark コネクタを提供します。 Spark を使用して、ファイル、Hive テーブル、MySQL テーブル、PostgreSQL テーブルなど、複数のタイプのソースからデータを読み取ることができます。

Hologres は PostgreSQL と互換性があります。 PostgreSQL に基づいて Spark を使用して Hologres データを読み取ることができます。 その後、データの抽出、変換、ロード(ETL)を行い、処理済みデータを Hologres または他の宛先に書き戻すことができます。

前提条件

  • Hologres インスタンスのバージョンが V0.9 以降であること。 Hologres インスタンスのバージョンは、Hologres コンソールのインスタンス詳細ページで確認できます。 Hologres インスタンスのバージョンが V0.9 より前の場合は、Hologres コンソールで Hologres インスタンスを手動でアップグレードするか、テクニカル サポートを受けるために Hologres の DingTalk グループに参加してください。 Hologres コンソールで Hologres インスタンスを手動でアップグレードする方法の詳細については、インスタンスのアップグレード をご参照ください。テクニカル サポートを受ける方法の詳細については、Hologres のオンライン サポートを受ける をご参照ください。

  • Hologres でサポートされているバージョンの Spark 環境がインストールされていること。 これにより、Spark 環境で spark-shell コマンドを実行できます。

接続の使用

Hologres の Spark コネクタを使用してデータの読み取りまたは書き込みを行う場合、Java Database Connectivity(JDBC)接続が使用されます。 使用される接続数は、以下の項目の影響を受けます。

  • 並列 Spark タスクの数: ジョブの実行中に Spark UI で数を確認できます。

  • データ書き込みモード: Spark コネクタを使用して固定コピー モードでデータを書き込む場合、各並列タスクは 1 つの JDBC 接続を使用します。 INSERT ステートメントを使用してデータを書き込む場合、各並列タスクで使用される JDBC 接続の数は write_thread_size の値と同じです。 データ読み取りの各並列タスクは 1 つの JDBC 接続を使用します。

  • その他: ジョブの開始時に、スキーマ情報を取得する必要がある場合があります。 この場合、JDBC 接続が短時間使用される可能性があります。

ジョブで使用される JDBC 接続の総数は、次の式を使用して計算できます。

  • 固定コピー モード: Parallelism × 1 + 1

  • INSERT ステートメント: Parallelism × write_thread_size + 1

説明

Spark タスクの並列処理は、手動パラメータ設定と Hadoop のファイル ブロック ポリシーの影響を受けます。

(推奨) Spark コネクタを使用して Hologres にデータを書き込む

Hologres の組み込み Spark コネクタを使用して Hologres にデータを書き込むことをお勧めします。 Spark コネクタは、Holo Client と一緒に使用されます。 他の書き込み方法と比較して、Spark コネクタはより優れた書き込みパフォーマンスを提供します。 Spark コネクタを使用してデータを書き込むには、次の手順を実行します。 サンプル コードの詳細については、このトピックの Spark コネクタを使用して Hologres にデータを書き込む例 セクションをご参照ください。

準備

  1. JAR パッケージを取得します。

    Spark コネクタは Spark 2 と Spark 3 で使用できます。 Spark コネクタを使用して Hologres にデータを書き込む場合は、Spark コネクタの JAR パッケージを参照する必要があります。 JAR パッケージは既に Maven 中央リポジトリに公開されています。 設定については、次の pom.xml ファイルを参照してください。

    説明

    関連コネクタもオープンソースです。 詳細については、alibabacloud-hologres-connectors ページをご覧ください。

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.4.0</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

    次のリンクをクリックして、Hologres が提供する JAR パッケージをダウンロードすることもできます。

  2. JAR パッケージを使用します。

    次のコマンドを実行して Spark を起動し、コネクタを読み込みます。

    spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

    次のコマンドを実行して PySpark を起動し、コネクタを読み込むこともできます。

    pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

Spark コネクタを使用して Hologres にデータを書き込む例

次の例は、Spark コネクタを使用して Hologres にデータを書き込む方法を示しています。

  1. Hologres にテーブルを作成します。

    Hologres で次の SQL ステートメントを実行して、データを書き込むテーブルを作成します。

    CREATE TABLE tb008 (
      id BIGINT primary key,
      counts INT,
      name TEXT,
      price NUMERIC(38, 18),
      out_of_stock BOOL,
      weight DOUBLE PRECISION,
      thick FLOAT,
      time TIMESTAMPTZ,
      dt DATE, 
      by bytea,
      inta int4[],
      longa int8[],
      floata float4[],
      doublea float8[],
      boola boolean[],
      stringa text[]
    );
  2. Spark でデータを準備し、Hologres に書き込みます。

    1. CLI で次のコマンドを実行して、Spark コネクタを有効にします。

      spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
    2. load spark-test.scalaspark-shell で コマンドを実行して、サンプル データを読み込みます。

      spark-test.scala ファイルには、次のデータが含まれています。

      import java.sql.{Timestamp, Date}
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.Row
      
      val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte)
      val intArray = Array(1, 2, 3)
      val longArray = Array(1L, 2L, 3L)
      val floatArray = Array(1.2F, 2.44F, 3.77F)
      val doubleArray = Array(1.222, 2.333, 3.444)
      val booleanArray = Array(true, false, false)
      val stringArray = Array("abcd", "bcde", "defg")
      
      val data = Seq(
        Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray)
      )
      
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false), // false は、このフィールドの値がテーブルで null にできないことを示します。
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType),
        StructField("weight", DoubleType),
        StructField("thick", FloatType),
        StructField("time", TimestampType),
        StructField("dt", DateType),
        StructField("by", BinaryType),
        StructField("inta", ArrayType(IntegerType)),
        StructField("longa", ArrayType(LongType)),
        StructField("floata", ArrayType(FloatType)),
        StructField("doublea", ArrayType(DoubleType)),
        StructField("boola", ArrayType(BooleanType)),
        StructField("stringa", ArrayType(StringType))
      ))
      
      
      val df = spark.createDataFrame(
        spark.sparkContext.parallelize(data),
        schema
      )
      df.show()
      
      // サンプル データを Hologres に書き込むには、次のパラメータを設定します。
      df.write.format("hologres") // hologres に設定します。
        .option("username", "your_username") // Alibaba Cloud アカウントの AccessKey ID。
        .option("password", "your_password") // Alibaba Cloud アカウントの AccessKey シークレット。
        .option("endpoint", "Ip:Port") // Hologres インスタンスの IP アドレスとポート番号。
        .option("database", "test_database") // Hologres データベースの名前。 この例では test_database です。
        .option("table", "tb008") // データを書き込む Hologres テーブルの名前。 この例ではテーブル名は tb008 です。
        .option("write_batch_size", 512) // バッチで許可される書き込みリクエストの最大数。 詳細については、「Spark を使用してリアルタイムで Hologres にデータを書き込む」セクションのパラメータの説明をご参照ください。
        .option("input_data_schema_ddl", df.schema.toDDL) // DataFrame のデータ定義言語(DDL)ステートメント。 このパラメータは Spark 3.X の場合にのみ必要です。
        .mode(SaveMode.Append) // Spark DataFrameWriter インターフェースの保存モード。 値は Append である必要があります。 このパラメータは WRITE_MODE パラメータとは異なります。 hologres-connector 1.3.3 以降では、OverWrite 値がサポートされています。 このパラメータを OverWrite に設定すると、ソース テーブルのデータがクリアされます。 この値を使用する場合は注意してください。
        .save()
  3. 宛先テーブルのデータをクエリします。

    Hologres コンソールで宛先テーブルのデータをクエリして、書き込まれたデータを確認できます。 次の図は例を示しています。测试示例数据

PySpark コネクタを使用して Hologres にデータを書き込む例

  1. PySpark を起動し、PySpark コネクタを読み込みます。

    pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

  2. メタデータを使用して DataFrame オブジェクトを作成し、コネクタを呼び出して Hologres にデータを書き込みます。 操作は、Spark コネクタを使用する場合の操作に似ています。

    data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]]
    df = spark.createDataFrame(data, schema="id LONG, name STRING")
    df.show()
    
    df2.write.format("hologres").option(
      "username", "your_username").option(
      "password", "your_password").option(
      "endpoint", "hologres_endpoint").option(
      "database", "test_database").option(
      "table", "tb008").save()
    

Spark SQL を使用して Hologres へのデータ書き込み用コネクタを読み込む

説明

このモードは、Spark 3 用の Spark コネクタのみがサポートしています。

  1. Spark SQL を起動し、Spark コネクタを読み込みます。

    spark-sql --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
  2. 次の Spark SQL DDL ステートメントを実行して、CSV ビューと Hologres ビューを作成し、それらにデータを書き込みます。

    CREATE TEMPORARY VIEW csvTable (
      c_custkey bigint,
      c_name string,
      c_address string,
      c_nationkey int,
      c_phone string,
      c_acctbal decimal(15, 2),
      c_mktsegment string,
      c_comment string)
    USING csv OPTIONS (
      path "resources/customer1.tbl", sep "|"
    );
    
    CREATE TEMPORARY VIEW hologresTable (
      c_custkey bigint,
      c_name string,
      c_address string,
      c_nationkey int,
      c_phone string,
      c_acctbal decimal(15, 2),
      c_mktsegment string,
      c_comment string)
    USING hologres OPTIONS (
      jdbcurl "jdbc:postgresql://hologres_endpoint/test_database",
      username "your_username", 
      password "your_password", 
      table "customer_holo_table", 
      copy_write_mode "true", 
      bulk_load "true", 
      copy_write_format "text"
    );
    
    -- SQL ステートメントを使用して作成された Hologres ビューの特定の列にデータを書き込むことはできません。 たとえば、insert into hologresTable(c_custkey) select c_custkey from csvTable ステートメントを実行することはできません。 データを書き込むときは、DDL ステートメントで宣言されているすべての列にデータを書き込む必要があります。 特定の列にデータを書き込む場合は、テーブルを作成するときにこれらの列のみを宣言できます。
    INSERT INTO hologresTable SELECT * FROM csvTable;

Spark を使用して特定のタイプのソースからデータを読み取り、Hologres に書き込む

  1. 特定のタイプのソースからデータを読み取ります。

    Spark を使用して、さまざまなタイプのソースからデータを読み取ることができます。 次の例は、Hologres または別のタイプのソースからデータを読み取る方法を示しています。

    • Hologres からデータを読み取る

      Hologres は PostgreSQL と互換性があります。 PostgreSQL JDBC ドライバに基づいて Spark を使用して Hologres データを読み取ることができます。 次のサンプル コードは参照用です。

      説明

      Hologres データを読み取る前に、公式 Web サイトで PostgreSQL JDBC JAR パッケージをダウンロードしてください。 この例では、postgresql-42.2.18 が使用されています。 次に、spark-shell で ./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar コマンドを実行して、PostgreSQL JDBC JAR パッケージを読み込みます。 Hologres の Spark コネクタの JAR パッケージと一緒に PostgreSQL JDBC JAR パッケージを読み込むこともできます。

      // 例えば tb008 などのテーブルから読み取る
      val readDf = spark.read
        .format("jdbc") // PostgreSQL JDBC ドライバに基づいて Hologres データを読み取ります。
        .option("driver","org.postgresql.Driver")
        .option("url", "jdbc:postgresql://Ip:Por/test_database")
        .option("dbtable", "tb008")
        .option("user", "your_username")
        .option("password", "your_password")
        .load()

      Spark connector V1.3.2 以降では、Hologres からデータを読み取り、並列データ読み取りのパフォーマンスを最適化できます。 PostgreSQL JDBC ドライバと比較して、Spark コネクタではデータ読み取りの並列処理を設定でき、Hologres でのデータシャーディングをサポートして並列データ読み取りを実装できます。 これにより、パフォーマンスが大幅に向上します。 サンプルコード:

      val spark = SparkSession
      .builder
      .appName("ReadFromHologres")
      .master("local[*]")
      .getOrCreate()
      
      spark.sparkContext.setLogLevel("WARN")
      
      import spark.implicits._
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false),
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType)
      ))
      
      val readDf = spark.read
      .format("hologres")
      .schema(schema) // オプション。 スキーマを指定しない場合、デフォルトでは Hologres テーブルのすべてのフィールドが読み取られます。
      .option("username", "your_username")
      .option("password", "your_password")
      .option("jdbcurl", "jdbc:postgresql://hologres_endpoint/test_db")
      .option("table", "tb008")
      .option("scan_parallelism", "10") // Hologres からデータを読み取るためのデフォルトの並列処理。 最大値は、Hologres テーブルのシャード数です。
      .load()

    • Parquet ファイルなどの別のタイプのソースからデータを読み取る

      Spark を使用して、Parquet ファイルや Hive テーブルなど、他のタイプのソースからデータを読み取ることができます。 次の例は、サンプル コードを提供します。

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.hive.HiveContext
      
      val sparkConf = new SparkConf()
      val sc = new SparkContext(sparkConf)
      val hiveContext = new HiveContext(sc)
      
      // 例えば phone などのテーブルから読み取る
      val readDf = hiveContext.sql("select * from hive_database.phone")
  2. データを Hologres に書き込みます。

    import com.alibaba.hologres.spark2.sink.SourceProvider
    
    -- hologres テーブルに書き込む
    df.write
      .format("hologres")
      .option(SourceProvider.USERNAME, "your_username")
      .option(SourceProvider.PASSWORD, "your_password")
      .option(SourceProvider.ENDPOINT, "Ip:Port")
      .option(SourceProvider.DATABASE, "test_database")
      .option(SourceProvider.TABLE, table)
      .option(SourceProvider.WRITE_BATCH_SIZE, 512) -- バッチで許可されるリクエストの最大数。
      .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) -- このパラメータは Spark 3.X の場合にのみ必要です。
      .mode(SaveMode.Append) // このパラメータは Spark 3.X の場合にのみ必要です。
      .save()

Spark を使用してリアルタイムで Hologres にデータを書き込む

  1. 次のステートメントを実行して、Hologres にデータを書き込むテーブルを作成します。

    CREATE TABLE test_table_stream
    (
        value text,
        count bigint
    );
  2. オンプレミス マシンからデータを読み取ります。 単語頻度の統計を収集し、統計をリアルタイムで Hologres に書き込みます。 次の例は、サンプル コードを提供します。

    • コード

       val spark = SparkSession
            .builder
            .appName("StreamToHologres")
            .master("local[*]")
            .getOrCreate()
      
          spark.sparkContext.setLogLevel("WARN")
          import spark.implicits._
      
          val lines = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load()
      
          -- 行を単語に分割する
          val words = lines.as[String].flatMap(_.split(" "))
      
          -- 実行中の単語数を生成する
          val wordCounts = words.groupBy("value").count()
      
          wordCounts.writeStream
              .outputMode(OutputMode.Complete())
              .format("hologres")
              .option(SourceProvider.USERNAME, "your_username")
              .option(SourceProvider.PASSWORD, "your_password")
              .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname")
              .option(SourceProvider.TABLE, "test_table_stream")
              .option("batchsize", 1)
              .option("isolationLevel", "NONE")
              .option("checkpointLocation", checkpointLocation)
              .start()
              .awaitTermination()
    • パラメータ

      パラメータ

      デフォルト値

      必須

      説明

      username

      デフォルト値なし

      はい

      Alibaba Cloud アカウントの AccessKey ID。セキュリティ管理 ページで AccessKey ID を取得できます。

      環境変数を設定し、環境変数から AccessKey ID と AccessKey シークレットを取得することをお勧めします。 これにより、リークのリスクを軽減できます。

      password

      デフォルト値なし

      はい

      Alibaba Cloud アカウントの AccessKey シークレット。セキュリティ管理

      環境変数を設定し、環境変数から AccessKey ID と AccessKey シークレットを取得することをお勧めします。 これにより、リークのリスクを軽減できます。

      table

      デフォルト値なし

      はい

      データを書き込む Hologres テーブルの名前。

      endpoint

      デフォルト値なし

      このパラメータまたは JDBCURL パラメータを設定します。

      Hologres インスタンスのエンドポイント。

      Hologres コンソール のインスタンス詳細ページの [ネットワーク情報] セクションで、Hologres インスタンスのエンドポイントを取得できます。

      database

      デフォルト値なし

      このパラメータまたは JDBCURL パラメータを設定します。

      宛先テーブルが存在する Hologres データベースの名前。

      jdbcurl

      デフォルト値なし

      このパラメータのみ、または ENDPOINT パラメータと DATABASE パラメータを設定します。

      Hologres インスタンスの JDBC URL

      copy_write_mode

      true

      いいえ

      固定コピー モードでデータを書き込むかどうかを指定します。 固定コピーは、Hologres V1.3 でサポートされている新機能です。 固定コピー モードでは、データはバッチではなくストリーミング モードで書き込まれます。 したがって、固定コピー モードでのデータ書き込みは、INSERT ステートメントを使用したデータ書き込みよりもスループットが高く、データ レイテンシが低く、クライアント メモリ リソースの消費が少なくなります。

      説明

      固定コピー モードを使用するには、コネクタのバージョンが V1.3.0 以降であり、Hologres エンジンのバージョンが V1.3.34 以降である必要があります。

      copy_write_format

      false

      いいえ

      不正なデータを確認するかどうかを指定します。 このパラメータは、COPY_WRITE_MODE が true に設定されている場合にのみ有効になります。 このパラメータを true に設定し、不正なデータが生成された場合、システムは書き込みに失敗した行を特定できます。

      説明

      不正なデータの確認は、書き込みパフォーマンスに悪影響を及ぼします。 トラブルシューティング プロセスでのみ、このパラメータを true に設定することをお勧めします。

      bulk_load

      true

      いいえ

      バッチ コピー モードで Hologres にデータを書き込むかどうかを指定します。 固定コピー モードでは、データはストリーミング モードで書き込まれます。

      説明
      • Hologres V2.1 では、プライマリ キーのないテーブルへのデータ書き込みのパフォーマンスが最適化されています。 Hologres V2.1 では、バッチ モードでプライマリ キーのないテーブルにデータを書き込む操作は、テーブル ロックを取得するのではなく、行ロックを取得します。 これにより、バッチ モードでのデータ書き込みを、固定プランを使用して実行される操作と並行して実行できます。 これにより、データ処理の効率と並列処理が向上します。

      • コネクタのバージョンが V1.4.0 以降の場合は、Hologres エンジンのバージョンが V2.1.0 以降である必要があります。

      max_cell_buffer_size

      20971520 (20 MB)

      いいえ

      COPY_WRITE_MODE が true に設定されている場合のフィールドの最大長。

      copy_write_dirty_data_check

      false

      いいえ

      不正なデータを確認するかどうかを指定します。 このパラメータを true に設定し、不正なデータが生成された場合、システムは書き込みに失敗した行を特定できます。 不正なデータの確認は、書き込みパフォーマンスに悪影響を及ぼします。 トラブルシューティング プロセスでのみ、このパラメータを true に設定することをお勧めします。

      説明

      このパラメータは、COPY_WRITE_MODE が true に設定されている場合にのみ有効になります。

      copy_write_direct_connect

      直接接続が許可されているシナリオでは true

      いいえ

      このパラメータは、COPY_WRITE_MODE が true に設定されている場合にのみ有効になります。 コピー モードで書き込むことができるデータ量は、VPC エンドポイントのスループットに基づいて決定されます。 システムは、コピー モードでデータが書き込まれるときに、環境が Hologres FE ノードに直接接続できるかどうかを確認します。 環境が Hologres FE ノードに直接接続できる場合は、デフォルトで直接接続が使用されます。 このパラメータを false に設定すると、直接接続は使用されません。

      input_data_schema_ddl

      デフォルト値なし

      このパラメータは Spark 3.X で必要です。 <your_DataFrame>.schema.toDDL 形式でパラメータ値を設定します。

      Spark の DataFrame の DDL ステートメント。

      write_mode

      INSERT_OR_REPLACE

      いいえ

      プライマリ キーの競合を処理するために使用されるポリシー。 宛先テーブルにプライマリ キーがある場合は、このパラメータが必要です。 有効な値:

      • INSERT_OR_IGNORE: プライマリ キーの競合が発生した場合、書き込もうとしているデータを破棄します。

      • INSERT_OR_UPDATE: プライマリ キーの競合が発生した場合、宛先テーブルの関連列を更新します。

      • INSERT_OR_REPLACE: プライマリ キーの競合が発生した場合、宛先テーブルのすべての列を更新します。

      write_batch_size

      512

      いいえ

      データを書き込むスレッドのバッチで許可されるリクエストの最大数。 write_mode パラメータに基づいて競合が処理された後、PUT リクエストの総数が write_batch_size パラメータで指定された上限に達すると、データはバッチで送信されます。

      write_batch_byte_size

      2 MB

      いいえ

      データを書き込むスレッドのバッチで許可されるバイトの最大数。 WRITE_MODE パラメータに基づいて競合が処理された後、PUT リクエストのバイトの総数が WRITE_BATCH_BYTE_SIZE パラメータで指定された上限に達すると、データはバッチで送信されます。

      write_max_interval_ms

      10000 ms

      いいえ

      データがバッチで送信される間隔。

      write_fail_strategy

      TYR_ONE_BY_ONE

      いいえ

      送信エラーを処理するために使用されるポリシー。 バッチを送信できない場合、Holo Client はバッチ内のデータエントリを指定された順序で 1 つずつ送信します。 データエントリを送信できない場合、Holo Client はデータエントリに関する情報を含むエラーメッセージを返します。

      write_thread_size

      1

      いいえ

      データの書き込みに使用される並列スレッドの数。 各スレッドは 1 つの接続を占有します。

      Spark ジョブによって占有される接続の総数は、Spark の並列処理によって異なります。 接続の総数は、次の式を使用して計算できます: 接続の総数 = spark.default.parallelism の値 × WRITE_THREAD_SIZE の値

      dynamic_partition

      false

      いいえ

      パーティションのない親テーブルにデータが書き込まれた場合に、パーティションを自動的に作成するかどうかを指定します。 有効な値: true および false。 true: パーティションのない親テーブルにデータが書き込まれた場合、パーティションが自動的に作成されます。

      retry_count

      3

      いいえ

      接続エラーが発生した場合に、データの書き込みとクエリに許可される再試行の最大回数。

      retry_sleep_init_ms

      1000 ms

      いいえ

      リクエストの再試行に費やされる時間は、次の式を使用して計算されます: retry_sleep_init_ms + retry_count × retry_sleep_step_ms

      retry_sleep_step_ms

      10*1000 ms

      いいえ

      リクエストの再試行に費やされる時間は、次の式を使用して計算されます: retry_sleep_init_ms + retry_count × retry_sleep_step_ms

      connection_max_idle_ms

      60000 ms

      いいえ

      データの読み取りと書き込みに使用される接続のアイドル タイムアウト期間。 接続が指定されたアイドル タイムアウト期間よりも長くアイドル状態のままである場合、Holo Client は接続を自動的に解放します。

      fixed_connection_mode

      false

      いいえ

      固定接続を使用するかどうかを指定します。 固定コピー以外のモード(INSERT モードなど)では、データの書き込みとポイント クエリは接続を占有しません。

      説明

      固定接続機能はベータ版であり、コネクタのバージョンが V1.2.0 以降、Hologres エンジンのバージョンが V1.3.0 以降の場合にのみ使用できます。

      scan_batch_size

      256

      いいえ

      Hologres からデータを読み取るときに毎回スキャンされる行数。

      scan_timeout_seconds

      60

      いいえ

      Hologres からデータを読み取るときのスキャン操作のタイムアウト期間。 単位: 秒。

      scan_parallelism

      10

      いいえ

      Hologres からデータを読み取るために使用されるシャードの数。 最大値は、Hologres テーブルのシャード数です。 ジョブの実行時、シャードはデータ読み取りのために Spark タスクに割り当てられます。

データ型マッピング

次の表は、Spark と Hologres 間のデータ型マッピングを示しています。

Spark データ型

Hologres データ型

ShortType

SMALLINT

IntegerType

INT

LongType

BIGINT

StringType

TEXT、JSONB、および JSON

DecimalType

NUMERIC(38, 18)

BooleanType

BOOL

DoubleType

DOUBLE PRECISION

FloatType

FLOAT

TimestampType

TIMESTAMPTZ

DateType

DATE

BinaryType

BYTEA および ROARINGBITMAP

ArrayType(IntegerType)

int4[]

ArrayType(LongType)

int8[]

ArrayType(FloatType)

float4[]

ArrayType(DoubleType)

float8[]

ArrayType(BooleanType)

boolean[]

ArrayType(StringType)

text[]