DataWorks Data Integration は、Message Queue からデータを読み取るための MetaQ Reader を提供します。このトピックでは、MetaQ データソースからデータを同期する機能について説明します。
サポートされているバージョン
MetaQ Reader は、Message Queue 用の Java ソフトウェア開発キット (SDK) を使用して、MetaQ のリアルタイムメッセージをサブスクライブします。次の Java SDK バージョンがサポートされています。
<dependency>
<groupId>com.taobao.metaq.final</groupId>
<artifactId>metaq-client</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-sdk</artifactId>
<version>1.3.1</version>
</dependency>制限事項
MetaQ Reader プラグインを使用して Message Queue からデータを読み取ることができるのは、コードエディタのみです。
MetaQ Reader は、サーバーレスリソースグループ (推奨) および Data Integration の専用リソースグループ をサポートしています。
データの型
次の表に、サポートされているフィールドタイプを示します。
フィールドタイプ | オフライン読み取り (MetaQ Reader) |
STRING | サポート |
MetaQ Reader の MetaQ タイプでは、次の変換が利用できます。
Data Integration データ型 | Message Queue データ型 |
STRING | STRING |
データ同期ノードの開発
同期タスクを構成するためのエントリポイントと手順については、次の構成ガイドをご参照ください。
構成手順については、「コードエディタでタスクを構成する」をご参照ください。
コードエディタのすべてのパラメーターとコードサンプルのリストについては、「付録: MetaQ コードサンプルとパラメーターの説明」をご参照ください。
付録: コードとパラメーター
コードエディタを使用したバッチ同期タスクの構成
コードエディタを使用してバッチ同期タスクを構成する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを構成する必要があります。詳細については、「コードエディタでタスクを構成する」をご参照ください。次の情報は、コードエディタを使用してバッチ同期タスクを構成するときにデータソースに構成する必要があるパラメーターについて説明します。
Reader コードサンプル
{
"job": {
"content": [
{
"reader": {
"name": "metaqreader",
"parameter": {
"accessId": "<yourAccessKeyId>",
"accessKey": "<yourAccessKeySecret>",
"consumerId": "Test01",
"topicName": "test",
"subExpression": "*",
"onsChannel": "ALIYUN",
"domainName": "***.aliyun.com",
"contentType": "singlestringcolumn",
"beginOffset": "lastRead",
"nullCurrentOffset": "begin",
"fieldDelimiter": ",",
"column": [
"col0"
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}Reader スクリプトパラメーター
パラメーター | 説明 | 必須 |
accessId | Message Queue の AccessKey ペア。AccessKey ペアは身分認証に使用されます。 | はい |
accessKey | はい | |
consumerId | コンシューマー (サブスクライバーとも呼ばれる) は、メッセージを受信して消費します。 コンシューマー ID は、コンシューマーのタイプを識別するものです。ほとんどの場合、同じコンシューマー ID を持つコンシューマーは、同じタイプのメッセージを受信して消費し、同じ消費ロジックを使用します。 | はい |
topicName | メッセージの Topic。Topic は、メッセージを分類するために使用される主要なメッセージタイプです。 | はい |
subExpression | メッセージのサブトピック。 | はい |
onsChannel | Message Queue 認証に使用されます。 | はい |
unitName | メッセージを受信する宛先ユニット。一般的なユニットを以下に示します。
| いいえ |
instanceName | コンシューマーインスタンスの名前。 | いいえ |
domainName | Message Queue のエンドポイント。 | はい |
contentType | メッセージタイプ。サポートされているタイプは、STRING メッセージの場合は singlestringcolumn、テキストメッセージの場合は text、JSON メッセージの場合は json です。 | はい |
beginOffset | タスクがデータの読み取りを開始するオフセット。有効な値: begin (最も古いオフセットから) および lastRead (最後に読み取ったオフセットから)。 | いいえ |
nullCurrentOffset | 最後のオフセットが空の場合にデータの読み取りを開始する位置。有効な値: begin (最も古いオフセットから) および current (現在のオフセットから)。 | はい |
fieldDelimiter | セパレーターモードでのメッセージ文字列の列区切り文字 (コンマ (,) など)。\u0001 などの制御文字がサポートされています。 | はい |
column | 読み取るフィールドのリスト。 | はい |
beginDateTime | データ消費の開始時刻。このパラメーターは、時間範囲の左境界を指定します。時間範囲は、左が閉じて右が開いた開区間です。 beginDateTime パラメーターの値は、yyyyMMddHHmmss 形式の時間文字列です。このパラメーターは、DataWorks のスケジューリングパラメーターと一緒に使用できます。 | いいえ 説明 beginDateTime パラメーターと endDateTime パラメーターは一緒に使用する必要があります。 |
endDateTime | データ消費の終了時刻。このパラメーターは、時間範囲の右境界を指定します。時間範囲は、左が閉じて右が開いた開区間です。 endDateTime パラメーターの値は、yyyyMMddHHmmss 形式の時間文字列です。このパラメーターは、DataWorks のスケジューリングパラメーターと一緒に使用できます。 |