package com.ruoyi.web.webSocket;
|
|
import com.alibaba.fastjson2.JSON;
|
import com.alibaba.fastjson2.JSONArray;
|
import com.alibaba.fastjson2.JSONObject;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.ruoyi.fuzhou.domain.*;
|
import com.ruoyi.fuzhou.enums.DataTypeEnum;
|
import com.ruoyi.fuzhou.service.*;
|
import com.ruoyi.manage.domain.DsTaskList;
|
import com.ruoyi.manage.service.DsTaskListService;
|
import jakarta.websocket.*;
|
import jakarta.websocket.server.PathParam;
|
import jakarta.websocket.server.ServerEndpoint;
|
import org.apache.commons.lang3.StringUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.BeansException;
|
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContextAware;
|
import org.springframework.stereotype.Component;
|
|
import java.io.IOException;
|
import java.util.ArrayList;
|
import java.util.Arrays;
|
import java.util.Iterator;
|
import java.util.List;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
|
@ServerEndpoint("/dp/sendOil/{userId}")
|
@Component
|
public class WebSocketEquServer implements ApplicationContextAware {
|
private final static Logger log = LoggerFactory.getLogger(WebSocketEquServer.class);
|
/**
|
* 在线人数
|
*/
|
private static int onlineCount = 0;
|
/**
|
* 在线人员session
|
*/
|
private static ConcurrentHashMap<String, WebSocketEquServer> webSocketMap = new ConcurrentHashMap<>();
|
/**
|
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
*/
|
private Session session;
|
/**
|
* 接收userId
|
*/
|
private String userId = "";
|
|
//定时任务
|
private ScheduledExecutorService scheduler;
|
//泊位ID
|
private Integer beId = 0;
|
private static final Object lock = new Object();
|
|
private static ApplicationContext context;
|
|
@Override
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
context = applicationContext;
|
}
|
|
public static <T> T getBean(Class<T> beanClass) {
|
return context.getBean(beanClass);
|
}
|
|
|
|
/**
|
* 连接建立成功调用的方法
|
*/
|
@OnOpen
|
public void onOpen(Session session, @PathParam("userId") String userId) {
|
this.session = session;
|
this.userId = userId;
|
if (webSocketMap.containsKey(userId)) {
|
webSocketMap.remove(userId);
|
webSocketMap.put(userId, this);
|
} else {
|
webSocketMap.put(userId, this);
|
}
|
try {
|
sendMessage("连接成功");
|
} catch (IOException e) {
|
log.error("用户:" + userId + ",网络异常!");
|
}
|
|
}
|
|
/**
|
* 连接关闭调用的方法
|
*/
|
@OnClose
|
public void onClose() {
|
if (scheduler != null) {
|
scheduler.shutdown();
|
}
|
if (webSocketMap.containsKey(userId)) {
|
webSocketMap.remove(userId);
|
}
|
}
|
|
|
/**
|
* 收到客户端消息后调用的方法
|
*
|
* @param message 客户端发送过来的消息
|
*/
|
@OnMessage
|
public void onMessage(String message, Session session) {
|
JSONObject beObject = JSONObject.parseObject(message);
|
String type = beObject.getString("type");
|
try {
|
beId = Integer.parseInt(beObject.getString("beId"));
|
if (!type.isEmpty() && type.equals("startSupply") && beId != null && beId != 0) {
|
//创建定时任务
|
scheduler = Executors.newSingleThreadScheduledExecutor();
|
//每10秒执行一次
|
scheduler.scheduleAtFixedRate(() -> {
|
try {
|
synchronized (lock) {
|
if (session != null && session.isOpen()) {
|
JSONObject jsonObject = new JSONObject();
|
jsonObject.put("type", "Supply");
|
EquipmentService equipmentService = getBean(EquipmentService.class);
|
LambdaQueryWrapper<DpEquipment> oilWrapper = new LambdaQueryWrapper<>();
|
oilWrapper.eq(DpEquipment::getBeId, beId)
|
.in(DpEquipment::getEquipmentTypeId, DataTypeEnum.LIJIUOIL.getCode(), DataTypeEnum.OIL.getCode(), DataTypeEnum.LIJIUJUN.getCode())
|
.orderByAsc(DpEquipment::getId);
|
List<DpEquipment> oilList = equipmentService.list(oilWrapper);
|
JSONArray oilArray = new JSONArray();
|
for (DpEquipment dpEquipment : oilList) {
|
ReceiveOilValue oilValue = (ReceiveOilValue) equDataList(dpEquipment.getId());
|
if (oilValue != null) {
|
oilValue.setId(dpEquipment.getId().longValue());
|
}
|
oilArray.add(oilValue);
|
}
|
jsonObject.put("oil", oilArray);
|
|
LambdaQueryWrapper<DpEquipment> waterWrapper = new LambdaQueryWrapper<>();
|
waterWrapper.eq(DpEquipment::getBeId, beId)
|
.eq(DpEquipment::getEquipmentTypeId, 2)
|
.orderByAsc(DpEquipment::getId);
|
List<DpEquipment> waterList = equipmentService.list(waterWrapper);
|
JSONArray waterArray = new JSONArray();
|
for (DpEquipment dpEquipment : waterList) {
|
ReceiveWaterValue waterValue = (ReceiveWaterValue) equDataList(dpEquipment.getId());
|
if (waterValue != null) {
|
waterValue.setId(dpEquipment.getId().longValue());
|
}
|
waterArray.add(waterValue);
|
}
|
jsonObject.put("water", waterArray);
|
|
LambdaQueryWrapper<DpEquipment> elecWrapper = new LambdaQueryWrapper<>();
|
elecWrapper.eq(DpEquipment::getBeId, beId)
|
.eq(DpEquipment::getEquipmentTypeId, 3)
|
.orderByAsc(DpEquipment::getId);
|
List<DpEquipment> elecList = equipmentService.list(elecWrapper);
|
JSONArray elecArray = new JSONArray();
|
for (DpEquipment dpEquipment : elecList) {
|
ReceiveElectricityValue electricityValue = (ReceiveElectricityValue) equDataList(dpEquipment.getId());
|
if (electricityValue != null) {
|
electricityValue.setId(dpEquipment.getId().longValue());
|
}
|
elecArray.add(electricityValue);
|
}
|
jsonObject.put("elec", elecArray);
|
|
DsTaskListService dsTaskListService = getBean(DsTaskListService.class);
|
|
// QueryWrapper<DsTaskList> queryWrapper = new QueryWrapper<>();
|
// queryWrapper.eq("BERTH_ID", beId).orderByDesc("PKID").last("LIMIT 1");
|
|
// String rfidNumStr = dsTaskList.getRfidNum();
|
|
QueryWrapper<DsTaskList> queryWrapperTask = new QueryWrapper<>();
|
queryWrapperTask.eq("TASK_ID",999).orderByDesc("CREATE_TIME").last("LIMIT 1");
|
DsTaskList dsTaskList = dsTaskListService.getOne(queryWrapperTask);
|
|
String rfidNumStr = dsTaskList.getTaskId()+dsTaskList.getShipNo();
|
//根据任务列表中的rfidNum查询RFID任务
|
List<DpRfidTask> rfidTaskList = GetRfidTaskList(rfidNumStr);
|
jsonObject.put("RFID_Task", rfidTaskList);
|
|
//RFID任务GoodsList中获取物品信息
|
// if(rfidTaskList.size()>0){
|
// DpRfidTask dpRfidTask = rfidTaskList.get(0);
|
// JSONArray jsonArray = JSON.parseArray(dpRfidTask.getGoodsList().toString());
|
// JSONObject goods = jsonArray.getJSONObject(0);
|
// String rfidNum = goods.getString("rfid");
|
// }
|
|
//根据任务列表中的rfidNum查询RFID车辆数据
|
List<DpRfidVehicle> vehicleList = GetVehicleList(rfidNumStr);
|
jsonObject.put("RFID_Vehicle", vehicleList);
|
|
sendMessage(jsonObject.toString());
|
|
}
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}, 0, 10, TimeUnit.SECONDS
|
|
);
|
} else if (type.equals("endSupply")) {
|
onClose();
|
} else {
|
sendMessage("传参失败");
|
}
|
|
} catch (Exception e) {
|
log.error("心跳失败,客户端已断线", e);
|
}
|
}
|
|
/**
|
* @param session
|
* @param error
|
*/
|
@OnError
|
public void onError(Session session, Throwable error) {
|
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
|
error.printStackTrace();
|
}
|
|
/**
|
* 实现服务器主动推送
|
*/
|
public void sendMessage(String message) throws IOException {
|
this.session.getBasicRemote().sendText(message);
|
}
|
|
|
/**
|
* 发送自定义消息
|
*/
|
public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
|
log.info("发送消息到:" + userId + ",报文:" + message);
|
if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
|
webSocketMap.get(userId).sendMessage(message);
|
} else {
|
log.error("用户" + userId + ",不在线!");
|
}
|
}
|
|
public static synchronized int getOnlineCount() {
|
return onlineCount;
|
}
|
|
public static synchronized void addOnlineCount() {
|
WebSocketEquServer.onlineCount++;
|
}
|
|
public static synchronized void subOnlineCount() {
|
WebSocketEquServer.onlineCount--;
|
}
|
|
/**
|
* 实现服务器主动推送
|
*/
|
public static void sendAllMessage(String message) {
|
Iterator<String> key = webSocketMap.keys().asIterator();
|
while (key.hasNext()) {
|
String strList = (String) key.next();
|
try {
|
log.info(strList);
|
webSocketMap.get(strList).sendMessage(message);
|
} catch (Exception e) {
|
e.printStackTrace();
|
log.error("推送失败:", e);
|
}
|
}
|
}
|
//根据FIELD_NAME查询设备详情经纬度(RFID)
|
private DpEquipment GetEquipmentByFieldName(String fieldName){
|
EquipmentService equipmentService = getBean(EquipmentService.class);
|
QueryWrapper<DpEquipment> equipmentQueryWrapper = new QueryWrapper<>();
|
equipmentQueryWrapper.eq("FIELD_NAME", fieldName);
|
return equipmentService.getOne(equipmentQueryWrapper);
|
}
|
//根据任务列表中的rfidNum查询RFID车辆数据
|
private List<DpRfidVehicle> GetVehicleList(String rfidNumStr){
|
DpRfidVehicleService dpRfidVehicleService = getBean(DpRfidVehicleService.class);
|
List<DpRfidVehicle> rfidVehicleList = new ArrayList<>();
|
|
DpRfidVehicle rfidVehicle = dpRfidVehicleService.QueryVehicleByTask(rfidNumStr);
|
DpEquipment equipment = GetEquipmentByFieldName(rfidVehicle.getSn());
|
|
rfidVehicle.setEquId(equipment.getId());
|
rfidVehicle.setX(equipment.getX());
|
rfidVehicle.setY(equipment.getY());
|
rfidVehicle.setZ(equipment.getZ());
|
rfidVehicleList.add(rfidVehicle);
|
return rfidVehicleList;
|
}
|
//根据任务列表中的rfidNum查询RFID任务
|
private List<DpRfidTask> GetRfidTaskList(String rfidNumStr){
|
DpRfidTaskService dpRfidTaskService = getBean(DpRfidTaskService.class);
|
LambdaQueryWrapper<DpRfidTask> rfidTaskWrapper = new LambdaQueryWrapper<>();
|
List<DpRfidTask> rfidTaskList = new ArrayList<>();
|
rfidTaskWrapper.like(DpRfidTask::getWzData, rfidNumStr).orderByDesc(DpRfidTask::getPassTime).last("LIMIT 1");
|
rfidTaskList = dpRfidTaskService.list(rfidTaskWrapper);
|
return rfidTaskList;
|
}
|
|
private Object equDataList(Integer id) {
|
EquipmentService equipmentService = getBean(EquipmentService.class);
|
ReceiveOilValueService receiveOilValueService = getBean(ReceiveOilValueService.class);
|
ReceiveWaterValueService receiveWaterValueService = getBean(ReceiveWaterValueService.class);
|
ReceiveElectricityValueService receiveElectricityValueService = getBean(ReceiveElectricityValueService.class);
|
DpEquipment receiveInfo = equipmentService.getById(id);
|
if (receiveInfo == null) {
|
return new Object();
|
}
|
if (DataTypeEnum.LIJIUOIL.getCode().equals(receiveInfo.getEquipmentTypeId()) || DataTypeEnum.OIL.getCode().equals(receiveInfo.getEquipmentTypeId()) ||
|
DataTypeEnum.LIJIUJUN.getCode().equals(receiveInfo.getEquipmentTypeId())) {
|
return receiveOilValueService.getOne(new LambdaQueryWrapper<ReceiveOilValue>() {{
|
or().eq(ReceiveOilValue::getDeviceName, String.valueOf(receiveInfo.getId()))
|
.orderByDesc(ReceiveOilValue::getCreateTime).last("LIMIT 1");
|
}});
|
} else if (DataTypeEnum.WATER_FLOW.getCode().equals(receiveInfo.getEquipmentTypeId()) || DataTypeEnum.WATER_YA.getCode().equals(receiveInfo.getEquipmentTypeId())
|
|| DataTypeEnum.WATER_DEPT.getCode().equals(receiveInfo.getEquipmentTypeId())) {
|
return receiveWaterValueService.getOne(new LambdaQueryWrapper<ReceiveWaterValue>() {{
|
or().eq(ReceiveWaterValue::getDeviceName, String.valueOf(receiveInfo.getId()))
|
.orderByDesc(ReceiveWaterValue::getCreateTime).last("LIMIT 1");
|
}});
|
} else if (DataTypeEnum.ELECTRICITY.getCode().equals(receiveInfo.getEquipmentTypeId())) {
|
return receiveElectricityValueService.getOne(new LambdaQueryWrapper<ReceiveElectricityValue>() {{
|
or().eq(ReceiveElectricityValue::getDeviceName, String.valueOf(receiveInfo.getId()))
|
.orderByDesc(ReceiveElectricityValue::getCreateTime).last("LIMIT 1");
|
}});
|
}
|
return new Object();
|
}
|
|
}
|