How can i create an akka stream from a java Native Queue?

My use case is I want to create an Akka stream in which the source is a java Queue. The stream should keep on the pooling values from the queue and if the queue is empty then wait for the values in the queue. Akka stream queue is another option but if in case of any failure I want to store the values that are there in the queue (I don’t know how to do that with Akka stream Queue). I tried the following:

val source: Source[String, NotUsed] = Source.from(queue)
source.ask(1, actor, classOf[String], 10 seconds).runWith(Sink.ignore(), mat)

I tried setting idleTimeOut and keepAlive properties but it is not working. The stream goes to Done state if there is no values in the queue.

Note that java.util.Queue won’t be safe in general to push values onto from one thread and then consume on a different thread like it will do with Akka Stream. You will need to specifically use a thread safe one for that to work, such as an implementation of java.util.BlockingQueue. It will still mean you need to block a thread just waiting for elements though, so if you can avoid it, it is better to use for example Source.queue.

If you have to use a Java Queue, and can make it be a thread safe queue, you can implement that with Source.unfoldResource and block on poll (in a loop) to produce elements.

1 Like

Hey john,
Thank you for your response. Can you explain java blocking queue as stream source with an example?Also Is there any function in Akka stream to set a polling interval on source?

You can poll with sth like:

//if you can pull 0 or more data at once
def getNDataFromTheQueue(n: Int): Seq[Data]
val source: Source[Data, NotUsed] = Source.tick(1.millis, 250.millis, 1).mapConcat(_ => getNDataFromTheQueue(10))

(I think kinda same can be achived with Source.repeate + throttle.)

I tried your code block but I am getting a compile-time error. Error is :
typemismatch expected: Int => Iterable[NotInferedT] actual: Int => Seq[Data]

I would be more happy if you could complete the remaining part of the puzzle, but I’m always up for the challenge:

import java.util.concurrent._
import akka.stream.scaladsl._
import akka.actor.Cancellable
import concurrent.duration._

type Data = String
val bq: BlockingQueue[Data] = new ArrayBlockingQueue[Data](1000)

def getNDataFromTheQueue(n: Int): List[Data] = {
  import scala.collection.JavaConverters._
  val arrayList = new java.util.ArrayList[Data]()
  bq.drainTo(arrayList, n)
  arrayList.asScala.toList
}

val source: Source[Data, Cancellable] = Source.tick(1.millis, 250.millis, 1).mapConcat(_ => getNDataFromTheQueue(10))

This compiles. Also your bq would be come from somewhere else, and the Data type alias can be used to your concrete type, or can be totally find-and-replaced. (Ofc. this code can be written in java too, the main logic would be the same, you drain the queue, and convert the list to an immutable one, and you need to use javadsl, and some more verbose duration settings.)

HI,
Thank you so much for your reply. I am trying to achieve my use case from your above code. But I am getting exception. Below is my code:

import akka.Done
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.util.Timeout

import scala.util.{Failure, Success}

object QueueTest extends App {

  import java.util.concurrent._

  import akka.actor.Cancellable
  import akka.stream.scaladsl._

  import concurrent.duration._

  type Data = String
  implicit val system = ActorSystem("testSystem")
  implicit val mater = ActorMaterializer()
  implicit val ec = system.dispatcher
  implicit val timeout = Timeout(1 seconds)
  val bq: BlockingQueue[Data] = new ArrayBlockingQueue[Data](1000)

  def getNDataFromTheQueue(n: Int): List[Data] = {
    import scala.collection.JavaConverters._
    val arrayList = new java.util.ArrayList[Data]()
    bq.drainTo(arrayList, n)
    arrayList.asScala.toList
  }

  (1 to 10).foreach(x => bq.add(x.toString))

  val actor = system.actorOf(Props[TestActor])
  val source: Source[Data, Cancellable] = Source.tick(1.millis, 250.millis, 1).mapConcat(_ => getNDataFromTheQueue(10))

  val future: concurrent.Future[Done] = source.ask(actor).runWith(Sink.ignore)

  (11 to 100).foreach(x => bq.add(x.toString))


  future onComplete {
    case Success(result) => print("Success")
    case Failure(t) => print("Fail" + t)
  }


  class TestActor extends Actor {
    override def receive: Receive = {
      case str: String => {
        println("Str === " + str)
        sender() ! str
      }
    }
  }


}

The output of the above code is :

Str === 1
Str === 2
Failjava.lang.ClassCastException: Cannot cast java.lang.String to scala.runtime.Nothing$

I want the stream to send all the values from the queue to the Test actor and if the queue is empty keep polling the queue for values.

Every time test with babysteps!

Super small printer function:

val printer = Flow[Data].map { x =>
  println(x)
  x
}

Super easy test flow:

val future: concurrent.Future[Done] =
    source
      .via(printer)
      .runWith(Sink.ignore)

With these two, the flow works, so the problem is with the ask!

Another thing: read the errors:
Cannot cast java.lang.String to scala.runtime.Nothing$
Where the hell we cast String to Nothing?

With these infos you can reach the answer faster:
.ask[Data](actor)

For the record, it was super slow for me to find that missing type param too :P
Another common trick to click into the functions you use and read the docs :D

1 Like