本ページでは、Realtime Compute で MQ ソーステーブルを作成する方法について説明します。 また、CSV 形式、WITH パラメーター、およびテーブル作成処理に関連するフィールドタイプマッピングについても説明します。
MQ の概要
MQ は、Alibaba Cloud が開発し、商用利用しているプロフェッショナルメッセージミドルウェアです。 エンタープライズレベルのインターネットアーキテクチャ (Aliware) のコアプロダクトです。 MQ は、高可用性分散クラスターテクノロジーに基づいて、パブリッシュ、またはサブスクリプション、メッセージトレース、リソース統計、メッセージスケジューリング (遅延)、モニタリングとアラートを含む、高性能メッセージングクラウドサービスの完全なセットを提供します。 分散コンピューティングシナリオで、すべての非同期デカップリング機能を実装するよう構築されています。 Realtime Compute は、ソーステーブルとしての MQ テーブルの作成をサポートしています。 サンプルコードは次のとおりです。
例
create table mq_stream(
x varchar,
y varchar,
z varchar
) with (
type='mq',
topic='yourTopicName',
endpoint='yourEndpoint',
pullIntervalMs='1000',
accessId='yourAccessId',
accessKey='yourAccessSecret',
startMessageOffset='1000',
consumerGroup='yourConsumerGroup',
fieldDelimiter='|'
);
注 MQ は、データスキーマを定義する必要のない非構造化ストレージフォーマットを使用しています。 データスキーマはビジネスレイヤーで指定されます。 現在、Realtime
Compute は CSV 形式およびバイナリ形式のメッセージをサポートしています。
CSV 形式
次の CSV 形式の MQ メッセージがあると仮定します。
1,name,male
2,name,female
注 MQ メッセージは、
\ n
で区切られたゼロから複数のデータレコードを含むことができます。
Realtime Compute ジョブで MQ ソーステーブルの宣言に使用される DDL 文は次のとおりです。
create table mq_stream(
x varchar,
y varchar,
z varchar
) with (
type='mq',
topic='yourTopicName',
endpoint='yourEndpoint',
pullIntervalMs='1000',
accessId='yourAccessId',
accessKey='yourAccessSecret',
startMessageOffset='1000',
consumerGroup='yourConsumerGroup',
fieldDelimiter='|'
);
バイナリ形式
バイナリ形式のサンプルコードは以下のとおりです。create table source_table (
mess varbinary
) with (
type = 'mq',
endpoint = 'yourEndpoint',
pullIntervalMs='500',
accessId='yourAccessId',
accessKey='yourAccessSecret',
topic = 'yourTopicName',
consumerGroup='yourConsumerGroup'
);
create table out_table (
commodity varchar
)with(
type='print'
);
INSERT INTO out_table
SELECT
cast(mess as varchar)
FROM source_table
注
cast(mess as varchar)
文は、Realtime Compute V2.0 およびそれ以降でサポートされています。 Realtime Compute のバージョンが V2.0 より前の場合は、最初にアップグレードしてください。- VARBINARY タイプは 1 回だけ渡すことができます。
WITH パラメーター
名称 | 説明 | 備考 |
---|---|---|
topic | トピック名。 | なし |
endPoint | エンドポイント。 |
|
accessId | AccessKey ID。 | なし |
accessKey | AccessKey Secret。 | なし |
consumerGroup | トピックにサブスクライブする消費グループ。 | なし |
pullIntervalMs | プル間隔 | 単位:ミリ秒。 |
startTime | メッセージ消費の開始時刻。 | オプション。 |
startMessageOffset | メッセージの開始オフセット。 | オプション。 このパラメーターが設定されている場合、ロードは、オフセットによって決定されるチェックポイントから優先的に開始されます。 |
tag | サブスクリクションタグ。 | オプション。 |
lineDelimiter | メッセージブロックの解析に使用される行区切り文字。 | オプション。 デフォルト値:\n |
fieldDelimiter | フィールド区切り文字。 | オプション。 デフォルト値:\u0001 。 この値は、\u0001 が読み取り専用モードの区切り文字として使用され、^A が編集モードの区切り文字として使用されることを示します。 \u0001 は読み取り専用モードでは表示されません。
|
encoding | エンコード形式。 | オプション。 デフォルト値:UTF-8 。
|
lengthCheck | 1 行のフィールド数を確認するためのポリシー。 | オプション。 デフォルト値:NONE。 有効な値:NONE、SKIP、EXCEPTION、および PAD。
|
columnErrorDebug | デバッグを有効にするかどうかを示します。 | オプション。 デフォルト値 : false。 このパラメーターが true に設定されている場合、例外の解析に関するログが表示されます。 |
フィールドタイプマッピング
MQ フィールドタイプ | 推奨される Realtime Compute のフィールドタイプ |
---|---|
STRING | VARCHAR |