Spark Doris コネクタを使用すると、Spark の分散計算クラスターを活用して、大量のデータを ApsaraDB for SelectDB にロードできます。このコネクタは、MySQL、PostgreSQL、Hadoop 分散ファイルシステム (HDFS)、Amazon Simple Storage Service (Amazon S3) などの上流データソースからデータを読み込み、DataFrame として処理した後、Stream Load を使用して SelectDB に書き込みます。また、Spark Java Database Connectivity (JDBC) を使用して、SelectDB のテーブルからデータを読み込むことも可能です。
仕組み
Spark Doris コネクタは、外部データソースと ApsaraDB for SelectDB を接続するブリッジとして機能します。Spark が分散計算クラスターでデータを前処理し、その後コネクタが Stream Load を介して SelectDB に書き込みます。これにより、単一ノードベースの JDBC 書き込みが、Spark クラスターのスケールに応じて並列化可能なパイプラインに置き換えられます。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
Spark Doris コネクタ 1.3.1 以降がインストール済みであること
ApsaraDB for SelectDB のインスタンスが存在すること。詳細については、「インスタンスの作成」をご参照ください。
SelectDB インスタンスへの接続が確立済みであること。詳細については、「ApsaraDB for SelectDB インスタンスへの接続」をご参照ください。
ご利用の Spark 環境のパブリック IP アドレスが、SelectDB の IP アドレスホワイトリストに登録済みであること。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。
Spark Doris コネクタのインストール
以下のいずれかのインストール方法を選択してください。
オプション 1:Maven 依存関係の追加
pom.xml に以下の依存関係を追加します。他のバージョンについては、「Maven Repository」をご参照ください。
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.2_2.12</artifactId>
<version>1.3.2</version>
</dependency>オプション 2:JAR パッケージのダウンロード
ご利用の Spark および Scala のバージョンに対応する JAR パッケージをダウンロードします。他のバージョンについては、「Maven Repository」をご参照ください。
以下の JAR パッケージは Java 8 でコンパイルされています。異なる Java バージョンが必要な場合は、テクニカルサポートまでお問い合わせください。バージョン文字列は左から順に、Spark バージョン、Scala バージョン、Spark Doris コネクタのバージョンを表します。
| バージョン | JAR パッケージ |
|---|---|
| 2.4-2.12-1.3.2 | spark-doris-connector-2.4_2.12-1.3.2 |
| 3.1-2.12-1.3.2 | spark-doris-connector-3.1_2.12-1.3.2 |
| 3.2-2.12-1.3.2 | spark-doris-connector-3.2_2.12-1.3.2 |
ダウンロード後、ご利用の Spark クラスターモードに応じて JAR をデプロイします。
ローカルクラスターモード:JAR を Spark インストールディレクトリの
jarsディレクトリに配置します。YARN クラスターモード:JAR を事前デプロイメントパッケージとしてアップロードします。
HDFS への JAR のアップロード:
bash hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar /spark-jars/クラスター構成への依存関係の追加:
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar
データのインポート
Spark Doris コネクタは、Spark SQL および DataFrame の 2 つの書き込み API をサポートしています。どちらも doris データソースフォーマットを使用し、同一の構成パラメーターを共有します。
SparkSQL
Spark SQL シェルを起動し、一時ビューを使用してタスクを送信します。
bin/spark-sqlval selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
"table.identifier"="${selectdbTable}",
"fenodes"="${selectdbHttpPort}",
"user"="${selectdbUser}",
"password"="${selectdbPwd}",
"sink.properties.format"="json"
);
INSERT INTO test_order SELECT order_id, order_amount, order_status FROM tmp_tb;DataFrame
Spark シェルを起動し、書き込みタスクを送信します。
bin/spark-shellval spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
("1", 100, "Pending Payment"),
("2", 200, null),
("3", 300, "Received")
)).toDF("order_id", "order_amount", "order_status")
df.write
.format("doris")
.option("fenodes", selectdbHttpPort)
.option("table.identifier", selectdbTable)
.option("user", selectdbUser)
.option("password", selectdbPwd)
.option("sink.batch.size", 100000)
.option("sink.max-retries", 3)
.option("sink.properties.file.column_separator", "\t")
.option("sink.properties.file.line_delimiter", "\n")
.save()パラメーター
特に明記されていない限り、すべてのパラメーターは Spark SQL および DataFrame の両方に適用されます。
接続パラメーター
| パラメーター | デフォルト | 必須 | 説明 |
|---|---|---|---|
fenodes | なし | はい | SelectDB インスタンスの HTTP エンドポイント。形式: <host>:<http-port>。エンドポイントを確認するには、ApsaraDB for SelectDB コンソールの「インスタンスの詳細」ページを開き、「基本情報 > ネットワーク情報」セクションで「VPC エンドポイント」または「パブリックエンドポイント」と「HTTP ポート」をコピーします。例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080。 |
table.identifier | なし | はい | 対象テーブル(形式:<database>.<table>)。例:test_db.test_table。 |
user | なし | はい | SelectDB インスタンスへの接続に使用するユーザー名。 |
password | なし | はい | SelectDB インスタンスへの接続に使用するパスワード。 |
読み取りパラメーター
| パラメーター | デフォルト | 必須 | 説明 |
|---|---|---|---|
request.retries | 3 | いいえ | SelectDB インスタンスへのリクエストの最大リトライ回数。 |
request.connect.timeout.ms | 30000 | いいえ | SelectDB インスタンスへのリクエストの接続タイムアウト(ミリ秒単位)。 |
request.read.timeout.ms | 30000 | いいえ | SelectDB インスタンスへのリクエストの読み取りタイムアウト(ミリ秒単位)。 |
request.query.timeout.s | 3600 | いいえ | SelectDB インスタンスのクエリタイムアウト(秒単位)。デフォルト値は 1 時間です。-1 を指定すると、タイムアウトが無効になります。 |
request.tablet.size | Integer.MAX_VALUE | いいえ | 各 Resilient Distributed Dataset (RDD) パーティションにマップされる SelectDB タブレットの数。値を小さくするとパーティション数が増え、Spark 側の並列性が向上しますが、その分 SelectDB への負荷も増加します。 |
read.field | なし | いいえ | SelectDB テーブルから読み取るカラム。複数のカラムを指定する場合は、カンマで区切ります。 |
batch.size | 1024 | いいえ | バックエンドノードから 1 回のリクエストで読み取る最大行数。値を大きくすると、接続数およびネットワーク遅延のオーバーヘッドが削減されます。 |
exec.mem.limit | 2147483648 | いいえ | 単一クエリのメモリ制限(バイト単位)。デフォルト値は 2 GB です。 |
deserialize.arrow.async | false | いいえ | Arrow データを RowBatch へ逆シリアル化する処理を非同期で実行するかどうか。 |
deserialize.queue.size | 64 | いいえ | 非同期 Arrow 逆シリアル化の内部キューのサイズ。このパラメーターは、deserialize.arrow.async が true の場合にのみ有効です。 |
filter.query.in.max.count | 100 | いいえ | 述語プッシュダウンにおける IN 句の最大値数。このしきい値を超えると、Spark がフィルターをローカルで処理します。 |
ignore-type | なし | いいえ | 一時ビューのスキーマ読み取り時に無視するフィールドタイプ。例:bitmap,hll。 |
書き込みパラメーター
| パラメーター | デフォルト | 必須 | 説明 |
|---|---|---|---|
write.fields | なし | いいえ | SelectDB テーブルに書き込むフィールド、またはフィールドの書き込み順序。複数のフィールドを指定する場合は、カンマで区切ります。デフォルトでは、テーブルのカラム順序に従ってすべてのフィールドが書き込まれます。 |
sink.batch.size | 100000 | いいえ | バックエンドに 1 バッチあたり書き込む最大行数。 |
sink.max-retries | 0 | いいえ | 書き込み失敗後の最大リトライ回数。 |
sink.properties.format | csv | いいえ | Stream Load のデータフォーマット。有効な値: csv、json、arrow。 |
sink.properties.* | -- | いいえ | Stream Load パラメーターは、基盤となる Stream Load ジョブに直接渡されます。たとえば、sink.properties.column_separator を設定して列区切り文字を指定します。利用可能なすべてのパラメーターについては、「Stream Load を使用したデータのインポート」をご参照ください。 |
sink.task.partition.size | なし | いいえ | SelectDB への書き込みに使用するパーティション数。RDD フィルター処理によって多数の小さなパーティションが生成された場合に、書き込み頻度を低減するために使用します。sink.task.use.repartition と併用します。 |
sink.task.use.repartition | false | いいえ | 書き込み前にデータを再パーティション化するかどうか。false の場合、coalesce(シャッフルなし)が使用されます。true の場合、repartition が使用され、より均等に分散されたパーティションが生成されますが、シャッフルによるオーバーヘッドが発生します。 |
sink.batch.interval.ms | 50 | いいえ | 書き込みバッチ間の間隔(ミリ秒単位)。 |
sink.enable-2pc | false | いいえ | 2 フェーズコミット(2PC)を使用するかどうか。有効にすると、トランザクションは Spark ジョブ全体が完了した後にのみコミットされます。いずれかのタスクが失敗した場合、事前にコミットされたすべてのトランザクションがロールバックされます。 |
sink.auto-redirect | true | いいえ | Stream Load リクエストをフロントエンドノード経由でリダイレクトするかどうか。有効にすると、バックエンドノードのアドレスを明示的に指定する必要はありません。 |
sink.streaming.passthrough | false | いいえ | (DataFrame のみ)最初のカラムの値を変換せずに書き込みます。 |
本番環境に適用する際の推奨事項
本番ワークロード向けに、以下の設定を検討してください。
Exactly-once 配信:すべてのデータが Spark ジョブの成功後にのみコミットされるよう、
sink.enable-2pc=trueを有効化します。いずれかのタスクが失敗した場合、事前にコミットされたすべてのトランザクションがロールバックされ、部分的な書き込みが防止されます。パーティション制御:RDD フィルター処理によって多数の小さなパーティションが生成された場合、
sink.task.partition.sizeとsink.task.use.repartition=trueを併用して、パーティションを統合し、書き込み頻度を低減します。失敗時のリトライ:一時的な書き込み障害に対して自動的にリトライを実行するため、
sink.max-retriesを正の整数(例:3)に設定します。
エンドツーエンドのサンプル
このサンプルでは、Spark SQL および DataFrame の両方を使用して、MySQL データベースから ApsaraDB for SelectDB へデータをインポートします。
サンプル環境:
| ソフトウェア | Java | Spark | Scala | SelectDB |
|---|---|---|---|---|
| バージョン | 1.8 | 3.1.2 | 2.12 | 3.0.4 |
環境のセットアップ
1. Spark の構成
Spark インストールパッケージをダウンロードして展開します。
wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
tar xvzf spark-3.1.2-bin-hadoop3.2.tgzspark-doris-connector-3.2_2.12-1.3.2.jar を SPARK_HOME/jars ディレクトリに配置します。
2. MySQL におけるソーステーブルの作成
CREATE TABLE `employees` (
`emp_no` int NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(14) NOT NULL,
`last_name` varchar(16) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;テストデータを生成するには、Data Management (DMS) を使用します。
3. SelectDB におけるターゲットテーブルの作成
MySQL プロトコルを介して SelectDB インスタンスに接続し、以下のコマンドを実行します。
CREATE DATABASE test_db;
USE test_db;
CREATE TABLE employees (
emp_no int NOT NULL,
birth_date date,
first_name varchar(20),
last_name varchar(20),
gender char(2),
hire_date date
)
UNIQUE KEY(`emp_no`)
DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;4. パブリックエンドポイントの申請
SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントの申請または解放」をご参照ください。
5. Spark ホストの IP をホワイトリストに追加
Spark 環境のパブリック IP アドレスを SelectDB IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。
Spark SQL を使用したインポート
Spark SQL シェルを起動します。
bin/spark-sqlインポートタスクを送信します。
CREATE TEMPORARY VIEW mysql_tbl USING jdbc OPTIONS( "url"="jdbc:mysql://host:port/test_db", "dbtable"="employees", "driver"="com.mysql.jdbc.Driver", "user"="admin", "password"="****" ); CREATE TEMPORARY VIEW selectdb_tbl USING doris OPTIONS( "table.identifier"="test_db.employees", "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080", "user"="admin", "password"="****", "sink.properties.format"="json" ); INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;ジョブの完了後、ApsaraDB for SelectDB コンソールにログインして、インポートされたデータを確認します。
DataFrame を使用したインポート
Spark シェルを起動します。
bin/spark-shellインポートタスクを送信します。
val mysqlDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://host:port/test_db") .option("dbtable", "employees") .option("driver", "com.mysql.jdbc.Driver") .option("user", "admin") .option("password", "****") .load() mysqlDF.write.format("doris") .option("fenodes", "host:httpPort") .option("table.identifier", "test_db.employees") .option("user", "admin") .option("password", "****") .option("sink.batch.size", 100000) .option("sink.max-retries", 3) .option("sink.properties.format", "json") .save()ジョブの完了後、ApsaraDB for SelectDB コンソールにログインして、インポートされたデータを確認します。