package com.ruoyi.web.webSocket; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.ruoyi.fuzhou.domain.*; import com.ruoyi.fuzhou.enums.DataTypeEnum; import com.ruoyi.fuzhou.service.*; import com.ruoyi.manage.domain.DsTaskList; import com.ruoyi.manage.service.DsTaskListService; import jakarta.websocket.*; import jakarta.websocket.server.PathParam; import jakarta.websocket.server.ServerEndpoint; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @ServerEndpoint("/dp/sendOil/{userId}") @Component public class WebSocketEquServer implements ApplicationContextAware { private final static Logger log = LoggerFactory.getLogger(WebSocketEquServer.class); /** * 在线人数 */ private static int onlineCount = 0; /** * 在线人员session */ private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 接收userId */ private String userId = ""; //定时任务 private ScheduledExecutorService scheduler; //泊位ID private Integer beId = 0; private static final Object lock = new Object(); private static ApplicationContext context; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { context = applicationContext; } public static T getBean(Class beanClass) { return context.getBean(beanClass); } /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId = userId; if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); webSocketMap.put(userId, this); } else { webSocketMap.put(userId, this); } try { sendMessage("连接成功"); } catch (IOException e) { log.error("用户:" + userId + ",网络异常!"); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { if (scheduler != null) { scheduler.shutdown(); } if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); } } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { JSONObject beObject = JSONObject.parseObject(message); String type = beObject.getString("type"); try { beId = Integer.parseInt(beObject.getString("beId")); if (!type.isEmpty() && type.equals("startSupply") && beId != null && beId != 0) { //创建定时任务 scheduler = Executors.newSingleThreadScheduledExecutor(); //每10秒执行一次 scheduler.scheduleAtFixedRate(() -> { try { synchronized (lock) { if (session != null && session.isOpen()) { JSONObject jsonObject = new JSONObject(); jsonObject.put("type", "Supply"); EquipmentService equipmentService = getBean(EquipmentService.class); LambdaQueryWrapper oilWrapper = new LambdaQueryWrapper<>(); oilWrapper.eq(DpEquipment::getBeId, beId) .in(DpEquipment::getEquipmentTypeId, DataTypeEnum.LIJIUOIL.getCode(), DataTypeEnum.OIL.getCode(), DataTypeEnum.LIJIUJUN.getCode()) .orderByAsc(DpEquipment::getId); List oilList = equipmentService.list(oilWrapper); JSONArray oilArray = new JSONArray(); for (DpEquipment dpEquipment : oilList) { ReceiveOilValue oilValue = (ReceiveOilValue) equDataList(dpEquipment.getId()); if (oilValue != null) { oilValue.setId(dpEquipment.getId().longValue()); } oilArray.add(oilValue); } jsonObject.put("oil", oilArray); LambdaQueryWrapper waterWrapper = new LambdaQueryWrapper<>(); waterWrapper.eq(DpEquipment::getBeId, beId) .eq(DpEquipment::getEquipmentTypeId, 2) .orderByAsc(DpEquipment::getId); List waterList = equipmentService.list(waterWrapper); JSONArray waterArray = new JSONArray(); for (DpEquipment dpEquipment : waterList) { ReceiveWaterValue waterValue = (ReceiveWaterValue) equDataList(dpEquipment.getId()); if (waterValue != null) { waterValue.setId(dpEquipment.getId().longValue()); } waterArray.add(waterValue); } jsonObject.put("water", waterArray); LambdaQueryWrapper elecWrapper = new LambdaQueryWrapper<>(); elecWrapper.eq(DpEquipment::getBeId, beId) .eq(DpEquipment::getEquipmentTypeId, 3) .orderByAsc(DpEquipment::getId); List elecList = equipmentService.list(elecWrapper); JSONArray elecArray = new JSONArray(); for (DpEquipment dpEquipment : elecList) { ReceiveElectricityValue electricityValue = (ReceiveElectricityValue) equDataList(dpEquipment.getId()); if (electricityValue != null) { electricityValue.setId(dpEquipment.getId().longValue()); } elecArray.add(electricityValue); } jsonObject.put("elec", elecArray); DsTaskListService dsTaskListService = getBean(DsTaskListService.class); // QueryWrapper queryWrapper = new QueryWrapper<>(); // queryWrapper.eq("BERTH_ID", beId).orderByDesc("PKID").last("LIMIT 1"); // String rfidNumStr = dsTaskList.getRfidNum(); QueryWrapper queryWrapperTask = new QueryWrapper<>(); queryWrapperTask.eq("TASK_ID",999).orderByDesc("CREATE_TIME").last("LIMIT 1"); DsTaskList dsTaskList = dsTaskListService.getOne(queryWrapperTask); String rfidNumStr = dsTaskList.getTaskId()+dsTaskList.getShipNo(); //根据任务列表中的rfidNum查询RFID任务 List rfidTaskList = GetRfidTaskList(rfidNumStr); jsonObject.put("RFID_Task", rfidTaskList); //RFID任务GoodsList中获取物品信息 // if(rfidTaskList.size()>0){ // DpRfidTask dpRfidTask = rfidTaskList.get(0); // JSONArray jsonArray = JSON.parseArray(dpRfidTask.getGoodsList().toString()); // JSONObject goods = jsonArray.getJSONObject(0); // String rfidNum = goods.getString("rfid"); // } //根据任务列表中的rfidNum查询RFID车辆数据 List vehicleList = GetVehicleList(rfidNumStr); jsonObject.put("RFID_Vehicle", vehicleList); sendMessage(jsonObject.toString()); } } } catch (Exception e) { e.printStackTrace(); } }, 0, 10, TimeUnit.SECONDS ); } else if (type.equals("endSupply")) { onClose(); } else { sendMessage("传参失败"); } } catch (Exception e) { log.error("心跳失败,客户端已断线", e); } } /** * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用户错误:" + this.userId + ",原因:" + error.getMessage()); error.printStackTrace(); } /** * 实现服务器主动推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 发送自定义消息 */ public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException { log.info("发送消息到:" + userId + ",报文:" + message); if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { webSocketMap.get(userId).sendMessage(message); } else { log.error("用户" + userId + ",不在线!"); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketEquServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketEquServer.onlineCount--; } /** * 实现服务器主动推送 */ public static void sendAllMessage(String message) { Iterator key = webSocketMap.keys().asIterator(); while (key.hasNext()) { String strList = (String) key.next(); try { log.info(strList); webSocketMap.get(strList).sendMessage(message); } catch (Exception e) { e.printStackTrace(); log.error("推送失败:", e); } } } //根据FIELD_NAME查询设备详情经纬度(RFID) private DpEquipment GetEquipmentByFieldName(String fieldName){ EquipmentService equipmentService = getBean(EquipmentService.class); QueryWrapper equipmentQueryWrapper = new QueryWrapper<>(); equipmentQueryWrapper.eq("FIELD_NAME", fieldName); return equipmentService.getOne(equipmentQueryWrapper); } //根据任务列表中的rfidNum查询RFID车辆数据 private List GetVehicleList(String rfidNumStr){ DpRfidVehicleService dpRfidVehicleService = getBean(DpRfidVehicleService.class); List rfidVehicleList = new ArrayList<>(); DpRfidVehicle rfidVehicle = dpRfidVehicleService.QueryVehicleByTask(rfidNumStr); DpEquipment equipment = GetEquipmentByFieldName(rfidVehicle.getSn()); rfidVehicle.setEquId(equipment.getId()); rfidVehicle.setX(equipment.getX()); rfidVehicle.setY(equipment.getY()); rfidVehicle.setZ(equipment.getZ()); rfidVehicleList.add(rfidVehicle); return rfidVehicleList; } //根据任务列表中的rfidNum查询RFID任务 private List GetRfidTaskList(String rfidNumStr){ DpRfidTaskService dpRfidTaskService = getBean(DpRfidTaskService.class); LambdaQueryWrapper rfidTaskWrapper = new LambdaQueryWrapper<>(); List rfidTaskList = new ArrayList<>(); rfidTaskWrapper.like(DpRfidTask::getWzData, rfidNumStr).orderByDesc(DpRfidTask::getPassTime).last("LIMIT 1"); rfidTaskList = dpRfidTaskService.list(rfidTaskWrapper); return rfidTaskList; } private Object equDataList(Integer id) { EquipmentService equipmentService = getBean(EquipmentService.class); ReceiveOilValueService receiveOilValueService = getBean(ReceiveOilValueService.class); ReceiveWaterValueService receiveWaterValueService = getBean(ReceiveWaterValueService.class); ReceiveElectricityValueService receiveElectricityValueService = getBean(ReceiveElectricityValueService.class); DpEquipment receiveInfo = equipmentService.getById(id); if (receiveInfo == null) { return new Object(); } if (DataTypeEnum.LIJIUOIL.getCode().equals(receiveInfo.getEquipmentTypeId()) || DataTypeEnum.OIL.getCode().equals(receiveInfo.getEquipmentTypeId()) || DataTypeEnum.LIJIUJUN.getCode().equals(receiveInfo.getEquipmentTypeId())) { return receiveOilValueService.getOne(new LambdaQueryWrapper() {{ or().eq(ReceiveOilValue::getDeviceName, String.valueOf(receiveInfo.getId())) .orderByDesc(ReceiveOilValue::getCreateTime).last("LIMIT 1"); }}); } else if (DataTypeEnum.WATER_FLOW.getCode().equals(receiveInfo.getEquipmentTypeId()) || DataTypeEnum.WATER_YA.getCode().equals(receiveInfo.getEquipmentTypeId()) || DataTypeEnum.WATER_DEPT.getCode().equals(receiveInfo.getEquipmentTypeId())) { return receiveWaterValueService.getOne(new LambdaQueryWrapper() {{ or().eq(ReceiveWaterValue::getDeviceName, String.valueOf(receiveInfo.getId())) .orderByDesc(ReceiveWaterValue::getCreateTime).last("LIMIT 1"); }}); } else if (DataTypeEnum.ELECTRICITY.getCode().equals(receiveInfo.getEquipmentTypeId())) { return receiveElectricityValueService.getOne(new LambdaQueryWrapper() {{ or().eq(ReceiveElectricityValue::getDeviceName, String.valueOf(receiveInfo.getId())) .orderByDesc(ReceiveElectricityValue::getCreateTime).last("LIMIT 1"); }}); } return new Object(); } }