camel系列-OnCompletion

概念

Camel 的 DSL 包含一个 onCompletion 语句,当消息在路由级别成功完成或失败时,它允许你自定义所采取的操作。

onCompletion 与拦截器非常类似,onCompletion 作用于路由执行完成后的回调,拦截器则在路由执行期间操作回调

onCompletion

在 XML DSL 中,在路由中定义一个 onCompletion 块。 可以定义块在路由的任何位置。按照惯例,它通常放在开头。 在代码块中,当 exchange 消息在路由执行完成之后,定义要执行的处理步骤。

DSL 示例

定义 onCompletion 节点

1
2
3
4
5
6
7
8
<route>
<from uri="timer:onCompletion?period=5s"/>
<onCompletion>
<log message="onCompletion triggered: ${threadName}"/>
<to uri="log:completed"/>
</onCompletion>
<log message="Processing message: ${threadName}"/>
</route>

运行结果

onCompletion 在路由结束之后执行

1
2
3
09:30:45.392 [Camel (MyJavaLoader) thread #2 - timer://onCompletion] INFO  route2 - Processing message: Camel (MyJavaLoader) thread #2 - timer://onCompletion
09:30:45.396 [Camel (MyJavaLoader) thread #2 - timer://onCompletion] INFO route2 - onCompletion triggered: Camel (MyJavaLoader) thread #2 - timer://onCompletion
09:30:45.397 [Camel (MyJavaLoader) thread #2 - timer://onCompletion] INFO completed - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]]

回调时机

分为成功和失败回调

  • onCompleteOnly 定义为执行成功后回调
  • onFailureOnly 定义为抛异常后回调

DSL 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
<route>
<from uri="timer:onCompleteOnly?period=5s"/>
<onCompletion onCompleteOnly="true">
<log message="onCompletion success triggered: ${threadName}"/>
<to uri="log:completed"/>
</onCompletion>
<onCompletion onFailureOnly="true">
<log message="onCompletion fail triggered: ${threadName}"/>
<to uri="log:failed"/>
</onCompletion>
<log message="Processing message: ${threadName}"/>
<!-- <throwException message="Forced" exceptionType="java.lang.IllegalArgumentException"/>-->
</route>

运行结果

1
2
3
4
5
6
7
8
9
成功回调
09:51:18.673 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO route1 - Processing message: Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly
09:51:18.681 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO route1 - onCompletion success triggered: Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly
09:51:18.684 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO completed - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]]

失败回调
09:55:54.326 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO route1 - Processing message: Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly
09:55:54.343 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO route1 - onCompletion fail triggered: Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly
09:55:54.343 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO failed - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]]

回调条件

使用 onWhen 谓语条件设置回调条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<route>
<from uri="direct:onCompletionFailureConditional"/>
<onCompletion onFailureOnly="true">
<onWhen>
<simple>${exception.class} == 'java.lang.IllegalArgumentException'</simple>
</onWhen>
<log message="onFailureOnly thread: ${threadName}"/>
<to uri="mock:failed"/>
</onCompletion>
<log message="Original thread: ${threadName}"/>
<choice>
<when>
<simple>${body} contains 'explode'</simple>
<throwException message="Forced" exceptionType="java.lang.IllegalArgumentException"/>
</when>
</choice>
</route>

全局回调

也可以将 onCompletion 的作用域定义在全局范围,那么每个路由执行完成都会执行回调

1
2
3
4
5
6
7
8
9
10
11
12
<!-- this is a global onCompletion route that is invoked when any exchange is done being routed
as a kind of after callback -->
<onCompletion>
<to uri="log:global"/>
<to uri="mock:sync"/>
</onCompletion>

<route>
<from uri="direct:start"/>
<process ref="myProcessor"/>
<to uri="mock:result"/>
</route>

消费模式

OnCompletion 支持两种影响路由消费者的模式:

  • AfterConsumer - 在消费者完成后运行的默认模式
  • BeforeConsumer - 在消费者完成之前运行,在消费者向被调用者写回响应之前

AfterConsumer 模式是默认模式,与较早的 Camel 版本中的行为相同。

新的 BeforeConsumer 模式用于onCompletion在消费者将其响应写回被调用者之前运行(如果处于 InOut 模式)。这允许onCompletion修改 Exchange,例如添加特殊标头,或将 Exchange 记录为响应记录器等。

1
2
3
4
5
<onCompletion mode="BeforeConsumer">
<setHeader name="createdBy">
<constant>Someone</constant>
</setHeader>
</onCompletion>

动态添加 onCompletion

可以在代码中通过 Exchange 的扩展类 ExtendedExchange 中 addOnCompletion 方法动态添加 onCompletion,增加灵活性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ConfirmCancelProcessor implements Processor {
private final Logger log = LoggerFactory.getLogger(ConfirmCancelProcessor.class);

@Override
public void process(Exchange exchange) throws Exception {
log.info("Starting two-phase operation");

final ProducerTemplate producerTemplate =
exchange.getContext().createProducerTemplate();
producerTemplate.send("mock:start", exchange);

exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
@Override
public void onComplete(Exchange exchange) {
log.info("Completed - confirming");
producerTemplate.send("mock:confirm", exchange);
}

@Override
public void onFailure(Exchange exchange) {
log.info("Failed - cancelling");
producerTemplate.send("mock:cancel", exchange);
}
});
}
}

参考