控制并发线程数的Semaphone

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以
保证合理的使用公共资源。

Semaphore中管理着一组虚拟的许可,许可的初始数量可以通过构造函数来指定,在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用后释放许可,如果没有许可,acquire将阻塞直到有许可,release将返回一个许可给信号量.

应用场景

Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假
如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程
并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这
时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制,如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SemaphoneTest()
{
private static final int THREAD_COUNT = 30;
private static ExecutorServicethreadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphone s =new Semaphone(10);
public static void main(String[] args)
{
for(int i=0;i<THREAD_COUNT;i++)
{
threadPool.execute(new Runnable()
{
public void run()
{
try {
s.acquire();
System.out.println("save data");
s.release;
} catch (Exception e) {
}
}
});
}
threadPool.shutdown();
}
}

在代码中,虽然有30个线程在执行,但是只允许10个并发执行。Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。

其他方法

  • intavailablePermits():返回此信号量中当前可用的许可证数。
  • intgetQueueLength():返回正在等待获取许可证的线程数。
  • booleanhasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法。

线程间交换数据的Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交
换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过
exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

应用场景

Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需
要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行
录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否
录入一致,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ExchangerTest()
{
private static final Exchanger<String>exgr = new Exchanger<String>();
private static ExecutorServicethreadPool = Executors.newFixedThreadPool(2);

public static void main(String[] args)
{
threadPool.execute(new Runnable()
{
public void run()
{
try {
String A ="bank A" //A录入数据
exgr.exchange(A);
} catch (Exception e) {}
}
});

threadPool.execute(new Runnable()
{
public void run()
{
try {
String B ="bank B" //B录入数据
Stirng A = exgr.exchange(B);
System.out.println("AB是否一致"+A.equals(B)
+"A录得是"+A+"B录得是"+B)
} catch (Exception e) {}
}
});
threadPool.shutdown();
}
}

如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发
生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长。

参考文献:Java并发编程的艺术

ThreadLocal 的使用

ThreadLocal,即一个以ThreadLocal 对象为键,任意对象为值的存储结构,这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。

可以通过set()方法设置一个值,在当前线程下再通过get()方法获取到原先设置的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ThreadLocalDemo {
static ThreadLocal<SimpleDateFormat> t1 =new ThreadLocal<>();//声明一个ThreadLocal变量.
public static class ParseDate implements Runnable
{
int i=0;
Date date =new Date();
public ParseDate(int i)
{
this.i=i;
}
public void run()
{
if(t1.get()==null)
{ //设置值
t1.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
try { //取值
Date t= t1.get().parse("2015-03-29 19:29:"+i%60);
System.out.println(i+" : "+t);
} catch (ParseException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args)
{
ExecutorService es = Executors.newFixedThreadPool(10);
for(int i=0;i<1000;i++)
{
es.execute(new ParseDate(i));
}
}
}