Hi all
I have a server class that looks as follows:
import akka.Done
import akka.actor.typed.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model.StatusCodes
import org.apache.kafka.common.serialization.StringSerializer
import akka.actor.typed.scaladsl.adapter._
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import akka.stream.typed.scaladsl.ActorMaterializer
import com.sweetsoft._
import com.sweetsoft.store.Delegator
import com.sweetsoft.store.message.Messenger._
import com.sweetsoft.utils.KafkaMessage
import org.apache.kafka.clients.producer.ProducerRecord
import io.circe.generic.auto._
import io.circe.parser._
import io.circe.syntax._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import com.sweetsoft.store.log.Logger._
/** HTTP front end that receives producer messages and hands them to the store.
  *
  * Endpoints served by [[start]]:
  *  - `POST /producer` decodes the request body as a `Producer`; a valid message is passed to
  *    `delegator.saveMessage`, an invalid one is logged through `delegator.saveLog` and rejected
  *    with `400 Bad Request`.
  *  - `GET /health` always answers `200 OK`.
  *
  * @param delegator    receives the decoded messages and all log entries written by this class
  * @param system       typed actor system; its classic counterpart drives Akka HTTP and Kafka
  * @param materializer stream materializer used for the HTTP binding and the Kafka sink
  * @tparam T message type of the typed actor system
  */
final case class Server[T](delegator: Delegator)
                          (implicit system: ActorSystem[T], materializer: ActorMaterializer) {

  private implicit val executor: ExecutionContextExecutor = system.executionContext

  // The implicit conversion Route => Flow[HttpRequest, HttpResponse, _]
  // (RouteResult.route2HandlerFlow) only applies when RoutingSettings, ParserSettings and
  // RoutingLog can be resolved implicitly, and those are derived from an implicit *classic*
  // ActorSystem. The typed ActorSystem[T] alone does not satisfy them, which is what makes
  // `bindAndHandle(route(), ...)` fail with "found: Route, required: Flow".
  private implicit val untypedSystem: akka.actor.ActorSystem = system.toUntyped

  // Kafka producer settings with String keys and values, pointed at getKafkaAddr.
  private val setting =
    ProducerSettings(untypedSystem, new StringSerializer, new StringSerializer)
      .withBootstrapServers(getKafkaAddr)

  /** Builds the routing tree for the `producer` and `health` paths. */
  private def route(): Route =
    concat(
      path("producer") {
        post {
          entity(as[String]) { v =>
            decode[Producer](v) match {
              case Left(_) =>
                // The body could not be decoded: record it and tell the caller.
                delegator.saveLog(SaveLog(Log(Error, s"Receive invalid message from SAP $v")))
                complete(StatusCodes.BadRequest -> s"Invalid message $v")
              case Right(msg) =>
                // Save into the store. The result of saveMessage is not inspected, so
                // `201 Created` is returned regardless of what the delegator does with it.
                delegator.saveMessage(SaveMessage(msg.format()))
                complete(StatusCodes.Created)
            }
          }
        }
      },
      path("health") {
        get {
          complete(StatusCodes.OK)
        }
      }
    )

  /** Publishes a single message to Kafka on the topic named by `msg.topic`.
    *
    * The record value is the compact JSON encoding of `KafkaMessage(msg.event, msg.data)`.
    * NOTE(review): nothing in this class calls this method - confirm whether the
    * `Right(msg)` branch of the route was meant to use it.
    *
    * @param msg message to publish
    * @return the materialized value of the Kafka sink
    */
  private def forward(msg: Producer): Future[Done] =
    Source
      .single(msg)
      .map { value =>
        new ProducerRecord[String, String](
          value.topic,
          KafkaMessage(value.event, value.data).asJson.noSpaces
        )
      }
      .runWith(Producer.plainSink(setting))

  /** Starts the HTTP producer server on `getServerIp`:`getServerPort`.
    *
    * @return the future binding; pass it to [[terminate]] to shut the server down
    */
  def start(): Future[ServerBinding] =
    // TODO: log the outcome of the binding.
    Http()(untypedSystem)
      .bindAndHandle(route(), getServerIp, getServerPort)

  /** Shuts the HTTP server down and logs the outcome through the delegator.
    *
    * Blocks the calling thread for up to 5 seconds while waiting for the binding; if the
    * binding failed or is not available in time, `Await.result` throws. Termination is then
    * requested with a hard deadline of 3 seconds.
    *
    * NOTE(review): on failure this method calls itself again from the completion callback,
    * with no retry limit and another blocking `Await` - confirm that this is intended.
    *
    * @param server the future returned by [[start]]
    */
  def terminate(server: Future[ServerBinding]): Unit =
    Await
      .result(server, 5.seconds)
      .terminate(hardDeadline = 3.seconds)
      .onComplete {
        case Success(_) =>
          delegator.saveLog(SaveLog(Log(Info, "HTTP has been successfully terminated.")))
        case Failure(ex) =>
          delegator.saveLog(SaveLog(Log(Error, s"Following error occurs during shutdown: ${ex.getMessage}.")))
          // Will try to shutdown, until definitely terminated
          terminate(server)
      }
}
The problem is in this method:
// Start the HTTP producer server: binds route() to getServerIp / getServerPort
// and returns the future ServerBinding produced by bindAndHandle.
def start()
: Future[ServerBinding] = {
// TODO: add logging here
Http()(system.toUntyped)
.bindAndHandle(route(), getServerIp, getServerPort)
}
The compiler complains:
[error] found : akka.http.scaladsl.server.Route
[error] (which expands to) akka.http.scaladsl.server.RequestContext => scala.concurrent.Future[akka.http.scaladsl.server.RouteResult]
[error] required: akka.stream.scaladsl.Flow[akka.http.scaladsl.model.HttpRequest,akka.http.scaladsl.model.HttpResponse,Any]
[error] .bindAndHandle(route(), getServerIp, getServerPort)
although I’ve imported the routing directives:
import akka.http.scaladsl.server.Directives._
In the doc, there is a hint:
[1] To be picked up automatically, the implicit conversion needs to be provided in the companion object of the source type. However, as Route is just a type alias for RequestContext => Future[RouteResult], there’s no companion object for Route. Fortunately, the implicit scope for finding an implicit conversion also includes all types that are “associated with any part” of the source type which in this case means that the implicit conversion will also be picked up from RouteResult.route2HandlerFlow automatically.
Do I have to place
// Start the HTTP producer server: binds route() to getServerIp / getServerPort
// and returns the future ServerBinding produced by bindAndHandle.
def start()
: Future[ServerBinding] = {
// TODO: add logging here
Http()(system.toUntyped)
.bindAndHandle(route(), getServerIp, getServerPort)
}
in a companion object?
Thanks