多线程(二十、阻塞队列-PriorityBlockingQueue)

PriorityBlockingQueue简介

1、一种优先级队列,元素并不是以FIFO的方式出/入队,而是以按照权重大小的顺序出队;
2、PriorityBlockingQueue是真正的×××队列(仅受内存大小限制),它不像ArrayBlockingQueue那样构造时必须指定最大容量,也不像LinkedBlockingQueue默认最大容量为Integer.MAX_VALUE;
3、PriorityBlockingQueue是按照元素的权重进入排序,所以队列中的元素必须是可以比较的,也就是说元素必须实现Comparable接口;
4、PriorityBlockingQueue×××队列,所以插入元素永远不会阻塞线程;
5、PriorityBlockingQueue底层是一种基于数组实现的堆结构。

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名与空间、虚拟空间、营销软件、网站建设、松岭网站维护、网站推广。

PriorityBlockingQueue构造

构造方法

1、PriorityBlockingQueue一共有4个构造方法,我们重点看看第三个

/**
 * 指定初始容量和比较器的构造器.
 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

1、PriorityBlockingQueue内部利用了ReentrantLock来保证并发访问时的线程安全。
2、PriorityBlockingQueue如果不指定容量,默认容量为11。
3、PriorityBlockingQueue只有一个条件等待队列——notEmpty,因为会自动扩容,所以插入元素并不会阻塞,仅当队列为空时,才可能阻塞“出队”线程。

2、成员变量

public class PriorityBlockingQueue extends AbstractQueue
        implements BlockingQueue, java.io.Serializable {

    /**
     * 默认容量,大小11
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * 最大容量,并不是真正的×××
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * 内部堆数组, 保存实际数据, 可以看成一颗二叉树:
     * 对于顶点queue[n], queue[2*n+1]表示左子结点, queue[2*(n+1)]表示右子结点.
     */
    private transient Object[] queue;

    /**
     * 队列中的元素个数.
     */
    private transient int size;

    /**
     * 比较器, 如果为null, 表示以元素自身的自然顺序进行比较(元素必须实现Comparable接口).
     */
    private transient Comparator comparator;

    /**
     * 全局锁.
     */
    private final ReentrantLock lock;

    /**
     * 当队列为空时,出队线程在该条件队列上等待.
     */
    private final Condition notEmpty;

    // ...
}

3、插入元素

1、put方法调用offer方法
2、offer方法

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();

    final ReentrantLock lock = this.lock;   // 加锁
    lock.lock();

    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))    // 队列已满, 则进行扩容
        tryGrow(array, cap);//扩容方法

    try {
        Comparator cmp = comparator;
        if (cmp == null)    // 比较器为空, 则按照元素的自然顺序进行堆调整
            siftUpComparable(n, e, array);//堆的上浮
        else                // 比较器非空, 则按照比较器进行堆调整
            siftUpUsingComparator(n, e, array, cmp);//堆的上浮
        size = n + 1;       // 队列元素总数+1
        notEmpty.signal();  // 唤醒一个可能正在等待的"出队线程"
    } finally {
        lock.unlock();
    }
    return true;
}

4、堆上浮

小顶堆,“上浮调整”,可以把堆可以想象成一棵完全二叉树,每次插入元素都链接到二叉树的最右下方,然后将插入的元素与其父结点比较,如果父结点大,则交换元素,直到没有父结点比插入的结点大为止,大顶堆反之。

/**
 * 将元素x插入到array[k]的位置.
 * 然后按照元素的自然顺序进行堆调整——"上浮",以维持"堆"有序.
 * 最终的结果是一个"小顶堆".
 */
private static  void siftUpComparable(int k, T x, Object[] array) {
    Comparable key = (Comparable) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;     // 相当于(k-1)除2, 就是求k结点的父结点所在数组的索引位置
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)  // 如果插入的结点值大于父结点, 则退出
            break;

        // 否则,交换父结点和当前结点的值
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

5、删除元素

/**
 * 出队一个元素.
 * 如果队列为空, 则阻塞线程.
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();   // 获取全局锁
    E result;
    try {
        while ((result = dequeue()) == null)    // 队列为空
            notEmpty.await();                   // 线程在noEmpty条件队列等待
    } finally {
        lock.unlock();
    }
    return result;
}
//堆出队,每次都是出队对顶元素
//对于“小顶堆”就是队列中的最小值,对于“大顶堆”就是队列中的最大值
private E dequeue() {
    int n = size - 1;   // n表示出队后的剩余元素个数
    if (n < 0)          // 队列为空, 则返回null
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];    // array[0]是堆顶结点, 每次出队都删除堆顶结点
        E x = (E) array[n];         // array[n]是堆的最后一个结点, 也就是二叉树的最右下结点
        array[n] = null;
        Comparator cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);//堆下沉
        else
            siftDownUsingComparator(0, x, array, n, cmp);//堆下沉
        size = n;
        return result;
    }
}

6、堆下沉

/**
 * 堆的"下沉"调整.
 * 删除array[k]对应的结点,并重新调整堆使其有序.
 *
 * @param k     待删除的位置
 * @param x     待比较的健
 * @param array 堆数组
 * @param n     堆的大小
 */
private static  void siftDownComparable(int k, T x, Object[] array, int n) {
    if (n > 0) {
        Comparable key = (Comparable) x;
        int half = n >>> 1;           // 相当于n除2, 即找到索引n对应结点的父结点
        while (k < half) {
            /**
             * 下述代码中:
             * c保存k的左右子结点中的较小结点值 
             * child保存较小结点对应的索引
             */
            int child = (k << 1) + 1; // k的左子结点
            Object c = array[child];

            int right = child + 1;    // k的右子结点
            if (right < n && ((Comparable) c).compareTo((T) array[right]) > 0)
                c = array[child = right];

            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

下沉步骤:
1、顶点与最后一个节点交换,删除最后一个节点即删除顶点
2、最后一个节点位于顶点,开始下沉,找到左右子结点中较小的那个;
3、结点交换;
4、重复2-3步直到当前结点没有左右子结点或比左右子结点都小。

总结

PriorityBlockingQueue属于比较特殊的阻塞队列,适用于有元素优先级要求的场景。它的内部和ArrayBlockingQueue一样,使用一个了全局独占锁来控制同时只有一个线程可以进行入队和出队,近似×××队列,入队线程并不会阻塞。

PriorityBlockingQueue始终保证出队的元素是优先级最高的元素,并且可以定制优先级的规则,内部通过使用堆(基于数组形式)来维护元素顺序,它的内部数组是可扩容的,扩容和出/入队可以并发进行。


分享标题:多线程(二十、阻塞队列-PriorityBlockingQueue)
分享地址:http://ybzwz.com/article/joisci.html