栅栏 CyclicBarrier

栅栏 CyclicBarrier

场景

3个人在操场跑步,跑道有两个关卡,需要3人同时通过。每个人的速度不同,先跑到关卡的人需要等待其他人然后一同通过。

实现思路

线程 T1, T2, T3, 关卡 A, B.关卡计数器等于总线程数(3)

每个线程运行到关卡,计数器减 1, 判断计数器是否为 0,不为 0 进入等待状态, 为 0 唤醒所有线程一起通过。

Java 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103


public class GuanQia {

private int threadSize;

// 防止竞争的锁,线程等待、唤醒
private final Object lock = new Object();


private Runnable runnable;

// 计数器
private volatile int count;

/**
* 构造关卡需要指定线程数量
*/
public GuanQia(int threadSiz, Runnable runnable) {
this.threadSize = threadSiz;
this.count = threadSiz;
this.runnable = runnable;
}

public void await() {
synchronized (lock) {
// 判断是最后一个线程
int i = --count;
if (i == 0) {
if (runnable != null) {
runnable.run();
}
notifyAllNow();
return;
}
}
try {
synchronized (lock) {
// 进入等待状态
this.lock.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 唤醒所有线程继续执行
private void notifyAllNow() {
synchronized (lock) {
this.lock.notifyAll();
}
}
}



public class GuanQiaTest {
public static void main(String[] args) {
GuanQia g1 = new GuanQia(3, () -> {
System.out.println(Thread.currentThread().getName() + ": 关卡 1 放行");
});
GuanQia g2 = new GuanQia(3, () -> {
System.out.println(Thread.currentThread().getName() + ": 关卡 2 放行");
});
Runnable runnable = () -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 我到关卡 1 了。。。");
g1.await();

try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 我到关卡 2 了。。。");
g2.await();
System.out.println(Thread.currentThread().getName() + " DONE");
};
ThreadUtils.start(runnable);
ThreadUtils.start(runnable);
ThreadUtils.start(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 我到关卡 1 了。。。");
g1.await();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 我到关卡 2 了。。。");
g2.await();
System.out.println(Thread.currentThread().getName() + " DONE");
});
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
Thread-1 我到关卡 1 了。。。
Thread-0 我到关卡 1 了。。。
Thread-2 我到关卡 1 了。。。
Thread-2: 关卡 1 放行
Thread-0 我到关卡 2 了。。。
Thread-1 我到关卡 2 了。。。
Thread-2 我到关卡 2 了。。。
Thread-2: 关卡 2 放行
Thread-2 DONE
Thread-1 DONE
Thread-0 DONE

大致实现了业务场景,但是还有许多问题,比如:如果有一个线程未到达或者异常终止了,其他线程都会处于等待状态。这里需要实现最后一个到达线程的等待超时时间,当最后一个到达的线程等待超过指定时间,则唤醒其他线程继续往下执行。

Jdk 的实现

java.util.concurrent.CyclicBarrier

参考: https://blog.csdn.net/defonds/article/details/44021605/