DISTRIBUTED MAPJOIN is an optimized version of MAPJOIN for joining a small table with a large table. Like MAPJOIN, it reduces shuffling and sorting on the large table.

Precautions

  • The two tables that you want to join must differ greatly in size. The large table must be larger than 10 TB, and the small table must be between 1 GB and 100 GB.
  • Data in the small table must be evenly distributed. If the small table has a long-tail (skewed) distribution, a single shard receives an excessive amount of data, which may cause an out-of-memory (OOM) error or a remote procedure call (RPC) timeout.
  • If an SQL task runs for more than 20 minutes, we recommend that you use DISTRIBUTED MAPJOIN for optimization.
  • A task that uses DISTRIBUTED MAPJOIN occupies a large amount of resources while it runs. Therefore, we recommend that you do not run the task in a small quota group.
    Note: On the Quotas page, you can change the quota group. For more information, see Configure quota groups.

Use DISTRIBUTED MAPJOIN

To use DISTRIBUTED MAPJOIN, add the hint /*+distmapjoin(<table_name>(shard_count=<n>,replica_count=<m>))*/ to a SELECT statement. The shard_count and replica_count parameters together determine the parallelism of tasks. Formula: Parallelism of tasks = shard_count × replica_count.
  • Parameters
    • table_name: the name of the small table that you want to join.
    • shard_count=<n>: the number of shards into which the data of the small table is split, where n is the shard count. The shards are distributed across compute nodes for data processing. In most cases, this parameter is set to an odd number.
      Note:
      • We recommend that you manually specify the shard_count parameter. You can estimate its value based on the size of the small table: aim for each shard node to process between 200 MB and 500 MB of data.
      • If you set the shard_count parameter to an excessively large value, the data processing performance and stability are affected. If you set the shard_count parameter to an excessively small value, an error may occur due to the excessive use of memory.
    • replica_count=<m>: the number of replicas of the small table, where m is the replica count. Default value: 1.
      Note: To reduce access pressure and prevent a single node failure from failing the entire task, you can create multiple replicas of the data in the same shard. If a node frequently restarts because the parallelism of tasks is high or the environment is unstable, you can increase the value of the replica_count parameter. You can set this parameter to 2 or 3.
  • Syntax
    -- Recommended. Specify the shard_count parameter and retain the default value 1 for the replica_count parameter.
    /*+distmapjoin(a(shard_count=5))*/
    
    -- Recommended. Specify the shard_count and replica_count parameters.
    /*+distmapjoin(a(shard_count=5,replica_count=2))*/
    
    -- Use DISTRIBUTED MAPJOIN to join multiple small tables.
    /*+distmapjoin(a(shard_count=5,replica_count=2),b(shard_count=5,replica_count=2)) */
    
    -- Use DISTRIBUTED MAPJOIN and MAPJOIN together.
    /*+distmapjoin(a(shard_count=5,replica_count=2)),mapjoin(b)*/
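
As a rough sizing aid, the 200 MB to 500 MB per-shard guidance can be turned into a candidate range for shard_count. The sketch below is illustrative only: the 10 GB table size (taken as 10240 MB) and the column aliases min_shard_count and max_shard_count are assumptions, not values from this topic.

```sql
-- Assumption for illustration: the small table is about 10 GB (10240 MB).
-- Per-shard target from the shard_count note: 200 MB to 500 MB.
SELECT
    CEIL(10240 / 500.0)  AS min_shard_count,  -- fewest shards that keep each shard at or below 500 MB
    FLOOR(10240 / 200.0) AS max_shard_count;  -- most shards that keep each shard at or above 200 MB
```

For a 10240 MB table, the two expressions evaluate to 21 and 51. An odd value within that range, such as 31, would follow the odd-number convention for shard_count. Treat the result as a starting point and adjust it based on how the task actually behaves.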

Example

This example describes how to use DISTRIBUTED MAPJOIN when you insert data into the partitioned table tmall_dump_lasttable.
  • Statement before optimization
    insert overwrite table tmall_dump_lasttable partition (ds='20211130')
    select t1.*
    from
    (
        select nid, doc, type
        from search_ods.dump_lasttable where ds='20211203'
    )t1
    join
    (
        select distinct item_id
        from tbcdm.dim_tb_itm
        where ds='20211130'
        and bc_type='B'
        and is_online='Y'
    )t2
    on t1.nid=t2.item_id;
  • Statement after optimization
    insert overwrite table tmall_dump_lasttable partition (ds='20211130')
    select /*+ distmapjoin(t2(shard_count=35)) */ t1.*
    from
    (
        select nid, doc, type
        from search_ods.dump_lasttable where ds='20211203'
    )t1
    join
    (
        select distinct item_id
        from tbcdm.dim_tb_itm
        where ds='20211130'
        and bc_type='B'
        and is_online='Y'
    )t2
    on t1.nid=t2.item_id;
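
If shard nodes restart frequently while the optimized statement runs, one possible variation is to add a second replica, in line with the replica_count guidance. The sketch below reuses the statement above and changes only the hint. The value replica_count=2 is an illustrative assumption, not a setting taken from the original job.

```sql
-- Same statement as above; only the hint changes.
-- Parallelism of tasks = shard_count × replica_count = 35 × 2 = 70.
insert overwrite table tmall_dump_lasttable partition (ds='20211130')
select /*+ distmapjoin(t2(shard_count=35,replica_count=2)) */ t1.*
from
(
    select nid, doc, type
    from search_ods.dump_lasttable where ds='20211203'
)t1
join
(
    select distinct item_id
    from tbcdm.dim_tb_itm
    where ds='20211130'
    and bc_type='B'
    and is_online='Y'
)t2
on t1.nid=t2.item_id;
```

Doubling the replica count also doubles the parallelism of tasks, so this variation occupies more resources and is best run in a quota group that is large enough for it.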