このトピックでは、Spark を使用して MySQL にアクセスする方法について説明します。
Spark RDD を使用して MySQL にアクセスする
サンプルコード:
val input = getSparkContext.textFile(inputPath, numPartitions)
input.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
.mapPartitions(e => {
// 接続、準備済みステートメントなどの変数を宣言します。
var conn: Connection = null
var ps: PreparedStatement = null
val sql = s"insert into $tbName(word, count) values (?, ?)"
try {
// MySQL データベースへの接続を確立します。
conn = DriverManager.getConnection(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", dbUser, dbPwd)
// SQL ステートメントを準備します。
ps = conn.prepareStatement(sql)
// 各ペアを反復処理し、準備済みステートメントに値を設定して実行します。
e.foreach(pair => {
ps.setString(1, pair._1)
ps.setLong(2, pair._2)
ps.executeUpdate()
})
// 準備済みステートメントと接続を閉じます。
ps.close()
conn.close()
} catch {
// 例外が発生した場合は、スタックトレースを出力します。
case e: Exception => e.printStackTrace()
} finally {
// リソースが確実に閉じられるようにします。
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close()
}
}
Iterator.empty
}).count()Spark SQL ステートメントを使用して MySQL にアクセスする
サンプル SQL ステートメント:
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*,mysql-connector-java-8.0.30.jar 説明
mysql-connector-java-8.0.30.jarファイルには、MySQL JDBC ドライバーが含まれています。MySQL JDBC ドライバーのバージョンとパスを指定する必要があります。- MySQL にアクセスするために使用されるデータソースのタイプは、
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*から取得できます。E-MapReduce(EMR)クラスターで Spark 2 を使用している場合は、上記のステートメントのspark3をspark2に変更する必要があります。
次の例は、テーブルを作成し、テーブルからデータを読み取る方法を示しています。
// テーブルを作成します。
create table test1(id int)
using jdbc2
options(
url="jdbc:mysql://mysql_url/test_db?user=root&password=root",
dbtable="test1",
driver="com.mysql.jdbc.Driver");
// MySQL からデータを読み取ります。
select * from test1;
// MySQL にデータを書き込みます。
insert into test1 values(1);参照
- JDBC To other databases (他のデータベースへの JDBC)
- MySQL JDBC driver (MySQL JDBC ドライバー)