lf_queue.h
#pragma once
#include <iostream>
#include <vector>
#include <atomic>
#include "macros.h"
namespace Common {
// 无锁队列模板类,T为存储元素类型
template<typename T>
class LFQueue final {
public:
// 构造函数,预分配num_elems个元素的空间
LFQueue(std::size_t num_elems) :
store_(num_elems, T()) /* 预分配vector存储空间,每个元素初始化为T的默认值 */ {
}
// 获取下一个可写入的位置,返回指向元素的指针
auto getNextToWriteTo() noexcept {
return &store_[next_write_index_];
}
// 写入后更新写指针索引,并递增元素计数
auto updateWriteIndex() noexcept {
next_write_index_ = (next_write_index_ + 1) % store_.size();
num_elements_++;
}
// 获取下一个可读元素的指针,如果队列为空则返回nullptr
auto getNextToRead() const noexcept -> const T * {
return (size() ? &store_[next_read_index_] : nullptr);
}
// 读取后更新读指针索引,并递减元素计数
auto updateReadIndex() noexcept {
next_read_index_ = (next_read_index_ + 1) % store_.size();
// 断言确保队列非空,避免无效读取
ASSERT(num_elements_ != 0, "Read an invalid element in:" + std::to_string(pthread_self()));
num_elements_--;
}
// 获取当前队列中元素的数量
auto size() const noexcept {
return num_elements_.load();
}
// 禁用默认构造、拷贝构造、移动构造和赋值操作
// 这样可以防止队列被意外复制或移动,导致资源管理错误
LFQueue() = delete;
LFQueue(const LFQueue &) = delete;
LFQueue(const LFQueue &&) = delete;
LFQueue &operator=(const LFQueue &) = delete;
LFQueue &operator=(const LFQueue &&) = delete;
private:
// 存储队列元素的向量,预分配固定大小
std::vector<T> store_;
// 原子类型的写索引,指向下一个可写入的位置
std::atomic<size_t> next_write_index_ = {0};
// 原子类型的读索引,指向下一个可读取的位置
std::atomic<size_t> next_read_index_ = {0};
// 原子类型的元素计数器,记录当前队列中的元素数量
std::atomic<size_t> num_elements_ = {0};
};
}例子:
#include "thread_utils.h"
#include "lf_queue.h"
struct MyStruct {
int d_[3];
};
using namespace Common;
auto consumeFunction(LFQueue<MyStruct>* lfq) {
using namespace std::literals::chrono_literals;
std::this_thread::sleep_for(5s);
while(lfq->size()) {
const auto d = lfq->getNextToRead();
lfq->updateReadIndex();
std::cout << "consumeFunction read elem:" << d->d_[0] << "," << d->d_[1] << "," << d->d_[2] << " lfq-size:" << lfq->size() << std::endl;
std::this_thread::sleep_for(1s);
}
std::cout << "consumeFunction exiting." << std::endl;
}
int main(int, char **) {
LFQueue<MyStruct> lfq(20);
auto ct = createAndStartThread(-1, "", consumeFunction, &lfq);
for(auto i = 0; i < 50; ++i) {
const MyStruct d{i, i * 10, i * 100};
*(lfq.getNextToWriteTo()) = d;
lfq.updateWriteIndex();
std::cout << "main constructed elem:" << d.d_[0] << "," << d.d_[1] << "," << d.d_[2] << " lfq-size:" << lfq.size() << std::endl;
using namespace std::literals::chrono_literals;
std::this_thread::sleep_for(1s);
}
ct->join();
std::cout << "main exiting." << std::endl;
return 0;
}
系统当前共有 481 篇文章