ArrayBlockingQueue实现原理

ArrayBlockingQueue实现原理概述ArrayBlockQueue是常用的的FIFO阻塞队列,实现了BlockingQueue接口,是线程安全的。内部主要通过数组(Array)

欢迎大家来到IT世界,在知识的湖畔探索吧!

概述

ArrayBlockQueue是常用的的FIFO阻塞队列,实现了BlockingQueue接口,是线程安全的。内部主要通过数组(Array)、锁(ReentrantLock)实现。

注:此队列中不能存放null元素

常用方法

数据写入

  • add(E e)

add方法调用了父类AbstractQueue的add方法:

    public boolean add(E e) {
        return super.add(e);
    }

欢迎大家来到IT世界,在知识的湖畔探索吧!

AbstractQueue的add方法:

欢迎大家来到IT世界,在知识的湖畔探索吧!  public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

可见,add方法最终调用了本类的offer方法,当offer失败时,抛出”Queue full”异常,成功时返回true。

  • offer(E e)

offer方法不会阻塞写入,写入成功返回true,当队列已满时,写入数据失败返回false,在写入过程中加锁,在使用offer写入数据是线程安全的。

  public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //验证写入数量是否等于数组长度,如果相等,不进行写入
            if (count == items.length) 
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
  • offer(E e, long time, timeunit unit)

此方法加入了超时等待功能,在设定时间内会进行自旋等待,在设定时间内如果没有写入成功,返回false,写入成功返回true,此方法会响应中断。

欢迎大家来到IT世界,在知识的湖畔探索吧!    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        //非空验证,如果为空,会抛出异常  
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
         //可中断锁,响应中断 
        lock.lockInterruptibly();
        try {
            //自旋等待
            while (count == items.length) {
                //超时,返回false写入失败
                if (nanos <= 0)
                    return false;
                //使用condition进行等待,此时cpu进行time-waiting状态
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

此方法在写入过程中也进行了加锁,是线程安全的。

  • put(E e)

put方法在添加元素时,如果队列已满,会进入无限等待,此方法会响应中断。

    public void put(E e) throws InterruptedException {
        //非空验证,如果为空,会抛出异常
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //可响应中断锁
        lock.lockInterruptibly();
        try {
            //自旋
            while (count == items.length)
                //无限等待,直到notFull.signal或signalAll方法唤醒
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    } 
  • equeue(E e)

equeue是写入底层方法,此方法直接对数组进行操作,将新元素写入到数组中:

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        //this.items即为存储元素的数组
        final Object[] items = this.items;
        //将元素按照顺序写入到数组的指定位置,并更新数组下标(加1)
        items[putIndex] = x;
        //++putIndex将putIndex加1并返回加1后的结果
        if (++putIndex == items.length)
            //当下标更新后达到数长度是,从数组下标为0开始写入,循环往复
            putIndex = 0;
        count++;
        //写入成功后,通知消费线程可继续进行消费
        notEmpty.signal();
    }

数据读取

  • take()

从阻塞队列中读取一个元素,如果队列中无元素,将进入阻塞状态,如果超时返回null,此方法可响应中断:

//此方法与put方法类似,dequeue方法是从数组中顺序取出一个元素
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            //此时线程状态是waiting
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
  • E poll(long timeout, TimeUnit unit)

在设定时间内取出元素,如果超时,返回为空

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            //超时,则返回为空
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}
  • boolean remove(Object o)

删除一个元素,如果要删除的元素为null或者不存在,返回false,否则返回true。

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    //不可中断锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            //循环比较元素并删除
            do {
                if (o.equals(items[i])) {
                    //删除指定元素,并重新对数组元素位置进行调整保证FIFO
                    removeAt(i);
                    //删除成功返回true
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}
  • E peek()

获取队列头部元素,此方法不会阻塞,如果队列中无元素,返回为空,此方法不会移除队列中的元素

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty    } finally {
        lock.unlock();
    }
}
  • E element()

此方法是AbstractQueue的的方法,ArrayBlockingQueue没有对其重写,如果当前队列为空,则抛出NoSuchElementException异常:

public E element() {
    //调用子类的peek方法
    E x = peek();
    if (x != null)
        return x;
    else        
      throw new NoSuchElementException();
}

其它方法

  • boolean contains(Object o)

判断是否包含一个元素,是返回true,否返回false。

  • int drainTo(Collection<? super E> c)

一次性获取队列中所有元素,内部调用了int drainTo(Collection<? super E> c, int maxElements)方法,maxElements参数为Integer.MAX_VALUE,优点是一次获取队列中所有元素,此操作会移除队列中锁获取的元素,避免了多次加锁造成的性能开销。

/** 
* @throws UnsupportedOperationException {@inheritDoc} 
* @throws ClassCastException            {@inheritDoc} 
* @throws NullPointerException          {@inheritDoc} 
* @throws IllegalArgumentException      {@inheritDoc} */
public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}
  • int drainTo(Collection<? super E> c, int maxElements)

从头部获取指定数量的元素,获取的元素将会被从队列中移除

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/33973.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们YX

mu99908888

在线咨询: 微信交谈

邮件:itzsgw@126.com

工作时间:时刻准备着!

关注微信