本文介紹如何利用Ray與LLaMA Factory結合的技術方案實現高效的映像打標。
背景資訊
某遊戲社區情境,旨在為玩家和開發人員提供遊戲分發與互動服務。針對玩家在遊戲中頻繁遇到的攻略不匹配、上下文缺失等問題,探索引入AI構建“遊戲陪玩助手”,旨在通過識別玩家當前遊戲狀態,結合站內內容提供相應建議。
現有方案基於LLaMA-Factory進行SFT和CPT訓練,並藉助VLLM或阿里雲百鍊進行推理,但同時依賴大量人工標註的映像資料以支援視覺理解。
在這一背景下,以ADB Ray為中心,與Lance進行整合,利用RayData提升分布式圖文資料處理效率和結構化能力;同時整合LLaMA-Factory,通過Ray實現對Qwen-VL多模態模型的分布式微調。
方案優勢
使用RayData實現任意格式來源資料的高效載入與轉換,並將其統一儲存為Lance格式。
Lance支援映像二進位和結構化資料的整合儲存,提供更優的資料一致性和版本控制,從而減少遠程IO。
結合Ray實現Lance分布式資料打標和累加式更新(新增列),與Parquet資料相比,速度提升193%。
方案流程
準備工作
操作步驟
步驟一:準備資料並將其寫入Lance
根據實際環境,替換以下代碼中的相關配置並執行,載入model scope資料集並下載圖片位元據,然後使用RayData進行資料格式處理,處理完成後將資料寫入Lance。
from modelscope.msdatasets import MsDataset
import lance
import pyarrow as pa
import pandas as pd
import os
import json
import uuid
import shutil
from tqdm import tqdm
import time
import requests
from PIL import Image
import io
import numpy as np
import ray
import pyarrow.compute as pc
# 配置參數
OUTPUT_DIR = "/home/ray/binary_data"
NUM_SAMPLES = 10000
FIXED_USER_PROMPT = "請描述下述圖片中發生了什麼<image>"
HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
def prepare_output_directory():
"""準備輸出目錄結構"""
if os.path.exists(OUTPUT_DIR):
shutil.rmtree(OUTPUT_DIR)
os.makedirs(OUTPUT_DIR)
print(f"輸出目錄已建立: {OUTPUT_DIR}")
def download_image_binary(image_url, max_retries=3):
"""從URL下載影像檔,返回圖片位元據"""
for attempt in range(max_retries):
try:
# 下載映像資料
response = requests.get(image_url, headers=HEADERS, timeout=10)
response.raise_for_status()
# 檢查內容是否為有效映像
try:
image = Image.open(io.BytesIO(response.content))
image.verify() # 驗證是否為有效映像
return response.content
except (IOError, SyntaxError) as e:
raise ValueError(f"下載的映像無效: {str(e)}")
except (requests.RequestException, ValueError) as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
print(f"下載失敗 (嘗試 {attempt+1}/{max_retries}), 將在 {wait_time} 秒後重試: {str(e)}")
time.sleep(wait_time)
else:
# 建立替代映像位元據
print(f"無法下載映像,建立替代映像: {image_url}")
img = Image.new('RGB', (256, 256), color=(122, 122, 122))
buffer = io.BytesIO()
img.save(buffer, format='JPEG')
return buffer.getvalue()
def parse_global_caption(sample):
"""從樣本中提取全域描述文本"""
# 嘗試從不同位置擷取global_caption
global_caption = None
# 首先嘗試直接從頂級欄位擷取
if 'global_caption' in sample:
global_caption = sample['global_caption']
# 然後嘗試在cap_seg欄位中尋找
elif 'cap_seg' in sample:
cap_seg = sample['cap_seg']
# 處理JSON字串格式
if isinstance(cap_seg, str):
try:
cap_seg = json.loads(cap_seg)
except json.JSONDecodeError:
# 可能是帶單引號的字串
try:
cap_seg = json.loads(cap_seg.replace("'", '"'))
except:
cap_seg = {}
# 處理字典格式
if isinstance(cap_seg, dict) and 'global_caption' in cap_seg:
global_caption = cap_seg['global_caption']
# 回退方案:使用預設描述
if not global_caption or not isinstance(global_caption, str):
print(f"未找到有效global_caption,使用預設描述")
global_caption = "這是一張圖片"
return global_caption.strip()
def convert_samples(samples_df):
"""轉換單個批次的樣本為目標格式"""
results = []
for _, row in samples_df.iterrows():
sample = row.to_dict()
index = sample.get("__index_level_0__", "") # 如果有索引列則擷取
# 產生唯一ID
image_id = str(uuid.uuid4())
# 擷取映像URL並下載
image_url = sample.get("opensource_url", "")
if not image_url:
print(f"缺少opensource_url,使用預設圖片")
image_url = "https://modelscope.cn-beijing.oss.aliyuncs.com/open_data/sa-1b-cot-qwen/default.jpg"
# 下載映像位元據
image_binary = download_image_binary(image_url)
# 構建對話結構
result = {
"id": image_id,
"messages": [
{"content": FIXED_USER_PROMPT, "role": "user"},
{"content": parse_global_caption(sample), "role": "assistant"},
],
"images": [image_binary], # 直接儲存二進位圖片資料
"image_url": image_url, # 保留原始URL用於調試
}
results.append(result)
return pd.DataFrame(results)
def save_with_ray_lance(processed_ray_dataset):
"""使用 Ray 的 write_lance 方法儲存為 Lance 格式"""
print("正在使用 Ray 的 write_lance 方法儲存資料...")
lance_output_path = os.path.join(OUTPUT_DIR, "original_data.lance")
# 寫入 Lance 格式
processed_ray_dataset.write_lance(lance_output_path)
print(f"資料已成功寫入: {lance_output_path}")
def main():
# 設定資料集資訊
dataset_name = "Tongyi-DataEngine/SA1B-Paired-Captions-Images"
# 準備輸出目錄
prepare_output_directory()
# 下載資料集
print(f"正在下載資料集: {dataset_name} (前 {NUM_SAMPLES} 個樣本)")
ms_dataset = MsDataset.load(dataset_name, split="train")
ray_dataset = ray.data.from_huggingface(ms_dataset).limit(NUM_SAMPLES)
ray_dataset = ray_dataset.repartition(100)
# 處理樣本
print("開始處理樣本...")
processed_ray_dataset = ray_dataset.map_batches(
convert_samples,
batch_format="pandas",
num_cpus=1,
concurrency=100,
batch_size=32,
)
# 儲存轉換後的樣本
save_with_ray_lance(processed_ray_dataset)
# 添加中繼資料
metadata = {
"dataset": dataset_name,
"num_samples": NUM_SAMPLES,
"conversion_date": pd.Timestamp.now().isoformat(),
"conversion_format": "SA1B轉多模態對話(二進位圖片儲存)",
"message_structure": [
{"role": "user", "content": "固定提示詞<image>"},
{"role": "assistant", "content": "映像描述"},
],
}
with open(os.path.join(OUTPUT_DIR, "metadata.json"), "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
print("\n 轉換完成!")
return 0
if __name__ == '__main__':
main()
步驟二:Lance增量打標
使用Ray Serve部署score模型,執行如下代碼,部署打分服務。
""" ray serve,一個持續運行中的打分服務,供應資料預先處理使用 demo中score函數為random,生產中可以替換為LLM模型或者其他打分器 """ import ray from ray import serve from ray.data import Dataset import pandas as pd import pyarrow as pa import logging import random # 設定日誌 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 初始化Ray ray.init(address="ray://127.0.0.1:10001") # 配置Ray Serve @serve.deployment() class ScoringModel: def __init__(self): # 在這裡載入您的模型 # demo中為空白 pass def score(self,data_batch) : results = [] print(f"Scoring {len(data_batch)} items") print(data_batch[0]) for item in data_batch: # 調用實際評分模型 (這裡使用隨機值類比) score = random.randint(60,100) item["score"] = score results.append(item) return results # 部署模型服務 model_deployment = ScoringModel.bind() serve.run(model_deployment, name="scoring_model")執行如下代碼,對Lance資料增量打標。
import pyarrow as pa from pathlib import Path import lance import ray import pandas as pd from lance.ray.fragment_api import add_columns import random from ray import serve path = "/nas/lance/binary_data_10w/original_data.lance" # Define label generation logic def generate_labels(batch: pa.RecordBatch) -> pa.RecordBatch: """使用Ray Serve服務對資料批次進行評分,返回帶分數的新RecordBatch""" # 將RecordBatch轉為Pandas DataFrame batch_df = batch.to_pandas() handle = serve.get_app_handle("scoring_model") # 將DataFrame轉換為字典列表(每行一個字典) dict_list = batch_df.to_dict('records') # 非同步呼叫評分服務 scored_data_ref = handle.score.remote(dict_list) scores = scored_data_ref.result() # 擷取評分結果 return pa.RecordBatch.from_arrays([scores], names=["score1"]) def main(): # Add new columns in parallel lance_ds = lance.dataset(path) add_columns( lance_ds, generate_labels, source_columns=["images"], # Input columns needed ) print("資料打標完成") if __name__ == "__main__": main()
步驟三:LLaMA-Factory多模訓練
將
dataset_info中繼資料插入dataset_info.json中,或建立一個dataset_info.json檔案,用於聲明資料目錄和資料欄位的映射關係。
請提交工單聯絡支援人員修改
llama-factory原始碼,以支援Lance資料讀取,並確保僅讀取需要的資料列。使用
llamafactory webui訓練,使用設定的資料集名稱指定資料集。