概念
组件(Component)本质上是端点(Endpoint)实例的工厂。
组件与 Endpoint 接口定义

Component 有 2 个方法来创建 Endpoint
- 使用一个**统一****资源标志符 URI**创建 Endpoint
- 使用 URI 和参数,参数用于配置 Endpoint 的属性
参考:HTTP 协议中 URI 和 URL 有什么区别?
使用 URI 创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Test public void testDirect() throws Exception {
Logger logger=LoggerFactory.getLogger(DirectTest.class); CamelContext context=new DefaultCamelContext();
DirectComponent component= (DirectComponent)context.getComponent("direct");
Endpoint endpoint=component.createEndpoint("direct:home");
logger.info("endpoint {} {} {}", endpoint.getEndpointUri(), endpoint.getEndpointBaseUri(), endpoint.getEndpointKey()); }
|
使用 URI 和参数创建
参数配置是可选的,并不是所有组件都支持参数创建的,我们使用 timer 组件进行测试
参数传入有 2 种方式
使用 map 和使用 url 参数传入,两者是等价的
1 2 3 4 5 6 7
| TimerComponent component= (TimerComponent)context.getComponent("timer");
Map<String, Object> properties=new HashMap<>(); properties.put("period","5s"); Endpoint endpoint=component.createEndpoint("timer:foo",properties);
Endpoint endpoint=component.createEndpoint("timer:foo?period=5s");
|
参数校验
Component 在创建 Endpoint 时,会根据 Endpoint 的 isLenientProperties 属性对传入的参数进行校验。
isLenientProperties 表示对于传入的参数是否执行宽松非严格模式校验,默认是 false,执行严格模式校验,那么当传入不支持的参数时,会进行预校验,如下示例
1 2
| Endpoint endpoint=component.createEndpoint("timer:foo?period=5s&id=1");
|
Endpoint 参数自动绑定
在创建 Endpoint 时,其 Component 会调用 Endpoint 的 configureProperties 方法,对 Endpoint 的特定属性进行赋值,Camel 内置定义了UriParam 注解,只要对要配置的属性进行绑定设置,属性会自动进行绑定赋值,如下示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class TimerEndpoint extends DefaultEndpoint {
@UriParam private long repeatCount; @UriParam private boolean fixedRate; @UriParam(defaultValue = "true", label = "advanced") private boolean daemon = true; @UriParam(label = "advanced") private Date time; @UriParam(label = "advanced") private String pattern; @UriParam(label = "advanced") private Timer timer; }
|
TimerEndpoint 属性的自动参数绑定设置
Endpoint 消息生产消费接口定义

Endpoint 与 Exchange
使用 createExchange 方法创建 Exchange,Exchange 是消息的容器,用于在路由之间传递信息,如下示例
1 2 3 4
| Exchange exchange=endpoint.createExchange(); Message message=new DefaultMessage(exchange); message.setBody("testMsg"); exchange.setIn(message);
|
Endpoint 与 Consumer
在创建 Producer 之前,必须先创建 Consumer,否则会出现 Producer 发送的消息无法被消费的情况,如下代码示例
1 2 3 4
| Consumer consumer=endpoint.createConsumer(item->{ logger.info("consume {}:",item.getIn().getBody().toString()); }); consumer.start();
|
- 使用 createConsumer 方法创建 Consumer,入参是一个 Processor 执行器,用于消费 Producer 产生的消息
- 调用 start 方法,初始化 Consumer,Consumer 继承自 Service,Service 定义了对象的生命周期方法,只有调用 start 方法之后,Consumer 才会生效
如下接口定义

Endpoint 与 Producer
使用 createProducer 创建,并用 Producer 的 process 方法,给 Consumer 发送消息
1
| endpoint.createProducer().process(exchange);
|
如下接口定义

完整示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Test public void testExchange() throws Exception {
Logger logger=LoggerFactory.getLogger(DirectTest.class); CamelContext context=new DefaultCamelContext();
DirectComponent component= (DirectComponent)context.getComponent("direct");
Endpoint endpoint=component.createEndpoint("direct:home");
Exchange exchange=endpoint.createExchange(); Message message=new DefaultMessage(exchange); message.setBody("testMsg"); exchange.setIn(message);
Consumer consumer=endpoint.createConsumer(item->{ logger.info("consume {}:",item.getIn().getBody().toString()); }); consumer.start(); endpoint.createProducer().process(exchange); }
|
完整模型
