Backpressure aware WebSocket actor

We use actor-based WebSocket handling and are experiencing issues at high message send rates, similar to "WebSockets may drop outgoing messages" (issue #6246 in playframework/playframework on GitHub). Play's ActorFlow (play.libs.streams.ActorFlow) does not support backpressure; how excess messages are handled depends on the configured OverflowStrategy. I have implemented a backpressure-aware ActorFlow that sends an acknowledgement back to the user's WebSocket actor; this flow uses Source.actorRefWithAck (Play 2.7 / Akka 2.5).

In our use case we have an external source; we couple these acknowledgements with the ask pattern towards the external source to manage backpressure.

I would like feedback on the approach we have taken, any alternative options, performance considerations, etc.

The long-term plan is to move to streams.

public class BackpressureActorFlow {

	private final Logger logger = LoggerFactory.getLogger(BackpressureActorFlow.class);

	public static class WebSocketMessage {

		public long messageCount = 0;
		public String message = "";

		public WebSocketMessage(long messageCount, String message) {
			super();
			this.messageCount = messageCount;
			this.message = message;
		}

	}

	public static class WSAck {
	};

	public static Flow<String, String, NotUsed> actorRef(Function<ActorRef, Props> props, int bufferSize,
			ActorRefFactory factory, Materializer mat) {

		Source<WebSocketMessage, ActorRef> outerActor = Source.actorRefWithAck(new WSAck());
		Pair<ActorRef, Publisher<WebSocketMessage>> outerActoPublisherPair = outerActor
				.toMat(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), Keep.both()).run(mat);

		Sink<String, NotUsed> webSocketSink = Sink.actorRef(
				factory.actorOf(Props.create(SinkActor.class, props, outerActoPublisherPair.first())),
				new Success(CompletionStrategy.draining()));

		Source<String, NotUsed> webSocketSource = Source.fromPublisher(outerActoPublisherPair.second())
				.map(wsMessage -> {
					// logger.info("webSocketSource out {} {} ", wsMessage.messageCount);
					return wsMessage.message;
				}).addAttributes(Attributes.inputBuffer(1, bufferSize));

		Flow<String, String, NotUsed> flow = Flow.fromSinkAndSource(webSocketSink, webSocketSource);
		return flow;
	}

	static class SinkActor extends AbstractActor {
		private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

		public static Props props(final Function<ActorRef, Props> props, final ActorRef outerActor) {
			return Props.create(SinkActor.class, props, outerActor);
		}

		private final ActorRef flowActor;

		public SinkActor(Function<ActorRef, Props> props, ActorRef outerActor) {
			this.flowActor = getContext().watch(getContext().actorOf(props.apply(outerActor), "flowActor"));
		}

		@Override
		public Receive createReceive() {
			return receiveBuilder().match(Status.Success.class, x -> {
				flowActor.tell(PoisonPill.getInstance(), self());
			}).match(Status.Failure.class, x -> {
				flowActor.tell(PoisonPill.getInstance(), self());
			}).match(Terminated.class, x -> {
				getContext().stop(self());
			}).matchAny(o -> flowActor.tell(o, self())).build();
		}

		//TODO - supervisorStrategy() to return SupervisorStrategy.stop()
	}

}