Spark は、大規模なデータ処理のための統合分析エンジンです。Hologres は、コミュニティ版の Spark および EMR Spark と緊密に統合されており、データウェアハウスを迅速に構築するのに役立ちます。Hologres が提供する Spark コネクタは、Spark クラスターでの Hologres カタログの作成をサポートし、外部テーブルを使用した高性能なバッチ読み取りとインポートを可能にします。このアプローチは、ネイティブの Java Database Connectivity (JDBC) よりも優れたパフォーマンスを提供します。
制限事項
バージョン 1.3 以降の Hologres インスタンスのみが Spark コネクタをサポートします。現在のインスタンスバージョンは、Hologres 管理コンソール の Instance Details ページで確認できます。お使いのインスタンスのバージョンが 1.3 より前の場合、インスタンスのスペックアップ を使用するか、DingTalk グループ番号 32314975 を検索して Hologres 通信グループに参加し、スペックアップをリクエストしてください。
事前準備
-
`spark-sql`、`spark-shell`、または `pyspark` コマンドを実行できる互換性のある Spark 環境をインストールします。依存関係の問題を回避し、より多くの機能にアクセスするために、Spark 3.3.0 以降を使用してください。
-
Alibaba Cloud EMR Spark を使用して Spark 環境を迅速に構築し、Hologres インスタンスに接続します。詳細については、「EMR Spark の特徴」をご参照ください。
-
また、お好みの設定で Spark 環境を構築することもできます。詳細については、「Apache Spark」をご参照ください。
-
-
Spark を使用して Hologres の読み書きを行うには、コネクタの JAR パッケージ
hologres-connector-spark-3.xを参照します。このトピックでは、Spark コネクタのバージョン 1.5.2 を使用します。Maven Central Repository からダウンロードできます。すべてのコネクタリソースはオープンソースです。詳細については、「Hologres-Connectors」をご参照ください。 -
Java で Spark ジョブを開発し、IntelliJ IDEA などのツールでローカルでデバッグする場合は、pom.xml ファイルに次の Maven 依存関係を追加します。
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-3.x</artifactId> <version>1.5.2</version> <classifier>jar-with-dependencies</classifier> </dependency>
Hologres カタログ
バージョン 1.5.2 以降、Spark コネクタは Hologres カタログをサポートしており、外部テーブルを使用して Hologres の読み書きを便利に行うことができます。
Spark では、各 Hologres カタログは Hologres のデータベースに対応し、Hologres カタログ内の各名前空間はそのデータベースのスキーマに対応します。次のセクションでは、Spark で Hologres カタログを使用する方法を示します。
Hologres カタログはテーブルの作成をサポートしていません。
このトピックでは、次のデータベース名とテーブル名を持つ Hologres インスタンスを使用します。
test_db --データベース
public.test_table1 --public スキーマ内のテーブル
public.test_table2
test_schema.test_table3 -- test_schema スキーマ内のテーブル
Hologres カタログの初期化
Spark クラスターで spark-sql を起動し、Hologres コネクタをロードして、カタログパラメーターを指定します。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
Hologres カタログの共通コマンド
-
Hologres カタログのロード
Spark の Hologres カタログは Hologres データベースに正確に対応しており、使用中に変更することはできません。
USE hologres_external_test_db; -
すべての名前空間のクエリ
Spark の名前空間は Hologres のスキーマに対応します。デフォルトのスキーマは `public` です。
USE命令を使用してデフォルトのスキーマを変更できます。-- Hologres カタログ内のすべての名前空間を表示します。これは Hologres 内のすべてのスキーマに対応します。 SHOW NAMESPACES; -
名前空間内のテーブルのクエリ
-
すべてのテーブルのクエリ
SHOW TABLES; -
特定の名前空間内のテーブルのクエリ
USE test_schema; SHOW TABLES; -- または SHOW TABLES IN test_schema;
-
-
テーブルの読み書き
SELECT 文と INSERT 文を使用して、カタログ内の Hologres 外部テーブルの読み書きを行います。
-- テーブルから読み取ります。 SELECT * FROM public.test_table1; -- テーブルに書き込みます。 INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;
Hologres へのデータのインポート
このセクションでは、TPC-H データセットの customer テーブルを Hologres のソースデータとして使用します。Spark は Hologres テーブルのデータを CSV 形式で読み取ることができます。customer データをダウンロードします。customer テーブルスキーマを作成するための SQL 文は次のとおりです。
CREATE TABLE customer_holo_table
(
c_custkey BIGINT ,
c_name TEXT ,
c_address TEXT ,
c_nationkey INT ,
c_phone TEXT ,
c_acctbal DECIMAL(15,2) ,
c_mktsegment TEXT ,
c_comment TEXT
);
Spark-SQL を使用したデータのインポート
Spark-SQL を使用する場合、カタログを使用して Hologres テーブルのメタデータをロードする方が便利です。一時テーブルを作成して Hologres テーブルを宣言することもできます。
-
1.5.2 より前のバージョンの Hologres-Connector-Spark はカタログをサポートしていません。一時テーブルを作成して Hologres テーブルを宣言する必要があります。
-
Hologres-Connector-Spark のパラメーターの詳細については、「パラメーターの説明」をご参照ください。
-
Hologres カタログを初期化します。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db -
ソース CSV テーブルから Hologres 外部テーブルにデータをインポートします。
説明Spark の INSERT INTO 構文は、
column_listを使用して書き込む列の一部リストを指定することはサポートしていません。たとえば、INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTableを使用して `c_custkey` フィールドのみを書き込むことはできません。特定のフィールドのみを書き込みたい場合は、
CREATE TEMPORARY VIEWを使用して、それらのフィールドのみを持つ Hologres 一時テーブルを宣言します。CATALOG を使用した書き込み
-- Hologres カタログをロードします。 USE hologres_external_test_db; -- CSV データソースを作成します。 CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," -- ローカルテストの場合は、ファイルの絶対パスを使用します。 ); -- CSV テーブルから Hologres にデータを書き込みます。 INSERT INTO public.customer_holo_table SELECT * FROM csvTable;TEMPORARY VIEW を使用した書き込み
-- CSV データソースを作成します。 CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," ); -- Hologres 一時テーブルを作成します。 CREATE TEMPORARY VIEW hologresTable ( c_custkey BIGINT, c_name STRING, c_phone STRING) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", table "customer_holo_table" ); INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;
DataFrame を使用したデータのインポート
spark-shell、pyspark、またはその他のツールを使用して Spark ジョブを開発する場合、DataFrame の `write` インターフェイスを呼び出してデータを書き込むこともできます。異なる開発言語は、CSV ファイルから読み取ったデータを DataFrame に変換し、それを Hologres インスタンスに書き込みます。以下はサンプルコードです。Hologres-Connector-Spark のパラメーターの詳細については、「パラメーターの説明」をご参照ください。
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
// CSV ソースのスキーマ。
val schema = StructType(Array(
StructField("c_custkey", LongType),
StructField("c_name", StringType),
StructField("c_address", StringType),
StructField("c_nationkey", IntegerType),
StructField("c_phone", StringType),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType),
StructField("c_comment", StringType)
))
// CSV ファイルから DataFrame にデータを読み込みます。
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")
// DataFrame を Hologres に書き込みます。
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;
public class SparkTest {
public static void main(String[] args) {
// CSV ソースのスキーマ。
List<StructField> asList =
Arrays.asList(
DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
DataTypes.createStructField("c_name", DataTypes.StringType, true),
DataTypes.createStructField("c_address", DataTypes.StringType, true),
DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
DataTypes.createStructField("c_phone", DataTypes.StringType, true),
DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
DataTypes.createStructField("c_comment", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(asList);
// ローカルモードで実行します。
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
// CSV ファイルから DataFrame にデータを読み込みます。
// ローカルテストの場合は、customer データの絶対パスを使用します。
Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");
// DataFrame を Hologres に書き込みます。
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()
}
}
Maven ファイルには次の構成が必要です。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
Python
from pyspark.sql.types import *
# CSV ソースのスキーマ。
schema = StructType([
StructField("c_custkey", LongType()),
StructField("c_name", StringType()),
StructField("c_address", StringType()),
StructField("c_nationkey", IntegerType()),
StructField("c_phone", StringType()),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType()),
StructField("c_comment", StringType())
])
# CSV ファイルから DataFrame にデータを読み込みます。
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')
# DataFrame を Hologres に書き込みます。
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()
異なる言語で Spark ジョブを実行するには、次の操作を実行します。
-
Scala
-
サンプルコードを使用して sparktest.scala ファイルを生成し、次のようにジョブを実行できます。
-- 依存関係をロードします。 spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- ローカルテストの場合は、絶対パスを使用してファイルをロードします。 scala> :load D:/sparktest.scala -
依存関係をロードした後にサンプルコードを直接貼り付けて実行することもできます。
-
-
Java
サンプルコードを開発ツールにインポートし、Maven を使用してパッケージ化できます。たとえば、パッケージ名を spark_test.jar とすることができます。次のコードでジョブを実行します。
-- ジョブの JAR パッケージには絶対パスを使用します。 spark-submit --class SparkTest --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_test.jar -
Python
次のコードを実行した後、サンプルコードを直接貼り付けて実行できます。
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Hologres からのデータの読み取り
-
spark-connector バージョン 1.3.2 以降、Hologres からデータを読み取ることができます。Spark のデフォルトの
jdbc-connectorと比較して、spark-connectorは Hologres テーブルのシャードに基づいてデータを並行して読み取ることができるため、パフォーマンスが向上します。読み取りの並列度はテーブル内のシャード数に依存します。spark-connectorはread.max_task_countパラメーターによって制限できます。ジョブは最終的にMin(shardCount, max_task_count)個の読み取りタスクを生成します。また、スキーマ推論もサポートしています。スキーマを提供しない場合、コネクタは Hologres テーブルスキーマから Spark 側のスキーマを推論します。 -
spark-connector バージョン 1.5.0 以降、Hologres テーブルからの読み取りは、述語プッシュダウン、LIMIT プッシュダウン、およびカラムプルーニングをサポートします。また、Hologres の
SELECT QUERYを渡してデータを読み取ることもサポートしています。このバージョンではバッチモードの読み取りが導入され、以前のバージョンと比較して読み取りパフォーマンスが 3〜4 倍向上します。
Spark-SQL を使用したデータの読み取り
Spark-SQL を使用する場合、カタログを使用して Hologres テーブルのメタデータをロードする方が便利です。一時テーブルを作成して Hologres テーブルを宣言することもできます。
-
1.5.2 より前のバージョンの Hologres-Connector-Spark はカタログをサポートしていません。一時テーブルを作成して Hologres テーブルを宣言する必要があります。
-
Hologres-Connector-Spark のパラメーターの詳細については、「パラメーターの説明」をご参照ください。
-
Hologres カタログを初期化します。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db -
Hologres からデータを読み取ります。
-
カタログを使用してデータを読み取ります。
-- Hologres カタログをロードします。 USE hologres_external_test_db; -- Hologres テーブルから読み取ります。カラムプルーニングと述語プッシュダウンがサポートされています。 SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10; -
一時テーブルを作成してデータを読み取ります。
CREATE TEMPORARY VIEW(table)
CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.max_task_count "80", // Hologres テーブルの読み取りに使用するタスクの最大数。 table "customer_holo_table" ); -- カラムプルーニングと述語プッシュダウンがサポートされています。 SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;CREATE TEMPORARY VIEW(query)
CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10" ); SELECT * FROM hologresTable LIMIT 5;
-
Hologres データを DataFrame に読み込む
spark-shell、pyspark、またはその他のツールを使用して Spark ジョブを開発する場合、Spark の Read インターフェイスを呼び出してデータを DataFrame に読み込み、後続の計算を行うことができます。以下は、異なる言語で Hologres テーブルを DataFrame に読み込む例です。Hologres-Connector-Spark のパラメーターの詳細については、「パラメーターの説明」をご参照ください。
Scala
val readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // Hologres テーブルの読み取りに使用するタスクの最大数。
.load()
.filter("c_custkey < 500")
)
readDf.select("c_custkey", "c_name", "c_phone").show(10)
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSelect {
public static void main(String[] args) {
// ローカルモードで実行します。
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
Dataset<Row> readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // Hologres テーブルの読み取りに使用するタスクの最大数。
.load()
.filter("c_custkey < 500")
);
readDf.select("c_custkey", "c_name", "c_phone").show(10);
}
}
Maven ファイルには次の構成が必要です。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
Python
readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()
readDf.select("c_custkey", "c_name", "c_phone").show(10)
異なる言語で Spark ジョブを実行するには、次の操作を実行します。
-
Scala
-
サンプルコードを使用して sparkselect.scala ファイルを生成し、次のようにジョブを実行できます。
-- 依存関係をロードします。 spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- ローカルテストの場合は、絶対パスを使用してファイルをロードします。 scala> :load D:/sparkselect.scala -
依存関係をロードした後にサンプルコードを直接貼り付けて実行することもできます。
-
-
Java
サンプルコードを開発ツールにインポートし、Maven を使用してパッケージ化できます。たとえば、パッケージ名を spark_select.jar とすることができます。次のコードでジョブを実行します。
-- ジョブの JAR パッケージには絶対パスを使用します。 spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_select.jar -
Python
次のコードを実行した後、サンプルコードを直接貼り付けて実行できます。
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
パラメーターの説明
一般パラメーター
|
パラメーター |
デフォルト値 |
必須 |
説明 |
|
username |
なし |
はい |
|
|
password |
なし |
はい |
|
|
table |
なし |
はい |
読み書きする Hologres テーブルの名前。 説明
データを読み取る際、代わりに |
|
jdbcurl |
なし |
はい |
Hologres リアルタイムデータ API の JDBC URL。フォーマットは |
|
enable_serverless_computing |
false |
いいえ |
サーバーレスリソースを使用するかどうかを指定します。このパラメーターは、読み取り操作と |
|
serverless_computing_query_priority |
3 |
いいえ |
サーバーレスコンピューティングの実行優先度。 |
|
statement_timeout_seconds |
28800 (8 時間) |
いいえ |
クエリ実行のタイムアウト期間 (秒単位)。 |
|
retry_count |
3 |
いいえ |
接続が失敗した場合のリトライ回数。 |
|
direct_connect |
直接接続をサポートする環境では、デフォルトで直接接続が使用されます。 |
いいえ |
バッチデータの読み書きのボトルネックは、多くの場合、エンドポイントのネットワークスループットです。そのため、システムは現在の環境が Hologres フロントエンド (アクセスノード) に直接接続できるかどうかをテストします。直接接続がサポートされている場合、デフォルトで使用されます。このパラメーターを |
書き込みパラメーター
Hologres コネクタは Spark の SaveMode パラメーターをサポートしています。SQL の場合、これは INSERT INTO または INSERT OVERWRITE を意味します。DataFrame の場合、これは書き込み時に SaveMode を Append または Overwrite に設定することを意味します。Overwrite は書き込み用の一時テーブルを作成し、書き込みが成功した後に元のテーブルを置き換えます。注意して使用してください。
|
パラメーター名 |
以前のパラメーター名 |
デフォルト値 |
必須 |
説明 |
|
write.mode |
copy_write_mode |
auto |
いいえ |
書き込みモード。有効な値は次のとおりです。書き込みモードの比較については、「バッチ書き込みモードの比較」をご参照ください。
|
|
write.copy.max_buffer_size |
max_cell_buffer_size |
52428800 (50 MB) |
いいえ |
COPY モードで書き込む場合、通常、ローカルバッファーの最大長を調整する必要はありません。ただし、非常に長い文字列などの大きなフィールドを書き込むときにバッファーオーバーフローが発生した場合は、この値を増やすことができます。 |
|
write.copy.dirty_data_check |
copy_write_dirty_data_check |
false |
いいえ |
ダーティデータの検証を実行するかどうかを指定します。この機能を有効にすると、ダーティデータが見つかった場合に書き込みに失敗した特定の行を特定できます。ただし、これは書き込みパフォーマンスに影響します。トラブルシューティングの場合を除き、この機能を有効にしないでください。 |
|
write.on_conflict_action |
write_mode |
INSERT_OR_REPLACE |
いいえ |
プライマリキーを持つテーブルに挿入するときのポリシー:
|
次のパラメーターは、write.mode が insert に設定されている場合にのみ有効です。
|
パラメーター名 |
以前のパラメーター名 |
デフォルト値 |
必須 |
説明 |
|
write.insert.dynamic_partition |
dynamic_partition |
false |
いいえ |
このパラメーターは、 |
|
write.insert.batch_size |
write_batch_size |
512 |
いいえ |
各書き込みスレッドの最大バッチサイズ。WriteMode によってマージされた後の Put 操作の数がこの値に達すると、バッチがコミットされます。 |
|
write.insert.batch_byte_size |
write_batch_byte_size |
2097152 (2 * 1024 * 1024) |
いいえ |
各書き込みスレッドの最大バッチサイズ (バイト単位)。デフォルト値は 2 MB です。WriteMode によってマージされた後の Put データのバイトサイズがこの値に達すると、バッチがコミットされます。 |
|
write.insert.max_interval_ms |
write_max_interval_ms |
10000 |
いいえ |
最後のコミットからの時間がこの値を超えると、バッチがコミットされます。 |
|
write.insert.thread_size |
write_thread_size |
1 |
いいえ |
同時書き込みスレッドの数。各同時スレッドは 1 つのデータベース接続を占有します。 |
読み取りパラメーター
|
パラメーター名 |
以前のパラメーター名 (バージョン 1.5.0 以前) |
デフォルト値 |
必須 |
説明 |
|
read.mode |
bulk_read |
auto |
いいえ |
読み取りモード。有効な値:
|
|
read.max_task_count |
max_partition_count |
80 |
いいえ |
読み取る Hologres テーブルを複数のパーティションに分割します。各パーティションは 1 つの Spark タスクに対応します。Hologres テーブルの `ShardCount` がこのパラメーターより小さい場合、パーティションの数は最大で `ShardCount` になります。 |
|
read.copy.max_buffer_size |
/ |
52428800 (50 MB) |
いいえ |
COPY モードで読み取る場合、これはローカルバッファーの最大長です。大きなフィールドで例外が発生した場合は、この値を増やしてください。 |
|
read.push_down_predicate |
push_down_predicate |
true |
いいえ |
述語プッシュダウンを有効にするかどうかを指定します。たとえば、クエリ中にフィルター条件を適用します。この機能は、一般的なフィルター条件のプッシュダウンとカラムプルーニングをサポートします。 |
|
read.push_down_limit |
push_down_limit |
true |
いいえ |
LIMIT プッシュダウンを有効にするかどうかを指定します。 |
|
read.select.batch_size |
scan_batch_size |
256 |
いいえ |
このパラメーターは、 |
|
read.select.timeout_seconds |
scan_timeout_seconds |
60 |
いいえ |
このパラメーターは、 |
|
read.query |
query |
なし |
いいえ |
指定された 説明
|
データ型マッピング
|
Spark 型 |
Hologres 型 |
|
ShortType |
SMALLINT |
|
IntegerType |
INT |
|
LongType |
BIGINT |
|
StringType |
TEXT |
|
StringType |
JSON |
|
StringType |
JSONB |
|
DecimalType |
NUMERIC(38, 18) |
|
BooleanType |
BOOL |
|
DoubleType |
DOUBLE PRECISION |
|
FloatType |
FLOAT |
|
TimestampType |
TIMESTAMPTZ |
|
DateType |
DATE |
|
BinaryType |
BYTEA |
|
BinaryType |
ROARINGBITMAP |
|
ArrayType(IntegerType) |
INT4[] |
|
ArrayType(LongType) |
INT8[] |
|
ArrayType(FloatType) |
FLOAT4[] |
|
ArrayType(DoubleType) |
FLOAT8[] |
|
ArrayType(BooleanType) |
BOOLEAN[] |
|
ArrayType(StringType) |
TEXT[] |
接続数の計算
Hologres-Connector-Spark がデータを読み書きする際、一定数の JDBC 接続を使用します。この数は次の要因に影響される可能性があります。
-
Spark の並列度。これは同時に実行されるタスクの数で、ジョブ実行中に Spark UI で確認できます。
-
各同時タスクでコネクタが使用する接続数:
-
COPY モードでの書き込みの場合、各同時タスクは 1 つの JDBC 接続のみを使用します。
-
INSERT モードでの書き込みの場合、各同時タスクは
write_thread_size個の JDBC 接続を使用します。 -
読み取りの場合、各同時タスクは 1 つの JDBC 接続を使用します。
-
-
その他の側面で使用される接続:ジョブが開始されると、スキーマの取得などの操作が実行され、一時的に接続が確立される場合があります。
したがって、ジョブが使用する合計接続数は、次の数式を使用して計算できます。
|
作業項目 |
使用される接続数 |
|
カタログからメタデータをクエリ |
1 |
|
データの読み取り |
並列度 × 1 + 1 |
|
COPY モードでの書き込み |
並列度 × 1 + 1 |
|
INSERT モードでの書き込み |
並列度 × write_thread_size + 1 |
上記の接続数計算は、Spark が同時に実行できるタスクの数が、ジョブによって生成されるタスクの数よりも多いことを前提としています。
Spark が実行できる同時タスクの数は、spark.executor.instances などのユーザー設定パラメーターに影響される可能性があり、また Hadoop のファイル分割ポリシーにも影響される可能性があります。詳細については、「Apache Hadoop」をご参照ください。