本ページでは、Realtime Compute で MQ ソーステーブルを作成する方法について説明します。 また、CSV 形式、WITH パラメーター、およびテーブル作成処理に関連するフィールドタイプマッピングについても説明します。

MQ の概要

MQ は、Alibaba Cloud が開発し、商用利用しているプロフェッショナルメッセージミドルウェアです。 エンタープライズレベルのインターネットアーキテクチャ (Aliware) のコアプロダクトです。 MQ は、高可用性分散クラスターテクノロジーに基づいて、パブリッシュ、またはサブスクリプション、メッセージトレース、リソース統計、メッセージスケジューリング (遅延)、モニタリングとアラートを含む、高性能メッセージングクラウドサービスの完全なセットを提供します。 分散コンピューティングシナリオで、すべての非同期デカップリング機能を実装するよう構築されています。 Realtime Compute は、ソーステーブルとしての MQ テーブルの作成をサポートしています。 サンプルコードは次のとおりです。

create table mq_stream(
 x varchar,
 y varchar,
 z varchar
) with (
 type='mq',
 topic='yourTopicName',
 endpoint='yourEndpoint',
 pullIntervalMs='1000',
 accessId='yourAccessId',
 accessKey='yourAccessSecret',
 startMessageOffset='1000',
 consumerGroup='yourConsumerGroup',
 fieldDelimiter='|'
);
MQ は、データスキーマを定義する必要のない非構造化ストレージフォーマットを使用しています。 データスキーマはビジネスレイヤーで指定されます。 現在、Realtime Compute は CSV 形式およびバイナリ形式のメッセージをサポートしています。

CSV 形式

次の CSV 形式の MQ メッセージがあると仮定します。

1,name,male 
2,name,female
MQ メッセージは、\ nで区切られたゼロから複数のデータレコードを含むことができます。

Realtime Compute ジョブで MQ ソーステーブルの宣言に使用される DDL 文は次のとおりです。

create table mq_stream(
 x varchar,
 y varchar,
 z varchar
) with (
 type='mq',
 topic='yourTopicName',
 endpoint='yourEndpoint',
 pullIntervalMs='1000',
 accessId='yourAccessId',
 accessKey='yourAccessSecret',
 startMessageOffset='1000',
 consumerGroup='yourConsumerGroup',
 fieldDelimiter='|'
);

バイナリ形式

バイナリ形式のサンプルコードは以下のとおりです。
create table source_table (
  mess varbinary
) with (
  type = 'mq',
  endpoint = 'yourEndpoint',
  pullIntervalMs='500',
  accessId='yourAccessId',
  accessKey='yourAccessSecret',
  topic = 'yourTopicName',
  consumerGroup='yourConsumerGroup'
);

create table out_table (
  commodity varchar
)with(
  type='print'
);

INSERT INTO out_table
SELECT
  cast(mess as varchar)
FROM source_table
  • cast(mess as varchar) 文は、Realtime Compute V2.0 およびそれ以降でサポートされています。 Realtime Compute のバージョンが V2.0 より前の場合は、最初にアップグレードしてください。
  • VARBINARY タイプは 1 回だけ渡すことができます。

WITH パラメーター

名称 説明 備考
topic トピック名。 なし
endPoint エンドポイント。
  • Alibaba Cloud パブリッククラウド (Alibaba Cloud クラシックネットワークまたは VPC) へのイントラネットアクセス:中国 (杭州)、中国 (上海)、中国 (青島)、中国 (北京)、中国 (深セン)、および中国 (香港) のエンドポイントは、onsaddr-internal.aliyun.com:8080 です。
  • Alibaba Cloud パブリッククラウドへのインターネットアクセス:エンドポイントは、http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet です。
accessId AccessKey ID。 なし
accessKey AccessKey Secret。 なし
consumerGroup トピックにサブスクライブする消費グループ。 なし
pullIntervalMs プル間隔 単位:ミリ秒。
startTime メッセージ消費の開始時刻。 オプション。
startMessageOffset メッセージの開始オフセット。 オプション。 このパラメーターが設定されている場合、ロードは、オフセットによって決定されるチェックポイントから優先的に開始されます。
tag サブスクリクションタグ。 オプション。
lineDelimiter メッセージブロックの解析に使用される行区切り文字。 オプション。 デフォルト値:\n
fieldDelimiter フィールド区切り文字。 オプション。 デフォルト値:\u0001。 この値は、\u0001 が読み取り専用モードの区切り文字として使用され、^A が編集モードの区切り文字として使用されることを示します。 \u0001 は読み取り専用モードでは表示されません。
encoding エンコード形式。 オプション。 デフォルト値:UTF-8
lengthCheck 1 行のフィールド数を確認するためのポリシー。 オプション。 デフォルト値:NONE。 有効な値:NONE、SKIP、EXCEPTION、および PAD。
  • SKIP:レコードのフィールド数が指定された数と一致しない場合、データレコードをスキップします。
  • EXCEPTION:レコードのフィールド数が指定した数と一致しない場合、例外をスローします。
  • PAD:フィールドを順番に埋め込みます。 フィールドが存在しない場合は null で埋めます。
columnErrorDebug デバッグを有効にするかどうかを示します。 オプション。 デフォルト値 : false。 このパラメーターが true に設定されている場合、例外の解析に関するログが表示されます。

フィールドタイプマッピング

MQ フィールドタイプ 推奨される Realtime Compute のフィールドタイプ
STRING VARCHAR