為了便於CPFS智算版與OSS Bucket中的單檔案粒度持久性的資料流動,您可以通過建立流式任務實現。
方案概覽
實現某一個目錄下不同檔案的匯入匯出,只需要4步:
建立資料流動:通過建立資料流動,建立CPFS智算版檔案系統任意子目錄到OSS Bucket下任意prefix的映射。
建立流式任務:通過調用CreateDataFlowTask建立流式匯入或匯出任務,建立源端目錄到目的端的通道。建立成功後,流式任務的狀態一直保持為運行中,但實際不會流動資料,還需要為其建立流式子任務。
建立流式任務的子任務:接著通過調用CreateDataFlowSubTask提交不同檔案的匯入或匯出子任務。
查詢流式子任務狀態:最後通過調用DescribeDataFlowSubTask查詢已提交的子任務進度與狀態。當調用結果中Status值為COMPLETE;Progress值為10000時,則表示來源資料已全部匯入或匯出至目標目錄。
前提條件
已建立CPFS智算版檔案系統。具體操作,請參見建立檔案系統。
已為目標OSS Bucket設定標籤(key: cpfs-dataflow, value: true),如下圖示,且在資料流動的使用過程中,不能刪除和修改該標籤,否則CPFS智算版資料流動無法訪問Bucket的資料。具體操作,請參見管理儲存空間標籤。

為了防止多個資料流動向同一個OSS Bucket匯出資料時產生資料衝突,需要該OSS Bucket開啟版本控制。更多資訊,請參見版本控制。
使用資料流動流式任務,CPFS智算版檔案系統的版本號碼必須為2.6.0及以上版本。關於如何查看檔案系統的版本號碼,請參見查詢檔案系統版本號碼。
流式匯入任務
本樣本以將OSS Bucket(examplebucket)中/bmcpfs/test/file.xml下的子目錄(/test/file)資料移轉至CPFS智算版檔案系統(bmcpfs-370jz26fkr2st9****)中/oss/mnt/file.xml下的子目錄(/mnt/file)為例,介紹如何建立流式匯入任務和流式匯入子任務,實現單檔案粒度持久性的資料流動。
建立資料流動。
您可以通過調用API或控制台為目標檔案系統建立資料流動,並擷取資料流動ID(例如,df-37bae1804cc6****)。
通過調用CreateDataFlow API建立資料流動。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版檔案系統ID。 "SourceStorage": "oss://examplebucket", //源端OSS Bucket的訪問地址。 "FileSystemPath": "/oss/", //指定連結到OSS的CPFS智算版檔案系統目錄,且必須是已有目錄。 "SourceStoragePath": "/bmcpfs/", //源端儲存Bucket內的訪問路徑。 }預期輸出:
{ "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E", "DataFlowId": "df-37bae1804cc6****" }通過控制台建立資料流動。具體操作,請參見管理資料流動。
建立資料流動流式匯入任務。
通過調用CreateDataFlowTask API建立資料流動流式匯入任務,並儲存
TaskId傳回值。{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS檔案系統ID。 "DataFlowId": "df-37bae1804cc6****", //資料流動ID。 "TaskAction": "StreamImport", // 資料流動流式任務類型,匯入StreamImport,匯出StreamExport。 "DataType": "MetaAndData", // 資料類型,目前僅支援MetaAndData。 "Directory": "/test/", // 同步源相對目錄。此情境為OSS Bucket的Bucket Prefix。 "DstDirectory": "/mnt/", // 同步目標相對目錄。此情境為CPFS智算版檔案系統的目錄。 "ConflictPolicy": "SKIP_THE_FILE" // 同名檔案衝突策略。OVERWRITE_EXISTING:強制覆蓋同名檔案;SKIP_THE_FILE:跳過同名檔案;KEEP_LATEST:比較更新時間,保留最新版本。 }預期輸出:
{ "RequestId": "2D69A58F-345C-4FDE-88E4-BF518948F518", "TaskId": "task-376a61ab2d80****" }建立流式匯入任務的子任務。
通過調用CreateDataFlowSubTask API提交流式任務的匯入子任務。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS檔案系統ID。 "DataFlowId": "df-37bae1804cc****", //資料流動ID。 "DataFlowTaskId": "task-376a61ab2d80****", //流式匯入任務ID。 "SrcFilePath": "/file.xml", // 流式任務中來源目錄下的某個檔案路徑。此情境為OSS Bucket的Bucket物件路徑。 "DstFilePath": "/mnt/file.xml" // 流式任務中目標目錄下的某個檔案路徑。此情境為CPFS智算版檔案系統的目錄。 }預期輸出:
{ "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3", "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" }查詢流式子任務的執行進度和任務狀態。
通過調用DescribeDataFlowSubTasks API查詢已提交的子任務執行進度與任務狀態。不同的Key值對應不同的Value值。更多資訊,請參見DescribeDataFlowSubTasks。
本樣本通過篩選DataFlowIds(資料流動ID)方式查詢子任務的資訊。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版檔案系統ID。 "Filters": [ { "Key": "DataFlowIds", "Value": "df-37bae1804cc****" } ] }預期輸出:
{ "RequestId": "98696EF0-1607-4E9D-B01D-F20930B6****", "DataFlowSubTask": { "DataFlowSubTask": [ { "FileSystemId": "bmcpfs-370jz26fkr2st9****", //檔案系統ID。 "DataFlowId": "df-37bae1804cc****", //資料流動ID。 "DataFlowTaskId": "task-37b705830bcb****", //資料流動流式任務ID。 "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****",//資料流動流式子任務ID。 "SrcFilePath": "/bmcpfs/test/file.xml",//源檔案路徑。 "DstFilePath": "/oss/mnt/file.xml", //目標檔案路徑。 "Status": "COMPLETE", "Progress": 10000, "CreateTime": "2024-10-23 16:28:16", "StartTime": "2024-10-23 16:28:17", "EndTime": "2024-10-23 16:29:22", "ErrorMsg": "",//未返回或者返回為空白時,表示沒有錯誤資訊。 "ProgressStats": { "BytesTotal": 68, "BytesDone": 68, "ActualBytes": 68, "AverageSpeed": 34 }, "FileDetail": { "ModifyTime": 1725897600000000000, "Size": 68, "Checksum": "crc64:850309505450944****"//檔案校正碼。 } } ] } }調用結果中的Progress和Status參數值即為子任務的執行進度和任務狀態資訊。當任務狀態Status值為COMPLETE時,表示任務已完成;當Progress值為10000時,表示資料已全部匯入或匯出至目標目錄。
流式匯出任務
本樣本以將CPFS智算版檔案系統(bmcpfs-370jz26fkr2st9****)中/oss_test/yaml/test/file.png資料移轉至OSS Bucket(examplebucket)/bmcpfs_test/dataflows/mnt/file.png為例,介紹如何建立流式匯出任務和流式匯出子任務,實現單檔案粒度持久性的資料流動。
建立資料流動。
您可以通過調用API或控制台為目標檔案系統建立資料流動,並擷取資料流動ID(例如,df-37bae1804cc6****)。
通過調用CreateDataFlow API建立資料流動。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版檔案系統ID。 "SourceStorage": "oss://examplebucket", //源端OSS Bucket的訪問地址。 "FileSystemPath": "/oss/", //指定連結到OSS的CPFS智算版檔案系統目錄,且必須是已有目錄。 "SourceStoragePath": "/bmcpfs/", //源端儲存Bucket內的訪問路徑。 }預期輸出:
{ "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E", "DataFlowId": "df-37bae1804cc6****" }通過控制台建立資料流動。具體操作,請參見管理資料流動。
建立資料流動流式匯出任務。
通過調用CreateDataFlowTask API建立資料流動流式匯出任務,並儲存
TaskId傳回值。{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS檔案系統ID。 "DataFlowId": "df-37bae1804cc6****", //資料流動ID。 "TaskAction": "StreamImport", // 資料流動流式任務類型,此情境為匯出StreamExport。 "DataType": "MetaAndData", // 資料類型,目前僅支援MetaAndData。 "Directory": "/yaml/", // 同步源相對目錄。流式匯出情境時為CPFS智算版檔案系統CPFS目錄的相對路徑。 "DstDirectory": "/dataflows/", // 同步目標相對目錄。流式匯出情境為OSS Bucket的Bucket Prefix的相對路徑。 "ConflictPolicy": "SKIP_THE_FILE" // 同名檔案衝突策略。OVERWRITE_EXISTING:強制覆蓋同名檔案;SKIP_THE_FILE:跳過同名檔案;KEEP_LATEST:比較更新時間,保留最新版本。 }預期輸出:
{ "RequestId": "BC7C825C-5F65-4B56-BEF6-98C56C7C930B", "TaskId": "task-37b705830bcb****" }建立流式匯出任務的子任務。
通過調用CreateDataFlowSubTask API提交流式匯出任務的子任務。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS檔案系統ID。 "DataFlowId": "df-37bae1804cc****", //資料流動ID。 "DataFlowTaskId": "task-37b705830bcb****", //流式匯出任務ID。 "SrcFilePath": "/test/file.png", // 流式任務中Directory目錄下的某個檔案的相對路徑。 "DstFilePath": "/mnt/file.png" // 流式任務中DstDirectory目錄下的某個檔案的相對路徑。 }預期輸出:
{ "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3", "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****" }查詢流式匯出子任務的執行進度和任務狀態。
通過調用DescribeDataFlowSubTasks API查詢已提交的子任務執行進度與任務狀態。不同的Key值對應不同的Value值。更多資訊,請參見DescribeDataFlowSubTasks。
本樣本通過篩選DataFlowIds(資料流動ID)方式查詢子任務的資訊。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版檔案系統ID。 "Filters": [ { "Key": "DataFlowIds", "Value": "df-37bae1804cc****" } ] }預期輸出:
{ "RequestId": "FCBB356-96CA-135B-84B3-02E6F262B6BD", "DataFlowSubTask": { "DataFlowSubTask": [ { "FileSystemId": "bmcpfs-370jz26fkr2st9****", //檔案系統ID。 "DataFlowId": "df-37bae1804cc****", //資料流動ID。 "DataFlowTaskId": "task-37b705830bcb****", //資料流動流式任務ID。 "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****",//資料流動流式子任務ID。 "SrcFilePath": "/oss_test/yaml/test/file.png",//源檔案路徑。 "DstFilePath": "/bmcpfs_test/dataflows/mnt/file.png", //目標檔案路徑。 "Status": "COMPLETE", "Progress": 10000, "CreateTime": "2024-10-23 17:18:16", "StartTime": "2024-10-23 17:18:17", "EndTime": "2024-10-23 17:19:00", "ErrorMsg": "",//未返回或者返回為空白時,表示沒有錯誤資訊。 "ProgressStats": { "BytesTotal": 68, "BytesDone": 68, "ActualBytes": 68, "AverageSpeed": 34 }, "FileDetail": { "ModifyTime": 1725897600000000000, "Size": 68, "Checksum": "crc64:850309505450944****"//檔案校正碼。 } } ] } }調用結果中的Progress和Status參數值即為子任務的執行進度和任務狀態資訊。當任務狀態Status值為COMPLETE時,表示任務已完成;當Progress值為10000時,表示資料已全部匯入或匯出至目標目錄。
相關操作
如果您需要取消流式子任務,可以通過調用CancelDataFlowSubTask API實現。僅支援取消CREATED和RUNNING狀態的流式子任務。
{
"FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版檔案系統ID。
"DataFlowId": "df-37bae1804cc****", //資料流動ID。
"DataFlowTaskId": "task-37b705830bcb****", //流式匯入或匯出任務ID。
"DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" // 資料流動流式子任務ID。
}