Spark は、大規模なデータ処理のための統合分析エンジンです。Hologres は、オープンソースの Apache Spark および E-MapReduce (EMR) Spark と統合されており、企業が迅速にデータウェアハウスを構築するのに役立ちます。Hologres が提供する Spark コネクタを使用すると、Spark クラスターに Hologres カタログを作成できます。これにより、外部テーブルを使用して Hologres のデータをバッチで読み書きできます。Spark コネクタは、ネイティブの Java Database Connectivity (JDBC) よりも優れたパフォーマンスを提供します。
制限事項
Spark コネクタは、V1.3 以降の Hologres インスタンスでのみサポートされています。インスタンスのバージョンは、Hologres コンソールの[インスタンス詳細] ページで確認できます。インスタンスのバージョンが V1.3 より前の場合は、インスタンスをスペックアップするか、Hologres DingTalk グループ (ID: 32314975) に参加してインスタンスのスペックアップをリクエストすることができます。
準備
spark-sql、spark-shell、または pyspark コマンドを実行できる Spark 環境をインストールします。依存関係の問題を回避し、より多くの機能にアクセスするために、Spark 3.3.0 以降を使用することをお勧めします。
EMR Spark を使用して、Spark 環境を迅速に構築し、Hologres インスタンスに接続できます。詳細については、「EMR における Spark の拡張機能」をご参照ください。
独立した Spark 環境をセットアップすることもできます。詳細については、「Apache Spark」をご参照ください。
Spark を使用して Hologres からデータを読み書きするには、
hologres-connector-spark-3.xコネクタ JAR パッケージが必要です。このトピックでは、バージョン 1.5.2 を例として使用します。パッケージは Maven Central Repository からダウンロードできます。コネクタのリソースはオープンソースです。詳細については、「Hologres-Connectors」をご参照ください。Java で Spark ジョブを開発し、IntelliJ IDEA などのツールでローカルデバッグを実行するには、次の Maven 依存関係を pom.xml ファイルに追加します。
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-3.x</artifactId> <version>1.5.2</version> <classifier>jar-with-dependencies</classifier> </dependency>
Hologres カタログ
Hologres カタログは、Spark コネクタ 1.5.2 以降でサポートされています。外部テーブルを使用して、Hologres のデータを簡単に読み書きできます。
Spark の各 Hologres カタログは、Hologres のデータベースにマッピングされます。Hologres カタログの各名前空間は、マッピングされたデータベースのスキーマにマッピングされます。以下のセクションでは、Spark で Hologres カタログを使用する方法について説明します。
Hologres カタログはテーブル作成をサポートしていません。
このトピックでは、Hologres インスタンスで次のデータベースとテーブルを使用します:
test_db -- データベース
public.test_table1 -- public スキーマのテーブル
public.test_table2
test_schema.test_table3 -- test_schema スキーマのテーブル Hologres カタログを初期化する
Spark クラスターで spark-sql を起動し、Hologres コネクタをロードして、カタログパラメーターを指定します。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db一般的な Hologres カタログコマンド
Hologres カタログのロード
Spark の Hologres カタログは Hologres データベースにマッピングされます。このマッピングはセッション中に変更できません。
USE hologres_external_test_db;すべての名前空間をクエリします。
Spark の 名前空間 は、Hologres の スキーマ にマッピングされます。デフォルトのスキーマは public です。
USEコマンドを使用して、デフォルトのスキーマを変更できます。-- Hologres カタログのすべての名前空間を表示します。これは Hologres データベースのすべてのスキーマに対応します。 SHOW NAMESPACES;名前空間内のテーブルをクエリします。
すべてのテーブルをクエリします。
SHOW TABLES;特定の名前空間内のテーブルをクエリします。
USE test_schema; SHOW TABLES; -- または、次のステートメントを使用します。 SHOW TABLES IN test_schema;
テーブルからのデータの読み取りと書き込み
SELECT および INSERT ステートメントを使用して、カタログ内の Hologres 外部テーブルからデータを読み書きします。
-- テーブルからデータを読み取ります。 SELECT * FROM public.test_table1; -- テーブルにデータを書き込みます。 INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;
Hologres にデータをインポートする
このセクションでは、TPC-H データセットの customer テーブルのテストデータを使用します。サンプルの customer データを CSV フォーマットでダウンロードできます。次の SQL 文は、customer テーブルスキーマを作成します。
CREATE TABLE customer_holo_table
(
c_custkey BIGINT ,
c_name TEXT ,
c_address TEXT ,
c_nationkey INT ,
c_phone TEXT ,
c_acctbal DECIMAL(15,2) ,
c_mktsegment TEXT ,
c_comment TEXT
);Spark-SQL を使用したデータのインポート
Spark-SQL を使用する場合、カタログを使用して Hologres テーブルのメタデータをロードすると便利です。一時テーブルを作成して Hologres テーブルを宣言することもできます。
1.5.2 より前の Spark コネクタはカタログをサポートしていません。一時テーブルを作成して Hologres テーブルを宣言することしかできません。
Hologres-Connector-Spark のパラメーターの詳細については、「パラメーター」をご参照ください。
Hologres カタログを初期化します。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_dbCSV ソーステーブルから Hologres 外部テーブルにデータをインポートします。
説明Spark の INSERT INTO 構文は、
column_listを使用して書き込み対象の一部の列を指定することをサポートしていません。たとえば、INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTableを使用して c_custkey フィールドにのみデータを書き込むことはできません。特定のフィールドにデータを書き込むには、
CREATE TEMPORARY VIEW文を使用して、必要なフィールドのみを含む Hologres の一時テーブルを宣言します。カタログを使用したデータの書き込み
-- Hologres カタログをロードします。 USE hologres_external_test_db; -- CSV データソースを作成します。 CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," -- ローカルテストの場合は、ファイルの絶対パスを使用します。 ); -- CSV テーブルから Hologres にデータを書き込みます。 INSERT INTO public.customer_holo_table SELECT * FROM csvTable;一時ビューを使用したデータの書き込み
-- CSV データソースを作成します。 CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," ); -- Hologres 一時テーブルを作成します。 CREATE TEMPORARY VIEW hologresTable ( c_custkey BIGINT, c_name STRING, c_phone STRING) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", table "customer_holo_table" ); INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;
DataFrame を使用したデータのインポート
spark-shell や pyspark などのツールを使用して Spark ジョブを開発し、DataFrame の `write` メソッドを呼び出してデータを書き込むことができます。CSV ファイルから読み取られたデータは DataFrame に変換され、Hologres インスタンスに書き込まれます。以下のセクションでは、さまざまなプログラミング言語のサンプルコードを提供します。Hologres Connector for Spark のパラメーターの詳細については、「パラメーター」をご参照ください。
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
// CSV ソースのスキーマ。
val schema = StructType(Array(
StructField("c_custkey", LongType),
StructField("c_name", StringType),
StructField("c_address", StringType),
StructField("c_nationkey", IntegerType),
StructField("c_phone", StringType),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType),
StructField("c_comment", StringType)
))
// CSV ファイルからデータを DataFrame として読み取ります。
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")
// DataFrame を Hologres に書き込みます。
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;
public class SparkTest {
public static void main(String[] args) {
// CSV ソースのスキーマ。
List<StructField> asList =
Arrays.asList(
DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
DataTypes.createStructField("c_name", DataTypes.StringType, true),
DataTypes.createStructField("c_address", DataTypes.StringType, true),
DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
DataTypes.createStructField("c_phone", DataTypes.StringType, true),
DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
DataTypes.createStructField("c_comment", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(asList);
// ローカルモードで実行します。
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
// CSV ファイルからデータを DataFrame として読み取ります。
// ローカルテストの場合は、customer データの絶対パスを使用します。
Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");
// DataFrame を Hologres に書き込みます。
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()
}
}Maven ファイルには次の構成が必要です。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>Python
from pyspark.sql.types import *
# CSV ソースのスキーマ。
schema = StructType([
StructField("c_custkey", LongType()),
StructField("c_name", StringType()),
StructField("c_address", StringType()),
StructField("c_nationkey", IntegerType()),
StructField("c_phone", StringType()),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType()),
StructField("c_comment", StringType())
])
# CSV ファイルからデータを DataFrame として読み取ります。
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')
# DataFrame を Hologres に書き込みます。
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()Spark を使用してさまざまなプログラミング言語でジョブを実行するには、次の手順を実行します:
Scala
サンプルコードを使用して sparktest.scala ファイルを生成し、次のコマンドを実行してジョブを実行できます。
-- 依存関係を読み込みます。 spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- ローカルテストの場合は、絶対パスを使用してファイルを読み込みます。 scala> :load D:/sparktest.scalaまたは、依存関係がロードされた後にサンプルコードを貼り付けて実行することもできます。
Java
開発ツールを使用してサンプルコードを追加し、Maven ツールを使用してパッケージ化できます。たとえば、パッケージ名が spark_test.jar の場合、次のコマンドを実行してジョブを実行します。
-- ジョブ JAR パッケージの絶対パスを使用します。 spark-submit --class SparkTest --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_test.jarPython
次のコマンドが実行された後、サンプルコードを貼り付けて実行できます。
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Hologres からデータを読み取る
Spark コネクタ 1.3.2 以降では、Hologres からデータを読み取ることができます。 Spark のデフォルトの
jdbc-connectorと比較して、spark-connectorはテーブルのシャードに基づいて Hologres テーブルからデータを同時に読み取ることができます。 これにより、読み取りパフォーマンスが向上します。 読み取りの同時実行数は、テーブル内のシャード数によって決まります。read.max_task_countパラメーターを使用して、spark-connectorの読み取りの同時実行数を制限できます。 ジョブは、Min(shardCount, max_task_count)と等しい数の読み取りタスクを生成します。 スキーマ推論もサポートされています。 スキーマを指定しない場合、Spark スキーマは Hologres テーブルのスキーマから推論されます。Spark コネクタ 1.5.0 以降では、Hologres テーブルからデータを読み取る際に、述語プッシュダウン、LIMIT プッシュダウン、およびフィールドプルーニングがサポートされています。また、Hologres の
SELECT QUERYを使用してデータを読み取ることもできます。このバージョンではバッチ読み取りモードがサポートされており、以前のバージョンと比較して読み取りパフォーマンスが 3〜4 倍向上します。
Spark-SQL を使用したデータの読み取り
Spark-SQL を使用する場合、カタログを使用して Hologres テーブルのメタデータをロードすると便利です。一時テーブルを作成して Hologres テーブルを宣言することもできます。
1.5.2 より前の Spark コネクタはカタログをサポートしていません。一時テーブルを作成して Hologres テーブルを宣言することしかできません。
Hologres-Connector-Spark のパラメーターの詳細については、「パラメーター」をご参照ください。
Hologres カタログを初期化します。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_dbHologres データを読み取ります。
カタログを使用したデータの読み取り
-- Hologres カタログをロードします。 USE hologres_external_test_db; -- Hologres テーブルからデータを読み取ります。フィールドプルーニングと述語プッシュダウンがサポートされています。 SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;一時テーブルを作成してデータを読み取ります。
CREATE TEMPORARY VIEW (table)
CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.max_task_count "80", // Hologres テーブルを読み取り用に分割できるタスクの最大数。 table "customer_holo_table" ); -- フィールドプルーニングと述語プッシュダウンがサポートされています。 SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;CREATE TEMPORARY VIEW (query)
CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10" ); SELECT * FROM hologresTable LIMIT 5;
Hologres データを DataFrame として読み取る
spark-shell や pyspark などのツールを使用して Spark ジョブを開発し、Spark の read メソッドを呼び出してデータを DataFrame として読み取り、後続の処理を行うことができます。以下のセクションでは、さまざまなプログラミング言語で Hologres テーブルからデータを DataFrame として読み取る方法を示すサンプルコードを提供します。Hologres-Connector-Spark のパラメーターの詳細については、「パラメーター」をご参照ください。
Scala
val readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // Hologres テーブルを読み取り用に分割できるタスクの最大数。
.load()
.filter("c_custkey < 500")
)
readDf.select("c_custkey", "c_name", "c_phone").show(10)Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSelect {
public static void main(String[] args) {
// ローカルモードで実行します。
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
Dataset<Row> readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // Hologres テーブルを読み取り用に分割できるタスクの最大数。
.load()
.filter("c_custkey < 500")
);
readDf.select("c_custkey", "c_name", "c_phone").show(10);
}
}Maven ファイルには次の構成が必要です。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>Python
readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()
readDf.select("c_custkey", "c_name", "c_phone").show(10)Spark を使用してさまざまなプログラミング言語でジョブを実行するには、次の手順を実行します:
Scala
サンプルコードを使用して sparkselect.scala ファイルを生成し、次のコマンドを実行してジョブを実行できます。
-- 依存関係を読み込みます。 spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- ローカルテストの場合は、絶対パスを使用してファイルを読み込みます。 scala> :load D:/sparkselect.scalaまたは、依存関係がロードされた後にサンプルコードを貼り付けて実行することもできます。
Java
開発ツールを使用してサンプルコードを追加し、Maven ツールを使用してパッケージ化できます。たとえば、パッケージ名が spark_select.jar の場合、次のコマンドを実行してジョブを実行します。
-- ジョブ JAR パッケージの絶対パスを使用します。 spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_select.jarPython
次のコマンドが実行された後、サンプルコードを貼り付けて実行できます。
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
パラメータ
一般パラメータ
パラメータ | デフォルト値 | 必須 | 説明 |
username | なし | はい |
|
password | なし | はい |
|
table | なし | はい | データの読み書きに使用する Hologres テーブルの名前。 説明 データを読み取る際に、このパラメーターを |
jdbcurl | なし | はい | Hologres リアルタイムデータ API の JDBC URL。 |
enable_serverless_computing | false | いいえ | サーバーレスコンピューティングリソースを使用するかどうかを指定します。このパラメーターは、データの読み取りと |
serverless_computing_query_priority | 3 | いいえ | サーバーレスコンピューティングの実行優先度。 |
statement_timeout_seconds | 28800 (8 時間) | いいえ | クエリのタイムアウト期間。単位: 秒。 |
retry_count | 3 | いいえ | 接続に失敗した場合の再試行回数。 |
direct_connect | ダイレクト接続をサポートする環境では、デフォルトでダイレクト接続が使用されます。 | いいえ | バッチデータの読み書きのボトルネックは、通常、エンドポイントのネットワークスループットです。そのため、システムは現在の環境が Hologres フロントエンド (アクセスノード) に直接接続できるかどうかをテストします。ダイレクト接続がサポートされている場合、デフォルトで使用されます。このパラメーターを |
書き込みパラメーター
Hologres コネクタは、Spark の SaveMode パラメーターをサポートしています。SQL の場合、これは INSERT INTO または INSERT OVERWRITE に対応します。DataFrame の場合、データを書き込むときに SaveMode を Append または Overwrite に設定できます。Overwrite モードを使用すると、データ書き込み用の一時テーブルが作成されます。書き込み操作が成功した後、一時テーブルが元のテーブルを置き換えます。Overwrite モードは必要な場合にのみ使用してください。
パラメーター名 | 以前のパラメーター名 | デフォルト値 | 必須 | 説明 |
write.mode | copy_write_mode | auto | いいえ | 書き込みモード。書き込みモードの違いの詳細については、「バッチ書き込みモードの比較」をご参照ください。有効な値:
|
write.copy.max_buffer_size | max_cell_buffer_size | 52428800 (50 MB) | いいえ | COPY モードでデータを書き込む際のローカルバッファーの最大サイズ。通常、この値を調整する必要はありません。ただし、非常に長い文字列などの大きなフィールドを書き込むときにバッファーオーバーフローが発生した場合は、この値を増やしてください。 |
write.copy.dirty_data_check | copy_write_dirty_data_check | false | いいえ | ダーティデータをチェックするかどうかを指定します。この機能を有効にすると、ダーティデータが見つかったときに書き込みに失敗した特定の行を正確に特定できます。ただし、これは書き込みパフォーマンスに影響します。したがって、トラブルシューティングの場合を除き、この機能を有効にしないでください。 |
write.on_conflict_action | write_mode | INSERT_OR_REPLACE | いいえ | INSERT ステートメントの宛先テーブルにプライマリキーがある場合に使用されるポリシー:
|
write.mode が insert に設定されている場合にのみ、次のパラメーターが有効になります。
パラメーター名 | 以前のパラメーター名 | デフォルト値 | 必須 | 説明 |
write.insert.dynamic_partition | dynamic_partition | false | いいえ | このパラメーターは、 |
write.insert.batch_size | write_batch_size | 512 | いいえ | 各書き込みスレッドの最大バッチサイズ。マージ後の Put 操作の数が WriteBatchSize の値に達すると、バッチコミットが実行されます。 |
write.insert.batch_byte_size | write_batch_byte_size | 2097152 (2 × 1024 × 1024) | いいえ | 各書き込みスレッドの最大バッチサイズ。単位: バイト。デフォルト値: 2 MB。マージ後の Put データのバイトサイズが WriteBatchByteSize の値に達すると、バッチコミットが実行されます。 |
write.insert.max_interval_ms | write_max_interval_ms | 10000 | いいえ | 最後のコミットからの経過時間が WriteMaxIntervalMs の値を超えると、バッチコミットがトリガーされます。 |
write.insert.thread_size | write_thread_size | 1 | いいえ | 同時書き込みスレッドの数。各同時スレッドは 1 つのデータベース接続を占有します。 |
読み取りパラメーター
パラメーター名 | 以前のパラメーター名 (1.5.0 以前) | デフォルト値 | 必須 | 説明 |
read.mode | bulk_read | auto | いいえ | 読み取りモード。有効な値:
|
read.max_task_count | max_partition_count | 80 | いいえ | 読み取る Hologres テーブルは複数のパーティションに分割されます。各パーティションは Spark タスクに対応します。Hologres テーブルのシャード数がこのパラメーターの値より小さい場合、パーティションの数は最大でシャード数になります。 |
read.copy.max_buffer_size | / | 52428800 (50 MB) | いいえ | COPY モードでデータを読み取る際のローカルバッファーの最大サイズ。大きなフィールドサイズが原因で例外が発生した場合は、この値を増やしてください。 |
read.push_down_predicate | push_down_predicate | true | いいえ | クエリ中に適用されるフィルター条件のプッシュダウンなど、述語プッシュダウンを実行するかどうかを指定します。一般的なフィルター条件のプッシュダウンと列プルーニングがサポートされています。 |
read.push_down_limit | push_down_limit | true | いいえ | Limit プッシュダウンを実行するかどうかを指定します。 |
read.select.batch_size | scan_batch_size | 256 | いいえ | このパラメーターは、 |
read.select.timeout_seconds | scan_timeout_seconds | 60 | いいえ | このパラメーターは、 |
read.query | query | なし | いいえ | Hologres からデータを読み取るために使用される 説明
|
データ型マッピング
Spark 型 | Hologres 型 |
ShortType | SMALLINT |
IntegerType | INT |
LongType | BIGINT |
StringType | TEXT |
StringType | JSON |
StringType | JSONB |
DecimalType | NUMERIC(38, 18) |
BooleanType | BOOL |
DoubleType | DOUBLE PRECISION |
FloatType | FLOAT |
TimestampType | TIMESTAMPTZ |
DateType | DATE |
BinaryType | BYTEA |
BinaryType | ROARINGBITMAP |
ArrayType(IntegerType) | INT4[] |
ArrayType(LongType) | INT8[] |
ArrayType(FloatType) | FLOAT4[] |
ArrayType(DoubleType) | FLOAT8[] |
ArrayType(BooleanType) | BOOLEAN[] |
ArrayType(StringType) | TEXT[] |
接続数の計算
Hologres connector for Spark は、読み取りおよび書き込み操作に特定の数の JDBC 接続を使用します。接続数は、次の要因によって決まります:
Spark の並列処理。これは、ジョブの実行中に Spark UI に表示される同時タスクの数です。
コネクタの各同時タスクで使用される接続数:
COPY モードでデータを書き込む場合、各同時タスクは 1 つの JDBC 接続を使用します。
INSERT モードでデータを書き込む場合、各同時タスクが使用する JDBC 接続の数は、
write_thread_sizeパラメーターの値と同じです。データを読み取る場合、各同時タスクは 1 つの JDBC 接続を使用します。
他の操作で使用される接続数。ジョブが開始されると、スキーマの取得などの操作が実行され、短時間接続が確立される場合があります。
ジョブが使用する JDBC 接続の総数は、次の式を使用して計算できます。
項目 | 使用される接続数 |
カタログを使用したメタデータのクエリ | 1 |
データの読み取り | 並列処理 × 1 + 1 |
COPY モードでのデータの書き込み | 並列処理 × 1 + 1 |
INSERT モードでのデータの書き込み | 並列処理 × write_thread_size + 1 |
上記の接続数の計算は、Spark が同時に実行できるタスクの数が、ジョブによって生成されるタスクの数よりも多いことを前提としています。
Spark が同時に実行できるタスクの数は、spark.executor.instances などのパラメーター設定の影響を受ける可能性があります。また、Hadoop のファイルブロック分割ポリシーの影響を受けることもあります。詳細については、「Apache Hadoop」をご参照ください。