
Platform For AI: Use OSS

Last Updated: Apr 01, 2026

PAI services — Deep Learning Containers (DLC) and Data Science Workshop (DSW) — support four methods for reading training data from Object Storage Service (OSS): JindoFuse, ossfs 2.0, OSS Connector for AI/ML, and the OSS Python SDK. Each method suits different workloads, frameworks, and data access patterns.

Choose a method

Downloading the full dataset before training causes GPU idle time, forces re-downloads for every run, and wastes bandwidth when only a fraction of the data is needed. The four methods below let you access OSS data directly from your training container, without a full local copy.

| Method | Best for | Notes |
| --- | --- | --- |
| JindoFuse | Non-PyTorch frameworks; small datasets that fit in local cache; workloads that write back to OSS | Mounts OSS as a local path; supports read and write |
| ossfs 2.0 | High-throughput AI training, inference, big data, and autonomous driving workloads with sequential reads or append-only writes | FUSE-based; does not require full POSIX semantics |
| OSS Connector for AI/ML | PyTorch training on millions of small files with high-throughput requirements | Streams data without mounting; uses the native PyTorch Dataset interface |
| OSS SDK (oss2) | Temporary or programmatic access when mounting is not needed | Maximum flexibility; no mount required |
OSS Connector for AI/ML is optimized for PyTorch and provides the best throughput when reading millions of small files. If your framework is not PyTorch or you need to write data back to OSS, use JindoFuse or ossfs 2.0 instead.

JindoFuse

JindoFuse is an OSS mount component from Alibaba Cloud EMR. It mounts an OSS dataset to a container path, letting training code read and write OSS data as if it were a local directory.

Limitations

Before configuring JindoFuse, note the following default behavior when Advanced Configuration is left blank:

  • Metadata is cached — directory and file lists are cached at mount time. In distributed tasks, multiple nodes creating the same directory simultaneously will fail on all but one node.

  • Files are not visible until the write completes — OSS Multipart API is used for file creation. Objects appear in OSS only after all write operations finish.

  • Concurrent reads and writes are not supported.

  • Random writes are not supported.

Adjust these behaviors using the advanced configuration options below.

Mount OSS in DLC

When creating a DLC job, add an OSS data source. Two mount types are supported. For details, see Create a training job.

| Mount type | Description |
| --- | --- |
| Dataset | Select a dataset of the Object Storage Service (OSS) type and configure the Mount Path. Public datasets support read-only mode only. |
| Direct Mount | Directly mount an OSS bucket storage path. When using a Lingjun resource quota with local caching enabled, turn on the Use Cache switch. |

Mount OSS in DSW

When creating a DSW instance, add an OSS data source. Two mount types are supported. For details, see Create a DSW instance.

| Mount type | Description |
| --- | --- |
| Mount Dataset | Select an Object Storage Service (OSS) dataset and configure the Mount Path. Public datasets can only be mounted in read-only mode. |
| Mount Storage Path | Directly mount an OSS bucket storage path. |

Configure JindoFuse

Set advanced parameters in the Advanced Configuration field of the DLC job or DSW instance. Choose the preset that matches your workload, or combine individual options as needed.

The presets below are starting points, not universal optimal settings. For the full parameter reference, see the JindoFuse user guide.

Configuration presets

Files are static during the run — reading a fixed batch of files with no concurrent writes:

{
  "fs.oss.download.thread.concurrency": "Twice the number of CPU cores",
  "fs.jindo.args": "-oro -oattr_timeout=7200 -oentry_timeout=7200 -onegative_timeout=7200 -okernel_cache -ono_symlink"
}
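The "Twice the number of CPU cores" placeholders in these presets must be replaced with a literal value before the JSON is pasted into Advanced Configuration. A minimal sketch of computing it with the standard library, assuming the container sees all host cores:

```python
import json
import os

# Resolve the "Twice the number of CPU cores" placeholder at runtime.
# os.cpu_count() can return None in restricted environments, so fall back to 1.
concurrency = 2 * (os.cpu_count() or 1)

advanced_config = {
    "fs.oss.download.thread.concurrency": str(concurrency),
    "fs.jindo.args": "-oro -oattr_timeout=7200 -oentry_timeout=7200 "
                     "-onegative_timeout=7200 -okernel_cache -ono_symlink",
}
print(json.dumps(advanced_config, indent=2))
```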

Fast read/write — training data and model weights where occasional data inconsistency is acceptable. Do not use the mount path as your working directory.

{
  "fs.oss.download.thread.concurrency": "Twice the number of CPU cores",
  "fs.oss.upload.thread.concurrency": "Twice the number of CPU cores",
  "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
}

Incremental read/write — saving model weight files where only new data is written (not overwriting existing data):

{
  "fs.oss.upload.thread.concurrency": "Twice the number of CPU cores",
  "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
}

Consistent read/write — project code or configuration where data consistency across concurrent operations is required:

{
  "fs.jindo.args": "-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
}

Individual options

Select a JindoFuse version:

{
  "fs.jindo.fuse.pod.image.tag": "6.7.0"
}

Disable metadata cache — prevents write failures when multiple nodes in a distributed task write to the same directory simultaneously:

{
  "fs.jindo.args": "-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0"
}

Adjust upload/download thread count — increases throughput for large datasets:

{
  "fs.oss.upload.thread.concurrency": "32",
  "fs.oss.download.thread.concurrency": "32",
  "fs.oss.read.readahead.buffer.count": "64",
  "fs.oss.read.readahead.buffer.size": "4194304"
}

Use AppendObject for writes — all locally created files are uploaded using the OSS AppendObject API. The maximum object size is 5 GB. For limits, see AppendObject.

{
  "fs.jindo.args": "-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0",
  "fs.oss.append.enable": "true",
  "fs.oss.flush.interval.millisecond": "1000",
  "fs.oss.read.readahead.buffer.size": "4194304",
  "fs.oss.write.buffer.size": "262144"
}

Mount OSS-HDFS — for distributed training scenarios. To enable OSS-HDFS, see OSS-HDFS overview.

{
  "fs.jindo.args": "-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -ono_symlink -ono_xattr -ono_flock -odirect_io",
  "fs.oss.flush.interval.millisecond": "10000",
  "fs.oss.randomwrite.sync.interval.millisecond": "10000"
}

Limit memory usage:

{
  "fs.jindo.fuse.pod.mem.limit": "10Gi"
}

Configure JindoFuse using the Python SDK

To update JindoFuse parameters on an existing dataset, use the alibabacloud-aiworkspace20210204 SDK.

Prerequisites:

  • The SDK packages installed: pip install alibabacloud-aiworkspace20210204 alibabacloud-tea-openapi alibabacloud-credentials

  • Credentials that CredClient can resolve, for example the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables

All examples below use the same pattern: fetch the dataset, update the options dict, and push the change. Replace <dataset-id> with your actual dataset ID and cn-hangzhou with your region.

Files are static during the run (read-only)

import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest


def change_config():
    region_id = 'cn-hangzhou'
    # Load credentials from environment variables — avoid hardcoding AccessKey pairs.
    cred = CredClient()
    dataset_id = '<dataset-id>'

    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    # 1. Get the current dataset options.
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)

    # Set the thread count to twice the number of CPU cores (32 here assumes 16 cores).
    options['fs.oss.download.thread.concurrency'] = 32
    options['fs.jindo.args'] = '-oro -oattr_timeout=7200 -oentry_timeout=7200 -onegative_timeout=7200 -okernel_cache -ono_symlink'

    update_request = UpdateDatasetRequest(options=json.dumps(options))
    # 2. Apply the updated options.
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))


change_config()

Fast read/write

import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest


def change_config():
    region_id = 'cn-hangzhou'
    cred = CredClient()
    dataset_id = '<dataset-id>'

    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)

    # Set the thread count to twice the number of CPU cores (32 here assumes 16 cores).
    options['fs.oss.download.thread.concurrency'] = 32
    options['fs.oss.upload.thread.concurrency'] = 32
    options['fs.jindo.args'] = '-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink'

    update_request = UpdateDatasetRequest(options=json.dumps(options))
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))


change_config()

Incremental read/write

import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest


def change_config():
    region_id = 'cn-hangzhou'
    cred = CredClient()
    dataset_id = '<dataset-id>'

    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)

    # Set the thread count to twice the number of CPU cores (32 here assumes 16 cores).
    options['fs.oss.upload.thread.concurrency'] = 32
    options['fs.jindo.args'] = '-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink'

    update_request = UpdateDatasetRequest(options=json.dumps(options))
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))


change_config()

Consistent read/write

import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest


def change_config():
    region_id = 'cn-hangzhou'
    cred = CredClient()
    dataset_id = '<dataset-id>'

    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)

    options['fs.jindo.args'] = '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink'

    update_request = UpdateDatasetRequest(options=json.dumps(options))
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))


change_config()

Select a different JindoFuse version

Supported versions: 6.4.4, 6.6.0, 6.7.0. For release notes, see JindoData releases.

import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest


def change_version():
    region_id = 'cn-hangzhou'
    cred = CredClient()
    dataset_id = '<dataset-id>'

    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)

    options['fs.jindo.fuse.pod.image.tag'] = "6.7.0"

    update_request = UpdateDatasetRequest(options=json.dumps(options))
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))


change_version()

Disable metadata cache

Use this when multiple nodes in a distributed task write to the same directory simultaneously and some writes fail.

import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest


def turnOffMetaCache():
    region_id = 'cn-hangzhou'
    cred = CredClient()
    dataset_id = '<dataset-id>'

    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)

    options['fs.jindo.args'] = '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0'

    update_request = UpdateDatasetRequest(options=json.dumps(options))
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))


turnOffMetaCache()

Adjust upload/download thread count

import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest


def adjustThreadNum():
    region_id = 'cn-hangzhou'
    cred = CredClient()
    dataset_id = '<dataset-id>'

    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)

    options['fs.oss.upload.thread.concurrency'] = 32
    options['fs.oss.download.thread.concurrency'] = 32
    options['fs.oss.read.readahead.buffer.count'] = 32

    update_request = UpdateDatasetRequest(options=json.dumps(options))
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))


adjustThreadNum()

Use AppendObject for writes

All files created locally are uploaded as OSS objects using AppendObject. The maximum object size is 5 GB. For limits, see AppendObject.

import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest


def useAppendObject():
    region_id = 'cn-hangzhou'
    cred = CredClient()
    dataset_id = '<dataset-id>'

    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)

    options['fs.jindo.args'] = '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0'
    options['fs.oss.append.enable'] = "true"
    options['fs.oss.flush.interval.millisecond'] = "1000"
    options['fs.oss.read.buffer.size'] = "262144"
    options['fs.oss.write.buffer.size'] = "262144"

    update_request = UpdateDatasetRequest(options=json.dumps(options))
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))


useAppendObject()

Mount OSS-HDFS

To enable OSS-HDFS, see OSS-HDFS overview. The following example creates a dataset with an OSS-HDFS endpoint.

import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import CreateDatasetRequest


def createOssHdfsDataset():
    region_id = 'cn-hangzhou'
    cred = CredClient()
    workspace_id = '<workspace-id>'

    oss_bucket = '<oss-bucket>'
    # Use the OSS-HDFS endpoint.
    oss_endpoint = f'{region_id}.oss-dls.aliyuncs.com'
    oss_path = '/'
    mount_path = '/mnt/data/'

    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )

    response = workspace_client.create_dataset(CreateDatasetRequest(
        workspace_id=workspace_id,
        name="<dataset-name>",
        data_type='COMMON',
        data_source_type='OSS',
        property='DIRECTORY',
        uri=f'oss://{oss_bucket}.{oss_endpoint}{oss_path}',
        accessibility='PRIVATE',
        source_type='USER',
        options=json.dumps({
            'mountPath': mount_path,
            # Recommended for distributed training scenarios.
            'fs.jindo.args': '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -ono_symlink -ono_xattr -ono_flock -odirect_io',
            'fs.oss.flush.interval.millisecond': "10000",
            'fs.oss.randomwrite.sync.interval.millisecond': "10000",
        })
    ))
    print(f'datasetId: {response.body.dataset_id}')


createOssHdfsDataset()

Configure memory resources

import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest


def adjustResource():
    region_id = 'cn-hangzhou'
    cred = CredClient()
    dataset_id = '<dataset-id>'

    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)

    options['fs.jindo.fuse.pod.mem.limit'] = "10Gi"

    update_request = UpdateDatasetRequest(options=json.dumps(options))
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))


adjustResource()

ossfs 2.0

ossfs 2.0 mounts OSS as a local file system using FUSE. It delivers high sequential read/write performance and is designed for AI training, inference, big data, and autonomous driving workloads that do not require full POSIX semantics.

To activate ossfs 2.0, set {"mountType":"ossfs"} in Advanced Configuration.

Mount OSS in DLC

When creating a DLC job, add an OSS data source. For configuration details, see Create a training job.

| Mount type | Description |
| --- | --- |
| Dataset | Select a dataset of the Object Storage Service (OSS) type and configure the mount path. Public datasets support read-only mode only. |
| Direct Mount | Directly mount an OSS bucket storage path. When using a Lingjun resource quota with local cache enabled, turn on the Use Cache switch. |

Mount OSS in DSW

When creating a DSW instance, add an OSS data source. For configuration details, see Create a DSW instance.

| Mount type | Description |
| --- | --- |
| Dataset mount | Select a dataset of the Object Storage Service (OSS) type and configure the mount path. Public datasets support read-only mode only. |
| Storage path mount | Directly mount an OSS bucket storage path. |

Configure ossfs 2.0

Set advanced parameters using fs.ossfs.args in Advanced Configuration. Separate multiple parameters with a comma. For the full parameter reference, see ossfs 2.0.

Files are static during the run — reading a fixed set of files with no modifications. A longer metadata cache reduces storage API calls.

{
    "mountType": "ossfs",
    "fs.ossfs.args": "-oattr_timeout=7200"
}

Fast read/write — balances cache efficiency and data freshness for typical training workloads:

{
    "mountType": "ossfs",
    "fs.ossfs.args": "-oattr_timeout=3, -onegative_timeout=0"
}

Consistent view across distributed nodes — makes sure all nodes see the same file state after each write:

{
    "mountType": "ossfs",
    "fs.ossfs.args": "-onegative_timeout=0, -oclose_to_open"
}

Out-of-memory (OOM) from many open files — high concurrency in DLC or DSW can open many files simultaneously, causing memory pressure. The following configuration reduces memory usage:

{
    "mountType": "ossfs",
    "fs.ossfs.args": "-oreaddirplus=false, -oinode_cache_eviction_threshold=300000"
}

Writing large files — by default, ossfs 2.0 uses an 8 MiB part size for multipart uploads, which limits individual file writes to 78.125 GiB. Files larger than this will fail to write. To increase the limit, raise the part size using -oupload_buffer_size. For example, a 32 MiB part size (33,554,432 bytes) raises the per-file limit to 312.5 GiB. A larger part size uses more memory; control total memory with -total_mem_limit. For all mount options, see Mount options.

{
    "mountType": "ossfs",
    "fs.ossfs.args": "-oupload_buffer_size=33554432"
}
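The limits quoted above follow from the OSS multipart-upload cap of 10,000 parts per object: the largest writable file is the part size times 10,000. A quick check of the arithmetic:

```python
MAX_PARTS = 10_000  # OSS allows at most 10,000 parts per multipart upload

def max_file_size_gib(part_size_bytes: int) -> float:
    """Largest single file writable with the given multipart upload part size."""
    return part_size_bytes * MAX_PARTS / 1024**3

print(max_file_size_gib(8 * 1024**2))   # default 8 MiB parts -> 78.125
print(max_file_size_gib(33_554_432))    # 32 MiB parts        -> 312.5
```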

OSS Connector for AI/ML

OSS Connector for AI/ML is a client library for PyTorch training. It streams OSS data directly into your training loop without mounting, using the native PyTorch Dataset and IterableDataset interfaces.

Limitations

  • Official images: Only DLC jobs and DSW instances running PyTorch 2.0 or later are supported.

  • Custom images: PyTorch 2.0 or later and Python 3.8–3.12 are required. Install the connector with:

    pip install -i http://yum.tbsite.net/aliyun-pypi/simple/ --extra-index-url http://yum.tbsite.net/pypi/simple/ --trusted-host=yum.tbsite.net osstorchconnector
  • OssCheckpoint: Available only in general computing resource environments.

Prerequisites

Before you begin, make sure you have:

  • A DLC job or DSW instance running PyTorch 2.0 or later

  • Configured credentials (see below)

  • A config.json file with concurrency and prefetch settings (see below)

Step 1: Configure credentials

Use one of the following methods:

  • RAM role (recommended): Configure a DLC RAM role so the job automatically gets a Security Token Service (STS) temporary credential. No explicit authentication configuration is needed in your code. See Configure a DLC RAM role.

  • Credential file: Create a credential file in your code project with the following format. After configuring a RAM role, the default credential path is /mnt/.alibabacloud/credentials.

    Note

    Storing AccessKey information in plaintext poses a security risk. Use a RAM role whenever possible.

    | Field | Required | Description | Example |
    | --- | --- | --- | --- |
    | AccessKeyId | Yes | AccessKey ID of an Alibaba Cloud account or RAM user. For STS credentials, use the temporary AccessKey ID. | NTS**** |
    | AccessKeySecret | Yes | AccessKey secret of an Alibaba Cloud account or RAM user. For STS credentials, use the temporary AccessKey secret. | 7NR2**** |
    | SecurityToken | No | Required when using STS temporary credentials. | STS.6MC2**** |
    | Expiration | No | Expiration time of the credential. If blank, the credential never expires. The connector re-reads the file after expiration. | 2024-08-20T00:00:00Z |
    {
      "AccessKeyId": "<access-key-id>",
      "AccessKeySecret": "<access-key-secret>",
      "SecurityToken": "<security-token>",
      "Expiration": "2024-08-20T00:00:00Z"
    }
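If your code manages STS tokens itself, it can regenerate this file before each Expiration passes; the connector re-reads the file once the old credential expires. A minimal sketch of writing the file with the standard library (the helper name, path, and values are placeholders, not a connector API):

```python
import json
from pathlib import Path

def write_credential_file(path, access_key_id, access_key_secret,
                          security_token="", expiration=""):
    """Write a credential file in the format shown above.
    Prefer a RAM role over long-lived plaintext AccessKey pairs."""
    cred = {"AccessKeyId": access_key_id, "AccessKeySecret": access_key_secret}
    if security_token:
        cred["SecurityToken"] = security_token
    if expiration:  # e.g. "2024-08-20T00:00:00Z"; omit to never expire
        cred["Expiration"] = expiration
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(json.dumps(cred))

# Placeholder values; the default path after configuring a RAM role
# is /mnt/.alibabacloud/credentials.
write_credential_file("/tmp/credentials", "<access-key-id>",
                      "<access-key-secret>",
                      security_token="<security-token>",
                      expiration="2024-08-20T00:00:00Z")
```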

Step 2: Create a config.json file

The config.json file sets concurrency, prefetch, and logging parameters for OSS data requests.

{
    "logLevel": 1,
    "logPath": "/var/log/oss-connector/connector.log",
    "auditPath": "/var/log/oss-connector/audit.log",
    "datasetConfig": {
        "prefetchConcurrency": 24,
        "prefetchWorker": 2
    },
    "checkpointConfig": {
        "prefetchConcurrency": 24,
        "prefetchWorker": 4,
        "uploadConcurrency": 64
    }
}

| Field | Required | Description | Default |
| --- | --- | --- | --- |
| logLevel | Yes | Log level. 0: DEBUG, 1: INFO, 2: WARN, 3: ERROR. | 1 (INFO) |
| logPath | Yes | Connector log file path. | /var/log/oss-connector/connector.log |
| auditPath | Yes | Audit log path. Records I/O requests with latency greater than 100 ms. | /var/log/oss-connector/audit.log |
| datasetConfig.prefetchConcurrency | Yes | Concurrent prefetch tasks for dataset reads. | 24 |
| datasetConfig.prefetchWorker | Yes | vCPUs allocated for dataset prefetch. | 4 |
| checkpointConfig.prefetchConcurrency | Yes | Concurrent prefetch tasks for checkpoint reads. | 24 |
| checkpointConfig.prefetchWorker | Yes | vCPUs allocated for checkpoint prefetch. | 4 |
| checkpointConfig.uploadConcurrency | Yes | Concurrent tasks for checkpoint writes. | 64 |

Choose a dataset interface

OSS Connector for AI/ML provides two dataset interfaces that extend standard PyTorch classes:

| Interface | Extends | Reading order | Best for |
| --- | --- | --- | --- |
| OssIterableDataset | IterableDataset | Sequential | Large datasets, limited memory, no shuffle needed |
| OssMapDataset | Dataset | Determined by DataLoader; supports shuffle | Small datasets, sufficient memory, random access or parallel processing |

Both interfaces support three access methods:

  • from_prefix() — access all files under an OSS path prefix

  • from_manifest_file() — access files listed in a manifest file; supports multiple buckets

  • from_objects() — access a specific list of OSS URIs

Use OssMapDataset

Access by OSS path prefix:

Use this when your dataset files are organized under a single OSS folder and you do not need a separate index file.

import accimage
import torchvision.transforms as transforms
from osstorchconnector import OssMapDataset


def read_and_transform(data):
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])

    try:
        img = accimage.Image((data.read()))
        val = transform(img)
        label = data.label  # file name
    except Exception as e:
        print("read failed", e)
        return None, 0
    return val, label

dataset = OssMapDataset.from_prefix(
    "{oss_data_folder_uri}",
    endpoint="{oss_endpoint}",
    transform=read_and_transform,
    cred_path=cred_path,
    config_path=config_path
)

Access by manifest file:

Use this when your dataset spans multiple OSS buckets or you need a separate index mapping files to labels.

Expected manifest format (one JSON object per line):

{"data": {"source": "oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class1/image1.JPEG"}}
{"data": {"source": ""}}

import io
import json
import re
from typing import Iterable, Tuple

from osstorchconnector import OssMapDataset


def transform_oss_path(input_path):
    pattern = r'oss://(.*?)\.(.*?)/(.*)'
    match = re.match(pattern, input_path)
    if match:
        return f'oss://{match.group(1)}/{match.group(3)}'
    else:
        return input_path


def manifest_parser(reader: io.IOBase) -> Iterable[Tuple[str, str]]:
    lines = reader.read().decode("utf-8").strip().split("\n")
    for i, line in enumerate(lines):
        data = json.loads(line)
        yield transform_oss_path(data["data"]["source"]), ""

dataset = OssMapDataset.from_manifest_file(
    "{manifest_file_path}",
    manifest_parser,
    "",
    endpoint=endpoint,
    transform=read_and_transform,
    cred_path=cred_path,
    config_path=config_path
)

Access by list of OSS URIs:

uris = [
    "oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class1/image1.JPEG",
    "oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class2/image2.JPEG"
]
dataset = OssMapDataset.from_objects(
    uris,
    endpoint=endpoint,
    transform=read_and_transform,
    cred_path=cred_path,
    config_path=config_path
)

Use OssIterableDataset

OssIterableDataset supports the same three access methods. Replace OssMapDataset with OssIterableDataset in each call:

# Access by prefix
dataset = OssIterableDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)

# Access by manifest file
dataset = OssIterableDataset.from_manifest_file("{manifest_file_path}", manifest_parser, "", endpoint=endpoint, transform=read_and_transform, cred_path=cred_path, config_path=config_path)

# Access by URI list
dataset = OssIterableDataset.from_objects(uris, endpoint=endpoint, transform=read_and_transform, cred_path=cred_path, config_path=config_path)

Use OssCheckpoint

OssCheckpoint saves and loads model checkpoints directly to and from OSS, without mounting.

OssCheckpoint is available only in general computing resource environments.

from osstorchconnector import OssCheckpoint

checkpoint = OssCheckpoint(endpoint="{oss_endpoint}", cred_path=cred_path, config_path=config_path)

checkpoint_read_uri = "{checkpoint_path}"
checkpoint_write_uri = "{checkpoint_path}"

with checkpoint.reader(checkpoint_read_uri) as reader:
    state_dict = torch.load(reader)
    model.load_state_dict(state_dict)

with checkpoint.writer(checkpoint_write_uri) as writer:
    torch.save(model.state_dict(), writer)

Full example

The following example combines dataset loading and checkpoint management in a complete training loop.

from osstorchconnector import OssMapDataset, OssCheckpoint
import torchvision.transforms as transforms
import accimage
import torchvision.models as models
import torch

cred_path = "/mnt/.alibabacloud/credentials"  # Default path after configuring a DLC RAM role.
config_path = "config.json"

checkpoint = OssCheckpoint(endpoint="{oss_endpoint}", cred_path=cred_path, config_path=config_path)
model = models.__dict__["resnet18"]()

epochs = 100
checkpoint_read_uri = "{checkpoint_path}"
checkpoint_write_uri = "{checkpoint_path}"

# Load an existing checkpoint.
with checkpoint.reader(checkpoint_read_uri) as reader:
    state_dict = torch.load(reader)
    model.load_state_dict(state_dict)


def read_and_transform(data):
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])

    try:
        img = accimage.Image((data.read()))
        value = transform(img)
    except Exception as e:
        print("read failed", e)
        return None, 0
    return value, 0

# Build a dataset from OSS — no local download or mount required.
dataset = OssMapDataset.from_prefix(
    "{oss_data_folder_uri}",
    endpoint="{oss_endpoint}",
    transform=read_and_transform,
    cred_path=cred_path,
    config_path=config_path
)
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size="{batch_size}", num_workers="{num_workers}", pin_memory=True
)

for epoch in range(epochs):
    for step, (images, target) in enumerate(data_loader):
        # Batch processing and model training
        pass
    # Save checkpoint after each epoch.
    with checkpoint.writer(checkpoint_write_uri) as writer:
        torch.save(model.state_dict(), writer)

OSS SDK

Use the OSS Python API (oss2) when you need temporary or programmatic access to OSS data without mounting — for example, to apply custom business logic when selecting which data to load.

Prerequisites

Before you begin, make sure you have:

  • The oss2 package installed: pip install oss2

  • The OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables set to credentials with access to the bucket

Read and write OSS data

# -*- coding: utf-8 -*-
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# Load credentials from the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables.
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
bucket = oss2.Bucket(auth, '<endpoint>', '<your_bucket_name>')

# Read a complete file.
result = bucket.get_object('<your_file_path/your_file>')
print(result.read())

# Read a byte range.
result = bucket.get_object('<your_file_path/your_file>', byte_range=(0, 99))

# Write data.
bucket.put_object('<your_file_path/your_file>', '<your_object_content>')

# Append to an appendable file.
result = bucket.append_object('<your_file_path/your_file>', 0, '<your_object_content>')
result = bucket.append_object('<your_file_path/your_file>', result.next_position, '<your_object_content>')

Replace the following placeholders:

| Placeholder | Description | Example |
| --- | --- | --- |
| <endpoint> | Endpoint for the region where the bucket resides. | https://oss-cn-hangzhou.aliyuncs.com. For the full list, see Regions and endpoints. |
| <your_bucket_name> | Name of the OSS bucket. | my-training-data |
| <your_file_path/your_file> | Full object path, excluding the bucket name. | testfolder/exampleobject.txt |
| <your_object_content> | Content to write or append. | |

Load training data with a custom dataset

Store data in an OSS bucket with an index file that maps file paths to labels. Use the DataLoader API to read data in parallel across multiple processes.

import io
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
from PIL import Image
import torch

class OSSDataset(torch.utils.data.dataset.Dataset):
    def __init__(self, endpoint, bucket, auth, index_file):
        self._bucket = oss2.Bucket(auth, endpoint, bucket)
        # Index file format: each sample is "path:label", samples separated by commas.
        self._indices = self._bucket.get_object(index_file).read().decode('utf-8').split(',')

    def __len__(self):
        return len(self._indices)

    def __getitem__(self, index):
        img_path, label = self._indices[index].strip().split(':')
        img_str = self._bucket.get_object(img_path)
        img_buf = io.BytesIO()
        img_buf.write(img_str.read())
        img_buf.seek(0)
        img = Image.open(img_buf).convert('RGB')
        img_buf.close()
        return img, label


# Load credentials from environment variables.
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
dataset = OSSDataset(endpoint, bucket, auth, index_file)
data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=batch_size,
    num_workers=num_loaders,
    pin_memory=True
)

| Parameter | Description |
| --- | --- |
| endpoint | Endpoint for the region where the bucket resides. For example, https://oss-cn-hangzhou.aliyuncs.com. See Regions and endpoints. |
| bucket | Bucket name. |
| index_file | Path to the index file. Each sample uses the format path:label, separated by commas. |
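The index file itself can be generated from (path, label) pairs and uploaded once, for example with bucket.put_object. A minimal sketch (the helper name and sample paths are illustrative):

```python
def build_index(samples):
    """Join (object_path, label) pairs into the comma-separated
    "path:label" format that OSSDataset above parses."""
    return ','.join(f'{path}:{label}' for path, label in samples)

index_content = build_index([
    ("dataset_folder/class1/image1.JPEG", "0"),
    ("dataset_folder/class2/image2.JPEG", "1"),
])
print(index_content)
# -> dataset_folder/class1/image1.JPEG:0,dataset_folder/class2/image2.JPEG:1
# Upload with: bucket.put_object('<index_file>', index_content)
```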

Save and load PyTorch models

For more details on PyTorch model serialization, see PyTorch — saving and loading models.

Save a model:

from io import BytesIO
import torch
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
bucket_name = "<your_bucket_name>"  # Do not prefix with oss://
bucket = oss2.Bucket(auth, endpoint, bucket_name)
buffer = BytesIO()
torch.save(model.state_dict(), buffer)
bucket.put_object("<your_model_path>", buffer.getvalue())

Load a model:

from io import BytesIO
import torch
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
bucket_name = "<your_bucket_name>"  # Do not prefix with oss://
bucket = oss2.Bucket(auth, endpoint, bucket_name)
buffer = BytesIO(bucket.get_object("<your_model_path>").read())
model.load_state_dict(torch.load(buffer))

Replace <your_bucket_name> with the bucket name (without the oss:// prefix) and <your_model_path> with the model object path.