すべてのプロダクト
Search
ドキュメントセンター

DataWorks:DataHub データソース

最終更新日:Aug 23, 2025

DataWorks は、DataHub データソースからデータを読み取り、DataHub データソースにデータを書き込むための DataHub Reader と DataHub Writer を提供しています。これにより、大量のデータの高速計算が容易になります。このトピックでは、DataHub データソースのデータ同期機能について説明します。

サポートされている DataHub バージョン

  • DataHub Reader は、Java 用 DataHub SDK を使用して DataHub からデータを読み取ります。次のコードは、Java 用 DataHub SDK の例を示しています。

    <dependency>
        <groupId>com.aliyun.DataHub</groupId>
        <artifactId>aliyun-sdk-DataHub</artifactId>
        <version>2.9.1</version>
    </dependency>
  • DataHub Writer は、Java 用 DataHub SDK を使用して DataHub にデータを書き込みます。次のコードは、Java 用 DataHub SDK の例を示しています。

    <dependency>
        <groupId>com.aliyun.datahub</groupId>
        <artifactId>aliyun-sdk-datahub</artifactId>
        <version>2.5.1</version>
    </dependency>

制限

バッチデータの読み取りと書き込み

文字列は UTF-8 形式でエンコードする必要があります。各文字列のサイズは 1 MB を超えてはなりません。

リアルタイムデータの読み取りと書き込み

  • リアルタイム同期タスクは、サーバーレス リソースグループ専用リソースグループをサポートしています。

  • リアルタイムで DataHub データソースにデータを同期する場合、ソースデータのハッシュ値が検証されます。同じハッシュ値を持つデータは、DataHub データソースの同じシャードに同期されます。

完全データと増分データのリアルタイム書き込み

同期ソリューションを実行すると、ソースの完全データはバッチ同期タスクを使用して宛先に書き込まれます。次に、ソースの増分データは、リアルタイム同期タスクを使用して宛先に書き込まれます。 DataHub にデータを書き込む場合は、次の点に注意してください。

  • TUPLE タイプのトピックにのみデータを書き込むことができます。 TUPLE トピックでサポートされているデータ型の詳細については、「データ型」をご参照ください。

  • リアルタイム同期タスクを実行してリアルタイムで DataHub にデータを同期する場合、デフォルトで 5 つの追加フィールドが宛先トピックに追加されます。ビジネス要件に基づいて、宛先トピックに他のフィールドを追加することもできます。 DataHub メッセージ形式の詳細については、「付録: DataHub メッセージ形式」をご参照ください。

データ型マッピング

データは、DataHub のフィールドのデータ型と指定されたサービスのデータ型の間のマッピングに基づいて同期されます。 DataHub は、[BIGINT][STRING][BOOLEAN][DOUBLE][TIMESTAMP][DECIMAL] のデータ型のみをサポートしています。

データソースを追加する

DataWorks で同期タスクを開発する前に、「データソース管理」の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールのパラメータのヒントを表示して、パラメータの意味を理解することができます

データ同期タスクを開発する

同期タスクのエントリポイントと構成手順については、次の構成ガイドを参照してください。

単一テーブルのデータを同期するためのバッチ同期タスクを構成する

単一テーブルのデータを同期するか、データベースのすべてのデータを同期するためのリアルタイム同期タスクを構成する

構成手順の詳細については、「DataStudio でリアルタイム同期タスクを構成する」をご参照ください。

説明

ソーステーブルに対する操作によって生成されたデータ変更の同期のさまざまなトピックタイプのサポート、さまざまなトピックタイプのシャーディング戦略、データ形式、およびサンプルメッセージについては、「付録: DataHub メッセージ形式」をご参照ください。

単一テーブルまたはデータベースの完全データと増分データの(リアルタイム)同期を実装するための同期設定を構成する

構成手順の詳細については、「Data Integration で同期タスクを構成する」をご参照ください。

FAQ

一度に書き込みたいデータ量が上限を超えているため、DataHub にデータを書き込むことができない場合はどうすればよいですか?

付録: コードとパラメータ

付録: コードエディタを使用してバッチ同期タスクを構成する

コードエディタを使用してバッチ同期タスクを設定する

コードエディタを使用してバッチ同期タスクを設定する場合は、統一されたスクリプトフォーマットの要件に基づいて、スクリプトで関連するパラメーターを設定する必要があります。 詳細については、「コードエディタを使用してバッチ同期タスクを設定する」をご参照ください。 以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソース用に設定する必要があるパラメーターについて説明します。

DataHub Reader のコード

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
         "job": {
           "content": [
            {
                "reader": {
                    "name": "DataHubreader",
                    "parameter": {
                        "endpoint": "xxx" // DataHub のエンドポイント。
                        "accessId": "xxx", // DataHub に接続するために使用される AccessKey ID。
                        "accessKey": "xxx", // DataHub に接続するために使用される AccessKey シークレット。
                        "project": "xxx", // データを読み取りたい DataHub プロジェクトの名前。
                        "topic": "xxx" // データを読み取りたい DataHub トピックの名前。
                        "batchSize": 1000, // 一度に読み取るデータレコードの数。
                        "beginDateTime": "20180910111214", // データ消費の開始時刻。
                        "endDateTime": "20180910111614", // データ消費の終了時刻。
                        "column": [
                            "col0",
                            "col1",
                            "col2",
                            "col3",
                            "col4"
                                  ]
                                }
                           },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                                 }
                            }
             }
           ]
         }
       }
     ],
    "setting":{
        "errorLimit":{
            "record":"0"// ダーティデータレコードの最大許容数。
        },
        "speed":{
            "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。 mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
            "concurrent":1,// 並列スレッドの最大数。
            "mbps":"12"// 最大伝送速度。単位: MB/秒。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

DataHub Reader のコードのパラメータ

パラメータ

説明

必須

endpoint

DataHub の エンドポイント

はい

accessId

DataHub に接続するために使用される AccessKey ID

はい

accessKey

DataHub に接続するために使用される AccessKey シークレット

はい

project

データを読み取りたい DataHub プロジェクトの名前。 DataHub プロジェクトは、リソースの隔離と制御のための DataHub のリソース管理単位です。

はい

topic

データを読み取りたい DataHub トピックの名前。

はい

batchSize

一度に読み取るデータレコードの数。デフォルト値: 1024

いいえ

beginDateTime

データ消費の開始時刻。このパラメータは、左閉右開区間の左境界を指定します。開始時刻は yyyyMMddHHmmss の形式で指定します。

増分データ同期を実装するには、スケジューリングパラメータを使用できます。たとえば、ノードスケジューリングパラメータの名前を bizdate に設定し、値を $[yyyymmdd-1] に設定します。次に、beginDateTime${bizdate}000000 に設定できます。これは、データ消費が前日の 00:00:00 に開始されることを示します。

説明

beginDateTime パラメータと endDateTime パラメータはペアで使用する必要があります。

はい

endDateTime

データ消費の終了時刻。このパラメータは、左閉右開区間の右境界を指定します。終了時刻は yyyyMMddHHmmss の形式で指定します。

増分データ同期を実装するには、スケジューリングパラメータを使用できます。たとえば、ノードスケジューリングパラメータの名前を bizdate に設定し、値を $[yyyymmdd-1] に設定します。次に、endDateTime${bizdate}235959 に設定できます。これは、データ消費が前日の 23:59:59 に終了することを示します。

説明

beginDateTime パラメータと endDateTime パラメータはペアで使用する必要があります。

はい

DataHub Writer のコード

{
    "type": "job",
    "version": "2.0",// バージョン番号。
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "datahub",// プラグイン名。
            "parameter": {
                "datasource": "",// データソースの名前。
                "topic": "",// データのサブスクライブと発行の最小単位。トピックを使用して、さまざまなタイプのストリーミングデータを区別できます。
                "maxRetryCount": 500,// 同期タスクが失敗した場合の最大再試行回数。
                "maxCommitSize": 1048576// Data Integration が宛先にデータをコミットする前に累積できるバッファデータの最大量。単位: バイト。
                 // DataHub では、1 回のリクエストで最大 10,000 件のデータレコードを書き込むことができます。データレコードの数が 10,000 を超えると、同期タスクは失敗します。この場合、1 回のリクエストで書き込むことができるデータの最大量は、次の式を使用して計算されます。1 つのデータレコードの平均データ量 × 10,000。 maxCommitSize は、計算されたデータの最大量よりも小さい値に設定する必要があります。これにより、1 回のリクエストで書き込まれるデータレコードの数が 10,000 を超えないようになります。たとえば、1 つのデータレコードのデータサイズが 10 KB の場合、このパラメータの値は 10 に 10,000 を掛けた結果よりも小さくなければなりません。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""// ダーティデータレコードの最大許容数。
        },
        "speed": {
            "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。 mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
            "concurrent":20, // 並列スレッドの最大数。
            "mbps":"12"// 最大伝送速度。単位: MB/秒。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

DataHub Writer のコードのパラメータ

パラメータ

説明

必須

デフォルト値

accessId

DataHub に接続するために使用される AccessKey ID

はい

デフォルト値なし

accessKey

DataHub に接続するために使用される AccessKey シークレット

はい

デフォルト値なし

endPoint

DataHub のエンドポイント。

はい

デフォルト値なし

maxRetryCount

同期タスクが失敗した場合の最大再試行回数。

いいえ

デフォルト値なし

mode

文字列を書き込むためのモード。

はい

デフォルト値なし

parseContent

解析されるデータ。

はい

デフォルト値なし

project

DataHub におけるデータの基本的な組織単位。各プロジェクトには 1 つ以上のトピックがあります。

説明

DataHub プロジェクトは MaxCompute プロジェクトとは独立しています。 MaxCompute プロジェクトを DataHub プロジェクトとして使用することはできません。

はい

デフォルト値なし

topic

データのサブスクライブと発行の最小単位。トピックを使用して、さまざまなタイプのストリーミングデータを区別できます。

はい

デフォルト値なし

maxCommitSize

Data Integration が宛先にデータをコミットする前に累積できるバッファデータの最大量。書き込み効率を向上させるために、このパラメータを指定できます。 デフォルト値は 1048576 バイト(1 MB)です。 DataHub では、1 回のリクエストで最大 10,000 件のデータレコードを書き込むことができます。データレコードの数が 10,000 を超えると、同期タスクは失敗します。この場合、1 回のリクエストで書き込むことができるデータの最大量は、次の式を使用して計算されます。1 つのデータレコードの平均データ量 × 10,000。 maxCommitSize は、計算されたデータの最大量よりも小さい値に設定する必要があります。これにより、1 回のリクエストで書き込まれるデータレコードの数が 10,000 を超えないようになります。

いいえ

1MB