本文介绍了线上业务中的一些异步调用实践经验,包含 IO 模型介绍、 的基本使用、RPC 异步调用、异步 HTTP 客户端 的使用等。RPC 使用的手写 RPC 框架,该框架支持异步调用。
本文要点:
为什么异步BIO 模型
首先我们先回顾一下 BIO 模型:
BIO 模型
当用户进程调用了 这个系统调用, 就开始了 IO 的第一个阶段:准备数据。对于 io 来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候 就要等待足够的数据到来。而在用户进程这边异步调用,整个进程会被阻塞。当 一直等到数据准备好了,它就会将数据从 中拷贝到用户内存,然后 返回结果,用户进程才解除 block 的状态,重新运行起来。所以, IO 的特点就是在 IO 执行的两个阶段都被 block 了。
同步调用
同步调用
在同步调用的场景下,依次请求多个接口,耗时长、性能差,接口响应时长 T > T1+T2+T3+……+Tn。
减少同步等待
一般这个时候为了减少同步等待时间,会使用线程池来同时处理多个任务,接口的响应时间就是 MAX(T1,T2,T3):
线程池异步
大概代码如下:
Future future = executorService.submit(() -> {
Thread.sleep(2000);
return "hello world";
});
while (true) {
if (future.isDone()) {
System.out.println(future.get());
break;
}
}
同步模型中使用线程池确实能实现异步调用的效果,也能压缩同步等待的时间,但是也有一些缺陷:
NIO 模型
为了解决 BIO 中的缺陷,引入 NIO 模型:
NIO 模型
当用户进程发出 read 操作时,如果 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 中的数据准备好了,并且又再次收到了用户进程的 call,那么它马上就将数据拷贝到了用户内存,然后返回。所以,用户进程其实是需要不断的主动询问 数据好了没有。
异步优化思路
我们知道了 NIO 的调用方式比 BIO 好,那我们怎么能在业务编码中使用到 NIO 呢?自己动手将 BIO 替换成 NIO 肯定不现实,已有组件支持 NIO 的可以直接使用,不支持的继续使用自定义线程池。
简述
是 java.util. 库在 java 8 中新增的主要工具,同传统的 相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。
常用 API 举例
CompletableFuture future = CompletableFuture.supplyAsync(()->{
try{
Thread.sleep(1000L);
return "hello world";
} catch (Exception e){
return "failed";
}
});
System.out.println(future.join());
// output
hello world
开启异步任务,到另一个线程执行。
CompletableFuture future1 = new CompletableFuture();
future.complete("hello world"); //异步线程执行
future.whenComplete((res, throwable) -> {
System.out.println(res);
});
System.out.println(future1.join());
CompletableFuture future2 = new CompletableFuture();
future.completeExceptionally(new Throwable("failed")); //异步线程执行
System.out.println(future2.join());
// output
hello world
hello world
Exception in thread "main"
java.util.concurrent.CompletionException:
java.lang.Throwable: failed
正常完成该 。
y 异常完成该 。
String original = "Message";
CompletableFuture cf =
CompletableFuture.completedFuture(original).thenApply(String::toUpperCase);
System.out.println(cf.join());
// output
MESSAGE
任务后置处理。
图示:
图示
CompletableFuture cf =
CompletableFuture.completedFuture("Message").thenApply(String::toUpperCase);
CompletableFuture cf1 =
CompletableFuture.completedFuture("Message").thenApply(String::toLowerCase);
CompletableFuture allCf = cf.thenCombine(cf1, (s1, s2) -> s1 + s2);
System.out.println(allCf.join());
// output
MSGmsg
合并任务,两个任务同时执行,结果由合并函数 返回。
图示:
图示allOf
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> "Message1");
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "Message2");
CompletableFuture future3 = CompletableFuture.supplyAsync(() -> "Message3");
CompletableFuture future =
CompletableFuture.allOf(future1, future2, future3).thenApply(v -> {
String join1 = future1.join();
String join2 = future2.join();
String join3 = future3.join();
return join1 + join2 + join3;});
System.out.println(future.join());
// output
Msg1Msg2Msg3
allOf 会阻塞等待所有异步线程任务结束。
allOf 里的 join 并不会阻塞,传给 的函数是在 , , 全部完成时,才会执行 。
图示:
allOf 图示CF 执行线程
下面有两个小demo,可以先试着想想输出的结果:
String original = "Message";
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync thread: " + Thread.currentThread().getName());
return original;
}).thenApply(r -> {
System.out.println("thenApply thread: " + Thread.currentThread().getName());
return r;
});
System.out.println(cf.join());
// output
supplyAsync thread: ForkJoinPool.commonPool-worker-1
thenApply thread: main
Message
String original = "Message";
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync thread: " + Thread.currentThread().getName());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return original;
}).thenApply(r -> {
System.out.println("thenApply thread: " + Thread.currentThread().getName());
return r;
});
System.out.println(cf.join());
// output
supplyAsync thread: ForkJoinPool.commonPool-worker-1
thenApply thread: ForkJoinPool.commonPool-worker-1
Message
先看结论:
异步任务里没有 sleep 的时候,异步任务很快就会完成,意味着 JVM 执行到 的时候,前置 CF 已经提前完成所以后续的 CF 会被初始线程 main 线程执行。
异步任务里有 sleep 的时候, JVM 执行到 时,前置 CF 还没有完成,前置 CF 的线程会执行所有后续的 CF。
CF 嵌套join
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(3000);
return 1;
}, executorService);
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(3000);
return 2;
}, executorService);
Integer join1 = cf1.thenApply((cf1Val) -> {
System.out.println("cf1 start value:" + cf1Val);
Integer cf2Val = cf2.join();
System.out.println("cf2 end value:" + cf2Val);
return 3;
}).join();
//output
cf1 start value:1
cf2 end value:2
代码很简单,有一个线程数为 2 的线程池,cf1、cf2 都使用这个线程执行异步任务,特别的是在 cf1. 中会调用 cf2.join(),当线程数是2的时候可以顺利输出。
ExecutorService executorService = Executors.newFixedThreadPool(1);
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(3000);
return 1;
}, executorService);
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(3000);
return 2;
}, executorService);
Integer join1 = cf1.thenApply((cf1Val) -> {
System.out.println("cf1 start value:" + cf1Val);
Integer cf2Val = cf2.join();
System.out.println("cf2 end value:" + cf2Val);
return 3;
}).join();
//output
cf1 start value:1
这时候我们将线程池的线程数调整为 1,这时只会输出 cf1 start value:1,然后就一直阻塞。
使用 -l pid 查看线程状态,发现是 ,等待的地方正是我们在代码里调用的cf2.join():
"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00000001429f5000 nid=0xa903 waiting on condition
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
at com.ppphuang.demo.threadPool.ExecutorsTest.lambda$main$2(ThreadPoolExecutorsTest.java:34)
原因是我们在唯一一个线程中调用 cf2.join(),阻塞等待 cf2 完成,但是 cf2 需要等待 cf1 完成之后才有空闲线程去执行。这就类似于你右手正拿着一个水杯,然后等待右手拿水壶倒满水,这是不可能完成的。所以尽量不要嵌套join,不注意隔离线程池的话很容易造成’死锁‘(线程阻塞)。
CF 常用 API
API描述
开启异步任务,到另一个线程执行,异步任务有返回值。
完成任务。
y
异常结束任务。
合并任务,两个任务同时执行,结果由合并函数 返回。
任务后置处理。
会取两个任务最先完成的任务,上个任务和这个任务同时进行,哪个先结束,先用哪个结果。
后续处理。
完成后的处理。
allOf
等待所有异步线程任务结束。
join
获取返回值,没有的 CF 对象调用join时,会等待再返回异步调用,已经 的 CF 对象调用join时,会立刻返回结果。
优化过程异步 RPC 客户端
我们手写的这个 RPC 框架支持异步调用,如果你想看具体的实现,可以在文末找到源码链接。异步调用之前会设置一个 方法,异步调用时会直接返回 null,不会等待服务端返回接果,服务端返回结果之后会通过 RPC 客户端自带的线程池执行设置的 方法。
RPC 异步调用图示:
RPC 异步调用包装异步RPC
通过 包装 RPC的客户端, 类中的 属性值为创建的某个 RPC 服务的异步客户端代理类,这个代理类在构造方法中创建并赋值给 属性。
类中的 async 方法接受 类型的参数 ,可以通过 .apply() 来通过 执行真正的 RPC 调用。
在 async 方法中实例化一个 , 并将 作为异步回调的上下文设置到 RPC 的异步回调中,之后将该 返回给调用者。
public class AsyncExecutor<C> {
private C client;
public AsyncExecutor(ClientProxyFactory clientProxyFactory, Class clazz, String group, String version) {
this.client = clientProxyFactory.getProxy(clazz, group, version, true);
}
public CompletableFuture async(Function function) {
CompletableFuture future = new CompletableFuture();
ClientProxyFactory.setLocalAsyncContextAndAsyncReceiveHandler(future, CompletableFutureAsyncCallBack.instance());
try {
function.apply(client);
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}
}
异步回调类
public class CompletableFutureAsyncCallBack extends AsyncReceiveHandler {
private static volatile CompletableFutureAsyncCallBack INSTANCE;
private CompletableFutureAsyncCallBack() {
}
@Override
public void callBack(Object context, Object result) {
if (!(context instanceof CompletableFuture)) {
throw new IllegalStateException("the context must be CompletableFuture");
}
CompletableFuture future = (CompletableFuture) context;
if (result instanceof Throwable) {
future.completeExceptionally((Throwable) result);
return;
}
log.info("result:{}", result);
future.complete(result);
}
}
是 RPC 的异步回调抽象类,类中的 、 抽象方法需要子类实现。
实现了这个 抽象方法,第一个参数是我们在包装异步 RPC 时设置的 上下文,第二个参数是 RPC 返回的结果。方法中判断 RPC 返回的结果是否异常,若异常通过 y 异常结束这个 ,若正常通过 正常结束这个 。
注册异步客户端Bean
@Component
public class AsyncExecutorConfig {
@Autowired
ClientProxyFactory clientProxyFactory;
@Bean
public AsyncExecutor demoServiceAsyncExecutor() {
return new AsyncExecutor(clientProxyFactory, DemoService.class, "", "");
}
}
异步 RPC 调用
@Autowired
AsyncExecutor demoServiceAsyncExecutor;
CompletableFuture pppName = demoServiceAsyncExecutor.async(service -> service.hello("ppp"));
String name = pppName.join();
限时特惠:本站持续每日更新海量各大内部创业课程,一年会员仅需要98元,全站资源免费下载
点击查看详情
站长微信:Jiucxh