camel系列-幂等消费(IdempotentConsumer)

概念

即使发送者应用只发送了一次消息,接收者应用也可能会多次接收到相同的消息。

消息接收者如何处理重复的消息?

把接收者设计成幂等接收者(Idempotent Receiver),它能安全地多次接收相同的消息。

idempotentConsumer 方法

  1. IdempotentRepository 接口

通过 IdempotentRepository 接口自定义幂等逻辑实现

  2. 通过 idempotentConsumer 方法指定 IdempotentRepository

已经被消费过的消息(key 相同)将不会被再次消费。如下示例中,key 为 1、2、3 的消息各只被消费一次,不会重复消费。

示例代码:
/**
 * Checks that a route built with {@code idempotentConsumer(header("messageId"), repo)} filters
 * out messages whose {@code messageId} header repeats, and that the custom
 * {@link IdempotentRepository} is invoked through its {@link Exchange}-aware methods.
 *
 * <p>The repository used here throws {@link UnsupportedOperationException} from every key-only
 * method, so the test fails if the route falls back to those overloads.
 */
public class ExchangeIdempotentConsumerTest extends ContextTestSupport {
    protected Endpoint startEndpoint;
    protected MockEndpoint resultEndpoint;

    private final MyIdempotentRepo repo = new MyIdempotentRepo();

    /**
     * Returns {@code false}; the route is added inside the test method itself.
     */
    @Override
    public boolean isUseRouteBuilder() {
        return false;
    }

    /**
     * Sends six messages carrying only three distinct {@code messageId} values and expects
     * {@code mock:result} to receive the bodies "one", "two", "three", while the repository
     * records the ids of all six exchanges.
     */
    @Test
    public void testDuplicateMessagesAreFilteredOut() throws Exception {
        // nothing has passed through the repository yet
        assertEquals(0, repo.getExchange().size());

        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start").idempotentConsumer(header("messageId"), repo)
                        .to("mock:result");
            }
        });
        context.start();

        resultEndpoint.expectedBodiesReceived("one", "two", "three");

        sendMessage("1", "one");
        sendMessage("2", "two");
        sendMessage("1", "one");
        sendMessage("2", "two");
        sendMessage("1", "one");
        sendMessage("3", "three");

        assertMockEndpointsSatisfied();

        // we used 6 different exchanges
        assertEquals(6, repo.getExchange().size());

        for (Exchange exchange : resultEndpoint.getExchanges()) {
            // should be in repo list
            assertTrue(repo.getExchange().contains(exchange.getExchangeId()), "Should contain the exchange");
        }
    }

    /**
     * Sends one message to the start endpoint.
     *
     * @param messageId value stored in the {@code messageId} header, which the route uses as
     *                  the idempotent key
     * @param body      message body
     */
    protected void sendMessage(final Object messageId, final Object body) {
        // populate the in-message before the exchange is sent
        Processor fillMessage = exchange -> {
            Message in = exchange.getIn();
            in.setBody(body);
            in.setHeader("messageId", messageId);
        };
        template.send(startEndpoint, fillMessage);
    }

    /**
     * Runs the parent set-up, then resolves the {@code direct:start} and {@code mock:result}
     * endpoints used by the test.
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();

        startEndpoint = resolveMandatoryEndpoint("direct:start");
        resultEndpoint = getMockEndpoint("mock:result");
    }

    /**
     * Repository that records the id of every exchange passed to its {@link Exchange}-aware
     * methods and forwards the key to an in-memory delegate. The key-only overloads always
     * throw, because this test expects them never to be called.
     *
     * <p>Declared {@code static}: it uses no state of the enclosing test instance.
     */
    private static final class MyIdempotentRepo implements IdempotentRepository {

        private final IdempotentRepository delegate;
        // LinkedHashSet: each exchange id is stored once, in first-seen order
        private final Set<String> exchanges = new LinkedHashSet<>();

        private MyIdempotentRepo() {
            // NOTE(review): 200 is presumably the delegate's cache size - confirm against the factory method
            delegate = MemoryIdempotentRepository.memoryIdempotentRepository(200);
        }

        /** Records the exchange id, then adds {@code key} to the delegate. */
        @Override
        public boolean add(Exchange exchange, String key) {
            exchanges.add(exchange.getExchangeId());
            return delegate.add(key);
        }

        /** Records the exchange id, then asks the delegate whether it contains {@code key}. */
        @Override
        public boolean contains(Exchange exchange, String key) {
            exchanges.add(exchange.getExchangeId());
            return delegate.contains(key);
        }

        /** Records the exchange id, then removes {@code key} from the delegate. */
        @Override
        public boolean remove(Exchange exchange, String key) {
            exchanges.add(exchange.getExchangeId());
            return delegate.remove(key);
        }

        /** Records the exchange id, then confirms {@code key} on the delegate. */
        @Override
        public boolean confirm(Exchange exchange, String key) {
            exchanges.add(exchange.getExchangeId());
            return delegate.confirm(key);
        }

        /** Clears the delegate only; the recorded exchange ids are kept. */
        @Override
        public void clear() {
            delegate.clear();
        }

        /** Always throws: the key-only overload is not expected to be used. */
        @Override
        public boolean add(String key) {
            throw new UnsupportedOperationException("Should not be called");
        }

        /** Always throws: the key-only overload is not expected to be used. */
        @Override
        public boolean contains(String key) {
            throw new UnsupportedOperationException("Should not be called");
        }

        /** Always throws: the key-only overload is not expected to be used. */
        @Override
        public boolean remove(String key) {
            throw new UnsupportedOperationException("Should not be called");
        }

        /** Always throws: the key-only overload is not expected to be used. */
        @Override
        public boolean confirm(String key) {
            throw new UnsupportedOperationException("Should not be called");
        }

        /**
         * Returns the live (not copied) set of exchange ids recorded so far.
         */
        public Set<String> getExchange() {
            return exchanges;
        }

        @Override
        public void start() {
            // noop
        }

        @Override
        public void stop() {
            // noop
        }
    }

}