本文介紹如何利用 Serverless Spark 內建函數,在超大規模文本集合中快速識別並去除近似重複內容,適用於大模型預訓練語料清洗等情境。
MinHash-LSH 演算法簡介
MinHash-LSH 是一種經典的近似相似性檢測演算法組合,廣泛應用於大規模集合相似性計算(如 Jaccard 相似性)。其核心思想是:
MinHash:將文本轉換為 n-gram 集合後,通過多組雜湊函數產生緊湊的簽名向量,保留原創組合的相似性特徵。
LSH(局部敏感雜湊):將簽名向量劃分為多個“band”,每個 band 單獨雜湊,使得高相似性文本更可能落入同一雜湊桶中,從而實現候選對的快速篩選。
Serverless Spark 通過 minhash_lsh 和 build_lsh_edges 函數整合 MinHash-LSH 能力,依託 Fusion 引擎實現向量化加速,在消除資料行列轉換開銷的同時,提升了去重任務的執行效率。
內建函數說明
minhash_lsh
將輸入文本分詞後產生 MinHash 簽名,並按 bands 劃分產生對應的雜湊值列表。
命令格式
minhash_lsh(
tokens: ARRAY<STRING>,
perms_a: ARRAY<BIGINT>,
perms_b: ARRAY<BIGINT>,
hash_ranges: ARRAY<INT>,
ngram_size: INT,
min_length: INT
)參數說明
參數 | 類型 | 是否必填 | 說明 |
|
| 是 | 分詞後的詞元數組(如通過 |
|
| 是 | MinHash 雜湊函數組的乘數參數 |
|
| 是 | MinHash 雜湊函數組的加數參數 |
|
| 是 | Band 劃分邊界,格式為 |
|
| 是 | n-gram 大小,建議長文本使用 5–9。 |
|
| 是 | 輸入 tokens 最小長度,低於此值的記錄將被跳過。 |
傳回值
類型:
ARRAY<STRING>含義:每個元素為對應 band 的十六進位雜湊字串,共
個。 輸出樣本:
["a1b2c3", "d4e5f6", ...]
build_lsh_edges
對落入同一 LSH 桶的一組文檔 ID,基於“最小節點串連”策略產生邊集,用於後續圖連通分量分析以聚類重複項。
命令格式
build_lsh_edges(doc_ids: ARRAY<BIGINT>)參數說明
參數 | 類型 | 是否必填 | 說明 |
|
| 是 | 同一 LSH 桶中的文檔 ID 列表。 |
傳回值:
類型:
ARRAY<STRUCT<src: LONG, dst: LONG>>含義:以桶內最小 ID 為源節點,串連其餘所有節點所形成的邊集合。
樣本邏輯:桶內 ID 為
[1003, 1001, 1005]→ 排序後取最小1001,產生邊(1001,1003)和(1001,1005)
應用樣本:fineweb-edu 資料集去重
本樣本使用開來源資料集 fineweb-edu 的 sample/10BT 子集進行示範。

minhash_lsh:該函數將每行輸入文本切分為 n-gram 詞元集合,通過 MinHash 演算法產生固定長度的簽名向量;隨後根據指定的hash_ranges將簽名劃分為多個 band,並為每個 band 計算一個十六進位編碼的雜湊值,最終返回所有 band 的雜湊值數組。build_lsh_edges:該函數接收落入同一 LSH 桶的一組文檔 ID,基於“最小節點串連”策略——即以桶內最小 ID 作為中心節點,與其他所有節點建立串連——產生邊列表,用於後續圖連通分量分析,實現重複項的聚類。
支援版本
僅以下引擎版本支援本文操作:
esr-4.x:esr-4.1.1及之後版本。
esr-3.x:esr-3.1.1及之後版本。
esr-2.x:esr-2.5.1及之後版本。
步驟一:準備資料
下載資料集。
sample/10BT開來源資料集的完整大小為28.5GB,請根據實際情況進行下載。本文樣本僅使用該資料集的部分資料,大小為2.15GB。
上傳資料集至OSS。
使用控制台或 CLI 將 Parquet 檔案上傳至指定路徑,詳見簡單上傳。
查看上傳資料。
在 Spark 環境中建立臨時視圖,驗證資料是否可正常讀取。
CREATE OR REPLACE TEMPORARY VIEW temp_source USING parquet OPTIONS (path 'oss://<bucket>/fineweb-edu/sample/10BT/') ;查看資料,總條數727000。
步驟二:編寫指令碼
儲存為 MinHash.py,完整代碼如下(參數說明見後表):
import re
from typing import List
from typing import Tuple
import pyspark
import numpy as np
import numpy.typing as npt
from graphframes import GraphFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from scipy.integrate import quad as integrate
RNG = np.random.RandomState(42)
SPLIT_PATTERN = re.compile(r"[\s\xA0]+") # 設定文本分割符
DTYPE = np.uint32
MAX_HASH = 4_294_967_295 # maximum 32-bit unsigned integer
MOD_PRIME = 4_294_967_291 # maximum 32-bit prime number
input_files = "oss://<bucket>/fineweb-edu/sample/10BT/*.parquet" # 設定需要去重的資料檔案
output_path = "oss://<bucket>/fineweb-edu/output/10BT" # 設定輸出路徑
checkpoint_dir = "oss://<bucket>/fineweb-edu/checkpoints" # 設定checkpoint,用於圖連通分量計算時儲存中間結果
threshold = 0.8 # 設定 threshold 參數
num_perm = 256 # 設定排列的數量
ngram_size = 5 # 設定 n_gram 大小
min_length = ngram_size # 設定 n_gram 最小長度,通常與 ngram_size 相同
text_column = "text" # 設定去重文本列名
index_column = "__id__"
# 設定固定預設並發度,確保圖連通分量計算結果穩定性
conf = (
pyspark.SparkConf()
.set("spark.default.parallelism", "200")
)
spark = SparkSession.Builder() \
.appName("MinHashLSH") \
.config(conf=conf) \
.enableHiveSupport() \
.getOrCreate()
spark.sparkContext.setCheckpointDir(checkpoint_dir)
# 自動計算最優 Band 參數 (B, R)
def optimal_param(
threshold: float,
num_perm: int,
false_positive_weight: float = 0.5,
false_negative_weight: float = 0.5
):
def false_positive_area(threshold: float, b: int, r: int):
a, _ = integrate(lambda s: 1 - (1 - s**r)**b, 0.0, threshold)
return a
def false_negative_area(threshold: float, b: int, r: int):
a, _ = integrate(lambda s: 1 - (1 - (1 - s**r)**b), threshold, 1.0)
return a
min_error = float("inf")
opt = (0, 0)
for b in range(1, num_perm + 1):
max_r = int(num_perm / b)
for r in range(1, max_r + 1):
fp = false_positive_area(threshold, b, r)
fn = false_negative_area(threshold, b, r)
error = fp * false_positive_weight + fn * false_negative_weight
if error < min_error:
min_error = error
opt = (b, r)
return opt
# B (Bands), R (Rows per Band), B × R = num_perm
# B 和 R 會將 MinHash 簽名矩陣分成 B 個 bands,每個 band 包含 R 行,可以自行指定該參數,也可以通過 threshold 和 num_perm 進行計算
B, R = optimal_param(threshold, num_perm)
HASH_RANGES_SLICE: List[int] = [i * R for i in range(B + 1)]
PERMUTATIONS: Tuple[npt.NDArray[DTYPE], npt.NDArray[DTYPE]] = (
RNG.randint(1, MOD_PRIME, size=(num_perm,), dtype=DTYPE),
RNG.randint(0, MOD_PRIME, size=(num_perm,), dtype=DTYPE),
)
# 若資料本身具有主鍵 id 列,可以直接使用主鍵作為 index_column,無需使用 monotonically_increasing_id 產生 index
# .withColumn(index_column, "主鍵col")
df = spark.read.parquet(input_files) \
.withColumn(index_column, sf.monotonically_increasing_id())
a, b = PERMUTATIONS
# 使用 minhash_lsh 函數求解 band_hash list,並通過 explode 將 list 展開為 band_idx 和 band_hash
hash_df = df \
.select(index_column,
sf.split(
sf.lower(text_column),
pattern=SPLIT_PATTERN.pattern).alias("tokens")) \
.select(index_column,
sf.minhash_lsh(
"tokens",
a.tolist(),
b.tolist(),
HASH_RANGES_SLICE,
ngram_size,
min_length).alias("hashes")) \
.select(index_column, sf.posexplode("hashes").alias("band_idx", "band_hash"))
# 使用 build_lsh_edges 函數對彙總後屬於同一個 LSH 桶的 band_idx 基於 “最小節點串連” 策略產生節點之間的邊
edges_df = hash_df.groupBy("band_idx", "band_hash") \
.agg(sf.count(index_column).alias("cnt"), sf.collect_list(index_column).alias("doc_ids")) \
.filter(sf.col("cnt") > 1) \
.select(sf.build_lsh_edges("doc_ids").alias("edges")) \
.select(sf.explode("edges").alias("edge")) \
.selectExpr("edge.src as src", "edge.dst as dst") \
.persist(pyspark.StorageLevel.DISK_ONLY)
# 計算出所有邊的頂點
vertices_df = edges_df.select(sf.col("src").alias("id")) \
.union(edges_df.select(sf.col("dst").alias("id"))) \
.distinct() \
.repartition(4096) \
.persist(pyspark.StorageLevel.MEMORY_AND_DISK_DESER)
assignment = GraphFrame(vertices_df, edges_df).connectedComponents()
# 保留每個連通分量中 ID 最小的代表文檔
df = df.join(assignment.select(sf.col("id").alias(index_column), sf.col("component").alias("__component__")),
on=index_column, how="left") \
.filter(sf.col("__component__").isNull() | (sf.col("__component__") == sf.col(index_column))) \
.drop("__component__")
# 輸出去重結果
df.write.parquet(output_path, mode="overwrite", compression="snappy")
# 清理緩衝
edges_df.unpersist()
vertices_df.unpersist()參數說明表
參數 | 樣本值 | 說明 |
|
| 輸入資料路徑(Parquet 格式)。 |
|
| 去重後結果輸出路徑。 |
|
| 圖計算中間狀態儲存路徑。 |
|
| 相似性閾值(推薦 0.7–0.9)。 |
|
| MinHash 簽名長度(影響精度與效能)。 |
|
| n-gram 大小(長文字建議 5–9)。 |
|
| 文本最小 token 長度,低於則跳過。 |
|
| 待去重的文本列名。 |
步驟三:上傳檔案
進入資源上傳頁面。
在左側導覽列,選擇。
在Spark頁面,單擊目標工作空間名稱。
在EMR Serverless Spark頁面,單擊左側導覽列中的檔案管理。
在檔案管理頁面,單擊上傳檔案。
在上傳檔案對話方塊中,單擊待上傳檔案地區選取項目MinHash.py,或直接拖拽MinHash.py到待上傳檔案地區。
步驟四:建立並運行批任務
在完成指令碼開發與上傳後,您需要在 Serverless Spark 環境中建立一個批處理任務來執行該去重作業。
在EMR Serverless Spark頁面,單擊左側的資料開發。
在開發目錄頁簽下,單擊
表徵圖。在彈出的對話方塊中,輸入名稱,類型使用,單擊確定。
在右上方選擇隊列。
添加隊列的具體操作,請參見管理資源隊列。
在建立的開發頁簽中,配置以下資訊,其餘參數無需配置,然後單擊運行。
參數
說明
主Python資源
選擇前一個步驟中在檔案管理頁面上傳的Python檔案。本文樣本是
MinHash.py。引擎版本
esr-2.8.0/esr-3.4.0/esr-4.4.0及以上版本。資源配置
建議按照4CPU:16GB的比例進行配置。例如,
spark.executor.cores=4,spark.executor.memory=14GB,spark.executor.memoryOverhead=2GB。Spark配置
spark.rdd.ensureConfigConsistency true:必填項,必須設定為true。執行效率最佳化參數:
spark.sql.shuffle.partitions 1000:建議 1TB 資料以下配置為 1000 即可,每增加 1TB 資料可增加 1000。spark.sql.files.maxPartitionBytes 256MB:控制讀取階段分區大小,避免小檔案過多,通常配置為 128/256MB 即可。
運行任務後,在下方的運行記錄地區,單擊任務操作列的詳情,監控任務執行。
步驟五:驗證結果
任務成功完成後,驗證去重結果。
在 Spark SQL 環境中建立臨時視圖以載入去重後的資料。
CREATE OR REPLACE TEMPORARY VIEW temp_target USING parquet OPTIONS (path 'oss://<bucket>/fineweb-edu/output/10BT') ;查看資料,總條數724809(原 727,000 → 去除 2,191 條重複項)。
效能調優建議
為在不同資料規模和營運目標下平衡去重的精度、召回率與執行效率,建議從以下三個層面進行系統性調優。
演算法參數調優
參數 | 推薦值 | 說明 |
| 128 或 256 | 數值越大精度越高,但計算成本上升;256 通常足夠。 |
| 5–9(長文)、2–3(短句) | 控制語義粒度,過大可能損失敏感性。 |
| 0.7–0.9 | 高於 0.9 可能召回不足。 |
LSH 結構設計:B 與 R
MinHash 簽名被劃分為
理想情況下,應使 LSH 函數:
推薦配置(以 num_perm=256 為例)
目標 | B(band 數) | R(每 band 行數) |
高召回(不漏刪) | 32 | 8 |
高精度(不過刪) | 8 | 32 |
平衡型(推薦起點) | 16 | 16 |
Spark 執行最佳化
配置項 | 推薦設定 | 說明 |
| 1TB 資料以下配置為 1000 即可,每增加 1TB 資料可增加 1000。 | 防止單 task 資料扭曲或 OOM。 |
|
| 控制讀取階段分區大小,避免小檔案問題。 |