Java并发 - Queue

- 日理万妓 2022-09-07 05:49 249阅读 0赞

​ 队列的特点是先进先出,栈的特点是后进先出。Queue继承Collection接口,Stack继承Vector容器类,最顶层接口也是Collection。在Java中容器分为Collection和Map两大类。 Collection家族中除了常见的List、Set,现在又新增一个Queue、Stack。

  1. public interface Queue<E> extends Collection<E> { }

BlockingQueue 的四组API


































方式 抛出异常 不抛异常,有返回值 阻塞等待 超时等待
入队列 add offer put offer(Element,Time,TimeUnit)
出队列 remove poll take poll(,)
返回头部元素 element peek - -

BlockingQueue主要有两个实现:ArrayBlockingQueue、LinkedBlockingQueue。

BlockingQueue的其中一组核心方法(只介绍其中一组,其它三组类似):






































方法名 描述
offer(anObject) 表示如果可以的话,将anObject加到BlockingQueue里,放入成功返回true,否则返回false。
offer(E o, long timeout, TimeUnit unit) 可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject) 把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。
poll(time) 取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
poll(long timeout, TimeUnit unit) 从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
take() 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;。
drainTo() 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

在ArrayBlockingQueue中对poll(long timeout, TimeUnit unit) 方法的具体实现:

  1. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  2. // 底层统一是以纳秒作为时间的计算单位的
  3. long nanos = unit.toNanos(timeout);
  4. final ReentrantLock lock = this.lock;
  5. lock.lockInterruptibly();
  6. try {
  7. // 轮询获取元素出队列
  8. while (count == 0) {
  9. if (nanos <= 0)
  10. return null;
  11. nanos = notEmpty.awaitNanos(nanos);
  12. }
  13. return dequeue();
  14. } finally {
  15. lock.unlock();
  16. }
  17. }

应用示例

  1. BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
  2. // 同步队列不存储元素:一个元素进出完成后,下一个元素才能进出。
  3. BlockingQueue<String> syncQueue = new SynchronousQueue<>();
  4. /** * 抛异常: * add()、remove()、element() */
  5. public void test1() {
  6. // ============== add ============= //
  7. queue.add("1");
  8. queue.add("2");
  9. queue.add("3");
  10. // 再进一个元素则报错
  11. queue.add("4");
  12. // ============== element ============= //
  13. // 检查队首元素(该操作不会出队列)
  14. System.out.println(queue.element());
  15. // ============== remove ============= //
  16. System.out.println(queue.remove());
  17. System.out.println(queue.remove());
  18. System.out.println(queue.remove());
  19. // 再取一个
  20. System.out.println(queue.remove());
  21. }
  22. /** * 不抛异常,有返回值 * offer()、poll()、peek() */
  23. public void test2() {
  24. // ============== offer ============= //
  25. queue.offer("1");
  26. queue.offer("2");
  27. queue.offer("3");
  28. // 再添加一个
  29. System.out.println(queue.offer("4"));
  30. // ============== poll ============= //
  31. // 检查队首元素(该操作不会出队列)
  32. System.out.println(queue.peek());
  33. // ============== peek ============= //
  34. queue.poll();
  35. queue.poll();
  36. queue.poll();
  37. System.out.println(queue.size());
  38. // 再取一个
  39. System.out.println(queue.poll());
  40. }
  41. /** * 阻塞等待 * put、take */
  42. public void test3() throws InterruptedException {
  43. // ============== put ============= //
  44. queue.put("1");
  45. queue.put("2");
  46. queue.put("3");
  47. // 再添加一个,阻塞
  48. queue.put("4");
  49. // ============== take ============= //
  50. queue.take();
  51. queue.take();
  52. queue.take();
  53. System.out.println(queue.size());
  54. // 再取一个,阻塞
  55. queue.take();
  56. }
  57. /** * 超时等待 * offer、poll */
  58. public void test4() throws InterruptedException {
  59. queue.offer("1");
  60. queue.offer("2");
  61. queue.offer("3");
  62. // 2s后入队列失败
  63. System.out.println(queue.offer("4", 2, TimeUnit.SECONDS));
  64. queue.poll();
  65. queue.poll();
  66. queue.poll();
  67. System.out.println(queue.size());
  68. // 2s后出队列失败
  69. System.out.println(queue.poll(2, TimeUnit.SECONDS));
  70. }

SynchronousQueue

同步队列,不存储元素,一个元素进出完成后,下一个元素才能进出。

  1. /** * 同步队列 * 创建两个线程:线程一存取三个元素,线程二读取三个元素 */
  2. public void test5() {
  3. new Thread(() -> {
  4. try {
  5. System.out.println(Thread.currentThread().getName() + " put 1");
  6. syncQueue.put("1");
  7. System.out.println(Thread.currentThread().getName() + " put 2");
  8. syncQueue.put("2");
  9. System.out.println(Thread.currentThread().getName() + " put 3");
  10. syncQueue.put("3");
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }, "A").start();
  15. new Thread(() -> {
  16. try {
  17. // 等待两秒再取,确保线程1入队列成功
  18. TimeUnit.SECONDS.sleep(2);
  19. System.out.println(Thread.currentThread().getName() + "=>" + syncQueue.take());
  20. TimeUnit.SECONDS.sleep(2);
  21. System.out.println(Thread.currentThread().getName() + "=>" + syncQueue.take());
  22. TimeUnit.SECONDS.sleep(2);
  23. System.out.println(Thread.currentThread().getName() + "=>" + syncQueue.take());
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. }, "B").start();
  28. }

PriorityQueue 优先级队列

PriorityQueue是一个带有优先级的队列,而不是先进先出队列,元素按优先级顺序被移除,该队列也没有上限(即 Integer.MAX_VALUE),无容量限制,自动扩容。

此队列虽然没有容量限制,但是会由于服务器资源耗尽抛OutOfMemoryError异常。

如果队列为空,那么取元素的操作take就会阻塞,所以检索操作take是受阻的。

放入PriorityQueue中的元素需要具有比较能力。

  1. public class PriorityQueueDemo {
  2. public static void main(String[] args) {
  3. // 设置比对方式
  4. PriorityQueue<String> priorityQueue = new PriorityQueue<>(new Comparator<String>() {
  5. @Override
  6. public int compare(String o1, String o2) {
  7. return 0;
  8. }
  9. });
  10. priorityQueue.add("c");
  11. priorityQueue.add("a");
  12. priorityQueue.add("b");
  13. System.out.println(priorityQueue.poll());
  14. System.out.println(priorityQueue.poll());
  15. System.out.println(priorityQueue.poll());
  16. // 定义MessageObject存放的优先级
  17. PriorityQueue<MessageObject> MessageObjectQueue = new PriorityQueue<>(new Comparator<MessageObject>() {
  18. @Override
  19. public int compare(MessageObject o1, MessageObject o2) {
  20. // Order比较大的MessageObject放后面
  21. return o1.order > o2.order ? -1 : 1;
  22. }
  23. });
  24. }
  25. static class MessageObject {
  26. String content;
  27. int order;
  28. }
  29. }

下面探寻一下延时队列的实现原理。

  1. public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
  2. implements BlockingQueue<E> {
  3. // 基于PriorityQueue来实现的延时队列
  4. private final PriorityQueue<E> q = new PriorityQueue<E>();
  5. ...
  6. }

DelayQueue的泛型必须实现Delayed接口。

  1. public interface Delayed extends Comparable<Delayed> {
  2. /** * Returns the remaining delay associated with this object, in the * given time unit. * 这个元素需要在队列中待多久时间 * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */
  3. long getDelay(TimeUnit unit);
  4. }

应用示例

线程池中的定时调度就是使用这样的方法实现的。

  1. public class DelayQueueDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. DelayQueue<Message> delayQueue = new DelayQueue<Message>();
  4. // 这条消息5秒后发送
  5. Message message = new Message("message - 00001", new Date(System.currentTimeMillis() + 5000L));
  6. delayQueue.add(message);
  7. while (true) {
  8. System.out.println(delayQueue.poll());
  9. Thread.sleep(1000L);
  10. }
  11. }
  12. // 实现Delayed接口的元素才能存到DelayQueue
  13. static class Message implements Delayed {
  14. String content;
  15. Date sendTime;
  16. /** * @param content 消息内容 * @param sendTime 定时发送 */
  17. public Message(String content, Date sendTime) {
  18. this.content = content;
  19. this.sendTime = sendTime;
  20. }
  21. /** * 判断当前这个元素是不是已经到了需要被拿出来的时间 */
  22. @Override
  23. public long getDelay(TimeUnit unit) {
  24. long duration = sendTime.getTime() - System.currentTimeMillis();
  25. return TimeUnit.NANOSECONDS.convert(duration, TimeUnit.MILLISECONDS);
  26. }
  27. @Override
  28. public int compareTo(Delayed o) {
  29. return o.getDelay(TimeUnit.NANOSECONDS) > this.getDelay(TimeUnit.NANOSECONDS) ? 1 : -1;
  30. }
  31. }
  32. }

发表评论

表情:
评论列表 (有 0 条评论,249人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Java并发 - Queue

    ​ 队列的特点是先进先出,栈的特点是后进先出。Queue继承Collection接口,Stack继承Vector容器类,最顶层接口也是Collection。在Java中容器分为

    相关 Queue Java

    队列是一种数据结构.它有两个基本操作:在队列尾部加人一个元素,和从队列头部移除一个元素就是说,队列以一种先进先出的方式管理数据,如果你试图向一个 已经满了的阻塞队列中添加一个元

    相关 并发Queue

    并发Queue        在并发的队列上jdk提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接