DataWorks は、MaxCompute データソースからデータを読み取り、MaxCompute データソースにデータを書き込むための MaxCompute Reader と MaxCompute Writer を提供しています。
制限
DataWorks では、トンネルエンドポイント を使用して、データソースとして DataWorks に追加された MaxCompute プロジェクトのトンネルサービスにアクセスできます。 この方法では、トンネルサービスを使用して MaxCompute プロジェクトにデータをアップロードしたり、MaxCompute プロジェクトからデータをダウンロードしたりして、データを同期できます。 トンネルサービスを使用したデータのアップロードまたはダウンロードには、テーブルのダウンロード操作が含まれます。
2023 年 12 月 11 日以降に追加された MaxCompute データソースの場合、データソースが追加されたワークスペースが、データソースとして追加された MaxCompute プロジェクトとは異なるリージョンにある場合、トンネルエンドポイントを使用して MaxCompute プロジェクトからデータを同期することはできません。 MaxCompute プロジェクトからデータを同期する場合は、Cloud Enterprise Network(CEN)インスタンスを購入して、DataWorks と MaxCompute プロジェクト間のネットワーク接続を確立する必要があります。 ネットワーク接続が確立されると、リージョンをまたいで MaxCompute プロジェクトからデータを同期できます。 CEN および CEN 関連の操作については、「CEN とは」をご参照ください。
バッチデータ読み取り
MaxCompute Reader は、パーティションテーブルと非パーティションテーブルからのみデータを読み取ることができます。 仮想ビューの読み取りや外部テーブルからのデータの読み取りはできません。
MaxCompute パーティションテーブルからデータを同期するために使用されるバッチ同期タスクを構成する場合、パーティションテーブルのパーティションフィールドと宛先テーブルのフィールド間のマッピングを直接構成することはできません。 ソースを構成するときに、データを同期する パーティション に関する情報を指定する必要があります。
たとえば、t0 という名前のパーティションテーブルに id フィールドと name フィールドが含まれているとします。 レベル 1 パーティションフィールドは pt で、レベル 2 パーティションフィールドは ds です。 t0 テーブルの pt=1,ds=hangzhou パーティションからデータを読み取る場合は、ソースを構成するときに pt=1,ds=hangzhou を指定する必要があります。 この方法では、後で id フィールドと name フィールドのマッピングを構成できます。
MaxCompute Reader では、WHERE 句を使用してデータをフィルタリングできます。
バッチデータ書き込み
ソースのデータに NULL 値が含まれている場合、MaxCompute Writer はデータのデータ型を VARCHAR に変換できません。
MaxCompute Delta table
にデータを書き込むためのバッチ同期タスクを実行する場合は、コードレス ユーザーインターフェース(UI)を使用してタスクを構成するときに、[同期後に表示] パラメータを [はい] に設定する必要があります。 そうしないと、バッチ同期タスクに複数の並列スレッドが構成されている場合、エラーが報告されます。
リアルタイムデータ書き込み
サーバーレス リソースグループまたは Data Integration 専用リソースグループを使用して、リアルタイム同期タスクを実行できます。 サーバーレス リソースグループを使用することをお勧めします。 サーバーレス リソースグループの作成および使用方法については、「サーバーレス リソースグループの作成と使用」をご参照ください。
リアルタイム同期タスクを実行して、PolarDB、Oracle、または MySQL データソースから MaxCompute にのみデータを同期できます。
リアルタイム同期タスクを使用して、プライマリキーのないテーブルからデータを同期することはできません。
デフォルトの MaxCompute データソース
odps_first
をリアルタイム同期タスクの宛先として使用する場合、デフォルトでは一時 AccessKey ペアがデータ同期に使用されます。 一時 AccessKey ペアは 7 日間のみ有効です。 7 日後、一時 AccessKey ペアは自動的に期限切れになり、リアルタイム同期タスクは失敗します。 システムが、一時 AccessKey ペアの期限切れが原因でリアルタイム同期タスクが失敗したことを検出した場合、システムはリアルタイム同期タスクを再起動します。 リアルタイム同期タスクに関連するアラートルールが構成されている場合、システムはアラートを報告します。MaxCompute にデータを同期するために使用されるワンクリック リアルタイム同期タスクを構成した日には、履歴の完全データのみをクエリできます。 完全データと増分データが翌日マージされた後でのみ、増分データをクエリできます。
MaxCompute にデータを同期するために使用されるワンクリック リアルタイム同期タスクは、毎日、MaxCompute テーブルに完全データを格納するためのパーティションを生成します。 データが過剰なストレージリソースを占有するのを防ぐため、自動的に作成される MaxCompute テーブルのデフォルトのライフサイクルは 30 日です。 ライフサイクルがビジネス要件を満たしていない場合は、関連する同期タスクを構成するときに、MaxCompute テーブルの名前をクリックしてテーブルのライフサイクルを変更できます。
Data Integration は、MaxCompute によって提供されるチャネルを使用してデータをアップロードおよびダウンロードします。 ビジネス要件に基づいてチャネルを選択できます。 MaxCompute によって提供されるチャネルの種類の詳細については、「データアップロードのシナリオとツール」をご参照ください。
インスタンス全体モードで MaxCompute にデータを同期するためにリアルタイム同期タスクを実行する場合は、同期タスクの実行に使用する Data Integration 専用リソースグループの仕様が 8 vCPU 以上、メモリ 16 GiB 以上である必要があります。
ワークスペースと同じリージョンにあるセルフマネージド MaxCompute データソースのみを使用できます。 ワークスペースとは異なるリージョンにあるセルフマネージド MaxCompute データソースを使用する場合、データソースは使用するリソースグループに接続できます。 ただし、同期タスクの実行中にシステムが MaxCompute テーブルを作成すると、コンピューティングエンジンが存在しないことを示すエラーが報告されます。
説明セルフマネージド MaxCompute データソースを使用する場合は、MaxCompute コンピューティングエンジンを DataWorks ワークスペースに関連付ける必要があります。 そうしないと、ODPS SQL ノードを作成できません。 その結果、完全同期の終了を示すために使用されるノードを作成できません。
注意事項
宛先テーブルの列にマップされたソース列がない場合、同期タスクの実行が完了した後、宛先テーブルの列の値は NULL になります。 また、システムが宛先テーブルを作成するときに列にデフォルト値が指定されている場合でも、同期タスクの実行が完了した後、列の値は NULL になります。
サポートされているデータ型
次のデータ型エディションがサポートされています。MaxCompute V1.0 データ型エディション、MaxCompute V2.0 データ型エディション、および Hive 互換データ型エディション。 このセクションでは、各エディションのデータ型のサポート状況について説明します。
MaxCompute V1.0 データ型エディション
データ型 | バッチデータ読み取り用 MaxCompute Reader | バッチデータ書き込み用 MaxCompute Writer | リアルタイムデータ書き込み用 MaxCompute Writer |
BIGINT | サポートされています | サポートされています | サポートされています |
DOUBLE | サポートされています | サポートされています | サポートされています |
DECIMAL | サポートされています | サポートされています | サポートされています |
STRING | サポートされています | サポートされています | サポートされています |
DATETIME | サポートされています | サポートされています | サポートされています |
BOOLEAN | サポートされています | サポートされています | サポートされています |
ARRAY | サポートされています | サポートされています | サポートされています |
MAP | サポートされています | サポートされています | サポートされています |
STRUCT | サポートされています | サポートされています | サポートされています |
MaxCompute V2.0 データ型エディションと Hive 互換データ型エディション
データ型 | バッチデータ読み取り用 MaxCompute Reader | バッチデータ書き込み用 MaxCompute Writer | リアルタイムデータ書き込み用 MaxCompute Writer |
TINYINT | サポートされています | サポートされています | サポートされています |
SMALLINT | サポートされています | サポートされています | サポートされています |
INT | サポートされています | サポートされています | サポートされています |
BIGINT | サポートされています | サポートされています | サポートされています |
BINARY | サポートされています | サポートされています | サポートされています |
FLOAT | サポートされています | サポートされています | サポートされています |
DOUBLE | サポートされています | サポートされています | サポートされています |
DECIMAL(pecision,scale) | サポートされています | サポートされています | サポートされています |
VARCHAR(n) | サポートされています | サポートされています | サポートされています |
CHAR(n) | サポートされていません | サポートされています | サポートされています |
STRING | サポートされています | サポートされています | サポートされています |
DATE | サポートされています | サポートされています | サポートされています |
DATETIME | サポートされています | サポートされています | サポートされています |
TIMESTAMP | サポートされています | サポートされています | サポートされています |
BOOLEAN | サポートされています | サポートされています | サポートされています |
ARRAY | サポートされています | サポートされています | サポートされています |
MAP | サポートされています | サポートされています | サポートされています |
STRUCT | サポートされています | サポートされています | サポートされています |
データ型のマッピング
次の表に、MaxCompute Reader がデータ型を変換する際に基づくデータ型のマッピングを示します。
カテゴリ | Data Integration データ型 | MaxCompute データ型 |
整数 | LONG | BIGINT、INT、TINYINT、および SMALLINT |
ブール値 | BOOLEAN | BOOLEAN |
日付と時刻 | DATE | DATETIME、TIMESTAMP、および DATE |
浮動小数点 | DOUBLE | FLOAT、DOUBLE、および DECIMAL |
バイナリ | BYTES | BINARY |
複合 | STRING | ARRAY、MAP、および STRUCT |
データ変換が失敗した場合、またはデータが宛先に書き込めなかった場合、データはダーティデータと見なされます。 許容されるダーティデータレコードの最大数を指定できます。
データ同期前の準備
MaxCompute テーブルからデータを読み取ったり、MaxCompute テーブルにデータを書き込んだりする前に、ビジネス要件に基づいて関連するプロパティを有効にするかどうかを決定できます。
MaxCompute に接続し、プロジェクトレベルの構成を実行するための権限を取得する
MaxCompute クライアントを起動します。 詳細については、「MaxCompute クライアント(odpscmd)」をご参照ください。
プロジェクトレベルの構成を実行するための権限を取得します。 アカウントに必要な権限があることを確認してください。 プロジェクトオーナーのアカウントを使用して、関連する操作を実行できます。 詳細については、「ロールプランニング」をご参照ください。
ACID セマンティクスを有効にする
プロジェクトオーナーのアカウントを使用して、MaxCompute クライアントで次のコマンドを実行し、原子性、一貫性、分離性、および耐久性(ACID)セマンティクスを有効にすることができます。 ACID セマンティクスの詳細については、「ACID セマンティクス」をご参照ください。
setproject odps.sql.acid.table.enable=true;
(オプション)MaxCompute V2.0 データ型エディションを有効にする
MaxCompute V2.0 データ型エディションで TIMESTAMP データ型を使用する場合は、プロジェクトオーナーのアカウントを使用して、MaxCompute クライアントで次のコマンドを実行し、MaxCompute V2.0 データ型エディションを有効にします。
setproject odps.sql.type.system.odps2=true;
(オプション)アカウントを作成する
MaxCompute プロジェクトをコンピューティングエンジンとしてワークスペースに関連付けると、DataWorks はワークスペースにデフォルトの MaxCompute データソースを生成します。 現在のワークスペースでのデータ同期に、デフォルトの MaxCompute データソースを使用できます。 現在のワークスペースのデフォルトの MaxCompute データソースにあるデータを別のワークスペースから同期する場合は、AccessKey ペアを作成する必要があります。 この方法では、別のワークスペースで MaxCompute データソースを追加および使用するときに、AccessKey ペアを使用してコンピューティングエンジンのデータにアクセスできます。
AccessKey ペアを作成します。 詳細については、「Alibaba Cloud アカウントを準備する」をご参照ください。
MaxCompute データソースを追加します。 詳細については、「MaxCompute データソースを追加する」をご参照ください。
MaxCompute データソースを追加する
MaxCompute データソースの同期タスクを開発する前に、MaxCompute プロジェクトをデータソースとして目的のワークスペースに追加する必要があります。 MaxCompute データソースを追加する方法の詳細については、「MaxCompute データソースを追加する」をご参照ください。
標準モードのワークスペースでは、データソースを分離できます。 開発環境と本番環境のデータソースを個別に追加して、データソースを分離できます。 これにより、データの安全性が保たれます。 詳細については、「開発環境と本番環境でデータソースを分離する」をご参照ください。
Alibaba Cloud が新しいバージョンのデータソース をリリースする前に、システムは、コンピューティングエンジンとしてワークスペースに関連付けた最初の MaxCompute プロジェクトに基づいて、odps_first という名前の MaxCompute データソースを自動的に生成します。 同期タスクを構成するときに MaxCompute データソースを選択すると、同期タスクは、MaxCompute データソースが生成された MaxCompute プロジェクトに基づいてデータを読み取るか、データを書き込みます。
DataWorks コンソール の 管理センター 内の データソース ページに移動して、MaxCompute プロジェクトの名前を表示できます。詳細については、「データソースを管理する」をご参照ください。
データ同期タスクを開発する
同期タスクのエントリポイントと手順については、次の構成ガイドを参照してください。
単一テーブルのデータを同期するためのバッチ同期タスクを構成する
構成手順の詳細については、「コードレス UI を使用してバッチ同期タスクを構成する」および「コードエディタを使用してバッチ同期タスクを構成する」をご参照ください。
コードエディタを使用してバッチ同期タスクを構成するときに構成されるすべてのパラメータと実行されるコードについては、「付録: コードとパラメータ」をご参照ください。
単一テーブルのデータを同期するためのリアルタイム同期タスクを構成する
構成手順の詳細については、「単一テーブルから増分データを同期するためのリアルタイム同期ノードを作成する」および「DataStudio でリアルタイム同期タスクを構成する」をご参照ください。
データベース内のすべてのデータのバッチ同期、データベース内の完全データまたは増分データのリアルタイム同期、およびシャーディングデータベース内のテーブルからのデータのリアルタイム同期を実装するための同期設定を構成する
構成手順の詳細については、「Data Integration で同期タスクを構成する」をご参照ください。
FAQ
MaxCompute テーブルからデータを同期するバッチ同期タスクでソースフィールドを追加するときに注意する必要がある項目は何ですか。
MaxCompute テーブルのパーティションキー列からデータを読み取るようにバッチ同期タスクを構成するにはどうすればよいですか。
MaxCompute テーブルの複数のパーティションからデータを読み取るようにバッチ同期タスクを構成するにはどうすればよいですか。
宛先 MaxCompute テーブルに書き込むフィールドの数がテーブルのフィールドの数よりも多い場合はどうすればよいですか。
タスクの再実行とフェールオーバーのシナリオで MaxCompute に書き込まれるデータのべき等性を確保するにはどうすればよいですか。
Data Integration のその他の一般的な問題については、「Data Integration についての FAQ」をご参照ください。
付録: コードとパラメータ
コードエディタを使用してバッチ同期タスクを構成する
コードエディタを使用してバッチ同期タスクを構成する場合は、統合スクリプト形式の要件に基づいてスクリプトに関連パラメータを構成する必要があります。 詳細については、「コードエディタを使用してバッチ同期タスクを構成する」をご参照ください。 次の情報では、コードエディタを使用してバッチ同期タスクを構成するときにデータソースに構成する必要があるパラメータについて説明します。
MaxCompute Reader のコード
コードを実行する前に、次のコードからコメントを削除する必要があります。
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"odps",// プラグイン名。
"parameter":{
"partition":[],// データを読み取るパーティションの名前。
"isCompress":false,// 圧縮を有効にするかどうかを指定します。
"datasource":"",// データソースの名前。
"column":[// 列の名前。
"id"
],
"where": "",// データをフィルタリングするために使用する WHERE 句。
"enableWhere":false,// WHERE 句を使用してデータをフィルタリングするかどうかを指定します。
"table":""// データを読み取るテーブルの名前。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// 許容されるダーティデータレコードの最大数。
},
"speed":{
"throttle":true,// スロットリングを有効にするかどうかを指定します。 値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。 mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
"concurrent":1, // 並列スレッドの最大数。
"mbps":"12"// 最大伝送速度。 単位: MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
MaxCompute のトンネルエンドポイント を指定する場合は、コードエディタを使用してデータソースを構成できます。 データソースを構成するには、上記のコードの "datasource":"",
をデータソースのパラメータに置き換えます。 次のコードは例を示しています。
"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"*****",
MaxCompute Reader のコードのパラメータ
パラメータ | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。 追加されたデータソースの名前と同じである必要があります。 コードエディタを使用してデータソースを追加できます。 | はい | デフォルト値なし |
table | データを読み取るテーブルの名前。 大文字と小文字は区別されません。 | はい | デフォルト値なし |
partition | データを読み取るパーティションの名前。
たとえば、パーティションテーブル test には、pt=1,ds=hangzhou、pt=1,ds=shanghai、pt=2,ds=hangzhou、および pt=2,ds=beijing の 4 つのパーティションが含まれています。 この場合、次の手順に基づいて partition パラメータを構成できます。
ビジネス要件に基づいて、パーティションからデータを読み取るための他の条件を指定することもできます。
説明 MaxCompute Reader は、 | パーティションテーブルの場合のみ必須です | デフォルト値はありません |
列 | データを読み取る列の名前。たとえば、test テーブルには、id、name、age 列が含まれています。
| はい | デフォルト値なし |
enableWhere | WHERE 句を使用してデータをフィルタリングするかどうかを指定します | いいえ | false |
where | データのフィルタリングに使用する WHERE 句。 | いいえ | デフォルト値なし |
MaxCompute Writer のコード
サンプルコード:
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"odps",// プラグイン名。
"parameter":{
"partition":"",// データを書き込むパーティションの名前。
"truncate":true,// 書き込みルール。
"compress":false,// 圧縮を有効にするかどうかを指定します。
"datasource":"odps_first",// データソースの名前。
"column": [// データを書き込む列の名前。
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"table":""// データを書き込むテーブルの名前。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// 許容されるダーティデータレコードの最大数。
},
"speed":{
"throttle":true,// スロットリングを有効にするかどうかを指定します。 値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。 mbps パラメータは、throttle パラメータが true に設定されている場合にのみ有効になります。
"concurrent":1, // 並列スレッドの最大数。
"mbps":"12"// 最大伝送速度。 単位: MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
MaxCompute のトンネルエンドポイント を指定する場合は、コードエディタでデータソースを構成できます。 データソースを構成するには、上記のコードの "datasource":"",
をデータソースの詳細パラメータに置き換えます。 次のコードは例を示しています。
"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********",
MaxCompute Writer のコードのパラメータ
パラメータ | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。 追加されたデータソースの名前と同じである必要があります。 コードエディタを使用してデータソースを追加できます。 | はい | デフォルト値なし |
table | データを書き込むテーブルの名前。 大文字と小文字は区別されません。 1 つのテーブルのみを指定できます。 | はい | デフォルト値なし |
partition | データを書き込むパーティションの名前。 最終レベルのパーティションを指定する必要があります。 たとえば、3 レベルのパーティションを持つテーブルにデータを書き込む場合は、
| パーティションテーブルの場合のみ必須 | デフォルト値なし |
column | データを書き込む列の名前。 宛先テーブルのすべての列にデータを書き込むには、値を
| はい | デフォルト値なし |
truncate | 書き込み操作のべき等性を確保するには、 MaxCompute Writer は、MaxCompute SQL を使用してデータを削除します。 MaxCompute SQL はデータの原子性を保証できません。 したがって、TRUNCATE 操作はアトミック操作ではありません。 複数の同期タスクが同じ テーブル または パーティション から並行してデータを削除すると、競合が発生する可能性があります。 この問題を防ぐために、複数の DDL ステートメントを同時に実行して同じパーティションにデータを書き込まないようにすることをお勧めします。 事前に並行して実行する必要がある同期タスク用に異なるパーティションを作成できます。 | はい | デフォルト値なし |