すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:Spark を使用してデータをインポートする

最終更新日:Apr 09, 2025

ApsaraDB for SelectDB は Apache Doris と互換性があります。Spark Doris Connector を使用して、Spark の分散コンピューティング機能を使用して大量のデータをインポートできます。このトピックでは、Spark Doris Connector のしくみと、Spark Doris Connector を使用して ApsaraDB for SelectDB にデータをインポートする方法について説明します。

概要

Spark Doris Connector を使用すると、ApsaraDB for SelectDB に大量のデータをインポートする方法の 1 つとして使用できます。Spark の分散コンピューティング機能に基づいて、MySQL、PostgreSQL、Hadoop Distributed File System(HDFS)、Amazon Simple Storage Service(Amazon S3)などのアップストリームデータソースから DataFrame に大量のデータを読み取り、Spark Doris Connector を使用して ApsaraDB for SelectDB のテーブルにデータをインポートできます。また、Spark Java Database Connectivity(JDBC)を使用して、ApsaraDB for SelectDB のテーブルからデータを読み取ることもできます。

しくみ

次の図は、Spark Doris Connector を使用して ApsaraDB for SelectDB にデータをインポートする方法を示しています。図に示されているアーキテクチャ内では、Spark Doris Connector は外部データを ApsaraDB for SelectDB に書き込むためのブリッジとして機能し、分散コンピューティングクラスターを使用してデータを前処理します。これにより、データリンク全体のデータフローが高速化され、従来の JDBC を使用した低パフォーマンスのデータインポートが置き換えられます。

image

前提条件

Spark Doris Connector のバージョンは 1.3.1 以降です。

Spark Doris Connector 依存関係のインストール

次のいずれかの方法を使用して、Spark Doris Connector の依存関係をインストールできます。

  • Spark Doris Connector の Maven 依存関係をインストールします。次のサンプルコードは例を示しています。他のバージョンの依存関係を取得するには、Maven Repository にアクセスしてください。

    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>spark-doris-connector-3.2_2.12</artifactId>
        <version>1.3.2</version>
    </dependency>
  • Spark Doris Connector の JAR パッケージをダウンロードします。

    次の表に、Spark Doris Connector の 3 つの一般的なパッケージを示します。Spark のバージョンに基づいて JAR パッケージをダウンロードしてください。他のバージョンの依存関係を取得するには、Maven Repository にアクセスしてください。

    説明
    • 次の JAR パッケージは Java 8 を使用してコンパイルされています。他の Java バージョンを使用する場合は、ApsaraDB for SelectDB のテクニカルサポートにお問い合わせください。

    • JAR パッケージのバージョンは、左から右に、対応するサポートされている Spark バージョン、Scala バージョン、および Spark Doris Connector バージョンを示しています。

    バージョン

    JAR パッケージ

    2.4-2.12-1.3.2

    spark-doris-connector-2.4_2.12-1.3.2

    3.1-2.12-1.3.2

    spark-doris-connector-3.1_2.12-1.3.2

    3.2-2.12-1.3.2

    spark-doris-connector-3.2_2.12-1.3.2

    JAR パッケージをダウンロードした後、次のいずれかの方法を使用して Spark を実行します。

    • ローカルクラスターモードで実行されている Spark クラスターの場合、Spark Doris Connector の JAR パッケージを Spark インストールディレクトリの jars ディレクトリに配置します。

    • YARN クラスターモードで実行されている Spark クラスターの場合、Spark Doris Connector の JAR パッケージを事前デプロイパッケージとしてアップロードします。例:

      1. spark-doris-connector-3.2_2.12-1.3.2.jar パッケージを Spark クラスターの HDFS にアップロードします。

        hdfs dfs -mkdir /spark-jars/ 
        hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar/spark-jars/

        // 翻訳対象外のため変更なし

      2. spark-doris-connector-3.2_2.12-1.3.2.jar パッケージの依存関係をクラスターに追加します。

        spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar

手順

Spark クライアントで Spark を実行するか、Spark Doris Connector のパッケージを Spark 開発環境にインポートした後、Spark SQL または DataFrame を使用してデータを同期できます。次の例は、アップストリームデータを ApsaraDB for SelectDB インスタンスにインポートする方法を示しています。

Spark SQL を使用する

val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
  
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
 "table.identifier"="${selectdbTable}",
 "fenodes"="${selectdbHttpPort}",
 "user"="${selectdbUser}",
 "password"="${selectdbPwd}",
 "sink.properties.format"="json"
);

// insert data into SelectDB
INSERT INTO test_order SELECT order_id,order_amount,order_status FROM tmp_tb;

パラメーター

パラメーター

デフォルト値

必須

説明

fenodes

なし

はい

HTTP 経由で ApsaraDB for SelectDB インスタンスに接続するために使用されるエンドポイントです。

ApsaraDB for SelectDBインスタンスのVirtual Private Cloud (VPC)エンドポイントまたはパブリックエンドポイントとHTTPポートを取得するには、次の操作を実行します。 ApsaraDB for SelectDB コンソールにログオンし、情報を表示するインスタンスの[インスタンスの詳細] ページに移動します。 [基本情報] タブの[ネットワーク情報] セクションで、[VPCエンドポイント] パラメーターまたは [パブリックエンドポイント] パラメーターと [HTTPポート] パラメーターの値を確認します。

例: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 です。

table.identifier

なし

はい

ApsaraDB for SelectDB インスタンス内のテーブルの名前。形式: データベース名.テーブル名。例: test_db.test_table

request.retries

3

いいえ

ApsaraDB for SelectDB インスタンスへのリクエスト送信の最大再試行回数。

request.connect.timeout.ms

30000

いいえ

ApsaraDB for SelectDB インスタンスへのリクエストの接続タイムアウト期間。

request.read.timeout.ms

30000

いいえ

ApsaraDB for SelectDB インスタンスへのリクエストの読み取りタイムアウト期間。

request.query.timeout.s

3600

いいえ

ApsaraDB for SelectDB インスタンスのクエリタイムアウト期間。デフォルト値は 1 時間を示します。値 -1 は無制限のタイムアウト期間を示します。

request.tablet.size

Integer.MAX_VALUE

いいえ

Resilient Distributed Dataset(RDD)パーティションに対応する ApsaraDB for SelectDB タブレットの数。

設定値が小さいほど、生成されるパーティションが多くなります。これにより、Spark 側の並列性が向上しますが、ApsaraDB for SelectDB への負荷も増加します。

read.field

なし

いいえ

データを読み取る ApsaraDB for SelectDB テーブルの列。複数の列はコンマ(,)で区切ります。

batch.size

1024

いいえ

バックエンドから一度に読み取ることができる行の最大数。一度に多くの行をバックエンドから読み取る場合、Spark と ApsaraDB for SelectDB の間に確立される接続数が削減されます。これにより、ネットワークレイテンシによって発生する追加の時間オーバーヘッドが削減されます。

exec.mem.limit

2147483648

いいえ

単一クエリのメモリしきい値。デフォルト値は 2 GB を示します。単位: バイト。

deserialize.arrow.async

false

いいえ

Spark Doris Connector の反復処理に必要な RowBatch に Arrow データを非同期的に逆シリアル化するかどうかを指定します。

deserialize.queue.size

64

いいえ

Arrow データを非同期的に逆シリアル化するための内部処理キューのサイズ。このパラメーターは、deserialize.arrow.async パラメーターが true に設定されている場合にのみ有効になります。

write.fields

なし

いいえ

ApsaraDB for SelectDB テーブルに書き込むフィールド、またはフィールドを書き込む順序。複数のフィールドはコンマ(,)で区切ります。デフォルトでは、すべてのフィールドは ApsaraDB for SelectDB テーブルのフィールド順に基づいて ApsaraDB for SelectDB テーブルに書き込まれます。

sink.batch.size

100000

いいえ

バックエンドに一度に書き込むことができる行の最大数。

sink.max-retries

0

いいえ

バックエンドへのデータ書き込みに失敗した後の最大再試行回数。

sink.properties.format

csv

いいえ

Stream Load でサポートされているデータ形式。有効な値: csv、json、arrow。

sink.properties.*

--

いいえ

Stream Load ジョブの送信に使用されるパラメーター。たとえば、'sink.properties.column_separator' = ',' を指定して、列区切り文字を指定できます。パラメーターの詳細については、「Stream Load を使用してデータをインポートする」をご参照ください。

sink.task.partition.size

なし

いいえ

ApsaraDB for SelectDB インスタンスにデータが書き込まれるパーティションの数。Spark の RDD でフィルタリングなどの操作を実行すると、データが書き込まれるパーティションの数が増加する可能性がありますが、各パーティションには少数のレコードのみが含まれます。これにより、書き込み操作の頻度が増加し、コンピューティングリソースが浪費されます。

値が小さいほど、ApsaraDB for SelectDB インスタンスにデータを書き込む頻度が低くなります。これにより、ApsaraDB for SelectDB インスタンスのデータマージの負荷が軽減されます。このパラメーターは、sink.task.use.repartition パラメーターと一緒に使用する必要があります。

sink.task.use.repartition

false

いいえ

データを ApsaraDB for SelectDB インスタンスに書き込む前に、データを特定の数のパーティションに再パーティション化するかどうかを指定します。デフォルト値: false。これは、coalesce が使用されることを示します。書き込み操作の前にアクションが呼び出されない場合、コンピューティングの並列性が最適でない可能性があります。

このパラメーターを true に設定すると、再パーティション化が有効になります。データを再パーティション化するパーティションの数を指定できます。これにより、シャッフルのオーバーヘッドが増加します。

sink.batch.interval.ms

50

いいえ

シンクにデータが書き込まれる間隔。単位: ミリ秒。

sink.enable-2pc

false

いいえ

データの書き込みに 2 フェーズコミット(2PC)プロコルを使用するかどうかを指定します。このパラメーターを true に設定すると、Spark ジョブの完了後にトランザクションがコミットされます。一部のタスクが失敗した場合、事前にコミットされたすべてのトランザクションがロールバックされます。

sink.auto-redirect

true

いいえ

Stream Load リクエストをリダイレクトするかどうかを指定します。このパラメーターを true に設定すると、Stream Load リクエストはフロントエンドを使用してロードされます。これにより、バックエンド情報を明示的に取得する必要がなくなります。

user

なし

はい

ApsaraDB for SelectDB

password

なし

はい

ApsaraDB for SelectDB

filter.query.in.max.count

100

いいえ

Spark が述語プッシュダウンを実行するときに IN 句に含めることができる値の最大数。IN 句の値の数が指定されたしきい値を超えると、フィルター操作は Spark によって処理されます。

ignore-type

なし

いいえ

一時ビューのスキーマを読み取るときに無視するフィールドタイプ。

例: 'ignore-type'='bitmap,hll'

DataFrame を使用する

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
  ("1", 100, "Pending Payment"),
  ("2", 200, null),
  ("3", 300, "Received")
)).toDF("order_id", "order_amount", "order_status")

// DataFrame API を使用して SelectDB にデータを書き込む
df.write
  .format("doris")
  .option("fenodes", selectdbHttpPort)
  .option("table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 100000)
  .option("sink.max-retries", 3)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()

パラメーター

パラメーター

デフォルト値

必須

説明

fenodes

なし

はい

ApsaraDB for SelectDB インスタンスに HTTP 経由で接続するために使用されるエンドポイントです。

ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと HTTP ポートを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンし、情報を確認するインスタンスの [インスタンスの詳細] ページに移動します。「基本情報」タブの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [HTTP ポート] パラメーターの値を確認します。

例: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

なし

はい

ApsaraDB for SelectDB インスタンス内のテーブルの名前。形式: データベース名.テーブル名。例: test_db.test_table

request.retries

3

いいえ

ApsaraDB for SelectDB インスタンスへのリクエスト送信の最大再試行回数。

request.connect.timeout.ms

30000

いいえ

ApsaraDB for SelectDB インスタンスへのリクエストの接続タイムアウト期間。

request.read.timeout.ms

30000

いいえ

ApsaraDB for SelectDB インスタンスへのリクエストの読み取りタイムアウト期間。

request.query.timeout.s

3600

いいえ

ApsaraDB for SelectDB インスタンスのクエリタイムアウト期間。デフォルト値は 1 時間を示します。値 -1 は無制限のタイムアウト期間を示します。

request.tablet.size

Integer.MAX_VALUE

いいえ

RDD パーティションに対応する ApsaraDB for SelectDB タブレットの数。

設定値が小さいほど、生成されるパーティションが多くなります。これにより、Spark 側の並列性が向上しますが、ApsaraDB for SelectDB への負荷も増加します。

read.field

なし

いいえ

データを読み取る ApsaraDB for SelectDB テーブルの列。複数の列はコンマ(,)で区切ります。

batch.size

1024

いいえ

バックエンドから一度に読み取ることができる行の最大数。一度に多くの行をバックエンドから読み取る場合、Spark と ApsaraDB for SelectDB の間に確立される接続数が削減されます。これにより、ネットワークレイテンシによって発生する追加の時間オーバーヘッドが削減されます。

exec.mem.limit

2147483648

いいえ

単一クエリのメモリしきい値。デフォルト値は 2 GB を示します。単位: バイト。

deserialize.arrow.async

false

いいえ

Spark Doris Connector の反復処理に必要な RowBatch に Arrow データを非同期的に逆シリアル化するかどうかを指定します。

deserialize.queue.size

64

いいえ

Arrow データを非同期的に逆シリアル化するための内部処理キューのサイズ。このパラメーターは、deserialize.arrow.async パラメーターが true に設定されている場合にのみ有効になります。

write.fields

なし

いいえ

ApsaraDB for SelectDB テーブルに書き込むフィールド、またはフィールドを書き込む順序。複数のフィールドはコンマ(,)で区切ります。デフォルトでは、すべてのフィールドは ApsaraDB for SelectDB テーブルのフィールド順に基づいて ApsaraDB for SelectDB テーブルに書き込まれます。

sink.batch.size

100000

いいえ

バックエンドに一度に書き込むことができる行の最大数。

sink.max-retries

0

いいえ

バックエンドへのデータ書き込みに失敗した後の最大再試行回数。

sink.properties.format

csv

いいえ

Stream Load でサポートされているデータ形式。有効な値: csv、json、arrow。

sink.properties.*

--

いいえ

Stream Load ジョブの送信に使用されるパラメーター。たとえば、'sink.properties.column_separator' = ',' を指定して、列区切り文字を指定できます。その他のパラメーターについては、「Stream Load を使用してデータをインポートする」をご参照ください。

sink.task.partition.size

なし

いいえ

ApsaraDB for SelectDB インスタンスにデータが書き込まれるパーティションの数。Spark の RDD でフィルタリングなどの操作を実行すると、データが書き込まれるパーティションの数が増加する可能性がありますが、各パーティションには少数のレコードのみが含まれます。これにより、書き込み操作の頻度が増加し、コンピューティングリソースが浪費されます。

値が小さいほど、ApsaraDB for SelectDB インスタンスにデータを書き込む頻度が低くなります。これにより、ApsaraDB for SelectDB インスタンスのデータマージの負荷が軽減されます。このパラメーターは、sink.task.use.repartition パラメーターと一緒に使用する必要があります。

sink.task.use.repartition

false

いいえ

データを ApsaraDB for SelectDB インスタンスに書き込む前に、データを特定の数のパーティションに再パーティション化するかどうかを指定します。デフォルト値: false。これは、coalesce が使用されることを示します。書き込み操作の前にアクションが呼び出されない場合、コンピューティングの並列性が最適でない可能性があります。

このパラメーターを true に設定すると、再パーティション化が有効になります。データを再パーティション化するパーティションの数を指定できます。これにより、シャッフルのオーバーヘッドが増加します。

sink.batch.interval.ms

50

いいえ

シンクにデータが書き込まれる間隔。単位: ミリ秒。

sink.enable-2pc

false

いいえ

データの書き込みに 2PC プロトコルを使用するかどうかを指定します。このパラメーターを true に設定すると、Spark ジョブの完了後にトランザクションがコミットされます。一部のタスクが失敗した場合、事前にコミットされたすべてのトランザクションがロールバックされます。

sink.auto-redirect

true

いいえ

Stream Load リクエストをリダイレクトするかどうかを指定します。このパラメーターを true に設定すると、Stream Load リクエストはフロントエンドを使用してロードされます。これにより、バックエンド情報を明示的に取得する必要がなくなります。

user

なし

はい

ApsaraDB for SelectDB

password

なし

はい

ApsaraDB for SelectDB

filter.query.in.max.count

100

いいえ

Spark が述語プッシュダウンを実行するときに IN 句に含めることができる値の最大数。IN 句の値の数が指定されたしきい値を超えると、フィルター操作は Spark によって処理されます。

ignore-type

なし

いいえ

一時ビューのスキーマを読み取るときに無視するフィールドタイプ。

例: 'ignore-type'='bitmap,hll'

sink.streaming.passthrough

false

いいえ

最初の列の値を処理せずに書き込みます。

次の表に、サンプル環境のすべてのソフトウェアのバージョンを示します。

ソフトウェア

Java

Spark

Scala

SelectDB

バージョン

1.8

3.1.2

2.12

3.0.4

環境を準備する

  • Spark 環境を構成します。

    1. Spark インストールパッケージをダウンロードして解凍します。この例では、Spark インストールパッケージ spark-3.1.2-bin-hadoop3.2.tgz を使用します。

      wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
      tar xvzf spark-3.1.2-bin-hadoop3.2.tgz

      // 翻訳対象外のため変更なし

    2. spark-doris-connector-3.2_2.12-1.3.2.jar パッケージを SPARK_HOME/jars ディレクトリに配置します。

  • インポートするデータを構築します。この例では、少量のデータが MySQL データベースからインポートされます。

    1. MySQL データベースにテストテーブルを作成します。

      CREATE TABLE `employees` (
        `emp_no` int NOT NULL,
        `birth_date` date NOT NULL,
        `first_name` varchar(14) NOT NULL,
        `last_name` varchar(16) NOT NULL,
        `gender` enum('M','F') NOT NULL,
        `hire_date` date NOT NULL,
        PRIMARY KEY (`emp_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3

      // 翻訳対象外のため変更なし

    2. Data Management (DMS) を使用してテストデータを生成します。詳細については、「テストデータを生成する」をご参照ください。

  • ApsaraDB for SelectDB インスタンスを構成します。

    1. ApsaraDB for SelectDB インスタンスを作成します。詳細については、「インスタンスの作成」をご参照ください。

    2. MySQL プロトコルを介して ApsaraDB for SelectDB インスタンスに接続します。詳細については、「インスタンスへの接続」をご参照ください。

    3. テストデータベースとテストテーブルを作成します。

      1. テストデータベースを作成します。

        CREATE DATABASE test_db;
      2. テストテーブルを作成します。

        USE test_db;
        CREATE TABLE employees (
            emp_no       int NOT NULL,
            birth_date   date,
            first_name   varchar(20),
            last_name    varchar(20),
            gender       char(2),
            hire_date    date
        )
        UNIQUE KEY(`emp_no`)
        DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;
    4. ApsaraDB for SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントの申請またはリリース」をご参照ください。

    5. Spark 環境のパブリック IP アドレスを ApsaraDB for SelectDB インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを設定する」をご参照ください。

MySQL データベースから ApsaraDB for SelectDB インスタンスにデータをインポートする

Spark SQL を使用する

次の例は、Spark SQL を使用してアップストリーム MySQL データベースから ApsaraDB for SelectDB インスタンスにデータをインポートする方法を示しています。

  1. spark-sql サービスを開始します。

    bin/spark-sql

    // 翻訳対象外のため変更なし

  2. spark-sql でタスクを送信します。

    CREATE TEMPORARY VIEW mysql_tbl
    USING jdbc
    OPTIONS(
     "url"="jdbc:mysql://host:port/test_db",
     "dbtable"="employees",
     "driver"="com.mysql.jdbc.Driver",
     "user"="admin",
     "password"="****"
    );
    
    // SelectDB の一時ビューを作成する
    CREATE TEMPORARY VIEW selectdb_tbl
    USING doris
    OPTIONS(
     "table.identifier"="test_db.employees",
     "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080",
     "user"="admin",
     "password"="****",
     "sink.properties.format"="json"
    );
    
    // MySQL テーブルから SelectDB テーブルにデータを挿入する
    INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;

    // 一部翻訳対象外のため変更なし

  3. Spark タスクの完了後、ApsaraDB for SelectDB コンソールにログインして、Spark を使用してインポートされたデータを表示します。

DataFrame を使用する

次の例は、DataFrame を使用して、アップストリーム MySQL データベースから ApsaraDB for SelectDB インスタンスにデータをインポートする方法を示しています。

  1. spark-shell サービスを起動します。

    bin/spark-shell
  2. spark-shell でタスクを送信します。

    val mysqlDF = spark.read.format("jdbc")
    	.option("url", "jdbc:mysql://host:port/test_db")
    	.option("dbtable", "employees")
    	.option("driver", "com.mysql.jdbc.Driver")
    	.option("user", "admin")
    	.option("password", "****")
    	.load()
    
    mysqlDF.write.format("doris")
    	.option("fenodes", "host:httpPort")
    	.option("table.identifier", "test_db.employees")
    	.option("user", "admin")
    	.option("password", "****")
    	.option("sink.batch.size", 100000)
    	.option("sink.max-retries", 3)
    	.option("sink.properties.format", "json")
    	.save()
  3. Spark タスクが完了したら、ApsaraDB for SelectDB コンソールにログインして、Spark を使用してインポートされたデータを表示します。