给几个网站谢谢,仿历史网站模板下载,wordpress有用户主页吗,网站在线制作最近在项目中在做一个消息推送的功能#xff0c;比如客户下单之后通知给给对应的客户发送系统通知#xff0c;这种消息推送需要使用到全双工的websocket推送消息。
所谓的全双工表示客户端和服务端都能向对方发送消息。不使用同样是全双工的http是因为http只能由客户端主动发…最近在项目中在做一个消息推送的功能比如客户下单之后通知给给对应的客户发送系统通知这种消息推送需要使用到全双工的websocket推送消息。
所谓的全双工表示客户端和服务端都能向对方发送消息。不使用同样是全双工的http是因为http只能由客户端主动发起请求服务接收后返回消息。websocket建立起连接之后客户端和服务端都能主动向对方发送消息。
websocket在单机模式下进行消息的发送和接收
用户A和用户B和web服务器建立连接之后用户A发送一条消息到服务器服务器再推送给用户B在单机系统上所有的用户都和同一个服务器建立连接所有的session都存储在同一个服务器中。
单个服务器是无法支撑几万人同时连接同一个服务器需要使用到分布式或者集群将请求连接负载均衡到到不同的服务下。消息的发送方和接收方在同一个服务器这就和单体服务器类似能成功接收到消息 但负载均衡使用轮询的算法无法保证消息发送方和接收方处于同一个服务器当发送方和接收方不是在同一个服务器时接收方是无法接受到消息的 websocket集群问题解决思路 客户端和服务端每次建立连接时候会创建有状态的会话session服务器的保存维持连接的session。客户端每次只能和集群服务器其中的一个服务器连接后续也是和该服务器进行数据传输。
要解决集群的问题应该考虑session共享的问题客户端成功连接服务器之后其他服务器也知道客户端连接成功。
方案一session 共享不可行 和websocket类似的http是如何解决集群问题的解决方案之一就是共享session客户端登录服务端之后将session信息存储在Redis数据库中连接其他服务器时从Redis获取session实际就是将session信息存储在Redis中实现redis的共享。
session可以被共享的前提是可以被序列化而websocket的session是无法被序列化的http的session记录的是请求的数据而websocket的session对应的是连接连接到不同的服务器session也不同无法被序列化。
方案二ip hash不可行 http不使用session共享就可以使用Nginx负载均衡的ip hash算法,客户端每次都是请求同一个服务器客户端的session都保存在服务器上而后续请求都是请求该服务器都能获取到session就不存在分布式session问题了。
websocket相对http来说可以由服务端主动推动消息给客户端如果接收消息的服务端和发送消息消息的服务端不是同一个服务端发送消息的服务端无法找到接收消息对应的session即两个session不处于同一个服务端也就无法推送消息。如下图所示 解决问题的方法是将所有消息的发送方和接收方都处于同一个服务器下而消息发送方和接收方都是不确定的显然是无法实现的。
方案三广播模式 将消息的发送方和接收方都处于同一个服务器下才能发送消息那么可以转换一下思路可以将消息以消息广播的方式通知给所有的服务器可以使用消息中间件发布订阅模式消息脱离了服务器的限制通过发送到中间件再发送给订阅的服务器类似广播一样只要订阅了消息都能接收到消息的通知 发布者发布消息到消息中间件消息中间件再将发送给所有订阅者
广播模式的实现 搭建单机 websocket 参考以前写的websocket单机搭建 文章先搭建单机websocket实现消息的推送。
添加依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-freemarker/artifactId
/dependency
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId
/dependency
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-websocket/artifactId
/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency创建 ServerEndpointExporter 的 bean 实例 ServerEndpointExporter 的 bean 实例自动注册 ServerEndpoint 注解声明的 websocket endpoint使用springboot自带tomcat启动需要该配置使用独立 tomcat 则不需要该配置。
Configuration
public class WebSocketConfig {//tomcat启动无需该配置Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}创建服务端点 ServerEndpoint 和 客户端端 服务端点
Component
ServerEndpoint(value /message)
Slf4j
public class WebSocket {private static MapString, WebSocket webSocketSet new ConcurrentHashMap();private Session session;OnOpenpublic void onOpen(Session session) throws SocketException {this.session session;webSocketSet.put(this.session.getId(),this);log.info(【websocket】有新的连接,总数:{},webSocketSet.size());}OnClosepublic void onClose(){String id this.session.getId();if (id ! null){webSocketSet.remove(id);log.info(【websocket】连接断开:总数:{},webSocketSet.size());}}OnMessagepublic void onMessage(String message){if (!message.equals(ping)){log.info(【wesocket】收到客户端发送的消息,message{},message);sendMessage(message);}}/*** 发送消息* param message* return*/public void sendMessage(String message){for (WebSocket webSocket : webSocketSet.values()) {webSocket.session.getAsyncRemote().sendText(message);}log.info(【wesocket】发送消息,message{}, message);}}客户端点
divinput typetext namemessage idmessagebutton idsendBtn发送/button
/div
div stylewidth:100px;height: 500px; idcontent
/div
script srchttps://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js/script
script typetext/javascriptvar ws new WebSocket(ws://127.0.0.1:8080/message);ws.onopen function(evt) {console.log(Connection open ...);};ws.onmessage function(evt) {console.log( Received Message: evt.data);var p $(pevt.data/p)$(#content).prepend(p);$(#message).val();};ws.onclose function(evt) {console.log(Connection closed.);};$(#sendBtn).click(function(){var aa $(#message).val();ws.send(aa);})/script服务端和客户端中的OnOpen、onclose、onmessage都是一一对应的。
服务启动后客户端ws.onopen调用服务端的OnOpen注解的方法储存客户端的session信息握手建立连接。 客户端调用ws.send发送消息对应服务端的OnMessage注解下面的方法接收消息。 服务端调用session.getAsyncRemote().sendText发送消息对应的客户端ws.onmessage接收消息。 添加 controller
GetMapping({,index.html})
public ModelAndView index() {ModelAndView view new ModelAndView(index);return view;
}效果展示 打开两个客户端其中的一个客户端发送消息另一个客户端也能接收到消息。 添加 RabbitMQ 中间件 这里使用比较常用的RabbitMQ作为消息中间件而RabbitMQ支持发布订阅模式
添加消息订阅 交换机使用扇形交换机消息分发给每一条绑定该交换机的队列。以服务器所在的IP 端口作为唯一标识作为队列的命名启动一个服务使用队列绑定交换机实现消息的订阅
Configuration
public class RabbitConfig {Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(PUBLISH_SUBSCRIBE_EXCHANGE);}Beanpublic Queue psQueue() throws SocketException {// ip 端口 为队列名 String ip IpUtils.getServerIp() _ IpUtils.getPort();return new Queue(ps_ ip);}Beanpublic Binding routingFirstBinding() throws SocketException {return BindingBuilder.bind(psQueue()).to(fanoutExchange());}
}修改服务端点 ServerEndpoint 在WebSocket添加消息的接收方法RabbitListener 接收消息队列名称使用常量命名动态队列名称使用 #{name}其中的name是Queue的bean 名称
RabbitListener(queues #{psQueue.name})
public void pubsubQueueFirst(String message) {System.out.println(message);sendMessage(message);
}
然后再调用sendMessage方法发送给所在连接的客户端。
修改消息发送 在WebSocket类的onMessage方法将消息发送改成RabbitMQ方式发送:
OnMessage
public void onMessage(String message){if (!message.equals(ping)){log.info(【wesocket】收到客户端发送的消息,message{},message);//sendMessage(message);if (rabbitTemplate null) {rabbitTemplate (RabbitTemplate) SpringContextUtil.getBean(rabbitTemplate);}rabbitTemplate.convertAndSend(PUBLISH_SUBSCRIBE_EXCHANGE, null, message);}
}消息通知流程如下所示 启动两个实例模拟集群环境 打开idea的Edit Configurations 点击左上角的COPY,然后添加端口server.port8081 启动两个服务端口分别是8080和8081。在启动8081端口的服务将前端连接端口改成8081:
var ws new WebSocket(ws://127.0.0.1:8081/message);效果展示