This is really a next-level feature. Kudos to the NiFi team.
Thank you for the video! But it's still not clear to me at what stage/processor the offset commit occurs.
The commit happens in the PublishKafkaRecord processor, as the results are pushed to the destination topic.
@@nifinotes5127 That is, even if I push the data to another topic, the offset will be committed on the topic from which it was originally read? As a non-obvious side effect?
Correct.
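To make that concrete, here is a minimal sketch of Kafka's consume-transform-publish pattern written against the plain Kafka client API. This is not what the NiFi processors literally do internally, and the broker address, group id, and topic names below are made up, but it shows the mechanism: the source-topic offsets are committed through the producer's transaction via sendOffsetsToTransaction, which is why publishing to a different topic still commits the consumer offset on the topic that was read.

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ConsumeTransformPublish {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "stateless-demo");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets committed only via the transaction
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "stateless-demo-tx");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(List.of("source-topic"));
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    continue;
                }

                producer.beginTransaction();
                try {
                    for (ConsumerRecord<String, String> record : records) {
                        producer.send(new ProducerRecord<>("destination-topic", record.key(), record.value()));
                    }
                    // The source-topic offsets are committed as part of the same transaction,
                    // so they only take effect if the publish to destination-topic succeeds.
                    producer.sendOffsetsToTransaction(nextOffsets(records), consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (Exception e) {
                    // Nothing is committed; a real implementation would seek back to the
                    // last committed offsets here so the same records are reprocessed.
                    producer.abortTransaction();
                }
            }
        }
    }

    // Offset to commit for each partition: one past the last record that was processed.
    private static Map<TopicPartition, OffsetAndMetadata> nextOffsets(ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
        }
        return offsets;
    }
}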
Is it possible to implement a similar scenario with a manual commit using RabbitMQ? Does NiFi support this? I read the documentation for the bundled AMQP processors, but did not find anything about it.
No, I don’t believe so. I don’t know that RabbitMQ supports such a thing. If it does, it may require updates to the processors, but I don’t know enough about RabbitMQ to say for sure.
What is the benefit of stateless execution with the full NiFi install? I see the value of using the stateless-assembly to keep things lightweight, but I don't understand this application.
There are a few differences, but the main difference is changing what the boundary of the transaction is. With standard NiFi, the transaction is the processor. If there’s any kind of failure, you deal with it at that processor. With stateless, a failure means that all of the processing in that group is rolled back and the input gets routed to failure. For a case like consuming from Kafka, the message that is received doesn’t even get acknowledged in stateless until it has been successfully processed through the entire group. So if there’s a failure, the message won’t be acknowledged and can be consumed again.
What will happen to the consumer if the processing of a message takes a long time? Since the offset hasn't changed, won't the same data be polled and processed multiple times in parallel?
In Stateless, the entire pipeline is single-threaded, so the consumer will not run while other data is processing. If the processing takes too long, though, the transaction will fail. In that case, the publisher will fail, and the entire transaction will be rolled back. Then the data would be pulled again.
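A rough, hedged reference point for "takes too long" (these are plain Kafka client settings and their defaults; the NiFi processors manage their own values internally): the consumer's max.poll.interval.ms bounds how long processing can go on between polls before the consumer is kicked out of the group, and the producer's transaction.timeout.ms bounds how long a transaction can stay open before the broker aborts it.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

public class RelevantTimeouts {
    public static void main(String[] args) {
        // Consumer side: if poll() is not called again within this window, the consumer is
        // considered failed, the group rebalances, and the uncommitted records are redelivered.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // max.poll.interval.ms, default 5 minutes

        // Producer side: if an open transaction is not committed within this window,
        // the broker aborts it and none of the published records become visible.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000"); // transaction.timeout.ms, default 1 minute

        System.out.println("consumer: " + consumerProps + ", producer: " + producerProps);
    }
}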
We have something built very similar to this, but after the ExecuteStateless processor checks the controller services it says the services are INVALID. The services work without ExecuteStateless. What do I need to check? This one has me so confused.
You should be able to look in the logs and see why it’s invalid.
@@nifinotes5127 it states: keystore properties is invalid because must set either 0 or 3 properties for keystore. truststore properties is invalid because invalid truststore password or type specified for file {nifi.truststore location}
though these services are running and enabled without ExecuteStateless.
@@PSNOYEA Are you setting your keystore and truststore passwords as properties in your ExecuteStateless processor? If you're using parameters, the property:value pair needs to match your parameter so it's picked up properly. Possibly, even if you're hard-coding the sensitive value in the controller service/processor, you need to change it to a parameter and then define it as a property on the ExecuteStateless processor.
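To make that concrete, here is an illustrative (not authoritative) wiring, where the parameter name truststore.password is purely hypothetical:

  SSLContextService (inside the flow that ExecuteStateless runs) -> Truststore Password: #{truststore.password}
  ExecuteStateless processor -> user-defined property: truststore.password = <actual password>

The dynamic property name on ExecuteStateless has to match the parameter name exactly so the stateless engine can supply it when it builds and runs the flow.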
What is the point of the batch output step? Is that only necessary if you have some splitting happening in your processing?
The batch output ensures that all of the data is transferred out as a single batch of data. This ensures that the following processor (PublishKafkaRecord) has access to all of the data when it runs. Otherwise, it may receive only some of the data, and push just that partial data in the transaction. By doing it as a batch it will send all data in the same transaction.
I can't understand one thing. If I read data from one Kafka cluster and write it to another Kafka cluster, will the consumer offset be committed?
No, Kafka’s Exactly Once Semantics do not apply to cross-cluster processing.
@@nifinotes5127 Thanks so much for the fast answer. I suspected as much, but couldn't find it written clearly anywhere.
Thanks!
Does this feature only work for Kafka?
Is it possible that it could work for ConsumeJMS or ConsumeMQTT?
I'm not sure why you use "Batch outbound".
Yes, this makes use of Apache Kafka's Exactly Once Semantics. It's been many years since I've looked into how JMS transactions work, so I'm not sure whether such a thing would be possible there. And I've never used MQTT enough to know whether it supports such a mechanism. But in either case, such a capability would have to be built into the processors.
The Batch Outbound Policy is required in order to ensure that all FlowFiles are transferred out in a single batch. Otherwise, it could transfer out only a portion of the FlowFiles, say 50%. PublishKafkaRecord could then send that 50% as a single transaction without sending all of the outbound data. This would cause problems because the transaction would be completed without sending all of the data. It's necessary that the publisher send the entire batch in a single transaction for Kafka's Exactly Once semantics to work.
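One way to see why the whole batch has to land in one transaction (a hedged sketch using the plain Kafka client API, showing the downstream side rather than anything PublishKafkaRecord does itself; topic and group names are made up): consumers of the destination topic typically read with read_committed isolation, so a transaction's records become visible only once that transaction commits, and records from an aborted transaction are never seen. If only half the batch were inside the committed transaction, the rest would sit outside those guarantees.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Only records from committed transactions are returned; records from open or
        // aborted transactions are filtered out. This is why the publisher must place the
        // entire batch in one transaction: downstream sees it all, or none of it.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("destination-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}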
What does "stateless engine" mean in Nifi??
The Stateless Engine is a mechanism for running a NiFi flow in a 'stateless' way. In this paradigm, the content is not persisted across restarts, the flow executes sequentially, and there are some other differences. Here's a link to the README to learn more: github.com/apache/nifi/blob/main/nifi-stateless/nifi-stateless-assembly/README.md
I want to push data from NFS to HDFS, and I don't want to lose data if a node goes down. Can I use ExecuteStateless for something like: NFS -(get name)-> Kafka -> fetch -> push to HDFS? If not, any idea how to guarantee there is no data loss?
You can. Or, as an alternative:
If you're pulling data from an NFS mount and pushing to HDFS with NiFi, you don't need to involve Kafka at all, or stateless. I'd recommend ListFile -> FetchFile -> PutHDFS. If the data volumes are high enough that you need a cluster to process it, I'd configure the connection between ListFile and FetchFile to use Load-Balanced Connections (ua-cam.com/video/by9P0Zi8Dk8/v-deo.html). This gives you a very efficient way to perform a listing of the files on the NFS mounted volume, and then distribute that listing across the cluster. Then each node is responsible for its share of files, to fetch the file from NFS and push to HDFS. Data is persisted across restarts. If you want to be super sure that you're in good shape, even in the case of an Operating System failure/crash you can disable disk caching. And if you want redundancy in case of disk failure, use RAID.
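A concrete sketch of the load-balanced connection mentioned above (menu wording may differ slightly between NiFi versions):

  ListFile -> Scheduling -> Execution: Primary node (so the listing is done once, not on every node)
  Connection from ListFile to FetchFile -> Settings -> Load Balance Strategy: Round robin

The listing FlowFiles (which carry attributes but no content) are then distributed across the cluster, and each node runs FetchFile and PutHDFS for its own share of the files.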
Okay, thanks sir!
Can you send a link to the example XML in Git that you showed?
@7:50 did I hear 1.15? When is it planned for release?
Yes this should be available in the upcoming release. Expect it soon. We are currently in the process of voting on release candidates. So likely next week. Possibly a bit sooner or a bit later.
It is already released
Yes, it was released today, Nov 8.
Awesome