すべてのプロダクト
Search
ドキュメントセンター

DataWorks:MetaQ データソース

最終更新日:Nov 09, 2025

DataWorks Data Integration は、Message Queue からデータを読み取るための MetaQ Reader を提供します。このトピックでは、MetaQ データソースからデータを同期する機能について説明します。

サポートされているバージョン

MetaQ Reader は、Message Queue 用の Java ソフトウェア開発キット (SDK) を使用して、MetaQ のリアルタイムメッセージをサブスクライブします。次の Java SDK バージョンがサポートされています。

<dependency>
            <groupId>com.taobao.metaq.final</groupId>
            <artifactId>metaq-client</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-sdk</artifactId>
            <version>1.3.1</version>
        </dependency>

制限事項

データの型

次の表に、サポートされているフィールドタイプを示します。

フィールドタイプ

オフライン読み取り (MetaQ Reader)

STRING

サポート

MetaQ Reader の MetaQ タイプでは、次の変換が利用できます。

Data Integration データ型

Message Queue データ型

STRING

STRING

データ同期ノードの開発

同期タスクを構成するためのエントリポイントと手順については、次の構成ガイドをご参照ください。

付録: コードとパラメーター

コードエディタを使用したバッチ同期タスクの構成

コードエディタを使用してバッチ同期タスクを構成する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを構成する必要があります。詳細については、「コードエディタでタスクを構成する」をご参照ください。次の情報は、コードエディタを使用してバッチ同期タスクを構成するときにデータソースに構成する必要があるパラメーターについて説明します。

Reader コードサンプル

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "metaqreader",
                    "parameter": {
                        "accessId": "<yourAccessKeyId>",
                        "accessKey": "<yourAccessKeySecret>",
                        "consumerId": "Test01",
                        "topicName": "test",
                        "subExpression": "*",
                        "onsChannel": "ALIYUN",
                        "domainName": "***.aliyun.com",
                        "contentType": "singlestringcolumn",
                        "beginOffset": "lastRead",
                        "nullCurrentOffset": "begin",
                        "fieldDelimiter": ",",
                        "column": [
                            "col0"
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                    }
                }
            }
        ]
    }
}

Reader スクリプトパラメーター

パラメーター

説明

必須

accessId

Message Queue の AccessKey ペア。AccessKey ペアは身分認証に使用されます。

はい

accessKey

はい

consumerId

コンシューマー (サブスクライバーとも呼ばれる) は、メッセージを受信して消費します。

コンシューマー ID は、コンシューマーのタイプを識別するものです。ほとんどの場合、同じコンシューマー ID を持つコンシューマーは、同じタイプのメッセージを受信して消費し、同じ消費ロジックを使用します。

はい

topicName

メッセージの Topic。Topic は、メッセージを分類するために使用される主要なメッセージタイプです。

はい

subExpression

メッセージのサブトピック。

はい

onsChannel

Message Queue 認証に使用されます。

はい

unitName

メッセージを受信する宛先ユニット。一般的なユニットを以下に示します。

  • sh: センター

  • unsz: 中国 (深圳) リージョンのユニット

  • us: 米国

  • en-us: ヨーロッパ

  • rg-ru: ロシア

  • zbyk: 中国 (張家口) リージョンの Youku

  • unzbyun: 張北クラウド

  • unshyun: 中国 (上海) リージョンの Alibaba Cloud

  • lazada-sg: シンガポールの Lazada

  • lazada-my: マレーシアの Lazada

  • lazada-vn: ベトナムの Lazada

  • lazada-ph: フィリピンの Lazada

  • lazada-th: タイの Lazada

  • lazada-id: インドネシアの Lazada

いいえ

instanceName

コンシューマーインスタンスの名前。

いいえ

domainName

Message Queue のエンドポイント。

はい

contentType

メッセージタイプ。サポートされているタイプは、STRING メッセージの場合は singlestringcolumn、テキストメッセージの場合は text、JSON メッセージの場合は json です。

はい

beginOffset

タスクがデータの読み取りを開始するオフセット。有効な値: begin (最も古いオフセットから) および lastRead (最後に読み取ったオフセットから)。

いいえ

nullCurrentOffset

最後のオフセットが空の場合にデータの読み取りを開始する位置。有効な値: begin (最も古いオフセットから) および current (現在のオフセットから)。

はい

fieldDelimiter

セパレーターモードでのメッセージ文字列の列区切り文字 (コンマ (,) など)。\u0001 などの制御文字がサポートされています。

はい

column

読み取るフィールドのリスト。

はい

beginDateTime

データ消費の開始時刻。このパラメーターは、時間範囲の左境界を指定します。時間範囲は、左が閉じて右が開いた開区間です。

beginDateTime パラメーターの値は、yyyyMMddHHmmss 形式の時間文字列です。このパラメーターは、DataWorks のスケジューリングパラメーターと一緒に使用できます。

いいえ

説明

beginDateTime パラメーターと endDateTime パラメーターは一緒に使用する必要があります。

endDateTime

データ消費の終了時刻。このパラメーターは、時間範囲の右境界を指定します。時間範囲は、左が閉じて右が開いた開区間です。

endDateTime パラメーターの値は、yyyyMMddHHmmss 形式の時間文字列です。このパラメーターは、DataWorks のスケジューリングパラメーターと一緒に使用できます。