すべてのプロダクト
Search
ドキュメントセンター

DataWorks:MaxCompute データソース

最終更新日:Dec 05, 2025

MaxCompute データソースはデータハブとして機能します。MaxCompute からのデータの読み取りと MaxCompute へのデータの書き込みを行うための双方向チャネルを提供します。

機能

説明

DataWorks の MaxCompute データソースは、Tunnel エンドポイントアドレスを使用して MaxCompute プロジェクトの Tunnel サービスにアクセスできます。これにより、アップロードとダウンロードを通じてプロジェクトのデータを同期できます。Tunnel サービスを使用したアップロードとダウンロードには、DownloadTable 操作が含まれます。

2023 年 12 月 11 日以降に作成された MaxCompute データソースの場合、DataWorks サービスがアクセスしたい MaxCompute プロジェクトと異なるリージョンにある場合、Tunnel エンドポイントアドレスを使用して MaxCompute プロジェクトから直接データを同期することはできません。このシナリオでは、Cloud Enterprise Network (CEN) インスタンスを購入して、DataWorks サービスと MaxCompute プロジェクトのネットワークを接続する必要があります。ネットワークが接続された後、リージョンをまたいだデータ同期を実行できます。CEN とその関連操作の詳細については、「Cloud Enterprise Network」をご参照ください。

オフライン読み取り

  • MaxCompute Reader は、パーティションテーブルと標準テーブルからの読み取りをサポートしています。仮想ビューからの読み取りや外部テーブルの同期はサポートしていません。

  • MaxCompute のパーティションテーブルからオフラインで読み取りを行う場合、パーティションフィールドを直接マッピングすることはできません。パーティションフィールドの値を同期するには、カスタムフィールドを追加し、パーティション名を手動で入力してから、フィールドマッピングを実行する必要があります。

  • スケジューリングパラメーターを使用してパーティションを指定し、自動的に置き換えることができます。これは、スケジューリング時間に基づいてパーティションを同期する必要があるシナリオで役立ちます。

    たとえば、t0 という名前のパーティションテーブルに id と name というフィールドがあるとします。ハッシュパーティションは pt、サブパーティションは ds です。pt=<データタイムスタンプ> かつ ds=hangzhou のパーティションからデータを読み取るには、データソース設定でパーティションの値を pt=${scheduling parameter} および ds=hangzhou として指定する必要があります。その後、フィールドマッピング設定時に id と name フィールドをマッピングできます。

  • パーティションフィールドをカスタムフィールドとして追加することで、ターゲットテーブルに書き込むことができます。

  • MaxCompute Reader は、WHERE 句を使用したデータフィルタリングをサポートしています。

オフライン書き込み

  • MaxCompute Writer は、データに null 値が含まれている場合、VARCHAR 型をサポートしません。

  • DeltaTable にデータを書き込む場合は、[同期後の可視性][はい] に設定します。そうしないと、同時実行数が 1 より大きい場合にタスクが失敗します。

    image

リアルタイム書き込み

  • リアルタイムデータ同期タスクは、サーバーレスリソースグループ (推奨) とデータ統合専用リソースグループをサポートしています。

  • リアルタイムデータ同期タスクは、プライマリキーのないテーブルをサポートしていません。

  • デフォルトの MaxCompute データソース (通常は odps_first) にリアルタイムで同期する場合、デフォルトで一時的な AccessKey (AK) が使用されます。一時的な AK は 7 日後に有効期限が切れるため、タスクは失敗します。プラットフォームは、一時的な AK の有効期限切れによる失敗を検出すると、タスクを自動的に再起動します。この種のエラーに対してモニタリングアラートを設定している場合は、アラートが届きます。

  • MaxCompute へのワンクリックリアルタイム同期タスクでは、設定した日にのみ完全な履歴データをクエリできます。増分データは、翌日にマージが完了した後にのみ MaxCompute でクエリできます。

  • MaxCompute へのワンクリックリアルタイム同期タスクは、毎日完全なパーティションを生成します。過剰なデータがストレージリソースを消費するのを防ぐため、このソリューションによって自動的に作成される MaxCompute テーブルのデフォルトのライフサイクルは 30 日です。この期間がビジネス要件を満たさない場合は、タスク設定中に MaxCompute テーブル名をクリックしてライフサイクルを変更できます。

  • データ統合は、MaxCompute エンジンのデータチャネルを使用してデータのアップロードとダウンロードを行います。データチャネルの Service-Level Agreement (SLA) の詳細については、「Data Transmission Service (アップロード) のシナリオとツール」をご参照ください。MaxCompute エンジンのデータチャネルの SLA に基づいてデータ同期ソリューションを評価する必要があります。

  • インスタンスモードで MaxCompute へのワンクリックリアルタイム同期を行う場合、データ統合専用リソースグループには少なくとも 8 vCPU と 16 GB のメモリが必要です。

  • 現在のワークスペースと同じリージョンにあるユーザー作成の MaxCompute データソースのみがサポートされます。データソースの接続をテストする際には、リージョンをまたいだ MaxCompute プロジェクトに正常に接続できます。ただし、同期タスクが実行されると、MaxCompute でのテーブル作成フェーズで「engine not found」エラーが発生します。

  • MaxCompute がデータベース全体の同期先である場合、サポートされる同期方法はテーブルのタイプによって異なります。標準テーブルの場合、データベース全体の完全同期と増分同期 (ほぼリアルタイム) のみがサポートされます。Delta Table の場合、リアルタイム同期とデータベース全体の完全同期と増分同期 (ほぼリアルタイム) の両方がサポートされます。

    説明

    ユーザー作成の MaxCompute データソースを使用する場合でも、DataWorks プロジェクトは MaxCompute エンジンにアタッチされている必要があります。そうしないと、MaxCompute SQL ノードを作成できず、完全同期のための「done」ノードの作成が失敗します。

注意事項

ターゲットテーブルの列がソース列にマッピングされていない場合、テーブル作成時にターゲット列にデフォルト値が指定されていても、同期タスク完了後にその値は null に設定されます。

サポートされるフィールド型

MaxCompute 1.0、MaxCompute 2.0、および Hive 互換のデータ型がサポートされています。以下のセクションでは、各データ型バージョンでサポートされるフィールド型について説明します。

データ型 1.0 でサポートされるフィールド

フィールド型

オフライン読み取り

オフライン書き込み

リアルタイム書き込み

BIGINT

サポート対象

サポート対象

サポート対象

DOUBLE

サポート対象

サポート対象

サポート対象

DECIMAL

ヘルプとサポート

サポート対象

サポート対象

STRING

サポート対象

サポート対象

サポート対象

DATETIME

サポート対象

サポート対象

サポート対象

BOOLEAN

サポート対象

サポート対象

サポート対象

ARRAY

サポート対象

サポート対象

サポート対象

MAP

サポート対象

サポート対象

サポート対象

STRUCT

サポート対象

サポート対象

サポート対象

データ型 2.0 および Hive 互換データ型でサポートされるフィールド

フィールド型

オフライン読み取り (MaxCompute Reader)

オフライン書き込み (MaxCompute Writer)

リアルタイム書き込み

TINYINT

サポート対象

サポート対象

サポート対象

SMALLINT

サポート対象

サポート対象

サポート対象

INT

サポート対象

サポート対象

サポート対象

BIGINT

サポート対象

サポート対象

サポート対象

BINARY

サポート対象

サポート対象

サポート対象

FLOAT

サポート対象

サポート対象

サポート対象

DOUBLE

サポート対象

サポート対象

サポート対象

DECIMAL(pecision,scale)

サポート対象

サポート対象

サポート対象

VARCHAR(n)

サポート対象

サポート対象

サポート対象

CHAR(n)

サポート対象外

サポート対象

サポート対象

STRING

サポート対象

サポート対象

サポート対象

DATE

サポート対象

サポート対象

サポート対象

DATETIME

サポート対象

サポート対象

サポート対象

TIMESTAMP

サポート対象

サポート対象

サポート対象

BOOLEAN

サポート対象

サポート対象

サポート対象

ARRAY

サポート対象

サポート対象

サポート対象

MAP

サポート対象

サポート対象

サポート対象

STRUCT

サポート対象

サポート対象

サポート対象

データ型のマッピング

次の表に、MaxCompute Reader のデータ型のマッピングを示します。

型カテゴリ

データ統合の型

データベースのデータ型

整数

LONG

BIGINT、INT、TINYINT、SMALLINT

ブール値

BOOLEAN

BOOLEAN

日付と時刻

DATE

DATETIME、TIMESTAMP、DATE

浮動小数点

DOUBLE

FLOAT、DOUBLE、DECIMAL

バイナリ

BYTES

BINARY

複合

STRING

ARRAY、MAP、STRUCT

重要

データ変換に失敗した場合、または宛先データソースへのデータ書き込みに失敗した場合、そのデータはダーティデータと見なされます。これは、ダーティデータのレコードしきい値を使用して処理できます。

前提条件

MaxCompute テーブルからデータを読み書きする前に、必要なプロパティを有効にすることができます。

MaxCompute への接続とプロジェクトレベル設定の有効化

  • MaxCompute クライアントにログインします。詳細については、「ローカルクライアント (odpscmd) を使用した接続」をご参照ください。

  • MaxCompute のプロジェクトレベルの設定を有効にします。必要な権限があることを確認してください。Project Owner アカウントを使用してこれらの操作を実行できます。MaxCompute の権限の詳細については、「ロール計画」をご参照ください。

ACID 属性の有効化

Project Owner アカウントを使用してクライアントで次のコマンドを実行し、ACID 属性を有効にすることができます。MaxCompute の ACID セマンティクスの詳細については、「ACID セマンティクス」をご参照ください。

setproject odps.sql.acid.table.enable=true;

(任意) データ型 2.0 の有効化

MaxCompute データ型 2.0 の timestamp 型を使用するには、Project Owner アカウントを使用してクライアントで次のコマンドを実行し、データ型 2.0 を有効にします。

setproject odps.sql.type.system.odps2=true;

(任意) 権限の付与

MaxCompute 計算リソースをワークスペースにアタッチすると、デフォルトで DataWorks に MaxCompute データソースが作成されます。このデータソースを使用して、現在のワークスペース内でデータを同期できます。別のワークスペースでこの MaxCompute データソースからデータを同期するには、別のワークスペースのデータソースに指定されたアクセスアカウントが、この MaxCompute プロジェクトへのアクセス権限を持っていることを確認する必要があります。クロスアカウント認証の詳細については、「クロスアカウント認証 (MaxCompute および Hologres)」をご参照ください。

MaxCompute データソースの作成

データ同期タスクを開発する前に、DataWorks で MaxCompute プロジェクト用の MaxCompute データソースを作成する必要があります。MaxCompute データソースの作成方法の詳細については、「MaxCompute 計算リソースのアタッチ」をご参照ください。

説明
  • 標準モードのワークスペースは、データソースの分離をサポートしています。開発環境と本番環境のデータソースを追加して分離し、データのセキュリティを確保できます。詳細については、「開発環境と本番環境でのデータソースの分離」をご参照ください。

  • ワークスペース内の odps_first という名前の MaxCompute データソースが [データソース] ページで手動で作成されたものでない場合、それはデータソース機能が更新される前にワークスペースにアタッチされた最初の MaxCompute エンジンのために作成されたデフォルトのデータソースです。データ同期にこのデータソースを選択した場合、その MaxCompute エンジンのプロジェクトからデータを読み書きすることになります。

    データソース設定ページでデータソースが使用する MaxCompute プロジェクトの名前を表示して、データがどのプロジェクトから読み書きされるかを確認できます。詳細については、「データソース管理」をご参照ください。

データ同期タスクの開発

同期タスクの設定のエントリポイントと手順については、次の設定ガイドをご参照ください。

単一テーブルのオフライン同期タスクの設定

単一テーブルのリアルタイム同期タスクの設定

手順については、「データ統合でのリアルタイム同期タスクの設定」および「DataStudio でのリアルタイム同期タスクの設定」をご参照ください。

データベース全体の同期タスクの設定

手順については、「データベース全体のオフライン同期」、「データベース全体のリアルタイム同期」、および「データベース全体の完全同期と増分同期 (ほぼリアルタイム)」をご参照ください。

よくある質問

データ統合に関するその他のよくある質問については、「データ統合」をご参照ください。

付録:スクリプトデモとパラメーターの説明

コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスクの設定」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。

Reader スクリプトデモ

重要

タスクを実行する際は、次のコードからコメントを削除してください。

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"odps",// プラグイン名。
            "parameter":{
                "partition":[],// データを読み取るパーティション。
                "isCompress":false,// データを圧縮するかどうかを指定します。
                "datasource":"",// データソース。
                "column":[// ソーステーブルの列。
                    "id"
                ],
                "where": "",// WHERE 句を使用してデータをフィルタリングする場合は、具体的な句を入力します。
                "enableWhere":false,// WHERE 句を使用してデータをフィルタリングするかどうかを指定します。
                "table":""// テーブル名。
            },
            "name":"Reader",
            "category":"reader"
        },
        { 
            "stepType":"stream",
            "parameter":{
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// エラーレコード数。
        },
        "speed":{
            "throttle":true,// throttle が false に設定されている場合、mbps パラメーターは有効にならず、データレートは制限されません。throttle が true に設定されている場合、データレートは制限されます。
            "concurrent":1, // 同時実行ジョブ数。
            "mbps":"12"// データレートの制限。1 mbps = 1 MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

MaxCompute の Tunnel エンドポイントを指定するには、コードエディタでデータソースを手動で設定できます。上記の例の "datasource":"", を特定のデータソースパラメーターに置き換えます。次のコードは例です。

"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api", 
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com", 
"project":"*****", 

Reader スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加したデータソースの名前と同じである必要があります。

はい

なし

table

ソーステーブルの名前。名前は大文字と小文字を区別しません。

はい

なし

partition

データを読み取るパーティション。

  • ODPS のパーティション設定では、Linux シェルのワイルドカード文字がサポートされています。アスタリスク (*) は 0 個以上の文字を表し、疑問符 (?) は 1 個の文字を表します。

  • デフォルトでは、指定されたパーティションが存在する必要があります。パーティションが存在しない場合、タスクは失敗します。パーティションが存在しなくてもタスクを成功させたい場合は、コードエディタに切り替えて、ODPS パラメーターに "successOnNoPartition": true 設定を追加します。

たとえば、test という名前のパーティションテーブルに pt=1,ds=hangzhoupt=1,ds=shanghaipt=2,ds=hangzhoupt=2,ds=beijing の 4 つのパーティションがあるとします。次の例は、データを読み取るためのパーティションの設定方法を示しています。

  • pt=1,ds=hangzhou パーティションからデータを読み取るには、partition パラメーターを "partition":"pt=1,ds=hangzhou” に設定します。

  • pt=1 のすべてのパーティションからデータを読み取るには、パーティション情報を "partition":"pt=1,ds=*" と設定します。

  • test テーブルのすべてのパーティションからデータを読み取るには、partition パラメーターを "partition":"pt=*,ds=*" に設定します。

要件に応じて、条件を設定してパーティションデータを取得することもできます。

  • 最新のパーティションを指定するには、/*query*/ ds=(select MAX(ds) from DataXODPSReaderPPR) 設定を追加します。

  • 条件でフィルタリングするには、/*query*/ pt+expression のような条件式を追加します。たとえば、/*query*/ pt>=20170101 and pt<20170110 は、pt の値が 20170101 (含む) から 20170110 (含まない) までのパーティションからすべてのデータを取得します。

説明

/*query*/ は、後に続く内容が WHERE 条件であることを示します。

パーティションテーブルでは必須です。標準テーブルではこのパラメーターを指定しないでください。

なし

column

MaxCompute ソーステーブルから読み取る列。たとえば、test という名前のテーブルに idnameage というフィールドがあるとします。

  • idnameage の列を順番に読み取るには、column パラメーターを "column":["id","name","age"] または "column":["*"] に設定します。

    説明

    フィールドを抽出するために列パラメーターを (*) に設定することは推奨されません。この設定は、テーブルのすべての列を順番に読み取ります。テーブルのフィールドの順序、型、または数が変更されると、ソース列と宛先列の間に不整合が生じるリスクがあります。これにより、誤った結果やタスクの失敗につながる可能性があります。

  • nameid の列をその順序で読み取るには、column パラメーターを "column":["name","id"] に設定します。

  • 抽出されたソースフィールドに定数フィールドを追加して、ターゲットテーブルのフィールド順序に合わせるには、次の例に従います。age 列の値、name 列の値、定数の日付値 1988-08-08 08:08:08、および id 列の値を抽出するには、column パラメーターを "column":["age","name","'1988-08-08 08:08:08'","id"] に設定します。定数列は一重引用符 (') で囲みます。

    内部的には、設定された各フィールドをチェックすることで定数が識別されます。フィールドが一重引用符 (') で囲まれている場合、それは定数フィールドとして扱われます。その実際の値は、一重引用符 (') を除いた文字列です。

    説明
    • データフィルタリングモード (enableWhere=true かつ where が空でない) を使用する場合、column パラメーターで MaxCompute 関数を使用できます。データフィルタリングモードが無効の場合はサポートされません。

    • 同期する列のセットを明示的に指定する必要があります。column パラメーターを空にすることはできません。

はい

なし

enableWhere

WHERE 句を使用してデータをフィルタリングするかどうかを指定します。

いいえ

false

where

WHERE 句を使用してデータをフィルタリングする場合は、具体的な句を入力します。

いいえ

なし

Writer スクリプトデモ

次のコードは、サンプルスクリプト設定です。

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"odps",// プラグイン名。
            "parameter":{
                "partition":"",// パーティション情報。
                "truncate":true,// クリーンアップルール。
                "compress":false,// データを圧縮するかどうかを指定します。
                "datasource":"odps_first",// データソース名。
            "column": [// ソース列名。
                "id",
                "name",
                "age",
                "sex",
                "salary",
                "interest"
                ],
                "table":""// テーブル名。
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// エラーレコード数。これは許容されるダーティデータの最大レコード数です。
        },
        "speed":{
            "throttle":true,// throttle が false に設定されている場合、mbps パラメーターは有効にならず、データレートは制限されません。throttle が true に設定されている場合、データレートは制限されます。
            "concurrent":1, // 同時実行ジョブ数。
            "mbps":"12"// データレートの制限。1 mbps = 1 MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

MaxCompute の Tunnel エンドポイントを指定するには、コードエディタでデータソースを手動で設定できます。上記の例の "datasource":"", を特定のデータソースパラメーターに置き換えます。次のコードは例です。

"accessId":"<yourAccessKeyId>",
 "accessKey":"<yourAccessKeySecret>",
 "endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
 "odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api", 
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com", 
"project":"**********", 

Writer スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加したデータソースの名前と同じである必要があります。

はい

なし

table

宛先テーブルの名前。名前は大文字と小文字を区別しません。複数のテーブルを指定することはできません。

はい

なし

partition

宛先テーブルのパーティション情報。最下位レベルのパーティションを指定する必要があります。たとえば、3 階層のパーティションテーブルにデータを書き込むには、pt=20150101, type=1, biz=2 のように最下位レベルのパーティションを指定する必要があります。

  • 標準テーブルの場合、このパラメーターは指定しないでください。これは、ターゲットテーブルへの直接インポートを示します。

  • MaxCompute Writer は、書き込み時のデータルーティングをサポートしていません。パーティションテーブルの場合、最下位レベルのパーティションにデータを書き込むようにしてください。

パーティションテーブルでは必須です。標準テーブルではこのパラメーターを指定しないでください。

なし

column

インポートするフィールドのリスト。すべてのフィールドをインポートするには、パラメーターを "column": ["*"] に設定できます。MaxCompute の一部の列にのみデータを挿入するには、"column": ["id","name"] のように、それらの列を指定します。

  • MaxCompute Writer は、列のフィルタリングと並べ替えをサポートしています。たとえば、テーブルに 3 つのフィールド (a、b、c) があり、フィールド c と b のみを同期したい場合、パラメーターを "column": ["c","b"] に設定できます。インポートプロセス中に、フィールド a は自動的に null で埋められます。

  • 同期する列のセットを明示的に指定する必要があります。column パラメーターを空にすることはできません。

はい

なし

truncate

"truncate": "true" を設定して、書き込み操作のべき等性を確保します。

データクリーンアップは MaxCompute SQL を使用して実行され、原子性が保証されないため、truncate オプションはアトミックな操作ではありません。複数のタスクが同じ Table または Partition のパーティションを同時にクリアすると、同時実行のタイミングの問題が発生する可能性があります。この点には十分注意してください。

このような問題を回避するには、同じパーティションで複数の DDL ジョブを同時に実行しないようにするか、複数の同時実行ジョブを開始する前にパーティションを作成します。

はい

なし

emptyAsNull

書き込み前に空の文字列を NULL に変換するかどうかを指定します。

いいえ

false

consistencyCommit

同期後の可視性。

  • はい (true):同期タスクが成功した後にのみ、データが一度に表示されるようになります。ただし、データ量が 1 TB を超えると、MaxCompute が同期できるブロックの最大数が 300,000 であるため、同期タスクは失敗します。

  • いいえ (false):同期タスクが完了する前に、MaxCompute に同期された一部のデータをクエリできます。ただし、具体的にどの部分が表示されるかは予測できません。したがって、ダウンストリームタスクがこのテーブルを使用する場合は、データの整合性に注意してください。

いいえ

false