すべてのプロダクト
Search
ドキュメントセンター

DataWorks:DataHub データソース

最終更新日:Mar 07, 2026

DataHub データソースは、DataHub との間のデータ読み取りおよび書き込みのための双方向チャネルを提供します。このドキュメントでは、DataWorks でこのデータソースのデータ同期を構成する方法について説明します。

対応バージョン

  • DataHub Reader は、DataHub Java SDK を使用してデータを読み取ります。以下の SDK バージョンを使用します。

    <dependency>
        <groupId>com.aliyun.DataHub</groupId>
        <artifactId>aliyun-sdk-DataHub</artifactId>
        <version>2.9.1</version>
    </dependency>
  • DataHub Writer は、DataHub Java SDK を使用してデータを書き込みます。以下の SDK バージョンを使用します。

    <dependency>
        <groupId>com.aliyun.datahub</groupId>
        <artifactId>aliyun-sdk-datahub</artifactId>
        <version>2.5.1</version>
    </dependency>

制限事項

バッチデータ読み書き

STRING データの型は UTF-8 エンコーディングのみをサポートしています。単一の STRING フィールドのサイズは最大 1 MB です。

リアルタイムデータ読み書き

  • リアルタイム同期タスクは、サーバーレスリソースグループをサポートしています。

  • システムは、同じハッシュ値を持つデータを同じシャードに同期します。

リアルタイム全データベース書き込み

タスクの開始時に、バッチプロセスが完全データを DataHub に書き込みます。完全データ同期が完了すると、リアルタイムプロセスがソースから宛先への増分データの同期を開始します。

  • データは DataHub の TUPLE 型の Topic にのみ書き込むことができます。TUPLE データの型の詳細については、「データの型」をご参照ください。

  • DataHub にデータをリアルタイムで同期する場合、ソーステーブルフィールドに 5 つの追加フィールドが追加されます。タスクを構成する際に、カスタムフィールドを追加することもできます。DataHub に送信されるメッセージフォーマットの詳細については、「付録: メッセージフォーマット」をご参照ください。

サポートされているフィールドの型

DataHub とデータを同期すると、フィールドは対応するデータの型にマップされます。DataHub は、次のデータの型のみをサポートします: BIGINTSTRINGBOOLEANDOUBLETIMESTAMP、および DECIMAL

データソースの追加

DataWorks で同期タスクを開発する前に、「データソース管理」の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの意味を理解するために、パラメーターの説明を表示できます

データ同期タスクの開発

同期タスクの構成のエントリポイントとプロシージャについては、以下の構成ガイドをご参照ください。

単一テーブルバッチ同期

単一テーブルリアルタイム同期

プロシージャについては、「DataStudio でリアルタイム同期タスクを構成する」をご参照ください。

説明

DataHub の異なるデータの型、シャーディングポリシー、データフォーマット、およびサンプルメッセージでサポートされている操作の詳細については、「付録: メッセージフォーマット」をご参照ください。

全データベースリアルタイム同期

プロシージャについては、「リアルタイム全データベース同期タスクの構成」をご参照ください。

よくある質問

単一リクエストのデータサイズが制限を超えたために DataHub への書き込み操作が失敗した場合、どうすればよいですか?

付録: コードとパラメーター

コードエディタを使用したバッチ同期タスクの構成

コードエディタを使用してバッチ同期タスクを構成する場合は、統一されたスクリプトフォーマット要件に基づいてスクリプト内の関連パラメーターを構成する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下の情報は、コードエディタを使用してバッチ同期タスクを構成する際に、データソースに対して構成する必要があるパラメーターについて説明しています。

Reader スクリプト例

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
         "job": {
           "content": [
            {
                "reader": {
                    "name": "DataHubreader",
                    "parameter": {
                        "endpoint": "xxx", // DataHub エンドポイント。
                        "accessId": "xxx", // DataHub にアクセスするために使用される AccessKey ID。
                        "accessKey": "xxx", // DataHub にアクセスするために使用される AccessKey Secret。
                        "project": "xxx", // 宛先 DataHub プロジェクトの名前。
                        "topic": "xxx", // 宛先 DataHub Topic の名前。
                        "batchSize": 1000, // 一度に読み取るレコード数。
                        "beginDateTime": "20180910111214", // データ消費の開始時刻。
                        "endDateTime": "20180910111614", // データ消費の終了時刻。
                        "column": [
                            "col0",
                            "col1",
                            "col2",
                            "col3",
                            "col4"
                                  ]
                                }
                           },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                                 }
                            }
             }
           ]
         }
       }
     ],
    "setting":{
        "errorLimit":{
            "record":"0"// 許可される最大エラー数。
        },
        "speed":{
            "throttle":true,// throttle が false に設定されている場合、mbps パラメーターは無視され、スロットルは適用されません。throttle が true に設定されている場合、スロットルが有効になります。
            "concurrent":1,// 同時スレッド数。
            "mbps":"12"// 最大データ転送レート。1 mbps = 1 MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Reader パラメーター

パラメーター

説明

必須

endpoint

DataHub の エンドポイント

はい

accessId

DataHub にアクセスするために使用される AccessKey ID

はい

accessKey

DataHub にアクセスするために使用される AccessKey Secret

はい

project

ターゲット DataHub 内のプロジェクトの名前。プロジェクトは、リソースの隔離とコントロールに使用される DataHub のリソース管理単位です。

はい

topic

宛先 DataHub Topic の名前。

はい

batchSize

一度に読み取るレコード数。デフォルト値: 1,024

いいえ

beginDateTime

yyyyMMddHHmmss フォーマットでのデータ消費の開始時刻。時間範囲は開始時刻を含み、終了時刻を含みません。データを増分同期するには、このパラメーターを DataWorks の スケジューリングパラメーターと共に使用できます。たとえば、パラメーター名を bizdate に、を $[yyyymmdd-1] に設定できます。次に、beginDateTime を ${bizdate}000000 に設定します。これにより、開始時刻が前日の 00:00:00 に構成されます。

説明

beginDateTimeendDateTime パラメーターは一緒に使用する必要があります。

はい

endDateTime

yyyyMMddHHmmss フォーマットでのデータ消費の終了時刻。このタイムスタンプは、時間範囲の排他的境界です。データを増分同期するには、このパラメーターを DataWorks の スケジューリングパラメーターと共に使用できます。たとえば、パラメーター名を bizdate に、を $[yyyymmdd-1] に設定できます。次に、endDateTime を ${bizdate}235959 に設定します。これにより、終了時刻が前日の 23:59:59 に構成されます。

説明

beginDateTimeendDateTime パラメーターは一緒に使用する必要があります。

はい

Writer スクリプト例

{
    "type": "job",
    "version": "2.0",// バージョン番号。
    "steps": [
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "datahub",// プラグイン名。
            "parameter": {
                "datasource": "",// データソース。
                "topic": "",// Topic は、DataHub におけるサブスクリプションと公開の基本単位です。Topic を使用して、特定のタイプまたはカテゴリのストリーミングデータを表すことができます。
                "maxRetryCount": 500,// タスクが失敗した場合の最大再試行回数。
                "maxCommitSize": 1048576// バッファーがこのサイズ (バイト単位) に達すると、データはバッチでコミットされます。DataHub のリクエストあたりのレコード制限は 10,000 です。エラーを防止するために、この値が (平均レコードサイズ * 10,000) 未満であることを確認してください。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""// 許可される最大エラー数。
        },
        "speed": {
            "throttle":true,// throttle が false に設定されている場合、mbps パラメーターは無視され、スロットルは適用されません。throttle が true に設定されている場合、スロットルが有効になります。
            "concurrent":20, // 同時スレッド数。
            "mbps":"12"// 最大データ転送レート。1 mbps = 1 MB/s。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Writer パラメーター

パラメーター

説明

必須

デフォルト

accessId

DataHub の AccessKey ID

はい

なし

accessKey

DataHub の AccessKey Secret

はい

なし

endPoint

リソースが配置されている DataHub サービスのエンドポイント。

はい

なし

maxRetryCount

タスクが失敗した場合の最大再試行回数。

いいえ

なし

mode

STRING データの型の値の書き込みモード。

はい

なし

parseContent

コンテンツを解析するかどうかを指定します。

はい

なし

project

プロジェクトは、DataHub のデータに対する基本的な組織単位です。プロジェクトには複数の Topic が含まれます。

説明

DataHub プロジェクトは MaxCompute プロジェクトとは独立しています。MaxCompute プロジェクトを DataHub で再利用することはできません。

はい

なし

topic

DataHub では、Topic はデータ公開とサブスクリプションの基本単位であり、ストリーミングデータのカテゴリを表します。

はい

なし

maxCommitSize

書き込みパフォーマンスを向上させるために、DataX はデータをバッファーし、maxCommitSize (バイト単位) で指定されたサイズにバッファーが達すると、バッチでデータを書き込みます。DataHub はリクエストあたり最大 10,000 レコードを許可します。エラーを防止するために、このパラメーターを平均レコードサイズに基づいて 10,000 レコードの合計サイズよりも小さい値に設定してください。

いいえ

1 MB