Data Segregation using Akka Streams

I am trying to segregate JSON data using Akka Streams. I have extracted the key tags needed for the segregation from the JSON data and put them into a case class.

E.g. Bucket(id: String, category: String)

    Bucket(1, type1)
    Bucket(2, type2)
    Bucket(1, type3)

Now I want to segregate the data on the basis of id.
The output should be something like: the person with id 1 has type1 and type3. In other words, whenever several entries share the same id, I want to collect and display all of their types together.

How can I implement this logic?

You can use the groupBy operator to split the stream by ID and then reduce to combine the categories.

Can you give an example of a similar scenario using groupBy and reduce?

Also, is it suitable for real-time data streaming?
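
Here is a minimal sketch for a finite stream, based on the Bucket case class from your question. The maxSubstreams bound of 10 is arbitrary, and I am assuming Akka 2.6+, where the implicit ActorSystem provides the materializer:

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}

    object SegregationExample extends App {
      implicit val system: ActorSystem = ActorSystem("segregation")

      final case class Bucket(id: String, category: String)

      val buckets = List(
        Bucket("1", "type1"),
        Bucket("2", "type2"),
        Bucket("1", "type3")
      )

      Source(buckets)
        // One substream per id; 10 is an arbitrary upper bound on distinct ids.
        .groupBy(10, _.id)
        // Collapse each substream into (id, all categories seen for that id).
        .map(b => b.id -> Set(b.category))
        .reduce((acc, next) => acc._1 -> (acc._2 ++ next._2))
        // Merge the per-id results back into a single stream.
        .mergeSubstreams
        .runWith(Sink.foreach { case (id, categories) =>
          println(s"Person with id $id has ${categories.mkString(" and ")}")
        })
    }

Note that reduce only emits when its substream completes, so this works as written only because the source here is finite.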

The problem with real-time data streaming is: how do you know when a group is complete?

The source emits:

    MyClass(122, Set("apple", "banana"))
    MyClass(123, Set("papaya"))
    MyClass(122, Set("apple", "peach"))

and the expected aggregated output is:

    122 -> apple, banana, peach
    123 -> papaya

So this is the architecture. I receive data from a source and map every element into the case class I created. In the flow, the stream should then aggregate all the information for one user id in one place.
In the example above there are three elements, and the output should be: the visitor with id 122 has apple, banana and peach, and the visitor with id 123 has papaya.
This kind of aggregation should happen in the flow and then be sent to the sink. Also, the source is infinite: we receive new data every second. The purpose is to track user activity.

So is groupBy a good fit for this?
Or is there a different approach that would solve this case more efficiently?

If the source is infinite, then what should trigger sending the aggregated element downstream? Could you add some more details about the expected behavior?
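
In the meantime, one common option, if a time-based cut-off is acceptable for your use case, is to aggregate within a fixed window instead of waiting for a substream to complete. Below is a minimal sketch, assuming a MyClass(id: Int, fruits: Set[String]) case class like the one in your diagram; the 1-second window and the 1000-element batch limit are arbitrary:

    import scala.concurrent.duration._
    import akka.NotUsed
    import akka.stream.scaladsl.Flow

    // Assumed shape of the incoming elements (matches the diagram above).
    final case class MyClass(id: Int, fruits: Set[String])

    // Collect whatever arrives within one second (or up to 1000 elements,
    // whichever comes first), aggregate that batch by id, and emit the results.
    val aggregatePerWindow: Flow[MyClass, MyClass, NotUsed] =
      Flow[MyClass]
        .groupedWithin(1000, 1.second)
        .mapConcat { batch =>
          batch
            .groupBy(_.id)
            .map { case (id, elems) => MyClass(id, elems.flatMap(_.fruits).toSet) }
            .toList
        }

The trade-off is that elements for the same id arriving in different windows produce separate aggregates, so the sink (or whatever store sits behind it) still has to be able to merge them. Whether that is acceptable depends on what you want the trigger to be.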