Camel Series: Interceptors

Concepts

Camel's interceptor feature lets you intercept an Exchange while it is being routed.

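Interceptor types

  • intercept: intercepts every processing step as it happens during routing
  • interceptFrom: intercepts only the incoming step (the from)
  • interceptSendToEndpoint: intercepts only when the Exchange is about to be sent to the given endpoint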

Examples

intercept

DSL example

<route>
  <from uri="timer:xml?period=5s"/>
  <intercept>
    <to uri="log:hello"/>
  </intercept>
  <log message="I am XML"/>
  <log message="I am YAML"/>
</route>

Run log

The interceptor runs before each processing step:

13:56:05.423 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO  hello - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]]
13:56:05.424 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO route1 - I am XML
13:56:05.424 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO hello - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]]
13:56:05.424 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO route1 - I am YAML

interceptFrom

DSL example

<route>
  <from uri="timer:xml?period=5s"/>
  <interceptFrom>
    <to uri="log:hello"/>
  </interceptFrom>
  <log message="I am XML"/>
  <log message="I am YAML"/>
</route>

Run log

Only the incoming step is intercepted:

14:11:09.483 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO  hello - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]]
14:11:09.483 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO route1 - I am XML
14:11:09.484 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO route1 - I am YAML

interceptSendToEndpoint

DSL example

<route>
  <from uri="timer:xml?period=5s"/>
  <interceptSendToEndpoint>
    <to uri="log:hello"/>
  </interceptSendToEndpoint>
  <log message="I am XML"/>
  <log message="I am YAML"/>
  <to uri="log:result"/>
</route>

Run log

14:13:53.571 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO  route1 - I am XML
14:13:53.572 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO route1 - I am YAML
14:13:53.572 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO hello - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]]
14:13:53.573 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO result - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]]
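
In the run log above, the interceptor fired just before log:result even though no uri was set on interceptSendToEndpoint. To limit interception to a single endpoint, a uri can be given. The following is a minimal sketch, assuming the same route-scoped placement as the example above; log:other is a hypothetical endpoint added only to show a send that should not match.

```xml
<route>
  <from uri="timer:xml?period=5s"/>
  <!-- Assumption: same route-scoped placement as the example above -->
  <interceptSendToEndpoint uri="log:result">
    <to uri="log:hello"/>
  </interceptSendToEndpoint>
  <log message="I am XML"/>
  <!-- Hypothetical endpoint: its URI does not match log:result -->
  <to uri="log:other"/>
  <!-- Only this send matches the interceptor's uri -->
  <to uri="log:result"/>
</route>
```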

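Scope

Interceptors can be scoped globally or to a single route. The examples above are all route-scoped.

Global interceptor configuration

DSL configuration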

<routeConfiguration>
  <interceptFrom uri="*">
    <log message="Message intercepted from ${header.CamelInterceptedEndpoint}"/>
  </interceptFrom>
</routeConfiguration>

<route>
  <from uri="timer:xml?period=5s"/>
  <log message="I am XML"/>
  <log message="I am YAML"/>
  <to uri="log:result"/>
</route>

Run log

14:25:42.571 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO  route1 - Message intercepted from timer://xml?period=5s
14:25:42.571 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO route1 - I am XML
14:25:42.572 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO route1 - I am YAML
14:25:42.572 [Camel (MyJavaLoader) thread #1 - timer://xml] INFO result - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is null]]
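
The uri="*" pattern above matches every consumer endpoint. The same attribute can narrow the match with a wildcard prefix. Here is a sketch of that idea, assuming a hypothetical second route on direct:other that should be left alone:

```xml
<routeConfiguration>
  <!-- Wildcard: matches any from endpoint whose URI starts with "timer" -->
  <interceptFrom uri="timer*">
    <log message="Message intercepted from ${header.CamelInterceptedEndpoint}"/>
  </interceptFrom>
</routeConfiguration>

<route>
  <from uri="timer:xml?period=5s"/>
  <log message="I am XML"/>
</route>

<!-- Hypothetical route: its from URI does not start with "timer" -->
<route>
  <from uri="direct:other"/>
  <log message="Not intercepted"/>
</route>
```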

Intercept conditions

Use when to add a predicate, so that the interceptor only applies when the condition holds:

<intercept>
  <when>
    <simple>${body} contains 'Hello'</simple>
  </when>
  <to uri="mock:intercepted"/>
</intercept>
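
For context, the sketch below shows how such a conditional interceptor could sit next to a route. The camelContext wrapper and the direct:start and mock:result endpoints are illustrative assumptions, not part of the original example. Only Exchanges whose body contains 'Hello' are sent to mock:intercepted, and routing then continues as usual.

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring">

  <!-- Applies only when the predicate matches -->
  <intercept>
    <when>
      <simple>${body} contains 'Hello'</simple>
    </when>
    <to uri="mock:intercepted"/>
  </intercept>

  <!-- Hypothetical route used only to give the interceptor something to act on -->
  <route>
    <from uri="direct:start"/>
    <to uri="mock:result"/>
  </route>

</camelContext>
```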

Stopping routing after interception

Use stop so that the rest of the route is not executed once the Exchange has been intercepted:

<intercept>
  <when>
    <simple>${body} contains 'Hello'</simple>
    <to uri="log:test"/>
    <stop/> <!-- stop routing; the remaining steps are skipped -->
  </when>
</intercept>

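Intercepting a specific endpoint and skipping it

You can intercept sends to a specific endpoint and skip the original endpoint entirely.

In the DSL below, the message is never delivered to the original kafka endpoint. It is intercepted and detoured to mock:kafka instead.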

<camelContext>

  <interceptSendToEndpoint uri="kafka*" skipSendToOriginalEndpoint="true">
    <to uri="mock:kafka"/>
  </interceptSendToEndpoint>

  <route>
    <from uri="jms:queue:order"/>
    <to uri="bean:validateOrder"/>
    <to uri="bean:processOrder"/>
    <to uri="kafka:order"/>
  </route>

</camelContext>
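
By contrast, when skipSendToOriginalEndpoint is left out, the interceptor acts as a detour rather than a replacement: the Exchange goes to mock:kafka first and is then still sent to kafka:order. A sketch of that variant, reusing the endpoints from the example above:

```xml
<camelContext>

  <!-- No skipSendToOriginalEndpoint: detour first, then continue to the original endpoint -->
  <interceptSendToEndpoint uri="kafka*">
    <to uri="mock:kafka"/>
  </interceptSendToEndpoint>

  <route>
    <from uri="jms:queue:order"/>
    <to uri="bean:validateOrder"/>
    <to uri="bean:processOrder"/>
    <to uri="kafka:order"/>
  </route>

</camelContext>
```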

References