最近在项目中在做一个消息推送的功能,比如客户下单之后通知给给对应的客户发送系统通知,这种消息推送需要使用到全双工的推送消息。

所谓的全双工表示客户端和服务端都能向对方发送消息。不使用同样是全双工的http是因为http只能由客户端主动发起请求,服务接收后返回消息。建立起连接之后,客户端和服务端都能主动向对方发送消息。

上一篇文章 Boot 整合单机介绍了在单机模式下进行消息的发送和接收:

用户A和用户B和web服务器建立连接之后,用户A发送一条消息到服务器web集群,服务器再推送给用户B,在单机系统上所有的用户都和同一个服务器建立连接,所有的都存储在同一个服务器中。

单个服务器是无法支撑几万人同时连接同一个服务器,需要使用到分布式或者集群将请求连接负载均衡到到不同的服务下。消息的发送方和接收方在同一个服务器web集群,这就和单体服务器类似,能成功接收到消息:

但负载均衡使用轮询的算法,无法保证消息发送方和接收方处于同一个服务器,当发送方和接收方不是在同一个服务器时,接收方是无法接受到消息的:

集群问题解决思路

客户端和服务端每次建立连接时候,会创建有状态的会话,服务器的保存维持连接的。客户端每次只能和集群服务器其中的一个服务器连接,后续也是和该服务器进行数据传输。

要解决集群的问题,应该考虑共享的问题,客户端成功连接服务器之后,其他服务器也知道客户端连接成功。

方案一: 共享(不可行)

和类似的http是如何解决集群问题的?解决方案之一就是共享,客户端登录服务端之后,将信息存储在Redis数据库中,连接其他服务器时,从Redis获取,实际就是将信息存储在Redis中,实现redis的共享。

可以被共享的前提是可以被序列化,而的是无法被序列化的,http的记录的是请求的数据,而的对应的是连接,连接到不同的服务器,也不同,无法被序列化。

方案二:ip hash(不可行)

http不使用共享,就可以使用Nginx负载均衡的ip hash算法,客户端每次都是请求同一个服务器,客户端的都保存在服务器上,而后续请求都是请求该服务器,都能获取到,就不存在分布式问题了。

相对http来说,可以由服务端主动推动消息给客户端,如果接收消息的服务端和发送消息消息的服务端不是同一个服务端,发送消息的服务端无法找到接收消息对应的,即两个不处于同一个服务端,也就无法推送消息。如下图所示:

解决问题的方法是将所有消息的发送方和接收方都处于同一个服务器下,而消息发送方和接收方都是不确定的,显然是无法实现的。

方案三:广播模式

将消息的发送方和接收方都处于同一个服务器下才能发送消息,那么可以转换一下思路,可以将消息以消息广播的方式通知给所有的服务器,可以使用消息中间件发布订阅模式,消息脱离了服务器的限制,通过发送到中间件,再发送给订阅的服务器,类似广播一样,只要订阅了消息,都能接收到消息的通知:

发布者发布消息到消息中间件,消息中间件再将发送给所有订阅者:

广播模式的实现搭建单机

参考以前写的单机搭建 文章,先搭建单机实现消息的推送。

1. 添加依赖


    org.springframework.boot
    spring-boot-starter-freemarker


    org.springframework.boot
    spring-boot-starter-web


    org.springframework.boot
    spring-boot-starter-websocket



    org.springframework.boot
    spring-boot-starter-amqp

2. 创建 er 的 bean 实例

er 的 bean 实例自动注册 @ 注解声明的 ,使用自带启动需要该配置,使用独立 则不需要该配置。

@Configuration
public class WebSocketConfig {
    //tomcat启动无需该配置
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3. 创建服务端点 和 客户端端

@Component
@ServerEndpoint(value = "/message")
@Slf4j
public class WebSocket {

 private static Map webSocketSet = new ConcurrentHashMap();

 private Session session;

 @OnOpen
 public void onOpen(Session session) throws SocketException {
  this.session = session;
  webSocketSet.put(this.session.getId(),this);

  log.info("【websocket】有新的连接,总数:{}",webSocketSet.size());
 }

 @OnClose
 public void onClose(){
  String id = this.session.getId();
  if (id != null){
   webSocketSet.remove(id);
   log.info("【websocket】连接断开:总数:{}",webSocketSet.size());
  }
 }

 @OnMessage
 public void onMessage(String message){
  if (!message.equals("ping")){
   log.info("【wesocket】收到客户端发送的消息,message={}",message);
   sendMessage(message);
  }
 }

 /**
  * 发送消息
  * @param message
  * @return
  */
 public void sendMessage(String message){
  for (WebSocket webSocket : webSocketSet.values()) {
   webSocket.session.getAsyncRemote().sendText(message);
  }
  log.info("【wesocket】发送消息,message={}", message);

 }

}


    <input type="text" name="message" id="message">
    <button id="sendBtn">发送

<div style="width:100px;height: 500px;" id="content">

<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js">
<script type="text/javascript">
    var ws = new WebSocket("ws://127.0.0.1:8080/message");
    ws.onopen = function(evt) {
        console.log("Connection open ...");
    };

    ws.onmessage = function(evt) {
        console.log( "Received Message: " + evt.data);
        var p = $("

"+evt.data+"

"
)
        $("#content").prepend(p);
        $("#message").val("");
    };

    ws.onclose = function(evt) {
        console.log("Connection closed.");
    };

    $("#sendBtn").click(function(){
        var aa = $("#message").val();
        ws.send(aa);
    })


服务端和客户端中的、、都是一一对应的。

添加

@GetMapping({"","index.html"})
public ModelAndView index() {
 ModelAndView view = new ModelAndView("index");
 return view;
}

效果展示

打开两个客户端,其中的一个客户端发送消息,另一个客户端也能接收到消息。

添加 中间件

这里使用比较常用的作为消息中间件,而支持发布订阅模式:


限时特惠:
本站持续每日更新海量各大内部创业课程,一年会员仅需要98元,全站资源免费下载
点击查看详情

站长微信:Jiucxh

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注