このトピックでは、OpenAPI を使用してデータ統合タスクを作成、変更、削除し、ソースから宛先にデータを同期する方法について説明します。
前提条件
ワークフローを作成済みであること。 詳細については、「定期的なワークフローの作成」をご参照ください。
同期タスクに必要なデータソースを作成済みであること。 詳細については、「データソースの設定」をご参照ください。
制限事項
CreateDISyncTask 操作を呼び出してデータ統合タスクを作成する場合、タスクはコードエディタでのみ設定できます。 詳細については、「コードエディタでタスクを設定する」をご参照ください。
DataWorks は OpenAPI を使用したワークフローの作成をサポートしていません。 既存のワークフローを使用してデータ同期タスクを作成する必要があります。
SDK の取得
SDK の最新バージョンは、Alibaba Cloud OpenAPI Portal から取得できます。
設定フロー
環境を設定した後、関連する API 操作を呼び出して、ソースから宛先にデータを同期するデータ同期タスクを作成できます。 手順は次のとおりです。
手順
Data Integration でデータ同期ノードを作成します。
CreateDISyncTask 操作を呼び出して、データ統合タスクを作成します。 次のコードは、一部のパラメーターを設定する方法の例を示しています。 パラメーターの詳細については、「CreateDISyncTask」をご参照ください。
public void createFile() throws ClientException{ CreateDISyncTaskRequest request = new CreateDISyncTaskRequest(); request.setProjectId(181565L); request.setTaskType("DI_OFFLINE"); request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"Comment for the table same\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_source\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"Comment for the table same\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}"); request.setTaskParam("{\"FileFolderPath\":\"Workflow/new_biz/Data Integration\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}"); request.setTaskName("new_di_task_0607_1416"); String regionId = "cn-hangzhou"; // 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET が設定されていることを確認してください。https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659 IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com"); IAcsClient client; client = new DefaultAcsClient(profile); CreateDISyncTaskResponse response1 = client.getAcsResponse(request); Gson gson1 = new Gson(); System.out.println(gson1.toJson(response1)); }UpdateFile 操作を呼び出して、スケジューリングパラメーターを更新します。
次の表にリクエストパラメーターを示します。
名前
タイプ
必須
例
説明
Action
String
はい
UpdateFile
実行する操作。
FileFolderPath
String
いいえ
Workflow/First Workflow/Data Integration/Folder 1/Folder 2
ファイルのパス。
ProjectId
Long
いいえ
10000
DataWorks ワークスペースの ID。 DataWorks コンソールにログインし、[ワークスペース管理] ページに移動してワークスペース ID を取得できます。
FileName
String
いいえ
ods_user_info_d
ファイルの名前。 FileName の値をリセットすることで、ファイル名を変更できます。
たとえば、ListFiles 操作を呼び出して、宛先フォルダ内のファイルの ID をクエリできます。 次に、UpdateFile 操作を呼び出し、FileId パラメーターをクエリされたファイル ID に設定し、FileName パラメーターを設定してファイルの名前を変更できます。
FileDescription
String
いいえ
This is a file description
ファイルの説明。
Content
String
いいえ
SELECT "1";
ファイルのコード。 コード形式は、ファイルタイプ (fileType) によって異なります。 [オペレーションセンター] に移動し、必要なタイプのタスクを右クリックして、[コードの表示] を選択すると、コード形式を表示できます。
AutoRerunTimes
Integer
はい
3
エラー発生後に許可される自動再実行の回数。
AutoRerunIntervalMillis
Integer
いいえ
120000
エラー発生後の自動再実行の間隔。 単位: ミリ秒。 最大値は 1,800,000 ミリ秒 (30 分) です。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [再実行間隔] パラメーターに対応します。
コンソールの [再実行間隔] の単位は分です。 呼び出しを行うときは、単位の変換に注意してください。
RerunMode
String
いいえ
ALL_ALLOWED
再実行ポリシー。 有効な値:
ALL_ALLOWED: タスクが成功したかどうかに関係なく、タスクを再実行できます。
FAILURE_ALLOWED: タスクが失敗した場合にのみ、タスクを再実行できます。
ALL_DENIED: タスクが成功したかどうかに関係なく、タスクを再実行することはできません。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [再実行ポリシー] パラメーターに対応します。
Stop
Boolean
いいえ
false
スケジューリングを一時停止するかどうかを指定します。 有効な値:
true: スケジューリングを一時停止します。
false: スケジューリングを一時停止しません。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [スケジュールタイプ] の [スケジューリングの一時停止] オプションに対応します。
ParaValue
String
いいえ
x=a y=b z=c
スケジューリングパラメーター。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [パラメーター] セクションに対応します。 このパラメーターの設定方法の詳細については、「スケジューリングパラメーター」をご参照ください。
StartEffectDate
Long
いいえ
936923400000
自動スケジューリングの開始を示す UNIX タイムスタンプ。 単位: ミリ秒。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [有効日] に指定された開始時刻に対応します。
EndEffectDate
Long
いいえ
4155787800000
自動スケジューリングの終了を示す UNIX タイムスタンプ。 単位: ミリ秒。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [有効日] に指定された終了時刻に対応します。
CronExpress
String
いいえ
00 00-59/5 1-23 * * ?
定期的なスケジューリングの cron 式。 このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [Cron 式] パラメーターに対応します。 スケジューリングサイクルと時間指定スケジューリングを設定すると、DataWorks は自動的に cron 式を生成します。
例:
毎日 05:30 に実行するようにスケジュール:
00 30 05 * * ?。毎時 15 分に実行するようにスケジュール:
00 15 * * * ?。10 分ごとに実行するようにスケジュール:
00 00/10 * * * ?。毎日 08:00 から 17:00 まで 10 分ごとに実行するようにスケジュール:
00 00-59/10 8-17 * * * ?。毎月 1 日の 00:20 に実行するようにスケジュール:
00 20 00 1 * ?。1 月 1 日の 00:10 から 3 か月ごとに実行するようにスケジュール:
00 10 00 1 1-12/3 ?。火曜日と金曜日の 00:05 に実行するようにスケジュール:
00 05 00 * * 2,5。
DataWorks スケジューリングシステムは、cron 式に次の制限を課します:
最小スケジューリング間隔は 5 分です。
最も早いスケジューリング時刻は毎日 00:05 です。
CycleType
String
いいえ
NOT_DAY
スケジューリングサイクルのタイプ。 有効な値: NOT_DAY (分または時間) および DAY (日、週、または月)。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [スケジューリングサイクル] パラメーターに対応します。
DependentType
String
いいえ
USER_DEFINE
前のサイクルへの依存関係。 有効な値:
SELF: ノードは自身に依存します。
CHILD: ノードは第 1 レベルの子ノードに依存します。
USER_DEFINE: ノードは他のノードに依存します。
NONE: ノードは前のサイクルに依存しません。
DependentNodeIdList
String
いいえ
5,10,15,20
現在のファイルが依存するノードの ID。 このパラメーターは、DependentType パラメーターを USER_DEFINE に設定した場合にのみ有効です。 複数のノード ID はコンマ (,) で区切ります。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [サイクル間依存関係 (元の前のサイクル)] セクションの [依存関係] の [他のノード] オプションに対応します。
InputList
String
いいえ
project_root,project.file1,project.001_out
ファイルが依存する上流ファイルの出力名。 複数の出力名はコンマ (,) で区切ります。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [依存関係] セクションの [上流ノードの出力名] に対応します。
説明このパラメーターは、CreateDISyncTask または UpdateFile を呼び出してバッチ同期タスクを作成するときに必要です。
ProjectIdentifier
String
いいえ
dw_project
DataWorks ワークスペースの名前。 DataWorks コンソールにログインし、[ワークスペース設定] ページに移動してワークスペース名を取得できます。
API 呼び出しが行われる DataWorks ワークスペースを決定するには、このパラメーターまたは ProjectId パラメーターのいずれかを設定する必要があります。
FileId
Long
はい
100000001
ファイルの ID。 ListFiles 操作を呼び出してファイル ID を取得できます。
OutputList
String
いいえ
dw_project.ods_user_info_d
ファイルの出力。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [依存関係] セクションの [このノードの出力名] に対応します。
ResourceGroupIdentifier
String
いいえ
default_group
ファイルが公開された後にタスクが送信されるリソースグループ。 ListResourceGroups 操作を呼び出して、ワークスペースで使用可能なリソースグループをクエリできます。
ConnectionName
String
いいえ
odps_source
タスクの実行時にタスクが使用するデータソースの識別子。 ListDataSources 操作を呼び出して、利用可能なデータソースをクエリできます。
Owner
String
いいえ
18023848927592
ファイル所有者のユーザー ID。
AutoParsing
Boolean
いいえ
true
ファイルの自動解析を有効にするかどうかを指定します。 有効な値:
true: 自動解析が有効になります。
false: 自動解析が無効になります。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [依存関係] セクションの [コードから入力と出力を解析] に対応します。
SchedulerType
String
いいえ
NORMAL
スケジューリングのタイプ。 有効な値:
NORMAL: タスクは通常スケジュールされたタスクです。
MANUAL: タスクはワンタイムタスクです。 定期的に実行されるようにはスケジュールされていません。 この値は、手動トリガーワークフロー内のノードに対応します。
PAUSE: タスクは一時停止されます。
SKIP: タスクはドライランタスクです。 定期的に実行されるようにスケジュールされていますが、スケジューリングシステムは、スケジュールされた時刻に達するとタスクを成功に設定します。
AdvancedSettings
String
いいえ
{"queue":"default","SPARK_CONF":"--conf spark.driver.memory=2g"}
タスクの詳細設定。
このパラメーターは、DataWorks コンソール の EMR Spark Streaming および EMR Streaming SQL タスクの編集ページの右側のナビゲーションウィンドウにある [詳細設定] に対応します。
このパラメーターは、EMR Spark Streaming および EMR Streaming SQL タスクでのみサポートされます。 パラメーター値は JSON 形式である必要があります。
StartImmediately
Boolean
いいえ
true
タスクが公開された直後にタスクを開始するかどうかを指定します。 有効な値:
true: タスクが公開された直後にタスクを開始します。
false: タスクが公開された直後にタスクを開始しません。
このパラメーターは、DataWorks コンソール の EMR Spark Streaming および EMR Streaming SQL タスクの編集ページの右側のナビゲーションウィンドウの [設定] タブの [時間プロパティ] セクションの [開始モード] に対応します。
InputParameters
String
いいえ
[{"ValueSource": "project_001.first_node:bizdate_param","ParameterName": "bizdate_input"}]
ノードのコンテキスト入力パラメーター。 値は JSON 形式である必要があります。 フィールドの詳細については、「GetFile 操作の戻り値における InputContextParameterList パラメーターの構造」をご参照ください。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [ノードコンテキスト] セクションの [このノードの入力パラメーター] に対応します。
OutputParameters
String
いいえ
[{"Type": 1,"Value": "${bizdate}","ParameterName": "bizdate_param"}]
ノードのコンテキスト出力パラメーター。 値は JSON 形式である必要があります。 フィールドの詳細については、「GetFile 操作の戻り値における OutputContextParameterList パラメーターの構造」をご参照ください。
このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [ノードコンテキスト] セクションの [このノードの出力パラメーター] に対応します。
データ統合タスクを送信します。
SubmitFile 操作を呼び出して、データ統合タスクを CDN マッピングシステムの開発環境にコミットします。 タスクがコミットされると、応答で deploymentId が返されます。 deploymentId に基づいて GetDeployment 操作を呼び出して、デプロイメントパッケージの詳細を取得できます。
public void submitFile() throws ClientException{ SubmitFileRequest request = new SubmitFileRequest(); request.setProjectId(78837L); request.setProjectIdentifier("zxy_8221431"); // このノード ID は、ノードを作成したときに返される ID です。 これは、データベースの File テーブルの file_id に対応します。 request.setFileId(501576542L); request.setComment("Comment"); SubmitFileResponse acsResponse = client.getAcsResponse(request); // DeploymentId は、コミットまたは公開操作の戻り値です。 Long deploymentId = acsResponse.getData(); log.info(acsResponse.toString()); }パラメーターの詳細については、「SubmitFile」および「GetDeployment」をご参照ください。
同期タスクを本番環境に公開できます。
DeployFile 操作を呼び出して、データ統合タスクを本番環境に公開します。
説明この操作は、標準モードのワークスペースに対してのみ実行する必要があります。
public void deploy() throws ClientException{ DeployFileRequest request = new DeployFileRequest(); request.setProjectIdentifier("zxy_8221431"); request.setFileId(501576542L); request.setComment("Comment"); // NodeId または file_id のいずれかを指定します。 NodeId の値は、スケジューリング設定の基本プロパティにあるノード ID です。 request.setNodeId(700004537241L); DeployFileResponse acsResponse = client.getAcsResponse(request); // DeploymentId は、コミットまたは公開操作の戻り値です。 Long deploymentId = acsResponse.getData(); log.info(acsResponse.getData().toString()); }上記のコードは、一部のパラメーターを設定する方法の例を示しています。 パラメーターの詳細については、「DeployFile」をご参照ください。
デプロイメントパッケージの詳細を取得します。
タスクが公開されると、応答で deploymentId が返されます。 deploymentId に基づいて GetDeployment 操作を呼び出して、デプロイメントパッケージの詳細を取得できます。 GetDeployment 操作の応答の Status パラメーターの値が 1 の場合、タスクは正常に公開されています。
public void getDeployment() throws ClientException{ GetDeploymentRequest request = new GetDeploymentRequest(); request.setProjectId(78837L); request.setProjectIdentifier("zxy_8221431"); // DeploymentId は、コミットまたは公開操作の戻り値です。 GetDeployment 操作を呼び出して、このデプロイメントの詳細を取得します。 request.setDeploymentId(2776067L); GetDeploymentResponse acsResponse = client.getAcsResponse(request); log.info(acsResponse.getData().toString()); }上記のコードは、一部のパラメーターを設定する方法の例を示しています。 パラメーターの詳細については、「GetDeployment」をご参照ください。
同期タスクの設定の変更
データ統合タスクを作成した後、UpdateDISyncTask 操作を呼び出してタスクの内容を更新するか、TaskParam パラメーターを使用してタスクの専用リソースグループを更新できます。 更新後、タスクを再度コミットして公開する必要があります。 詳細については、「手順」をご参照ください。
同期タスクの削除
DeleteFile 操作を呼び出して、データ統合タスクを削除できます。
サンプルコード
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks_public20200518.Client;
import com.aliyun.dataworks_public20200518.models.*;
import com.aliyun.teaopenapi.models.Config;
public class createofflineTask {
static Long createTask(String fileName) throws Exception {
Long projectId = 2043L;
String taskType = "DI_OFFLINE";
String taskContent = "{\n" +
" \"type\": \"job\",\n" +
" \"version\": \"2.0\",\n" +
" \"steps\": [\n" +
" {\n" +
" \"stepType\": \"mysql\",\n" +
" \"parameter\": {\n" +
" \"envType\": 0,\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"connection\": [\n" +
" {\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"table\": [\n" +
" \"user\"\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"where\": \"\",\n" +
" \"splitPk\": \"\",\n" +
" \"encoding\": \"UTF-8\"\n" +
" },\n" +
" \"name\": \"Reader\",\n" +
" \"category\": \"reader\"\n" +
" },\n" +
" {\n" +
" \"stepType\": \"odps\",\n" +
" \"parameter\": {\n" +
" \"partition\": \"pt=${bizdate}\",\n" +
" \"truncate\": true,\n" +
" \"datasource\": \"odps_source\",\n" +
" \"envType\": 0,\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"emptyAsNull\": false,\n" +
" \"tableComment\": \"null\",\n" +
" \"table\": \"user\"\n" +
" },\n" +
" \"name\": \"Writer\",\n" +
" \"category\": \"writer\"\n" +
" }\n" +
" ],\n" +
" \"setting\": {\n" +
" \"executeMode\": null,\n" +
" \"errorLimit\": {\n" +
" \"record\": \"\"\n" +
" },\n" +
" \"speed\": {\n" +
" \"concurrent\": 2,\n" +
" \"throttle\": false\n" +
" }\n" +
" },\n" +
" \"order\": {\n" +
" \"hops\": [\n" +
" {\n" +
" \"from\": \"Reader\",\n" +
" \"to\": \"Writer\"\n" +
" }\n" +
" ]\n" +
" }\n" +
"}";
CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
request.setProjectId(projectId);
request.setTaskType(taskType);
request.setTaskContent(taskContent);
request.setTaskName(fileName);
request.setTaskParam("{\"FileFolderPath\":\"Workflow/Automated_Testing_Workspace_Do_Not_Modify/Data_Integration\",\"ResourceGroup\":\"S_res_group_XXX\"}");
// データ統合専用リソースグループを使用します。
CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
return response1.getData().getFileId();
}
public static void updateFile(Long fileId) throws Exception {
UpdateFileRequest request = new UpdateFileRequest();
request.setProjectId(2043L);
request.setFileId(fileId);
request.setAutoRerunTimes(3);
request.setRerunMode("FAILURE_ALLOWED");
request.setCronExpress("00 30 05 * * ?");
request.setCycleType("DAY");
request.setResourceGroupIdentifier("S_res_group_XXX");
// 専用スケジューリングリソースグループを使用します。
request.setInputList("dataworks_di_autotest_root");
request.setAutoParsing(true);
request.setDependentNodeIdList("5,10,15,20");
request.setDependentType("SELF");
request.setStartEffectDate(0L);
request.setEndEffectDate(4155787800000L);
request.setFileDescription("description");
request.setStop(false);
request.setParaValue("x=a y=b z=c");
request.setSchedulerType("NORMAL");
request.setAutoRerunIntervalMillis(120000);
UpdateFileResponse response1 = client.getAcsResponse(request);
}
static void deleteTask(Long fileId) throws Exception {
DeleteFileRequest request = new DeleteFileRequest();
Long projectId = 63845L;
request.setProjectId(projectId);
request.setFileId(fileId);
String akId = "XXX";
String akSecret = "XXXX";
String regionId = "cn-hangzhou";
IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com");
IAcsClient client;
client = new DefaultAcsClient(profile);
DeleteFileResponse response1 = client.getAcsResponse(request);
System.out.println(JSONObject.toJSONString(response1));
}
static IAcsClient client;
public static void main(String[] args) throws Exception {
String akId = "XX";
// 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET が設定されていることを確認してください。https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
client = new DefaultAcsClient(profile);
String taskName = "offline_job_0930_1648";
Long fileId = createTask(taskName); // データ統合タスクを作成します。
updateFile(fileId); // データ統合タスクのスケジューリングプロパティを変更します。
}
}