Really good explanation Afaque. Thank you for making such in depth videos.😊
Thank you! Your explanations are helping me solve many puzzles in understanding Apache Spark. I truly appreciate the work you're doing for the Data community. :)
Glad to hear that it's been able to clear concepts for you :)
Commenting so that you continue making such informative videos. Great work!
Your channel is so underrated, Please dont stop
Topics which scare a lot, made easier. Afaque, you are doing a great job, please keep making videos. Thanks a lot for the awesome content.
Appreciate it man. Those words mean a lot :)
Have watched all your videos. Seriously Gold content. Requesting not to stop making videos.
Thank you @dileepkumar-nd1fo for the kind words, it means a lot to me :)
Learning from masters❤❤❤
Wow! Thank you Afaque, this is incredible content and very helpful!
Appreciate it @anandchandrashekhar2933, thank you!
Very nice explanation! Thank you for making this video.
Thanks @purnimasharma9734, appreciate it :)
Absolutely Gem..❤
Glad you liked it :)
Thank you so much. Keep up the good work. Looking forward for more such videos to learn Spark
Thank you @Momofrayyu-pw9ki, really appreciate it :)
Great share sir, on the optimal shuffle size. Please bring more scenario-based questions as well as best production-based practices!
Great work brother......... Thank you for explaining concepts in detail ❤❤
Appreciate it, @sureshpatchigolla3438 :)
Difficult subject explained with ease. 🙏
Glad you found it easy to understand :)
Industry level content.
thanks a bunch for the great content again....
Great Explanation!
Thanks for video, very informative
I don't think this kind of videos are available on Spark anywhere else. Great work Afaque!
Appreciate it @rgv5966, thank you!
Hi Afaque, in the 1st case, you configured 1500 shuffle partitions, but initially you said there are 1000 cores available in the cluster, and you also mentioned one partition per core. So where do the remaining 500 partitions come from?
Another doubt: do we need to configure the number of cores consumed by a job according to the shuffle partitions we provide?
Also, please explain a case where we don't have enough cores available in the cluster.
Hey @arunkindra832, in scenario 1, referring to the diagram, there are 20 cores in the cluster (5 executors * 4 cores each). 1500 shuffle partitions are going to be processed by a 20-core cluster. Assuming each core takes the same amount of time to process a shuffle partition and the distribution of shuffle partitions is uniform, there are going to be approximately 1500/20 = 75 rounds. In 1 round, 20 shuffle partitions are going to be processed.
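For anyone who wants to plug in their own numbers, here's a tiny sketch of the arithmetic above; the executor and core counts are simply the values assumed in scenario 1:

```python
import math

# Values assumed in scenario 1 (from the diagram in the video)
num_executors = 5
cores_per_executor = 4
shuffle_partitions = 1500

total_cores = num_executors * cores_per_executor      # 20 cores -> 20 partitions per round
rounds = math.ceil(shuffle_partitions / total_cores)  # ceil(1500 / 20) = 75 rounds

print(f"{total_cores} cores -> {rounds} rounds of {total_cores} partitions each")
```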
perfect video
@afaque how can I calculate the total shuffle data size without executing the code beforehand? Also, the size on disk is not the same as in memory, as in-memory data is uncompressed.
Hey @atifiu, calculating the total shuffle data size without executing the code can be challenging due to the dynamic nature of Spark's execution. Several things come into the picture, for example: data distribution, skew, and the nature of transformations (wide vs narrow), depending on which you may or may not get an accurate shuffle data size.
Clearly explained!!
Appreciate it :)
@Afaque: Very good video. So in real life, for varying workload sizes (shuffle write size 50 MB - 300 GB), you have to change the shuffle partition setting programmatically? How do you figure out, in code, the shuffle write that you find in the Spark UI? Is there any solution?
Thanks for the kind words @tsaha100. I don't think there's a clear way to statically estimate, using code, the shuffle write that is shown on the Spark UI, because of the dynamic nature of Spark's execution.
If you would like to log the shuffle write metrics when your task completes, you could try attaching a SparkListener to your SparkContext and overriding the onTaskEnd method to capture shuffle write metrics, but I believe it's just easier to run the job and refer to the Spark UI.
You can refer: books.japila.pl/apache-spark-internals/SparkListenerInterface/#ontaskgettingresult
Thank you it is very useful
Currently watching your playlist one by one. Great content, very detailed explanation. In the first scenario you had 5 executors with 4 cores each. If you have 1500 shuffle partitions, how are they going to be accommodated?
Hey @mirli33, glad you're finding the playlist helpful. Regarding the question: when you have 1500 shuffle partitions and 20 cores (5 x 4 = 20), 1 core takes up 1 partition at a time, so with a total of 20 cores, the first round will process 20 partitions. There will be a total of 1500/20 = 75 rounds of processing.
I like your videos very much, they're insightful. Can you please make a series/videos on Spark interview-oriented questions? Thanks in advance.
Great explanation, thank you!
But how would we know how to configure the exact (or at least the best) "spark.sql.shuffle.partitions" at run time? Because each run/day the volume of the data is going to change. So, how do we determine the data volume at run time to set the shuffle.partitions number?
Hey @mahendranarayana1744, good question! There are 2 ways to solve this problem. With the newer Spark versions (>= 3.0.0), you can let AQE handle the sizing, which will accordingly split partitions if the size per partition is huge, or combine (coalesce) partitions if the size per partition is small.
Another approach is to estimate the size of the dataset at runtime and then divide that size by the optimal partition size (100-200 MB), so that each partition has an optimal size (a small sketch follows below). I've covered how to estimate the size of datasets in the "Bucketing" video here: ua-cam.com/video/1kWl6d1yeKA/v-deo.html
You can refer to the exact algorithm on sizing datasets here: umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/parallelism/sparksqlshufflepartitions_draft
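A minimal sketch of that second approach, assuming you already have an estimated shuffle size in bytes (from your own dataset-size estimation or from historical Spark UI numbers); the 150 MB target and the variable names are illustrative only:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shuffle-partition-sizing").getOrCreate()

# Assumption: estimated_shuffle_bytes comes from your own sizing logic
# (dataset size estimation, or historical "Shuffle Write" numbers from the Spark UI)
estimated_shuffle_bytes = 300 * 1024**3   # e.g. ~300 GB
target_partition_bytes = 150 * 1024**2    # aim for ~100-200 MB per shuffle partition

num_shuffle_partitions = max(1, estimated_shuffle_bytes // target_partition_bytes)

# Must be set before the wide transformation / action that triggers the shuffle
spark.conf.set("spark.sql.shuffle.partitions", str(num_shuffle_partitions))
```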
Nice content Afaque !
Appreciate it @puneetgupta003, thank you!
Could you please make a video on stack overflow errors, like what are the scenarios when they can occur and how to fix them?
Are you referring to OOM (out of memory errors) - Driver & Executor?
@@afaqueahmad7117 No, basically when we have multiple layers under a single session, the stack memory gets full, so to break it up we have to make sure we are using one session per layer. E.g., suppose we have 3 layers (internal, external, combined); if you run these in a single session, it will throw a StackOverflowError at whatever point the stack overflows. We tried increasing the stack size as well, but that did not work. Hence, in the end, we came up with the approach of running one layer, closing the session, and so on.
@afaque A shuffle partition will consist of both the shuffled data (keys that were not originally present in the executor and were shuffled to the partition) and the non-shuffled data (keys that were already present in the executor and were not shuffled). So, the size of the shuffle partition cannot be directly calculated from the shuffle write data alone, as it also depends on the distribution of the data across the partitions?
In scenario 1, how exactly is reducing the size of partitions beneficial?
Correct me if I'm wrong: in case we let a core process 1.5 GB, most of the data will spill to disk for computation, which will increase IO and hence increase the time taken for completion. However, in case we reduce the partition size, we increase the number of partitions as a result, which again would increase the time taken for job completion.
I am assuming disk computation will take much longer to complete.
Hey @akshaybaura, @kartikthakur6461, you're on the right track. Reducing each partition from 1.5g to 200mb does increase the number of partitions, but it is beneficial for the following important reasons:
1. Reduced memory pressure: when a core processes a large partition (1.5g), it's more likely to run out of memory and start spilling to disk. This spill-over is going to cause increased IO operations, which in turn is going to significantly slow down processing. However, I would still emphasize the fact that spill depends on the memory per core. If memory per core is > 1.5g, spills won't happen, but processing is still going to be significantly slower.
2. Better resource utilisation: increasing the number of partitions allows better distribution of workload across the cores.
3. Balanced workload: Smaller partitions help in achieving a more balanced workload across cores. The importance is in ensuring that each core is given a "manageable" amount of data to process.
However, the goal is to strike a balance - partition sizes should be small enough to avoid excessive memory usage and I/O spillover but large enough to ensure that the overhead of managing many partitions doesn’t outweigh the benefits.
@@afaqueahmad7117 So enabling AQE would be beneficial, right? It will automatically decide whether to increase/decrease the number of shuffle partitions so that each partition handles an adequate amount of data according to the cluster resources.
Thanks a ton for the insightful videos. Really appreciate your efforts.
However, I have a question regarding the second case, where the data is much smaller. Just because we have idle cores sitting around, should we leverage them? Because the driver program is going to gather the results processed in each partition, which is going to add to the network delays. Why not just use one core/partition and complete the job? Also, for small volumes, I think Spark might be overkill :)
At the same time I have a question - during aggregations there can be skew in the data as well, right? I mean, if we take the same store and sales example, one store has a huge volume of sales records while others do not. How do we optimize such skews?
Looking forward for your answers.
Hey @ajaykiranchundi9979, appreciate the kind words and good questions
#1. The goal in the example was to emphasize maximum cluster utilization, but you raise a valid point. Using idle cores just for the sake of it, when it leads to more shuffles or network delays because data is distributed more than it should be, is a bad idea and could also lead to small-file problems. So, I agree with you: if your job has tons of data but is being operated on by a few cores while others are idle, it's ideal to split out and use the other idle cores. On the other hand, if the data itself is too small, it's indeed a bottleneck to split, distribute, shuffle and gather the data.
#2. I've 3 videos specifically on skews and how you can fix them using AQE, Broadcast Joins and Salting (a quick broadcast-join sketch follows below). You can have a look here: ua-cam.com/play/PLWAuYt0wgRcLCtWzUxNg4BjnYlCZNEVth.html
Hope that clarifies :)
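A minimal sketch of one of the techniques mentioned above (a broadcast join, applied to the store/sales example from the question); the table names and paths are placeholders:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("skew-broadcast-join").getOrCreate()

# Hypothetical datasets: a large, skewed fact table and a small dimension table
sales = spark.read.parquet("/data/sales")    # skewed by store_id
stores = spark.read.parquet("/data/stores")  # small enough to fit in executor memory

# Broadcasting the small side avoids shuffling the large, skewed side entirely
joined = sales.join(F.broadcast(stores), on="store_id", how="left")
```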
Nice explanation -)
Hi Afaque, if you can share the slides as notes, that would be great.
Hey @hemant9253, I was writing and recording on the Nebo App on iPad. It's gone now. You can take screenshots and save (if that helps) :)
Lovely!
Thank you for explaining in detail. You are the best guy around. Can you also please explain to me if there is a way to dynamically update the shuffle partitions with the help of dynamic calculations of the data size and the number of cores in the cluster (in case the cluster is altered in the future)?
Thanks in advance.
@tandaibhanukiran4828 I believe it's challenging to dynamically configure shuffle partitions knowing only the size of your data and the cluster configuration. The most important input is the "Shuffle Write". Estimating shuffle write is not very clear-cut, as it depends on several factors (skew, transformation complexity, i.e. joins, aggregations, dynamic execution plans, etc.).
If you have historical data, or similar jobs (using the same/similar datasets) with similar operations, i.e. joins and aggregations, you could use those "Shuffle Write" numbers and apply the logic (as demonstrated in the scenarios) to dynamically get the number of shuffle partitions.
However, I would stress using this approach with caution.
Thank you very much.@@afaqueahmad7117
So 12 will be the number of shuffle partitions to set, and the data per shuffle partition is going to be very small, in the 2nd solution?
@Afaque thank you for making these videos. Very helpful. I have a question: how do we estimate the data size? We run our batches/jobs on Spark and each batch could be processing a varying size of data. Some batches could be dealing with 300 GB and some could be 300 MB. How do we calculate the optimal number of shuffle partitions?
Thanks for the video! I also have a question: when I execute a complex query, there will be multiple stages with different shuffle write sizes. Which one do I have to take into consideration for the computation of the optimal number of shuffle partitions?
Hey @leonardopetraglia6040, it's best to start with the largest shuffle write across all stages
Thanks for the content, really appreciate it. My understanding is that AQE takes care of shuffle partition optimization and we don't need to manually intervene (starting Spark 3) to optimize shuffle partitions. Could you clarify this please?
Hey man, awesome series so far. I noticed in your videos that you share your mac screen but use an apple pencil on your ipad to annotate? Can you describe that setup on how you’re able to annotate on your Jupyter Notebook (presented on mac) but from an ipad instead? Thanks in advance appreciate it
Thank you! I use Ecamm Live so adding both iPad and Mac as a screen helps me navigate easily between both.
For the ones, where I’ve annotated on Notebooks, I’ve used Zoom to share my screen and annotate on Zoom.
Consider a scenario where my first shuffle's data size is 100 GB, so giving more shuffle partitions makes sense, but the last shuffle's data size is drastically reduced to 10 GB. According to the calculations, giving 1500 shuffle partitions would benefit the first shuffle but not the last one. How does one approach this scenario?
Sir, still waiting for a dedicated video on driver and executor OOM in great detail. A question: if the file is 100 GB, can we sort it? If yes, will there be data spill? Basically an interview question for 8+ years of experience.
Hey @AshishStudyDE, hopefully more content coming soon. The answer to the question depends a lot on the resource configuration, i.e. the number of cores, memory, and execution memory per core. If the "execution memory" per core > partition size (assuming the 100 GB file is divided into partitions), there shouldn't be problems processing the file (a rough back-of-the-envelope sketch follows below).
If you would like to know more about "Execution memory", please refer to the Memory Management video here: ua-cam.com/video/sXL1qgrPysg/v-deo.html
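To reason about that "execution memory per core vs partition size" comparison, here's a back-of-the-envelope sketch using Spark's default unified-memory settings (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5, ~300 MB reserved); the executor size and core count are example values only:

```python
# Rough estimate of execution memory available per core (unified memory model).
# Assumes default spark.memory.fraction = 0.6 and ~300 MB reserved memory;
# executor_memory_bytes and cores_per_executor are example values.
executor_memory_bytes = 8 * 1024**3          # e.g. an 8 GB executor
cores_per_executor = 4
reserved_bytes = 300 * 1024**2

unified_pool = (executor_memory_bytes - reserved_bytes) * 0.6

# Execution can borrow the storage half when nothing is cached (best case),
# and is guaranteed roughly half the pool otherwise (worst case).
best_per_core = unified_pool / cores_per_executor
worst_per_core = (unified_pool * 0.5) / cores_per_executor

print(f"~{worst_per_core / 1024**2:.0f}-{best_per_core / 1024**2:.0f} MB execution memory per core")
```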
Thanks for the explanation. But isn't the parameter (spark.sql.shuffle.partitions) in some way dependent on the cardinality of the group by / join column?
Can you please cover bucketing hands-on in ADB (hands-on with the file view)? In your last video it is working in your IDE but not in Databricks (Delta bucketing is not allowed).
I have learned somewhere that the max partition size can only be 128 MB in Spark. Doesn't that contradict what you mentioned when explaining the 300 GB example? Just asking to correct myself if I'm wrong.
Hey @JustDinesh-1934, good question! Anything between 100-200 MB is a reasonable number for optimal performance, but it can depend on several factors, especially your cluster, number of cores, total memory and memory available per core. As long as the "execution memory" per core > partition size, things should work just fine :)
Can you tell how to resolve "Python worker exited unexpectedly (crashed)"?
Good, but it would have been more appropriate to cover the join - how to partition data before a join to make sure the join works properly.
wow
I have a question on this. Let's say that the data volume I am processing varies on a daily basis, i.e. some days it can be 50 GB and some days it can be 10 GB. Keeping in mind the 200 MB per shuffle partition limit, the number of partitions for optimum partitioning should change on each run in that case. But it's not practically possible to change the code every time to have a proper shuffle partition count. How should this scenario be handled? I read about a parameter, spark.sql.files.maxPartitionBytes, which defaults to 128 MB. Should I change this to 200 and let the number of shuffle partitions be calculated automatically? In that case, will the value under spark.sql.shuffle.partitions be ignored?
I think here you can also use Kafka in conjunction with PySpark.
So if the data is more, increase the partitions dynamically in Kafka, and if the data is less, decrease the partitions in Kafka, and then arrive at an optimal number; or keep the partitions the same in Kafka so that each partition in Kafka = a partition in PySpark, and then it can process the data accordingly based on the cores.
In scenario 1: the size per shuffle partition should be 15 GB; your calculation (1.5 GB) is wrong. According to you, each executor will do 1.5 GB * 4 = 6 GB of processing. The total executors are 5, so 5 * 6 = 30 GB instead of 300 GB?
Thanks again for the topic. I was worried that you had stopped... because this depth of knowledge can't even be gotten from colleagues.
@user-dx9qw3cl8w More to come! :)
waiting eagerly :)
Nice explanation! But just pointing out that the second scenario practically doesn't make sense for a 10x improvement. If your data size is 50 MB, rather than trying to utilize all cores, think of using just one core so that a shuffle is not required altogether.
You're right, the second scenario is a bit contrived; The goal was to highlight situations where fewer cores are under too much pressure and there may be room for utilising all resources, while reducing pressure on some of the cores; thanks for pointing it out!
I think it's a bit dumb for Spark to keep this value static... why not rather have a "target shuffle size (MB/GB)" config in Spark? I wish the Spark planner were a bit more sophisticated.
You could get a similar effect by turning on AQE and setting "spark.sql.adaptive.advisoryPartitionSizeInBytes" to your desired size. Documentation here: spark.apache.org/docs/latest/sql-performance-tuning.html
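For reference, a minimal sketch of that setup; the 128 MB target is just an example value:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("aqe-advisory-partition-size").getOrCreate()

# Enable AQE and let it coalesce small shuffle partitions toward a target size
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")  # example target
```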
@@afaqueahmad7117 Awesome! Thanks for the advice! Your videos have been really helpful!
In Scenario 2 - finally, we are distributing 4.2 MB to each core. In this case, what's the spark.sql.shuffle.partitions? Is it 12?
Yes @vinothvk2711, it's 12. As explained, this is going to ensure 100% utilisation of the cluster.