概念 Camel 的 DSL 包含一个 onCompletion 语句,当消息在路由级别成功完成或失败时,它允许你自定义所采取的操作。
onCompletion 与拦截器非常类似,onCompletion 作用于路由执行完成后的回调,拦截器则在路由执行期间操作回调
onCompletion 在 XML DSL 中,在路由中定义一个 onCompletion 块。 可以定义块在路由的任何位置。按照惯例,它通常放在开头。 在代码块中,当 exchange 消息在路由执行完成之后,定义要执行的处理步骤。
DSL 示例 定义 onCompletion 节点
1 2 3 4 5 6 7 8 <route > <from uri ="timer:onCompletion?period=5s" /> <onCompletion > <log message ="onCompletion triggered: ${threadName}" /> <to uri ="log:completed" /> </onCompletion > <log message ="Processing message: ${threadName}" /> </route >
运行结果 onCompletion 在路由结束之后执行
1 2 3 09:30:45.392 [Camel (MyJavaLoader) thread #2 - timer://onCompletion] INFO route2 - Processing message: Camel (MyJavaLoader) thread #2 - timer://onCompletion 09:30:45.396 [Camel (MyJavaLoader) thread #2 - timer://onCompletion] INFO route2 - onCompletion triggered: Camel (MyJavaLoader) thread #2 - timer://onCompletion 09:30:45.397 [Camel (MyJavaLoader) thread #2 - timer://onCompletion] INFO completed - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]]
回调时机 分为成功和失败回调
onCompleteOnly 定义为执行成功后回调
onFailureOnly 定义为抛异常后回调
DSL 定义 1 2 3 4 5 6 7 8 9 10 11 12 13 <route> <from uri="timer:onCompleteOnly?period=5s"/> <onCompletion onCompleteOnly="true"> <log message="onCompletion success triggered: ${threadName}"/> <to uri="log:completed"/> </onCompletion> <onCompletion onFailureOnly="true"> <log message="onCompletion fail triggered: ${threadName}"/> <to uri="log:failed"/> </onCompletion> <log message="Processing message: ${threadName}"/> <!-- <throwException message="Forced" exceptionType="java.lang.IllegalArgumentException"/>--> </route>
运行结果 1 2 3 4 5 6 7 8 9 成功回调 09:51:18.673 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO route1 - Processing message: Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly 09:51:18.681 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO route1 - onCompletion success triggered: Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly 09:51:18.684 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO completed - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]] 失败回调 09:55:54.326 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO route1 - Processing message: Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly 09:55:54.343 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO route1 - onCompletion fail triggered: Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly 09:55:54.343 [Camel (MyJavaLoader) thread #1 - timer://onCompleteOnly] INFO failed - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]]
回调条件 使用 onWhen 谓语条件设置回调条件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <route > <from uri ="direct:onCompletionFailureConditional" /> <onCompletion onFailureOnly ="true" > <onWhen > <simple > ${exception.class} == 'java.lang.IllegalArgumentException'</simple > </onWhen > <log message ="onFailureOnly thread: ${threadName}" /> <to uri ="mock:failed" /> </onCompletion > <log message ="Original thread: ${threadName}" /> <choice > <when > <simple > ${body} contains 'explode'</simple > <throwException message ="Forced" exceptionType ="java.lang.IllegalArgumentException" /> </when > </choice > </route >
全局回调 也可以将 onCompletion 的作用域定义在全局范围,那么每个路由执行完成都会执行回调
1 2 3 4 5 6 7 8 9 10 11 12 <onCompletion > <to uri ="log:global" /> <to uri ="mock:sync" /> </onCompletion > <route > <from uri ="direct:start" /> <process ref ="myProcessor" /> <to uri ="mock:result" /> </route >
消费模式 OnCompletion 支持两种影响路由消费者的模式:
AfterConsumer - 在消费者完成后运行的默认模式
BeforeConsumer - 在消费者完成之前运行,在消费者向被调用者写回响应之前
AfterConsumer 模式是默认模式,与较早的 Camel 版本中的行为相同。
新的 BeforeConsumer 模式用于 onCompletion 在消费者将其响应写回被调用者之前运行(如果处于 InOut 模式)。这允许 onCompletion 修改 Exchange,例如添加特殊标头,或将 Exchange 记录为响应记录器等。
1 2 3 4 5 <onCompletion mode ="BeforeConsumer" > <setHeader name ="createdBy" > <constant > Someone</constant > </setHeader > </onCompletion >
动态添加 onCompletion 可以在代码中通过 Exchange 的扩展类 ExtendedExchange 中 addOnCompletion 方法动态添加 onCompletion,增加灵活性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class ConfirmCancelProcessor implements Processor { private final Logger log = LoggerFactory.getLogger(ConfirmCancelProcessor.class); @Override public void process (Exchange exchange) throws Exception { log.info("Starting two-phase operation" ); final ProducerTemplate producerTemplate = exchange.getContext().createProducerTemplate(); producerTemplate.send("mock:start" , exchange); exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization () { @Override public void onComplete (Exchange exchange) { log.info("Completed - confirming" ); producerTemplate.send("mock:confirm" , exchange); } @Override public void onFailure (Exchange exchange) { log.info("Failed - cancelling" ); producerTemplate.send("mock:cancel" , exchange); } }); } }
参考