常用于并发处理的集合Queues,ConcurrentHashMap,ConcurrentSkipListMap
,ConcurrentSkipListSet,CopyOnWriteArrayList,CopyOnWriteArraySet, 这些集合都比较好理解和使用,不多bb。本文介绍一些比较难接触的并发处理类
工具类
Semaphore
信号量,一个经典的并发工具,常用于限制访问资源的线程数量。
通常我们会使用synchronized和Lock锁实现资源并发访问的控制,在同一时间只允许一个线程进入临界区访问资源(读锁除外),解决多个线程并发同一资源造成的数据不一致问题。Semaphore使用场景略有不同,在一个共享的资源里,有多个(有限个)副本可以使用的情况下,如果并发的线程超过可使用的副本时,我们就可以使用Semaphore来控制资源副本访问的线程数。
实现原理
Semaphore保护一个或多个共享资源的使用,内部维护了一个计数器,即同一时间最多的可访问线程数。
一个线程要访问资源时,需要先获取Semaphore, 如果Semaphore的计数器大于1,表示可以使用该资源,Semaphore.acquire()之后,计数器减1,访问该资源,使用完后,Semaphore.release()释放信号量,计数器加1,其他被阻塞的线程会被唤醒获取信号量,访问资源。 如果计数器为0,线程进入休眠.
Sample usage
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
CountDownLatch
使一个线程等待其他线程完成各自的工作后再执行.例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
实现原理
是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
CountDownLatch构造器
//Constructs a CountDownLatch initialized with the given count.
public void CountDownLatch(int count) {...}
构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。
与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法,这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
其他N个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。
Sample usage1
子线程等待主线程完成准备工作,子线程才执行
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
// create and start threads
for (int i = 0; i < N; ++i)
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
// 阻塞主线程,等待其他线程完成
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
// 阻塞线程
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
Sample usage2
主线程等待子线程完成工作,才执行
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
CountDownLatch面试题:
解释一下CountDownLatch概念?
CountDownLatch 和CyclicBarrier的不同之处?
给出一些CountDownLatch使用的例子?
CountDownLatch 类中主要的方法?
CyclicBarrier
是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点,与CountDownLatch不同的是该barrier在释放等待线程后可以重用。 CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
CyclicBarrier 的一些关键方法
public CyclicBarrier(int parties, Runnable barrierAction)
public CyclicBarrier(int parties)
// 返回值为当前线程的索引,0表示当前线程是最后一个到达的线程
public int await() throws InterruptedException, BrokenBarrierException
//在await()的基础上增加超时机制,如果超出指定的等待时间,则抛出 //TimeoutException 异常。如果该时间小于等于零,则此方法根本不会等待。
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
Sample Usage
每个Worker处理矩阵中的一行,在处理完所有的行之前,该线程将一直在屏障处等待。在各个Worker处理完所有行后,将执行提供的Runnable屏障操作。
public class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
public Solver(float[][] matrix) throws InterruptedException {
data = matrix;
N = matrix.length;
Runnable barrierAction =
() -> {
System.out.println("finish row process");
};
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// System.out.println("wait until done...");
// wait until done
for (Thread thread : threads) {
thread.join();
}
}
public static void main(String[] args) throws InterruptedException {
float[][] matrix = new float[][]{
new float[]{1, 2, 3, 4, 5},
new float[]{6, 7, 8, 9, 10},
new float[]{11, 12, 13, 14, 15},
new float[]{16, 17, 18, 19, 20},
new float[]{21, 22, 23, 24, 25},
};
Solver solver = new Solver(matrix);
}
class Worker implements Runnable {
int myRow;
boolean isDone = false;
Worker(int row) {
myRow = row;
}
public void run() {
while (!done()) {
isDone = processRow(myRow);
try {
System.out.println("barrier await:" + barrier.getNumberWaiting());
barrier.await();
} catch (InterruptedException | BrokenBarrierException ex) {
return;
}
}
}
private boolean done() {
return isDone;
}
private boolean processRow(int myRow) {
final int rows = data.length;
for (int i = 0; i < rows; i++) {
if (i == myRow) {
System.out.println("========>process myRow:" + myRow);
float[] row = data[myRow];
for (float v : row) {
System.out.println("process value:" + v);
}
return true;
}
}
return false;
}
}
}
Phaser
一个可重复使用的同步屏障,功能类似于CyclicBarrier和CountDownLatch,但支持更灵活的使用。
原理
Sample usages
Phaser可以代替CountDownLatch来构建一个服务于多个对象只执行一次的行动。这个典型的应用惯例是注册–>执行任务–>撤销,例如:
void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}.start();
}
// allow threads to start and deregister self
phaser.arriveAndDeregister();
}
可以通过重写 onAdvance 来实现让多个线程把某些任务执行固定的次数。
void startTasks(List<Runnable> tasks, final int iterations) {
final Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations || registeredParties == 0;
}
};
phaser.register();
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}
}.start();
}
phaser.arriveAndDeregister(); // deregister self, don't wait
}
如果主任务需要等待phaser的结束,那么它可以不断地注册自己从而执行一个类似循环:
// ...
phaser.register();
while (!phaser.isTerminated())
phaser.arriveAndAwaitAdvance();
你可以等待特定的不超过Integer.MAX_VALUE的阶段数。例如:
void awaitPhase(Phaser phaser, int phase) {
int p = phaser.register(); // assumes caller not already registered
while (p < phase) {
if (phaser.isTerminated())
// ... deal with unexpected termination
else
p = phaser.arriveAndAwaitAdvance();
}
phaser.arriveAndDeregister();
}
创建一个集合的任务可以使用一个树结构的phasers,你能够使用以下的代码,假如一个Task类有一个构造函数接受一个Phaser从而进行注册。在调用build(new Task[n], 0, n, new Phaser())后,这些任务能够被执行,
比如提交给一个线程池:
void build(Task[] tasks, int lo, int hi, Phaser ph) {
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
build(tasks, i, j, new Phaser(ph));
}
} else {
for (int i = lo; i < hi; ++i)
tasks[i] = new Task(ph);
// assumes new Task(ph) performs ph.register()
}
}
最好的{TASKS_PER_PHASER}依赖于期待的同步速率。很小的数字比如4适合于执行非常小的任务(所以速率高),或者成百上千适合于使用大任务。
注意:我们的这个实现将参与者的上限定为65535.尝试注册更多的参与者会导致IllegalStateException异常。当然,你可以通过使用分层Phaser的方法去满足任意数量的参与者。
Exchanger
一般用于两个工作线程之间交换数据,在一些pipeline设计上非常有用.
Exchanger有几个特征:
- 此类提供对外的操作是同步的;
- 用于成对出现的线程之间交换数据;
- 可以视作双向的同步队列;
- 可应用于基因算法、流水线设计等场景。
实现原理
当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,然后各自返回。如果它的伙伴还没到达交换点,那么当前线程将会被挂起,直至伙伴线程到达,依次完成交换正常返回;或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常。
Sample Usage
class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<>();
DataBuffer initialEmptyBuffer = ... a made-up type
DataBuffer initialFullBuffer = ...
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.isFull())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) {
... handle ...
}
}
}
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.isEmpty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}