E-MapReduce: Large-scale text deduplication using MinHash-LSH

Last Updated: Jun 21, 2026

This topic describes how to use built-in Serverless Spark functions to identify and remove near-duplicate content from large text corpora, a common data cleaning task for large language models (LLMs).

MinHash-LSH overview

MinHash-LSH is a classic combination of algorithms for approximate similarity detection. It is widely used to estimate set similarity, such as Jaccard similarity, at large scale. Its core ideas are:

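  1. MinHash: Converts text into a set of n-grams, applies multiple hash functions, and keeps the minimum hash value for each function. The resulting compact signature vector preserves set similarity: for each hash function, the probability that two signatures agree equals the Jaccard similarity of the two n-gram sets.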

  2. Locality-sensitive hashing (LSH): Divides the signature vector into multiple bands and hashes each band separately. Highly similar texts are likely to share at least one band hash (that is, to fall into the same hash bucket). As a result, only documents that share a bucket need to be treated as candidate pairs, which enables rapid filtering.
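The following pure-Python sketch illustrates both ideas on two short sentences. It is not the Serverless Spark implementation. The CRC32 base hash for n-grams and the helper names (ngrams, minhash_signature, band_keys) are assumptions made for illustration. The sketch shows two things: the fraction of matching signature positions estimates the exact Jaccard similarity, and a signature can be cut into bands that serve as bucket keys.

```python
import random
import zlib

MOD_PRIME = 4_294_967_291  # largest prime below 2**32, as in the example script


def ngrams(tokens, n):
    """Return the set of n-grams (tokens joined by spaces) of a token list."""
    return {" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)}


def minhash_signature(shingles, perms_a, perms_b):
    """One minimum per hash function h_i(x) = (a_i * x + b_i) mod p."""
    # Assumption for this sketch: CRC32 maps each n-gram to a 32-bit integer.
    base = [zlib.crc32(s.encode("utf-8")) for s in shingles]
    return [min((a * x + b) % MOD_PRIME for x in base) for a, b in zip(perms_a, perms_b)]


def jaccard(s1, s2):
    return len(s1 & s2) / len(s1 | s2)


def band_keys(signature, r):
    """Split a signature into consecutive bands of r values; each band is a bucket key."""
    return [tuple(signature[i:i + r]) for i in range(0, len(signature) - r + 1, r)]


rng = random.Random(42)
num_perm = 256
perms_a = [rng.randrange(1, MOD_PRIME) for _ in range(num_perm)]
perms_b = [rng.randrange(0, MOD_PRIME) for _ in range(num_perm)]

doc1 = "the quick brown fox jumps over the lazy dog near the river bank today".split()
doc2 = "the quick brown fox jumps over the lazy cat near the river bank today".split()
sh1, sh2 = ngrams(doc1, 3), ngrams(doc2, 3)  # 12 trigrams each, 9 shared
sig1 = minhash_signature(sh1, perms_a, perms_b)
sig2 = minhash_signature(sh2, perms_a, perms_b)

# Fraction of positions where the two signatures agree approximates Jaccard similarity.
estimate = sum(x == y for x, y in zip(sig1, sig2)) / num_perm
print(f"exact Jaccard {jaccard(sh1, sh2):.3f}, MinHash estimate {estimate:.3f}")

# LSH: two documents become a candidate pair if any band matches exactly.
shared_bands = sum(k1 == k2 for k1, k2 in zip(band_keys(sig1, 16), band_keys(sig2, 16)))
print("bands in common (B=16, R=16):", shared_bands)
```

With B=16 and R=16, a pair at Jaccard similarity 0.6 only rarely shares a band. That is the intended filtering behavior when the target threshold is around 0.8.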

Serverless Spark integrates MinHash-LSH capabilities through the minhash_lsh and build_lsh_edges functions. It leverages the Fusion engine for vectorized execution, accelerating deduplication tasks by eliminating the overhead of row-to-column data conversion.

Built-in functions

minhash_lsh

Builds n-grams from the input token array, generates a MinHash signature, divides the signature into bands, and returns one hash value for each band.

Syntax

minhash_lsh(
  tokens: ARRAY<STRING>,
  perms_a: ARRAY<BIGINT>,
  perms_b: ARRAY<BIGINT>,
  hash_ranges: ARRAY<INT>,
  ngram_size: INT,
  min_length: INT
)

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| tokens | ARRAY<STRING> | Yes | An array of tokens, typically generated by a function such as split(lower(text), '\\s+'). |
| perms_a | ARRAY<BIGINT> | Yes | The multipliers $a_i$ of the MinHash hash function family $h_i(x) = (a_i \cdot x + b_i) \bmod p$, where $p$ is a large prime. Provide one value per hash function. |
| perms_b | ARRAY<BIGINT> | Yes | The addends $b_i$ of the same MinHash hash function family. Provide one value per hash function. |
| hash_ranges | ARRAY<INT> | Yes | The band boundaries, formatted as [0, R, 2R, ..., B*R], where B is the number of bands and R is the number of rows per band. |
| ngram_size | INT | Yes | The size of each n-gram. For long texts, a value of 5–9 is recommended. |
| min_length | INT | Yes | The minimum number of input tokens. Records with fewer tokens are skipped. |

Return value

  • Type: ARRAY<STRING>

  • Description: An array of B hexadecimal hash strings, one for each band.

    Example output: ["a1b2c3", "d4e5f6", ...]
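The following hypothetical pure-Python model makes the inputs and output of minhash_lsh concrete. It uses the same parameter list as the built-in function. Several details are assumptions: the space-joined n-grams, the CRC32 base hash, the SHA-1 band encoding, and returning an empty list for short records. The built-in function's hash values will differ. The model is meant to show how hash_ranges slices the signature into B bands and how min_length filters out short records.

```python
import hashlib
import zlib
from typing import List

MOD_PRIME = 4_294_967_291  # largest prime below 2**32, as in the example script


def minhash_lsh_model(tokens: List[str], perms_a: List[int], perms_b: List[int],
                      hash_ranges: List[int], ngram_size: int, min_length: int) -> List[str]:
    """Hypothetical model of minhash_lsh; its hash values will NOT match the built-in."""
    # Assumption: records with fewer than min_length tokens yield no band hashes.
    if len(tokens) < min_length:
        return []
    # Assumption: each n-gram is ngram_size consecutive tokens joined by a space.
    grams = {" ".join(tokens[i:i + ngram_size])
             for i in range(max(len(tokens) - ngram_size + 1, 1))}
    base = [zlib.crc32(g.encode("utf-8")) for g in grams]
    # One MinHash value per (a_i, b_i) pair: min over n-grams of (a_i * x + b_i) mod p.
    signature = [min((a * x + b) % MOD_PRIME for x in base)
                 for a, b in zip(perms_a, perms_b)]
    # hash_ranges = [0, R, 2R, ..., B*R]; each consecutive pair delimits one band.
    bands = []
    for start, end in zip(hash_ranges[:-1], hash_ranges[1:]):
        band_bytes = b"".join(v.to_bytes(4, "little") for v in signature[start:end])
        bands.append(hashlib.sha1(band_bytes).hexdigest())  # 40-character hex string per band
    return bands


B, R = 4, 2
hash_ranges = [i * R for i in range(B + 1)]  # [0, 2, 4, 6, 8]
perms_a = [3, 5, 7, 11, 13, 17, 19, 23]      # toy multipliers, one per hash function
perms_b = [1, 2, 3, 4, 5, 6, 7, 8]           # toy addends
tokens = "minhash lsh finds near duplicate documents quickly".split()

bands = minhash_lsh_model(tokens, perms_a, perms_b, hash_ranges, ngram_size=3, min_length=3)
print(len(bands), bands[0])
```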

build_lsh_edges

This function generates an edge list for document IDs that fall into the same LSH bucket, based on a "minimum node connection" policy. This list is used for subsequent connected components analysis to cluster duplicate documents.

Syntax

build_lsh_edges(doc_ids: ARRAY<BIGINT>)

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| doc_ids | ARRAY<BIGINT> | Yes | A list of document IDs in the same LSH bucket. |

Return value

  • Type: ARRAY<STRUCT<src: LONG, dst: LONG>>

  • Description: An edge list where the node with the smallest ID in the bucket is the source node, connected to all other nodes in the bucket.

    Example: For bucket IDs [1003, 1001, 1005], the function sorts them, selects the minimum ID 1001, and generates the edges (1001,1003) and (1001,1005).
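The following hypothetical pure-Python equivalent of the minimum node connection policy reproduces the example above. The function name build_lsh_edges_model is for illustration only and is not part of Serverless Spark.

```python
from typing import Iterable, List, Tuple


def build_lsh_edges_model(doc_ids: Iterable[int]) -> List[Tuple[int, int]]:
    """Hypothetical model of the "minimum node connection" policy for one LSH bucket."""
    # Each document contributes one hash per band, so the IDs in a bucket are distinct.
    ids = sorted(doc_ids)
    if len(ids) < 2:
        return []  # a single document has nothing to connect to
    center = ids[0]  # the smallest ID becomes the source of every edge
    return [(center, other) for other in ids[1:]]


edges = build_lsh_edges_model([1003, 1001, 1005])
print(edges)  # [(1001, 1003), (1001, 1005)]
```

The star shape is a design choice. A bucket with n documents yields n - 1 edges instead of the n(n - 1)/2 edges of a full clique, yet every document in the bucket still ends up in the same connected component.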

Example: Deduplicate the fineweb-edu dataset

This example uses the sample/10BT subset of the open-source fineweb-edu dataset to demonstrate the process.


  • minhash_lsh: This function converts each document's token array into a set of n-grams and uses the MinHash algorithm to generate a fixed-length signature vector. It then divides the signature into bands based on the specified hash_ranges, calculates a hexadecimal-encoded hash value for each band, and returns an array of the hash values for all bands.

  • build_lsh_edges: This function receives the set of document IDs that fall into the same LSH bucket and generates an edge list. A later connected components analysis uses this list to cluster duplicate documents. The list follows the "minimum node connection" strategy: the minimum ID in the bucket is the center node and is connected to all other nodes.

Supported versions

Only the following engine versions support the operations described in this topic:

  • esr-4.x: esr-4.1.1 and later.

  • esr-3.x: esr-3.1.1 and later.

  • esr-2.x: esr-2.5.1 and later.

Step 1: Prepare the data

  1. Download the dataset.

    The full sample/10BT open-source dataset is 28.5 GB.

  2. Upload the dataset to OSS.

    Use the console or CLI to upload the Parquet files to an OSS path, such as oss://<bucket>/fineweb-edu/sample/10BT/. For more information, see Simple upload.

  3. Verify the uploaded data.

    1. Create a temporary view in your Spark environment to verify that the data is readable.

      CREATE OR REPLACE TEMPORARY VIEW
        temp_source USING parquet OPTIONS (path 'oss://<bucket>/fineweb-edu/sample/10BT/')
      ;
    2. Query the data. Running SELECT COUNT(*) FROM temp_source; returns 727,000 records.

Step 2: Write the script

Save the following code as MinHash.py. The table below describes the key parameters.

import re
from typing import List
from typing import Tuple
import pyspark
import numpy as np
import numpy.typing as npt
from graphframes import GraphFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from scipy.integrate import quad as integrate
RNG = np.random.RandomState(42)
SPLIT_PATTERN = re.compile(r"[\s\xA0]+")  # Split on whitespace, including non-breaking spaces (U+00A0).
DTYPE = np.uint32
MAX_HASH = 4_294_967_295  # Maximum 32-bit unsigned integer.
MOD_PRIME = 4_294_967_291  # Maximum 32-bit prime number.
input_files = "oss://<bucket>/fineweb-edu/sample/10BT/*.parquet"  # Set the input files to deduplicate.
output_path = "oss://<bucket>/fineweb-edu/output/10BT"  # Set the output path.
checkpoint_dir = "oss://<bucket>/fineweb-edu/checkpoints"  # Set the checkpoint for storing intermediate results during connected components computation.
threshold = 0.8  # Set the similarity threshold.
num_perm = 256  # Set the number of permutations.
ngram_size = 5  # Set the n-gram size.
min_length = ngram_size  # Minimum number of tokens per document; usually the same as ngram_size.
text_column = "text"  # Set the name of the text column to deduplicate.
index_column = "__id__"
# Set a fixed default parallelism to ensure stable connected components results.
conf = (
    pyspark.SparkConf()
    .set("spark.default.parallelism", "200")
)
spark = SparkSession.Builder() \
    .appName("MinHashLSH") \
    .config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()
spark.sparkContext.setCheckpointDir(checkpoint_dir)
# Search for the band parameters (B, R) that minimize the weighted false positive and false negative areas.
def optimal_param(
        threshold: float,
        num_perm: int,
        false_positive_weight: float = 0.5,
        false_negative_weight: float = 0.5
):
    def false_positive_area(threshold: float, b: int, r: int):
        a, _ = integrate(lambda s: 1 - (1 - s**r)**b, 0.0, threshold)
        return a
    def false_negative_area(threshold: float, b: int, r: int):
        a, _ = integrate(lambda s: 1 - (1 - (1 - s**r)**b), threshold, 1.0)
        return a
    min_error = float("inf")
    opt = (0, 0)
    for b in range(1, num_perm + 1):
        max_r = int(num_perm / b)
        for r in range(1, max_r + 1):
            fp = false_positive_area(threshold, b, r)
            fn = false_negative_area(threshold, b, r)
            error = fp * false_positive_weight + fn * false_negative_weight
            if error < min_error:
                min_error = error
                opt = (b, r)
    return opt
# B (bands) and R (rows per band) split the MinHash signature into B bands of R rows each.
# optimal_param only guarantees B * R <= num_perm; only the first B * R signature values are grouped into bands.
# You can set these values manually (for example, B, R = 16, 16) or compute them from threshold and num_perm.
B, R = optimal_param(threshold, num_perm)
HASH_RANGES_SLICE: List[int] = [i * R for i in range(B + 1)]
PERMUTATIONS: Tuple[npt.NDArray[DTYPE], npt.NDArray[DTYPE]] = (
    RNG.randint(1, MOD_PRIME, size=(num_perm,), dtype=DTYPE),
    RNG.randint(0, MOD_PRIME, size=(num_perm,), dtype=DTYPE),
)
# If your data already has a unique integer (BIGINT) primary key column, you can use it directly
# as index_column instead of generating an index with monotonically_increasing_id(), for example:
# .withColumn(index_column, sf.col("your_primary_key_col"))
df = spark.read.parquet(input_files) \
    .withColumn(index_column, sf.monotonically_increasing_id())
a, b = PERMUTATIONS
# Use minhash_lsh to generate the band hash list, then use posexplode
# to expand the list into band_idx and band_hash columns.
hash_df = df \
    .select(index_column,
            sf.split(
                sf.lower(text_column),
                pattern=SPLIT_PATTERN.pattern).alias("tokens")) \
    .select(index_column,
            sf.minhash_lsh(
                "tokens",
                a.tolist(),
                b.tolist(),
                HASH_RANGES_SLICE,
                ngram_size,
                min_length).alias("hashes")) \
    .select(index_column, sf.posexplode("hashes").alias("band_idx", "band_hash"))
# Use build_lsh_edges to generate edges for documents that fall into the same LSH bucket
# based on the "minimum node connection" policy.
edges_df = hash_df.groupBy("band_idx", "band_hash") \
    .agg(sf.count(index_column).alias("cnt"), sf.collect_list(index_column).alias("doc_ids")) \
    .filter(sf.col("cnt") > 1) \
    .select(sf.build_lsh_edges("doc_ids").alias("edges")) \
    .select(sf.explode("edges").alias("edge")) \
    .selectExpr("edge.src as src", "edge.dst as dst") \
    .persist(pyspark.StorageLevel.DISK_ONLY)
# Compute all vertices from the edge list.
vertices_df = edges_df.select(sf.col("src").alias("id")) \
    .union(edges_df.select(sf.col("dst").alias("id"))) \
    .distinct() \
    .repartition(4096) \
    .persist(pyspark.StorageLevel.MEMORY_AND_DISK_DESER)
assignment = GraphFrame(vertices_df, edges_df).connectedComponents()
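# Keep one representative per connected component: the document whose ID equals its component ID
# (the smallest ID in the component). Documents that appear in no edge have a null component and are kept.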
df = df.join(assignment.select(sf.col("id").alias(index_column), sf.col("component").alias("__component__")),
             on=index_column, how="left") \
    .filter(sf.col("__component__").isNull() | (sf.col("__component__") == sf.col(index_column))) \
    .drop("__component__")
# Write the deduplicated output.
df.write.parquet(output_path, mode="overwrite", compression="snappy")
# Release cached data.
edges_df.unpersist()
vertices_df.unpersist()
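The last stage of the script (connected components plus the keep filter) can be checked locally without Spark. This sketch replaces GraphFrames with a small union-find. Like the script's filter, it assumes that each component is labeled with its smallest vertex ID. The helper names min_id_components and keep_representatives are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple


def min_id_components(edges: Iterable[Tuple[int, int]]) -> Dict[int, int]:
    """Union-find that labels every vertex with the smallest ID in its component."""
    parent: Dict[int, int] = {}

    def find(x: int) -> int:
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for u, v in edges:
        ru, rv = find(u), find(v)
        if ru != rv:
            # Attach the larger root under the smaller one, so every root is its component's minimum.
            if ru < rv:
                parent[rv] = ru
            else:
                parent[ru] = rv
    return {x: find(x) for x in list(parent)}


def keep_representatives(doc_ids: Iterable[int], component: Dict[int, int]) -> List[int]:
    """Mirror the script's filter: keep unmatched documents and each component's minimum ID."""
    return [d for d in doc_ids if component.get(d) is None or component[d] == d]


edges = [(1001, 1003), (1001, 1005), (1005, 1007), (2001, 2002)]
component = min_id_components(edges)
kept = keep_representatives([1001, 1002, 1003, 1005, 1007, 2001, 2002], component)
print(kept)  # [1001, 1002, 2001]
```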

Script parameters

| Parameter | Example value | Description |
| --- | --- | --- |
| input_files | "oss://<bucket>/fineweb-edu/sample/10BT/*.parquet" | Path to the input data in Parquet format. |
| output_path | "oss://<bucket>/fineweb-edu/output/10BT" | Output path for the deduplicated results. |
| checkpoint_dir | "oss://<bucket>/fineweb-edu/checkpoints" | Storage path for intermediate states during the connected components computation. |
| threshold | 0.8 | Jaccard similarity threshold used to compute B and R. Recommended: 0.7–0.9. |
| num_perm | 256 | The length of the MinHash signature (number of hash functions). Larger values improve precision at a higher computational cost. |
| ngram_size | 5 | The size of n-grams. Recommended: 5–9 for long texts. |
| min_length | 5 | The minimum number of tokens in a document. Documents with fewer tokens are skipped. |
| text_column | "text" | The name of the text column to deduplicate. |

Step 3: Upload the file

  1. Navigate to the artifact upload page.

    1. Log on to the E-MapReduce console.

    2. In the left navigation pane, choose EMR Serverless > Spark.

    3. On the Spark page, click the name of your target workspace.

    4. On the workspace page, click Artifacts in the left navigation pane.

  2. On the Artifacts page, click Upload File.

  3. In the Upload File dialog box, click the upload area to select MinHash.py, or drag and drop MinHash.py into the upload area.

Step 4: Create and run a batch job

After uploading the script, create and run a batch job to perform the deduplication.

  1. On the workspace page, click Development in the left navigation pane.

  2. On the Development tab, click the icon to create a new job.

  3. In the dialog box, enter a name, select Application for the PySpark type, and click OK.

  4. In the upper-right corner, select a resource queue.

    For details on adding a queue, see Manage resource queues.

  5. In the new development tab, configure the following settings, leave other parameters at their default values, and then click Run.

    Parameter

    Description

    Main Python Resources

    Select the Python file you uploaded to Artifacts in the previous step. In this example, select MinHash.py.

    Engine Version

    Select a supported engine version (see Supported versions), such as esr-4.4.0 or later.

    Resource Configuration

    A resource ratio of 4 vCPUs to 16 GB of memory per executor is recommended. For example: spark.executor.cores=4, spark.executor.memory=14GB, spark.executor.memoryOverhead=2GB.

    Spark Configuration

    spark.rdd.ensureConfigConsistency true: Required. Must be set to true.

    Execution performance parameters:

    • spark.sql.shuffle.partitions 1000: For datasets under 1 TB, a value of 1000 is recommended. Increase this by 1000 for each additional 1 TB of data.

    • spark.sql.files.maxPartitionBytes 256MB: Controls the maximum partition size during the read phase. Spark packs small files together up to this size, which avoids launching an excessive number of tiny read tasks. A value of 128 MB or 256 MB is typically sufficient.

  6. After the job starts, find the job in the Execution Records area at the bottom of the page and click Details in the Actions column to monitor its progress.

Step 5: Verify the results

After the job completes successfully, verify the deduplication results.

  1. Create a temporary view in the Spark SQL environment to load the deduplicated data.

    CREATE OR REPLACE TEMPORARY VIEW
      temp_target USING parquet OPTIONS (path 'oss://<bucket>/fineweb-edu/output/10BT')
    ;
  2. Query the data. Running SELECT COUNT(*) FROM temp_target; returns 724,809 records, down from the original 727,000, which means that 2,191 near-duplicate documents were removed. To inspect the loaded data, run the query SELECT * FROM temp_target LIMIT 10; and check the returned rows. They include the __id__ column added by the script and the original fields, such as text, id, dump, url, file_path, and language, which confirms that the deduplicated Common Crawl web data was written to the target path.

Performance tuning

To balance precision, recall, and execution efficiency across data scales and business goals, consider tuning the solution in the following three areas.

Algorithm parameters

| Parameter | Recommended value | Description |
| --- | --- | --- |
| num_perm | 128 or 256 | Higher values improve precision but increase computational cost. A value of 256 is usually sufficient. |
| ngram_size | 5–9 (long text), 2–3 (short sentences) | Controls matching granularity. Larger values make matching stricter, because changing a single token alters up to ngram_size n-grams. This effect is strongest on short texts. |
| threshold | 0.7–0.9 | Values above 0.9 may result in insufficient recall (missed duplicates). |
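ngram_size interacts with text length. To show this, the following sketch changes one token in the middle of a 20-token text with distinct tokens and measures the n-gram Jaccard similarity for several n-gram sizes. The synthetic tokens and helper names are for illustration only.

```python
from typing import List, Set, Tuple


def ngram_set(tokens: List[str], n: int) -> Set[Tuple[str, ...]]:
    """All contiguous n-grams of a token list."""
    return {tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)}


def jaccard(a: set, b: set) -> float:
    return len(a & b) / len(a | b)


original = [f"w{i}" for i in range(20)]  # 20 distinct synthetic tokens
edited = list(original)
edited[10] = "x"  # change one token in the middle

results = {n: jaccard(ngram_set(original, n), ngram_set(edited, n)) for n in (2, 5, 9)}
for n, j in results.items():
    print(f"ngram_size={n}: Jaccard after one edit = {j:.3f}")
# prints 0.810, 0.524, and 0.143 for n = 2, 5, and 9
```

In general, one edit in the middle of a T-token text with distinct tokens gives a similarity of (T - 2n + 1) / (T + 1). For a 1,000-token document with n = 5, the score therefore stays at about 0.99. In the 20-token example, the same edit drops the score well below a 0.8 threshold.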

LSH band configuration: B and R

The MinHash signature is split into $B$ bands, and each band contains $R$ rows, such that $B \times R = \text{num\_perm}$. This combination directly affects the quality of candidate pair generation.

Ideally, the LSH probability curve $P(s) = 1 - (1 - s^R)^B$ should rise sharply near the target threshold. This curve gives the probability that a pair with Jaccard similarity $s$ shares at least one band. A sharp rise provides strong separation between similar and dissimilar pairs.

Recommended configurations (for num_perm=256)

| Goal | B (bands) | R (rows per band) |
| --- | --- | --- |
| High recall (minimize missed duplicates) | 32 | 8 |
| High precision (minimize false positives) | 8 | 32 |
| Balanced (recommended starting point) | 16 | 16 |
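The LSH probability curve $P(s) = 1 - (1 - s^R)^B$ is the same expression that optimal_param integrates in the script. The following sketch evaluates it for the three configurations above. The approx_threshold helper uses the widely used rule of thumb $(1/B)^{1/R}$ for the point where the curve is steepest. That value is an approximation, not an output of Serverless Spark.

```python
def candidate_probability(s: float, b: int, r: int) -> float:
    """P(s) = 1 - (1 - s**R)**B: chance that a pair with Jaccard s shares at least one band."""
    return 1 - (1 - s ** r) ** b


def approx_threshold(b: int, r: int) -> float:
    """Rule-of-thumb location of the steepest part of the curve: (1/B)**(1/R)."""
    return (1 / b) ** (1 / r)


configs = {"high recall": (32, 8), "high precision": (8, 32), "balanced": (16, 16)}
for name, (b, r) in configs.items():
    probs = ", ".join(f"P({s})={candidate_probability(s, b, r):.3f}" for s in (0.6, 0.8, 0.9))
    print(f"{name:<15} B={b:<3} R={r:<3} threshold~{approx_threshold(b, r):.2f}  {probs}")
```

The approximate thresholds come out near 0.65 for high recall, 0.84 for balanced, and 0.94 for high precision, which matches the ordering of the goals in the table.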

Spark execution optimization

| Parameter | Recommended setting | Description |
| --- | --- | --- |
| spark.sql.shuffle.partitions | For datasets under 1 TB, set to 1000. Increase by 1000 for each additional 1 TB of data. | Keeps the amount of shuffle data per task small, which reduces the risk of out-of-memory (OOM) errors and oversized individual tasks. |
| spark.sql.files.maxPartitionBytes | 128MB or 256MB | Controls the maximum partition size during the read phase. Spark packs small files together up to this size, which prevents performance degradation from launching too many small read tasks. |
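The shuffle partition rule above is simple arithmetic. The following hypothetical helper encodes one reading of it: round the data size up to whole terabytes, with a minimum of 1,000 partitions. It then emits settings in the "key value" form that Step 4 uses in the Spark Configuration field. Both the rounding rule and the helper names are assumptions.

```python
import math
from typing import List


def recommended_shuffle_partitions(data_size_tb: float) -> int:
    """1000 partitions below 1 TB, plus 1000 for each additional (started) TB."""
    if data_size_tb <= 0:
        raise ValueError("data_size_tb must be positive")
    return max(1, math.ceil(data_size_tb)) * 1000


def spark_conf_lines(data_size_tb: float, max_partition_bytes: str = "256MB") -> List[str]:
    """Hypothetical helper: 'key value' lines for the job's Spark Configuration field."""
    return [
        "spark.rdd.ensureConfigConsistency true",
        f"spark.sql.shuffle.partitions {recommended_shuffle_partitions(data_size_tb)}",
        f"spark.sql.files.maxPartitionBytes {max_partition_bytes}",
    ]


print("\n".join(spark_conf_lines(1.5)))
```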