使用C++构建安全队列-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

1 背景

  • STL的容器不是线程安全的我们经常会有需求要求数据结构线程安全比如写生产者消费者模型的时候就要求队列线程安全。
  • 利用std::queue和C++线程标准库的一些组件mutexcondition_variable可以写一个线程安全的队列ConcurrenceQueue。

2 思路梳理

需要4个函数

  • push入队
  • pop出队并返回原来对头的元素如果为队空则阻塞
  • tryPop出队并返回原来对头的元素如果队空返回空使用智能指针作返回类型非阻塞
  • empty返回是否为空实则没啥用多线程条件下判空下一瞬间另一线程就可能push进去东西了。

3 实现代码

#ifndef __CONCURRENCEQUEUE_H__
#define __CONCURRENCEQUEUE_H__
#include <mutex>
#include <condition_variable>
#include <deque>
#include <queue>
#include <memory>

template<typename DATATYPE, typename SEQUENCE = std::deque<DATATYPE>>
class ConcurrenceQueue 
{
public:
    ConcurrenceQueue() = default;
    
    ConcurrenceQueue(const ConcurrenceQueue & other)
    {
        std::lock_guard<std::mutex> lg(other.m_mutex);
        m_data = other.m_data;
    }
    ConcurrenceQueue(ConcurrenceQueue &&) = delete;
    ConcurrenceQueue & operator= (const ConcurrenceQueue &) = delete;
    ~ConcurrenceQueue() = default;
    bool empty() const 
    {
        std::lock_guard<std::mutex> lg(m_mutex);
        return m_data.empty();
    }
    
    void push(const DATATYPE & data) 
    {
        std::lock_guard<std::mutex> lg(m_mutex);
        m_data.push(data);
        m_cond.notify_one();
    }
    
    void push(DATATYPE && data) 
    {
        std::lock_guard<std::mutex> lg(m_mutex);
        m_data.push(std::move(data));
        m_cond.notify_one();
    }
    
    std::shared_ptr<DATATYPE> tryPop() 
    {  // 非阻塞
        std::lock_guard<std::mutex> lg(m_mutex);
        if (m_data.empty()) return {};
        auto res = std::make_shared<DATATYPE>(m_data.front());
        m_data.pop();
        return res;
    }
    
    std::shared_ptr<DATATYPE> pop() 
    {  // 非阻塞
        std::unique_lock<std::mutex> lg(m_mutex);
        m_cond.wait(lg, [this] { return !m_data.empty(); });
        auto res = std::make_shared<DATATYPE>(std::move(m_data.front()));
        m_data.pop();
        return res;
    }
    
private:
    std::queue<DATATYPE, SEQUENCE> m_data;
    mutable std::mutex m_mutex;
    std::condition_variable m_cond;
};
#endif

 4 测试

全局的

ConcurrenceQueue<int> g_queue;

void producer() 
{    
    for (int i = 0; i < 100; ++i) 
    {
        g_queue.push(i);
        std::this_thread::sleep_for(std::chrono::seconds(3));
    }
}

void consumer1() 
{
    while (1) 
    {
        std::printf("[1]  -------   %d\n", *g_queue.pop());
    }
}

void consumer2() 
{
    while (1) 
    {
        auto front = g_queue.tryPop();
        std::printf("[2]  -------   %d\n", front ? *front : -1);
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

测试 1消费者阻塞式消费

int main () 
{
    std::thread t1(producer);
    std::thread t2(consumer1);

    t1.join();
    t2.join();
    return 0;
}

测试 2消费者非阻塞式消费但要sleep轮询

int main () 
{
    std::thread t1(producer);
    std::thread t2(consumer2);

    t1.join();
    t2.join();
    return 0;
}

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: c++