Love Viktor's presentation style and content. Learnt loads from this presentation. It's great news to see so much progress in the JDK!
I really appreciate that, @dazraf. Thank you!
I am only 12 minutes into this presentation and I've learned a lot. Great work, Viktor.
Thank you so much!
Love the inclusion of an andThen api!
Amazing work! Showed me operations that I didn't even think I needed but I do now.
That's awesome to hear!
Amazing work. It looks heavily inspired by reactive frameworks, but this is much simpler and sounds like it would be a great addition to Java.
Amazing presentation.
Thank you!
I hope this will come as an incubator API soon.
I expected the Gatherers to also include all the methods from `java.util.stream.Stream` (perhaps in a different class like `DefaultGatherers` or `CoreGatherers`). That way, I could also use these methods to build composed Gatherers with the `.andThen` method.
Nice and detailed explanation.
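For illustration, here is a minimal sketch of how two familiar `Stream` operations could be re-expressed as Gatherers and then composed with `andThen`. It assumes JDK 24 or later, where `java.util.stream.Gatherer` is a final API (on JDK 22/23 it was a preview API requiring `--enable-preview`). The helper names `map` and `filter` and the class `StreamOpsAsGatherers` are hypothetical and are not JDK methods.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Sketch only: map/filter re-expressed as Gatherers (JDK 24+).
// These helpers are hypothetical; the JDK's Gatherers class does not provide them.
class StreamOpsAsGatherers {

    // Stateless 1:1 transformation; greedy because it never stops consuming on its own.
    static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
        return Gatherer.of(Gatherer.Integrator.<Void, T, R>ofGreedy(
                (state, element, downstream) -> downstream.push(mapper.apply(element))));
    }

    // Stateless filter: skips non-matching elements, pushes matching ones.
    static <T> Gatherer<T, ?, T> filter(Predicate<? super T> predicate) {
        return Gatherer.of(Gatherer.Integrator.<Void, T, T>ofGreedy(
                (state, element, downstream) -> !predicate.test(element) || downstream.push(element)));
    }

    static List<String> demo() {
        Gatherer<Integer, ?, Integer> evens = filter(x -> x % 2 == 0);
        Gatherer<Integer, ?, String> label = map(x -> "#" + x);
        // andThen fuses the two stages into one reusable Gatherer.
        Gatherer<Integer, ?, String> evensLabelled = evens.andThen(label);
        return Stream.of(1, 2, 3, 4, 5).gather(evensLabelled).toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [#2, #4]
    }
}
```

Both integrators are wrapped in `ofGreedy`, which tells the runtime that they only stop when the downstream stops accepting elements.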
Thank you, @hepin1989!
Great presentation!
Thank you! :)
I see this gather is somewhat similar to the expr functions in Spark's DataFrame API. Happy that we are taking the good things...
I hope this makes its way beyond preview, to catch up with, and to some extent improve on, Kotlin's more powerful collection API 🤞
Will Collectors do something about short-circuiting as well, or will we have to short-circuit in gatherers and then reduce? E.g. implementing allMatch.
I personally don't think that's possible, as the evaluation strategy for Collector is already established and no current evaluation implementation checks for short-circuiting. As such, I suspect that it would be hard to implement it in a backwards-compatible way.
Cool stuff, is there any way to play with it, other than implementing it myself?
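The "short-circuit in a gatherer" route from the question could look roughly like the sketch below: the integrator returns `false` at the first non-matching element, which asks upstream to stop sending elements, and the finisher emits the single verdict. It assumes JDK 24+, and `allMatch` here is a hypothetical helper, not a method of `java.util.stream.Gatherers`.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;

// Sketch: a short-circuiting "allMatch" expressed as a Gatherer (JDK 24+).
// allMatch is a hypothetical helper, not part of the JDK.
class AllMatchGatherer {

    // Per-evaluation state; ofSequential creates a fresh instance for each pipeline run.
    static final class State {
        boolean allMatched = true;
    }

    static <T> Gatherer<T, ?, Boolean> allMatch(Predicate<? super T> predicate) {
        return Gatherer.ofSequential(
                State::new,
                // Non-greedy integrator: returning false signals "no more input needed".
                (state, element, downstream) -> {
                    if (!predicate.test(element)) {
                        state.allMatched = false;
                        return false;
                    }
                    return true;
                },
                // The finisher runs once integration has ended and pushes the verdict.
                (state, downstream) -> downstream.push(state.allMatched));
    }

    static boolean check(List<Integer> input) {
        Gatherer<Integer, ?, Boolean> allEven = allMatch(x -> x % 2 == 0);
        return input.stream().gather(allEven).findFirst().orElseThrow();
    }

    public static void main(String[] args) {
        System.out.println(check(List.of(2, 4, 6))); // true
        System.out.println(check(List.of(2, 3, 4))); // false
    }
}
```

Like `Stream.allMatch`, an empty input yields `true`, since no element fails the predicate.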
When will we have these things in Java to play with?
If Viktor Klang talks about this, I'd like to see the comparison with Scala.
Follow the progress of JEP-461: Stream Gatherers for more info
5:10 Ngl, I am a little bit confused... reorder() exists as sort(), deduplicateConsecutive() exists as distinct(), groupdUsing() exists through collect(), and of course mapConcurrent() exists through map(). Yes... you have to do some extra stuff instead of just passing your lambda as an argument, but... it exists. All of that would just be another method name or syntax.
My thinking is that reorder() can emit elements as soon as they are determined to be next in order (using sequence numbers or otherwise), whereas sort can only start to emit once the entire upstream has been consumed, which never happens for unbounded streams. deduplicateConsecutive() is definitely not the same as distinct(), as you only want to eliminate adjacent duplicates, and collect() is a terminal operation, not an intermediate one. And, unfortunately, mapConcurrent() cannot be implemented using map(), since you need to flush during finish, and the contract of map() is also that the supplied function is stateless and non-interfering.
I hope that helps :)
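To make the `deduplicateConsecutive()` vs `distinct()` difference concrete, here is a minimal sketch of a sequential, stateful Gatherer that only drops *adjacent* duplicates. It assumes JDK 24+. The method name follows the talk's example, but the implementation below is an assumption, not the one shown in the talk.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Gatherer;

// Sketch: deduplicateConsecutive() as a sequential, stateful Gatherer (JDK 24+).
// The implementation is an illustrative assumption.
class DeduplicateConsecutive {

    static <T> Gatherer<T, ?, T> deduplicateConsecutive() {
        // Local state class: remembers the last element that was pushed.
        class State {
            boolean hasPrevious;
            T previous;
        }
        return Gatherer.ofSequential(
                State::new,
                Gatherer.Integrator.<State, T, T>ofGreedy((state, element, downstream) -> {
                    if (state.hasPrevious && Objects.equals(state.previous, element)) {
                        return true; // adjacent duplicate: drop it and keep consuming
                    }
                    state.hasPrevious = true;
                    state.previous = element;
                    return downstream.push(element);
                }));
    }

    static List<Integer> dedup(List<Integer> input) {
        Gatherer<Integer, ?, Integer> g = deduplicateConsecutive();
        return input.stream().gather(g).toList();
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 1, 2, 2, 2, 3, 1, 1);
        System.out.println(dedup(input));                       // [1, 2, 3, 1]
        System.out.println(input.stream().distinct().toList()); // [1, 2, 3]
    }
}
```

Note that the trailing `1` survives here but is removed by `distinct()`. Also, this gatherer only has to remember one element, so it can run over unbounded streams, whereas `distinct()` must remember every element seen so far.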
Love this, as I've run into many cases where I've wanted to extend the Streams API. I even ended up building something that allowed arbitrary operators based on mapMulti, since it was the first official operator to provide access to the downstream sink, but it had many shortcomings, didn't parallelize, etc. I kept expecting him to point to the library that has this, and was then disappointed that it is not available yet 😢. Looking forward to this. Some feedback though... gather and gatherer don't seem like the right names; why not operate and operator? Also, where does this fit with the Reactive Streams API? Java streams are great, but I've found gaps, such as being pull-based (rather than push-pull with backpressure like Reactive Streams) and lacking inherent support for asynchronous operations, that have made me use Reactive Streams APIs instead. It would be good to incorporate these concepts into Java streams. That, combined with virtual threads, would truly kill reactive streams.
Java Streams are push-based (in general) over a Spliterator. As for naming, it is always a challenge: as you can imagine, a lot of different names have been suggested (it would be a long conversation to fully elaborate all the concerns to address). What gather+gatherer had going for it is its relationship with collect+collector :)
I was kind of expecting at the end: “… and this is why we have decided to deprecate all intermediate operations in the Stream API in Java 23” 😅
😂
@viktor_klang Didn’t expect you to notice my comment! Since you are here, for the mapMulti() example at 26:07, shouldn’t the lambda return !downstream.isRejecting()?
Also, I’m curious about isRejecting(): wouldn’t most use cases negate the returned boolean? It might be better to have isAccepting() instead, which would be more consistent with push().
@DidierLoiseau Sorry, I completely missed your reply.
Having mapMulti return !downstream.isRejecting() wouldn’t hurt, but on the other hand it doesn’t really fix the problem that the user-supplied BiConsumer cannot know when it must stop pushing.
As for isRejecting instead of isAccepting: the reason is that we do not know whether it is accepting, we only know whether it is rejecting.
isRejecting() is primarily intended for when the upstream wants to avoid performing expensive work whose results cannot be passed downstream anyway, for instance in the finisher.
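Both points in this exchange can be sketched in code. The first method is a hypothetical mapMulti-style Gatherer whose integrator returns `!downstream.isRejecting()`. The user's `BiConsumer` still only sees a plain `Consumer`, so it cannot tell when its pushes are being rejected. The second is a hypothetical batching Gatherer whose finisher checks `isRejecting()` before building the last partial batch. This assumes JDK 24+. Neither `mapMulti` nor `batches` here is the code from the talk, and the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Sketch (JDK 24+): two hypothetical gatherers illustrating Downstream.isRejecting().
class RejectingDemo {

    // mapMulti-style gatherer: after the user's mapper returns, report whether
    // downstream still accepts elements so that upstream can stop early.
    static <T, R> Gatherer<T, ?, R> mapMulti(BiConsumer<? super T, ? super Consumer<R>> mapper) {
        return Gatherer.of((state, element, downstream) -> {
            Consumer<R> sink = downstream::push; // push's boolean result is discarded here
            mapper.accept(element, sink);        // the mapper cannot observe rejection
            return !downstream.isRejecting();
        });
    }

    // Fixed-size batching; the finisher skips building the final partial batch
    // when nothing could be pushed anyway.
    static <T> Gatherer<T, ?, List<T>> batches(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        return Gatherer.ofSequential(
                () -> new ArrayList<T>(size),
                Gatherer.Integrator.<ArrayList<T>, T, List<T>>ofGreedy((buffer, element, downstream) -> {
                    buffer.add(element);
                    if (buffer.size() < size) {
                        return true;
                    }
                    List<T> batch = new ArrayList<>(buffer);
                    buffer.clear();
                    return downstream.push(batch);
                }),
                (buffer, downstream) -> {
                    if (!buffer.isEmpty() && !downstream.isRejecting()) {
                        downstream.push(new ArrayList<>(buffer));
                    }
                });
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3)
                .gather(RejectingDemo.<Integer, Integer>mapMulti((x, sink) -> {
                    sink.accept(x);
                    sink.accept(x * 10);
                }))
                .toList()); // [1, 10, 2, 20, 3, 30]
        System.out.println(Stream.of(1, 2, 3, 4, 5)
                .gather(RejectingDemo.<Integer>batches(2))
                .toList()); // [[1, 2], [3, 4], [5]]
    }
}
```

In the mapMulti sketch, a rejected `push` is silently ignored inside the mapper. The integrator's return value is the only place where the rejection reaches upstream, which matches the limitation described in the reply.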
Looks like Reactor already supports most of these features.
So... is Gatherer in Java 21 yet?
No, it is not in Java 21
Maybe preview in 22
@zaccicala2341 Indeed it is!
I would anticipate having both `Gatherer::andThen` and an alias `Gatherer::gather = Gatherer::andThen`.
Why, you ask? This would let me effortlessly extract a group of Gatherers from an existing Stream pipeline. Otherwise, I have to rename them one by one, changing `.gather()` to `.andThen()`, while keeping the same functionality.
I decided against doing so because I found that it made code reviews more difficult to understand (am I looking at Stream composition or Gatherer composition here?).
Reusing the name for composition used by things like Function made more sense to me.
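As a rough illustration of the refactoring discussed here, the sketch below extracts a chain of `.gather(...)` calls into one composed Gatherer via `andThen` and checks that both forms produce the same result. It assumes JDK 24+ and uses the built-in `Gatherers.scan` and `Gatherers.windowFixed`. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;

// Sketch (JDK 24+): chained gather() calls vs. one Gatherer composed with andThen.
class AndThenDemo {

    // Extracted stage: running totals, then fixed windows of two totals.
    static Gatherer<Integer, ?, List<Integer>> runningTotalsInPairs() {
        return Gatherers.<Integer, Integer>scan(() -> 0, Integer::sum)
                .andThen(Gatherers.<Integer>windowFixed(2));
    }

    // Original pipeline: one gather() call per stage.
    static List<List<Integer>> chained(List<Integer> input) {
        return input.stream()
                .gather(Gatherers.<Integer, Integer>scan(() -> 0, Integer::sum))
                .gather(Gatherers.<Integer>windowFixed(2))
                .toList();
    }

    // Refactored pipeline: a single gather() with the composed Gatherer.
    static List<List<Integer>> composed(List<Integer> input) {
        return input.stream().gather(runningTotalsInPairs()).toList();
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4);
        System.out.println(chained(input));  // [[1, 3], [6, 10]]
        System.out.println(composed(input)); // [[1, 3], [6, 10]]
    }
}
```

The extraction step is the rename the comment describes: each `.gather(x)` after the first becomes `.andThen(x)` inside the helper.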