总结
CyclicBarrier 基于 ReentrantLock 和 Condition 实现,使用 count 记录未到达线程数;线程调用 await 时递减 count,未到0则进入条件队列等待;当最后一个线程到达时,唤醒所有等待线程,并重置count与generation,实现循环使用,同时通过 generation 防止线程串轮
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
public class CyclicBarrierTest { private static final int PHASES = 2; private static final int THREAD_NUM = 3;
public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM, () -> { System.out.println("所有线程都已到达屏障开始放行,进行下一步汇总校验"); }); ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM); for (int phase = 1; phase <= PHASES; phase++) { System.out.println("\n ===== 开始阶段 " + phase + " =====");
for (int i = 0; i < THREAD_NUM; i++) { final int threadId = i + 1; int finalPhase = phase; executor.submit(() -> { processDataChunk(threadId, finalPhase); awaitBarrier(barrier, threadId); }); } try { Thread.sleep(3000); } catch (InterruptedException e) { } } executor.shutdown(); }
private static void processDataChunk(int threadId, int phase) { System.out.printf("线程-%d 正在处理阶段%d的数据...%n", threadId, phase); try { Thread.sleep((long) (Math.random() * 2000)); } catch (InterruptedException e) { } }
private static void awaitBarrier(CyclicBarrier barrier, int threadId) { try { System.out.printf("线程-%d 已到达屏障,等待其他线程...%n", threadId); barrier.await(); System.out.printf("线程-%d 突破屏障,继续后续操作%n", threadId); } catch (Exception e) { e.printStackTrace(); } } }
|