package com.fastbee.base.core.hanler; import com.fastbee.common.core.protocol.Message; import com.fastbee.base.session.Session; import com.fastbee.base.util.VirtualList; import com.fastbee.common.exception.ServiceException; import io.netty.util.concurrent.DefaultThreadFactory; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.Method; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 异步批量处理报文 * @author bill */ @Slf4j public class AsyncBatchHandler extends BaseHandler{ /*消息处理队列*/ private final ConcurrentLinkedQueue queue; /*线程池*/ private final ThreadPoolExecutor executor; private final int poolSize; private final int maxEventSize; private final int maxWait; private final int warningLines; public AsyncBatchHandler(Object target, Method targetMethod, String desc, int poolSize, int maxEventSize, int maxWait) { super(target, targetMethod, desc); Class[] parameterTypes = targetMethod.getParameterTypes(); if (parameterTypes.length >1){ throw new ServiceException("参数列表过长"); } if (!parameterTypes[0].isAssignableFrom(List.class)){ throw new ServiceException("参数不是List类型"); } this.poolSize = poolSize; this.maxEventSize = maxEventSize; this.maxWait = maxWait; this.warningLines = maxEventSize * poolSize * 50; this.queue = new ConcurrentLinkedQueue<>(); this.executor = new ThreadPoolExecutor(this.poolSize,this.poolSize,1000L, TimeUnit.MILLISECONDS ,new LinkedBlockingQueue<>(500),new DefaultThreadFactory(targetMethod.getName())); for (int i = 0; i < poolSize; i++) { boolean start = i == 0; executor.execute(()->{ try { startInternal(start); }catch (Exception e){ log.error("线程池处理数据出错",e); } }); } } @Override public T invoke(T request, Session session) throws Exception { queue.offer(request); return null; } public void startInternal(boolean master) { Message[] array = new Message[maxEventSize]; long logtime = 0; long starttime = 0; for (; ; ) { Message temp; int i = 0; while ((temp = queue.poll()) != null) { array[i++] = temp; if (i >= maxEventSize) { break; } } if (i > 0) { starttime = System.currentTimeMillis(); try { targetMethod.invoke(targetObject, new VirtualList<>(array, i)); } catch (Exception e) { log.warn(targetMethod.getName(), e); } long time = System.currentTimeMillis() - starttime; if (time > 1000L) { log.warn("线程池处理数据耗时:{}ms,共{}条记录", time, i); } } if (i < maxEventSize) { try { for (int j = 0; j < i; j++) { array[j] = null; } Thread.sleep(maxWait); } catch (InterruptedException e) { log.error("sleep error!"); } } else if (master) { if (logtime < starttime) { logtime = starttime + 5000L; int size = queue.size(); if (size > warningLines) { log.warn("线程池队列已满, size:{}", size); } } } } } }