camel系列-消息转换

概念

如何使用消息传递来使得釆用不同数据格式的系统之间实现通信?

Transform

通过一个 Expression 表达式进行转换

1
2
3
4
5
6
7
<route>
<from uri="direct:cheese"/>
<transform>
<simple>Hello ${body}</simple>
</transform>
<to uri="log:hello"/>
</route>

通过 setBody 方法转换

1
2
3
from("direct:cheese")
.setBody(simple("Hello ${body}"))
.to("log:hello");

Transform 和 setBody 的区别

两者几乎是等价的

  • Transform 在 out Message 的结果转换
  • setBody 通过在 Exchange 对象设置
1
2
3
4
5
6
7
from("direct:start")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody("Hello " + exchange.getIn().getBody() + "!");
}
})
.to("mock:result");

类型转换

1
2
3
4
5
<route>
<from uri="file:inbox"/>
<convertBodyTo type="String"/>
<log message="The file content: ${body}"/>
</route>

执行器

在执行器中通过通过 setBody 方法转换

编解码

类似于序列化和反序列化

  • marshal - 把一个 Message 编译成常见的数据格式,例如编译 java 对象成 XML, CSV, Json 等
  • unmarshal - 反编译,把常见数据格式转换成 Message

如下 DSL 示例:unmarshal将 xml 文件解码为对象,marshal 将 java 对象编码为一个 xml 文件

1
2
3
4
5
6
7
<route>
<from uri="file:inbox/xml"/>
<unmarshal><jaxb/></unmarshal>
<to uri="bean:validateOrder"/>
<marshal><jaxb/></marshal>
<to uri="jms:queue:order"/>
</route>

Bean

当实例方法有返回值时,可以通过 Bean 进行自定义转换

DSL 示例

1
from("direct:start").bean(new ExampleBean(), "methodName");

在 transform 中调用

1
2
3
4
5
6
7
<route>
<from uri="direct:start"/>
<transform>
<method ref="htmlBean" method="toHtml"/>
</transform>
<to uri="mock:result"/>
</route>

在 bean 中调用,如下示例

直接调用 bean,跟上面的 transform 方法最终效果是一致的,但是语义上理解是不同的。根据有无返回值返回,分为以下 2 个场景

  • toHtml 方法没有返回值,那么就是一个无数据处理的处理器(processor)
  • toHtml 方法有返回值,则是一个隐藏的转换器
1
2
3
4
5
<route>
<from uri="direct:start"/>
<bean ref="htmlBean" method="toHtml"/>
<to uri="mock:result"/>
</route>

有返回值但不转换处理

还有第 3 种场景,即有返回值,但不想做数据转换,只想当处理器处理。很多方法处理器之后都会返回 true,false 结果,表示执行是否成功,针对这种场景,在调用 bean 方法时,需要将其执行结果放在 header 中,不要做直接的转换,如下示例

1
2
3
4
5
6
7
<route>
<from uri="direct:start"/>
<setHeader name="toHtmlResult">
<bean ref="htmlBean" method="toHtml"/>
</setHeader>
<to uri="mock:result"/>
</route>

内容扩充器

enrich

以下示例,通过 http:remoteserver/foo 获取的内容扩展,原有 Exchange 的信息

  1. AggregationStrategy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ExampleAggregationStrategy implements AggregationStrategy {

public Exchange aggregate(Exchange newExchange, Exchange oldExchange) {
// this is just an example, for real-world use-cases the
// aggregation strategy would be specific to the use-case

if (newExchange == null) {
return oldExchange;
}
Object oldBody = oldExchange.getIn().getBody();
Object newBody = newExchange.getIn().getBody();
oldExchange.getIn().setBody(oldBody + ":" + newBody);
return oldExchange;
}

}
  1. DSL
1
2
3
4
5
6
7
8
9
10
11
<bean id="myStrategy" class="com.foo.ExampleAggregationStrategy"/>

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<enrich aggregationStrategy="myStrategy">
<constant>http:remoteserver/foo</constant>
</enrich>
<to uri="mock:result"/>
</route>
</camelContext>

Poll Enrich

pollEnrich 使用轮询消费者来获取附加数据。它通常用于事件消息消息传递,例如读取文件或下载 FTP 文件。

工作方式与 pollEnrich 相同 enrich,但是由于它使用轮询消费者,因此我们在轮询时有三种方法:

  • receive:等待消息可用,然后返回。警告如果没有可用消息,此方法可能会无限期阻塞。
  • receiveNoWait:尝试立即接收消息交换而无需等待,null 如果消息交换尚不可用则返回。
  • receive(timeout):尝试接收消息交换,如果消息尚不可用,则等待给定的超时到期。返回消息或 null 超时是否过期。

DSL 示例

1
2
3
4
5
6
7
8
9
10
11
<bean id="myStrategy" class="com.foo.ExampleAggregationStrategy"/>

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<pollEnrich timeout="10000" aggregationStrategy="myStrategy">
<constant>"file:inbox?fileName=data.txt"</constant>
</pollEnrich>
<to uri="mock:result"/>
</route>
</camelContext>

Header

通过对 header 的添加和删除操作

setHeader

1
2
3
4
5
6
7
<route>
<from uri="direct:a"/>
<setHeader name="myHeader">
<constant>test</constant>
</setHeader>
<to uri="direct:b"/>
</route>

removeHeader

1
2
3
4
5
<route>
<from uri="seda:b"/>
<removeHeader name="myHeader"/>
<to uri="mock:result"/>
</route>

removeHeaders

1
2
3
4
5
<route>
<from uri="seda:b"/>
<removeHeaders pattern="Camel*"/>
<to uri="mock:result"/>
</route>

参考