Camel - RedeliveryErrorHandler
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.RedeliveryTask#doRun
private void doRun() throws Exception {
// did previous processing cause an exception?
if (exchange.getException() != null) {
handleException();
onExceptionOccurred();
}
// compute if we are exhausted or cannot redeliver
boolean redeliverAllowed = redeliveryCounter == 0 || isRedeliveryAllowed();
boolean exhausted = false;
if (redeliverAllowed) {
// we can redeliver but check if we are exhausted first (optimized to only check when needed)
exhausted = exchange.isRedeliveryExhausted() || exchange.isRollbackOnly();
if (!exhausted && redeliveryCounter > 0) {
// its a potential redelivery so determine if we should redeliver or not
redeliverAllowed
= currentRedeliveryPolicy.shouldRedeliver(exchange, redeliveryCounter, retryWhilePredicate);
}
}
// if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC)
if (!redeliverAllowed || exhausted) {
Processor target = failureProcessor != null ? failureProcessor : deadLetter;
// we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
// bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
boolean isDeadLetterChannel = isDeadLetterChannel() && target == deadLetter;
deliverToFailureProcessor(target, isDeadLetterChannel, exchange);
// we are breaking out
} else if (redeliveryCounter > 0) {
// calculate the redelivery delay
redeliveryDelay
= determineRedeliveryDelay(exchange, currentRedeliveryPolicy, redeliveryDelay, redeliveryCounter);
if (redeliveryDelay > 0) {
// okay there is a delay so create a scheduled task to have it executed in the future
if (currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
// we are doing a redelivery then a thread pool must be configured (see the doStart method)
ObjectHelper.notNull(executorService,
"Redelivery is enabled but ExecutorService has not been configured.", this);
// schedule the redelivery task
if (LOG.isTraceEnabled()) {
LOG.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay,
exchange.getExchangeId());
}
executorService.schedule(() -> reactiveExecutor.schedule(this::redeliver), redeliveryDelay,
TimeUnit.MILLISECONDS);
} else {
// async delayed redelivery was disabled or we are transacted so we must be synchronous
// as the transaction manager requires to execute in the same thread context
try {
// we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping
redeliverySleepCounter.incrementAndGet();
boolean complete = sleep();
redeliverySleepCounter.decrementAndGet();
if (!complete) {
// the task was rejected
exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping"));
// mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
// jump to start of loop which then detects that we are failed and exhausted
reactiveExecutor.schedule(this);
} else {
reactiveExecutor.schedule(this::redeliver);
}
} catch (InterruptedException e) {
redeliverySleepCounter.decrementAndGet();
// we was interrupted so break out
exchange.setException(e);
// mark the exchange to stop continue routing when interrupted
// as we do not want to continue routing (for example a task has been cancelled)
exchange.setRouteStop(true);
reactiveExecutor.schedule(callback);
}
}
} else {
// execute the task immediately
reactiveExecutor.schedule(this::redeliver);
}
} else {
// Simple delivery
outputAsync.process(exchange, doneSync -> {
// only continue with callback if we are done
if (isDone(exchange)) {
reactiveExecutor.schedule(callback);
} else {
// error occurred so loop back around and call ourselves
reactiveExecutor.schedule(this);
}
});
}
}