Testing use case with stream and regular service calls

Hi,

I have a service with these service calls:

  def createAccount: ServiceCall[CreateAccountRequest, Account]
  
  def closeAccount(accountUid: UUID): ServiceCall[NotUsed,Done]
  
  def getAccount(accountUid: UUID): ServiceCall[NotUsed,Account]
   
  def accountStream(): ServiceCall[Source[AccountStreamAlive,NotUsed],Source[AccountEvent,NotUsed]]

  def accountEvents: Topic[AccountEvent]

I’m struggling to write a ScalaTest test case for this flow:

  1. connect to accountStream
  2. createAccount
  3. closeAccount
  4. getAccount
  5. close accountStream

This is what I have so far:

"receive account events over stream" in {
      val input : Source[AccountStreamAlive,NotUsed] = Source.single(AccountStreamAlive(true)).concat(Source.maybe)
      val stream = accountService.accountStream.invoke(input).map{ output =>
        val probe = output.runWith(TestSink.probe(server.actorSystem))
        probe.request(2)
        probe.expectNext() shouldBe an[AccountCreated]
        probe.expectNext() shouldBe an[AccountClosed]
        probe.cancel
        succeed
      }
      
      val ownerUid = UUID.randomUUID
      val accountUid = UUID.randomUUID
      val initialAmount = 0
      
      for {
        created <- createAccount(ownerUid, initialAmount)
        closed <- closeAccount(created.accountUid)
        retrieved <- getAccount(created.accountUid)
        events: Seq[AccountEvent] <- accountService.accountEvents.subscribe.atMostOnceSource
        .filter(_.accountUid == created.accountUid)
        .take(2)
        .runWith(Sink.seq)
        
      } yield {
        retrieved.status should ===(AccountStatus.Closed)
        events.size shouldBe 2
        events.head shouldBe an[AccountCreated]
        events.drop(1).head shouldBe an[AccountClosed]
      }
      
      
    }

Everything except the stream works fine, but I don't know how to incorporate the stream.
Any suggestions?

Thx.

BR,
Alan

I found a solution:

    "receive account events over stream" in {
      val userUid = UUID.randomUUID
      val ownerUid = UUID.randomUUID
      val initialAmount = 0
      // Keep the inbound side open: one keep-alive element, then never complete.
      val input: Source[AccountStreamAlive, NotUsed] =
        Source.single(AccountStreamAlive(true)).concat(Source.maybe)

      // flatMap (not map), so the future returned to the test includes everything below.
      accountService.accountStream(userUid).invoke(input).flatMap { output =>
        // Attach the probe before any account is created.
        val probe = output.runWith(TestSink.probe[AccountEvent](server.actorSystem))
        for {
          created <- createAccount(ownerUid, initialAmount)
          closed <- closeAccount(created.accountUid)
          retrieved <- getAccount(created.accountUid)
          events: Seq[AccountEvent] <- accountService.accountEvents.subscribe.atMostOnceSource
            .filter(_.accountUid == created.accountUid)
            .take(2)
            .runWith(Sink.seq)
        } yield {
          // Checks on the regular call and on the topic.
          retrieved.status should ===(AccountStatus.Closed)
          events.size shouldBe 2
          events.head shouldBe an[AccountCreated]
          events.drop(1).head shouldBe an[AccountClosed]
          // Checks on the stream itself.
          probe.request(2)
          probe.expectNext() shouldBe an[AccountCreated]
          probe.expectNext() shouldBe an[AccountClosed]
          probe.cancel()
          succeed
        }
      }
    }
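
For anyone who finds this later: as far as I can tell, the difference is that in my first version the future built from accountStream was assigned to a val and never returned, so the test only waited for the for-comprehension and nothing observed the probe assertions. In the second version everything is chained inside flatMap, so a single future carries all the checks. Below is a minimal sketch of that idea with plain Scala futures and no Lagom or Akka at all; openStream, serviceCalls and checkStream are hypothetical stand-ins, not part of my service.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object DroppedFutureSketch {
  // Hypothetical stand-in for accountStream.invoke(input); yields only one event.
  def openStream(): Future[List[String]] = Future(List("AccountCreated"))

  // Hypothetical stand-in for the create/close/get service calls.
  def serviceCalls(): Future[String] = Future("Closed")

  // A check on the stream contents that fails on purpose (one event, two expected).
  def checkStream(events: List[String]): Unit =
    if (events.size != 2)
      throw new IllegalStateException(s"expected 2 events, got ${events.size}")

  // Shape of the first attempt: the stream future is built but never returned,
  // so its failure is invisible to whoever waits on the result.
  def firstAttempt(): Future[String] = {
    val stream = openStream().map(checkStream) // result discarded
    serviceCalls()
  }

  // Shape of the fix: the calls are chained inside flatMap,
  // so the returned future fails if the stream check fails.
  def fixedAttempt(): Future[String] =
    openStream().flatMap { events =>
      serviceCalls().map { status =>
        checkStream(events)
        status
      }
    }

  def main(args: Array[String]): Unit = {
    val first = Try(Await.result(firstAttempt(), 5.seconds))
    val fixed = Try(Await.result(fixedAttempt(), 5.seconds))
    println(s"first attempt reports success: ${first.isSuccess}") // true
    println(s"fixed attempt reports success: ${fixed.isSuccess}") // false
  }
}
```

The first shape reports success even though the stream check throws, which matches what I saw; the second shape surfaces the failure. In the real test the returned future is what the async ScalaTest style waits on, so the same reasoning should apply there.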