Elizabeth
Engineer

Analysis on Data Partitioning and Routing Mechanism of Apache Geode/GemFire

Posted time: Sep 6, 2016 11:39 AM
This article mainly explains how Apache Geode/GemFire implements data partitioning.
Like most distributed systems, GemFire partitions data by hashing, that is, it distributes Entry data across a PartitionedRegion (PR). Entry data is stored mainly in a ConcurrentHashMap, and each ConcurrentHashMap is stored in a Bucket. After a PR server starts, the PartitionedRegion therefore creates the Buckets that hold these ConcurrentHashMaps. The mapping relationship among them is as follows:
Entry —> ConcurrentHashMap —> Bucket —> Region
How to implement data partitioning
After the front-end application has processed it, an Entry is partitioned and distributed to the PR servers through the following steps.
1. When the Entry is inserted through a server node of the distributed cluster, it is put into a Bucket created by the PR.
2. The Put operation generates an EntryOperation event, from which the PartitionedRegion and the Key of the Put can be obtained. The routing object is generally a POJO class.
3. The Key is read from the EntryOperation, and the corresponding routing object is obtained via that Key.
4. The bucketId is then computed from the Key or routing object: hashCode() is called on it to produce an integer hash value, and that value is taken modulo the total number of buckets of the PR to obtain the bucketId.
   * For a more even distribution of hash keys, it is recommended to generate IDs with MD5, SHA, or a similar method.
See the getHashKey method in PartitionedRegionHelper:
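As an illustration of the recommendation above, here is a minimal sketch of deriving an MD5-based ID from a business identifier using only the JDK. The class name DigestKeySketch and the method md5Key are hypothetical and are not part of Geode/GemFire. The sketch only shows one way such an ID could be generated before it is used as an entry key.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of Geode/GemFire.
final class DigestKeySketch {

    // Returns the MD5 digest of businessId as a 32-character lowercase hex string.
    static String md5Key(String businessId) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(businessId.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to 0..255 so negative bytes print as two hex digits.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5, so this is unexpected.
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        // Example identifiers are made up for illustration.
        for (String id : new String[] {"order-1", "order-2", "order-3"}) {
            System.out.println(id + " -> " + md5Key(id));
        }
    }
}
```

The resulting string can serve as the Key of the Entry; its hashCode() is then what the bucket computation in step 4 operates on.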
private static int getHashKey(EntryOperation event, PartitionedRegion pr,
      Operation operation, Object key, Object value, Object callbackArgument) {
    // avoid creating EntryOperation if there is no resolver
    if (event != null) {
      pr = (PartitionedRegion)event.getRegion();
      key = event.getKey();
      callbackArgument = event.getCallbackArgument();
    }

    PartitionResolver resolver = getResolver(pr, key, callbackArgument);
    Object resolveKey = null;
    if (pr.isFixedPartitionedRegion()) {
      String partition = null ;
      if (resolver instanceof FixedPartitionResolver) {
        Map<String, Integer[]> partitionMap = pr.getPartitionsMap();
        if (event == null) {
          event = new EntryOperationImpl(pr, operation, key, value,
              callbackArgument);
        }
        partition = ((FixedPartitionResolver)resolver).getPartitionName(
            event, partitionMap.keySet());
        if (partition == null) {
          Object[] prms = new Object[] { pr.getName(), resolver };
          throw new IllegalStateException(
              LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));
        }
        Integer[] bucketArray = partitionMap.get(partition);
        if (bucketArray == null) {
          Object[] prms = new Object[] { pr.getName(), partition };
          throw new PartitionNotAvailableException(
              LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_1_IS_NOT_AVAILABLE_ON_ANY_DATASTORE.toLocalizedString(prms));
        }
        int numBukets = bucketArray[1];
        resolveKey = (numBukets == 1) ? partition : resolver.getRoutingObject(event);
      }
      else if (resolver == null) {
        throw new IllegalStateException(
            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_RESOLVER_IS_NOT_AVAILABLE.toString(pr.getName()));
      }
      else if (!(resolver instanceof FixedPartitionResolver)) {
        Object[] prms = new Object[] { pr.getName(), resolver };
        throw new IllegalStateException(
            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_RESOLVER_DEFINED_1_IS_NOT_AN_INSTANCE_OF_FIXEDPARTITIONRESOLVER.toLocalizedString(prms));
      }
      return assignFixedBucketId(pr, partition, resolveKey);
    }
    else {
      // Calculate resolveKey.
      if (resolver == null) {
        // no custom partitioning at all
        resolveKey = key;
        if (resolveKey == null) {
          throw new IllegalStateException("attempting to hash null");
        }
      }
      else {
        if (event == null) {
          event = new EntryOperationImpl(pr, operation, key, value,
              callbackArgument);
        }
        // Get the routing object from the EntryOperation; it becomes the resolveKey that is hashed below.
        resolveKey = resolver.getRoutingObject(event);
        if (resolveKey == null) {
          throw new IllegalStateException(
              LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());
        }
      }
      // Finally, calculate the hash.
      return getHashKey(pr, resolveKey);
    }
  }
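
The listing ends by calling getHashKey(pr, resolveKey), which is not shown. The following is a minimal, self-contained sketch of the non-fixed branch, assuming that the final step is the hashCode() modulo bucket-count computation described in step 4. The class BucketRoutingSketch, its methods, and the use of java.util.function.Function as a stand-in for a PartitionResolver are all hypothetical; this is not the Geode source.

```java
import java.util.function.Function;

// Hypothetical stand-in for the routing logic, not the Geode implementation.
final class BucketRoutingSketch {

    // Maps a routing object to a bucket id in [0, totalNumBuckets).
    static int bucketIdFor(Object routingObject, int totalNumBuckets) {
        if (routingObject == null) {
            throw new IllegalStateException("attempting to hash null");
        }
        if (totalNumBuckets <= 0) {
            throw new IllegalArgumentException("totalNumBuckets must be positive");
        }
        // hashCode() may be negative, so take the absolute value of the remainder.
        return Math.abs(routingObject.hashCode() % totalNumBuckets);
    }

    // resolver plays the role of a PartitionResolver; null means no custom partitioning.
    static int route(Object key, Function<Object, Object> resolver, int totalNumBuckets) {
        Object resolveKey = (resolver == null) ? key : resolver.apply(key);
        if (resolveKey == null) {
            throw new IllegalStateException("routing object returned by resolver is null");
        }
        return bucketIdFor(resolveKey, totalNumBuckets);
    }

    public static void main(String[] args) {
        int totalNumBuckets = 113; // example value chosen for this sketch
        // Assumed key layout "customerId|orderId"; the resolver routes by customer id only.
        Function<Object, Object> byCustomer = k -> k.toString().split("\\|")[0];
        for (String key : new String[] {"c1|o1", "c1|o2", "c2|o1"}) {
            System.out.println(key + " -> bucket " + route(key, byCustomer, totalNumBuckets));
        }
    }
}
```

With a resolver like byCustomer, all keys that share a customer id hash to the same bucket, which is the usual reason for supplying a custom routing object instead of hashing the key itself.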


How to implement data-aware routing
GemFire provides a Function Service module that lets a client submit tasks to be processed on the server nodes. If the data spans partitions on multiple nodes, GemFire transparently routes the execution to the nodes that hold the data, so that data does not have to move across the network. This is called "data-aware function routing". Applications that use data-aware function routing do not need to manage data placement themselves.
GemFire routes the execution logic, rather than the data itself, directly to the nodes, where it runs in parallel and the results are aggregated. This can greatly reduce the time needed to perform complex tasks. The distributed, parallel processing is abstracted away and is transparent to the calling application.
A Function can be executed on a single node, on a small subset of the cluster, or in parallel across the entire distributed cluster.
GemFire's parallel model is very similar to Google's MapReduce model. Data-aware routing is best suited to iterative queries and to aggregation over data entries. Processing the data in parallel significantly increases system throughput. Most importantly, computation latency is roughly inversely proportional to the number of nodes computing in parallel.
Executing a Function on a single node is similar to executing a stored procedure in a relational database.
After the parallel computation, each node sends its results to the Function's result collector, and the ResultCollector is invoked to gather the processed results in one place. It is equivalent to the output collector in the MapReduce model.
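
The execute-where-the-data-lives pattern described above can be simulated with plain JDK classes. The sketch below is not the GemFire Function Service API: the class DataAwareSketch, its methods, and the representation of each node's partition as a Map are assumptions made for illustration. A thread pool stands in for the server nodes, and a list stands in for the ResultCollector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical simulation of data-aware function routing, not the GemFire API.
final class DataAwareSketch {

    // Runs the function once per partition in parallel and collects the partial results.
    static <R> List<R> executeOnPartitions(List<Map<String, Integer>> partitions,
                                           Function<Map<String, Integer>, R> function) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            List<Future<R>> pending = new ArrayList<>();
            for (Map<String, Integer> partition : partitions) {
                // The function travels to the partition; the partition's data stays put.
                Callable<R> task = () -> function.apply(partition);
                pending.add(pool.submit(task));
            }
            // This list plays the role of the result collector.
            List<R> collected = new ArrayList<>();
            for (Future<R> future : pending) {
                collected.add(future.get());
            }
            return collected;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while collecting results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("function failed on a partition", e);
        } finally {
            pool.shutdown();
        }
    }

    // Map step: sum the values of each partition locally. Reduce step: add the partial sums.
    static int sumAll(List<Map<String, Integer>> partitions) {
        List<Integer> partials = executeOnPartitions(partitions, p -> {
            int sum = 0;
            for (int v : p.values()) {
                sum += v;
            }
            return sum;
        });
        int total = 0;
        for (int partial : partials) {
            total += partial;
        }
        return total;
    }
}
```

Only one partial sum per partition crosses the boundary between the simulated nodes and the caller, which mirrors why routing the function rather than the data avoids moving data across the network.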