概念 如何使用消息传递来使得釆用不同数据格式的系统之间实现通信?
通过一个 Expression 表达式进行转换
1 2 3 4 5 6 7 <route > <from uri ="direct:cheese" /> <transform > <simple > Hello ${body}</simple > </transform > <to uri ="log:hello" /> </route >
通过 setBody 方法转换 1 2 3 from("direct:cheese" ) .setBody(simple("Hello ${body}" )) .to("log:hello" );
Transform 和 setBody 的区别 两者几乎是等价的
Transform 在 out Message 的结果转换
setBody 通过在 Exchange 对象设置
1 2 3 4 5 6 7 from("direct:start" ) .process(new Processor () { public void process (Exchange exchange) throws Exception { exchange.getIn().setBody("Hello " + exchange.getIn().getBody() + "!" ); } }) .to("mock:result" );
类型转换 1 2 3 4 5 <route > <from uri ="file:inbox" /> <convertBodyTo type ="String" /> <log message ="The file content: ${body}" /> </route >
执行器 在执行器中通过通过 setBody 方法转换
编解码 类似于序列化和反序列化
marshal
- 把一个 Message 编译成常见的数据格式,例如编译 java 对象成 XML, CSV, Json 等
unmarshal
- 反编译,把常见数据格式转换成 Message
如下 DSL 示例:unmarshal
将 xml 文件解码为对象,marshal 将 java 对象编码为一个 xml 文件
1 2 3 4 5 6 7 <route > <from uri ="file:inbox/xml" /> <unmarshal > <jaxb /> </unmarshal > <to uri ="bean:validateOrder" /> <marshal > <jaxb /> </marshal > <to uri ="jms:queue:order" /> </route >
Bean 当实例方法有返回值时,可以通过 Bean 进行自定义转换
DSL 示例
1 from("direct:start" ).bean(new ExampleBean (), "methodName" );
在 transform 中调用
1 2 3 4 5 6 7 <route > <from uri ="direct:start" /> <transform > <method ref ="htmlBean" method ="toHtml" /> </transform > <to uri ="mock:result" /> </route >
在 bean 中调用,如下示例
直接调用 bean,跟上面的 transform 方法最终效果是一致的,但是语义上理解是不同的。根据有无返回值返回,分为以下 2 个场景
toHtml 方法没有返回值,那么就是一个无数据处理的处理器(processor)
toHtml 方法有返回值,则是一个隐藏的转换器
1 2 3 4 5 <route > <from uri ="direct:start" /> <bean ref ="htmlBean" method ="toHtml" /> <to uri ="mock:result" /> </route >
有返回值但不转换处理 还有第 3 种场景,即有返回值,但不想做数据转换,只想当处理器处理。很多方法处理器之后都会返回 true,false 结果,表示执行是否成功,针对这种场景,在调用 bean 方法时,需要将其执行结果放在 header 中,不要做直接的转换,如下示例
1 2 3 4 5 6 7 <route > <from uri ="direct:start" /> <setHeader name ="toHtmlResult" > <bean ref ="htmlBean" method ="toHtml" /> </setHeader > <to uri ="mock:result" /> </route >
内容扩充器 enrich 以下示例,通过 http:remoteserver/foo 获取的内容扩展,原有 Exchange 的信息
AggregationStrategy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class ExampleAggregationStrategy implements AggregationStrategy { public Exchange aggregate (Exchange newExchange, Exchange oldExchange) { if (newExchange == null ) { return oldExchange; } Object oldBody = oldExchange.getIn().getBody(); Object newBody = newExchange.getIn().getBody(); oldExchange.getIn().setBody(oldBody + ":" + newBody); return oldExchange; } }
DSL
1 2 3 4 5 6 7 8 9 10 11 <bean id ="myStrategy" class ="com.foo.ExampleAggregationStrategy" /> <camelContext id ="camel" xmlns ="http://camel.apache.org/schema/spring" > <route > <from uri ="direct:start" /> <enrich aggregationStrategy ="myStrategy" > <constant > http:remoteserver/foo</constant > </enrich > <to uri ="mock:result" /> </route > </camelContext >
Poll Enrich pollEnrich 使用轮询消费者来获取附加数据。它通常用于事件消息消息传递,例如读取文件或下载 FTP 文件。
工作方式与 pollEnrich 相同 enrich,但是由于它使用轮询消费者,因此我们在轮询时有三种方法:
receive:等待消息可用,然后返回。警告如果没有可用消息,此方法可能会无限期阻塞。
receiveNoWait:尝试立即接收消息交换而无需等待,null 如果消息交换尚不可用则返回。
receive(timeout):尝试接收消息交换,如果消息尚不可用,则等待给定的超时到期。返回消息或 null 超时是否过期。
DSL 示例
1 2 3 4 5 6 7 8 9 10 11 <bean id ="myStrategy" class ="com.foo.ExampleAggregationStrategy" /> <camelContext id ="camel" xmlns ="http://camel.apache.org/schema/spring" > <route > <from uri ="direct:start" /> <pollEnrich timeout ="10000" aggregationStrategy ="myStrategy" > <constant > "file:inbox?fileName=data.txt"</constant > </pollEnrich > <to uri ="mock:result" /> </route > </camelContext >
通过对 header 的添加和删除操作
1 2 3 4 5 6 7 <route > <from uri ="direct:a" /> <setHeader name ="myHeader" > <constant > test</constant > </setHeader > <to uri ="direct:b" /> </route >
1 2 3 4 5 <route > <from uri ="seda:b" /> <removeHeader name ="myHeader" /> <to uri ="mock:result" /> </route >
1 2 3 4 5 <route > <from uri ="seda:b" /> <removeHeaders pattern ="Camel*" /> <to uri ="mock:result" /> </route >
参考