BroadcastHub emits no new elements after a random period of time

I initialize the BroadcastHub and the source queue in the actor's constructor:

    Source<Ticker, SourceQueueWithComplete<Ticker>> sourceQueueWithCompleteSource =
            Source.queue(config.getInt("queue-buffer-size"), OverflowStrategy.dropHead());
    Pair<SourceQueueWithComplete<Ticker>, Source<Ticker, NotUsed>> sinkSourcePair =
            sourceQueueWithCompleteSource.log("tickerQueue")
                    .toMat(BroadcastHub.of(Ticker.class, config.getInt("broadcast-buffer-size")), Keep.both())
                    .run(mat);
    inputQueue = sinkSourcePair.first();
    source = sinkSourcePair.second();
    tickerMap = new HashMap<>();

A Ticker is offered to inputQueue every second:

    private void handleDispatchTicker(String str) {
        tickerMap.values().forEach(p -> {
            try {
                // Blocks for up to one second waiting for the queue to accept the element.
                QueueOfferResult result = inputQueue.offer(new Ticker(p.getTicker(), TICKER))
                        .toCompletableFuture().get(1, TimeUnit.SECONDS);
                if (!QueueOfferResult.enqueued().equals(result)) {
                    logger.error("handleDispatchTicker tickerMap offer: {}", result);
                }
            } catch (Exception e) {
                logger.error("handleDispatchTicker: {} \ncause: {} \ntickerMap: {}", e.getStackTrace(), e.getCause(), tickerMap.size());
            }
        });
    }

The source is consumed by the WebSocket actors, which pass the elements on to the user:

    private void handleWatchTicker(Messages.WatchTicker ticker) {
        sender().tell(new Messages.WatchTickerResult(ticker.symbol, source), self());
    }

Everything works well at the beginning, but after some time, maybe a day or two, the source just stops without any exception. The calls to inputQueue.offer keep running normally, but the source stops emitting new elements. When I materialize a new source from the BroadcastHub, it emits a fixed number of elements and then nothing new.

There are absolutely no exceptions or error logs; maybe they are swallowed by the stream. So I cannot give more details.

Maybe there is a very slow subscriber that was never cleaned up, or maybe there is a bug in BroadcastHub. I do not know.

I do not need every element to be consumed by every consumer, so adapting to the slowest consumer is not what I want, but I cannot find any overflow strategy to set on BroadcastHub itself.
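To make the behaviour I am after concrete, here is a minimal plain-Java sketch that does not use Akka at all. The names `DropHeadFanOut` and `Subscription` are hypothetical and exist only for this illustration. Each consumer gets its own bounded buffer that drops its oldest element when full, so a slow consumer only loses its own old elements and never holds back the producer or the other consumers. In Akka Streams terms I assume this would roughly correspond to putting `.buffer(n, OverflowStrategy.dropHead())` on each consumer's side of the hub, but I have not verified that this fixes the stall.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration only; this is not Akka's BroadcastHub. */
final class DropHeadFanOut<T> {

    /** One bounded buffer per consumer; the oldest element is dropped when it is full. */
    static final class Subscription<E> {
        private final Deque<E> buffer = new ArrayDeque<>();
        private final int capacity;

        Subscription(int capacity) {
            if (capacity <= 0) {
                throw new IllegalArgumentException("capacity must be positive");
            }
            this.capacity = capacity;
        }

        /** Never blocks: when the buffer is full, the head (oldest element) is discarded. */
        synchronized void push(E element) {
            if (buffer.size() == capacity) {
                buffer.pollFirst();
            }
            buffer.addLast(element);
        }

        /** Returns the oldest buffered element, or null if nothing is buffered. */
        synchronized E poll() {
            return buffer.pollFirst();
        }

        synchronized int size() {
            return buffer.size();
        }
    }

    private final List<Subscription<T>> subscriptions = new ArrayList<>();

    /** Registers a new consumer with its own buffer of the given capacity. */
    synchronized Subscription<T> subscribe(int capacity) {
        Subscription<T> subscription = new Subscription<>(capacity);
        subscriptions.add(subscription);
        return subscription;
    }

    /** Delivers the element to every consumer's buffer without waiting for any of them. */
    synchronized void publish(T element) {
        for (Subscription<T> subscription : subscriptions) {
            subscription.push(element);
        }
    }
}
```

With two subscriptions of capacity 2, a consumer that polls after every publish sees every element, while a consumer that never polls ends up holding only the two most recent elements, and publish never waits for it.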

Could someone please help me check this? Thank you.