すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:Spark を使用したデータのインポート

最終更新日:Mar 29, 2026

Spark Doris コネクタを使用すると、Spark の分散計算クラスターを活用して、大量のデータを ApsaraDB for SelectDB にロードできます。このコネクタは、MySQL、PostgreSQL、Hadoop 分散ファイルシステム (HDFS)、Amazon Simple Storage Service (Amazon S3) などの上流データソースからデータを読み込み、DataFrame として処理した後、Stream Load を使用して SelectDB に書き込みます。また、Spark Java Database Connectivity (JDBC) を使用して、SelectDB のテーブルからデータを読み込むことも可能です。

仕組み

Spark Doris コネクタは、外部データソースと ApsaraDB for SelectDB を接続するブリッジとして機能します。Spark が分散計算クラスターでデータを前処理し、その後コネクタが Stream Load を介して SelectDB に書き込みます。これにより、単一ノードベースの JDBC 書き込みが、Spark クラスターのスケールに応じて並列化可能なパイプラインに置き換えられます。

image

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

  • Spark Doris コネクタ 1.3.1 以降がインストール済みであること

  • ApsaraDB for SelectDB のインスタンスが存在すること。詳細については、「インスタンスの作成」をご参照ください。

  • SelectDB インスタンスへの接続が確立済みであること。詳細については、「ApsaraDB for SelectDB インスタンスへの接続」をご参照ください。

  • ご利用の Spark 環境のパブリック IP アドレスが、SelectDB の IP アドレスホワイトリストに登録済みであること。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。

Spark Doris コネクタのインストール

以下のいずれかのインストール方法を選択してください。

オプション 1:Maven 依存関係の追加

pom.xml に以下の依存関係を追加します。他のバージョンについては、「Maven Repository」をご参照ください。

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-3.2_2.12</artifactId>
    <version>1.3.2</version>
</dependency>

オプション 2:JAR パッケージのダウンロード

ご利用の Spark および Scala のバージョンに対応する JAR パッケージをダウンロードします。他のバージョンについては、「Maven Repository」をご参照ください。

以下の JAR パッケージは Java 8 でコンパイルされています。異なる Java バージョンが必要な場合は、テクニカルサポートまでお問い合わせください。バージョン文字列は左から順に、Spark バージョン、Scala バージョン、Spark Doris コネクタのバージョンを表します。
バージョンJAR パッケージ
2.4-2.12-1.3.2spark-doris-connector-2.4_2.12-1.3.2
3.1-2.12-1.3.2spark-doris-connector-3.1_2.12-1.3.2
3.2-2.12-1.3.2spark-doris-connector-3.2_2.12-1.3.2

ダウンロード後、ご利用の Spark クラスターモードに応じて JAR をデプロイします。

  • ローカルクラスターモード:JAR を Spark インストールディレクトリの jars ディレクトリに配置します。

  • YARN クラスターモード:JAR を事前デプロイメントパッケージとしてアップロードします。

    1. HDFS への JAR のアップロード:bash hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar /spark-jars/

    2. クラスター構成への依存関係の追加: spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar

データのインポート

Spark Doris コネクタは、Spark SQL および DataFrame の 2 つの書き込み API をサポートしています。どちらも doris データソースフォーマットを使用し、同一の構成パラメーターを共有します。

SparkSQL

Spark SQL シェルを起動し、一時ビューを使用してタスクを送信します。

bin/spark-sql
val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"

CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
 "table.identifier"="${selectdbTable}",
 "fenodes"="${selectdbHttpPort}",
 "user"="${selectdbUser}",
 "password"="${selectdbPwd}",
 "sink.properties.format"="json"
);

INSERT INTO test_order SELECT order_id, order_amount, order_status FROM tmp_tb;

DataFrame

Spark シェルを起動し、書き込みタスクを送信します。

bin/spark-shell
val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
  ("1", 100, "Pending Payment"),
  ("2", 200, null),
  ("3", 300, "Received")
)).toDF("order_id", "order_amount", "order_status")

df.write
  .format("doris")
  .option("fenodes", selectdbHttpPort)
  .option("table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 100000)
  .option("sink.max-retries", 3)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()

パラメーター

特に明記されていない限り、すべてのパラメーターは Spark SQL および DataFrame の両方に適用されます。

接続パラメーター

パラメーターデフォルト必須説明
fenodesなしはいSelectDB インスタンスの HTTP エンドポイント。形式: <host>:<http-port>。エンドポイントを確認するには、ApsaraDB for SelectDB コンソールの「インスタンスの詳細」ページを開き、「基本情報 > ネットワーク情報」セクションで「VPC エンドポイント」または「パブリックエンドポイント」と「HTTP ポート」をコピーします。例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
table.identifierなしはい対象テーブル(形式:<database>.<table>)。例:test_db.test_table
userなしはいSelectDB インスタンスへの接続に使用するユーザー名。
passwordなしはいSelectDB インスタンスへの接続に使用するパスワード。

読み取りパラメーター

パラメーターデフォルト必須説明
request.retries3いいえSelectDB インスタンスへのリクエストの最大リトライ回数。
request.connect.timeout.ms30000いいえSelectDB インスタンスへのリクエストの接続タイムアウト(ミリ秒単位)。
request.read.timeout.ms30000いいえSelectDB インスタンスへのリクエストの読み取りタイムアウト(ミリ秒単位)。
request.query.timeout.s3600いいえSelectDB インスタンスのクエリタイムアウト(秒単位)。デフォルト値は 1 時間です。-1 を指定すると、タイムアウトが無効になります。
request.tablet.sizeInteger.MAX_VALUEいいえ各 Resilient Distributed Dataset (RDD) パーティションにマップされる SelectDB タブレットの数。値を小さくするとパーティション数が増え、Spark 側の並列性が向上しますが、その分 SelectDB への負荷も増加します。
read.fieldなしいいえSelectDB テーブルから読み取るカラム。複数のカラムを指定する場合は、カンマで区切ります。
batch.size1024いいえバックエンドノードから 1 回のリクエストで読み取る最大行数。値を大きくすると、接続数およびネットワーク遅延のオーバーヘッドが削減されます。
exec.mem.limit2147483648いいえ単一クエリのメモリ制限(バイト単位)。デフォルト値は 2 GB です。
deserialize.arrow.asyncfalseいいえArrow データを RowBatch へ逆シリアル化する処理を非同期で実行するかどうか。
deserialize.queue.size64いいえ非同期 Arrow 逆シリアル化の内部キューのサイズ。このパラメーターは、deserialize.arrow.asynctrue の場合にのみ有効です。
filter.query.in.max.count100いいえ述語プッシュダウンにおける IN 句の最大値数。このしきい値を超えると、Spark がフィルターをローカルで処理します。
ignore-typeなしいいえ一時ビューのスキーマ読み取り時に無視するフィールドタイプ。例:bitmap,hll

書き込みパラメーター

パラメーターデフォルト必須説明
write.fieldsなしいいえSelectDB テーブルに書き込むフィールド、またはフィールドの書き込み順序。複数のフィールドを指定する場合は、カンマで区切ります。デフォルトでは、テーブルのカラム順序に従ってすべてのフィールドが書き込まれます。
sink.batch.size100000いいえバックエンドに 1 バッチあたり書き込む最大行数。
sink.max-retries0いいえ書き込み失敗後の最大リトライ回数。
sink.properties.formatcsvいいえStream Load のデータフォーマット。有効な値: csvjsonarrow
sink.properties.*--いいえStream Load パラメーターは、基盤となる Stream Load ジョブに直接渡されます。たとえば、sink.properties.column_separator を設定して列区切り文字を指定します。利用可能なすべてのパラメーターについては、「Stream Load を使用したデータのインポート」をご参照ください。
sink.task.partition.sizeなしいいえSelectDB への書き込みに使用するパーティション数。RDD フィルター処理によって多数の小さなパーティションが生成された場合に、書き込み頻度を低減するために使用します。sink.task.use.repartition と併用します。
sink.task.use.repartitionfalseいいえ書き込み前にデータを再パーティション化するかどうか。false の場合、coalesce(シャッフルなし)が使用されます。true の場合、repartition が使用され、より均等に分散されたパーティションが生成されますが、シャッフルによるオーバーヘッドが発生します。
sink.batch.interval.ms50いいえ書き込みバッチ間の間隔(ミリ秒単位)。
sink.enable-2pcfalseいいえ2 フェーズコミット(2PC)を使用するかどうか。有効にすると、トランザクションは Spark ジョブ全体が完了した後にのみコミットされます。いずれかのタスクが失敗した場合、事前にコミットされたすべてのトランザクションがロールバックされます。
sink.auto-redirecttrueいいえStream Load リクエストをフロントエンドノード経由でリダイレクトするかどうか。有効にすると、バックエンドノードのアドレスを明示的に指定する必要はありません。
sink.streaming.passthroughfalseいいえ(DataFrame のみ)最初のカラムの値を変換せずに書き込みます。

本番環境に適用する際の推奨事項

本番ワークロード向けに、以下の設定を検討してください。

  • Exactly-once 配信:すべてのデータが Spark ジョブの成功後にのみコミットされるよう、sink.enable-2pc=true を有効化します。いずれかのタスクが失敗した場合、事前にコミットされたすべてのトランザクションがロールバックされ、部分的な書き込みが防止されます。

  • パーティション制御:RDD フィルター処理によって多数の小さなパーティションが生成された場合、sink.task.partition.sizesink.task.use.repartition=true を併用して、パーティションを統合し、書き込み頻度を低減します。

  • 失敗時のリトライ:一時的な書き込み障害に対して自動的にリトライを実行するため、sink.max-retries を正の整数(例:3)に設定します。

エンドツーエンドのサンプル

このサンプルでは、Spark SQL および DataFrame の両方を使用して、MySQL データベースから ApsaraDB for SelectDB へデータをインポートします。

サンプル環境:

ソフトウェアJavaSparkScalaSelectDB
バージョン1.83.1.22.123.0.4

環境のセットアップ

1. Spark の構成

Spark インストールパッケージをダウンロードして展開します。

wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
tar xvzf spark-3.1.2-bin-hadoop3.2.tgz

spark-doris-connector-3.2_2.12-1.3.2.jarSPARK_HOME/jars ディレクトリに配置します。

2. MySQL におけるソーステーブルの作成

CREATE TABLE `employees` (
  `emp_no` int NOT NULL,
  `birth_date` date NOT NULL,
  `first_name` varchar(14) NOT NULL,
  `last_name` varchar(16) NOT NULL,
  `gender` enum('M','F') NOT NULL,
  `hire_date` date NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

テストデータを生成するには、Data Management (DMS) を使用します。

3. SelectDB におけるターゲットテーブルの作成

MySQL プロトコルを介して SelectDB インスタンスに接続し、以下のコマンドを実行します。

CREATE DATABASE test_db;

USE test_db;
CREATE TABLE employees (
    emp_no       int NOT NULL,
    birth_date   date,
    first_name   varchar(20),
    last_name    varchar(20),
    gender       char(2),
    hire_date    date
)
UNIQUE KEY(`emp_no`)
DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;

4. パブリックエンドポイントの申請

SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントの申請または解放」をご参照ください。

5. Spark ホストの IP をホワイトリストに追加

Spark 環境のパブリック IP アドレスを SelectDB IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。

Spark SQL を使用したインポート

  1. Spark SQL シェルを起動します。

    bin/spark-sql
  2. インポートタスクを送信します。

    CREATE TEMPORARY VIEW mysql_tbl
    USING jdbc
    OPTIONS(
     "url"="jdbc:mysql://host:port/test_db",
     "dbtable"="employees",
     "driver"="com.mysql.jdbc.Driver",
     "user"="admin",
     "password"="****"
    );
    
    CREATE TEMPORARY VIEW selectdb_tbl
    USING doris
    OPTIONS(
     "table.identifier"="test_db.employees",
     "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080",
     "user"="admin",
     "password"="****",
     "sink.properties.format"="json"
    );
    
    INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;
  3. ジョブの完了後、ApsaraDB for SelectDB コンソールにログインして、インポートされたデータを確認します。

DataFrame を使用したインポート

  1. Spark シェルを起動します。

    bin/spark-shell
  2. インポートタスクを送信します。

    val mysqlDF = spark.read.format("jdbc")
        .option("url", "jdbc:mysql://host:port/test_db")
        .option("dbtable", "employees")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("user", "admin")
        .option("password", "****")
        .load()
    
    mysqlDF.write.format("doris")
        .option("fenodes", "host:httpPort")
        .option("table.identifier", "test_db.employees")
        .option("user", "admin")
        .option("password", "****")
        .option("sink.batch.size", 100000)
        .option("sink.max-retries", 3)
        .option("sink.properties.format", "json")
        .save()
  3. ジョブの完了後、ApsaraDB for SelectDB コンソールにログインして、インポートされたデータを確認します。

次のステップ