1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| public class ExchangeIdempotentConsumerTest extends ContextTestSupport { protected Endpoint startEndpoint; protected MockEndpoint resultEndpoint;
private MyIdempotentRepo repo = new MyIdempotentRepo();
@Override public boolean isUseRouteBuilder() { return false; }
@Test public void testDuplicateMessagesAreFilteredOut() throws Exception { assertEquals(0, repo.getExchange().size());
context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start").idempotentConsumer(header("messageId"), repo) .to("mock:result"); } }); context.start();
resultEndpoint.expectedBodiesReceived("one", "two", "three");
sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("3", "three");
assertMockEndpointsSatisfied();
assertEquals(6, repo.getExchange().size());
for (Exchange exchange : resultEndpoint.getExchanges()) { assertTrue(repo.getExchange().contains(exchange.getExchangeId()), "Should contain the exchange"); } }
protected void sendMessage(final Object messageId, final Object body) { template.send(startEndpoint, new Processor() { public void process(Exchange exchange) { Message in = exchange.getIn(); in.setBody(body); in.setHeader("messageId", messageId); } }); }
@Override @BeforeEach public void setUp() throws Exception { super.setUp();
startEndpoint = resolveMandatoryEndpoint("direct:start"); resultEndpoint = getMockEndpoint("mock:result"); }
private final class MyIdempotentRepo implements IdempotentRepository {
private IdempotentRepository delegate; private Set<String> exchanges = new LinkedHashSet<>();
private MyIdempotentRepo() { delegate = MemoryIdempotentRepository.memoryIdempotentRepository(200); }
@Override public boolean add(Exchange exchange, String key) { exchanges.add(exchange.getExchangeId()); return delegate.add(key); }
@Override public boolean contains(Exchange exchange, String key) { exchanges.add(exchange.getExchangeId()); return delegate.contains(key); }
@Override public boolean remove(Exchange exchange, String key) { exchanges.add(exchange.getExchangeId()); return delegate.remove(key); }
@Override public boolean confirm(Exchange exchange, String key) { exchanges.add(exchange.getExchangeId()); return delegate.confirm(key); }
@Override public void clear() { delegate.clear(); }
@Override public boolean add(String key) { throw new UnsupportedOperationException("Should not be called"); }
@Override public boolean contains(String key) { throw new UnsupportedOperationException("Should not be called"); }
@Override public boolean remove(String key) { throw new UnsupportedOperationException("Should not be called"); }
@Override public boolean confirm(String key) { throw new UnsupportedOperationException("Should not be called"); }
public Set<String> getExchange() { return exchanges; }
@Override public void start() { }
@Override public void stop() { } }
}
|