The BroadcastHub and the source queue are set up in the actor constructor:
Source<Ticker, SourceQueueWithComplete<Ticker>> sourceQueueWithCompleteSource =
        Source.queue(config.getInt("queue-buffer-size"), OverflowStrategy.dropHead());
Pair<SourceQueueWithComplete<Ticker>, Source<Ticker, NotUsed>> sinkSourcePair =
        sourceQueueWithCompleteSource.log("tickerQueue")
                .toMat(BroadcastHub.of(Ticker.class, config.getInt("broadcast-buffer-size")), Keep.both())
                .run(mat);
inputQueue = sinkSourcePair.first();
source = sinkSourcePair.second();
tickerMap = new HashMap<>();
A Ticker for each entry in tickerMap is offered to inputQueue once per second:
private void handleDispatchTicker(String str) {
    logBackend();
    tickerMap.values().forEach(p -> {
        try {
            // Wait up to one second for the queue to accept or drop the element.
            QueueOfferResult result = inputQueue.offer(new Ticker(p.getTicker(), TICKER))
                    .toCompletableFuture().get(1, TimeUnit.SECONDS);
            // Log anything other than a successful enqueue.
            if (!QueueOfferResult.enqueued().equals(result)) {
                logger.info("handleDispatchTicker tickerMap offer: {}", result);
            }
        } catch (Exception e) {
            logger.error("handleDispatchTicker: {} \ncause: {} \ntickerMap: {}", e.getStackTrace(), e.getCause(), tickerMap.size());
        }
    });
}
The source is handed to the WebSocket actors, which consume it and send the elements back to the user:
private void handleWatchTicker(Messages.WatchTicker ticker) {
    sender().tell(new Messages.WatchTickerResult(ticker.symbol, source), self());
}
Everything works well at first, but after some time, perhaps a day or two, the source simply stops without any exception. Offers to inputQueue still run normally, but the source no longer emits any new elements. When a new source is started from the BroadcastHub, it emits a fixed number of elements and then nothing new.
There are absolutely no exceptions or error logs; they may be swallowed by the stream, so I cannot give more details.
Perhaps a very slow subscriber was never cleaned up, or perhaps there is a bug in BroadcastHub; I do not know.
I do not need every element to be consumed by every consumer, so adapting to the slowest consumer is not what I want, but I could not find any strategy to set on BroadcastHub.
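To make the behavior I am after concrete: each consumer would have its own bounded buffer that discards the oldest element when that consumer falls behind, instead of slowing everyone else down. Below is a minimal plain-Java sketch of those semantics. `DropHeadBuffer` and `DropHeadDemo` are hypothetical names for illustration only and are not part of Akka.

```java
import java.util.ArrayDeque;

// Hypothetical helper, not an Akka API: a bounded buffer that discards the
// oldest element when full, so a slow reader never blocks the writer.
final class DropHeadBuffer<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;

    DropHeadBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Always accepts the new (non-null) element.
    // Returns the element that was dropped to make room, or null if none was.
    synchronized T offer(T item) {
        T dropped = null;
        if (items.size() == capacity) {
            dropped = items.pollFirst();
        }
        items.addLast(item);
        return dropped;
    }

    // Returns the oldest buffered element, or null if the buffer is empty.
    synchronized T poll() {
        return items.pollFirst();
    }

    synchronized int size() {
        return items.size();
    }
}

final class DropHeadDemo {
    public static void main(String[] args) {
        // A consumer that can hold only two pending tickers.
        DropHeadBuffer<String> slowConsumer = new DropHeadBuffer<>(2);
        slowConsumer.offer("t1");
        slowConsumer.offer("t2");
        slowConsumer.offer("t3"); // buffer is full, so "t1" is discarded
        System.out.println(slowConsumer.poll()); // prints "t2"
        System.out.println(slowConsumer.poll()); // prints "t3"
    }
}
```

In Akka Streams terms, I assume the closest equivalent would be to attach `.buffer(n, OverflowStrategy.dropHead())` to the source each consumer receives, but I have not verified whether that prevents the stall described above.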
Please help me look into this. Thank you!