camel系列-请求/应答,返回地址

请求/应答

概念

当两个应用通过消息传递进行通信时,通信是单向的。应用可能需要双向的交互。

当应用发送一个消息后,如何才能获得接收者返回的响应?

发送一对请求/应答消息,每个消息都使用自己的通道。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* A simple request / reply test
*/
public class JmsSimpleRequestReplyTest extends ActiveMQTestSupport {

protected String componentName = "activemq";

@Test
public void testRequestReply() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(1);

template.requestBody("activemq:queue:hello", "Hello World");

result.assertIsSatisfied();
}

@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();

ConnectionFactory connectionFactory = createConnectionFactory(null);
camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));

return camelContext;
}

public ConnectionFactory createConnectionFactory(String options) {
String url = vmUri("?broker.persistent=false&broker.useJmx=false");
if (options != null) {
url = url + "&" + options;
}
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// use a pooled connection factory
PooledConnectionFactory pooled = new PooledConnectionFactory(connectionFactory);
pooled.setMaxConnections(8);
return pooled;
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() {
from("activemq:queue:hello").process(exchange -> {
exchange.getIn().setBody("Bye World");
assertNotNull(exchange.getIn().getHeader("JMSReplyTo"));
}).to("mock:result");
}
};
}
}

返回地址

应用使用消息传递完成请求/应答。

应答者如何知道将应答发送到哪里?

请求消息中应该包含返回地址,它指出应该把应答消息发送到哪里。

示例

  1. 指定 ExchangePattern.InOut
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class JmsSimpleRequestReplyTest extends ActiveMQTestSupport {

protected String componentName = "activemq";

@Test
public void testRequestReply() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(1);


Exchange respExchange=template.send("activemq:queue:hello", ExchangePattern.InOut,(exchange -> {
exchange.getIn().setBody("Hello World");
}));
result.assertIsSatisfied();
}

@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();

ConnectionFactory connectionFactory = createConnectionFactory(null);
camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));

return camelContext;
}

public ConnectionFactory createConnectionFactory(String options) {
String url = vmUri("?broker.persistent=false&broker.useJmx=false");
if (options != null) {
url = url + "&" + options;
}
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// use a pooled connection factory
PooledConnectionFactory pooled = new PooledConnectionFactory(connectionFactory);
pooled.setMaxConnections(8);
return pooled;
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() {
from("activemq:queue:hello").process(exchange -> {
exchange.getIn().setBody("Bye World");
assertNotNull(exchange.getIn().getHeader("JMSReplyTo"));
}).to("mock:result");
}
};
}
}
  1. 在 headers 中返回地址

MESSAGE EXCHANGE

Camel 中的交换Exchange是消息在路由期间的容器,交换还定义了系统之间的交互模式,也称为 message exchange patterns (MEP),MEP 用于区分单向消息传递和请求-响应消息传递,Camel 的数据交换拥有一个 MEP 属性,它可以是以下任意一种:

_“InOnly”—单向消息(也称为事件消息)。例如,JMS 消息传递通常是单向消息传递。
_“InOut”—请求-响应消息。例如,基于 http 的传输通常是请求-应答:客户端提交 web 请求,等待服务器的应答。

Exchange 的内容

Camel 的交换Exchange具有 ID、MEP、Exceptions 和 Properties。它还具有一个 In 消息来存储传入消息,以及一个 Out 消息来存储应答。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
public class SetExchangePatternTest extends ContextTestSupport {

@Test
public void testInOut() throws Exception {
assertMessageReceivedWithPattern("direct:testInOut", ExchangePattern.InOut);
}

@Test
public void testInOnly() throws Exception {
assertMessageReceivedWithPattern("direct:testInOnly", ExchangePattern.InOnly);
}

@Test
public void testSetToInOnlyThenTo() throws Exception {
assertMessageReceivedWithPattern("direct:testSetToInOnlyThenTo", ExchangePattern.InOnly);
}

@Test
public void testSetToInOutThenTo() throws Exception {
assertMessageReceivedWithPattern("direct:testSetToInOutThenTo", ExchangePattern.InOut);
}

@Test
public void testToWithInOnlyParam() throws Exception {
assertMessageReceivedWithPattern("direct:testToWithInOnlyParam", ExchangePattern.InOnly);
}

@Test
public void testToWithInOutParam() throws Exception {
assertMessageReceivedWithPattern("direct:testToWithInOutParam", ExchangePattern.InOut);
}

@Test
public void testSetExchangePatternInOnly() throws Exception {
assertMessageReceivedWithPattern("direct:testSetExchangePatternInOnly", ExchangePattern.InOnly);
}

@Test
public void testPreserveOldMEPInOut() throws Exception {
// the mock should get an InOut MEP
getMockEndpoint("mock:result").expectedMessageCount(1);
getMockEndpoint("mock:result").message(0).exchangePattern().isEqualTo(ExchangePattern.InOut);

// we send an InOnly
Exchange out = template.send("direct:testInOut", new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody("Hello World");
exchange.setPattern(ExchangePattern.InOnly);
}
});

// the MEP should be preserved
assertNotNull(out);
assertEquals(ExchangePattern.InOnly, out.getPattern());

assertMockEndpointsSatisfied();
}

@Test
public void testPreserveOldMEPInOnly() throws Exception {
// the mock should get an InOnly MEP
getMockEndpoint("mock:result").expectedMessageCount(1);
getMockEndpoint("mock:result").message(0).exchangePattern().isEqualTo(ExchangePattern.InOnly);

// we send an InOut
Exchange out = template.send("direct:testInOnly", new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody("Hello World");
exchange.setPattern(ExchangePattern.InOut);
}
});

// the MEP should be preserved
assertNotNull(out);
assertEquals(ExchangePattern.InOut, out.getPattern());

assertMockEndpointsSatisfied();
}

protected void assertMessageReceivedWithPattern(String sendUri, ExchangePattern expectedPattern)
throws InterruptedException {
ExchangePattern sendPattern;
switch (expectedPattern) {
case InOut:
sendPattern = ExchangePattern.InOnly;
break;
case InOnly:
sendPattern = ExchangePattern.InOut;
break;
default:
sendPattern = ExchangePattern.InOnly;
}

MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
String expectedBody = "InOnlyMessage";
resultEndpoint.expectedBodiesReceived(expectedBody);
resultEndpoint.expectedHeaderReceived("foo", "bar");

template.sendBodyAndHeader(sendUri, sendPattern, expectedBody, "foo", "bar");
resultEndpoint.assertIsSatisfied();
ExchangePattern actualPattern = resultEndpoint.getExchanges().get(0).getPattern();
assertEquals(actualPattern, expectedPattern, "received exchange pattern");
}

@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
// START SNIPPET: example
// Send to an endpoint using InOut
from("direct:testInOut").to(ExchangePattern.InOut, "mock:result");

// Send to an endpoint using InOut
from("direct:testInOnly").to(ExchangePattern.InOnly, "mock:result");

// Set the exchange pattern to InOut, then send it from
// direct:inOnly to mock:result endpoint
from("direct:testSetToInOnlyThenTo").setExchangePattern(ExchangePattern.InOnly).to("mock:result");
from("direct:testSetToInOutThenTo").setExchangePattern(ExchangePattern.InOut).to("mock:result");

// Or we can pass the pattern as a parameter to the to() method
from("direct:testToWithInOnlyParam").to(ExchangePattern.InOnly, "mock:result");
from("direct:testToWithInOutParam").to(ExchangePattern.InOut, "mock:result");

// Set the exchange pattern to InOut, then send it on
from("direct:testSetExchangePatternInOnly").setExchangePattern(ExchangePattern.InOnly).to("mock:result");
// END SNIPPET: example
}
};
}
}

参考