DataWorks では、Oozie、Azkaban、Airflow、DolphinScheduler などのオープンソースのスケジューリングエンジンから DataWorks にタスクを移行できます。このトピックでは、オープンソースのスケジューリングエンジンからタスクをエクスポートする方法について説明します。
背景情報
オープンソースのスケジューリングエンジンのタスクを DataWorks にインポートする前に、タスクをオンプレミスマシンまたは Object Storage Service (OSS) にエクスポートする必要があります。インポート手順の詳細については、「オープンソースエンジンのタスクをインポートする」をご参照ください。
Oozie からタスクをエクスポートする
エクスポート要件
パッケージには、タスクの XML 形式の定義ファイルと構成ファイルが含まれている必要があります。パッケージは ZIP 形式でエクスポートされます。
パッケージの構造
Oozie タスクの説明は、Hadoop 分散ファイルシステム (HDFS) ディレクトリに保存されます。たとえば、Apache Oozie の公式 Web サイトでは、Examples パッケージの apps ディレクトリの下の各サブディレクトリは Oozie のタスクです。各サブディレクトリには、タスクの XML 形式の定義ファイルと構成ファイルが含まれています。次の図は、エクスポートされたパッケージの構造を示しています。
Azkaban からジョブをエクスポートする
フローをダウンロードする
Azkaban はコンソールを提供します。コンソールで特定のフローをダウンロードできます。
Azkaban コンソールにログオンし、[プロジェクト] ページに移動します。
パッケージをダウンロードするプロジェクトを選択します。プロジェクトページで、[フロー] をクリックして、プロジェクトのすべてのフローを表示します。
ページの右上隅にある [ダウンロード] をクリックして、プロジェクトのパッケージをダウンロードします。
ネイティブの Azkaban パッケージをエクスポートできます。Azkaban のパッケージには形式の制限はありません。ZIP 形式でエクスポートされたパッケージには、Azkaban の特定のプロジェクトのすべてのジョブとリレーションシップに関する情報が含まれています。Azkaban コンソールからエクスポートされた ZIP パッケージを、DataWorks の スケジューリングエンジンインポート ページに直接アップロードできます。
変換ロジック
次の表に、Azkaban 項目と DataWorks 項目のマッピング、および変換ロジックを示します。
Azkaban 項目 | DataWorks 項目 | 変換ロジック |
フロー | DataStudio のワークフロー | フロー内のジョブは、フローに対応するワークフローに配置され、ワークフロー内のノードとして使用されます。 フロー内のネストされたフローは、DataWorks の個別のワークフローに変換されます。変換後、ワークフロー内のノード間の依存関係が自動的に確立されます。 |
コマンドタイプのジョブ | Shell ノード | EMR モードの DataWorks では、コマンドタイプのジョブは E-MapReduce (EMR) Shell ノードに変換されます。[詳細設定] ダイアログボックスで関連パラメータを構成することにより、マップされたノードタイプを指定できます。 コマンドタイプのジョブの CLI で他のスクリプトを呼び出す場合、分析後に取得されたスクリプトファイルを DataWorks のリソースファイルとして登録し、変換された Shell コードでリソースファイルを参照できます。 |
Hive タイプのジョブ | ODPS SQL ノード | MaxCompute モードの DataWorks では、Hive タイプのジョブは ODPS SQL ノードに変換されます。[詳細設定] ダイアログボックスで関連パラメータを構成することにより、マップされたノードタイプを指定できます。 |
DataWorks でサポートされていない他のタイプのノード | ゼロロードノードまたは Shell ノード | [詳細設定] ダイアログボックスで関連パラメータを構成することにより、マップされたノードタイプを指定できます。 |

Airflow からタスクをエクスポートする
制限
Airflow 1.10.x のみで、Python 3.6 以後に依存する Airflow タスクをエクスポートできます。
手順
Airflow のランタイム環境に移動します。
Airflow の Python ライブラリを使用して、Airflow でスケジュールされている有向非循環グラフ (DAG) フォルダを読み込みます。DAG Python ファイルは DAG フォルダに保存されます。
エクスポートツールを使用して、メモリ内の Airflow の Python ライブラリに基づいて、DAG Python ファイルに保存されているタスク情報と依存関係を読み取ります。次に、生成された DAG 情報を JSON ファイルに書き込み、ファイルをエクスポートします。
DataWorks [クラウドタスク] の [スケジューリングエンジンエクスポート] ページで [移行アシスタント] からエクスポートツールをダウンロードできます。[スケジューリングエンジンエクスポート] ページに移動する方法については、別のオープンソースエンジンのタスクをエクスポートする を参照してください。
エクスポートツールの使用上の注意
エクスポートツールの使用上の注意:
次のコマンドを実行して、airflow-exporter.tgz パッケージを解凍します。
tar zxvf airflow-exporter.tgz次のコマンドを実行して、PYTHONPATH パラメータを Python ライブラリのディレクトリに設定します。
export PYTHONPATH=/usr/local/lib/python3.6/site-packages次のコマンドを実行して、Airflow からタスクをエクスポートします。
cd airflow-exporter python3.6 ./parser -d /path/to/airflow/dag/floder/ -o output.json次のコマンドを実行して、エクスポートされた output.json ファイルを ZIP パッケージに圧縮します。
zip out.zip output.json
ZIP パッケージが生成されたら、次の操作を実行してインポートタスクを作成し、ZIP パッケージを DataWorks にインポートできます。DataWorks コンソールの [移行アシスタント] ページに移動します。左側のナビゲーションウィンドウで、 を選択します。詳細については、「オープンソースエンジンのタスクをインポートする」をご参照ください。
DolphinScheduler からノードをエクスポートする
DolphinScheduler ノードを 旧バージョンの DataStudio と 新バージョンの DataStudio にインポートできます。
仕組み
DataWorks エクスポートツールは、DolphinScheduler から一度に複数のプロセスをエクスポートするために使用される API 操作を呼び出すことによって、DolphinScheduler 内のプロセスの JSON 構成を取得します。エクスポートツールは、JSON 構成に基づいて ZIP ファイルを生成します。次に、DataWorks エクスポートツールは dolphinscheduler_to_dataworks コンバーターを使用して ZIP ファイルを DataWorks でサポートされているタイプのファイルまたはタスクに変換し、DolphinScheduler ノードのインポート用に新しく作成されたインポートタスクを使用して ZIP ファイル内のコードと依存関係を解析および変換し、最終的に変換結果を DataWorks ワークスペースにインポートします。
制限
バージョンの制限: DataWorks エクスポートツールを使用して、DolphinScheduler 1.3.x、2.x、および 3.x の DolphinScheduler ノードのみをエクスポートし、DataWorks にインポートできます。
ノードタイプの変換の制限:
SQL ノード: SQL ノードの変換は、一部のタイプのコンピューティングエンジンでのみサポートされています。ノードタイプの変換中に、SQL コードの構文は変換されず、SQL コードは変更されません。
Cron 式: 特定のシナリオでは、Cron 式が削除されるか、Cron 式がサポートされない場合があります。構成されているスケジュール時間がビジネス要件を満たしているかどうかを確認する必要があります。スケジュール時間については、「時間プロパティを構成する」をご参照ください。
Python ノード: DataWorks は Python ノードを提供していません。DataWorks は、DolphinScheduler の Python ノードを Python ファイルリソースと、Python ファイルリソースを参照する Shell ノードに変換できます。ただし、Python ノードのスケジュールパラメータが渡されるときに問題が発生する可能性があります。したがって、デバッグとチェックが必要です。スケジュールパラメータについては、「スケジュールパラメータを構成して使用する」をご参照ください。
依存ノード: DataWorks は、サイクル間スケジュール依存関係が構成されている依存ノードを変換できません。依存ノードに対してサイクル間スケジュール依存関係が構成されている場合、DataWorks は依存関係を DataWorks 内のマップされた自動トリガーノードの同一サイクルスケジュール依存関係に変換します。同一サイクルスケジュール依存関係の構成方法については、「同一サイクルスケジュール依存関係を構成する」をご参照ください。
タスクタイプマッピングを構成する
このセクションの手順を実行して、DataWorks エクスポートツールをダウンロードし、タスクタイプマッピングを構成できます。
エクスポートツールをダウンロードします。
エクスポートツールのソースコード をダウンロードします。
タスクタイプマッピングを構成します。
エクスポートツールのディレクトリに移動し、
lib、bin、およびconfディレクトリを表示します。confディレクトリのdataworks-transformer-config.jsonファイルのマッピングを変更する必要があります。パラメータの説明:
次のコードは、DolphinScheduler ノードを
ODPSタイプのオブジェクトに変換するために、dataworks-transformer-config.jsonファイルで構成する必要があるパラメータを示しています。{ "format": "WORKFLOW", "locale": "zh_CN", "skipUnSupportType": true, "transformContinueWithError": true, "specContinueWithError": true, "processFilter": { "releaseState": "ONLINE", "includeSubProcess": true } "settings": { "workflow.converter.shellNodeType": "DIDE_SHELL", "workflow.converter.commandSqlAs": "ODPS_SQL", "workflow.converter.sparkSubmitAs": "ODPS_SPARK", "workflow.converter.target.unknownNodeTypeAs": "DIDE_SHELL", "workflow.converter.mrNodeType": "ODPS_MR", "workflow.converter.target.engine.type": "ODPS", "workflow.converter.dolphinscheduler.sqlNodeTypeMapping": { "POSTGRESQL": "POSTGRESQL", "MYSQL": "MYSQL" } }, "replaceMapping": [ { "taskType": "SHELL", "desc": "$[yyyyMMdd-1]", "pattern": "\$\[yyyyMMdd-1\]", "target": "\${dt}", "param": "dt=$[yyyyMMdd-1]" }, { "taskType": "PYTHON", "desc": "$[yyyyMMdd-1]", "pattern": "\$\[yyyyMMdd-1\]", "target": "\${dt}", "param": "dt=$[yyyyMMdd-1]" } ] }パラメータ
説明
format
データを移行する場合は、移行先ワークスペースが新バージョンの DataStudio のパブリックプレビューに参加しているかどうかによって、このパラメータを構成する必要があります。
移行先ワークスペースが新バージョンの DataStudio のパブリックプレビューに参加している場合は、このパラメータを
WORKFLOWに設定する必要があります。移行先ワークスペースが新バージョンの DataStudio のパブリックプレビューに参加していない場合は、このパラメータを
SPECに設定する必要があります。
必須。
locale
言語環境。デフォルト値:
zh_CN。skipUnSupportType
タスクタイプの変換中に、サポートされていないタイプをスキップするかどうかを指定します。有効な値: true および false。
このパラメータを
trueに設定すると、サポートされていないタイプはスキップされます。このパラメータを
falseに設定すると、変換は失敗します。
transformContinueWithError
タスクタイプの変換中にエラーが発生した場合に、変換を続行するかどうかを指定します。有効な値: true および false。
このパラメータを
trueに設定すると、変換は続行されます。このパラメータを
falseに設定すると、変換は停止します。
specContinueWithError
タスクタイプの変換が失敗した場合に、変換を続行するかどうかを指定します。有効な値: true および false。
このパラメータを
trueに設定すると、変換は続行されます。このパラメータを
falseに設定すると、変換は停止します。
processFilter
releaseState
状態が ONLINE のフローを処理するために使用されるフィルタ条件。
タスクタイプの変換中のフィルタリングがサポートされています。DolphinScheduler ノードを DataWorks にインポートする前にフィルタリングする場合は、このパラメータを構成する必要があります。
includeSubProcess
状態が ONLINE のサププロセスを処理するかどうかを指定します。
settings
workflow.converter.shellNodeType
ソースシステムの Shell ノードが移行先システムでマップされるオブジェクトタイプ。値の例: DIDE_SHELL。
必須。
workflow.converter.commandSqlAs
移行先システムで SQL ノードを実行するために使用されるエンジンのタイプ。値の例: ODPS_SQL。
workflow.converter.sparkSubmitAs
移行先システムで Spark エンジンを使用して送信されたノードを実行するために使用されるエンジンのタイプ。値の例: ODPS_SPARK。
workflow.converter.target.unknownNodeTypeAs
不明なタイプのノードがマップされるデフォルトのオブジェクトタイプ。値の例: DIDE_SHELL。
workflow.converter.mrNodeType
移行先システムで MapReduce ノードを実行するために使用されるエンジンのタイプ。値の例: ODPS_MR。
workflow.converter.target.engine.type
デフォルトで使用されるエンジンのタイプ。値の例: ODPS。
workflow.converter.dolphinscheduler.sqlNodeTypeMapping
DolphinScheduler の SQL ノードのデータベースと移行先システムのデータベースのマッピング。値の例:
"POSTGRESQL": "POSTGRESQL""MYSQL": "MYSQL"
replaceMapping
taskType
ルールを適用できるタスクタイプ。値の例: Shell または Python。
正規表現に基づいて一致するノードの内容を置き換えることができます。
desc
説明。これは情報フィールドであり、処理には関与しません。このパラメータは空のままにすることができます。
pattern
置き換える必要のある正規表現のパターン。値の例: $[yyyyMMdd-1]。
target
置き換え後に取得される移行先文字列。値の例: ${dt}。
param
移行先文字列に割り当てる値。たとえば、コード変数の値をノード変数 (dt=$[yyyyMMdd-1] など) に割り当てます。
DolphinScheduler からノードをエクスポートする
エクスポートツールを使用して、DolphinScheduler ジョブを ZIP ファイルとしてエクスポートできます。次のコードはサンプルコマンドを示しています。次のコードを使用する場合は、ビジネス要件に基づいてパラメータを構成する必要があります。次のコマンドを実行した後、DolphinScheduler のリソースは、-f で指定されたファイル名を使用してカレントディレクトリに保存されます。
python3 bin/reader.py \
-a dolphinscheduler \
-e http://xxx.xxx.xxx.xxx:12345 \
-t {token} \
-v 1.3.9 \
-p 123456,456256 \
-f project_dp01.zip\
-sr false;パラメータ | 説明 |
-a | ノードをエクスポートするシステムのタイプ。このパラメータを |
-e | インターネット経由で DolphinScheduler にアクセスするために使用される URL。 |
-t | DolphinScheduler アプリケーションの |
-v | エクスポートする DolphinScheduler ノードのバージョン。 |
-p | ノードをエクスポートする DolphinScheduler プロジェクトの名前。複数の名前を構成できます。複数の名前を構成する場合は、コンマ (,) で区切ります。 |
-f | エクスポート後に取得される圧縮ファイルの名前。zip 形式の圧縮パッケージのみがサポートされています。 |
-sr | リソースのダウンロードをスキップするかどうかを指定します。有効な値: true および false。デフォルト値は true です。このパラメータを 説明
|
タスクタイプを変換する
次のスクリプトを構成して実行し、このトピックの タスクタイプマッピングを構成する セクションで定義されている dataworks-transformer-config.json 構成ファイルに基づいて、dolphinscheduler_to_dataworks コンバーターを使用して、DolphinScheduler プロジェクト内のファイルを DataWorks のファイルまたはタスクに変換できます。
python3 bin/transformer.py \
-a dolphinscheduler_to_dataworks \
-c conf/dataworks-transformer-config.json \
-s project_dp01.zip \
-t project_dw01.zip;パラメータ | 説明 |
-a | コンバーターの名前。デフォルト値: |
-c | 変換用の構成ファイル。このパラメータのデフォルト値は、タスクタイプマッピングを構成する ときに構成した |
-s | 変換する必要のある DolphinScheduler ファイルの名前。このパラメータの値は、DolphinScheduler からノードをエクスポートする ときにエクスポートされる結果ファイルです。 |
-t | DolphinScheduler ファイルの変換後に取得される結果ファイルの名前。結果ファイルは |
DolphinScheduler ノードを DataWorks にインポートする
次のスクリプトを実行して、変換後に取得されたファイルを DataWorks ワークスペースにインポートできます。
python3 bin/writer.py \
-a dataworks \
-e dataworks.cn-shanghai.aliyuncs.com \
-i $ALIYUN_ACCESS_KEY_ID \
-k $ALIYUN_ACCESS_KEY_SECRET \
-p $ALIYUN_DATAWORKS_WORKSPACE_ID \
-r cn-shanghai \
-f project_dw01.zip \
-t SPEC;パラメータ | 説明 |
-a | ファイルをインポートするシステムのタイプ。このパラメータのデフォルト値は |
-e | DataWorks API 操作のエンドポイント。 エンドポイント を参照して、ワークスペースが存在するリージョンに基づいてこのパラメーターの値を決定できます。 |
-i | Alibaba Cloud アカウントの AccessKey ID。Alibaba Cloud アカウントには、ワークスペースにオブジェクトをインポートするための権限が必要です。 |
-k | Alibaba Cloud アカウントの AccessKey シークレット。Alibaba Cloud アカウントには、ワークスペースにオブジェクトをインポートするための権限が必要です。 |
-p | ワークスペース ID。前のスクリプトを実行した後にデータが書き込まれるワークスペースを指定します。 |
-r | ワークスペースが存在するリージョンの ID。 エンドポイント を参照して、リージョン ID を取得できます。 |
-f | ワークスペースにインポートするファイル。このパラメータの値は、タスクタイプを変換する ときに取得した結果ファイルです。 |
-t | インポート環境。新バージョンの DataStudio のパブリックプレビューに参加しているワークスペースにファイルをインポートする場合は、このパラメータを構成する必要はありません。 |
上記の操作が完了したら、移行先ワークスペースに移動して、移行状況を確認できます。
他のオープンソースエンジンからタスクをエクスポートする
DataWorks は、Oozie、Azkaban、Airflow、および DolphinScheduler 以外のオープンソースエンジンのタスクをエクスポートするための標準テンプレートを提供します。タスクをエクスポートする前に、[標準テンプレート] をダウンロードし、テンプレートのファイル構造に基づいてコンテンツを変更する必要があります。[スケジューリングエンジンエクスポート] ページに移動して、標準テンプレートをダウンロードし、ファイル構造を表示できます。
DataWorks コンソール にログオンします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。
DataStudio ページで、左上隅にある
アイコンをクリックし、 を選択します。左側のナビゲーションウィンドウで、 を選択します。
[標準テンプレート] タブをクリックします。
[標準テンプレート] タブで、[標準フォーマットテンプレート] をクリックしてテンプレートをダウンロードします。
テンプレートのコンテンツを変更して、エクスポートするパッケージを生成します。