In-depth analysis of data bus technology at the scale of hundreds of petabytes
Data Bus Introduction
As the traffic hub of a big data architecture, the data bus acts as a bridge between different big data components. Through the data bus, heterogeneous data generated by servers, K8s, apps, the web, and IoT/mobile devices can be ingested in real time and managed in a unified way, which decouples producers from downstream systems. Data cleaning, data distribution, real-time computing, offline computing, and other processing can then run asynchronously, and the resulting structured data can be delivered to downstream analysis and archiving systems, producing a clear data flow. In a broad sense, data collection and ingestion, the transmission link, the storage queue, consumption and computing, and delivery all belong to the data bus, which as a whole can be divided into an acquisition access layer, a pipeline layer, and a computing layer.
The data bus makes it easy to achieve the following:
• Decouple producers and consumers: consumers do not need to know any details about the writers, which reduces the complexity of connecting systems and improves reliability.
• Absorb traffic peaks: data production and data consumption become asynchronous, which smooths peaks and fills valleys.
• Define a unified format and operation semantics: ingest multiple kinds of heterogeneous data and convert them into a unified format through data processing.
To take a simple example, in a computational advertising retrieval system, ad click and impression data is very important. The same click and impression data is often consumed by multiple subscribers with different use cases: real-time computing services with second-level precision, as well as hourly or daily batch jobs such as those run on Hadoop. If each consumer connected directly to the data source, many abnormal scenarios would have to be handled, which would make the system extremely complex. A data bus greatly reduces this complexity and improves reliability: any subscribing system can resume processing from its previous breakpoint once it is back online, even after offline maintenance or downtime.
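The breakpoint-resume behavior described above can be sketched as follows. This is a minimal in-memory illustration, not how any particular data bus is implemented; the class `DataBus` and its methods `append`, `poll`, and `commit` are hypothetical names chosen for this example.

```python
from collections import defaultdict


class DataBus:
    """Toy append-only log with one committed offset per subscriber (hypothetical API)."""

    def __init__(self):
        self._log = []                     # records in arrival order
        self._offsets = defaultdict(int)   # subscriber -> next offset to read

    def append(self, record):
        self._log.append(record)

    def poll(self, subscriber, max_records=10):
        """Return records starting at the subscriber's last committed offset."""
        start = self._offsets[subscriber]
        return self._log[start:start + max_records]

    def commit(self, subscriber, count):
        """Advance the checkpoint after `count` records have been processed."""
        self._offsets[subscriber] += count


bus = DataBus()
for i in range(5):
    bus.append(f"click-{i}")

# The real-time subscriber processes two records, then goes offline.
first = bus.poll("realtime", max_records=2)
bus.commit("realtime", len(first))

# More data arrives while it is down; the batch subscriber is unaffected.
bus.append("click-5")
batch_view = bus.poll("hourly-batch", max_records=10)

# Back online: consumption resumes from the checkpoint, not from the start.
resumed = bus.poll("realtime", max_records=10)
```

Because each subscriber owns its offset, the producer never needs to know which consumers exist or whether they are currently online.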
Technical challenges in cloud native scenarios
With tens of billions of reads and writes every day, nearly one hundred petabytes of data traffic, and tens of thousands of users, building a highly available data bus is very challenging. Here are some typical traffic scenarios:
• Producers: because of business promotions and similar events, traffic rises to ten or even hundreds of times its normal level within a few minutes;
• Consumers: dozens of subscribers consume the same data at the same time;
• Hundreds of heterogeneous data sources are onboarded every day in different ways, requiring a large amount of adaptation work.
After decades of rapid development, development models, system architectures, deployment models, and infrastructure have all undergone several disruptive changes, which have made development and deployment more efficient. However, systems and network environments have become more complex, deployment models and runtime environments have become more dynamic and uncertain, the number of data sources and the data volume have increased significantly, traffic fluctuations and other uncertainties have grown, and ingestion has become harder and differs more from the original architecture. These are the new requirements for the data bus in the cloud native era.
To sum up, the technical challenges of the data bus in the cloud native era can be examined at three layers: the acquisition access layer, the pipeline layer, and the computing layer. The acquisition access layer focuses on the variety of supported data sources, ease of onboarding, resource cost, and collection reliability; the pipeline layer focuses on network quality, bandwidth and horizontal scalability, security and isolation, and ecosystem richness; the computing layer focuses on processing syntax, processing throughput, and scalability.
Selection and comparison of open source solutions
At present, the mainstream big data architecture in the industry can be divided into five parts, of which the first three constitute the data bus.
• Collection agent: collects observable data and performs part of the preprocessing. With the development of cloud native technology, collection agents also need to provide good support for K8s collection. Common collection agents include Filebeat, Fluentd/Fluent Bit, Telegraf, and our open-source iLogtail.
• Message queue: the collection agent usually does not send collected data directly to the storage system. Instead, it writes to a message queue, which smooths peaks and fills valleys so that a traffic peak does not bring down the storage system. Common message queues include Kafka and RabbitMQ.
• Computing: consumes data from the message queue and, after processing and aggregation, outputs it to the storage system. Flink and Logstash are common choices. It is worth noting that some companies have begun to deploy the open-source version of iLogtail as the computing layer in their log architecture.
• Storage and analysis engine: persistently stores the collected data and provides query and analysis capabilities. Common storage and analysis engines include Elasticsearch, ClickHouse, Loki, Prometheus, and InfluxDB.
• Visualization: Kibana and Grafana provide visualization of the collected data.
Users can build a data bus from open-source components for the collection agent, message queue, and computing parts. Although this is technically feasible, the overall implementation complexity is high, and multiple systems need to be maintained and coordinated. In addition, such a solution cannot fully meet the cloud native challenges described above. For example, network quality and regional planning are gaps that are difficult to close.
Overall architecture of data bus
An observability platform should not only solve the problem of how to obtain and query data, but also provide application capabilities for specific business scenarios, helping customers extract more value from fragmented, low-information data. A typical cloud native observability platform can be divided into four layers from bottom to top: data bus, storage and analysis, tools, and applications. The data bus is the data foundation of the entire observability platform and underpins data analysis and upper-layer business applications. Precisely because the data bus is so fundamental, it must be reliable and stable enough to keep data flowing smoothly, and flexible enough to handle traffic changes during enterprise digitalization. Next, we focus on the architecture and practice of the Alibaba Cloud SLS data bus.
As described in the first part of this article, a typical data bus can be divided into three layers: the acquisition access layer, the pipeline layer, and the computing layer. The architecture of the SLS data bus can be divided in the same way.
• Acquisition access layer: handles the ingestion of all data on the data bus (Log, Metric, Trace, Event, etc.) and is deeply integrated with Alibaba Cloud services. Ingestion is based on the SDK/API and extends to a variety of access methods, such as the observability collector iLogtail, the data import service, and open-source standard protocols.
• Pipeline layer: LogHub is the core traffic hub of the data bus and can completely replace Kafka. It provides external access through a RESTful API and real-time consumption through consumer groups.
• Computing layer: as a downstream service of LogHub, it provides various data processing and delivery services based on real-time consumer groups, and it integrates with the mainstream open-source stream computing ecosystem.
With this overall picture of the SLS data bus in place, the following sections describe data access, the traffic hub, and data processing in detail.
Data Access Technology Architecture and Practice
Overview of data access capability
The self-developed, open-source observable data collector iLogtail handles observable data collection in server and container scenarios, covering Windows and Linux operating systems and x86 and ARM architectures. Building on Alibaba Cloud, it seamlessly supports collection of logs, metrics, and security audit data from mainstream Alibaba Cloud services.
General protocols are also well supported: the data bus is compatible with open-source protocols such as Syslog, Kafka, Prometheus, JDBC, and OpenTelemetry, and it works with many open-source collection tools, such as Logstash, Fluentd, and Telegraf.
For big data, high-concurrency scenarios in Java, Go, and other languages, we provide Java Producer and Go Producer on top of the SDK; C Producer is provided for IoT/embedded devices.
Compared with sending data directly through the API or SDK, Producer provides a higher level of encapsulation and greatly improves performance and reliability.
• Thread safety: all methods and exposed interfaces of the Producer are thread-safe.
• Asynchronous sending: client computation is separated from I/O logic. Calling the Producer's send interface usually returns immediately. The Producer caches and merges outgoing data internally and then sends it in batches to improve throughput.
• Failure retry: for retriable exceptions, the Producer retries according to the maximum number of retries and the retry backoff time configured by the user.
• Graceful shutdown: when the user calls the close method, the Producer sends all cached data to prevent log loss.
• High performance: multithreading, caching strategies, and batch sending effectively improve sending efficiency.
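The Producer behaviors listed above (asynchronous sending, batching, retry with backoff, and graceful shutdown) can be sketched in a few lines. This is an illustrative toy, not the actual Java, Go, or C Producer: the class `BatchingProducer`, its parameters, and the `transport` callable are assumptions made for this example, and concurrency edge cases such as a `send` racing with `close` are ignored.

```python
import queue
import threading
import time


class BatchingProducer:
    """Toy producer: send() only enqueues; a worker thread batches and retries."""

    def __init__(self, transport, batch_size=3, max_retries=2, backoff_s=0.01):
        self._transport = transport      # callable(list_of_logs); raises on failure
        self._batch_size = batch_size
        self._max_retries = max_retries
        self._backoff_s = backoff_s
        self._queue = queue.Queue()      # thread-safe buffer between caller and I/O
        self._closed = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def send(self, log):
        """Returns immediately; the I/O happens on the worker thread."""
        if self._closed.is_set():
            raise RuntimeError("producer is closed")
        self._queue.put(log)

    def _run(self):
        batch = []
        while not (self._closed.is_set() and self._queue.empty()):
            try:
                batch.append(self._queue.get(timeout=0.01))
            except queue.Empty:
                pass
            # Flush when the batch is full or the buffer is momentarily empty.
            if len(batch) >= self._batch_size or (batch and self._queue.empty()):
                self._send_with_retry(batch)
                batch = []

    def _send_with_retry(self, batch):
        for attempt in range(self._max_retries + 1):
            try:
                self._transport(list(batch))
                return
            except ConnectionError:
                time.sleep(self._backoff_s * (2 ** attempt))  # exponential backoff
        # Retries exhausted: a real producer would report the failure to a callback.

    def close(self):
        """Graceful shutdown: stop accepting data, then wait for the buffer to drain."""
        self._closed.set()
        self._worker.join()


delivered, calls = [], {"n": 0}


def flaky_transport(batch):
    calls["n"] += 1
    if calls["n"] == 1:
        raise ConnectionError("simulated network error")
    delivered.extend(batch)


producer = BatchingProducer(flaky_transport)
for i in range(7):
    producer.send(f"log-{i}")
producer.close()
```

Using a single worker thread keeps delivery order equal to send order, at the cost of throughput; a production-grade producer would typically use several I/O threads and report per-batch results.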
Observable data collector iLogtail
iLogtail is an important traffic source for data bus ingestion. With its light weight, high performance, automatic configuration, and many other production-grade features, iLogtail can be deployed on physical machines, virtual machines, Kubernetes, and other environments to collect telemetry data. Its core positioning is as a collector of observable data: it helps developers build a unified data collection layer and helps observability platforms support various upper-layer application scenarios. iLogtail thus solves the data access problem of the data bus.
Thanks to years of refinement at Alibaba/Ant Group and in public cloud scenarios, iLogtail outperforms open-source agents (such as Fluentd, Logstash, and Beats) on hard indicators such as performance, resource consumption, reliability, multi-tenant isolation, and K8s support, and can meet the stringent requirements of many business scenarios. iLogtail was fully open-sourced on June 29, 2022, attracting the attention of many developers, and its GitHub star count exceeded 1K on November 7, 2022.
One of iLogtail's core advantages is its high performance and low overhead. On Linux, iLogtail uses inotify as its main file monitoring mechanism, which lets it discover new data with millisecond latency. To accommodate different operating systems and support special collection scenarios, iLogtail also uses polling for data discovery. By combining polling with event notification, iLogtail achieves a file discovery mechanism that is both fast and robust. In addition, iLogtail adopts a lock-free event processing model. Unlike other open-source agents, which allocate an independent thread or goroutine per configuration to read data, iLogtail uses a single thread to read data for all configurations. Because the bottleneck of data reading is the disk rather than the CPU, a single thread is sufficient to handle event processing and data reading for every configuration. Using a single thread lets event processing and data reading run without locks and with lighter data structures, which yields a better cost-performance ratio than multithreaded processing.
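The polling half of this discovery mechanism, together with the single-threaded read loop, can be illustrated roughly as follows. This sketch is not iLogtail's implementation: it uses only polling (no inotify), the class `PollingTailer` is hypothetical, and partially written lines are not handled.

```python
import os
import tempfile


class PollingTailer:
    """Single-threaded tailer: one loop serves every watched file, so no locks are needed."""

    def __init__(self, paths):
        self._offsets = {path: 0 for path in paths}   # path -> bytes already read

    def poll_once(self):
        """Check each file once and return the newly appended lines per path."""
        new_lines = {}
        for path, offset in self._offsets.items():
            try:
                size = os.stat(path).st_size
            except FileNotFoundError:
                continue                 # file not created yet; retry next round
            if size < offset:
                offset = 0               # file was truncated or rotated: start over
            if size > offset:
                with open(path, "rb") as f:
                    f.seek(offset)
                    data = f.read(size - offset)
                new_lines[path] = data.decode("utf-8").splitlines()
            self._offsets[path] = size
        return new_lines


workdir = tempfile.mkdtemp()
app_log = os.path.join(workdir, "app.log")
access_log = os.path.join(workdir, "access.log")
tailer = PollingTailer([app_log, access_log])

with open(app_log, "a") as f:
    f.write("started\n")
first_round = tailer.poll_once()

with open(app_log, "a") as f:
    f.write("ready\n")
with open(access_log, "a") as f:
    f.write("GET /\n")
second_round = tailer.poll_once()
```

In a hybrid design, an event source such as inotify would trigger reads immediately, and a periodic `poll_once` pass would act as the safety net for events that were missed or for platforms without event support.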
In production, it is normal for a single service to have hundreds of collection configurations. The priority, log generation rate, processing method, and upload destination of each configuration may differ. It is therefore necessary to isolate user-defined configurations from one another, so that the QoS of one collection configuration is not affected by anomalies in another. iLogtail combines several key techniques, including time-slice-based collection scheduling, multi-level feedback queues with high and low watermarks, non-blocking event processing, flow control and collection-pause policies, and dynamic configuration updates, into a multi-tenant isolation scheme with five properties: isolation, fairness, reliability, controllability, and cost-effectiveness. After years of Double 11 traffic peaks, this scheme has proved more stable and cost-effective than other open-source solutions.
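To make the high/low watermark feedback idea concrete, here is a minimal single-queue sketch. It is an assumption-laden illustration rather than iLogtail's actual queue: the class `WatermarkQueue` and its `accepting` flag are invented for this example, and the multi-level and time-slice aspects are left out.

```python
from collections import deque


class WatermarkQueue:
    """Bounded queue that pauses its upstream at the high mark and resumes at the low mark."""

    def __init__(self, low, high):
        if not 0 <= low < high:
            raise ValueError("require 0 <= low < high")
        self._items = deque()
        self._low, self._high = low, high
        self.accepting = True            # feedback signal read by the upstream reader

    def push(self, item):
        if not self.accepting:
            return False                 # upstream should stop reading this config for now
        self._items.append(item)
        if len(self._items) >= self._high:
            self.accepting = False       # high watermark reached: apply back-pressure
        return True

    def pop(self):
        item = self._items.popleft()
        if len(self._items) <= self._low:
            self.accepting = True        # drained to the low watermark: release back-pressure
        return item


q = WatermarkQueue(low=1, high=3)
results = [q.push(i) for i in range(4)]   # the fourth push is refused
q.pop()
still_paused = not q.accepting            # 2 items left, still above the low mark
q.pop()
resumed = q.accepting                     # 1 item left, at the low mark
```

The gap between the two watermarks provides hysteresis: the upstream is not toggled on and off for every single item, and one slow configuration only pauses its own reader instead of blocking the others.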
It is no exaggeration to say that diverse data sources are the lifeblood of the data bus; without them, it would be like trying to make bricks without straw. Through its plug-in design, iLogtail goes beyond simple file collection, extends its upstream and downstream ecosystem, and becomes a true observability collector. iLogtail already supports many data sources, covering Log, Metric, and Trace. Besides file collection, it supports standard protocols and sources such as HTTP, MySQL Binlog, Prometheus, SkyWalking, and Syslog. iLogtail also supports eBPF, which enables non-intrusive collection of network data. On the output side, the ecosystem is gradually expanding from SLS to Kafka, gRPC, and others. In the future, ClickHouse, Elasticsearch, and others will also be supported through joint development with the open-source community.
Facing many kinds of heterogeneous data, one responsibility of the data bus is to build a unified data format through data processing. iLogtail provides powerful processing capabilities that can normalize data formats, filter data, and correlate context at the collection end. iLogtail is designed as a pipeline: data is collected by an Input plug-in, processed by the Processors set in the collection configuration, packaged by an Aggregator plug-in, and finally sent to the storage system by a Flusher. The processing stage includes data splitting, field extraction, filtering, and data enrichment. All plug-ins can be combined freely.
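The Input, Processor, Aggregator, Flusher flow described above can be mimicked with chained Python generators. This is only a conceptual sketch: the function names, the regular expression, and the in-memory `sink` are assumptions for illustration and do not correspond to real iLogtail plug-in names or configuration keys.

```python
import re


def file_input(lines):
    """Input: turn raw lines into events."""
    for line in lines:
        yield {"content": line}


def regex_processor(events, pattern):
    """Processor: extract named fields; drop events that do not match."""
    compiled = re.compile(pattern)
    for event in events:
        match = compiled.match(event["content"])
        if match:
            yield match.groupdict()


def filter_processor(events, key, allowed):
    """Processor: keep only events whose field value is in `allowed`."""
    for event in events:
        if event.get(key) in allowed:
            yield event


def aggregator(events, batch_size):
    """Aggregator: pack events into batches for efficient sending."""
    batch = []
    for event in events:
        batch.append(event)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def flusher(batches, sink):
    """Flusher: deliver each batch to the destination (a list stands in for storage)."""
    for batch in batches:
        sink.append(batch)


raw = ["ERROR disk full", "INFO started", "WARN slow request", "garbage", "ERROR timeout"]
sink = []
events = file_input(raw)
events = regex_processor(events, r"(?P<level>[A-Z]+) (?P<message>.+)")
events = filter_processor(events, "level", {"ERROR", "WARN"})
flusher(aggregator(events, batch_size=2), sink)
```

Because every stage consumes and produces the same kind of object, stages can be added, removed, or reordered without touching the others, which is the property that lets plug-ins be combined freely.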
As cloud native technology has been adopted, iLogtail has come to fully support Kubernetes, and it currently supports the two mainstream container runtimes, Docker and containerd. By monitoring the container list in real time, maintaining the mapping between containers and log collection paths, and combining this with its efficient file collection, iLogtail provides an excellent container data collection experience. iLogtail supports filtering containers by container label, environment variable, K8s label, pod name, namespace, and other criteria, which makes it convenient to configure collection sources. It supports DaemonSet, Sidecar, CRD, and other deployment methods, providing flexibility for different scenarios. For users with high requirements for CI/CD automation and operations, iLogtail also provides K8s-native support: collection configurations can be managed through a CRD. In this scheme, a CustomResourceDefinition extension called AliyunLogConfig is added to K8s, and the alibaba-log-controller listens to AliyunLogConfig events and automatically creates iLogtail collection configurations to complete log collection.
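As a rough illustration of container filtering, the sketch below selects containers by include labels, exclude labels, and a namespace pattern. The function `matches`, its parameter names, and the container dictionaries are hypothetical and are not iLogtail's real configuration schema.

```python
import re


def matches(container, include_labels=None, exclude_labels=None, namespace_regex=None):
    """Return True if the container passes every configured filter."""
    labels = container.get("labels", {})
    # Every include label must be present with the expected value.
    for key, value in (include_labels or {}).items():
        if labels.get(key) != value:
            return False
    # Any matching exclude label rejects the container.
    for key, value in (exclude_labels or {}).items():
        if labels.get(key) == value:
            return False
    # The namespace, if constrained, must match the whole pattern.
    if namespace_regex and not re.fullmatch(namespace_regex, container.get("namespace", "")):
        return False
    return True


containers = [
    {"name": "web", "namespace": "prod", "labels": {"app": "nginx"}},
    {"name": "web-canary", "namespace": "prod", "labels": {"app": "nginx", "track": "canary"}},
    {"name": "web-dev", "namespace": "dev", "labels": {"app": "nginx"}},
]
selected = [
    c["name"]
    for c in containers
    if matches(
        c,
        include_labels={"app": "nginx"},
        exclude_labels={"track": "canary"},
        namespace_regex=r"prod",
    )
]
```

A collector would re-evaluate such a predicate whenever the container list changes and update the container-to-log-path mapping accordingly.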
Traffic Hub Technology Architecture and Practice
As the traffic hub of the SLS data bus, LogHub is a high-throughput data channel that supports real-time ingestion and consumption of massive observable data. In observable data scenarios, it can replace Kafka and other message queue products while offering better performance, ease of use, and stability.
LogHub can be understood as an append-only log queue that scales I/O and storage horizontally through multiple shards. In addition, a multi-level index is built on top of the queue, so that the position of each record in the queue can be located quickly, giving the queue model random-query capability.
• Elasticity: 1 to 512 shards are supported, and scaling out or in takes effect within milliseconds.
• High throughput: a single shard supports 5 MB/s writes and 10 MB/s reads.
• High fault tolerance: the failure of a single machine does not affect normal writes.
• Deduplication: supports exactly-once deduplication.
• Random query: at 1% extra storage cost, any record can be queried randomly (1 I/O).
Logstore/Metricstore is the unit of collection, storage, and query for SLS observable data. As the queue model of Logstore/Metricstore, LogHub provides real-time data writing and streaming consumption. The model is then extended on this basis to build a unified observability analysis engine.
• Index model: provides data filtering by creating indexes. At query time, matching data is located quickly through the index and efficiently read from the queue model.
• Raw columnar storage model: stores specific fields in columnar form to provide big data statistics and analysis capabilities.
• PK (time series) columnar storage model: tailored to time series data, it stores field columns sorted by primary key, which effectively improves the read efficiency of time series data.
Globalization support and intelligent acceleration
LogHub is part of the infrastructure of Alibaba Cloud. Thanks to Alibaba Cloud's global deployment, it is rolled out with priority in Alibaba Cloud regions and kept in step with them, an advantage that open-source data buses cannot match. This lets global businesses choose the nearest region and also meets the requirements of countries or organizations whose laws forbid the relevant data from leaving their borders.
LogHub and CDN jointly provide a global automatic upload acceleration scheme. Based on Alibaba Cloud CDN hardware resources (2,800+ nodes in 70+ countries), data from anywhere in the world is ingested at a nearby edge node and routed to LogHub through internal high-speed channels, greatly reducing network latency and jitter.
Elasticity and order keeping processing
In production, we often face traffic peaks and troughs, as well as cases where one partition (shard) receives very heavy traffic because of unbalanced mapping at the business layer. Elastic scaling (Merge/Split) is the mechanism that solves these problems.
The principle of shard splitting
Splitting a shard is an important way to expand traffic capacity: one readwrite shard is split into two readwrite shards. The original shard becomes readonly, and the data already in it can still be consumed and read.
The merge operation is the opposite of the split operation: it combines two adjacent readwrite shards into one readwrite shard, and the two original shards become readonly.
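The split and merge semantics can be modeled with shards that each own a contiguous hash-key range. The sketch below is an illustration under assumptions, not LogHub's implementation: the classes `Shard` and `ShardTable`, the 256-value key space, and the midpoint split are all choices made for this example.

```python
from dataclasses import dataclass


@dataclass
class Shard:
    shard_id: int
    begin: int                 # inclusive start of the hash-key range
    end: int                   # exclusive end of the hash-key range
    status: str = "readwrite"


class ShardTable:
    def __init__(self, key_space=256):
        self.shards = [Shard(0, 0, key_space)]

    def _add(self, begin, end):
        shard = Shard(len(self.shards), begin, end)
        self.shards.append(shard)
        return shard

    def split(self, shard_id):
        """Split one readwrite shard into two; the parent becomes readonly."""
        parent = self.shards[shard_id]
        if parent.status != "readwrite" or parent.end - parent.begin < 2:
            raise ValueError("shard cannot be split")
        mid = (parent.begin + parent.end) // 2
        parent.status = "readonly"
        return self._add(parent.begin, mid), self._add(mid, parent.end)

    def merge(self, left_id, right_id):
        """Merge two adjacent readwrite shards; both parents become readonly."""
        left, right = self.shards[left_id], self.shards[right_id]
        if left.end != right.begin or {left.status, right.status} != {"readwrite"}:
            raise ValueError("only adjacent readwrite shards can be merged")
        left.status = right.status = "readonly"
        return self._add(left.begin, right.end)

    def route(self, hash_key):
        """New writes go to the readwrite shard whose range covers the key."""
        for shard in self.shards:
            if shard.status == "readwrite" and shard.begin <= hash_key < shard.end:
                return shard.shard_id
        raise KeyError(hash_key)


table = ShardTable()
shard1, shard2 = table.split(0)
route_after_split = table.route(200)
shard3 = table.merge(shard1.shard_id, shard2.shard_id)
route_after_merge = table.route(200)
statuses = [s.status for s in table.shards]
```

Keeping the parent shards as readonly rather than deleting them is what allows consumers to finish reading old data while new writes are already routed to the new shards.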
Load balancing: scale capacity elastically with traffic peaks and troughs to control costs.
In the figure below, only Shard0 served traffic at first. Later, during the evening traffic peak, a single shard could no longer support the business traffic, so Shard0 was split into Shard1 and Shard2. When traffic stabilized again, the two shards were merged into a new Shard3 to save costs.
Order-preserving processing: map different instances to different partitions, and adjust the processing capacity of partitions.
Some business scenarios need to preserve order. When data is written, the data of different businesses is usually mapped to fixed shards through hash rules.
In the business scenario shown in the following figure, Shard1 carries the traffic of three DBs. The traffic can be balanced by splitting the shard. After the split, the original Shard1 becomes read-only: its data can still be consumed, but it no longer accepts new writes. The newly created Shard3 and Shard4 serve traffic instead.
How is the order of data at the shard boundary preserved before and after the adjustment? LogHub can consume shards sequentially. After a split, the data of the original shard (Shard1) is consumed first, and then the shards created by the split are consumed concurrently (the hash policy ensures that the same business always falls on one shard). Similarly, after a merge, the data of the original shards is consumed first, and then the data of the new merged shard. For scenarios that do not require strict ordering, sequential shard consumption can be turned off to increase the consumption rate, so that all shards are consumed at the same time.
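The rule that parent shards must be drained before their children can be expressed as a layered ordering over the shard lineage. The helper below is a hypothetical sketch (`consumption_waves` is not a LogHub API); each returned wave lists the shards that may be consumed concurrently once all earlier waves are finished.

```python
def consumption_waves(parents):
    """parents maps shard id -> list of parent shard ids (empty for original shards).

    Returns a list of waves; a shard appears only after all of its parents.
    """
    remaining = dict(parents)
    done, waves = set(), []
    while remaining:
        ready = {s for s, ps in remaining.items() if all(p in done for p in ps)}
        if not ready:
            raise ValueError("cycle or unknown parent in shard lineage")
        waves.append(sorted(ready))
        done |= ready
        for shard in ready:
            del remaining[shard]
    return waves


# Split: Shard1 was split into Shard3 and Shard4; Shard0 and Shard2 are untouched.
split_order = consumption_waves({0: [], 1: [], 2: [], 3: [1], 4: [1]})

# Merge: Shard1 and Shard2 were merged into Shard3.
merge_order = consumption_waves({1: [], 2: [], 3: [1, 2]})
```

Turning off sequential consumption corresponds to ignoring the lineage and placing every shard in a single wave, which trades ordering for throughput.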
Coarse grained flow control: Project level
The main purpose of project-level global flow control is to limit a user's overall resource usage and reject excess requests at the front end, so that traffic does not reach the back end and overwhelm the entire cluster.
• Every second, each of the thousands of Nginx front ends aggregates the traffic and request counts of the projects it has served and sends them to QuotaServer.
• QuotaServer aggregates the project statistics from all Nginx front ends, determines whether any project exceeds its quota, and decides whether to throttle it. It then notifies all Nginx front ends of the list of projects that are over quota.
• Once an Nginx front end receives the list of blocked projects, it immediately starts rejecting requests for those projects.
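The report, aggregate, and notify loop above can be sketched as follows. This is a simplified model with hypothetical classes (`QuotaServer`, `Frontend`) and byte-count quotas per reporting window; real front ends, transport, and quota rules are not represented.

```python
from collections import Counter


class QuotaServer:
    """Hypothetical central service that sums per-project traffic from all front ends."""

    def __init__(self, quotas):
        self._quotas = quotas            # project -> allowed bytes per reporting window
        self._usage = Counter()

    def report(self, stats):
        self._usage.update(stats)        # Counter.update adds the reported counts

    def close_window(self):
        """Return the projects over quota and reset usage for the next window."""
        over = {
            project
            for project, used in self._usage.items()
            if used > self._quotas.get(project, float("inf"))
        }
        self._usage.clear()
        return over


class Frontend:
    """Hypothetical front end: counts traffic locally and enforces the blocked list."""

    def __init__(self):
        self._stats = Counter()
        self.blocked = set()

    def handle(self, project, size):
        if project in self.blocked:
            return False                 # rejected at the edge, never reaches the back end
        self._stats[project] += size
        return True

    def drain_stats(self):
        stats, self._stats = self._stats, Counter()
        return stats


quota_server = QuotaServer({"ads": 100, "search": 100})
frontends = [Frontend(), Frontend()]
frontends[0].handle("ads", 60)
frontends[1].handle("ads", 70)           # neither front end alone sees a violation
frontends[0].handle("search", 10)

for fe in frontends:
    quota_server.report(fe.drain_stats())
over_quota = quota_server.close_window()
for fe in frontends:
    fe.blocked = set(over_quota)

ads_accepted = frontends[0].handle("ads", 1)
search_accepted = frontends[1].handle("search", 1)
```

The example shows why aggregation must be central: each front end sees only part of a project's traffic, so only the summed view can detect that the quota was exceeded.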
Fine grained flow control: shard level
Shard-level flow control is finer-grained, has clearer semantics (error codes), and is more controllable.
• Each shard has a clearly defined processing capacity, such as 5 MB/s for writes and 10 MB/s for reads.
• When the shard queue is blocked, the service returns either a throttling error or a system error (HTTP status code 403 or 500, respectively), depending on whether the shard's traffic exceeds its quota. At the same time, the shard throttling information is reported to QuotaServer.
• After receiving the throttling information, QuotaServer can immediately synchronize it to all Nginx front ends.
• Once an Nginx front end has the shard flow control information, it applies precise flow control to that shard.
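A minimal sketch of the error-code decision and the front-end precheck might look like this. The function `write_status`, the class `ThrottleCache`, and the one-second accounting are assumptions for illustration; only the 5 MB/s quota and the 403/500 distinction come from the text above.

```python
MB = 1024 * 1024


def write_status(queue_blocked, bytes_this_second, quota=5 * MB):
    """Pick the HTTP status for a write, following the rule described above."""
    if not queue_blocked:
        return 200
    # Blocked and over quota: the user is being throttled. Otherwise: system error.
    return 403 if bytes_this_second > quota else 500


class ThrottleCache:
    """Front-end view of throttled shards, as pushed by the quota service."""

    def __init__(self):
        self._throttled = set()

    def sync(self, shard_ids):
        self._throttled = set(shard_ids)

    def precheck(self, shard_id):
        """Return 403 to reject at the edge, or None to forward the request."""
        return 403 if shard_id in self._throttled else None


normal = write_status(queue_blocked=False, bytes_this_second=1 * MB)
throttled = write_status(queue_blocked=True, bytes_this_second=8 * MB)
system_error = write_status(queue_blocked=True, bytes_this_second=1 * MB)

cache = ThrottleCache()
cache.sync({7})
edge_reject = cache.precheck(7)
edge_pass = cache.precheck(8)
```

Separating the two codes lets clients react correctly: a 403 suggests backing off or adding shards, whereas a 500 indicates a server-side problem that is worth retrying.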
Data processing technology architecture and practice
As the data access section shows, the data bus, as the traffic hub, carries the ingestion of all kinds of heterogeneous data. Although iLogtail has strong preprocessing capabilities, data formats often still vary widely for historical reasons or because of different access methods. Without a unified format specification, subsequent analysis becomes very difficult. The data bus therefore usually needs a data processing stage, for which we mainly provide the data processing service and Scheduled SQL.
• Data processing: a streaming implementation based on LogHub that processes data line by line in real time and supports pipeline orchestration.
• Scheduled SQL: implemented on the Logstore index model, it periodically analyzes and aggregates data and is generally used for sampling, deduplication, or building metrics.
In addition, to better support the open-source ecosystem, custom consumption and Flink streaming consumption are also supported.
The data processing service can handle most processing and orchestration scenarios:
• Normalization: the most common scenario, such as extracting key information from text logs and converting it into standardized data.
• Enrichment: for example, a user's click data contains only the product ID, and the product details must be joined from a database at analysis time.
• Data masking: as information security laws in China mature, the requirements for handling sensitive data (such as personal information) are becoming stricter.
• Splitting: for performance and convenience, multiple records are often combined into one when data is written, and they must be split into independent records before analysis.
• Distribution: write different types of data to different targets for customized downstream use.
The data processing service consumes data from the source Logstore with load balancing based on real-time consumer groups. The number of shards in the source Logstore determines the upper limit of concurrency for a data processing job. When the scheduler starts a new job, it automatically creates a consumer group on the source Logstore and binds the job to it. The consumers in the group independently process data from different shards. As the data volume grows, the source Logstore needs to be split into more shards, which in turn lets more consumers work in parallel for the job.
When a job needs to join external resources, each consumer maintains its own copy of those resources to achieve high-performance join computation.
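Why the shard count caps a job's concurrency can be seen from a simple assignment sketch. The round-robin function `assign_shards` below is a hypothetical stand-in for consumer group balancing, assuming each shard is owned by exactly one consumer at a time.

```python
def assign_shards(shard_ids, consumers):
    """Distribute shards round-robin; each shard gets exactly one owner."""
    assignment = {consumer: [] for consumer in consumers}
    for i, shard in enumerate(sorted(shard_ids)):
        assignment[consumers[i % len(consumers)]].append(shard)
    return assignment


# Four shards, two consumers: each consumer handles two shards.
balanced = assign_shards([0, 1, 2, 3], ["c1", "c2"])

# Two shards, three consumers: the third consumer has nothing to do.
capped = assign_shards([0, 1], ["c1", "c2", "c3"])
busy_consumers = sum(1 for shards in capped.values() if shards)
```

Adding consumers beyond the number of shards yields no extra throughput, which is why scaling a job up goes hand in hand with splitting the source Logstore into more shards.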
Besides its huge volume, log data is characterized by periodic fluctuations and extremely narrow peaks. For example, a live streaming platform may receive 90% of its daily traffic during the leisure hours from 21:00 to 23:00, so the data volume in this period is dozens of times the usual level. In such extreme scenarios, data processing must scale computing power elastically, keeping user jobs running at high performance while minimizing wasted resources.
Elastic scaling of data processing jobs is based on LogHub consumer groups and the K8s HPA mechanism. Inside the system, storage and computing are separated, so processing capacity can be scaled freely. On the user side, what is visible is a pay-as-you-go, fully managed computing platform, with no need to care about the complicated details.
The native K8s HPA has built-in support only for CPU and memory metrics, which is sufficient for most data-intensive jobs. However, some customers use data processing to orchestrate data and transfer it across regions. For such network- and I/O-sensitive scenarios, the built-in HPA metrics are not enough. We have therefore introduced more comprehensive HPA metrics to support jobs in a variety of business scenarios. In addition, we will use intelligent algorithms to keep upgrading and optimizing the HPA metrics; for example, resources can be allocated in advance based on the historical running characteristics of a job to further improve its stability.
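The effect of adding a network metric can be shown with the scaling rule documented for the Kubernetes HPA: for each metric, desired replicas = ceil(current replicas × current value / target value), and the largest result across metrics is used. The sketch below applies that rule; the metric names and numbers are made up for illustration, and HPA details such as tolerance and stabilization windows are ignored.

```python
import math


def desired_replicas(current_replicas, metrics, min_replicas=1, max_replicas=100):
    """metrics maps a metric name to (current_value, target_value)."""
    proposals = [
        math.ceil(current_replicas * current / target)
        for current, target in metrics.values()
    ]
    # The largest proposal across all metrics wins, clamped to the allowed range.
    return max(min_replicas, min(max_replicas, max(proposals)))


# CPU alone suggests scaling in: ceil(4 * 50 / 80) = 3.
cpu_only = desired_replicas(4, {"cpu_percent": (50, 80)})

# A saturated (hypothetical) network metric overrides it: ceil(4 * 180 / 100) = 8.
with_network = desired_replicas(
    4, {"cpu_percent": (50, 80), "network_mbps": (180, 100)}
)
```

With only CPU and memory, a job that is bottlenecked on cross-region transfer would be scaled in even though it is falling behind; the extra metric corrects that decision.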
Cloud Data Processing Language (DPL)
The data processing service exposes a user-facing interface based on Python syntax and provides a complete set of built-in data processing functions. We call it the Cloud Data Processing Language (DPL). With DPL, very complex data processing logic can be expressed in just a few lines of code.
The following is an example of information enrichment: for an HTTP request, a detailed description (the http_code_desc field) of the request status code (the http_status field) is added to the original data by joining an RDS dimension table.
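As a rough stand-in written in plain Python rather than DPL, the enrichment logic amounts to a dictionary lookup against the dimension table. The table contents, the `http_code` column name, and the helper functions `build_lookup` and `enrich` are assumptions for this sketch; in the real service the dimension table would be loaded from RDS and the join expressed with DPL built-in functions.

```python
# Hypothetical dimension table rows, standing in for an RDS table.
HTTP_CODE_TABLE = [
    {"http_code": "200", "http_code_desc": "OK"},
    {"http_code": "404", "http_code_desc": "Not Found"},
    {"http_code": "500", "http_code_desc": "Internal Server Error"},
]


def build_lookup(rows, key_field, value_field):
    """Index the dimension table once so each event costs a single dict lookup."""
    return {row[key_field]: row[value_field] for row in rows}


def enrich(event, lookup, source_field, target_field):
    """Return a copy of the event with the description added when the code is known."""
    enriched = dict(event)
    description = lookup.get(event.get(source_field))
    if description is not None:
        enriched[target_field] = description
    return enriched


lookup = build_lookup(HTTP_CODE_TABLE, "http_code", "http_code_desc")
known = enrich({"http_status": "404", "uri": "/missing"}, lookup,
               "http_status", "http_code_desc")
unknown = enrich({"http_status": "418", "uri": "/teapot"}, lookup,
                 "http_status", "http_code_desc")
```

Events whose status code is not in the table pass through unchanged, so a gap in the dimension data never drops log records.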
Time-based data (logs, metrics) accumulates to a staggering volume. Take Nginx access logs as an example: each HTTP/HTTPS request produces one access log entry. Assuming 10 million entries are generated every day, that is about 3.65 billion entries every year. Storing such a large amount of data for a long time not only occupies a lot of storage space but also puts great performance pressure on data analysis and processing.
Scheduled SQL periodically aggregates, analyzes, and processes data (it supports standard SQL and SLS query and analysis statements), runs according to scheduling rules, and writes the results to the target store. Compared with calling the API from self-built tooling, it has the following advantages:
• The SQL execution time limit is raised to 600 seconds, and a single run can process data on the order of 10 billion rows.
• The minimum scheduling period is 1 minute, and jobs can run continuously or at fixed intervals.
• Flexible query time window parameters meet diverse needs.
• Results are written to the target store exactly once.
• Fully managed operation with automatic handling of various exceptions.
• Other enterprise features: comprehensive job instance viewing, retry support (console, API), and alert notifications when an instance fails.
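The kind of periodic rollup that Scheduled SQL performs on access logs can be approximated in plain Python as follows. This is an illustrative sketch, not SLS query syntax: the tuple layout `(unix_ts, status)`, the 60-second window, and the function names are assumptions.

```python
from collections import Counter


def window_start(ts, window_s=60):
    """Align a Unix timestamp to the start of its time window."""
    return ts - ts % window_s


def rollup(logs, window_s=60):
    """Aggregate (unix_ts, status) entries into counts per (window, status)."""
    counts = Counter()
    for ts, status in logs:
        counts[(window_start(ts, window_s), status)] += 1
    return dict(counts)


access_logs = [(120, 200), (125, 200), (130, 500), (185, 200)]
per_minute = rollup(access_logs)

# Raw volume from the example above: 10 million entries per day for a year.
yearly_rows = 10_000_000 * 365
```

After the rollup, long-term storage only needs one row per window and status code instead of one row per request, which is where the savings in storage and query cost come from.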
Application Practice of Cloud Service Data Distribution Technology
Overview of Data Distribution Technology
Cloud services (such as OSS and SLB) are usually hosted by the cloud vendor, so users cannot log in to the servers directly to view logs. Data distribution technology is a key service module built on the SLS data bus. With user authorization, it delivers observable data from the cloud service side to each user. The mechanism is as follows: the cloud service first stores logs in a centralized data hub (LogHub) after collection and cleaning (processing), and a distribution service then distributes the data to users in a multi-tenant manner. It has the following advantages:
• Multi-tenant isolation
• Unlimited traffic scaling
• High concurrency and low latency through multiple shards
Cloud Lens architecture
Data distribution technology only solves the flow of observable data from the cloud service side to the user side. Collecting cloud service logs, determining cloud asset ownership, authenticating across multi-account systems, and so on still require upper-layer services. To this end, we have built a number of basic services, including asset synchronization, automatic collection, and a multi-account system, which are collectively called the unified access service. On top of the unified access service, we then provide unified interaction and functional components for each cloud product and have built a higher-level app capability: Alibaba Cloud Lens.
Alibaba Cloud Lens builds unified observability for cloud products on SLS and provides usage analysis, performance monitoring, security analysis, data protection, anomaly detection, and access analysis for Alibaba Cloud products. Across six dimensions (cost, performance, security, data protection, stability, and access analysis), it assists with the operation and maintenance of cloud products and effectively lowers the barrier to cloud product observability.
Dynamic data discovery and collection
Automatic orchestration of collection links is a major feature of Cloud Lens. Through a graphical interface, asset discovery and data collection (to the user side) can be automated with one click. Behind the scenes, this relies on DSL code to orchestrate the collection logic.
Thinking and prospect of the future direction
In the future, SLS data bus technology will continue to evolve at the three layers of acquisition access, pipeline, and computing, providing a stable and reliable data foundation for the observability platform.
Knowledge Base Team