This document explains how to use PyPaimon with Ray Data to perform distributed, parallel reads from and writes to Data Lake Formation (DLF) Paimon tables. This approach is ideal for large-scale data processing and ML training pipelines.
Overview
PyPaimon is the native Python SDK for Paimon. It allows you to access DLF Paimon tables directly from the Python ecosystem, including libraries like Pandas, PyArrow, PyTorch, and Ray, without a Java Virtual Machine (JVM) dependency. In multi-modal data lake scenarios, PyPaimon acts as a critical bridge connecting an AI inference pipeline to the data lake.
Ray Data is a distributed data processing framework within the Ray ecosystem. PyPaimon provides built-in integration with Ray Data, enabling distributed, parallel reads from and writes to Paimon tables across a Ray cluster using the to_ray() and write_ray() APIs. This makes it well-suited for large-scale data processing and ML training pipelines.
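At a glance, a read-process-write round trip between a Paimon table and Ray Data looks like the following sketch. It only previews the to_ray() and write_ray() APIs covered later in this topic; the catalog setup and table names are illustrative, and complete runnable examples follow below.
# Sketch only: assumes `catalog` is already connected and both tables exist
# (see "Prepare the data" below for the full setup).
table = catalog.get_table('my_db.my_table')
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
# Distributed read: each Paimon split becomes work for a Ray task.
ds = read_builder.new_read().to_ray(splits)
ds = ds.filter(lambda row: row['score'] > 80)
# Distributed write: write_ray() commits automatically.
target = catalog.get_table('my_db.target_table')
writer = target.new_batch_write_builder().new_write()
writer.write_ray(ds, overwrite=False, concurrency=4)
writer.close()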
Prerequisites
Create a new data catalog.
Python 3.8 or later.
Install pypaimon-1.5.dev0.tar.gz or a later version.
Install Ray (pip3 install ray).
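A quick way to confirm the environment meets these prerequisites is to check the interpreter and the installed package versions (a sketch; importlib.metadata simply reports whatever versions pip installed):
import sys
from importlib.metadata import version
# Python 3.8 or later is required.
assert sys.version_info >= (3, 8), 'Python 3.8 or later is required'
# Both packages should be importable after installation.
print('pypaimon:', version('pypaimon'))
print('ray:', version('ray'))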
Prepare the data
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
# Connect to the DLF catalog
catalog = CatalogFactory.create({
    'metastore': 'rest',
    'uri': 'http://${region_id}-vpc.dlf.aliyuncs.com',
    'warehouse': '${catalog_name}',
    'dlf.region': '${region_id}',
    'token.provider': 'dlf',
    'dlf.access-key-id': '<AK>',
    'dlf.access-key-secret': '<SK>',
})
# Create a table and write sample data
pa_schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('category', pa.string()),
    ('score', pa.float64()),
    ('value', pa.float64()),
])
schema = Schema.from_pyarrow_schema(pa_schema)
# Create the database and tables if they do not already exist; drop and
# recreate the target table so it starts empty for the write example later.
catalog.create_database('my_db', True)
catalog.create_table('my_db.my_table', schema, True)
catalog.drop_table('my_db.target_table', True)
catalog.create_table('my_db.target_table', schema, True)
table = catalog.get_table('my_db.my_table')
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_arrow(pa.Table.from_pydict({
    'id': list(range(1, 101)),
    'name': [f'item_{i}' for i in range(1, 101)],
    'category': ['A', 'B'] * 50,
    'score': [float(i) for i in range(1, 101)],
    'value': [float(i) * 0.1 for i in range(1, 101)],
}))
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
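To spot-check that the 100 sample rows were committed before moving on, you can read the table back eagerly in a single process. This sketch assumes table_read.to_pandas(splits) from the PyPaimon read API; the distributed read path with Ray follows in the next section.
# Sketch: eager, single-process read-back of the table just written.
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
df = table_read.to_pandas(splits)
print(len(df), df.head())  # Expect 100 rows with ids 1 to 100.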
Distributed reads with Ray Data
Basic usage
import ray
ray.init() # Initialize in local mode, or connect to a remote cluster with ray.init(address='ray://cluster:10001')
table = catalog.get_table('my_db.my_table')
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
# Convert to a Ray Dataset (reads in parallel automatically)
ray_dataset = table_read.to_ray(splits)
print(ray_dataset)
# MaterializedDataset(num_blocks=3, num_rows=100, schema={id: int64, name: string, category: string, score: double, value: double})
# Ray Data operations
print(ray_dataset.count())
print(ray_dataset.take(5))
df = ray_dataset.to_pandas()
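Because the result is a regular Ray Dataset, it can also be streamed to downstream consumers such as a training loop. The following sketch uses iter_batches(), a standard Ray Data API, to pull NumPy batches; the batch size is arbitrary.
# Sketch: stream the dataset as NumPy batches, e.g. to feed a training loop.
for batch in ray_dataset.iter_batches(batch_size=32, batch_format='numpy'):
    # Each batch is a dict mapping column names to NumPy arrays.
    print(batch['id'].shape, batch['score'].mean())
    break  # Remove the break to consume the full dataset.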
Control parallelism
The to_ray() method automatically distributes splits evenly among Ray tasks.
# Specify the number of output blocks
ray_dataset = table_read.to_ray(splits, override_num_blocks=8)
# Limit the number of concurrent tasks
ray_dataset = table_read.to_ray(splits, concurrency=4)
# Configure Ray remote arguments
ray_dataset = table_read.to_ray(
    splits,
    override_num_blocks=8,
    ray_remote_args={'num_cpus': 2, 'max_retries': 3}
)
to_ray() parameters:
Parameter | Type | Description |
override_num_blocks | int | Specifies the number of output blocks. |
concurrency | int | Limits the number of concurrent Ray tasks. |
ray_remote_args | dict | Ray remote arguments, such as num_cpus and max_retries. |
Configure block size
If a Paimon split is large, it might exceed Ray's default 128 MB block limit:
from ray.data import DataContext
ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024 # 256MB
ray_dataset = table_read.to_ray(splits)
Data processing pipeline
# Filter
filtered = ray_dataset.filter(lambda row: row['score'] > 80)
# Transform
mapped = ray_dataset.map(lambda row: {**row, 'score_normalized': row['score'] / 100.0})
# Group and aggregate
grouped = ray_dataset.groupby('category').sum('value')
# Convert to other formats
df = ray_dataset.to_pandas()
arrow_refs = ray_dataset.to_arrow_refs()  # A list of ObjectRefs to Arrow tables
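The row-wise map() above also has a vectorized counterpart: map_batches() applies a function to whole blocks at once, which is typically faster for columnar data. This is standard Ray Data functionality rather than a PyPaimon API; the sketch below assumes a pandas batch format.
import pandas as pd
# Sketch: vectorized transform over pandas batches instead of single rows.
def normalize_scores(batch: pd.DataFrame) -> pd.DataFrame:
    batch['score_normalized'] = batch['score'] / 100.0
    return batch
mapped_batches = ray_dataset.map_batches(normalize_scores, batch_format='pandas')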
Predicate pushdown and projection
Apply predicate pushdown and column pruning at the Paimon layer to reduce the amount of data that Ray reads.
read_builder = table.new_read_builder()
pb = read_builder.new_predicate_builder()
read_builder = read_builder \
    .with_filter(pb.equal('category', 'A')) \
    .with_projection(['id', 'name', 'value'])
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)
Distributed writes with Ray Data
The write_ray() method handles commits automatically, so you do not need to call prepare_commit() or commit().
import ray
# Read from a source table, process with Ray, and write to a target table.
source_table = catalog.get_table('my_db.my_table')
read_builder = source_table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
# 1. Convert to a Ray Dataset.
ray_dataset = table_read.to_ray(splits)
# 2. Process the data with Ray.
ray_dataset = ray_dataset.filter(lambda row: row['score'] > 80)
# 3. Write to the target table.
target_table = catalog.get_table('my_db.target_table')
write_builder = target_table.new_batch_write_builder()
table_write = write_builder.new_write()
table_write.write_ray(ray_dataset, overwrite=False, concurrency=4)
# 4. Close the writer. Commits are handled automatically by `write_ray()`.
table_write.close()
write_ray() parameters:
Parameter | Type | Description |
overwrite | bool | Specifies whether to overwrite existing data in the target table. Default: False. |
concurrency | int | The number of concurrent write tasks. |
ray_remote_args | dict | Ray remote arguments. |
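To replace the target table's existing data instead of appending to it, the same call can be made with overwrite=True. The following is a sketch that reuses target_table and ray_dataset from the example above.
# Sketch: overwrite the target table's current contents with the dataset.
write_builder = target_table.new_batch_write_builder()
table_write = write_builder.new_write()
table_write.write_ray(ray_dataset, overwrite=True, concurrency=4)
table_write.close()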