Rabbitmq + websocket

这是我参与更文挑战的第11天,活动详情查看: 更文挑战

Rabbitmq + websocket 可以实现实时向前台推送数据。

websocket

websocket 长连接
WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。

为什么需要长连接?

http协议通信只能由客户端发起,websocket实现服务端向客户端推送数据

使用

1.依赖

<!--websocket长连接-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
复制代码

2.配置

@Configuration
public class WebSocketConfig extends HttpSessionHandshakeInterceptor {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

复制代码

3.例子

@Component
@ServerEndpoint(value = "/web", encoders = { ServerEncoder.class })
@Slf4j
public class WsgCtrl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    private Session session;

    public static CopyOnWriteArraySet<WsgCtrl> webSocketSet = new CopyOnWriteArraySet<WsgCtrl>();

    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.add(this);
        log.info("【websocket消息】有新的连接, 总数:{}", webSocketSet.size());
    }

    @OnClose
    public void onClose() {
        webSocketSet.remove(this);
        log.info("【websocket消息】连接断开, 总数:{}", webSocketSet.size());
    }

    @OnMessage
    public void onMessage(String message) throws Exception {
        log.info("【websocket消息】收到客户端发来的消息:{}", message);
        System.out.println(message);
        sendMessage(message);

    }

    public void sendMessage(String message) throws Exception {
//        for (WsgCtrl webSocket: webSocketSet) {
//            log.info("【websocket消息】广播消息, message={}", message);
//            try {
//                webSocket.session.getBasicRemote().sendText(message+"======");
//            } catch (Exception e) {
//                e.printStackTrace();
//            }
//        }
        JSONObject jsonObject = JSONObject.parseObject(message);
        BopDao bopDao = JSONObject.toJavaObject(jsonObject, BopDao.class);
        this.session.getBasicRemote().sendObject(bopDao);
// 一般使用sendText方法,传递对象需要使用sendObject方法同时配置编码ServerEncoder
//         this.session.getBasicRemote().sendText(message);
    }

}
复制代码

4.ServerEncoder.java

/**
 * websocket 传递对象需要编码
 */

public class ServerEncoder implements Encoder.Text<BopDao> {

    @Override
    public void init(EndpointConfig endpointConfig) {

    }

    @Override
    public void destroy() {
        // TODO Auto-generated method stub

    }

    @Override
    public String encode(BopDao bopDao) throws EncodeException {
        String s = JSON.toJSONString(bopDao);
        return s;

    }
}
复制代码

websocket整合rabbitmq

监听方法时调用自定义的sendMessage方法

@Slf4j
@RabbitListener(queues = "messagechange")
@Controller
public class Receiver {

    @Autowired
    BopInter bopInter;
    
    @RabbitHandler
    public void receiver(BopDao bopDao,Message message,Channel channel) throws Exception {
        log.info("<=============监听到队列消息============>");
        boolean flag = TestCtrl.flag;
        log.info("flag:"+flag);
        if(bopDao!=null&&flag==true){
            for ( WsgCtrl item : WsgCtrl.webSocketSet ){

                item.sendMessage(JSON.toJSONString(bopDao));
            }
            bopInter.updateBop(bopDao);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

        }
//        else {
//            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
//        }

    }
}
复制代码

后续在补充吧

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享