ほとんどの場合、ストリームインジェスチョンをサポートするシステムは、ストリーミングデータを小さなバッチでストレージシステムに小さなファイルとして書き込み、定期的に小さなファイルをマージします。たとえば、HiveとDelta Lakeは前述の方法でストリーミングデータを取り込みます。Kuduもストリームインジェスチョンをサポートしています。ただし、Kuduは独自のストレージアーキテクチャを持ち、ビッグデータストレージシステム上に構築されていません。このトピックでは、Kafkaからストリーミングデータを取り込む方法について説明します。
前提条件
制限事項
DataLakeクラスターまたはカスタムクラスターとDataflowクラスターは、同じ仮想プライベートクラウド(VPC)の同じvSwitchにデプロイする必要があります。
ストリームインジェスチョンの進化
フェーズ | 説明 |
以前 | ストリーミングデータを取り込むには、ファクトテーブルを時間ごとに手動で細かくパーティション化する必要がありました。たとえば、5分ごとにパーティションを作成できます。データがパーティションに書き込まれた後、INSERT OVERWRITEステートメントを実行して、パーティション内の小さなファイルを1つ以上の大型ファイルにマージし、大型ファイルをパーティションに書き戻す必要があります。ただし、次の問題が発生する可能性があります:
Hive 0.13以降ではトランザクションがサポートされています。Hive 2.0以降では、Hive Streaming機能が提供され、ストリームインジェスチョンがサポートされています。ただし、Hive Streamingは、次の理由により広く使用されていません。
|
現在 | Delta Lakeを使用して、ストリーミングデータを取り込むことができます。手順:
|
例
この例では、データはKafkaから読み取られ、Deltaテーブルに書き込まれます。
SSHモードでDataflowクラスターにログオンします。詳細については、「クラスターへのログオン」をご参照ください。
次のコマンドを実行して、Kafkaトピックを作成します。
sudo su - kafka kafka-topics.sh --partitions 3 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic delta_stream_sample --create
説明core-1-1
は、Dataflowクラスター内のブローカーノードの内部IPアドレスまたはホスト名を示します。Kafkaに継続的にデータを送信するPythonスクリプトを準備します。
#! /usr/bin/env python3 import json import time from kafka import KafkaProducer from kafka.errors import KafkaError bootstrap = ['core-1-1:9092'] topic = 'delta_stream_sample' def gnerator(): id = 0 line = {} while True: line['id'] = id line['date'] = '2019-11-11' line['name'] = 'Robert' line['sales'] = 123 yield line id = id + 1 def sendToKafka(): producer = KafkaProducer(bootstrap_servers=bootstrap) for line in gnerator(): data = json.dumps(line).encode('utf-8') # 非同期で送信されます (デフォルト) future = producer.send(topic, data) # 同期送信の場合はブロックします try: record_metadata = future.get(timeout=10) except KafkaError as e: # produceリクエストが失敗した場合の処理を決定します pass time.sleep(0.1) sendToKafka()
便宜上、すべてのデータレコードは
ID
を除いて同じです。{"id": 0, "date": "2019-11-11", "name": "Robert", "sales": 123} {"id": 1, "date": "2019-11-11", "name": "Robert", "sales": 123} {"id": 2, "date": "2019-11-11", "name": "Robert", "sales": 123} {"id": 3, "date": "2019-11-11", "name": "Robert", "sales": 123} {"id": 4, "date": "2019-11-11", "name": "Robert", "sales": 123} {"id": 5, "date": "2019-11-11", "name": "Robert", "sales": 123}
Spark Streamingジョブを開始して、Kafkaからデータを読み取り、Deltaテーブルに書き込みます。
Sparkコードを記述します。
Scalaのサンプルコード:
import org.apache.spark.SparkConf import org.apache.spark.sql.{SparkSession, functions} import org.apache.spark.sql.types.{DataTypes, StructField} object StreamToDelta { def main(args: Array[String]): Unit = { val targetDir = "/tmp/delta_table" val checkpointLocation = "/tmp/delta_table_checkpoint" // 192.168.XX.XX はKafkaの内部IPアドレスです。 val bootstrapServers = "192.168.XX.XX:9092" val topic = "delta_stream_sample" val schema = DataTypes.createStructType(Array[StructField]( DataTypes.createStructField("id", DataTypes.LongType, false), DataTypes.createStructField("date", DataTypes.DateType, false), DataTypes.createStructField("name", DataTypes.StringType, false), DataTypes.createStructField("sales", DataTypes.StringType, false))) val sparkConf = new SparkConf() // StreamToDelta はScalaのクラス名です。 val spark = SparkSession .builder() .config(sparkConf) .appName("StreamToDelta") .getOrCreate() val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option("subscribe", topic) .option("maxOffsetsPerTrigger", 1000) .option("startingOffsets", "earliest") .option("failOnDataLoss", value = false) .load() .select(functions.from_json(functions.col("value").cast("string"), schema).as("json")) .select("json.*") val query = lines.writeStream .outputMode("append") .format("delta") .option("checkpointLocation", checkpointLocation) .start(targetDir) query.awaitTermination() } }
コードをパッケージ化し、DataLakeクラスターにデプロイします。
オンプレミスマシンでコードをデバッグした後、次のコマンドを実行してコードをパッケージ化します。
mvn clean install
SSHモードでDataLakeクラスターにログオンします。詳細については、「クラスターへのログオン」をご参照ください。
JARパッケージをDataLakeクラスターにアップロードします。
この例では、JARパッケージはDataLakeクラスターのルートディレクトリにアップロードされます。
Spark Streamingジョブを送信して実行します。
spark-submitコマンドを実行して、Spark Streamingジョブを送信して実行します。
spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-cores 2 \ --executor-memory 3g \ --num-executors 1 \ --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \ --class com.aliyun.delta.StreamToDelta \ delta-demo-1.0.jar
説明この例では、delta-demo-1.0.jarという名前のJARパッケージが使用されています。--classパラメータの値とJARパッケージの名前は、ビジネス要件に基づいて変更できます。
spark-shellクライアントを起動し、データがDeltaテーブルに書き込まれているかどうかを確認します。
Scala
次のコマンドを実行して、spark-shellクライアントに移動します。
spark-shell --master local
次のScalaステートメントを実行して、データをクエリします。
val df = spark.read.format("delta").load("/tmp/delta_table") df.select("*").orderBy("id").show(10000)
SQL
次のコマンドを実行して、streaming-sqlクライアントに移動します。
streaming-sql --master local
次のSQLステートメントを実行して、データをクエリします。
SELECT * FROM delta_table ORDER BY id LIMIT 10000;
結果は、2,285件のデータレコードがDeltaテーブルに書き込まれたことを示しています。
|2295|2019-11-11|Robert| 123| |2296|2019-11-11|Robert| 123| |2297|2019-11-11|Robert| 123| |2275|2019-11-11|Robert| 123| |2276|2019-11-11|Robert| 123| |2277|2019-11-11|Robert| 123| |2278|2019-11-11|Robert| 123| |2279|2019-11-11|Robert| 123| |2280|2019-11-11|Robert| 123| |2281|2019-11-11|Robert| 123| |2282|2019-11-11|Robert| 123| |2283|2019-11-11|Robert| 123| |2284|2019-11-11|Robert| 123| |2285|2019-11-11|Robert| 123| +----+----------+------+-----+
Exactly-Onceセマンティクスのテスト
Spark Streamingジョブを停止し、ジョブを再起動します。Deltaテーブルを読み取ります。データの読み取りが以前に停止した位置からデータが読み取られる場合、Exactly-Onceセマンティクスが実現されます。
Scala
df.select("*").orderBy("id").show(10000)
SQL
SELECT * FROM delta_table ORDER BY id LIMIT 10000;
|2878|2019-11-11|Robert| 123| |2879|2019-11-11|Robert| 123| |2880|2019-11-11|Robert| 123| |2881|2019-11-11|Robert| 123| |2882|2019-11-11|Robert| 123| |2883|2019-11-11|Robert| 123| |2884|2019-11-11|Robert| 123| |2885|2019-11-11|Robert| 123| |2886|2019-11-11|Robert| 123| |2887|2019-11-11|Robert| 123| |2888|2019-11-11|Robert| 123| |2889|2019-11-11|Robert| 123| |2890|2019-11-11|Robert| 123| |2891|2019-11-11|Robert| 123|