Akka HTTP routes not responding when using ngrok


(Steve Peterson) #1

I’ve built a simple REST service using Akka HTTP to interface with some Java Beans. The service runs fine when talking to it directly, but stops responding when putting the service behind ngrok (https://ngrok.com/). The flow is as follows:

  1. POST to open
  2. POST to message
  3. GET to messages
  4. etc.
  5. GET to close

Step 2 does not respond when running behind ngrok.

I’ve logged everything I can think of, and the request is not reaching the route when the system fails. The connection is still being maintained, so I’m guessing this is a back pressure issue. I think I’m consuming the entities, but guessing something is different when running with ngrok that is not being accounted for.

I’m using Akka HTTP version 2.11-10.0.10, running on Windows. I’ve tried running with Java 1.8 and 1.10 with the same results.

Any help with figuring out what is happening is welcome. In addition to the logging in the route below, I’ve added logging to the SupervisionStrategy for the Flow Materializer. I’m not seeing any uncaught errors in the flow.

Here is the Route

private Route createRoute() {
	final ExecutionContextExecutor openExecutor = _system.dispatchers().lookup("open-dispatcher");
	final ExecutionContextExecutor messagesExecutor = _system.dispatchers().lookup("messages-dispatcher");
	//
	// Create the routes for the service endpoint.
	return logRequest("Incoming request", Logging.InfoLevel(), () -> route(
			//path("chat", () -> route(
			//
			// Open session.
			path("open",
					 () -> withExecutionContext(openExecutor,
							() -> withRequestTimeout(Duration.create(180, TimeUnit.SECONDS), () -> post(
									() -> entity(Jackson.unmarshaller(Messages.OpenMessage.class), openMessage -> {
										_log.info("Web Interface method \"POST open\" called.");
										final Timeout timeout = Timeout
												.durationToTimeout(Duration.create(180, TimeUnit.SECONDS));

										CompletionStage<Messages.OpenResponse> response = ask(_chatActor,
												openMessage, timeout)
														.thenApply((Messages.OpenResponse.class::cast));
										return completeOKWithFuture(response, Jackson.marshaller());
									}))))),
			//
			// Get messages - uses long polling
			path("messages",
					() -> withExecutionContext(messagesExecutor,
									() -> withRequestTimeout(Duration.create(90, TimeUnit.SECONDS),
											() -> extractSessionID(sessionID -> get(() -> {
												_log.info("Web Interface method \"GET messages\" called.");
												final Timeout timeout = Timeout
														.durationToTimeout(Duration.create(90, TimeUnit.SECONDS));

												CompletionStage<Messages.CallbackMessage> response = ask(_chatActor,
														new Messages.GetChatMessage(sessionID), timeout)
																.thenApply((Messages.CallbackMessage.class::cast));
												return completeOKWithFuture(response, Jackson.marshaller());
											}))))),
			//
			// Send message
			path("message", () -> extractSessionID(
					sessionID -> post(() -> entity(Unmarshaller.entityToString(), sendMessage -> {
						_log.info("Web Interface method \"POST message\" called.");
						_chatActor.tell(new Messages.SendChatMessage(sessionID, sendMessage), ActorRef.noSender());
						return complete(StatusCodes.ACCEPTED);
					})))),
			//
			// Keep alive
			path("keepAlive", () -> extractSessionID(sessionID -> get(() -> {
						_log.info("Web Interface method \"GET keepAlive\" called.");
						_chatActor.tell(new Messages.KeepAliveMessage(sessionID), ActorRef.noSender());
						return complete(StatusCodes.OK);
					}))),
			//
			// Push a url - pops browser window in the agent UI
			path("pushUrl", () -> extractSessionID(sessionID -> post(() -> entity(Unmarshaller.entityToString(), url -> {
						_log.info("Web Interface method \"POST pushUrl\" called.");
						_chatActor.tell(new Messages.PushUrlMessage(sessionID, url), ActorRef.noSender());
						return complete(StatusCodes.OK);
					})))),
			//
			// Dial a number
			path("dialNumber", () -> extractSessionID(
					sessionID -> post(() -> entity(Unmarshaller.entityToString(), phoneNumber -> {
						_log.info("Web Interface method \"POST dialNumber\" called.");
						_chatActor.tell(new Messages.DialNumberMessage(sessionID, phoneNumber),
								ActorRef.noSender());
						return complete(StatusCodes.OK);
					})))),
			//
			// Schedule a callback
			// path(segment("scheduleCallback"), () ->
			// extractSessionID(sessionID -> post())),
			//
			// Close the session.
			path("close", () -> extractSessionID(sessionID -> get(() -> {
						_log.info("Web Interface method \"GET close\" called.");
						_chatActor.tell(new Messages.CloseMessage(sessionID, "User closed chat."),
								ActorRef.noSender());
						return complete(StatusCodes.OK);
					})))
	//))
	));
}