我想实现一个合并请求工具类,思路:
测试时发现,打印每个线程的返回值时,偶尔会遇到返回值为 null 。
传递的请求封装对象:
class BizTask {
private Thread thread;
private Object param;
private Object response;
public Object getParam() {
return param;
}
public void setParam(Object param) {
this.param = param;
}
public Object getResponse() {
return response;
}
public void setResponse(Object response) {
this.response = response;
}
public Thread getThread() {
return thread;
}
public void setThread(Thread thread) {
this.thread = thread;
}
}
工具类:
public class BatchHelper {
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 20L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100000), new ThreadPoolExecutor.CallerRunsPolicy());
private final BlockingQueue<BizTask> requestQueue = new LinkedBlockingQueue<>();
private final Consumer<List<BizTask>> function;
public BatchHelper(Consumer<List<BizTask>> function) {
this.function = function;
threadPool.execute(this::autoDispatch);
}
public Object take(Object param) {
BizTask task = new BizTask();
task.setParam(param);
task.setThread(Thread.currentThread());
// synchronized (task) {
try {
requestQueue.put(task);
} catch (InterruptedException ignored) {
}
// 阻塞
LockSupport.park();
// }
return task.getResponse();
}
public void autoDispatch() {
while (true) {
try {
BizTask t1 = requestQueue.take();
List<BizTask> tasks = new ArrayList<>(128);
tasks.add(t1);
BizTask t2 = requestQueue.poll();
if (t2 == null) {
dispatch(tasks);
continue;
}
int sum = 1;
while (t2 != null) {
tasks.add(t2);
sum = (sum + 1) & 127;
if (sum == 0) {
dispatch(tasks);
tasks = new ArrayList<>(128);
}
t2 = requestQueue.poll();
}
if (sum > 0) {
dispatch(tasks);
}
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}
}
private void dispatch(List<BizTask> list) {
threadPool.execute(() -> {
// 批量处理,设置返回值
function.accept(list);
for (BizTask task : list) {
// synchronized (task) {
Thread lock = task.getThread();
// 唤醒
LockSupport.unpark(lock);
// }
}
});
}
}
测试案例:
public class TestDemo {
public static void main(String[] args) {
BatchHelper batchHelper = new BatchHelper((tasks) -> {
for (int i = 0, tasksSize = tasks.size(); i < tasksSize; i++) {
BizTask task = tasks.get(i);
// System.out.println("requestId = " + requestId);
task.setResponse(i);
}
});
// 模拟请求线程
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 20, 20L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 100; i++) {
int finalI = i;
threadPool.execute(() -> {
Object take = batchHelper.take(finalI + "");
if (take == null) {
System.out.println("take = " + take);
}
});
}
}
}
1
yusheng88 OP 为什么会出现返回值为 null?
|
2
TylerYY 2022-12-15 09:04:40 +08:00
是不是可见性问题呢?线程 tn 设置返回值后,upark 阻塞的线程,唤醒的线程不一定能立即看到设置的返回值?
给 BizTask 的 response 加一个 volatile 修饰试下 |
4
senninha 2022-12-15 11:21:09 +08:00
可能的原因:
LockSupport 的 permit 提前被设置了,这时候调用 park 会直接返回,resp 肯定就是 null 了,也就是 park 与 unpark 调用不对称?难道是 LinkedBlockingQueue 有问题? 看文档还有这三种情况 park 会直接返回: Some other thread invokes unpark with the current thread as the target; or Some other thread interrupts the current thread; or The call spuriously (that is, for no reason) returns. |
5
yusheng88 OP @senninha 这就是我觉得奇怪的地方,设置值在 unpark 前,获取 take 前会阻塞,无法理解为什么会出现 take=null 。我尝试过打印执行次数,次数是正确的
|
6
oldshensheep 2022-12-15 14:57:40 +08:00 1
应该是因为这个 Spurious wakeup
类似的问题 https://stackoverflow.com/questions/67118821/futuretask-get-method-may-distable-locksupport-park https://stackoverflow.com/questions/1050592/do-spurious-wakeups-in-java-actually-happen 我简化了楼主的代码逻辑 if (task.data == null) { System.out.println(task); System.out.println(task); } 里面的 56-58 行代码输出有时是这样的 Task{id=3678, data='null', thread=Thread[pool-1-thread-3,5,main]} Task{id=3678, data='3678 FINISHED.', thread=Thread[pool-1-thread-3,5,main]} 应该是提取被唤醒了…… https://gist.github.com/oldshensheep/034044093ce9608ee3d02d7629c2bf81 |
7
GloryJie 2022-12-15 15:13:33 +08:00
在执行 setResponse 之前打印时间 A ,take == null 的时候时间 B 。得出 B < A 的,还没执行前,线程就被唤醒唤醒了。感觉是楼上说的 spuriously 的原因
|
8
senninha 2022-12-15 15:36:52 +08:00
@yusheng88 楼下有说到 spuriously (that is, for no reason) returns 。看 unlock 的说明也是需要 re-check condition 的。
Callers should re-check the conditions which caused the thread to park in the first place. |
9
yusheng88 OP @oldshensheep 感谢大佬,就是这个原因了,看注释,没理解 spuriously 调用是啥 0.0
|
10
strayerxx 2022-12-21 15:14:15 +08:00
我在想 LinkedBlockingQueue 底层好像也是使用 park 和 unpark 而且也都是对当前线程操作,会不会是相互之间产生了影响
|