すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:ストリームインジェスチョン

最終更新日:Jan 11, 2025

ほとんどの場合、ストリームインジェスチョンをサポートするシステムは、ストリーミングデータを小さなバッチでストレージシステムに小さなファイルとして書き込み、定期的に小さなファイルをマージします。たとえば、HiveとDelta Lakeは前述の方法でストリーミングデータを取り込みます。Kuduもストリームインジェスチョンをサポートしています。ただし、Kuduは独自のストレージアーキテクチャを持ち、ビッグデータストレージシステム上に構築されていません。このトピックでは、Kafkaからストリーミングデータを取り込む方法について説明します。

前提条件

  • E-MapReduce(EMR)コンソールで、Delta Lakeサービスを含むカスタムクラスターまたはDataLakeクラスターが作成されています。詳細については、「クラスターの作成」をご参照ください。

  • Kafkaサービスを含むDataflowクラスターが作成されています。詳細については、「クラスターの作成」をご参照ください。

制限事項

DataLakeクラスターまたはカスタムクラスターとDataflowクラスターは、同じ仮想プライベートクラウド(VPC)の同じvSwitchにデプロイする必要があります。

ストリームインジェスチョンの進化

フェーズ

説明

以前

ストリーミングデータを取り込むには、ファクトテーブルを時間ごとに手動で細かくパーティション化する必要がありました。たとえば、5分ごとにパーティションを作成できます。データがパーティションに書き込まれた後、INSERT OVERWRITEステートメントを実行して、パーティション内の小さなファイルを1つ以上の大型ファイルにマージし、大型ファイルをパーティションに書き戻す必要があります。ただし、次の問題が発生する可能性があります:

  • 読み取りと書き込みが分離されていないため、読み取りエラーやデータの精度に関する問題が発生しやすくなります。

  • ストリーミングジョブは、Exactly-Onceセマンティクスを保証できません。ストリームインジェスチョンジョブが失敗した場合、データが重複または省略されていないことを確認するために手動による介入が必要です。Spark Streamingは、At-Least-Onceセマンティクスは保証できますが、Exactly-Onceセマンティクスは保証できません。

Hive 0.13以降ではトランザクションがサポートされています。Hive 2.0以降では、Hive Streaming機能が提供され、ストリームインジェスチョンがサポートされています。ただし、Hive Streamingは、次の理由により広く使用されていません。

  • Hiveは、トランザクション機能を実装するために基になるファイルを変更します。その結果、共通ストレージ形式のデータはHiveによってのみ読み取ることができます。Spark SQLやPrestoなどの他のツールを使用して、ストレージ形式のデータを読み取ることはできません。

  • Hiveトランザクションは、Optimized Row Columnar(ORC)形式のみをサポートします。

  • Hiveはマージオンリードモードを使用します。Hiveは、データの読み取り中に小さなファイルをソートしてマージする必要があります。小さなファイルの数が増えると、読み取りパフォーマンスが急激に低下します。したがって、できるだけ早く小さなファイルをマージする必要があります。ただし、小さなファイルは多くの場合マージに失敗し、ビジネス効率に影響を与えます。

  • Hive Streamingは、データウェアハウスシナリオでのみ使用できます。データレイクシナリオでは、データソースとデータ要件が多様であるため、Hive Streamingは適していません。

現在

Delta Lakeを使用して、ストリーミングデータを取り込むことができます。手順:

  1. テーブルを作成します。

  2. Spark Streamingジョブを開始して、ストリーミングデータをテーブルに書き込みます。

  3. 定期的にデータを最適化します。たとえば、データがパーティションに書き込まれた後に、定期的にデータを最適化できます。

  4. 定期的に履歴データをバキュームします。たとえば、毎日データをバキュームできます。

この例では、データはKafkaから読み取られ、Deltaテーブルに書き込まれます。

  1. SSHモードでDataflowクラスターにログオンします。詳細については、「クラスターへのログオン」をご参照ください。

  2. 次のコマンドを実行して、Kafkaトピックを作成します。

    sudo su - kafka
    kafka-topics.sh --partitions 3 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic delta_stream_sample --create
    説明

    core-1-1は、Dataflowクラスター内のブローカーノードの内部IPアドレスまたはホスト名を示します。

  3. Kafkaに継続的にデータを送信するPythonスクリプトを準備します。

    #! /usr/bin/env python3
    
    import json
    import time
    
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    
    bootstrap = ['core-1-1:9092']
    topic = 'delta_stream_sample'
    
    def gnerator():
        id = 0
        line = {}
        while True:
            line['id'] = id
            line['date'] = '2019-11-11'
            line['name'] = 'Robert'
            line['sales'] = 123
            yield line
            id = id + 1  
    
    def sendToKafka():
        producer = KafkaProducer(bootstrap_servers=bootstrap)
    
        for line in gnerator():
            data = json.dumps(line).encode('utf-8')
    
            # 非同期で送信されます (デフォルト)
            future = producer.send(topic, data)
    
            # 同期送信の場合はブロックします
            try:
                record_metadata = future.get(timeout=10)
            except KafkaError as e:
                # produceリクエストが失敗した場合の処理を決定します
                pass
            time.sleep(0.1)
    
    sendToKafka()

    便宜上、すべてのデータレコードは ID を除いて同じです。

    {"id": 0, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 1, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 2, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 3, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 4, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 5, "date": "2019-11-11", "name": "Robert", "sales": 123}
  4. Spark Streamingジョブを開始して、Kafkaからデータを読み取り、Deltaテーブルに書き込みます。

    1. Sparkコードを記述します。

      Scalaのサンプルコード:

      import org.apache.spark.SparkConf
      import org.apache.spark.sql.{SparkSession, functions}
      import org.apache.spark.sql.types.{DataTypes, StructField}
      
      object StreamToDelta {
        def main(args: Array[String]): Unit = {
          val targetDir = "/tmp/delta_table"
          val checkpointLocation = "/tmp/delta_table_checkpoint"
          // 192.168.XX.XX はKafkaの内部IPアドレスです。
          val bootstrapServers = "192.168.XX.XX:9092"
          val topic = "delta_stream_sample"
      
          val schema = DataTypes.createStructType(Array[StructField](
            DataTypes.createStructField("id", DataTypes.LongType, false),
            DataTypes.createStructField("date", DataTypes.DateType, false),
            DataTypes.createStructField("name", DataTypes.StringType, false),
            DataTypes.createStructField("sales", DataTypes.StringType, false)))
      
          val sparkConf = new SparkConf()
      
          // StreamToDelta はScalaのクラス名です。
          val spark = SparkSession
            .builder()
            .config(sparkConf)
            .appName("StreamToDelta")
            .getOrCreate()
      
          val lines = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrapServers)
            .option("subscribe", topic)
            .option("maxOffsetsPerTrigger", 1000)
            .option("startingOffsets", "earliest")
            .option("failOnDataLoss", value = false)
            .load()
            .select(functions.from_json(functions.col("value").cast("string"), schema).as("json"))
            .select("json.*")
      
          val query = lines.writeStream
            .outputMode("append")
            .format("delta")
            .option("checkpointLocation", checkpointLocation)
            .start(targetDir)
      
          query.awaitTermination()
        }
      }
    2. コードをパッケージ化し、DataLakeクラスターにデプロイします。

      1. オンプレミスマシンでコードをデバッグした後、次のコマンドを実行してコードをパッケージ化します。

        mvn clean install
      2. SSHモードでDataLakeクラスターにログオンします。詳細については、「クラスターへのログオン」をご参照ください。

      3. JARパッケージをDataLakeクラスターにアップロードします。

        この例では、JARパッケージはDataLakeクラスターのルートディレクトリにアップロードされます。

    3. Spark Streamingジョブを送信して実行します。

      spark-submitコマンドを実行して、Spark Streamingジョブを送信して実行します。

      spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --driver-memory 1g \
       --executor-cores 2 \
       --executor-memory 3g \
       --num-executors 1 \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
       --class com.aliyun.delta.StreamToDelta \
       delta-demo-1.0.jar
      説明

      この例では、delta-demo-1.0.jarという名前のJARパッケージが使用されています。--classパラメータの値とJARパッケージの名前は、ビジネス要件に基づいて変更できます。

  5. spark-shellクライアントを起動し、データがDeltaテーブルに書き込まれているかどうかを確認します。

    • Scala

      1. 次のコマンドを実行して、spark-shellクライアントに移動します。

        spark-shell --master local 
      2. 次のScalaステートメントを実行して、データをクエリします。

        val df = spark.read.format("delta").load("/tmp/delta_table")
        df.select("*").orderBy("id").show(10000)
    • SQL

      1. 次のコマンドを実行して、streaming-sqlクライアントに移動します。

        streaming-sql --master local
      2. 次のSQLステートメントを実行して、データをクエリします。

        SELECT * FROM delta_table ORDER BY id LIMIT 10000;

        結果は、2,285件のデータレコードがDeltaテーブルに書き込まれたことを示しています。

        |2295|2019-11-11|Robert|  123|
        |2296|2019-11-11|Robert|  123|
        |2297|2019-11-11|Robert|  123|
        |2275|2019-11-11|Robert|  123|
        |2276|2019-11-11|Robert|  123|
        |2277|2019-11-11|Robert|  123|
        |2278|2019-11-11|Robert|  123|
        |2279|2019-11-11|Robert|  123|
        |2280|2019-11-11|Robert|  123|
        |2281|2019-11-11|Robert|  123|
        |2282|2019-11-11|Robert|  123|
        |2283|2019-11-11|Robert|  123|
        |2284|2019-11-11|Robert|  123|
        |2285|2019-11-11|Robert|  123|
        +----+----------+------+-----+

Exactly-Onceセマンティクスのテスト

Spark Streamingジョブを停止し、ジョブを再起動します。Deltaテーブルを読み取ります。データの読み取りが以前に停止した位置からデータが読み取られる場合、Exactly-Onceセマンティクスが実現されます。

  • Scala

    df.select("*").orderBy("id").show(10000)
  • SQL

    SELECT * FROM delta_table ORDER BY id LIMIT 10000;
    |2878|2019-11-11|Robert|  123|
    |2879|2019-11-11|Robert|  123|
    |2880|2019-11-11|Robert|  123|
    |2881|2019-11-11|Robert|  123|
    |2882|2019-11-11|Robert|  123|
    |2883|2019-11-11|Robert|  123|
    |2884|2019-11-11|Robert|  123|
    |2885|2019-11-11|Robert|  123|
    |2886|2019-11-11|Robert|  123|
    |2887|2019-11-11|Robert|  123|
    |2888|2019-11-11|Robert|  123|
    |2889|2019-11-11|Robert|  123|
    |2890|2019-11-11|Robert|  123|
    |2891|2019-11-11|Robert|  123|