ストリーミングタスクを作成して、Cloud Parallel File Storage (CPFS) for LingjunファイルシステムとObject Storage Service (OSS)バケットの間でファイルを1つずつ継続的に移行できます。
概要
ディレクトリにファイルをインポートおよびエクスポートするには、次の手順を実行する必要があります。
データフローを作成する: CPFS for LingjunファイルシステムのサブディレクトリをOSSバケットのプレフィックスにマッピングするデータフローを作成できます。
ストリーミングタスクを作成する: CreateDataFlowTaskオペレーションを呼び出して、ストリーミングインポートまたはエクスポートタスクを作成できます。このタスクは、ソースディレクトリとデスティネーションディレクトリの間にトンネルを作成します。ストリーミングタスクが作成されると、タスクは実行中状態のままになります。ただし、トンネルを介してデータは転送されません。データを送信するには、ストリーミングサブタスクを作成する必要があります。
ストリーミングサブタスクを作成する: CreateDataFlowSubTaskオペレーションを呼び出して、異なるファイルごとにインポートまたはエクスポートサブタスクを作成できます。
ストリーミングサブタスクのステータスを照会する: DescribeDataFlowSubTaskオペレーションを呼び出して、送信済みのストリーミングサブタスクの進捗状況とステータスを照会できます。返された結果で、Statusパラメータの値がCOMPLETEで、Progressパラメータの値が10000の場合、すべてのソースデータが指定されたデスティネーションディレクトリにインポートまたはエクスポートされます。
前提条件
CPFS for Lingjunファイルシステムが作成されていること。詳細については、ファイルシステムの作成を参照してください。
キーがcpfs-dataflow、値がtrueのタグがデスティネーションOSSバケットに追加されていること。データフローの使用中は、タグを削除または変更しないでください。そうしないと、CPFS for LingjunデータフローはOSSバケット内のデータにアクセスできません。詳細については、バケットタグの管理を参照してください。
複数のデータフローを使用してOSSバケットにデータをエクスポートする必要がある場合は、OSSバケットでバージョン管理機能が有効になっていること。これにより、データの競合を防ぎます。詳細については、概要を参照してください。
CPFS for Lingjun V2.6.0以降のファイルシステムのみがストリーミングデータフロータスクをサポートしています。ファイルシステムのバージョンを表示する方法の詳細については、「ファイルシステムの詳細を表示する」トピックのファイルシステムのバージョン番号を表示するセクションを参照してください。
ストリーミングインポートタスクの作成
この例では、ストリーミングインポートタスクとストリーミングインポートサブタスクを作成してfile.xmlファイルを移行する方法について説明します。examplebucketという名前のOSSバケットの/bmcpfs/test/ディレクトリからbmcpfs-370jz26fkr2st9****という名前のCPFS for Lingjunファイルシステムの/oss/mnt/ディレクトリに移行します。この例を参考にして、ファイルを1つずつ継続的に移行できます。
データフローを作成します。
APIオペレーションを呼び出すか、コンソールで操作を実行することにより、デスティネーションファイルシステムのデータフローを作成できます。データフローが作成されたら、df-37bae1804cc6****などのデータフローIDを取得します。
CreateDataFlowオペレーションを呼び出してデータフローを作成します。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for LingjunファイルシステムのID。 "SourceStorage": "oss://examplebucket", // ソースOSSバケットのエンドポイント。 "FileSystemPath": "/oss/", // OSSにリンクされているCPFS for Lingjunファイルシステム内のディレクトリ。ディレクトリは既に存在している必要があります。 "SourceStoragePath": "/bmcpfs/", // ソースOSSバケット内のオブジェクトのパス。 }期待される出力:
{ "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E", "DataFlowId": "df-37bae1804cc6****" }コンソールでデータフローを作成します。詳細については、データフローの管理を参照してください。
ストリーミングデータフローインポートタスクを作成します。
CreateDataFlowTaskオペレーションを呼び出してストリーミングデータフローインポートタスクを作成し、
TaskIdパラメータに返された値を保存します。{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for LingjunファイルシステムのID。 "DataFlowId": "df-37bae1804cc6****", // データフローのID。 "TaskAction": "StreamImport", // ストリーミングデータフロータスクのタイプ。インポートタスクの場合はStreamImport、エクスポートタスクの場合はStreamExportです。 "DataType": "MetaAndData", // データ型。値をMetaAndDataに設定します。 "Directory": "/test/", // 移行するファイルが存在するディレクトリの相対パス。この例では、OSSバケットのプレフィックスが使用されます。 "DstDirectory": "/mnt/", // 移行されたファイルが存在するディレクトリの相対パス。この例では、CPFS for Lingjunファイルシステム内のディレクトリが使用されます。 "ConflictPolicy": "SKIP_THE_FILE" // 同じ名前のファイルの競合解決ポリシー。有効な値: OVERWRITE_EXISTING: 同じ名前のファイルを強制的に上書きします。 SKIP_THE_FILE: 同じ名前のファイルをスキップします。 KEEP_LATEST: 同じ名前のファイルの最新バージョンを保持します。 }期待される出力:
{ "RequestId": "2D69A58F-345C-4FDE-88E4-BF518948F518", "TaskId": "task-376a61ab2d80****" }ストリーミングインポートサブタスクを作成します。
CreateDataFlowSubTaskオペレーションを呼び出して、ストリーミングインポートサブタスクを作成します。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for LingjunファイルシステムのID。 "DataFlowId": "df-37bae1804cc****", // データフローのID。 "DataFlowTaskId": "task-376a61ab2d80****", // ストリーミングインポートタスクのID。 "SrcFilePath": "/file.xml", // ストリーミングタスクで移行するファイルのパス。この例では、OSSバケット内のオブジェクトのパスが使用されます。 "DstFilePath": "/mnt/file.xml" // ストリーミングタスクで移行されたファイルのパス。この例では、CPFS for Lingjunファイルシステム内のディレクトリが使用されます。 }期待される出力:
{ "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3", "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" }ストリーミングサブタスクの進捗状況とステータスを照会します。
DescribeDataFlowSubTasksオペレーションを呼び出して、送信済みのストリーミングサブタスクの進捗状況とステータスを照会します。異なるキーは異なる値に対応します。詳細については、DescribeDataFlowSubTasksを参照してください。
この例では、データフローIDでサブタスクに関する情報を照会する方法について説明します。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for LingjunファイルシステムのID。 "Filters": [ { "Key": "DataFlowIds", "Value": "df-37bae1804cc****" } ] }期待される出力:
{ "RequestId": "98696EF0-1607-4E9D-B01D-F20930B6****", "DataFlowSubTask": { "DataFlowSubTask": [ { "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for LingjunファイルシステムのID。 "DataFlowId": "df-37bae1804cc****", // データフローのID。 "DataFlowTaskId": "task-37b705830bcb****", // ストリーミングデータフロータスクのID。 "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****",// ストリーミングデータフローサブタスクのID。 "SrcFilePath": "/bmcpfs/test/file.xml",// 移行するファイルのパス。 "DstFilePath": "/oss/mnt/file.xml", // 移行されたファイルのパス。 "Status": "COMPLETE", "Progress": 10000, "CreateTime": "2024-10-23 16:28:16", "StartTime": "2024-10-23 16:28:17", "EndTime": "2024-10-23 16:29:22", "ErrorMsg": "",// このパラメータが返されないか、返された結果が空の場合は、エラーは発生していません。 "ProgressStats": { "BytesTotal": 68, "BytesDone": 68, "ActualBytes": 68, "AverageSpeed": 34 }, "FileDetail": { "ModifyTime": 1725897600000000000, "Size": 68, "Checksum": "crc64:850309505450944****"// ファイルチェックサム。 } } ] } }ProgressパラメータとStatusパラメータの値は、ストリーミングサブタスクの進捗状況とステータスを示します。Statusパラメータの値がCOMPLETEの場合、サブタスクは完了です。Progressパラメータの値が10000の場合、すべてのデータがデスティネーションディレクトリにインポートまたはエクスポートされます。
ストリーミングエクスポートタスクの作成
この例では、ストリーミングエクスポートタスクとストリーミングエクスポートサブタスクを作成してfile.pngファイルを移行する方法について説明します。bmcpfs-370jz26fkr2st9****という名前のCPFS for Lingjunファイルシステムの/oss_test/yaml/test/ディレクトリからexamplebucketという名前のOSSバケットの/bmcpfs_test/dataflows/mnt/ディレクトリに移行します。この例を参考にして、ファイルを1つずつ継続的に移行できます。
データフローを作成します。
APIオペレーションを呼び出すか、コンソールで操作を実行することにより、デスティネーションファイルシステムのデータフローを作成できます。データフローが作成されたら、df-37bae1804cc6****などのデータフローIDを取得します。
CreateDataFlowオペレーションを呼び出してデータフローを作成します。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for LingjunファイルシステムのID。 "SourceStorage": "oss://examplebucket", // ソースOSSバケットのエンドポイント。 "FileSystemPath": "/oss/", // OSSにリンクされているCPFS for Lingjunファイルシステム内のディレクトリ。ディレクトリは既に存在している必要があります。 "SourceStoragePath": "/bmcpfs/", // ソースOSSバケット内のオブジェクトのパス。 }期待される出力:
{ "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E", "DataFlowId": "df-37bae1804cc6****" }コンソールでデータフローを作成します。詳細については、データフローの管理を参照してください。
ストリーミングデータフローエクスポートタスクを作成します。
CreateDataFlowTaskオペレーションを呼び出してストリーミングデータフローエクスポートタスクを作成し、
TaskIdパラメータに返された値を保存します。{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for LingjunファイルシステムのID。 "DataFlowId": "df-37bae1804cc6****", // データフローのID。 "TaskAction": "StreamImport", // ストリーミングデータフロータスクのタイプ。エクスポートタスクの場合はStreamExportです。 "DataType": "MetaAndData", // データ型。値をMetaAndDataに設定します。 "Directory": "/yaml/", // 移行するファイルが存在するディレクトリの相対パス。この例では、CPFS for Lingjunファイルシステム内のディレクトリの相対パスが使用されます。 "DstDirectory": "/dataflows/", // 移行されたファイルが存在するディレクトリの相対パス。この例では、OSSバケットプレフィックスの相対パスが使用されます。 "ConflictPolicy": "SKIP_THE_FILE" // 同じ名前のファイルの競合解決ポリシー。有効な値: OVERWRITE_EXISTING: 同じ名前のファイルを強制的に上書きします。 SKIP_THE_FILE: 同じ名前のファイルをスキップします。 KEEP_LATEST: 同じ名前のファイルの最新バージョンを保持します。 }期待される出力:
{ "RequestId": "BC7C825C-5F65-4B56-BEF6-98C56C7C930B", "TaskId": "task-37b705830bcb****" }ストリーミングエクスポートサブタスクを作成します。
CreateDataFlowSubTaskオペレーションを呼び出して、ストリーミングエクスポートサブタスクを作成します。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for LingjunファイルシステムのID。 "DataFlowId": "df-37bae1804cc****", // データフローのID。 "DataFlowTaskId": "task-37b705830bcb****", // ストリーミングエクスポートタスクのID。 "SrcFilePath": "/test/file.png", // ストリーミングタスクで移行するファイルが存在するディレクトリの相対パス。 "DstFilePath": "/mnt/file.png" // ストリーミングタスクで移行されたファイルが存在するディレクトリの相対パス。 }期待される出力:
{ "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3", "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****" }ストリーミングエクスポートサブタスクの進捗状況とステータスを照会します。
DescribeDataFlowSubTasksオペレーションを呼び出して、送信済みのストリーミングサブタスクの進捗状況とステータスを照会します。異なるキーは異なる値に対応します。詳細については、DescribeDataFlowSubTasksを参照してください。
この例では、データフローIDでサブタスクに関する情報を照会する方法について説明します。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for LingjunファイルシステムのID。 "Filters": [ { "Key": "DataFlowIds", "Value": "df-37bae1804cc****" } ] }期待される出力:
{ "RequestId": "FCBB356-96CA-135B-84B3-02E6F262B6BD", "DataFlowSubTask": { "DataFlowSubTask": [ { "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for LingjunファイルシステムのID。 "DataFlowId": "df-37bae1804cc****", // データフローのID。 "DataFlowTaskId": "task-37b705830bcb****", // ストリーミングデータフロータスクのID。 "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****",// ストリーミングデータフローサブタスクのID。 "SrcFilePath": "/oss_test/yaml/test/file.png",// 移行するファイルのパス。 "DstFilePath": "/bmcpfs_test/dataflows/mnt/file.png", // 移行されたファイルのパス。 "Status": "COMPLETE", "Progress": 10000, "CreateTime": "2024-10-23 17:18:16", "StartTime": "2024-10-23 17:18:17", "EndTime": "2024-10-23 17:19:00", "ErrorMsg": "",// このパラメータが返されないか、返された結果が空の場合は、エラーは発生していません。 "ProgressStats": { "BytesTotal": 68, "BytesDone": 68, "ActualBytes": 68, "AverageSpeed": 34 }, "FileDetail": { "ModifyTime": 1725897600000000000, "Size": 68, "Checksum": "crc64:850309505450944****"// ファイルチェックサム。 } } ] } }ProgressパラメータとStatusパラメータの値は、ストリーミングサブタスクの進捗状況とステータスを示します。Statusパラメータの値がCOMPLETEの場合、サブタスクは完了です。Progressパラメータの値が10000の場合、すべてのデータがデスティネーションディレクトリにインポートまたはエクスポートされます。
次のステップ
ストリーミングサブタスクをキャンセルするには、CancelDataFlowSubTaskオペレーションを呼び出します。CREATEDまたはRUNNING状態のストリーミングサブタスクのみをキャンセルできます。
{
"FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for LingjunファイルシステムのID。
"DataFlowId": "df-37bae1804cc****", // データフローのID。
"DataFlowTaskId": "task-37b705830bcb****", // ストリーミングインポートまたはエクスポートタスクのID。
"DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" // ストリーミングデータフローサブタスクのID。
}