すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:PostgreSQL へのデータの読み書き

最終更新日:Nov 09, 2025

Spark は、Java Database Connectivity (JDBC) コネクタを使用して PostgreSQL にアクセスすることをネイティブにサポートしています。一部のバージョンの Serverless Spark は、起動時に PostgreSQL JDBC ドライバーを自動的にロードします。これにより、Serverless Spark SQL セッション、Spark バッチ処理ジョブ、または Notebook を使用して、データを直接読み書きできます。

前提条件

  • Serverless Spark ワークスペースが作成されていること。詳細については、「ワークスペースの作成」をご参照ください。

  • PostgreSQL インスタンスが作成されていること。

    セルフマネージド PostgreSQL インスタンス、または RDS for PostgreSQL や PolarDB for PostgreSQL などの Alibaba Cloud データベースを使用できます。

    このトピックでは、Alibaba Cloud RDS for PostgreSQL インスタンスを例として使用します。詳細については、「RDS for PostgreSQL インスタンスをすばやく作成する」をご参照ください。

使用上の注意

  • JDBC ドライバーのバージョン要件:

    • 以下のいずれかの Serverless Spark エンジンバージョンを使用する場合、PostgreSQL JDBC ドライバーを準備する必要はありません。Serverless Spark には、ドライバー (バージョン 42.7.6) が組み込まれています。

      • esr-4.4.0 以降

      • esr-3.4.0 以降

      • esr-2.8.0 以降

    • 上記のバージョンよりも低いエンジンバージョンを使用する場合は、PostgreSQL JDBC Driver を手動でダウンロードし、OSS にアップロードして、[セッションマネージャー][Spark 設定] セクションに次のパラメーターを入力する必要があります。

      spark.emr.serverless.user.defined.jars oss://<bucket>/path/to/postgresql-<version>.jar
  • ネットワーク構成: Serverless Spark と PostgreSQL サービス間のネットワーク接続が確立されていることを確認してください。詳細については、「EMR Serverless Spark と他の VPC 間のネットワーク接続を確立する」をご参照ください。

    説明

    セキュリティグループルールを追加するときは、必要なポートを開いてください。この例では、TCP ポート 5432 を開く必要があります。

手順

方法 1: SQL セッションを使用する

  1. SQL セッションを作成します。 [セッションマネージャー] で SQL セッションを作成し、事前設定された [ネットワーク接続] を選択します。詳細については、「SQL セッションの作成」をご参照ください。

  2. SQL ジョブを作成します。 [データ開発] で、SQL > SparkSQL ジョブを作成し、テスト用に次の SQL 文を使用します。

    CREATE TEMPORARY VIEW test
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url 'jdbc:postgresql://<jdbc_url>/<database>',
      dbtable '<schema>.<table>',
      user '<username>',
      password '<password>'
    );
    
    SELECT * FROM test;

    次の表にパラメーターを示します。

    パラメーター

    説明

    url

    JDBC 接続文字列。PostgreSQL ホストアドレス、ポート、データベース名が含まれます。

    フォーマット jdbc:postgresql://<jdbc_url>/<database> を使用し、プレースホルダーを実際の値に置き換えます。

    dbtable

    読み取るデータベーステーブルの名前。フォーマット <schema>.<table> を使用します。

    user

    PostgreSQL データベースのユーザー名。

    説明

    ユーザーはターゲットテーブルに対する読み取り権限を持っている必要があります。

    password

    PostgreSQL データベースのパスワード。

    テーブルの内容が正しく返された場合、接続は成功です。

    image

  3. データを挿入します。 次のコマンドを使用して、テーブルにデータを挿入します。

    INSERT INTO test VALUES(4, 'd'),(5, 'e');
    SELECT * FROM test;

    挿入されたデータが正しく返された場合、書き込み操作は成功です。

    image

方法 2: Notebook セッションを使用する

  1. [セッションマネージャー] で Notebook セッションを作成し、事前設定された [ネットワーク接続] を選択します。詳細については、「Notebook セッションの作成」をご参照ください。

  2. Notebook ジョブを作成します。 [データ開発] で、Python > Notebook ジョブを作成し、テスト用に次の Python コードを使用します。

    df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:postgresql://<jdbc_url>/<database>") \
      .option("dbtable", "<schema>.<table>") \
      .option("user", "<username>") \
      .option("password", "<password>") \
      .load()
    df.show()

    テーブルの内容が正しく返された場合、接続は成功です。

    image

  3. データを挿入します。 次のコードを使用して、テーブルにデータを挿入します。

    df = spark.createDataFrame([(6, 'f'), (7, 'g')], ["id", "name"])
    df.write \
      .format("jdbc") \
      .mode("append") \
      .option("url", "jdbc:postgresql://<jdbc_url>/<database>") \
      .option("dbtable", "<schema>.<table>") \
      .option("user", "<username>") \
      .option("password", "<password>") \
      .save()
    df.show()

    mode("append") パラメーターは、書き込みモードを `append` に設定します。これにより、既存のデータを上書きまたは削除することなく、新しいデータがターゲットテーブルに追加されます。

    挿入されたデータが正しく返された場合、書き込み操作は成功です。

    image

方法 3: Spark バッチジョブを使用する

  1. テストコードを記述します。 次の Scala コードをコンパイルし、JAR ファイルにパッケージ化します。

    package spark.test
    
    import org.apache.spark.sql.SparkSession
    
    object Main {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("test")
          .getOrCreate()
          
        val newRows = spark.createDataFrame(Seq((6, "f"), (7, "g"))).toDF("id", "name")
        newRows.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:postgresql://<jdbc_url>/<database>")
          .option("dbtable", "<schema>.<table>")
          .option("user", "<username>")
          .option("password", "<password>")
          .save()
    
        spark.read.format("jdbc")
          .option("url", "jdbc:postgresql://<jdbc_url>/<database>")
          .option("dbtable", "<schema>.<table>")
          .option("user", "<username>")
          .option("password", "<password>")
          .load()
          .show()
    
        spark.stop()
      }
    }
  2. バッチジョブを作成します。[データ開発]バッチジョブ > JAR タスクを作成し、テスト用に次のパラメーターを設定します。

    • メイン JAR リソース: パッケージ化された JAR ファイルのパスを選択または入力します。

    • メインクラス: spark.test.Main

    • ネットワーク接続: 事前設定されたネットワーク接続を選択します。

  3. 結果を表示します。タスクの実行後、[実行記録] エリアで [ログの表示] をクリックし、[ドライバーログ][Stdout] タブで対応する PostgreSQL テーブルの内容を表示します。