本文介紹如何在 Python 中使用 lance-dlf 串連 DLF Catalog,通過 PyLance 直接讀寫 Lance 表。適用於需要精細控制 Arrow Table 層級操作的情境。
如需使用 Daft DataFrame 引擎進行查詢過濾或批次運算,請參閱使用 Daft 操作 DLF Lance 表。
功能說明
lance-dlf 提供以下核心功能:
串連 DLF Catalog
將 DLF Database 映射為 Lance Namespace
僅暴露
type=lance-table類型的表通過 DLF
load_table_token擷取 OSS 臨時訪問憑證將 OSS 臨時憑證轉換為 PyLance 可用的
storage_options
資料的實際讀寫操作由 PyLance 完成:
# 寫入資料
lance.write_dataset(table, location, storage_options=storage_options)
# 讀取資料
lance.dataset(location, storage_options=storage_options)快速開始
安裝 lance-dlf
python3 -m pip install lance-dlf配置 Catalog 串連
CONFIG = {
"uri": "http://<DLF-ENDPOINT>", # 公網訪問須使用HTTPS協議
"warehouse": "<YOUR-CATALOG>",
"token.provider": "dlf",
"dlf.region": "<REGION-ID>",
"dlf.access-key-id": "<ACCESS-KEY-ID>",
"dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
"dlf.oss-endpoint": "<OSS-ENDPOINT>", # VPC訪問可選,公網訪問必填
}配置參數:
參數名稱 | 說明 |
| DLF Paimon REST 訪問端點,詳見服務存取點。VPC訪問支援HTTP和HTTPS協議,公網訪問必須使用HTTPS協議 |
| DLF Catalog 名稱 |
| AccessKey 認證模式下,填寫 |
| DLF 所屬地區ID(如 |
| 訪問 DLF 的 AccessKey ID |
| 訪問 DLF 的 AccessKey Secret |
| (可選)STS 情境下使用的安全性權杖 |
| (可選)OSS 公網訪問存取點(如 |
AccessKey ID 和 AccessKey Secret 是訪問阿里雲資源的重要憑證,請妥善保管。請勿將真實 AccessKey 提交到 Git 倉庫。建議從環境變數、密鑰管理系統或運行時配置讀取存取憑證。
串連 DLF Catalog
匯入 lance_dlf 模組會自動註冊 dlf namespace 實現:
import lance_namespace
import lance_dlf # noqa: F401
# 串連 DLF Catalog
ns = lance_namespace.connect("dlf", CONFIG)
# 驗證串連
print(ns.namespace_id())namespace_id() 方法返回當前串連的 Catalog 資訊,用於確認串連的 DLF 訪問端點和 Catalog名稱。
查看 Namespace 和 Table
from lance_namespace import (
DescribeNamespaceRequest,
DescribeTableRequest,
ListNamespacesRequest,
ListTablesRequest,
)
DATABASE = "<database>"
TABLE = "<table>"
# 列出所有 Namespace
namespaces = ns.list_namespaces(ListNamespacesRequest(id=[]))
print(namespaces)
# 查看指定 Namespace 中繼資料
namespace = ns.describe_namespace(DescribeNamespaceRequest(id=[DATABASE]))
print(namespace)
# 列出 Namespace 下的所有 Table
tables = ns.list_tables(ListTablesRequest(id=[DATABASE]))
print(tables)
# 查看 Table 詳情
table = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
print(table.location)
print(table.properties)
print(table.storage_options)describe_table 返回欄位:
欄位名稱 | 說明 |
| Lance 資料的實際儲存路徑,通常為 |
| DLF Table 的 Schema 選項, |
| PyLance 讀寫 OSS 所需的臨時訪問憑證 |
基礎操作
建立 Lance 表並寫入資料
如果表不存在,需要先建立表然後寫入資料。原理:
使用
ns.create_table()在 DLF 中建立表DLF 返回 Lance 表的儲存位置(
location)擷取 OSS 臨時訪問憑證(
storage_options)使用PyLance 將 Arrow 資料寫入指定的儲存位置。
arrow_table_to_ipc_bytes函數的定義見資料序列化工具函數
import lance
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest
DATABASE = "default"
TABLE = "test_lance_create_001"
table_id = [DATABASE, TABLE]
# 構造測試資料
data = pa.table({
"f0": pa.array([101, 102, 103], type=pa.int64()),
"f1": pa.array(["create-a", "create-b", "create-c"], type=pa.string()),
})
# 將 Arrow Table 轉為 IPC bytes
def arrow_table_to_ipc_bytes(table):
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, table.schema)
writer.write_table(table)
writer.close()
return sink.getvalue().to_pybytes()
# 建立表並寫入資料
create_response = ns.create_table(
CreateTableRequest(id=table_id),
arrow_table_to_ipc_bytes(data),
)
print(create_response.location)
print(create_response.storage_options.keys())
# 讀取驗證
desc = ns.describe_table(DescribeTableRequest(id=table_id))
dataset = lance.dataset(desc.location, storage_options=desc.storage_options)
result = dataset.to_table()
print(result)預期輸出:
pyarrow.Table
f0: int64
f1: string
----
f0: [[101,102,103]]
f1: [["create-a","create-b","create-c"]]寫入已有表
如果 DLF 中已存在 Lance 表,可以先通過 describe_table 擷取表的儲存位置和訪問憑證,然後使用 PyLance 寫入資料。
import lance
import pyarrow as pa
from lance_namespace import DescribeTableRequest
DATABASE = "default"
TABLE = "test_lance_table"
table_id = [DATABASE, TABLE]
# 擷取表詳情
desc = ns.describe_table(DescribeTableRequest(id=table_id))
# 構造測試資料
data = pa.table({
"f0": pa.array([1, 2, 3], type=pa.int64()),
"f1": pa.array(["value-1", "value-2", "value-3"], type=pa.string()),
})
# 寫入資料
lance.write_dataset(
data,
desc.location,
mode="overwrite",
storage_options=desc.storage_options,
)
# 讀取驗證
dataset = lance.dataset(desc.location, storage_options=desc.storage_options)
print(dataset.to_table())寫入模式說明:
模式 | 說明 |
| 覆蓋寫入,適用於初始化空表或測試表 |
| 追加寫入,要求 Schema 相容 |
overwrite 模式會覆蓋已有的資料,請謹慎操作。
讀取資料
如果 DLF 中已存在 type=lance-table 的表且已包含資料,可以通過 describe_table 擷取儲存位置和臨時憑證,然後使用 PyLance 讀取資料。詳見建立 Lance 表並寫入資料代碼塊末端的“讀取驗證”部分。
完整樣本
以下樣本示範了完整的工作流程:串連 DLF、建立新表、寫入資料、列出表、讀取驗證。
from datetime import datetime
import lance
import lance_namespace
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest, ListTablesRequest
import lance_dlf # noqa: F401
CONFIG = {
"uri": "http://<DLF-ENDPOINT>",
"warehouse": "<YOUR-CATALOG>",
"token.provider": "dlf",
"dlf.region": "<REGION-ID>",
"dlf.access-key-id": "<ACCESS-KEY-ID>",
"dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
"dlf.oss-endpoint": "<OSS-ENDPOINT>", # 可選
}
DATABASE = "default"
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
"""將 PyArrow 錶轉換為 IPC 位元組流"""
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
writer.write_table(table)
return sink.getvalue().to_pybytes()
def main():
# 串連 DLF Catalog
ns = lance_namespace.connect("dlf", CONFIG)
# 產生唯一表名
table_name = "test_lance_create_" + datetime.now().strftime("%Y%m%d_%H%M%S")
table_id = [DATABASE, table_name]
# 構造測試資料
data = pa.table({
"f0": pa.array([101, 102, 103], type=pa.int64()),
"f1": pa.array(["create-a", "create-b", "create-c"], type=pa.string()),
})
# 建立表並寫入資料
create_response = ns.create_table(
CreateTableRequest(id=table_id),
arrow_table_to_ipc_bytes(data),
)
print("created:", ".".join(table_id))
print("location:", create_response.location)
# 列出表並驗證
tables = ns.list_tables(ListTablesRequest(id=[DATABASE]))
print("listed_after_create:", table_name in tables.tables)
# 讀取資料並驗證
desc = ns.describe_table(DescribeTableRequest(id=table_id))
dataset = lance.dataset(desc.location, storage_options=desc.storage_options)
result = dataset.to_table()
print(result)
# 資料完整性校正
expected = data.to_pylist()
actual = result.to_pylist()
if actual != expected:
raise AssertionError(f"readback mismatch: expected={expected}, actual={actual}")
print("create_write_read: ok")
if __name__ == "__main__":
main()附錄
資料序列化工具函數
lance_namespace 的 create_table 介面要求將 Arrow 表序列化為 IPC 位元組流。
import pyarrow as pa
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
"""將 PyArrow 錶轉換為 IPC 位元組流"""
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
writer.write_table(table)
return sink.getvalue().to_pybytes()自動構造測試資料
如果不想寫入程式碼列名和類型,可以從 DLF Table Schema 中提取欄位資訊,自動構造測試資料。
import pyarrow as pa
from lance_dlf.common.identifier import Identifier
def sample_value(field_type: str, row: int):
"""根據欄位類型產生樣本值"""
normalized = field_type.lower()
if "int" in normalized:
return row + 1
if "string" in normalized or "char" in normalized or "varchar" in normalized:
return f"value-{row + 1}"
raise ValueError(f"Unsupported sample field type: {field_type}")
def build_sample_table(ns, database: str, table: str) -> pa.Table:
"""根據 DLF 表結構構造樣本資料"""
raw_table = ns._api.get_table(Identifier(database, table))
schema = raw_table.get_schema()
fields = schema.fields if schema and schema.fields else []
data = {}
for field in fields:
field_type = str(field.type)
data[field.name] = [sample_value(field_type, row) for row in range(3)]
return pa.table(data)使用樣本:
desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
data = build_sample_table(ns, DATABASE, TABLE)
lance.write_dataset(
data,
desc.location,
mode="overwrite",
storage_options=desc.storage_options,
)