Java并发编程---并发类容器(Queue容器)

﹏ヽ暗。殇╰゛Y 2022-06-02 03:06 496阅读 0赞

一.概念

1.1 并发Queue

在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue

Center

1.2 ConcurrentLinkedQueue(属于Queue队列接口下的方法)

ConcurrentLinkedQueue是一个适用于高并发场景的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue.它是一个基于链接节点的无界线程安全队列.该队列的元素遵循先进先出的原则.头是最先加入的,尾是最近加入的,该队列不允许null元素

ConcurrentLinkedQueue重要方法:

add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,这两个方法没有任何区别)

poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会

1.3 BlockingQueue(阻塞队列)接口

(1) ArrayBlockingQueue: 基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就意味着生产者和消费者不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列,在很多场合非常适合使用.

(2) LinkedBlockingQueue:基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),LinkedBlockingQueue之所以能够高效地处理并发数据,是因为内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行.他是一个无界队列

(3) synchronousQueue: 一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费

(4) PriorityBlockingQueue: 基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,也就是说传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列.

(5) DelayQueue: 带有延时时间的Queue,其中的元素只有当其指定的延时时间到了,才能够从队列中获取到该元素.DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除,任务超时处理,空闲连接的关闭等等.

二.代码示例

  1. package com.thread.container;
  2. import java.util.ArrayList;
  3. import java.util.Iterator;
  4. import java.util.List;
  5. import java.util.concurrent.ArrayBlockingQueue;
  6. import java.util.concurrent.ConcurrentLinkedQueue;
  7. import java.util.concurrent.LinkedBlockingQueue;
  8. import java.util.concurrent.SynchronousQueue;
  9. import java.util.concurrent.TimeUnit;
  10. public class UseQueue {
  11. public static void main(String[] args) throws Exception {
  12. // TODO Auto-generated method stub
  13. //高性能无阻塞无界队列: ConcurrentLinkedQueue
  14. ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
  15. q.offer("a");
  16. q.offer("b");
  17. q.offer("c");
  18. q.offer("d");
  19. q.add("e");
  20. System.out.println(q.poll()); //a 从头部取出元素,并从队列里删除
  21. System.out.println(q.size()); //4
  22. System.out.println(q.peek()); //b
  23. System.out.println(q.size()); //4
  24. ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
  25. array.put("a");
  26. array.put("b");
  27. array.put("c");
  28. array.put("d");
  29. array.put("e");
  30. array.put("f");
  31. System.out.println(array.offer("a",3,TimeUnit.SECONDS)); //队列有界是5个,现在放入6个,所以会报Queue full(容器满了)的异常
  32. //阻塞队列
  33. LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>();
  34. q.offer("a");
  35. q.offer("b");
  36. q.offer("c");
  37. q.offer("d");
  38. q.offer("e");
  39. q.add("f");
  40. System.out.println(q.size());
  41. for(Iterator iterator = q.iterator(); iterator.hasNext();){
  42. String str = (String) iterator.next();
  43. System.out.println(str);
  44. }
  45. List<String> list = new ArrayList<String>();
  46. System.out.println(q.drainTo(list,3));
  47. System.out.println(list.size());
  48. for(String str : list){
  49. System.out.println(str);
  50. }
  51. //一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费
  52. final SynchronousQueue<String> q = new SynchronousQueue<String>();
  53. Thread t1 = new Thread(new Runnable() {
  54. @Override
  55. public void run() {
  56. // TODO Auto-generated method stub
  57. try {
  58. System.out.println("进入t1线程中,阻塞等待获取元素...");
  59. System.out.println("消费: " + q.take());
  60. } catch (InterruptedException e) {
  61. // TODO Auto-generated catch block
  62. e.printStackTrace();
  63. }
  64. }
  65. });
  66. t1.start();
  67. TimeUnit.SECONDS.sleep(2); //阻塞2秒钟
  68. Thread t2 = new Thread(new Runnable() {
  69. @Override
  70. public void run() {
  71. // TODO Auto-generated method stub
  72. q.add("haige");
  73. }
  74. });
  75. t2.start();
  76. }
  77. }

PriorityBlockingQueue容器的示例

  1. package com.thread.container;
  2. public class Task implements Comparable<Task> {
  3. private int id;
  4. private String name;
  5. public int getId() {
  6. return id;
  7. }
  8. public void setId(int id) {
  9. this.id = id;
  10. }
  11. public String getName() {
  12. return name;
  13. }
  14. public void setName(String name) {
  15. this.name = name;
  16. }
  17. public int compareTo(Task task) {
  18. // TODO Auto-generated method stub
  19. return this.id>task.id ? 1 : (this.id<task.id ? -1 : 0);
  20. }
  21. @Override
  22. public String toString() {
  23. return "Task [id=" + id + ", name=" + name + "]";
  24. }
  25. }
  26. package com.thread.container;
  27. import java.util.concurrent.PriorityBlockingQueue;
  28. /**
  29. * PriorityBlockingQueue示例
  30. * @author lhy
  31. * @time 2018.01.02
  32. */
  33. public class UsePriorityBlockingQueue {
  34. public static void main(String[] args) throws Exception {
  35. // TODO Auto-generated method stub
  36. PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();
  37. Task t1 = new Task();
  38. t1.setId(3);
  39. t1.setName("haige");
  40. Task t2 = new Task();
  41. t2.setId(4);
  42. t2.setName("xiaozhang");
  43. Task t3 = new Task();
  44. t3.setId(1);
  45. t3.setName("xiaoke");
  46. q.add(t1); //3
  47. q.add(t2); //4
  48. q.add(t3); //1
  49. //1 3 4
  50. System.out.println("容器: " + q);
  51. System.out.println(q.take().getId());
  52. System.out.println("容器: " + q);
  53. System.out.println(q.take().getId());
  54. System.out.println("容器: " + q);
  55. System.out.println(q.take().getId());
  56. }
  57. }

DelayQueue示例

  1. package com.thread.container;
  2. import java.util.concurrent.Delayed;
  3. import java.util.concurrent.TimeUnit;
  4. public class WangMin implements Delayed {
  5. private String name;
  6. //身份证
  7. private String id;
  8. //截止时间
  9. private long endTime;
  10. //定义时间工具类
  11. private TimeUnit timeUtil = TimeUnit.SECONDS;
  12. public WangMin(String name, String id, long endTime) {
  13. super();
  14. this.name = name;
  15. this.id = id;
  16. this.endTime = endTime;
  17. }
  18. public WangMin() {
  19. // TODO Auto-generated constructor stub
  20. }
  21. public String getName() {
  22. return name;
  23. }
  24. public void setName(String name) {
  25. this.name = name;
  26. }
  27. public String getId() {
  28. return id;
  29. }
  30. public void setId(String id) {
  31. this.id = id;
  32. }
  33. public long getEndTime() {
  34. return endTime;
  35. }
  36. public void setEndTime(long endTime) {
  37. this.endTime = endTime;
  38. }
  39. //重写下面两方法,比较
  40. @Override
  41. public int compareTo(Delayed delayed) {
  42. // TODO Auto-generated method stub
  43. WangMin w = (WangMin) delayed;
  44. return this.getDelay(this.timeUtil) - w.getDelay(this.timeUtil) > 0 ?1:0 ;
  45. }
  46. //用来判断是否到了截止时间(两者相减<=0,则说明到了下机时间)
  47. @Override
  48. public long getDelay(TimeUnit unit) {
  49. // TODO Auto-generated method stub
  50. return endTime - System.currentTimeMillis();
  51. }
  52. }
  53. package com.thread.container;
  54. import java.util.concurrent.DelayQueue;
  55. /**
  56. * DelayQueue示例
  57. * @author lhy
  58. * @ time 2018.01.02
  59. */
  60. public class WangBa implements Runnable {
  61. private DelayQueue<WangMin> queue = new DelayQueue<WangMin>();
  62. public boolean yinyue = true;
  63. public void Shangji(String name, String id, int money){
  64. WangMin man = new WangMin(name, id, 1000 * money + System.currentTimeMillis());
  65. System.out.println("网民" + man.getName() + "身份证" + man.getId() + "交钱" + money + "块,开始上机...");
  66. this.queue.add(man);
  67. }
  68. public void Xiaji(WangMin man){
  69. System.out.println("网民" + man.getName() + "身份证" + man.getId() + "时间到下机");
  70. }
  71. @Override
  72. public void run() {
  73. // TODO Auto-generated method stub
  74. while(yinyue){
  75. try {
  76. WangMin man = queue.take();
  77. Xiaji(man);
  78. } catch (InterruptedException e) {
  79. // TODO Auto-generated catch block
  80. e.printStackTrace();
  81. }
  82. }
  83. }
  84. public static void main(String[] args) {
  85. // TODO Auto-generated method stub
  86. try {
  87. System.out.println("网吧开始营业");
  88. WangBa haige = new WangBa();
  89. Thread shangwang = new Thread(haige);
  90. shangwang.start();
  91. haige.Shangji("yangxu", "123", 1);
  92. haige.Shangji("yangming", "123", 5);
  93. haige.Shangji("yangge", "123", 10);
  94. } catch (Exception e) {
  95. // TODO Auto-generated catch block
  96. e.printStackTrace();
  97. }
  98. }
  99. }

发表评论

表情:
评论列表 (有 0 条评论,496人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Java并发编程---同步容器

    一.概念 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如:迭代(反复访问元素,遍历完容器中所有的元素),跳转(根据指定的顺序找到当前元素的下