OSS Connector untuk AI/ML mengintegrasikan Alibaba Cloud Object Storage Service (OSS) langsung ke dalam pipeline pelatihan PyTorch. Fitur ini memungkinkan Anda mengalirkan data pelatihan dari bucket OSS dan menyimpan checkpoint model langsung ke OSS—tanpa perlu mengelola ruang disk lokal atau menulis custom data loader.
Prasyarat
Sebelum memulai, pastikan Anda telah memenuhi persyaratan berikut:
Mesin atau kontainer Linux x86-64 (glibc >= 2.17)
Python 3.8–3.12
PyTorch >= 2.0
Bucket OSS yang berisi data pelatihan Anda
ID AccessKey dan Rahasia AccessKey untuk pengguna Resource Access Management (RAM) dengan akses baca/tulis ke bucket tersebut
Untuk menggunakan fitur OSS Checkpoint, kernel Linux harus mendukung userfaultfd. Jalankan perintah berikut untuk memeriksa:
sudo grep CONFIG_USERFAULTFD /boot/config-$(uname -r)CONFIG_USERFAULTFD=y berarti fitur tersebut didukung. CONFIG_USERFAULTFD=n berarti tidak didukung, sehingga fitur OSS Checkpoint tidak tersedia.
Instal connector
Contoh berikut menginstal OSS Connector untuk AI/ML pada Python 3.12. Ganti pip3.12 dengan versi pip yang sesuai dengan instalasi Python Anda.
Instal paket:
pip3.12 install osstorchconnectorVerifikasi instalasi:
pip3.12 show osstorchconnectorJika informasi versi
osstorchconnectorditampilkan, instalasi berhasil.
Konfigurasikan kredensial akses
OSS Connector untuk AI/ML membaca kredensial dari file JSON di /root/.alibabacloud/credentials.
Buat file kredensial:
mkdir -p /root/.alibabacloud && touch /root/.alibabacloud/credentialsTambahkan ID AccessKey dan Rahasia AccessKey Anda:
{ "AccessKeyId": "LTAI************************", "AccessKeySecret": "At32************************" }Ganti placeholder dengan nilai aktual untuk pengguna RAM Anda. Untuk membuat pasangan Kunci Akses, lihat Create an AccessKey. Untuk kredensial sementara dan opsi kredensial lainnya, lihat Configure access credentials.
Konfigurasikan connector
OSS Connector untuk AI/ML menggunakan file konfigurasi di /etc/oss-connector/config.json untuk mengatur logging, pra-ambil dataset, dan konkurensi unggah checkpoint.
Buat file konfigurasi:
mkdir -p /etc/oss-connector/ && touch /etc/oss-connector/config.jsonTambahkan konfigurasi default berikut:
{ "logLevel": 1, "logPath": "/var/log/oss-connector/connector.log", "auditPath": "/var/log/oss-connector/audit.log", "datasetConfig": { "prefetchConcurrency": 24, "prefetchWorker": 2 }, "checkpointConfig": { "prefetchConcurrency": 24, "prefetchWorker": 4, "uploadConcurrency": 64 } }Pengaturan default ini cocok untuk sebagian besar beban kerja pelatihan. Untuk daftar lengkap parameter konfigurasi, lihat Configure OSS Connector.
Muat dataset dari OSS
OssMapDataset memetakan awalan OSS ke dataset bergaya map PyTorch, mendukung akses acak berdasarkan indeks. Hal ini menjadikannya pengganti langsung untuk objek Dataset lokal.
from osstorchconnector import OssMapDataset
from torch.utils.data import DataLoader
ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com" # Titik akhir internal; ECS dan bucket harus berada di wilayah yang sama.
OSS_URI = "oss://<your-bucket>/mnist/" # URI OSS dari awalan dataset Anda.
CRED_PATH = "/root/.alibabacloud/credentials" # Path ke file kredensial Anda.
CONFIG_PATH = "/etc/oss-connector/config.json" # Path ke file konfigurasi connector.
dataset = OssMapDataset.from_prefix(
OSS_URI,
endpoint=ENDPOINT,
cred_path=CRED_PATH,
config_path=CONFIG_PATH,
)
# Periksa item pertama.
item = dataset[0]
content = item.read()Berikan dataset ke DataLoader seperti biasa:
loader = DataLoader(dataset, batch_size=64, shuffle=True)Untuk API lengkap OssMapDataset, lihat OssMapDataset.
Simpan dan muat checkpoint dengan OSS Checkpoint
Menulis checkpoint langsung ke OSS menghilangkan kebutuhan akan ruang disk lokal selama pelatihan, menjaga ketahanan dan aksesibilitas checkpoint di berbagai mesin, serta menghindari langkah tambahan mengunggah file lokal setelah setiap epoch.
from osstorchconnector import OssCheckpoint
import torch
ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com"
CHECKPOINT_WRITE_URI = "oss://<your-bucket>/epoch.ckpt" # Path OSS untuk menulis checkpoint.
CHECKPOINT_READ_URI = "oss://<your-bucket>/epoch.ckpt" # Path OSS untuk membaca checkpoint.
CRED_PATH = "/root/.alibabacloud/credentials"
CONFIG_PATH = "/etc/oss-connector/config.json"
checkpoint = OssCheckpoint(
endpoint=ENDPOINT,
cred_path=CRED_PATH,
config_path=CONFIG_PATH,
)
# Simpan checkpoint.
with checkpoint.writer(CHECKPOINT_WRITE_URI) as writer:
torch.save(model.state_dict(), writer)
# Muat checkpoint.
with checkpoint.reader(CHECKPOINT_READ_URI) as reader:
state_dict = torch.load(reader)
model.load_state_dict(state_dict)Untuk API lengkap OssCheckpoint, lihat OssCheckpoint.
Contoh end-to-end: latih CNN pada MNIST
Contoh berikut membangun model pengenalan tulisan tangan menggunakan PyTorch. Dataset MNIST dimuat dari OSS dengan OssMapDataset, sedangkan checkpoint model disimpan dan dipulihkan dengan OssCheckpoint.
import io
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import DataLoader
from osstorchconnector import OssMapDataset
from osstorchconnector import OssCheckpoint
# Hiperparameter.
EPOCHS = 1
BATCH_SIZE = 64
LEARNING_RATE = 0.001
# Konfigurasi OSS.
# Ganti placeholder dengan nama bucket dan titik akhir wilayah Anda yang sebenarnya.
ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com" # Titik akhir internal; ECS dan bucket harus berada di wilayah yang sama.
OSS_URI = "oss://<your-bucket>/mnist/" # URI OSS dari awalan dataset.
CHECKPOINT_WRITE_URI = "oss://<your-bucket>/epoch.ckpt" # Path OSS untuk menulis checkpoint.
CHECKPOINT_READ_URI = "oss://<your-bucket>/epoch.ckpt" # Path OSS untuk membaca checkpoint.
CONFIG_PATH = "/etc/oss-connector/config.json" # Path ke file konfigurasi connector.
CRED_PATH = "/root/.alibabacloud/credentials" # Path ke file kredensial.
# Inisialisasi objek checkpoint.
checkpoint = OssCheckpoint(endpoint=ENDPOINT, cred_path=CRED_PATH, config_path=CONFIG_PATH)
# Definisikan jaringan saraf konvolusional (CNN) sederhana.
class SimpleCNN(nn.Module):
def __init__(self):
super(SimpleCNN, self).__init__()
self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
self.adaptive_pool = nn.AdaptiveAvgPool2d((7, 7))
self.fc1 = nn.Linear(64 * 7 * 7, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = nn.ReLU()(self.conv1(x))
x = nn.MaxPool2d(2)(x)
x = nn.ReLU()(self.conv2(x))
x = nn.MaxPool2d(2)(x)
x = self.adaptive_pool(x)
x = x.view(x.size(0), -1)
x = nn.ReLU()(self.fc1(x))
x = self.fc2(x)
return x
# Pipeline pra-pemrosesan gambar.
trans = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.5], std=[0.5]),
])
def transform(object):
try:
img = Image.open(io.BytesIO(object.read())).convert('L')
val = trans(img)
except Exception as e:
raise e
# Ekstrak label dari path objek.
# Format path yang diharapkan: oss://<bucket>/mnist/<label>/<filename>
try:
label = int(object.name.split('/')[-2])
except (ValueError, IndexError):
label = 0 # Label default; sesuaikan berdasarkan struktur dataset Anda.
return val, torch.tensor(label)
# Muat dataset dan buat DataLoader.
train_dataset = OssMapDataset.from_prefix(
OSS_URI,
endpoint=ENDPOINT,
transform=transform,
cred_path=CRED_PATH,
config_path=CONFIG_PATH,
)
train_loader = DataLoader(
train_dataset,
batch_size=BATCH_SIZE,
num_workers=32,
prefetch_factor=2,
shuffle=True,
)
# Siapkan model, fungsi loss, dan pengoptimal.
model = SimpleCNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Latih dan simpan checkpoint setelah setiap epoch.
for epoch in range(EPOCHS):
for i, (images, labels) in enumerate(train_loader):
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
if (i + 1) % 100 == 0:
print(f'Epoch [{epoch + 1}/{EPOCHS}], Step [{i + 1}/{len(train_loader)}], Loss: {loss.item():.4f}')
with checkpoint.writer(CHECKPOINT_WRITE_URI) as writer:
torch.save(model.state_dict(), writer)
print("Checkpoint saved.")
# Pulihkan model dari checkpoint.
try:
with checkpoint.reader(CHECKPOINT_READ_URI) as reader:
state_dict = torch.load(reader)
model = SimpleCNN()
model.load_state_dict(state_dict)
model.eval()
print("Checkpoint loaded successfully.")
except Exception as e:
print(f"Failed to load checkpoint: {e}")Langkah selanjutnya
Referensi API OssMapDataset — daftar parameter lengkap dan opsi dataset iterabel
Referensi API OssCheckpoint — konfigurasi checkpoint lanjutan
Konfigurasikan OSS Connector — sesuaikan konkurensi pra-ambil dan pengaturan unggah sesuai beban kerja Anda
Konfigurasikan kredensial akses — gunakan kredensial sementara atau peran RAM instans ECS