Java的同步容器和并发容器

梦里梦外; 2022-08-21 14:48 406阅读 0赞

前言:

之前在介绍Java集合的时候说到,java提供的实现类很少是线程安全的。只有几个比较古老的类,比如Vector、Hashtable等是线程安全的,尤其是Hashtable,古老到连命名规范都没统一了……

同步容器:

1)Vector和Hashtable

来简单比较下:

Hashtable和Vector实现同步的方式也很类似,都是使用synchronized关键字,利用java的内置锁来实现。那些新增的线程不安全的,如果实现线程安全呢?

  1. public class Hashtable<K,V> extends Dictionary<K,V> implements Map<K,V>, Cloneable, java.io.Serializable {
  2. public synchronized int size() {
  3. ...
  4. }
  5. public synchronized V get(Object key) {
  6. ...
  7. }

2)synchronized.XXX

介绍一个工具类Collections,其中对各种类型的集合进行了封装,实现了线程安全。

这里写图片描述

以其中的SynchronizedList为例,可以看到也是用的synchronized关键字来实现的。

注意:SynchronizedList并不是对某个实现类的封装,而面对的是List接口。

  1. static class SynchronizedList<E> extends SynchronizedCollection<E> implements List<E> {
  2. private static final long serialVersionUID = -7754090372962971524L;
  3. final List<E> list;
  4. SynchronizedList(List<E> list) {
  5. super(list);
  6. this.list = list;
  7. }
  8. SynchronizedList(List<E> list, Object mutex) {
  9. super(list, mutex);
  10. this.list = list;
  11. }
  12. public boolean equals(Object o) {
  13. if (this == o)
  14. return true;
  15. synchronized (mutex) {
  16. return list.equals(o);}
  17. }
  18. public int hashCode() {
  19. synchronized (mutex) {
  20. return list.hashCode();}
  21. }
  22. public E get(int index) {
  23. synchronized (mutex) {
  24. return list.get(index);}
  25. }
  26. public E set(int index, E element) {
  27. synchronized (mutex) {
  28. return list.set(index, element);}
  29. }
  30. public void add(int index, E element) {
  31. synchronized (mutex) {list.add(index, element);}
  32. }
  33. public E remove(int index) {
  34. synchronized (mutex) {
  35. return list.remove(index);}
  36. }
  37. }

无论是哪一种同步容器,在加锁限制时,都将同步代码块限制在了一个业务合理的最小粒度上。比如:SynchronizedList的get、set、remove等。
首先,需要承认的是这样做是相当合理的。但选择就意味着有利有弊,他不好的地方就是,如果程序需要稍微复杂点的操作,获取最后一个元素或者删除最后一个元素等等。这种情况下,我们就无法再指望人家能给你保证原子性了,就需要你客户端加锁了。

并发容器:

由于同步粒度控制的很小,这就使得容器中的方法都变成串行执行,虽然缓解了高并发,但也使得执行效率降低。高性能、高效率一直是我们追求的功能,为了解决这一点,引出了“并发容器”。

常用的有:ConcurrentHashMap、ConcurrentSkipListMap

并发容器替换同步容器,用很小的风险换得了可扩展性的提高。

在介绍ConcurrentHashMap实现之前,先来了解一下分离锁。

锁分段

如果一个锁中竞争很激烈,我们可以把他拆成两个锁。这样的话,可以通过两个线程并发执行,效果会好一点,但并不明显。

如果把拆分后的所扩展一下,分成可大可小的加锁块的集合,并且他们归属相互独立的对象,这就是我们说的锁分段。

ConcurrentHashMap

ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),Node(节点)。(之前的版本中Node叫做 Hash Entry,一个意思。)

每个segment可能包含若干个table数组,每个table也都包含了若干的Node组成的链表。

如图:

这里写图片描述

ConcurrentHashMap使用了一个包含16个锁的Array,每个锁都守护Hash Bucket的十六分之一。更有利于并发访问,减少了锁的请求。

因为不是主要介绍源码,这里就简单的看几个重点的地方:

1、一些重要的变量常量

英文注释说的挺清楚的,就不翻译了,省得翻译还错了。

  1. /** * The default initial table capacity. Must be a power of 2 * (i.e., at least 1) and at most MAXIMUM_CAPACITY. */
  2. private static final int DEFAULT_CAPACITY = 16;
  3. /** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. */
  4. transient volatile Node<K,V>[] table;
  5. /** * The next table to use; non-null only while resizing. */
  6. private transient volatile Node<K,V>[] nextTable;
  7. /** * Base counter value, used mainly when there is no contention, * but also as a fallback during table initialization * races. Updated via CAS. */
  8. private transient volatile long baseCount;

2、Segment
继承ReentrantLock 类,说明Segment是可以实现加锁的。

  1. static class Segment<K,V> extends ReentrantLock implements Serializable {
  2. private static final long serialVersionUID = 2249069246763182397L;
  3. final float loadFactor;
  4. Segment(float lf) { this.loadFactor = lf; }
  5. }

3、Node类
实现了Map.entry,重写了一个equals和find方法。

  1. static class Node<K,V> implements Map.Entry<K,V> {
  2. final int hash;
  3. final K key;
  4. volatile V val;
  5. volatile Node<K,V> next;
  6. Node(int hash, K key, V val, Node<K,V> next) {
  7. this.hash = hash;
  8. this.key = key;
  9. this.val = val;
  10. this.next = next;
  11. }
  12. public final boolean equals(Object o) {
  13. Object k, v, u; Map.Entry<?,?> e;
  14. return ((o instanceof Map.Entry) &&
  15. (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
  16. (v = e.getValue()) != null &&
  17. (k == key || k.equals(key)) &&
  18. (v == (u = val) || v.equals(u)));
  19. }
  20. /** * Virtualized support for map.get(); overridden in subclasses. */
  21. Node<K,V> find(int h, Object k) {
  22. Node<K,V> e = this;
  23. if (k != null) {
  24. do {
  25. K ek;
  26. if (e.hash == h &&
  27. ((ek = e.key) == k || (ek != null && k.equals(ek))))
  28. return e;
  29. } while ((e = e.next) != null);
  30. }
  31. return null;
  32. }
  33. }

注意:这里使用了volatile关键字,这也是实现线程同步的一种方式,一般而言,可以起到加锁的作用。

4、put()

介绍了这么多,说说他加锁的地方吧。在首次插入的时候,使用cas来保持同步,其他insert、delete、replace等需要使用锁来同步。

CAS : Compare And Swap,一种常用的复合操作。

以put为例:

  1. public V put(K key, V value) {
  2. return putVal(key, value, false);
  3. }
  4. /** Implementation for put and putIfAbsent */
  5. final V putVal(K key, V value, boolean onlyIfAbsent) {
  6. //判空校验
  7. if (key == null || value == null) throw new NullPointerException();
  8. int hash = spread(key.hashCode());
  9. int binCount = 0;
  10. for (Node<K,V>[] tab = table;;) {
  11. Node<K,V> f; int n, i, fh;
  12. //延迟初始化
  13. if (tab == null || (n = tab.length) == 0)
  14. tab = initTable();
  15. //首次put添加,不使用锁,而是CAS操作
  16. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  17. if (casTabAt(tab, i, null,
  18. new Node<K,V>(hash, key, value, null)))
  19. break; // no lock when adding to empty bin
  20. }
  21. else if ((fh = f.hash) == MOVED)
  22. tab = helpTransfer(tab, f);
  23. else {
  24. //不是首次put,需要加锁,注意Synchronized锁住的对象时f,即Node
  25. V oldVal = null;
  26. synchronized (f) {
  27. if (tabAt(tab, i) == f) {
  28. if (fh >= 0) {
  29. binCount = 1;
  30. for (Node<K,V> e = f;; ++binCount) {
  31. K ek;
  32. //如果已经存在key值,替换掉value
  33. if (e.hash == hash &&
  34. ((ek = e.key) == key ||
  35. (ek != null && key.equals(ek)))) {
  36. oldVal = e.val;
  37. if (!onlyIfAbsent)
  38. e.val = value;
  39. break;
  40. }
  41. Node<K,V> pred = e;
  42. //如果不存在key值,创建一个Node
  43. if ((e = e.next) == null) {
  44. pred.next = new Node<K,V>(hash, key,
  45. value, null);
  46. break;
  47. }
  48. }
  49. }
  50. else if (f instanceof TreeBin) {
  51. Node<K,V> p;
  52. binCount = 2;
  53. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  54. value)) != null) {
  55. oldVal = p.val;
  56. if (!onlyIfAbsent)
  57. p.val = value;
  58. }
  59. }
  60. }
  61. }
  62. if (binCount != 0) {
  63. if (binCount >= TREEIFY_THRESHOLD)
  64. treeifyBin(tab, i);
  65. if (oldVal != null)
  66. return oldVal;
  67. break;
  68. }
  69. }
  70. }
  71. addCount(1L, binCount);
  72. return null;
  73. }

不足之处:

正因为锁细化的太好了,如果你要想独占互斥访问的话,那就很难的,付出的成本是很高的。

为了稍稍改变一下这方面被动的局势,ConcurrentHashMap对一些常用的复合操作进行了封装,比如:“相等便替换”,“没有就添加”等等,如果使用同步集合,需要程序员自己加锁控制,这里就直接给写好了。

这里写图片描述

感受:

    随着jdk不断更新,java一直在探索中,不断完善和改进。比如java2对集合的变动,java5对线程的扩展等等。在改进的过程中,也在不断参考C、C++更重语言的精华思想。当然,在借鉴的同时,他要保持java这门语言的设计原则和初衷。如果二者出现冲突,那么他肯定是不会借鉴的,或者借鉴一个皮毛的东西。

    行了,不扯了。其实就是想说一点,如果之前的设计很不错,那么后期就不会进行特别大的完善,都是修修补补,类似改个bug什么的。同理,如果后面进行了很大的补充或者修改,那么之前的缺点肯定是多到不行了。新的设计也许有缺点,但相比之前,一定是利大于弊。

    因此,如果想知道自己对一个知识点掌握的如何时,不妨试试能不能把他的演变历史讲出来,如果又蒙又猜,自己能说服自己的话,就很差不多了,剩下的就是去看书查阅资料验证自己的猜想吧!

发表评论

表情:
评论列表 (有 0 条评论,406人围观)

还没有评论,来说两句吧...

相关阅读

    相关 同步容器并发容器及使用

    1.什么是同步容器?同步容器使用什么方式实现线程安全 同步容器 可以简单的理解位通过synchronized来实现同步的容器,如果有多个线程调用同步容器的方法,它们将会串行执