
AnalyticDB: Build a multimodal data pipeline with Notebook, PySpark, and Lance

Last Updated: Aug 22, 2025

This guide demonstrates how to build an efficient data processing pipeline for a Vision-Language Model (VLM). You will use a DMS Notebook and PySpark to process and combine image and text data stored in Object Storage Service (OSS) and store it in the high-performance Lance format.

Background information

Building multimodal capabilities and preparing multimodal training data are critical technical directions. In multimodal scenarios, images and text are commonly stored together, and the data is typically processed with a distributed computing framework such as Spark. An appropriate, efficient training data solution ensures that models learn effectively from diverse multimodal data, which improves their generalization ability and performance.

Solution advantages

  • The rich library ecosystem of DMS Notebook and AnalyticDB Spark lets you develop and debug jobs quickly.

  • Multimodal data is uniformly stored in the Lance format and managed in a data lake, including lifecycle and permission management.

  • The Lance format provides high-performance point queries, which makes model training and data preprocessing more efficient. In the demonstration scenario, the Lance format performs approximately 3 to 4 times better than the Parquet format. A sketch of the point-query pattern appears after this list.

  • The Lance format offers the following advantages for the mixed storage of images and associated data:

    • Data integrity and consistency: Images and their associated data (such as metadata and annotations) are stored in the same file, which avoids the loss and mismatch errors caused by scattered files and simplifies overall management and migration.

    • Improved reading efficiency: A reader does not need to open multiple files at the same time, which reduces the time spent on I/O operations and path lookups. This suits scenarios that process large numbers of images and records in batches, such as machine learning datasets.

    • Simplified data management: A single file covers all related information, which eliminates the need to maintain complex file indexes or directory structures, reduces management costs, and suits collaborative scenarios.

    • Compatibility and portability: A single file format simplifies transmission between systems and platforms and avoids the compatibility issues caused by dependencies on external paths.

    • Security and access control: The entire file can be encrypted or governed by a single set of permissions, which prevents the images and data from being split or tampered with and enhances data security.
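
The following minimal sketch illustrates the point-query pattern referenced above: fetching scattered rows by index with Lance `take`, versus loading a Parquet table before selecting rows. The dataset size, local paths, and timing harness are illustrative placeholders, not measurements from this guide.

    import time
    import lance
    import pyarrow as pa
    import pyarrow.parquet as pq

    # Build a small synthetic table with a binary payload per row
    table = pa.table({"id": list(range(100_000)), "payload": [b"x" * 256] * 100_000})
    lance.write_dataset(table, "/tmp/demo.lance", mode="overwrite")
    pq.write_table(table, "/tmp/demo.parquet")

    indices = [5, 50_000, 99_999]  # scattered rows, as in training-time sample lookups

    t0 = time.time()
    rows = lance.dataset("/tmp/demo.lance").take(indices)  # random access by row index
    print(f"Lance take: {rows.num_rows} rows in {time.time() - t0:.4f}s")

    t0 = time.time()
    all_rows = pq.read_table("/tmp/demo.parquet").take(indices)  # loads column data first
    print(f"Parquet read+take: {all_rows.num_rows} rows in {time.time() - t0:.4f}s")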

Solution process

(Figure: solution process diagram.)

Before you begin

Deploy resources

  1. Create an AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster.

  2. Create a database account.

  3. Create a Job resource group.

  4. Configure the log storage address for Spark applications.

    Log on to the AnalyticDB for MySQL console. On the Job Development > Spark JAR Development page, click Log Settings, and then select the default path or specify a custom storage path. A custom path cannot be the root directory of an OSS bucket; make sure that the path contains at least one folder.

  5. To use a RAM user to log on to the console and develop Spark jobs, first grant the required permissions to the RAM user.

Prepare sample data and JAR packages

  1. Select any three images, name them image1.jpg, image2.jpg, and image3.jpg, and upload them to OSS. An optional upload sketch appears after this list.

  2. Prepare the testdata.json file and upload it to OSS.

    [
        {
            "docid": "doc_001",
            "text": "This is a prompt for image processing.",
            "image_id1": "image1",
            "image_id2": "image2"
        },
        {
            "docid": "doc_002",
            "text": "Another prompt with image references.",
            "image_id1": "image3",
            "image_id2": "image1"
        },
        {
            "docid": "doc_003",
            "text": "Interesting dialogue about images.",
            "image_id1": "image2",
            "image_id2": "image1",
            "image_id3": "image3"
        }
    ]
  3. Download the lance-spark-bundle-3.5_2.12-0.0.1.jar package and upload it to OSS.
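
If you prefer to upload the sample files programmatically, the following sketch uses the oss2 SDK. The AccessKey pair, endpoint, bucket name, and local file paths are placeholders that you must replace; the object keys match the prefixes used by the import code later in this guide.

    import oss2

    auth = oss2.Auth('AK', 'SK')
    bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'testBucketName')

    # Upload the three sample images under the prefix that the import code scans
    for name in ('image1.jpg', 'image2.jpg', 'image3.jpg'):
        bucket.put_object_from_file(f'lanceData/{name}', name)
    # Upload the sample text file to the key that the import code reads
    bucket.put_object_from_file('Lance/data/testdata.json', 'testdata.json')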

Procedure

Step 1: Set up the Notebook environment

  1. Log on to the AnalyticDB for MySQL console and select the region where your cluster resides in the upper-left corner. In the left-side navigation pane, click Clusters, and then click the ID of the target cluster.

  2. Click Job Development > Notebook Development. Make sure that you have completed the preceding preparations, and then click Enter DMS Notebook.

Step 2: Create a Notebook file

  1. Prepare resources for Notebook.

  2. Create a Spark cluster.

    1. Go to the resource management page, and then click Compute Cluster.

    2. Select the Spark Cluster tab, click Create Cluster, and configure the following parameters:

      | Parameter | Description | Example |
      | --- | --- | --- |
      | Cluster Name | A cluster name that makes the usage scenario easy to identify. | spark_test |
      | Runtime Environment | The supported images are adb-spark:v3.3-python3.9-scala2.12 and adb-spark:v3.5-python3.9-scala2.12. | adb-spark:v3.5-python3.9-scala2.12 |
      | AnalyticDB Instance | The AnalyticDB for MySQL cluster, selected from the drop-down list. | amv-uf6i4bi88**** |
      | AnalyticDB MySQL Resource Group | A Job resource group, selected from the drop-down list. | testjob |
      | Spark APP Executor Specification | The resource specification of the Spark executor. Different models correspond to different specifications. For more information, see the Model column in Spark application configuration parameter description. | large |
      | VSwitch | A vSwitch in the current VPC. | vsw-uf6n9ipl6qgo**** |
      | Dependent JARs | The OSS path of the JAR package that you uploaded during preparation. | oss://testBucketName/adb/lance-spark-bundle-3.5_2.12-0.0.1.jar |

  3. Create and start a Notebook session, and configure the following parameters:

    | Parameter | Description | Example |
    | --- | --- | --- |
    | Cluster | The Spark cluster created in the previous step. | spark_test |
    | Session Name | A custom session name. | new_session |
    | Image | The image specification. The supported images are Spark3.5_Scala2.12_Python3.9:1.0.9 and Spark3.3_Scala2.12_Python3.9:1.0.9. | Spark3.5_Scala2.12_Python3.9:1.0.9 |
    | Specification | The driver resource specification: 1 core 4 GB, 2 cores 8 GB, 4 cores 16 GB, 8 cores 32 GB, or 16 cores 64 GB. | 4C16G |
    | Configuration | The resource profile. You can edit the profile name, resource release duration, data storage location, PyPI package management, and environment variables. Important: a resource is automatically released when its idle time exceeds the resource release duration. A duration of 0 means that the resource is never automatically released. | default_profile |

  4. Click + > New Notebook File.

Step 3: Import image and text data into Lance

  1. Set the Cell language type to Python and execute the following code to install the Python dependencies.

    !pip install pyarrow==19.0.1
    !pip install pylance==0.23.2
    !pip install oss2==2.19.1
    !pip install pandas==2.2.3
    !pip install torch==2.7.0
    !pip install torchvision==0.22.0
    !pip install pillow==11.2.1
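
    Optionally, run the following cell to confirm that the pinned packages are importable in the notebook kernel:

    import pyarrow, lance, oss2, pandas, torch, torchvision
    print(pyarrow.__version__, lance.__version__, pandas.__version__, torch.__version__)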
  2. Set the Cell language type to Python, replace the relevant configuration parameters in the following code, and execute it to import image data into Lance.

    import os
    import oss2
    import pandas as pd
    import pyarrow as pa
    import lance
    
    # Bucket name
    bucket_name = 'testBucketName'
    # Bucket Endpoint
    endpoint = 'oss-cn-hangzhou-internal.aliyuncs.com'
    # AccessKey ID and AccessKey Secret of an Alibaba Cloud account or a RAM user with OSS access permissions
    auth = oss2.Auth('AK', 'SK')
    bucket = oss2.Bucket(auth, endpoint, bucket_name)
    
    # Directory prefix where image data is stored
    prefix = 'lanceData/'
    
    storage_options = {
        # Region where the bucket is located
        "region": "cn-hangzhou",
        # Bucket Endpoint
        "endpoint": "https://testBucketName.oss-cn-hangzhou-internal.aliyuncs.com",
        # AccessKey ID and AccessKey Secret of an Alibaba Cloud account or a RAM user with OSS access permissions
        "access_key_id": "ak",
        "secret_access_key": "sk",
        "virtual_hosted_style_request": "True"
    }
    
    data = []
    for obj in oss2.ObjectIterator(bucket, prefix=prefix):
        if obj.key.endswith(('.png', '.jpg', '.jpeg')):
            image_id = os.path.splitext(os.path.basename(obj.key))[0]
            img_binary = bucket.get_object(obj.key).read()
            data.append({'image_id': image_id, 'image_data': img_binary})
    
    df = pd.DataFrame(data)
    
    schema = pa.schema([pa.field("image_id", pa.string()), pa.field("image_data", pa.binary())])
    table = pa.Table.from_pandas(df, schema=schema)
    # OSS path where image Lance data is stored, protocol must be s3
    uri = "s3://testBucketName/lance_data/lance_image_dataset.lance"
    lance.write_dataset(table, uri, storage_options=storage_options)
    
    ds = lance.dataset(uri, storage_options=storage_options)
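    # Optional sanity check: read the dataset back from OSS and confirm that
    # all three images were written.
    print(ds.schema)
    print(ds.count_rows())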
    
  3. Set the Cell language type to Python, replace the relevant configuration parameters in the following code, and execute it to import text data into Lance.

    import pandas as pd
    import pyarrow as pa
    import lance
    import json
    import oss2
    
    storage_options = {
        # Region where the bucket is located
        "region": "cn-hangzhou",
        # Bucket Endpoint
        "endpoint": "https://testBucketName.oss-cn-hangzhou-internal.aliyuncs.com",
        # AccessKey ID and AccessKey Secret of an Alibaba Cloud account or a RAM user with OSS access permissions
        "access_key_id": "AK",
        "secret_access_key": "SK",
        "virtual_hosted_style_request": "True"
    }
    
    
    # Initialize authentication information
    auth = oss2.Auth('AK', 'SK')
    bucket = oss2.Bucket(
        auth,
        'https://oss-cn-hangzhou-internal.aliyuncs.com', 
        'testBucketName' # Bucket name
    )
    
    # Read the sample JSON file from OSS
    obj = bucket.get_object('Lance/data/testdata.json')
    text_data = json.load(obj)
    # Construct the DataFrame and write it to Lance with an explicit schema
    schema = pa.schema([
        pa.field("docid", pa.string()),
        pa.field("text", pa.string()),
        pa.field("image_id1", pa.string()),
        pa.field("image_id2", pa.string()),
        pa.field("image_id3", pa.string())
    ])
    
    df = pd.DataFrame(text_data)
    table = pa.Table.from_pandas(df, schema=schema)
    # OSS path where text Lance data is stored; the protocol must be s3.
    # This path must match the path that Step 4 reads from.
    uri = "s3://testBucketName/lance_data/lance_text_dataset.lance"
    
    lance.write_dataset(table, uri, storage_options=storage_options)
    
    ds = lance.dataset(uri, storage_options=storage_options)
    print(ds.to_table().to_pandas())
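    
    # Optional point lookup in the same cell: Lance pushes the filter down,
    # so only the matching row is read from OSS.
    one_doc = ds.to_table(filter="docid = 'doc_001'")
    print(one_doc.to_pandas())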

Step 4: Mix and store image and text data

  1. Set the Cell language type to Python, replace the relevant configuration parameters in the following code, and execute it to associate and integrate image and text data, storing them as a comprehensive dataset that contains both images and text.

    from pyspark.sql import SparkSession, DataFrame
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType
    import json
    
    class VisionDataProcessor:
        def __init__(self, app_name="Lance Vision Data Join and Format Example"):
            # A parenthesized builder chain keeps the inline comments valid Python
            self.spark = (
                SparkSession.builder
                .config("spark.driver.memory", "32g")
                .config("spark.driver.cores", "8")
                .config("spark.rpc.message.maxSize", "1024")
                .config("spark.sql.catalog.lance", "com.lancedb.lance.spark.LanceCatalog")
                # Region where the bucket is located
                .config("spark.sql.catalog.lance.aws_region", "cn-hangzhou")
                # Bucket endpoint
                .config("spark.sql.catalog.lance.aws_endpoint", "https://testBucketName.oss-cn-hangzhou-internal.aliyuncs.com")
                # AccessKey ID and AccessKey secret of an Alibaba Cloud account or a RAM user with OSS access permissions
                .config("spark.sql.catalog.lance.access_key_id", "ak")
                .config("spark.sql.catalog.lance.secret_access_key", "sk")
                .config("spark.sql.catalog.lance.virtual_hosted_style_request", "True")
                .appName(app_name)
                .getOrCreate()
            )
    
        def get_lance_dataset(self, uri) -> DataFrame:
            # Read a Lance dataset through the Spark-Lance connector
            data = self.spark.read \
                .format("lance") \
                .load(uri)
            return data
    
        def join_vision_data(self) -> DataFrame:
            # OSS paths where image Lance data and text Lance data are stored, protocol must be s3
            df_text = self.get_lance_dataset(uri='s3://testBucketName/lance_data/lance_text_dataset.lance')
            df_image = self.get_lance_dataset('s3://testBucketName/lance_data/lance_image_dataset.lance')
    
            df_text = df_text.join(
                df_image.select(F.col("image_id").alias("image_id1"), F.col("image_data")),
                on="image_id1",
                how="left"
            ).withColumnRenamed("image_data", "image1_byte")
    
            df_text = df_text.join(
                df_image.select(F.col("image_id").alias("image_id2"), F.col("image_data")),
                on="image_id2",
                how="left"
            ).withColumnRenamed("image_data", "image2_byte")
    
            df_text = df_text.join(
                df_image.select(F.col("image_id").alias("image_id3"), F.col("image_data")),
                on="image_id3",
                how="left"
            ).withColumnRenamed("image_data", "image3_byte")
    
            vision_df = df_text.select(
                "docid",
                "text",
                F.col("image1_byte").alias("image1"),
                F.col("image2_byte").alias("image2"),
                F.col("image3_byte").alias("image3")
            )
            return vision_df
    
        def format_to_training_data(self, df: DataFrame) -> DataFrame:
            def format_prompt(text, image1, image2, image3):
                images = [image.hex() for image in (image1, image2, image3) if image]
                alpaca_format = {"instruction": text, "input": images, "output": ""}
                return json.dumps(alpaca_format)
    
            format_prompt_udf = F.udf(format_prompt, StringType())
            result_str = df.withColumn("formatted_prompt", format_prompt_udf("text", "image1", "image2", "image3"))
            return result_str
    
        def process_and_save(self, json_output_path="training_data.json"):
            vision_df = self.join_vision_data()
            formatted_df = self.format_to_training_data(vision_df)
    
            formatted_df.show(truncate=True)
    
            # Save as JSON file
            formatted_prompts = [row.formatted_prompt for row in formatted_df.collect()]
            with open(json_output_path, "w") as f:
                json.dump(formatted_prompts, f, indent=2)
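    
    # Optional driver (assumption: run in the same cell as the class above):
    # joins the datasets and writes the formatted prompts to training_data.json.
    processor = VisionDataProcessor()
    processor.process_and_save(json_output_path="training_data.json")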
    
  2. Set the Cell language type to Python and execute the following code to construct a training dataset.

    from torch.utils.data import Dataset, DataLoader
    from PIL import Image
    import torch
    from torchvision import transforms
    import io
    import json 
    
    class VLMDataset(Dataset):
        """
        Custom dataset for VLM training.
        """
    
        def __init__(self, df, transform=None):
            """
            Initialize the dataset.
            """
            self.rows = df.select("formatted_prompt").collect()
            self.transform = transform or transforms.Compose([
                transforms.Resize((224, 224)),
                transforms.ToTensor()
            ])
    
        def __len__(self):
            """
            Get the total number of samples in the dataset.
            """
            return len(self.rows)
    
        def __getitem__(self, idx):
            """
            Get a sample from the dataset.
            """
            row = json.loads(self.rows[idx].formatted_prompt)
            text = row["instruction"]
            images_data = row["input"]
            images = []
    
            for image_hex in images_data:
                img_data = bytes.fromhex(image_hex)
                try:
                    # Convert to RGB so that every decoded image produces a 3x224x224 tensor
                    image = Image.open(io.BytesIO(img_data)).convert("RGB")
                    if self.transform:
                        image = self.transform(image)
                    images.append(image)
                except Exception as e:
                    print(f"Error processing image: {e}")
                    images.append(torch.zeros(3, 224, 224))  # Placeholder for missing images
    
            while len(images) < 3:
                images.append(torch.zeros(3, 224, 224))
    
            return text, torch.stack(images)
    
    
    if __name__ == "__main__":
        processor = VisionDataProcessor()
        formatted_df = processor.format_to_training_data(processor.join_vision_data())
    
        dataset = VLMDataset(formatted_df)
        dataloader = DataLoader(dataset, batch_size=8, shuffle=False)
    
        for batch in dataloader:
            texts, images = batch
            print('Texts:', texts)
            print('Images batch shape:', images.shape)
    
        processor.spark.stop()
    

    The execution result is as follows:

    A batch of shape 3×3×3×224×224 (batch size × number of images per sample × channels × height × width) is constructed.
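
    The following optional check, run in the same session, asserts the batch shape described above; it assumes that the `dataloader` from the previous cell is still available.

    # Expect one batch of 3 samples, each with 3 RGB tensors of 3x224x224.
    texts, images = next(iter(dataloader))
    assert images.shape == (3, 3, 3, 224, 224)  # torch.Size compares equal to a tuple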
