本章中讲介绍在Java5和6中引入的一些构建并发模块,以及如何用这些模块来进行开发。
同步容器类
同步容器类包括Vector
和Hashtable
、这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。
同步容器类的问题
常见的复合操作包括:
- 迭代(反复访问元素,知道遍历容器中所有元素)
- 跳转(根据指定顺序找到当前元素的下个元素)
- 条件运算:例如“若没有则添加”。在同步容器类中,这些复合操作在没有客户端加锁的情况下仍然是线程安全的,但当其他线程并发地修改容器时,它们可能会表现出意料之外的行为。
😕
1 | // 5-1 Vector上可能导致混乱结果的复合操作 |
上面是Vector中定义的两个方法,它们都会执行“先检查再运行”操作,每个方法首先都获得数组的大小,然后通过结果来获取或删除最后一个元素。
如果线程A在包含10个元素的Vector上调用removeLast
,同时线程B调用getLast
,这个时候就会出现下面的情况,并抛出ArrayIndexOutOfBoundsException
。
😊 把上面的程序改成客户端加锁
,通过list自身的锁来保护他,使getLast
和deleteLast
成为原子操作
1 | // 5-2 在使用客户端加锁的Vector上的复合操作 |
下面是带有客户端加锁的迭代的例子
1 | // 5-4 带有客户端加锁的迭代 |
迭代器与ConcurrentModificationException
不管是foreach还是iterator,迭代器没有考虑到并发修改的问题,它们表现出的行为是“及时失败(fail-fast)”,这意味着当它们发现容器在迭代过程中被修改时,就会抛出一个Concurrentmodificationexception
(并发修改异常)。从而起到一个警告的作用。
但许多场合我们都不希望像5-4那样通过上锁来保护迭代。因为迭代的时间可能会很长,会降低整体的吞吐量。如果不希望在迭代期间对容器加锁,一种替代方法就是“克隆”容器,并在副本上进行迭代。
隐藏迭代器
在某些情况下,迭代器会隐藏起来。如下面的程序所示
😧
1 | // 5-6 隐藏在字符串连接中的迭代操作(不要这么做) |
上面的代码中,那个部分存在隐藏迭代呢?"DEBUG: added ten elements to " + set
的字符串拼接就是隐藏迭代。编译器将字符串的连接操作转换为调用StringBuilder.append(Object)
,而这个方法又会调用容器的toString
方法,标准容器的toString
方法将迭代容器,并在每个元素上调用toString
来生成容器内容的格式化表示。
之所以说是不安全的,是由于在调用toString
的途中,list可能会发生变化,此时就会出现ConcurrentModificationException
的异常了。所有有必要在System.out.println()
时用锁保护。此外,容器类的hashCode
和equals
,以及containsAll
,removeAll
和retainAll
等方法,以及把容器作为参数的构造函数,都会对容器进行迭代。
并发容器
Java5.0提供了多种并发容器来改进同步容器的性能。
之前的同步容器
是将所有的容器状态的访问都串行化(各个任务按顺序执行,完成一个之后才能进行下一个),以实现它们的线程安全性。这种方法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重减低。
相比之下并发容器
是针对多个线程并发访问设计的。Java5.0中新增的有
ConcurrentHashMap
,代替HashTable
,synchronizedMap
。CopyOnWriteArrayList
,用在遍历操作为主要操作的情况下代替Vector
,synchronizedList
。Queue
: 也就是FIFO容器,通常用LinkedList
实现。它另外提供了几种实现,包括 :ConcurrentLinkedQueue
: 传统的先进先出队列PriorityQueue
:一个(非并发)优先队列
BlockingQueue
:Queue
的扩展。增加了可阻塞的插入和获取等操作。如果队列为空,那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素。
如果队列已满(对于有界队列),插入元素操作将阻塞,直到队列中出现可用的空间。
在“生产者-消费者”这种设计模式中,阻塞队列是十分有用的。
ConcurrentHashMap
HashMap
是基于散列的链表。当我们使用HashMap.get
或set
时,会通过散列值特定一个链表,并在该链表的许多元素上调用equals
。当散列值的分布不均匀时,这种遍历调用euqals
开销是很大的。如果是同步容器的HashTable
,这段时间内而其他线程在期间不能访问该容器,会严重影响性能。
ConcurrentHashMap
也是几个基于散列的Map,但它使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。这种机制被称为分段锁
,一种粒度更细(finer-grained)的加锁机制。在这种机制中,任意数量的线程可以并发地访问Map,执行读取操作的线程和执行写入操作的线程可以并发地访问Map,并且一定数量的写入线程可以并发地修改Map。 因此并发访问环境下将实现更高的吞吐量。
另外,ConcurrentHashMap
等并发容器提供的迭代器不会抛出ConcurrentModificationException
,因此不需要在迭代过程中对容器加锁。实现原理是,让返回的迭代器具有弱一致性
,而非及时失败
。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有额元素,并可以(但不保证)在迭代器被构造后将修改操作反映给容器。
考虑到并发环境下的使用,ConcurrentHashMap
中的size
,isEmpty
返回的并不是精确值,只是一个估值。因为这个值在并发环境下很可能已经是过期的。但也因此提升了其他操作的性能,包括get
,put
,containKey
和remove
等。
在ConcurrentHashMap
中没有实现对Map加锁以提供独占访问。
额外的原子Map操作
上一章中,我们了解了Vector
的“若没有则添加”,而 ConcurrentHashMap
本身就提供了“若没有则添加”,“若相等则移除”,“若相等则替换”等,能原子操作的接口:
1 | // 5-7 ConcurrentMap接口 |
CopyOnWriteArrayList
CopyOnWriteArrayList
用于替代同步List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制(CopyOnWriteArraySet
的作用是替代同步Set
)。
“写入时复制(Copy-On-Write)”容器的线程安全性在于,只要正确发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。 什么意思呢?换句话说,就是他在每次修改时,会创建并重新发布一个新的容器副本,从而实现可变性。 “写入时复刻”容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此只需确保数组内容的可见性。
所以,多个线程可以同时这个容器进行迭代,而不会彼此干扰或与修改容器的线程互相干扰。也因此不会抛出ConcurrentModificationException
。
这么做的缺点也很明显,每当修改容器时都会复制底层数组,这需要一定开销,特别是容器规模较大时。仅当迭代操作远远多于修改操作时,才应该使用“写入时复制”容器。
阻塞队列和生产者-消费者模式
阻塞队列提供了可阻塞的put
和take
方法,以及支持定时的offer
和poll
方法。如果队列已经满了,put
方法将阻塞直到空间可用;如果队列为空,那么take
方法将会阻塞直到有元素可用。队列可以有界也可以无界,无界队列put永远不会阻塞。阻塞队列还提供了一个offer方法,如果数据项不能被添加到队列中,那么将返回一个失败状态。
BlockingQueue
中支持任意数量的生产者和消费者,常见的使用方法是线程池与工作队列的组合。
在类库中还包含了BlockingQueue的多种实现,其中:
LinkedBlockingQueue
: FIFO。与LinkedList类似ArrayBlockingQueue
:FIFO。与ArrayList类似PriorityBlockingQueue
: 一个按优先级排序的队列。想按照某种顺序而不是FIFO来处理元素时使用。SynchronousQueue
: 特殊,它并不是一个真正的队列,因为它不会为队列中元素维护存储空间。它维护的是一组线程,这些线程在等待着把元素加入或移出队列。
如果以洗盘子为比喻,就相当于没有盘架来暂时存放洗好的盘子,而是将洗好的盘子直接放入下一个空闲的烘干机中。这种实现队列由于可以直接交付工作,从而降低了将数据从生产者移动到消费者的延迟。
因为SynchronousQueue
没有存储功能,因此put和take会一直阻塞,知道有另一个线程已经准备好参与到交付过程中。仅当有足够的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。
示例:桌面搜索
下面来看一个用BlockingQueue
实现的生产者-消费者问题的示例。假如我们有这样一个需求,扫描本地驱动上的文件并建立索引以便随后进行搜索。
下面的FileCrawler中给出了一个生产者任务,即在某个文件层次结构中搜索符合索引标准的文件,并将它们的名称放入工作队列。 其中的crawl扫描文件并将符合fileFilter条件的文件放入数组中。
1 | // 5-8 桌面搜索应用程序中的生产者任务 |
Indexer中还给出了一个消费者任务,即从队列中取出文件名并对它们建立索引。
1 | // 5-8 桌面搜索应用中消费者任务 |
客户端调用
1 | // 5-9 启动桌面搜索 |
上面的代码启动了多个爬虫程序和索引建立程序,每个程序都在各自的线程中运行。 但这个消费者线程永远不会退出,因而程序无法停止,这将在后面的课程解决这个问题。
串行线程封闭
生产者-消费者模式就是通过阻塞队列实现的串行线程封闭。线程封闭对象只能由单个线程拥有,但可以通过安全地发布该对象来“转移”所有权。在转移所有权后,也只有另一个线程能获得这个对象的访问权限,并且发布对象的线程不会再访问它。
对象池也是利用了串行线程封闭,将对象“租借”给一个请求线程。安全地发布池中的对象,并且只要客户代码本身不会发布池中的对象,或者在将对象返回给对象池后就不再使用它,那么就可以安全地在线程之间传递所有权。
双端队列与工作密取
Java6中新增的两种容器类型Deque
和BlockingDeque
,分别对Queue
和BlockingQueue
进行了扩展。Deque
是一个双端队列,实现了在队列头和队列尾的高效插入和移除。具体实现包括ArrayDeque
和LinkedBlockingDeque
。
阻塞队列适用与生产者-消费者模式,而双端队列适用与工作密取。什么是工作密取?在生产者-消费者模式中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。
密取工作模式比传统的生产者-消费者模式具有更高的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。因为在大多数情况下,它们都只是访问自己的双端队列。当工作者线程需要访问另一个队列时,它会从队列的尾部而不是头部获取工作,因此进一步降低了队列上的竞争程度。并且能确保每个线程都处于忙碌状态。
阻塞方法与中断方法
线程可能会阻塞或暂停执行,原因有多种:等待I/O操作结束,等待获得一个锁,等待从Thread.sleep方法中醒来,或是等待另一个线程的计算结果。
当线程阻塞时,它通常被挂起,并处于某个阻塞状态(BLOCKED,WAITING,或TIMED_WAITING)。被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行,例如等到I/O操作,等待某个锁变成可用,或者等待外部计算的结束。当某个外部事件发生时,线程被置回RUNNABLE状态,并可以再次被调度执行。
BlockingQueue
的put
和take
等方法会抛出受检查异常InterruptedException
。InterruptedException
代表什么?如果某个方法会抛出InterruptedException
,它是在告诉你,如果执行该方法的线程被中断,它将尝试停止它正在做的事情而提前返回,并通过抛出 InterruptedException
表明它提前返回。
Thread
提供了interrupt
方法,用于中断线程或者查询线程是否已经被中断,每个线程都有一个布尔类型的属性,表示线程的中断状态,当中断线程时将设置这个标志。什么是中断?中断并不是强制另一个线程停下来,只是请求它在适当的地方停下来,就如linux中的kill
指令,和kill 9
。
当像BlockingQueue
的方法抛出了InterruptedException
时,你就必须对中断做出响应。此时有两种策略:
- 传递
InterruptedException
。 - 恢复中断。当代码是
Runnable
的一部分时,肯定就不能继续抛异常了。必须捕获它,并且通过调用interrupt
来恢复中断状态。这样在调用栈中更高层的代码将看到引发了一个中断。
在出现InterruptedException
时不应该捕获它却不做出任何响应。这将使调用栈上更高层代码无法对中断采取处理措施,因为线程被中断的证据已经丢失。
同步工具类
同步工具类可以是任何一个对象,只要它根据自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量
,栅栏
,以及闭锁
。
所有的同步工具类都包含一些特定的结构化属性:
- 一些状态,决定是继续执行还是等待。
- 对这些状态进行操作的方法
- 用于高效地等待同步工具类进入到预期状态的方法
闭锁
闭锁是一种同步工具类,可以延迟线程的进度直到其达到终止状态。什么意思呢?闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。
闭锁用来确保一些活动直到其他活动都完成后继续执行。例如:
- 确保某个计算在其需要的所有资源都被初始化之后才继续执行。
- 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。
- 等到直到某个操作的所有参与者(例如所有玩家)都就绪再继续执行。
CountDownLatch
可以用于上面的各种情况。可以使一个或多个线程在等待一组事件发生。其中包含
- 计数器属性。该计数器被初始化为一个正数,表示需要等待的事件数量。
countDown
方法。递减计数器,表示有一个事件已经发生了await
方法。等待计数器达到零,这表示所有需要等待的事件都已经发生。
我们来看一下闭锁的两种常见用法。通过两个闭锁来测nThreads
个线程执行任务的时间。
1 | // 5-11 在计时测试中使用CountDownLatch来启动和停止线程 |
在同一时刻开启多个线程,随后进行阻塞当前线程直到所有线程执行结束。
Future Task
FutureTask
也可以用作闭锁,本身就是异步执行。FutureTask
是通过Callable
来实现的,相当于一种可生成结果的Runnable
。FutureTask
是Future
接口的一个唯一实现类。
可以处理以下3种状态:
- 等待状态(Waiting to run)
- 正在运行(Running)
- 运行完成(Completed)
当FutureTask
进入完成状态后,它会永远停止在“执行完成”的状态上。这个状态有可能是正常结束,由于取消而结束和由于异常而结束等。
Future.get
的行为取决于任务的状态,如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutrueTask
会将计算结果从执行计算的线程传递到获取这个结果的线程。
常用的方式是在使用结果之前,提前进行调用,从而及时拿到异步调用返回的结果。
1 | // 5-12 使用FutureTask来提前加载稍后需要的数据 |
信号量
计数信号量(Counting Semaphore)
是用来控制同时访问某个特定资源的操作数量,或者同时执行某个执行操作的数量。
Semaphore中管理着一组虚拟的许可(permit
),许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并且使用以后释放许可。如果没有许可,那么acquire
方法将阻塞直到有许可(或者直到被中断或请求超时)。release
方法将返回一个许可给信号量。
Semaphore可以用于实现资源池,获取资源时使用acquire
方法获取permit
,使用完毕后用release
释放permit
。
另外,你还可以使用Semaphore将任何一种容器变成有界阻塞容器。 如下所示:
1 | // 5-14 使用Semaphore为容器设置边界 |
栅栏
之前介绍的闭锁是一次性对象,一旦进入终止状态,就不能被重置。
栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。 栅栏与闭锁的区别在于,所有线程必须同事到达栅栏位置,才能继续执行,另外,闭锁是一次性的,而栅栏可以反复使用。闭锁用于等待事件,而栅栏用于等待其他线程。栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:“所有人6:00在麦当劳碰头,到了以后要等其他人,之后再讨论下一步做的事情”。
CyclicBarrier
可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用await
方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都将释放,而栅栏将被重置、以便下次使用。
如果对
await
的调用超时,或者await
阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await
调用都将终止并抛出BrokenBarrierException
。如果成功地通过栅栏,那么
await
将为每个线程返回一个唯一的达到索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。
CyclicBarrier
还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable
,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。
下面的程序中通过栅栏来计算细胞的自动化模拟。将问题分解成多个子问题,为每个子问题分配一个线程求解,之后再将所有的结果合并起来。
1 | // 5-15 通过CyclicBarrier协调细胞自动衍生系统中的计算 |
另一种形式的栅栏是Exchanger
,它是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操作时,Exchanger
会非常有用,例如当一个线程向缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用Exchanger
来汇合,并将满的缓冲区与空的缓冲区交换。当两个线程通过Exchanger交换对象时,这种交换就把这两个对象安全地发布给另一方。🤔
🤔数据交换的时机取决与应用程序的响应需求 :
- 最简单的方案:当缓冲区被填满时,由填充任务进行交换 。当缓冲区为空时,由清空任务进行交换。
这样会把需要交换的次数降至最低。但如果新数据的到达率不可预测,那么一些数据的处理过程就将延迟。 - 另一个方法 : 不仅当缓冲被填满时进行交换,并且当缓冲被填充到一定程度并保持一定时间后,也进行交换。
构建高效且可伸缩的结果缓存
本节我们尝试开发一个高效且可伸缩的缓存,用于改进一个高计算开销的函数。我们首先从简单的HashMap开始,然后分析它的并发性缺陷,并讨论如何修复它们。
😕使用HashMap和同步机制
1 | // 5-16 使用HashMap和同步机制来初始化缓冲(并不好,需改进) |
在上面的程序中,我们创建一个了Computable的包装器,使用HashMap来帮助记住之前的计算结果,并将缓存过程封装起来。
但这种情况下,可能会比没有缓存操作所花的时间还要长,因为这个缓存为了实现线程安全,把缓存操作都用synchronized
锁了起来,可能会导致阻塞时间很长。
😕 利用ConcurrentHashMap代替HashMap来对上面的程序进行改进
1 | // 5-17 用ConcurrentHashMap替换HashMap(还是不好) |
由于使用的是ConcurrentHashMap,所以就不需要显示的进行同步,性能也比上面的程序更高。
但是仍存在漏洞,当两个线程同时调用compute时,可能会导致计算得到两个相同的值(而我们只需要缓存一个)。这就不满足缓存的设计理念了。 为什么会这样?因为如果某个线程启动了一个开销很大的计算,而其他线程并不知道这个计算正在计算,就会重复这个计算。
😕 用ConcurrentHashMap<A,Future<V>>
来代替原来的ConcurrentHashMap<A,V>
1 | // 5-18 基于FutureTask的Memoizing封装器(还不够好) |
之前的程序中都是首先判断计算是否已完成,而上面的程序是判断计算是否已开始。若结果已经计算出来,那么将立刻返回。如果其他线程正在计算该结果,那么信道的线程将一直等待这个结果被计算出来。
但仍然存在两个线程计算出相同值的漏洞,这个概率比Memoizer2要小。原因在于conpute方法中的if代码块是非原子的的“先检查在执行”操作,因此仍有可能在同一时间调用compute来计算相同的值。
😊 最终实现。使用ConcurrentMap中的原子方法putIfAbsent,来避免Memoizer3的漏洞。
1 | // 5-19 最终实现 |
值得注意的是,当缓存的是Future而不是值时,将导致缓存污染(Cache Pollution)问题:如果某个计算被取消或失败,那么future在计算这个结果时将指明计算过程被取消或失败。为了避免这种情况,如果Memoizer发现计算被取消,那么将Future从缓存中移除。如果检查到RuntimeException
,也会移除Future,这样将来的计算才可能成功。