edit-icon download-icon

Example of grid jobs

Last Updated: Mar 15, 2018

Introduction to grid jobs

Grid jobs are mainly used for scheduled jobs with a large amount of data to be processed or computed.

After a job is triggered, you can define your own business logic to split the job into multiple sub-jobs. These sub-jobs can be distributed to all the application machines in your application cluster and executed concurrently. To run these sub-jobs, the application machines must add SchedulerX-Client dependency in their properties file and be connected to the specific job group. This mechanism achieves quick processing of large jobs by using the processing capabilities of the cluster.

The grid jobs support custom sub-job distribution logic, route logic, and processing logic. They also support multi-level distribution of sub-jobs.

Features of grid jobs

  • Self-built client machine cluster: Job data can be transferred among these clients.

  • Job distribution load balancing: Detects the scale-in or scale-out of the client cluster intelligently.

  • Failover: If a client is down, the jobs assigned to this client but not completed yet are dynamically transferred to other client machines.

  • Job distribution routing: Routes can be configured to dynamically distribute jobs to specific machines. For more information, see Route distribution.

  • Support millions or even ten millions of jobs: Fore more information about large-scale sub-job scenarios, see Best practices.

  • Failure compensation: Sound mechanisms are established to compensate failures. When a job at the business layer fails, you can return a specific object to inform SchedulerX that the job needs to be re-executed.

  • Distribution-only mode: The client machine that distributes jobs does not execute the jobs that it distributes, unless only one client machine is available.

Implementation of grid jobs

Prerequisites:

  • To execute grid jobs, you must use SchedulerX-Client 2.1.3 or a later version.

  • The sub-job objects distributed by dispatchTaskList must implement the Java Serializable interface.

  • The size of a single sub-job after serialization cannot exceed 64 KB.

  1. Implement the job processor interface.

    1. import com.alibaba.dts.client.executor.grid.processor.GridJobContext;
    2. import com.alibaba.dts.client.executor.job.processor.GridJobProcessor;
    3. import com.alibaba.dts.common.constants.Constants;
    4. import com.alibaba.dts.common.domain.result.ProcessResult;
    5. import java.util.ArrayList;
    6. import java.util.List;
    7. public class HelloGridJobProcessor implements GridJobProcessor {
    8. @Override
    9. public ProcessResult process(GridJobContext context) throws Exception {
    10. String taskName = context.getTaskName();
    11. if (Constants.DEFAULT_ROOT_LEVEL_TASK_NAME.equals(taskName)) {
    12. context.dispatchTaskList(buildDataSmall(context), "first-level-task");
    13. return new ProcessResult(true);
    14. } else if ("first-level-task".equals(taskName)) {
    15. System.out.println("business process!")
    16. return new ProcessResult(true);
    17. }
    18. return new ProcessResult(true);
    19. }
    20. private List<? extends Object> buildDataSmall(GridJobContext context) {
    21. int count = 1000000;
    22. List<Integer> data = new ArrayList<Integer>();
    23. for (int i = 0; i < count; i++) {
    24. data.add(i);
    25. }
    26. return data;
    27. }
    28. }
  2. On the Scheduler>Jobs page of the EDAS console, click Create Job in the upper-right corner, and select Grid Job for job type.

  3. Add SchedulerX-Client dependency to the application and configure the client.

    For more information about how to configure the client, see Quick start.

Rate limiting and throttling

Rate limiting aims to ensure smooth and stable distribution of jobs and prevent the accumulation of a large amount of sub-jobs from affecting the system performance.

Grid jobs support both global and job-level rate throttling. The global throttling works on all jobs in a specific group, while job throttling works on a specific job. Currently, the NONE and H2_COUNT policies are available for job-level throttling.

We do not recommend that you use global throttling. Instead, we recommend that you configure only one level of job distribution and set the H2_COUNT policy when the number of sub-jobs reaches one million.

The following is an example of throttling configuration:

  1. <bean id="schedulerxClient1" class="com.alibaba.dts.client.SchedulerxClient">
  2. <property name="groupId" value="1-3-3-3415"></property>
  3. <property name="flowControlStrategyMap">
  4. <map>
  5. <entry key="com.alibaba.schedulerx.grid.FibonacciGridJobProcessor" <!-- Replace the key with the fully qualified category name of the job to be throttled. -->
  6. value-ref="flowControlStrategyH2Count"></entry>
  7. </map>
  8. </property>
  9. </bean>
  10. <bean id="flowControlStrategyH2Count" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">
  11. <property name="staticField"
  12. value="com.alibaba.dts.client.executor.grid.flowcontrol.FlowControlStrategy.H2_COUNT"/>
  13. </bean>

Route distribution

Note: This is an optional configuration item. If it is not configured, RoundRobinRule is used by default.

Route distribution refers to the job distribution method of calculating the route conditions using code and sending jobs to the client machine for execution accordingly.

The RandomRule and RoundRobinRule (default) routing policies are implemented by default. You can use RoundRobinRule to extend your own routing policy. To do this, you only need to implement the com.alibaba.dts.client.route.RouteRule interface and use it to replace the RemoteMachine rule(JobContext jobContext, List machines) method.

  1. public class RandomRule implements RouteRule {
  2. /** Randomly generated number */
  3. private Random random = new Random();
  4. @Override
  5. public RemoteMachine rule(JobContext jobContext, List<RemoteMachine> machines) {
  6. if(null == machines || machines.isEmpty()) {
  7. return null;
  8. }
  9. //Generate a random subscript
  10. int index = random.nextInt() % machines.size();
  11. return machines.get(Math.abs(index));
  12. }
  13. }

Spring bean configuration

  1. <bean id="schedulerxClient" class="com.alibaba.dts.client.SchedulerxClient">
  2. <property name="groupId">
  3. <value>1-1-1-1111</value>
  4. </property>
  5. <property name="routeMap">
  6. <map>
  7. <entry key="com.taobao.spring.TestGridJobProcessor" value-ref="randomRule"></entry>
  8. </map>
  9. </property>
  10. </bean>
  11. <bean id="randomRule" class="com.taobao.xxx.RandomRule"></bean>

Best practices

  • We recommend that you enable rate limiting for large-scale jobs to ensure stablility. The rate limiting parameter is flowControlStrategy, and the default value is FlowControlStrategy.NONE.

  • Do not use complex and time-consuming computing operations for job distribution routing.

  • To enable the distribution-only mode, set dispatchOnly to true. The default value is false. When a large number of sub-jobs, for example, more than one million sub-jobs, are to be distributed, we recommend that you enable the distribution-only mode to ensure the distribution efficiency. This mode is invalid when only one client machine exists.

  • When a large number of sub-jobs exist or a job contains large sub-jobs, we recommend that you evaluate the size of memory occupied by all sub-jobs in next distribution and set a proper number of sub-jobs to be distributed to prevent memory overflow. A proper configuration is that the occupied memory size accounts for less than 20% of the maximum heap memory size. If your sub-jobs require only a small amount of memory, skip this configuration.

Advanced parameter configuration - Number of threads consumed by the client

By default, the thread number of a standalone machine when executing sub-jobs is 5. You can modify the number of threads in the following configuration. key indicates the full name of the JobProcessor implementation class, and value indicates the number of threads.

  1. <bean id="dtsClientA" class="com.alibaba.dts.client.DtsClient">
  2. <property name="groupId">
  3. <value>1-3-3-2395</value>
  4. </property>
  5. <property name="consumerThreadsMap">
  6. <map>
  7. <entry key="com.taobao.spring.TestParallelJobProcessor"> <!--key indicates the full name of the JobProcessor implementation class.-->
  8. <value>64</value> <!--value indicates the value of consumerThreads, which is the number of threads in standalone mode.-->
  9. </entry>
  10. </map>
  11. </property>
  12. </bean>
Thank you! We've received your feedback.