WebSocket教程
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
依赖
<!-- WebSocket依赖该依赖包含了web依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
WebSocketHandler
创建WebSocket服务器就像实现WebSocketHandler一样简单或者更可能是扩展TextWebSocketHandler或BinaryWebSocketHandler下面的示例使用TextWebSocketHandler
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.TextMessage;
public class HelloWebSocketHandler extends TextWebSocketHandler {
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) {
// ...
}
}
WebSocket握手
定制初始的HTTP WebSocket握手请求的最简单方法是通过HandshakeInterceptor它公开握手之前和之后的方法你可以使用这样的拦截器来阻止握手或使WebSocketSession的任何属性有效下面的示例使用内置的拦截器将HTTP会话属性传递给WebSocket会话
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
public static final String HELLO_SOCKET_URL = "hello";
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketHandler(), HELLO_SOCKET_URL)
.addInterceptors(new HttpSessionHandshakeInterceptor());
}
@Bean
public WebSocketHandler webSocketHandler(){
return new HelloWebSocketHandler();
}
}
更高级的选项是扩展DefaultHandshakeHandler该处理程序执行WebSocket握手的步骤包括验证客户端源、协商子协议和其他细节
@Slf4j
public class WebSocketInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
System.out.println("握手开始");
return true;
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
System.out.println("握手完成");
}
}
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
public static final String HELLO_SOCKET_URL = "hello";
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
webSocketHandlerRegistry.addHandler(helloHandler, HELLO_URL)
.addInterceptors(helloInterceptor());
}
@Bean
public HandshakeInterceptor helloInterceptor(){
return new WebSocketInterceptor();
}
}
Spring提供了一个WebSocketHandlerDecorator基类你可以使用它来修饰WebSocketHandler具有额外的行为。默认情况下在使用WebSocket Java配置或XML命名空间时会提供并添加日志记录和异常处理实现。ExceptionWebSocketHandlerDecorator捕获来自任何WebSocketHandler的所有未捕获的异常并以状态1011关闭WebSocket会话这表示服务器错误。
服务器配置
每个基础WebSocket引擎都公开了控制运行时特性的配置属性比如消息缓冲区大小、空闲超时等等。
对于Tomcat、WildFly和GlassFish你可以将ServletServerContainerFactoryBean添加到WebSocket Java配置中如下面的示例所示
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
/**
* 服务器配置
*
* @return
*/
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer(){
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
return container;
}
}
对于客户端WebSocket配置你应该使用WebSocketContainerFactoryBeanXML或ContainerProvider.getWebSocketContainer()Java配置
对于Jetty需要提供预配置的Jetty WebSocketServerFactory并通过WebSocket Java配置将其插入Spring的DefaultHandshakeHandler下面的示例展示了如何做到这一点
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
public static final String HELLO_SOCKET_URL = "hello";
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketHandler(), HELLO_SOCKET_URL)
.setHandshakeHandler(handshakeHandler()); // 允许所有的源
}
@Bean
public DefaultHandshakeHandler handshakeHandler() {
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setInputBufferSize(8192);
policy.setIdleTimeout(600000);
return new DefaultHandshakeHandler(new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
}
}
允许的源
在Spring Framework 4.1.5中WebSocket和SockJS的默认行为是只接受相同源的请求也可以允许所有或指定的源列表。这种检查主要是为浏览器客户端设计的没有什么可以阻止其他类型的客户端修改Origin header值请参阅RFC 6454: Web Origin概念以了解更多细节。
三种可能的行为是
只允许相同源的请求默认在这种模式下当启用SockJS时Iframe HTTP响应header X-Frame-Options被设置为SAMEORIGIN并且JSONP传输被禁用因为它不允许检查请求的源因此当启用该模式时不支持IE6和IE7。
允许指定的源列表每个允许的源必须以http://或https://开始在这种模式下当启用SockJS时IFrame传输将被禁用因此当启用该模式时将不支持IE6到IE9。
允许所有的源要启用此模式你应该提供*作为允许的源值在这种模式下所有的传输都是可用的。
实战
// 入参
@Data
public class UserDTO {
private String name;
private WebSocketSession session;
}
// result
@Data
public class UserVO {
private Long id;
private String name;
private String country;
}
Json工具类
@Slf4j
public class JsonUtil {
public static ObjectMapper mapper = new ObjectMapper();
public static <T> T parse(String json, Class<T> clazz){
try {
return mapper.readValue(json, clazz);
} catch (JsonProcessingException e) {
log.error("json解析失败{}", json);
return null;
}
}
public static String serialize(Object obj){
try {
return mapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
log.error("对象序列化失败");
return null;
}
}
}
@Slf4j
public class HelloWebSocketHandler extends TextWebSocketHandler {
/**
* 连接信息
*/
private static final Map<String, UserDTO> connInfoMap = new HashMap<>();
/**
* socket 成功建立事件
*
* @param session
* @throws Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.info("websocket成功建立连接,sessionId:{}", session.getId());
}
/**
* socket 断开连接事件
*
* @param session
* @param status
* @throws Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
session.close();
connInfoMap.remove(session.getId());
log.info("websocket断开连接,sessionId:{}", session.getId());
}
/**
* 接收消息事件
*
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
log.info("接受到的json数据{}", payload);
UserDTO userDTO = JsonUtil.parse(payload, UserDTO.class);
if (Objects.isNull(userDTO) || userDTO.getName().isEmpty()){
return;
}
userDTO.setSession(session);
connInfoMap.put(session.getId(), userDTO);
this.pushBySessionId(session.getId());
}
private void pushBySessionId(String sessionId){
UserDTO userDTO = connInfoMap.get(sessionId);
Optional.ofNullable(userDTO).orElseThrow(() -> new RuntimeException("未找到该用户session信息"));
// 从数据库、kafka、内存中获取数据这里模拟数据
UserVO userVO = new UserVO();
userVO.setId(1L);
userVO.setName(userDTO.getName() + "s");
userVO.setCountry("CN");
try {
userDTO.getSession().sendMessage(new TextMessage(Objects.requireNonNull(JsonUtil.serialize(userVO))));
} catch (IOException e) {
log.error("推送消息失败名称{}errorMsg: {}",userDTO.getName(), e.getMessage());
}
}
}
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
public static final String HELLO_SOCKET_URL = "hello";
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketHandler(), HELLO_SOCKET_URL)
.addInterceptors(new HttpSessionHandshakeInterceptor())
.setAllowedOrigins("*"); // 允许所有的源
}
@Bean
public WebSocketHandler webSocketHandler(){
return new HelloWebSocketHandler();
}
/**
* 服务器配置
* 每个基础WebSocket引擎都公开了控制运行时特性的配置属性比如消息缓冲区大小、空闲超时等等
*
* @return
*/
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer(){
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
return container;
}
}
参考文献