camel系列-复合消息处理器

概念

使用组合消息处理器来处理组合消息。组合消息处理器将消息拆分,将子消息路由到适当的目的地,并将响应重新聚合回单个消息。

示例说明

  1. 拆分处理
    1. 使用 split 对消息进行拆分
    2. 使用 bean 为 MyOrderService 的 handleOrder 方法进行单条消息处理
    3. 自定义 MyOrderStrategy 聚合策略,对输出结果消息进行聚合处理
  2. 拆分处理结束标识:处理完之后使用 end 方法说明结束标识
  3. 聚合处理:使用 bean 为 MyOrderService 的 buildCombinedResponse 方法进行聚合的结果消息进行处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
public class SplitAggregateInOutTest extends ContextTestSupport {

private static final Logger LOG = LoggerFactory.getLogger(SplitAggregateInOutTest.class);

private String expectedBody = "Response[(id=1,item=A);(id=2,item=B);(id=3,item=C)]";

@Test
public void testSplitAndAggregateInOut() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived(expectedBody);

// use requestBody as its InOut
Object out = template.requestBody("direct:start", "A@B@C");
assertEquals(expectedBody, out);
LOG.debug("Response to caller: " + out);

assertMockEndpointsSatisfied();
}

@Override
protected Registry createRegistry() throws Exception {
Registry jndi = super.createRegistry();
jndi.bind("MyOrderService", new MyOrderService());
return jndi;
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
// START SNIPPET: e1
// this routes starts from the direct:start endpoint
// the body is then splitted based on @ separator
// the splitter in Camel supports InOut as well and for that we
// need
// to be able to aggregate what response we need to send back,
// so we provide our
// own strategy with the class MyOrderStrategy.
from("direct:start").split(body().tokenize("@"), new MyOrderStrategy())
// each splitted message is then send to this bean where we
// can process it
.to("bean:MyOrderService?method=handleOrder")
// this is important to end the splitter route as we do not
// want to do more routing
// on each splitted message
.end()
// after we have splitted and handled each message we want
// to send a single combined
// response back to the original caller, so we let this bean
// build it for us
// this bean will receive the result of the aggregate
// strategy: MyOrderStrategy
.to("bean:MyOrderService?method=buildCombinedResponse")
// END SNIPPET: e1
.to("mock:result");
}
};
}

// START SNIPPET: e2
public static class MyOrderService {

private static int counter;

/**
* We just handle the order by returning a id line for the order
*/
public String handleOrder(String line) {
LOG.debug("HandleOrder: " + line);
return "(id=" + ++counter + ",item=" + line + ")";
}

/**
* We use the same bean for building the combined response to send back to the original caller
*/
public String buildCombinedResponse(String line) {
LOG.debug("BuildCombinedResponse: " + line);
return "Response[" + line + "]";
}
}
// END SNIPPET: e2

// START SNIPPET: e3
/**
* This is our own order aggregation strategy where we can control how each splitted message should be combined. As
* we do not want to loos any message we copy from the new to the old to preserve the order lines as long we process
* them
*/
public static class MyOrderStrategy implements AggregationStrategy {

@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
// put order together in old exchange by adding the order from new
// exchange

if (oldExchange == null) {
// the first time we aggregate we only have the new exchange,
// so we just return it
return newExchange;
}

String orders = oldExchange.getIn().getBody(String.class);
String newLine = newExchange.getIn().getBody(String.class);

LOG.debug("Aggregate old orders: " + orders);
LOG.debug("Aggregate new order: " + newLine);

// put orders together separating by semi colon
orders = orders + ";" + newLine;
// put combined order back on old to preserve it
oldExchange.getIn().setBody(orders);

// return old as this is the one that has all the orders gathered
// until now
return oldExchange;
}
}
// END SNIPPET: e3

}

参考: