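Camel Series: Aggregator

Concepts

The Aggregator from the EIP patterns lets you combine multiple messages into a single message.

How do we combine the results of individual but related messages so that they can be processed as a whole?

Use a stateful filter, an Aggregator, to collect and store individual messages until a complete set of related messages has been received. The Aggregator then publishes a single message distilled from the individual messages.

The Aggregator is one of the most complex EIPs and has many features and configuration options.

Correlation expression (correlationExpression)

The correlation expression yields the key that decides which messages belong together. In the DSL example below, correlationExpression groups messages by the value of the myId header.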

<route>
  <from uri="direct:start"/>
  <!-- log the incoming message -->
  <log message="Sending ${body} with correlation key ${header.myId}"/>
  <!-- aggregate using the aggregation strategy and complete when 3 messages
       have been aggregated -->
  <aggregate strategyRef="myAggregationStrategy" completionSize="3">
    <!-- a correlation expression must be provided -->
    <correlationExpression>
      <!-- we use the header with key myId,
           but you can use any Camel Expression you like -->
      <header>myId</header>
    </correlationExpression>
    <!-- log the published outgoing message -->
    <log message="Sending out ${body}"/>
    <!-- and send it to a mock endpoint -->
    <to uri="mock:result"/>
  </aggregate>
</route>

Example data: messages A, B, and C form one group and are aggregated under the myId header with the value 1.

template.sendBodyAndHeader("direct:start", "A", "myId", 1);
// send the 2nd message with the same correlation key
template.sendBodyAndHeader("direct:start", "B", "myId", 1);
// the F message has another correlation key
template.sendBodyAndHeader("direct:start", "F", "myId", 2);
// now we have 3 messages with the same correlation key
// and the Aggregator should publish the message
template.sendBodyAndHeader("direct:start", "C", "myId", 1);
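
To make the grouping behaviour concrete, here is a minimal plain-Java sketch of what happens to the four messages. It is only a model of the idea, not Camel's implementation: the class name CorrelationSketch and its methods are made up for illustration, and bodies are plain strings instead of Exchanges.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of correlation-key grouping; hypothetical, not Camel code. */
final class CorrelationSketch {
    private final int completionSize;
    // one pending group per correlation key
    private final Map<Integer, List<String>> pending = new HashMap<>();
    private final List<String> published = new ArrayList<>();

    CorrelationSketch(int completionSize) {
        this.completionSize = completionSize;
    }

    /** Adds a body to the group for its key and publishes the group once it is full. */
    void send(String body, int myId) {
        List<String> group = pending.computeIfAbsent(myId, k -> new ArrayList<>());
        group.add(body);
        if (group.size() == completionSize) {
            published.add(String.join("", group));
            pending.remove(myId);
        }
    }

    List<String> published() {
        return published;
    }

    int pendingGroups() {
        return pending.size();
    }

    /** Replays the four messages A, B, F, C from the example. */
    static String demo() {
        CorrelationSketch agg = new CorrelationSketch(3);
        agg.send("A", 1);
        agg.send("B", 1);
        agg.send("F", 2); // different key, goes into its own group
        agg.send("C", 1); // third message for key 1 completes that group
        return agg.published() + " pending=" + agg.pendingGroups();
    }
}
```

Calling CorrelationSketch.demo() returns "[ABC] pending=1": the group for key 1 has been published, while F is still waiting in the group for key 2.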

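Aggregation strategy (AggregationStrategy)

A custom aggregation strategy defines how two Exchanges are merged. On the first call for a group the oldExchange parameter is null; on subsequent calls oldExchange holds the Exchange merged so far.

A custom strategy implements the aggregate method of the AggregationStrategy interface, as in the following example.

MyAggregationStrategy merges the bodies of the two Exchanges.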

// the interface as defined by Camel
public interface AggregationStrategy {
    Exchange aggregate(Exchange oldExchange, Exchange newExchange);
}

public class MyAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // first message of a group: there is nothing to merge with yet
        if (oldExchange == null) {
            return newExchange;
        }

        // append the new body to the body aggregated so far
        String oldBody = oldExchange.getIn().getBody(String.class).trim();
        String newBody = newExchange.getIn().getBody(String.class).trim();
        String body = oldBody + newBody;
        oldExchange.getIn().setBody(body);
        return oldExchange;
    }

}
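
The calling pattern (null on the first call, the running aggregate afterwards) is essentially a fold over the incoming messages. The sketch below mirrors MyAggregationStrategy on plain strings so it runs without Camel; StrategyFoldSketch, CONCAT, and aggregateAll are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-Java model of how an aggregation strategy is applied message by message. */
final class StrategyFoldSketch {
    /** Same logic as MyAggregationStrategy, on strings: null means "no aggregate yet". */
    static final BinaryOperator<String> CONCAT = (oldBody, newBody) -> {
        if (oldBody == null) {
            return newBody; // first message is returned unchanged
        }
        return oldBody.trim() + newBody.trim();
    };

    /** Feeds every body through the strategy, carrying the aggregate along. */
    static String aggregateAll(List<String> bodies, BinaryOperator<String> strategy) {
        String aggregated = null; // the first call sees null, like oldExchange
        for (String body : bodies) {
            aggregated = strategy.apply(aggregated, body);
        }
        return aggregated;
    }

    static String demo() {
        return aggregateAll(Arrays.asList("A", " B ", "C"), CONCAT);
    }
}
```

StrategyFoldSketch.demo() returns "ABC". Note that the first body is passed through untrimmed, exactly as the original strategy returns newExchange as-is when oldExchange is null.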

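Completion conditions

At some point while Exchanges are being aggregated, you need to signal that an aggregated Exchange is complete so that it can be sent out of the aggregator. Camel lets you signal completion in several ways:

Completion size (completionSize)

The number of messages to aggregate before the aggregation is complete. Once a group reaches this size it is completed and published, and later messages with the same key start a new aggregation. In the DSL example above completionSize is set to 3, and the full example outputs Sending out ABC.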

2022-04-28 15:35:41,179 [ main] INFO route1 - Sending A with correlation key 1
2022-04-28 15:35:41,189 [ main] INFO route1 - Sending B with correlation key 1
2022-04-28 15:35:41,191 [ main] INFO route1 - Sending F with correlation key 2
2022-04-28 15:35:41,194 [ main] INFO route1 - Sending C with correlation key 1
2022-04-28 15:35:41,202 [ main] INFO route1 - Sending out ABC

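Completion timeout (completionTimeout)

An inactivity timeout: it fires when no new Exchange has been aggregated for a given correlation key within the configured period.

  1. DSL definition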
<route>
  <from uri="direct:start"/>
  <!-- log the incoming message -->
  <log message="Sending ${body}"/>
  <!-- aggregate using the aggregation strategy and complete when either
       2 messages have been aggregated or the 5 sec timeout occurred -->
  <aggregate strategyRef="myAggregationStrategy" completionSize="2" completionTimeout="5000">
    <!-- a correlation expression must be provided -->
    <correlationExpression>
      <!-- we use the xpath expression,
           but you can use any Camel Expression you like -->
      <xpath>/order/@customer</xpath>
    </correlationExpression>
    <!-- log the published outgoing message -->
    <log message="Sending out ${body}"/>
    <!-- and send it to a mock endpoint -->
    <to uri="mock:result"/>
  </aggregate>
</route>
  2. Test messages

Because honda sends only one message, its group never reaches the completionSize of 2, so it is completed by the timeout after 5 seconds.

@Test
public void testXML() throws Exception {
    MockEndpoint mock = getMockEndpoint("mock:result");
    mock.expectedMessageCount(2);
    template.sendBody("direct:start", "<order name=\"motor\" amount=\"1000\" customer=\"honda\"/>");
    template.sendBody("direct:start", "<order name=\"motor\" amount=\"500\" customer=\"toyota\"/>");
    template.sendBody("direct:start", "<order name=\"gearbox\" amount=\"200\" customer=\"toyota\"/>");

    assertMockEndpointsSatisfied();
}
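
The interplay of size and inactivity timeout in this test can be modelled in plain Java with an explicit clock, so nothing has to sleep. This is a rough sketch under simplifying assumptions, not how Camel schedules timeouts internally; InactivityTimeoutSketch and its methods are hypothetical, and the timestamps are passed in by hand.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of completionSize combined with an inactivity timeout. */
final class InactivityTimeoutSketch {
    private static final class Group {
        final List<String> bodies = new ArrayList<>();
        long lastActivityMillis;
    }

    private final int completionSize;
    private final long timeoutMillis;
    private final Map<String, Group> pending = new HashMap<>();
    private final List<String> published = new ArrayList<>();

    InactivityTimeoutSketch(int completionSize, long timeoutMillis) {
        this.completionSize = completionSize;
        this.timeoutMillis = timeoutMillis;
    }

    void send(String key, String body, long nowMillis) {
        Group group = pending.computeIfAbsent(key, k -> new Group());
        group.bodies.add(body);
        group.lastActivityMillis = nowMillis; // each new message restarts the timer for this key
        if (group.bodies.size() == completionSize) {
            published.add(key + ":" + group.bodies + " (size)");
            pending.remove(key);
        }
    }

    /** Publishes every group that has been idle for at least the timeout. */
    void checkTimeouts(long nowMillis) {
        Iterator<Map.Entry<String, Group>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Group> entry = it.next();
            if (nowMillis - entry.getValue().lastActivityMillis >= timeoutMillis) {
                published.add(entry.getKey() + ":" + entry.getValue().bodies + " (timeout)");
                it.remove();
            }
        }
    }

    static String demo() {
        InactivityTimeoutSketch agg = new InactivityTimeoutSketch(2, 5000);
        agg.send("honda", "motor", 0);
        agg.send("toyota", "motor", 10);
        agg.send("toyota", "gearbox", 20); // toyota reaches size 2
        agg.checkTimeouts(4000);           // honda has been idle for only 4 s
        int publishedAfterFourSeconds = agg.published.size();
        agg.checkTimeouts(5000);           // honda has now been idle for 5 s
        return publishedAfterFourSeconds + " -> " + agg.published;
    }
}
```

InactivityTimeoutSketch.demo() returns "1 -> [toyota:[motor, gearbox] (size), honda:[motor] (timeout)]", which matches the two messages the test expects at mock:result.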

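Completion interval (completionInterval)

Completes all currently aggregated Exchanges once per interval, that is, completion is triggered on a timer.

The following example sets the completionInterval attribute to define the interval length.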

<route>
  <from uri="direct:start"/>
  <!-- log the incoming message -->
  <log message="Sending ${body}"/>
  <!-- aggregate using the aggregation strategy and complete when either
       2 messages have been aggregated or the 5 sec interval fires -->
  <aggregate strategyRef="myAggregationStrategy" completionSize="2" completionInterval="5000">
    <!-- a correlation expression must be provided -->
    <correlationExpression>
      <!-- we use the xpath expression,
           but you can use any Camel Expression you like -->
      <xpath>/order/@customer</xpath>
    </correlationExpression>
    <!-- log the published outgoing message -->
    <log message="Sending out ${body}"/>
    <!-- and send it to a mock endpoint -->
    <to uri="mock:result"/>
  </aggregate>
</route>
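
The difference from the inactivity timeout is that an interval does not track each key separately: when the timer fires, everything currently pending is completed. A minimal plain-Java sketch of that behaviour follows. It is a hypothetical model that leaves completionSize out, and tick() stands in for the timer firing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of interval-based completion; hypothetical, not Camel code. */
final class IntervalFlushSketch {
    // TreeMap only to get a stable publishing order in this sketch
    private final Map<String, StringBuilder> pending = new TreeMap<>();
    private final List<String> published = new ArrayList<>();

    void send(String key, String body) {
        pending.computeIfAbsent(key, k -> new StringBuilder()).append(body);
    }

    /** Called once per interval: completes every group that currently exists. */
    void tick() {
        for (Map.Entry<String, StringBuilder> entry : pending.entrySet()) {
            published.add(entry.getKey() + "=" + entry.getValue());
        }
        pending.clear();
    }

    static String demo() {
        IntervalFlushSketch agg = new IntervalFlushSketch();
        agg.send("toyota", "A");
        agg.send("honda", "B");
        agg.tick(); // both groups are completed, regardless of how recent they are
        agg.send("toyota", "C");
        agg.tick(); // only the new toyota group exists now
        agg.tick(); // nothing pending, nothing published
        return agg.published.toString();
    }
}
```

IntervalFlushSketch.demo() returns "[honda=B, toyota=A, toyota=C]".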

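Completion predicate (completionPredicate)

A predicate that is evaluated each time a new Exchange is aggregated to decide whether the group is complete. Dynamic expressions can be used for the decision.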

<route>
  <from uri="direct:start"/>
  <!-- log the incoming message -->
  <log message="Sending ${body} with correlation key ${header.myId}"/>
  <!-- aggregate using the aggregation strategy and check eager for completion -->
  <aggregate strategyRef="myEndAggregationStrategy" eagerCheckCompletion="true">
    <!-- a correlation expression must be provided -->
    <correlationExpression>
      <!-- we use the header with key myId,
           but you can use any Camel Expression you like -->
      <header>myId</header>
    </correlationExpression>
    <!-- trigger completion when END message arrived -->
    <completionPredicate>
      <simple>${body} == 'END'</simple>
    </completionPredicate>
    <!-- log the published outgoing message -->
    <log message="Sending out ${body}"/>
    <!-- and send it to a mock endpoint -->
    <to uri="mock:result"/>
  </aggregate>
</route>
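
The eagerCheckCompletion attribute matters here: with eager checking the predicate is evaluated against the incoming Exchange, otherwise against the aggregated one. The plain-Java sketch below shows why a body == 'END' test needs the eager variant when the strategy concatenates bodies. It is a single-group model with hypothetical names, and the comma-joining strategy is an assumption made for the demo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java model of predicate-based completion for one correlation group. */
final class PredicateCompletionSketch {
    static List<String> run(List<String> bodies, Predicate<String> done, boolean eager) {
        List<String> published = new ArrayList<>();
        String aggregated = null;
        for (String body : bodies) {
            // eager: test the incoming body before it is merged
            boolean completeEarly = eager && done.test(body);
            aggregated = (aggregated == null) ? body : aggregated + "," + body;
            // non-eager: test the merged result instead
            if (completeEarly || (!eager && done.test(aggregated))) {
                published.add(aggregated);
                aggregated = null; // the next message starts a new group
            }
        }
        return published;
    }

    static String demo() {
        List<String> bodies = Arrays.asList("A", "B", "END", "C", "END");
        Predicate<String> isEnd = "END"::equals;
        return run(bodies, isEnd, true) + " / " + run(bodies, isEnd, false);
    }
}
```

PredicateCompletionSketch.demo() returns "[A,B,END, C,END] / []": with eager checking two groups are published, while the non-eager run never completes because the merged body is never exactly END.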

completionFromBatchConsumer

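A special option for batch consumers that lets the aggregation complete once all messages in the batch have been aggregated.

forceCompletionOnStop

Completes all currently aggregated Exchanges when the context is stopped.

AggregateController

Lets an external source (an AggregateController implementation) complete a single group or all groups.

Aggregation repository (AggregationRepository)

The aggregator provides a pluggable repository interface, which can be backed by persistent storage, for holding the data of in-flight aggregations.

  1. AggregationRepository interface definition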
public interface AggregationRepository {

    Exchange add(CamelContext camelContext, String key, Exchange exchange);

    Exchange get(CamelContext camelContext, String key);

    void remove(CamelContext camelContext, String key, Exchange exchange);

    void confirm(CamelContext camelContext, String exchangeId);

    Set<String> getKeys();
}
  2. Implementations

MemoryAggregationRepository is the default implementation of AggregationRepository; other implementations backed by databases are also available.

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .log("Sending ${body} with correlation key ${header.myId}")
                .aggregate(header("myId"), new MyAggregationStrategy())
                    .aggregationRepository(new MemoryAggregationRepository()).completionSize(3)
                    .log("Sending out ${body}")
                    .to("mock:result");
        }
    };
}
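
To give a feel for what a repository has to do, here is a deliberately simplified in-memory sketch in plain Java. It is not Camel's MemoryAggregationRepository: the class InMemoryRepositorySketch is hypothetical, it stores string bodies instead of Exchanges, and it leaves out the CamelContext parameter and the confirm method.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified in-memory store keyed by correlation key; hypothetical, not Camel code. */
final class InMemoryRepositorySketch {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    /** Stores the latest aggregate for the key and returns the previous one, or null. */
    String add(String key, String aggregated) {
        return store.put(key, aggregated);
    }

    String get(String key) {
        return store.get(key);
    }

    /** Called once a group is completed and no longer needs to be kept. */
    void remove(String key) {
        store.remove(key);
    }

    /** Sorted copy of the keys that still have an in-flight aggregate. */
    Set<String> getKeys() {
        return new TreeSet<>(store.keySet());
    }

    static String demo() {
        InMemoryRepositorySketch repo = new InMemoryRepositorySketch();
        repo.add("1", "A");
        String previous = repo.add("1", "AB"); // replaces the aggregate for key 1
        repo.add("2", "F");
        String keysBefore = repo.getKeys().toString();
        repo.remove("1");                      // group 1 completed
        return previous + " " + keysBefore + " " + repo.getKeys();
    }
}
```

InMemoryRepositorySketch.demo() returns "A [1, 2] [2]". A persistent implementation would presumably keep the same contract but write each aggregate to durable storage, so that in-flight groups survive a restart.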

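Types of aggregators

List aggregator

AbstractListAggregationStrategy provides the list handling: it collects one value per Exchange into a List.

A subclass implements the getValue method to build a custom list aggregator.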

public class MyListOfNumbersStrategy extends AbstractListAggregationStrategy<Integer> {

    @Override
    public Integer getValue(Exchange exchange) {
        // the message body contains a number, so just return that as-is
        return exchange.getIn().getBody(Integer.class);
    }
}

Alternatively, the same strategy can be built with the fluent builder:

AggregationStrategy agg = AggregationStrategies.flexible(Integer.class)
    .accumulateInCollection(ArrayList.class)
    .pick(body());
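
Both variants boil down to the same idea: extract one value from each message and append it to a growing list. A plain-Java sketch of that accumulation is shown below; ListAggregationSketch is a hypothetical model and does not use the Camel classes named above.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of list accumulation: one extracted value per message. */
final class ListAggregationSketch {
    /** Appends the new value to the list so far; a null list means "first message". */
    static <V> List<V> aggregate(List<V> oldList, V newValue) {
        List<V> list = (oldList != null) ? oldList : new ArrayList<V>();
        if (newValue != null) {
            list.add(newValue); // messages without a value are skipped
        }
        return list;
    }

    static String demo() {
        List<Integer> numbers = null;
        for (String body : new String[] {"1", "2", "3"}) {
            // plays the role of getValue: convert the body to an Integer
            numbers = aggregate(numbers, Integer.valueOf(body));
        }
        return numbers.toString();
    }
}
```

ListAggregationSketch.demo() returns "[1, 2, 3]".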

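Grouped exchange aggregator

Use the three aggregation strategies that Camel provides by default.

Timeout-aware aggregation

If your aggregation strategy implements TimeoutAwareAggregationStrategy, Camel calls its timeout method when a timeout occurs. Note that the index and total parameters will be -1, and the timeout parameter is only provided when the timeout is configured as a fixed value. You must not throw any exceptions from the timeout method.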

public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {
    /**
     * A timeout occurred.
     * <p/>
     * <b>Important: </b> This method must <b>not</b> throw any exceptions.
     *
     * @param oldExchange the current aggregated exchange, or the original {@link Exchange} if no aggregation
     *                    has been done before the timeout occurred
     * @param index       the index, may be <tt>-1</tt> if not possible to determine the index
     * @param total       the total, may be <tt>-1</tt> if not possible to determine the total
     * @param timeout     the timeout value in millis, may be <tt>-1</tt> if not possible to determine the timeout
     */
    void timeout(Exchange oldExchange, int index, int total, long timeout);
}
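
A typical use of such a callback is to mark the partial result before it is published. The plain-Java sketch below illustrates the shape of that hook. The TimeoutAware interface here is a hypothetical stand-in that works on a StringBuilder instead of an Exchange, and the marker text is made up for the demo.

```java
/** Plain-Java model of a timeout callback that flags a partial aggregate. */
final class TimeoutAwareSketch {
    /** Hypothetical stand-in for the timeout hook, using a StringBuilder as the aggregate. */
    interface TimeoutAware {
        void timeout(StringBuilder aggregated, int index, int total, long timeoutMillis);
    }

    /** Appends a marker; it only touches the aggregate and throws nothing. */
    static final TimeoutAware MARK_INCOMPLETE = (aggregated, index, total, timeoutMillis) -> {
        aggregated.append(" [incomplete after ").append(timeoutMillis).append(" ms]");
    };

    static String demo() {
        StringBuilder aggregated = new StringBuilder("motor");
        // index and total are unknown (-1); the timeout is the fixed 5000 ms
        MARK_INCOMPLETE.timeout(aggregated, -1, -1, 5000L);
        return aggregated.toString();
    }
}
```

TimeoutAwareSketch.demo() returns "motor [incomplete after 5000 ms]".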