All Products
Search
Document Center

Data Lake Formation:PyPaimon and Ray Data

Last Updated:Apr 21, 2026

This document explains how to use PyPaimon with Ray Data to perform distributed, parallel reads from and writes to Data Lake Formation (DLF) Paimon tables. This approach is ideal for large-scale data processing and ML training pipelines.

Overview

PyPaimon is the native Python SDK for Paimon. It allows you to access DLF Paimon tables directly from the Python ecosystem, including libraries like Pandas, PyArrow, PyTorch, and Ray, without a Java Virtual Machine (JVM) dependency. In multi-modal data lake scenarios, PyPaimon acts as a critical bridge connecting an AI inference pipeline to the data lake.

Ray Data is a distributed data processing framework within the Ray ecosystem. PyPaimon provides built-in integration with Ray Data, enabling distributed, parallel reads from and writes to Paimon tables across a Ray cluster using the to_ray() and write_ray() APIs. This makes it well-suited for large-scale data processing and ML training pipelines.

Prerequisites

Prepare the data

import pyarrow as pa
from pypaimon import CatalogFactory, Schema

# Connect to the DLF catalog
catalog = CatalogFactory.create({
    'metastore': 'rest',
    'uri': 'http://${region_id}-vpc.dlf.aliyuncs.com',       
    'warehouse': '${catalog_name}',                           
    'dlf.region': '${region_id}',                            
    'token.provider': 'dlf',
    'dlf.access-key-id': '<AK>',
    'dlf.access-key-secret': '<SK>',
})

# Create a table and write sample data
pa_schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('category', pa.string()),
    ('score', pa.float64()),
    ('value', pa.float64()),
])
schema = Schema.from_pyarrow_schema(pa_schema)
catalog.create_table('my_db.my_table', schema, True)

catalog.drop_table('my_db.target_table', True)
catalog.create_table('my_db.target_table', schema, True)

table = catalog.get_table('my_db.my_table')
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

table_write.write_arrow(pa.Table.from_pydict({
    'id': list(range(1, 101)),
    'name': [f'item_{i}' for i in range(1, 101)],
    'category': ['A', 'B'] * 50,
    'score': [float(i) for i in range(1, 101)],
    'value': [float(i) * 0.1 for i in range(1, 101)],
}))
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

Distributed reads with Ray Data

Basic usage

import ray

ray.init()  # Initialize in local mode, or connect to a remote cluster with ray.init(address='ray://cluster:10001')

table = catalog.get_table('my_db.my_table')
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()

# Convert to a Ray Dataset (reads in parallel automatically)
ray_dataset = table_read.to_ray(splits)

print(ray_dataset)
# MaterializedDataset(num_blocks=3, num_rows=100, schema={id: int64, name: string, category: string, score: double, value: double})

# Ray Data operations
print(ray_dataset.count())
print(ray_dataset.take(5))
df = ray_dataset.to_pandas()

Control parallelism

The to_ray() method automatically distributes splits evenly among Ray tasks.

# Specify the number of output blocks
ray_dataset = table_read.to_ray(splits, override_num_blocks=8)

# Limit the number of concurrent tasks
ray_dataset = table_read.to_ray(splits, concurrency=4)

# Configure Ray remote arguments
ray_dataset = table_read.to_ray(
    splits,
    override_num_blocks=8,
    ray_remote_args={'num_cpus': 2, 'max_retries': 3}
)

to_ray() parameters:

Parameter

Type

Description

override_num_blocks

int

Specifies the number of output blocks.

concurrency

int

Limits the number of concurrent Ray tasks.

ray_remote_args

dict

Ray remote arguments, such as num_cpus and max_retries.

Configure block size

If a Paimon split is large, it might exceed Ray's default 128 MB block limit:

from ray.data import DataContext

ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024  # 256MB
ray_dataset = table_read.to_ray(splits)

Data processing pipeline

# Filter
filtered = ray_dataset.filter(lambda row: row['score'] > 80)

# Transform
mapped = ray_dataset.map(lambda row: {**row, 'score_normalized': row['score'] / 100.0})

# Group and aggregate
grouped = ray_dataset.groupby('category').sum('value')

# Convert to other formats
df = ray_dataset.to_pandas()
arrow_table = ray_dataset.to_arrow_refs()

Predicate pushdown and projection

Apply predicate pushdown and column pruning at the Paimon layer to reduce the amount of data that Ray reads.

read_builder = table.new_read_builder()
pb = read_builder.new_predicate_builder()
read_builder = read_builder \
    .with_filter(pb.equal('category', 'A')) \
    .with_projection(['id', 'name', 'value'])

splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)

Distributed writes with Ray Data

The write_ray() method handles commits automatically, so you do not need to call prepare_commit() or commit().

import ray

# Read from a source table, process with Ray, and write to a target table.
source_table = catalog.get_table('my_db.my_table')
read_builder = source_table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()

# 1. Convert to a Ray Dataset.
ray_dataset = table_read.to_ray(splits)

# 2. Process the data with Ray.
ray_dataset = ray_dataset.filter(lambda row: row['score'] > 80)

# 3. Write to the target table.
target_table = catalog.get_table('my_db.target_table')
write_builder = target_table.new_batch_write_builder()
table_write = write_builder.new_write()
table_write.write_ray(ray_dataset, overwrite=False, concurrency=4)

# 4. Close the writer. Commits are handled automatically by `write_ray()`.
table_write.close()

write_ray() parameters:

Parameter

Type

Description

overwrite

bool

Specifies whether to overwrite existing data. Default: False.

concurrency

int

The number of concurrent write tasks.

ray_remote_args

dict

Ray remote arguments.