全部產品
Search
文件中心

Data Lake Formation:使用 Python 操作 DLF Lance 表

更新時間:May 13, 2026

本文介紹如何在 Python 中使用 lance-dlf 串連 DLF Catalog,通過 PyLance 直接讀寫 Lance 表。適用於需要精細控制 Arrow Table 層級操作的情境。

說明

如需使用 Daft DataFrame 引擎進行查詢過濾或批次運算,請參閱使用 Daft 操作 DLF Lance 表

功能說明

lance-dlf 提供以下核心功能:

  • 串連 DLF Catalog

  • 將 DLF Database 映射為 Lance Namespace

  • 僅暴露 type=lance-table 類型的表

  • 通過 DLF load_table_token 擷取 OSS 臨時訪問憑證

  • 將 OSS 臨時憑證轉換為 PyLance 可用的 storage_options

資料的實際讀寫操作由 PyLance 完成:

# 寫入資料
lance.write_dataset(table, location, storage_options=storage_options)

# 讀取資料
lance.dataset(location, storage_options=storage_options)

快速開始

安裝 lance-dlf

python3 -m pip install lance-dlf

配置 Catalog 串連

CONFIG = {
    "uri": "http://<DLF-ENDPOINT>", # 公網訪問須使用HTTPS協議
    "warehouse": "<YOUR-CATALOG>",
    "token.provider": "dlf",
    "dlf.region": "<REGION-ID>",
    "dlf.access-key-id": "<ACCESS-KEY-ID>",
    "dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
    "dlf.oss-endpoint": "<OSS-ENDPOINT>", # VPC訪問可選,公網訪問必填
}

配置參數:

參數名稱

說明

uri

DLF Paimon REST 訪問端點,詳見服務存取點。VPC訪問支援HTTP和HTTPS協議,公網訪問必須使用HTTPS協議

warehouse

DLF Catalog 名稱

token.provider

AccessKey 認證模式下,填寫 dlf

dlf.region

DLF 所屬地區ID(如 cn-hangzhou),詳見服務存取點

dlf.access-key-id

訪問 DLF 的 AccessKey ID

dlf.access-key-secret

訪問 DLF 的 AccessKey Secret

dlf.security-token

(可選)STS 情境下使用的安全性權杖

dlf.oss-endpoint

(可選)OSS 公網訪問存取點(如 oss-cn-hangzhou.aliyuncs.com),僅公網訪問DLF必填。DLF預設關閉公網訪問,如需開通,見公網訪問DLF Paimon REST公測說明

重要

AccessKey ID 和 AccessKey Secret 是訪問阿里雲資源的重要憑證,請妥善保管。請勿將真實 AccessKey 提交到 Git 倉庫。建議從環境變數、密鑰管理系統或運行時配置讀取存取憑證。

串連 DLF Catalog

匯入 lance_dlf 模組會自動註冊 dlf namespace 實現:

import lance_namespace
import lance_dlf  # noqa: F401

# 串連 DLF Catalog
ns = lance_namespace.connect("dlf", CONFIG)

# 驗證串連
print(ns.namespace_id())

namespace_id() 方法返回當前串連的 Catalog 資訊,用於確認串連的 DLF 訪問端點和 Catalog名稱。

查看 Namespace 和 Table

from lance_namespace import (
    DescribeNamespaceRequest,
    DescribeTableRequest,
    ListNamespacesRequest,
    ListTablesRequest,
)

DATABASE = "<database>"
TABLE = "<table>"

# 列出所有 Namespace
namespaces = ns.list_namespaces(ListNamespacesRequest(id=[]))
print(namespaces)

# 查看指定 Namespace 中繼資料
namespace = ns.describe_namespace(DescribeNamespaceRequest(id=[DATABASE]))
print(namespace)

# 列出 Namespace 下的所有 Table
tables = ns.list_tables(ListTablesRequest(id=[DATABASE]))
print(tables)

# 查看 Table 詳情
table = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
print(table.location)
print(table.properties)
print(table.storage_options)

describe_table 返回欄位:

欄位名稱

說明

location

Lance 資料的實際儲存路徑,通常為 oss://bucket/path 格式

properties

DLF Table 的 Schema 選項,type 欄位必須為 lance-table

storage_options

PyLance 讀寫 OSS 所需的臨時訪問憑證

基礎操作

建立 Lance 表並寫入資料

如果表不存在,需要先建立表然後寫入資料。原理:

  1. 使用 ns.create_table() 在 DLF 中建立表

  2. DLF 返回 Lance 表的儲存位置(location

  3. 擷取 OSS 臨時訪問憑證(storage_options

  4. 使用PyLance 將 Arrow 資料寫入指定的儲存位置。 arrow_table_to_ipc_bytes 函數的定義見資料序列化工具函數

import lance
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest

DATABASE = "default"
TABLE = "test_lance_create_001"

table_id = [DATABASE, TABLE]

# 構造測試資料
data = pa.table({
    "f0": pa.array([101, 102, 103], type=pa.int64()),
    "f1": pa.array(["create-a", "create-b", "create-c"], type=pa.string()),
})

# 將 Arrow Table 轉為 IPC bytes
def arrow_table_to_ipc_bytes(table):
    sink = pa.BufferOutputStream()
    writer = pa.ipc.new_stream(sink, table.schema)
    writer.write_table(table)
    writer.close()
    return sink.getvalue().to_pybytes()

# 建立表並寫入資料
create_response = ns.create_table(
    CreateTableRequest(id=table_id),
    arrow_table_to_ipc_bytes(data),
)

print(create_response.location)
print(create_response.storage_options.keys())

# 讀取驗證
desc = ns.describe_table(DescribeTableRequest(id=table_id))
dataset = lance.dataset(desc.location, storage_options=desc.storage_options)
result = dataset.to_table()
print(result)

預期輸出:

pyarrow.Table
f0: int64
f1: string
----
f0: [[101,102,103]]
f1: [["create-a","create-b","create-c"]]

寫入已有表

如果 DLF 中已存在 Lance 表,可以先通過 describe_table 擷取表的儲存位置和訪問憑證,然後使用 PyLance 寫入資料。

import lance
import pyarrow as pa
from lance_namespace import DescribeTableRequest

DATABASE = "default"
TABLE = "test_lance_table"

table_id = [DATABASE, TABLE]

# 擷取表詳情
desc = ns.describe_table(DescribeTableRequest(id=table_id))

# 構造測試資料
data = pa.table({
    "f0": pa.array([1, 2, 3], type=pa.int64()),
    "f1": pa.array(["value-1", "value-2", "value-3"], type=pa.string()),
})

# 寫入資料
lance.write_dataset(
    data,
    desc.location,
    mode="overwrite",
    storage_options=desc.storage_options,
)

# 讀取驗證
dataset = lance.dataset(desc.location, storage_options=desc.storage_options)
print(dataset.to_table())

寫入模式說明:

模式

說明

overwrite

覆蓋寫入,適用於初始化空表或測試表

append

追加寫入,要求 Schema 相容

警告

overwrite 模式會覆蓋已有的資料,請謹慎操作。

讀取資料

如果 DLF 中已存在 type=lance-table 的表且已包含資料,可以通過 describe_table 擷取儲存位置和臨時憑證,然後使用 PyLance 讀取資料。詳見建立 Lance 表並寫入資料代碼塊末端的“讀取驗證”部分。

完整樣本

以下樣本示範了完整的工作流程:串連 DLF、建立新表、寫入資料、列出表、讀取驗證。

from datetime import datetime

import lance
import lance_namespace
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest, ListTablesRequest

import lance_dlf  # noqa: F401


CONFIG = {
    "uri": "http://<DLF-ENDPOINT>",
    "warehouse": "<YOUR-CATALOG>",
    "token.provider": "dlf",
    "dlf.region": "<REGION-ID>",
    "dlf.access-key-id": "<ACCESS-KEY-ID>",
    "dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
    "dlf.oss-endpoint": "<OSS-ENDPOINT>", # 可選
}

DATABASE = "default"


def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
    """將 PyArrow 錶轉換為 IPC 位元組流"""
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()


def main():
    # 串連 DLF Catalog
    ns = lance_namespace.connect("dlf", CONFIG)

    # 產生唯一表名
    table_name = "test_lance_create_" + datetime.now().strftime("%Y%m%d_%H%M%S")
    table_id = [DATABASE, table_name]

    # 構造測試資料
    data = pa.table({
        "f0": pa.array([101, 102, 103], type=pa.int64()),
        "f1": pa.array(["create-a", "create-b", "create-c"], type=pa.string()),
    })

    # 建立表並寫入資料
    create_response = ns.create_table(
        CreateTableRequest(id=table_id),
        arrow_table_to_ipc_bytes(data),
    )

    print("created:", ".".join(table_id))
    print("location:", create_response.location)

    # 列出表並驗證
    tables = ns.list_tables(ListTablesRequest(id=[DATABASE]))
    print("listed_after_create:", table_name in tables.tables)

    # 讀取資料並驗證
    desc = ns.describe_table(DescribeTableRequest(id=table_id))
    dataset = lance.dataset(desc.location, storage_options=desc.storage_options)
    result = dataset.to_table()

    print(result)

    # 資料完整性校正
    expected = data.to_pylist()
    actual = result.to_pylist()
    if actual != expected:
        raise AssertionError(f"readback mismatch: expected={expected}, actual={actual}")

    print("create_write_read: ok")


if __name__ == "__main__":
    main()

附錄

資料序列化工具函數

lance_namespacecreate_table 介面要求將 Arrow 表序列化為 IPC 位元組流。

import pyarrow as pa


def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
    """將 PyArrow 錶轉換為 IPC 位元組流"""
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()

自動構造測試資料

如果不想寫入程式碼列名和類型,可以從 DLF Table Schema 中提取欄位資訊,自動構造測試資料。

import pyarrow as pa
from lance_dlf.common.identifier import Identifier


def sample_value(field_type: str, row: int):
    """根據欄位類型產生樣本值"""
    normalized = field_type.lower()
    if "int" in normalized:
        return row + 1
    if "string" in normalized or "char" in normalized or "varchar" in normalized:
        return f"value-{row + 1}"
    raise ValueError(f"Unsupported sample field type: {field_type}")


def build_sample_table(ns, database: str, table: str) -> pa.Table:
    """根據 DLF 表結構構造樣本資料"""
    raw_table = ns._api.get_table(Identifier(database, table))
    schema = raw_table.get_schema()
    fields = schema.fields if schema and schema.fields else []

    data = {}
    for field in fields:
        field_type = str(field.type)
        data[field.name] = [sample_value(field_type, row) for row in range(3)]

    return pa.table(data)

使用樣本:

desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
data = build_sample_table(ns, DATABASE, TABLE)

lance.write_dataset(
    data,
    desc.location,
    mode="overwrite",
    storage_options=desc.storage_options,
)