
AnalyticDB: Build an efficient image labeling pipeline with Ray and LLaMA-Factory

Last Updated: Aug 20, 2025

This guide demonstrates how to build a high-performance, distributed pipeline for preparing and labeling image data for model training. By combining the power of Ray for distributed computing and Lance for efficient data storage, you can significantly accelerate the data preparation stage for training a multimodal model with LLaMA-Factory. This solution is ideal for scenarios like building a "gaming assistant" that requires training a visual model on large-scale, image-based datasets.

Background information

A game community that provides game distribution and interaction services for players and developers needs to address issues that players frequently encounter, such as mismatched guides and missing context. To solve these problems, the community is introducing AI to build a "game companion assistant" that identifies the player's current game state and provides relevant suggestions based on site content.

The existing solution is based on LLaMA-Factory for supervised fine-tuning (SFT) and continued pre-training (CPT), and uses vLLM or Alibaba Cloud Model Studio for inference. However, it relies on a large amount of manually annotated image data to support visual understanding.

This solution therefore centers on ADB Ray and integrates Lance, using Ray Data to make distributed image and text data processing more efficient and more structured. LLaMA-Factory is also integrated so that the Qwen-VL multimodal model can be fine-tuned in a distributed manner through Ray.

Solution advantages

  • You can use Ray Data to efficiently load and transform source data in any format and store it uniformly in Lance format (see the sketch after this list).

  • Lance stores image binary data and structured data together, providing better data consistency and versioning and reducing remote I/O.

  • Combined with Ray, Lance supports distributed data labeling and incremental updates (adding columns), delivering a 193% speed improvement over Parquet.
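
The following minimal sketch illustrates the first two points, assuming your Ray version provides the read_parquet, write_lance, and read_lance connectors. The paths and column names are placeholders; the actual pipeline used in this guide is shown in the Procedure section below.

import ray

# Placeholder paths for illustration only
source_path = "/home/ray/source_data.parquet"
lance_path = "/home/ray/converted_data.lance"

# Load source data in a supported format with Ray Data ...
ds = ray.data.read_parquet(source_path)

# ... and store it uniformly in Lance format
ds.write_lance(lance_path)

# Because Lance stores image binaries and structured columns together,
# reading back with column projection skips the heavy binary columns
preview = ray.data.read_lance(lance_path, columns=["id", "image_url"])
print(preview.schema())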

Solution flow

image

Before you begin

  1. An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster is created.

  2. The managed Ray service is enabled, with GPU selected as the Worker Resource Type.

  3. Submit a ticket to contact technical support for assistance with deploying the LLaMA-Factory framework.

  4. Submit a ticket to contact technical support for assistance with attaching a network interface card so that the corresponding VPC can access the Internet.

Procedure

Step 1: Prepare data and write it to Lance

Replace the relevant configurations in the following code with values for your environment, and then run it. The script loads the ModelScope dataset, downloads the image binary data, uses Ray Data to convert the data format, and writes the processed data to Lance. A quick way to spot-check the output is sketched after the code.

from modelscope.msdatasets import MsDataset
import lance
import pyarrow as pa
import pandas as pd
import os
import json
import uuid
import shutil
from tqdm import tqdm
import time
import requests
from PIL import Image
import io
import numpy as np
import ray
import pyarrow.compute as pc

# Configuration parameters
OUTPUT_DIR = "/home/ray/binary_data"
NUM_SAMPLES = 10000
FIXED_USER_PROMPT = "Please describe what is happening in this image<image>"
HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}

def prepare_output_directory():
    """Prepare output directory structure"""
    if os.path.exists(OUTPUT_DIR):
        shutil.rmtree(OUTPUT_DIR)
    
    os.makedirs(OUTPUT_DIR)
    print(f"Output directory created: {OUTPUT_DIR}")

def download_image_binary(image_url, max_retries=3):
    """Download image file from URL, return image binary data"""
    for attempt in range(max_retries):
        try:
            # Download image data
            response = requests.get(image_url, headers=HEADERS, timeout=10)
            response.raise_for_status()
            
            # Check if content is valid image
            try:
                image = Image.open(io.BytesIO(response.content))
                image.verify()  # Verify if it's a valid image
                return response.content
            except (IOError, SyntaxError) as e:
                raise ValueError(f"Downloaded image is invalid: {str(e)}")
        
        except (requests.RequestException, ValueError) as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"Download failed (attempt {attempt+1}/{max_retries}), will retry in {wait_time} seconds: {str(e)}")
                time.sleep(wait_time)
            else:
                # Create alternative image binary data
                print(f"Unable to download image, creating alternative image: {image_url}")
                img = Image.new('RGB', (256, 256), color=(122, 122, 122))
                buffer = io.BytesIO()
                img.save(buffer, format='JPEG')
                return buffer.getvalue()

def parse_global_caption(sample):
    """Extract global description text from sample"""
    # Try to get global_caption from different locations
    global_caption = None

    # First try to get directly from top-level fields
    if 'global_caption' in sample:
        global_caption = sample['global_caption']

    # Then try to find in cap_seg field
    elif 'cap_seg' in sample:
        cap_seg = sample['cap_seg']
        # Handle JSON string format
        if isinstance(cap_seg, str):
            try:
                cap_seg = json.loads(cap_seg)
            except json.JSONDecodeError:
                # Might be a string with single quotes
                try:
                    cap_seg = json.loads(cap_seg.replace("'", '"'))
                except:
                    cap_seg = {}

        # Handle dictionary format
        if isinstance(cap_seg, dict) and 'global_caption' in cap_seg:
            global_caption = cap_seg['global_caption']

    # Fallback: use default description
    if not global_caption or not isinstance(global_caption, str):
        print(f"No valid global_caption found, using default description")
        global_caption = "This is an image"

    return global_caption.strip()


def convert_samples(samples_df):
    """Convert a single batch of samples to target format"""
    results = []
    for _, row in samples_df.iterrows():
        sample = row.to_dict()
        index = sample.get("__index_level_0__", "")  # Get index column if available

        # Generate unique ID
        image_id = str(uuid.uuid4())

        # Get image URL and download
        image_url = sample.get("opensource_url", "")
        if not image_url:
            print(f"Missing opensource_url, using default image")
            image_url = "https://modelscope.cn-beijing.oss.aliyuncs.com/open_data/sa-1b-cot-qwen/default.jpg"

        # Download image binary data
        image_binary = download_image_binary(image_url)

        # Build conversation structure
        result = {
            "id": image_id,
            "messages": [
                {"content": FIXED_USER_PROMPT, "role": "user"},
                {"content": parse_global_caption(sample), "role": "assistant"},
            ],
            "images": [image_binary],  # Store binary image data directly
            "image_url": image_url,  # Keep original URL for debugging
        }
        results.append(result)

    return pd.DataFrame(results)


def save_with_ray_lance(processed_ray_dataset):
    """Save as Lance format using Ray's write_lance method"""
    print("Saving data using Ray's write_lance method...")

    lance_output_path = os.path.join(OUTPUT_DIR, "original_data.lance")

    # Write to Lance format
    processed_ray_dataset.write_lance(lance_output_path)

    print(f"Data successfully written to: {lance_output_path}")


def main():
    # Set dataset information
    dataset_name = "Tongyi-DataEngine/SA1B-Paired-Captions-Images"

    # Prepare output directory
    prepare_output_directory()

    # Download dataset
    print(f"Downloading dataset: {dataset_name} (first {NUM_SAMPLES} samples)")

    ms_dataset = MsDataset.load(dataset_name, split="train")

    ray_dataset = ray.data.from_huggingface(ms_dataset).limit(NUM_SAMPLES)

    ray_dataset = ray_dataset.repartition(100)

    # Process samples
    print("Starting to process samples...")
    processed_ray_dataset = ray_dataset.map_batches(
        convert_samples,
        batch_format="pandas",
        num_cpus=1,
        concurrency=100,
        batch_size=32,
    )

    # Save converted samples
    save_with_ray_lance(processed_ray_dataset)

    # Add metadata
    metadata = {
        "dataset": dataset_name,
        "num_samples": NUM_SAMPLES,
        "conversion_date": pd.Timestamp.now().isoformat(),
        "conversion_format": "SA1B to multimodal conversation (binary image storage)",
        "message_structure": [
            {"role": "user", "content": "Fixed prompt<image>"},
            {"role": "assistant", "content": "Image description"},
        ],
    }

    with open(os.path.join(OUTPUT_DIR, "metadata.json"), "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)

    print("\n Conversion completed!")
    return 0


if __name__ == '__main__':
    main()
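
Optionally, spot-check the Lance dataset that the script produced. The following sketch assumes the default OUTPUT_DIR above and uses the Lance Python API to inspect the row count and schema without loading the image binaries.

import lance

# Path written by save_with_ray_lance() above
ds = lance.dataset("/home/ray/binary_data/original_data.lance")

# Inspect the dataset without reading the binary "images" column
print("Rows:", ds.count_rows())
print(ds.schema)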

Step 2: Label data incrementally with Ray Serve

  1. Deploy the scoring model with Ray Serve. Execute the following code to start the scoring service.

    """
    ray serve, a continuously running scoring service for data preprocessing
    in this demo, the score function is random, in production it can be replaced with an LLM model or other scorer
    """
    import ray
    from ray import serve
    from ray.data import Dataset
    import pandas as pd
    import pyarrow as pa
    import logging
    import random
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Initialize Ray
    ray.init(address="ray://127.0.0.1:10001")
    
    # Configure Ray Serve
    @serve.deployment()
    class ScoringModel:
        def __init__(self):
            # Load your model here
            # empty in this demo
            pass
        
        def score(self, data_batch):
            results = []
            print(f"Scoring {len(data_batch)} items")
            print(data_batch[0])
            for item in data_batch:
                # Call actual scoring model (using random values here for simulation)
                score = random.randint(60,100)
                item["score"] = score
                results.append(item)
            return results
    
    # Deploy model service
    model_deployment = ScoringModel.bind()
    serve.run(model_deployment, name="scoring_model")
    
  2. Execute the following code to perform incremental labeling on the Lance data. A sketch for verifying the new column follows this step.

    import pyarrow as pa
    from pathlib import Path
    import lance
    import ray
    import pandas as pd
    from lance.ray.fragment_api import add_columns
    import random
    from ray import serve
    
    # Path to the Lance dataset written in Step 1; adjust it to your environment
    path = "/home/ray/binary_data/original_data.lance"
    
    
    # Define label generation logic
    def generate_labels(batch: pa.RecordBatch) -> pa.RecordBatch:
        """Use Ray Serve service to score data batches, return new RecordBatch with scores"""
        # Convert RecordBatch to Pandas DataFrame
        batch_df = batch.to_pandas()
    
        handle = serve.get_app_handle("scoring_model")
        # Convert DataFrame to list of dictionaries (one dictionary per row)
        dict_list = batch_df.to_dict('records')
    
        # Asynchronously call the scoring service and wait for the results
        scored_data_ref = handle.score.remote(dict_list)
        scored_items = scored_data_ref.result()  # List of dictionaries, each with a "score" field

        # Keep only the score values and return them as a new Arrow column
        score_values = [item["score"] for item in scored_items]
        return pa.RecordBatch.from_arrays([pa.array(score_values)], names=["score1"])
    
    
    def main():
        # Add new columns in parallel
        lance_ds = lance.dataset(path)
    
        add_columns(
            lance_ds,
            generate_labels,
            source_columns=["images"],  # Input columns needed
        )
    
        print("Data labeling completed")
    
    
    if __name__ == "__main__":
        main()
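
After the incremental labeling finishes, you can confirm that the new column was added by reading it back with column projection. The following sketch assumes the same dataset path and the score1 column name used above.

import lance

# Same Lance dataset path as in the labeling script above
ds = lance.dataset("/home/ray/binary_data/original_data.lance")

# Read only lightweight columns; the binary "images" column is not touched
scored = ds.to_table(columns=["id", "score1"], limit=5).to_pandas()
print(scored)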
    

Step 3: Fine-tune the model with LLaMA-Factory

  1. Install LLaMA-Factory and log on to the LLaMA-Factory Web UI.

  2. Add the dataset metadata to dataset_info.json, or create a new dataset_info.json file, to declare the mapping between the data directory and the data fields (a sample entry is sketched at the end of this section).

    image.png

  3. Submit a ticket to contact technical support to modify the LLaMA-Factory source code so that it can read Lance data and reads only the required data columns.

  4. Start training in the LLaMA-Factory Web UI, specifying the dataset by the name configured in dataset_info.json.

    image
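
For reference, the dataset_info.json entry mentioned in step 2 could look roughly like the following sketch. The dataset name and file path are illustrative, and pointing file_name at a Lance directory assumes the source modification from step 3; the sharegpt formatting and the messages/images column mapping follow the conversation structure produced in Step 1.

{
  "game_assistant_lance": {
    "file_name": "/home/ray/binary_data/original_data.lance",
    "formatting": "sharegpt",
    "columns": {
      "messages": "messages",
      "images": "images"
    },
    "tags": {
      "role_tag": "role",
      "content_tag": "content",
      "user_tag": "user",
      "assistant_tag": "assistant"
    }
  }
}

In step 4, select the dataset by this name (here, game_assistant_lance) in the Web UI before starting training.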