I’m creating a EventSourceActor for each event source connection, and this actor create a eventsource flow. When the client call eventsource.close(), I want the actor to be notified and close itself.
I’ve tried to use Flow.watchTermination, but the flow is not terminated when client disconnect.
I don’t want to use Publisher, because I need a named actorRef, instead of a actor created with a publisher.
public class EventSourceActor extends AbstractActor {
private final Materializer mat;
private final SourceQueueWithComplete<JsonNode> sourceQueue;
private final Source<JsonNode, NotUsed> eventSource;
private final Flow<JsonNode, JsonNode, NotUsed> eventSourceFlow;
private JsonNode testJson;
@Inject
public EventSourceActor(Materializer mat) {
this.mat = mat;
Pair<Sink<JsonNode, NotUsed>, Source<JsonNode, NotUsed>> sinkSourcePair = MergeHub.of(JsonNode.class, 16)
.toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both())
.run(mat);
Sink<JsonNode, NotUsed> hubSink = sinkSourcePair.first();
this.sourceQueue = Source.<JsonNode>queue(bufferSize, OverflowStrategy.backpressure())
.to(hubSink)
.run(mat);
this.eventSource = sinkSourcePair.second();
this.eventSourceFlow = Flow.fromSinkAndSourceCoupled(hubSink, eventSource)
.watchTermination((n, stage) -> {
stage.thenAccept(f -> context().stop(self()));
return NotUsed.getInstance();
});