Streaming text over HTTP from a Source.tick

I’m creating small demos to develop my understanding of streaming objects over HTTP using Akka. My end goal is a route that streams a generated object (an image from a webcam, to be precise) over HTTP. My attempt at a smaller version of this is a route that uses a Source.tick with a call to a method that returns a string.

My route:

    path("test", () ->
        get(() -> extractRequest(request -> complete(testHandler.handleTextSource(request))))
    ),

My handler:

    public HttpResponse handleTextSource(HttpRequest httpRequest) {
        final Source<ByteString, Cancellable> source =
                Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(2), getText())
                        .map(ByteString::fromString);

        HttpEntity.Chunked textEntity = HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, source);
        return HttpResponse.create().withEntity(textEntity);
    }

    public String getText() {
        System.out.println("getText()");
        return "text";
    }

I ran a curl command against this route and noticed that getText() is called only once and nothing is displayed in the output. Once I kill the server, a number of responses suddenly arrive in the terminal. Looking at the Twitter streaming example online, I noticed that it uses completeOkWithSource, together with a JsonEntityStreamingSupport and a Jackson.marshaller().

a) Why is getText() not being called on every tick of Source.tick? I would expect it to be called every 2 seconds. Am I not using the source correctly?
b) If I use a source, do I need to use completeOkWithSource? If so, must I create my own EntityStreamingSupport? (Creating my own requires implementing many methods I don’t understand, so I was avoiding this.)

Thank you very much
David

After watching some Akka lectures I rewrote the code and made the route return a flow, which I used with handleWebSocketMessages instead of complete(). Here is the method, in the hope that it helps someone else. If anyone has further comments on why my previous code didn’t work, I’d appreciate hearing them. I believe it’s because I was missing a sink, but I’m unsure.

    public Flow<Message, Message, NotUsed> createTextStreamFlow(HttpRequest httpRequest) {
        Source<String, NotUsed> repeat = Source.repeat("Single text from source with repeat");
        Source<Message, NotUsed> textMap = repeat.throttle(1, Duration.ofSeconds(3)).map(TextMessage::create);

        return Flow.fromSinkAndSource(Sink.foreach(f -> System.out.println("Sink: " + f.toString())), textMap);
    }

Some feedback on your original code

Source.tick takes a single element and emits that same element at each interval; it does not take a factory for elements. In your code getText() is evaluated once, when the source is constructed, so seeing it print to standard out only once is expected. If you want a fresh element at each interval, you can map each constant tick to a new value by calling getText() inside map for every element.
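To make the difference concrete without pulling in Akka, here is a minimal plain-Java sketch. It uses java.util.stream as a stand-in for the Akka source, so the class TickDemo, the call counter, and the helper names constantTicks and mappedTicks are all made up for illustration. The comments show the Akka Streams shape each method is meant to mirror.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: java.util.stream stands in for an Akka Source here.
final class TickDemo {
    // Counts how many times getText() actually runs.
    static final AtomicInteger calls = new AtomicInteger();

    static String getText() {
        return "text-" + calls.incrementAndGet();
    }

    // Mirrors Source.tick(initialDelay, interval, getText()):
    // the argument is evaluated once and that one value is repeated per tick.
    static List<String> constantTicks(int n) {
        String element = getText(); // evaluated exactly once, up front
        return Stream.generate(() -> element)
                .limit(n)
                .collect(Collectors.toList());
    }

    // Mirrors Source.tick(initialDelay, interval, "tick").map(tick -> getText()):
    // the constant tick is only a trigger, and getText() runs for every element.
    static List<String> mappedTicks(int n) {
        return Stream.generate(() -> "tick")
                .limit(n)
                .map(tick -> getText())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(constantTicks(3)); // same value three times
        System.out.println(mappedTicks(3));   // a new value for each tick
    }
}
```

In the original handler, the corresponding change would be to pass a constant such as "tick" to Source.tick and call getText() in a map stage placed before map(ByteString::fromString).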

As for the elements not being printed on the client side, there is nothing wrong with your code. The issue is that curl buffers response data and prints it in chunks, and since you only send it small strings at a time, it takes a while for that buffer to fill up. When you kill the server the connection is closed and curl prints whatever it has in the buffer. You can disable curl's buffering with -N (--no-buffer) and see the ticks come out right away.

Thank you very much for your response.