camel系列-AsyncProducer

概念

AsyncProducer 是能够以异步的形式进行消息生产

CompletableFuture

CompletableFuture 能够以监听回调机制执行异步任务,如下例子

调用 CompletableFuture 的 complete 方法,来通知 Future 已经执行完成

更多参考:异步编程利器:CompletableFuture 详解 Java

AsyncProducer 模型

AsyncCallbackToCompletableFutureAdapter

其是 AsyncCallback 的默认实现,对 CompletableFuture 进行了封装,如下定义及使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AsyncCallbackToCompletableFutureAdapter<T> implements AsyncCallback {
private final CompletableFuture<T> future;
private volatile T result;

public AsyncCallbackToCompletableFutureAdapter(CompletableFuture<T> future, T result) {
this.future = future != null ? future : new CompletableFuture<>();
this.result = result;
}

public CompletableFuture<T> getFuture() {
return future;
}

@Override
public void done(boolean doneSync) {
future.complete(result);
}
}

@Test
public void test3() throws ExecutionException, InterruptedException {

AsyncCallbackToCompletableFutureAdapter asyncCallback=new AsyncCallbackToCompletableFutureAdapter("异步执行结果");
CompletableFuture<String> completableFutureTwo = asyncCallback.getFuture().whenComplete((s, t) -> {
System.out.println("two=" +Thread.currentThread().getName());
System.out.println("异步执行完毕后,打印异步任务的结果:" + s);
});
asyncCallback.done(false);
}

AsyncProducer

DefaultAsyncProducer 是 AsyncProducer 的默认实现,其实现了 AsyncProcessor 的 processAsync 方法,执行完毕后会返回一个 CompletableFuture 对象,这意味着你可以拿着 CompletableFuture 去监听执行完毕的事件,如下定义

1
2
3
4
5
6
@Override
public CompletableFuture<Exchange> processAsync(Exchange exchange) {
AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
process(exchange, callback);
return callback.getFuture();
}

如下使用示例

1
2
3
4
5
AsyncProducer producer=endpoint.createAsyncProducer();
CompletableFuture<Exchange> completableFuture=producer.processAsync(exchange);
completableFuture.whenComplete((s, t) -> {
System.out.println("异步执行完毕后,打印异步任务的结果:" + s);
});

AsyncConsumer 和 AsyncProcessor

DefaultConsumer 是 Consumer 的默认实现,消费者也可以采用异步的方式进行消费,DefaultConsumer 内部持有一个 AsyncProcessor 实例,Consumer 可以在适当的场景选择 AsyncProcessor 对消息执行消费,如下代码片段的定义和示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class DefaultConsumer implements Consumer {

public Processor getProcessor() {
return processor;
}

public AsyncProcessor getAsyncProcessor() {
return asyncProcessor;
}
}

public class DirectProducer extends DefaultAsyncProducer {
public boolean process(Exchange exchange, AsyncCallback callback) {
if (consumer.getEndpoint().isSynchronous()) {
consumer.getProcessor().process(exchange);
callback.done(true);
return true;
} else {
return consumer.getAsyncProcessor().process(exchange, callback);
}
}
}