すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark を使用して MySQL にアクセスする

最終更新日:Jan 11, 2025

このトピックでは、Spark を使用して MySQL にアクセスする方法について説明します。

Spark RDD を使用して MySQL にアクセスする

サンプルコード:
val input = getSparkContext.textFile(inputPath, numPartitions)
    input.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
      .mapPartitions(e => {
        // 接続、準備済みステートメントなどの変数を宣言します。
        var conn: Connection = null
        var ps: PreparedStatement = null
        val sql = s"insert into $tbName(word, count) values (?, ?)"
        try {
          // MySQL データベースへの接続を確立します。
          conn = DriverManager.getConnection(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", dbUser, dbPwd)
          // SQL ステートメントを準備します。
          ps = conn.prepareStatement(sql)
          // 各ペアを反復処理し、準備済みステートメントに値を設定して実行します。
          e.foreach(pair => {
            ps.setString(1, pair._1)
            ps.setLong(2, pair._2)
            ps.executeUpdate()
          })

          // 準備済みステートメントと接続を閉じます。
          ps.close()
          conn.close()
        } catch {
          // 例外が発生した場合は、スタックトレースを出力します。
          case e: Exception => e.printStackTrace()
        } finally {
          // リソースが確実に閉じられるようにします。
          if (ps != null) {
            ps.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      Iterator.empty
    }).count()

Spark SQL ステートメントを使用して MySQL にアクセスする

サンプル SQL ステートメント:
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*,mysql-connector-java-8.0.30.jar 
説明
  • mysql-connector-java-8.0.30.jar ファイルには、MySQL JDBC ドライバーが含まれています。MySQL JDBC ドライバーのバージョンとパスを指定する必要があります。
  • MySQL にアクセスするために使用されるデータソースのタイプは、/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* から取得できます。E-MapReduce(EMR)クラスターで Spark 2 を使用している場合は、上記のステートメントの spark3spark2 に変更する必要があります。
次の例は、テーブルを作成し、テーブルからデータを読み取る方法を示しています。
// テーブルを作成します。
create table test1(id int)
using jdbc2
options(
  url="jdbc:mysql://mysql_url/test_db?user=root&password=root",
  dbtable="test1",
  driver="com.mysql.jdbc.Driver");

// MySQL からデータを読み取ります。
select * from test1;

// MySQL にデータを書き込みます。
insert into test1 values(1);

参照