Want to learn more Big Data technology courses? You can get lifetime access to our courses on the Udemy platform. Visit the link below for discounts and coupon codes.
www.learningjournal.guru/courses/
Excellent, sir. Thank you for giving detailed insights and showing how message-key-based partitioning can get into trouble. Very practical. Thank you, sir.
Thanks for explaining such important detail!
nice explanation! You saved my time!
Awesome sir..Good Learning for me...
Fantastic!!!!!
Just to add, "cluster.partitionCountForTopic(topic);" will also give you the number of partitions for a topic. I am using kafka_2.13-2.8.1, so it should be available in later versions as well. Thank you for this awesome series on Apache Kafka.
Why is the code formatting messed up?
It would be great if you could say something about how you used the SBT tool :)
Sure, I will add it soon, probably in the Scala + Spark tutorials.
Thanks for the detailed explanation. Did you create the Scala + Spark tutorials?
Running with SBT can be skipped altogether; you can use an IDE directly if you're not really familiar with SBT yet. I used NetBeans with Maven to run it, and it just worked great.
How can we print the partition along with the message on Windows, like you did at @11:44? I am using a Windows machine and can't find any command that shows which partition each message resides in. I can read through the topic, but I want to check through the partitions. If possible, reading all partitions and their messages for a topic would be great.
Kafka Security tutorials will be helpful as well
Thanks for the explanation. Is there a benefit to implementing such a custom partitioner to determine the partition, instead of using a different topic altogether for such cases? I might be missing something here; I'd appreciate it if someone could help me answer. Thanks again.
I have the same doubt
I think the partitioner helps in evenly distributing the messages across the cluster of machines. It would be bad if most of our messages ended up stored on one server while the others sit underutilized.
It depends on your use case. If you feel the need to create an entirely new topic to accommodate a completely different set of events, then you should go ahead and do so, since topics are intended for that purpose. However, if there isn't a need for a new topic, then you can add partitions to your existing topic and scale horizontally; this is a perfect use of Kafka's scalability. You can continue to use the same brokers, but you'd be compromising on fault tolerance. So, when you increase partitions, it's better to increase the number of brokers as well.
Where are you calling the partition() method?
If different keys can lead to the same hash, isn't it possible for different values to give the same hash too?
Yes, it is. And they all get routed to the same partition. But that's not a problem, because grouping happens on the key; the hash is only used to distribute the workload.
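To illustrate this reply with a runnable sketch: under any hash-then-modulo scheme, distinct keys (or values) can land on the same partition. Kafka's DefaultPartitioner uses murmur2, but as an assumption for illustration this sketch uses String.hashCode() so it runs without the kafka-clients jar:

```java
public class HashCollisionDemo {

    // Generic hash-then-modulo mapping from a key to a partition.
    // String.hashCode() is only a stand-in for Kafka's murmur2 here.
    public static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the modulo result is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 5;
        System.out.println("SSP3 -> partition " + partitionFor("SSP3", numPartitions));
        System.out.println("SSP8 -> partition " + partitionFor("SSP8", numPartitions));
        // With few partitions, distinct keys inevitably share a partition;
        // here SSP3 and SSP8 happen to collide under this hash with 5 partitions.
    }
}
```

With only a handful of partitions, collisions like this are expected and harmless, since consumers group on the key itself, not on the hash.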
Thanks for this wonderful video.
I have a doubt here: in the custom partitioner section, you are hashing the message value in step 3 and the key in step 4.
So in step 3, do you mean the message value is the actual message sent by the producer? Are you hashing the actual message?
Yup, he is hashing the entire message value to get enough cardinality, instead of using the same key for all these messages.
Hi sir, I am using a Java IDE and I get the error message below:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value SensorPartitioner for configuration partitioner.class: Class SensorPartitioner could not be found.
SensorPartitioner has been defined under the same package.
Can we run Kafka without SBT?
You don't need SBT for Kafka. SBT is a build tool to compile and package your code.
In Kafka, do we write the code only in Java, or don't we need to write code in Spark? If Spark is possible, please send some links for learning Spark producer code, etc.
Thanks for sharing such valuable knowledge!!
I think there is a little mistake in the explanation where you say that SSP3 and SSP8 have been allotted the same partition because the hash doesn't guarantee unique values. It could very well be because we do % (numberOfPartitions - sp), which in this case yields a partition in the range 3-9, so roughly one key in seven gets assigned to any given partition. Correct me if I am wrong, please.
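The arithmetic in this comment can be checked with a small sketch. Assuming the video's setup of 10 partitions with sp = 3 reserved for TSS sensors, non-TSS keys are mapped with hash % (numPartitions - sp) + sp, which always yields a value in 3..9 (String.hashCode() stands in for murmur2 so this runs without kafka-clients):

```java
public class PartitionRangeDemo {

    // The formula for non-TSS keys, reimplemented in plain Java:
    // partitions 0..sp-1 are reserved, everything else is spread
    // over partitions sp..numPartitions-1.
    public static int nonTssPartition(String key, int numPartitions, int sp) {
        int hash = key.hashCode() & 0x7fffffff; // non-negative hash
        return hash % (numPartitions - sp) + sp;
    }

    public static void main(String[] args) {
        int numPartitions = 10, sp = 3;
        for (String key : new String[]{"SSP1", "SSP3", "SSP8", "SSP14"}) {
            System.out.println(key + " -> partition "
                    + nonTssPartition(key, numPartitions, sp));
        }
        // Every result falls in 3..9: only 7 target partitions exist,
        // so collisions among distinct keys are expected by design.
    }
}
```

With only 7 target partitions, both explanations hold at once: the hash doesn't guarantee uniqueness, and the modulo folds many distinct hashes onto the same partition.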
Hi sir,
Thanks for the playlist; everything is well explained.
Could you please show me an example of how to write the same in Scala, i.e., how to write a custom partitioner in Scala?
I tried, but the implemented methods are not shown in Scala, because Scala doesn't support interfaces.
Is there an alternate way to write the same logic in Scala?
Thanks,
Venkatesh
I don't think there is an official Kafka client for Scala.
Sir, how can we create topics while reading a CSV file? For example, there are 10-15 fields in the CSV and we want to create a topic for every unique value present in column 6. Is this possible?
Sir, can I get the list of partitions from the consumer class?
Yes, you can get it from the Cluster object's availablePartitionsForTopic method.
Learning Journal, thanks, I will try this.
Sir, the custom partitioner that you have implemented starts returning different partitions when the number of partitions increases, right? Because of this dependency on the number of partitions, we have moved away from key-based partitioning. Am I missing anything?
You can use the key for partitioning. But be aware that data for more than one key can land on the same partition. If that doesn't meet your requirement, implement your own partitioner.
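A minimal sketch of that idea in plain Java (the routing logic only, not the actual Kafka Partitioner interface; the key "TSS" and the partition counts are hypothetical, and String.hashCode() stands in for murmur2): keys you want isolated are pinned to a dedicated partition, while everything else is hashed over the rest.

```java
import java.util.Set;

public class KeyRoutingDemo {

    // Hypothetical set of keys that must not share a partition with the rest.
    static final Set<String> DEDICATED_KEYS = Set.of("TSS");

    // Dedicated keys go to partition 0; all other keys are hashed over
    // partitions 1..numPartitions-1, so they can never collide with a
    // dedicated key, though they may still collide with each other.
    public static int route(String key, int numPartitions) {
        if (DEDICATED_KEYS.contains(key)) {
            return 0;
        }
        return (key.hashCode() & 0x7fffffff) % (numPartitions - 1) + 1;
    }

    public static void main(String[] args) {
        System.out.println("TSS  -> partition " + route("TSS", 10));
        System.out.println("SSP3 -> partition " + route("SSP3", 10));
        System.out.println("SSP8 -> partition " + route("SSP8", 10));
    }
}
```

In a real producer, this logic would live inside the partition() method of a class implementing org.apache.kafka.clients.producer.Partitioner.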
Hi sir, I have some queries; can you please clear them up?
In the above program, we are passing the broker configuration through the "bootstrap.servers" property. Is it mandatory, or is there any possibility of achieving the same through ZooKeeper?
How does the producer interact with ZooKeeper to get broker (cluster) information for a topic?
It is a necessary property. The new Kafka API is removing the client application's dependency on ZooKeeper.
When I run the program, I get the error below; please help me:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value SensorPartitioner for configuration partitioner.class: Class SensorPartitioner could not be found.
How are you compiling the code? If you are using SBT, you need to have the code for SensorPartitioner in the same directory. If you are using some other tool, make sure the class for SensorPartitioner is on your classpath.
I have SensorPartitioner in the same directory only. I am not using any build tool.
Send me the steps that you are following. Don't paste them in a comment; send me a private message. I will try to resolve your error.
Thanks!!
Please mention the fully qualified name (with the package name) of the class in the config.
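To illustrate this fix with a sketch: the partitioner.class value must be the fully qualified class name, exactly as the class is declared. The package name com.example.kafka below is a hypothetical stand-in for wherever SensorPartitioner actually lives:

```java
import java.util.Properties;

public class ProducerConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Passing just "SensorPartitioner" fails with a ConfigException
        // unless the class sits in the default package; Kafka's class
        // loader looks up the fully qualified name.
        // "com.example.kafka" is a hypothetical package name here.
        props.put("partitioner.class", "com.example.kafka.SensorPartitioner");
        System.out.println(props.getProperty("partitioner.class"));
    }
}
```

The same rule applies whether the properties are set in code, as above, or loaded from a properties file.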
p = Utils.toPositive(Utils.murmur2(valueBytes)) % sp;
I am getting an error at .toPositive; I am unable to find the toPositive method in SensorPartitioner.
Please help me!!
Do you have import org.apache.kafka.common.utils.*; in your code?
This function is in Utils.
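If the import still doesn't resolve (for example, the kafka-clients jar is missing from the classpath), the method itself is small enough to reproduce locally; Kafka's Utils.toPositive simply masks the sign bit:

```java
public class ToPositiveDemo {

    // Same behavior as org.apache.kafka.common.utils.Utils.toPositive:
    // clear the sign bit so the result is always non-negative, making it
    // safe to use with the % operator for partition selection.
    public static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public static void main(String[] args) {
        System.out.println(toPositive(42));                // 42, unchanged
        System.out.println(toPositive(-5));                // 2147483643
        System.out.println(toPositive(Integer.MIN_VALUE)); // 0
    }
}
```

Note this is not the same as Math.abs(), which can still return a negative value for Integer.MIN_VALUE; masking the sign bit avoids that edge case.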
Yes, I have imported it, but it is still not working!!
I am also getting the same error; please let me know if you have resolved it.
Sumit Kumar: can you share your code for that line, along with your imports from the top?
Ayyappa Eswar: can you share the line where you get the error, along with the imports? Also, please share the Kafka version.
This is difficult. Please describe it visually along with the code as well.