SpringBoot + WebSocket

SpringBoot + WebSocket

1. 简介

WebSocket协议是基于TCP的一种新的网络协议。它实现了客户端与服务器全双工通信,学过计算机网络都知道,既然是全双工,就说明了服务器可以主动发送信息给客户端 。这与我们的推送技术或者是多人在线聊天的功能不谋而合。HTTP是单工通信,通信只能由客户端发起。

HTTP单工通信 WebSocket全双工通信

image.png

2. pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
复制代码

3. Config

@Configuration
@ConditionalOnWebApplication
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
复制代码

4. Server

因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller

  • @ ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端, 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
  • 新建一个ConcurrentHashMap webSocketMap 用于接收当前token的WebSocket,方便传递之间对user进行推送消息。
/**
 * websocket server
 *
 * @author CShip
 * @date 2021/2/22
 */
@Slf4j
@ServerEndpoint(value = "/ws/{token}")
@Component
public class WebSocketServer {

    private static final AtomicInteger CONNECTED_COUNT = new AtomicInteger(0);

    /**
     * key userId
     * value session
     *
     */
    private static final ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();

    private static LoginHelper loginHelper;

    @Autowired
    private LoginHelper loginHelperResource;

    @PostConstruct
    public void init() {
        WebSocketServer.loginHelper = loginHelperResource;
    }

    @OnOpen
    public void onOpen(@PathParam("token") String token, Session session) {
        // token+三位随机数
        // 使用随机数来区分同一用户打开多个客户端进行操作
        Long userId = loginHelper.token2UserId(token.substring(0, token.length() - 4));
        String num = token.substring(token.length() - 4);
        sessionMap.put(userId.toString()  + num, session);
        int count = CONNECTED_COUNT.incrementAndGet();
        log.info("连接成功:{}", userId);
        log.info("当前连接:{}", count);
        sendMessage(session, new WebSocketVO<>("connected", 0));
    }

    @OnClose
    public void onClose(@PathParam("token") String token) {
        Long userId = loginHelper.getUserIdFromToken(token.substring(0, token.length() - 4));
        String num = token.substring(token.length() - 4);
        sessionMap.remove(userId.toString()  + num);
        int count = CONNECTED_COUNT.decrementAndGet();
        log.info("连接关闭:{}", userId);
        log.info("当前连接:{}", count);
    }


    @OnMessage
    public void onMessage(@PathParam("token") String token, String message, Session session) {
        WebSocketVO vo = new WebSocketVO<>();
        vo.setMessage(message);
        vo.setFlag(0);
        sendMessage(session, vo);
    }

    @OnError
    public void onError(@PathParam("token") String token, Session session, Throwable error) {
        log.error("Session {}  error : {}", session.getId(), error.getMessage());
    }


    public static void sendMessage(Session session, WebSocketVO vo) {
        try {
            session.getBasicRemote().sendText(JSONObject.toJSONString(vo));
        } catch (IOException e) {
            log.error("发送消息异常:{}", e.getMessage());
        }
    }

    /**
     * 向指定用户发送消息
     */
    public static void sendMessage(String userId, WebSocketVO vo) {
        for (Map.Entry<String, Session> entry : sessionMap.entrySet()) {
            String userIdWithSession = entry.getKey().split("\\.")[0];
            Session session = entry.getValue();
            if (userIdWithSession.equals(userId)) {
                if (session != null && session.isOpen()) {
                    vo.setFlag(1);
                    sendMessage(session, vo);
                }
            }
        }
    }

    /**
     * 群发消息
     */
    public static void broadCastInfo(WebSocketVO vo) {
        if (!CollectionUtils.isEmpty(sessionMap)) {
            sessionMap.forEach((userId, session) -> {
                if (session.isOpen()) {
                    vo.setFlag(1);
                    sendMessage(session, vo);
                }
            });
        }
    }
}
复制代码

5. 问题

a. 调用业务Service空指针

WebSocket启动的时候优先于spring容器,从而导致在WebSocketServer中调用业务Service会报空指针异常;需要在WebSocketServer中将所需要用到的service静态初始化一下。

  1. 在WebSocketServer中使用static修饰业务service
  2. 在WebSocketConfig中使用@Autowired修饰业务service的set方法

b. Nginx转发配置

基本上就是在转发时,要把转发的TCP/IP(socket)数据的头中的Upgrade和Connection给带过去(或设置的和前端一样)即可.

#websocket相关配置
proxy_http_version 1.1;
# 将客户端的Upgrade(作为websocket重要标识)请求转发(必须)
proxy_set_header Upgrade $http_upgrade;
# 将客户端的Connection(作为websocket重要标识)转发
proxy_set_header Connection "upgrade";
# 从字面看 X-Real-IP 代表的是客户端请求真实的 IP 地址,这个参数没有相关标准规范,如果是直接访问的请求,可能是客户端真实的 IP 地址,但是中间若经过了层层的代理,就是最后一层代理的 IP 地址。
proxy_set_header X-real-ip $remote_addr;
# X-Forwarded-For 记录着从客户端发起请求后访问过的每一个 IP 地址,当然第一个是发起请求的客户端本身的地址,各 IP 地址间由“英文逗号+空格”(,)分隔。
# remote_addr与$http_x_forwarded_for用以记录客户端的ip地址 即转发IP地址
proxy_set_header X-Forwarded-For $remote_addr;
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享