uRick's PKM uRick's PKM
首页
导航
  • Java
  • 数据库
书单
  • 优质好文
  • 有趣的工具
  • 分类
  • 标签
  • 归档
关于
首页
导航
  • Java
  • 数据库
书单
  • 优质好文
  • 有趣的工具
  • 分类
  • 标签
  • 归档
关于
  • 一文搞懂JVM知识
  • 多线程基础
  • JUC☞Thread Pool
  • JUC☞Locker
  • JUC☞Queue
    • 1. 队列
      • 1.1. 队列基础
      • 1.1.1. Queue
      • 1.1.2. BlockingQueue
      • 1.1.3. BlockingDeque
      • 1.2. 常用队列
      • 1.2.1. ConcurrentLinkedQueue
      • 1.2.2. ArrayBlockingQueue
      • 1.2.3. LinkedBlockingQueue
      • 1.2.4. PriorityBlockingQueue
      • 1.2.5. SynchronousQueue
      • 1.2.6. DelayQueue
      • 1.2.7. LinkedTransferQueue
  • NIO浅谈
  • 有趣的二进制
  • 深入理解Lambda
  • Java8新特性
  • 单实例多种实现与解析
  • java
uRick
2020-02-20
目录

JUC☞Queue

# 1. 队列

# 1.1. 队列基础

队列其实就是一种特殊的链表结构,在JUC下,Doug Lea大师为我们提供了很多线程安全的队列,运用了很多数据结构相关算法知识(可参考:https://cloud.tencent.com/developer/article/1173781 (opens new window)),其中抽象层UML结构如下:

队列顶层接口UML

# 1.1.1. Queue

Queue 是所有队列的一个抽象接口,它实现Collection接口,它提供了最顶层的几个队列操作方法,如下:

public interface Queue<E> extends Collection<E> {
    // 在队列允许的情况下,向队列中添加数据,若元素无法插入到队列,
    // 也就是达到队列容量时会抛出IllegalStateException
    boolean add(E e);
    // 在队列允许的情况下,向队列中添加数据,若元素无法插入到队列,则返回false,
    boolean offer(E e);
    // 移除队列头元素,当队列为空时抛出异常NoSuchElementException
    E remove();
    // 移除队列头元素,区别于remove(),当队列头没有元素时不会抛出异常
    E poll();
    // 检索队列头部元素,但不删除,区别于peek(),它在队列为空时抛出异常NoSuchElementException。
    E element();
    // 检索队列头部元素,但不删除,当队列头没有元素时不会抛出异常。
    E peek();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 1.1.2. BlockingQueue

BlockingQueue 阻塞队列抽象接口,定义了阻塞队列中常用的API

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    boolean offer(E e);
    // 提供阻塞的方式向队列插入元素
    void put(E e) throws InterruptedException;
    // 指定等待时间想队列插入元素 
    boolean offer(E e, long timeout, TimeUnit unit)
    // 提供阻塞的方式获取元素
    E take() throws InterruptedException;
    // 指定等待时间,从队列中获取元素
    E poll(long timeout, TimeUnit unit)throws InterruptedException;
    // 获取队列剩余容量,官方建议:不能用它检测元素是否插入成功操作。
    int remainingCapacity();
    boolean remove(Object o);
    // 检测队列是否包括某个元素
    public boolean contains(Object o);
    // 将队列中的数据移动到另一个集合中,并清空当前队列
    int drainTo(Collection<? super E> c);
    // 队列中的数据移动到另一个集合中,maxElements是指订的最大传输元素个数,同时队列会移除指定maxElements的元素
    int drainTo(Collection<? super E> c, int maxElements);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

阻塞队列中常用方法归纳概括:

方法/处理方式 抛异常 返回特殊值 阻塞 超时退出
插入方法 add(E e) offer(E e) put(E e) offer(E e, long timeout, TimeUnit unit)
移除方法 remove(Object o) poll() take() poll(long timeout, TimeUnit unit)
检查方法 element() peek() - -

# 1.1.3. BlockingDeque

BlockingDeque 阻塞双端队列抽象接口,所谓双端队列就是一个支持队列两端操作元素出队和入队,定义了阻塞队列中常用的API。 BlockingDeque 继承BlockingQueue接口,BlockingDeque在BlockingQueue的基础上新增双端阻塞队列的操作方法;主要新增了以first结尾和last结尾的方法,用于操作队列两端的元素。

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
    void addFirst(E e);
    void addLast(E e);
    boolean offerFirst(E e);
    boolean offerLast(E e);
    void putFirst(E e) throws InterruptedException;
    void putLast(E e) throws InterruptedException;
    boolean offerFirst(E e, long timeout, TimeUnit unit)throws InterruptedException;
    boolean offerLast(E e, long timeout, TimeUnit unit)throws InterruptedException;
    E takeLast() throws InterruptedException;
    E pollFirst(long timeout, TimeUnit unit)throws InterruptedException;
    E pollLast(long timeout, TimeUnit unit)throws InterruptedException;
    boolean removeFirstOccurrence(Object o);
    boolean removeLastOccurrence(Object o);
    boolean add(E e);
    boolean offer(E e);
    void put(E e) throws InterruptedException;
    boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;
    E remove();
    E poll();
    E take() throws InterruptedException;
    E poll(long timeout, TimeUnit unit)throws InterruptedException;
    E element();
    E peek();
    boolean remove(Object o);
    public boolean contains(Object o);
    public int size();
    Iterator<E> iterator();
    void push(E e);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

双端队列中常用方法归纳概括:

  • 添加元素队头相关方法
方法/处理方式 抛异常 返回特殊值 阻塞 超时退出
插入方法 addFirst(e) offerFirst(e) putFirst(e) offerFirst(e, time, unit)
移除方法 removeFirst() pollFirst() takeFirst() pollFirst(time, unit)
检测方法 getFirst() peekFirst() - -
  • 添加元素到队尾相关方法
方法/处理方式 抛异常 返回特殊值 阻塞 超时退出
插入方法 addLast(e) offerLast(e) putLast(e) offerLast(e, time, unit)
移除方法 removeLast() pollLast() takeLast() pollLast(time, unit)
检测方法 getLast() peekLast() - -

# 1.2. 常用队列

队列 说明 版本
ConcurrentLinkedQueue 基于链接节点的无界线程安全非阻塞队列,它采用先进先出的规则对节点进行排序,使用CAS算法实现元素操作 JDK1.5
ConcurrentLinkedDeque 基于链接节点的无界线程安全双端队列,它采用先进先出的规则对节点进行排序,使用CAS算法实现元素操作 JDK1.7
ArrayBlockingQueue 基于数组实现的有界阻塞队列,按照FIFO的原则对元素进行排序 JDK1.5
LinkedBlockingQueue 基于链表实现的有界阻塞队列,默认容量和最大长度为Integer.MAX_VALUE,按照FIFO的原则对元素进行排序 JDK1.5
SynchronousQueue 不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能继续添加元素 JDK1.5
LinkedTransferQueue 由链表结构组成的无界阻塞TransferQueue队列,与其他队列的主要区别在于多了tryTransfer和transfer方法 JDK1.7
PriorityBlockingQueue 基于数组实现的有界阻塞优先级队列,默认按照自然顺序排序,也通过比较器进行排序 JDK1.5
PriorityQueue 基于数组实现的有界非阻塞优先级队列,默认按照自然顺序排序,也通过比较器进行排序 JDK1.5
DelayQueue 使用优先级队列PriorityQueue来实现,是一个支持延时获取元素的无界阻塞队列 JDK1.5
LinkedBlockingDeque 由链表结构组成的双向阻塞队列 JDK1.6

# 1.2.1. ConcurrentLinkedQueue

一个基于链接节点的无界(可以一直向队列插入数据)程安全的非阻塞的队列,基于CAS自旋方式实现。采用FIFO的规则对节点进行排序,元素入队时添加到队列尾部,出队时从队列头部取出元素。

ConcurrentLinkedQueue是由一个节点头head和一个尾节点tail组成,缺省情况下,head等于tail,队列存储一个空节点。而节点是由节点项item和节点关系next组成的,如下源码片段:

// 队列组成
private transient volatile Node<E> head;
private transient volatile Node<E> tail;

// 节点
private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
}
1
2
3
4
5
6
7
8
9

JUC中所有队列都实现了接口QueueAPI,通用API如下:

  • public E peek() 获取头部元素,但是不会出队(删除)
  • public E poll() 元素出队,删除头元素,让下一个元素指向head头
  • boolean add(E e) 元素入队,调用offer方法入队
  • boolean offer(E e); 元素入队
  • boolean isEmpty(E e); 检测队列元素是否为空

# 1.2.2. ArrayBlockingQueue

一个基于数组实现的有界阻塞队列,默认情况下多线程访问队列是非公平的(多线程竞争访问队列资源);如下源码片段中构造方法表明ArrayBlockingQueue对象创建需要设置一个容量,默认ReentrantLock重入锁来实现队列访问控制的,使用一个数组items维护队列元素,定义两个监控等待队列notEmpty和notFull成员变量。

// 指定固定容量创建队列
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

// 指定容量创建队列,同时定义线程访问队列的方式:公平或非公平
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

// 指定的集合初始化队列,容量需定义指定集合大小,否则抛出异常IllegalArgumentException
 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        this(capacity, fair);
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {//遍历集合c,存入队列中
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 1.2.3. LinkedBlockingQueue

LinkedBlockingQueues是基于链表Node节点来实现的一个有界阻塞队列,队列默认容量和最大容量都为 Integer.MAX_VALUE ,也是通过重入锁实现多线程访问控制的,它通过两个锁来维护元素的出队和入队,一个putLock锁控制元素入队操作,一个takeLock锁控制出队操作。

# 1.2.4. PriorityBlockingQueue

它是一个基于数组实现自定义排序的无界阻塞的优先级队列,队列初始容量为11,当元素存储超过指定容量或者初始容量时,扩增为原容量的50%;
队列中的元素时按照自然顺序排序的,可以根据需求自定排序方式,同时注意同一级别的元素时不能保证优先级的。它提供了四种构造方式:

// 默认创建队列,容量为11
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

// 指定容量创建队列,比较器使用默认的排序规则(自然排序),可查看排序算法siftUpComparable
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

// 指定容量,和比较器创建队列
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();// 获取重入锁
    this.notEmpty = lock.newCondition();// 构建监控队列
    this.comparator = comparator;// 指定比较器
    this.queue = new Object[initialCapacity];// 初始化数组
}

// 通过给定集合创建队列
public PriorityBlockingQueue(Collection<? extends E> c)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 1.2.5. SynchronousQueue

SynchronousQueue 是一个同步阻塞队列,也可以说是数据传输队列,它是不存储任何元素;每个put入队操作都必须等待一个take出队操作,可以将SynchronousQueue理解最简易的中间件,消息生产者负责将消息存储队列中,消费者需要从队列中取出消息,当消费者没有消费消息,则生产者会一直等待消费者消费,当消费者没有可消费的消息,会一直等待生产者生产消息; SynchronousQueue 内部分别定义了TransferQueue类和TransferStack类来实现公平访问和非公平访问策略,可以通过以下两种方式创建队列。

// 默认创建非公平策略的队列
public SynchronousQueue() {
        this(false);
    }

// 自定义同步策略
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
1
2
3
4
5
6
7
8
9

# 1.2.6. DelayQueue

基于PriorityQueued队列实现的无界阻塞延时队列,队列中的元素都必须要实现Delayed接口,创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素,可用于做缓存或定时任务。

# 1.2.7. LinkedTransferQueue

LinkedTransferQueue是一个基于链表结构的即时传输无界阻塞队列,实现了TransferQueue接口,相对于以上介绍的队列,它新增了两种方法transfer和trytransfer
对于transfer()类方法,如果当前有消费者正在等待接收元素,transfer()可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。 trytransfer()与transfer()相反,它会试探性的传输数据给消费者,若没有消费者消费信息,则直接返回false,若存在直接传输,返回true。

#多线程#队列
上次更新: 2024/03/02, 14:21:03
JUC☞Locker
NIO浅谈

← JUC☞Locker NIO浅谈→

最近更新
01
从0到1:开启商业与未来的秘密
11-26
02
如何阅读一本书: 读懂一本书,精于一件事
10-25
03
深入理解Lambda
06-27
更多文章>
Theme by Vdoing | Copyright © 2019-2024 uRick | CC BY 4.0
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式