<route> <fromuri="direct:start"/> <!-- log the incoming message --> <logmessage="Sending ${body} with correlation key ${header.myId}"/> <!-- aggregate using the aggregation strategy and complete when 3 messages has been aggregated --> <aggregatestrategyRef="myAggregationStrategy"completionSize="3"> <!-- a correlation expression must be provided --> <correlationExpression> <!-- we use the header with key myId, but you can use any Camel Expression you like --> <header>myId</header> </correlationExpression> <!-- log the published outgoing message --> <logmessage="Sending out ${body}"/> <!-- and send it to a mock endpoint --> <touri="mock:result"/> </aggregate> </route>
聚合数据实例,A,B,C 作为为一组数据,以键 myId 值为 1 的标识 header 进行聚合
1 2 3 4 5 6 7 8
template.sendBodyAndHeader("direct:start", "A", "myId", 1); // send the 2nd message with the same correlation key template.sendBodyAndHeader("direct:start", "B", "myId", 1); // the F message has another correlation key template.sendBodyAndHeader("direct:start", "F", "myId", 2); // now we have 3 messages with the same correlation key // and the Aggregator should publish the message template.sendBodyAndHeader("direct:start", "C", "myId", 1);
聚合完成之前聚合的消息数,表示在达到该数据大小之后,聚合就完成了,再做聚合操作,如上 DSL 示例,**completionSize 设置为 3,完整示例的输出结果为**Sending out ABC
1 2 3 4 5
2022-04-28 15:35:41,179 [ main] INFO route1 - Sending A with correlation key 1 2022-04-28 15:35:41,189 [ main] INFO route1 - Sending B with correlation key 1 2022-04-28 15:35:41,191 [ main] INFO route1 - Sending F with correlation key 2 2022-04-28 15:35:41,194 [ main] INFO route1 - Sending C with correlation key 1 2022-04-28 15:35:41,202 [ main] INFO route1 - Sending out ABC
完成超时(completionTimeout)
是一个不活动超时,如果在此期间内没有为该特定关联键聚合新交换,则会触发该超时。
DSL 定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
<route> <fromuri="direct:start"/> <!-- log the incoming message --> <logmessage="Sending ${body}"/> <!-- aggregate using the aggregation strategy and complete when either 2 messages has been aggregated or 5 sec timeout occurred --> <aggregatestrategyRef="myAggregationStrategy"completionSize="2"completionTimeout="5000"> <!-- a correlation expression must be provided --> <correlationExpression> <!-- we use the xpath expression, but you can use any Camel Expression you like --> <xpath>/order/@customer</xpath> </correlationExpression> <!-- log the published outgoing message --> <logmessage="Sending out ${body}"/> <!-- and send it to a mock endpoint --> <touri="mock:result"/> </aggregate> </route>
<route> <fromuri="direct:start"/> <!-- log the incoming message --> <logmessage="Sending ${body}"/> <!-- aggregate using the aggregation strategy and complete when either 2 messages has been aggregated or 5 sec timeout occurred --> <aggregatestrategyRef="myAggregationStrategy"completionSize="2"completionInterval="5000"> <!-- a correlation expression must be provided --> <correlationExpression> <!-- we use the xpath expression, but you can use any Camel Expression you like --> <xpath>/order/@customer</xpath> </correlationExpression> <!-- log the published outgoing message --> <logmessage="Sending out ${body}"/> <!-- and send it to a mock endpoint --> <touri="mock:result"/> </aggregate> </route>
<route> <fromuri="direct:start"/> <!-- log the incoming message --> <logmessage="Sending ${body} with correlation key ${header.myId}"/> <!-- aggregate using the aggregation strategy and check eager for completion --> <aggregatestrategyRef="myEndAggregationStrategy"eagerCheckCompletion="true"> <!-- a correlation expression must be provided --> <correlationExpression> <!-- we use the header with key myId, but you can use any Camel Expression you like --> <header>myId</header> </correlationExpression> <!-- trigger completion when END message arrived --> <completionPredicate> <simple>${body} == 'END'</simple> </completionPredicate> <!-- log the published outgoing message --> <logmessage="Sending out ${body}"/> <!-- and send it to a mock endpoint --> <touri="mock:result"/> </aggregate> </route>
@Override public Integer getValue(Exchange exchange) { // the message body contains a number, so just return that as-is return exchange.getIn().getBody(Integer.class); } }
如果您的聚合策略实现TimeoutAwareAggregationStrategy了,那么 Camel 将timeout在超时发生时调用该方法。请注意,index 和 total 参数的值将是 -1,并且只有在配置为固定值时才会提供 timeout 参数。您不得timeout从该方法中抛出任何异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
publicinterfaceTimeoutAwareAggregationStrategyextendsAggregationStrategy { /** * A timeout occurred. * <p/> * <b>Important: </b> This method must <b>not</b> throw any exceptions. * * @param oldExchange the current aggregated exchange, or the original {@link Exchange} if no aggregation * has been done before the timeout occurred * @param index the index, may be <tt>-1</tt> if not possible to determine the index * @param total the total, may be <tt>-1</tt> if not possible to determine the total * @param timeout the timeout value in millis, may be <tt>-1</tt> if not possible to determine the timeout */ voidtimeout(Exchange oldExchange, int index, int total, long timeout); }