DataWorks Data Integration は、Message Queue からデータを読み取るための MetaQ Reader を提供しています。このトピックでは、MetaQ データソースからデータを同期する機能について説明します。
サポートされている MetaQ バージョン
MetaQ Reader は、次のバージョンの Message Queue SDK for Java を使用して、Message Queue からリアルタイムデータをサブスクライブします。
<dependency>
<groupId>com.taobao.metaq.final</groupId>
<artifactId>metaq-client</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-sdk</artifactId>
<version>1.3.1</version>
</dependency>
制限事項
- Message Queue からデータを読み取るには、コードエディターを使用してのみ MetaQ Reader を構成できます。
- MetaQ Reader は、Data Integration 専用のリソースグループのみをサポートしています。
データ型
次の表に、MetaQ の主要なデータ型のサポート状況を示します。
次の表に、MetaQ Reader がデータ型を変換する際に基づくデータ型のマッピングを示します。
データ型 | バッチデータ読み取り用の MetaQ Reader |
STRING | サポートされています |
Data Integration データ型 | Message Queue データ型 |
STRING | STRING |
データ同期ノードの開発
- 構成手順の詳細については、「コードエディターを使用したバッチ同期タスクの構成」をご参照ください。
- コードエディターを使用してバッチ同期ノードを構成する際に構成されるすべてのパラメーターと実行されるコードについては、「付録:コードとパラメーター」をご参照ください。
付録:コードとパラメーター
付録:コードエディターを使用したバッチ同期ノードの構成
コードエディターを使用してバッチ同期タスクを構成する場合は、コードエディターの形式要件に基づいて、関連データソースのリーダーとライターのパラメーターを構成する必要があります。形式要件の詳細については、「コードエディターを使用したバッチ同期タスクの構成」をご参照ください。次の情報は、コードエディターのリーダーとライターのパラメーターの構成の詳細について説明しています。
MetaQ Reader のコード
{
"job": {
"content": [
{
"reader": {
"name": "metaqreader",
"parameter": {
"accessId": "<yourAccessKeyId>", // あなたの AccessKeyId
"accessKey": "<yourAccessKeySecret>", // あなたの AccessKeySecret
"consumerId": "Test01",
"topicName": "test",
"subExpression": "*",
"onsChannel": "ALIYUN",
"domainName": "***.aliyun.com",
"contentType": "singlestringcolumn",
"beginOffset": "lastRead",
"nullCurrentOffset": "begin",
"fieldDelimiter": ",",
"column": [
"col0"
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}
MetaQ Reader のコードのパラメーター
パラメーター | 説明 | 必須 |
accessId | Message Queue にアクセスするために使用する AccessKey ID。 | はい |
accessKey | Message Queue にアクセスするために使用する AccessKey シークレット。 | はい |
consumerId | コンシューマー ID。コンシューマーは、メッセージを受信して消費するメッセージサブスクライバーとも呼ばれます。 consumer ID は、コンシューマーの種類の識別子です。ほとんどの場合、同じコンシューマー ID を持つコンシューマーは、同じ種類のメッセージを受信して消費し、同じ消費ロジックを使用します。 | はい |
topicName | 消費するメッセージのトピック。トピックは、メッセージを分類するために使用されます。これは主要な分類子です。 | はい |
subExpression | メッセージのサブトピック。 | はい |
onsChannel | MetaQ Reader が Message Queue に接続するときの認証に使用されるチャネル。 | はい |
unitName | メッセージを受信する宛先ユニット。有効な値:
| いいえ |
instanceName | コンシューマーインスタンスの名前。 | いいえ |
domainName | Message Queue に接続するために使用するエンドポイント。 | はい |
contentType | メッセージの種類。有効な値: singlestringcolumn、 text、 json。 | はい |
beginOffset | MetaQ Reader がデータの読み取りを開始するオフセット。有効な値:begin と lastRead。 | いいえ |
nullCurrentOffset | 最後のオフセットが null の場合に、MetaQ Reader がデータの読み取りを開始するオフセット。有効な値:begin と current。 | はい |
fieldDelimiter | メッセージ文字列を区切るために使用する列区切り文字(コンマ(,)など)。制御文字がサポートされています。例:\u0001。 | はい |
column | メッセージ内のデータを読み取るフィールドの名前。 | はい |
beginDateTime | データ消費の開始時刻。このパラメーターは、左閉右開区間の左境界を指定します。 beginDateTime パラメーターの値は、yyyyMMddHHmmss 形式の時刻文字列です。このパラメーターは、DataWorks のスケジューリングパラメーターと一緒に使用できます。 | いいえ 説明 beginDateTime パラメーターと endDateTime パラメーターはペアで使用する必要があります。 |
endDateTime | データ消費の終了時刻。このパラメーターは、左閉右開区間の右境界を指定します。 endDateTime パラメーターの値は、yyyyMMddHHmmss 形式の時刻文字列です。このパラメーターは、DataWorks のスケジューリングパラメーターと一緒に使用できます。 | いいえ