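Concept
Create a thread pool so that messages arriving at a route's inbound endpoint can be handed off and consumed concurrently by multiple downstream endpoints.
Setting up a thread pool
Use the threads element in the DSL to set up a thread pool.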

    <route>
      <from uri="direct:in"/>
      <log message="Received ${body}:${threadName}"/>
      <threads>
        <log message="Processing ${body}:${threadName}"/>
        <delay>
          <constant>200</constant>
        </delay>
        <to uri="mock:out"/>
      </threads>
    </route>

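The handoff performed by the threads element can be pictured with plain JDK classes. The following is only a rough sketch, not Camel's implementation: the class name `ThreadHandoffSketch` and the method `processOnPool` are hypothetical, and a fixed pool of two threads is an arbitrary choice. It shows the "Received" step running on the calling thread and the "Processing" step running on a pool thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK illustration only; Camel's threads element is not implemented this way.
class ThreadHandoffSketch {

    /** Logs on the calling thread, runs the processing step on the pool, returns the worker's name. */
    static String processOnPool(ExecutorService pool, String body) {
        System.out.println("Received " + body + ":" + Thread.currentThread().getName());
        Future<String> result = pool.submit(() -> {
            String worker = Thread.currentThread().getName();
            System.out.println("Processing " + body + ":" + worker);
            return worker;
        });
        try {
            // Block only so that the demo can report which thread did the work.
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException("processing step failed", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            processOnPool(pool, "hello");
        } finally {
            pool.shutdown();
        }
    }
}
```

The two log lines should report different thread names, which is the same effect the `${threadName}` expressions make visible in the route above.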
Thread pool parameters

    <route>
      <from uri="seda:a"/>
      <threads poolSize="5" maxQueueSize="20"/>
      <to uri="mock:result"/>
    </route>

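To get a feel for what poolSize and maxQueueSize control, here is a minimal plain-JDK sketch of a pool with five threads and a bounded queue of twenty tasks. It is an analogy, not the way Camel builds its pools: the class `BoundedPoolSketch`, the choice of equal core and maximum sizes, the 60-second keep-alive, and the caller-runs rejection policy are all assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: maps the two route attributes onto a JDK ThreadPoolExecutor.
class BoundedPoolSketch {

    static ThreadPoolExecutor newPool(int poolSize, int maxQueueSize) {
        return new ThreadPoolExecutor(
                poolSize, poolSize,                         // assumption: core size == max size
                60, TimeUnit.SECONDS,                       // assumption: idle keep-alive
                new ArrayBlockingQueue<>(maxQueueSize),     // at most maxQueueSize waiting tasks
                new ThreadPoolExecutor.CallerRunsPolicy()); // assumption: overflow runs on the caller
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool(5, 20);
        for (int i = 0; i < 30; i++) {
            final int n = i;
            pool.execute(() -> {
                System.out.println("task " + n + " on " + Thread.currentThread().getName());
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

With 30 submissions against 5 threads and 20 queue slots, some tasks may exceed the capacity and run on the submitting thread under the caller-runs policy, which is one way a bounded queue applies back pressure.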
Custom thread pool
Define the thread pool with a threadPool element.
Reference it from the executorServiceRef attribute of the threads element.

    <camelContext xmlns="http://camel.apache.org/schema/spring">
      <threadPool id="customThreadPool"
                  poolSize="5"
                  threadName="CustomThreadPool"
                  maxQueueSize="100"/>

      <route>
        <from uri="direct:in"/>
        <log message="Received ${body}:${threadName}"/>
        <threads executorServiceRef="customThreadPool">
          <log message="Processing ${body}:${threadName}"/>
          <transform>
            <simple>${threadName}</simple>
          </transform>
          <to uri="mock:out"/>
        </threads>
      </route>
    </camelContext>

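Thread pool profile configuration
Configure the template parameters for thread pools with a threadPoolProfile element.
Reference the threadPoolProfile from the threads element.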

    <camelContext xmlns="http://camel.apache.org/schema/spring">
      <threadPoolProfile id="customThreadPoolProfile" poolSize="5"/>

      <route id="usesThreadPoolProfile">
        <from uri="direct:in"/>
        <log message="Received ${body}:${threadName}"/>
        <threads executorServiceRef="customThreadPoolProfile">
          <log message="Processing ${body}:${threadName}"/>
          <transform>
            <simple>${threadName}</simple>
          </transform>
          <to uri="mock:out"/>
        </threads>
      </route>
    </camelContext>

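Difference between threadPoolProfile and threadPool
Declaring a threadPoolProfile does not create a thread pool; a separate pool is created at each place that references the profile.
Declaring a threadPool creates the pool right away; when several places reference the threadPool, they all share that one original pool.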
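The distinction can be sketched in plain Java as "a recipe for pools" versus "one pool instance". This is only an analogy under stated assumptions: the class `ProfileVersusPoolSketch` and the `profile` method are hypothetical names, and a `Supplier` stands in for the profile; Camel's own classes are not used.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Analogy only: a Supplier plays the role of a profile, a plain reference the role of a pool.
class ProfileVersusPoolSketch {

    /** A "profile" holds settings only; no threads exist until get() is called. */
    static Supplier<ExecutorService> profile(int poolSize) {
        return () -> Executors.newFixedThreadPool(poolSize);
    }

    public static void main(String[] args) {
        // Profile: each referencing site asks the recipe for its own pool.
        Supplier<ExecutorService> customThreadPoolProfile = profile(5);
        ExecutorService siteA = customThreadPoolProfile.get();
        ExecutorService siteB = customThreadPoolProfile.get();
        System.out.println("profile sites share a pool: " + (siteA == siteB));

        // Pool: created once, and every referencing site receives the same instance.
        ExecutorService customThreadPool = Executors.newFixedThreadPool(5);
        ExecutorService siteC = customThreadPool;
        ExecutorService siteD = customThreadPool;
        System.out.println("pool sites share a pool: " + (siteC == siteD));

        siteA.shutdown();
        siteB.shutdown();
        customThreadPool.shutdown();
    }
}
```

A practical consequence of this model is that a shared pool caps total concurrency across every site that uses it, while a profile caps concurrency per site.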
Asynchronous processor
Implement the process method of the AsyncProcessor interface.
The implementation below runs its work on its own custom thread pool.
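
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.camel.AsyncCallback;
    import org.apache.camel.AsyncProcessor;
    import org.apache.camel.Exchange;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // For reference, the Camel interface being implemented:
    //
    //   public interface AsyncProcessor extends Processor {
    //       boolean process(Exchange exchange, AsyncCallback callback);
    //   }

    public class SlowOperationProcessor implements AsyncProcessor {
        private final Logger log = LoggerFactory.getLogger(SlowOperationProcessor.class);
        // Private single-thread pool that runs the slow work off the calling thread.
        private final ExecutorService backgroundExecutor = Executors.newSingleThreadExecutor();

        @Override
        public boolean process(final Exchange exchange, final AsyncCallback asyncCallback) {
            // false tells Camel that the exchange will be completed later, on another thread.
            final boolean completedSynchronously = false;

            backgroundExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    log.info("Running operation asynchronously");
                    try {
                        log.info("Doing something slowly");
                        Thread.sleep(200);
                        log.info("...done");
                    } catch (Exception e) {
                        // Record the failure on the exchange; an exception thrown from
                        // this Runnable would be swallowed by the executor.
                        exchange.setException(e);
                    } finally {
                        // Always signal completion so that the exchange never hangs.
                        asyncCallback.done(completedSynchronously);
                    }
                }
            });
            return completedSynchronously;
        }

        @Override
        public void process(Exchange exchange) throws Exception {
            throw new IllegalStateException("Should never be called");
        }
    }
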
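The contract behind the asynchronous process method (return false at once, then invoke the callback from another thread when the work finishes) can be tried out without Camel. The sketch below is a simplified stand-in: `AsyncContractSketch` and `DoneCallback` are hypothetical names that only mimic the shape of AsyncProcessor and AsyncCallback, and the slow work is passed in as a plain `Runnable`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK stand-in for the asynchronous processing contract; not Camel's API.
class AsyncContractSketch {

    /** Hypothetical stand-in for Camel's AsyncCallback. */
    interface DoneCallback {
        void done(boolean doneSync);
    }

    /** Starts the slow work in the background and returns false ("not completed synchronously"). */
    static boolean process(ExecutorService background, Runnable slowWork, DoneCallback callback) {
        background.execute(() -> {
            try {
                slowWork.run();
            } finally {
                callback.done(false); // signal completion even if the work failed
            }
        });
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService background = Executors.newSingleThreadExecutor();
        CountDownLatch finished = new CountDownLatch(1);

        boolean doneSync = process(background, () -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, sync -> finished.countDown());

        System.out.println("process returned, doneSync=" + doneSync);
        boolean completed = finished.await(2, TimeUnit.SECONDS);
        System.out.println("callback invoked=" + completed);
        background.shutdown();
    }
}
```

The caller is free to continue as soon as `process` returns, which is why the calling thread in a route is not blocked for the duration of the slow operation.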
Declare and reference the processor

    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="
             http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
             http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
           ">

      <bean id="mySlowProcessor"
            class="org.camelcookbook.parallelprocessing.asyncprocessor.SlowOperationProcessor"/>

      <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
          <from uri="seda:in?concurrentConsumers=5"/>
          <to uri="direct:in"/>
          <log message="Processed by:${threadName}"/>
        </route>

        <route>
          <from uri="direct:in"/>
          <log message="Processing ${body}:${threadName}"/>
          <process ref="mySlowProcessor"/>
          <log message="Completed ${body}:${threadName}"/>
          <to uri="mock:out"/>
        </route>

        <route>
          <from uri="direct:sync?synchronous=true"/>
          <log message="Processing ${body}:${threadName}"/>
          <process ref="mySlowProcessor"/>
          <log message="Completed ${body}:${threadName}"/>
          <to uri="mock:out"/>
        </route>
      </camelContext>
    </beans>

References