Java8 并发之原子变量与ConcurrentMap

小灰灰 2022-06-12 09:29 626阅读 0赞

前言

点击查看原文 原文地址

  • 第一部分:线程(Thread)与执行体(Executors)
  • 第二部分:同步(Synchronization)与锁(Locks)
  • 第三部分:原子变量与ConcurrentMap

欢迎浏览Java 8 并发教程的第二部分.本教程致力于使用简单而易于理解的代码实例来教授你关于java8中并发编程一些知识。接下来你会学到在并发编程中使用synchronized关键字,信号来同步可变的共享变量。

Java 并发API于Java5首次加入,在后来发布的版本中不断迭代完善。本文中出现的大部分概念也适合java8以下的版本,不单单针对java8。但是本文将大量使用java8 中的 lambda表达式以及新的并发功能,如果你对lambda表达式不是很熟悉的话可以查看这个教程:Java8 教程

本文的代码示例中使用了两个帮助函数:sleep(seconds)stop(executor)

  1. public static void stop(ExecutorService executor) {
  2. try {
  3. executor.shutdown();
  4. executor.awaitTermination(60, TimeUnit.SECONDS);
  5. }
  6. catch (InterruptedException e) {
  7. System.err.println("termination interrupted");
  8. }
  9. finally {
  10. if (!executor.isTerminated()) {
  11. System.err.println("killing non-finished tasks");
  12. }
  13. executor.shutdownNow();
  14. }
  15. }
  16. public static void sleep(int seconds) {
  17. try {
  18. TimeUnit.SECONDS.sleep(seconds);
  19. } catch (InterruptedException e) {
  20. throw new IllegalStateException(e);
  21. }
  22. }

AtomicInteger

java.concurrent.atomic包里面包含了很多非常有用的进行原子操作的类。当一个操作的原子性的,我们就可以在多线程中安全的执行这个操作而不用使用synchronized关键字或者锁。

原子类内部严重依赖 比较-交换这种指令(compare-and-swap (CAS)),一种被大部分现代CPU直接支持的原子操作指令。这些指令通常比使用锁同步的指令快的多。所以当我们仅仅是想在多线程的情况下修改一个可变变量的值,强烈建议优先使用原子类,而不是锁。

下面让我们一起看一下AtomicInteger类的几个例子

  1. AtomicInteger atomicInt = new AtomicInteger(0);
  2. ExecutorService executor = Executors.newFixedThreadPool(2);
  3. IntStream.range(0, 1000)
  4. .forEach(i -> executor.submit(atomicInt::incrementAndGet));
  5. stop(executor);
  6. System.out.println(atomicInt.get()); // => 1000

上例使用AtomicInteger替代Integer使的我们可以在不用同步的情况下依然可以线程安全方式同步使一个整形变量数值递增。incrementAndGet()方法是原子操作的,所以我们可以在多线程的情况下安全的调用这个方法。

AtomicInteger 支持很多种原子操作。updateAndGet()方法接受一个Lambda表达式,在这个lambda表达式中我们可以对整形变量做各种处理。

  1. AtomicInteger atomicInt = new AtomicInteger(0);
  2. ExecutorService executor = Executors.newFixedThreadPool(2);
  3. IntStream.range(0, 1000)
  4. .forEach(i -> {
  5. Runnable task = () ->
  6. atomicInt.updateAndGet(n -> n + 2);
  7. executor.submit(task);
  8. });
  9. stop(executor);
  10. System.out.println(atomicInt.get()); // => 2000

accumulateAndGet()方法接受一个IntBinaryOperator类型的Lambda表达式。下面的例子演示了使用此方法并发的将0到1000的数相加。

  1. AtomicInteger atomicInt = new AtomicInteger(0);
  2. ExecutorService executor = Executors.newFixedThreadPool(2);
  3. IntStream.range(0, 1000)
  4. .forEach(i -> {
  5. Runnable task = () ->
  6. atomicInt.accumulateAndGet(i, (n, m) -> n + m); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 499500

其他比较有用的原子类还有 AtomicBoolean, AtomicLong, AtomicReference

LongAdder

LongAdder类可以代替AtomicLong完成并发求和的功能。

  1. ExecutorService executor = Executors.newFixedThreadPool(2);
  2. IntStream.range(0, 1000)
  3. .forEach(i -> executor.submit(adder::increment));
  4. stop(executor);
  5. System.out.println(adder.sumThenReset()); // => 1000

类似于原子数字类, LongAdder也 提供了add()increment()方法,而且也是线程安全的。不过他们之间也有所不同,原子数字类是将结果累加到一个变量中,而LongAdder为了降低线程的争用而内部维护了一个变量集合。我们可以通过调用sum()或者sumThenReset()方法来获取真实结果。

这个类在更新频率大于读取频率的多线程场景下优于原子数字类。而获取分析数据就属于这种场景,例如你要统计服务器的访问量.LongAdder类的缺点是占用内存较高,这是由于其在内存中维护了一个变量集合。

LongAccumulator

LongAccumulator 是LongAdder的一种广义版本。LongAccumulator结合LongBinaryOperator类型的Lambda表达式使用,如下例所示

  1. LongBinaryOperator op = (x, y) -> 2 * x + y;
  2. LongAccumulator accumulator = new LongAccumulator(op, 1L);
  3. ExecutorService executor = Executors.newFixedThreadPool(2);
  4. IntStream.range(0, 10)
  5. .forEach(i -> executor.submit(() -> accumulator.accumulate(i))); stop(executor); System.out.println(accumulator.getThenReset()); // => 2539

我们使用函数 2 * x + y以及初始值1创建了LongAccumlator. 每次调用accumulate(i)时,当前结果和i的值都会作为参数传入lambda表达式中。

LongAdder类似,LongAccumlator在内部也维护了一个变量的集合用来降低线程之间的争用。

ConcurrentMap

ConcurrentMap接口扩展至map接口,并且邓毅了很多非常有用的同步集合类型。Java 8 通过向这个接口添加新的方法进入了函数编程。

接下来的代码片段,我们准备使用下面这个简单的map来说明这些新方法:

  1. ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
  2. map.put("foo", "bar");
  3. map.put("han", "solo");
  4. map.put("r2", "d2");
  5. map.put("c3", "p0");

forEach()方法接受一个BiConsumer类型的lambda表达式,这个表达式需要连个参数,一个是map的key,一个value。这个方法可以替代同步map中使用entries迭代的 for-each 循环。

  1. map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));

putIfAbsent()方法只有当此map中不存在使用给定key获取的value时才压入一个新值。在ConcurrentHashMap中,这个方法被实现为线程安全的,所以在多线程并发访问下不需要同步。

  1. String value = map.putIfAbsent("c3", "p1");
  2. System.out.println(value); // p0

getOrDefault()方法当map中存在给定key值的value时就返回那个value,不然就返回默认值

  1. String value = map.getOrDefault("hi", "there");
  2. System.out.println(value); // there

replaceAll()方法接受一个BiFunction类型的lambda表达式,这种表达式接受两个参数,返回一个value.
下面的代码将key值为”r2”的value替换成“d3”。

  1. map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value); System.out.println(map.get("r2")); // d3

如果你只是想改变map中某一个entry,则可以使用 compute()方法。这个方法接受两个参数,一个是用于计算的key,另一个是BiFunction类型的lambda表达式,这个表达式负责转化工作。下面的代码将key是“foo”的value转化成了value+value。

  1. map.compute("foo", (key, value) -> value + value);
  2. System.out.println(map.get("foo")); // barbar

compute()方法还存在两个变种:computeIfAbsent()computeIfPresent().

最后介绍一个方法merge(),这个方法可以将一个新值合并到map中已经存在的value上面去。这个方法接受三个参数,第一个是要合并的entry的key值,第二个是要合并的新值,第三个是一个BiFunction类型的lambda表达式,这个表达式负责合并规则。例如下面的例子将“boo”合并到了key值为“foo”的value中去了。

  1. map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal); System.out.println(map.get("foo")); // boo was foo

ConcurrentHashMap

上面介绍的所有方法都是ConcurrentMap接口中的方法,所以所有实现了此接口的类都可以使用这些方法。现在我们介绍一个最重要的实现类ConcurrentHashMap,其中加入了很多关于并发操作的方法。

在Java 8中, 类似于平行流(parallel streams),这些方法通过ForkJoinPool.commonPool()获取了一个ForkJoinPool线程池。这个线程池根据你电脑的CPU核数来预设平行参数。例如我们的CPU是4核的,JVM就帮我预设了3平行度。

  1. System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3

我们可以通过设置来改变着平行度

  1. -Djava.util.concurrent.ForkJoinPool.common.parallelism=5

我们使用下面的这个map来演示相关的方法

  1. ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
  2. map.put("foo", "bar");
  3. map.put("han", "solo");
  4. map.put("r2", "d2");
  5. map.put("c3", "p0");

Java 8 引入了3种类型的平行操作:forEach,search,reduce.每一种操作都有四种类型,四种类型区别去参入参数的类型上:key、values、entries、key-value pair.

所有操作的第一个参数都是parallelismThreshold.这个参数表明启动平行操作的最小集合数量,例如设置为500,只有当要操作的集合里面item的数量大于等于500才启用平行操作,不然都是单线程线性执行。

ForEach

  1. map.forEach(1, (key, value) ->
  2. System.out.printf("key: %s; value: %s; thread: %s\n",
  3. key, value, Thread.currentThread().getName()));
  4. // key: r2; value: d2; thread: main
  5. // key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
  6. // key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
  7. // key: c3; value: p0; thread: main

下面的例子演示了search()方法接受一个BiFunction类型的lambda表达式,如果有符合条件的值则返回这个值,不然则返回null.

  1. String result = map.search(1, (key, value) -> { System.out.println(Thread.currentThread().getName()); if ("foo".equals(key)) { return value; } return null; }); System.out.println("Result: " + result); // ForkJoinPool.commonPool-worker-2 // main // ForkJoinPool.commonPool-worker-3 // Result: bar

值的注意的是:当首次达到条件,后面就不会再继续搜索了,而且ConcurrentHashMap是无序的,所以如果一个map中存在好几个符合搜索条件的值,返回结果不一定是第一个。

Reduce

reduce()方法接受两个BiFunction类型的lambda表达式作为参数,下面的例子演示了:第一个lambda表达式将没一个key-value 对连接起来成为一个值,第二个表达式又将第一个表达式连接起来的值再连接到一起。

  1. String result = map.reduce(1,
  2. (key, value) -> {
  3. System.out.println("Transform: " + Thread.currentThread().getName());
  4. return key + "=" + value;
  5. },
  6. (s1, s2) -> {
  7. System.out.println("Reduce: " + Thread.currentThread().getName());
  8. return s1 + ", " + s2;
  9. });
  10. System.out.println("Result: " + result);
  11. // Transform: ForkJoinPool.commonPool-worker-2
  12. // Transform: main
  13. // Transform: ForkJoinPool.commonPool-worker-3
  14. // Reduce: ForkJoinPool.commonPool-worker-3
  15. // Transform: main
  16. // Reduce: main
  17. // Reduce: main
  18. // Result: r2=d2, c3=p0, han=solo, foo=bar

教程的代码托管在GitHub上地址

发表评论

表情:
评论列表 (有 0 条评论,626人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Java并发原子

    一、原子类简介 1 什么是原子类 Java中提供了一些原子类,原子类包装了一个变量,并且提供了一系列对变量进行原子性操作的方法。原子性的意思是对于一组操作,要么全部