SSE from Kafka Source

I am trying to use a Kafka topic as the source for server-sent events (SSE). The route does not seem to read from the source, and the SSE connection keeps getting closed. What am I missing?

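    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
    import akka.http.scaladsl.model.headers.RawHeader
    import akka.http.scaladsl.model.sse.ServerSentEvent
    import akka.http.scaladsl.server.Directives._
    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.Consumer
    import scala.concurrent.duration._

    // consumerSettings is defined elsewhere in the application.
    val route =
      path("events") {
        get {
          respondWithHeaders(RawHeader("Access-Control-Allow-Origin", "*")) {
            complete {
              // Stream each Kafka record to the client as one SSE event.
              Consumer
                .plainSource(consumerSettings, Subscriptions.topics("my_topic"))
                .map(msg => ServerSentEvent(msg.value()))
                .keepAlive(1.second, () => ServerSentEvent.heartbeat)
            }
          }
        }
      }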

Do you get any log messages? You could try setting akka.http.server.log-unencrypted-network-bytes = 100 to see what’s going on on the wire.
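
In case it helps, here is a minimal sketch of one way to turn that setting on from code instead of editing application.conf. The object name SseDebug and the system name "sse-debug" are made up for illustration, and raising akka.loglevel to DEBUG is an assumption on my part: I believe the wire dump is logged at debug level, so it may not show up otherwise.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object SseDebug {
  // Overrides layered on top of the regular application.conf / reference.conf.
  private val overrides = ConfigFactory.parseString(
    """
    akka.loglevel = "DEBUG"
    akka.http.server.log-unencrypted-network-bytes = 100
    """)

  // Hypothetical system name; use this system when binding the route.
  implicit val system: ActorSystem =
    ActorSystem("sse-debug", overrides.withFallback(ConfigFactory.load()))
}
```

The same two lines can go straight into application.conf. Either way, this logs raw unencrypted traffic, so it should only be enabled while debugging.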