Lagom service warning "Sending an 2xx 'early' response before end of request was received"

We’ve been using Lagom for quite some time, and a while ago we noticed warnings logged by the Akka HTTP library: "Sending an 2xx ‘early’ response before end of request was received…". Most (if not all) of what I have found about this message concerns streaming requests where the response is sent before the stream finishes, which we aren’t doing (at least not explicitly). We haven’t been able to reproduce it locally, and the log entry carries no identifying information about the request, so we’re having difficulty even identifying which HTTP request could be causing it.

I’d love to provide more information about my system, but I’m not sure what more I can provide. As I understand it, Lagom orchestrates the calls to akka-http in such a way that I shouldn’t be able to cause this problem, and yet somehow I am. Any ideas about where to look, or what type of requests could trigger this behavior in a standard Lagom service, would be helpful.

Hi @zmarois,

I think this message appears in cases where the request is streamed (and not strict). In Lagom that would be when the request message type is a Source[_, _].


Thank you @ignasi35 for confirming my understanding of how this can happen. My confusion is that I cannot see how we are doing any such streamed request with our Service definition. I’ll paste the entire service descriptor below, if you are willing to confirm my understanding. In our entire codebase, we only use an akka.Flow in a ReadSideProcessor, and only use an akka.Source (and convert it to a CompletionStage) in the implementation of that getEvents route to read the raw journal events from the database for tracking purposes.

package io.cimpress.mcp.items.service;

import akka.Done;
import akka.NotUsed;
import com.fasterxml.jackson.databind.JsonNode;
import com.lightbend.lagom.javadsl.api.Descriptor;
import com.lightbend.lagom.javadsl.api.Service;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.api.transport.Method;
import io.cimpress.mcp.items.v1.model.ItemEventQueryResult;
import io.cimpress.mcp.items.v1.model.ItemQueryResult;
import io.cimpress.mcp.items.v1.model.*;

import java.util.Map;
import java.util.Optional;

import static com.lightbend.lagom.javadsl.api.Service.*;

/**
 * The item service interface.
 * <p>This describes everything that Lagom needs to know about how to serve and
 * consume the ItemService.
 */
public interface ItemService extends Service {

  public static final String BASE_PATH = "/v1/items";

  ServiceCall<NotUsed, ItemResource> get(String itemId);

  ServiceCall<NotUsed, ItemQueryResult> queryItems(String orderId);

  ServiceCall<Item, Done> upsertItem(String itemId, Optional<Boolean> enableIndexing);

  ServiceCall<JsonNode, Done> updateItem(String itemId);

  ServiceCall<Map<String, ItemStatusUpdate>, Done> updateStatuses(String itemId);

  ServiceCall<NotUsed, Map<String, ItemStatus>> getStatuses(String itemId);

  ServiceCall<ItemStatusUpdate, Done> updateStatus(String itemId, String status);

  ServiceCall<NotUsed, Done> updateStatusState(String itemId, String status, String state);

  ServiceCall<Link, Done> putLink(String itemId, String rel);

  ServiceCall<NotUsed, Done> deleteLink(String itemId, String rel);

  ServiceCall<ItemEvent, Done> putEvent(String itemId, String eventId);

  ServiceCall<NotUsed, ItemEventQueryResult> getEvents(String itemId);

  ServiceCall<NotUsed, NotUsed> livecheck();

  ServiceCall<NotUsed, JsonNode> getSwagger();

  default Descriptor descriptor() {
    // @formatter:off
    return named("items").withCalls(
        restCall(Method.GET, "/livecheck", this::livecheck),
        restCall(Method.HEAD, "/livecheck", this::livecheck),
        restCall(Method.GET, "/healthcheck", this::livecheck),
        restCall(Method.HEAD, "/healthcheck", this::livecheck),
        restCall(Method.GET, "/swagger.json", this::getSwagger),
        pathCall("/items/:itemId", this::get),
        pathCall("/items?orderId", this::queryItems),
        restCall(Method.PUT, "/items/:itemId?enableIndexing", this::upsertItem),
        pathCall("/items/:itemId/events", this::getEvents),
        restCall(Method.PUT, "/items/:itemId/events/:eventId", this::putEvent),
        restCall(Method.GET, BASE_PATH + "/:itemId", this::get),
        restCall(Method.GET, BASE_PATH + "?orderId", this::queryItems),
        restCall(Method.PUT, BASE_PATH + "/:itemId?enableIndexing", this::upsertItem),
        restCall(Method.PATCH, BASE_PATH + "/:itemId", this::updateItem),
        restCall(Method.PATCH, BASE_PATH + "/:itemId/statuses", this::updateStatuses),
        restCall(Method.GET, BASE_PATH + "/:itemId/statuses", this::getStatuses),
        restCall(Method.PATCH, BASE_PATH + "/:itemId/statuses/:status", this::updateStatus),
        restCall(Method.PUT, BASE_PATH + "/:itemId/statuses/:status/state/:state", this::updateStatusState),
        restCall(Method.PUT, BASE_PATH + "/:itemId/links/:rel", this::putLink),
        restCall(Method.DELETE, BASE_PATH + "/:itemId/links/:rel", this::deleteLink),
        restCall(Method.GET, BASE_PATH + "/:itemId/events", this::getEvents),
        restCall(Method.PUT, BASE_PATH + "/:itemId/events/:eventId", this::putEvent)
    );
    // @formatter:on
  }
}


Hmmm, I wonder if big payloads on a strict call request could trigger that behavior too.

Imagine you are POSTing (or PUTting, for that matter) a rather big payload. Reading from the wire is actually handled by Akka Streams too, so even if you don’t use streamed calls there are still streams under the covers.

If you could narrow down the actual call and an example payload to reproduce it, that’d be awesome!
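One way to hunt for such a payload is to replay a PUT with bodies of increasing size and watch for the warning. The following is only a rough sketch using the JDK's own HTTP client types: the base URL `http://localhost:9000`, the item id `probe-item`, and the `padding` field are all made up for illustration, and a real probe would need to pad a field of a valid Item so the body still deserializes.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

public class LargePayloadProbe {

    /** Builds a JSON object whose single (hypothetical) string field pads the body to targetBytes. */
    static String jsonOfSize(int targetBytes) {
        String prefix = "{\"padding\":\"";
        String suffix = "\"}";
        // Never go below the minimal envelope; the filler is ASCII, so chars == bytes.
        int fill = Math.max(0, targetBytes - prefix.length() - suffix.length());
        return prefix + "x".repeat(fill) + suffix;
    }

    /** Builds (but does not send) a PUT to the upsert route with a padded body. */
    static HttpRequest putRequest(String baseUrl, String itemId, int targetBytes) {
        String body = jsonOfSize(targetBytes);
        return HttpRequest.newBuilder(URI.create(baseUrl + "/v1/items/" + itemId))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args) {
        // Assumed sizes: 1 KiB, 64 KiB, 1 MiB. Adjust to whatever "large" means for the service.
        for (int size : new int[] {1_024, 64 * 1_024, 1_024 * 1_024}) {
            HttpRequest request = putRequest("http://localhost:9000", "probe-item", size);
            long length = request.bodyPublisher().orElseThrow().contentLength();
            System.out.println(request.method() + " " + request.uri() + " body=" + length + " bytes");
        }
    }
}
```

To actually fire the requests, each one could be passed to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding())` while tailing the service log. Whether size alone is enough to trigger the warning is exactly what this would test, so treat it as an experiment rather than a known reproduction.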


We’re absolutely trying to narrow it down to the exact call, but are having difficulty doing so. I’ll look into identifying “large” payloads and whether I can correlate them with the warnings. Thanks for the idea.
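For the correlation itself, something along these lines might do as a starting point. It is a sketch under assumptions: the `AccessEntry` shape (timestamp, method, path, content length) is hypothetical and would have to be parsed from whatever access log or load balancer log is available, and the size threshold and time window are arbitrary knobs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class WarningCorrelator {

    /** One access-log entry. This shape is assumed, not a Lagom or Akka HTTP format. */
    static final class AccessEntry {
        final Instant time;
        final String method;
        final String path;
        final long contentLength;

        AccessEntry(Instant time, String method, String path, long contentLength) {
            this.time = time;
            this.method = method;
            this.path = path;
            this.contentLength = contentLength;
        }
    }

    /** Returns entries with a body of at least minBytes logged within window of any warning. */
    static List<AccessEntry> suspects(List<AccessEntry> access, List<Instant> warnings,
                                      long minBytes, Duration window) {
        List<AccessEntry> result = new ArrayList<>();
        for (AccessEntry entry : access) {
            if (entry.contentLength < minBytes) {
                continue; // small bodies are not what we are looking for
            }
            for (Instant warning : warnings) {
                Duration gap = Duration.between(entry.time, warning).abs();
                if (gap.compareTo(window) <= 0) {
                    result.add(entry);
                    break; // one matching warning is enough to flag the entry
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2019-01-01T00:00:00Z"); // made-up sample data
        List<AccessEntry> access = List.of(
                new AccessEntry(t0, "PUT", "/v1/items/a", 500_000),
                new AccessEntry(t0.plusSeconds(1), "GET", "/v1/items/a", 0),
                new AccessEntry(t0.plusSeconds(60), "PUT", "/v1/items/b", 900_000));
        List<Instant> warnings = List.of(t0.plusSeconds(2));
        for (AccessEntry e : suspects(access, warnings, 100_000, Duration.ofSeconds(5))) {
            System.out.println(e.method + " " + e.path + " (" + e.contentLength + " bytes)");
        }
    }
}
```

If the flagged entries cluster on one route, that route and a payload of similar size would be the natural candidate for a local reproduction.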