Offering elements to a Source in Akka typed

I’m refering to this documentation - Source

Here under, the definition for Source, it says

“You can push elements to the queue and they will be emitted to the stream if there is demand from downstream”

I have created a source like this -
Source<Object, SourceQueueWithComplete<Object>> sourceQueue = Source.queue(4, OverflowStrategy.backpressure(), 4)

However, I am unsure of how to push elements to this queue, I don’t see an offer method for Source<Object, SourceQueueWithComplete<Object>> but there is an offer method for SourceQueueWithComplete, how can I access this SourceQueueWithComplete<Object> ?

To access the SourceQueueWithComplete you need to use Keep
Sth like this:

final SourceQueueWithComplete<Object> sourceQueue= Source.queue(4, OverflowStrategy.backpressure(), 4)
       .toMat(Sink.foreach(System.out::println), Keep.left())
       .run(materializer);
sourceQueue.offer("Hi");

The concept of running streams using materialized values and thus Keep is also explained in this video and in this stackoverflow article

So what is this object used for? Source<Object, SourceQueueWithComplete<Object>> sourceQueue
I’m trying to understand what is the difference between Source<Object, SourceQueueWithComplete<Object>> and SourceQueueWithComplete<Object>

A Source<Object, M> will provide Objects to a downstream subscriber of that source (i.e. Object is the type of things the Source is emitting to the downstream).

A SourceQueueWithComplete<Object> is a queue to which you can offer elements, with those elements being emitted (assuming demand etc.) by an associated Source.

The first Object in Source<Object, SourceQueueWithComplete<Object>> is the element type of the Source, while SourceQueueWithComplete<Object> is the materialized value which you can think of as the channel through which the Source may interact with the world outside the stream, and which doesn’t exist until the stream is actually run.

2 Likes