本文介紹如何通過 Daft DataFrame 引擎讀寫由 DLF Catalog 管理的 Lance 表。Daft 提供 lazy DataFrame API,適合需要查詢過濾或批次運算的情境。
如需使用 PyLance 直接讀寫 Lance 表,請參閱使用 Python 操作 DLF Lance 表。
術語說明
組件 | 作用 |
DLF | Catalog 服務,管理 database/table 中繼資料,儲存 Lance 表路徑,發放 OSS 臨時憑證 |
Lance/PyLance | 資料格式和底層讀寫實現,負責 OSS 上 Lance dataset 的實際 I/O |
Daft | DataFrame 計算引擎,提供 |
| 連接器,從 DLF 擷取表路徑和 OSS 臨時憑證,供 Daft 使用。 |
技術架構
使用者代碼
→ lance_namespace.connect("dlf", CONFIG) # 串連 DLF Catalog
→ DLF 返回 Lance 表路徑 + OSS 臨時憑證
→ apply_oss_environment(...) # 設定 OSS_* 環境變數
→ daft.read_lance("oss://...") # 讀取資料
→ df.write_lance("oss://...", mode="append") # 寫入資料概念映射:
DLF database → Lance namespace
DLF table → Lance table
前提條件
安裝依賴
python3 -m pip install lance-dlf daftlance-dlf 會自動安裝 lance_namespace 和 pyarrow 等依賴。
配置 Catalog 串連
CONFIG = {
"uri": "http://<DLF-ENDPOINT>", # 公網訪問須使用HTTPS協議
"warehouse": "<YOUR-CATALOG>",
"token.provider": "dlf",
"dlf.region": "<REGION-ID>",
"dlf.access-key-id": "<ACCESS-KEY-ID>",
"dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
"dlf.oss-endpoint": "<OSS-ENDPOINT>", # VPC訪問可選,公網訪問必填
}配置參數:
參數名稱 | 說明 |
| DLF Paimon REST 訪問端點,詳見服務存取點。VPC訪問支援HTTP和HTTPS協議,公網訪問必須使用HTTPS協議 |
| DLF Catalog 名稱 |
| AccessKey 認證模式下,填寫 |
| DLF 所屬地區ID(如 |
| 訪問 DLF 的 AccessKey ID |
| 訪問 DLF 的 AccessKey Secret |
| (可選)STS 情境下使用的安全性權杖 |
| (可選)OSS 公網訪問存取點(如 |
AccessKey ID 和 AccessKey Secret 是訪問阿里雲資源的重要憑證,請妥善保管。請勿將真實 AccessKey 提交到 Git 倉庫。建議從環境變數、密鑰管理系統或運行時配置讀取存取憑證。
串連 DLF
匯入 lance_dlf 會自動註冊 dlf namespace:
import lance_namespace
import lance_dlf # noqa: F401
ns = lance_namespace.connect("dlf", CONFIG)
print(ns.namespace_id())讀取已有表
第一步:擷取表路徑和憑證
通過Daft 讀寫表之前,先通過 describe_table() 從 DLF 擷取表路徑和 OSS 臨時憑證。
from lance_namespace import DescribeTableRequest
DATABASE = "<database>"
TABLE = "<table>"
desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
print(desc.location)
print(sorted((desc.storage_options or {}).keys()))desc 包含兩個關鍵字段:
desc.location:Lance 表格儲存體路徑,格式為oss://bucket/path/to/tabledesc.storage_options:OSS 臨時憑證字典
第二步:設定並調用 OSS 憑證環境變數
Daft 底層通過 PyLance 讀寫 OSS。需要在代碼裡定義 apply_oss_environment,把 DLF 返回的臨時憑證設定成 OSS_* 環境變數,然後調用該函數:
import os
def apply_oss_environment(storage_options: dict) -> None:
os.environ["OSS_ENDPOINT"] = storage_options["oss_endpoint"]
os.environ["OSS_ACCESS_KEY_ID"] = storage_options["oss_access_key_id"]
os.environ["OSS_ACCESS_KEY_SECRET"] = storage_options["oss_secret_access_key"]
if storage_options.get("oss_security_token"):
os.environ["OSS_SECURITY_TOKEN"] = storage_options["oss_security_token"]
if storage_options.get("oss_region"):
os.environ["OSS_REGION"] = storage_options["oss_region"]
# 調用函數
apply_oss_environment(desc.storage_options or {})第三步:使用Daft讀取表資料
import daft
df = daft.read_lance(desc.location)
df.show()寫入資料
追加寫入已有表
完成第一步:擷取表路徑和憑證和第二步:設定並調用 OSS 憑證環境變數後,使用 mode="append" 追加資料:
# 前置:串連Catalog、擷取表路徑和憑證、設定並調用OSS憑證環境變數
desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
apply_oss_environment(desc.storage_options or {})
# 追加寫入
append_df = daft.from_pydict({
"f0": [204],
"f1": ["daft-d"],
})
append_df.write_lance(desc.location, mode="append")
# 寫入後讀取驗證:
df2 = daft.read_lance(desc.location)
df2.show()建立新表並寫入
新表必須先通過 ns.create_table() 建立(該方法同時寫入首批資料),再用 Daft 進行後續讀寫。
# 前置:串連Catalog、擷取表路徑和憑證、設定並調用OSS憑證環境變數
from datetime import datetime
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest
# 定義Arrow轉換成IPC位元組流
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
writer.write_table(table)
return sink.getvalue().to_pybytes()
# 建立表並寫入首批資料
table_name = "test_lance_daft_" + datetime.now().strftime("%Y%m%d_%H%M%S")
table_id = [DATABASE, table_name]
rows = {
"f0": [201, 202, 203],
"f1": ["daft-a", "daft-b", "daft-c"],
}
arrow_table = pa.table(rows)
create_response = ns.create_table(
CreateTableRequest(id=table_id),
arrow_table_to_ipc_bytes(arrow_table),
)
print(create_response.location)建立完成後,通過 describe_table 擷取憑證,再用 Daft 讀寫:
# 擷取憑證並設定環境變數
desc = ns.describe_table(DescribeTableRequest(id=table_id))
apply_oss_environment(desc.storage_options or {})
# 讀取驗證
df = daft.read_lance(desc.location)
df.show()
# 使用Daft追加資料
append_rows = {
"f0": [204],
"f1": ["daft-d"],
}
append_df = daft.from_pydict(append_rows)
meta = append_df.write_lance(desc.location, mode="append")
meta.show()
# 再次讀取確認
appended_df = daft.read_lance(desc.location)
appended_df.show()預期輸出:
[
{"f0": 201, "f1": "daft-a"},
{"f0": 202, "f1": "daft-b"},
{"f0": 203, "f1": "daft-c"},
{"f0": 204, "f1": "daft-d"},
]完整樣本
以下指令碼示範端到端流程:建立新表 → 讀取 → 追加 → 驗證。
from __future__ import annotations
from datetime import datetime
import os
import daft
import lance_namespace
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest
import lance_dlf # noqa: F401
CONFIG = {
"uri": "http://<DLF-ENDPOINT>",
"warehouse": "<YOUR-CATALOG>",
"token.provider": "dlf",
"dlf.region": "<REGION-ID>",
"dlf.access-key-id": "<ACCESS-KEY-ID>",
"dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
"dlf.oss-endpoint": "<OSS-ENDPOINT>", # 僅公網訪問DLF必填
}
DATABASE = "default"
# 定義 Arrow 表序列化函數
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
writer.write_table(table)
return sink.getvalue().to_pybytes()
# 定義OSS環境變數函數
def apply_oss_environment(storage_options: dict) -> None:
os.environ["OSS_ENDPOINT"] = storage_options["oss_endpoint"]
os.environ["OSS_ACCESS_KEY_ID"] = storage_options["oss_access_key_id"]
os.environ["OSS_ACCESS_KEY_SECRET"] = storage_options["oss_secret_access_key"]
if storage_options.get("oss_security_token"):
os.environ["OSS_SECURITY_TOKEN"] = storage_options["oss_security_token"]
if storage_options.get("oss_region"):
os.environ["OSS_REGION"] = storage_options["oss_region"]
def df_to_pydict(df):
try:
return df.to_pydict()
except AttributeError:
return df.collect().to_pydict()
def main() -> None:
ns = lance_namespace.connect("dlf", CONFIG)
# 1. 建立新表
table_name = "test_lance_daft_" + datetime.now().strftime("%Y%m%d_%H%M%S")
table_id = [DATABASE, table_name]
rows = {
"f0": [201, 202, 203],
"f1": ["daft-a", "daft-b", "daft-c"],
}
arrow_table = pa.table(rows)
create_response = ns.create_table(
CreateTableRequest(id=table_id),
arrow_table_to_ipc_bytes(arrow_table),
)
print("created:", ".".join(table_id))
print("location:", create_response.location)
# 2. 擷取憑證
desc = ns.describe_table(DescribeTableRequest(id=table_id))
apply_oss_environment(desc.storage_options or {})
# 3. 讀取驗證
read_df = daft.read_lance(desc.location)
read_df.show()
if df_to_pydict(read_df) != rows:
raise AssertionError("Initial readback mismatch")
# 4. 追加資料
append_rows = {
"f0": [204],
"f1": ["daft-d"],
}
append_df = daft.from_pydict(append_rows)
append_df.write_lance(desc.location, mode="append").show()
# 5. 最終驗證
appended_df = daft.read_lance(desc.location)
appended_df.show()
expected = {
"f0": rows["f0"] + append_rows["f0"],
"f1": rows["f1"] + append_rows["f1"],
}
if df_to_pydict(appended_df) != expected:
raise AssertionError("Daft append readback mismatch")
print("daft + dlf + lance: ok")
if __name__ == "__main__":
main()注意事項
新表初始化:使用
ns.create_table(...)建立新表並寫入第一批資料,再用 Daft 讀寫後續資料。日誌脫敏:
storage_options的完整值包含臨時 AK/SK/token,建議僅列印 key 列表:print(sorted((desc.storage_options or {}).keys()))