跳至主要內容

消息推送

bug君 · 大约 4 分钟 · 服务端推送消息 · SSE · websocket · 服务端推送消息

在 Spring Boot 项目中,服务端推送消息到客户端的常用方法包括使用 WebSocket、Server-Sent Events (SSE) 和 HTTP 长轮询。以下是如何使用这些技术的简要介绍和示例:

1. 使用 WebSocket

提示

WebSocket 是一种全双工通信协议,允许服务端和客户端之间进行实时通信。

1.1 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

1.2 服务端配置

/**
 * Text WebSocket handler that keeps {@code WebSocketContext} in sync with the
 * connection lifecycle: a session is registered when it opens and unregistered
 * when it closes. Incoming text frames are accepted and ignored.
 */
@Slf4j
public class MiaWebSocketHandler extends TextWebSocketHandler {

    /**
     * Registers the freshly opened session, using the path of its request URI as the key.
     *
     * @param session the session that was just established
     * @throws NullPointerException if the session exposes no URI
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // Parameters may be carried in the URI path, so the whole path is used as the key.
        WebSocketContext.add(session, Objects.requireNonNull(session.getUri()).getPath());
    }

    /**
     * Intentionally does nothing with inbound text messages.
     */
    @Override
    public void handleTextMessage(@NotNull WebSocketSession session, @NotNull TextMessage message) throws Exception {
        // No inbound handling.
    }

    /**
     * Unregisters the session once its connection has been closed.
     */
    @Override
    public void afterConnectionClosed(@NotNull WebSocketSession session, @NotNull CloseStatus status) throws Exception {
        WebSocketContext.remove(session);
    }

}

1.3 WebSocket 配置

/**
 * Enables WebSocket support and maps {@link MiaWebSocketHandler} to its endpoint pattern.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {

    /**
     * Exposes the handler as a bean.
     *
     * @return a new {@link MiaWebSocketHandler}
     */
    @Bean
    public MiaWebSocketHandler miaWebSocketHandler() {
        return new MiaWebSocketHandler();
    }

    /**
     * Registers the handler under {@code Constants.WEBSOCKET_PREFIX + "{userName}/{uuid}"}.
     *
     * @param registry the registry supplied by Spring
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // Connection path: the prefix followed by a user name segment and a uuid segment.
        String endpointPattern = Constants.WEBSOCKET_PREFIX + "{userName}/{uuid}";
        // NOTE(review): "*" accepts connections from any origin - confirm this is intended outside of demos.
        registry.addHandler(miaWebSocketHandler(), endpointPattern)
                .setAllowedOrigins("*");
    }
}

1.4 WebSocket 会话上下文工具

/**
 * Static registry of WebSocket sessions and the user key each one belongs to,
 * plus helpers for sending text messages and a periodic heartbeat.
 *
 * <p>Both lookup directions are kept in {@link ConcurrentHashMap}s. All removals
 * are conditional on the expected counterpart, so a stale session that closes
 * late cannot evict the mapping of a newer session registered for the same user.
 */
@Slf4j
@Component
public class WebSocketContext {

    /**
     * Session to user mapping.
     */
    private static final Map<WebSocketSession, String> SESSION_USER_MAP = new ConcurrentHashMap<>();

    /**
     * User to session mapping.
     */
    private static final Map<String, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();

    /**
     * Registers a session and binds it to a user in both directions.
     *
     * <p>If the user already had a different session, that session's reverse
     * mapping is dropped so the two maps stay consistent. The old session is
     * not closed here.
     *
     * @param session the session to register
     * @param user    the key identifying the user
     */
    public static void add(WebSocketSession session, String user) {
        WebSocketSession previousSession = USER_SESSION_MAP.put(user, session);
        if (previousSession != null && !previousSession.equals(session)) {
            SESSION_USER_MAP.remove(previousSession, user);
        }
        String previousUser = SESSION_USER_MAP.put(session, user);
        if (previousUser != null && !previousUser.equals(user)) {
            USER_SESSION_MAP.remove(previousUser, session);
        }
    }

    /**
     * Unregisters a session.
     *
     * @param session the session to remove
     */
    public static void remove(WebSocketSession session) {
        String user = SESSION_USER_MAP.remove(session);
        log.warn("ws主动断开:{}", user);
        if (user != null) {
            // Only drop the user entry if it still points at this very session.
            USER_SESSION_MAP.remove(user, session);
        }
    }

    /**
     * Unregisters whatever session is currently bound to the given user.
     *
     * @param userId the key identifying the user
     */
    public static void remove(String userId) {
        log.warn("ws被动断开:{}", userId);
        WebSocketSession session = USER_SESSION_MAP.remove(userId);
        if (session != null) {
            SESSION_USER_MAP.remove(session, userId);
        }
    }

    /**
     * Sends a message to every registered session except {@code me}.
     *
     * @param message the text payload
     * @param me      the session to skip; pass {@code null} to skip none
     */
    public static void broadcast(String message, WebSocketSession me) {
        for (WebSocketSession session : SESSION_USER_MAP.keySet()) {
            if (!session.equals(me)) {
                sendTextMessage(session, message);
            }
        }
    }

    /**
     * Sends a message to the given session.
     *
     * @param session the target session; {@code null} and closed sessions are ignored
     * @param message the text payload
     */
    public static void send(WebSocketSession session, String message) {
        sendTextMessage(session, message);
    }

    /**
     * Sends a message to the session currently bound to the given user.
     *
     * @param user    the key identifying the user
     * @param message the text payload
     * @return {@code true} if the message was handed to an open session without error;
     *         {@code false} if the user is empty, has no open session, or sending failed
     */
    public static boolean send(String user, String message) {
        if (ObjectUtils.isEmpty(user)) {
            log.warn("websocket发送消息用户为空");
            return false;
        }
        // Single lookup: avoids the check-then-act race of containsKey() followed by get().
        WebSocketSession session = USER_SESSION_MAP.get(user);
        if (session == null || !session.isOpen()) {
            return false;
        }
        return sendTextMessage(session, message);
    }

    /**
     * Performs the actual send.
     *
     * <p>Sends are serialised per session because Spring documents that the
     * underlying standard WebSocket session does not allow concurrent sending,
     * and this class is called from both request threads and the scheduler.
     *
     * @param session the target session
     * @param message the text payload
     * @return {@code true} if the send completed without an exception
     */
    private static boolean sendTextMessage(WebSocketSession session, String message) {
        if (session == null || !session.isOpen()) {
            return false;
        }
        try {
            synchronized (session) {
                session.sendMessage(new TextMessage(message));
            }
            return true;
        } catch (IOException | IllegalStateException e) {
            // IllegalStateException is caught too so one bad session cannot abort a broadcast or heartbeat loop.
            log.error("websocket发送消息出现异常:{}", e.getMessage(), e);
            return false;
        }
    }

    /**
     * Heartbeat: pushes {@code {"type":"heartbeat"}} to every open session every 50 seconds.
     *
     * <p>A fixed rate is used instead of the cron expression {@code 0/50 * * * * ?},
     * which fires at seconds 0 and 50 of each minute and therefore alternates
     * between 50-second and 10-second gaps.
     * NOTE(review): scheduling must be enabled elsewhere (e.g. {@code @EnableScheduling}) - confirm.
     */
    @Scheduled(fixedRate = 50_000L)
    public void heartbeat() {
        // Build the payload once; it is identical for every session.
        JSONObject message = new JSONObject();
        message.put("type", "heartbeat");
        String payload = message.toString();
        for (WebSocketSession session : SESSION_USER_MAP.keySet()) {
            sendTextMessage(session, payload);
        }
    }
}

1.5 客户端代码示例

<!DOCTYPE html>
<html>
<head>
    <title>WebSocket Client</title>
</head>
<body>
    <script>
        // The server registers its handler at Constants.WEBSOCKET_PREFIX + "{userName}/{uuid}",
        // so the URL must end with a user name segment and a uuid segment.
        // NOTE(review): "/ws/" assumes Constants.WEBSOCKET_PREFIX is "/ws/" - confirm.
        var userName = "demo";
        var uuid = String(Date.now());
        var socket = new WebSocket(
            "ws://localhost:8080/ws/" + encodeURIComponent(userName) + "/" + encodeURIComponent(uuid));

        socket.onmessage = function(event) {
            console.log("Received: " + event.data);
        };

        socket.onopen = function() {
            socket.send("Hello, server!");
        };

        // Surface failures instead of failing silently.
        socket.onerror = function(event) {
            console.error("WebSocket error", event);
        };

        socket.onclose = function(event) {
            console.log("Closed: code=" + event.code);
        };
    </script>
</body>
</html>

2. 使用 Server-Sent Events (SSE)

提示

SSE 是一种单向通信协议,允许服务端向客户端推送更新。

2.1 创建 SSE 控制器

sse的连接和断开通过接口操作,也可以在服务端直接断开。

/**
 * HTTP endpoints for opening and closing an SSE connection; both delegate to {@code SseClient}.
 */
@RestController
@RequestMapping("api/sse")
public class SseController {

    @Resource
    private SseClient sseClient;

    /**
     * Opens an SSE connection.
     *
     * @param patientId patient identifier, part of the connection key
     * @param visitId   visit identifier, part of the connection key
     * @param uuid      client-chosen identifier, part of the connection key
     * @return the emitter that streams events to the caller
     */
    @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect(String patientId, String visitId, String uuid) {
        return sseClient.connect(patientId, visitId, uuid);
    }

    /**
     * Closes the SSE connection identified by the three parameters.
     *
     * <p>An annotation cannot mix a positional value with a named attribute, so the
     * path is given alone. The {@code text/event-stream} media type was dropped
     * because this endpoint returns no body and streams no events.
     *
     * @param patientId patient identifier, part of the connection key
     * @param visitId   visit identifier, part of the connection key
     * @param uuid      client-chosen identifier, part of the connection key
     */
    @GetMapping("/closeConnect")
    public void closeConnect(String patientId, String visitId, String uuid) {
        sseClient.closeSseConnect(patientId, visitId, uuid);
    }
}

2.2 sse工具类

/**
 * Keeps one {@link SseEmitter} per connection key and offers helpers to open,
 * push to, and close SSE connections, plus a periodic heartbeat.
 *
 * <p>The connection key is {@code patientId + "+" + visitId + "+" + uuid}.
 */
@Slf4j
@Component
public class SseClient {

    private static final Map<String, SseEmitter> SSE_EMITTER_MAP = new ConcurrentHashMap<>();

    /**
     * Opens an SSE connection for the given key parts.
     *
     * <p>A fresh emitter is created for every request. An emitter that was
     * already returned for an earlier HTTP response is bound to that response,
     * so handing it to a second request is not safe; any previous emitter for
     * the same key is completed and replaced instead.
     *
     * @param patientId patient identifier
     * @param visitId   visit identifier
     * @param uuid      client-chosen identifier
     * @return the emitter registered for this connection
     */
    public SseEmitter connect(String patientId, String visitId, String uuid) {
        String userId = buildUserId(patientId, visitId, uuid);
        // A timeout of 0L means the emitter never times out.
        SseEmitter sseEmitter = new SseEmitter(0L);
        // Callbacks remove only this emitter, never a newer one registered under the same key.
        sseEmitter.onCompletion(() -> SSE_EMITTER_MAP.remove(userId, sseEmitter));
        sseEmitter.onTimeout(() -> log.info("{}连接sse超时", userId));
        sseEmitter.onError(throwable -> {
            // The emitter has already failed: do not send on it and do not re-register it.
            log.warn("{}连接sse异常,异常信息{}", userId, throwable.toString(), throwable);
            SSE_EMITTER_MAP.remove(userId, sseEmitter);
        });
        try {
            // Tell the client how long to wait before reconnecting.
            sseEmitter.send(SseEmitter.event().reconnectTime(5000));
        } catch (IOException e) {
            log.error("sse出现异常:{}", e.getMessage(), e);
        }
        SseEmitter previous = SSE_EMITTER_MAP.put(userId, sseEmitter);
        if (previous != null) {
            previous.complete();
        }
        log.info("{}创建sse连接成功!", userId);
        return sseEmitter;
    }

    /**
     * Pushes a message to the connection registered under {@code userId}.
     *
     * @param userId  the connection key
     * @param message the payload; blank messages are rejected
     * @return {@code true} if the message was sent; {@code false} if the message is
     *         blank, no connection exists, or sending failed
     */
    public boolean sendMessage(String userId, String message) {
        log.info("推送消息:{},消息内容:{}", userId, message);
        if (StrUtil.isBlank(message)) {
            log.warn("{}消息为空", userId);
            return false;
        }
        SseEmitter sseEmitter = SSE_EMITTER_MAP.get(userId);
        if (sseEmitter == null) {
            log.warn("消息推送失败uid:{},没有创建连接,请重试。消息内容:{}", userId, message);
            return false;
        }
        try {
            sseEmitter.send(SseEmitter.event().reconnectTime(60 * 1000L).data(message));
            log.info("用户{},推送成功:{}", userId, message);
            return true;
        } catch (Exception e) {
            // Drop the broken emitter, but only if it is still the registered one.
            SSE_EMITTER_MAP.remove(userId, sseEmitter);
            log.warn("用户{},推送异常:{}", userId, e.getMessage(), e);
            sseEmitter.complete();
            return false;
        }
    }

    /**
     * Closes the SSE connection for the given key parts, if one is registered.
     *
     * @param patientId patient identifier
     * @param visitId   visit identifier
     * @param uuid      client-chosen identifier
     */
    public void closeSseConnect(String patientId, String visitId, String uuid) {
        String userId = buildUserId(patientId, visitId, uuid);
        log.info("{}关闭sse连接", userId);
        // Atomic remove-and-get instead of containsKey/get/remove.
        SseEmitter sseEmitter = SSE_EMITTER_MAP.remove(userId);
        if (sseEmitter != null) {
            sseEmitter.complete();
        } else {
            log.info("用户{}已关闭sse连接", userId);
        }
    }

    /**
     * Sends the text {@code heartbeat} to every registered connection every 30 seconds.
     * Connections whose send fails are removed by {@link #sendMessage(String, String)}.
     */
    @Scheduled(cron = "0/30 * * * * ?")
    public void heartbeat() {
        SSE_EMITTER_MAP.keySet().forEach(key -> sendMessage(key, "heartbeat"));
    }

    /**
     * Builds the connection key shared by {@code connect} and {@code closeSseConnect}.
     */
    private static String buildUserId(String patientId, String visitId, String uuid) {
        return patientId + "+" + visitId + "+" + uuid;
    }
}

2.3 客户端代码示例

<!DOCTYPE html>
<html>
<head>
    <title>SSE Client</title>
</head>
<body>
    <script>
        // The controller is mapped at "api/sse" + "/connect" and reads patientId, visitId and uuid
        // as request parameters, so they are passed in the query string. Values are demo placeholders.
        var params = new URLSearchParams({
            patientId: "p001",
            visitId: "v001",
            uuid: String(Date.now())
        });
        var eventSource = new EventSource("/api/sse/connect?" + params.toString());

        eventSource.onmessage = function(event) {
            console.log("Received: " + event.data);
        };

        // EventSource reconnects on its own; log the error so failures are visible.
        eventSource.onerror = function(event) {
            console.error("SSE error", event);
        };
    </script>
</body>
</html>

3. 使用 HTTP 长轮询

提示

HTTP 长轮询是一种模拟实时通信的技术,通过保持 HTTP 连接打开直到有数据可发送。

3.1 创建长轮询控制器

   import org.springframework.web.bind.annotation.GetMapping;
   import org.springframework.web.bind.annotation.RestController;

   /**
    * Minimal long-polling endpoint for demonstration: the request is held for a
    * fixed time and then answered with a constant string.
    */
   @RestController
   public class LongPollingController {

       /** How long the handler blocks before answering, in milliseconds. */
       private static final long SIMULATED_WAIT_MILLIS = 5000;

       /**
        * Blocks for the simulated wait and then returns the payload.
        *
        * @return the constant response text
        * @throws InterruptedException if the waiting thread is interrupted
        */
       @GetMapping("/poll")
       public String poll() throws InterruptedException {
           // Simulates waiting for data to become available.
           Thread.sleep(SIMULATED_WAIT_MILLIS);
           String payload = "New data available!";
           return payload;
       }
   }

3.2 客户端示例

<!DOCTYPE html>
<html>
<head>
    <title>Long Polling Client</title>
</head>
<body>
    <script>
        // Delay before retrying after a failed request, in milliseconds.
        var RETRY_DELAY_MS = 3000;

        function poll() {
            fetch("/poll")
                .then(response => {
                    // fetch() resolves on HTTP errors too; treat non-2xx as a failure.
                    if (!response.ok) {
                        throw new Error("HTTP " + response.status);
                    }
                    return response.text();
                })
                .then(data => {
                    console.log("Received: " + data);
                    poll(); // keep polling
                })
                .catch(error => {
                    // Without this handler a single failure would stop polling for good.
                    console.error("Polling failed: " + error);
                    setTimeout(poll, RETRY_DELAY_MS);
                });
        }

        poll(); // start polling
    </script>
</body>
</html>
上次编辑于: