This topic describes how to use OSS Connector for AI/ML to efficiently create and train models.
Deployment environment
Operating system: Linux x86-64
glibc: >=2.17
Python: 3.8-3.12
PyTorch: >=2.0
To use the OSS Checkpoint feature, the Linux kernel must support userfaultfd.
Note: For example, on an Ubuntu system, you can run the sudo grep CONFIG_USERFAULTFD /boot/config-$(uname -r) command to check whether the Linux kernel supports userfaultfd. If the command returns CONFIG_USERFAULTFD=y, the kernel supports userfaultfd. If the command returns CONFIG_USERFAULTFD=n, the kernel does not support userfaultfd, and you cannot use the OSS Checkpoint feature.
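If you prefer to perform this check from Python, the following is a minimal sketch that reads the kernel configuration file. It assumes the configuration is available at /boot/config-<kernel release>, as on Ubuntu; this helper is not part of OSS Connector.
# Minimal sketch: check whether the running kernel was built with userfaultfd support.
# Assumes the kernel configuration file exists at /boot/config-<kernel release>, as on Ubuntu.
import platform

config_path = f"/boot/config-{platform.release()}"
with open(config_path) as f:
    if any(line.strip() == "CONFIG_USERFAULTFD=y" for line in f):
        print("userfaultfd is supported. The OSS Checkpoint feature can be used.")
    else:
        print("userfaultfd is not supported. The OSS Checkpoint feature cannot be used.")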
Quick installation
The following example describes how to install OSS Connector for AI/ML for Python 3.12.
On a Linux operating system or in a container created from a Linux-based image, run the following command to install OSS Connector for AI/ML:
pip3.12 install osstorchconnector
Run the following command to check whether the installation was successful:
pip3.12 show osstorchconnector
If version information for osstorchconnector is returned, the installation is successful.
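You can also verify the installation from Python. The following is a minimal sketch that uses only the standard library; the package name osstorchconnector comes from the installation command above.
# Minimal sketch: confirm that osstorchconnector can be imported and print its installed version.
from importlib.metadata import PackageNotFoundError, version

try:
    import osstorchconnector  # installed by pip3.12 install osstorchconnector
    print("osstorchconnector version:", version("osstorchconnector"))
except (ImportError, PackageNotFoundError) as e:
    print("osstorchconnector is not installed correctly:", e)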

Configuration
Create a configuration file for access credentials:
mkdir -p /root/.alibabacloud && touch /root/.alibabacloud/credentials
Add the access credential configuration and save the file. Replace <Access-key-id> and <Access-key-secret> in the example with the AccessKey ID and AccessKey secret of a Resource Access Management (RAM) user. For more information about how to create an AccessKey ID and an AccessKey secret, see Create an AccessKey. For more information about configuration items and how to use temporary access credentials, see Configure access credentials.
{
    "AccessKeyId": "LTAI************************",
    "AccessKeySecret": "At32************************"
}
Create the OSS Connector configuration file:
mkdir -p /etc/oss-connector/ && touch /etc/oss-connector/config.json
Add the OSS Connector configuration and save the file. For more information about the configuration items, see Configure OSS Connector.
In most cases, you can use the following default configurations.
{ "logLevel": 1, "logPath": "/var/log/oss-connector/connector.log", "auditPath": "/var/log/oss-connector/audit.log", "datasetConfig": { "prefetchConcurrency": 24, "prefetchWorker": 2 }, "checkpointConfig": { "prefetchConcurrency": 24, "prefetchWorker": 4, "uploadConcurrency": 64 } }
Example
The following example shows how to create a handwritten digit recognition model using PyTorch. The model is trained on the MNIST dataset loaded through OssMapDataset, and model checkpoints are saved and loaded through OssCheckpoint.
import io
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import DataLoader
from osstorchconnector import OssMapDataset
from osstorchconnector import OssCheckpoint
# Define hyperparameters.
EPOCHS = 1
BATCH_SIZE = 64
LEARNING_RATE = 0.001
CHECKPOINT_READ_URI = "oss://you_bucketname/epoch.ckpt" # The URI to read the checkpoint from OSS.
CHECKPOINT_WRITE_URI = "oss://you_bucketname/epoch.ckpt" # The URI to save the checkpoint to OSS.
ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com" # The internal endpoint to access OSS. To use this endpoint, the ECS instance and the OSS bucket must be in the same region.
CONFIG_PATH = "/etc/oss-connector/config.json" # The path to the OSS Connector configuration file.
CRED_PATH = "/root/.alibabacloud/credentials" # The path to the access credential configuration file.
OSS_URI = "oss://you_bucketname/mnist/" # The URI of the bucket resource in OSS.
# Create an OssCheckpoint object to save checkpoints to and read checkpoints from OSS during training.
checkpoint = OssCheckpoint(endpoint=ENDPOINT, cred_path=CRED_PATH, config_path=CONFIG_PATH)
# Define a simple convolutional neural network.
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        # Use adaptive pooling to simplify size processing.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((7, 7))
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = nn.ReLU()(self.conv1(x))
        x = nn.MaxPool2d(2)(x)
        x = nn.ReLU()(self.conv2(x))
        x = nn.MaxPool2d(2)(x)
        x = self.adaptive_pool(x)
        x = x.view(x.size(0), -1)
        x = nn.ReLU()(self.fc1(x))
        x = self.fc2(x)
        return x
# Pre-process the data.
trans = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])
def transform(object):
    try:
        img = Image.open(io.BytesIO(object.read())).convert('L')
        val = trans(img)
    except Exception as e:
        raise e
    # Extract the label from the object path. Assume the path format is oss://bucket/mnist/label/filename.
    # Adjust the label extraction logic based on the actual dataset structure.
    try:
        label = int(object.name.split('/')[-2])  # Extract the second-to-last path segment as the label.
    except (ValueError, IndexError):
        label = 0  # Default label. Adjust this based on your dataset structure.
    return val, torch.tensor(label)
# Load the OssMapDataset dataset.
train_dataset = OssMapDataset.from_prefix(OSS_URI, endpoint=ENDPOINT, transform=transform, cred_path=CRED_PATH, config_path=CONFIG_PATH)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, num_workers=32, prefetch_factor=2, shuffle=True)
# Initialize the model, loss function, and optimizer.
model = SimpleCNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Train the model.
for epoch in range(EPOCHS):
    for i, (images, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        if (i + 1) % 100 == 0:
            print(f'Epoch [{epoch + 1}/{EPOCHS}], Step [{i + 1}/{len(train_loader)}], Loss: {loss.item():.4f}')
# Save the checkpoint using the OssCheckpoint object.
with checkpoint.writer(CHECKPOINT_WRITE_URI) as writer:
    torch.save(model.state_dict(), writer)
print("-------------------------")
print("Checkpoint saved")
print(model.state_dict())
# Read the checkpoint using the OssCheckpoint object.
try:
    with checkpoint.reader(CHECKPOINT_READ_URI) as reader:
        state_dict = torch.load(reader)
    # Load the model.
    model = SimpleCNN()
    model.load_state_dict(state_dict)
    model.eval()
    print("Checkpoint loaded successfully")
except Exception as e:
    print(f"Failed to load checkpoint: {e}")
    # You can choose to train from scratch or use another checkpoint.
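As a quick sanity check after the checkpoint is loaded, you can run the restored model on a single batch from the same DataLoader. The following is a minimal sketch that reuses the train_loader and model objects defined above; it is not part of the original example.
# Minimal sketch: run the restored model on one batch and compare predictions with labels.
with torch.no_grad():
    images, labels = next(iter(train_loader))
    outputs = model(images)
    predictions = outputs.argmax(dim=1)
    print("Predicted labels:", predictions[:10].tolist())
    print("Actual labels:   ", labels[:10].tolist())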