农场主的黑科技.

并发操作合集-6.并发容器:ConcurrentHashMap

字数统计: 2k阅读时长: 12 min
2018/11/02 Share

🍤 并发操作合集系列 目录

🍕 并发操作合集系列 源代码

Java中提供了大量的并发容器,它们不同于并行容器(Vector,HashMap)。并发容器在确保线程安全的同时又具备较高的性能。

本章会介绍并发容器中最常用的ConcurrentHashMap。

ConcurrentHashMap简介

ConcurrentMap接口继承自Map接口,实现了高吞吐量的同时线程安全。ConcurrentHashMap是ConcurrentMap的实现。基本的使用方式和Map相似,我们在这里涉及一些原理,以便更好地理解ConcurrentHashMap为何能实现高吞吐量。之后我们会看一下JDK1.8中新添加的方法。

ConcurrentHashMap原理

整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 代表”部分“或”一段“的意思,所以很多地方都会将其描述为分段锁。

简单理解就是,ConcurrentHashMap 是一个 Segment 数组,Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。

concurrencyLevel:并行级别。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments。这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值。

关于它的初始化和get,put的实现原理,可以参考文末的链接。

ConcurrentMap新方法

在下面的代码中,我们使用这个映射示例来展示那些JDK1.8提供新的方法:

1
2
3
4
5
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");

forEach()方法接受类型为BiConsumer的lambda表达式,以映射的键和值作为参数传递。它可以作为for-each循环的替代,来遍历并发映射中的元素。迭代在当前线程上串行执行。

1
map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));

新方法putIfAbsent()只在提供的键不存在时,将新的值添加到映射中。至少在ConcurrentHashMap的实现中,这一方法像put()一样是线程安全的,所以你在不同线程中并发访问映射时,不需要任何同步机制。

1
2
3
4
5
6
7
String value = map.putIfAbsent("c3","p1");
System.out.println(value); //输出p0
System.out.println(map.get("c3")); //返回的仍是原来的p0

String value2 = map.putIfAbsent("c4","p1");
System.out.println(value2); //输出null
System.out.println(map.get("c4")); //返回新值p1

无论隐射成功还是失败,putIfAbsent()都会返回该方法被调前的值。

getOrDefault()方法返回指定键的值。在传入的键不存在时,会返回默认值:

1
2
String value = map.getOrDefault("hi", "there");
System.out.println(value); // there

replaceAll()接受类型为BiFunction的lambda表达式。BiFunction接受两个参数并返回一个值。函数在这里以每个元素的键和值调用,并返回要映射到当前键的新值。

1
2
map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
System.out.println(map.get("r2")); // d3

compute()允许我们转换单个元素,而不是替换映射中的所有值。这个方法接受需要处理的键,和用于指定值的转换的BiFunction

1
2
map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo")); // barbar

除了compute()之外还有两个变体:computeIfAbsent()computeIfPresent()。这些方法的函数式参数只在键不存在或存在时被调用。

最后,merge()方法可以用于以映射中的现有值来统一新的值。这个方法接受键、需要并入现有元素的新值,以及指定两个值的合并行为的BiFunction

1
2
3
4
map.merge("foo",    //键
"newVal", //想要并入的新值 “newVal”
(oldVal, newVal) -> newVal + " was " + oldVal);//oldVal是原来的值“bar”,newVal是新值 “newVal”
System.out.println(map.get("foo")); //new was bar

ConcurrentHashMap新方法

所有这些方法都是ConcurrentMap接口的一部分,因此可在所有该接口的实现上调用。此外,最重要的实现ConcurrentHashMap使用了一些新的方法来改进,便于在映射上执行并行操作。

就像并行流那样,这些方法使用特定的ForkJoinPool,由Java8中的ForkJoinPool.commonPool()提供。该池使用了取决于可用核心数量的预置并行机制。我的电脑有四个核心可用,这会使并行性的结果为3:

1
System.out.println(ForkJoinPool.getCommonPoolParallelism());  // 3

这个值可以通过设置下列JVM参数来增减:

1
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Java8引入了三种类型的并行操作:forEachsearchreduce。这些操作中每个都以四种形式提供,接受以键、值、元素或键值对为参数的函数。

所有这些方法的第一个参数是通用的parallelismThreshold。这一阈值表示操作并行执行时的最小集合大小。例如,如果你传入阈值500,而映射的实际大小是499,那么操作就会在单线程上串行执行。在下一个例子中,我们使用阈值1,始终强制3个线程并行执行来展示。

forEach

forEach()方法可以并行迭代映射中的键值对。BiConsumer以当前迭代元素的键和值调用。为了将并行执行可视化,我们向控制台打印了当前线程的名称。要注意在我这里底层的ForkJoinPool最多使用三个线程。

1
2
3
4
5
6
7
map.forEach(1, (key, value) ->
System.out.printf("key: %s; value: %s; thread: %s\n",
key, value, Thread.currentThread().getName()));
// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main

search()方法接受BiFunction并为当前的键值对返回一个非空的搜索结果,或者在当前迭代不匹配任何搜索条件时返回null。只要返回了非空的结果,就不会往下搜索了。要记住ConcurrentHashMap是无序的。搜索函数应该不依赖于映射实际的处理顺序。如果映射的多个元素都满足指定搜索函数,结果是非确定的。

1
2
3
4
5
6
7
8
9
10
11
12
String result = map.search(1, (key, value) -> {
System.out.println(Thread.currentThread().getName());
if ("foo".equals(key)) {
return value;
}
return null;
});
System.out.println("Result: " + result);
// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar

下面是另一个例子,仅仅搜索映射中的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
String result = map.searchValues(1, value -> {
System.out.println(Thread.currentThread().getName());
if (value.length() > 3) {
return value;
}
return null;
});
System.out.println("Result: " + result);
// ForkJoinPool.commonPool-worker-2
// main
// main
// ForkJoinPool.commonPool-worker-1
// Result: solo

reduce

reduce()方法接受两个BiFunction类型的lambda表达式。第一个函数将每个键值对转换为任意类型的单一值。第二个函数将所有这些转换后的值组合为单一结果,并忽略所有可能的null值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
String result = map.reduce(1,
(key, value) -> {
System.out.println("Transform: " + Thread.currentThread().getName());
return key + "=" + value;
},
(s1, s2) -> {
System.out.println("Reduce: " + Thread.currentThread().getName());
return s1 + ", " + s2;
});
System.out.println("Result: " + result);
// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar

Reference

Java 8 Concurrency Tutorial: Synchronization and Locks 译者:飞龙

Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析

CATALOG
  1. 1. ConcurrentHashMap简介
  2. 2. ConcurrentHashMap原理
  3. 3. ConcurrentMap新方法
  4. 4. ConcurrentHashMap新方法
    1. 4.1. search
    2. 4.2. reduce
  5. 5. Reference