Akka Actor Not Receiving Self Message

I have a simple implementation using fs2 where I stream at regular intervals a message that I want to send to my Actor instance, but somehow it seems not to receive the message at all.

Here is my code:

import cats.effect._
import cats.effect.implicits._
import cats.effect.unsafe.implicits._
import fs2._
import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.TimeUnit

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props

object StreamUtils {
  def getATaskAsStream(pingInterval: FiniteDuration)(callback: Int => Unit): Stream[IO, Int] = {
    val source = Stream.awakeEvery[IO](pingInterval).map(_ => 0)
    val sink = source.evalMap(value => IO(callback(value)))
    source.concurrently(sink)
  }
}

class HelloActor extends Actor {
    val io = StreamUtils.getATaskAsStream(FiniteDuration(500, TimeUnit.MILLISECONDS))(x => {
          println(s"In the Akka actor..... Sending message to $self $x")
          self ! x
        }).compile.toList
    io.unsafeRunSync()
  
  def receive = {
    case "hello" => println("hello back at you")
    case x: Int  => println(s"Received from stream $x")
    case _ => println("Received some shit....")
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("HelloSystem")
    // default Actor constructor
    val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
    helloActor ! "hello"
		println("Starting....")

    println("Done....")
  }
}

Here is a running version in Scastie: Scastie - An interactive playground for Scala.

io.unsafeRunSync() is synchronous so it will block the actor constructor until that stream completes. Not familiar enough with FS2 to say what to do instead, but using actor timers would be the canonical way to repeatedly send a message to an actor in Akka if you didn’t involve cats/FS2: Akka 2.8.4 - akka.actor.Timers

1 Like

You can use unsafeRunAsync and pipeToSelf ?