15 CyclicBarrier实现原理

vvEcho 2025-03-01 14:13:49
Categories: Tags:

总结
CyclicBarrier 基于 ReentrantLock 和 Condition 实现,使用 count 记录未到达线程数;线程调用 await 时递减 count,未到0则进入条件队列等待;当最后一个线程到达时,唤醒所有等待线程,并重置count与generation,实现循环使用,同时通过 generation 防止线程串轮

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 模拟CyclicBarrier 的线程交替执行 然后执行完后同步等待其他线程执行完 然后接着汇总数据
*
* @author: echo
* @date: 2025/3/1
*/
public class CyclicBarrierTest {
// 阶段数
private static final int PHASES = 2;
// 线程数
private static final int THREAD_NUM = 3;

public static void main(String[] args) {
// 创建可复用的屏障,指定线程数和阶段完成后的汇总操作
CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM, () -> {
System.out.println("所有线程都已到达屏障开始放行,进行下一步汇总校验");
});
ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
// 模拟2个处理阶段
for (int phase = 1; phase <= PHASES; phase++) {
System.out.println("\n ===== 开始阶段 " + phase + " =====");

for (int i = 0; i < THREAD_NUM; i++) {
final int threadId = i + 1;
int finalPhase = phase;
executor.submit(() -> {
processDataChunk(threadId, finalPhase); // 处理数据分片
awaitBarrier(barrier, threadId); // 等待其他线程
});
}
// 重置屏障以复用(CyclicBarrier自动重置)
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
}
executor.shutdown();
}

// 处理数据分片(模拟耗时操作)
private static void processDataChunk(int threadId, int phase) {
System.out.printf("线程-%d 正在处理阶段%d的数据...%n", threadId, phase);
try {
Thread.sleep((long) (Math.random() * 2000));
} catch (InterruptedException e) {
}
}

// 等待屏障
private static void awaitBarrier(CyclicBarrier barrier, int threadId) {
try {
System.out.printf("线程-%d 已到达屏障,等待其他线程...%n", threadId);
barrier.await();
System.out.printf("线程-%d 突破屏障,继续后续操作%n", threadId);
} catch (Exception e) {
e.printStackTrace();
}
}
}