ここでは、Spark SQL の SCAN 構文について説明します。 SCAN 構文は、E-MapReduce V3.23.0 以降のバージョンで使用できます。
SCAN 構文の必要性
SCAN 構文は、テーブルの読み取り方法を定義します。 基本的には、SCAN 構文は論理テーブルを作成し、パラメーターを指定することで、ソーステーブルの読み取り方法を定義します。 なぜ、SCAN 構文を使用して、テーブルの定義後に、テーブルを読み取る方法を定義する必要があるのでしょうか。 Spark DataSource V2 のデータモデル設計に基づくと、テーブルはバッチデータソースやストリーミングデータソースに限られません。 さらに、Spark はマルチカタログ設計に対応しており、外部カタログを使用して、さまざまなデータソースからのデータを解析します。 つまり、
- Spark DataSource V1 に基づいてテーブルを定義する場合、テーブルの基本情報を定義するだけでよく、テーブルの読み取り方法に関するパラメーターを定義する必要はありません。 たとえば、Kafka データソーステーブルを定義する場合、Kafka 接続アドレスとトピックを定義するだけで済みます。 テーブル定義で maxOffsetsPerTrigger (トリガーの間隔ごとに処理されるオフセット最大値のレート制限) などのパラメーターを指定する必要はありません。 これらのパラメーターは実行時に指定されます。
- 外部カタログを使用してデータソースのメタデータを提供する場合、Spark セッションで明示的にテーブルを作成する必要はありません。 これらを考慮し、データソースの読み取り方法を定義する SCAN 構文を提供します。
この構文は、Spark DataSource V2 のデータモデル設計をサポートする試みです。 また、SCAN 構文は Spark SQL にバッチクエリとストリーミングクエリを統合的に実装しています。 たとえば、Kafka データソーステーブルを定義し、バッチ読み取り用とストリーミング読み取り用に 2 つの SCAN 文を実行できます。
注意事項:
- SCAN 構文は、テーブルの読み取り方法を定義します。 基本的には、SCAN 構文は論理テーブルを作成し、パラメーターを指定することで、ソーステーブルの読み取り方法を定義します。 この論理テーブルは、Spark では一時的なビューインスタンスと見なされます。 Spark セッションを終了すると、SCAN 定義は自動的に削除されます。
- SCAN 構文で定義されたビューは、データの出力先テーブルではなく、データソーステーブルとしてのみ使用できます。
- このソーステーブルは直接処理することができますが、バッチ読み取りのみ実行でき、ストリーミング読み取りはできません。 ソーステーブルにバッチ読み取りインターフェイスがない場合は、エラーが返されます。
構文
CREATE SCAN tbName_alias
ON tbName
USING queryType
OPTIONS (propertyName=propertyValue[,propertyName=propertyValue]*)
queryType には次のいずれかを設定できます。
- BATCH:ソーステーブル tbName に対してバッチ読み取りを実行し、一時ビュー tbName_alias を定義します。
- STREAM:ソーステーブル tbName でストリーミング読み取りを実行し、一時ビュー tbName_alias を定義します。
OPTIONS 句ではデータソースを読み取る際の実行時パラメーターを定義できます。 実行時パラメーターは、データの読み取り方法を定義します。 これはデータソースによって異なります。 たとえば、maxOffsetsPerTrigger は Kafka データソースの実行時パラメーターです。 実行時パラメーターについての詳細は、「データソース」をご参照ください。
次の表には、各データソース特有のパラメーター以外に、データソースに設定できる一般的なパラメーターを示します。
パラメーター | 説明 | デフォルト値 |
---|---|---|
watermark.column | イベント時間を表すテーブルの列。 | なし |
watermark.delayThreshold | 最後のデータが処理されてから、遅延データが破棄されるまでにシステムが待機する最小時間。 1 分や 5 時間などに設定します。 | なし |
例
- Log Service データソーステーブルを作成します。
spark-sql> CREATE TABLE loghub_table_intput_test(content string) > USING loghub > OPTIONS > (...)
- BATCH を使用して Log Service テーブルのデータをオフラインで処理し、現在時刻までに生成されたデータレコードの数をカウントします。
spark-sql> CREATE SCAN loghub_table_intput_test_batch > ON loghub_table_intput_test > USING BATCH; spark-sql> SELECT COUNT(*) FROM loghub_table_intput_test_batch;
- STREAM を使用して Log Service テーブルデータを処理します。
spark-sql> CREATE TABLE loghub_table_output_test(content string) > USING loghub > OPTIONS > (...) spark-sql> CREATE SCAN loghub_table_intput_test_stream > ON loghub_table_intput_test > USING STREAM > OPTIONS( > "watermark.column"="data_time", > "watermark.delayThreshold"="2 minutes") -- The following example shows an invalid SELECT operation on a streaming table, which results in an error. spark-sql> SELECT COUNT(*) FROM loghub_table_test_stream; Error in query: Queries with streaming sources must be executed with writeStream.start();; spark-sql> INSERT INTO loghub_table_output_test SELECT content FROM loghub_table_intput_test_stream;