An Enterprise-level Distributed Batch Processing Solution Based on Task Scheduling

Background

Let's start with what distributed batch processing means. Literally, it refers to a large volume of business data that applications must compute and process in batches. Running such work in stand-alone mode takes a long time and cannot fully use the processing capacity of the other application nodes in the business cluster. A sound distributed batch processing scheme lets all business application nodes in the cluster cooperate to complete a large data processing task, improving overall processing efficiency and reliability.

Batch Model

In a simple stand-alone scenario, multiple threads can be started to process one large task concurrently; across multiple machines, the same task can be processed in parallel by several machines at once. A distributed batch processing scheme should therefore shield developers, at the code level, from the distributed coordination logic among the nodes of the business application cluster (task splitting, distribution, parallel execution, result aggregation, fault tolerance, and dynamic scale-out), so that users only need to focus on the business sharding rules and the business processing logic described in the red box above.
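The split / parallel-execute / aggregate flow described above can be sketched with plain JDK threads. This is only a single-process illustration of the model, not any framework's API; the data set, subtask size, and summing logic are illustrative stand-ins:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchModelSketch {

    // Task splitting: cut the full data set into fixed-size subtasks.
    static List<List<Integer>> split(List<Integer> data, int subtaskSize) {
        List<List<Integer>> subtasks = new ArrayList<>();
        for (int i = 0; i < data.size(); i += subtaskSize) {
            subtasks.add(data.subList(i, Math.min(i + subtaskSize, data.size())));
        }
        return subtasks;
    }

    // Distribution + parallel execution + aggregation, on one machine:
    // each subtask is summed on a pool thread, then the partial results are reduced.
    static long run(List<Integer> data, int subtaskSize, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (List<Integer> subtask : split(data, subtaskSize)) {
                futures.add(pool.submit(
                        () -> subtask.stream().mapToLong(Integer::longValue).sum()));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get();  // aggregate the subtask results
            }
            return total;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

A distributed scheme replaces the thread pool with cluster nodes, but the split/execute/aggregate shape stays the same.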

Big Data Batch Processing Comparison

In big data processing scenarios, the MapReduce model is also used, and its processing logic is essentially consistent with the business batch processing logic discussed here. In a big data scenario, batch processing is oriented toward the data itself, and a dedicated big data platform cluster must be deployed to support data storage and batch computation; the main purpose there is to build a complete data platform. In contrast, this article focuses on distributed business batch processing: building distributed batch processing logic on top of an existing business application service cluster. The following requirements can be addressed by such a solution:

• Decouple time-consuming business logic so that core-link business processing stays responsive

• Schedule all application nodes of the business cluster to cooperate in completing business processing in batches

• Unlike big data processing, subtask processing can also call other online business services as part of the batch process

Open source batch processing scheme

ElasticJob

ElasticJob is a distributed task scheduling framework. Its main characteristics are scheduled scheduling based on Quartz, plus the ability to shard tasks and coordinate them across a business cluster. The whole architecture is built on ZooKeeper to achieve sharded task execution, dynamic elastic scheduling of the application cluster, and high availability of subtask execution. The sharded scheduling model can distribute the processing of massive business data evenly across the nodes of the business cluster, effectively improving task processing efficiency.

• SimpleJob

A Spring Boot project can define tasks through YAML configuration, specifying the task implementation class, the scheduled scheduling cycle, and the sharding information.
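A minimal sketch of such a YAML configuration, assuming the ElasticJob 3.x Spring Boot starter; the ZooKeeper address, namespace, cron expression, and shard parameters below are placeholder values:

```yaml
elasticjob:
  regCenter:
    serverLists: localhost:2181        # ZooKeeper registry (placeholder address)
    namespace: elasticjob-demo
  jobs:
    springBootSimpleJob:
      elasticJobClass: org.example.job.SpringBootSimpleJob
      cron: 0 0 1 * * ?                # scheduled scheduling cycle
      shardingTotalCount: 3            # three shards, matching the three-node cluster
      shardingItemParameters: 0=shardA,1=shardB,2=shardC
```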

The configured org.example.job.SpringBootSimpleJob class needs to implement the execute method of the SimpleJob interface, obtaining its shard of the business data through the ShardingContext parameter and processing it.
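A sketch of such an implementation. The ShardingContext shape below is a minimal stand-in mirroring ElasticJob's API so the example is self-contained (in a real project it comes from the ElasticJob SDK), and the modulo sharding rule and row ids are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class SimpleJobSketch {

    // Stand-in for ElasticJob's ShardingContext: which shard this node received.
    static class ShardingContext {
        final int shardingItem;
        final int shardingTotalCount;
        ShardingContext(int item, int total) { shardingItem = item; shardingTotalCount = total; }
    }

    // Mirrors SimpleJob#execute: pull this shard's slice of the data and process it.
    public void execute(ShardingContext ctx) {
        for (int id : loadShard(ctx.shardingItem, ctx.shardingTotalCount)) {
            process(id);
        }
    }

    // Illustrative sharding rule: this node handles rows where id % total == item.
    static List<Integer> loadShard(int item, int total) {
        List<Integer> rows = new ArrayList<>();
        for (int id = 0; id < 9; id++) {          // pretend these are business row ids
            if (id % total == item) rows.add(id);
        }
        return rows;
    }

    void process(int id) { /* per-row business logic goes here */ }
}
```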

We deploy three application instances as a scheduling and processing cluster for the task above. When the task is triggered, ElasticJob distributes the three shard tasks across the three application instances to complete the processing of the entire task's data.

• DataflowJob

Structurally, DataflowJob is not essentially different from SimpleJob. As the interface below shows, compared with SimpleJob it adds a fetchData method through which the business side loads the data to be processed; in effect, it splits SimpleJob's execute method into two logical steps. The one real difference is that DataflowJob offers a resident data processing mode (the so-called streaming process), which keeps the task running until fetchData returns empty.

Add props: streaming.process=true to the YAML configuration of a DataflowJob task to enable the streaming process mode. After the task is triggered, each shard task loops through fetchData -> processData -> fetchData until fetchData returns empty. Two typical scenarios for this mode:

• A single shard has a large backlog of pending data: each fetchData call reads one page of the shard's data, which is then processed, until all the data has been handled

• Pending data for a shard is continuously generated: the task keeps acquiring data through fetchData, so it stays resident and continuously processes business data
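The streaming loop for one shard can be sketched as follows. The page size, in-memory pending queue, and integer payloads are illustrative stand-ins, not the DataflowJob API itself:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class StreamingProcessSketch {
    private static final int PAGE_SIZE = 3;          // illustrative paging size

    private final Deque<Integer> pending = new ArrayDeque<>();
    private final List<Integer> processed = new ArrayList<>();

    StreamingProcessSketch(List<Integer> shardData) { pending.addAll(shardData); }

    // fetchData: read one page of this shard's pending data.
    List<Integer> fetchData() {
        List<Integer> page = new ArrayList<>();
        while (page.size() < PAGE_SIZE && !pending.isEmpty()) {
            page.add(pending.poll());
        }
        return page;
    }

    // processData: business handling for one page.
    void processData(List<Integer> page) { processed.addAll(page); }

    // Streaming mode: fetchData -> processData -> fetchData until empty.
    List<Integer> runUntilDrained() {
        List<Integer> page = fetchData();
        while (!page.isEmpty()) {
            processData(page);
            page = fetchData();
        }
        return processed;
    }
}
```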

• Characteristic analysis

ElasticJob's distributed sharded scheduling model offers great convenience for common, simple batch processing scenarios, and it solves the whole coordination process of sharded distributed execution for large batches of business data. However, it may fall short in the following aspects:

• The core of the whole architecture depends on the stability of ZooKeeper

• ZooKeeper requires additional O&M deployment and a high-availability setup

• Task storage, triggering, and the running process all rely on ZooKeeper; with a large task volume, the ZooKeeper cluster easily becomes the scheduling performance bottleneck

• The number of shards is fixed in the configuration; dynamic sharding is not supported

• When the amount of pending data varies greatly between shards, the processing load balance of the cluster is easily broken

• If the shards are defined unreasonably, cluster elasticity is lost once the cluster size far exceeds the number of shards

• Shard definitions are separate from the business logic, and the mapping between the two is hard to maintain by hand

• Weak console capabilities

Spring Batch Batch Framework

The Spring Batch framework provides lightweight and sophisticated batch processing capabilities. It mainly offers two modes: single-process multithreading and distributed multi-process processing. In the single-process multithreaded mode, a user defines a Job as the unit of a batch task; a Job is composed of one or more Steps in series or in parallel, and each Step consists of a reader, a processor, and a writer that complete the reading, processing, and output of that step. The following discussion focuses on the scenario where a Job contains only one Step.

In my view, Spring Batch's single-process multithreading has limited practical value: adopting the framework just to process small batches of data takes some effort, and you could simply use a thread pool instead. This discussion therefore focuses on the scenario where a business cluster of some scale cooperates to complete batch processing of business data. Spring Batch provides remote chunking/partitioning capabilities: within a Job's Step, a task can be divided into multiple subtasks according to specific rules and distributed to other workers in the cluster, achieving distributed parallel batch processing. Its remote interaction typically relies on third-party messaging middleware for subtask distribution and result aggregation.

• Remote Chunking

Remote chunking is the distributed scheme Spring Batch provides for processing large batches of data. In a single Step, the ItemReader loads data and builds multiple chunks, and the ItemWriter distributes these chunks to cluster nodes through messaging middleware or other channels; the cluster application nodes then perform the business processing for each chunk.

Remote Chunking Example

On the master node above, the ItemReader and ItemWriter map to the task splitting and distribution phase of the batch model discussed earlier. For the ItemWriter, the master can use the ChunkMessageChannelItemWriter provided by Spring Batch Integration, which completes batch data loading and chunk distribution through the channels provided by Spring Integration (such as AMQP and JMS).

The slave nodes perform the business logic and output the results for the distributed chunk data (which can be understood as subtasks). On the subtask processing side, it is therefore necessary to configure the ChunkProcessorChunkHandler provided by Spring Batch Integration to handle subtask reception, the actual business processing, and the feedback of processing results.
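The master/worker interplay of remote chunking can be sketched with an in-memory queue standing in for the messaging middleware; the chunk size and the uppercase "business processing" are illustrative, and this is a conceptual model rather than the Spring Batch API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RemoteChunkingSketch {
    // Stand-in for the messaging middleware channel (e.g. an AMQP/JMS queue).
    static final BlockingQueue<List<String>> channel = new LinkedBlockingQueue<>();

    // Master side: the reader loads items, and the writer publishes them in chunks.
    static void master(List<String> items, int chunkSize) {
        for (int i = 0; i < items.size(); i += chunkSize) {
            channel.add(new ArrayList<>(items.subList(i, Math.min(i + chunkSize, items.size()))));
        }
    }

    // Worker side: receive one chunk and process each item (here, uppercase it).
    static List<String> worker() {
        List<String> out = new ArrayList<>();
        List<String> chunk = channel.poll();
        if (chunk != null) {
            for (String item : chunk) out.add(item.toUpperCase());
        }
        return out;
    }
}
```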

• Remote Partitioning

The main difference between remote partitioning and remote chunking is that the master node does not load the data. Instead, the current Step is split by a Partitioner into multiple sub-steps (i.e., subtasks), which a PartitionHandler then distributes to the slave nodes for processing. Spring Batch Integration provides a MessageChannelPartitionHandler for this distribution, and its underlying layer likewise relies on messaging middleware for the integration. Each slave node reads the context information of its subtask's Step and, based on it, performs the complete ItemReader, ItemProcessor, and ItemWriter processing.
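The core of a Partitioner is a pure splitting rule. A common pattern is to cut a key range into contiguous sub-ranges, one per sub-step; the sketch below illustrates this with each {start, end} pair playing the role of one partition's context (bounds and grid size are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionerSketch {

    // Split the id range [min, max] into gridSize contiguous sub-ranges,
    // the way a Partitioner would produce one ExecutionContext per sub-step.
    static List<long[]> partition(long min, long max, int gridSize) {
        List<long[]> ranges = new ArrayList<>();
        long size = (max - min + 1 + gridSize - 1) / gridSize;  // ceiling division
        for (long start = min; start <= max; start += size) {
            ranges.add(new long[] { start, Math.min(start + size - 1, max) });
        }
        return ranges;
    }
}
```

Each slave then runs its full reader/processor/writer pipeline over the ids in the range it receives.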

• Characteristic analysis

A comprehensive feature analysis of the Spring Batch framework:

• Complete batch processing capabilities: supports single-machine multithreading and distributed multi-process cooperative batch processing, with customizable partitioning models.

• No scheduling support: there is no native scheduled-scheduling capability, so a third-party scheduling framework must be integrated (e.g., Spring Task, in which case repeated triggering across the cluster must be resolved by the user).

• Weak console capability: Spring Batch tasks are usually configured through code or files; a console has to be built separately, and control capability is weak.

• High integration difficulty: its distributed batch processing capability requires integrating additional third-party middleware, or self-development based on its interfaces; achieving enterprise-level usage from the officially provided building blocks requires relatively complex planning and integration.

Enterprise-level batch processing solution: SchedulerX visual MapReduce task

The SchedulerX task scheduling platform provides a complete solution for enterprise-level batch processing needs. By using the public cloud platform service directly, users can easily equip a business application cluster with distributed batch processing capabilities (business applications deployed outside Alibaba Cloud can also be connected), with no additional middleware to deploy, integrate, or maintain.

Principle analysis

In this solution, the task scheduling platform provides comprehensive visual control, highly reliable scheduled scheduling, and visual query capabilities for registered tasks. By integrating the SchedulerX SDK on the business application side, users gain distributed batch processing capability quickly; they only need to care about the subtask splitting rules and the processing logic of each subtask in the batch model. This distributed batch processing has the following features:

• Subtask high availability: when an execution node in the cluster goes down, automatic failover redistributes the subtasks on the offline machine to other nodes

• Automatic elastic scale-out: newly deployed application nodes in the cluster automatically participate in the execution of subsequent tasks

• Visualization capabilities: monitoring, O&M operations, and business log queries are provided for the execution of tasks and subtasks

The general workflow is as follows:

• After a MapReduce task is created on the platform, the scheduled scheduling service enables highly reliable timed triggering for it

• When a MapReduce task is triggered, the scheduling service selects one of the connected business worker nodes as the master node for this run

• The master node runs the user-defined subtask splitting and loading logic, and distributes subtask processing requests evenly to the other worker nodes in the cluster through the map method call

• The master node monitors the processing of the whole distributed batch task, as well as the health of each worker node, to ensure overall operation and high availability

• On receiving subtask processing requests, the other worker nodes call back and execute the user-defined business logic until every subtask is handled; the number of threads with which a single application node processes subtasks in parallel is configurable

• After all subtasks are completed, the master node aggregates their execution results, calls back the reduce method, and reports the results of this run to the scheduling platform

Developers only need to extend the MapReduceJobProcessor abstract class in the business application: in the isRootTask branch, load the list of business subtask objects to be processed this time; in a non-root request, obtain the individual subtask object via jobContext.getTask() and run the business logic against it. Once the application is deployed to the cluster nodes, every node participates in coordinating the execution of the whole distributed batch task whenever the task is triggered, until it completes.
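The root/non-root control flow can be sketched as below. The JobContext and SubTask shapes are minimal stand-ins loosely mirroring the SchedulerX SDK so the example is self-contained, and the in-process "distribution" loop simulates what the scheduler actually does remotely; vendor names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class MapReduceSketch {
    // Stand-in for the subtask object carried in the job context.
    static class SubTask { final String bizId; SubTask(String bizId) { this.bizId = bizId; } }

    // Stand-in job context: a null task means this is the root request.
    static class JobContext {
        final SubTask task;
        JobContext(SubTask task) { this.task = task; }
        SubTask getTask() { return task; }
        boolean isRootTask() { return task == null; }
    }

    final List<String> processed = new ArrayList<>();

    // Mirrors the process() callback of a MapReduceJobProcessor subclass.
    String process(JobContext ctx) {
        if (ctx.isRootTask()) {
            // Root request: load and split the pending business data into subtasks.
            List<SubTask> subTasks = new ArrayList<>();
            for (String id : List.of("vendorA", "vendorB", "vendorC")) {
                subTasks.add(new SubTask(id));
            }
            // In SchedulerX this distribution happens remotely via map(...);
            // here it is simulated with local calls.
            for (SubTask t : subTasks) {
                process(new JobContext(t));
            }
            return "mapped " + subTasks.size() + " subtasks";
        }
        // Non-root request: handle one subtask's business logic.
        processed.add(ctx.getTask().bizId);
        return "done " + ctx.getTask().bizId;
    }
}
```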

Functional advantages

• Subtask visualization capabilities

User dashboard: provides visual records of the triggering and running of all tasks.

Visual subtask details: by drilling into a task execution record, you can see the execution status and executing node of each subtask.

• Subtask business logs

Click "Log" in the subtask list to view the logging produced while the current subtask was being processed.

• Execution stack view

When a subtask gets stuck during processing and does not complete, the execution stack view makes it easy to inspect the stack of the corresponding execution thread for troubleshooting.

• Custom business tags

The subtask business label capability gives users a quick, visual way to view and query subtask business information. In the figure below, "Account Name" is the business label attached when this subtask was split off. Based on it, users can quickly see the processing status of the corresponding business subtask, and they can also query the subtask processing status for a given business label.

To configure custom labels for subtasks, the subtask objects distributed in the map call only need to implement the BizSubTask interface and its labelMap method, adding business feature labels to each subtask for visual queries.
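A sketch of such a labeled subtask. The BizSubTask interface below is a stand-in mirroring the SchedulerX SDK shape described above (in a real project it comes from the SDK), and the "Account Name" label matches the example in the figure:

```java
import java.util.Map;

public class LabeledSubTaskSketch {
    // Stand-in for SchedulerX's BizSubTask interface.
    interface BizSubTask {
        Map<String, String> labelMap();
    }

    // A subtask object distributed in the map call, carrying its business label.
    static class AccountTask implements BizSubTask {
        final String accountName;
        AccountTask(String accountName) { this.accountName = accountName; }

        // Labels shown in the console's subtask list for quick business lookup.
        @Override
        public Map<String, String> labelMap() {
            return Map.of("Account Name", accountName);
        }
    }
}
```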

• Compatible with open source

SchedulerX supports running executors written for common open source frameworks, including XXL-Job and ElasticJob, and the scheduling platform also plans to support scheduling Spring Batch tasks.

Case scenario

The distributed batch processing model (the visual MapReduce model) covers a large number of demand scenarios in real enterprise applications. Common uses include:

• Batch parallel processing of sharded databases/tables: distribute the database/table shard information among cluster nodes for parallel processing

• Processing logistics order data by city and region: distribute the cities and regions as subtask objects among cluster nodes for parallel processing

• Using the subtask visualization of visual MapReduce: take key customer or order information as subtask processing objects for report processing or notification push, so that important subtasks can be tracked visually

• Fund sales business case

The following fund sales business case is provided for reference; with a distributed batch processing model, users can adapt it freely to their own scenarios. Case description: every day, investors' account and transaction application data is synchronized between fund companies and fund sales vendors (such as Ant Wealth), usually through file exchange. A fund company faces N independent vendors (and vice versa), and the data files provided by each vendor are completely independent. Each vendor's data file goes through several fixed steps, such as file verification, interface file parsing, data verification, and data import. For these fixed steps, fund companies are well suited to adopting distributed batch processing to speed up file handling: each vendor is distributed as a subtask object across the cluster, and every application node participates in parsing and processing the data files of the vendors assigned to it.

This case mainly applies parallel batch processing to one business step in fund transaction clearing; each subsequent processing step can be handled in a similar way. In addition, the visual MapReduce task nodes can be orchestrated into a DAG of dependencies to build a complete automated business clearing process.

Summary

The distributed task scheduling platform SchedulerX provides a comprehensive solution for enterprise-level distributed batch processing, giving users a fast, easy access mode along with scheduled scheduling, visual execution tracking, manageability, simple O&M, and a highly available scheduling service. It also comes with a set of enterprise-grade capabilities such as monitoring dashboards, log services, and monitoring alerts.
