Dynamically decide on whether to do Elasticsearch retries based on error messages

With akka-stream-alpakka-elasticsearch version 1.1.2 there was an option to implement RetryLogic which allowed us to override boolean shouldRetry(final int retries, final Seq<String> errors) method.

That gave us an option to dynamically decide based on an error message whether or not retry should be made for a particular document. For example if the error type is version_conflict_engine_exception we don’t want to retry while we want to retry for other errors.

Can something similar be achieved with 2.0.2 release? I was thinking about wrapping the flow with RetryFlow for which can define a decider function but not sure if that’s recommended for our use case.

Any advice is highly appreciated!

This is how the flow looks like:

Flow<Record<T, C>, Record<T, C>, NotUsed> sinkFlow = Flow.create();

            Flow<Record<T, C>, Record<T, C>, NotUsed> sink = sinkFlow
                .map(record -> {
                    ElasticsearchDocument doc = recordToDocument.apply(record);

                    WriteMessage<Object, Record<T, C>> message = recordToWriteMessage
                        .apply(record, doc)
                        .withPassThrough(record);

                    if (doc.indexName != null) {
                        message = message.withIndexName(doc.indexName);
                    } else {
                        message = message.withIndexName(configuration.defaultIndexName);
                    }

                    return message;
                })
                .via(ElasticsearchFlow.createWithPassThrough(
                    configuration.defaultIndexName,
                    configuration.indexType,
                    sinkSettings,
                    client,
                    JsonUtil.JSON_MAPPER))
                .map(result -> {
                    if (result.getError().isPresent()) {
                        LOG.warn(
                            "Error while publishing message to Elasticsearch: {}", result.getErrorReason());
                    }

                    return result.message().passThrough();
                });