Atomic Variables and ConcurrentMap

悠悠 2022-05-14 06:38 618阅读 0赞

AtomicInteger

java.concurrent.atomic包中包含了许多可以执行原子操作的类,所谓的原子操作是指在多线程并发的情况下无需使用synchronized或者其他锁同步机制的情况下,仍旧可以正确执行的操作,我们就称之为原子操作。
在JVM内部实现上,这些类通过使用compare-and-swap (CAS)来实现原子操作,CAS是CPU内部的一种原子指令,执行速度远比通过锁同步机制快的多。所以,在一般的多线程编程中,推荐使用原子类。我们看一个简单的例子:AtomicInteger

  1. AtomicInteger atomicInt = new AtomicInteger(0);
  2. ExecutorService executor = Executors.newFixedThreadPool(2);
  3. IntStream.range(0, 1000)
  4. .forEach(i -> executor.submit(atomicInt::incrementAndGet));
  5. stop(executor);
  6. System.out.println(atomicInt.get()); // => 1000

其中, stop 方法定义如下:

  1. public static void stop(ExecutorService executor) {
  2. try {
  3. executor.shutdown();
  4. executor.awaitTermination(60, TimeUnit.SECONDS);
  5. }
  6. catch (InterruptedException e) {
  7. System.err.println("termination interrupted");
  8. }
  9. finally {
  10. if (!executor.isTerminated()) {
  11. System.err.println("killing non-finished tasks");
  12. }
  13. executor.shutdownNow();
  14. }
  15. }

注意:不要直接shutdown,更多可参考:multithreading-with-atomicinteger-not-working

AtomicInteger is thread safe, but you have called AtomicInteger#get before all tasks finished.

ExecutorService#shutdown is not waiting for tasks to finish.

See ExecutorService#shutdown docs:

This method does not wait for previously submitted tasks to complete execution. Use awaitTermination to do that.

Use

  1. service.awaitTermination(10, TimeUnit.SECONDS)

to wait for all tasks finished

AtomicInteger有很多原子操作,updateAndGet()可以接收一个lambda表达式来执行整形数值上的任意表达式运算。

  1. AtomicInteger atomicInt = new AtomicInteger(0);
  2. ExecutorService executor = Executors.newFixedThreadPool(2);
  3. IntStream.range(0, 1000)
  4. .forEach(i -> {
  5. Runnable task = () ->
  6. atomicInt.updateAndGet(n -> n + 2);
  7. executor.submit(task);
  8. });
  9. stop(executor);
  10. System.out.println(atomicInt.get()); // => 2000

accumulateAndGet()可以接收IntBinaryOperator类型的lambda表达式。

  1. AtomicInteger atomicInt = new AtomicInteger(0);
  2. ExecutorService executor = Executors.newFixedThreadPool(2);
  3. IntStream.range(0, 1000)
  4. .forEach(i -> {
  5. Runnable task = () ->
  6. atomicInt.accumulateAndGet(i, (n, m) -> n + m); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 499500

LongAdder

AtomicLong一样,LongAdder可以用来求和

  1. ExecutorService executor = Executors.newFixedThreadPool(2);
  2. IntStream.range(0, 1000)
  3. .forEach(i -> executor.submit(adder::increment));
  4. stop(executor);
  5. System.out.println(adder.sumThenReset()); // => 1000

LongAccumulator

LongAccumulator是一个更加通用的LongAdder,相比于LongAdder仅支持简单的长整型加法操作,LongAccumulator可以支持LongBinaryOperator类型的lambda表达式。比如下面的示例:

  1. LongBinaryOperator op = (x, y) -> 2 * x + y;
  2. LongAccumulator accumulator = new LongAccumulator(op, 1L);
  3. ExecutorService executor = Executors.newFixedThreadPool(2);
  4. IntStream.range(0, 10)
  5. .forEach(i -> executor.submit(() -> accumulator.accumulate(i))); stop(executor); System.out.println(accumulator.getThenReset()); // => 2539
  6. LongBinaryOperator op = (x, y) -> 2 * x + y;
  7. LongAccumulator accumulator = new LongAccumulator(op, 1L);
  8. ExecutorService executor = Executors.newFixedThreadPool(2);
  9. IntStream.range(0, 10)
  10. .forEach(i -> executor.submit(() -> accumulator.accumulate(i))); stop(executor); System.out.println(accumulator.getThenReset()); // => 2539

ConcurrentMap and ConcurrentHashMap

ConcurrentMap继承实现了Map接口,定义了常用并发集合类型。在Java 8中,接口新增了许多方法以支持函数式编程。

  1. ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
  2. map.put("foo", "bar");
  3. map.put("han", "solo");
  4. map.put("r2", "d2");
  5. map.put("c3", "p0");

forEach方法接收BiConsumer类型的lambda表达式,其将map中的键值对作为参数传入给BiConsumerforEach()可以作为迭代遍历map的方法替代。

  1. map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));
  • putIfAbsent()

    String value = map.putIfAbsent(“c3”, “p1”);
    System.out.println(value); // p0

  • getOrDefault()

    String value = map.getOrDefault(“hi”, “there”);
    System.out.println(value); // there

  • replaceAll()

    map.replaceAll((key, value) -> “r2”.equals(key) ? “d3” : value); System.out.println(map.get(“r2”)); // d3

  • compute

    map.compute(“foo”, (key, value) -> value + value);
    System.out.println(map.get(“foo”)); // barbar

  • merge()

    map.merge(“foo”, “boo”, (oldVal, newVal) -> newVal + “ was “ + oldVal); System.out.println(map.get(“foo”)); // boo was foo

  • search

    String result = map.search(1, (key, value) -> { System.out.println(Thread.currentThread().getName()); if (“foo”.equals(key)) { return value; } return null; }); System.out.println(“Result: “ + result); // ForkJoinPool.commonPool-worker-2 // main // ForkJoinPool.commonPool-worker-3 // Result: bar

    String result = map.searchValues(1, value -> {

    1. System.out.println(Thread.currentThread().getName());
    2. if (value.length() > 3) {
    3. return value;
    4. }
    5. return null;

    });

    System.out.println(“Result: “ + result);

    // ForkJoinPool.commonPool-worker-2
    // main
    // main
    // ForkJoinPool.commonPool-worker-1
    // Result: solo

  • reduce

reduce接收两个BiFunction类型的lambda表达式作为参数,第一个参数将键值对转换为任意类型的单值,第二个参数将转换后的值组合到一起组成一个结果值,忽略null场景。

  1. String result = map.reduce(1,
  2. (key, value) -> {
  3. System.out.println("Transform: " + Thread.currentThread().getName());
  4. return key + "=" + value;
  5. },
  6. (s1, s2) -> {
  7. System.out.println("Reduce: " + Thread.currentThread().getName());
  8. return s1 + ", " + s2;
  9. });
  10. System.out.println("Result: " + result);
  11. // Transform: ForkJoinPool.commonPool-worker-2
  12. // Transform: main
  13. // Transform: ForkJoinPool.commonPool-worker-3
  14. // Reduce: ForkJoinPool.commonPool-worker-3
  15. // Transform: main
  16. // Reduce: main
  17. // Reduce: main
  18. // Result: r2=d2, c3=p0, han=solo, foo=bar

https://winterbe.com/posts/2015/05/22/java8-concurrency-tutorial-atomic-concurrent-map-examples/

发表评论

表情:
评论列表 (有 0 条评论,618人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Java ConcurrentMap 接口

    Java ConcurrentMap 接口 Java 集合框架的ConcurrentMap接口提供了一个线程安全的映射。也就是说,多个线程可以一次访问该映射,而不会影响映...