camel系列-线程池

概念

创建一个线程池,让入口端点数据可以同时达到多个端点进行消费

设置线程池

使用 threads 节点在 DSL 中设置线程池

1
2
3
4
5
6
7
8
9
10
11
12
<route>
<from uri="direct:in"/>
<log message="Received ${body}:${threadName}"/>
<threads>
<log message="Processing ${body}:${threadName}"/>
<delay>
<constant>200</constant>
<!-- simulate slow route -->
</delay>
<to uri="mock:out"/>
</threads>
</route>

线程池参数

1
2
3
4
5
<route>
<from uri="seda:a"/>
<threads poolSize="5" maxQueueSize="20"/>
<to uri="mock:result"/>
</route>

自定义线程池

  1. 使用 threadPool 节点定义线程池
  2. 使用 threads 节点的 executorServiceRef 属性引用线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<camelContext xmlns="http://camel.apache.org/schema/spring">
<threadPool id="customThreadPool"
poolSize="5"
threadName="CustomThreadPool"
maxQueueSize="100"/>

<route>
<from uri="direct:in"/>
<log message="Received ${body}:${threadName}"/>
<threads executorServiceRef="customThreadPool">
<log message="Processing ${body}:${threadName}"/>
<transform>
<simple>${threadName}</simple>
</transform>
<to uri="mock:out"/>
</threads>
</route>
</camelContext>

线程池模板配置

  1. 通过 threadPoolProfile 节点配置线程池模板参数
  2. 在 threads 节点引用 threadPoolProfile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<camelContext xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile id="customThreadPoolProfile" poolSize="5"/>

<route id="usesThreadPoolProfile">
<from uri="direct:in"/>
<log message="Received ${body}:${threadName}"/>
<threads executorServiceRef="customThreadPoolProfile">
<log message="Processing ${body}:${threadName}"/>
<transform>
<simple>${threadName}</simple>
</transform>
<to uri="mock:out"/>
</threads>
</route>
</camelContext>

threadPoolProfile 和 threadPool 的区别

  • 声明 threadPoolProfile 时,不会创建线程池,每个引用 threadPoolProfile 的地方,都会创建线程池
  • 声明 threadPool 时,会创建线程池,有多个地方引用 threadPool 时,线程池只会保留原引用

异步执行器

  1. 实现 AsyncProcessor 接口的 process 方法
  2. 实现过程中采用了自定义线程池的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public interface AsyncProcessor extends Processor {
boolean process(Exchange exchange, AsyncCallback callback);
}

public class SlowOperationProcessor implements AsyncProcessor {
private final Logger log = LoggerFactory.getLogger(SlowOperationProcessor.class);
private final ExecutorService backgroundExecutor = Executors.newSingleThreadExecutor();

@Override
public boolean process(final Exchange exchange, final AsyncCallback asyncCallback) {
final boolean completedSynchronously = false;

backgroundExecutor.submit(new Runnable() {
@Override
public void run() {
log.info("Running operation asynchronously");
try {
log.info("Doing something slowly");
Thread.sleep(200); // this runs slowly
log.info("...done");
} catch (Exception e) {
throw new RuntimeException(e);
}
// the current thread will continue to process the exchange
// through the remainder of the route
asyncCallback.done(completedSynchronously);
}
});

return completedSynchronously;
}

@Override
public void process(Exchange exchange) throws Exception {
throw new IllegalStateException("Should never be called");
}
}
  1. 声明并引用执行器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
">

<bean id="mySlowProcessor" class="org.camelcookbook.parallelprocessing.asyncprocessor.SlowOperationProcessor"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="seda:in?concurrentConsumers=5"/>
<to uri="direct:in"/>
<log message="Processed by:${threadName}"/>
</route>

<route>
<from uri="direct:in"/>
<log message="Processing ${body}:${threadName}"/>
<process ref="mySlowProcessor"/>
<log message="Completed ${body}:${threadName}"/>
<to uri="mock:out"/>
</route>

<route>
<from uri="direct:sync?synchronous=true"/>
<log message="Processing ${body}:${threadName}"/>
<process ref="mySlowProcessor"/>
<log message="Completed ${body}:${threadName}"/>
<to uri="mock:out"/>
</route>
</camelContext>

</beans>

参考