全部產品
Search
文件中心

Cloud Parallel File Storage:資料流動流式任務最佳實務

更新時間:Dec 26, 2025

為了便於CPFS智算版與OSS Bucket中的單檔案粒度持久性的資料流動,您可以通過建立流式任務實現。

方案概覽

實現某一個目錄下不同檔案的匯入匯出,只需要4步:

  1. 建立資料流動:通過建立資料流動,建立CPFS智算版檔案系統任意子目錄到OSS Bucket下任意prefix的映射。

  2. 建立流式任務:通過調用CreateDataFlowTask建立流式匯入或匯出任務,建立源端目錄到目的端的通道。建立成功後,流式任務的狀態一直保持為運行中,但實際不會流動資料,還需要為其建立流式子任務。

  3. 建立流式任務的子任務:接著通過調用CreateDataFlowSubTask提交不同檔案的匯入或匯出子任務。

  4. 查詢流式子任務狀態:最後通過調用DescribeDataFlowSubTask查詢已提交的子任務進度與狀態。當調用結果中Status值為COMPLETEProgress值為10000時,則表示來源資料已全部匯入或匯出至目標目錄。

前提條件

  • 已建立CPFS智算版檔案系統。具體操作,請參見建立檔案系統

  • 已為目標OSS Bucket設定標籤(key: cpfs-dataflow, value: true),如下圖示,且在資料流動的使用過程中,不能刪除和修改該標籤,否則CPFS智算版資料流動無法訪問Bucket的資料。具體操作,請參見管理儲存空間標籤

    image

  • 為了防止多個資料流動向同一個OSS Bucket匯出資料時產生資料衝突,需要該OSS Bucket開啟版本控制。更多資訊,請參見版本控制

重要

使用資料流動流式任務,CPFS智算版檔案系統的版本號碼必須為2.6.0及以上版本。關於如何查看檔案系統的版本號碼,請參見查詢檔案系統版本號碼

流式匯入任務

本樣本以將OSS Bucket(examplebucket)中/bmcpfs/test/file.xml下的子目錄(/test/file)資料移轉至CPFS智算版檔案系統(bmcpfs-370jz26fkr2st9****)中/oss/mnt/file.xml下的子目錄(/mnt/file)為例,介紹如何建立流式匯入任務和流式匯入子任務,實現單檔案粒度持久性的資料流動。

  1. 建立資料流動。

    您可以通過調用API或控制台為目標檔案系統建立資料流動,並擷取資料流動ID(例如,df-37bae1804cc6****)。

    • 通過調用CreateDataFlow API建立資料流動。

      {
        "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版檔案系統ID。
        "SourceStorage": "oss://examplebucket", //源端OSS Bucket的訪問地址。
        "FileSystemPath": "/oss/",  //指定連結到OSS的CPFS智算版檔案系統目錄,且必須是已有目錄。
        "SourceStoragePath": "/bmcpfs/", //源端儲存Bucket內的訪問路徑。
      }

      預期輸出:

      {
        "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E",
        "DataFlowId": "df-37bae1804cc6****"
      }
    • 通過控制台建立資料流動。具體操作,請參見管理資料流動

  2. 建立資料流動流式匯入任務。

    通過調用CreateDataFlowTask API建立資料流動流式匯入任務,並儲存TaskId傳回值。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS檔案系統ID。
      "DataFlowId": "df-37bae1804cc6****", //資料流動ID。
      "TaskAction": "StreamImport", // 資料流動流式任務類型,匯入StreamImport,匯出StreamExport。
      "DataType": "MetaAndData", // 資料類型,目前僅支援MetaAndData。
      "Directory": "/test/", // 同步源相對目錄。此情境為OSS Bucket的Bucket Prefix。
      "DstDirectory": "/mnt/", // 同步目標相對目錄。此情境為CPFS智算版檔案系統的目錄。
      "ConflictPolicy": "SKIP_THE_FILE" // 同名檔案衝突策略。OVERWRITE_EXISTING:強制覆蓋同名檔案;SKIP_THE_FILE:跳過同名檔案;KEEP_LATEST:比較更新時間,保留最新版本。
    }

    預期輸出:

    {
      "RequestId": "2D69A58F-345C-4FDE-88E4-BF518948F518",
      "TaskId": "task-376a61ab2d80****"
    }
  3. 建立流式匯入任務的子任務。

    通過調用CreateDataFlowSubTask API提交流式任務的匯入子任務。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS檔案系統ID。
      "DataFlowId": "df-37bae1804cc****", //資料流動ID。
      "DataFlowTaskId": "task-376a61ab2d80****", //流式匯入任務ID。
      "SrcFilePath": "/file.xml", // 流式任務中來源目錄下的某個檔案路徑。此情境為OSS Bucket的Bucket物件路徑。
      "DstFilePath": "/mnt/file.xml" // 流式任務中目標目錄下的某個檔案路徑。此情境為CPFS智算版檔案系統的目錄。
    }

    預期輸出:

    {
      "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3",
      "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****"
    }
  4. 查詢流式子任務的執行進度和任務狀態。

    通過調用DescribeDataFlowSubTasks API查詢已提交的子任務執行進度與任務狀態。不同的Key值對應不同的Value值。更多資訊,請參見DescribeDataFlowSubTasks

    本樣本通過篩選DataFlowIds(資料流動ID)方式查詢子任務的資訊。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版檔案系統ID。
      "Filters": [
        {
          "Key": "DataFlowIds",
          "Value": "df-37bae1804cc****"
        }
      ]
    }

    預期輸出:

    {
      "RequestId": "98696EF0-1607-4E9D-B01D-F20930B6****",
      "DataFlowSubTask": {
        "DataFlowSubTask": [
          {
            "FileSystemId": "bmcpfs-370jz26fkr2st9****", //檔案系統ID。
            "DataFlowId": "df-37bae1804cc****", //資料流動ID。
            "DataFlowTaskId": "task-37b705830bcb****", //資料流動流式任務ID。
            "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****",//資料流動流式子任務ID。
            "SrcFilePath": "/bmcpfs/test/file.xml",//源檔案路徑。
            "DstFilePath": "/oss/mnt/file.xml", //目標檔案路徑。
            "Status": "COMPLETE",
            "Progress": 10000,
            "CreateTime": "2024-10-23 16:28:16",
            "StartTime": "2024-10-23 16:28:17",
            "EndTime": "2024-10-23 16:29:22",
            "ErrorMsg": "",//未返回或者返回為空白時,表示沒有錯誤資訊。
            "ProgressStats": {
              "BytesTotal": 68,
              "BytesDone": 68,
              "ActualBytes": 68,
              "AverageSpeed": 34
            },
            "FileDetail": {
              "ModifyTime": 1725897600000000000,
              "Size": 68,
              "Checksum": "crc64:850309505450944****"//檔案校正碼。
            }
          }
        ]
      }
    }

    調用結果中的ProgressStatus參數值即為子任務的執行進度和任務狀態資訊。當任務狀態Status值為COMPLETE時,表示任務已完成;當Progress值為10000時,表示資料已全部匯入或匯出至目標目錄。

流式匯出任務

本樣本以將CPFS智算版檔案系統(bmcpfs-370jz26fkr2st9****)中/oss_test/yaml/test/file.png資料移轉至OSS Bucket(examplebucket)/bmcpfs_test/dataflows/mnt/file.png為例,介紹如何建立流式匯出任務和流式匯出子任務,實現單檔案粒度持久性的資料流動。

  1. 建立資料流動。

    您可以通過調用API或控制台為目標檔案系統建立資料流動,並擷取資料流動ID(例如,df-37bae1804cc6****)。

    • 通過調用CreateDataFlow API建立資料流動。

      {
        "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版檔案系統ID。
        "SourceStorage": "oss://examplebucket", //源端OSS Bucket的訪問地址。
        "FileSystemPath": "/oss/",  //指定連結到OSS的CPFS智算版檔案系統目錄,且必須是已有目錄。
        "SourceStoragePath": "/bmcpfs/", //源端儲存Bucket內的訪問路徑。
      }

      預期輸出:

      {
        "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E",
        "DataFlowId": "df-37bae1804cc6****"
      }
    • 通過控制台建立資料流動。具體操作,請參見管理資料流動

  2. 建立資料流動流式匯出任務。

    通過調用CreateDataFlowTask API建立資料流動流式匯出任務,並儲存TaskId傳回值。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS檔案系統ID。
      "DataFlowId": "df-37bae1804cc6****", //資料流動ID。
      "TaskAction": "StreamImport", // 資料流動流式任務類型,此情境為匯出StreamExport。
      "DataType": "MetaAndData", // 資料類型,目前僅支援MetaAndData。
      "Directory": "/yaml/", // 同步源相對目錄。流式匯出情境時為CPFS智算版檔案系統CPFS目錄的相對路徑。
      "DstDirectory": "/dataflows/", // 同步目標相對目錄。流式匯出情境為OSS Bucket的Bucket Prefix的相對路徑。
      "ConflictPolicy": "SKIP_THE_FILE" // 同名檔案衝突策略。OVERWRITE_EXISTING:強制覆蓋同名檔案;SKIP_THE_FILE:跳過同名檔案;KEEP_LATEST:比較更新時間,保留最新版本。
    }

    預期輸出:

    {
      "RequestId": "BC7C825C-5F65-4B56-BEF6-98C56C7C930B",
      "TaskId": "task-37b705830bcb****"
    }
  3. 建立流式匯出任務的子任務。

    通過調用CreateDataFlowSubTask API提交流式匯出任務的子任務。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS檔案系統ID。
      "DataFlowId": "df-37bae1804cc****", //資料流動ID。
      "DataFlowTaskId": "task-37b705830bcb****", //流式匯出任務ID。
      "SrcFilePath": "/test/file.png", // 流式任務中Directory目錄下的某個檔案的相對路徑。
      "DstFilePath": "/mnt/file.png" // 流式任務中DstDirectory目錄下的某個檔案的相對路徑。
    }

    預期輸出:

    {
      "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3",
      "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****"
    }
  4. 查詢流式匯出子任務的執行進度和任務狀態。

    通過調用DescribeDataFlowSubTasks API查詢已提交的子任務執行進度與任務狀態。不同的Key值對應不同的Value值。更多資訊,請參見DescribeDataFlowSubTasks

    本樣本通過篩選DataFlowIds(資料流動ID)方式查詢子任務的資訊。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版檔案系統ID。
      "Filters": [
        {
          "Key": "DataFlowIds",
          "Value": "df-37bae1804cc****"
        }
      ]
    }

    預期輸出:

    {
      "RequestId": "FCBB356-96CA-135B-84B3-02E6F262B6BD",
      "DataFlowSubTask": {
        "DataFlowSubTask": [
          {
            "FileSystemId": "bmcpfs-370jz26fkr2st9****", //檔案系統ID。
            "DataFlowId": "df-37bae1804cc****", //資料流動ID。
            "DataFlowTaskId": "task-37b705830bcb****", //資料流動流式任務ID。
            "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****",//資料流動流式子任務ID。
            "SrcFilePath": "/oss_test/yaml/test/file.png",//源檔案路徑。
            "DstFilePath": "/bmcpfs_test/dataflows/mnt/file.png", //目標檔案路徑。
            "Status": "COMPLETE",
            "Progress": 10000,
            "CreateTime": "2024-10-23 17:18:16",
            "StartTime": "2024-10-23 17:18:17",
            "EndTime": "2024-10-23 17:19:00",
            "ErrorMsg": "",//未返回或者返回為空白時,表示沒有錯誤資訊。
            "ProgressStats": {
              "BytesTotal": 68,
              "BytesDone": 68,
              "ActualBytes": 68,
              "AverageSpeed": 34
            },
            "FileDetail": {
              "ModifyTime": 1725897600000000000,
              "Size": 68,
              "Checksum": "crc64:850309505450944****"//檔案校正碼。
            }
          }
        ]
      }
    }

    調用結果中的ProgressStatus參數值即為子任務的執行進度和任務狀態資訊。當任務狀態Status值為COMPLETE時,表示任務已完成;當Progress值為10000時,表示資料已全部匯入或匯出至目標目錄。

相關操作

如果您需要取消流式子任務,可以通過調用CancelDataFlowSubTask API實現。僅支援取消CREATEDRUNNING狀態的流式子任務。

{
  "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版檔案系統ID。
  "DataFlowId": "df-37bae1804cc****", //資料流動ID。
  "DataFlowTaskId": "task-37b705830bcb****", //流式匯入或匯出任務ID。
  "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" // 資料流動流式子任務ID。
}