package com.ruoyi.fuzhou.websocket;
|
|
import com.alibaba.fastjson2.JSON;
|
import com.alibaba.fastjson2.JSONArray;
|
import com.alibaba.fastjson2.JSONObject;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.ruoyi.fuzhou.domain.*;
|
import com.ruoyi.fuzhou.enums.DataTypeEnum;
|
import com.ruoyi.fuzhou.service.*;
|
import jakarta.websocket.*;
|
import jakarta.websocket.server.PathParam;
|
import jakarta.websocket.server.ServerEndpoint;
|
import org.apache.commons.lang3.StringUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.BeansException;
|
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContextAware;
|
import org.springframework.stereotype.Component;
|
|
import java.io.IOException;
|
import java.util.Iterator;
|
import java.util.List;
|
import java.util.concurrent.*;
|
|
//@ServerEndpoint("/dp/sendOil/{userId}")
|
//@Component
|
public class WebSocketOilServer implements ApplicationContextAware {
|
private final static Logger log = LoggerFactory.getLogger(WebSocketOilServer.class);
|
/**
|
* 在线人数
|
*/
|
private static int onlineCount = 0;
|
/**
|
* 在线人员session
|
*/
|
private static ConcurrentHashMap<String, WebSocketOilServer> webSocketMap = new ConcurrentHashMap<>();
|
/**
|
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
*/
|
private Session session;
|
/**
|
* 接收userId
|
*/
|
private String userId = "";
|
|
//定时任务
|
private ScheduledExecutorService scheduler;
|
//泊位ID
|
private Integer beId = 0;
|
private static final Object lock = new Object();
|
|
private static ApplicationContext context;
|
|
@Override
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
context = applicationContext;
|
}
|
public static <T> T getBean(Class<T> beanClass){
|
return context.getBean(beanClass);
|
}
|
|
|
// private EquipmentService equipmentService;
|
// private ReceiveOilValueService receiveOilValueService;
|
// private ReceiveWaterValueService receiveWaterValueService;
|
// private ReceiveElectricityValueService receiveElectricityValueService;
|
// private DpRfidTaskService dpRfidTaskService;
|
// private DpRfidVehicleService dpRfidVehicleService;
|
|
|
/**
|
* 连接建立成功调用的方法
|
*/
|
@OnOpen
|
public void onOpen(Session session, @PathParam("userId") String userId) {
|
this.session = session;
|
this.userId = userId;
|
if (webSocketMap.containsKey(userId)) {
|
webSocketMap.remove(userId);
|
webSocketMap.put(userId, this);
|
//加入set中
|
} else {
|
webSocketMap.put(userId, this);
|
}
|
try {
|
sendMessage("连接成功");
|
} catch (IOException e) {
|
log.error("用户:" + userId + ",网络异常!!!!!!");
|
}
|
|
}
|
|
/**
|
* 连接关闭调用的方法
|
*/
|
@OnClose
|
public void onClose() {
|
if(scheduler != null){
|
scheduler.shutdown();
|
}
|
if (webSocketMap.containsKey(userId)) {
|
webSocketMap.remove(userId);
|
}
|
}
|
|
|
/**
|
* 收到客户端消息后调用的方法
|
*
|
* @param message 客户端发送过来的消息
|
*/
|
@OnMessage
|
public void onMessage(String message, Session session) {
|
JSONObject beObject = JSONObject.parseObject(message);
|
String type = beObject.getString("type");
|
try {
|
if(!type.isEmpty()){
|
sendMessage(type);
|
// beId = Integer.parseInt(beObject.getString("beId"));
|
// if(!type.isEmpty() && type.equals("startSupply") && beId != null && beId != 0){
|
// //创建定时任务
|
// scheduler = Executors.newSingleThreadScheduledExecutor();
|
// //每10秒执行一次
|
// scheduler.scheduleAtFixedRate(()->{
|
// try {
|
// synchronized (lock){
|
// if(session !=null && session.isOpen()){
|
// JSONObject jsonObject = new JSONObject();
|
// jsonObject.put("type","Supply");
|
// EquipmentService equipmentService = getBean(EquipmentService.class);
|
// LambdaQueryWrapper<DpEquipment> oilWrapper = new LambdaQueryWrapper<>();
|
// oilWrapper.eq(DpEquipment::getBeId,beId)
|
// .in(DpEquipment::getEquipmentTypeId,DataTypeEnum.LIJIUOIL.getCode(),DataTypeEnum.OIL.getCode(),DataTypeEnum.LIJIUJUN.getCode())
|
// .orderByAsc(DpEquipment::getId);
|
// List<DpEquipment> oilList = equipmentService.list(oilWrapper);
|
// JSONArray oilArray = new JSONArray();
|
// for (DpEquipment dpEquipment:oilList){
|
// ReceiveOilValue oilValue = (ReceiveOilValue) equDataList(dpEquipment.getId());
|
// if(oilValue != null){
|
// oilValue.setId(dpEquipment.getId().longValue());
|
// }
|
// oilArray.add(oilValue);
|
// }
|
// jsonObject.put("oil",oilArray);
|
//
|
// LambdaQueryWrapper<DpEquipment> waterWrapper = new LambdaQueryWrapper<>();
|
// waterWrapper.eq(DpEquipment::getBeId,beId)
|
// .eq(DpEquipment::getEquipmentTypeId,2)
|
// .orderByAsc(DpEquipment::getId);
|
// List<DpEquipment> waterList = equipmentService.list(waterWrapper);
|
// JSONArray waterArray = new JSONArray();
|
// for (DpEquipment dpEquipment:waterList){
|
// ReceiveWaterValue waterValue = (ReceiveWaterValue) equDataList(dpEquipment.getId());
|
// if(waterValue != null){
|
// waterValue.setId(dpEquipment.getId().longValue());
|
// }
|
// waterArray.add(waterValue);
|
// }
|
// jsonObject.put("water",waterArray);
|
//
|
// LambdaQueryWrapper<DpEquipment> elecWrapper = new LambdaQueryWrapper<>();
|
// elecWrapper.eq(DpEquipment::getBeId,beId)
|
// .eq(DpEquipment::getEquipmentTypeId,3)
|
// .orderByAsc(DpEquipment::getId);
|
// List<DpEquipment> elecList = equipmentService.list(elecWrapper);
|
// JSONArray elecArray = new JSONArray();
|
// for (DpEquipment dpEquipment:elecList){
|
// ReceiveElectricityValue electricityValue = (ReceiveElectricityValue)equDataList(dpEquipment.getId());
|
// if(electricityValue != null){
|
// electricityValue.setId(dpEquipment.getId().longValue());
|
// }
|
// elecArray.add(electricityValue);
|
// }
|
// jsonObject.put("elec",elecArray);
|
//
|
// DpRfidTaskService dpRfidTaskService = getBean(DpRfidTaskService.class);
|
// LambdaQueryWrapper<DpRfidTask> rfidTaskWrapper = new LambdaQueryWrapper<>();
|
// rfidTaskWrapper.orderByDesc(DpRfidTask::getCreateTime).last("LIMIT 1");
|
// List<DpRfidTask> rfidTaskList = dpRfidTaskService.list(rfidTaskWrapper);
|
// DpRfidTask dpRfidTask = rfidTaskList.get(0);
|
// JSONArray jsonArray = JSON.parseArray(dpRfidTask.getGoodsList().toString());
|
// JSONObject goods = jsonArray.getJSONObject(0);
|
// String rfidNum = goods.getString("rfid");
|
// jsonObject.put("RFID_Task",rfidTaskList);
|
//
|
// DpRfidVehicleService dpRfidVehicleService = getBean(DpRfidVehicleService.class);
|
// LambdaQueryWrapper<DpRfidVehicle> rfidVehicleWrapper = new LambdaQueryWrapper<>();
|
// rfidVehicleWrapper.eq(DpRfidVehicle::getRfidNum,rfidNum)
|
// .orderByDesc(DpRfidVehicle::getPassTime).last("LIMIT 1");
|
// List<DpRfidVehicle> rfidVehicleList = dpRfidVehicleService.list(rfidVehicleWrapper);
|
// jsonObject.put("RFID_Vehicle",rfidVehicleList);
|
//
|
// sendMessage(jsonObject.toString());
|
//
|
// }
|
// }
|
// }catch (Exception e){
|
// e.printStackTrace();
|
// }
|
// },0,10, TimeUnit.SECONDS
|
//
|
// );
|
}else if(type.equals("endSupply")){
|
onClose();
|
}
|
else {
|
sendMessage("传参失败");
|
}
|
|
} catch (Exception e) {
|
log.error("心跳失败,客户端已断线", e);
|
}
|
}
|
|
/**
|
* @param session
|
* @param error
|
*/
|
@OnError
|
public void onError(Session session, Throwable error) {
|
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
|
error.printStackTrace();
|
}
|
|
/**
|
* 实现服务器主动推送
|
*/
|
public void sendMessage(String message) throws IOException {
|
this.session.getBasicRemote().sendText(message);
|
}
|
|
|
/**
|
* 发送自定义消息
|
*/
|
public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
|
log.info("发送消息到:" + userId + ",报文:" + message);
|
if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
|
webSocketMap.get(userId).sendMessage(message);
|
} else {
|
log.error("用户" + userId + ",不在线!");
|
}
|
}
|
|
public static synchronized int getOnlineCount() {
|
return onlineCount;
|
}
|
|
public static synchronized void addOnlineCount() {
|
WebSocketOilServer.onlineCount++;
|
}
|
|
public static synchronized void subOnlineCount() {
|
WebSocketOilServer.onlineCount--;
|
}
|
|
/**
|
* 实现服务器主动推送
|
*/
|
public static void sendAllMessage(String message) {
|
Iterator<String> key = webSocketMap.keys().asIterator();
|
while (key.hasNext()) {
|
String strList = (String) key.next();
|
try {
|
log.info(strList);
|
webSocketMap.get(strList).sendMessage(message);
|
} catch (Exception e) {
|
e.printStackTrace();
|
log.error("推送失败!", e);
|
}
|
}
|
}
|
|
// public Object equDataList(Integer id){
|
// EquipmentService equipmentService = getBean(EquipmentService.class);
|
// ReceiveOilValueService receiveOilValueService = getBean(ReceiveOilValueService.class);
|
// ReceiveWaterValueService receiveWaterValueService = getBean(ReceiveWaterValueService.class);
|
// ReceiveElectricityValueService receiveElectricityValueService = getBean(ReceiveElectricityValueService.class);
|
// DpEquipment receiveInfo = equipmentService.getById(id);
|
// if (receiveInfo == null) {
|
// return new Object();
|
// }
|
// if (DataTypeEnum.LIJIUOIL.getCode().equals(receiveInfo.getEquSeType()) || DataTypeEnum.OIL.getCode().equals(receiveInfo.getEquSeType())) {
|
// return receiveOilValueService.getOne(new LambdaQueryWrapper<ReceiveOilValue>() {{
|
// or().eq(ReceiveOilValue::getDeviceName, String.valueOf(receiveInfo.getId()))
|
// .orderByDesc(ReceiveOilValue::getCreateTime).last("LIMIT 1");
|
// }});
|
// } else if (DataTypeEnum.WATER_FLOW.getCode().equals(receiveInfo.getEquSeType()) || DataTypeEnum.WATER_YA.getCode().equals(receiveInfo.getEquSeType())
|
// || DataTypeEnum.WATER_DEPT.getCode().equals(receiveInfo.getEquSeType())) {
|
// return receiveWaterValueService.getOne(new LambdaQueryWrapper<ReceiveWaterValue>() {{
|
// or().eq(ReceiveWaterValue::getDeviceName, String.valueOf(receiveInfo.getId()))
|
// .orderByDesc(ReceiveWaterValue::getCreateTime).last("LIMIT 1");
|
// }});
|
// } else if (DataTypeEnum.ELECTRICITY.getCode().equals(receiveInfo.getEquSeType())) {
|
// return receiveElectricityValueService.getOne(new LambdaQueryWrapper<ReceiveElectricityValue>() {{
|
// or().eq(ReceiveElectricityValue::getDeviceName, String.valueOf(receiveInfo.getId()))
|
// .orderByDesc(ReceiveElectricityValue::getCreateTime).last("LIMIT 1");
|
// }});
|
// }
|
// return new Object();
|
// }
|
|
}
|