Camel series: the Splitter (SPLIT)

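Concept

If a message contains multiple elements, each of which may need to be processed differently, how do we handle it?

Use a Splitter to break the composite message into a series of individual messages, each containing the data for one item.

The Splitter from the EIP patterns lets you split a message into multiple parts and process them individually.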

The split method

Use the split method to split the message body.

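// Imports assume Camel 2.x with camel-test (JUnit 4); package names differ in later versions.
import java.util.ArrayList;
import java.util.List;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class SplitterABCTest extends CamelTestSupport {

    @Test
    public void testSplitABC() throws Exception {
        // the mock endpoint should receive one message per list element
        MockEndpoint mock = getMockEndpoint("mock:split");
        mock.expectedBodiesReceived("A", "B", "C");

        List<String> body = new ArrayList<String>();
        body.add("A");
        body.add("B");
        body.add("C");

        template.sendBody("direct:start", body);

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                    // split the message body into its elements
                    .split(body())
                        .log("Split line ${body}")
                        .to("mock:split");
            }
        };
    }
}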

XML DSL example

<route>
    <from uri="direct:start"/>
    <split>
        <!-- the split needs an expression; here the Simple language
             refers to the message body -->
        <simple>${body}</simple>
        <!-- log each split message -->
        <log message="Split line ${body}"/>
        <!-- and send it to a mock endpoint -->
        <to uri="mock:split"/>
    </split>
</route>

Output

2022-05-05 10:01:44,831 [main] INFO route1 - Split line A
2022-05-05 10:01:44,831 [main] INFO route1 - Split line B
2022-05-05 10:01:44,831 [main] INFO route1 - Split line C

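Supported message types

A common use case is splitting a list, set, map, array, or anything else in the message body that can be iterated.

By default, the Split EIP splits the message body according to its value type:

  • java.util.Collection: splits by each element of the collection (list or set).
  • java.util.Map: splits by each Map.Entry of the map.
  • Object[]: splits by each element of the array.
  • Iterator: splits by iterating the iterator.
  • Iterable: splits by iterating the iterable.
  • org.w3c.dom.NodeList: splits an XML document by each node in the list.
  • String: splits the string value using a comma as the delimiter.

For any other type, the message body is not split but used as is, which means the Split EIP produces a single message (identical to the original).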
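
To make the type rules above concrete, here is a minimal sketch in plain Java. It is not Camel's implementation: SplitByTypeSketch and its split method are hypothetical names, org.w3c.dom.NodeList is left out for brevity, and the code only mirrors the listed rules with the standard library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper, NOT Camel's code: it only mirrors the type rules listed above.
final class SplitByTypeSketch {

    static List<Object> split(Object body) {
        List<Object> parts = new ArrayList<>();
        if (body instanceof Map) {
            // a Map is split into its Map.Entry objects
            parts.addAll(((Map<?, ?>) body).entrySet());
        } else if (body instanceof Iterable) {
            // covers Collection (List, Set) as well as any other Iterable
            for (Object element : (Iterable<?>) body) {
                parts.add(element);
            }
        } else if (body instanceof Object[]) {
            parts.addAll(Arrays.asList((Object[]) body));
        } else if (body instanceof Iterator) {
            ((Iterator<?>) body).forEachRemaining(parts::add);
        } else if (body instanceof String) {
            // a String is split on the comma delimiter
            parts.addAll(Arrays.asList(((String) body).split(",")));
        } else {
            // any other type is passed through as one single part
            parts.add(body);
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(split(Arrays.asList("A", "B", "C"))); // prints [A, B, C]
        System.out.println(split("A,B,C"));                      // prints [A, B, C]
        System.out.println(split(42));                           // prints [42]
    }
}
```

The last branch corresponds to the "any other type" case: the body comes back as a single, unchanged part.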

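Parallel processing

You can enable parallel processing on the Split EIP so that each split message is processed in parallel by its own thread.

The example below enables parallel mode: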

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .split(body()).parallelProcessing()
                    .log("Split line ${body}")
                    .to("mock:split");
        }
    };
}

Output:

2022-05-05 10:11:22,284 [amel-1) thread #1 - Split] INFO route1 - Split line A
2022-05-05 10:11:22,284 [amel-1) thread #2 - Split] INFO route1 - Split line B
2022-05-05 10:11:22,284 [amel-1) thread #4 - Split] INFO route1 - Split line C
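
As a rough illustration of what parallel mode implies, the plain-Java sketch below submits each part to a thread pool and waits for all of them before continuing. It is an assumption-laden stand-in, not Camel's code: ParallelSplitSketch, the pool size of 4, and the log-like string format are all invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, NOT Camel's code: each part is handled by a pool thread.
final class ParallelSplitSketch {

    static List<String> processInParallel(List<String> parts) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an arbitrary choice
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String part : parts) {
                // record which thread handled the part, similar to the log lines above
                tasks.add(() -> Thread.currentThread().getName() + " - Split line " + part);
            }
            List<String> lines = new ArrayList<>();
            // invokeAll blocks until every task has completed and
            // returns the futures in the same order as the tasks
            for (Future<String> future : pool.invokeAll(tasks)) {
                lines.add(future.get());
            }
            return lines;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        processInParallel(List.of("A", "B", "C")).forEach(System.out::println);
    }
}
```

The parts may execute in any order and on different threads, which is why the thread names in the output above vary. The sketch returns results in submission order only because it collects the futures in that order.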

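Ending the split block

You may want to continue routing the exchange after the Split EIP. In the Java DSL, use end() to mark where the split block ends and where other EIPs can be added to continue the route.

In the example below, sending to mock:result happens after the Split EIP has finished. In other words, the whole message must be split and every part processed before routing continues.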

from("direct:a")
    .split(body()).parallelProcessing()
        .to("direct:x")
        .to("direct:y")
        .to("direct:z")
    .end()
    .to("mock:result");

Output

2022-05-05 10:14:56,159 [amel-1) thread #3 - Split] INFO route1 - Split line C
2022-05-05 10:14:56,159 [amel-1) thread #2 - Split] INFO route1 - Split line B
2022-05-05 10:14:56,159 [amel-1) thread #1 - Split] INFO route1 - Split line A
2022-05-05 10:14:56,162 [main] INFO route1 - List [A, B, C]

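Aggregation

An AggregationStrategy combines all the split exchanges into a single response exchange, which becomes the outgoing exchange after the Split EIP block. In other words, the aggregate method determines the message body that leaves the split block.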

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                // tell the Splitter to use the aggregation strategy
                .split(body(), new MyAggregationStrategy())
                    // log each split message
                    .log("Split line ${body}")
                    // and have them translated into a quote
                    //.bean(WordTranslateBean.class)
                    // and send it to a mock
                    .to("mock:split")
                .end()
                // log the outgoing aggregated message
                .log("Aggregated ${body}")
                // and send it to a mock as well
                .to("mock:result");
        }
    };
}

Aggregation strategy

import org.apache.camel.Exchange;
// Camel 2.x package; in Camel 3.x the interface is org.apache.camel.AggregationStrategy
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class MyAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // this is the first time so no existing aggregated exchange
            return newExchange;
        }

        // append the new word to the existing
        String body = newExchange.getIn().getBody(String.class).trim();
        String existing = oldExchange.getIn().getBody(String.class).trim();

        oldExchange.getIn().setBody(existing + "+" + body);
        return oldExchange;
    }
}

Output

2022-05-05 10:29:54,080 [main] INFO route1 - Split line A
2022-05-05 10:29:54,081 [main] INFO route1 - Split line B
2022-05-05 10:29:54,081 [main] INFO route1 - Split line C
2022-05-05 10:29:54,082 [main] INFO route1 - Aggregated A+B+C
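
The output above is what you get when the strategy is applied once per split part, with the result of each call passed into the next. The plain-Java sketch below imitates that fold on strings instead of exchanges. AggregateFoldSketch and its methods are hypothetical names and do not use the Camel API.

```java
import java.util.function.BinaryOperator;

// Hypothetical sketch, NOT the Camel API: folds split parts the way the strategy above is applied.
final class AggregateFoldSketch {

    // Same contract as the strategy above: the "old" value is null for the first part.
    static String aggregate(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody;
        }
        return oldBody.trim() + "+" + newBody.trim();
    }

    static String splitAndAggregate(String body, BinaryOperator<String> strategy) {
        String aggregated = null;
        for (String part : body.split(",")) {
            // each part is combined with the result accumulated so far
            aggregated = strategy.apply(aggregated, part);
        }
        return aggregated;
    }

    public static void main(String[] args) {
        // prints A+B+C
        System.out.println(splitAndAggregate("A,B,C", AggregateFoldSketch::aggregate));
    }
}
```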

Split mode: tokenize

When split is applied to a String body, the internal logic splits it into individual parts using "," as the default delimiter.

@Test
public void testSplitAggregateABC() throws Exception {
    MockEndpoint split = getMockEndpoint("mock:split");
    // the String body "A,B,C" is split on the default "," delimiter into 3 messages
    split.expectedBodiesReceived("A", "B", "C");

    MockEndpoint result = getMockEndpoint("mock:result");
    // and one combined aggregated message as output
    result.expectedBodiesReceived("A+B+C");

    template.sendBody("direct:start", "A,B,C");

    assertMockEndpointsSatisfied();
}

Implementation snippet

// SplitterIterable constructor: the iterator over the parts comes from ObjectHelper
private SplitterIterable(Exchange exchange, Object value) {
    this.original = exchange;
    this.value = value;
    this.iterator = ObjectHelper.createIterator(value);
    this.copy = copyExchangeNoAttachments(exchange, true);
    this.routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
}

// ObjectHelper: the single-argument createIterator falls back to the "," delimiter
private static final String DEFAULT_DELIMITER = ",";

public static Iterator<Object> createIterator(Object value) {
    return createIterator(value, DEFAULT_DELIMITER);
}

tokenize examples:

public class SplitTokenizeTest extends CamelTestSupport {

    @Test
    public void testSplitTokenizerA() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:split");
        mock.expectedBodiesReceived("Claus", "James", "Willem");

        template.sendBody("direct:a", "Claus,James,Willem");

        assertMockEndpointsSatisfied();
    }

    @Test
    public void testSplitTokenizerB() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:split");
        mock.expectedBodiesReceived("Claus", "James", "Willem");

        template.sendBodyAndHeader("direct:b", "Hello World", "myHeader", "Claus,James,Willem");

        assertMockEndpointsSatisfied();
    }

    @Test
    public void testSplitTokenizerC() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:split");
        mock.expectedBodiesReceived("Claus", "James", "Willem");

        template.sendBody("direct:c", "Claus James Willem");

        assertMockEndpointsSatisfied();
    }

    @Test
    public void testSplitTokenizerD() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:split");
        mock.expectedBodiesReceived("[Claus]", "[James]", "[Willem]");

        template.sendBody("direct:d", "[Claus][James][Willem]");

        assertMockEndpointsSatisfied();
    }

    @Test
    public void testSplitTokenizerE() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:split");
        mock.expectedBodiesReceived("<person>Claus</person>", "<person>James</person>", "<person>Willem</person>");

        String xml = "<persons><person>Claus</person><person>James</person><person>Willem</person></persons>";
        template.sendBody("direct:e", xml);

        assertMockEndpointsSatisfied();
    }

    @Test
    public void testSplitTokenizerEWithSlash() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:split");
        String xml = "<persons><person attr='/' /></persons>";
        mock.expectedBodiesReceived("<person attr='/' />");
        template.sendBody("direct:e", xml);
        mock.assertIsSatisfied();
        assertMockEndpointsSatisfied();
    }

    @Test
    public void testSplitTokenizerF() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:split");
        mock.expectedBodiesReceived("<person name=\"Claus\"/>", "<person>James</person>", "<person>Willem</person>");

        String xml = "<persons><person/><person name=\"Claus\"/><person>James</person><person>Willem</person></persons>";
        template.sendBody("direct:f", xml);

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() {

                // split the body on a literal token
                from("direct:a")
                    .split().tokenize(",")
                    .to("mock:split");

                // split the value of the header "myHeader" instead of the body
                from("direct:b")
                    .split().tokenize(",", "myHeader")
                    .to("mock:split");

                // split the body using a regular expression as the token
                from("direct:c")
                    .split().tokenize("(\\W+)\\s*", null, true)
                    .to("mock:split");

                // split on start/end token pairs, keeping the tokens in each part
                from("direct:d")
                    .split().tokenizePair("[", "]", true)
                    .to("mock:split");

                // split XML by the given tag name
                from("direct:e")
                    .split().tokenizeXML("person")
                    .to("mock:split");

                from("direct:f")
                    .split().xpath("//person")
                        // To test the body is not empty
                        // it will call the ObjectHelper.evaluateValuePredicate()
                        .filter().simple("${body}")
                        .to("mock:split");

            }
        };
    }
}
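
For readers who want to see what the token, regex, and pair variants in routes a, c, and d amount to, here is a small plain-Java approximation built on java.util.regex. It is only a sketch under the assumption that the inputs look like the test data above. TokenizeSketch and its methods are hypothetical names, and Camel's own tokenizer is implemented differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, NOT Camel's tokenizer: approximates routes a, c and d with java.util.regex.
final class TokenizeSketch {

    // literal token, as in route a: the token is quoted so it is not read as a regex
    static List<String> byToken(String input, String token) {
        return Arrays.asList(input.split(Pattern.quote(token)));
    }

    // regular expression token, as in route c
    static List<String> byRegex(String input, String regex) {
        return Arrays.asList(input.split(regex));
    }

    // start/end pair with the tokens kept in each part, as in route d
    static List<String> byPair(String input, String start, String end) {
        Pattern pattern = Pattern.compile(Pattern.quote(start) + ".*?" + Pattern.quote(end));
        Matcher matcher = pattern.matcher(input);
        List<String> parts = new ArrayList<>();
        while (matcher.find()) {
            parts.add(matcher.group());
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(byToken("Claus,James,Willem", ","));           // prints [Claus, James, Willem]
        System.out.println(byRegex("Claus James Willem", "(\\W+)\\s*"));  // prints [Claus, James, Willem]
        System.out.println(byPair("[Claus][James][Willem]", "[", "]"));   // prints [[Claus], [James], [Willem]]
    }
}
```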
