Shuffle Partition Spark Optimization: 10x Faster!

  • Published 16 Jan 2025

COMMENTS • 93

  • @akshayshinde3703
    @akshayshinde3703 1 year ago +1

    Really good explanation Afaque. Thank you for making such in depth videos.😊

  • @kayalvizhithiyagarajan4950
    @kayalvizhithiyagarajan4950 1 month ago

    Thank you! Your explanations are helping me solve many puzzles in understanding Apache Spark. I truly appreciate the work you're doing for the Data community. :)

    • @afaqueahmad7117
      @afaqueahmad7117  29 days ago

      Glad to hear that it's been able to clear concepts for you :)

  • @tahiliani22
    @tahiliani22 1 year ago

    Commenting so that you continue making such informative videos. Great work!

  • @narutomaverick
    @narutomaverick 5 months ago

    Your channel is so underrated, please don't stop.

  • @sulthanmohiddin5294
    @sulthanmohiddin5294 1 month ago

    Topics which scare a lot of people, made easier. Afaque, you are doing a great job, please keep making videos. Thanks a lot for the awesome content.

    • @afaqueahmad7117
      @afaqueahmad7117  1 month ago +1

      Appreciate it man. Those words mean a lot :)

  • @dileepkumar-nd1fo
    @dileepkumar-nd1fo 9 months ago

    Have watched all your videos. Seriously Gold content. Requesting not to stop making videos.

    • @afaqueahmad7117
      @afaqueahmad7117  9 months ago

      Thank you @dileepkumar-nd1fo for the kind words, it means a lot to me :)

  • @Rafian1924
    @Rafian1924 10 months ago

    Learning from masters❤❤❤

  • @anandchandrashekhar2933
    @anandchandrashekhar2933 7 months ago

    Wow! Thank you Afaque, this is incredible content and very helpful!

    • @afaqueahmad7117
      @afaqueahmad7117  7 months ago

      Appreciate it @anandchandrashekhar2933, thank you!

  • @purnimasharma9734
    @purnimasharma9734 7 months ago

    Very nice explanation! Thank you for making this video.

    • @afaqueahmad7117
      @afaqueahmad7117  7 months ago

      Thanks @purnimasharma9734, appreciate it :)

  • @starmscloud
    @starmscloud 1 month ago

    Absolutely Gem..❤

  • @Momofrayyudoodle
    @Momofrayyudoodle 8 months ago

    Thank you so much. Keep up the good work. Looking forward to more such videos to learn Spark.

    • @afaqueahmad7117
      @afaqueahmad7117  8 months ago

      Thank you @Momofrayyu-pw9ki, really appreciate it :)

  • @2412_Sujoy_Das
    @2412_Sujoy_Das 1 year ago

    Great share sir, the optimal shuffle size... Please bring more scenario-based questions as well as best production-based practices!

  • @sureshpatchigolla3438
    @sureshpatchigolla3438 9 months ago

    Great work brother......... Thank you for explaining concepts in detail ❤❤

    • @afaqueahmad7117
      @afaqueahmad7117  9 months ago

      Appreciate it, @sureshpatchigolla3438 :)

  • @RaviSingh-dp6xc
    @RaviSingh-dp6xc 2 months ago

    Difficult subject explained with ease. 🙏

  • @abhisheknigam3768
    @abhisheknigam3768 4 months ago

    Industry level content.

  • @ashokreddyavutala8684
    @ashokreddyavutala8684 1 year ago

    thanks a bunch for the great content again....

  • @vinothvk2711
    @vinothvk2711 1 year ago

    Great Explanation!

  • @Wonderscope1
    @Wonderscope1 1 year ago

    Thanks for the video, very informative.

  • @rgv5966
    @rgv5966 6 months ago

    I don't think these kinds of videos on Spark are available anywhere else. Great work Afaque!

  • @arunkindra832
    @arunkindra832 11 months ago +1

    Hi Afaque, in the 1st case you configured 1500 shuffle partitions, but initially you said 1000 cores were available in the cluster, and you also mentioned one partition per core. So where do the remaining 500 partitions come from?
    Another doubt: do we need to configure the number of cores consumed by a job according to the shuffle partitions we provide?
    Also, please explain a case where we don't have enough cores available in the cluster.

    • @afaqueahmad7117
      @afaqueahmad7117  11 months ago +1

      Hey @arunkindra832, in scenario 1, referring to the diagram, there are 20 cores in the cluster (5 executors * 4 cores each). 1500 shuffle partitions are going to be processed by a 20-core cluster. Assuming each core takes the same amount of time to process a shuffle partition and the distribution of shuffle partitions is uniform, there are going to be approximately 1500/20 = 75 rounds. In 1 round, 20 shuffle partitions are going to be processed.
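
      For anyone who wants to replay that arithmetic, here is a minimal sketch (assuming the scenario's figures: ~300 GB of shuffle write, a ~200 MB target per partition, and 5 executors x 4 cores; `spark` is assumed to be an existing SparkSession):

      ```python
      import math

      shuffle_write_mb = 300_000        # ~300 GB of shuffle write (decimal MB, as in the video)
      target_partition_mb = 200         # desired size per shuffle partition
      total_cores = 5 * 4               # 5 executors x 4 cores = 20 parallel tasks

      num_partitions = math.ceil(shuffle_write_mb / target_partition_mb)   # 1500
      waves = math.ceil(num_partitions / total_cores)                      # 75 rounds of 20 tasks each

      # Apply it before the wide transformation runs (assumes an active SparkSession `spark`)
      spark.conf.set("spark.sql.shuffle.partitions", num_partitions)
      ```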

  • @tanushreenagar3116
    @tanushreenagar3116 6 months ago

    perfect video

  • @atifiu
    @atifiu 11 months ago +1

    @afaque how can I calculate the total shuffle data size without executing the code beforehand? Also, size on disk is not the same as in memory, as in-memory data is uncompressed.

    • @afaqueahmad7117
      @afaqueahmad7117  11 months ago

      Hey @atifiu, calculating the total shuffle data size without executing the code can be challenging due to the dynamic nature of Spark's execution. Several things come into the picture, for example data distribution, skew, and the nature of transformations (wide vs narrow), depending on which you may or may not get an accurate shuffle data size.

  • @asokanramasamy2087
    @asokanramasamy2087 9 months ago

    Clearly explained!!

  • @tsaha100
    @tsaha100 1 year ago +1

    @Afaque: Very good video. So in real life, for varying workload sizes (shuffle write size 50 MB - 300 GB), do you have to change the shuffle partition count programmatically? How do you figure out, in code, the shuffle write that you find in the Spark UI? Is there any solution?

    • @afaqueahmad7117
      @afaqueahmad7117  11 months ago

      Thanks for the kind words @tsaha100. I don't think there's a clear way to statically estimate, in code, the shuffle write shown on the Spark UI, because of the dynamic nature of Spark's execution.
      If you would like to log the shuffle write metrics when your tasks complete, you could try attaching a SparkListener to your SparkContext and overriding the onTaskEnd method to capture shuffle write metrics, but I believe it's just easier to run the job and refer to the Spark UI.
      You can refer: books.japila.pl/apache-spark-internals/SparkListenerInterface/#ontaskgettingresult
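
      If you want those numbers programmatically without writing a listener, one hedged alternative (not the listener approach described above) is to poll Spark's monitoring REST API after the job has run; the sketch below assumes the driver UI is reachable at localhost:4040 and simply prints per-stage shuffle write bytes:

      ```python
      import requests

      # Spark's monitoring/status REST API is served by the driver UI (assumed here at localhost:4040)
      BASE = "http://localhost:4040/api/v1"

      app_id = requests.get(f"{BASE}/applications").json()[0]["id"]
      stages = requests.get(f"{BASE}/applications/{app_id}/stages").json()

      for stage in stages:
          # Each stage record exposes shuffle metrics, including bytes written during the shuffle
          print(stage["stageId"], stage["name"], stage.get("shuffleWriteBytes", 0), "shuffle write bytes")
      ```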

  • @ravikrish006
    @ravikrish006 1 year ago

    Thank you it is very useful

  • @mirli33
    @mirli33 4 months ago

    Currently watching your playlist one by one. Great content, very detailed explanation. In the first scenario you had 5 executors with 4 cores each. If you have 1500 shuffle partitions, how are they going to be accommodated?

    • @afaqueahmad7117
      @afaqueahmad7117  3 months ago +2

      Hey @mirli33, glad you're finding the playlist helpful. Regarding the question: when you have 1500 shuffle partitions and 20 cores (5 x 4 = 20), 1 core takes up 1 partition, so with a total of 20 cores the first round will process 20 partitions. There will be a total of 1500/20 = 75 rounds of processing.

  • @vijaykumar-b6i7t
    @vijaykumar-b6i7t 5 months ago

    I like your videos very much, they're insightful. Can you please make a series/videos on Spark interview-oriented questions? Thanks in advance.

  • @mahendranarayana1744
    @mahendranarayana1744 5 months ago +1

    Great explanation, thank you.
    But how would we know how to configure the exact (or at least best) "spark.sql.shuffle.partitions" at runtime? Because each run/day the volume of the data is going to change. So how do we determine the data volume at runtime to set the shuffle.partitions number?

    • @afaqueahmad7117
      @afaqueahmad7117  3 months ago

      Hey @mahendranarayana1744, good question! There are 2 ways to solve this problem. With the newer Spark versions (>= 3.0.0), you can let AQE handle the sizing, which will accordingly split partitions if the size per partition is huge, or combine (coalesce) partitions if the size per partition is small.
      Another approach is to estimate the size of the dataset at runtime and then divide that size by the optimal partition size (100-200 MB) so that each partition has an optimal size. I've covered how to estimate the size of datasets in the "Bucketing" video here: ua-cam.com/video/1kWl6d1yeKA/v-deo.html
      You can refer to the exact algorithm on sizing datasets here: umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/parallelism/sparksqlshufflepartitions_draft
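
      A rough sketch of that second approach (this leans on Catalyst's internal optimized-plan statistics through the private `_jdf` handle, so treat it as a best-effort estimate rather than an official API; `df` and `spark` are assumed to already exist):

      ```python
      import math

      TARGET_PARTITION_BYTES = 128 * 1024 * 1024   # aim for roughly 100-200 MB per shuffle partition

      def estimated_size_in_bytes(df):
          """Best-effort size estimate from Catalyst's optimized-plan statistics (internal API)."""
          return int(df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes().toString())

      size_bytes = estimated_size_in_bytes(df)
      num_partitions = max(1, math.ceil(size_bytes / TARGET_PARTITION_BYTES))
      spark.conf.set("spark.sql.shuffle.partitions", num_partitions)
      ```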

  • @puneetgupta003
    @puneetgupta003 8 months ago

    Nice content Afaque !

    • @afaqueahmad7117
      @afaqueahmad7117  8 months ago

      Appreciate it @puneetgupta003, thank you!

  • @fitness_thakur
    @fitness_thakur 5 months ago

    Could you please make a video on stack overflow errors, like what are the scenarios in which they can occur and how to fix them?

    • @afaqueahmad7117
      @afaqueahmad7117  5 months ago

      Are you referring to OOM (out of memory errors) - Driver & Executor?

    • @fitness_thakur
      @fitness_thakur 5 months ago

      @@afaqueahmad7117 No, basically when we have multiple layers under a single session, the stack memory gets full, so to break it up we have to make sure we are using one session per layer. E.g. suppose we have 3 layers (internal, external, combined): if you run these in a single session it will throw a stack overflow error at whatever point the stack overflows. We tried increasing the stack size as well, but that didn't work. Hence in the end we came up with the approach of running one layer and then closing the session, and so on.

  • @YounisShaik-f1j
    @YounisShaik-f1j 11 months ago

    @afaque A shuffle partition will consist of both shuffled data (keys that were not originally present in the executor and were shuffled to the partition) and non-shuffled data (keys that were already present in the executor and were not shuffled). So the size of a shuffle partition cannot be directly calculated from the shuffle write data alone, as it also depends on the distribution of the data across the partitions?

  • @akshaybaura
    @akshaybaura 1 year ago +1

    In scenario 1, how exactly is reducing the size of partitions beneficial?
    Correct me if I'm wrong: if we let a core process 1.5 GB, most of the data will spill to disk during computation, which will increase IO and hence the time taken for completion. However, if we reduce the partition size, we increase the number of partitions as a result, which again would increase the time taken for job completion.

    • @kartikthakur6461
      @kartikthakur6461 1 year ago +1

      I am assuming Disk computation will take much longer to complete.

    • @afaqueahmad7117
      @afaqueahmad7117  1 year ago +2

      Hey @akshaybaura, @kartikthakur6461, you're on the right track. Reducing each partition from 1.5g to 200mb does increase the number of partitions, but it is beneficial for the following important reasons:
      1. Reduced memory pressure: when a core processes a large partition (1.5g), it's more likely to run out of memory and start spilling to disk. This spill-over causes increased IO operations, which in turn significantly slows down processing. However, I would still emphasize the fact that spill depends on the memory per core. If memory per core is > 1.5g, spills won't happen, but processing is still going to be significantly slow.
      2. Better resource utilisation: increasing the number of partitions allows better distribution of the workload across the cores.
      3. Balanced workload: smaller partitions help in achieving a more balanced workload across cores. The importance is in ensuring that each core is given a "manageable" amount of data to process.
      However, the goal is to strike a balance - partition sizes should be small enough to avoid excessive memory usage and I/O spillover, but large enough that the overhead of managing many partitions doesn't outweigh the benefits.

    • @Aravind-gz3gx
      @Aravind-gz3gx 1 month ago

      @@afaqueahmad7117 So enabling AQE would be beneficial, right? It will automatically decide the number of shuffle partitions to increase/decrease so that each partition handles an adequate amount of data, according to the cluster resources.

  • @ajaykiranchundi9979
    @ajaykiranchundi9979 1 month ago

    Thanks a ton for the insightful videos. Really appreciate your efforts.
    However, I have a question regarding the second case, where the data is much smaller. Just because we have idle cores sitting around, should we leverage them? The driver program is going to have to gather the results processed in each partition, which is going to add to the network delays. Why not just use one core/partition and complete the job? Also, for small volumes, I think Spark might be overkill :)
    At the same time I have a question: during aggregations there can be skew in the data as well, right? I mean, if we take the same store and sales example, one store has a huge volume of sales records while others do not. How do we optimize such skews?
    Looking forward to your answers.

    • @afaqueahmad7117
      @afaqueahmad7117  1 month ago

      Hey @ajaykiranchundi9979, appreciate the kind words, and good questions.
      #1. The goal in the example was to emphasize maximum cluster utilization, but you raise a valid point. Using idle cores just for the sake of it is a bad idea when it leads to more shuffles or network delays because data is distributed more than it should be, and it could also lead to small-file problems. So I agree with you: if your job has tons of data but is being operated on by a few cores while others are idle, it's ideal to split out and use the other idle cores. On the other hand, if the data itself is too small, it's indeed a bottleneck to split, distribute, shuffle and gather the data.
      #2. I have 3 videos specifically on skews and how you can fix them using AQE, Broadcast Joins, and Salting. You can have a look here: ua-cam.com/play/PLWAuYt0wgRcLCtWzUxNg4BjnYlCZNEVth.html
      Hope that clarifies :)

  • @iamkiri_
    @iamkiri_ 11 months ago

    Nice explanation -)

  • @hemant9253
    @hemant9253 1 month ago

    Hi Afaque, if you could share the slides as notes, that would be great.

    • @afaqueahmad7117
      @afaqueahmad7117  1 month ago

      Hey @hemant9253, I was writing and recording on the Nebo App on iPad. It's gone now. You can take screenshots and save (if that helps) :)

  • @showbhik9700
    @showbhik9700 7 months ago

    Lovely!

  • @tandaibhanukiran
    @tandaibhanukiran 10 months ago

    Thank you for explaining in detail. You are the best guy around. Can you also please explain whether there is a way to dynamically update the shuffle partitions with the help of dynamic calculations of the size and number of cores in the cluster (in case the cluster is altered in the future)?
    Thanks in advance.

    • @afaqueahmad7117
      @afaqueahmad7117  10 months ago +1

      @tandaibhanukiran4828 I believe it's challenging to dynamically configure shuffle partitions knowing only the size of your data and the cluster configuration. The most important input is the "Shuffle Write". Estimating shuffle write is not very clear-cut as it depends on several factors (skew, transformation complexity i.e. joins, aggregations, dynamic execution plans, etc.).
      If you have historical data, or similar jobs (using the same/similar datasets) with similar operations, i.e. joins and aggregations, you could use those "Shuffle Partition" numbers and apply the logic (as demonstrated in the scenarios) to dynamically get the number of shuffle partitions.
      However, I would stress using this approach with caution.

    • @tandaibhanukiran
      @tandaibhanukiran 10 months ago

      Thank you very much.@@afaqueahmad7117

  • @gautamkatyal9966
    @gautamkatyal9966 1 month ago

    So 12 will be the number of shuffle partitions to set, since the data per shuffle partition is very small in the 2nd solution.

  • @abdulwahiddalvi7119
    @abdulwahiddalvi7119 6 months ago

    @Afaque thank you for making these videos. Very helpful. I have a question: how do we estimate the data size? We run our batches/jobs on Spark and each batch could be processing a varying size of data. Some batches could be dealing with 300 GB and some could be 300 MB. How do we calculate the optimal number of shuffle partitions?

  • @leonardopetraglia6040
    @leonardopetraglia6040 5 months ago

    Thanks for the video! I also have a question: when I execute a complex query, there will be multiple stages with different shuffle write sizes; which one do I have to take into consideration when computing the optimal number of shuffle partitions?

    • @afaqueahmad7117
      @afaqueahmad7117  3 months ago

      Hey @leonardopetraglia6040, it's best to start with the largest shuffle write across all stages

  • @dasaratimadanagopalan-rf9ow
    @dasaratimadanagopalan-rf9ow 5 months ago

    Thanks for the content, really appreciate it. My understanding is that AQE takes care of shuffle partition optimization and we don't need to manually intervene (starting with Spark 3) to optimize shuffle partitions. Could you clarify this please?

  • @Kiedi7
    @Kiedi7 11 months ago

    Hey man, awesome series so far. I noticed in your videos that you share your mac screen but use an apple pencil on your ipad to annotate? Can you describe that setup on how you’re able to annotate on your Jupyter Notebook (presented on mac) but from an ipad instead? Thanks in advance appreciate it

    • @afaqueahmad7117
      @afaqueahmad7117  11 months ago

      Thank you! I use Ecamm Live so adding both iPad and Mac as a screen helps me navigate easily between both.
      For the ones, where I’ve annotated on Notebooks, I’ve used Zoom to share my screen and annotate on Zoom.

  • @nikhillingam4630
    @nikhillingam4630 6 months ago

    Consider a scenario where my first shuffle's data size is 100 GB, so giving more shuffle partitions makes sense, but by the last shuffle the data size is drastically reduced to 10 GB. According to the calculations, how should one set shuffle partitions? Giving 1500 would benefit the first shuffle but not the last one. How does one approach this scenario?

  • @AshishStudyDE
    @AshishStudyDE 4 months ago

    Sir, still waiting for a dedicated, very detailed video on driver and executor OOM. A question: if the file is 100 GB, can we sort it? If yes, will there be data spill? Basically an interview question for 8+ years of experience.

    • @afaqueahmad7117
      @afaqueahmad7117  3 months ago

      Hey @AshishStudyDE, hopefully more content coming soon; The answer to the question would depend a lot on the resource configuration i.e. no. of cores, memory, execution memory per core. If the "execution memory" per core > partition size (assuming 100gb file is divided into partitions), there shouldn't be problems processing the files
      If you would like to know more about "Execution memory", please refer to the Memory Management video here: ua-cam.com/video/sXL1qgrPysg/v-deo.html

  • @vikastangudu712
    @vikastangudu712 8 months ago

    Thanks for the explanation. But isn't the parameter (spark.sql.shuffle.partitions) in no way dependent on the cardinality of the group by / join column?

  • @crepantherx
    @crepantherx 1 year ago

    Can you please cover bucketing hands-on in ADB (hands-on with file view)? In your last video it is working in your IDE but not in Databricks (delta bucketing not allowed).

  • @JustDinesh-1934
    @JustDinesh-1934 4 months ago

    I have learned somewhere that the max partition size can only be 128 MB in Spark. Doesn't that contradict what you mentioned when explaining the 300 GB example? Just asking to correct myself if wrong.

    • @afaqueahmad7117
      @afaqueahmad7117  3 months ago +1

      Hey @JustDinesh-1934, good question! Anything between 100-200 MB is a reasonable number for optimal performance, but it can depend on several factors, especially your cluster, number of cores, total memory and memory available per core. As long as the "execution memory" per core > partition size, things should work just fine :)

  • @ramvel814
    @ramvel814 11 months ago

    Can you tell how to resolve "Python worker exited unexpectedly (crashed)"?

  • @SpiritOfIndiaaa
    @SpiritOfIndiaaa 6 days ago

    Good, but it would have been more appropriate to cover joins, i.e. how to partition data before a join to make sure the join works properly.

  • @yahyashaikhworld
    @yahyashaikhworld 3 days ago

    wow

  • @kaushikghosh-po1ew
    @kaushikghosh-po1ew 8 months ago

    I have a question on this. Let's say that the data volume I am processing varies on a daily basis, i.e. some days it can be 50 GB, some days it can be 10 GB. Keeping in mind the 200 MB per shuffle partition guideline, the optimal number of partitions should change on each run in that case. But it's not practically possible to change the code every time to have proper shuffle partitions. How should this scenario be handled? I read about a parameter sql.files.maxPartitionBytes which defaults to 128 MB. Should I change this to 200 and let the number of shuffle partitions be calculated automatically? In that case, will the value under sql.shuffle.partitions be ignored?

    • @Thevisionaryaddy
      @Thevisionaryaddy 3 months ago

      I think here you can also use Kafka in conjunction with PySpark.
      So if there is more data, increase the partitions dynamically in Kafka, and if there is less data, decrease the partitions in Kafka and come to an optimal number, or keep the partitions the same in Kafka so that each partition in Kafka = a partition in PySpark, and then it can process the data accordingly based on the cores.

  • @rohitsharma-mg7hd
    @rohitsharma-mg7hd 1 month ago

    In scenario 1:
    the size per shuffle partition should be 15 GB, your calculation is wrong (1.5 GB);
    according to you, each executor will do 1.5 GB * 4 = 6 GB of processing.
    There are 5 executors in total, so 5 * 6 = 30 GB instead of 300 GB?

  • @VenkatakrishnaGangavarapu
    @VenkatakrishnaGangavarapu 1 year ago

    Thanks again for the topic. I worried that you had stopped... because you can't even get this depth of knowledge from colleagues.

  • @SuperSuryarocks
    @SuperSuryarocks 2 months ago

    Nice explanation! But just pointing out that the second scenario practically doesn't make sense for a 10x improvement. If your data size is 50 MB, rather than trying to utilize all cores, think about using just one core so that a shuffle is not required altogether.

    • @afaqueahmad7117
      @afaqueahmad7117  2 months ago

      You're right, the second scenario is a bit contrived; The goal was to highlight situations where fewer cores are under too much pressure and there may be room for utilising all resources, while reducing pressure on some of the cores; thanks for pointing it out!

  • @Revnge7Fold
    @Revnge7Fold 6 months ago +1

    I think it's a bit dumb for Spark to keep this value static... why not rather have a "target shuffle size (MB/GB)" config in Spark? I wish the Spark planner was a bit more sophisticated.

    • @afaqueahmad7117
      @afaqueahmad7117  6 months ago

      You could get a similar effect by turning on AQE and setting "spark.sql.adaptive.advisoryPartitionSizeInBytes" to your desired size. Documentation here: spark.apache.org/docs/latest/sql-performance-tuning.html
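
      A minimal sketch of that configuration (the builder/app name below is illustrative; the three `spark.sql.adaptive.*` keys are the documented ones):

      ```python
      from pyspark.sql import SparkSession

      spark = (
          SparkSession.builder
          .appName("aqe-target-shuffle-size")   # illustrative app name
          .config("spark.sql.adaptive.enabled", "true")
          .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
          # Effectively a "target shuffle size" knob: AQE coalesces small shuffle partitions toward this size
          .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
          .getOrCreate()
      )
      ```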

    • @Revnge7Fold
      @Revnge7Fold 6 months ago

      @@afaqueahmad7117 Awesome! Thanks for the advice! Your videos have been really helpful!

  • @vinothvk2711
    @vinothvk2711 1 year ago

    In Scenario 2, finally we are distributing 4.2 MB to each core. In this case, what's the spark.sql.shuffle.partitions? Is it 12?

    • @afaqueahmad7117
      @afaqueahmad7117  1 year ago

      Yes @vinothvk2711, it's 12. As explained, this is going to ensure 100% utilisation of the cluster.
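
      For completeness, a hedged sketch of that scenario 2 reasoning (assumed figures: ~50 MB of shuffle data and 12 cores, e.g. 3 executors x 4 cores; the point is simply to never drop below one partition per core when the data is tiny):

      ```python
      import math

      shuffle_write_mb = 50          # scenario 2: tiny shuffle (~50 MB, assumed)
      target_partition_mb = 200      # usual 100-200 MB target per partition
      total_cores = 12               # assumed 3 executors x 4 cores

      # For small data the size-based count collapses to 1, so floor it at the core count
      # to keep every core busy (~4.2 MB per partition here).
      num_partitions = max(total_cores, math.ceil(shuffle_write_mb / target_partition_mb))
      spark.conf.set("spark.sql.shuffle.partitions", num_partitions)   # 12
      ```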