Concepts
Camel supports the Resequencer from the EIP patterns.
How can we get a stream of related but out-of-sequence messages back into the correct order?

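Use a stateful filter, a Resequencer, to collect and re-order messages so that they can be published to the output channel in a specified order.
The Resequencer implementation in Camel uses an Expression as the Comparator for re-ordering messages. Because the sort key is an expression, messages can easily be re-sequenced by a message header or by another part of the message.
The resequence method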
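To make the idea of an expression acting as a comparator more concrete, here is a minimal plain-Java sketch that does not use Camel at all. The `Message` class and the `resequence` helper are hypothetical names introduced only for illustration; the expression is modelled as a `Function` that extracts a sort key from each message.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

public class ResequenceSketch {

    // Hypothetical stand-in for a message: one "id" header plus a body.
    static final class Message {
        final String id;
        final String body;

        Message(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    // Sorts a collected batch by the key that the expression extracts.
    static List<Message> resequence(List<Message> batch, Function<Message, String> expression) {
        List<Message> sorted = new ArrayList<>(batch);
        sorted.sort(Comparator.comparing(expression));
        return sorted;
    }

    public static void main(String[] args) {
        List<Message> batch = List.of(
                new Message("4", "F"),
                new Message("1", "A"),
                new Message("3", "E"),
                new Message("2", "C"));

        // The lambda plays the role that header("id") plays in a route.
        for (Message m : resequence(batch, msg -> msg.id)) {
            System.out.println(m.body); // prints A, C, E, F on separate lines
        }
    }
}
```

In this sketch the keys are compared as strings, so "10" would sort before "2"; a numeric ordering would need the expression to return a numeric key.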
    @Test
    public void testBatchResequencerNoDuplicate() throws Exception {
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Re-order by the "id" header. The route also sends to mock:result
                // so that the expectation below can observe the output.
                from("direct:start")
                    .resequence(header("id"))
                    .to("log:result")
                    .to("mock:result");
            }
        });
        context.start();

        MockEndpoint mock = getMockEndpoint("mock:result");
        mock.expectedBodiesReceived("A", "C", "E", "F");

        // Bodies D and B reuse ids 2 and 1, so they count as duplicates.
        template.sendBodyAndHeader("direct:start", "A", "id", "1");
        template.sendBodyAndHeader("direct:start", "C", "id", "2");
        template.sendBodyAndHeader("direct:start", "D", "id", "2");
        template.sendBodyAndHeader("direct:start", "F", "id", "4");
        template.sendBodyAndHeader("direct:start", "B", "id", "1");
        template.sendBodyAndHeader("direct:start", "E", "id", "3");

        assertMockEndpointsSatisfied();
    }
Log output
    2022-05-05 11:32:37,588 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: A]
    2022-05-05 11:32:37,589 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: C]
    2022-05-05 11:32:37,589 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: E]
    2022-05-05 11:32:37,590 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: F]
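Allowing duplicates (allowDuplicates)
When duplicates are allowed, the resequencer retains every message whose sort key is equal, instead of keeping only one message per key. In the log output above, the message kept for each duplicated id was the first one sent (A and C), while B and D were dropped.
In batch mode, you can turn on duplicates with allowDuplicates() as follows: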
    @Test
    public void testBatchResequencerAllowDuplicates() throws Exception {
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Same route as before, but messages with equal ids are all kept.
                from("direct:start")
                    .resequence(header("id"))
                    .allowDuplicates()
                    .to("log:result")
                    .to("mock:result");
            }
        });
        context.start();

        MockEndpoint mock = getMockEndpoint("mock:result");
        // With duplicates allowed, B and D are delivered as well.
        mock.expectedBodiesReceived("A", "B", "C", "D", "E", "F");

        template.sendBodyAndHeader("direct:start", "A", "id", "1");
        template.sendBodyAndHeader("direct:start", "C", "id", "2");
        template.sendBodyAndHeader("direct:start", "D", "id", "2");
        template.sendBodyAndHeader("direct:start", "F", "id", "4");
        template.sendBodyAndHeader("direct:start", "B", "id", "1");
        template.sendBodyAndHeader("direct:start", "E", "id", "3");

        assertMockEndpointsSatisfied();
    }
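One way to picture the difference between the two modes is to compare a sorted set with a sorted list. The following plain-Java sketch is only an illustration under that assumption and is not presented as Camel's internal implementation; the class name `DuplicateSketch` and the `{id, body}` array layout are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

public class DuplicateSketch {

    // Each message is modelled as {id, body}; hypothetical, for illustration only.
    static List<String> bodiesInOrder(List<String[]> messages, boolean allowDuplicates) {
        Comparator<String[]> byId = Comparator.comparing((String[] m) -> m[0]);
        List<String[]> ordered;
        if (allowDuplicates) {
            // A list keeps every message; the stable sort preserves
            // arrival order among messages with equal ids.
            ordered = new ArrayList<>(messages);
            ordered.sort(byId);
        } else {
            // A sorted set rejects a message whose id compares equal
            // to one that is already present, so the first one wins.
            TreeSet<String[]> set = new TreeSet<>(byId);
            set.addAll(messages);
            ordered = new ArrayList<>(set);
        }
        List<String> bodies = new ArrayList<>();
        for (String[] m : ordered) {
            bodies.add(m[1]);
        }
        return bodies;
    }

    public static void main(String[] args) {
        // Same send order as in the tests: A, C, D, F, B, E.
        List<String[]> sent = List.of(
                new String[] {"1", "A"},
                new String[] {"2", "C"},
                new String[] {"2", "D"},
                new String[] {"4", "F"},
                new String[] {"1", "B"},
                new String[] {"3", "E"});

        System.out.println(bodiesInOrder(sent, false)); // [A, C, E, F]
        System.out.println(bodiesInOrder(sent, true));  // [A, B, C, D, E, F]
    }
}
```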
Reverse order
Use the reverse method to emit the messages in descending order:
    public void configure() throws Exception {
        from("direct:start").resequence(header("id"))
            .allowDuplicates()
            .reverse()
            .to("log:result");
    }
    2022-05-05 11:35:07,658 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: F]
    2022-05-05 11:35:07,659 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: E]
    2022-05-05 11:35:07,659 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: C]
    2022-05-05 11:35:07,659 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: D]
    2022-05-05 11:35:07,659 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: A]
    2022-05-05 11:35:07,660 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: B]
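Note that in the log above the ids are descending, but messages sharing an id (C and D, A and B) still appear in the order they were sent. A reversed comparator combined with a stable sort produces the same pattern, as this plain-Java sketch suggests. It assumes the same send order as the tests, and `ReverseSketch` and the `{id, body}` layout are hypothetical names for illustration, not Camel API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ReverseSketch {

    // Sorts {id, body} pairs by id in descending order and returns the bodies.
    static List<String> reverseOrder(List<String[]> messages) {
        Comparator<String[]> byId = Comparator.comparing((String[] m) -> m[0]);
        List<String[]> ordered = new ArrayList<>(messages);
        // List.sort is stable, so equal ids keep their arrival order.
        ordered.sort(byId.reversed());
        List<String> bodies = new ArrayList<>();
        for (String[] m : ordered) {
            bodies.add(m[1]);
        }
        return bodies;
    }

    public static void main(String[] args) {
        List<String[]> sent = List.of(
                new String[] {"1", "A"},
                new String[] {"2", "C"},
                new String[] {"2", "D"},
                new String[] {"4", "F"},
                new String[] {"1", "B"},
                new String[] {"3", "E"});

        System.out.println(reverseOrder(sent)); // [F, E, C, D, A, B]
    }
}
```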