Realtime Compute では、ソーステーブルはストリーミングデータストレージテーブルです。 ストリーミングデータストレージは、Realtime Compute の実行をドライブするための入力を提供します。 したがって、各 Realtime Compute ジョブには少なくとも 1 つのストリーミングデータストレージテーブルが必要です。
構文
CREATE TABLE tableName
(columnName dataType [, columnName dataType ]*)
[ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];
例
CREATE TABLE metaq_stream(
x VARCHAR,
y VARCHAR,
z VARCHAR
) WITH (
type='mq',
topic='<yourTopicName>',
endpoint='<yourEndpoint>',
pullIntervalMs='1000',
accessId='<yourAccessId>',
accessKey='<yourAccessSecret>',
startMessageOffset='1000',
consumerGroup='yourConsumerGroup',
fieldDelimiter='|'
);
ソーステーブルの属性フィールドの取得
- ソーステーブルの属性フィールドを取得するための構文
Realtime Compute は、ソーステーブルの DDL 文に
HEADER
というキーワードを提供します。 このキーワードを使用して、ソーステーブルの属性フィールドを取得できます。CREATE TABLE sourcetable ( `timestamp` VARCHAR HEADER, name VARCHAR, MsgID VARCHAR )WITH( type='sls' );
前の例の`timestamp`
フィールドは、データ属性フィールドから値を読み取るHEADER
として定義されています。 このフィールドは、後で共通フィールドとして使用できます。注 さまざまなタイプのソーステーブル (DataHub、Log Service、MQ ソーステーブルなど) には、さまざまなデフォルト属性フィールドがあります。 一部のソーステーブルは、カスタム属性フィールドもサポートしています。 詳細については、対応するソーステーブルタイプのドキュメントをご参照ください。 - ソーステーブルの属性フィールドを取得する例
以下に、Log Service のソーステーブルの属性フィールドを取得する方法の例を示します。 現在、Log Service はデフォルトで次の 3 つの属性フィールドをサポートしています。
フィールド 説明 __source__
メッセージソース。 __topic__
メッセージトピック。 __timestamp__
ログが生成された時刻。 注 属性フィールドを取得するには、最初に通常のロジックに従ってフィールドを宣言する必要があります。 次に、キーワードHEADER
をタイプ宣言の最後に追加します。例:
- テストデータ
__topic__: ens_altar_flow result: {"MsgID":"ems0a","Version":"0.0.1"}
- テストステートメント
CREATE TABLE sls_log ( __topic__ VARCHAR HEADER, result VARCHAR )WITH( type = 'sls' ); CREATE TABLE sls_out ( name varchar, MsgID varchar, Version varchar )WITH( type ='RDS' ); INSERT INTO sls_out SELECT __topic__, JSON_VALUE(result,'$. MsgID'), JSON_VALUE(result,'$. Version') FROM sls_log
- テスト結果
name(VARCHAT) MsgID(VARCHAT) Version(VARCHAT) ens_altar_flow ems0a 0.0.1
- テストデータ
ウィンドウ関数を含むソーステーブル
Realtime Compute は、2 つの時間属性 (Event Time と Processing Time) に基づくデータのウィンドウ集計をサポートしています。 ウィンドウ関数を含む Realtime Compute ジョブの場合、「透かし」と「計算列」がソーステーブルの宣言に必要です。 Realtime Compute の時間属性ベースの集計操作の詳細については、「時間属性」をご参照ください。
サポートされているソーステーブルタイプ
Realtime Compute では、複数のタイプのソーステーブルを作成できます。 詳細については、以下のトピックをご参照ください。