Construction and Practice of Kwai's Trillion-Scale Real-Time OLAP Platform

Posted by millikan at 2020-03-15

At the ArchSummit Global Architect Summit in Beijing on December 7-8, Li Yuance, big data platform architect at Kwai, shared the development and practice of Kwai's OLAP platform. The following is an abridged version of the talk.

The Kwai app currently has 150 million daily active users and generates trillions of user behavior records every day. Exploring this data efficiently is both challenging and valuable. Today we will focus on the design of Kwai's OLAP platform and the main improvements we made while building it at the scale of trillions of records.

The Kwai OLAP platform is relatively young. Before April 2018, some multidimensional analysis requirements were still served by predefined metrics plus offline computation. The shortcomings were obvious: metric definitions were rigid, and offline computation offered poor timeliness.

In April 2018, we launched the Druid OLAP engine together with the Superset data visualization platform, which solved many business pain points. In May, we upgraded Druid to 0.12, the latest community release at the time, and fixed time zone issues, file loading performance and other problems along the way. In July, daily ingested messages exceeded 100 billion, and user-configured charts exceeded 1,000. After July the platform entered a period of rapid growth; Druid showed many problems in query performance and stability, and we made many improvements. In September, we launched the Druid probe system, time-granularity and dimension materialized views, and fine-grained resource allocation for the indexing service, and we also did a lot of optimization at the resource scheduling level. As of November, peak daily message ingestion exceeded 500 billion, and user-configured charts exceeded 10,000.

Over the past six months the OLAP platform has grown very fast. Thanks to a Druid-based high-availability architecture and the efforts of the team, the platform has had no medium- or large-scale outage since launch, and the service has been very stable.

The Kwai OLAP platform runs on 150 physical servers and serves more than 2,000 data sources. About 500 billion messages are ingested daily, and indexed data storage is about 400 TB. Peak daily query volume reaches 10 million. That is a large number, but most of these queries are API calls issued by programs, and only a small share is triggered by people. Overall, average query latency is 50 ms, P90 is about 100 ms, and P99 is between 500 ms and 1 s. On the visualization side, there are more than 800 user dashboards and more than 10,000 charts.

Two typical business scenarios show how the platform is used. The first is multimedia quality analysis. Kwai uses several CDN vendors across China, covering hundreds of domain names, and tens of billions of CDN quality monitoring records are reported every day. CDN service quality directly affects the user experience of the main app. The company's CDN quality team needs to analyze CDN monitoring data in real time, schedule traffic intelligently based on it, and monitor the effect of that scheduling in real time. CDN quality problems must also be analyzed and located quickly, which is again a multidimensional analysis process. OLAP technology fits these needs well.

The second scenario is A/B testing. Kwai has run about 1,000 A/B experiments with thousands of A/B metrics to compare, and tens of billions of records flow into the A/B test platform every day. Analyzing A/B test metrics is also a typical multidimensional analysis process. The OLAP platform must handle hundreds of thousands of query calls per day while keeping query latency within 100 milliseconds.

When selecting an OLAP engine, we surveyed the needs of several business teams across the company. In summary, the key requirements were: support for very large data scale, since a single data source may ingest tens of billions of records per day; query latency in the millisecond-to-second range; real-time data, since many business lines explicitly required real-time analysis; high query concurrency; and platform stability. Lower-priority requirements included flexible schema changes, exact distinct counting, and a SQL interface.

Based on this survey, we compared the OLAP technologies in common use.

First, Hive/SparkSQL is widely used in data warehousing, but its query latency makes millisecond-to-second responses hard to achieve, and because it relies on offline computation, its data timeliness is relatively poor.

Second, Elasticsearch (ES) is a very powerful system that meets needs well at medium data scale, but at trillion-record scale and beyond, both its write performance and its query performance hit serious bottlenecks.

Kylin and Druid offer similar functionality. Because Druid supports real-time ingestion, its data timeliness is better than Kylin's, and its schema changes are more flexible. We therefore chose Druid as the query engine of the OLAP platform.

The figure above shows the architecture of Druid. The Coordinator and Overlord are Druid's master nodes. The MiddleManager is mainly responsible for indexing data and generating index files. Historical nodes load index files and serve queries on historical data. The Broker is the entry point for queries. Druid also needs a metadata store, for which we use MySQL. When the MiddleManager generates index files, it first publishes them to a shared storage system; we chose HDFS, which is widely used.

As mentioned above, Druid's query performance is very good. This is mainly due to five techniques: data pre-aggregation, columnar storage, bitmap indexes, mmap, and caching of intermediate query results. Let's look at two of them in detail.

Let's start with data pre-aggregation. Druid splits each row into three parts: the timestamp column, dimension columns and metric columns. Pre-aggregation (rollup) means that when data is ingested into Druid, the raw data is aggregated over a configured time granularity, grouping by all dimensions and computing the metrics. All subsequent queries are secondary aggregations over these pre-aggregated intermediate results.
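
To make rollup concrete, here is a minimal Python sketch, assuming minute granularity, epoch-second timestamps and sum-only metrics; the column names (cdn, province, bytes) and rows are hypothetical. Druid performs rollup inside its indexing tasks; this only illustrates the idea that raw rows sharing a minute and every dimension value collapse into one row, and that later queries re-aggregate those rows.

```python
from collections import defaultdict


def truncate_to_minute(ts):
    """Floor an epoch-seconds timestamp to the start of its minute."""
    return ts - ts % 60


def rollup(rows, dimensions, metrics):
    """Pre-aggregate raw rows by (minute, every dimension value).

    Each output row sums the metrics and counts how many raw rows it absorbed.
    """
    agg = defaultdict(lambda: defaultdict(int))
    for row in rows:
        key = (truncate_to_minute(row["ts"]),) + tuple(row[d] for d in dimensions)
        agg[key]["count"] += 1
        for m in metrics:
            agg[key][m] += row[m]
    return {key: dict(vals) for key, vals in agg.items()}


def query_sum(rolled, dimensions, group_by, metric):
    """Secondary aggregation: regroup the pre-aggregated rows by a subset of dimensions."""
    positions = [dimensions.index(d) + 1 for d in group_by]  # +1 skips the timestamp slot
    out = defaultdict(int)
    for key, vals in rolled.items():
        out[tuple(key[p] for p in positions)] += vals[metric]
    return dict(out)


raw = [  # hypothetical raw messages
    {"ts": 0, "cdn": "A", "province": "BJ", "bytes": 100},
    {"ts": 30, "cdn": "A", "province": "BJ", "bytes": 50},  # same minute and dims as row 1
    {"ts": 45, "cdn": "B", "province": "BJ", "bytes": 70},
    {"ts": 61, "cdn": "A", "province": "SX", "bytes": 20},  # falls into the next minute
]
rolled = rollup(raw, ["cdn", "province"], ["bytes"])
print(len(rolled))                                               # 3
print(query_sum(rolled, ["cdn", "province"], ["cdn"], "bytes"))  # {('A',): 170, ('B',): 70}
```

The fewer distinct dimension combinations per minute, the more rows collapse, which is why high-cardinality dimensions (discussed later) hurt Druid so much.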

Next, bitmap indexes. Bitmap indexes mainly speed up conditional filtering at query time. When Druid generates an index file, it builds a bitmap for each value of each column. As shown in the figure, the bitmap for gender = male is "1001", meaning rows 1 and 4 are male. To filter for gender = female and city = Taiyuan, you only need a bitwise AND of the bitmap "0110" for female and the bitmap "0101" for Taiyuan. The result is "0100", meaning only the second row matches the filter. Bitmaps let Druid quickly locate the rows it needs to read, which speeds up queries.
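
The bitmap filter above can be reproduced in a few lines of Python. This sketch uses plain Python integers as uncompressed bitmaps (Druid itself stores compressed bitmaps such as Concise or Roaring), and the four rows are hypothetical data chosen to match the bitmaps in the text; only the Taiyuan value comes from the original example.

```python
def build_bitmap_index(rows, column):
    """Map each distinct value of `column` to a bitmap: bit i is set if row i holds that value."""
    index = {}
    for i, row in enumerate(rows):
        value = row[column]
        index[value] = index.get(value, 0) | (1 << i)
    return index


def render(bitmap, n_rows):
    """Show row 1 first, matching the '1001' notation used in the text."""
    return "".join("1" if (bitmap >> i) & 1 else "0" for i in range(n_rows))


def matching_rows(bitmap, n_rows):
    """1-based row numbers whose bit is set."""
    return [i + 1 for i in range(n_rows) if (bitmap >> i) & 1]


rows = [  # hypothetical rows consistent with the bitmaps in the text
    {"gender": "male", "city": "Beijing"},
    {"gender": "female", "city": "Taiyuan"},
    {"gender": "female", "city": "Beijing"},
    {"gender": "male", "city": "Taiyuan"},
]
gender = build_bitmap_index(rows, "gender")
city = build_bitmap_index(rows, "city")

hits = gender["female"] & city["Taiyuan"]  # bitwise AND combines the two filters
print(render(gender["male"], 4))           # 1001
print(render(gender["female"], 4), render(city["Taiyuan"], 4), render(hits, 4))  # 0110 0101 0100
print(matching_rows(hits, 4))              # [2]
```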

In terms of its ecosystem, Druid supports real-time ingestion from Kafka as well as batch ingestion from HDFS or Hive. Druid provides a rich query API: besides the default RESTful interface, third-party client libraries exist for Python, Java, Go and other languages, and Druid also supports SQL. Notably, since version 2.2 Hive has supported Druid through a StorageHandler, which lets you query Druid data with Hive SQL. Kwai uses this internally, but it required some modifications, such as fixing time zone issues, handling case sensitivity of Druid dimension and metric names, and implementing a default LIMIT and a default time range.
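
As an illustration of the default REST interface, the sketch below builds a Druid native timeseries query and posts it to a Broker's /druid/v2/ endpoint using only the Python standard library. The data source name, metric name and Broker address are hypothetical; the filter reuses the gender/city example from the bitmap discussion. Druid SQL can be sent the same way to /druid/v2/sql/ with a body of the form {"query": "..."}.

```python
import json
import urllib.request


def build_timeseries_query(datasource, interval, granularity, filters, metric):
    """Build a Druid native timeseries query; `filters` maps dimension -> value and is ANDed."""
    query = {
        "queryType": "timeseries",
        "dataSource": datasource,
        "granularity": granularity,
        "intervals": [interval],
        # Sum a pre-aggregated count/metric column across the rolled-up rows.
        "aggregations": [{"type": "longSum", "name": metric, "fieldName": metric}],
    }
    if filters:
        query["filter"] = {
            "type": "and",
            "fields": [{"type": "selector", "dimension": d, "value": v} for d, v in filters.items()],
        }
    return query


def post_query(broker_url, query):
    """POST a native query to a Broker and decode the JSON response (needs a live cluster)."""
    req = urllib.request.Request(
        broker_url.rstrip("/") + "/druid/v2/",
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())


q = build_timeseries_query(
    "cdn_quality",                     # hypothetical data source
    "2018-11-01/2018-11-02",           # ISO-8601 interval
    "hour",
    {"gender": "female", "city": "Taiyuan"},
    "count",
)
print(json.dumps(q, indent=2))
# post_query("http://broker.example:8082", q)  # hypothetical Broker host; 8082 is the default port
```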

This is the architecture diagram of the Kwai OLAP platform. The middle part consists of Druid's own components; data is ingested in real time through Kafka and imported offline from the Hive warehouse. Around Druid we also built a complete metrics system, a probe system, a Druid data source management system, and more.

At a scale of trillions or even tens of trillions of records, the OLAP platform faces many challenges in practice: how to make queries faster, how to use resources more efficiently, how to make data management and onboarding more convenient, and how to make the cluster more stable. We made improvements and optimizations for each of these problems.

First, for stability, we adopted several resource-isolation deployment schemes. At the access layer, Broker high availability and load balancing are implemented through a proxy.

In the historical data storage layer, data is tiered in two ways. The first is hot/cold separation: hot data is stored on SSD machines and automatically migrates to HDD machines when it becomes cold. Because most queries hit the latest data, the speedup from SSD is very noticeable. Given the high cost of SSD, when configuring replicas for hot data you can place one replica on an SSD machine and another on an HDD machine, and give the SSD replica a higher weight so that most requests still land on SSD machines. When an SSD machine fails, requests fall back to the HDD replica. This saves a lot of cost.
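
The talk does not say how the SSD replica weight was implemented. As a hedged sketch, the routing decision can be modeled as a weighted random choice among healthy replicas, so that SSD copies take most of the traffic and the HDD copy absorbs it only when the SSD server is down. Server names and weights are hypothetical. In stock Druid, a related effect can be obtained with Historical priorities (druid.server.priority) and the Broker's tier selection strategy, but that prefers tiers strictly rather than by weight.

```python
import random


def pick_replica(replicas, healthy, rng=random):
    """Pick the server that should answer a query for one segment.

    replicas: list of (server, weight); the SSD copy gets the larger weight.
    healthy:  set of servers currently reachable.
    If the SSD server is down, only the HDD copy remains a candidate.
    """
    candidates = [(server, weight) for server, weight in replicas if server in healthy]
    if not candidates:
        raise RuntimeError("no healthy replica for this segment")
    servers, weights = zip(*candidates)
    return rng.choices(servers, weights=weights, k=1)[0]


replicas = [("ssd-1", 9), ("hdd-1", 1)]  # hypothetical: SSD copy weighted 9:1 over HDD copy
rng = random.Random(42)
picks = [pick_replica(replicas, {"ssd-1", "hdd-1"}, rng) for _ in range(1000)]
print(picks.count("ssd-1") / len(picks))  # share of queries served by the SSD copy
print(pick_replica(replicas, {"hdd-1"}))  # hdd-1: the SSD server is down
```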

Beyond hot/cold separation, some businesses have stricter requirements for query stability, so Kwai also isolates them through tier configuration: the index data of their data sources is stored on dedicated Historical machines. This way, even when large queries trigger GC pressure on Historical nodes or drive system I/O load high, query performance for these businesses is unaffected.
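
Hot/cold tiering and dedicated business tiers map naturally onto Druid's retention (load) rules, which the Coordinator evaluates top to bottom for each segment, applying the first match. The sketch below builds such rule lists; the tier names (ssd_hot, vip), the period and the replica counts are hypothetical, and the talk does not say exactly which rules Kwai used. The resulting list would be serialized as JSON and posted to the Coordinator's /druid/coordinator/v1/rules/{dataSource} endpoint.

```python
def retention_rules(hot_period, hot_tier, default_tier="_default_tier", dedicated_tier=None):
    """Build a Druid retention-rule list; the Coordinator applies the first rule matching a segment."""
    if dedicated_tier is not None:
        # Isolated business: every replica lives on its dedicated Historicals.
        return [{"type": "loadForever", "tieredReplicants": {dedicated_tier: 2}}]
    return [
        # Recent (hot) segments: one replica on SSD, one on HDD as a cheap failover copy.
        {"type": "loadByPeriod", "period": hot_period,
         "tieredReplicants": {hot_tier: 1, default_tier: 1}},
        # Older (cold) segments: HDD replicas only, so segments leave SSD as they age.
        {"type": "loadForever", "tieredReplicants": {default_tier: 2}},
    ]


print(retention_rules("P7D", "ssd_hot"))
print(retention_rules("P7D", "ssd_hot", dedicated_tier="vip"))
```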

For large-scale data, we also did a lot of optimization to accelerate queries. The first is materialized views, at two levels: dimension-level materialization and time-granularity-level materialization.

What is a materialized view? Suppose a data source has ten dimension columns. Analysis of query requests shows that the three dimensions in group1 and the three dimensions in group2 are often queried together, while the remaining four dimensions are rarely queried. Worse, among the rarely queried columns there is a high-cardinality dimension, that is, one with a large count distinct, such as user ID. This causes a serious query performance problem: the high-cardinality dimension undermines Druid's pre-aggregation, poor aggregation produces larger index files, larger index files mean more read I/O at query time, and overall query performance suffers. To optimize this case, we create a separate pre-aggregated index for group1 and another for group2. When a new query arrives, the system first analyzes the set of dimensions it needs. If that set is a subset of the dimensions of one of the new dedicated indexes, the query reads that index directly instead of the original full index, and query performance improves noticeably. This is the core idea of a materialized view, and a typical space-for-time trade-off.
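
A minimal sketch of the routing step, assuming every materialized view stores all metrics and is described only by its dimension set, and using the dimension count as a rough proxy for index size (a real router would rather compare actual segment sizes). The query's dimension set must include dimensions used in filters as well as in group-by. View and dimension names are hypothetical.

```python
def route_by_dimensions(query_dims, views, base="base"):
    """Return the smallest materialized view whose dimensions cover the query.

    views: dict of view name -> set of dimensions it was pre-aggregated on.
    Falls back to the base (full-dimension) index when no view covers the query.
    """
    needed = set(query_dims)
    covering = [(len(dims), name) for name, dims in views.items() if needed <= dims]
    return min(covering)[1] if covering else base


views = {  # hypothetical views for group1 and group2
    "mv_group1": {"d1", "d2", "d3"},
    "mv_group2": {"d4", "d5", "d6"},
}
print(route_by_dimensions(["d1", "d3"], views))  # mv_group1
print(route_by_dimensions(["d1", "d4"], views))  # base: no single view covers both
```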

Time-granularity materialized views address a different query pattern that Druid does not handle well on its own: long time-range queries. Suppose a data source is aggregated at minute granularity. Querying the last three months is expensive, because every minute-level index file from those three months must be scanned and then aggregated again.

To solve this, we build an additional hour-level or even day-level materialized index on top of the data source's minute-level index. Aggregation is more effective at coarser granularity, and the index is smaller overall. When a new query arrives and its granularity is day-level or coarser, it is automatically routed to the day-level index, and query performance improves significantly.
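
Time-granularity routing can be sketched the same way. The rule assumed here is that an index can answer a query exactly if its bucket size divides the query granularity and the query interval starts and ends on index-bucket boundaries; among such indexes, the coarsest wins. Buckets are assumed to be UTC-aligned epoch seconds, which glosses over the time zone handling the talk mentions.

```python
GRANULARITY_SECONDS = {"minute": 60, "hour": 3600, "day": 86400}


def route_by_granularity(query_granularity, available, interval_start, interval_end):
    """Pick the coarsest materialized index that can still answer the query exactly.

    Returns None if no available index lines up with the query.
    """
    q = GRANULARITY_SECONDS[query_granularity]
    best = None
    for name in available:
        g = GRANULARITY_SECONDS[name]
        # Index buckets must nest inside query buckets, and the interval must
        # start and end on index-bucket boundaries.
        if q % g == 0 and interval_start % g == 0 and interval_end % g == 0:
            if best is None or g > GRANULARITY_SECONDS[best]:
                best = name
    return best


indexes = ["minute", "hour", "day"]
print(route_by_granularity("day", indexes, 0, 90 * 86400))  # day: three months by day
print(route_by_granularity("hour", indexes, 0, 86400))      # hour
print(route_by_granularity("day", indexes, 1800, 172800))   # minute: start not hour-aligned
```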

Next, the performance optimization of Druid's metadata store. Since the platform went live, we have accumulated on the order of millions of segment files, and queries on this segment metadata, that is, on the MySQL segments table, ran into performance bottlenecks.

The first optimization concerns the interaction between the Overlord and MySQL. When the Overlord publishes a new segment, it queries the segments table multiple times, and monitoring revealed a large number of slow queries. The fix was simple: add indexes to the segments table. MySQL query time dropped from more than 10 seconds to about 1 second, an improvement of more than 10x.
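
The talk does not name the index that was added. To show the effect in runnable form, this sketch uses SQLite as a stand-in for MySQL, a simplified druid_segments table (Druid's real schema has more columns) and a hypothetical composite index on (dataSource, used, end), and compares the query plans of a typical segment lookup before and after creating it.

```python
import sqlite3


def plan(conn, sql, params):
    """Return the detail strings of SQLite's query plan for `sql`."""
    return [row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + sql, params)]


conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE druid_segments (id TEXT PRIMARY KEY, dataSource TEXT, "start" TEXT, '
    '"end" TEXT, version TEXT, used BOOLEAN, payload BLOB)'
)
# A lookup shaped like "used segments of one data source overlapping an interval".
LOOKUP = ('SELECT payload FROM druid_segments '
          'WHERE dataSource = ? AND used = 1 AND "end" >= ? AND "start" <= ?')
params = ("cdn_quality", "2018-11-01", "2018-11-02")

before = plan(conn, LOOKUP, params)  # full table scan
conn.execute('CREATE INDEX idx_segments_ds_used_end ON druid_segments (dataSource, used, "end")')
after = plan(conn, LOOKUP, params)   # index search

print(before)
print(after)
```

The same reasoning applies to MySQL, where EXPLAIN shows whether the lookup uses the new index instead of scanning millions of rows.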

We also optimized the interaction between the Coordinator and MySQL. The Coordinator periodically scans the entire segments table, and each scan takes a long time. A full scan is unnecessary, so we changed it to an incremental scan, which cut scan time from 1.7 minutes to about 40 seconds. We then created a MySQL index for the incremental-scan SQL, which reduced scan time to 30 milliseconds, an overall improvement of thousands of times.
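
A minimal sketch of incremental scanning, assuming the segments table carries a hypothetical last_updated column that changes whenever a row is inserted or its used flag flips; the talk does not describe the exact mechanism. The poller keeps a watermark and fetches only rows newer than it, and an index on that column keeps the query cheap. A production version would also need to tolerate rows that commit late with a timestamp at or below the watermark.

```python
import sqlite3


class IncrementalSegmentPoller:
    """Fetch only segment rows changed since the previous poll, tracked by a watermark."""

    def __init__(self, conn):
        self.conn = conn
        self.watermark = ""  # ISO-8601 strings compare correctly as text
        self.cache = {}      # segment id -> used flag, as seen by the Coordinator

    def poll(self):
        rows = self.conn.execute(
            "SELECT id, used, last_updated FROM druid_segments "
            "WHERE last_updated > ? ORDER BY last_updated",
            (self.watermark,),
        ).fetchall()
        for seg_id, used, ts in rows:
            self.cache[seg_id] = bool(used)
            self.watermark = ts  # rows are ordered, so this ends at the newest timestamp
        return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE druid_segments (id TEXT PRIMARY KEY, used INTEGER, last_updated TEXT)")
conn.execute("CREATE INDEX idx_segments_last_updated ON druid_segments (last_updated)")
conn.executemany(
    "INSERT INTO druid_segments VALUES (?, ?, ?)",
    [("seg1", 1, "2018-11-01T00:00:00"), ("seg2", 1, "2018-11-01T00:05:00")],
)

poller = IncrementalSegmentPoller(conn)
first = poller.poll()   # initial load reads both rows
conn.execute("UPDATE druid_segments SET used = 0, last_updated = '2018-11-01T00:10:00' WHERE id = 'seg1'")
second = poller.poll()  # only the changed row is read
print(first, second, poller.cache)
```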

Next, we optimized the segment loading process. By default, the Coordinator matches segments against load rules serially. We parallelized this process and refined some details, and the coordination time for a cluster with millions of segment files dropped from 3 minutes to 30 seconds. After these optimizations to the Druid metadata system, it is no longer a performance bottleneck.
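
The parallelization can be sketched as splitting the segment list into chunks and matching each chunk against the rule list concurrently, keeping Druid's first-match-wins semantics. The rules and segment fields are hypothetical. In CPython, threads do not speed up CPU-bound work because of the GIL; the Coordinator runs on the JVM, where they do, and a Python port would use ProcessPoolExecutor for a real speedup.

```python
from concurrent.futures import ThreadPoolExecutor


def first_matching_rule(segment, rules):
    """Rules are checked in order and the first match wins, as in Druid."""
    for rule in rules:
        if rule["matches"](segment):
            return rule["name"]
    return None


def match_all(segments, rules, workers=8, chunk_size=10_000):
    """Match every segment to a rule, processing fixed-size chunks concurrently."""
    chunks = [segments[i:i + chunk_size] for i in range(0, len(segments), chunk_size)]

    def run(chunk):
        return [(seg["id"], first_matching_rule(seg, rules)) for seg in chunk]

    with ThreadPoolExecutor(max_workers=workers) as pool:
        return dict(pair for chunk_result in pool.map(run, chunks) for pair in chunk_result)


NOW = 100  # hypothetical "today", in days
rules = [
    {"name": "hot", "matches": lambda s: s["end"] >= NOW - 7},  # last 7 days
    {"name": "cold", "matches": lambda s: True},                # everything else
]
segments = [{"id": f"seg{i}", "end": i} for i in range(101)]
assignment = match_all(segments, rules, workers=4, chunk_size=10)
print(assignment["seg100"], assignment["seg92"])  # hot cold
```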

On the resource management side, the first issue is Kafka indexing. Each Kafka data source's indexing tasks are managed by a supervisor, and the supervisor's task count is a fixed value. If the task count is set too small, the lag in reading Kafka grows too large and data is delayed; if it is set too large, resources are wasted. Moreover, when creating an indexing task, users find it hard to estimate the right task count. Our optimization lets the supervisor adjust the task count automatically based on the current Kafka consumption lag, so that data is not delayed during business peaks and resources are returned to the cluster during off-peak periods. Overall cluster utilization improved significantly.
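
The talk does not give the scaling policy. As a hedged sketch, a lag-driven controller could double the task count when lag exceeds an upper threshold and halve it below a lower one, never exceeding the partition count, since each Kafka partition is assigned to a single task group and extra tasks would sit idle. The thresholds and the doubling/halving policy are assumptions.

```python
def next_task_count(current, lag, partitions, scale_out_lag, scale_in_lag, min_tasks=1):
    """Decide a supervisor's next task count from the current Kafka consumer lag.

    - lag above scale_out_lag: double the tasks, capped at the partition count
    - lag below scale_in_lag: halve the tasks, floored at min_tasks
    - otherwise: keep the current count (a dead band avoids flapping)
    """
    if lag > scale_out_lag:
        return min(partitions, current * 2)
    if lag < scale_in_lag:
        return max(min_tasks, current // 2)
    return current


# Hypothetical thresholds: scale out above 1M messages of lag, scale in below 10K.
print(next_task_count(4, lag=5_000_000, partitions=32, scale_out_lag=1_000_000, scale_in_lag=10_000))  # 8
print(next_task_count(4, lag=500, partitions=32, scale_out_lag=1_000_000, scale_in_lag=10_000))        # 2
```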

Another problem is how indexing tasks are allocated resources on MiddleManagers. Druid gives each MiddleManager a fixed number of slots. However, unlike a Kafka indexing task, a Hadoop indexing task is really just a Hadoop client that submits a job and uses few resources, so slot-based allocation wastes resources. Our fix was to change MiddleManager task scheduling from a slot count to a memory budget and to treat task types differently: Kafka tasks and Hadoop tasks get different default memory sizes. Users can also specify their own memory size when submitting a task, subject to a maximum limit that prevents abuse.
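
Memory-based admission can be sketched as a per-MiddleManager budget: each task type has a default memory size, user-requested sizes are clamped to a cap, and a task is admitted only if it fits in the remaining budget. All sizes are hypothetical; the talk does not give the actual defaults or the limit.

```python
DEFAULT_MEMORY_MB = {"kafka": 4096, "hadoop": 512}  # hypothetical per-type defaults
MAX_TASK_MEMORY_MB = 16384                         # hypothetical per-task cap


class MiddleManagerCapacity:
    """Admit tasks against a memory budget instead of a fixed slot count."""

    def __init__(self, total_mb):
        self.total_mb = total_mb
        self.used_mb = 0

    def request_mb(self, task_type, requested_mb=None):
        """Use the type default unless the user asked for a size; clamp to the cap."""
        mb = requested_mb if requested_mb is not None else DEFAULT_MEMORY_MB[task_type]
        return min(mb, MAX_TASK_MEMORY_MB)

    def try_assign(self, task_type, requested_mb=None):
        mb = self.request_mb(task_type, requested_mb)
        if self.used_mb + mb > self.total_mb:
            return False  # does not fit; the task waits for another MiddleManager
        self.used_mb += mb
        return True

    def release(self, mb):
        self.used_mb -= mb


mm = MiddleManagerCapacity(total_mb=8192)
print(mm.try_assign("kafka"))   # True: 4096 MB used
print(mm.try_assign("hadoop"))  # True: 4608 MB used
print(mm.try_assign("kafka"))   # False: would need 8704 MB
```

Under a slot model the lightweight Hadoop client would occupy a whole slot; here it consumes only its small default budget.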

In addition, timely compaction of segment files speeds up queries and saves storage space. Currently, Druid submits a dedicated compaction task that scans segment files serially and merges them, which performs poorly. We built a parallel scheme instead: submit a Hadoop job that scans segment information in parallel on the Hadoop cluster and then performs the compaction. The performance improvement is significant.
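
Kwai's parallel compaction ran as a Hadoop job. As a small stand-in, this Python sketch plans compaction per time chunk (only chunks that hold two or more small segments) and merges the chunks concurrently, re-aggregating rows that share a key. The segment fields and the size threshold are hypothetical.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def plan_compaction(segments, small_mb):
    """Group segments by time chunk; keep chunks holding 2+ segments that are all small."""
    by_interval = defaultdict(list)
    for seg in segments:
        by_interval[seg["interval"]].append(seg)
    return {iv: segs for iv, segs in by_interval.items()
            if len(segs) > 1 and all(s["size_mb"] < small_mb for s in segs)}


def merge_chunk(segs):
    """Merge one time chunk, re-aggregating rows that share the same dimension key."""
    merged = defaultdict(int)
    for seg in segs:
        for key, value in seg["rows"].items():
            merged[key] += value
    return dict(merged)


def compact_parallel(segments, small_mb, workers=4):
    plan = plan_compaction(segments, small_mb)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        merged = pool.map(merge_chunk, plan.values())
        return dict(zip(plan.keys(), merged))


segments = [  # hypothetical segment metadata with tiny row payloads
    {"interval": "2018-11-01", "size_mb": 10, "rows": {("A",): 5}},
    {"interval": "2018-11-01", "size_mb": 12, "rows": {("A",): 3, ("B",): 1}},
    {"interval": "2018-11-02", "size_mb": 700, "rows": {("A",): 9}},  # already large, skipped
]
print(compact_parallel(segments, small_mb=100))  # {'2018-11-01': {('A',): 8, ('B',): 1}}
```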

We also did a lot of work on platform usability. One problem emerged as the platform operated: many new data sources need to be onboarded every day. In the early days, administrators could handle onboarding themselves, but as the business grew rapidly the workload became very large. After onboarding, there are also many requests to modify a data source's dimension and metric definitions, which called for a systematic solution.

In addition, users often lack a deep understanding of the platform or of their own data, or their analysis scenarios are not clearly defined. When onboarding a data source, they tend to import a large number of dimensions and metrics, which creates a hidden risk: the more dimensions there are, the worse the aggregation, and some high-cardinality dimensions can seriously hurt aggregation and query performance.

To solve these problems, we designed two tools: the Druid data source management system and the Druid probe system.

The data source management system is a web application where users can onboard, view and manage data sources. They can view dimension and metric information, Kafka consumption rate, Kafka consumer lag, and more. The figure above shows the indexing task list in the data source management system. The system includes permission management: only the owner of a data source can modify its configuration, such as its dimensions and metrics.

The figure above shows the indexing task details page. Besides basic information, it shows the Kafka consumption rate, so users can troubleshoot online problems with their own data sources by themselves.

This is the page for creating and editing data sources. Creating a Kafka data source is very convenient: Kafka information is pulled directly from Kafka's management system, so users simply click instead of filling it in manually. For the timestamp column and its format, the system samples the user's Kafka data and fills them in automatically, and corrects the timestamp format if it is wrong. For dimensions and metrics, the system also analyzes the data in advance and offers suggestions, so users only need to click to select.

The data source list shown in this figure clearly displays each data source's data volume, average segment file size, and dimension and metric information. In addition, if a data source is imported by an offline task, the list automatically links the name of that task, so users can quickly locate their scheduled import jobs.

The Druid probe system mainly solves the following problems:

First, query heat analysis across data sources. The probe system ranks all Druid data sources by overall query heat, so administrators know which data sources are the heaviest query users and can give them targeted care. It also finds cold or zombie data sources that receive no queries, so their owners can be notified to take them offline and free cluster resources.

For a single data source, the probe system can also analyze its dimensions and metrics to find which dimensions are queried frequently and which dimensions and metrics are rarely queried. In particular, it can find dimensions that are both cold and high-cardinality. Such dimensions seriously hurt query performance, and their owners should be notified promptly so they can optimize.
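
A hedged sketch of both probe analyses, assuming a query log that records, for each query, the data source and the dimensions it touched, plus per-dimension cardinality statistics. The thresholds (1% query share, 100,000 distinct values) and all sample names are hypothetical.

```python
from collections import Counter


def probe(query_log, dimension_cardinality, cold_ratio=0.01, high_card=100_000):
    """Rank data sources by query heat and flag cold, high-cardinality dimensions.

    query_log: list of (datasource, [dimensions the query used]).
    dimension_cardinality: dict (datasource, dimension) -> distinct value count.
    Returns (heat ranking, suspect dimensions, zombie data sources).
    """
    heat = Counter(ds for ds, _ in query_log)
    dim_hits = Counter((ds, d) for ds, dims in query_log for d in set(dims))
    suspects = []
    for (ds, dim), card in dimension_cardinality.items():
        share = dim_hits[(ds, dim)] / heat[ds] if heat[ds] else 0.0
        if share < cold_ratio and card >= high_card:
            suspects.append((ds, dim))
    # Data sources that exist but never appear in the query log.
    zombies = sorted({ds for ds, _ in dimension_cardinality} - set(heat))
    return heat.most_common(), sorted(suspects), zombies


log = [("cdn", ["province", "isp"])] * 50 + [("abtest", ["exp_id"])] * 10
cardinality = {
    ("cdn", "province"): 34,
    ("cdn", "isp"): 10,
    ("cdn", "user_id"): 5_000_000,  # never queried and high-cardinality
    ("abtest", "exp_id"): 1000,
    ("old_ds", "x"): 5,             # data source with no queries at all
}
ranking, suspects, zombies = probe(log, cardinality)
print(ranking)   # [('cdn', 50), ('abtest', 10)]
print(suspects)  # [('cdn', 'user_id')]
print(zombies)   # ['old_ds']
```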

Next, data visualization on the OLAP platform. A powerful visualization tool is an essential component of an OLAP platform, and we adopted the open-source Superset. Superset is an open-source, deeply integrated, interactive and efficient data analysis and visualization platform originally developed at Airbnb. It is powerful and supports a wide variety of chart types.

So far, our Superset deployment has accumulated more than ten thousand charts, and users ran into many problems while using it. To address them, we made many modifications to Superset, including data synchronization, permission management, alerting, and interaction improvements to the product design.

Here are several key improvements. The first is support for multiple time shifts. A time shift draws the current values in the same chart as values from earlier periods for period-over-period comparison. The example compares the current day with the previous day and with the same day last week, and users can add as many other days to the same chart as they like. Besides time-series line charts, we also added time shift support to many other chart types.
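
The time-shift comparison can be sketched as aligning a series with copies of itself shifted back by N days. The sketch assumes the series has already been fetched as a dict keyed by epoch seconds; in practice each shift would more likely be a separate Druid query over the shifted interval. This is an illustration, not Kwai's actual Superset change.

```python
DAY = 86400


def time_shift(series, shifts_days):
    """Align a metric series with copies of itself shifted back by N days.

    series: dict of epoch seconds -> value.
    Returns rows (ts, current, value N1 days earlier, value N2 days earlier, ...),
    with None where the earlier point is missing.
    """
    return [
        (ts, value, *(series.get(ts - n * DAY) for n in shifts_days))
        for ts, value in sorted(series.items())
    ]


series = {0: 100, DAY: 120, 7 * DAY: 150, 8 * DAY: 90}  # hypothetical daily values
for row in time_shift(series, [1, 7]):                 # previous day, same day last week
    print(row)
# last row: (691200, 90, 150, 120)
```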

Another improvement is linked refresh across multiple charts on the same Superset dashboard: selecting a time range in one chart by dragging the mouse refreshes the other charts accordingly. This is very practical for correlation analysis across multiple charts.

This is the design of the Superset alerting feature. Much of the company's monitoring data is analyzed with Druid and Superset, so demand for alerting is strong. We borrowed the design of Grafana's alerting and implemented similar functionality in Superset. Users can define alert dimensions, metrics, check intervals, alert levels and more on the platform.
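
A minimal sketch of how a user-defined alert rule could be evaluated against the latest query results. The AlertRule fields mirror the options listed above (dimensions, metric, check interval, level), but the class, the metric name stall_rate and the thresholds are hypothetical; the scheduler that would run evaluate every check_every_s seconds is omitted.

```python
from dataclasses import dataclass


@dataclass
class AlertRule:
    """Hypothetical alert definition."""
    metric: str
    filters: dict          # dimension -> required value
    op: str                # ">" or "<"
    threshold: float
    level: str             # e.g. "warning", "critical"
    check_every_s: int = 60  # used by the (omitted) scheduler


def evaluate(rule, rows):
    """Return (level, row) for each row that matches the filters and breaches the threshold."""
    breached = []
    for row in rows:
        if any(row.get(k) != v for k, v in rule.filters.items()):
            continue
        value = row[rule.metric]
        if (rule.op == ">" and value > rule.threshold) or (rule.op == "<" and value < rule.threshold):
            breached.append((rule.level, row))
    return breached


rule = AlertRule(metric="stall_rate", filters={"cdn": "A"}, op=">", threshold=0.05, level="critical")
rows = [  # hypothetical latest results, one row per province
    {"cdn": "A", "province": "BJ", "stall_rate": 0.08},
    {"cdn": "A", "province": "SX", "stall_rate": 0.01},
    {"cdn": "B", "province": "BJ", "stall_rate": 0.20},  # different CDN, ignored by the filter
]
print(evaluate(rule, rows))  # only the CDN A / BJ row breaches
```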

To summarize: for performance, we built time-granularity and dimension materialized views and optimized metadata interactions. For resource management, we implemented automatic scaling of supervisor indexing tasks, fine-grained resource allocation on MiddleManagers, and parallel compaction. For stability, we designed isolated deployments of Brokers and Historicals. For usability, we built the data source management system, the data probe system, and the Superset visualization platform.

Finally, some plans for the future of the Kwai OLAP platform. First, we will introduce new OLAP technologies such as ClickHouse. Second, we are considering integrating OLAP with ad hoc queries and routine reports, in the hope that OLAP technology can also play a bigger role in offline data analysis. Third, we want to provide a one-stop service from data ingestion to visualization, lowering the barrier to entry for both technical and non-technical users. Fourth, we hope the platform can evolve from technology output toward products and services.

Author introduction

Li Yuance is a big data platform architect at Kwai and head of the data query engine team, responsible for the research, development and in-house application of the SQL engine, the OLAP engine and the multidimensional visualization platform. He previously worked at Qihoo 360 and is the author of the open-source project XLearning. His main research areas include distributed computing, OLAP engines, SQL on Hadoop and AI on Hadoop.