阻塞队列及其原理

硅谷探秘者 4183 0 0

1.什么是阻塞队列

        阻塞队列是一个在队列基础上又支持了两个附加操作的队列。

2.支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满。

1.支持阻塞的移除方法:队列空时,获取元素的线程会等待队列变为非空。


2.阻塞队列的应用场景:

        阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。简而言之,阻塞队列是生产者用来存放元素、消费者获取元素的容器。


3.java的阻塞队列

        自从Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:

ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

        LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

        PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

        DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。


4.非阻塞队列中的方法

add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;

remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;

offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;

poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;

peek():获取队首元素,若成功,则返回队首元素;否则返回null



5.阻塞队列中的方法

        阻塞队列包括了非阻塞队列中的大部分方法,上面列举的5个方法在阻塞队列中都存在,但是要注意这5个方法在阻塞队列中都进行了同步措施。除此之外,阻塞队列提供了另外4个非常有用的方法:

       put(E e)

  take()

  offer(E e,long timeout, TimeUnit unit)

  poll(long timeout, TimeUnit unit)


  put方法用来向队尾存入元素,如果队列满,则等待;

  take方法用来从队首取元素,如果队列为空,则等待;

  offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;

  poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;



6.阻塞队列提供了四种处理方法

方法\处理方式 抛出异常         返回特殊值 一直阻塞         超时退出

插入方法         add(e) offer(e) put(e) offer(e,time,unit)

移除方法         remove()         poll() take() poll(time,unit)

检查方法         element()         peek() 不可用 不可用



7.阻塞队列的实现:基于双向表

package threadTest.test5;

public class Node<E> {
	private E e;
	private Node<E> prve;
	private Node<E> next;
	public Node(E e) {
		super();
		this.e = e;
	}
	public E getE() {
		return e;
	}
	public void setE(E e) {
		this.e = e;
	}
	public Node<E> getPrve() {
		return prve;
	}
	public void setPrve(Node<E> prve) {
		this.prve = prve;
	}
	public Node() {
		super();
		// TODO Auto-generated constructor stub
	}
	public Node<E> getNext() {
		return next;
	}
	public void setNext(Node<E> next) {
		this.next = next;
	}
	@Override
	protected void finalize() throws Throwable {
		System.out.println("当前对象被释放:"+e);
	}	
}
package threadTest.test5;

import threadTest.test6.Node;

public class BlockingsQueue<E> {
    //定义两把锁,只是简单的锁
    private Object full = new Object();
    private Object empty = new Object();
    private Node<E> head;//队首
	private Node<E> tail;//队尾
	private Integer size;//队列长度
	private Integer s;//当前队列长度
	/**
	 * 定义队列最大长度 size
	 * @param size
	 */
	public BlockingsQueue(Integer size) {
		super();
		head = new Node<E>();
		tail = new Node<E>();
		head.setNext(tail);
		head.setPrve(null);
		tail.setNext(null);
		tail.setPrve(head);
		this.size = size;
		this.s=0;
	}
	/**
	 * 默认队列长度为:Integer.MAX_VALUE
	 */
	public BlockingsQueue() {
		super();
		head = new Node<E>();
		tail = new Node<E>();
		head.setNext(tail);
		head.setPrve(null);
		tail.setNext(null);
		tail.setPrve(head);
		this.size=Integer.MAX_VALUE;
		this.s=0;
	}
	/**
	 * 入队操作
	 * @param e
	 */
	public void push(E e) {
		synchronized(full){
            while (this.s>=this.size) {//没有更多空间,需要阻塞
                try {
                	System.out.println("没有空间,需要阻塞");
					full.wait();
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
            }
		}
		Node<E> n=new Node<E>(e);
		n.setPrve(tail.getPrve());
		n.setNext(tail);
		tail.getPrve().setNext(n);
		tail.setPrve(n);
		this.s++;
		synchronized(empty){
			System.out.println("有数据了,唤醒poll方法");
            empty.notify();//达到了唤醒poll方法的条件
        }
	}
	/**
	 * 出队操作
	 * @return
	 */
	public E pop() {
		synchronized(empty){
            while (s<= 0) {//没有数据,阻塞
                try {
                	System.out.println("没有数据,阻塞");
					empty.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
            }
        }
		Node<E> n=head.getNext();
		n.getNext().setPrve(head);
		head.setNext(n.getNext());
		s--;
		synchronized(full){
			System.out.println("可以放数据 了");
            full.notify();
        }
		return n.getE();
	}
	@Override
	public String toString() {
		Node<E> no=head.getNext();
		while(no!=tail) {
			System.out.println(no.getE());
			no=no.getNext();
		}
		return "";
	}
}
package threadTest.test5;
//主线程
public class TestMain {
	public static void main(String[] args) {
		BlockingsQueue<Integer> q=new BlockingsQueue<>(3);
		new Thread(new Runnable() {
			@Override
			public void run() {
				// TODO Auto-generated method stub
				for(int i=0;i<100;i++) {
					try {
						Thread.sleep(1000);
						q.push(i);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		}).start();
		
		new Thread(new Runnable() {
			@Override
			public void run() {
				// TODO Auto-generated method stub
				for(int i=0;i<100;i++) {
					try {
						Thread.sleep(1000);
						System.out.println(q.pop());
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}).start();
	}
}



评论区
请写下您的评论...
暂无评论...
猜你喜欢
数据结构与算法 1338 ,放入后台的一个执行中,后台可以慢慢执行,当中没有业务数据时,使该执行线程进入等待状态。当业务数据添加进中后唤醒处于等待状态的执行线程,继续处业务。一、的实现packagecom.
java基础 1526 基于双向链表结构直接代码:packagethreadTest.test6;publicclassNodeE{ privateEe; privateNodeEprve; privateNodeEnext; publicNode(Ee){ super(); this.e=e; } publicEgetE(){ returne; } publicvoidsetE(Ee){ this.e=e; }
official 996 之前的文章中提到了java中的nio是同步非的网络io模型,本文就主要说明一下同步、异步、、非的概念来帮助解nio。io操作IO分两阶段(一旦拿到数据后就变成了数据操作,不再是IO
ASM,java基础 992   关于cglib代的概念和常用api,请参考:初步探究cglib动态代:http://www.jiajiajia.club/blog/artical/yjw520
weblog 4036 int不需要。 int的默认值是0,而Integer的默认值是null。 然而仅仅知道这些,对两者的了解还是远远不够的。那么下面就继续探索。 语法糖的味道-自动装箱和拆箱的 什么是自动装箱和
official 899 态→就绪态:需修改PCB内容和相应就绪态→运行态:需恢复进程运行环境、修改PCB内容和相应运行态→态:保存进程运行环境,修改PCB内容和相应态→就绪态:需修改PCB内容和相应。果
official 1563 上一篇《一起学netty(2)nio模型多路复用器》中已经简单介绍了nio模型,以多路复用器的概念,并了解nio是非的网络模型,以与bio的区别。本篇将继续深入解nio,以select
official 744 交换机),这个Exchange就是DLX死信交换机本质上也是一个普通交换机,和一般的Exchange没有区别,只不过它处消息的特殊性,所以称之为死信。它能在任何的上被指定,实际上就是设置某个
归档
2018-11  12 2018-12  33 2019-01  28 2019-02  28 2019-03  32 2019-04  27 2019-05  33 2019-06  6 2019-07  12 2019-08  12 2019-09  21 2019-10  8 2019-11  15 2019-12  25 2020-01  9 2020-02  5 2020-03  16 2020-04  4 2020-06  1 2020-07  7 2020-08  13 2020-09  9 2020-10  5 2020-12  3 2021-01  1 2021-02  5 2021-03  7 2021-04  4 2021-05  4 2021-06  1 2021-07  7 2021-08  2 2021-09  8 2021-10  9 2021-11  16 2021-12  14 2022-01  7 2022-05  1 2022-08  3 2022-09  2 2022-10  2 2022-12  5 2023-01  3 2023-02  1 2023-03  4 2023-04  2 2023-06  3 2023-07  4 2023-08  1 2023-10  1 2024-02  1 2024-03  1 2024-04  1
标签
算法基础 linux 前端 c++ 数据结构 框架 数据库 计算机基础 储备知识 java基础 ASM 其他 深入理解java虚拟机 nginx git 消息中间件 搜索 maven redis docker dubbo vue 导入导出 软件使用 idea插件 协议 无聊的知识 jenkins springboot mqtt协议 keepalived minio mysql ensp 网络基础 xxl-job rabbitmq haproxy srs 音视频 webrtc javascript
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。