すべてのプロダクト
Search
ドキュメントセンター

DataWorks:MaxCompute データソース

最終更新日:Mar 11, 2026

MaxCompute データソースは、MaxCompute からのデータの読み取りや MaxCompute へのデータの書き込みを行うためのデータハブとして機能します。

機能

説明

DataWorks の MaxCompute データソースは、Tunnel エンドポイントアドレスを使用して、対応する MaxCompute プロジェクトの Tunnel サービスにアクセスします。この接続により、DownloadTable 操作を使用するアップロードやダウンロードなどのデータ同期が可能になります。

2023 年 12 月 11 日以降に作成された MaxCompute データソースの場合、データソースが存在する DataWorks サービスがターゲットの MaxCompute プロジェクトと異なるリージョンにある場合、Tunnel エンドポイントアドレスを使用して直接データを同期することはできません。このシナリオでクロスリージョン同期を実行するには、まず Cloud Enterprise Network (CEN) を使用して 2 つのサービスのネットワークを接続する必要があります。CEN とその操作の詳細については、「CEN とは」をご参照ください。

バッチ読み取り

  • MaxCompute Reader は、パーティションテーブルと非パーティション化テーブルからのデータ読み取りをサポートします。仮想ビューからのデータ読み取りや外部テーブルの同期はサポートしていません。

  • MaxCompute のパーティションテーブルからバッチでデータを読み取る場合、パーティション列を直接マッピングすることはできません。パーティション列の値を同期するには、カスタム列を追加し、マッピングのためにパーティション名を手動で入力します。

  • スケジューリングパラメーターを使用してパーティションを指定し、自動置換を行うことができます。これは、スケジューリング時間に対応するパーティションからデータを同期する必要があるシナリオで役立ちます。

    たとえば、列 idname、レベル 1 のパーティション pt、およびレベル 2 のパーティション ds を持つテーブル t0 を考えます。パーティション pt=<ビジネス日付> および ds=hangzhou からデータを読み取るには、データソースを設定する際にパーティション値を pt=${スケジューリングパラメーター} および ds=hangzhou として指定する必要があります。その後、idname の列をマッピングできます。

  • パーティション列を送信先テーブルに書き込むには、それらをカスタム列として追加します。

  • MaxCompute Reader は、WHERE 句を使用したデータフィルタリングをサポートします。

バッチ書き込み

  • データに NULL 値が含まれる場合、MaxCompute Writer は VARCHAR データ型をサポートしていません。

  • 送信先テーブルが DeltaTable の場合、[詳細設定] を展開し、[同期後の可視性][はい] に設定します。そうしないと、同時実行数が 1 より大きいシナリオでタスクが失敗します。

  • MaxCompute 外部テーブルへのデータ書き込みはサポートされていません。

  • 送信先列がソース列からマッピングされていない場合、同期後に NULL になり、テーブル作成時に指定されたデフォルト値を上書きします。

リアルタイム書き込み

  • リアルタイムデータ同期タスクは、サーバーレスリソースグループをサポートします。

  • リアルタイム同期の送信先テーブルには、プライマリキーが必要です。

  • MaxCompute 外部テーブルへのデータ書き込みはサポートされていません。

  • 通常 odps_first であるデフォルトの MaxCompute データソースにリアルタイムでデータを同期する場合、デフォルトでは一時的な Access Key (AK) が同期に使用されます。この一時的な AK は 7 日後に自動的に有効期限切れになり、タスクが失敗します。プラットフォームは、一時的な AK が原因で失敗したことを検出すると、タスクを自動的に再起動します。タスクにモニタリングアラートを設定している場合は、アラートメッセージが届きます。

  • ワンクリックリアルタイム同期タスクの場合、既存データは設定日にクエリできます。増分データは、翌日にマージされた後に利用可能になります。

  • MaxCompute へのワンクリックリアルタイム同期タスクは、毎日フルパーティションを作成します。過剰なストレージ使用量を防ぐため、自動的に作成される MaxCompute テーブルにはデフォルトで 30 日のライフサイクルが設定されています。この持続時間がビジネス要件を満たさない場合は、タスク設定中に MaxCompute テーブル名をクリックしてライフサイクルを変更できます。

  • Data Integration は、MaxCompute エンジンのデータ同期チャネルを使用してデータのアップロードとダウンロードを行います。このチャネルのサービスレベルアグリーメント (SLA) の詳細については、「データアップロードのシナリオとツール」をご参照ください。チャネルの SLA に基づいてデータ同期ソリューションを評価する必要があります。

  • インスタンスモードで MaxCompute へのワンクリックリアルタイム同期を行う場合、データ統合専用リソースグループには、最低でも 8 コア 16 GB の仕様が必要です。

  • 現在のワークスペースと同じリージョンにあるカスタム MaxCompute データソースのみを使用できます。クロスリージョンのデータソースの場合、接続性テストは成功する可能性がありますが、MaxCompute でのテーブル作成フェーズ中に「engine not found」エラーで同期タスクが失敗します。

  • MaxCompute がデータベース全体の同期の送信先として機能する場合、標準テーブルはリアルタイムデータベース全体の同期の増分ログモードとワンクリックリアルタイム完全増分同期のみをサポートします。対照的に、Delta Table はリアルタイムデータベース全体の同期とワンクリックリアルタイム完全増分同期の両方をサポートします。

    説明

    カスタム MaxCompute データソースを使用する場合でも、DataWorks プロジェクトは MaxCompute エンジンに関連付けられている必要があります。そうしないと、MaxCompute SQL ノードを作成できず、その結果、完全同期完了マーカーノードが失敗します。

サポートされるデータ型

MaxCompute 1.0、2.0、および Hive 互換のデータ型がサポートされています。以下のセクションでは、各データ型バージョンでサポートされる列の型について説明します。

データ型 1.0

バッチ読み取り

バッチ書き込み

リアルタイム書き込み

BIGINT

サポート

サポート

サポート

DOUBLE

サポート

サポート

サポート

DECIMAL

サポート

サポート

サポート

STRING

サポート

サポート

サポート

DATETIME

サポート

サポート

サポート

BOOLEAN

サポート

サポート

サポート

ARRAY

サポート

サポート

サポート

MAP

サポート

サポート

サポート

STRUCT

サポート

サポート

サポート

データ型 2.0 および Hive 互換のデータ型

バッチ読み取り

バッチ書き込み

リアルタイム書き込み

TINYINT

サポート

サポート

サポート

SMALLINT

サポート

サポート

サポート

INT

サポート

サポート

サポート

BIGINT

サポート

サポート

サポート

BINARY

サポート

サポート

サポート

FLOAT

サポート

サポート

サポート

DOUBLE

サポート

サポート

サポート

DECIMAL(precision,scale)

サポート

サポート

サポート

VARCHAR(n)

サポート

サポート

サポート

CHAR(n)

非サポート

サポート

サポート

STRING

サポート

サポート

サポート

DATE

サポート

サポート

サポート

DATETIME

サポート

サポート

サポート

TIMESTAMP

サポート

サポート

サポート

BOOLEAN

サポート

サポート

サポート

ARRAY

サポート

サポート

サポート

MAP

サポート

サポート

サポート

STRUCT

サポート

サポート

サポート

データ型マッピング

次の表に、MaxCompute Reader のデータ型マッピングを示します。

カテゴリ

Data Integration の型

データベースの型

整数

LONG

BIGINT、INT、TINYINT、および SMALLINT

ブール値

BOOLEAN

BOOLEAN

日付と時刻

DATE

DATETIME、TIMESTAMP、および DATE

浮動小数点

DOUBLE

FLOAT、DOUBLE、および DECIMAL

バイナリ

BYTES

BINARY

複合

STRING

ARRAY、MAP、および STRUCT

重要

データ型の変換に失敗した場合、またはデータを送信先に書き込めない場合、それはダーティデータとして分類されます。この機能は、ダーティデータのしきい値と組み合わせて使用できます。

前提条件

MaxCompute テーブルからデータを読み取る、またはテーブルにデータを書き込む前に、特定のプロパティを有効にする必要がある場合があります。

MaxCompute への接続とプロジェクト設定の有効化

  • MaxCompute クライアントにログインします。詳細については、「ローカルクライアント (odpscmd) を使用した接続」をご参照ください。

  • MaxCompute のプロジェクトレベルの設定を有効にします。必要な権限があることを確認してください。Project Owner ロールを持つアカウントを使用して、これらの操作を実行できます。MaxCompute の権限の詳細については、「ロール計画」をご参照ください。

ACID セマンティクスの有効化

Project Owner ロールを持つアカウントを使用して、クライアントで次のコマンドを実行し、ACID プロパティを有効にできます。MaxCompute の ACID セマンティクスの詳細については、「ACID セマンティクス」をご参照ください。

setproject odps.sql.acid.table.enable=true;

(オプション) データ型 2.0 の有効化

TIMESTAMP データ型を使用するには、MaxCompute データ型 2.0 を有効にする必要があります。Project Owner ロールを持つアカウントを使用して、次のコマンドを実行します。

setproject odps.sql.type.system.odps2=true;

(オプション) アカウント権限の付与

MaxCompute コンピューティングリソースをワークスペースに関連付けると、デフォルトで DataWorks に MaxCompute データソースが作成されます。このデータソースは、現在のワークスペース内でのデータ同期に使用できます。このデータソースを別のワークスペースから使用するには、他のワークスペースで設定されたアクセスアカウントが、元の MaxCompute プロジェクトに対する必要な権限を持っている必要があります。クロスアカウント認証の詳細については、「クロスアカウント認証 (MaxCompute および Hologres)」をご参照ください。

MaxCompute データソースの作成

データ同期タスクを開発する前に、MaxCompute プロジェクトを DataWorks の MaxCompute データソースとして追加する必要があります。手順については、「MaxCompute コンピューティングリソースの関連付け」をご参照ください。

説明
  • 標準モードのワークスペースは、データソースの隔離をサポートしています。開発環境と本番環境のデータソースを追加して隔離し、データを保護できます。詳細については、「開発環境と本番環境でのデータソースの隔離」をご参照ください。

  • ワークスペース内の odps_first という名前の MaxCompute データソースがデータソースページで手動で作成されなかった場合、それは新バージョンのデータソースがリリースされる前にワークスペースに関連付けた最初の MaxCompute エンジンに対してデフォルトで作成されたものです。データ同期のためにこのデータソースを選択すると、その特定の MaxCompute エンジンプロジェクトからデータを読み取るか、またはデータを書き込むことになります。

    データソースが使用する MaxCompute プロジェクトの名前をデータソース設定ページで表示して、読み取りおよび書き込み操作の最終的なプロジェクトを確認できます。詳細については、「データソース管理」をご参照ください。

データ同期タスク

同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。

単一テーブルのバッチ同期

単一テーブルのリアルタイム同期

手順については、「単一テーブルから増分データを同期するためのリアルタイム同期ノードの作成」および「DataStudio でのリアルタイム同期タスクの設定」をご参照ください。

データベース全体の同期

手順については、「バッチデータベース全体の同期タスク」、「リアルタイムデータベース全体の同期タスクの設定」、および「ワンクリックリアルタイム完全増分同期」をご参照ください。

よくある質問

Data Integration に関するその他のよくある質問については、「Data Integration に関するよくある質問」をご参照ください。

付録:コードとパラメーター

コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下の情報は、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明しています。

Reader スクリプトのデモ

重要

タスクを実行する際は、コードからコメントを削除してください。

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"odps",// プラグイン名。
            "parameter":{
                "partition":[],// データを読み取るパーティション。
                "isCompress":false,// データを圧縮するかどうかを指定します。
                "datasource":"",// データソース。
                "column":[// ソーステーブルの列。
                    "id"
                ],
                "where": "",// データフィルタリングのための WHERE 句の内容。
                "enableWhere":false,// データフィルタリングに WHERE 句を使用するかどうかを指定します。
                "table":""// テーブル名。
            },
            "name":"Reader",
            "category":"reader"
        },
        { 
            "stepType":"stream",
            "parameter":{
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// エラーレコード数。
        },
        "speed":{
            "throttle":true,// throttle を false に設定すると、mbps パラメーターは有効にならず、データ同期速度は制限されません。throttle を true に設定すると、データ同期速度は制限されます。
            "concurrent":1, // 並行スレッド数。
            "mbps":"12"// 最大転送速度。単位は MB/s です。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

MaxCompute の Tunnel エンドポイントを指定する必要がある場合は、コードエディタでデータソースを手動で設定できます。上記の例の "datasource":"", をデータソースの特定のパラメーターに置き換えます。以下に例を示します。

"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api", 
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com", 
"project":"*****", 

Reader スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加されたデータソースの名前と同じでなければなりません。

はい

なし

table

ソーステーブルの名前。値は大文字と小文字を区別しません。

はい

なし

partition

データを読み取るパーティション。

  • ODPS のパーティション設定では、Linux シェルのワイルドカードがサポートされています。* は 0 個以上の文字を表し、? は任意の 1 文字を表します。

  • デフォルトでは、存在しないパーティションを読み取ろうとするとタスクはエラーを報告します。この場合にタスクを正常に実行させるには、コードエディタに切り替えて、ODPS パラメーターに "successOnNoPartition": true 設定を追加します。

たとえば、パーティションテーブル test には、pt=1,ds=hangzhoupt=1,ds=shanghaipt=2,ds=hangzhou、および pt=2,ds=beijing の 4 つのパーティションが含まれています。異なるパーティションからデータを読み取るための設定は次のとおりです。

  • pt=1,ds=hangzhou パーティションからデータを読み取る必要がある場合、パーティション情報の設定は "partition":"pt=1,ds=hangzhou” です。

  • pt=1 のすべてのパーティションからデータを読み取る必要がある場合、パーティション情報の設定は "partition":"pt=1,ds=*" です。

  • test テーブル全体のすべてのパーティションからデータを読み取る必要がある場合、パーティション設定は "partition":"pt=*,ds=*" です。

条件を設定してパーティションからデータを取得することもできます。

  • 最大パーティションを指定する必要がある場合は、/*query*/ ds=(select MAX(ds) from DataXODPSReaderPPR) 設定を追加できます。

  • 条件に基づいてデータをフィルタリングする必要がある場合は、/*query*/ pt+expression 設定を追加できます。たとえば、/*query*/ pt>=20170101 and pt<20170110 は、20170101 (含む) から 20170110 (含まない) までの日付の pt パーティションからすべてのデータを取得します。

説明

/*query*/ は、後続のコンテンツが WHERE 条件として解釈されることを示します。

このパラメーターはパーティションテーブルに必須です。非パーティション化テーブルにはこのパラメーターを設定しないでください。

なし

column

MaxCompute ソーステーブルから列情報を読み取ります。たとえば、テーブル test の列は idname、および age です。

  • idname、および age を順番に読み取る必要がある場合、設定は "column":["id","name","age"] または "column":["*"] にする必要があります。

    説明

    このパラメーターをアスタリスク (*) に設定しないことを推奨します。アスタリスクは、すべての列が順番に読み取られることを示します。テーブルスキーマ (列の順序、型、または数) が変更されると、ソース列と送信先列の間で不一致が発生する可能性があります。これにより、誤った結果やタスクの失敗が発生する可能性があります。

  • nameid を順番に読み取りたい場合は、"column":["name","id"] のように設定する必要があります。

  • ソースから抽出された列に定数列を追加して、ターゲットテーブルの列の順序に一致させたい場合は、column パラメーターで定数を定義する必要があります。たとえば、age 列、name 列、定数の日付値 1988-08-08 08:08:08、および id 列から値を抽出するには、パラメーターを "column":["age","name","'1988-08-08 08:08:08'","id"] に設定する必要があります。これは、定数値は一重引用符 (') で囲む必要があることを意味します。

    内部的には、システムは各設定済みフィールドをチェックすることで定数を識別します。フィールドの値が ' で始まり、' で終わる場合、そのフィールドは定数フィールドと見なされ、実際の値は囲む ' 文字を削除した値になります。

    説明
    • データフィルタリングモード (enableWhere=true であり、where パラメーターが空でない場合) を使用する場合、column で MaxCompute 関数を使用できます。これらの関数は、非データフィルタリングモードではサポートされていません。

    • 同期する列を明示的に指定する必要があります。このパラメーターは空にできません。

はい

なし

enableWhere

データフィルタリングに WHERE 句を使用するかどうかを指定します。

いいえ

false

where

データフィルタリングのための WHERE 句の内容。

いいえ

なし

Writer スクリプトのデモ

次のコードはスクリプトの例です。

{
    "type":"job",
    "version":"2.0",// バージョン番号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"odps",// プラグイン名。
            "parameter":{
                "partition":"",// パーティション情報。
                "truncate":true,// クリーンアップルール。
                "compress":false,// データを圧縮するかどうかを指定します。
                "datasource":"odps_first",// データソース名。
            "column": [// ソース列名。
                "id",
                "name",
                "age",
                "sex",
                "salary",
                "interest"
                ],
                "table":""// テーブル名。
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// エラーレコード数。これは許容されるダーティデータレコードの最大数を指定します。
        },
        "speed":{
            "throttle":true,// throttle を false に設定すると、mbps パラメーターは有効にならず、データ同期速度は制限されません。throttle を true に設定すると、データ同期速度は制限されます。
            "concurrent":1, // 並行スレッド数。
            "mbps":"12"// 最大転送速度。単位は MB/s です。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

MaxCompute の Tunnel エンドポイントを指定する必要がある場合は、コードエディタでデータソースを手動で設定できます。上記の例の "datasource":"", をデータソースの特定のパラメーターに置き換えます。以下に例を示します。

"accessId":"<yourAccessKeyId>",
 "accessKey":"<yourAccessKeySecret>",
 "endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
 "odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api", 
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com", 
"project":"**********", 

Writer スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加されたデータソースの名前と同じでなければなりません。

はい

なし

table

送信先テーブルの名前。値は大文字と小文字を区別しません。1 つのテーブルのみ指定できます。

はい

なし

partition

パーティションテーブルにデータを書き込むには、最後のレベルまでパーティション情報を指定する必要があります。たとえば、3 レベルのパーティションテーブルにデータを書き込むには、pt=20150101, type=1, biz=2 のように、最後のレベルまでパーティションを指定する必要があります。

  • 非パーティション化テーブルの場合、このパラメーターを設定しないでください。データは直接送信先テーブルにインポートされます。

  • MaxCompute Writer は、書き込みのためのデータルーティングをサポートしていません。パーティションテーブルの場合、データが最下位レベルのパーティションに書き込まれることを確認する必要があります。

このパラメーターはパーティションテーブルに必須です。非パーティション化テーブルにはこのパラメーターを設定しないでください。

なし

column

インポートする列のリスト。すべての列をインポートするには、このパラメーターを "column": ["*"] に設定します。MaxCompute の列のサブセットのみをインポートするには、"column": ["id","name"] のように列名を指定します。

  • MaxCompute Writer は、列のフィルタリングと列の並べ替えをサポートしています。たとえば、テーブルに a、b、c という名前の 3 つの列があり、c と b の列のみを同期したい場合、設定を "column": ["c","b"] のように設定できます。インポートプロセス中に、列 a は自動的に null に設定されます。

  • 同期する列を明示的に指定する必要があります。このパラメーターは空にできません。

はい

なし

truncate

"truncate": "true" を設定すると、書き込みのべき等性が保証されます。これは、書き込みジョブが失敗して再実行された場合、MaxCompute Writer が前回の実行のデータをクリアし、新しいデータをインポートすることを意味します。これにより、再実行後もデータの一貫性が保たれます。

truncate オプションは、データクレンジングに MaxCompute SQL を使用するため、アトミック操作ではありません。MaxCompute SQL は原子性を保証しないためです。複数のタスクが同時に同じ テーブルまたは パーティションをクリーンアップしようとすると、同時実行の問題が発生する可能性があることに注意してください。

これを避けるには、複数の同時ジョブから同じパーティションに対して DDL 操作を実行しないでください。または、同時ジョブが開始される前にパーティションを作成してください。

はい

なし

emptyAsNull

書き込む前に空の文字列を NULL に変換するかどうかを指定します。

いいえ

false

consistencyCommit

同期後の可視性。

  • はい (true):データは、同期タスクが正常に完了した後にのみ表示されます。ただし、同期するデータ量が 1 TB を超える場合、MaxCompute は最大 300,000 ブロックしか同期できないため、タスクは失敗します。

  • いいえ (false):MaxCompute に部分的に同期されたデータは、タスクが完了する前にクエリできます。ただし、データの表示可能な部分は予測できません。したがって、このテーブルを使用する下流のアプリケーションは、データの不完全性を考慮する必要があります。

いいえ

false