管道基础大数据平台系统开发-【后端】-Server
1
sws
2022-11-26 ab849f796bdc17236a95ea5fe5c166fb8f24a75c
src/main/java/com/lf/server/service/all/WebSocketService.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,132 @@
package com.lf.server.service.all;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * WebSocket服务类
 * @author WWW
 */
@ServerEndpoint(value = "/ws/select")
@Component
public class WebSocketService {
    @PostConstruct
    public void init() {
        System.out.println("websocket åŠ è½½");
    }
    private static Logger log = LoggerFactory.getLogger(WebSocketService.class);
    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
    /**
     * ç”¨æ¥å­˜æ”¾æ¯ä¸ªå®¢æˆ·ç«¯å¯¹åº”çš„Session对象(线程安全Set)
     */
    private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<Session>();
    /**
     * è¿žæŽ¥å»ºç«‹æˆåŠŸè°ƒç”¨çš„æ–¹æ³•
     */
    @OnOpen
    public void onOpen(Session session) {
        SessionSet.add(session);
        int cnt = ONLINE_COUNT.incrementAndGet();
        log.info("有连接加入,当前连接数为:{}", cnt);
        sendMessage(session, "连接成功");
    }
    /**
     * è¿žæŽ¥å…³é—­è°ƒç”¨çš„æ–¹æ³•
     */
    @OnClose
    public void onClose(Session session) {
        SessionSet.remove(session);
        int cnt = ONLINE_COUNT.decrementAndGet();
        log.info("有连接关闭,当前连接数为:{}", cnt);
    }
    /**
     * æ”¶åˆ°å®¢æˆ·ç«¯æ¶ˆæ¯åŽè°ƒç”¨çš„æ–¹æ³•
     *
     * @param message å®¢æˆ·ç«¯å‘送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("来自客户端的消息:{}", message);
        sendMessage(session, "收到消息,消息内容:" + message);
    }
    /**
     * å‡ºçŽ°é”™è¯¯
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误:{},Session ID: {}", error.getMessage(), session.getId());
        error.printStackTrace();
    }
    /**
     * å‘送消息,实践表明,每次浏览器刷新,session会发生变化。
     *
     * @param session
     * @param message
     */
    public static void sendMessage(Session session, String message) {
        try {
            session.getBasicRemote().sendText(String.format("%s", message));
        } catch (IOException e) {
            log.error("发送消息出错:{}", e.getMessage());
            e.printStackTrace();
        }
    }
    /**
     * ç¾¤å‘消息
     *
     * @param message
     * @throws IOException
     */
    public static void broadCastInfo(String message) throws IOException {
        for (Session session : SessionSet) {
            if (session.isOpen()) {
                sendMessage(session, message);
            }
        }
    }
    /**
     * æŒ‡å®šSession发送消息
     *
     * @param sessionId
     * @param message
     * @throws IOException
     */
    public static void sendMessage(String message, String sessionId) throws IOException {
        Session session = null;
        for (Session s : SessionSet) {
            if (s.getId().equals(sessionId)) {
                session = s;
                break;
            }
        }
        if (session != null) {
            sendMessage(session, message);
        } else {
            log.warn("没有找到你指定ID的会话:{}", sessionId);
        }
    }
}