UnixDomainSocket causing "Substream Source cannot be materialized more than once"

(Lunfu Zhong) #1


I’m trying to integrate UnixDomainSocket with akka-http by implementing a custom ClientTransport on top of UnixDomainSocket#outgoingConnection.

final class UnixDomainSocketTransport(file: File) extends ClientTransport {
  override def connectTo(
      host: String,
      port: Int,
      settings: ClientConnectionSettings
  )(implicit system: ActorSystem): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {
    implicit val ex = system.dispatcher
    // akka-http expects socket addresses; host and port are only used for reporting here.
    val address     = InetSocketAddress.createUnresolved(host, port)

    // Connect through the socket file instead of TCP, via UnixDomainSocket#outgoingConnection.
    UnixDomainSocket()
      .outgoingConnection(file)
      .mapMaterializedValue(_.map { c =>
        system.log.info(s"materialized $c")
        Http.OutgoingConnection(address, address)
      })
  }
}

Unfortunately, I could not get it to work because of the error “Substream Source cannot be materialized more than once”. Here is the code to reproduce the error.

akka.version = 2.5.19
alpakka.version = 1.0-M2
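
For context, a custom ClientTransport is usually handed to akka-http through the connection pool settings. The following is only a minimal sketch of that wiring, not the original reproduction code: it assumes akka-http 10.1.x (where `ConnectionPoolSettings#withTransport` is available), and the names `TransportUsage`, `poolSettingsFor` and `request` are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.{ClientTransport, Http}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings

import scala.concurrent.Future

// Hypothetical helper object, for illustration only.
object TransportUsage {
  // Pool settings that route every pooled connection through the given transport.
  def poolSettingsFor(transport: ClientTransport)(implicit system: ActorSystem): ConnectionPoolSettings =
    ConnectionPoolSettings(system).withTransport(transport)

  // Sends a single request through the host connection pool using that transport.
  def request(transport: ClientTransport, uri: String)(implicit system: ActorSystem): Future[HttpResponse] =
    Http().singleRequest(HttpRequest(uri = uri), settings = poolSettingsFor(transport))
}
```

With the class from the first post, a call could look like `TransportUsage.request(new UnixDomainSocketTransport(file), uri)`. Because the pool may open several connections, the Flow returned by `connectTo` can be materialized more than once.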

The error is expected when the tail Source produced by prefixAndTail is materialized more than once, which I found out from akka/FlowPrefixAndTailSpec.scala. The Flow returned by UnixDomainSocket#outgoingConnection wraps a Source built with prefixAndTail, as can be seen at https://github.com/akka/alpakka/blob/master/unix-domain-socket/src/main/scala/akka/stream/alpakka/unixdomainsocket/impl/UnixDomainSocketImpl.scala#L239.
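
The prefixAndTail behaviour can be observed in isolation, without any socket involved. This is a minimal sketch assuming Akka 2.5.x (it uses `ActorMaterializer`); the object name `PrefixAndTailTwice` is made up for illustration. It materializes the same tail Source twice and returns both outcomes.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object, for illustration only.
object PrefixAndTailTwice {
  // Returns the outcome of materializing the same tail Source twice.
  def run(): (Try[Seq[Int]], Try[Seq[Int]]) = {
    implicit val system: ActorSystem    = ActorSystem("prefix-and-tail-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // prefixAndTail(0) emits one pair: an empty prefix and the tail substream Source.
      val (_, tail) =
        Await.result(Source(1 to 3).prefixAndTail(0).runWith(Sink.head), 3.seconds)

      // The first materialization consumes the tail normally.
      val first = Try(Await.result(tail.runWith(Sink.seq), 3.seconds))
      // The second materialization of the same tail is expected to fail.
      val second = Try(Await.result(tail.runWith(Sink.seq), 3.seconds))
      (first, second)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = run()
    println(s"first  = $first")
    println(s"second = $second")
  }
}
```

The second result should be a failure carrying the "cannot be materialized more than once" message, which is the same situation a connection pool runs into when it materializes a Flow that wraps such a Source a second time.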

I’m not sure whether this is a bug or whether I’m using it the wrong way. I hope someone can help me work around it, thanks!

(Martynas Mickevičius) #2

Hi @zhongl,

I gave it a quick look and the mentioned exception goes away if I change the min connection count from 2 to 1 here:

I do not think having more than one “connection” to the socket file makes sense.

(Lunfu Zhong) #3

Hi @2m,

Thanks for the reply! I already knew that the connection settings could avoid the problem; that’s why I had commented out withMaxConnections(1).

But I want to use UnixDomainSocket to watch docker daemon events and then refresh the containers or services whenever the docker daemon sends a notification. So I need at least two UnixDomainSocket connections: one for long polling (watching) events, and the other for short queries.

I realized that the semantics of Future[OutgoingConnection] in Flow[ByteString, ByteString, Future[OutgoingConnection]] is different between Tcp#OutgoingConnection (a new connection) and UnixDomainSocket#OutgoingConnection (the same connection).
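
The TCP side of this comparison can be checked with a small local experiment. The sketch below assumes Akka 2.5.x; the object name `TcpFlowReuse` and the echo server are illustrative only. It materializes one `Tcp().outgoingConnection` Flow twice and counts how many connections the server accepts.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source, Tcp}
import akka.util.ByteString

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, for illustration only.
object TcpFlowReuse {
  // Returns the echoed replies and the number of connections the server accepted.
  def run(): (Seq[String], Int) = {
    implicit val system: ActorSystem    = ActorSystem("tcp-reuse-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    val accepted = new AtomicInteger(0)
    try {
      // Echo server on an ephemeral local port that counts accepted connections.
      val binding = Await.result(
        Tcp()
          .bind("127.0.0.1", 0)
          .to(Sink.foreach[Tcp.IncomingConnection] { conn =>
            accepted.incrementAndGet()
            conn.handleWith(Flow[ByteString])
          })
          .run(),
        3.seconds
      )

      // One Flow value, materialized once per message.
      val client = Tcp().outgoingConnection(binding.localAddress)
      val replies = Seq("a", "b").map { msg =>
        val reply = Source.single(ByteString(msg)).via(client).runFold(ByteString.empty)(_ ++ _)
        Await.result(reply, 3.seconds).utf8String
      }
      (replies, accepted.get)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

If each materialization opens its own connection, as described above for Tcp#OutgoingConnection, the server should report two accepted connections for the two round trips.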

Since UnixDomainSocket#OutgoingConnection follows the declaration of Tcp#OutgoingConnection, it would be better for UnixDomainSocket to keep the same semantics!

(Martynas Mickevičius) #4

Ahh, yes. The semantics should be the same, or at least there should be a possibility to tweak configuration to get the same semantics. Especially since the Unix Domain Socket connector API was modeled after the TCP Stream API.

Would you be up for creating a PR which would make the UnixDomainSocket#OutgoingConnection return a new connection by default?

(Lunfu Zhong) #5