In the MapReduce framework, Shuffle is the bridge between Map and Reduce: the output of the Map stage reaches the Reduce stage through the Shuffle process. Because Shuffle involves disk reads and writes as well as network I/O, its performance directly affects the performance of the whole program. Spark also has Map and Reduce stages, so Shuffle appears in Spark as well.
Spark Shuffle
Spark Shuffle comes in two flavors: Hash-based Shuffle and Sort-based Shuffle. Let us first go through their development history, which helps in understanding Shuffle:
Before Spark 1.1, Spark implemented only one Shuffle method, Hash-based Shuffle. Spark 1.1 introduced the Sort-based Shuffle implementation, and starting with Spark 1.2 the default implementation changed from Hash-based Shuffle to Sort-based Shuffle, that is, the default ShuffleManager changed from hash to sort. In Spark 2.0, the Hash Shuffle method was removed altogether.
The main reason Spark provided a Hash-based Shuffle implementation from the beginning was to avoid unnecessary sorting. In Hadoop MapReduce, sorting is a fixed step: even the many jobs that do not need their data sorted still pay for it, which causes a lot of unnecessary overhead.
In the Hash-based Shuffle implementation, every Task of the Mapper stage generates one file for every Task of the Reduce stage, which usually means a huge number of files (M*R intermediate files, where M is the number of Tasks in the Mapper stage and R is the number of Tasks in the Reduce stage), accompanied by a large amount of random disk I/O and a large memory overhead.
To alleviate this problem, Spark 0.8.1 introduced the Shuffle Consolidate mechanism (the file merging mechanism) for the Hash-based Shuffle implementation: the intermediate files produced on the Mapper side are merged, and the number of intermediate files is reduced by setting the configuration property spark.shuffle.consolidateFiles=true. With file merging, the way intermediate files are generated changes so that each execution unit produces one file for each Task of the Reduce stage.
An execution unit corresponds to: the number of Cores on each Mapper side divided by the number of Cores allocated to each Task (1 by default). In the end the number of files can be reduced from M*R to E*C/T*R, where E is the number of Executors, C is the number of Cores available on each Executor, and T is the number of Cores allocated to each Task.
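To make the formulas concrete, here is a small worked example in Scala (the cluster figures are hypothetical and only chosen to match the numbers used later in this article):

// Hypothetical cluster: 10 Executors (E), 1 Core per Executor (C), 1 Core per Task (T),
// 50 Mapper-stage Tasks (M) and 100 Reduce-stage Tasks (R).
object ShuffleFileCounts {
  def main(args: Array[String]): Unit = {
    val (e, c, t, m, r) = (10, 1, 1, 50, 100)
    val withoutConsolidation = m * r          // one file per (map Task, reduce Task) pair
    val withConsolidation    = e * c / t * r  // one file per (execution unit, reduce Task) pair
    println(s"Hash Shuffle without consolidation: $withoutConsolidation files") // 5000
    println(s"Hash Shuffle with consolidation:    $withConsolidation files")    // 1000
  }
}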
Spark 1.1 introduced Sort Shuffle:
In the Hash-based Shuffle implementation, the number of intermediate result files depends on the number of Tasks in the Reduce stage, that is, on the parallelism of the Reduce side. The number of files therefore remains uncontrollable and the problem is not really solved. To address it properly, Spark 1.1 introduced the Sort-based Shuffle implementation, and starting with Spark 1.2 the default was switched from the Hash-based to the Sort-based implementation, i.e., the default ShuffleManager changed from hash to sort.
In Sort-based Shuffle, each Task of the Mapper stage no longer generates a separate file for every Task of the Reduce stage; instead it writes all of its output into a single data file and generates an index file at the same time, through which each Task of the Reduce stage can fetch the data that belongs to it. The immediate benefit of avoiding many small files is reduced random disk I/O and lower memory overhead. The number of generated files drops to 2*M, where M is the number of Tasks in the Mapper stage: each Mapper Task produces two files (one data file and one index file), giving M data files and M index files in total.
Starting with Spark 1.4, a Tungsten-Sort-based Shuffle implementation was also introduced into the Shuffle process. With the optimizations contributed by the Tungsten project, Spark's data processing performance can be greatly improved.
Note: in some specific application scenarios, the Hash-based Shuffle mechanism can still outperform the Sort-based Shuffle mechanism.
The following picture summarizes the iteration history of Spark Shuffle:
Why did Spark eventually abandon HashShuffle and switch to Sorted-Based Shuffle?
The answer lies in the most fundamental optimization Spark needed and the most urgent problem it had to solve. With HashShuffle, Spark generates a large number of files during Shuffle; as the amount of data grows, the number of files becomes uncontrollable, which seriously limits Spark's performance and scalability. Spark therefore had to reduce the number of files produced by the ShuffleWriter on the Mapper side, so that it could grow from clusters of hundreds of nodes to clusters of thousands or even tens of thousands of nodes.
But is Sorted-Based Shuffle perfect? The answer is no. Sorted-Based Shuffle has a drawback too, and that drawback is precisely its sorting characteristic: it forces data to be sorted on the Mapper side first, which makes it somewhat slow. Fortunately, Tungsten-Sort Shuffle appeared, which improved the sorting algorithm and optimized sorting speed. Tungsten-Sort Shuffle has been incorporated into Sorted-Based Shuffle, and Spark's engine automatically decides whether a program needs Sorted-Based Shuffle or Tungsten-Sort Shuffle.
The following is a detailed analysis of the underlying execution principle of each Shuffle:
1. Analysis of Hash Shuffle
The following discussion assumes that each Executor has only 1 CPU core.
1. Unoptimized HashShuffleManager
After the computation of a stage finishes, the shuffle write phase mainly "partitions" the data processed by each task by key, so that the next stage can execute shuffle operators such as reduceByKey. "Partitioning" here means applying a hash function to the key, so that records with the same key are written into the same disk file, and each disk file belongs to exactly one task of the downstream stage. Before being written to disk, the data is first written into a memory buffer; when the buffer fills up, it is spilled to the disk file.
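Conceptually, the "hash the key" step looks like the sketch below; it is a simplified stand-in for what Spark's HashPartitioner does (hashCode modulo the number of downstream tasks, kept non-negative), not the actual implementation:

// Records with the same key always map to the same downstream task, and hence the same disk file.
def choosePartition(key: Any, numReduceTasks: Int): Int = {
  val raw = key.hashCode % numReduceTasks
  if (raw < 0) raw + numReduceTasks else raw // keep the partition id non-negative
}

// Example: with 100 reduce tasks, every record keyed by "user-42" goes to the same partition.
val p = choosePartition("user-42", 100)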
Each task of the current stage creates as many disk files as there are tasks in the next stage. For example, if the next stage has 100 tasks in total, then every task of the current stage must create 100 disk files. If the current stage has 50 tasks spread over 10 Executors, each executing 5 tasks, then 500 disk files are created on each Executor and 5000 disk files across all Executors. Clearly, the number of disk files produced by an unoptimized shuffle write is staggering.
The shuffle read phase is usually what a stage does at its very beginning. Each task of this stage needs to pull, over the network, all the records with its keys from the results of the previous stage on every node, and then perform key-based aggregation, joins and similar operations. Since during shuffle write each map task created one disk file for every reduce task of the downstream stage, during shuffle read each reduce task only has to pull, from every node, the disk file that belongs to it.
The pulling process of shuffle read aggregates while it pulls. Each shuffle read task has its own buffer and can only pull data of at most the buffer's size at a time; it then performs aggregation and other operations through an in-memory Map. Once a batch of data has been aggregated, the next batch is pulled into the buffer and aggregated, and so on, until all data has been pulled and the final result is obtained.
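A minimal sketch of this "aggregate while pulling" loop is shown below; fetchNextBatch is a hypothetical function standing in for the network fetch of one buffer's worth of records, not Spark's actual reader API:

import scala.collection.mutable

// fetchNextBatch() (hypothetical) returns at most one buffer's worth of (key, value)
// records pulled from remote map outputs, or an empty sequence when nothing is left.
def aggregateWhilePulling(fetchNextBatch: () => Seq[(String, Int)]): mutable.Map[String, Int] = {
  val aggregated = mutable.Map.empty[String, Int]
  var batch = fetchNextBatch()
  while (batch.nonEmpty) {
    for ((k, v) <- batch)                    // aggregate the current batch in memory
      aggregated(k) = aggregated.getOrElse(k, 0) + v
    batch = fetchNextBatch()                 // then pull the next buffer-sized batch
  }
  aggregated
}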
The working principle of HashShuffleManager is shown in the following figure:
2. Optimized HashShuffleManager
To optimize HashShuffleManager we can set the parameter spark.shuffle.consolidateFiles. Its default value is false; setting it to true enables the optimization mechanism. Generally speaking, if you have to use HashShuffleManager, it is recommended to enable this option.
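For example, on the old Spark versions that still shipped HashShuffleManager (roughly 0.8.1 up to 1.6; both properties were removed together with Hash Shuffle in Spark 2.0), the option could be enabled when building the SparkConf:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("hash-shuffle-consolidate-demo")
  .set("spark.shuffle.manager", "hash")          // pre-2.0 only: select Hash Shuffle
  .set("spark.shuffle.consolidateFiles", "true") // enable the consolidate mechanism
val sc = new SparkContext(conf)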
After the consolidate mechanism is enabled, tasks no longer create one disk file per task of the downstream stage during shuffle write. Instead the concept of a shuffleFileGroup appears: each shuffleFileGroup corresponds to a batch of disk files, whose number equals the number of tasks in the downstream stage. An Executor can run as many tasks in parallel as it has CPU cores, and each of the tasks in the first parallel batch creates a shuffleFileGroup and writes its data into the corresponding disk files.
When the Executor's CPU cores finish one batch of tasks and start the next, the new tasks reuse the existing shuffleFileGroups, including the disk files in them; in other words, a task writes its data into already existing disk files rather than into new ones. The consolidate mechanism therefore lets different tasks reuse the same batch of disk files, effectively merging the disk files of multiple tasks to some extent, which greatly reduces the number of disk files and improves shuffle write performance.
Suppose the second stage has 100 tasks and the first stage has 50 tasks, still with 10 Executors (each with 1 CPU core), each executing 5 tasks. With the unoptimized HashShuffleManager, each Executor generates 500 disk files and all Executors together generate 5000. After this optimization, the number of disk files created by each Executor becomes: number of CPU cores * number of tasks in the next stage. Each Executor therefore creates only 100 disk files, and all Executors together create only 1000.
This feature has obvious advantages, so why has Spark never made it the default in the Hash-based Shuffle implementation? The official explanation is that the feature was not yet stable.
The optimized HashShuffleManager works as shown in the figure below:
Advantages and disadvantages of Hash-based Shuffle mechanism
Advantages:
Unnecessary sorting overhead can be omitted.
Avoids the memory overhead required for sorting.
Disadvantages:
Too many files are produced, which puts pressure on the file system.
Random reads and writes of a large number of small files bring considerable disk overhead.
The cache space required for data block writing will also increase accordingly, putting pressure on memory.
2. Analysis of SortShuffle
The operating mechanism of SortShuffleManager is mainly divided into three types:
The common (ordinary) operating mechanism;
The bypass operating mechanism: when the number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (200 by default), the bypass mechanism is enabled;
The Tungsten Sort operating mechanism: enabling it requires setting the configuration item spark.shuffle.manager=tungsten-sort, but setting this configuration does not guarantee that this mechanism is actually used (explained later).
1. Common operating mechanism
In this mode, data is first written into an in-memory data structure; which structure is chosen depends on the shuffle operator. For aggregation-type shuffle operators such as reduceByKey, a Map structure is chosen, and the data is aggregated through the Map while being written into memory; for ordinary shuffle operators such as join, an Array structure is chosen and the data is written straight into memory. After each record is written into the in-memory structure, Spark checks whether a critical threshold has been reached; if so, it attempts to spill the data in the in-memory structure to disk and then clears the structure.
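A much simplified sketch of this choice and of the spill check is shown below (in Spark's ExternalSorter the real structures are PartitionedAppendOnlyMap and PartitionedPairBuffer; the 5 MB threshold and the spillToDisk callback here are purely illustrative):

import scala.collection.mutable

// Schematic buffer-and-spill loop, not Spark's ExternalSorter.
def insertRecords(records: Iterator[(String, Int)],
                  needAggregation: Boolean,          // e.g. reduceByKey => true, join => false
                  estimatedSizeBytes: () => Long,    // size estimate of the in-memory structure
                  spillToDisk: () => Unit): Unit = {
  val map    = mutable.Map.empty[String, Int]            // Map structure: aggregate while inserting
  val buffer = mutable.ArrayBuffer.empty[(String, Int)]  // Array structure: just append

  for ((k, v) <- records) {
    if (needAggregation) map(k) = map.getOrElse(k, 0) + v
    else buffer += ((k, v))
    if (estimatedSizeBytes() > 5L * 1024 * 1024) {  // illustrative threshold reached
      spillToDisk()                                 // sort by key and spill to a temporary file...
      map.clear(); buffer.clear()                   // ...then clear the in-memory structure
    }
  }
}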
Before spilling to a disk file, the data already in the in-memory structure is sorted by key. The sorted data is then written to the disk file in batches; the default batch size is 10,000 records, i.e., the sorted data is written out 10,000 records at a time. Writing to the disk file goes through Java's BufferedOutputStream, a buffered output stream: data is first buffered in memory and written to the disk file only when the buffer is full, which reduces the number of disk I/O operations and improves performance.
While a task writes all of its data into the in-memory structure, multiple disk spills occur and multiple temporary files are produced. At the end, all of the earlier temporary disk files are merged: this is the merge process, which reads the data from all the temporary files and writes it, in order, into the final disk file. In addition, since a task now corresponds to only one disk file, meaning that all the data this task prepares for the tasks of the downstream stage is in that one file, a separate index file is also written, recording the start and end offsets of each downstream task's data in the data file.
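The index file can be thought of as a list of cumulative byte offsets, one per reduce partition, so that downstream task i reads the byte range [offsets(i), offsets(i + 1)) from the single data file. The sketch below is schematic, not Spark's actual IndexShuffleBlockResolver:

// partitionLengths(i) = number of bytes written for reduce partition i in the data file.
def buildIndex(partitionLengths: Array[Long]): Array[Long] =
  partitionLengths.scanLeft(0L)(_ + _)  // cumulative offsets

// Example: three reduce partitions of 100, 0 and 250 bytes.
val offsets = buildIndex(Array(100L, 0L, 250L)) // Array(0, 100, 100, 350)
// Reduce task 2 reads bytes [100, 350) of the data file.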
Because SortShuffleManager has this disk-file merge step, it greatly reduces the number of files. For example, if the first stage has 50 tasks spread over 10 Executors, each executing 5 tasks, and the second stage has 100 tasks, then since each task ends up with only one disk file, there are only 5 disk files on each Executor and only 50 disk files across all Executors.
The working principle of the SortShuffleManager of the general operating mechanism is shown in the figure below:
2. Bypass operating mechanism
When the number of tasks on the Reducer side is relatively small, the Hash Shuffle-based implementation is clearly faster than the Sort Shuffle-based one. The Sort Shuffle implementation therefore provides a Hash-style fallback, the bypass operating mechanism: when the number of Reducer-side tasks does not exceed the value of the configuration property spark.shuffle.sort.bypassMergeThreshold, the Hash-style fallback plan is used.
The trigger conditions of the bypass operation mechanism are as follows:
The number of shuffle reduce tasks (i.e., downstream partitions) is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (200 by default).
The operator is not an aggregation-type (map-side combine) shuffle operator.
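Put as code, the check is roughly the following; this is a simplified paraphrase of SortShuffleWriter.shouldBypassMergeSort, with mapSideCombine and numReducePartitions standing in for fields of the real ShuffleDependency:

// Bypass is possible only when no map-side aggregation is needed
// and the number of downstream partitions is small enough.
def shouldBypassMergeSort(mapSideCombine: Boolean,
                          numReducePartitions: Int,
                          bypassMergeThreshold: Int = 200): Boolean =
  !mapSideCombine && numReducePartitions <= bypassMergeThreshold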
In this case, each task creates one temporary disk file for each downstream task, hashes each record by its key, and writes the record into the disk file corresponding to that hash value. As before, records are first written into a memory buffer and spilled to the disk file when the buffer fills up. Finally, all the temporary disk files are merged into a single disk file and a single index file is created.
The disk-writing part of this process is exactly the same as in the unoptimized HashShuffleManager, in that a startling number of temporary disk files is created; the difference is that a file merge is done at the end. The small number of final disk files is what gives this mechanism better shuffle read performance than the unoptimized HashShuffleManager.
The differences between this mechanism and the ordinary SortShuffleManager mechanism are: first, the disk-writing mechanism is different; second, no sorting is performed. In other words, the biggest advantage of enabling this mechanism is that the shuffle write process does not need to sort the data, which saves that part of the performance overhead.
The working principle of the SortShuffleManager of the bypass operation mechanism is shown in the figure below:
3. Tungsten Sort Shuffle Operation Mechanism
Tungsten Sort is an optimization of ordinary Sort. Tungsten Sort still sorts, but what it sorts is not the records themselves: it sorts pointers (metadata) to the serialized byte data, turning the sorting of data into the sorting of an array of pointers and thereby sorting serialized binary data directly. Because the sort operates directly on binary data, records do not need to be deserialized and re-serialized during the sort and merge; memory consumption drops sharply, and GC overhead drops accordingly.
Spark provides configuration properties for selecting a specific Shuffle implementation mechanism, but note that although Spark enables the Sort Shuffle-based mechanism by default, looking at the internals of the Shuffle framework shows that both the Sort Shuffle-based mechanism and the Tungsten Sort Shuffle-based mechanism use SortShuffleManager; which concrete mechanism is used internally is decided by two methods:
For the non-Tungsten Sort case, SortShuffleWriter.shouldBypassMergeSort decides whether it is necessary to fall back to the Hash-style Shuffle mechanism. When the condition of that method is not met, SortShuffleManager.canUseSerializedShuffle decides whether to adopt the Tungsten Sort Shuffle-based mechanism. When both methods return false, i.e., neither condition is met, the common operating mechanism is used.
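The selection can be summarized by the sketch below, modeled on SortShuffleManager.registerShuffle in the Spark 2.x sources (names simplified; the real method returns different ShuffleHandle types rather than these placeholder objects):

sealed trait ShufflePath
case object BypassMergeSortPath extends ShufflePath // Hash-style fallback, no sorting
case object SerializedSortPath  extends ShufflePath // Tungsten Sort mechanism
case object NormalSortPath      extends ShufflePath // common operating mechanism

def choosePath(canBypassMergeSort: Boolean, canUseSerializedShuffle: Boolean): ShufflePath =
  if (canBypassMergeSort) BypassMergeSortPath          // SortShuffleWriter.shouldBypassMergeSort
  else if (canUseSerializedShuffle) SerializedSortPath // SortShuffleManager.canUseSerializedShuffle
  else NormalSortPath                                  // neither condition holds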
Therefore, setting spark.shuffle.manager=tungsten-sort does not guarantee that the Tungsten Sort-based Shuffle implementation mechanism will be adopted.
To use the Tungsten Sort Shuffle mechanism, the following conditions need to be met:
The Shuffle dependency has no aggregation and does not require its output to be sorted.
The Shuffle serializer supports relocation of serialized values (currently only KryoSerializer and the custom serializers of the Spark SQL framework support this).
The number of Shuffle output partitions does not exceed 16777216 (2^24).
In practice there are further restrictions. For example, with the page-based memory management model introduced here, a single record cannot occupy more than 128 MB within a page (see the PackedRecordPointer class for the exact memory model). The limit on the number of partitions is also caused by this memory model.
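For reference, these limits follow from the bit layout documented in the PackedRecordPointer class, where a 64-bit pointer is split into a 24-bit partition id, a 13-bit memory page number and a 27-bit offset within the page; the constants below simply restate that layout:

// Bit budget of the 64-bit packed record pointer (as documented in PackedRecordPointer).
val partitionBits  = 24
val pageNumberBits = 13
val pageOffsetBits = 27

val maxPartitions = 1L << partitionBits  // 16777216 shuffle output partitions at most
val maxPageSize   = 1L << pageOffsetBits // 134217728 bytes = 128 MB addressable within one page
println(s"max partitions: $maxPartitions, max page size: $maxPageSize bytes")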
Therefore, the conditions for using the Tungsten Sort Shuffle-based implementation mechanism are relatively strict.