阻塞队列—PriorityBlockingQueue源码分析

阻塞队列—PriorityBlockingQueue源码分析作者公众号:一角钱技术(org_yijiaoqian)前言PriorityBlockingQueue 优先级队列,线程安全(添加、读取都进行了加

欢迎大家来到IT世界,在知识的湖畔探索吧!

阻塞队列—PriorityBlockingQueue源码分析

作者公众号:一角钱技术(org_yijiaoqian)

前言

阻塞队列—PriorityBlockingQueue源码分析

PriorityBlockingQueue 优先级队列,线程安全(添加、读取都进行了加锁)、无界、读阻塞的队列,底层采用的堆结构实现(二叉树),默认是小根堆,最小的或者最大的元素会一直置顶,每次获取都取最顶端的数据

队列创建

小根堆

PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>();

欢迎大家来到IT世界,在知识的湖畔探索吧!

大根堆

欢迎大家来到IT世界,在知识的湖畔探索吧!PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() {
 @Override
 public int compare(Integer o1, Integer o2) {
  return o2 - o1;
 }
});

应用场景

有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。

我们来看一个具体例子,例子中定义了一个将要放入“优先阻塞队列”的任务类,并且定义了一个任务工场类和一个任务执行类,在任务工场类中产生了各种不同优先级的任务,将其添加到队列中,在任务执行类中,任务被一个个取出并执行。

package com.niuh.queue.priority;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * PriorityBlockingQueue使用示例
 * </p>
 */
public class PriorityBlockingQueueDemo {

    public static void main(String[] args) throws Exception {
        Random random = new Random(47);
        ExecutorService exec = Executors.newCachedThreadPool();
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        exec.execute(new PrioritizedTaskProducer(queue, exec)); // 这里需要注意,往PriorityBlockingQueue中添加任务和取出任务的
        exec.execute(new PrioritizedTaskConsumer(queue)); // 步骤是同时进行的,因而输出结果并不一定是有序的
    }
}

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
    private Random random = new Random(47);
    private static int counter = 0;
    private final int id = counter++;
    private final int priority;

    protected static List<PrioritizedTask> sequence = new ArrayList<>();

    public PrioritizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }

    @Override
    public int compareTo(PrioritizedTask o) {
        return priority < o.priority ? 1 : (priority > o.priority ? -1 : 0);  // 定义优先级计算方式
    }

    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
        } catch (InterruptedException e) {
        }
        System.out.println(this);
    }

    @Override
    public String toString() {
        return String.format("[%1$-3d]", priority) + " Task " + id;
    }

    public String summary() {
        return "(" + id + ": " + priority + ")";
    }

    public static class EndSentinel extends PrioritizedTask {
        private ExecutorService exec;

        public EndSentinel(ExecutorService exec) {
            super(-1);
            this.exec = exec;
        }

        @Override
        public void run() {
            int count = 0;
            for (PrioritizedTask pt : sequence) {
                System.out.print(pt.summary());
                if (++count % 5 == 0) {
                    System.out.println();
                }
            }
            System.out.println();
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

class PrioritizedTaskProducer implements Runnable {
    private Random random = new Random(47);
    private Queue<Runnable> queue;
    private ExecutorService exec;

    public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
        this.queue = queue;
        this.exec = exec;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            queue.add(new PrioritizedTask(random.nextInt(10))); // 往PriorityBlockingQueue中添加随机优先级的任务
            Thread.yield();
        }
        try {
            for (int i = 0; i < 10; i++) {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(10)); // 往PriorityBlockingQueue中添加优先级为10的任务
            }
            for (int i = 0; i < 10; i++) {
                queue.add(new PrioritizedTask(i));// 往PriorityBlockingQueue中添加优先级为1-10的任务
            }
            queue.add(new PrioritizedTask.EndSentinel(exec));
        } catch (InterruptedException e) {
        }
        System.out.println("Finished PrioritizedTaskProducer");
    }
}

class PrioritizedTaskConsumer implements Runnable {
    private PriorityBlockingQueue<Runnable> queue;

    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                queue.take().run(); // 任务的消费者,从PriorityBlockingQueue中取出任务执行
            }
        } catch (InterruptedException e) {
        }
        System.out.println("Finished PrioritizedTaskConsumer");
    }
}

工作原理

PriorityBlockingQueue 是 JDK1.5 的时候出来的一个阻塞队列。但是该队列入队的时候是不会阻塞的,永远会加到队尾。下面我们介绍下它的几个特点:

  • PriorityBlockingQueue 和 ArrayBlockingQueue 一样是基于数组实现的,但后者在初始化时需要指定长度,前者默认长度是 11
  • 该队列可以说是真正的无界队列,它在队列满的时候会进行扩容,而前面说的无界阻塞队列其实都有有界,只是界限太大可以忽略(最大值是 2147483647)
  • 该队列属于权重队列,可以理解为它可以进行排序,但是排序不是从小到大排或从大到小排,是基于数组的堆结构(具体如何排下面会进行分析)
  • 出队方式和前面的也不同,是根据权重来进行出队,和前面所说队列中那种先进先出或者先进后出方式不同。
  • 其存入的元素必须实现Comparator,或者在创建队列的时候自定义Comparator。

注意:

  1. 堆结构实际上是一种完全二叉树。关于二叉树可以查看 《树、二叉树、二叉搜索树的实现和特性》
  2. 堆又分为大顶堆和小顶堆 。大顶堆中第一个元素肯定是所有元素中最大的,小顶堆中第一个元素是所有元素中最小的。关于二叉堆可以查看《堆和二叉堆的实现和特性》

源码分析

定义

PriorityBlockingQueue的类继承关系如下:

阻塞队列—PriorityBlockingQueue源码分析

其包含的方法定义如下:

阻塞队列—PriorityBlockingQueue源码分析

成员属性

从下面的字段我们可以知道,该队列可以排序,使用显示锁来保证操作的原子性,在空队列时,出队线程会堵塞等。

欢迎大家来到IT世界,在知识的湖畔探索吧!/**
* 默认数组长度
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;

/**
 * 最大达容量,分配时超出可能会出现 OutOfMemoryError 异常
 */
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
 * 队列,存储我们的元素
 */
private transient Object[] queue;

/**
 * 队列长度
 */
private transient int size;

/**
 * 比较器,入队进行权重的比较
 */
private transient Comparator<? super E> comparator;

/**
 * 显示锁
 */
private final ReentrantLock lock;

/**
 * 空队列时进行线程阻塞的 Condition 对象
 */
private final Condition notEmpty;

构造函数

/**
* 默认构造,使用长度为 11 的数组,比较器为空
*/
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
* 自定义数据长度构造,比较器为空
*/
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
/**
* 自定义数组长度,可以自定义比较器
*/
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}
/**
* 构造函数,带有初始内容的队列
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true; // true if not known to be in heap order
    boolean screen = true;  // true if must screen for nulls
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    }
    else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    Object[] a = c.toArray();
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    if (heapify)
        heapify();
}

入队方法

入队方法,下面可以看到 put 方法最终会调用 offer 方法,所以我们只看 offer 方法即可。

offer(E e)

public void put(E e) {
    offer(e); // never need to block
}

public boolean offer(E e) {
    //判断是否为空
    if (e == null)
        throw new NullPointerException();
    //显示锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    //定义临时对象
    int n, cap;
    Object[] array;
    //判断数组是否满了
    while ((n = size) >= (cap = (array = queue).length))
        //数组扩容
        tryGrow(array, cap);
    try {
        //拿到比较器
        Comparator<? super E> cmp = comparator;
        //判断是否有自定义比较器
        if (cmp == null)
            //堆上浮
            siftUpComparable(n, e, array);
        else
            //使用自定义比较器进行堆上浮
            siftUpUsingComparator(n, e, array, cmp);
        //队列长度 +1
        size = n + 1;
        //唤醒休眠的出队线程
        notEmpty.signal();
    } finally {
        //释放锁
        lock.unlock();
    }
    return true;
}

siftUpComparable(int k, T x, Object[] array)

上浮调整比较器方法的实现

private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
         //无符号向左移,目的是找到放入位置的父节点
            int parent = (k - 1) >>> 1;
            //拿到父节点的值
            Object e = array[parent];
            //比较是否大于该元素,不大于就没比较交换
            if (key.compareTo((T) e) >= 0)
                break;
            //以下都是元素位置交换
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

根据上面的代码,可以看出这是完全二叉树在进行上浮调整。调整入队的元素,找出最小的,将元素排列有序化。简单理解就是:父节点元素值一定要比它的子节点得小,如果父节点大于子节点了,那就两者位置进行交换。

入队图解

例子:85 添加到二叉堆中(大顶堆)

package com.niuh.queue.priority;

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

/**
 * <p>
 * PriorityBlockingQueue 简单演示 demo
 * </p>
 */
public class TestPriorityBlockingQueue {

    public static void main(String[] args) throws InterruptedException {
        // 大顶堆
        PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() {
            @Override
            public int compare(Integer o1, Integer o2) {
                return o2 - o1;
            }
        });

        concurrentLinkedQueue.offer(90);
        concurrentLinkedQueue.offer(80);
        concurrentLinkedQueue.offer(70);
        concurrentLinkedQueue.offer(60);
        concurrentLinkedQueue.offer(40);
        concurrentLinkedQueue.offer(30);
        concurrentLinkedQueue.offer(20);
        concurrentLinkedQueue.offer(10);
        concurrentLinkedQueue.offer(50);
        concurrentLinkedQueue.offer(85);
        //输出元素排列
        concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+"  "));
        //取出元素
        Integer take = concurrentLinkedQueue.take();
        System.out.println();
        concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+"  "));
    }
}
阻塞队列—PriorityBlockingQueue源码分析

操作的细节分为两步:

  • 第一步:首先把新元素插入到堆的尾部再说;(新的元素可能是特别大或者特别小,那么要做的一件事情就是重新维护一下堆的所有元素,把新元素挪到这个堆的相应的位置)
  • 第二步:依次向上调整整个堆的结构,就叫 HeapifyUp
阻塞队列—PriorityBlockingQueue源码分析

85 按照上面讲的先插入到堆的尾部,也就是一维数组的尾部,一维数组的尾部的话就上图的位置,因为这是一个完全二叉树,所以它的尾部就是50后面这个结点。插进来之后这个时候就破坏了堆,它的每一个结点都要大于它的儿子的这种属性了,接下来要做的事情就是要把 85 依次地向上浮动,怎么浮动?就是 85 大于它的父亲结点,那么就和父亲结点进行交换,直到走到根如果大于根的话,就和根也进行交换。

阻塞队列—PriorityBlockingQueue源码分析

85 再继续往前走之后,它要和 80 再进行比较,同理可得:也就是说这个结点每次和它的父亲比,如果它大于它的父亲的话就交换,直到它不再大于它的父亲。

阻塞队列—PriorityBlockingQueue源码分析

出队方法

入队列的方法说完后,我们来说说出队列的方法。PriorityBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:

  • E take();
  • E poll();
  • E poll(long timeout, TimeUnit unit);
  • E peek()

poll 和 peek 与上面类似,这里不做说明

take()

出队方法,该方法会阻塞

public E take() throws InterruptedException {
 //显示锁
    final ReentrantLock lock = this.lock;
    //可中断锁
    lock.lockInterruptibly();
    //结果接收对象
    E result;
    try {
     //判断队列是否为空
        while ( (result = dequeue()) == null)
         //线程阻塞
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

dequeue()

我们再来看看具体出队方法的实现,dequeue方法

 private E dequeue() {
 //长度减少 1
    int n = size - 1;
    //判断队列中是否有元素
    if (n < 0)
        return null;
    else {
     //队列对象
        Object[] array = queue;
        //取出第一个元素
        E result = (E) array[0];
        //拿出最后一个元素
        E x = (E) array[n];
        //置空
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
         //下沉调整
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        //成功则减少队列中的元素数量
        size = n;
        return result;
    }
}

总体就是找到父节点与两个子节点中最小的一个节点,然后进行交换位置,不断重复,由上而下的交换。

siftDownComparable(int k, T x, Object[] array, int n)

再来看看下沉比较器方法的实现

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
    //判断队列长度
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        //找到队列最后一个元素的父节点的索引。
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
         //拿到 k 节点下的左子节点
            int child = (k << 1) + 1; // assume left child is least
            //取得子节点对应的值
            Object c = array[child];
            //取得 k 右子节点的索引
            int right = child + 1;
            //比较右节点的索引是否小于队列长度和左右子节点的值进行比较
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            //比较父节点值是否大于子节点
            if (key.compareTo((T) c) <= 0)
                break;
            //下面都是元素替换
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

出队图解

  1. 将堆尾元素替换到顶部(即堆顶被替代删除掉)
  2. 依次从根部向下调整整个堆的结构(一直到堆尾即可) HeapifyDown

例子:90 从二叉堆中删除(大顶堆)

阻塞队列—PriorityBlockingQueue源码分析

总结

PriorityBlockingQueue 真的是个神奇的队列,可以实现优先出队。最特别的是它只有一个锁,入队操作永远成功,而出队只有在空队列的时候才会进行线程阻塞。可以说有一定的应用场景吧,比如:有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。

PS:以上代码提交在 Github

https://github.com/Niuh-Study/niuh-juc-final.git

文章持续更新,可以公众号搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/32174.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们YX

mu99908888

在线咨询: 微信交谈

邮件:itzsgw@126.com

工作时间:时刻准备着!

关注微信