camel系列-Component

概念

组件(Component)本质上是端点(Endpoint)实例的工厂。

组件与 Endpoint 接口定义

Component 有 2 个方法来创建 Endpoint

  • 使用一个**统一****资源标志符 URI**创建 Endpoint
  • 使用 URI 和参数,参数用于配置 Endpoint 的属性

参考:HTTP 协议中 URI 和 URL 有什么区别?

使用 URI 创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    @Test
public void testDirect() throws Exception {

Logger logger=LoggerFactory.getLogger(DirectTest.class);
CamelContext context=new DefaultCamelContext();

DirectComponent component= (DirectComponent)context.getComponent("direct");

Endpoint endpoint=component.createEndpoint("direct:home");

logger.info("endpoint {} {} {}",
endpoint.getEndpointUri(),
endpoint.getEndpointBaseUri(),
endpoint.getEndpointKey());
}
//输出结果
//endpoint direct:home direct:home direct:home

使用 URI 和参数创建

参数配置是可选的,并不是所有组件都支持参数创建的,我们使用 timer 组件进行测试

参数传入有 2 种方式

使用 map 和使用 url 参数传入,两者是等价的

1
2
3
4
5
6
7
TimerComponent component= (TimerComponent)context.getComponent("timer");
//使用map传入
Map<String, Object> properties=new HashMap<>();
properties.put("period","5s");
Endpoint endpoint=component.createEndpoint("timer:foo",properties);
//使用url参数传入
Endpoint endpoint=component.createEndpoint("timer:foo?period=5s");

参数校验

Component 在创建 Endpoint 时,会根据 Endpoint 的 isLenientProperties 属性对传入的参数进行校验。

isLenientProperties 表示对于传入的参数是否执行宽松非严格模式校验,默认是 false,执行严格模式校验,那么当传入不支持的参数时,会进行预校验,如下示例

1
2
//timer组件不支持id参数,所以无法创建
Endpoint endpoint=component.createEndpoint("timer:foo?period=5s&id=1");

Endpoint 参数自动绑定

在创建 Endpoint 时,其 Component 会调用 Endpoint 的 configureProperties 方法,对 Endpoint 的特定属性进行赋值,Camel 内置定义了UriParam 注解,只要对要配置的属性进行绑定设置,属性会自动进行绑定赋值,如下示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TimerEndpoint extends DefaultEndpoint {

@UriParam
private long repeatCount;
@UriParam
private boolean fixedRate;
@UriParam(defaultValue = "true", label = "advanced")
private boolean daemon = true;
@UriParam(label = "advanced")
private Date time;
@UriParam(label = "advanced")
private String pattern;
@UriParam(label = "advanced")
private Timer timer;
}

TimerEndpoint 属性的自动参数绑定设置

Endpoint 消息生产消费接口定义

Endpoint 与 Exchange

使用 createExchange 方法创建 Exchange,Exchange 是消息的容器,用于在路由之间传递信息,如下示例

1
2
3
4
Exchange exchange=endpoint.createExchange();
Message message=new DefaultMessage(exchange);
message.setBody("testMsg");
exchange.setIn(message);

Endpoint 与 Consumer

在创建 Producer 之前,必须先创建 Consumer,否则会出现 Producer 发送的消息无法被消费的情况,如下代码示例

1
2
3
4
Consumer consumer=endpoint.createConsumer(item->{
logger.info("consume {}:",item.getIn().getBody().toString());
});
consumer.start();
  1. 使用 createConsumer 方法创建 Consumer,入参是一个 Processor 执行器,用于消费 Producer 产生的消息
  2. 调用 start 方法,初始化 Consumer,Consumer 继承自 Service,Service 定义了对象的生命周期方法,只有调用 start 方法之后,Consumer 才会生效

如下接口定义

Endpoint 与 Producer

使用 createProducer 创建,并用 Producer 的 process 方法,给 Consumer 发送消息

1
endpoint.createProducer().process(exchange);

如下接口定义

完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void testExchange() throws Exception {

Logger logger=LoggerFactory.getLogger(DirectTest.class);
CamelContext context=new DefaultCamelContext();

DirectComponent component= (DirectComponent)context.getComponent("direct");

Endpoint endpoint=component.createEndpoint("direct:home");

Exchange exchange=endpoint.createExchange();
Message message=new DefaultMessage(exchange);
message.setBody("testMsg");
exchange.setIn(message);

Consumer consumer=endpoint.createConsumer(item->{
logger.info("consume {}:",item.getIn().getBody().toString());
});
consumer.start();
endpoint.createProducer().process(exchange);
}

完整模型