当前位置: 首页 > news >正文

[数据结构/Java] 数据结构之循环队列

1 概述:循环队列

循环队列

  • 循环队列: 一种先进先出FIFO)的数据结构——它通过将【顺序队列】的末尾连接到开头,形成一个【环状结构】,从而解决了【顺序队列】的【虚假满状态问题】。

image

循环队列

  • 【队列】:一种先进先出(First In First Out)的线性表,简称FIFO。允许插入的一端称为【队尾(head)】,允许删除的一端称为【队头(tail)】。
  • 循环队列在大数据领域应用场景较为丰富。
  • Hadoop MapReduce 在 Shuffle 过程中的环形缓冲区

别名:循环缓冲队列,Circular Buffer Queue
Shuffle 过程中,【环形缓冲区】主要用于存储来自 Mapper 节点传输的数据。

  • 实时计算中,缓存单个设备的最近N秒的状态数据
  • ...
  • 环形缓冲区Ring Buffer,也称为循环缓冲区Circular Buffer Queue)是【无锁队列】实现中一种【高效的数据结构】,特别适合【高性能并发场景】,如线程池任务调度、实时系统、服务器请求处理等。
  • 环形缓冲区是一个逻辑上通过【头指针】(head)和【尾指针】(tail)形成循环队列的数据结构。

换言之,它由一个【固定大小】的【列表】和两个【指针】组成。一个指针 head 用于指向队列的头部,另一个指针 tail 则用于指向队列的尾部。
当指针到达数组末尾时,自动“绕回”到开头(通过模运算)。它常用于【无锁队列】,因为:

  • 固定内存:避免动态分配,减少内存碎片。
  • 缓存友好:连续内存布局提高缓存命中率。
  • 高效操作:头尾指针通过原子操作更新,支持并发访问。

核心组件

  • 缓冲区:固定大小的数组或链表。
  • 头指针(head):指向下一个读取位置(消费者使用)。
  • 尾指针(tail):指向下一个写入位置(生产者使用)。
  • 大小计数器(size):跟踪队列中元素数量(可选,视场景)。
  • 原子操作:使用 std::atomic(C/C++) / AtomicXxx(Java) 等锁机制,确保 head 和 tail 的线程安全更新。

不同策略下的循环队列

  • 有锁的循环队列
  • 覆盖策略(满时入队覆盖队首)
  • 阻塞策略(满/空时阻塞等待)
  • 非阻塞策略(满/空时返回 null / 抛异常)
  • 无锁的循环队列

基于 ConcurrentLinkedDeque 实现线程安全的循环队列的思路

  • ConcurrentLinkedDeque 是 Java 并发包中提供的非阻塞线程安全的【双端队列】,基于【无锁】(CAS)机制实现。

java.util.concurrent.ConcurrentLinkedDeque

  • 要基于 ConcurrentLinkedDeque 实现线程安全的循环队列。

核心思路是:利用 ConcurrentLinkedDeque 的并发安全特性封装队列操作,通过固定容量限制实现“【循环】”(队列满时入队会覆盖/阻塞/抛异常,队空时出队会阻塞/抛异常)。

  • 核心设计要点
  • 固定容量:循环队列的核心是容量固定,满后入队需遵循循环规则(覆盖旧元素/阻塞/拒绝)。
  • 线程安全:复用 ConcurrentLinkedDeque 的并发安全特性,避免手动加锁。
  • 循环逻辑:入队时若队列满,根据策略处理(如覆盖队首、阻塞等待、抛异常);出队时若空,同理。

循环队列的特点

  • 优势:
  • 无动态分配,性能高。
  • 内存布局连续,缓存命中率高。
  • 适合固定容量的高性能场景。
  • 挑战:
  • 固定容量,可能溢出或不足。
  • MPMC 场景下【头尾指针竞争】,可能导致 CAS 重试。
  • 需要仔细处理【内存序】和【数据一致性】。

2 覆盖式循环队列

此版本,项目亲测。

2.1 实现思路

  • 基于 Java JDK 的 并发双端队列(java.util.concurrent.ConcurrentLinkedDeque)实现覆盖式循环队列。

容量固定,满时入队覆盖队首元素 (即: 新元素从队尾进入,满时覆盖队首元素(第n个元素向前覆盖第n-1个元素,n>=2))

2.2 源码实现(Java)

CoverStrategyCircularQueue

import com.alibaba.fastjson.JSON;import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;/*** 基于 ConcurrentLinkedDeque 的循环队列(覆盖策略)* 容量固定,满时入队覆盖队首元素 (即: 新元素从队尾进入,满时覆盖队首元素(第n个元素向前覆盖第n-1个元素,n>=2))* @param <E> 队列元素类型*/
public class CoverStrategyCircularQueue<E> implements Serializable {// 并发安全的双端队列(底层存储)private final ConcurrentLinkedDeque<E> deque;// 队列最大容量(原子类保证并发下计数准确)private final int capacity;// 当前元素数量(原子操作,避免并发计数错误)private final AtomicInteger size = new AtomicInteger(0);/*** 构造循环队列* @param capacity 最大容量(必须>0)*/public CoverStrategyCircularQueue(int capacity) {if (capacity <= 0) {throw new IllegalArgumentException("The capacity must be greater than 0");//容量必须大于0}this.capacity = capacity;this.deque = new ConcurrentLinkedDeque<>();}/*** 基于 list , 构造循环队列* @param capacity* @param list 允许为 null*/public CoverStrategyCircularQueue(int capacity, @Nullable List<E> list) {this(capacity);if(list != null && list.size() != 0){for (E element : list) {offer(element);}}}/*** 入队:新元素从队尾进入,满时覆盖队首元素(第n个元素向前覆盖第n-1个元素,n>=2)* @param element 待入队元素* @return 成功入队返回true(覆盖时也返回true)*/public boolean offer(E element) {if (element == null) {throw new NullPointerException("The element not be null");//元素不能为null}// 循环核心:满了先移除队首,再入队while (size.get() >= capacity) {// 移除队首(CAS保证原子性,避免并发下重复移除)if (deque.pollFirst() != null) {size.decrementAndGet();}}// 入队尾deque.offerLast(element);size.incrementAndGet();return true;}/*** 出队:空时返回null* @return 队首元素,空则返回null*/public E poll() {E element = deque.pollFirst();if (element != null) {size.decrementAndGet();}return element;}/*** 查看队首元素(不移除)* @return 队首元素,空则返回null*/public E peekFirst() {return deque.peekFirst();}/*** 查看队尾元素(不移除)* @return 队尾元素,空则返回 null*/public E peekLast() {return deque.peekLast();}/*** 获取倒数第 index 个元素(不删除元素)* @note 从队尾向队首遍历* @param index 倒数第 index 个元素 , index ∈ [0, size - 1]* @return*/public E getLast(Integer index) {Iterator<E> iterator = deque.descendingIterator();E result = null;int cursor = 0;while( iterator.hasNext() && cursor < index) {result = iterator.next();}return result;}/*** 获取正数第 index 个元素(不删除元素)* @note 从队首向队尾遍历* @param index 正数第 index 个元素 , index ∈ [0, size - 1]* @return*/public E get(Integer index) {Iterator<E> iterator = deque.iterator();E result = null;int cursor = 0;while( iterator.hasNext() && cursor < index) {result = iterator.next();}return result;}/*** 获取当前元素数量* @return 元素个数*/public int size() {return size.get();}/*** 判断队列是否已满* @return 满则true*/public boolean isFull() {return size.get() >= capacity;}/*** 判断队列是否为空* @return 空则true*/public boolean isEmpty() {return size.get() == 0;}/*** 清空队列*/public void clear() {deque.clear();size.set(0);}/*** 获取底层的队列* @note 原则上,不允许获取底层队列,但为了方便特殊场景下的数据操纵、及验证测试,故此处提供该方法* @return*/public ConcurrentLinkedDeque getDeque() {return deque;}/*** 转 List* @return*/public List<E> toList(){List<E> list = new ArrayList<>();if( this.deque == null || this.deque.isEmpty() ){return list;}for (E element : this.deque) {list.add(element);}return list;}@Overridepublic String toString() {return "CoverStrategyCircularQueue{" +"deque=" + JSON.toJSONString(deque) +", capacity=" + capacity +", size=" + size +'}';}
}

CircularQueueTest

public class CircularQueueTest {private final static Logger log = LoggerFactory.getLogger(CircularQueueTest.class);@Testpublic void CoverStrategyCircularQueueTest(){int length = 5;CoverStrategyCircularQueue<String> queue = new CoverStrategyCircularQueue<>(length);//入队for (int i = 0; i < length + 3; i++) {queue.offer( (new Integer(i+1)).toString() );}log.info("queue:{}", queue);//[4, 5, 6, 7, 8]//出队queue.poll();queue.poll();queue.poll();log.info("queue:{}", queue);//[7, 8]}
}

3 阻塞式循环队列(等待策略)

  • 满时入队阻塞,空时出队阻塞,适合生产-消费模型,需结合 Lock 和 Condition 实现。
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;/*** 基于 ConcurrentLinkedDeque 的阻塞循环队列* 满时入队阻塞,空时出队阻塞* @param <E> 元素类型*/
public class BlockingCircularQueue<E> {private final ConcurrentLinkedDeque<E> deque;private final int capacity;// 锁 + 条件变量(空/满)private final ReentrantLock lock = new ReentrantLock();private final Condition notEmpty = lock.newCondition();private final Condition notFull = lock.newCondition();public BlockingCircularQueue(int capacity) {if (capacity <= 0) {throw new IllegalArgumentException("容量必须大于0");}this.capacity = capacity;this.deque = new ConcurrentLinkedDeque<>();}/*** 阻塞入队:满则等待* @param element 元素* @throws InterruptedException 中断异常*/public void put(E element) throws InterruptedException {if (element == null) {throw new NullPointerException("元素不能为null");}lock.lockInterruptibly();try {// 满则等待while (deque.size() >= capacity) {notFull.await();}deque.offerLast(element);// 唤醒出队阻塞的线程notEmpty.signal();} finally {lock.unlock();}}/*** 阻塞出队:空则等待* @return 队首元素* @throws InterruptedException 中断异常*/public E take() throws InterruptedException {lock.lockInterruptibly();try {// 空则等待while (deque.isEmpty()) {notEmpty.await();}E element = deque.pollFirst();// 唤醒入队阻塞的线程notFull.signal();return element;} finally {lock.unlock();}}/*** 非阻塞入队:满则返回false* @param element 元素* @return 成功则true*/public boolean offer(E element) {if (element == null) {throw new NullPointerException("元素不能为null");}lock.lock();try {if (deque.size() >= capacity) {return false;}deque.offerLast(element);notEmpty.signal();return true;} finally {lock.unlock();}}/*** 非阻塞出队:空则返回null* @return 队首元素*/public E poll() {lock.lock();try {if (deque.isEmpty()) {return null;}E element = deque.pollFirst();notFull.signal();return element;} finally {lock.unlock();}}// 辅助方法(size/isEmpty/isFull/clear)public int size() {lock.lock();try {return deque.size();} finally {lock.unlock();}}public boolean isEmpty() {return size() == 0;}public boolean isFull() {return size() >= capacity;}public void clear() {lock.lock();try {deque.clear();notFull.signalAll(); // 唤醒所有入队阻塞线程} finally {lock.unlock();}}
}

关键注意事项

  1. 并发计数准确性:
  • ConcurrentLinkedDeque.size() 是O(n)操作,高并发下性能差,因此基础版用 AtomicInteger 维护计数,阻塞版用锁保护计数。
  1. null元素禁止: ConcurrentLinkedDeque 不允许null元素,因此入队时需校验。
  2. 循环策略选择:
  • 覆盖策略:适合日志缓存、临时数据存储(允许旧数据被覆盖)。
  • 阻塞策略:适合生产-消费模型(如任务队列,需严格控制容量)。
  • 非阻塞策略:适合快速响应场景(满/空时直接返回,不阻塞)。

使用示例

public class CircularQueueTest {public static void main(String[] args) {// 基础版(覆盖策略)CircularQueue<String> queue = new CircularQueue<>(3);queue.offer("A");queue.offer("B");queue.offer("C");queue.offer("D"); // 满,覆盖队首"A"System.out.println(queue.poll()); // 输出BSystem.out.println(queue.size()); // 输出3(B/C/D)// 阻塞版(生产-消费)BlockingCircularQueue<Integer> blockingQueue = new BlockingCircularQueue<>(2);// 生产者线程new Thread(() -> {try {blockingQueue.put(1);blockingQueue.put(2);blockingQueue.put(3); // 满,阻塞} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();// 消费者线程new Thread(() -> {try {Thread.sleep(1000);System.out.println(blockingQueue.take()); // 输出1,唤醒生产者} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}
}

Z 最佳实践

循环队列的性能优化建议

  • 高并发场景下,优先选择覆盖策略,避免【锁竞争】。
  • 若需阻塞策略,可考虑直接使用 ArrayBlockingQueueJDK 原生,性能更优)
  • 避免频繁调用 size() 方法,高并发下改用 isEmpty() / isFull() 替代(O(1) 操作)。

基于循环队列实现滑动窗口

  • [Python/循环队列] 数据结构之滑动窗口 - 博客园/千千寰宇

方案2:基于【循环队列】实现【滑动窗口】

Hadoop MapReduce 在 Shuffle 过程中环形缓冲区的应用

  • Hadoop MapReduce 在 Shuffle 过程中的环形缓冲区

别名:循环缓冲队列,Circular Buffer Queue
Shuffle 过程中,【环形缓冲区】主要用于存储来自 Mapper 节点传输的数据。

  • 环形缓冲区的工作原理

【环形缓冲区】的工作原理是【基于生产者-消费者模型】的。

  • 在 Shuffle 过程中,Mapper 节点充当生产者的角色,将数据写入【环形缓冲区】;而 Reducer 节点则充当【消费者】的角色,从【环形缓冲区】中读取数据并进行后续的处理。
  • 当 Mapper 节点将数据写入【环形缓冲区】时,tail 指针会递增。

如果 tail 指针追上了 head 指针,表示缓冲区已满,此时 Mapper 节点会等待一段时间,直到 Reducer 节点读取并释放了一些空间,再将数据写入【环形缓冲区】。

  • 当 Reducer 节点从【环形缓冲区】中读取数据时,head 指针会递增。

如果 head 指针追上了 tail 指针,表示缓冲区已空,此时 Reducer 节点会等待一段时间,直到 Mapper 节点写入了更多的数据,再继续读取。

  • 环形缓冲区在 Shuffle 过程中的作用
  • 环形缓冲区在 Shuffle 过程中起到了至关重要的作用。
  • 它将 Mapper 节点产生的数据进行【临时存储】,以便 Reducer 节点能够按照预定的顺序和方式进行读取和处理。
  • 另外,由于 Hadoop 在 Shuffle 过程中使用了磁盘进行大规模的数据传输,而磁盘读写较慢。

因此,【环形缓冲区】通过在【内存】中存储数据,加速了数据的传输和处理过程,提高了整个 Shuffle 过程的效率和性能。

  • 推荐文献
  • Hadoop:Shuffle 过程中的环形缓冲区 - 极简博客

Y 推荐文献

X 参考文献

  • 环形缓冲区(Ring Buffer,也称为循环缓冲区或 Circular Buffer)是无锁队列实现中一种高效的数据结构 - CSDN
http://www.gsyq.cn/news/94872.html

相关文章:

  • 检索增强生成(RAG)技术原理深度解析:突破大模型知识边界的范式革命
  • 基于springboot的技术博客交流系统的设计与实现
  • 基于springboot的运动服服饰销售购买商城系统
  • 英语口语资源合集
  • 如何用DSPy优化RAG prompt示例
  • 鸿蒙PC UI控件库 - TextInput 文本输入框详解
  • 鸿蒙PC UI控件库 - PasswordInput 密码输入框详解
  • 【路径规划】基于RRT快速探索随机树算法在包含圆形障碍物的环境中寻找从起点到目标点的路径附matlab代码
  • 【国防科大硕士论文】V调频信号脉冲压缩+V-FM ISAR成像研究附Matlab代码
  • 夜莺监控设计思考(三)时序库、agent 的一些设计考量
  • Go Module构建
  • AI中的优化5-无约束非线性规划之凸性
  • 深圳|昆明|广州|东莞-奶茶原料批发供应商|奶茶原料供应商|奶茶原料批发市场|奶茶原料批发|奶茶原料推荐|奶茶原料公司——圣旺水吧 - 老百姓的口碑
  • TDengine 新性能基准测试工具 taosgen
  • 在 C++ 中轻松实现字符串与字符数组的相互转换
  • 【WRF理论第二十期】湍流与扩散(Turbulence / Diffusion)
  • 基于透镜天线阵列的毫米波大规模多输入多输出(MIMO)系统可靠波束空间信道估计研究附Matlab代码
  • Linux的权限
  • 如何用AlphaFold预测氨基酸突变对蛋白质结构的影响
  • Python类入门:用“汽车工厂”理解面向对象编程
  • 基于文化优化算法图像量化附Matlab代码
  • 高频软件测试基础面试题
  • 【大模型预训练】09-训练数据集生成技术:数据增强与合成数据的生成方法
  • 挖漏洞一个月赚2万多,别被骗了!
  • 基于自抗扰控制ADRC的永磁同步电机仿真模型附Simulink仿真
  • C++进阶技巧:如何在同一对象中存储左值或右值
  • 我在私有漏洞赏金计划常规测试中发现IP欺骗漏洞的过程
  • 基于无迹卡尔曼滤波(UKF)与模型预测控制(MPC)的多无人机避撞研究附Matlab代码
  • 列表基本概念
  • 【JavaWeb】Servlet继承结构