このトピックでは、LHM 移行ツールを使用して、スケジューリングワークフローを Azure Data Factory (ADF) から DataWorks に移行する方法について説明します。このプロセスには、ADF からのタスクのエクスポート、スケジューリングタスクの変換、DataWorks へのタスクのインポートの 3 つのステップが含まれます。
1. 環境の準備
1.1. ランタイム環境の準備
No. | 項目 | 仕様 | 数量 | 注 |
1 | ECS | 4 コア 16 GB 以上 | 1 | CentOS または AliyunOS イメージがサポートされています。 |
2 | JDK 17 | |||
3 | ランタイムパッケージ |
1.2. ネットワークの準備
Elastic Compute Service (ECS) インスタンスを含む VPC は、パブリックネットワークにアクセスできる必要があります。
1.3. アカウントと権限の準備
1.3.1. サービス登録認証の使用
アプリケーションを登録します。
クライアントシークレットを取得します。
1.3.2. Azure Data Factory の権限設定
1.3.3. BLOB 権限の設定
このステップはオプションです。ノード内の BLOB からファイルコンテンツを読み取るには、その BLOB に権限を付与する必要があります。ADF のサービス登録に権限を付与したのと同じ方法を使用できます。
1.3.4. Databricks 権限の設定
このステップはオプションです。ADF の Databricks ノードの外部スクリプトファイルを読み取る場合にのみ必要です。
Databricks ワークベンチに移動します。
[User Settings] をクリックします。
左側で、[Developer] ページに移動します。
[Generate new token] をクリックします。
2. Azure Data Factory からのタスクのエクスポート
タスクは、ソフトウェア開発キット (SDK) を使用してエクスポートできます。依存関係は次のとおりです:
<dependency>
<groupId>com.azure.resourcemanager</groupId>
<artifactId>azure-resourcemanager-datafactory</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>databricks-sdk-java</artifactId>
</dependency>2.1. 設定ファイル
{
"schedule_datasource": {
"name": "name",
"type": "adf",
"properties": {
"isUseProxy": false,
"proxyHost": "proxy.msl.cn",
"proxyPort": "8080",
"AzureCloud": "AZURE_CHINA_CLOUD",
"apiMode": "sdk"
"endpoint": "https://management.azure.com",
"subscriptionId": "xxx",
"factory": "bigdata-adf-jiman",
"project": "bigdata-adf-jiman",
"resourceGroupName": "biadata",
"tenantId": "xxx",
"clientId": "xxx",
"clientSecretValue": "xxxx",
"dbr_endpoint": "https://adb-xxxxx.16.azuredatabricks.net",
"dbr_token": "xxxxx",
"pipelineNameWhite": "dbr-demo"
},
"operaterType": "AUTO"
}
}2.2. パラメーターの説明
No. | パラメーター | 必須 | 例 | 注 |
1 | isUseProxy | いいえ | false | プロキシモードで Azure にアクセスする場合に必要です。 |
2 | proxyHost | いいえ | proxy.msl.cn | プロキシモードで Azure にアクセスする場合に必要です。 |
3 | proxyPort | いいえ | 8080 | プロキシモードで Azure にアクセスする場合に必要です。 |
4 | AzureCloud | いいえ | AZURE_CHINA_CLOUD | 有効な値を以下に示します。 有効な値を以下に示します。これらは異なるリージョンに対応しています。デフォルト値は AZURE_CHINA_CLOUD です。
|
5 | endpoint | いいえ | https://portal.azure.com | このパラメーターは AzureCloud に関連付けられており、オプションです。 |
6 | subscriptionId | はい | xxxxx | サブスクリプション ID。ADF ホームページの概要から取得できます。 |
7 | resourceGroupName | はい | xxxxx | リソースグループの名前。ADF ホームページの概要から取得できます。 |
8 | factory | はい | bigdata-adf-jiman | 識別子として使用されます。 |
9 | project | いいえ | bigdata-adf-jiman | このパラメーターは factory パラメーターと一致させてください。 |
10 | tenantId | はい | xxxxx | サービス登録のステップで取得できます。 |
11 | clientId | はい | xxxxx | サービス登録のステップで取得できます。 |
12 | clientSecretValue | はい | xxxxx | サービス登録のステップで取得できます。 |
13 | dbr_endpoint | いいえ | xxxxx | Databricks ホームページから取得します。 |
14 | dbr_token | いいえ | xxxxx | Databricks ホームページから取得します。 |
15 | pipelineNameWhite | いいえ | dbr-demo | ホワイトリスト。複数のパイプライン名を指定するには、カンマ (,) で区切ります。 |
2.3. エクスポートコマンドの実行
mkdir result
sh ./bin/run.sh read \
-c ./conf/<your_config_file>.json \
-o ./data/1_ReaderOutput/<source_export_package>.zip \
-t adf-readerパラメーターの説明
No. | パラメーター | 必須 | 例 | 注 |
1 | -c | はい | ./conf/<your_config_file>.json | 設定ファイルのパス。 |
2 | -o | はい | ./data/1_ReaderOutput/<source_export_package>.zip | 出力パス。 |
3 | -t | はい | adf-reader | プラグインタイプ。これは静的フィールドです。 |
完全なコマンドの例
mkdir result
sh ./bin/run.sh read -c ./conf/read.json -f ./result/temp.zip -o ./result/read_out.zip -t adf-reader2.4. エクスポート結果の表示
生成された ReaderOutput.zip パッケージを ./data/1_ReaderOutput/ ディレクトリで開くと、エクスポート結果をプレビューできます。統計レポートには、ワークフロー、ノード、リソース、関数、データソースに関する基本情報の概要が記載されています。data/project フォルダには、スケジューリング情報の標準化されたデータ構造が含まれています。
統計レポートには、2 つの特別な機能があります:
1. レポート内でワークフローとノードのプロパティを変更できます。編集可能なフィールドは青いフォントで示されます。タスクを DataWorks にインポートする際、ツールはこれらのプロパティ変更を適用します。
2. ワークフローの子テーブルで対応する行を削除することで、DataWorks へのインポート時に特定のワークフローをスキップできます。この機能はワークフローのブラックリストとして機能します。注意:ワークフロー間に依存関係がある場合、それらは同じバッチでインポートする必要があります。ブラックリストを使用してそれらを分離しないでください。エラーの原因となります。
詳細については、「スケジューリング移行における概要レポートを使用したスケジューリングプロパティの追加または変更」をご参照ください。
3. スケジューリングタスクの変換
3.1. 設定ファイル
{
"conf": {
"locale": "zh_CN"
},
"self": {
"if.use.default.convert": false,
"if.use.migrationx.before": false,
"if.use.dataworks.newidea": true,
"filter.rule": []
},
"schedule_datasource": {
"name": "adf",
"type": "Adf"
},
"target_schedule_datasource": {
"name": "name",
"type": "DataWorks"
}
}3.2. パラメーターの説明
デフォルト設定で十分です。
No. | パラメーター | 必須 | 例 | 注 |
1 | filter.rule | いいえ | [ { "type": "black", "element": "node", "field": "name", "value": "DataXNode" } ] | フィルター規則。このパラメーターはオプションです。 type はブラックリストまたはホワイトリストを指定します。有効な値:BLACK と WHITE。 element はフィルターする要素のタイプを指定します。サポートされているタイプ:NODE と WORKFLOW。 field は名前でフィルターするか ID でフィルターするかを指定します。有効な値:ID と NAME。 value は値を指定します。複数の値を指定するには、カンマ (,) で区切ります。 |
schedule_datasource.name はデータソースを識別し、すべてのワークフローのルートディレクトリに対応します。
3.3. 組み込みノードの変換とマッピングロジック
3.3.1. 一般的なノードマッピングロジック
ADF ノードタイプ | DataWorks ノードタイプ | 注 |
Copy | DI | 現在、元の JSON スクリプトのみが利用可能です。Data Integration (DI) ノードの具体的な内容は、データソースのタイプに依存します。変換を実行するには、すべての DI 関連のデータソースタイプに関する情報が必要です。 |
Delete | DIDE_SHELL | 削除対象のファイルパスを連結するシェルスクリプト。 |
SqlServerStoredProcedure | ODPS-SQL | デフォルトは ODPS ノードです。ロジックが SQL Server であると判断した場合、ノードタイプは Sql Server になります。 |
DatabricksSparkJar | ODPS-SPARK | ODPS Spark ノードは既存のパラメーターを使用して変換されます。一部の情報が欠落する可能性があります。 |
DatabricksSparkPython | ODPS-SPARK | ODPS Spark ノードは既存のパラメーターを使用して変換されます。一部の情報が欠落する可能性があります。 |
DatabricksNotebook | NOTEBOOK | Scala 関連のセルは処理されません。 |
SynapseNotebook | NOTEBOOK | Scala 関連のセルは処理されません。 |
SparkJob | ODPS-SPARK | 一時的に仮想ノードに変換されます。 |
AppendVariable | CONTROLLER_ASSIGNMENT | 割り当てノード。 |
ExecutePipeline | SUB_PROCESS | |
Script | ODPS_SQL | |
Wait | DIDE_SHELL | |
WebActivity | DIDE_SHELL | |
IfCondition | CONTROLLER_BRANCH | ブランチノード、サブプロセス、マージノードに変換されます。 |
FOREACH | CONTROLLER_TRAVERSE | Foreach |
Switch | CONTROLLER_BRANCH | ブランチノード、サブプロセス、マージノードに変換されます。 |
Until | CONTROLLER_CYCLE | do-while ノード。 |
Lookup | CONTROLLER_ASSIGNMENT | |
Filter | CONTROLLER_ASSIGNMENT | |
GetMeta | CONTROLLER_ASSIGNMENT | |
SetVariable | CONTROLLER_ASSIGNMENT | |
HDInsightHive | ODPS-SQL | |
HDInsightSpark | ODPS_Spark | |
HDInsightMapReduce | ODPS_MR | |
その他 | VIRTUAL |
注意:リソースファイルは手動でアップロードする必要があります。
3.3.2. 論理ノードマッピングロジック
ifcondition
foreach
switch
3.3.3. 変換コマンドの実行
sh ./bin/run.sh convert \
-c ./conf/<your_config_file>.json \
-f ./data/1_ReaderOutput/<source_export_package>.zip \
-o ./data/2_ConverterOutput/<conversion_output_package>.zip \
-t wedata-dw-converterNo. | パラメーター | 必須 | 例 | 注 |
1 | -c | はい | ./conf/<your_config_file>.json | 設定ファイルのパス。 |
2 | -f | はい | ./data/1_ReaderOutput/<source_export_package>.zip | ソースからエクスポートされたパッケージ。 |
3 | -o | はい | ./result/convert_out.zip | 出力パッケージのパス。 |
4 | -t | はい | adf-dw-converter | プラグインタイプ。これは静的フィールドです。 |
3.3.4. 変換結果の表示
生成された ConverterOutput.zip パッケージを ./data/2_ConverterOutput/ ディレクトリで開くと、変換結果をプレビューできます。
統計レポートには、変換されたワークフロー、ノード、リソース、関数、データソースに関する基本情報の概要が記載されています。
data/project フォルダには、メインの変換済みスケジューリング移行パッケージが含まれています。
統計レポートには、2 つの特別な機能があります:
1. レポート内でワークフローとノードのプロパティを変更できます。編集可能なフィールドは青いフォントで示されます。タスクを DataWorks にインポートする際、ツールはこれらのプロパティ変更を適用します。
2. ワークフローの子テーブルで対応する行を削除することで、DataWorks へのインポート時に特定のワークフローをスキップできます。この機能はワークフローのブラックリストとして機能します。注意:ワークフロー間に依存関係がある場合、それらは同じバッチでインポートする必要があります。ブラックリストを使用してそれらを分離しないでください。エラーの原因となります。
詳細については、「スケジューリング移行における概要レポートを使用したスケジューリングプロパティの追加または変更」をご参照ください。
4. DataWorks へのタスクのインポート
LHM 移行ツールは、ソースのスケジューリング要素を DataWorks のスケジューリングフォーマットに変換します。このツールは、さまざまな移行シナリオでワークフローを DataWorks にインポートできる、統一されたアップロードエントリポイントを提供します。
インポートツールは複数の書き込み操作をサポートし、上書きモードでワークフローを自動的に作成または更新します。
4.1. 前提条件
1.1. 変換の成功
変換ツールが正常に実行され、ソースのスケジューリング情報が DataWorks のスケジューリングフォーマットに変換され、ConverterOutput.zip パッケージが生成されている必要があります。
(オプション、推奨) 変換出力パッケージを開き、統計レポートを表示して、移行範囲が正常に変換されたことを確認できます。
1.2. DataWorks 構成
DataWorks では、次の操作を実行する必要があります:
1. ワークスペースを作成します。
2. AccessKey ペア (AccessKey ID と AccessKey Secret) を作成し、そのペアがワークスペースの管理者権限を持っていることを確認します。(トラブルシューティングを容易にするため、アカウントに紐付けられた AccessKey ペアを作成することを強く推奨します。)
3. ワークスペースで、データソースを作成し、計算リソースをアタッチし、リソースグループを作成します。
4. ワークスペースで、リソースファイルをアップロードし、ユーザー定義関数 (UDF) を作成します。
4.1.3. ネットワーク接続性の確認
DataWorks エンドポイントへの接続を確認する必要があります。
エンドポイントのリストについては、以下をご参照ください:
ping dataworks.aliyuncs.com4.2. インポート設定項目
プロジェクトの conf フォルダに、writer.json などの JSON エクスポート設定ファイルを作成できます。
ファイルを使用する前に、JSON からコメントを削除する必要があります。
{
"schedule_datasource": {
"name": "YourDataWorks", // DataWorks データソースに名前を付けます。
"type": "DataWorks",
"properties": {
"endpoint": "dataworks.cn-hangzhou.aliyuncs.com", // エンドポイント
"project_id": "YourProjectId", // ワークスペース ID
"project_name": "YourProject", // ワークスペース名
"ak": "************", // AccessKey ID
"sk": "************" // AccessKey Secret
},
"operaterType": "MANUAL"
},
"conf": {
"di.resource.group.identifier": "Serverless_res_group_***_***", // スケジューリングリソースグループ
"resource.group.identifier": "Serverless_res_group_***_***", // Data Integration リソースグループ
"dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls", // DataWorks ノードタイプテーブルのパス
"qps.limit": 5 // DataWorks に送信される API リクエストの 1 秒あたりのクエリ数 (QPS) の上限
}
}2.1. エンドポイント
DataWorks ワークスペースのリージョンに基づいてエンドポイントを選択できます。詳細については、以下をご参照ください:
4.2.2. ワークスペース ID と名前
DataWorks コンソールを開き、ワークスペースの詳細ページに移動します。右側の基本情報からワークスペース ID と名前を取得できます。
4.2.3. AccessKey ペアの作成と承認
ユーザーページで、ターゲットの DataWorks ワークスペースに対する管理者としての読み取りおよび書き込み権限を持つ AccessKey ペアを作成できます。
権限管理には 2 つの場所が関わります。Resource Access Management (RAM) ユーザーを使用する場合、まずその RAM ユーザーに DataWorks を操作する権限を付与する必要があります。
ポリシーページ: https://ram.console.alibabacloud.com/policies
次に、DataWorks ワークスペースで、アカウントにワークスペースの権限を割り当てます。
注意:AccessKey にネットワークアクセス制御ポリシーを設定できます。移行ツールが配置されているマシンの IP アドレスがネットワークにアクセスできるようにしてください。
2.4. リソースグループ
DataWorks ワークスペース詳細ページの左側のメニューバーから、リソースグループページに移動します。リソースグループをアタッチして、その ID を取得できます。
汎用リソースグループは、ノードのスケジューリングと Data Integration に使用できます。スケジューリングリソースグループ (resource.group.identifier) と Data Integration リソースグループ (di.resource.group.identifier) を同じ汎用リソースグループに設定できます。
2.5. QPS 設定
このツールは、DataWorks API を呼び出すことによってデータをインポートします。DataWorks のエディションによって、読み取りおよび書き込み OpenAPI 操作の 1 秒あたりのクエリ数 (QPS) と 1 日あたりの呼び出し制限が異なります。詳細については、「制限事項」をご参照ください。
DataWorks Basic、Standard、Professional の各エディションでは、"qps.limit" を 5 に設定できます。Enterprise Edition の場合は、"qps.limit" を 20 に設定できます。
注意:複数のインポートツールを同時に実行しないでください。
2.6. DataWorks ノードタイプ ID 設定
DataWorks では、一部のノードタイプにはリージョンごとに異なる TypeId が割り当てられます。特定の TypeID は、DataWorks のデータ開発インターフェイスによって決定されます。この特性は主にデータベースノードに適用されます。詳細については、「データベースノード」をご参照ください。
たとえば、MySQL ノードの NodeTypeId は、中国 (杭州) リージョンでは 1000039 ですが、中国 (深セン) リージョンでは 1000041 です。
DataWorks のこれらのリージョン差に対応するため、ツールは、ツールが使用するノード TypeId テーブルを設定できる構成可能なメソッドを提供します。
テーブルは、インポートツールの設定項目を使用してインポートされます:
"conf": {
"dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls" // DataWorks ノードタイプテーブルのパス
}DataWorks のデータ開発インターフェイスからノードタイプ ID を取得するには、新しいワークフローを作成し、ワークフローに新しいノードを追加して保存し、そのワークフローの Spec を表示します。
ノードタイプが正しく設定されていない場合、ワークフローの公開時に次のエラーが報告されます。
4.3. DataWorks インポートツールの実行
インポートツールはコマンドラインから呼び出されます。コマンドは次のとおりです:
sh ./bin/run.sh write \
-c ./conf/<your_config_file>.json \
-f ./data/2_ConverterOutput/<conversion_output_package>.zip \
-o ./data/4_WriterOutput/<import_result_package>.zip \
-t dw-newide-writerコマンドにおいて、-c は設定ファイルのパス、-f は ConverterOutput パッケージのストレージパス、-o は WriterOutput パッケージのストレージパス、-t はサブミッションプラグインの名前です。
たとえば、プロジェクト A を DataWorks にインポートするには:
sh ./bin/run.sh write \
-c ./conf/projectA_write.json \
-f ./data/2_ConverterOutput/projectA_ConverterOutput.zip \
-o ./data/4_WriterOutput/projectA_WriterOutput.zip \
-t dw-newide-writerインポートツールは実行中にプロセス情報を出力します。プロセス中にエラーがないか確認できます。インポートが完了すると、成功したインポートと失敗したインポートに関する統計がコマンドラインに出力されます。一部のノードのインポートが失敗しても、全体のインポートプロセスは停止しないことに注意してください。少数のノードのインポートに失敗した場合は、DataWorks で手動で修正できます。
4.4. インポート結果の表示
インポートが完了したら、DataWorks で結果を表示できます。ワークフローのインポートプロセスをモニターすることもできます。問題を発見してインポートを停止する必要がある場合は、jps コマンドを実行して BwmClientApp を見つけ、kill -9 コマンドを実行してプロセスを終了させることができます。
5. Q&A
5.1. ソースは継続的に開発されています。これらの増分や変更を DataWorks にサブミットするにはどうすればよいですか?
移行ツールは上書きモードで動作します。エクスポート、変換、インポートのプロセスを再実行して、ソースからの増分変更を DataWorks にサブミットできます。ツールはワークフローを完全なパスで照合して、作成するか更新するかを決定することに注意してください。変更が正しく移行されるように、ワークフローを移動しないでください。
5.2. ソースは継続的に開発されており、DataWorks のワークフローは変換および管理されています。増分移行は DataWorks での変更を上書きしますか?
はい、上書きします。移行ツールは上書きモードで動作します。移行が完了した後にのみ、DataWorks で後続の変換を実行することを推奨します。または、段階的な移行アプローチを使用することもできます。ワークフローのバッチが移行され、再度上書きされないことを確認した後、DataWorks での変換を開始できます。異なるバッチは互いに影響しません。
5.3. パッケージ全体のインポートに時間がかかりすぎます。一部だけをインポートすることはできますか?
はい、できます。パッケージを手動でトリミングして部分的なインポートを実行できます。data/project/workflow フォルダで、インポートしたいワークフローを保持し、その他を削除します。その後、フォルダを再圧縮してパッケージにし、インポートツールを実行します。相互に依存関係のあるワークフローは一緒にインポートする必要があることに注意してください。そうしないと、それらの間のノードリネージが失われます。