package com.ruoyi.fuzhou.websocket; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.ruoyi.fuzhou.domain.*; import com.ruoyi.fuzhou.enums.DataTypeEnum; import com.ruoyi.fuzhou.service.*; import jakarta.websocket.*; import jakarta.websocket.server.PathParam; import jakarta.websocket.server.ServerEndpoint; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.concurrent.*; //@ServerEndpoint("/dp/sendOil/{userId}") //@Component public class WebSocketOilServer implements ApplicationContextAware { private final static Logger log = LoggerFactory.getLogger(WebSocketOilServer.class); /** * 在线人数 */ private static int onlineCount = 0; /** * 在线人员session */ private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 接收userId */ private String userId = ""; //定时任务 private ScheduledExecutorService scheduler; //泊位ID private Integer beId = 0; private static final Object lock = new Object(); private static ApplicationContext context; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { context = applicationContext; } public static T getBean(Class beanClass){ return context.getBean(beanClass); } // private EquipmentService equipmentService; // private ReceiveOilValueService receiveOilValueService; // private ReceiveWaterValueService receiveWaterValueService; // private ReceiveElectricityValueService receiveElectricityValueService; // private DpRfidTaskService dpRfidTaskService; // private DpRfidVehicleService dpRfidVehicleService; /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId = userId; if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); webSocketMap.put(userId, this); //加入set中 } else { webSocketMap.put(userId, this); } try { sendMessage("连接成功"); } catch (IOException e) { log.error("用户:" + userId + ",网络异常!!!!!!"); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { if(scheduler != null){ scheduler.shutdown(); } if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); } } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { JSONObject beObject = JSONObject.parseObject(message); String type = beObject.getString("type"); try { if(!type.isEmpty()){ sendMessage(type); // beId = Integer.parseInt(beObject.getString("beId")); // if(!type.isEmpty() && type.equals("startSupply") && beId != null && beId != 0){ // //创建定时任务 // scheduler = Executors.newSingleThreadScheduledExecutor(); // //每10秒执行一次 // scheduler.scheduleAtFixedRate(()->{ // try { // synchronized (lock){ // if(session !=null && session.isOpen()){ // JSONObject jsonObject = new JSONObject(); // jsonObject.put("type","Supply"); // EquipmentService equipmentService = getBean(EquipmentService.class); // LambdaQueryWrapper oilWrapper = new LambdaQueryWrapper<>(); // oilWrapper.eq(DpEquipment::getBeId,beId) // .in(DpEquipment::getEquipmentTypeId,DataTypeEnum.LIJIUOIL.getCode(),DataTypeEnum.OIL.getCode(),DataTypeEnum.LIJIUJUN.getCode()) // .orderByAsc(DpEquipment::getId); // List oilList = equipmentService.list(oilWrapper); // JSONArray oilArray = new JSONArray(); // for (DpEquipment dpEquipment:oilList){ // ReceiveOilValue oilValue = (ReceiveOilValue) equDataList(dpEquipment.getId()); // if(oilValue != null){ // oilValue.setId(dpEquipment.getId().longValue()); // } // oilArray.add(oilValue); // } // jsonObject.put("oil",oilArray); // // LambdaQueryWrapper waterWrapper = new LambdaQueryWrapper<>(); // waterWrapper.eq(DpEquipment::getBeId,beId) // .eq(DpEquipment::getEquipmentTypeId,2) // .orderByAsc(DpEquipment::getId); // List waterList = equipmentService.list(waterWrapper); // JSONArray waterArray = new JSONArray(); // for (DpEquipment dpEquipment:waterList){ // ReceiveWaterValue waterValue = (ReceiveWaterValue) equDataList(dpEquipment.getId()); // if(waterValue != null){ // waterValue.setId(dpEquipment.getId().longValue()); // } // waterArray.add(waterValue); // } // jsonObject.put("water",waterArray); // // LambdaQueryWrapper elecWrapper = new LambdaQueryWrapper<>(); // elecWrapper.eq(DpEquipment::getBeId,beId) // .eq(DpEquipment::getEquipmentTypeId,3) // .orderByAsc(DpEquipment::getId); // List elecList = equipmentService.list(elecWrapper); // JSONArray elecArray = new JSONArray(); // for (DpEquipment dpEquipment:elecList){ // ReceiveElectricityValue electricityValue = (ReceiveElectricityValue)equDataList(dpEquipment.getId()); // if(electricityValue != null){ // electricityValue.setId(dpEquipment.getId().longValue()); // } // elecArray.add(electricityValue); // } // jsonObject.put("elec",elecArray); // // DpRfidTaskService dpRfidTaskService = getBean(DpRfidTaskService.class); // LambdaQueryWrapper rfidTaskWrapper = new LambdaQueryWrapper<>(); // rfidTaskWrapper.orderByDesc(DpRfidTask::getCreateTime).last("LIMIT 1"); // List rfidTaskList = dpRfidTaskService.list(rfidTaskWrapper); // DpRfidTask dpRfidTask = rfidTaskList.get(0); // JSONArray jsonArray = JSON.parseArray(dpRfidTask.getGoodsList().toString()); // JSONObject goods = jsonArray.getJSONObject(0); // String rfidNum = goods.getString("rfid"); // jsonObject.put("RFID_Task",rfidTaskList); // // DpRfidVehicleService dpRfidVehicleService = getBean(DpRfidVehicleService.class); // LambdaQueryWrapper rfidVehicleWrapper = new LambdaQueryWrapper<>(); // rfidVehicleWrapper.eq(DpRfidVehicle::getRfidNum,rfidNum) // .orderByDesc(DpRfidVehicle::getPassTime).last("LIMIT 1"); // List rfidVehicleList = dpRfidVehicleService.list(rfidVehicleWrapper); // jsonObject.put("RFID_Vehicle",rfidVehicleList); // // sendMessage(jsonObject.toString()); // // } // } // }catch (Exception e){ // e.printStackTrace(); // } // },0,10, TimeUnit.SECONDS // // ); }else if(type.equals("endSupply")){ onClose(); } else { sendMessage("传参失败"); } } catch (Exception e) { log.error("心跳失败,客户端已断线", e); } } /** * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用户错误:" + this.userId + ",原因:" + error.getMessage()); error.printStackTrace(); } /** * 实现服务器主动推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 发送自定义消息 */ public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException { log.info("发送消息到:" + userId + ",报文:" + message); if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { webSocketMap.get(userId).sendMessage(message); } else { log.error("用户" + userId + ",不在线!"); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketOilServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketOilServer.onlineCount--; } /** * 实现服务器主动推送 */ public static void sendAllMessage(String message) { Iterator key = webSocketMap.keys().asIterator(); while (key.hasNext()) { String strList = (String) key.next(); try { log.info(strList); webSocketMap.get(strList).sendMessage(message); } catch (Exception e) { e.printStackTrace(); log.error("推送失败!", e); } } } // public Object equDataList(Integer id){ // EquipmentService equipmentService = getBean(EquipmentService.class); // ReceiveOilValueService receiveOilValueService = getBean(ReceiveOilValueService.class); // ReceiveWaterValueService receiveWaterValueService = getBean(ReceiveWaterValueService.class); // ReceiveElectricityValueService receiveElectricityValueService = getBean(ReceiveElectricityValueService.class); // DpEquipment receiveInfo = equipmentService.getById(id); // if (receiveInfo == null) { // return new Object(); // } // if (DataTypeEnum.LIJIUOIL.getCode().equals(receiveInfo.getEquSeType()) || DataTypeEnum.OIL.getCode().equals(receiveInfo.getEquSeType())) { // return receiveOilValueService.getOne(new LambdaQueryWrapper() {{ // or().eq(ReceiveOilValue::getDeviceName, String.valueOf(receiveInfo.getId())) // .orderByDesc(ReceiveOilValue::getCreateTime).last("LIMIT 1"); // }}); // } else if (DataTypeEnum.WATER_FLOW.getCode().equals(receiveInfo.getEquSeType()) || DataTypeEnum.WATER_YA.getCode().equals(receiveInfo.getEquSeType()) // || DataTypeEnum.WATER_DEPT.getCode().equals(receiveInfo.getEquSeType())) { // return receiveWaterValueService.getOne(new LambdaQueryWrapper() {{ // or().eq(ReceiveWaterValue::getDeviceName, String.valueOf(receiveInfo.getId())) // .orderByDesc(ReceiveWaterValue::getCreateTime).last("LIMIT 1"); // }}); // } else if (DataTypeEnum.ELECTRICITY.getCode().equals(receiveInfo.getEquSeType())) { // return receiveElectricityValueService.getOne(new LambdaQueryWrapper() {{ // or().eq(ReceiveElectricityValue::getDeviceName, String.valueOf(receiveInfo.getId())) // .orderByDesc(ReceiveElectricityValue::getCreateTime).last("LIMIT 1"); // }}); // } // return new Object(); // } }