Thank you @ignasi35 for confirming my understanding of how this can happen. What confuses me is that I cannot see where our Service definition makes any such streamed request. I’ll paste the entire service descriptor below, in case you are willing to check my understanding against it. In our entire codebase, we only use an akka.Flow in a ReadSideProcessor, and we only use an akka.Source (which we convert to a CompletionStage) in the implementation of the getEvents route, to read the raw journal events from the database for tracking purposes.
package io.cimpress.mcp.items.service;
import akka.Done;
import akka.NotUsed;
import com.fasterxml.jackson.databind.JsonNode;
import com.lightbend.lagom.javadsl.api.Descriptor;
import com.lightbend.lagom.javadsl.api.Service;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.api.transport.Method;
import io.cimpress.mcp.items.v1.model.ItemEventQueryResult;
import io.cimpress.mcp.items.v1.model.ItemQueryResult;
import io.cimpress.mcp.items.v1.model.*;
import java.util.Map;
import java.util.Optional;
import static com.lightbend.lagom.javadsl.api.Service.*;
/**
 * Lagom service descriptor for the item service.
 *
 * <p>Declares every service call together with the HTTP routes bound to it in
 * {@link #descriptor()}; this is what Lagom needs in order to serve and to
 * consume the ItemService.
 */
public interface ItemService extends Service {

    /** Prefix shared by the versioned routes registered in {@link #descriptor()}. */
    String BASE_PATH = "/v1/items";

    /** Bound to {@code GET /v1/items/:itemId} and to the path call {@code /items/:itemId}. */
    ServiceCall<NotUsed, ItemResource> get(String itemId);

    /** Bound to {@code GET /v1/items?orderId} and to the path call {@code /items?orderId}. */
    ServiceCall<NotUsed, ItemQueryResult> queryItems(String orderId);

    /**
     * Bound to {@code PUT /items/:itemId?enableIndexing} and
     * {@code PUT /v1/items/:itemId?enableIndexing}; the request body is an {@code Item}.
     */
    ServiceCall<Item, Done> upsertItem(String itemId, Optional<Boolean> enableIndexing);

    /** Bound to {@code PATCH /v1/items/:itemId}; the request body is arbitrary JSON. */
    ServiceCall<JsonNode, Done> updateItem(String itemId);

    /** Bound to {@code PATCH /v1/items/:itemId/statuses}. */
    ServiceCall<Map<String, ItemStatusUpdate>, Done> updateStatuses(String itemId);

    /** Bound to {@code GET /v1/items/:itemId/statuses}. */
    ServiceCall<NotUsed, Map<String, ItemStatus>> getStatuses(String itemId);

    /** Bound to {@code PATCH /v1/items/:itemId/statuses/:status}. */
    ServiceCall<ItemStatusUpdate, Done> updateStatus(String itemId, String status);

    /** Bound to {@code PUT /v1/items/:itemId/statuses/:status/state/:state}. */
    ServiceCall<NotUsed, Done> updateStatusState(String itemId, String status, String state);

    /** Bound to {@code PUT /v1/items/:itemId/links/:rel}; the request body is a {@code Link}. */
    ServiceCall<Link, Done> putLink(String itemId, String rel);

    /** Bound to {@code DELETE /v1/items/:itemId/links/:rel}. */
    ServiceCall<NotUsed, Done> deleteLink(String itemId, String rel);

    /**
     * Bound to {@code PUT /items/:itemId/events/:eventId} and
     * {@code PUT /v1/items/:itemId/events/:eventId}.
     */
    ServiceCall<ItemEvent, Done> putEvent(String itemId, String eventId);

    /**
     * Bound to {@code GET /v1/items/:itemId/events} and to the path call
     * {@code /items/:itemId/events}.
     */
    ServiceCall<NotUsed, ItemEventQueryResult> getEvents(String itemId);

    /** Bound to {@code GET} and {@code HEAD} on both {@code /livecheck} and {@code /healthcheck}. */
    ServiceCall<NotUsed, NotUsed> livecheck();

    /** Bound to {@code GET /swagger.json}. */
    ServiceCall<NotUsed, JsonNode> getSwagger();

    /**
     * Builds the descriptor named {@code "items"}: operational routes first, then the
     * un-versioned {@code /items} routes, then the {@link #BASE_PATH} routes, with
     * auto-ACL switched on for all of them.
     *
     * @return the descriptor listing every route of this service
     */
    @Override
    default Descriptor descriptor() {
        // Path templates that recur across the versioned routes.
        final String itemPath = BASE_PATH + "/:itemId";
        final String statusesPath = itemPath + "/statuses";
        final String statusPath = statusesPath + "/:status";
        final String linkPath = itemPath + "/links/:rel";
        final String eventsPath = itemPath + "/events";

        // @formatter:off
        final Descriptor routes = named("items").withCalls(
            // Operational endpoints.
            restCall(Method.GET,  "/livecheck",   this::livecheck),
            restCall(Method.HEAD, "/livecheck",   this::livecheck),
            restCall(Method.GET,  "/healthcheck", this::livecheck),
            restCall(Method.HEAD, "/healthcheck", this::livecheck),
            restCall(Method.GET,  "/swagger.json", this::getSwagger),

            // Un-versioned item routes.
            pathCall("/items/:itemId", this::get),
            pathCall("/items?orderId", this::queryItems),
            restCall(Method.PUT, "/items/:itemId?enableIndexing", this::upsertItem),
            pathCall("/items/:itemId/events", this::getEvents),
            restCall(Method.PUT, "/items/:itemId/events/:eventId", this::putEvent),

            // Versioned item routes under BASE_PATH.
            restCall(Method.GET,    itemPath,                      this::get),
            restCall(Method.GET,    BASE_PATH + "?orderId",        this::queryItems),
            restCall(Method.PUT,    itemPath + "?enableIndexing",  this::upsertItem),
            restCall(Method.PATCH,  itemPath,                      this::updateItem),
            restCall(Method.PATCH,  statusesPath,                  this::updateStatuses),
            restCall(Method.GET,    statusesPath,                  this::getStatuses),
            restCall(Method.PATCH,  statusPath,                    this::updateStatus),
            restCall(Method.PUT,    statusPath + "/state/:state",  this::updateStatusState),
            restCall(Method.PUT,    linkPath,                      this::putLink),
            restCall(Method.DELETE, linkPath,                      this::deleteLink),
            restCall(Method.GET,    eventsPath,                    this::getEvents),
            restCall(Method.PUT,    eventsPath + "/:eventId",      this::putEvent)
        );
        // @formatter:on

        return routes.withAutoAcl(true);
    }
}