MaxCompute データソースは、MaxCompute からのデータの読み取りや MaxCompute へのデータの書き込みを行うためのデータハブとして機能します。
機能
DataWorks の MaxCompute データソースは、Tunnel エンドポイントアドレスを使用して、対応する MaxCompute プロジェクトの Tunnel サービスにアクセスします。この接続により、DownloadTable 操作を使用するアップロードやダウンロードなどのデータ同期が可能になります。
2023 年 12 月 11 日以降に作成された MaxCompute データソースの場合、データソースが存在する DataWorks サービスがターゲットの MaxCompute プロジェクトと異なるリージョンにある場合、Tunnel エンドポイントアドレスを使用して直接データを同期することはできません。このシナリオでクロスリージョン同期を実行するには、まず Cloud Enterprise Network (CEN) を使用して 2 つのサービスのネットワークを接続する必要があります。CEN とその操作の詳細については、「CEN とは」をご参照ください。
バッチ読み取り
MaxCompute Reader は、パーティションテーブルと非パーティション化テーブルからのデータ読み取りをサポートします。仮想ビューからのデータ読み取りや外部テーブルの同期はサポートしていません。
MaxCompute のパーティションテーブルからバッチでデータを読み取る場合、パーティション列を直接マッピングすることはできません。パーティション列の値を同期するには、カスタム列を追加し、マッピングのためにパーティション名を手動で入力します。
スケジューリングパラメーターを使用してパーティションを指定し、自動置換を行うことができます。これは、スケジューリング時間に対応するパーティションからデータを同期する必要があるシナリオで役立ちます。
たとえば、列
idとname、レベル 1 のパーティションpt、およびレベル 2 のパーティションdsを持つテーブルt0を考えます。パーティションpt=<ビジネス日付>およびds=hangzhouからデータを読み取るには、データソースを設定する際にパーティション値をpt=${スケジューリングパラメーター}およびds=hangzhouとして指定する必要があります。その後、idとnameの列をマッピングできます。パーティション列を送信先テーブルに書き込むには、それらをカスタム列として追加します。
MaxCompute Reader は、WHERE 句を使用したデータフィルタリングをサポートします。
バッチ書き込み
データに NULL 値が含まれる場合、MaxCompute Writer は VARCHAR データ型をサポートしていません。
送信先テーブルが
DeltaTableの場合、[詳細設定] を展開し、[同期後の可視性] を [はい] に設定します。そうしないと、同時実行数が 1 より大きいシナリオでタスクが失敗します。MaxCompute 外部テーブルへのデータ書き込みはサポートされていません。
送信先列がソース列からマッピングされていない場合、同期後に NULL になり、テーブル作成時に指定されたデフォルト値を上書きします。
リアルタイム書き込み
リアルタイムデータ同期タスクは、サーバーレスリソースグループをサポートします。
リアルタイム同期の送信先テーブルには、プライマリキーが必要です。
MaxCompute 外部テーブルへのデータ書き込みはサポートされていません。
通常
odps_firstであるデフォルトの MaxCompute データソースにリアルタイムでデータを同期する場合、デフォルトでは一時的な Access Key (AK) が同期に使用されます。この一時的な AK は 7 日後に自動的に有効期限切れになり、タスクが失敗します。プラットフォームは、一時的な AK が原因で失敗したことを検出すると、タスクを自動的に再起動します。タスクにモニタリングアラートを設定している場合は、アラートメッセージが届きます。ワンクリックリアルタイム同期タスクの場合、既存データは設定日にクエリできます。増分データは、翌日にマージされた後に利用可能になります。
MaxCompute へのワンクリックリアルタイム同期タスクは、毎日フルパーティションを作成します。過剰なストレージ使用量を防ぐため、自動的に作成される MaxCompute テーブルにはデフォルトで 30 日のライフサイクルが設定されています。この持続時間がビジネス要件を満たさない場合は、タスク設定中に MaxCompute テーブル名をクリックしてライフサイクルを変更できます。
Data Integration は、MaxCompute エンジンのデータ同期チャネルを使用してデータのアップロードとダウンロードを行います。このチャネルのサービスレベルアグリーメント (SLA) の詳細については、「データアップロードのシナリオとツール」をご参照ください。チャネルの SLA に基づいてデータ同期ソリューションを評価する必要があります。
インスタンスモードで MaxCompute へのワンクリックリアルタイム同期を行う場合、データ統合専用リソースグループには、最低でも 8 コア 16 GB の仕様が必要です。
現在のワークスペースと同じリージョンにあるカスタム MaxCompute データソースのみを使用できます。クロスリージョンのデータソースの場合、接続性テストは成功する可能性がありますが、MaxCompute でのテーブル作成フェーズ中に「engine not found」エラーで同期タスクが失敗します。
MaxCompute がデータベース全体の同期の送信先として機能する場合、標準テーブルはリアルタイムデータベース全体の同期の増分ログモードとワンクリックリアルタイム完全増分同期のみをサポートします。対照的に、Delta Table はリアルタイムデータベース全体の同期とワンクリックリアルタイム完全増分同期の両方をサポートします。
説明カスタム MaxCompute データソースを使用する場合でも、DataWorks プロジェクトは MaxCompute エンジンに関連付けられている必要があります。そうしないと、MaxCompute SQL ノードを作成できず、その結果、完全同期完了マーカーノードが失敗します。
サポートされるデータ型
MaxCompute 1.0、2.0、および Hive 互換のデータ型がサポートされています。以下のセクションでは、各データ型バージョンでサポートされる列の型について説明します。
データ型 1.0
型 | バッチ読み取り | バッチ書き込み | リアルタイム書き込み |
BIGINT | サポート | サポート | サポート |
DOUBLE | サポート | サポート | サポート |
DECIMAL | サポート | サポート | サポート |
STRING | サポート | サポート | サポート |
DATETIME | サポート | サポート | サポート |
BOOLEAN | サポート | サポート | サポート |
ARRAY | サポート | サポート | サポート |
MAP | サポート | サポート | サポート |
STRUCT | サポート | サポート | サポート |
データ型 2.0 および Hive 互換のデータ型
型 | バッチ読み取り | バッチ書き込み | リアルタイム書き込み |
TINYINT | サポート | サポート | サポート |
SMALLINT | サポート | サポート | サポート |
INT | サポート | サポート | サポート |
BIGINT | サポート | サポート | サポート |
BINARY | サポート | サポート | サポート |
FLOAT | サポート | サポート | サポート |
DOUBLE | サポート | サポート | サポート |
DECIMAL(precision,scale) | サポート | サポート | サポート |
VARCHAR(n) | サポート | サポート | サポート |
CHAR(n) | 非サポート | サポート | サポート |
STRING | サポート | サポート | サポート |
DATE | サポート | サポート | サポート |
DATETIME | サポート | サポート | サポート |
TIMESTAMP | サポート | サポート | サポート |
BOOLEAN | サポート | サポート | サポート |
ARRAY | サポート | サポート | サポート |
MAP | サポート | サポート | サポート |
STRUCT | サポート | サポート | サポート |
データ型マッピング
次の表に、MaxCompute Reader のデータ型マッピングを示します。
カテゴリ | Data Integration の型 | データベースの型 |
整数 | LONG | BIGINT、INT、TINYINT、および SMALLINT |
ブール値 | BOOLEAN | BOOLEAN |
日付と時刻 | DATE | DATETIME、TIMESTAMP、および DATE |
浮動小数点 | DOUBLE | FLOAT、DOUBLE、および DECIMAL |
バイナリ | BYTES | BINARY |
複合 | STRING | ARRAY、MAP、および STRUCT |
データ型の変換に失敗した場合、またはデータを送信先に書き込めない場合、それはダーティデータとして分類されます。この機能は、ダーティデータのしきい値と組み合わせて使用できます。
前提条件
MaxCompute テーブルからデータを読み取る、またはテーブルにデータを書き込む前に、特定のプロパティを有効にする必要がある場合があります。
MaxCompute への接続とプロジェクト設定の有効化
MaxCompute クライアントにログインします。詳細については、「ローカルクライアント (odpscmd) を使用した接続」をご参照ください。
MaxCompute のプロジェクトレベルの設定を有効にします。必要な権限があることを確認してください。Project Owner ロールを持つアカウントを使用して、これらの操作を実行できます。MaxCompute の権限の詳細については、「ロール計画」をご参照ください。
ACID セマンティクスの有効化
Project Owner ロールを持つアカウントを使用して、クライアントで次のコマンドを実行し、ACID プロパティを有効にできます。MaxCompute の ACID セマンティクスの詳細については、「ACID セマンティクス」をご参照ください。
setproject odps.sql.acid.table.enable=true;(オプション) データ型 2.0 の有効化
TIMESTAMP データ型を使用するには、MaxCompute データ型 2.0 を有効にする必要があります。Project Owner ロールを持つアカウントを使用して、次のコマンドを実行します。
setproject odps.sql.type.system.odps2=true;(オプション) アカウント権限の付与
MaxCompute コンピューティングリソースをワークスペースに関連付けると、デフォルトで DataWorks に MaxCompute データソースが作成されます。このデータソースは、現在のワークスペース内でのデータ同期に使用できます。このデータソースを別のワークスペースから使用するには、他のワークスペースで設定されたアクセスアカウントが、元の MaxCompute プロジェクトに対する必要な権限を持っている必要があります。クロスアカウント認証の詳細については、「クロスアカウント認証 (MaxCompute および Hologres)」をご参照ください。
MaxCompute データソースの作成
データ同期タスクを開発する前に、MaxCompute プロジェクトを DataWorks の MaxCompute データソースとして追加する必要があります。手順については、「MaxCompute コンピューティングリソースの関連付け」をご参照ください。
標準モードのワークスペースは、データソースの隔離をサポートしています。開発環境と本番環境のデータソースを追加して隔離し、データを保護できます。詳細については、「開発環境と本番環境でのデータソースの隔離」をご参照ください。
ワークスペース内の odps_first という名前の MaxCompute データソースがデータソースページで手動で作成されなかった場合、それは新バージョンのデータソースがリリースされる前にワークスペースに関連付けた最初の MaxCompute エンジンに対してデフォルトで作成されたものです。データ同期のためにこのデータソースを選択すると、その特定の MaxCompute エンジンプロジェクトからデータを読み取るか、またはデータを書き込むことになります。
データソースが使用する MaxCompute プロジェクトの名前をデータソース設定ページで表示して、読み取りおよび書き込み操作の最終的なプロジェクトを確認できます。詳細については、「データソース管理」をご参照ください。
データ同期タスク
同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。
単一テーブルのバッチ同期
手順については、「コードレス UI の使用」および「コードエディタの使用」をご参照ください。
コードエディタのパラメーターの完全なリストとコード例については、「付録:コードとパラメーター」をご参照ください。
単一テーブルのリアルタイム同期
手順については、「単一テーブルから増分データを同期するためのリアルタイム同期ノードの作成」および「DataStudio でのリアルタイム同期タスクの設定」をご参照ください。
データベース全体の同期
手順については、「バッチデータベース全体の同期タスク」、「リアルタイムデータベース全体の同期タスクの設定」、および「ワンクリックリアルタイム完全増分同期」をご参照ください。
よくある質問
MaxCompute (ODPS) テーブルからデータを読み取る際に、ソーステーブルの列マッピングに「行の追加」または「フィールドの追加」を使用する場合に知っておくべきことは何ですか?
MaxCompute (ODPS) テーブルからデータを読み取る際に、パーティション列を同期するにはどうすればよいですか?
MaxCompute (ODPS) テーブルからデータを読み取る際に、複数のパーティションからデータを同期するにはどうすればよいですか?
Data Integration に関するその他のよくある質問については、「Data Integration に関するよくある質問」をご参照ください。
付録:コードとパラメーター
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下の情報は、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明しています。
Reader スクリプトのデモ
タスクを実行する際は、コードからコメントを削除してください。
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"odps",// プラグイン名。
"parameter":{
"partition":[],// データを読み取るパーティション。
"isCompress":false,// データを圧縮するかどうかを指定します。
"datasource":"",// データソース。
"column":[// ソーステーブルの列。
"id"
],
"where": "",// データフィルタリングのための WHERE 句の内容。
"enableWhere":false,// データフィルタリングに WHERE 句を使用するかどうかを指定します。
"table":""// テーブル名。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// エラーレコード数。
},
"speed":{
"throttle":true,// throttle を false に設定すると、mbps パラメーターは有効にならず、データ同期速度は制限されません。throttle を true に設定すると、データ同期速度は制限されます。
"concurrent":1, // 並行スレッド数。
"mbps":"12"// 最大転送速度。単位は MB/s です。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}MaxCompute の Tunnel エンドポイントを指定する必要がある場合は、コードエディタでデータソースを手動で設定できます。上記の例の "datasource":"", をデータソースの特定のパラメーターに置き換えます。以下に例を示します。
"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"*****", Reader スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加されたデータソースの名前と同じでなければなりません。 | はい | なし |
table | ソーステーブルの名前。値は大文字と小文字を区別しません。 | はい | なし |
partition | データを読み取るパーティション。
たとえば、パーティションテーブル test には、pt=1,ds=hangzhou、pt=1,ds=shanghai、pt=2,ds=hangzhou、および pt=2,ds=beijing の 4 つのパーティションが含まれています。異なるパーティションからデータを読み取るための設定は次のとおりです。
条件を設定してパーティションからデータを取得することもできます。
説明
| このパラメーターはパーティションテーブルに必須です。非パーティション化テーブルにはこのパラメーターを設定しないでください。 | なし |
column | MaxCompute ソーステーブルから列情報を読み取ります。たとえば、テーブル test の列は id、name、および age です。
| はい | なし |
enableWhere | データフィルタリングに WHERE 句を使用するかどうかを指定します。 | いいえ | false |
where | データフィルタリングのための WHERE 句の内容。 | いいえ | なし |
Writer スクリプトのデモ
次のコードはスクリプトの例です。
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"odps",// プラグイン名。
"parameter":{
"partition":"",// パーティション情報。
"truncate":true,// クリーンアップルール。
"compress":false,// データを圧縮するかどうかを指定します。
"datasource":"odps_first",// データソース名。
"column": [// ソース列名。
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"table":""// テーブル名。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// エラーレコード数。これは許容されるダーティデータレコードの最大数を指定します。
},
"speed":{
"throttle":true,// throttle を false に設定すると、mbps パラメーターは有効にならず、データ同期速度は制限されません。throttle を true に設定すると、データ同期速度は制限されます。
"concurrent":1, // 並行スレッド数。
"mbps":"12"// 最大転送速度。単位は MB/s です。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}MaxCompute の Tunnel エンドポイントを指定する必要がある場合は、コードエディタでデータソースを手動で設定できます。上記の例の "datasource":"", をデータソースの特定のパラメーターに置き換えます。以下に例を示します。
"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********", Writer スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加されたデータソースの名前と同じでなければなりません。 | はい | なし |
table | 送信先テーブルの名前。値は大文字と小文字を区別しません。1 つのテーブルのみ指定できます。 | はい | なし |
partition | パーティションテーブルにデータを書き込むには、最後のレベルまでパーティション情報を指定する必要があります。たとえば、3 レベルのパーティションテーブルにデータを書き込むには、
| このパラメーターはパーティションテーブルに必須です。非パーティション化テーブルにはこのパラメーターを設定しないでください。 | なし |
column | インポートする列のリスト。すべての列をインポートするには、このパラメーターを
| はい | なし |
truncate |
truncate オプションは、データクレンジングに MaxCompute SQL を使用するため、アトミック操作ではありません。MaxCompute SQL は原子性を保証しないためです。複数のタスクが同時に同じ テーブルまたは パーティションをクリーンアップしようとすると、同時実行の問題が発生する可能性があることに注意してください。 これを避けるには、複数の同時ジョブから同じパーティションに対して DDL 操作を実行しないでください。または、同時ジョブが開始される前にパーティションを作成してください。 | はい | なし |
emptyAsNull | 書き込む前に空の文字列を NULL に変換するかどうかを指定します。 | いいえ | false |
consistencyCommit | 同期後の可視性。
| いいえ | false |