すべてのプロダクト
Search
ドキュメントセンター

DataWorks:OpenAPI を使用して単一テーブルのバッチ同期タスクを管理する

最終更新日:Nov 20, 2025

このトピックでは、OpenAPI を使用してデータ統合タスクを作成、変更、削除し、ソースから宛先にデータを同期する方法について説明します。

前提条件

制限事項

  • CreateDISyncTask 操作を呼び出してデータ統合タスクを作成する場合、タスクはコードエディタでのみ設定できます。 詳細については、「コードエディタでタスクを設定する」をご参照ください。

  • DataWorks は OpenAPI を使用したワークフローの作成をサポートしていません。 既存のワークフローを使用してデータ同期タスクを作成する必要があります。

SDK の取得

SDK の最新バージョンは、Alibaba Cloud OpenAPI Portal から取得できます。

設定フロー

環境を設定した後、関連する API 操作を呼び出して、ソースから宛先にデータを同期するデータ同期タスクを作成できます。 手順は次のとおりです。

  1. データ統合タスクを作成します。

  2. タスクのスケジューリング依存関係を設定します。

  3. データ統合タスクをコミットします。

  4. 同期タスクを本番環境に公開します。

手順

  1. Data Integration でデータ同期ノードを作成します。

    CreateDISyncTask 操作を呼び出して、データ統合タスクを作成します。 次のコードは、一部のパラメーターを設定する方法の例を示しています。 パラメーターの詳細については、「CreateDISyncTask」をご参照ください。

    public void createFile() throws ClientException{
            CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
            request.setProjectId(181565L);
            request.setTaskType("DI_OFFLINE");
            request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"Comment for the table same\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_source\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"Comment for the table same\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
            request.setTaskParam("{\"FileFolderPath\":\"Workflow/new_biz/Data Integration\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
            request.setTaskName("new_di_task_0607_1416");
    
    
            String regionId = "cn-hangzhou";
            // 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET が設定されていることを確認してください。https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
            IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com");
            IAcsClient client;
    
            client = new DefaultAcsClient(profile);
    
            CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
            Gson gson1 = new Gson();
    
            System.out.println(gson1.toJson(response1));
    }
  2. UpdateFile 操作を呼び出して、スケジューリングパラメーターを更新します。

    次の表にリクエストパラメーターを示します。

    名前

    タイプ

    必須

    説明

    Action

    String

    はい

    UpdateFile

    実行する操作。

    FileFolderPath

    String

    いいえ

    Workflow/First Workflow/Data Integration/Folder 1/Folder 2

    ファイルのパス。

    ProjectId

    Long

    いいえ

    10000

    DataWorks ワークスペースの ID。 DataWorks コンソールにログインし、[ワークスペース管理] ページに移動してワークスペース ID を取得できます。

    FileName

    String

    いいえ

    ods_user_info_d

    ファイルの名前。 FileName の値をリセットすることで、ファイル名を変更できます。

    たとえば、ListFiles 操作を呼び出して、宛先フォルダ内のファイルの ID をクエリできます。 次に、UpdateFile 操作を呼び出し、FileId パラメーターをクエリされたファイル ID に設定し、FileName パラメーターを設定してファイルの名前を変更できます。

    FileDescription

    String

    いいえ

    This is a file description

    ファイルの説明。

    Content

    String

    いいえ

    SELECT "1";

    ファイルのコード。 コード形式は、ファイルタイプ (fileType) によって異なります。 [オペレーションセンター] に移動し、必要なタイプのタスクを右クリックして、[コードの表示] を選択すると、コード形式を表示できます。

    AutoRerunTimes

    Integer

    はい

    3

    エラー発生後に許可される自動再実行の回数。

    AutoRerunIntervalMillis

    Integer

    いいえ

    120000

    エラー発生後の自動再実行の間隔。 単位: ミリ秒。 最大値は 1,800,000 ミリ秒 (30 分) です。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [再実行間隔] パラメーターに対応します。

    コンソールの [再実行間隔] の単位は分です。 呼び出しを行うときは、単位の変換に注意してください。

    RerunMode

    String

    いいえ

    ALL_ALLOWED

    再実行ポリシー。 有効な値:

    • ALL_ALLOWED: タスクが成功したかどうかに関係なく、タスクを再実行できます。

    • FAILURE_ALLOWED: タスクが失敗した場合にのみ、タスクを再実行できます。

    • ALL_DENIED: タスクが成功したかどうかに関係なく、タスクを再実行することはできません。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [再実行ポリシー] パラメーターに対応します。

    Stop

    Boolean

    いいえ

    false

    スケジューリングを一時停止するかどうかを指定します。 有効な値:

    • true: スケジューリングを一時停止します。

    • false: スケジューリングを一時停止しません。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [スケジュールタイプ] の [スケジューリングの一時停止] オプションに対応します。

    ParaValue

    String

    いいえ

    x=a y=b z=c

    スケジューリングパラメーター。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [パラメーター] セクションに対応します。 このパラメーターの設定方法の詳細については、「スケジューリングパラメーター」をご参照ください。

    StartEffectDate

    Long

    いいえ

    936923400000

    自動スケジューリングの開始を示す UNIX タイムスタンプ。 単位: ミリ秒。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [有効日] に指定された開始時刻に対応します。

    EndEffectDate

    Long

    いいえ

    4155787800000

    自動スケジューリングの終了を示す UNIX タイムスタンプ。 単位: ミリ秒。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [有効日] に指定された終了時刻に対応します。

    CronExpress

    String

    いいえ

    00 00-59/5 1-23 * * ?

    定期的なスケジューリングの cron 式。 このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [Cron 式] パラメーターに対応します。 スケジューリングサイクルと時間指定スケジューリングを設定すると、DataWorks は自動的に cron 式を生成します。

    例:

    • 毎日 05:30 に実行するようにスケジュール: 00 30 05 * * ?

    • 毎時 15 分に実行するようにスケジュール: 00 15 * * * ?

    • 10 分ごとに実行するようにスケジュール: 00 00/10 * * * ?

    • 毎日 08:00 から 17:00 まで 10 分ごとに実行するようにスケジュール: 00 00-59/10 8-17 * * * ?

    • 毎月 1 日の 00:20 に実行するようにスケジュール: 00 20 00 1 * ?

    • 1 月 1 日の 00:10 から 3 か月ごとに実行するようにスケジュール: 00 10 00 1 1-12/3 ?

    • 火曜日と金曜日の 00:05 に実行するようにスケジュール: 00 05 00 * * 2,5

    DataWorks スケジューリングシステムは、cron 式に次の制限を課します:

    • 最小スケジューリング間隔は 5 分です。

    • 最も早いスケジューリング時刻は毎日 00:05 です。

    CycleType

    String

    いいえ

    NOT_DAY

    スケジューリングサイクルのタイプ。 有効な値: NOT_DAY (分または時間) および DAY (日、週、または月)。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [時間プロパティ] セクションにある [スケジューリングサイクル] パラメーターに対応します。

    DependentType

    String

    いいえ

    USER_DEFINE

    前のサイクルへの依存関係。 有効な値:

    • SELF: ノードは自身に依存します。

    • CHILD: ノードは第 1 レベルの子ノードに依存します。

    • USER_DEFINE: ノードは他のノードに依存します。

    • NONE: ノードは前のサイクルに依存しません。

    DependentNodeIdList

    String

    いいえ

    5,10,15,20

    現在のファイルが依存するノードの ID。 このパラメーターは、DependentType パラメーターを USER_DEFINE に設定した場合にのみ有効です。 複数のノード ID はコンマ (,) で区切ります。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [サイクル間依存関係 (元の前のサイクル)] セクションの [依存関係] の [他のノード] オプションに対応します。

    InputList

    String

    いいえ

    project_root,project.file1,project.001_out

    ファイルが依存する上流ファイルの出力名。 複数の出力名はコンマ (,) で区切ります。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [依存関係] セクションの [上流ノードの出力名] に対応します。

    説明

    このパラメーターは、CreateDISyncTask または UpdateFile を呼び出してバッチ同期タスクを作成するときに必要です。

    ProjectIdentifier

    String

    いいえ

    dw_project

    DataWorks ワークスペースの名前。 DataWorks コンソールにログインし、[ワークスペース設定] ページに移動してワークスペース名を取得できます。

    API 呼び出しが行われる DataWorks ワークスペースを決定するには、このパラメーターまたは ProjectId パラメーターのいずれかを設定する必要があります。

    FileId

    Long

    はい

    100000001

    ファイルの ID。 ListFiles 操作を呼び出してファイル ID を取得できます。

    OutputList

    String

    いいえ

    dw_project.ods_user_info_d

    ファイルの出力。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [依存関係] セクションの [このノードの出力名] に対応します。

    ResourceGroupIdentifier

    String

    いいえ

    default_group

    ファイルが公開された後にタスクが送信されるリソースグループ。 ListResourceGroups 操作を呼び出して、ワークスペースで使用可能なリソースグループをクエリできます。

    ConnectionName

    String

    いいえ

    odps_source

    タスクの実行時にタスクが使用するデータソースの識別子。 ListDataSources 操作を呼び出して、利用可能なデータソースをクエリできます。

    Owner

    String

    いいえ

    18023848927592

    ファイル所有者のユーザー ID。

    AutoParsing

    Boolean

    いいえ

    true

    ファイルの自動解析を有効にするかどうかを指定します。 有効な値:

    • true: 自動解析が有効になります。

    • false: 自動解析が無効になります。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [依存関係] セクションの [コードから入力と出力を解析] に対応します。

    SchedulerType

    String

    いいえ

    NORMAL

    スケジューリングのタイプ。 有効な値:

    • NORMAL: タスクは通常スケジュールされたタスクです。

    • MANUAL: タスクはワンタイムタスクです。 定期的に実行されるようにはスケジュールされていません。 この値は、手動トリガーワークフロー内のノードに対応します。

    • PAUSE: タスクは一時停止されます。

    • SKIP: タスクはドライランタスクです。 定期的に実行されるようにスケジュールされていますが、スケジューリングシステムは、スケジュールされた時刻に達するとタスクを成功に設定します。

    AdvancedSettings

    String

    いいえ

    {"queue":"default","SPARK_CONF":"--conf spark.driver.memory=2g"}

    タスクの詳細設定。

    このパラメーターは、DataWorks コンソール の EMR Spark Streaming および EMR Streaming SQL タスクの編集ページの右側のナビゲーションウィンドウにある [詳細設定] に対応します。

    このパラメーターは、EMR Spark Streaming および EMR Streaming SQL タスクでのみサポートされます。 パラメーター値は JSON 形式である必要があります。

    StartImmediately

    Boolean

    いいえ

    true

    タスクが公開された直後にタスクを開始するかどうかを指定します。 有効な値:

    • true: タスクが公開された直後にタスクを開始します。

    • false: タスクが公開された直後にタスクを開始しません。

    このパラメーターは、DataWorks コンソール の EMR Spark Streaming および EMR Streaming SQL タスクの編集ページの右側のナビゲーションウィンドウの [設定] タブの [時間プロパティ] セクションの [開始モード] に対応します。

    InputParameters

    String

    いいえ

    [{"ValueSource": "project_001.first_node:bizdate_param","ParameterName": "bizdate_input"}]

    ノードのコンテキスト入力パラメーター。 値は JSON 形式である必要があります。 フィールドの詳細については、「GetFile 操作の戻り値における InputContextParameterList パラメーターの構造」をご参照ください。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [ノードコンテキスト] セクションの [このノードの入力パラメーター] に対応します。

    OutputParameters

    String

    いいえ

    [{"Type": 1,"Value": "${bizdate}","ParameterName": "bizdate_param"}]

    ノードのコンテキスト出力パラメーター。 値は JSON 形式である必要があります。 フィールドの詳細については、「GetFile 操作の戻り値における OutputContextParameterList パラメーターの構造」をご参照ください。

    このパラメーターは、DataWorks コンソール のデータ開発タスクの [プロパティ] タブの [ノードコンテキスト] セクションの [このノードの出力パラメーター] に対応します。

  3. データ統合タスクを送信します。

    SubmitFile 操作を呼び出して、データ統合タスクを CDN マッピングシステムの開発環境にコミットします。 タスクがコミットされると、応答で deploymentId が返されます。 deploymentId に基づいて GetDeployment 操作を呼び出して、デプロイメントパッケージの詳細を取得できます。

     public void submitFile() throws ClientException{
            SubmitFileRequest request = new SubmitFileRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
          // このノード ID は、ノードを作成したときに返される ID です。 これは、データベースの File テーブルの file_id に対応します。
            request.setFileId(501576542L);
            request.setComment("Comment");
            SubmitFileResponse acsResponse = client.getAcsResponse(request);
          // DeploymentId は、コミットまたは公開操作の戻り値です。
          Long deploymentId = acsResponse.getData();
            log.info(acsResponse.toString());
    }

    パラメーターの詳細については、「SubmitFile」および「GetDeployment」をご参照ください。

  4. 同期タスクを本番環境に公開できます。

    DeployFile 操作を呼び出して、データ統合タスクを本番環境に公開します。

    説明

    この操作は、標準モードのワークスペースに対してのみ実行する必要があります。

     public void deploy() throws ClientException{
            DeployFileRequest request = new DeployFileRequest();
            request.setProjectIdentifier("zxy_8221431");
            request.setFileId(501576542L);
            request.setComment("Comment");
            // NodeId または file_id のいずれかを指定します。 NodeId の値は、スケジューリング設定の基本プロパティにあるノード ID です。
            request.setNodeId(700004537241L);
            DeployFileResponse acsResponse = client.getAcsResponse(request);
             // DeploymentId は、コミットまたは公開操作の戻り値です。
            Long deploymentId = acsResponse.getData();
            log.info(acsResponse.getData().toString());
        }

    上記のコードは、一部のパラメーターを設定する方法の例を示しています。 パラメーターの詳細については、「DeployFile」をご参照ください。

  5. デプロイメントパッケージの詳細を取得します。

    タスクが公開されると、応答で deploymentId が返されます。 deploymentId に基づいて GetDeployment 操作を呼び出して、デプロイメントパッケージの詳細を取得できます。 GetDeployment 操作の応答の Status パラメーターの値が 1 の場合、タスクは正常に公開されています。

    public void getDeployment() throws ClientException{
            GetDeploymentRequest request = new GetDeploymentRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
          // DeploymentId は、コミットまたは公開操作の戻り値です。 GetDeployment 操作を呼び出して、このデプロイメントの詳細を取得します。
            request.setDeploymentId(2776067L);
            GetDeploymentResponse acsResponse = client.getAcsResponse(request);
            log.info(acsResponse.getData().toString());
        }

    上記のコードは、一部のパラメーターを設定する方法の例を示しています。 パラメーターの詳細については、「GetDeployment」をご参照ください。

同期タスクの設定の変更

データ統合タスクを作成した後、UpdateDISyncTask 操作を呼び出してタスクの内容を更新するか、TaskParam パラメーターを使用してタスクの専用リソースグループを更新できます。 更新後、タスクを再度コミットして公開する必要があります。 詳細については、「手順」をご参照ください。

同期タスクの削除

DeleteFile 操作を呼び出して、データ統合タスクを削除できます。

サンプルコード

import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks_public20200518.Client;
import com.aliyun.dataworks_public20200518.models.*;
import com.aliyun.teaopenapi.models.Config;

public class createofflineTask {
    static Long createTask(String fileName) throws  Exception {
        Long projectId = 2043L;
        String taskType = "DI_OFFLINE";
        String taskContent = "{\n" +
                "    \"type\": \"job\",\n" +
                "    \"version\": \"2.0\",\n" +
                "    \"steps\": [\n" +
                "        {\n" +
                "            \"stepType\": \"mysql\",\n" +
                "            \"parameter\": {\n" +
                "                \"envType\": 0,\n" +
                "                \"datasource\": \"mysql_autotest_dev\",\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"connection\": [\n" +
                "                    {\n" +
                "                        \"datasource\": \"mysql_autotest_dev\",\n" +
                "                        \"table\": [\n" +
                "                            \"user\"\n" +
                "                        ]\n" +
                "                    }\n" +
                "                ],\n" +
                "                \"where\": \"\",\n" +
                "                \"splitPk\": \"\",\n" +
                "                \"encoding\": \"UTF-8\"\n" +
                "            },\n" +
                "            \"name\": \"Reader\",\n" +
                "            \"category\": \"reader\"\n" +
                "        },\n" +
                "        {\n" +
                "            \"stepType\": \"odps\",\n" +
                "            \"parameter\": {\n" +
                "                \"partition\": \"pt=${bizdate}\",\n" +
                "                \"truncate\": true,\n" +
                "                \"datasource\": \"odps_source\",\n" +
                "                \"envType\": 0,\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"emptyAsNull\": false,\n" +
                "                \"tableComment\": \"null\",\n" +
                "                \"table\": \"user\"\n" +
                "            },\n" +
                "            \"name\": \"Writer\",\n" +
                "            \"category\": \"writer\"\n" +
                "        }\n" +
                "    ],\n" +
                "    \"setting\": {\n" +
                "        \"executeMode\": null,\n" +
                "        \"errorLimit\": {\n" +
                "            \"record\": \"\"\n" +
                "        },\n" +
                "        \"speed\": {\n" +
                "            \"concurrent\": 2,\n" +
                "            \"throttle\": false\n" +
                "        }\n" +
                "    },\n" +
                "    \"order\": {\n" +
                "        \"hops\": [\n" +
                "            {\n" +
                "                \"from\": \"Reader\",\n" +
                "                \"to\": \"Writer\"\n" +
                "            }\n" +
                "        ]\n" +
                "    }\n" +
                "}";
        CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
        request.setProjectId(projectId);
        request.setTaskType(taskType);
        request.setTaskContent(taskContent);
        request.setTaskName(fileName);
        request.setTaskParam("{\"FileFolderPath\":\"Workflow/Automated_Testing_Workspace_Do_Not_Modify/Data_Integration\",\"ResourceGroup\":\"S_res_group_XXX\"}");
        // データ統合専用リソースグループを使用します。
        CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
        return response1.getData().getFileId();
    }

    public static void updateFile(Long fileId) throws Exception {
        UpdateFileRequest request = new UpdateFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        request.setAutoRerunTimes(3);
        request.setRerunMode("FAILURE_ALLOWED");
        request.setCronExpress("00 30 05 * * ?");
        request.setCycleType("DAY");
        request.setResourceGroupIdentifier("S_res_group_XXX");
        // 専用スケジューリングリソースグループを使用します。
        request.setInputList("dataworks_di_autotest_root");

        request.setAutoParsing(true);
        request.setDependentNodeIdList("5,10,15,20");
        request.setDependentType("SELF");
        request.setStartEffectDate(0L);
        request.setEndEffectDate(4155787800000L);
        request.setFileDescription("description");
        request.setStop(false);
        request.setParaValue("x=a y=b z=c");
        request.setSchedulerType("NORMAL");
        request.setAutoRerunIntervalMillis(120000);
        UpdateFileResponse response1 = client.getAcsResponse(request);
    }
    static void deleteTask(Long fileId) throws Exception {
        DeleteFileRequest request = new DeleteFileRequest();
        Long projectId = 63845L;
        request.setProjectId(projectId);
        request.setFileId(fileId);


        String akId = "XXX";
        String akSecret = "XXXX";
        String regionId = "cn-hangzhou";
        IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
        DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com");
        IAcsClient client;

        client = new DefaultAcsClient(profile);

        DeleteFileResponse response1 = client.getAcsResponse(request);
        System.out.println(JSONObject.toJSONString(response1));
    }

    static IAcsClient client;

    public static void main(String[] args) throws Exception {
        String akId = "XX";
        // 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET が設定されていることを確認してください。https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");

        client = new DefaultAcsClient(profile);
        String taskName = "offline_job_0930_1648";
        Long fileId = createTask(taskName); // データ統合タスクを作成します。
        updateFile(fileId);   // データ統合タスクのスケジューリングプロパティを変更します。
    }
}