AnalyticDB: Build an efficient image labeling pipeline with Ray and LLaMA-Factory

Last Updated: Mar 28, 2026

This guide shows how to build a distributed pipeline that auto-labels image data for multimodal model training. The pipeline combines ADB Ray for distributed computing, Lance for columnar image storage, and LLaMA-Factory for Supervised Fine-Tuning (SFT) of the Qwen-VL model — delivering a 193% speed improvement for the labeling stage compared to Parquet-based workflows.

Background

Game platforms face a recurring challenge: players encounter mismatched guides and missing context that disrupts their experience. One solution is a gaming assistant that identifies the player's current game state and surfaces relevant suggestions. Training such an assistant requires a large volume of labeled image data — which is where manual annotation becomes a bottleneck.

The existing approach uses LLaMA-Factory for SFT and Continued Pre-Training (CPT), with vLLM or Alibaba Cloud Model Studio handling inference. The bottleneck is not the training itself, but the data preparation: downloading, transforming, and incrementally labeling thousands of images at scale.

This guide addresses that bottleneck by centering the workflow on ADB Ray, integrating it with Lance for structured image storage and Ray Data for distributed preprocessing. LLaMA-Factory is then connected to this pipeline to fine-tune Qwen-VL on the labeled dataset.

How it works

The pipeline runs in three stages:

  1. Data preparation — Load images from ModelScope, download binary image data, convert to a structured format, and write to Lance using Ray Data.

  2. Incremental labeling — Deploy a scoring model with Ray Serve and use Lance's column-append API to label data without rewriting existing records.

  3. Model fine-tuning — Configure LLaMA-Factory to read Lance data directly and train Qwen-VL using the LLaMA Factory Web UI.

Why Lance instead of Parquet?

Parquet works well for structured tabular data but falls short when images are involved. It lacks native support for storing image binary data alongside metadata, which means you either store images separately (adding remote I/O on every read) or serialize them in ways that degrade read performance. Lance solves this by integrating image binary data and structured columns in a single dataset, supporting data versioning, and allowing incremental column additions without rewriting the entire dataset. The result is a 193% speed improvement for the labeling stage compared to Parquet.

Prerequisites

Before you begin, ensure that you have:

  • An AnalyticDB for MySQL cluster (Enterprise Edition, Basic Edition, or Data Lakehouse Edition)

  • A managed Ray service enabled on the cluster, with Worker Resource Type set to GPU

  • LLaMA-Factory deployed — submit a ticket to request deployment assistance

  • A network interface card attached to enable internet access from the VPC — submit a ticket to request this configuration

Step 1: Prepare data and write it to Lance

This step loads the Tongyi-DataEngine/SA1B-Paired-Captions-Images dataset from ModelScope, downloads the image binary data for each record, converts the dataset into a multimodal conversation format, and writes the result to Lance using Ray Data's distributed map_batches.
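To make the target format concrete, the sketch below builds one record in the shape that convert_samples produces in the script: an id, a two-turn messages list, the raw image bytes, and the source URL. The helper name build_record, the sample bytes, and the example URL are illustrative assumptions and are not part of the original script.

```python
import uuid

FIXED_USER_PROMPT = "Please describe what is happening in this image<image>"


def build_record(image_binary: bytes, caption: str, image_url: str) -> dict:
    """Build one multimodal conversation record (hypothetical helper)."""
    return {
        "id": str(uuid.uuid4()),
        "messages": [
            {"content": FIXED_USER_PROMPT, "role": "user"},
            {"content": caption.strip(), "role": "assistant"},
        ],
        "images": [image_binary],  # raw bytes stored next to the text fields
        "image_url": image_url,    # kept only for debugging
    }


# Placeholder bytes and URL, used only to show the record layout
record = build_record(b"\xff\xd8\xff", " A dog runs on a beach. ", "https://example.com/dog.jpg")
print(record["messages"][1]["content"])
```

Note that the user prompt carries one <image> placeholder and the images list carries one entry, so the two counts stay in step for each record.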

Update the configuration parameters at the top of the script to match your environment before running it.

from modelscope.msdatasets import MsDataset
import lance
import pyarrow as pa
import pandas as pd
import os
import json
import uuid
import shutil
from tqdm import tqdm
import time
import requests
from PIL import Image
import io
import numpy as np
import ray
import pyarrow.compute as pc

# Configuration parameters
OUTPUT_DIR = "/home/ray/binary_data"
NUM_SAMPLES = 10000  # Set to a smaller value (e.g., 100) to validate quickly before scaling
FIXED_USER_PROMPT = "Please describe what is happening in this image<image>"
HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}

def prepare_output_directory():
    """Prepare output directory structure"""
    if os.path.exists(OUTPUT_DIR):
        shutil.rmtree(OUTPUT_DIR)

    os.makedirs(OUTPUT_DIR)
    print(f"Output directory created: {OUTPUT_DIR}")

def download_image_binary(image_url, max_retries=3):
    """Download image file from URL, return image binary data"""
    for attempt in range(max_retries):
        try:
            # Download image data
            response = requests.get(image_url, headers=HEADERS, timeout=10)
            response.raise_for_status()

            # Check if content is valid image
            try:
                image = Image.open(io.BytesIO(response.content))
                image.verify()  # Verify if it's a valid image
                return response.content
            except (IOError, SyntaxError) as e:
                raise ValueError(f"Downloaded image is invalid: {str(e)}")

        except (requests.RequestException, ValueError) as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"Download failed (attempt {attempt+1}/{max_retries}), will retry in {wait_time} seconds: {str(e)}")
                time.sleep(wait_time)
            else:
                # Create alternative image binary data
                print(f"Unable to download image, creating alternative image: {image_url}")
                img = Image.new('RGB', (256, 256), color=(122, 122, 122))
                buffer = io.BytesIO()
                img.save(buffer, format='JPEG')
                return buffer.getvalue()

def parse_global_caption(sample):
    """Extract global description text from sample"""
    # Try to get global_caption from different locations
    global_caption = None

    # First try to get directly from top-level fields
    if 'global_caption' in sample:
        global_caption = sample['global_caption']

    # Then try to find in cap_seg field
    elif 'cap_seg' in sample:
        cap_seg = sample['cap_seg']
        # Handle JSON string format
        if isinstance(cap_seg, str):
            try:
                cap_seg = json.loads(cap_seg)
            except json.JSONDecodeError:
                # Might be a string with single quotes
                try:
                    cap_seg = json.loads(cap_seg.replace("'", '"'))
                except json.JSONDecodeError:
                    cap_seg = {}

        # Handle dictionary format
        if isinstance(cap_seg, dict) and 'global_caption' in cap_seg:
            global_caption = cap_seg['global_caption']

    # Fallback: use default description
    if not global_caption or not isinstance(global_caption, str):
        print("No valid global_caption found, using default description")
        global_caption = "This is an image"

    return global_caption.strip()


def convert_samples(samples_df):
    """Convert a single batch of samples to target format"""
    results = []
    for _, row in samples_df.iterrows():
        sample = row.to_dict()
        index = sample.get("__index_level_0__", "")  # Get index column if available

        # Generate unique ID
        image_id = str(uuid.uuid4())

        # Get image URL and download
        image_url = sample.get("opensource_url", "")
        if not image_url:
            print("Missing opensource_url, using default image")
            image_url = "https://modelscope.cn-beijing.oss.aliyuncs.com/open_data/sa-1b-cot-qwen/default.jpg"

        # Download image binary data
        image_binary = download_image_binary(image_url)

        # Build conversation structure
        result = {
            "id": image_id,
            "messages": [
                {"content": FIXED_USER_PROMPT, "role": "user"},
                {"content": parse_global_caption(sample), "role": "assistant"},
            ],
            "images": [image_binary],  # Store binary image data directly
            "image_url": image_url,  # Keep original URL for debugging
        }
        results.append(result)

    return pd.DataFrame(results)


def save_with_ray_lance(processed_ray_dataset):
    """Save as Lance format using Ray's write_lance method"""
    print("Saving data using Ray's write_lance method...")

    lance_output_path = os.path.join(OUTPUT_DIR, "original_data.lance")

    # Write to Lance format
    processed_ray_dataset.write_lance(lance_output_path)

    print(f"Data successfully written to: {lance_output_path}")


def main():
    # Set dataset information
    dataset_name = "Tongyi-DataEngine/SA1B-Paired-Captions-Images"

    # Prepare output directory
    prepare_output_directory()

    # Download dataset
    print(f"Downloading dataset: {dataset_name} (first {NUM_SAMPLES} samples)")

    ms_dataset = MsDataset.load(dataset_name, split="train")

    ray_dataset = ray.data.from_huggingface(ms_dataset).limit(NUM_SAMPLES)

    ray_dataset = ray_dataset.repartition(100)

    # Process samples
    print("Starting to process samples...")
    processed_ray_dataset = ray_dataset.map_batches(
        convert_samples,
        batch_format="pandas",
        num_cpus=1,
        concurrency=100,
        batch_size=32,
    )

    # Save converted samples
    save_with_ray_lance(processed_ray_dataset)

    # Add metadata
    metadata = {
        "dataset": dataset_name,
        "num_samples": NUM_SAMPLES,
        "conversion_date": pd.Timestamp.now().isoformat(),
        "conversion_format": "SA1B to multimodal conversation (binary image storage)",
        "message_structure": [
            {"role": "user", "content": "Fixed prompt<image>"},
            {"role": "assistant", "content": "Image description"},
        ],
    }

    with open(os.path.join(OUTPUT_DIR, "metadata.json"), "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)

    print("\nConversion completed!")
    return 0


if __name__ == '__main__':
    main()
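In the script, parse_global_caption falls back to replacing single quotes with double quotes when cap_seg is not valid JSON. That fallback fails if a caption itself contains an apostrophe. One possible alternative, offered here as an assumption rather than as part of the original script, is to try ast.literal_eval from the standard library, which parses Python-style dict strings directly. The helper name parse_cap_seg is hypothetical.

```python
import ast
import json


def parse_cap_seg(raw: str) -> dict:
    """Parse cap_seg as JSON first, then as a Python literal; return {} on failure."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        try:
            parsed = ast.literal_eval(raw)
        except (ValueError, SyntaxError):
            return {}
    # Anything that is not a dict (for example a bare number) is treated as unusable
    return parsed if isinstance(parsed, dict) else {}


json_style = parse_cap_seg('{"global_caption": "A red car"}')
python_style = parse_cap_seg("{'global_caption': \"A dog's toy\"}")
broken = parse_cap_seg("not a dict")
```

With this approach, a caption containing an apostrophe survives parsing, and unparseable input still degrades to an empty dict so the default description is used.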

Scale up for production

The example uses NUM_SAMPLES = 10000 and concurrency=100 to validate the pipeline quickly. For production-scale runs, increase NUM_SAMPLES to your full dataset size and tune concurrency and batch_size based on your GPU worker count and available memory.

Step 2: Label data incrementally with Ray Serve

Lance's column-append API lets you add a scoring column to an existing dataset without rewriting it. This step deploys a scoring model as a Ray Serve endpoint, then calls it to score each record and write the scores back as a new column.

1. Deploy the scoring service.

The following code deploys a ScoringModel service using Ray Serve. The score method returns random values as a placeholder — replace it with your actual scoring logic (for example, an LLM-based quality scorer) before running in production.

"""
Ray Serve scoring service for data preprocessing.
The score function here is a placeholder. Replace with a real LLM model or scorer for production.
"""
import ray
from ray import serve
from ray.data import Dataset
import pandas as pd
import pyarrow as pa
import logging
import random

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize Ray
ray.init(address="ray://127.0.0.1:10001")

# Configure Ray Serve
@serve.deployment()
class ScoringModel:
    def __init__(self):
        # Load your model here
        pass

    def score(self, data_batch):
        results = []
        print(f"Scoring {len(data_batch)} items")
        if data_batch:
            # Log only the field names; the records carry raw image bytes
            print(list(data_batch[0].keys()))
        for item in data_batch:
            # Replace with actual scoring logic
            score = random.randint(60, 100)
            item["score"] = score
            results.append(item)
        return results

# Deploy model service
model_deployment = ScoringModel.bind()
serve.run(model_deployment, name="scoring_model")

2. Run incremental labeling on the Lance dataset.

All records are scored in parallel. add_columns appends the score1 column to each fragment in the Lance dataset without touching existing data.

import pyarrow as pa
from pathlib import Path
import lance
import ray
import pandas as pd
from lance.ray.fragment_api import add_columns
import random
from ray import serve

# Path to the Lance dataset written in Step 1; adjust to match your OUTPUT_DIR
path = "/nas/lance/binary_data_10w/original_data.lance"


# Define label generation logic
def generate_labels(batch: pa.RecordBatch) -> pa.RecordBatch:
    """Score a batch using the Ray Serve endpoint, return a new RecordBatch with scores"""
    # Convert RecordBatch to Pandas DataFrame
    batch_df = batch.to_pandas()

    handle = serve.get_app_handle("scoring_model")
    # Convert DataFrame to list of dictionaries (one dictionary per row)
    dict_list = batch_df.to_dict('records')

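    # Call the scoring service and block until the result is available
    scored_items = handle.score.remote(dict_list).result()

    # The service returns the input dicts with a "score" key added;
    # keep only the scores, in row order, as the new column
    scores = pa.array([item["score"] for item in scored_items], type=pa.int64())

    return pa.RecordBatch.from_arrays([scores], names=["score1"])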


def main():
    # Add new columns in parallel
    lance_ds = lance.dataset(path)

    add_columns(
        lance_ds,
        generate_labels,
        source_columns=["images"],  # Input columns needed
    )

    print("Data labeling completed")


if __name__ == "__main__":
    main()
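The appended column must contain exactly one value per input row, in the same order as the batch. The following is a minimal sketch, assuming the scoring service returns a list of dicts that each carry a score key (as ScoringModel.score does), of pulling out the scores and checking the row count before the score1 column is built. The helper name extract_scores is hypothetical.

```python
def extract_scores(scored_items: list, expected_rows: int, key: str = "score") -> list:
    """Return one integer score per row, in input order (hypothetical helper)."""
    if len(scored_items) != expected_rows:
        raise ValueError(
            f"Expected {expected_rows} scored rows, got {len(scored_items)}"
        )
    return [int(item[key]) for item in scored_items]


# Stand-in for the list returned by the scoring service
response = [{"id": "a", "score": 87}, {"id": "b", "score": 64}]
scores = extract_scores(response, expected_rows=2)
print(scores)  # [87, 64]
```

The resulting list can be wrapped with pa.array before it is passed to pa.RecordBatch.from_arrays. Failing fast on a row-count mismatch avoids writing a misaligned column into the dataset.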

Step 3: Fine-tune the model with LLaMA-Factory

1. Install LLaMA-Factory and open the Web UI.

Follow the installation guide to install LLaMA-Factory, then open the LLaMA Factory Web UI.

2. Register the Lance dataset in dataset_info.json.

Insert the dataset_info metadata into dataset_info.json (or create the file) to declare the mapping between your Lance data directory and the expected data fields.
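As a rough sketch only, the snippet below generates an entry in the sharegpt-style layout that LLaMA-Factory uses for multimodal datasets, with columns and tags matching the messages, images, role, and content fields written in Step 1. The dataset name sa1b_lance and the use of file_name to point at the Lance directory are assumptions; the exact entry depends on how Lance reading is enabled in your LLaMA-Factory deployment.

```python
import json

# Hypothetical dataset name and path; adjust both to your environment.
dataset_info = {
    "sa1b_lance": {
        "file_name": "/home/ray/binary_data/original_data.lance",
        "formatting": "sharegpt",
        # Map LLaMA-Factory's expected fields to the column names written in Step 1
        "columns": {"messages": "messages", "images": "images"},
        # Each message uses "role" and "content" keys with user/assistant roles
        "tags": {
            "role_tag": "role",
            "content_tag": "content",
            "user_tag": "user",
            "assistant_tag": "assistant",
        },
    }
}

print(json.dumps(dataset_info, indent=2))
```

The printed JSON can be merged into an existing dataset_info.json under its top-level object.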

3. Enable Lance data reading in LLaMA-Factory.

Submit a ticket to request technical support for modifying the LLaMA-Factory source code so that it reads Lance datasets and loads only the required columns.

4. Start training.

Run llamafactory-cli webui and select the dataset by the name you registered in dataset_info.json.
