Delay, Map, and others in graph DSL

Hy there!

I just tried to add a delay to my graph with builder.add(Delay(offset, strategy)). And there is no Delay stage in bcs its tagged as internal.
If this is done for a reason; doesn’t it worth to mention in the streams-overview docs, or the stream-graphs docs? (So you can only use it with builder.add(Flow[A].delay(offset,strategy)).)

BTW where is the line where you can use a stage in a dsl with add or not? As I’m trying to figure out maybe the multi input or multi output stages whose can be used (the others need the Source, Sink, or Flow starting), but I never see that written before.

Don’t use the Graph DSL until you find a thing that is impossible to express otherwise.

Also, you don’t nee to add Flows usually, just sources / sinks usually.

Are you using Scala or Java?

Normally one would simply say from outlet.via(Flow[...].delay).to etc or the ~> DSL in Scala.

Scala. And I’m familiar with the basics.

This code is just a spike right now, I’m just playing with stuffs and concepts.

private lazy val graph = {
    val source = Source.queue[Task](10000,

    RunnableGraph.fromGraph(GraphDSL.create(source) { implicit builder: GraphDSL.Builder[SourceQueueWithComplete[Task]] => source =>
      import GraphDSL.Implicits._

      val mergePrioritized = builder.add(MergePrioritized[Task](Seq(99,1)))
      val delay1 = builder.add(Flow[Task].delay(5.seconds))
      val delay2 = builder.add(Flow[Task].delay(5.seconds))
      val groupedDelay = builder.add(Flow[Task].buffer(1000,, 5.minutes).mapConcat(identity))
      val check = builder.add(Flow[Task].map(_.copy(stats = TaskStats.empty)).via(checkerFlow))
      val log = builder.add(Flow[Task].map{t =>; t})
      val dropNonPeriodically = builder.add(Flow[Task].filter(_.isPeriodically))

      source ~> delay1 ~>; mergePrioritized.out ~> check ~> log ~> dropNonPeriodically
                delay2 ~>
                delay2 <~ groupedDelay <~ dropNonPeriodically


I think if you have a graph similar to this, the reader get a better picture if the things named as is (like log or dropNonPeriodically) instead of a long doMagic flow. And because most of the time I get my flows as a param, it was really strange to “import” with builder.add(Flow[Task].delay(5.seconds)) instead of just builder.add(Delay[Task](5.seconds)).