MaxCompute データソースはデータハブとして機能します。MaxCompute からのデータの読み取りと MaxCompute へのデータの書き込みを行うための双方向チャネルを提供します。
機能
DataWorks の MaxCompute データソースは、Tunnel エンドポイントアドレスを使用して MaxCompute プロジェクトの Tunnel サービスにアクセスできます。これにより、アップロードとダウンロードを通じてプロジェクトのデータを同期できます。Tunnel サービスを使用したアップロードとダウンロードには、DownloadTable 操作が含まれます。
2023 年 12 月 11 日以降に作成された MaxCompute データソースの場合、DataWorks サービスがアクセスしたい MaxCompute プロジェクトと異なるリージョンにある場合、Tunnel エンドポイントアドレスを使用して MaxCompute プロジェクトから直接データを同期することはできません。このシナリオでは、Cloud Enterprise Network (CEN) インスタンスを購入して、DataWorks サービスと MaxCompute プロジェクトのネットワークを接続する必要があります。ネットワークが接続された後、リージョンをまたいだデータ同期を実行できます。CEN とその関連操作の詳細については、「Cloud Enterprise Network」をご参照ください。
オフライン読み取り
MaxCompute Reader は、パーティションテーブルと標準テーブルからの読み取りをサポートしています。仮想ビューからの読み取りや外部テーブルの同期はサポートしていません。
MaxCompute のパーティションテーブルからオフラインで読み取りを行う場合、パーティションフィールドを直接マッピングすることはできません。パーティションフィールドの値を同期するには、カスタムフィールドを追加し、パーティション名を手動で入力してから、フィールドマッピングを実行する必要があります。
スケジューリングパラメーターを使用してパーティションを指定し、自動的に置き換えることができます。これは、スケジューリング時間に基づいてパーティションを同期する必要があるシナリオで役立ちます。
たとえば、t0 という名前のパーティションテーブルに id と name というフィールドがあるとします。ハッシュパーティションは pt、サブパーティションは ds です。pt=<データタイムスタンプ> かつ ds=hangzhou のパーティションからデータを読み取るには、データソース設定でパーティションの値を pt=${scheduling parameter} および ds=hangzhou として指定する必要があります。その後、フィールドマッピング設定時に id と name フィールドをマッピングできます。
パーティションフィールドをカスタムフィールドとして追加することで、ターゲットテーブルに書き込むことができます。
MaxCompute Reader は、WHERE 句を使用したデータフィルタリングをサポートしています。
オフライン書き込み
MaxCompute Writer は、データに null 値が含まれている場合、VARCHAR 型をサポートしません。
DeltaTableにデータを書き込む場合は、[同期後の可視性] を [はい] に設定します。そうしないと、同時実行数が 1 より大きい場合にタスクが失敗します。
リアルタイム書き込み
リアルタイムデータ同期タスクは、サーバーレスリソースグループ (推奨) とデータ統合専用リソースグループをサポートしています。
リアルタイムデータ同期タスクは、プライマリキーのないテーブルをサポートしていません。
デフォルトの MaxCompute データソース (通常は
odps_first) にリアルタイムで同期する場合、デフォルトで一時的な AccessKey (AK) が使用されます。一時的な AK は 7 日後に有効期限が切れるため、タスクは失敗します。プラットフォームは、一時的な AK の有効期限切れによる失敗を検出すると、タスクを自動的に再起動します。この種のエラーに対してモニタリングアラートを設定している場合は、アラートが届きます。MaxCompute へのワンクリックリアルタイム同期タスクでは、設定した日にのみ完全な履歴データをクエリできます。増分データは、翌日にマージが完了した後にのみ MaxCompute でクエリできます。
MaxCompute へのワンクリックリアルタイム同期タスクは、毎日完全なパーティションを生成します。過剰なデータがストレージリソースを消費するのを防ぐため、このソリューションによって自動的に作成される MaxCompute テーブルのデフォルトのライフサイクルは 30 日です。この期間がビジネス要件を満たさない場合は、タスク設定中に MaxCompute テーブル名をクリックしてライフサイクルを変更できます。
データ統合は、MaxCompute エンジンのデータチャネルを使用してデータのアップロードとダウンロードを行います。データチャネルの Service-Level Agreement (SLA) の詳細については、「Data Transmission Service (アップロード) のシナリオとツール」をご参照ください。MaxCompute エンジンのデータチャネルの SLA に基づいてデータ同期ソリューションを評価する必要があります。
インスタンスモードで MaxCompute へのワンクリックリアルタイム同期を行う場合、データ統合専用リソースグループには少なくとも 8 vCPU と 16 GB のメモリが必要です。
現在のワークスペースと同じリージョンにあるユーザー作成の MaxCompute データソースのみがサポートされます。データソースの接続をテストする際には、リージョンをまたいだ MaxCompute プロジェクトに正常に接続できます。ただし、同期タスクが実行されると、MaxCompute でのテーブル作成フェーズで「engine not found」エラーが発生します。
MaxCompute がデータベース全体の同期先である場合、サポートされる同期方法はテーブルのタイプによって異なります。標準テーブルの場合、データベース全体の完全同期と増分同期 (ほぼリアルタイム) のみがサポートされます。Delta Table の場合、リアルタイム同期とデータベース全体の完全同期と増分同期 (ほぼリアルタイム) の両方がサポートされます。
説明ユーザー作成の MaxCompute データソースを使用する場合でも、DataWorks プロジェクトは MaxCompute エンジンにアタッチされている必要があります。そうしないと、MaxCompute SQL ノードを作成できず、完全同期のための「done」ノードの作成が失敗します。
注意事項
ターゲットテーブルの列がソース列にマッピングされていない場合、テーブル作成時にターゲット列にデフォルト値が指定されていても、同期タスク完了後にその値は null に設定されます。
サポートされるフィールド型
MaxCompute 1.0、MaxCompute 2.0、および Hive 互換のデータ型がサポートされています。以下のセクションでは、各データ型バージョンでサポートされるフィールド型について説明します。
データ型 1.0 でサポートされるフィールド
フィールド型 | オフライン読み取り | オフライン書き込み | リアルタイム書き込み |
BIGINT | サポート対象 | サポート対象 | サポート対象 |
DOUBLE | サポート対象 | サポート対象 | サポート対象 |
DECIMAL | ヘルプとサポート | サポート対象 | サポート対象 |
STRING | サポート対象 | サポート対象 | サポート対象 |
DATETIME | サポート対象 | サポート対象 | サポート対象 |
BOOLEAN | サポート対象 | サポート対象 | サポート対象 |
ARRAY | サポート対象 | サポート対象 | サポート対象 |
MAP | サポート対象 | サポート対象 | サポート対象 |
STRUCT | サポート対象 | サポート対象 | サポート対象 |
データ型 2.0 および Hive 互換データ型でサポートされるフィールド
フィールド型 | オフライン読み取り (MaxCompute Reader) | オフライン書き込み (MaxCompute Writer) | リアルタイム書き込み |
TINYINT | サポート対象 | サポート対象 | サポート対象 |
SMALLINT | サポート対象 | サポート対象 | サポート対象 |
INT | サポート対象 | サポート対象 | サポート対象 |
BIGINT | サポート対象 | サポート対象 | サポート対象 |
BINARY | サポート対象 | サポート対象 | サポート対象 |
FLOAT | サポート対象 | サポート対象 | サポート対象 |
DOUBLE | サポート対象 | サポート対象 | サポート対象 |
DECIMAL(pecision,scale) | サポート対象 | サポート対象 | サポート対象 |
VARCHAR(n) | サポート対象 | サポート対象 | サポート対象 |
CHAR(n) | サポート対象外 | サポート対象 | サポート対象 |
STRING | サポート対象 | サポート対象 | サポート対象 |
DATE | サポート対象 | サポート対象 | サポート対象 |
DATETIME | サポート対象 | サポート対象 | サポート対象 |
TIMESTAMP | サポート対象 | サポート対象 | サポート対象 |
BOOLEAN | サポート対象 | サポート対象 | サポート対象 |
ARRAY | サポート対象 | サポート対象 | サポート対象 |
MAP | サポート対象 | サポート対象 | サポート対象 |
STRUCT | サポート対象 | サポート対象 | サポート対象 |
データ型のマッピング
次の表に、MaxCompute Reader のデータ型のマッピングを示します。
型カテゴリ | データ統合の型 | データベースのデータ型 |
整数 | LONG | BIGINT、INT、TINYINT、SMALLINT |
ブール値 | BOOLEAN | BOOLEAN |
日付と時刻 | DATE | DATETIME、TIMESTAMP、DATE |
浮動小数点 | DOUBLE | FLOAT、DOUBLE、DECIMAL |
バイナリ | BYTES | BINARY |
複合 | STRING | ARRAY、MAP、STRUCT |
データ変換に失敗した場合、または宛先データソースへのデータ書き込みに失敗した場合、そのデータはダーティデータと見なされます。これは、ダーティデータのレコードしきい値を使用して処理できます。
前提条件
MaxCompute テーブルからデータを読み書きする前に、必要なプロパティを有効にすることができます。
MaxCompute への接続とプロジェクトレベル設定の有効化
MaxCompute クライアントにログインします。詳細については、「ローカルクライアント (odpscmd) を使用した接続」をご参照ください。
MaxCompute のプロジェクトレベルの設定を有効にします。必要な権限があることを確認してください。Project Owner アカウントを使用してこれらの操作を実行できます。MaxCompute の権限の詳細については、「ロール計画」をご参照ください。
ACID 属性の有効化
Project Owner アカウントを使用してクライアントで次のコマンドを実行し、ACID 属性を有効にすることができます。MaxCompute の ACID セマンティクスの詳細については、「ACID セマンティクス」をご参照ください。
setproject odps.sql.acid.table.enable=true;(任意) データ型 2.0 の有効化
MaxCompute データ型 2.0 の timestamp 型を使用するには、Project Owner アカウントを使用してクライアントで次のコマンドを実行し、データ型 2.0 を有効にします。
setproject odps.sql.type.system.odps2=true;(任意) 権限の付与
MaxCompute 計算リソースをワークスペースにアタッチすると、デフォルトで DataWorks に MaxCompute データソースが作成されます。このデータソースを使用して、現在のワークスペース内でデータを同期できます。別のワークスペースでこの MaxCompute データソースからデータを同期するには、別のワークスペースのデータソースに指定されたアクセスアカウントが、この MaxCompute プロジェクトへのアクセス権限を持っていることを確認する必要があります。クロスアカウント認証の詳細については、「クロスアカウント認証 (MaxCompute および Hologres)」をご参照ください。
MaxCompute データソースの作成
データ同期タスクを開発する前に、DataWorks で MaxCompute プロジェクト用の MaxCompute データソースを作成する必要があります。MaxCompute データソースの作成方法の詳細については、「MaxCompute 計算リソースのアタッチ」をご参照ください。
標準モードのワークスペースは、データソースの分離をサポートしています。開発環境と本番環境のデータソースを追加して分離し、データのセキュリティを確保できます。詳細については、「開発環境と本番環境でのデータソースの分離」をご参照ください。
ワークスペース内の odps_first という名前の MaxCompute データソースが [データソース] ページで手動で作成されたものでない場合、それはデータソース機能が更新される前にワークスペースにアタッチされた最初の MaxCompute エンジンのために作成されたデフォルトのデータソースです。データ同期にこのデータソースを選択した場合、その MaxCompute エンジンのプロジェクトからデータを読み書きすることになります。
データソース設定ページでデータソースが使用する MaxCompute プロジェクトの名前を表示して、データがどのプロジェクトから読み書きされるかを確認できます。詳細については、「データソース管理」をご参照ください。
データ同期タスクの開発
同期タスクの設定のエントリポイントと手順については、次の設定ガイドをご参照ください。
単一テーブルのオフライン同期タスクの設定
手順については、「コードレス UI でのタスクの設定」および「コードエディタでのタスクの設定」をご参照ください。
コードエディタでの設定に関するすべてのパラメーターとスクリプトデモについては、「付録:スクリプトデモとパラメーターの説明」をご参照ください。
単一テーブルのリアルタイム同期タスクの設定
手順については、「データ統合でのリアルタイム同期タスクの設定」および「DataStudio でのリアルタイム同期タスクの設定」をご参照ください。
データベース全体の同期タスクの設定
手順については、「データベース全体のオフライン同期」、「データベース全体のリアルタイム同期」、および「データベース全体の完全同期と増分同期 (ほぼリアルタイム)」をご参照ください。
よくある質問
データ統合に関するその他のよくある質問については、「データ統合」をご参照ください。
付録:スクリプトデモとパラメーターの説明
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスクの設定」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。
Reader スクリプトデモ
タスクを実行する際は、次のコードからコメントを削除してください。
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"odps",// プラグイン名。
"parameter":{
"partition":[],// データを読み取るパーティション。
"isCompress":false,// データを圧縮するかどうかを指定します。
"datasource":"",// データソース。
"column":[// ソーステーブルの列。
"id"
],
"where": "",// WHERE 句を使用してデータをフィルタリングする場合は、具体的な句を入力します。
"enableWhere":false,// WHERE 句を使用してデータをフィルタリングするかどうかを指定します。
"table":""// テーブル名。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// エラーレコード数。
},
"speed":{
"throttle":true,// throttle が false に設定されている場合、mbps パラメーターは有効にならず、データレートは制限されません。throttle が true に設定されている場合、データレートは制限されます。
"concurrent":1, // 同時実行ジョブ数。
"mbps":"12"// データレートの制限。1 mbps = 1 MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}MaxCompute の Tunnel エンドポイントを指定するには、コードエディタでデータソースを手動で設定できます。上記の例の "datasource":"", を特定のデータソースパラメーターに置き換えます。次のコードは例です。
"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"*****", Reader スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加したデータソースの名前と同じである必要があります。 | はい | なし |
table | ソーステーブルの名前。名前は大文字と小文字を区別しません。 | はい | なし |
partition | データを読み取るパーティション。
たとえば、test という名前のパーティションテーブルに pt=1,ds=hangzhou、pt=1,ds=shanghai、pt=2,ds=hangzhou、pt=2,ds=beijing の 4 つのパーティションがあるとします。次の例は、データを読み取るためのパーティションの設定方法を示しています。
要件に応じて、条件を設定してパーティションデータを取得することもできます。
説明
| パーティションテーブルでは必須です。標準テーブルではこのパラメーターを指定しないでください。 | なし |
column | MaxCompute ソーステーブルから読み取る列。たとえば、test という名前のテーブルに id、name、age というフィールドがあるとします。
| はい | なし |
enableWhere | WHERE 句を使用してデータをフィルタリングするかどうかを指定します。 | いいえ | false |
where | WHERE 句を使用してデータをフィルタリングする場合は、具体的な句を入力します。 | いいえ | なし |
Writer スクリプトデモ
次のコードは、サンプルスクリプト設定です。
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"odps",// プラグイン名。
"parameter":{
"partition":"",// パーティション情報。
"truncate":true,// クリーンアップルール。
"compress":false,// データを圧縮するかどうかを指定します。
"datasource":"odps_first",// データソース名。
"column": [// ソース列名。
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"table":""// テーブル名。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// エラーレコード数。これは許容されるダーティデータの最大レコード数です。
},
"speed":{
"throttle":true,// throttle が false に設定されている場合、mbps パラメーターは有効にならず、データレートは制限されません。throttle が true に設定されている場合、データレートは制限されます。
"concurrent":1, // 同時実行ジョブ数。
"mbps":"12"// データレートの制限。1 mbps = 1 MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}MaxCompute の Tunnel エンドポイントを指定するには、コードエディタでデータソースを手動で設定できます。上記の例の "datasource":"", を特定のデータソースパラメーターに置き換えます。次のコードは例です。
"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********", Writer スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加したデータソースの名前と同じである必要があります。 | はい | なし |
table | 宛先テーブルの名前。名前は大文字と小文字を区別しません。複数のテーブルを指定することはできません。 | はい | なし |
partition | 宛先テーブルのパーティション情報。最下位レベルのパーティションを指定する必要があります。たとえば、3 階層のパーティションテーブルにデータを書き込むには、
| パーティションテーブルでは必須です。標準テーブルではこのパラメーターを指定しないでください。 | なし |
column | インポートするフィールドのリスト。すべてのフィールドをインポートするには、パラメーターを
| はい | なし |
truncate |
データクリーンアップは MaxCompute SQL を使用して実行され、原子性が保証されないため、truncate オプションはアトミックな操作ではありません。複数のタスクが同じ Table または Partition のパーティションを同時にクリアすると、同時実行のタイミングの問題が発生する可能性があります。この点には十分注意してください。 このような問題を回避するには、同じパーティションで複数の DDL ジョブを同時に実行しないようにするか、複数の同時実行ジョブを開始する前にパーティションを作成します。 | はい | なし |
emptyAsNull | 書き込み前に空の文字列を NULL に変換するかどうかを指定します。 | いいえ | false |
consistencyCommit | 同期後の可視性。
| いいえ | false |