时间:2021-05-19
springboot 长轮询实现
@EnableAsync 开启异步
@Sync 标记异步方法
Future 用于接收异步返回值
result.get(10, TimeUnit.SECONDS); 阻塞,超时获取结果
Future.cancel() 中断线程
补充:通过spring提供的DeferredResult实现长轮询服务端推送消息
DeferredResult字面意思就是推迟结果,是在servlet3.0以后引入了异步请求之后,spring封装了一下提供了相应的支持,也是一个很老的特性了。DeferredResult可以允许容器线程快速释放以便可以接受更多的请求提升吞吐量,让真正的业务逻辑在其他的工作线程中去完成。
最近再看apollo配置中心的实现原理,apollo的发布配置推送变更消息就是用DeferredResult实现的,apollo客户端会像服务端发送长轮训http请求,超时时间60秒,当超时后返回客户端一个304 httpstatus,表明配置没有变更,客户端继续这个步骤重复发起请求,当有发布配置的时候,服务端会调用DeferredResult.setResult返回200状态码,然后轮训请求会立即返回(不会超时),客户端收到响应结果后,会发起请求获取变更后的配置信息。
springboot启动类:
@SpringBootApplicationpublic class DemoApplication implements WebMvcConfigurer { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @Bean public ThreadPoolTaskExecutor mvcTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setQueueCapacity(100); executor.setMaxPoolSize(25); return executor; } //配置异步支持,设置了一个用来异步执行业务逻辑的工作线程池,设置了默认的超时时间是60秒 @Override public void configureAsyncSupport(AsyncSupportConfigurer configurer) { configurer.setTaskExecutor(mvcTaskExecutor()); configurer.setDefaultTimeout(60000L); }}import com.google.common.collect.HashMultimap;import com.google.common.collect.Multimap;import com.google.common.collect.Multimaps;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.RestController;import org.springframework.web.context.request.async.DeferredResult; import java.util.Collection; @RestControllerpublic class ApolloController { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //guava中的Multimap,多值map,对map的增强,一个key可以保持多个value private Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedSetMultimap(HashMultimap.create()); //模拟长轮询 @RequestMapping(value = "/watch/{namespace}", method = RequestMethod.GET, produces = "text/html") public DeferredResult<String> watch(@PathVariable("namespace") String namespace) { logger.info("Request received"); DeferredResult<String> deferredResult = new DeferredResult<>(); //当deferredResult完成时(不论是超时还是异常还是正常完成),移除watchRequests中相应的watch key deferredResult.onCompletion(new Runnable() { @Override public void run() { System.out.println("remove key:" + namespace); watchRequests.remove(namespace, deferredResult); } }); watchRequests.put(namespace, deferredResult); logger.info("Servlet thread released"); return deferredResult; } //模拟发布namespace配置 @RequestMapping(value = "/publish/{namespace}", method = RequestMethod.GET, produces = "text/html") public Object publishConfig(@PathVariable("namespace") String namespace) { if (watchRequests.containsKey(namespace)) { Collection<DeferredResult<String>> deferredResults = watchRequests.get(namespace); Long time = System.currentTimeMillis(); //通知所有watch这个namespace变更的长轮训配置变更结果 for (DeferredResult<String> deferredResult : deferredResults) { deferredResult.setResult(namespace + " changed:" + time); } } return "success"; }}当请求超时的时候会产生AsyncRequestTimeoutException,我们定义一个全局异常捕获类:
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.http.HttpStatus;import org.springframework.web.bind.annotation.ControllerAdvice;import org.springframework.web.bind.annotation.ExceptionHandler;import org.springframework.web.bind.annotation.ResponseBody;import org.springframework.web.bind.annotation.ResponseStatus;import org.springframework.web.context.request.async.AsyncRequestTimeoutException; import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse; @ControllerAdviceclass GlobalControllerExceptionHandler { protected static final Logger logger = LoggerFactory.getLogger(GlobalControllerExceptionHandler.class); @ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304状态码 @ResponseBody @ExceptionHandler(AsyncRequestTimeoutException.class) //捕获特定异常 public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e, HttpServletRequest request) { System.out.println("handleAsyncRequestTimeoutException"); }}然后我们通过postman工具发送请求http://localhost:8080/watch/mynamespace,请求会挂起,60秒后,DeferredResult超时,客户端正常收到了304状态码,表明在这个期间配置没有变更过。
然后我们在模拟配置变更的情况,再次发起请求http://localhost:8080/watch/mynamespace,等待个10秒钟(不要超过60秒),然后调用http://localhost:8080/publish/mynamespace,发布配置变更。这时postman会立刻收到response响应结果:
mynamespace changed:1538880050147表明在轮训期间有配置变更过。
这里我们用了一个MultiMap来存放所有轮训的请求,Key对应的是namespace,value对应的是所有watch这个namespace变更的异步请求DeferredResult,需要注意的是:在DeferredResult完成的时候记得移除MultiMap中相应的key,避免内存溢出请求。
采用这种长轮询的好处是,相比一直循环请求服务器,实例一多的话会对服务器产生很大的压力,http长轮询的方式会在服务器变更的时候主动推送给客户端,其他时间客户端是挂起请求的,这样同时满足了性能和实时性。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
Comet技术。目前,Comet技术有两种方法实现:基于AJAX的长轮询方式和基于Iframe的流方式。传统的网页实时通信技术使用HTTP协议,然而,HTTP协
SpringBoot+Socket实现与html页面的长连接,客户端给服务器端发消息,服务器给客户端轮询发送消息,附案例源码功能介绍客户端给所有在线用户发送消息
什么是长轮询?长轮询是“服务器推”技术实现方式的一种,可以将服务端发生的变化实时传送到客户端而无须客户端频繁的地刷新、发送请求。长轮询原理?客户端向服务器发送A
周末在家捣鼓了一下消息推送的简单例子,其实也没什么技术含量,欢迎大伙拍砖。我设计的这个推送demo是基于ajax长轮询+msmq消息队列来实现的,具体交互过程如
本文实例讲述了JavaScript使用setInterval()函数实现简单轮询操作的方法。分享给大家供大家参考。具体分析如下:轮询(Polling)是一种CP