This guide demonstrates how to build a high-performance, distributed pipeline for preparing and labeling image data for model training. By combining the power of Ray for distributed computing and Lance for efficient data storage, you can significantly accelerate the data preparation stage for training a multimodal model with LLaMA-Factory. This solution is ideal for scenarios like building a "gaming assistant" that requires training a visual model on large-scale, image-based datasets.
Background information
In a game community scenario that provides game distribution and interaction services for players and developers, players frequently run into issues such as mismatched guides and missing context. To solve these problems, we are exploring the introduction of AI to build a "game companion assistant" that identifies the player's current game state and provides relevant suggestions based on site content.
The existing solution is based on LLaMA-Factory for Supervised Fine-Tuning (SFT) and Continued Pre-Training (CPT), and uses vLLM or Alibaba Cloud Model Studio for inference. However, it also relies on a large amount of manually annotated image data to support visual understanding.
In this solution, the managed Ray service (ADB Ray) serves as the core. It integrates with Lance and uses Ray Data to improve the efficiency and structuring of distributed image and text data processing, and it integrates with LLaMA-Factory to perform distributed fine-tuning of the Qwen-VL multimodal model through Ray.
Solution advantages
You can use Ray Data to efficiently load and transform source data in any format and store it uniformly in the Lance format.
Lance stores image binary data and structured data together, which provides better data consistency and versioning and reduces remote I/O (see the sketch after this list).
Combined with Ray, Lance supports distributed data labeling and incremental updates (adding columns), with a 193% speed improvement over Parquet.
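As a minimal sketch of what integrated storage looks like in practice (illustrative column names and paths, not part of the solution code):
import lance
import pyarrow as pa

# Image bytes and structured fields live in the same Lance dataset
table = pa.table({
    "id": ["img-001", "img-002"],
    "caption": ["a cat on a sofa", "a city street at night"],
    "image": pa.array([b"<jpeg bytes 1>", b"<jpeg bytes 2>"], type=pa.binary()),
})
lance.write_dataset(table, "/tmp/images_demo.lance")

# Read back only the lightweight columns; the image binaries are not fetched
ds = lance.dataset("/tmp/images_demo.lance")
print(ds.to_table(columns=["id", "caption"]))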
Solution flow
Before you begin
An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster is created.
The managed Ray service is enabled, and GPU is selected as the Worker Resource Type.
Submit a ticket to contact technical support for assistance with deploying the LLaMA-Factory framework.
Submit a ticket to contact technical support for assistance with attaching a network interface card so that the corresponding VPC can access the Internet.
Procedure
Step 1: Prepare data and write it to Lance
Replace the relevant configurations in the following code based on your environment, and run it to load the ModelScope dataset and download the image binary data. The code then uses Ray Data to process the data format and writes the processed data to Lance.
from modelscope.msdatasets import MsDataset
import lance
import pyarrow as pa
import pandas as pd
import os
import json
import uuid
import shutil
from tqdm import tqdm
import time
import requests
from PIL import Image
import io
import numpy as np
import ray
import pyarrow.compute as pc
# Configuration parameters
OUTPUT_DIR = "/home/ray/binary_data"
NUM_SAMPLES = 10000
FIXED_USER_PROMPT = "Please describe what is happening in this image<image>"
HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
def prepare_output_directory():
"""Prepare output directory structure"""
if os.path.exists(OUTPUT_DIR):
shutil.rmtree(OUTPUT_DIR)
os.makedirs(OUTPUT_DIR)
print(f"Output directory created: {OUTPUT_DIR}")
def download_image_binary(image_url, max_retries=3):
"""Download image file from URL, return image binary data"""
for attempt in range(max_retries):
try:
# Download image data
response = requests.get(image_url, headers=HEADERS, timeout=10)
response.raise_for_status()
# Check if content is valid image
try:
image = Image.open(io.BytesIO(response.content))
image.verify() # Verify if it's a valid image
return response.content
except (IOError, SyntaxError) as e:
raise ValueError(f"Downloaded image is invalid: {str(e)}")
except (requests.RequestException, ValueError) as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
print(f"Download failed (attempt {attempt+1}/{max_retries}), will retry in {wait_time} seconds: {str(e)}")
time.sleep(wait_time)
else:
# Create alternative image binary data
print(f"Unable to download image, creating alternative image: {image_url}")
img = Image.new('RGB', (256, 256), color=(122, 122, 122))
buffer = io.BytesIO()
img.save(buffer, format='JPEG')
return buffer.getvalue()
def parse_global_caption(sample):
"""Extract global description text from sample"""
# Try to get global_caption from different locations
global_caption = None
# First try to get directly from top-level fields
if 'global_caption' in sample:
global_caption = sample['global_caption']
# Then try to find in cap_seg field
elif 'cap_seg' in sample:
cap_seg = sample['cap_seg']
# Handle JSON string format
if isinstance(cap_seg, str):
try:
cap_seg = json.loads(cap_seg)
except json.JSONDecodeError:
# Might be a string with single quotes
try:
cap_seg = json.loads(cap_seg.replace("'", '"'))
except:
cap_seg = {}
# Handle dictionary format
if isinstance(cap_seg, dict) and 'global_caption' in cap_seg:
global_caption = cap_seg['global_caption']
# Fallback: use default description
if not global_caption or not isinstance(global_caption, str):
print(f"No valid global_caption found, using default description")
global_caption = "This is an image"
return global_caption.strip()
def convert_samples(samples_df):
"""Convert a single batch of samples to target format"""
results = []
for _, row in samples_df.iterrows():
sample = row.to_dict()
index = sample.get("__index_level_0__", "") # Get index column if available
# Generate unique ID
image_id = str(uuid.uuid4())
# Get image URL and download
image_url = sample.get("opensource_url", "")
if not image_url:
print(f"Missing opensource_url, using default image")
image_url = "https://modelscope.cn-beijing.oss.aliyuncs.com/open_data/sa-1b-cot-qwen/default.jpg"
# Download image binary data
image_binary = download_image_binary(image_url)
# Build conversation structure
result = {
"id": image_id,
"messages": [
{"content": FIXED_USER_PROMPT, "role": "user"},
{"content": parse_global_caption(sample), "role": "assistant"},
],
"images": [image_binary], # Store binary image data directly
"image_url": image_url, # Keep original URL for debugging
}
results.append(result)
return pd.DataFrame(results)
def save_with_ray_lance(processed_ray_dataset):
"""Save as Lance format using Ray's write_lance method"""
print("Saving data using Ray's write_lance method...")
lance_output_path = os.path.join(OUTPUT_DIR, "original_data.lance")
# Write to Lance format
processed_ray_dataset.write_lance(lance_output_path)
print(f"Data successfully written to: {lance_output_path}")
def main():
# Set dataset information
dataset_name = "Tongyi-DataEngine/SA1B-Paired-Captions-Images"
# Prepare output directory
prepare_output_directory()
# Download dataset
print(f"Downloading dataset: {dataset_name} (first {NUM_SAMPLES} samples)")
ms_dataset = MsDataset.load(dataset_name, split="train")
ray_dataset = ray.data.from_huggingface(ms_dataset).limit(NUM_SAMPLES)
ray_dataset = ray_dataset.repartition(100)
# Process samples
print("Starting to process samples...")
processed_ray_dataset = ray_dataset.map_batches(
convert_samples,
batch_format="pandas",
num_cpus=1,
concurrency=100,
batch_size=32,
)
# Save converted samples
save_with_ray_lance(processed_ray_dataset)
# Add metadata
metadata = {
"dataset": dataset_name,
"num_samples": NUM_SAMPLES,
"conversion_date": pd.Timestamp.now().isoformat(),
"conversion_format": "SA1B to multimodal conversation (binary image storage)",
"message_structure": [
{"role": "user", "content": "Fixed prompt<image>"},
{"role": "assistant", "content": "Image description"},
],
}
with open(os.path.join(OUTPUT_DIR, "metadata.json"), "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
print("\n Conversion completed!")
return 0
if __name__ == '__main__':
main()
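Optionally, read the dataset back to confirm the write. The following is a minimal sketch, assuming the output path used in the script above:
import lance

ds = lance.dataset("/home/ray/binary_data/original_data.lance")
print(ds.count_rows())
# Read only the lightweight columns; the image binaries are not fetched
print(ds.to_table(columns=["id", "image_url"], limit=5))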
Step 2: Label data incrementally with Ray Serve
Use Ray Serve to deploy the scoring model. Execute the following code to deploy the scoring service.
""" ray serve, a continuously running scoring service for data preprocessing in this demo, the score function is random, in production it can be replaced with an LLM model or other scorer """ import ray from ray import serve from ray.data import Dataset import pandas as pd import pyarrow as pa import logging import random # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize Ray ray.init(address="ray://127.0.0.1:10001") # Configure Ray Serve @serve.deployment() class ScoringModel: def __init__(self): # Load your model here # empty in this demo pass def score(self,data_batch) : results = [] print(f"Scoring {len(data_batch)} items") print(data_batch[0]) for item in data_batch: # Call actual scoring model (using random values here for simulation) score = random.randint(60,100) item["score"] = score results.append(item) return results # Deploy model service model_deployment = ScoringModel.bind() serve.run(model_deployment, name="scoring_model")Execute the following code to perform incremental labeling on Lance data.
import pyarrow as pa
from pathlib import Path
import lance
import ray
import pandas as pd
from lance.ray.fragment_api import add_columns
import random
from ray import serve

path = "/nas/lance/binary_data_10w/original_data.lance"

# Define label generation logic
def generate_labels(batch: pa.RecordBatch) -> pa.RecordBatch:
    """Use the Ray Serve service to score a data batch and return a new RecordBatch with scores"""
    # Convert the RecordBatch to a pandas DataFrame
    batch_df = batch.to_pandas()
    handle = serve.get_app_handle("scoring_model")
    # Convert the DataFrame to a list of dictionaries (one dictionary per row)
    dict_list = batch_df.to_dict('records')
    # Asynchronously call the scoring service
    scored_data_ref = handle.score.remote(dict_list)
    scored_items = scored_data_ref.result()  # Get the scoring results
    # Keep only the score values so the new column has one value per input row
    score_values = [item["score"] for item in scored_items]
    return pa.RecordBatch.from_arrays([pa.array(score_values)], names=["score1"])

def main():
    # Add new columns in parallel
    lance_ds = lance.dataset(path)
    add_columns(
        lance_ds,
        generate_labels,
        source_columns=["images"],  # Input columns needed
    )
    print("Data labeling completed")

if __name__ == "__main__":
    main()
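After the job completes, you can spot-check the incremental labels. The following is a minimal sketch, assuming the same dataset path as above:
import lance

ds = lance.dataset("/nas/lance/binary_data_10w/original_data.lance")
print(ds.schema)  # the incrementally added score1 column should appear here
print(ds.to_table(columns=["score1"], limit=5))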
Step 3: Fine-tune the model with LLaMA-Factory
Insert the dataset_info metadata into dataset_info.json, or create a new dataset_info.json file to declare the mapping between data directories and data fields (see the example entry after these steps).
Submit a ticket to contact technical support for assistance with modifying the llama-factory source code to support Lance data reading and to ensure that only the required data columns are read.
Use the llamafactory webui for training, and specify the dataset by the dataset name you configured.
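For reference, a dataset_info.json entry for this conversation-plus-images data might look like the following sketch. The dataset name, the Lance path, and whether the modified llama-factory code accepts that path in file_name are assumptions to adapt to your environment; the key names follow LLaMA-Factory's sharegpt-style conventions.
import json

# Illustrative dataset_info.json entry (hypothetical name "sa1b_lance";
# whether a Lance path is accepted here depends on the modified llama-factory code)
dataset_info = {
    "sa1b_lance": {
        "file_name": "/home/ray/binary_data/original_data.lance",
        "formatting": "sharegpt",
        "columns": {"messages": "messages", "images": "images"},
        "tags": {
            "role_tag": "role",
            "content_tag": "content",
            "user_tag": "user",
            "assistant_tag": "assistant",
        },
    }
}

with open("dataset_info.json", "w", encoding="utf-8") as f:
    json.dump(dataset_info, f, indent=2, ensure_ascii=False)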