ここでは、Spark SQL の SCAN 構文について説明します。 SCAN 構文は、E-MapReduce V3.23.0 以降のバージョンで使用できます。

SCAN 構文の必要性

SCAN 構文は、テーブルの読み取り方法を定義します。 基本的には、SCAN 構文は論理テーブルを作成し、パラメーターを指定することで、ソーステーブルの読み取り方法を定義します。 なぜ、SCAN 構文を使用して、テーブルの定義後に、テーブルを読み取る方法を定義する必要があるのでしょうか。 Spark DataSource V2 のデータモデル設計に基づくと、テーブルはバッチデータソースやストリーミングデータソースに限られません。 さらに、Spark はマルチカタログ設計に対応しており、外部カタログを使用して、さまざまなデータソースからのデータを解析します。 つまり、

  • Spark DataSource V1 に基づいてテーブルを定義する場合、テーブルの基本情報を定義するだけでよく、テーブルの読み取り方法に関するパラメーターを定義する必要はありません。 たとえば、Kafka データソーステーブルを定義する場合、Kafka 接続アドレスとトピックを定義するだけで済みます。 テーブル定義で maxOffsetsPerTrigger (トリガーの間隔ごとに処理されるオフセット最大値のレート制限) などのパラメーターを指定する必要はありません。 これらのパラメーターは実行時に指定されます。
  • 外部カタログを使用してデータソースのメタデータを提供する場合、Spark セッションで明示的にテーブルを作成する必要はありません。 これらを考慮し、データソースの読み取り方法を定義する SCAN 構文を提供します。

この構文は、Spark DataSource V2 のデータモデル設計をサポートする試みです。 また、SCAN 構文は Spark SQL にバッチクエリとストリーミングクエリを統合的に実装しています。 たとえば、Kafka データソーステーブルを定義し、バッチ読み取り用とストリーミング読み取り用に 2 つの SCAN 文を実行できます。

注意事項:

  • SCAN 構文は、テーブルの読み取り方法を定義します。 基本的には、SCAN 構文は論理テーブルを作成し、パラメーターを指定することで、ソーステーブルの読み取り方法を定義します。 この論理テーブルは、Spark では一時的なビューインスタンスと見なされます。 Spark セッションを終了すると、SCAN 定義は自動的に削除されます。
  • SCAN 構文で定義されたビューは、データの出力先テーブルではなく、データソーステーブルとしてのみ使用できます。
  • このソーステーブルは直接処理することができますが、バッチ読み取りのみ実行でき、ストリーミング読み取りはできません。 ソーステーブルにバッチ読み取りインターフェイスがない場合は、エラーが返されます。

構文

CREATE SCAN tbName_alias
ON tbName
USING queryType
OPTIONS (propertyName=propertyValue[,propertyName=propertyValue]*)

queryType には次のいずれかを設定できます。

  • BATCH:ソーステーブル tbName に対してバッチ読み取りを実行し、一時ビュー tbName_alias を定義します。
  • STREAM:ソーステーブル tbName でストリーミング読み取りを実行し、一時ビュー tbName_alias を定義します。

OPTIONS 句ではデータソースを読み取る際の実行時パラメーターを定義できます。 実行時パラメーターは、データの読み取り方法を定義します。 これはデータソースによって異なります。 たとえば、maxOffsetsPerTrigger は Kafka データソースの実行時パラメーターです。 実行時パラメーターについての詳細は、「データソース」をご参照ください。

次の表には、各データソース特有のパラメーター以外に、データソースに設定できる一般的なパラメーターを示します。

パラメーター 説明 デフォルト値
watermark.column イベント時間を表すテーブルの列。 なし
watermark.delayThreshold 最後のデータが処理されてから、遅延データが破棄されるまでにシステムが待機する最小時間。 1 分や 5 時間などに設定します。 なし

  • Log Service データソーステーブルを作成します。
    spark-sql> CREATE TABLE loghub_table_intput_test(content string)
             > USING loghub
             > OPTIONS
             > (...)
  • BATCH を使用して Log Service テーブルのデータをオフラインで処理し、現在時刻までに生成されたデータレコードの数をカウントします。
    spark-sql> CREATE SCAN loghub_table_intput_test_batch
             > ON loghub_table_intput_test
             > USING BATCH;
    spark-sql> SELECT COUNT(*) FROM loghub_table_intput_test_batch;
  • STREAM を使用して Log Service テーブルデータを処理します。
    spark-sql> CREATE TABLE loghub_table_output_test(content string)
             > USING loghub
             > OPTIONS
             > (...)
    
    spark-sql> CREATE SCAN loghub_table_intput_test_stream
             > ON loghub_table_intput_test
             > USING STREAM
             > OPTIONS(
             > "watermark.column"="data_time",
             > "watermark.delayThreshold"="2 minutes")
    
    
    -- The following example shows an invalid SELECT operation on a streaming table, which results in an error.
    spark-sql> SELECT COUNT(*) FROM loghub_table_test_stream;
    Error in query: Queries with streaming sources must be executed with writeStream.start();; 
    spark-sql> INSERT INTO loghub_table_output_test SELECT content FROM loghub_table_intput_test_stream;