camel系列-控制路由启动和退出

路由启动顺序

当路由有依赖关系时,由于被依赖路由未启动,主路由消费时会抛异常。

可以通过设置路由的 startupOrder 属性来设置启动顺序来解决这个问题,让被依赖路由先启动

  • 启动时:按 startupOrder 顺序加载路由
  • 退出时:按 startupOrder 顺序倒序关闭
  • 路由优雅退出的消息处理流程
  • 路由关闭时,endpoint 将停止消费,在路由中的消息会继续执行完毕直到路由退出
  • 消息会在路由中会有 300 秒的超时时间,超过该时间,消息将被丢弃

direct:start 依赖 seda:foo,所以设置 seda:foo 路由先启动

1
2
3
4
5
6
7
8
9
10
11
<routes>
<route startupOrder="1">
<from uri="seda:foo"/>
<to uri="mock:result"/>
</route>

<route startupOrder="2">
<from uri="direct:start"/>
<to uri="seda:foo"/>
</route>
</routes>

未指定顺序路由

direct:bar 未指定启动顺序,路由会在有指定 startupOrder 的路由启动完毕之后,再启动未指定启动顺序的路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<routes>
<route startupOrder="1">
<from uri="seda:foo"/>
<to uri="mock:result"/>
</route>

<route startupOrder="2">
<from uri="direct:start"/>
<to uri="seda:foo"/>
</route>

<route>
<from uri="direct:bar"/>
<to uri="seda:bar"/>
</route>
</routes>

配置最后启动的路由

可以指定一个比较大的数字来设置 startupOrder,表明该路由最后启动

1
2
3
4
5
6
7
8
9
10
11
// use auto assigned startup ordering
from("direct:start").to("seda:foo");

// should start first
from("seda:foo").startupOrder(1).to("mock:result");

// should start last after the default routes
from("direct:bar").startupOrder(12345).to("seda:bar");

// use auto assigned startup ordering
from("seda:bar").to("mock:other");

自动启动

您还可以使用autoStartup选项来配置是否应在 Camel 启动时启动指定路由。默认情况下,路由是自动启动的。

1
2
3
4
<route autoStartup="false">
<from uri="activemq:queue:special"/>
<to uri="file://backup"/>
</route>

全局设置启动

调用 camelContext 的 setAutoStartup 方法,设置全局启动配置

1
camelContext.setAutoStartup(false);

动态控制路由

RouteController

可以使用 RouteController 接口动态控制路由的启动和停止

1
camelContext.getRouteController().stopRoute(routeId);

ControlBus

还可以使用 ControlBus 组件来管理路由的启动和关闭,ControlBus 内部的实现实际还是调用 RouteController 接口来实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:in").routeId("mainRoute")
.log("Stopping route")
.process(new RouteStoppingProcessor())
.log("Signalled to stop route")
.to("mock:out");

from("timer:statusChecker").routeId("statusChecker")
.to("controlbus:route?routeId=mainRoute&action=status")
.filter(simple("${body} == 'Stopped'"))
.to("mock:stopped");
}
};
}

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final class ActionTask implements Runnable {

private final Exchange exchange;

private ActionTask(Exchange exchange) {
this.exchange = exchange;
}

@Override
public void run() {
String action = getEndpoint().getAction();
String id = getEndpoint().getRouteId();

if ("start".equals(action)) {
LOG.debug("Starting route: {}", id);
getEndpoint().getCamelContext().getRouteController().startRoute(id);
} else if ("stop".equals(action)) {
LOG.debug("Stopping route: {}", id);
getEndpoint().getCamelContext().getRouteController().stopRoute(id);
}
}
}