这是我参与更文挑战的第11天,活动详情查看: 更文挑战
Rabbitmq + websocket 可以实现实时向前台推送数据。
websocket
websocket 长连接
WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
为什么需要长连接?
http协议通信只能由客户端发起,websocket实现服务端向客户端推送数据
使用
1.依赖
<!--websocket长连接-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
复制代码
2.配置
@Configuration
public class WebSocketConfig extends HttpSessionHandshakeInterceptor {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
复制代码
3.例子
@Component
@ServerEndpoint(value = "/web", encoders = { ServerEncoder.class })
@Slf4j
public class WsgCtrl {
@Autowired
RabbitTemplate rabbitTemplate;
private Session session;
public static CopyOnWriteArraySet<WsgCtrl> webSocketSet = new CopyOnWriteArraySet<WsgCtrl>();
@OnOpen
public void onOpen(Session session) {
this.session = session;
webSocketSet.add(this);
log.info("【websocket消息】有新的连接, 总数:{}", webSocketSet.size());
}
@OnClose
public void onClose() {
webSocketSet.remove(this);
log.info("【websocket消息】连接断开, 总数:{}", webSocketSet.size());
}
@OnMessage
public void onMessage(String message) throws Exception {
log.info("【websocket消息】收到客户端发来的消息:{}", message);
System.out.println(message);
sendMessage(message);
}
public void sendMessage(String message) throws Exception {
// for (WsgCtrl webSocket: webSocketSet) {
// log.info("【websocket消息】广播消息, message={}", message);
// try {
// webSocket.session.getBasicRemote().sendText(message+"======");
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
JSONObject jsonObject = JSONObject.parseObject(message);
BopDao bopDao = JSONObject.toJavaObject(jsonObject, BopDao.class);
this.session.getBasicRemote().sendObject(bopDao);
// 一般使用sendText方法,传递对象需要使用sendObject方法同时配置编码ServerEncoder
// this.session.getBasicRemote().sendText(message);
}
}
复制代码
4.ServerEncoder.java
/**
* websocket 传递对象需要编码
*/
public class ServerEncoder implements Encoder.Text<BopDao> {
@Override
public void init(EndpointConfig endpointConfig) {
}
@Override
public void destroy() {
// TODO Auto-generated method stub
}
@Override
public String encode(BopDao bopDao) throws EncodeException {
String s = JSON.toJSONString(bopDao);
return s;
}
}
复制代码
websocket整合rabbitmq
监听方法时调用自定义的sendMessage方法
@Slf4j
@RabbitListener(queues = "messagechange")
@Controller
public class Receiver {
@Autowired
BopInter bopInter;
@RabbitHandler
public void receiver(BopDao bopDao,Message message,Channel channel) throws Exception {
log.info("<=============监听到队列消息============>");
boolean flag = TestCtrl.flag;
log.info("flag:"+flag);
if(bopDao!=null&&flag==true){
for ( WsgCtrl item : WsgCtrl.webSocketSet ){
item.sendMessage(JSON.toJSONString(bopDao));
}
bopInter.updateBop(bopDao);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
// else {
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
// }
}
}
复制代码
后续在补充吧
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END