异步处理的作用及必要性
传统的Servlet处理都是同步调用,亦即Web容器会为每个需要处理的请求都分配一个线程,线程处理完成后,就会将对应的结果返回给调用方。所以对于一些耗时较长的请求来说,或者对一些特定的场景(例如需要轮询结果的场景),采用异步处理很有必要。
Servlet对异步处理的支持
spring mvc是基于servlet的,所以我们要了解spring mvc的异步支持的实现,需要首先了解servlet对异步请求的处理。
假设我们现在基于servlet来实现一个请求,请求里需要耗费5秒的请求去处理某件事情(示例代码里用休眠5秒来模拟),如果我们自己要实现这个功能,就只能先将原始的请求线程进行休眠,直到处理完成后才进行唤醒,示例代码用闭锁处理:
public void demo1(HttpServletRequest req, HttpServletResponse rsp) throws Exception{
String value = req.getParameter("demoParameter");
CountDownLatch latch = new CountDownLatch(1);
new Thread(){
@Override
public void run() {
try {
//模拟正在处理任务
TimeUnit.SECONDS.sleep(5);
//模拟处理任务完成
rsp.getWriter().println("async run success, demoParameter = " + value);
latch.countDown();
}catch (Exception e){
log.error("任务处理失败", e)
}
//super.run();
}
}.start();
latch.await();
}
上述代码我们没有引入任何的Servlet的异步机制,仅在处理过程中,通过阻塞原请求线程直至工作线程完成后再返回结果,因为请求线程仍然被阻塞,这种方法本质上这种方法是换汤不换药的。
在servlet3.0,支持了异步http请求:AsyncContext,我们将上述的demo1代码改写如下:
public void demo2(HttpServletRequest req, HttpServletResponse rsp) throws Exception{
AsyncContext context = req.startAsync();
String value = req.getParameter("demoParameter");
new Thread(){
@Override
public void run() {
try {
//模拟正在处理任务
TimeUnit.SECONDS.sleep(5);
//模拟处理任务完成
context.getResponse().getWriter().println("async run success, demoParameter = " + value);
context.complete();
}catch (Exception e){
e.printStackTrace();
}
}
}.start();
}
在上述的demo2中,我们同样开启了一个新的线程来处理任务,和demo1不同的是,我们没有阻塞请求线程,而是利用了servlet3.0的异步能力构造了一个 AsyncContext,后续任务完成后的返回结果,我们也是直接写入到AsyncContext的Response中。
因为没有阻塞主线程,因此web中间件(例如tomcat)等的后续请求不会因为请求线程不足而无法正确响应。实际上,AsyncContext也是后文的spring mvc异步处理的基石,只是方便我们在spring mvc中更方便地使用它。
spring mvc 的异步处理
返回callback对象
如上文,异步处理的本质是需要构造一个新的工作线程,线程执行特定的任务,任务执行完成后,获取到任务的返回结果,再把结果响应给调用方。
因此,我们可以将我们实际要处理的任务封装在callable对象里,返回给spring mvc,spring mvc就会帮我们自动开启异步处理,示例代码如下:
public Callable<String> demo3(HttpServletRequest req, HttpServletResponse rsp) throws Exception{
String value = req.getParameter("demoParameter");
return new Callable<String>() {
@Override
public String call() throws Exception {
//模拟正在处理任务
TimeUnit.SECONDS.sleep(5);
//模拟处理任务完成
return "async run success, params = " + value;
}
};
}
读者可能会疑惑,这里返回的callable是用哪个线程池执行的呢?实际上,如果没特殊配置,用的是spring mvc内部的SimpleAsyncTaskExecutor,当然我们也可以自定义一个自己的线程池来做处理: 配置方法如下:
public class WebConfig extends WebMvcConfigurerAdapter {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
//定义自定义线程池,并将其设置到AsyncSupportConfigurer对象里
AsyncTaskExecutor myExecutors = new SimpleAsyncTaskExecutor();
configurer.setTaskExecutor(myExecutors);
}
}
返回DeferredResult对象
有很多时候,我们并不需要主动定义一个明确的线程来处理任务,很多请求是带有轮询的性质的,轮询分为两类:短轮询和长轮询
短轮询是一个很直接的http请求-响应流程,服务端收到请求后,判断是否有数据变化,并立即将这个结果返回给客户端,短轮询的有点是实现简单,但是缺点也很明显,会产生大量无用的请求,也会耗费服务端大量的资源。
长轮询是在短轮询的基础上作的一个改进,既然短轮询大量请求都是无用的,那么就让服务端在判断到返回无用请求时,先保持连接,直到获取到有效返回或者达到设定的超时时间为止,使用长轮询既在一定程度上减轻了服务端的负担(相比短轮询),也可以更及时地返回数据(短轮询有轮询的时间间隔),实际上,相当部分的开源组件都是使用的长轮询来解决数据变化的感知问题,例如apollo配置中心,也是使用的DeferredResult返回对象来实现的配置变更的长轮询
使用DeferredResult返回对象和Callback不同的地方是DeferredResult不需要自己创建一个线程池来执行处理,在明确需要返回结果时,通过DeferredResult.setResult设置返回即可。
也因此,DeferredResult的返回一般用于轮询的场景,当明确有轮询结果时,通过主动设置DeferredResult.setResult即可实现明确的通知返回:
我们现在做一个建单的例子,客户端轮询某个配置的值,当值发生改变时,立即返回改变值,值没发生改变,则请求不返回,直到超时
//记录DeferredResult的全局变量
private static Map<String, DeferredResult<String>> dataMap = new ConcurrentHashMap<>();
@GetMapping("/demo4")
public DeferredResult<String> demo4(@RequestParam String confKey, @RequestParam String oldVal){
DeferredResult<String> result = new DeferredResult<String>(60000L,"超时了");
result.onTimeout(()-> dataMap.remove(confKey));
result.onCompletion(()-> dataMap.remove(confKey));
//获取配置的实际值
String confVal = getConfVal(confKey);
//配置值不相等,立即返回
if(!confVal.equals(oldVal)){
result.setResult(confVal);
}
return result;
}
/**
* 这里是更新配置的方法
*/
private void updateConf(String confKey, String confVal){
//真正的更新配置的逻辑
//TODO
//配置更新完成后,触发DeferredResult通知
if(dataMap.contains(confKey)){
dataMap.get(confKey).setResult(confVal);
}
}
Spring实现的SSE
SSE即Server-Sent Events,亦即服务端推送消息,用于服务端将消息推送给客户端用,它本身也是一个长连接,在服务端需要推送消息时向客户端进行推送。 需要特别注意它和websocket的区别,SSE是单向推送的,仅仅是服务端到客户端,而websocket是双向的,服务端和客户端能互相通信,websocket不在本文的讨论范围,所以不展开讨论。 以下是实现的示例:
@GetMapping(path = "/demo5",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handle(HttpServletResponse rsp) {
SseEmitter emitter = new SseEmitter();
//模拟发送推送消息
new Thread(){
@Override
public void run() {
try{
for(int i=0;i<10;i++) {
TimeUnit.SECONDS.sleep(1);
emitter.send("msg" + i );
}
emitter.complete();
}catch (Exception e){
e.printStackTrace();
}
}
}.start();
return emitter;
}
具体sse的相关内容,感兴趣的读者可以参考W3C关于SSE部分的标准文档:https://www.w3.org/TR/2021/SPSD-eventsource-20210128/
最后,springmvc还支持文本流和字节流的相关内容,亦即返回ResponseBodyEmitter 和StreamingResponseBody,这部分可以允许服务端向客户端逐步发送响应数据,具体内容此处就不展开了。
参考文档
https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#mvc-ann-asynchttps://www.w3.org/TR/2021/SPSD-eventsource-20210128/