すべてのプロダクト
Search
ドキュメントセンター

Cloud Parallel File Storage:ストリーミングデータフロータスクのベストプラクティス

最終更新日:Mar 01, 2026

ストリーミングタスクを作成することで、Cloud Parallel File Storage (CPFS) for Lingjun ファイルシステムと Object Storage Service (OSS) バケット間でファイルを 1 つずつ継続的に移行できます。

概要

ディレクトリとの間でファイルをインポートまたはエクスポートするには、次の手順を実行する必要があります:

  1. データフローの作成:データフローを作成して、CPFS for Lingjun ファイルシステムのサブディレクトリを OSS バケットのプレフィックスにマップできます。

  2. ストリーミングタスクの作成:`CreateDataFlowTask` 操作を呼び出して、ストリーミングインポートまたはエクスポートタスクを作成できます。このタスクは、ソースディレクトリと宛先ディレクトリの間にトンネルを作成します。ストリーミングタスクが作成された後、タスクは実行中の状態のままになります。ただし、トンネルを介してデータは転送されません。データを転送するには、ストリーミングサブタスクを作成する必要があります。

  3. ストリーミングサブタスクの作成:`CreateDataFlowSubTask` 操作を呼び出して、ファイルごとにインポートまたはエクスポートのサブタスクを作成できます。

  4. ストリーミングサブタスクのステータスのクエリ:`DescribeDataFlowSubTask` 操作を呼び出して、送信されたストリーミングサブタスクの進捗とステータスをクエリできます。返された結果で [Status] パラメーターの値が COMPLETE で、[Progress] パラメーターの値が 10000 の場合、すべてのソースデータが指定された宛先ディレクトリにインポートまたはエクスポートされたことを示します。

前提条件

  • CPFS for Lingjun ファイルシステムが作成されていること。詳細については、「ファイルシステムの作成」をご参照ください。

  • 宛先の OSS バケットに、次の図に示すように、キーが `cpfs-dataflow` で値が `true` のタグを追加します。データフローの使用中は、このタグを削除したり変更したりしないでください。そうしないと、CPFS for Lingjun のデータフローがバケットデータにアクセスできなくなります。詳細については、「バケットのタグ付けの管理」をご参照ください。

    image

  • 複数のデータフローを使用して OSS バケットにデータをエクスポートするには、OSS バケットのバージョン管理機能を有効にします。これにより、データ競合を防ぐことができます。詳細については、「概要」をご参照ください。

重要

CPFS for Lingjun V2.6.0 以降のバージョンのファイルシステムのみがストリーミングデータフロータスクをサポートします。ファイルシステムのバージョンを確認する方法の詳細については、「ファイルシステムの詳細の表示」トピックの「ファイルシステムのバージョン番号の表示」セクションをご参照ください。

ストリーミングインポートタスクの作成

この例では、ストリーミングインポートジョブとストリーミングインポートサブタスクを作成し、単一ファイルの粒度でデータを継続的にストリーミングする方法について説明します。データは、OSS バケット (`examplebucket`) の /bmcpfs/test/file.xml パス配下のサブディレクトリ (`/test/file`) から、CPFS for Lingjun ファイルシステム (`bmcpfs-370jz26fkr2st9****`) の /oss/mnt/file.xml パス配下のサブディレクトリ (`/mnt/file`) に移行されます。

  1. データフローを作成します。

    API 操作を呼び出すか、コンソールで操作を実行して、宛先ファイルシステムのデータフローを作成できます。データフローが作成されたら、`df-37bae1804cc6****` などのデータフロー ID を取得します。

    • CreateDataFlow 操作を呼び出してデータフローを作成します。

      {
        "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for Lingjun ファイルシステムの ID。
        "SourceStorage": "oss://examplebucket", // ソース OSS バケットのエンドポイント。
        "FileSystemPath": "/oss/",  // OSS にリンクされている CPFS for Lingjun ファイルシステム内のディレクトリ。このディレクトリはすでに存在している必要があります。
        "SourceStoragePath": "/bmcpfs/", // ソース OSS バケット内のオブジェクトのパス。
      }

      想定される出力:

      {
        "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E",
        "DataFlowId": "df-37bae1804cc6****"
      }
    • コンソールでデータフローを作成します。詳細については、「データフローの管理」をご参照ください。

  2. ストリーミングデータフローインポートタスクを作成します。

    CreateDataFlowTask API を呼び出してストリーミングデータフローインポートタスクを作成し、返された TaskId の値を保存します。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for Lingjun ファイルシステムの ID。
      "DataFlowId": "df-37bae1804cc6****", // データフローの ID。
      "TaskAction": "StreamImport", // ストリーミングデータフロータスクのタイプ。インポートタスクの場合は StreamImport、エクスポートタスクの場合は StreamExport です。
      "DataType": "MetaAndData", // データの型。値を MetaAndData に設定します。
      "Directory": "/test/", // 移行するファイルが存在するディレクトリの相対パス。この例では、OSS バケットのプレフィックスが使用されます。
      "DstDirectory": "/mnt/", // 移行されたファイルが存在するディレクトリの相対パス。この例では、CPFS for Lingjun ファイルシステム内のディレクトリが使用されます。
      "ConflictPolicy": "SKIP_THE_FILE" // 同名ファイルの競合解決ポリシー。有効な値:OVERWRITE_EXISTING:同名のファイルを強制的に上書きします。SKIP_THE_FILE:同名のファイルをスキップします。KEEP_LATEST:同名のファイルの最新バージョンを保持します。
    }

    想定される出力:

    {
      "RequestId": "2D69A58F-345C-4FDE-88E4-BF518948F518",
      "TaskId": "task-376a61ab2d80****"
    }
  3. ストリーミングインポートサブタスクを作成します。

    CreateDataFlowSubTask 操作を呼び出して、ストリーミングインポートサブタスクを作成します。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for Lingjun ファイルシステムの ID。
      "DataFlowId": "df-37bae1804cc****", // データフローの ID。
      "DataFlowTaskId": "task-376a61ab2d80****", // ストリーミングインポートタスクの ID。
      "SrcFilePath": "/file.xml", // ストリーミングタスクで移行するファイルのパス。この例では、OSS バケット内のオブジェクトのパスが使用されます。
      "DstFilePath": "/mnt/file.xml" // ストリーミングタスクで移行されたファイルのパス。この例では、CPFS for Lingjun ファイルシステム内のディレクトリが使用されます。
    }

    想定される出力:

    {
      "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3",
      "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****"
    }
  4. ストリーミングサブタスクの進捗とステータスをクエリします。

    DescribeDataFlowSubTasks 操作を呼び出して、送信されたストリーミングサブタスクの進捗とステータスをクエリします。キーごとに対応する値が異なります。詳細については、「DescribeDataFlowSubTasks」をご参照ください。

    この例では、データフロー ID でサブタスクの情報をクエリする方法について説明します。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for Lingjun ファイルシステムの ID。
      "Filters": [
        {
          "Key": "DataFlowIds",
          "Value": "df-37bae1804cc****"
        }
      ]
    }

    想定される出力:

    {
      "RequestId": "98696EF0-1607-4E9D-B01D-F20930B6****",
      "DataFlowSubTask": {
        "DataFlowSubTask": [
          {
            "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for Lingjun ファイルシステムの ID。
            "DataFlowId": "df-37bae1804cc****", // データフローの ID。
            "DataFlowTaskId": "task-37b705830bcb****", // ストリーミングデータフロータスクの ID。
            "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****",// ストリーミングデータフローサブタスクの ID。
            "SrcFilePath": "/bmcpfs/test/file.xml",// 移行するファイルのパス。
            "DstFilePath": "/oss/mnt/file.xml", // 移行されたファイルのパス。
            "Status": "COMPLETE",
            "Progress": 10000,
            "CreateTime": "2024-10-23 16:28:16",
            "StartTime": "2024-10-23 16:28:17",
            "EndTime": "2024-10-23 16:29:22",
            "ErrorMsg": "",// このパラメーターが返されないか、返された結果が空の場合、エラーは発生していません。
            "ProgressStats": {
              "BytesTotal": 68,
              "BytesDone": 68,
              "ActualBytes": 68,
              "AverageSpeed": 34
            },
            "FileDetail": {
              "ModifyTime": 1725897600000000000,
              "Size": 68,
              "Checksum": "crc64:850309505450944****"// ファイルのチェックサム。
            }
          }
        ]
      }
    }

    [Progress] パラメーターと [Status] パラメーターの値は、ストリーミングサブタスクの進捗とステータスを示します。[Status] パラメーターの値が COMPLETE の場合、サブタスクは完了しています。[Progress] パラメーターの値が 10000 の場合、すべてのデータが宛先ディレクトリにインポートまたはエクスポートされたことを示します。

ストリーミングエクスポートタスク

この例では、ストリーミングエクスポートタスクとストリーミングエクスポートサブタスクを作成して、`bmcpfs-370jz26fkr2st9****` という名前の CPFS for Lingjun ファイルシステム内の /oss_test/yaml/test/ パスから file.png ファイルを、`examplebucket` という名前の OSS バケット内の `/bmcpfs_test/dataflows/mnt/file.png` パスに移行する方法について説明します。このプロセスにより、継続的なファイルレベルのデータ移行が可能になります。

  1. データフローを作成します。

    API 操作を呼び出すか、コンソールで操作を実行して、宛先ファイルシステムのデータフローを作成できます。データフローが作成されたら、`df-37bae1804cc6****` などのデータフロー ID を取得します。

    • CreateDataFlow 操作を呼び出してデータフローを作成します。

      {
        "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for Lingjun ファイルシステムの ID。
        "SourceStorage": "oss://examplebucket", // ソース OSS バケットのエンドポイント。
        "FileSystemPath": "/oss/",  // OSS にリンクされている CPFS for Lingjun ファイルシステム内のディレクトリ。このディレクトリはすでに存在している必要があります。
        "SourceStoragePath": "/bmcpfs/", // ソース OSS バケット内のオブジェクトのパス。
      }

      想定される出力:

      {
        "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E",
        "DataFlowId": "df-37bae1804cc6****"
      }
    • コンソールでデータフローを作成します。詳細については、「データフローの管理」をご参照ください。

  2. ストリーミングデータフローエクスポートタスクを作成します。

    CreateDataFlowTask API を呼び出してストリーミングデータフローエクスポートタスクを作成し、返された TaskId の値を保存します。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for Lingjun ファイルシステムの ID。
      "DataFlowId": "df-37bae1804cc6****", // データフローの ID。
      "TaskAction": "StreamImport", // ストリーミングデータフロータスクのタイプ。エクスポートタスクの場合は StreamExport です。
      "DataType": "MetaAndData", // データの型。値を MetaAndData に設定します。
      "Directory": "/yaml/", // 移行するファイルが存在するディレクトリの相対パス。この例では、CPFS for Lingjun ファイルシステム内のディレクトリの相対パスが使用されます。
      "DstDirectory": "/dataflows/", // 移行されたファイルが存在するディレクトリの相対パス。この例では、OSS バケットプレフィックスの相対パスが使用されます。
      "ConflictPolicy": "SKIP_THE_FILE" // 同名ファイルの競合解決ポリシー。有効な値:OVERWRITE_EXISTING:同名のファイルを強制的に上書きします。SKIP_THE_FILE:同名のファイルをスキップします。KEEP_LATEST:同名のファイルの最新バージョンを保持します。
    }

    想定される出力:

    {
      "RequestId": "BC7C825C-5F65-4B56-BEF6-98C56C7C930B",
      "TaskId": "task-37b705830bcb****"
    }
  3. ストリーミングエクスポートサブタスクを作成します。

    CreateDataFlowSubTask 操作を呼び出して、ストリーミングエクスポートサブタスクを作成します。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for Lingjun ファイルシステムの ID。
      "DataFlowId": "df-37bae1804cc****", // データフローの ID。
      "DataFlowTaskId": "task-37b705830bcb****", //ストリーミングエクスポートタスクの ID。
      "SrcFilePath": "/test/file.png", // ストリーミングタスクで移行するファイルが存在するディレクトリの相対パス。
      "DstFilePath": "/mnt/file.png" // ストリーミングタスクで移行されたファイルが存在するディレクトリの相対パス。
    }

    想定される出力:

    {
      "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3",
      "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****"
    }
  4. ストリーミングエクスポートサブタスクの進捗とステータスをクエリします。

    DescribeDataFlowSubTasks 操作を呼び出して、送信されたストリーミングサブタスクの進捗とステータスをクエリします。キーごとに対応する値が異なります。詳細については、「DescribeDataFlowSubTasks」をご参照ください。

    この例では、データフロー ID でサブタスクの情報をクエリする方法について説明します。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for Lingjun ファイルシステムの ID。
      "Filters": [
        {
          "Key": "DataFlowIds",
          "Value": "df-37bae1804cc****"
        }
      ]
    }

    想定される出力:

    {
      "RequestId": "FCBB356-96CA-135B-84B3-02E6F262B6BD",
      "DataFlowSubTask": {
        "DataFlowSubTask": [
          {
            "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for Lingjun ファイルシステムの ID。
            "DataFlowId": "df-37bae1804cc****", // データフローの ID。
            "DataFlowTaskId": "task-37b705830bcb****", // ストリーミングデータフロータスクの ID。
            "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****",// ストリーミングデータフローサブタスクの ID。
            "SrcFilePath": "/oss_test/yaml/test/file.png",// 移行するファイルのパス。
            "DstFilePath": "/bmcpfs_test/dataflows/mnt/file.png", // 移行されたファイルのパス。
            "Status": "COMPLETE",
            "Progress": 10000,
            "CreateTime": "2024-10-23 17:18:16",
            "StartTime": "2024-10-23 17:18:17",
            "EndTime": "2024-10-23 17:19:00",
            "ErrorMsg": "",// このパラメーターが返されないか、返された結果が空の場合、エラーは発生していません。
            "ProgressStats": {
              "BytesTotal": 68,
              "BytesDone": 68,
              "ActualBytes": 68,
              "AverageSpeed": 34
            },
            "FileDetail": {
              "ModifyTime": 1725897600000000000,
              "Size": 68,
              "Checksum": "crc64:850309505450944****"// ファイルのチェックサム。
            }
          }
        ]
      }
    }

    [Progress] パラメーターと [Status] パラメーターの値は、ストリーミングサブタスクの進捗とステータスを示します。[Status] パラメーターの値が COMPLETE の場合、サブタスクは完了しています。[Progress] パラメーターの値が 10000 の場合、すべてのデータが宛先ディレクトリにインポートまたはエクスポートされたことを示します。

関連操作

ストリーミングサブタスクをキャンセルするには、CancelDataFlowSubTask 操作を呼び出します。CREATED または RUNNING 状態のストリーミングサブタスクのみをキャンセルできます。

{
  "FileSystemId": "bmcpfs-370jz26fkr2st9****", // CPFS for Lingjun ファイルシステムの ID。
  "DataFlowId": "df-37bae1804cc****", // データフローの ID。
  "DataFlowTaskId": "task-37b705830bcb****", // ストリーミングインポートまたはエクスポートタスクの ID。
  "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" // ストリーミングデータフローサブタスクの ID。
}