Alpakka AMQP (RabbitMQ) sink bug? Messages are reported as successfully sent while the network is interrupted

After I disconnected the network and continued to send messages, the stream still reported each message as successfully sent, but the messages never reached the RabbitMQ server.

Code:

/** Builds and runs a stream that publishes `ByteString` elements to the configured AMQP exchange.
  *
  * The exchange described by `mqParam.attribute` is declared through the sink settings, and the
  * stream is fed by a bounded source queue sized and governed by `mqParam.bufferSize` and
  * `mqParam.overflowStrategy`. Once the queue's completion future finishes (successfully or
  * not), `QueueCompletion` is sent to `self`.
  *
  * @return the materialized source queue that callers offer elements to
  */
private def initQueue() = {
  // Read the exchange attributes once instead of repeating the accessor chain.
  val attrs = mqParam.attribute
  val declaration = ExchangeDeclaration(
    attrs.exchange,
    attrs.eType.toString,
    attrs.durable,
    attrs.autoDelete,
    attrs.internal,
    attrs.props
  )

  // Broker addresses, credentials and virtual host all come from `mqParam`.
  val credentials = AmqpCredentials.create(mqParam.credentials.account, mqParam.credentials.password)
  val provider: AmqpDetailsConnectionProvider =
    AmqpDetailsConnectionProvider(mqParam.hostAndPorts)
      .withCredentials(credentials)
      .withVirtualHost(mqParam.vhost)

  val settings = AmqpSinkSettings(provider)
    .withDeclarations(declaration)
    .withExchange(attrs.exchange)
  val amqpSink = AmqpSink.simple(settings)

  // Keep.left: only the source queue is kept; the sink's materialized value is discarded.
  val source = Source.queue[ByteString](mqParam.bufferSize, mqParam.overflowStrategy)
  val materializedQueue = source.toMat(amqpSink)(Keep.left).run()

  // Notify this component whenever the queue side of the stream terminates, whatever the outcome.
  materializedQueue.watchCompletion().onComplete(_ => self ! QueueCompletion)
  materializedQueue
}

/** Offers `data` to `queue` and pipes the outcome to `client`.
  *
  * The client receives `true` when the element was enqueued; every other offer result is
  * turned into a failed future, which `pipeTo` delivers to the client as a failure.
  *
  * NOTE(review): `Enqueued` presumably only means the element was accepted by the source
  * queue, not that the broker received it — confirm against the Akka Streams documentation.
  *
  * @param data   payload to publish
  * @param client actor that receives the result of the offer
  * @param queue  source queue feeding the AMQP stream
  * @return the future that is piped to `client`
  */
private def offerData(data: ByteString, client: ActorRef, queue: SourceQueueWithComplete[ByteString]) = {
  // Translate each possible offer result into the reply the client should see.
  def toReply(result: QueueOfferResult): Future[Boolean] = result match {
    case QueueOfferResult.Enqueued       => Future.successful(true)
    case QueueOfferResult.Dropped        => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
    case QueueOfferResult.Failure(cause) => Future.failed(cause)
    case QueueOfferResult.QueueClosed    => Future.failed(new RuntimeException("Queue was closed"))
  }

  val outcome = queue.offer(data).flatMap(toReply)
  outcome.pipeTo(client)
}