Camel series: the Resequencer (resequence)

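Concept

Camel supports the Resequencer from the EIP patterns.

How can we get a stream of related but out-of-sequence messages back into the correct order?

Use a stateful filter, a Resequencer, to collect and re-order messages so that they can be published to the output channel in a specified order.

The Resequencer implementation in Camel uses an Expression as the Comparator for re-ordering messages. Because the ordering is defined by an expression, messages can easily be re-sequenced by a message header or by any other part of the message.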
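
To make the "Expression as Comparator" idea concrete, here is a minimal plain-Java sketch. It does not use Camel at all: the Message class and the resequence and sampleBatch helpers are hypothetical names invented for this illustration, and the "expression" is simply a function that extracts the value to sort by.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration only; these types are hypothetical, not Camel classes.
class ExpressionComparatorSketch {

    // Minimal stand-in for a message: a body plus string headers.
    static final class Message {
        final String body;
        final Map<String, String> headers;

        Message(String body, Map<String, String> headers) {
            this.body = body;
            this.headers = headers;
        }
    }

    // Sorts a copy of the batch by whatever value the "expression" extracts
    // and returns the message bodies in that order.
    static List<String> resequence(List<Message> batch, Function<Message, String> expression) {
        List<Message> sorted = new ArrayList<>(batch);
        sorted.sort(Comparator.comparing(expression));
        List<String> bodies = new ArrayList<>();
        for (Message m : sorted) {
            bodies.add(m.body);
        }
        return bodies;
    }

    // Three messages that arrive out of order with respect to their "id" header.
    static List<Message> sampleBatch() {
        return List.of(
                new Message("x", Map.of("id", "3")),
                new Message("z", Map.of("id", "1")),
                new Message("y", Map.of("id", "2")));
    }

    public static void main(String[] args) {
        // Order by the "id" header: prints [z, y, x]
        System.out.println(resequence(sampleBatch(), m -> m.headers.get("id")));
        // Order by the body instead: prints [x, y, z]
        System.out.println(resequence(sampleBatch(), m -> m.body));
    }
}
```

Swapping the function passed to resequence changes the ordering without touching the sorting code, which is the same flexibility the expression gives you in a Camel route.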

The resequence method

@Test
public void testBatchResequencerNoDuplicate() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // Re-order by the "id" header. Each message is logged and also
            // passed to mock:result so the expectations below can be checked.
            from("direct:start")
                .resequence(header("id"))
                .to("log:result")
                .to("mock:result");
        }
    });
    context.start();

    MockEndpoint mock = getMockEndpoint("mock:result");
    // D and B reuse ids 2 and 1, so they are dropped as duplicates.
    mock.expectedBodiesReceived("A", "C", "E", "F");

    template.sendBodyAndHeader("direct:start", "A", "id", "1");
    template.sendBodyAndHeader("direct:start", "C", "id", "2");
    template.sendBodyAndHeader("direct:start", "D", "id", "2");
    template.sendBodyAndHeader("direct:start", "F", "id", "4");
    template.sendBodyAndHeader("direct:start", "B", "id", "1");
    template.sendBodyAndHeader("direct:start", "E", "id", "3");

    assertMockEndpointsSatisfied();
}

Log output

2022-05-05 11:32:37,588 [ thread #1 - Batch Sender] INFO  result                         - Exchange[ExchangePattern: InOnly, BodyType: String, Body: A]
2022-05-05 11:32:37,589 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: C]
2022-05-05 11:32:37,589 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: E]
2022-05-05 11:32:37,590 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: F]

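Allowing duplicates (allowDuplicates)

When duplicates are allowed, the resequencer keeps every message that shares the same sort key instead of keeping only one of them. In the log output above, the first message per id was the one kept: A and C were delivered, while B and D were dropped.

In batch mode, you can turn duplicates on with allowDuplicates as follows: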

@Test
public void testBatchResequencerAllowDuplicates() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // Same route as before, but duplicate ids are kept.
            from("direct:start")
                .resequence(header("id")).allowDuplicates()
                .to("log:result")
                .to("mock:result");
        }
    });
    context.start();

    MockEndpoint mock = getMockEndpoint("mock:result");
    // All six messages are expected; messages with the same id keep their arrival order.
    mock.expectedBodiesReceived("A", "B", "C", "D", "E", "F");

    template.sendBodyAndHeader("direct:start", "A", "id", "1");
    template.sendBodyAndHeader("direct:start", "C", "id", "2");
    template.sendBodyAndHeader("direct:start", "D", "id", "2");
    template.sendBodyAndHeader("direct:start", "F", "id", "4");
    template.sendBodyAndHeader("direct:start", "B", "id", "1");
    template.sendBodyAndHeader("direct:start", "E", "id", "3");

    assertMockEndpointsSatisfied();
}
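
The difference between the two modes can be sketched in plain Java. This is only an illustration of the observable behaviour, not a description of Camel's internals: it assumes that "no duplicates" behaves like a sorted set keyed on the id, and that "allow duplicates" behaves like a stable sort of a list. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Plain-Java illustration only; not Camel code.
class DuplicateHandlingSketch {

    // {body, id} pairs in the order the tests send them.
    static final String[][] SENT = {
        {"A", "1"}, {"C", "2"}, {"D", "2"}, {"F", "4"}, {"B", "1"}, {"E", "3"}
    };

    // Compares messages by their id only.
    static final Comparator<String[]> BY_ID = Comparator.comparing(m -> m[1]);

    // Without duplicates: a sorted set keeps one entry per id.
    // TreeSet.add ignores an element that compares equal to one already
    // present, so the first message per id is the one that survives.
    static List<String> withoutDuplicates() {
        TreeSet<String[]> set = new TreeSet<>(BY_ID);
        for (String[] m : SENT) {
            set.add(m);
        }
        return bodies(set);
    }

    // With duplicates: a stable sort keeps every message, and messages
    // with the same id stay in arrival order.
    static List<String> withDuplicates() {
        List<String[]> list = new ArrayList<>(Arrays.asList(SENT));
        list.sort(BY_ID);
        return bodies(list);
    }

    // Extracts the body of each message, preserving iteration order.
    static List<String> bodies(Iterable<String[]> messages) {
        List<String> out = new ArrayList<>();
        for (String[] m : messages) {
            out.add(m[0]);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(withoutDuplicates()); // [A, C, E, F]
        System.out.println(withDuplicates());    // [A, B, C, D, E, F]
    }
}
```

The first result matches the four bodies in the log output of the earlier test; the second shows what to expect once allowDuplicates is switched on.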

Reverse order

Use the reverse method to deliver the messages in descending order.

public void configure() throws Exception {
    from("direct:start").resequence(header("id"))
        .allowDuplicates()
        .reverse()
        .to("log:result");
}

Log output

2022-05-05 11:35:07,658 [ thread #1 - Batch Sender] INFO  result                         - Exchange[ExchangePattern: InOnly, BodyType: String, Body: F]
2022-05-05 11:35:07,659 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: E]
2022-05-05 11:35:07,659 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: C]
2022-05-05 11:35:07,659 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: D]
2022-05-05 11:35:07,659 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: A]
2022-05-05 11:35:07,660 [ thread #1 - Batch Sender] INFO result - Exchange[ExchangePattern: InOnly, BodyType: String, Body: B]
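
Note that C still comes before D and A before B in this log: the ids are in descending order, but messages sharing an id keep their arrival order. A plain-Java sketch reproduces this, assuming the behaviour is equivalent to a stable sort with a reversed comparator (an assumption made for illustration, with hypothetical names, not a statement about how Camel implements it):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration only; not Camel code.
class ReverseOrderSketch {

    // {body, id} pairs in the order the tests send them.
    static final String[][] SENT = {
        {"A", "1"}, {"C", "2"}, {"D", "2"}, {"F", "4"}, {"B", "1"}, {"E", "3"}
    };

    // Sorts by id in descending order while keeping duplicates.
    static List<String> reversedWithDuplicates() {
        Comparator<String[]> byId = Comparator.comparing(m -> m[1]);
        List<String[]> list = new ArrayList<>(Arrays.asList(SENT));
        // List.sort is stable, so messages with equal ids keep arrival order.
        list.sort(byId.reversed());
        List<String> out = new ArrayList<>();
        for (String[] m : list) {
            out.add(m[0]);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(reversedWithDuplicates()); // [F, E, C, D, A, B]
    }
}
```

This differs from simply reading the ascending result backwards, which would give F, E, D, C, B, A.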