leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.fastbee.base.core.hanler;
 
import com.fastbee.common.core.protocol.Message;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.VirtualList;
import com.fastbee.common.exception.ServiceException;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
 
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
/**
 * 异步批量处理报文
 * @author bill
 */
@Slf4j
public class AsyncBatchHandler extends BaseHandler{
 
 
   /*消息处理队列*/
   private final ConcurrentLinkedQueue<Message> queue;
 
   /*线程池*/
   private final ThreadPoolExecutor executor;
 
   private final int poolSize;
 
   private final int maxEventSize;
 
   private final int maxWait;
 
   private final int warningLines;
 
 
    public AsyncBatchHandler(Object target, Method targetMethod, String desc, int poolSize, int maxEventSize, int maxWait) {
 
        super(target, targetMethod, desc);
        Class<?>[] parameterTypes = targetMethod.getParameterTypes();
        if (parameterTypes.length >1){
            throw new ServiceException("参数列表过长");
        }
        if (!parameterTypes[0].isAssignableFrom(List.class)){
            throw new ServiceException("参数不是List类型");
        }
 
        this.poolSize = poolSize;
        this.maxEventSize = maxEventSize;
        this.maxWait = maxWait;
        this.warningLines = maxEventSize * poolSize * 50;
 
        this.queue = new ConcurrentLinkedQueue<>();
        this.executor = new ThreadPoolExecutor(this.poolSize,this.poolSize,1000L, TimeUnit.MILLISECONDS
                ,new LinkedBlockingQueue<>(500),new DefaultThreadFactory(targetMethod.getName()));
 
        for (int i = 0; i < poolSize; i++) {
            boolean start = i == 0;
            executor.execute(()->{
                try {
                    startInternal(start);
                }catch (Exception e){
                    log.error("线程池处理数据出错",e);
                }
            });
        }
 
    }
 
    @Override
    public <T extends Message> T invoke(T request, Session session) throws Exception {
        queue.offer(request);
        return null;
    }
 
    public void startInternal(boolean master) {
        Message[] array = new Message[maxEventSize];
        long logtime = 0;
        long starttime = 0;
 
        for (; ; ) {
            Message temp;
            int i = 0;
            while ((temp = queue.poll()) != null) {
                array[i++] = temp;
                if (i >= maxEventSize) {
                    break;
                }
            }
 
            if (i > 0) {
                starttime = System.currentTimeMillis();
                try {
                    targetMethod.invoke(targetObject, new VirtualList<>(array, i));
                } catch (Exception e) {
                    log.warn(targetMethod.getName(), e);
                }
                long time = System.currentTimeMillis() - starttime;
                if (time > 1000L) {
                    log.warn("线程池处理数据耗时:{}ms,共{}条记录", time, i);
                }
            }
 
            if (i < maxEventSize) {
                try {
                    for (int j = 0; j < i; j++) {
                        array[j] = null;
                    }
                    Thread.sleep(maxWait);
                } catch (InterruptedException e) {
                    log.error("sleep error!");
                }
            } else if (master) {
                if (logtime < starttime) {
                    logtime = starttime + 5000L;
 
                    int size = queue.size();
                    if (size > warningLines) {
                        log.warn("线程池队列已满, size:{}", size);
                    }
                }
            }
        }
    }
}