NiFi Notes
United States
Joined Mar 5, 2020
Auto Generate NiFi Flows from Natural Language
Mark introduces and demos Datavolo's all-new AI-powered Flow Generation capability for Apache NiFi! Use natural language to describe what the dataflow should do and let Datavolo's AI handle the rest.
1,523 views
Videos
NiFi Introduces Python API
7K views · 1 year ago
Mark Payne introduces the new Python-based API planned for NiFi 2.0, which allows building processors directly in Python and making use of the broader Python library ecosystem.
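As a sketch of what that Python-based API looks like, here is a hypothetical minimal processor. In a real NiFi 2.x deployment, FlowFileTransform and FlowFileTransformResult come from the nifiapi package that NiFi supplies; they are stubbed here so the snippet is self-contained, and the processor name and logic are made up for illustration.

```python
# Stubbed stand-ins for NiFi's nifiapi base classes, so this sketch runs
# on its own. Real processors import these from nifiapi.flowfiletransform.
class FlowFileTransform:
    def __init__(self, **kwargs):
        pass

class FlowFileTransformResult:
    def __init__(self, relationship, contents=None, attributes=None):
        self.relationship = relationship
        self.contents = contents
        self.attributes = attributes or {}

class ReverseContent(FlowFileTransform):
    """Hypothetical processor that reverses the bytes of each FlowFile."""

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Reverses the content of each FlowFile.'

    def transform(self, context, flowfile):
        # NiFi calls transform() once per FlowFile; the result names the
        # relationship and optionally replaces the content.
        reversed_bytes = flowfile.getContentsAsBytes()[::-1]
        return FlowFileTransformResult(relationship='success',
                                       contents=reversed_bytes)
```

NiFi discovers classes shaped like this in the python extensions directory and presents them on the canvas like any Java processor.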
Apache NiFi: Make Time for What Matters Most
986 views · 2 years ago
Apache NiFi: Make Time for What Matters Most
What's New in NiFi 1.16
2.5K views · 2 years ago
Discusses the top new features in version 1.16 of Apache NiFi
What's New in NiFi 1.15
885 views · 2 years ago
Provides an overview of the most impactful new features added in version 1.15 of Apache NiFi
Kafka Exactly Once with NiFi
9K views · 3 years ago
Provides an overview of the new features and capabilities in version 1.15 of Apache NiFi that allow NiFi to take advantage of Apache Kafka's Exactly Once semantics
What's New in NiFi 1.14
6K views · 3 years ago
An overview of the most exciting new features in Apache NiFi 1.14
Apache NiFi Anti Patterns, Part 5
2.7K views · 3 years ago
Part 5 of a series on Apache NiFi Anti-Patterns
Apache NiFi Anti-Patterns Part 4 - Scheduling
6K views · 4 years ago
Mark discusses anti-patterns that are commonly seen related to thread pools and scheduling Processors to run and how to properly configure these settings.
Apache NiFi Anti-Patterns Part 3 - Load Balancing
5K views · 4 years ago
Apache NiFi Anti-Patterns Part 3 - Load Balancing. Blog post referenced in video is available at blog.cloudera.com/benchmarking-nifi-performance-and-scalability/
Apache NiFi Anti-Patterns Part 2: Flow Layout
4.9K views · 4 years ago
Apache NiFi Anti-Patterns Part 2: Flow Layout
Apache NiFi Anti-Patterns, Part 1
13K views · 4 years ago
The first part in a series that explores common anti-patterns in Apache NiFi. Part 1 examines a flow that performs splitting and re-joining of data; treating structured/semi-structured data as unstructured text; and blurring the line between FlowFile content and attributes.
I have downloaded all of these and run Apache NiFi, but I am not seeing the Python processors on the drag-and-drop Apache NiFi canvas. Is it enough to put the Python files in the extensions folder, or do I have to do something else to see these Python processors?
Thank you for your video. Can you please tell me how to add the PutHDFS processor in Apache NiFi? In Apache NiFi 2.0.0 it is not present. Can you tell me how to add this processor from a NAR to Apache NiFi 2.0.0?
What is the anti-pattern? Increasing the thread pool so abruptly that it overwhelms the system?
Was the event-driven thread pool finally deprecated?
thanks for this!! btw do you have the template?
So the anti-pattern is compressing the data?
The anti-pattern is using load balanced connections to distribute the load across the cluster when the data is already well distributed. This uses a huge amount of resources but provides no benefit.
What is the benefit of stateless execution with the full NiFi install? I see value in using the stateless-assembly to keep things lightweight, but I don't understand this application.
There are a few differences but the main difference is changing what the boundary of the transaction is. With standard NiFi, the transaction is the processor. If there’s any kind of failure you deal with it for that processor. With stateless, a failure means that all of the processing in that group is rolled back and the input gets routed to failure. For a case like consuming from Kafka, the message that is received doesn’t even get acknowledged in stateless until successfully processing through the entire group. So if there’s a failure the message won’t be acknowledged and can be consumed again.
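The transaction-boundary behavior described above can be modeled with a few lines of plain Python. This is a toy sketch, not NiFi code: the queue and step functions are made up, but they mirror how stateless acknowledges the source message only after the entire group succeeds.

```python
# Toy model of the stateless transaction boundary: a message is acknowledged
# only after every step in the group succeeds; any failure rolls the whole
# group back and leaves the message available to be consumed again.

class FakeQueue:
    """Stands in for a Kafka partition with manual acknowledgement."""
    def __init__(self, messages):
        self.messages = list(messages)

    def peek(self):
        return self.messages[0] if self.messages else None

    def ack(self):
        self.messages.pop(0)

def run_stateless_group(queue, steps):
    """Run every step on the next message; ack only if the whole group succeeds."""
    message = queue.peek()
    if message is None:
        return None
    try:
        for step in steps:
            message = step(message)
    except Exception:
        return None   # rollback: message was never acknowledged, stays queued
    queue.ack()       # whole group succeeded: only now acknowledge the source
    return message
```

If any step raises, the message is still on the queue afterward, exactly like an unacknowledged Kafka record that will be consumed again.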
Hello Mark, the video helped me to create a Python processor, thanks a lot! Unfortunately it does not work; it says "Processor is invalid because initializing runtime environment". This was already fixed in your Jira ticket NIFI-12740, but for some reason I still get this message. I'm running version 2.0.0-M3. Thank you for any help! I updated to -M4; same problem.
You’ll need to check the nifi-app.log to understand what the error is.
@@nifinotes5127 Thank you, can this be the reason? 2024-08-28 13:15:25,230 ERROR [main] org.apache.nifi.web.server.JettyServer Failed to start web server... shutting down. java.lang.NullPointerException: Cannot invoke "jakarta.servlet.ServletContext.setAttribute(String, Object)" because "webApiServletContext" is null
When should we decide to use round robin after pulling data from Kafka?
If you want to distribute the data evenly across the NiFi cluster and you have more NiFi nodes than Kafka partitions you’d want to load balance. If you had say 8 partitions and 10 NiFi nodes, though, it probably wouldn’t be worth the cost of load balancing, you’d be better off just distributing across the 8 nodes that are pulling from Kafka.
Hi, can you do a video on stateless Apache NiFi?
Are you still live?
Hey @resoldab I would certainly like to be! Has just been a challenge finding the time to create videos lately with all that we have going on at Datavolo! Hopefully I can get back to it soon though. Anything in particular you’re interested in hearing about?
I hope this message finds you well. I recently watched your video and found it quite informative. However, I have been facing challenges in creating a custom processor in Python that supports a UDPMulticastListener, as the built-in one does not accommodate multicast due to the use of a diode. Despite following the guidance provided in your video and the accompanying documentation, I have been unable to achieve a functional solution over the past two weeks. Specifically, when I attempt to integrate the processors into Python extensions, they do not appear in the processor list. I would greatly appreciate any assistance or insights you could provide to help resolve this issue.
I can't understand one thing. If I read data from one Kafka cluster and write it to another Kafka cluster, will the consumer offset be committed?
No, Kafka’s Exactly Once Semantics do not apply to cross-cluster processing.
@@nifinotes5127 Thanks so much for the fast answer. I felt it logically but didn't find it written somewhere clearly.
Great. Does this support JOLT transformation as well?
It does, but even GPT-4 struggles to produce quality Jolt Transforms. Some specific training examples may help. But I suspect the better solution will be to focus on training it to write Python (or possibly groovy) with script based processors.
How do we add additional Python libraries if we depend on them?
You have a couple of options, detailed in the Python Developers Guide: nifi.apache.org/documentation/nifi-2.0.0-M2/html/python-developer-guide.html#dependencies
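One of the options from the Python Developer's Guide is declaring pip dependencies on the processor class itself, so NiFi installs them into the processor's isolated environment. The sketch below follows that class shape; the processor name and the exact package pins are illustrative, not from the guide.

```python
# Hypothetical NiFi 2.x Python processor declaring its pip dependencies.
# NiFi reads ProcessorDetails.dependencies and pip-installs each entry into
# a per-processor virtual environment before the processor becomes valid.

class ChunkDocument:
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '2.0.0'
        # Installed automatically by NiFi; standard pip version specifiers work.
        dependencies = ['pandas', 'numpy==1.26.4']
```

The alternative described in the guide is packaging dependencies alongside the processor; either way, the processor's environment is kept separate from NiFi's own.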
Wow !!
Masterpiece 🎇 Kudos!
Wonderfully done & interestingly superb...
This is bad ass. Great work y’all!
If you want to try out the capability for yourself, you can join the Datavolo Community Slack: join.slack.com/t/datavolocommunity/shared_invite/zt-2clo9iv4h-MFeyT8_HPKkJQM8PskkPSA
This is pretty wild and I'll be honest in saying I never imagined this being feasible. Great work Mark!
Me either! :) it’s been pretty amazing working with these new Gen AI technologies!
This is amazing!
Hi Mark! Outstanding video, congrats! I have a question: I ran into this anti-pattern myself, and I'm trying to improve performance. I have a 2.2M-record FlowFile coming out of an ExecuteSQL processor. Each of these records must be enriched via a REST call to an external service, using one of the record's columns as a parameter. Is there a way to optimize performance other than splitting all the records and picking that single column out of each record to perform the REST call?
Thanks Roberto. There are options, assuming the web service allows batch queries. Generally the pattern would be to use SplitRecord to break up that 2.2MM records into something like 100 records per flowfile - or whatever size you want to be a batch to your web service. Then you’d use ForkEnrichment, transform the data on the ‘enrich’ route, use InvokeHTTP, etc to get the enrichment data, and then use JoinEnrichment to join the enrichment data back with the original data. There are examples in the ForkEnrichment docs.
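The batch-enrichment pattern described above can be sketched in plain Python. This is not NiFi code: `fetch_enrichment` stands in for the InvokeHTTP bulk call on the 'enrich' route, and the record/key shapes are assumptions for illustration.

```python
# Pure-Python sketch of the SplitRecord -> ForkEnrichment -> JoinEnrichment
# pattern: split records into fixed-size batches, call the enrichment service
# once per batch instead of once per record, then join results back by key.

def batched(records, size):
    """Yield successive fixed-size batches of records."""
    for i in range(0, len(records), size):
        yield records[i:i + size]

def enrich(records, fetch_enrichment, batch_size=100):
    """Join each record with enrichment data fetched one batch at a time."""
    enriched = []
    for batch in batched(records, batch_size):
        # One bulk request per batch; the service returns a dict keyed by id.
        lookup = fetch_enrichment([r['id'] for r in batch])
        for record in batch:
            enriched.append({**record, **lookup.get(record['id'], {})})
    return enriched
```

With 2.2M records and a batch size of 100, this turns 2.2M web requests into about 22,000, which is the whole point of the ForkEnrichment/JoinEnrichment approach.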
@@nifinotes5127 thank you so much! Such a prompt and comprehensive response!
@@nifinotes5127 thanks again Mark! What's the reason behind splitting a big FlowFile into 100-record chunks? And another question: would it be possible to enrich my multi-row FlowFile with a complex SQL query against an Oracle database? I have to pull a column out of the enrichment DB based on a rather complex query, and I would like to do this with a batch approach.
@onesoulgospelchoir6742 the splitting to 100 records is to provide a reasonable size for the bulk request to the web service. You probably don't want a single web request with 2.2MM records; even creating that request would take a lot of memory. You could do enrichment using a database with a bulk query, but it's a bit complicated. You'd need to form your own query, likely using something like SELECT * FROM TABLE WHERE COLUMN IN (…). If more help is needed I'd recommend the Slack channel or NiFi mailing lists. YouTube comments are not very conducive to long explanations 🙂
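The bulk `SELECT ... WHERE column IN (...)` query mentioned above can be sketched with Python's standard sqlite3 module standing in for Oracle. The table, column names, and sample rows are made up for illustration.

```python
# Sketch of a bulk enrichment lookup: one parameterized IN query per batch
# of keys, instead of one query per record. Uses in-memory SQLite as a
# stand-in for the real enrichment database.
import sqlite3

def bulk_lookup(conn, keys):
    """Fetch enrichment rows for a batch of keys with a single IN query."""
    placeholders = ', '.join('?' for _ in keys)  # one bind parameter per key
    query = 'SELECT id, label FROM enrichment WHERE id IN (%s)' % placeholders
    return dict(conn.execute(query, list(keys)).fetchall())

# Illustrative enrichment table.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE enrichment (id INTEGER PRIMARY KEY, label TEXT)')
conn.executemany('INSERT INTO enrichment VALUES (?, ?)',
                 [(1, 'alpha'), (2, 'beta'), (3, 'gamma')])
```

Bind parameters (the `?` placeholders) keep the query safe regardless of key values; with Oracle the same shape applies, though drivers typically cap the number of IN-list elements, which is another reason to batch.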
@MarkPayne any improvements on this in the new NiFi 2.0 M1 release?
There have certainly been some tweaks and improvements. Some refactoring so that it's faster, and definitely improvements to the lifecycle of creating processors, so that dependencies are loaded in the background instead of on creation, etc. But this is the first release with the Python stuff available. So now that we're putting it out there into full circulation, I expect to see people contributing and a lot of growth to come in the very near future.
@@nifinotes5127 But in this version 2.0 M1, is this feature not available? I tried to replicate it, but it doesn't work.
Which feature are you referring to? Python based processors? They are available in this version.
Hello sir, I am using the InvokeHTTP processor in NiFi for API calls. Is it possible to collect or generate logs of successful and failed API calls?
Have you had to install Maven to create that custom processor in Python?
No, you shouldn’t need Maven.
Is there a NiFi 2.0 Snapshot Docker Image (Ubuntu, CentOS, etc) that comes with Java JDK 17, Maven 3.9.2 and Python 3.9 setup to test these new Python Processors and Py4J API features? I am running into an issue with Python and Java communication issues.
Sorry @jamesmedel6566 looks like I dropped the ball on responding to you. The first milestone release (2.0.0-M1) was just released, including the Python bits. There is a docker image for it on Dockerhub, as well.
Is there a NiFi Docker image that comes with JDK 17.0.6, maven 3.9.2 and Python 3.9 setup for testing NiFi’s new Py4J and Example Python Processors? I was trying to run NiFi 2.0 snapshot with Python 3.9 after building NiFi from source, but running into a communication NiFi Python Controller issue after enabling python3 in NiFi properties config. Any help would be greatly appreciated.
Hey @juniorskates94, not that I know of. That probably would have been a good idea to create a docker image, but I didn’t think about it. And unfortunately probably won’t have the cycles to do so for a bit. In the meantime, I recommend reaching out to dev@nifi.apache.org so that you can provide more details about the error and we can have a better conversation there than we can here.
Is it possible to write a script for cron scheduling of nifi processors ? Please help 🙏
I’m not sure that I understand the question. Processors do support CRON Scheduling natively.
How do I schedule a cron job to run NiFi process groups? Can anyone help me, please? 🙏
I’d recommend shooting a note to users@nifi.apache.org or using the Slack channel. Makes it much easier to converse than YouTube comments :)
@@nifinotes5127 please share slack channel link
nifi.apache.org/mailing_lists.html you can go to this page and at the bottom is the Invite Link for slack.
@@nifinotes5127 Thank you so much for the help
What is the point of the batch output step? Is that only necessary if you have some splitting happening in your processing?
The batch output ensures that all of the data is transferred out as a single batch of data. This ensures that the following processor (PublishKafkaRecord) has access to all of the data when it runs. Otherwise, it may receive only some of the data, and push just that partial data in the transaction. By doing it as a batch it will send all data in the same transaction.
What does "stateless engine" mean in NiFi?
The Stateless Engine is a mechanism for running a nifi flow in a 'stateless' way. In this paradigm, the content is not persisted across restarts, the flow executes sequentially, and there are some other differences. Here's a link to the README to learn more: github.com/apache/nifi/blob/main/nifi-stateless/nifi-stateless-assembly/README.md
Can you send a link to the XML in the git example you showed?
Do you have any examples for how we can use the nifi api to automate uploading a template and running it? Have not been able to find an example online. Or can you explain what that process / steps involved would look like? Thank You!!
I do not have any examples. Generally, the best approach to understanding the REST API is to just perform the task in your browser with the Developer View open. This will show you exactly what is posted to the server and the response. There is also a client library in Java, and there's nipyapi for Python (though this isn't officially a part of Apache NiFi; it's just something developed by a community member).
We have something built very similar to this, but after the ExecuteStateless processor checks the controller services it says the services are INVALID. The services work without ExecuteStateless. What do I need to check? This one has me so confused.
You should be able to look in the logs and see why it’s invalid.
@@nifinotes5127 it states: keystore properties is invalid because must set either 0 or 3 properties for keystore. truststore properties is invalid because invalid truststore password or type specified for file {nifi.truststore location} though these services are running and enabled without ExecuteStateless.
@@PSNOYEA are you setting your keystore and truststore passwords as properties in your executestateless processor? If you're using parameters the property:value pair needs to match your parameter so it's picked up properly. Possibly even if you're hard coding the sensitive value in the controller service/processor, you need to change it to a parameter and then define it as a property in the execute stateless processor.
Is it possible to implement similar scenario with manual commit with RabbitMQ? Does NiFi support this? I read the documentation for bundled AMQP processors, but did not find anything about.
No, I don’t believe so. I don’t know that RabbitMQ supports such a thing. If it does, it may require updates to the processors, but I don’t know enough about RabbitMQ to say for sure
What will happen to the consumer while processing of a message takes a long time? Since the offset hasn't changed, won't the same data be polled and processed multiple times in parallel?
In Stateless, the entire pipeline is single threaded, so the consumer will not run while other data is processing. If the processing takes too long, though, the transaction will fail. In that case, the publisher will fail, and the entire transaction will be rolled back. Then the data would be pulled again.
Thank you for the video! But it's still not clear to me at what stage/processor the offset commit occurs.
The commit happens in the PublishKafkaRecord processor, as the results are pushed to the destination topic.
@@nifinotes5127 That is, even if I push the data to another topic, the offset will be commited in the topic from which it was originally read? As non obvious side effect?
Correct.
This opens up so many opportunities, for example using python LangChain libraries inside Apache NiFi and eventually using the later in the development with AI LLM operations. Similar to Flowise and LangFlow but in NiFi :)!
That is amazing! I have some knowledge of Python, so now it is going to be easier to write my own processor.
Very interesting indeed. A question regarding the current ExecuteScript and ScriptedTransformRecord (which are built on Java): they currently support Jython, which doesn't support native Python libs. Will these processors gain a native Python option, or can users only use native Python libs when creating new custom Python processors?
No, there will be no change to the scripted components. It's not possible to run native Python within the JVM.
Great, I am interested in building some custom processors based on Python. Could you please provide a step-by-step guide to doing this with Python? Thanks in advance.
This is very powerful for a mature system like NiFi to embrace the Python ecosystem 🎉
Really nice, congrats for all the team !!!
That’s great!!! 👏👏👏
That's a game changer 🎉
When will it be available to the community?
I would guess later this year. But being an Apache project there is no definitive timeline.
This is extremely cool!! I'm interested to see if these can be used on high traffic/low latency requiring flows, or should mostly be used on the more relaxed flows that can take a little longer to process. Either way, this allows for crazy expansion of functionality. Can't wait!
I suppose it depends on what your definition of "high traffic/low latency" is :) Effectively, it amounts to a Remote Procedure Call (RPC) for each FlowFile, over a local socket. So a native Java processor will naturally be more efficient. But for well designed flows, the performance has generally been quite good. There are definitely areas where we intend to further improve performance, though.
I am mostly worried about what the performance (memory/allocations) ramifications are going to be with all of that Python? I have been either making compiled Java processors, or at a minimum, groovy scripted processors, and even those I worry about.
Agreed, you very much should be worried about those things, for any software that you write. :) You will naturally see better performance with native Java processors, as there's no need to communicate with a separate process. But of course, well-written Java will outperform poorly-written Python, and well-written Python will outperform poorly-written Java. So it will be a matter of determining what the most critical concerns are in each case.
Great job as usual Mark!