本文將為您詳細介紹如何快速使用OSS Connector for AI/ML來高效配合資料模型的建立以及訓練工作。
部署環境
作業系統:Linux x86-64
glibc:>=2.17
Python:3.8-3.12
PyTorch: >=2.0
使用OSS Checkpoint功能需Linux核心支援userfaultfd
說明以Ubuntu系統為例,您可以執行
sudo grep CONFIG_USERFAULTFD /boot/config-$(uname -r)命令確認Linux是否支援userfaultfd,當返回結果中顯示CONFIG_USERFAULTFD=y時,則表示核心支援。返回結果顯示CONFIG_USERFAULTFD=n時,則表示核心不支援,即無法使用OSS Checkpoint功能。
快速安裝
以下內容為Python3.12版本安裝OSS Connector for AI/ML樣本:
在Linux作業系統或基於Linux作業系統構建鏡像所產生容器空間內,執行
pip3.12 install osstorchconnector命令安裝OSS Connector for AI/ML。pip3.12 install osstorchconnector執行
pip3.12 show osstorchconnector命令查看是否安裝成功。pip3.12 show osstorchconnector當返回結果中顯示osstorchconnector的版本資訊時表示OSS Connector for AI/ML安裝成功。

配置
建立訪問憑證設定檔。
mkdir -p /root/.alibabacloud && touch /root/.alibabacloud/credentials添加訪問憑證配置並儲存。
樣本中的
<Access-key-id>、<Access-key-secret>請分別替換為RAM使用者的AccessKey ID、AccessKeySecret。關於如何建立AccessKey ID和AccessKeySecret,請參見建立AccessKey,配置項說明以及使用臨時訪問憑證配置請參見配置訪問憑證。{ "AccessKeyId": "LTAI************************", "AccessKeySecret": "At32************************" }建立OSS Connector設定檔。
mkdir -p /etc/oss-connector/ && touch /etc/oss-connector/config.json添加OSS Connector相關配置並儲存。配置項說明請參見配置OSS Connector。
正常情況下使用以下預設配置即可。
{ "logLevel": 1, "logPath": "/var/log/oss-connector/connector.log", "auditPath": "/var/log/oss-connector/audit.log", "datasetConfig": { "prefetchConcurrency": 24, "prefetchWorker": 2 }, "checkpointConfig": { "prefetchConcurrency": 24, "prefetchWorker": 4, "uploadConcurrency": 64 } }
樣本
以下樣本旨在使用PyTorch建立一個手寫數字識別模型。該模型使用由OssMapDataset構建的MNIST資料集,同時藉助OssCheckpoint來實現模型檢查點的儲存和載入。
import io
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import DataLoader
from osstorchconnector import OssMapDataset
from osstorchconnector import OssCheckpoint
# 定義超參數。
EPOCHS = 1
BATCH_SIZE = 64
LEARNING_RATE = 0.001
CHECKPOINT_READ_URI = "oss://you_bucketname/epoch.ckpt" # 讀取OSS中檢查點的地址。
CHECKPOINT_WRITE_URI = "oss://you_bucketname/epoch.ckpt" # 儲存檢查點到OSS的地址。
ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com" # 訪問OSS的內網地區節點地址,使用此地址需ECS執行個體與OSS執行個體處於同一地區。
CONFIG_PATH = "/etc/oss-connector/config.json" # OSS Connector設定檔路徑。
CRED_PATH = "/root/.alibabacloud/credentials" # 訪問憑證設定檔路徑。
OSS_URI = "oss://you_bucketname/mnist/" # OSS中Bucket資源路徑地址。
# 建立OssCheckpoint對象,用於訓練過程中將檢查點儲存到OSS以及從OSS中讀取檢查點。
checkpoint = OssCheckpoint(endpoint=ENDPOINT, cred_path=CRED_PATH, config_path=CONFIG_PATH)
# 定義簡單的卷積神經網路。
class SimpleCNN(nn.Module):
def __init__(self):
super(SimpleCNN, self).__init__()
self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
# 使用自適應池化簡化尺寸處理
self.adaptive_pool = nn.AdaptiveAvgPool2d((7, 7))
self.fc1 = nn.Linear(64 * 7 * 7, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = nn.ReLU()(self.conv1(x))
x = nn.MaxPool2d(2)(x)
x = nn.ReLU()(self.conv2(x))
x = nn.MaxPool2d(2)(x)
x = self.adaptive_pool(x)
x = x.view(x.size(0), -1)
x = nn.ReLU()(self.fc1(x))
x = self.fc2(x)
return x
# 資料預先處理。
trans = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.5], std=[0.5])
])
def transform(object):
try:
img = Image.open(io.BytesIO(object.read())).convert('L')
val = trans(img)
except Exception as e:
raise e
# 從物件路徑中提取標籤,假設路徑格式為 oss://bucket/mnist/label/filename
# 根據實際資料集組織圖調整標籤提取邏輯
try:
label = int(object.name.split('/')[-2]) # 提取倒數第二個路徑段作為標籤
except (ValueError, IndexError):
label = 0 # 預設標籤,實際使用時需根據資料集結構調整
return val, torch.tensor(label)
# 載入OssMapDataset資料集。
train_dataset = OssMapDataset.from_prefix(OSS_URI, endpoint=ENDPOINT, transform=transform, cred_path=CRED_PATH, config_path=CONFIG_PATH)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, num_workers=32, prefetch_factor=2, shuffle=True)
# 初始化模型、損失函數與最佳化器。
model = SimpleCNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# 訓練模型。
for epoch in range(EPOCHS):
for i, (images, labels) in enumerate(train_loader):
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
if (i + 1) % 100 == 0:
print(f'Epoch [{epoch + 1}/{EPOCHS}], Step [{i + 1}/{len(train_loader)}], Loss: {loss.item():.4f}')
# 使用OssCheckpoint對象儲存檢查點。
with checkpoint.writer(CHECKPOINT_WRITE_URI) as writer:
torch.save(model.state_dict(), writer)
print("-------------------------")
print("檢查點已儲存")
print(model.state_dict())
# 使用OssCheckpoint對象讀取檢查點。
try:
with checkpoint.reader(CHECKPOINT_READ_URI) as reader:
state_dict = torch.load(reader)
# 載入模型。
model = SimpleCNN()
model.load_state_dict(state_dict)
model.eval()
print("檢查點載入成功")
except Exception as e:
print(f"檢查點載入失敗: {e}")
# 可以選擇從頭開始訓練或使用其他檢查點