Super, super, super detailed, thanks for uploading. I was unable to understand it before, but now I understand it clearly... thanks a lot. Please make in-depth videos like this whenever you're free. (You may not get the views and money that entertainment videos get, but you are helping people grow in this field; surely there are many people benefiting from your content. Please continue making these kinds of videos.)
The best explanation on YouTube so far. Thank you very much.
Really great explanation. Hard to find someone talking about what matters without fluff. I have a more implementation-related question. If I have many tables ranging from 100 MB to 10 GB, should I dynamically change maxPartitionBytes before I process the data? If I have 100 tables, does it make sense to adjust this parameter in a loop or something before I start transforming each DataFrame?
Again!! It's great content, very clearly explained. 🙏
Love you bro for such a crisp explanation; the way you experiment and teach helps a lot!
@kartikjaiswal8923 Appreciate it man :)
Thanks for sharing your knowledge, your videos are amazing.
Very good explanation! One question: for a big fact table, what do you recommend, CLUSTER BY or PARTITION BY on the month and year columns? CLUSTER BY is a new concept and you don't have to run the OPTIMIZE command for maintenance, but it doesn't create a separate directory per value the way PARTITION BY does, which could mean reading multiple files for the same month or year instead of going to just one directory. Please advise. Thank you!
Very detailed and very understandable. Great!
Superb knowledge
Thanks for the detailed video. I have a few questions here on partitioning.
1. How does Spark decide the number of partitions if we don't specify these properties? Is it good practice to do a repartition (say, to 400) right after the read?
2. How do we decide the number to pass to repartition before writing to disk? If we pass a large number to repartition, will that be optimal?
Good video. In fact, all of your videos are. One thing: in this video you mostly talked about actual physical partitions on disk. But towards the end, when you were talking about "maxPartitionBytes" and doing only a READ operation, you were talking about shuffle partitions, which are in memory and not disk partitions. I found that hard to grasp for a very long time, so I wanted to confirm whether my understanding is right.
Hey @utsavchanda4190, many thanks for the appreciation. To clarify, when talking about `spark.sql.files.maxPartitionBytes`, I'm referring to the partitions Spark creates when reading files into memory. These are not shuffle partitions; shuffling only comes into the picture with wide transformations (e.g. groupBy, joins). Therefore, `spark.sql.files.maxPartitionBytes` (128 MB by default) caps how much file data goes into each partition, and so dictates how many partitions Spark reads from the files into DataFrames in memory.
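To make the read side a bit more concrete, here is a rough plain-Python model of how, as far as I understand Spark 3.x's file-scan planning, `spark.sql.files.maxPartitionBytes` turns into an effective split size. It is a sketch, not Spark code: it assumes `spark.sql.files.openCostInBytes` is at its 4 MB default and uses `default_parallelism` to stand in for Spark's default parallelism (roughly, the total number of cores); the function names are hypothetical.

```python
import math

MB = 1024 * 1024


def effective_split_bytes(file_sizes, max_partition_bytes=128 * MB,
                          open_cost_bytes=4 * MB, default_parallelism=8):
    """Approximate split size used when planning a file scan (simplified model)."""
    # Each file is charged its size plus a fixed "open cost".
    total_bytes = sum(size + open_cost_bytes for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism
    # Never larger than maxPartitionBytes, never smaller than the open cost.
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))


def splits_for_file(file_size, split_bytes):
    """Number of chunks one splittable file (e.g. Parquet) is cut into."""
    return math.ceil(file_size / split_bytes)


# One 1 GB file on 8 cores: the 128 MB cap applies, giving 8 read partitions.
split = effective_split_bytes([1024 * MB], default_parallelism=8)
print(split // MB, splits_for_file(1024 * MB, split))  # → 128 8
```

The takeaway of the model is that maxPartitionBytes acts as an upper bound: with a small input and many cores (e.g. four 64 MB files on 8 cores), the computed split size drops to 34 MB, so Spark may create more, smaller read partitions than the 128 MB setting alone suggests.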
@afaqueahmad7117 that's right. And those are still in-memory partitions, not physical partitions, right? I think this video covers both: physical disk partitions as well as in-memory partitions.
Yes, those are partitions in memory :)
Thank you for sharing your knowledge in such detail. I have become a big fan of yours and shared your Spark videos with a couple of my friends and seniors.
Just curious which tool you use to run Spark code. If possible, please upload a video on installing Spark on a Mac.
Hey @purushottamkumar8336, it's humbling to hear this, appreciate your kind words. I have a local installation of Spark that I use with VS Code. There are several blogs on Medium/Substack that you could refer to for installing Spark :)
Great video as always! When can we get a video on setting up an IDE like yours? Really nice UI. Visual Studio, I believe?
Thanks for the appreciation. Yep, it's VS Code. It's quite simple, not much on top of it except the terminal. I can share the Medium article I referred to for setting it up :)
Thank you so much again! I have one follow-up question about partitions during writes. If I use df.write but specify neither a partitioning column nor a repartition, could you please let me know how many partitions Spark writes by default?
Does it simply take the number of input partitions (total input size / 128 MB), or, if a shuffle was involved and the default of 200 shuffle partitions was used, does it use that shuffle partition count?
Thank you
Hey @anandchandrashekhar2933, so basically it should fall into 2 categories:
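1. If shuffling is performed: Spark will use the value of `spark.sql.shuffle.partitions` (defaults to 200) for the number of partitions during the write operation. Note that with Adaptive Query Execution (enabled by default since Spark 3.2), small shuffle partitions may be coalesced, so the final count can be lower.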
2. If shuffling is not performed: Spark will use the current number of partitions in the DataFrame, which could be based on the input data's size or previous operations.
Hope this clarifies :)
Thanks for the informative videos. I have a question regarding repartition(4).partitionBy(key).
Does it mean each of the 4 part files in each partition will be a separate partition while reading?
Or does it consider the specified maxPartitionBytes and, depending on the size, create partitions (combining two or more part files) if their combined size is within the maxPartitionBytes limit?
Hey @vamsikrishnabhadragiri402, `spark.sql.files.maxPartitionBytes` will be taken into consideration when reading the files.
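If each of the 4 part files is smaller than `spark.sql.files.maxPartitionBytes`, e.g. each part is 64 MB and `spark.sql.files.maxPartitionBytes` is set to 128 MB, then the 4 files (partitions) will be read separately. Spark does pack small files together into one partition, but only while the running total stays within the limit, and it charges each file an extra open cost (`spark.sql.files.openCostInBytes`, 4 MB by default): 64 MB + 4 MB + 64 MB is more than 128 MB, so these files are not merged. (This assumes the effective split size really is 128 MB; when the total input is small relative to the number of cores, Spark can choose a smaller split size.)
Consider another example where each part is larger than `spark.sql.files.maxPartitionBytes` (as discussed in the video): each of those parts will be broken down into chunks of at most `spark.sql.files.maxPartitionBytes`, as long as the format is splittable (as Parquet is) :)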
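The packing behaviour described above can be sketched in plain Python. This is a simplified model of what, as I understand it, Spark 3.x's file-scan planner does ("next fit decreasing"): files are cut into chunks of at most the split size, sorted largest first, and appended to the current partition until the next chunk would overflow it, with every chunk also charged the open cost. The split size is taken as given (128 MB, matching the example) and the 4 MB open cost is the assumed default; `pack_splits` is a hypothetical name.

```python
MB = 1024 * 1024


def pack_splits(file_sizes, split_bytes=128 * MB, open_cost_bytes=4 * MB):
    """Group file chunks into read partitions (simplified model of Spark's planner)."""
    # 1. Cut each splittable file into chunks of at most split_bytes.
    chunks = []
    for size in file_sizes:
        offset = 0
        while offset < size:
            chunks.append(min(split_bytes, size - offset))
            offset += split_bytes
    # 2. Largest chunks first.
    chunks.sort(reverse=True)
    # 3. Next fit: close the current partition when the next chunk would overflow it.
    partitions, current, current_size = [], [], 0
    for chunk in chunks:
        if current and current_size + chunk > split_bytes:
            partitions.append(current)
            current, current_size = [], 0
        current.append(chunk)
        current_size += chunk + open_cost_bytes  # the open cost counts against the budget
    if current:
        partitions.append(current)
    return partitions


print([len(p) for p in pack_splits([64 * MB] * 4)])   # → [1, 1, 1, 1]
print([len(p) for p in pack_splits([10 * MB] * 10)])  # → [9, 1]
```

In this model, four 64 MB files stay in four partitions, while ten 10 MB files collapse into two, which is the "combining two or more part files" case from the question.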
Hello, thanks for this video and for the whole course. I have a question about high-cardinality columns: say you have table A and table B, both with customer_id. If you want to join on this column, how do you alleviate the performance issue that occurs?
By bucketing both tables on customer_id.
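Why bucketing helps can be shown with a toy model: if both tables are hashed on the join key into the same number of buckets, matching customer_ids always land in the same bucket number, so the join can proceed bucket by bucket without moving rows between buckets (in Spark terms, without a shuffle at join time). This is plain Python with made-up rows; Spark itself uses a Murmur3 hash and writes buckets via `bucketBy`, neither of which is reproduced here, and the function names are hypothetical.

```python
from collections import defaultdict

NUM_BUCKETS = 4  # both tables must use the same bucket count


def bucketize(rows, key, num_buckets=NUM_BUCKETS):
    """Assign each row to a bucket by hashing the join key (Python's hash, not Spark's)."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[hash(row[key]) % num_buckets].append(row)
    return buckets


def bucketed_join(left, right, key):
    """Join bucket i of the left table only with bucket i of the right table."""
    left_buckets, right_buckets = bucketize(left, key), bucketize(right, key)
    result = []
    for b in range(NUM_BUCKETS):
        # Build a small lookup per bucket; no row ever leaves its bucket.
        index = defaultdict(list)
        for right_row in right_buckets.get(b, []):
            index[right_row[key]].append(right_row)
        for left_row in left_buckets.get(b, []):
            for right_row in index[left_row[key]]:
                result.append({**left_row, **right_row})
    return result


# Made-up example data.
customers = [{"customer_id": i, "name": f"c{i}"} for i in range(10)]
orders = [{"customer_id": i % 5, "order_id": i} for i in range(20)]
print(len(bucketed_join(orders, customers, "customer_id")))  # → 20
```

The result matches an ordinary join; the saving in Spark comes from the bucketing work being done once at write time instead of shuffling both tables on every join.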
Hi bro, thanks for such valuable content. I have a doubt: what is the need for repartition before partitionBy? I understand that it helps control the files created within each partition, but how does it help with optimization? Please clarify.
Hey @tusharannam2.0, the number of files inside each `partitionBy` folder is determined by several factors. One key factor is how many in-memory partitions hold rows for that folder's value, because each of them writes its own file(s); that number can in turn depend on, for example, the 128 MB split size used when the data was read. `repartition` before a `partitionBy` can be helpful in situations where:
1. There are too many output files, which may lead to the small file problem
2. There are too few files, which may lead to a single large file or skewed datasets
Both situations can be controlled by repartitioning before a `partitionBy`.
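A rough way to see how `repartition` changes the file count per folder: in a `partitionBy` write, each in-memory partition (task) writes its own file into every folder for which it holds rows. The plain-Python model below counts files per `listen_date=...` folder for a round-robin `repartition(n)` versus a hash `repartition(n, "listen_date")`. It is a simplified sketch (no `maxRecordsPerFile`, Python's `hash` instead of Spark's, round-robin without Spark's random starting offset), the rows are made up, and `files_per_folder` is a hypothetical name.

```python
from collections import defaultdict


def files_per_folder(rows, num_partitions, by_column=None):
    """Count output files per listen_date folder after a repartition + partitionBy write."""
    written = set()  # (task, folder) pairs; each pair becomes one part file
    for i, row in enumerate(rows):
        if by_column is None:
            task = i % num_partitions  # round-robin, like repartition(n)
        else:
            task = hash(row[by_column]) % num_partitions  # like repartition(n, col)
        written.add((task, f"listen_date={row['listen_date']}"))
    counts = defaultdict(int)
    for _task, folder in written:
        counts[folder] += 1
    return dict(counts)


# Made-up data: 30 dates with 10 rows each.
rows = [{"listen_date": f"2023-01-{d:02d}"} for d in range(1, 31) for _ in range(10)]
print(set(files_per_folder(rows, 3).values()))                           # → {3}
print(set(files_per_folder(rows, 3, by_column="listen_date").values()))  # → {1}
```

With round-robin, every task holds rows for every date, so each folder gets one file per task; repartitioning by the partition column instead sends each date to a single task, giving one file per folder.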
An alternate way, without using `repartition` before `partitionBy`: since data volume keeps changing, setting a static number for `repartition` may cause trouble later down the line.
So you could write data using only a `partitionBy`:
df.write.partitionBy("year", "month").parquet("/path/to/output")
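And control how the data is broken into chunks of roughly your preferred size by setting this config (it sizes the partitions at read time, which carry through to the write as long as no shuffle happens in between):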
spark.conf.set("spark.sql.files.maxPartitionBytes", "")
Thanks for clarifying!
I am a little bit confused: at 15:17, in the folder for a specific value of listen_date, you say that there is only 1 file, which corresponds to 1 partition. But I thought partitions are created based on the values of listen_date, so as far as I can see, there are more than 30 partitions (each one corresponding to a specific value of listen_date). After that, you used the repartition function to change the number of partitions inside each folder. So the question is: is the number of partitions the number of listen_date folders, or the number of files inside each folder?
Hey @retenim28, each listen_date folder is a partition. So you're right in saying that each partition corresponds to a specific value of listen_date: each unique value of listen_date results in a separate folder (a.k.a. partition). Each parquet file (those part-000.. files) inside a partition (folder) represents the actual physical storage of the data rows belonging to that partition.
Therefore, to answer your question: number of (on-disk) partitions = number of listen_date folders.
@afaqueahmad7117 oh, thank you sir, I just got the point. But I have another question: since Spark cares about the number of partitions, what is the advantage of creating more files for each partition? The number of partitions remains the same, so the parallelism should be the same whether there are 10 files or 3 files inside a partition.
Good question @retenim28. The level of parallelism during data processing (e.g. the number of tasks launched, 1 task = 1 in-memory partition) is determined by the number of in-memory partitions, not by the number of folders on disk. The number of parquet files inside each folder, however, plays a role in read/write (I/O) parallelism. When reading from storage, Spark can read the parquet files in parallel even if they're in the same folder (although small files may still be packed into one read partition, up to `spark.sql.files.maxPartitionBytes`), so it can put more tasks to work and load the data faster. The same is the case for writes. Just be cautious that we don't end up with too many parquet files (the small file problem) or a few large files (leading to data skew).
@afaqueahmad7117 thank you very much, sir. I also watched the series on data skew; very clear explanation.
thank you
Appreciate it :)
Thanks for the content, Afaque. A question regarding spark.sql.files.maxPartitionBytes: I was thinking this would be beneficial when reading a file whose size you know upfront. What about files whose size you don't know? Do you recommend repartition or coalesce in those cases to adjust the number of partitions for the DataFrame?
Hey @kvin007, you could use a technique to determine the size of a DataFrame explained here ua-cam.com/video/1kWl6d1yeKA/v-deo.html at 23:30. The link used in the video is umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/parallelism/sparksqlshufflepartitions_draft
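Once you have an estimate of the DataFrame's size (for example via the technique linked above), one common way to avoid a hard-coded number is to derive the partition count from a target size per partition. The helper below is a hypothetical sketch: the 128 MB target and the optional bounds are assumptions you would tune, and `estimated_bytes` is whatever size estimate you measured.

```python
import math

MB = 1024 * 1024


def target_num_partitions(estimated_bytes, target_partition_bytes=128 * MB,
                          min_partitions=1, max_partitions=None):
    """Hypothetical helper: partitions needed so each holds about target_partition_bytes."""
    n = max(min_partitions, math.ceil(estimated_bytes / target_partition_bytes))
    if max_partitions is not None:
        n = min(n, max_partitions)  # optional upper bound, e.g. to avoid tiny files
    return n


print(target_num_partitions(10 * 1024 * MB))  # 10 GB → 80
```

The result could then be passed to `df.repartition(n)` when you need more partitions or a rebalance, or to `df.coalesce(n)` when you only need fewer, since `coalesce` cannot increase the partition count.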
@afaqueahmad7117 awesome, thanks for the response!
At 16:39, when you use repartition(3), why are there 6 files?
Hey @Amarjeet-fb3lk, good question. I should have pulled the editor sidebar to the right for clarity. It's actually 3 files; the remaining 3 are `.crc` checksum files, which are written alongside each data file (by the Hadoop local file system that Spark writes through here) for data integrity, to make sure the written file is not corrupted.
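If you inspect an output folder yourself, a small script can make this explicit by ignoring the hidden `.crc` checksum files and marker files such as `_SUCCESS` when counting data files. This is a plain-Python sketch; the naming rules it relies on (data files start with `part-`, checksum files start with `.` and end with `.crc`) are assumptions based on what Spark typically writes to a local file system, and `count_data_files` is a hypothetical name.

```python
from pathlib import Path


def count_data_files(folder):
    """Count Spark part files in a folder, separating .crc checksums and markers."""
    counts = {"data": 0, "checksum": 0, "other": 0}
    for path in Path(folder).iterdir():
        if not path.is_file():
            continue  # skip nested partition folders
        name = path.name
        if name.startswith(".") and name.endswith(".crc"):
            counts["checksum"] += 1
        elif name.startswith("part-"):
            counts["data"] += 1
        else:
            counts["other"] += 1  # e.g. _SUCCESS
    return counts
```

Only the `data` count reflects the number of part files produced by the write; the checksum files simply mirror them one-to-one.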