同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。它的功能比CountDownLatch 更加复杂且强大

CyclicBarrier 可以理解为循环栅栏,栅栏就是一种障碍物,这里就是用来阻止线程继续执行要求线程在栅栏处等待, Cyclic意为循环,也就是说这个计数器可以重复使用,比如,我们将技术器设置为10,那么凑齐第一批10个线程后,计数器就会归0,然后接着凑齐下一批10个线程,这就是循环栅栏的内在含义.

比countDownLatch 略微强大一些,CyclicBarrier可以接收一个参数作为barrierAction, barrierAction 就是当计数器一次计数完成后系统会执行的动作,所以有如下的构造函数

1
public CyclicBarrier(int number,Runnable barrierAction)

简介

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。当都到达await()的时候,屏障自动打开,示例代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class CyclicBarrierTest
{
static CyclicBarrier c =new CyclicBarrier(2);
public static void main(String[] args)
{
new Thread(new Runnable(){
@Override
public void run()
{
try {c.await();}
catch (Exception e) {}
System.out.print(1);
}
}).start();
try{
c.await();
}catch (Exception e){}
System.out.print(2);
}
}

因为主线程,子线程的调度是由CPU决定的,两个线程都有可能先执行,所以会产生两种输出

1 2 或者 2 1

如果把new CyclicBarrier(2)改成new CyclicBarrier(3) 则主线程和子线程都会永远等待,因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达的两个线程都不会继续运行.

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class CyclicBarrierDemo {
public static class Soldier implements Runnable
{
private String soldier;
private final CyclicBarrier cyclic;

Soldier(CyclicBarrier cyclic, String soldiername)
{
this.cyclic=cyclic;
this.soldier =soldiername;
}

public void run()
{
try {
// 每一个都要等待,直到所有的都完成.
cyclic.await();
dowork();
cyclic.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}

}

void dowork() //模拟做任务
{
try {
Thread.sleep(Math.abs(new Random().nextInt()%10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println( soldier+"任务完成");
}
}

public static class BarrierRun implements Runnable
{
boolean flag;
int N;
public BarrierRun(boolean flag,int N)
{
this.flag=flag;
this.N =N;
}
public void run()
{
if(flag)
{
System.out.println(N+"任务完成");
}
else
{
System.out.println(N+"集合完毕");
flag=true;
}
}
}

public static void main(String[] args) throws Exception
{
final int N=10;
Thread[] threads =new Thread[N];
boolean flag = false;
CyclicBarrier cyclicBarrier =new CyclicBarrier(N,new BarrierRun(flag,N));// 创建了一个屏障
System.out.println("集合队伍");
for(int i=0;i<N;i++)
{
System.out.println(i+"报道");
threads[i]=new Thread(new Soldier(cyclicBarrier," "+i));//每一个线程都传进去屏障.
threads[i].start();
}
}
}

CyclicBarrier的应用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel 的日均银行流水.

CyclicBarrier 实现

CyclicBarrier实现主要基于ReentrantLock

CyclicBarrier和CountDownLatch 的区别

Countdownlatch 是在多个线程都进行了latch.countDown后才会触发事件,唤醒await在latch上的线程,而执行countDown的线程,执行完countDown后会继续自己的工作。

CyclicBarrier是一个栅栏,用于同步所有调用await方法的线程,并且等所有线程都到了await方法时,这些线程才一起返回继续各自的工作。

CountDownLatch简单的说就是一个线程等待,直到他所等待的其他线程都执行完成并且调用countDown()方法发出通知后,当前线程才可以继续执行。cyclicBarrier是所有线程都进行等待,直到所有线程都准备好进入await()方法之后,所有线程同时开始执行.

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

还有很重要的一个区别是 闭锁用于等待事件,而栅栏常用语等待其他线程.