一、阻塞队列(BlockingQueue)概述
阻塞队列是Java并发编程中非常重要的数据结构,它提供了线程安全的队列操作,并在队列为空或满时能够自动阻塞线程,实现了生产者和消费者之间的高效协调。在多线程环境下,阻塞队列能够有效解决生产者和消费者处理速度不一致的问题,避免线程间的直接竞争,大大降低了并发编程的复杂度。
1.1 核心特性
阻塞队列具备以下核心特性:
- 线程安全:所有操作都是原子性的,多线程并发访问不会导致数据不一致
- 阻塞机制:当队列为空时,获取操作会阻塞直到有元素可用;当队列满时,插入操作会阻塞直到有空间可用
- FIFO原则:大多数阻塞队列遵循先进先出原则,但PriorityBlockingQueue支持优先级排序
1.2 核心方法
阻塞队列提供了四组操作方法,分别对应不同的处理策略:
1. 抛出异常组
boolean add(E e); // 添加元素,队列满时抛出IllegalStateException
E remove(); // 移除元素,队列空时抛出NoSuchElementException
E element(); // 查看队首元素,队列空时抛出异常
2. 返回布尔值组
boolean offer(E e); // 添加元素,成功返回true,失败返回false
E poll(); // 移除元素,队列空时返回null
E peek(); // 查看队首元素,队列空时返回null
3. 阻塞组
void put(E e) throws InterruptedException; // 添加元素,队列满时阻塞
E take() throws InterruptedException; // 移除元素,队列空时阻塞
4. 超时组
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
二、阻塞队列实现类详解
Java提供了7种阻塞队列实现,每种都有其特定的应用场景和性能特点。
2.1 ArrayBlockingQueue(数组结构有界阻塞队列)
核心特性:
- 基于数组实现,容量固定,创建时必须指定容量
- 使用单锁机制(ReentrantLock)保证线程安全
- 支持公平和非公平两种模式
- 默认情况下不保证访问者公平访问队列
使用示例:
// 创建容量为10的公平队列
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10, true);
// 创建容量为10的非公平队列(默认)
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
适用场景:
- 需要固定大小队列的场景
- 线程池的任务队列
- 有限缓冲区的场景
2.2 LinkedBlockingQueue(链表结构阻塞队列)
核心特性:
- 基于链表实现,可以是有界或无界(默认Integer.MAX_VALUE)
- 采用双锁机制(putLock和takeLock),生产者和消费者可以并行操作
- 在高并发场景下吞吐量优于ArrayBlockingQueue
- 内存占用相对较大,因为需要为每个元素创建节点对象
使用示例:
// 创建无界队列(默认)
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 创建有界队列,容量为1000
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
适用场景:
- 需要动态调整队列大小的高并发场景
- 日志系统的消息队列
- 大型任务调度系统
2.3 PriorityBlockingQueue(优先级阻塞队列)
核心特性:
- 基于优先级堆实现的无界阻塞队列
- 元素根据自然顺序或自定义Comparator排序
- 支持优先级排序,队首总是优先级最高的元素
- 不会阻塞生产者,只会在队列空时阻塞消费者
使用示例:
// 创建默认自然排序的优先级队列
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
// 创建自定义排序的优先级队列
PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(11,
(o1, o2) -> o1.getPriority() - o2.getPriority());
适用场景:
- 需要根据优先级执行任务的场景
- 任务调度系统
- 事件处理系统
2.4 DelayQueue(延迟队列)
核心特性:
- 基于PriorityQueue实现的无界阻塞队列
- 元素必须实现Delayed接口,指定延迟时间
- 只有在元素延迟时间到期后才能取出
- 内部使用Leader-Follower模式优化多线程性能
使用示例:
public class DelayedElement implements Delayed {
private final String data;
private final long expireTime;
public DelayedElement(String data, long delay, TimeUnit unit) {
this.data = data;
this.expireTime = System.nanoTime() + unit.toNanos(delay);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.expireTime, ((DelayedElement)o).expireTime);
}
}
// 使用DelayQueue
DelayQueue<DelayedElement> queue = new DelayQueue<>();
queue.put(new DelayedElement("task1", 5, TimeUnit.SECONDS));
DelayedElement element = queue.take(); // 阻塞直到5秒后
适用场景:
- 定时任务调度
- 缓存过期清理
- 连接超时管理
2.5 SynchronousQueue(同步队列)
核心特性:
- 不存储元素的阻塞队列,每个插入操作必须等待一个删除操作
- 每个put操作必须等待一个take操作,反之亦然
- 支持公平和非公平两种模式
- 吞吐量高,适合直接传递数据的场景
使用示例:
SynchronousQueue<String> queue = new SynchronousQueue<>();
// 生产者线程
new Thread(() -> {
try {
queue.put("data");
System.out.println("数据已传递");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 消费者线程
new Thread(() -> {
try {
String data = queue.take();
System.out.println("接收到数据: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
适用场景:
- 线程间直接传递数据
- 工作窃取(work-stealing)算法
- 高吞吐量的生产者-消费者场景
2.6 LinkedTransferQueue(传输队列)
核心特性:
- 基于链表实现的无界阻塞队列
- 实现了TransferQueue接口,支持直接匹配生产者和消费者
- 提供transfer()方法,生产者可以直接将元素传递给等待的消费者
- 使用CAS操作保证线程安全,性能优异
使用示例:
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
// 生产者
new Thread(() -> {
try {
queue.transfer("data"); // 阻塞直到有消费者接收
System.out.println("数据已传输");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 消费者
new Thread(() -> {
try {
String data = queue.take();
System.out.println("接收到: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
适用场景:
- 需要精确匹配生产者和消费者的场景
- 工作窃取算法
- 任务传递系统
2.7 LinkedBlockingDeque(双向阻塞队列)
核心特性:
- 基于链表实现的双向阻塞队列
- 支持从队列两端插入和移除元素
- 可以是有界或无界
- 提供addFirst、addLast、removeFirst、removeLast等方法
使用示例:
LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(100);
// 从头部添加元素
deque.addFirst("first");
// 从尾部添加元素
deque.addLast("last");
// 从头部移除元素
String first = deque.removeFirst();
// 从尾部移除元素
String last = deque.removeLast();
适用场景:
- 需要从两端操作队列的场景
- 工作窃取算法的实现
- 双端队列的应用场景
三、阻塞队列对比与选型
3.1 ArrayBlockingQueue vs LinkedBlockingQueue
| 对比维度 | ArrayBlockingQueue | LinkedBlockingQueue |
|---|---|---|
| 底层结构 | 数组 | 链表 |
| 容量 | 固定大小 | 可指定或默认无界 |
| 锁机制 | 单锁 | 双锁(putLock和takeLock) |
| 内存占用 | 稳定,预分配 | 动态分配,每个元素需要节点对象 |
| 性能 | 小容量场景性能好 | 高并发大容量场景吞吐量高 |
| 公平性 | 支持公平和非公平 | 默认非公平 |
| 适用场景 | 固定大小队列、线程池任务队列 | 高并发、动态容量场景 |
3.2 其他队列对比
| 队列类型 | 有界/无界 | 排序方式 | 核心特点 |
|---|---|---|---|
| PriorityBlockingQueue | 无界 | 优先级排序 | 支持优先级,队首总是最高优先级 |
| DelayQueue | 无界 | 延迟时间 | 元素到期后才能取出 |
| SynchronousQueue | 无容量 | FIFO/LIFO | 不存储元素,直接传递 |
| LinkedTransferQueue | 无界 | FIFO | 支持直接传输给消费者 |
四、线程池拒绝策略详解
当线程池无法接受新任务时(核心线程满、队列满、最大线程满),拒绝策略会被触发。JDK提供了4种内置拒绝策略,开发者也可以自定义策略。
4.1 AbortPolicy(默认策略)
核心行为:
直接抛出RejectedExecutionException异常,阻止任务提交
源码实现:
public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " + e.toString());
}
}
适用场景:
- 金融交易、订单创建等核心业务,需要立即感知任务失败
- 开发测试阶段,便于快速定位线程池满的问题
优缺点:
- 优点:快速失败,避免任务静默丢失
- 缺点:需要显式捕获异常,否则可能中断主流程
4.2 CallerRunsPolicy(调用者执行策略)
核心行为:
由提交任务的线程(如主线程)直接执行被拒绝的任务
源码实现:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
适用场景:
- 日志记录、异步通知等非实时任务
- 需要保证任务绝对不丢失的场景
- 系统需要自我调节能力的场景
优缺点:
- 优点:确保任务不丢失,通过降低提交速度缓解线程池压力
- 缺点:可能阻塞主线程,影响系统响应速度
4.3 DiscardPolicy(丢弃策略)
核心行为:
静默丢弃被拒绝的任务,不抛出任何异常,也不进行任何处理
源码实现:
public static class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 什么都不做,静默丢弃
}
}
适用场景:
- 监控指标上报、数据采集等允许任务丢失的非核心业务
- 系统资源紧张时,优先保证系统稳定运行
优缺点:
- 优点:无额外开销,避免系统阻塞
- 缺点:任务丢失难追踪,不适合核心任务
4.4 DiscardOldestPolicy(丢弃最旧策略)
核心行为:
丢弃队列中等待时间最长的任务,然后尝试重新提交当前任务
源码实现:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll(); // 丢弃队列头部的任务
e.execute(r); // 重新提交当前任务
}
}
}
适用场景:
- 实时消息推送、股票行情等时效性强的任务
- 新任务比旧任务更重要的场景
优缺点:
- 优点:优先处理新任务,适合实时系统
- 缺点:可能丢弃重要旧任务,导致状态不一致
五、拒绝策略触发机制
5.1 触发条件
拒绝策略在以下三个条件同时满足时触发:
- 核心线程数(corePoolSize)已达上限
- 任务队列(workQueue)已满
- 最大线程数(maximumPoolSize)已达上限
5.2 执行流程
public void execute(Runnable command) {
// 第一步:如果当前线程数小于核心线程数,创建新线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return;
c = ctl.get();
}
// 第二步:如果线程池正在运行,将任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三步:如果队列已满,尝试创建新线程
else if (!addWorker(command, false))
reject(command); // 创建失败,触发拒绝策略
}
final void reject(Runnable command) {
handler.rejectedExecution(command, this); // 调用拒绝策略
}
六、拒绝策略最佳实践
6.1 策略选型建议
| 策略类型 | 适用场景 | 风险提示 |
|---|---|---|
| AbortPolicy | 核心业务系统(交易、医疗) | 需处理异常,否则进程中断 |
| CallerRunsPolicy | 非实时任务(日志、异步通知) | 主线程阻塞影响响应速度 |
| DiscardPolicy | 可丢弃任务(监控采样、缓存更新) | 数据丢失难追踪 |
| DiscardOldestPolicy | 时效敏感任务(消息推送、实时竞价) | 旧任务丢失导致状态不一致 |
6.2 自定义拒绝策略
通过实现RejectedExecutionHandler接口,可以自定义拒绝策略:
public class CustomRejectionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 1. 记录任务信息到日志
log.warn("任务被拒绝: {}", r.toString());
// 2. 将任务持久化到数据库或消息队列
taskRepository.save(new TaskEntity(r));
// 3. 发送告警通知
alertService.sendAlert("线程池已满,任务被拒绝");
// 4. 尝试重试(需要控制重试次数)
if (retryCount < MAX_RETRY) {
try {
Thread.sleep(1000);
executor.execute(r);
} catch (Exception e) {
log.error("任务重试失败", e);
}
}
}
}
6.3 线程池配置建议
推荐配置:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, // corePoolSize:核心线程数
50, // maximumPoolSize:最大线程数
60L, // keepAliveTime:空闲线程存活时间
TimeUnit.SECONDS, // unit:时间单位
new LinkedBlockingQueue<>(1000), // workQueue:任务队列
Executors.defaultThreadFactory(), // threadFactory:线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // handler:拒绝策略
);
配置说明:
- 核心线程数:根据CPU核心数设置,通常为CPU核心数 * 2
- 最大线程数:根据业务负载和系统资源设置
- 队列容量:根据业务峰值和系统内存设置,避免OOM
- 拒绝策略:根据业务重要性选择,核心业务建议使用AbortPolicy或自定义策略
七、实战案例
7.1 电商订单处理系统
// 订单处理线程池配置
ThreadPoolExecutor orderExecutor = new ThreadPoolExecutor(
20,
100,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5000),
new ThreadFactoryBuilder().setNameFormat("order-process-%d").build(),
new ThreadPoolExecutor.AbortPolicy() // 订单不能丢失,使用抛异常策略
);
// 日志记录线程池配置
ThreadPoolExecutor logExecutor = new ThreadPoolExecutor(
5,
10,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadFactoryBuilder().setNameFormat("log-process-%d").build(),
new ThreadPoolExecutor.DiscardPolicy() // 日志可以丢弃,使用静默丢弃策略
);
// 实时消息推送线程池配置
ThreadPoolExecutor pushExecutor = new ThreadPoolExecutor(
10,
30,
10L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("push-process-%d").build(),
new ThreadPoolExecutor.DiscardOldestPolicy() // 新消息比旧消息重要,使用丢弃最旧策略
);
7.2 定时任务调度系统
// 使用DelayQueue实现定时任务调度
public class TaskScheduler {
private final DelayQueue<DelayedTask> taskQueue = new DelayQueue<>();
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 50, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
public void start() {
Thread schedulerThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
DelayedTask task = taskQueue.take();
executor.execute(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
schedulerThread.setDaemon(true);
schedulerThread.start();
}
public void scheduleTask(Runnable task, long delay, TimeUnit unit) {
taskQueue.put(new DelayedTask(task, delay, unit));
}
}
八、总结
阻塞队列和拒绝策略是Java并发编程的核心组件,正确使用它们能够显著提升系统的稳定性和性能。在选择阻塞队列时,需要根据业务场景、性能要求和资源限制综合考虑;在选择拒绝策略时,需要根据任务的重要性和系统容错能力做出合理选择。通过合理的配置和监控,可以构建出高可用、高性能的并发系统。
若内容若侵犯到您的权益,请发送邮件至:platform_service@jienda.com我们将第一时间处理!
所有资源仅限于参考和学习,版权归JienDa作者所有,更多请访问JienDa首页。





