消息推送
大约 4 分钟
在 Spring Boot 项目中,服务端推送消息到客户端的常用方法包括使用 WebSocket、Server-Sent Events (SSE) 和 HTTP 长轮询。以下是如何使用这些技术的简要介绍和示例:
1. 使用 WebSocket
提示
WebSocket 是一种全双工通信协议,允许服务端和客户端之间进行实时通信。
1.1 添加依赖
<!-- Maven dependency on the Spring Boot WebSocket starter (goes inside <dependencies> in pom.xml). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
1.2 服务端配置
/**
 * Text WebSocket handler that mirrors the connection lifecycle into
 * {@link WebSocketContext}: a session is registered when it opens and
 * unregistered when it closes.
 */
@Slf4j
public class MiaWebSocketHandler extends TextWebSocketHandler {

    /**
     * Registers the newly opened session, keyed by the path of its URI.
     *
     * @param session the session that has just been established
     * @throws NullPointerException if the session exposes no URI
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // Connection parameters may be carried in the URI path, so the path is used as the key.
        final java.net.URI sessionUri = Objects.requireNonNull(session.getUri());
        WebSocketContext.add(session, sessionUri.getPath());
    }

    /**
     * No-op: inbound text messages are not processed by this handler.
     *
     * @param session the session the message arrived on
     * @param message the received text message
     */
    @Override
    public void handleTextMessage(@NotNull WebSocketSession session, @NotNull TextMessage message) throws Exception {
        // Nothing to do.
    }

    /**
     * Unregisters the session once the connection has been closed.
     *
     * @param session the closed session
     * @param status  the close status reported for the connection
     */
    @Override
    public void afterConnectionClosed(@NotNull WebSocketSession session, @NotNull CloseStatus status) throws Exception {
        WebSocketContext.remove(session);
    }
}
1.3 WebSocket 配置
/**
 * Enables WebSocket support and maps {@link MiaWebSocketHandler} to its endpoint.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {

    /**
     * Exposes the WebSocket handler as a bean.
     *
     * @return a new {@link MiaWebSocketHandler}
     */
    @Bean
    public MiaWebSocketHandler miaWebSocketHandler() {
        return new MiaWebSocketHandler();
    }

    /**
     * Registers the handler under the WebSocket prefix followed by
     * {@code {userName}/{uuid}} and allows connections from every origin.
     *
     * @param registry the registry to add the handler mapping to
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // Path the handler is mapped to; user name and uuid travel in the path.
        final String endpointPattern = Constants.WEBSOCKET_PREFIX + "{userName}/{uuid}";
        registry.addHandler(miaWebSocketHandler(), endpointPattern)
                .setAllowedOrigins("*");
    }
}
1.4 WebSocket 会话上下文工具
/**
 * Registry of open WebSocket sessions plus helpers for pushing text messages
 * to a single user, a single session, or every registered session.
 *
 * <p>Both lookup directions are kept in concurrent maps so the helpers can be
 * called from request threads and from the scheduled heartbeat alike.
 */
@Slf4j
@Component
public class WebSocketContext {

    /** Session -> user mapping. */
    private static final Map<WebSocketSession, String> SESSION_USER_MAP = new ConcurrentHashMap<>();

    /** User -> session mapping. */
    private static final Map<String, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();

    /**
     * Registers a session and binds it to the given user in both directions.
     *
     * @param session the session to register
     * @param user    the key identifying the user
     */
    public static void add(WebSocketSession session, String user) {
        USER_SESSION_MAP.put(user, session);
        SESSION_USER_MAP.put(session, user);
    }

    /**
     * Unregisters a session.
     *
     * @param session the session to remove
     */
    public static void remove(WebSocketSession session) {
        String user = SESSION_USER_MAP.remove(session);
        log.warn("ws主动断开:{}", user);
        if (user != null) {
            // Only drop the user entry if it still points at THIS session; the user may
            // already have reconnected, and the newer session must stay reachable.
            USER_SESSION_MAP.remove(user, session);
        }
    }

    /**
     * Unregisters whatever session is currently bound to the given user.
     *
     * @param userId the key identifying the user
     */
    public static void remove(String userId) {
        log.warn("ws被动断开:{}", userId);
        WebSocketSession session = USER_SESSION_MAP.remove(userId);
        if (session != null) {
            SESSION_USER_MAP.remove(session);
        }
    }

    /**
     * Sends a message to every registered session except the given one.
     *
     * @param message the text to send
     * @param me      the session to skip; may be {@code null} to reach everyone
     */
    public static void broadcast(String message, WebSocketSession me) {
        for (WebSocketSession session : SESSION_USER_MAP.keySet()) {
            if (!session.equals(me)) {
                sendTextMessage(session, message);
            }
        }
    }

    /**
     * Sends a message to a specific session.
     *
     * @param session the target session
     * @param message the text to send
     */
    public static void send(WebSocketSession session, String message) {
        sendTextMessage(session, message);
    }

    /**
     * Sends a message to the session registered for the given user.
     *
     * @param user    the key identifying the user
     * @param message the text to send
     * @return {@code true} if the message was handed to an open session without error;
     *         {@code false} if the user is empty, has no open session, or sending failed
     */
    public static boolean send(String user, String message) {
        if (ObjectUtils.isEmpty(user)) {
            log.warn("websocket发送消息用户为空");
            return false;
        }
        // Single lookup: a containsKey/get pair could see the entry vanish in between.
        WebSocketSession session = USER_SESSION_MAP.get(user);
        if (session == null || !session.isOpen()) {
            return false;
        }
        return sendTextMessage(session, message);
    }

    /**
     * Performs the actual send.
     *
     * @param session the target session; {@code null} or closed sessions are skipped
     * @param message the text to send
     * @return {@code true} if the message was sent, {@code false} otherwise
     */
    private static boolean sendTextMessage(WebSocketSession session, String message) {
        if (session == null || !session.isOpen()) {
            return false;
        }
        try {
            // The heartbeat runs on the scheduler thread while callers send from their own
            // threads; serialize writes per session so two sends never overlap.
            synchronized (session) {
                session.sendMessage(new TextMessage(message));
            }
            return true;
        } catch (IOException e) {
            log.error("websocket发送消息出现异常:{}", e.getMessage(), e);
            return false;
        }
    }

    /**
     * Heartbeat: pushes a {@code {"type":"heartbeat"}} message to every open session.
     *
     * <p>A fixed rate is used because the cron expression {@code 0/50 * * * * ?} fires at
     * seconds 0 and 50 of every minute (gaps of 50s and 10s), not every 50 seconds.
     */
    @Scheduled(fixedRate = 50_000L)
    public void heartbeat() {
        // The payload is identical for every session, so build it once.
        JSONObject message = new JSONObject();
        message.put("type", "heartbeat");
        String payload = message.toString();
        for (WebSocketSession session : SESSION_USER_MAP.keySet()) {
            // Closed sessions are skipped inside sendTextMessage.
            sendTextMessage(session, payload);
        }
    }
}
1.5 客户端代码示例
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Client</title>
</head>
<body>
<script>
// Minimal client: greet the server once connected and log every message it pushes.
// NOTE(review): the server registers its handler under
// Constants.WEBSOCKET_PREFIX + "{userName}/{uuid}" - confirm this URL matches that pattern.
var socket = new WebSocket("ws://localhost:8080/ws");
socket.addEventListener("open", () => {
    socket.send("Hello, server!");
});
socket.addEventListener("message", (event) => {
    console.log("Received: " + event.data);
});
</script>
</body>
</html>
2. 使用 Server-Sent Events (SSE)
提示
SSE 是一种单向通信协议,允许服务端向客户端推送更新。
2.1 创建 SSE 控制器
SSE 的连接和断开都通过接口操作，也可以在服务端直接断开。
/**
 * REST endpoints for opening and closing a client's SSE connection.
 */
@RestController
@RequestMapping("api/sse")
public class SseController {

    @Resource
    private SseClient sseClient;

    /**
     * Opens the SSE stream for the client identified by the three parameters.
     *
     * @param patientId patient identifier
     * @param visitId   visit identifier
     * @param uuid      client-chosen unique id
     * @return the emitter through which events are pushed to this client
     */
    @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect(String patientId, String visitId, String uuid) {
        return sseClient.connect(patientId, visitId, uuid);
    }

    /**
     * Closes the SSE connection of the client identified by the three parameters.
     *
     * <p>This is a plain request with no event stream in the response, so no
     * {@code produces} restriction is declared. The original form mixed a positional
     * annotation value with a named attribute, which does not compile.
     *
     * @param patientId patient identifier
     * @param visitId   visit identifier
     * @param uuid      client-chosen unique id
     */
    @GetMapping("/closeConnect")
    public void closeConnect(String patientId, String visitId, String uuid) {
        sseClient.closeSseConnect(patientId, visitId, uuid);
    }
}
2.2 SSE 工具类
/**
 * Keeps one {@link SseEmitter} per client and offers helpers to open a connection,
 * push a message, close a connection, and send periodic heartbeats.
 */
@Slf4j
@Component
public class SseClient {

    /** Client key -> emitter of the currently registered connection. */
    private static final Map<String, SseEmitter> SSE_EMITTER_MAP = new ConcurrentHashMap<>();

    /**
     * Opens an SSE connection for the given client, or returns the emitter that is
     * already registered for it.
     *
     * @param patientId patient identifier
     * @param visitId   visit identifier
     * @param uuid      client-chosen unique id
     * @return the emitter registered for this client
     */
    public SseEmitter connect(String patientId, String visitId, String uuid) {
        String userId = buildUserId(patientId, visitId, uuid);
        // NOTE(review): an already registered emitter is handed out again, as before;
        // confirm that reusing it for a second HTTP request is intended.
        SseEmitter existing = SSE_EMITTER_MAP.get(userId);
        if (existing != null) {
            return existing;
        }
        // The default timeout is 30 seconds; 0L means the emitter never times out.
        SseEmitter sseEmitter = new SseEmitter(0L);
        // Completion callback. Remove only if the map still holds THIS emitter, so a late
        // callback from an old connection cannot evict a newer one for the same client.
        sseEmitter.onCompletion(() -> SSE_EMITTER_MAP.remove(userId, sseEmitter));
        // Timeout callback.
        sseEmitter.onTimeout(() -> log.info("{}连接sse超时", userId));
        // Error callback. The connection has already failed at this point, so nothing is
        // sent through it and it is unregistered instead of being put back into the map
        // (re-registering kept dead emitters around and made later pushes fail again).
        sseEmitter.onError(throwable -> {
            log.info("{}连接sse异常,异常信息{}", userId, throwable.toString(), throwable);
            SSE_EMITTER_MAP.remove(userId, sseEmitter);
        });
        try {
            // Tell the client how long to wait before reconnecting.
            sseEmitter.send(SseEmitter.event().reconnectTime(5000));
        } catch (IOException e) {
            log.error("sse出现异常:{}", e.getMessage(), e);
        }
        // Atomic registration: if a concurrent connect for the same client won the race,
        // hand out its emitter rather than overwriting it.
        SseEmitter raced = SSE_EMITTER_MAP.putIfAbsent(userId, sseEmitter);
        if (raced != null) {
            return raced;
        }
        log.info("{}创建sse连接成功!", userId);
        return sseEmitter;
    }

    /**
     * Pushes a message to the given client.
     *
     * @param userId  client key in the form {@code patientId+visitId+uuid}
     * @param message the payload; blank messages are rejected
     * @return {@code true} if the message was sent; {@code false} if it was blank, the
     *         client has no connection, or sending failed
     */
    public boolean sendMessage(String userId, String message) {
        log.info("推送消息:{},消息内容:{}", userId, message);
        if (StrUtil.isBlank(message)) {
            log.warn("{}消息为空", userId);
            return false;
        }
        SseEmitter sseEmitter = SSE_EMITTER_MAP.get(userId);
        if (sseEmitter == null) {
            log.warn("消息推送失败uid:{},没有创建连接,请重试。消息内容:{}", userId, message);
            return false;
        }
        try {
            sseEmitter.send(SseEmitter.event().reconnectTime(60 * 1000L).data(message));
            log.info("用户{},推送成功:{}", userId, message);
            return true;
        } catch (Exception e) {
            // Unregister only the emitter that failed; a newer one may already be in place.
            SSE_EMITTER_MAP.remove(userId, sseEmitter);
            log.info("用户{},推送异常:{}", userId, e.getMessage());
            sseEmitter.complete();
            return false;
        }
    }

    /**
     * Closes the SSE connection of the given client, if one is registered.
     *
     * @param patientId patient identifier
     * @param visitId   visit identifier
     * @param uuid      client-chosen unique id
     */
    public void closeSseConnect(String patientId, String visitId, String uuid) {
        String userId = buildUserId(patientId, visitId, uuid);
        log.info("{}关闭sse连接", userId);
        // A single atomic remove replaces the containsKey/get/remove sequence.
        SseEmitter sseEmitter = SSE_EMITTER_MAP.remove(userId);
        if (sseEmitter != null) {
            sseEmitter.complete();
        } else {
            log.info("用户{}已关闭sse连接", userId);
        }
    }

    /**
     * Sends a {@code heartbeat} message to every registered client every 30 seconds.
     * Connections whose send fails are unregistered by {@link #sendMessage}.
     */
    @Scheduled(cron = "0/30 * * * * ?")
    public void heartbeat() {
        SSE_EMITTER_MAP.forEach((key, value) -> sendMessage(key, "heartbeat"));
    }

    /** Builds the map key that identifies a client: {@code patientId+visitId+uuid}. */
    private static String buildUserId(String patientId, String visitId, String uuid) {
        return patientId + "+" + visitId + "+" + uuid;
    }
}
2.3 客户端代码示例
<!DOCTYPE html>
<html>
<head>
<title>SSE Client</title>
</head>
<body>
<script>
// The server exposes the stream at GET api/sse/connect and identifies the client by
// patientId, visitId and uuid. The values below are placeholders - replace them.
var query = new URLSearchParams({ patientId: "1", visitId: "1", uuid: "demo-uuid" });
var eventSource = new EventSource("/api/sse/connect?" + query.toString());
// Log every event pushed by the server (this includes the periodic "heartbeat" messages).
eventSource.onmessage = function(event) {
    console.log("Received: " + event.data);
};
</script>
</body>
</html>
3. 使用 HTTP 长轮询
提示
HTTP 长轮询是一种模拟实时通信的技术：服务端保持 HTTP 请求处于打开状态，直到有数据可返回时才响应；客户端收到响应后立即发起下一次请求。
3.1 创建长轮询控制器
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Demo long-polling endpoint: each request is held for a fixed time and then answered.
 */
@RestController
public class LongPollingController {

    /** How long a poll request is held before it is answered, in milliseconds. */
    private static final long SIMULATED_WAIT_MILLIS = 5000L;

    /** Body returned once the simulated wait is over. */
    private static final String RESPONSE_BODY = "New data available!";

    /**
     * Blocks for the simulated wait and then returns the response body.
     *
     * @return the fixed response text
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @GetMapping("/poll")
    public String poll() throws InterruptedException {
        // Stand-in for "wait until data becomes available".
        Thread.sleep(SIMULATED_WAIT_MILLIS);
        return RESPONSE_BODY;
    }
}
3.2 客户端示例
<!DOCTYPE html>
<html>
<head>
<title>Long Polling Client</title>
</head>
<body>
<script>
// Delay before polling again after a failure, so a broken server is not hammered.
var RETRY_DELAY_MS = 3000;

// Issue one long-poll request; re-arm as soon as it is answered.
function poll() {
    fetch("/poll")
        .then(response => {
            // Treat non-2xx answers as failures instead of logging an error page as data.
            if (!response.ok) {
                throw new Error("Unexpected HTTP status " + response.status);
            }
            return response.text();
        })
        .then(data => {
            console.log("Received: " + data);
            poll(); // Keep polling.
        })
        .catch(error => {
            // Without this handler a single failed request would silently end the loop.
            console.error("Polling failed, retrying in " + RETRY_DELAY_MS + " ms:", error);
            setTimeout(poll, RETRY_DELAY_MS);
        });
}
poll(); // Start polling.
</script>
</body>
</html>