全部產品
Search
文件中心

Data Lake Formation:使用 Daft 操作 DLF Lance 表

更新時間:May 13, 2026

本文介紹如何通過 Daft DataFrame 引擎讀寫由 DLF Catalog 管理的 Lance 表。Daft 提供 lazy DataFrame API,適合需要查詢過濾或批次運算的情境。

說明

如需使用 PyLance 直接讀寫 Lance 表,請參閱使用 Python 操作 DLF Lance 表

術語說明

組件

作用

DLF

Catalog 服務,管理 database/table 中繼資料,儲存 Lance 表路徑,發放 OSS 臨時憑證

Lance/PyLance

資料格式和底層讀寫實現,負責 OSS 上 Lance dataset 的實際 I/O

Daft

DataFrame 計算引擎,提供 read_lance / write_lance 介面

lance-dlf

連接器,從 DLF 擷取表路徑和 OSS 臨時憑證,供 Daft 使用。lance-dlf 僅暴露 type=lance-table 的表

技術架構

使用者代碼
  → lance_namespace.connect("dlf", CONFIG)     # 串連 DLF Catalog
  → DLF 返回 Lance 表路徑 + OSS 臨時憑證
  → apply_oss_environment(...)                  # 設定 OSS_* 環境變數
  → daft.read_lance("oss://...")               # 讀取資料
  → df.write_lance("oss://...", mode="append") # 寫入資料

概念映射:

  • DLF database → Lance namespace

  • DLF table → Lance table

前提條件

安裝依賴

python3 -m pip install lance-dlf daft

lance-dlf 會自動安裝 lance_namespacepyarrow 等依賴。

配置 Catalog 串連

CONFIG = {
    "uri": "http://<DLF-ENDPOINT>", # 公網訪問須使用HTTPS協議
    "warehouse": "<YOUR-CATALOG>",
    "token.provider": "dlf",
    "dlf.region": "<REGION-ID>",
    "dlf.access-key-id": "<ACCESS-KEY-ID>",
    "dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
    "dlf.oss-endpoint": "<OSS-ENDPOINT>", # VPC訪問可選,公網訪問必填
}

配置參數:

參數名稱

說明

uri

DLF Paimon REST 訪問端點,詳見服務存取點。VPC訪問支援HTTP和HTTPS協議,公網訪問必須使用HTTPS協議

warehouse

DLF Catalog 名稱

token.provider

AccessKey 認證模式下,填寫 dlf

dlf.region

DLF 所屬地區ID(如 cn-hangzhou),詳見服務存取點

dlf.access-key-id

訪問 DLF 的 AccessKey ID

dlf.access-key-secret

訪問 DLF 的 AccessKey Secret

dlf.security-token

(可選)STS 情境下使用的安全性權杖

dlf.oss-endpoint

(可選)OSS 公網訪問存取點(如 oss-cn-hangzhou.aliyuncs.com),僅公網訪問DLF必填。DLF預設關閉公網訪問,如需開通,見公網訪問DLF Paimon REST公測說明

重要

AccessKey ID 和 AccessKey Secret 是訪問阿里雲資源的重要憑證,請妥善保管。請勿將真實 AccessKey 提交到 Git 倉庫。建議從環境變數、密鑰管理系統或運行時配置讀取存取憑證。

串連 DLF

匯入 lance_dlf 會自動註冊 dlf namespace:

import lance_namespace
import lance_dlf  # noqa: F401

ns = lance_namespace.connect("dlf", CONFIG)
print(ns.namespace_id())

讀取已有表

第一步:擷取表路徑和憑證

通過Daft 讀寫表之前,先通過 describe_table() 從 DLF 擷取表路徑和 OSS 臨時憑證。

from lance_namespace import DescribeTableRequest

DATABASE = "<database>"
TABLE = "<table>"

desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))

print(desc.location)
print(sorted((desc.storage_options or {}).keys()))

desc 包含兩個關鍵字段:

  • desc.location:Lance 表格儲存體路徑,格式為 oss://bucket/path/to/table

  • desc.storage_options:OSS 臨時憑證字典

第二步:設定並調用 OSS 憑證環境變數

Daft 底層通過 PyLance 讀寫 OSS。需要在代碼裡定義 apply_oss_environment,把 DLF 返回的臨時憑證設定成 OSS_* 環境變數,然後調用該函數:

import os


def apply_oss_environment(storage_options: dict) -> None:
    os.environ["OSS_ENDPOINT"] = storage_options["oss_endpoint"]
    os.environ["OSS_ACCESS_KEY_ID"] = storage_options["oss_access_key_id"]
    os.environ["OSS_ACCESS_KEY_SECRET"] = storage_options["oss_secret_access_key"]
    if storage_options.get("oss_security_token"):
        os.environ["OSS_SECURITY_TOKEN"] = storage_options["oss_security_token"]
    if storage_options.get("oss_region"):
        os.environ["OSS_REGION"] = storage_options["oss_region"]

# 調用函數
apply_oss_environment(desc.storage_options or {})

第三步:使用Daft讀取表資料

import daft

df = daft.read_lance(desc.location)
df.show()

寫入資料

追加寫入已有表

完成第一步:擷取表路徑和憑證第二步:設定並調用 OSS 憑證環境變數後,使用 mode="append" 追加資料:

# 前置:串連Catalog、擷取表路徑和憑證、設定並調用OSS憑證環境變數
desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
apply_oss_environment(desc.storage_options or {})

# 追加寫入
append_df = daft.from_pydict({
    "f0": [204],
    "f1": ["daft-d"],
})

append_df.write_lance(desc.location, mode="append")

# 寫入後讀取驗證:
df2 = daft.read_lance(desc.location)
df2.show()

建立新表並寫入

新表必須先通過 ns.create_table() 建立(該方法同時寫入首批資料),再用 Daft 進行後續讀寫。

# 前置:串連Catalog、擷取表路徑和憑證、設定並調用OSS憑證環境變數
from datetime import datetime
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest

# 定義Arrow轉換成IPC位元組流
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()


# 建立表並寫入首批資料
table_name = "test_lance_daft_" + datetime.now().strftime("%Y%m%d_%H%M%S")
table_id = [DATABASE, table_name]

rows = {
    "f0": [201, 202, 203],
    "f1": ["daft-a", "daft-b", "daft-c"],
}
arrow_table = pa.table(rows)

create_response = ns.create_table(
    CreateTableRequest(id=table_id),
    arrow_table_to_ipc_bytes(arrow_table),
)
print(create_response.location)

建立完成後,通過 describe_table 擷取憑證,再用 Daft 讀寫:

# 擷取憑證並設定環境變數
desc = ns.describe_table(DescribeTableRequest(id=table_id))
apply_oss_environment(desc.storage_options or {})

# 讀取驗證
df = daft.read_lance(desc.location)
df.show()

# 使用Daft追加資料
append_rows = {
    "f0": [204],
    "f1": ["daft-d"],
}
append_df = daft.from_pydict(append_rows)

meta = append_df.write_lance(desc.location, mode="append")
meta.show()

# 再次讀取確認
appended_df = daft.read_lance(desc.location)
appended_df.show()

預期輸出:

[
    {"f0": 201, "f1": "daft-a"},
    {"f0": 202, "f1": "daft-b"},
    {"f0": 203, "f1": "daft-c"},
    {"f0": 204, "f1": "daft-d"},
]

完整樣本

以下指令碼示範端到端流程:建立新表 → 讀取 → 追加 → 驗證。

from __future__ import annotations

from datetime import datetime
import os

import daft
import lance_namespace
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest

import lance_dlf  # noqa: F401


CONFIG = {
    "uri": "http://<DLF-ENDPOINT>",
    "warehouse": "<YOUR-CATALOG>",
    "token.provider": "dlf",
    "dlf.region": "<REGION-ID>",
    "dlf.access-key-id": "<ACCESS-KEY-ID>",
    "dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
    "dlf.oss-endpoint": "<OSS-ENDPOINT>", # 僅公網訪問DLF必填
}

DATABASE = "default"

# 定義 Arrow 表序列化函數
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()

# 定義OSS環境變數函數
def apply_oss_environment(storage_options: dict) -> None:
    os.environ["OSS_ENDPOINT"] = storage_options["oss_endpoint"]
    os.environ["OSS_ACCESS_KEY_ID"] = storage_options["oss_access_key_id"]
    os.environ["OSS_ACCESS_KEY_SECRET"] = storage_options["oss_secret_access_key"]
    if storage_options.get("oss_security_token"):
        os.environ["OSS_SECURITY_TOKEN"] = storage_options["oss_security_token"]
    if storage_options.get("oss_region"):
        os.environ["OSS_REGION"] = storage_options["oss_region"]


def df_to_pydict(df):
    try:
        return df.to_pydict()
    except AttributeError:
        return df.collect().to_pydict()


def main() -> None:
    ns = lance_namespace.connect("dlf", CONFIG)

    # 1. 建立新表
    table_name = "test_lance_daft_" + datetime.now().strftime("%Y%m%d_%H%M%S")
    table_id = [DATABASE, table_name]

    rows = {
        "f0": [201, 202, 203],
        "f1": ["daft-a", "daft-b", "daft-c"],
    }
    arrow_table = pa.table(rows)

    create_response = ns.create_table(
        CreateTableRequest(id=table_id),
        arrow_table_to_ipc_bytes(arrow_table),
    )
    print("created:", ".".join(table_id))
    print("location:", create_response.location)

    # 2. 擷取憑證
    desc = ns.describe_table(DescribeTableRequest(id=table_id))
    apply_oss_environment(desc.storage_options or {})

    # 3. 讀取驗證
    read_df = daft.read_lance(desc.location)
    read_df.show()
    if df_to_pydict(read_df) != rows:
        raise AssertionError("Initial readback mismatch")

    # 4. 追加資料
    append_rows = {
        "f0": [204],
        "f1": ["daft-d"],
    }
    append_df = daft.from_pydict(append_rows)
    append_df.write_lance(desc.location, mode="append").show()


    # 5. 最終驗證
    appended_df = daft.read_lance(desc.location)
    appended_df.show()
    expected = {
        "f0": rows["f0"] + append_rows["f0"],
        "f1": rows["f1"] + append_rows["f1"],
    }
    if df_to_pydict(appended_df) != expected:
        raise AssertionError("Daft append readback mismatch")

    print("daft + dlf + lance: ok")


if __name__ == "__main__":
    main()

注意事項

  • 新表初始化:使用 ns.create_table(...) 建立新表並寫入第一批資料,再用 Daft 讀寫後續資料。

  • 日誌脫敏storage_options 的完整值包含臨時 AK/SK/token,建議僅列印 key 列表:

    print(sorted((desc.storage_options or {}).keys()))