
E-MapReduce: Large-scale text deduplication using MinHash-LSH

Last Updated: Dec 04, 2025

This topic describes how to use the built-in functions of Serverless Spark to quickly identify and remove near-duplicate content from massive text collections. This method is suitable for scenarios such as cleaning pre-training corpora for large models.

Introduction to the MinHash-LSH algorithm

MinHash-LSH is a classic algorithm combination for approximate similarity detection. It is widely used for large-scale set similarity calculations, such as Jaccard similarity. The core idea is as follows:

  1. MinHash: Converts text into a set of n-grams and then uses multiple hash functions to generate a compact signature vector that preserves the similarity features of the original set.

  2. Locality-Sensitive Hashing (LSH): Divides the signature vector into multiple bands and hashes each band separately. This process increases the likelihood that highly similar texts are placed in the same hash bucket, which allows for the rapid filtering of candidate pairs.
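The two steps can be sketched in plain Python. This is a toy illustration only, not the built-in implementation: it salts MD5 per hash function instead of using the (a·x + b) mod p hash family that the minhash_lsh parameters describe.

```python
import hashlib

def ngrams(tokens, n):
    """Build the set of word n-grams from a token list."""
    return {" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)}

def minhash_signature(items, num_perm=64):
    """MinHash: one value per hash function, the minimum of h_i(x) over the set."""
    return [
        min(int.from_bytes(hashlib.md5(f"{i}:{x}".encode()).digest()[:4], "big")
            for x in items)
        for i in range(num_perm)
    ]

def band_hashes(signature, bands):
    """LSH: split the signature into equal bands and hash each band to a hex string."""
    r = len(signature) // bands
    return [
        hashlib.md5(str(signature[i * r:(i + 1) * r]).encode()).hexdigest()[:12]
        for i in range(bands)
    ]

a = ngrams("the quick brown fox jumps over the lazy dog".split(), 3)
b = ngrams("the quick brown fox leaps over the lazy dog".split(), 3)
sig_a, sig_b = minhash_signature(a), minhash_signature(b)
# The fraction of matching signature positions estimates the Jaccard similarity,
# which here is exactly 4/10 = 0.4 (4 shared 3-grams out of 10 distinct ones).
estimate = sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)
```

Two texts whose similarity exceeds the threshold will, with high probability, share at least one band hash and therefore land in the same LSH bucket, where they become a candidate pair.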

Serverless Spark integrates MinHash-LSH capabilities through the minhash_lsh and build_lsh_edges functions. It uses the Fusion engine for vectorized acceleration. This eliminates the overhead of data row-to-column conversion and improves the execution efficiency of deduplication tasks.

Built-in function descriptions

minhash_lsh

This function generates a MinHash signature by tokenizing the input text and then creates a list of corresponding hash values by dividing the signature into bands.

Command format

minhash_lsh(
  tokens: ARRAY<STRING>,
  perms_a: ARRAY<BIGINT>,
  perms_b: ARRAY<BIGINT>,
  hash_ranges: ARRAY<INT>,
  ngram_size: INT,
  min_length: INT
)

Parameter descriptions

Parameter | Type | Required | Description
tokens | ARRAY<STRING> | Yes | An array of tokens after tokenization. For example, generated by split(lower(text), '\\s+').
perms_a | ARRAY<BIGINT> | Yes | The multiplier parameters a of the MinHash hash functions h(x) = (a·x + b) mod p, satisfying 1 ≤ a < p, where p is a large prime (4294967291 in the example script).
perms_b | ARRAY<BIGINT> | Yes | The addend parameters b of the MinHash hash functions, satisfying 0 ≤ b < p.
hash_ranges | ARRAY<INT> | Yes | The boundaries for band division. The format is [0, R, 2R, ..., B*R], where B is the number of bands and R is the number of rows per band.
ngram_size | INT | Yes | The size of the n-gram. A value of 5–9 is recommended for long texts.
min_length | INT | Yes | The minimum number of input tokens. Records with fewer tokens are skipped.

Return value

  • Type: ARRAY<STRING>

  • Description: An array of hexadecimal hash strings. Each string corresponds to one band, so the array contains B elements in total.

    Output example: ["a1b2c3", "d4e5f6", ...]

build_lsh_edges

This function processes a group of document IDs that are in the same LSH bucket. It generates an edge set based on the "minimum node connection" policy. This set is then used for graph connected components analysis to cluster duplicate documents.

Command format

build_lsh_edges(doc_ids: ARRAY<BIGINT>)

Parameter descriptions

Parameter | Type | Required | Description
doc_ids | ARRAY<BIGINT> | Yes | A list of document IDs in the same LSH bucket.

Return value:

  • Type: ARRAY<STRUCT<src: LONG, dst: LONG>>

  • Description: A collection of edges formed by connecting the node with the smallest ID in a bucket (the source node) to all other nodes in the bucket.

    Example: For the IDs [1003, 1001, 1005] in a bucket, the minimum ID is 1001, and the edges (1001, 1003) and (1001, 1005) are generated.
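The policy is easy to mirror in plain Python. This is a minimal sketch for illustration; the function name mirrors the builtin, but this is not its implementation:

```python
def build_lsh_edges(doc_ids):
    """Minimum node connection: link the smallest ID in the bucket to every other ID."""
    if len(doc_ids) < 2:
        return []  # a bucket with a single document produces no edges
    src = min(doc_ids)
    return [(src, dst) for dst in sorted(doc_ids) if dst != src]

edges = build_lsh_edges([1003, 1001, 1005])
# edges == [(1001, 1003), (1001, 1005)]
```

Connecting every node to the bucket minimum instead of emitting all pairwise edges keeps the edge count linear in the bucket size, which matters for hot buckets, while producing the same connected components.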

Example: Deduplicate the fineweb-edu dataset

This example demonstrates the process using the sample/10BT subset of the fineweb-edu open-source dataset.


  • minhash_lsh: This function splits each line of input text into a set of n-gram tokens and generates a fixed-length signature vector using the MinHash algorithm. The function then divides the signature into multiple bands based on the specified hash_ranges and calculates a hexadecimal-encoded hash value for each band. Finally, it returns an array of all band hash values.

  • build_lsh_edges: This function takes a group of document IDs that fall into the same LSH bucket and generates a list of edges based on a "minimum node connection" policy. In this policy, the smallest ID in the bucket acts as a central node and connects to all other nodes. This list is then used for graph connected components analysis to cluster duplicate documents.

Supported versions

Only the following engine versions support the operations described in this topic:

  • esr-4.x: esr-4.1.1 and later.

  • esr-3.x: esr-3.1.1 and later.

  • esr-2.x: esr-2.5.1 and later.

Step 1: Prepare the data

  1. Download the dataset.

    The full size of the sample/10BT open-source dataset is 28.5 GB. This example uses only a 2.15 GB portion of the dataset.

  2. Upload the dataset to OSS.

    You can use the console or command-line interface (CLI) to upload the Parquet files to a specified path. For more information, see Simple upload.

  3. View the uploaded data.

    1. You can create a temporary view in the Spark environment to verify that the data can be read correctly.

      CREATE OR REPLACE TEMPORARY VIEW
        temp_source USING parquet OPTIONS (path 'oss://<bucket>/fineweb-edu/sample/10BT/')
      ;
    2. View the data. The total number of records is 727,000.

Step 2: Write the script

Save the following code as MinHash.py. The parameters are described in the table below.

import re
from typing import List
from typing import Tuple

import pyspark
import numpy as np
import numpy.typing as npt
from graphframes import GraphFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from scipy.integrate import quad as integrate

RNG = np.random.RandomState(42)
SPLIT_PATTERN = re.compile(r"[\s\xA0]+")  # Set the text delimiter
DTYPE = np.uint32
MAX_HASH = 4_294_967_295  # maximum 32-bit unsigned integer
MOD_PRIME = 4_294_967_291  # maximum 32-bit prime number

input_files = "oss://<bucket>/fineweb-edu/sample/10BT/*.parquet"  # Set the data files to be deduplicated
output_path = "oss://<bucket>/fineweb-edu/output/10BT"  # Set the output path
checkpoint_dir = "oss://<bucket>/fineweb-edu/checkpoints"  # Set the checkpoint directory to store intermediate results during graph connected components calculation
threshold = 0.8  # Set the threshold parameter
num_perm = 256  # Set the number of permutations
ngram_size = 5  # Set the n-gram size
min_length = ngram_size  # Set the minimum token count; usually the same as ngram_size
text_column = "text"  # Set the name of the text column to be deduplicated
index_column = "__id__"

# Set a fixed default parallelism to ensure the stability of the graph connected components calculation results
conf = (
    pyspark.SparkConf()
    .set("spark.default.parallelism", "200")
)

spark = SparkSession.builder \
    .appName("MinHashLSH") \
    .config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()

spark.sparkContext.setCheckpointDir(checkpoint_dir)
# Automatically calculate the optimal band parameters (B, R)
def optimal_param(
        threshold: float,
        num_perm: int,
        false_positive_weight: float = 0.5,
        false_negative_weight: float = 0.5
):
    def false_positive_area(threshold: float, b: int, r: int):
        a, _ = integrate(lambda s: 1 - (1 - s**r)**b, 0.0, threshold)
        return a

    def false_negative_area(threshold: float, b: int, r: int):
        a, _ = integrate(lambda s: 1 - (1 - (1 - s**r)**b), threshold, 1.0)
        return a

    min_error = float("inf")
    opt = (0, 0)
    for b in range(1, num_perm + 1):
        max_r = int(num_perm / b)
        for r in range(1, max_r + 1):
            fp = false_positive_area(threshold, b, r)
            fn = false_negative_area(threshold, b, r)
            error = fp * false_positive_weight + fn * false_negative_weight
            if error < min_error:
                min_error = error
                opt = (b, r)
    return opt


# B (Bands), R (Rows per Band), B × R = num_perm
# B and R divide the MinHash signature matrix into B bands, each with R rows. You can specify these parameters yourself or calculate them from the threshold and num_perm.
B, R = optimal_param(threshold, num_perm)

HASH_RANGES_SLICE: List[int] = [i * R for i in range(B + 1)]
PERMUTATIONS: Tuple[npt.NDArray[DTYPE], npt.NDArray[DTYPE]] = (
    RNG.randint(1, MOD_PRIME, size=(num_perm,), dtype=DTYPE),
    RNG.randint(0, MOD_PRIME, size=(num_perm,), dtype=DTYPE),
)

# If the data has a primary key ID column, you can use it directly as the index_column instead of generating an index with monotonically_increasing_id
# .withColumn(index_column, sf.col("primary_key_col"))
df = spark.read.parquet(input_files) \
    .withColumn(index_column, sf.monotonically_increasing_id())

a, b = PERMUTATIONS

# Use the minhash_lsh function to get the band_hash list, and then use explode to expand the list into band_idx and band_hash
hash_df = df \
    .select(index_column,
            sf.split(
                sf.lower(text_column),
                pattern=SPLIT_PATTERN.pattern).alias("tokens")) \
    .select(index_column,
            sf.minhash_lsh(
                "tokens",
                a.tolist(),
                b.tolist(),
                HASH_RANGES_SLICE,
                ngram_size,
                min_length).alias("hashes")) \
    .select(index_column, sf.posexplode("hashes").alias("band_idx", "band_hash"))

# Use the build_lsh_edges function to generate edges between nodes for aggregated band_idx values in the same LSH bucket, based on the "minimum node connection" policy
edges_df = hash_df.groupBy("band_idx", "band_hash") \
    .agg(sf.count(index_column).alias("cnt"), sf.collect_list(index_column).alias("doc_ids")) \
    .filter(sf.col("cnt") > 1) \
    .select(sf.build_lsh_edges("doc_ids").alias("edges")) \
    .select(sf.explode("edges").alias("edge")) \
    .selectExpr("edge.src as src", "edge.dst as dst") \
    .persist(pyspark.StorageLevel.DISK_ONLY)

# Calculate the vertices for all edges
vertices_df = edges_df.select(sf.col("src").alias("id")) \
    .union(edges_df.select(sf.col("dst").alias("id"))) \
    .distinct() \
    .repartition(4096) \
    .persist(pyspark.StorageLevel.MEMORY_AND_DISK_DESER)

assignment = GraphFrame(vertices_df, edges_df).connectedComponents()
# Keep the representative document with the smallest ID in each connected component
df = df.join(assignment.select(sf.col("id").alias(index_column), sf.col("component").alias("__component__")),
             on=index_column, how="left") \
    .filter(sf.col("__component__").isNull() | (sf.col("__component__") == sf.col(index_column))) \
    .drop("__component__")
    
# Output the deduplicated results
df.write.parquet(output_path, mode="overwrite", compression="snappy")

# Clear the cache
edges_df.unpersist()
vertices_df.unpersist()

Parameter descriptions

Parameter | Example value | Description
input_files | "oss://<bucket>/fineweb-edu/sample/10BT/*.parquet" | Input data path (Parquet format).
output_path | "oss://<bucket>/fineweb-edu/output/10BT" | Output path for the deduplicated results.
checkpoint_dir | "oss://<bucket>/fineweb-edu/checkpoints" | Storage path for the intermediate state of the graph computation.
threshold | 0.8 | Similarity threshold (0.7–0.9 recommended).
num_perm | 256 | Length of the MinHash signature (affects accuracy and performance).
ngram_size | 5 | n-gram size (5–9 recommended for long text).
min_length | 5 | Minimum number of tokens in a text. Shorter texts are skipped.
text_column | "text" | Name of the text column to be deduplicated.
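To trace the end-to-end logic of the script (bucket grouping, edge building, connected components, keeping the minimum ID) without a Spark cluster, the pipeline can be mirrored on a toy scale in plain Python, using a union-find in place of GraphFrames. The bucket data below is made up for illustration:

```python
def dedup_buckets(buckets):
    """buckets: lists of document IDs that collided in some LSH bucket.
    Returns the surviving (representative) IDs, i.e. the minimum ID of each
    connected component. Documents absent from every bucket have no duplicates
    and survive by definition."""
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            # Always keep the smaller root, so each component's root is its minimum ID.
            parent[max(ra, rb)] = min(ra, rb)

    seen = set()
    for ids in buckets:
        seen.update(ids)
        src = min(ids)
        for dst in ids:          # "minimum node connection" edges
            if dst != src:
                union(src, dst)
    return {d for d in seen if find(d) == d}

# Buckets {1, 2} and {2, 3} chain into one component rooted at 1; {5, 6} is rooted at 5.
survivors = dedup_buckets([[1, 2], [2, 3], [5, 6]])
# survivors == {1, 5}
```

Note how overlapping buckets merge transitively: documents 1 and 3 never share a bucket, yet both are clustered under representative 1, which is exactly what the connectedComponents step achieves at scale.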

Step 3: Upload the file

  1. Go to the resource upload page.

    1. Log on to the E-MapReduce console.

    2. In the navigation pane on the left, choose EMR Serverless > Spark.

    3. On the Spark page, click the name of the target workspace.

    4. On the EMR Serverless Spark page, click Artifacts in the navigation pane on the left.

  2. On the Artifacts page, click Upload File.

  3. In the Upload File dialog box, click the upload area to select MinHash.py, or drag MinHash.py to the upload area.

Step 4: Create and run a batch job

After you write and upload the script, you can create a batch processing job in the Serverless Spark environment to run the deduplication job.

  1. On the EMR Serverless Spark page, click Development on the left.

  2. On the Development tab, click the image icon.

  3. In the dialog box that appears, enter a name, set Type to Application(Batch) > PySpark, and click OK.

  4. In the upper-right corner, select a queue.

    For more information about how to add a queue, see Manage resource queues.

  5. On the new development tab, configure the following information, leave the other parameters at their default settings, and then click Run.

    • Main Python Resource: Select the Python file that you uploaded on the Artifacts page in the previous step. In this example, it is MinHash.py.

    • Engine Version: esr-2.8.0, esr-3.4.0, or esr-4.4.0, or later.

    • Resource Configuration: We recommend a ratio of 4 CPU cores to 16 GB of memory. For example, spark.executor.cores=4, spark.executor.memory=14GB, spark.executor.memoryOverhead=2GB.

    • Spark Configuration:

      spark.rdd.ensureConfigConsistency true: Required.

      Execution efficiency optimization parameters:

      • spark.sql.shuffle.partitions 1000: For datasets smaller than 1 TB, a value of 1000 is recommended. For each additional 1 TB of data, increase this value by 1000.

      • spark.sql.files.maxPartitionBytes 256MB: Controls the partition size during the read phase to avoid too many small files. A value of 128 MB or 256 MB is usually sufficient.
  6. After the job starts, in the Execution Records section at the bottom, click Details in the Actions column of the job to monitor its execution.

Step 5: Verify the results

After the job is successfully completed, you can verify the deduplication results.

  1. Create a temporary view in the Spark SQL environment to load the deduplicated data.

    CREATE OR REPLACE TEMPORARY VIEW
      temp_target USING parquet OPTIONS (path 'oss://<bucket>/fineweb-edu/output/10BT')
    ;
  2. View the data. The total number of records is 724,809. This indicates that 2,191 duplicate items were removed from the original 727,000 records.

Performance tuning recommendations

To balance the accuracy, recall rate, and execution efficiency of deduplication for different data scales and business requirements, we recommend systematically tuning the following three aspects.

Algorithm parameter tuning

Parameter | Recommended value | Description
num_perm | 128 or 256 | A larger value increases accuracy but also increases computational cost. A value of 256 is usually sufficient.
ngram_size | 5–9 (long text), 2–3 (short sentences) | Controls semantic granularity. A value that is too large may reduce sensitivity.
threshold | 0.7–0.9 | A value higher than 0.9 may result in insufficient recall.

LSH schema design: B and R

The MinHash signature is divided into B bands, each containing R rows, where B × R = num_perm. This combination directly affects the quality of candidate pair generation.

Ideally, the candidate probability curve P(s) = 1 − (1 − s^R)^B should rise steeply near the target similarity threshold to sharply separate near-duplicates from dissimilar pairs.

Recommended configurations (for num_perm=256)

Goal | B (number of bands) | R (rows per band)
High recall (no missed deletions) | 32 | 8
High precision (no over-deletions) | 8 | 32
Balanced (recommended starting point) | 16 | 16
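These trade-offs can be verified numerically. P(s) = 1 − (1 − s^R)^B is the probability that two documents with Jaccard similarity s collide in at least one band, and evaluating it at s = 0.8 shows why the three configurations behave as labeled:

```python
def candidate_probability(s: float, b: int, r: int) -> float:
    """Probability that two docs with Jaccard similarity s share at least one of b bands."""
    return 1.0 - (1.0 - s ** r) ** b

for goal, b, r in [("high recall", 32, 8),
                   ("high precision", 8, 32),
                   ("balanced", 16, 16)]:
    print(f"{goal:15} B={b:2} R={r:2}  P(0.8) = {candidate_probability(0.8, b, r):.3f}")
```

With B=32, R=8 almost every pair at similarity 0.8 becomes a candidate (P ≈ 0.997), while B=8, R=32 admits almost none (P ≈ 0.006). The balanced B=16, R=16 curve crosses 0.5 near s ≈ 0.82, close to the 0.8 threshold used in this example.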

Spark execution optimization

Configuration item | Recommended setting | Description
spark.sql.shuffle.partitions | 1000 for datasets smaller than 1 TB; add 1000 for each additional 1 TB | Prevents data skew and out-of-memory (OOM) errors in a single task.
spark.sql.files.maxPartitionBytes | 128MB or 256MB | Controls the partition size during the read phase to avoid small file issues.