C 11 线程安全队列

新手上路,请多包涵

我正在处理的一个项目使用多个线程来处理一组文件。每个线程都可以将文件添加到要处理的文件列表中,因此我将(我认为是)一个线程安全队列放在一起。相关部分如下:

 // qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
    std::lock_guard<std::mutex> lock(qMutex);
    q.push(std::move(filename));

    // Notify anyone waiting for additional files that more have arrived
    populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    if (q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
        else {
            return std::string();
        }
    }
    else {
        std::string ret = q.front();
        q.pop();
        return ret;
    }
}

但是,我偶尔会在 if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { } 块内发生段错误,并且 gdb 中的检查表明由于队列为空而发生段错误。这怎么可能?据我了解, wait_for 仅在收到通知时才返回 cv_status::no_timeout ,并且这只发生在 FileQueue::enqueue 刚刚将新项目推送到队列之后。

原文由 Matt Kline 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 918
2 个回答

根据标准 condition_variables 允许虚假唤醒,即使事件没有发生。如果发生虚假唤醒,它将返回 cv_status::no_timeout (因为它是唤醒而不是超时),即使它没有被通知。正确的解决方案当然是在继续之前检查唤醒是否真的合法。

详细信息在标准§30.5.1 [thread.condition.condvar] 中指定:

— 当调用 notify_one()、调用 notify_all()、abs_time 指定的绝对超时 (30.2.4) 到期或虚假发出信号时,该函数将解除阻塞。

返回: 如果 abs_time 指定的绝对超时 (30.2.4) 已过期,则返回 cv_status::timeout,否则为 cv_status::no_timeout。

原文由 Grizzly 发布,翻译遵循 CC BY-SA 4.0 许可协议

这是我在 C++20 中实现的线程队列:

 #pragma once
#include <deque>
#include <mutex>
#include <condition_variable>
#include <utility>
#include <concepts>
#include <list>

template<typename QueueType>
concept thread_queue_concept =
    std::same_as<QueueType, std::deque<typename QueueType::value_type, typename QueueType::allocator_type>>
    || std::same_as<QueueType, std::list<typename QueueType::value_type, typename QueueType::allocator_type>>;

template<typename QueueType>
    requires thread_queue_concept<QueueType>
struct thread_queue
{
    using value_type = typename QueueType::value_type;
    thread_queue();
    explicit thread_queue( typename QueueType::allocator_type const &alloc );
    thread_queue( thread_queue &&other );
    thread_queue &operator =( thread_queue const &other );
    thread_queue &operator =( thread_queue &&other );
    bool empty() const;
    std::size_t size() const;
    void shrink_to_fit();
    void clear();
    template<typename ... Args>
        requires std::is_constructible_v<typename QueueType::value_type, Args ...>
    void enque( Args &&... args );
    template<typename Producer>
        requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, typename QueueType::value_type>>; }
    void enqueue_multiple( Producer producer );
    template<typename Consumer>
        requires requires( Consumer consumer, typename QueueType::value_type value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; }
    void dequeue_multiple( Consumer consumer );
    typename QueueType::value_type dequeue();
    void swap( thread_queue &other );
private:
    mutable std::mutex m_mtx;
    mutable std::condition_variable m_cv;
    QueueType m_queue;
};

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue()
{
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue( typename QueueType::allocator_type const &alloc ) :
    m_queue( alloc )
{
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue( thread_queue &&other )
{
    using namespace std;
    lock_guard lock( other.m_mtx );
    m_queue = move( other.m_queue );
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType> &thread_queue<QueueType>::thread_queue::operator =( thread_queue const &other )
{
    std::lock_guard
        ourLock( m_mtx ),
        otherLock( other.m_mtx );
    m_queue = other.m_queue;
    return *this;
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType> &thread_queue<QueueType>::thread_queue::operator =( thread_queue &&other )
{
    using namespace std;
    lock_guard
        ourLock( m_mtx ),
        otherLock( other.m_mtx );
    m_queue = move( other.m_queue );
    return *this;
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
bool thread_queue<QueueType>::thread_queue::empty() const
{
    std::lock_guard lock( m_mtx );
    return m_queue.empty();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
std::size_t thread_queue<QueueType>::thread_queue::size() const
{
    std::lock_guard lock( m_mtx );
    return m_queue.size();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::shrink_to_fit()
{
    std::lock_guard lock( m_mtx );
    return m_queue.shrink_to_fit();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::clear()
{
    std::lock_guard lock( m_mtx );
    m_queue.clear();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
template<typename ... Args>
    requires std::is_constructible_v<typename QueueType::value_type, Args ...>
void thread_queue<QueueType>::thread_queue::enque( Args &&... args )
{
    using namespace std;
    unique_lock lock( m_mtx );
    m_queue.emplace_front( forward<Args>( args ) ... );
    m_cv.notify_one();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
typename QueueType::value_type thread_queue<QueueType>::thread_queue::dequeue()
{
    using namespace std;
    unique_lock lock( m_mtx );
    while( m_queue.empty() )
        m_cv.wait( lock );
    value_type value = move( m_queue.back() );
    m_queue.pop_back();
    return value;
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
template<typename Producer>
    requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, typename QueueType::value_type>>; }
void thread_queue<QueueType>::enqueue_multiple( Producer producer )
{
    using namespace std;
    lock_guard lock( m_mtx );
    for( std::pair<bool, value_type> ret; (ret = move( producer() )).first; )
        m_queue.emplace_front( move( ret.second ) ),
        m_cv.notify_one();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
template<typename Consumer>
    requires requires( Consumer consumer, typename QueueType::value_type value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; }
void thread_queue<QueueType>::dequeue_multiple( Consumer consumer )
{
    using namespace std;
    unique_lock lock( m_mtx );
    for( ; ; )
    {
        while( m_queue.empty() )
            m_cv.wait( lock );
        try
        {
            bool cont = consumer( move( m_queue.back() ) );
            m_queue.pop_back();
            if( !cont )
                return;
        }
        catch( ... )
        {
            m_queue.pop_back();
            throw;
        }
    }
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::swap( thread_queue &other )
{
    std::lock_guard
        ourLock( m_mtx ),
        otherLock( other.m_mtx );
    m_queue.swap( other.m_queue );
}

唯一的模板参数是 BaseType,它可以是 std::deque 类型或 std::list 类型,受 thread_queue_concept 限制。此类使用此类型作为内部队列类型。选择对您的应用程序最有效的 BaseType。我可能已经将这个类限制在一个更不同的 thread_queue_concepts 上,它检查 BaseType 的所有使用部分,以便这个类可能适用于与 std::list<> 或 std::deque<> 兼容的其他类型,但我太懒了为不太可能的情况实施,即某人自己实施类似的事情。此代码的一个优点是 enqueue_multiple 和 dequeue_multiple。这些函数被赋予一个函数对象,通常是一个 lambda,它只需一个锁定步骤就可以使多个项目入队或出队。对于入队,这始终成立,对于出队,这取决于队列是否有要获取的元素。

如果您有一个生产者和多个消费者,则 enqueue_multiple 通常是有意义的。它导致持有锁的时间更长,因此只有在物品可以生产或快速移动时才有意义。

如果您有多个生产者和一个消费者,则 dequeue_multiple 通常是有意义的。在这里我们也有更长的锁定时间,但由于对象通常在这里只有快速移动,这通常不会造成伤害。

如果 dequeue_multiple 的消费者函数对象在消费时抛出异常,则异常被 caugt 并提供给消费者的元素(底层队列类型对象内的右值引用)被删除。

如果你想在 C++11 中使用这个类,你必须删除这些概念或使用 #if defined(__cpp_concepts) 禁用它们。

原文由 Bonita Montero 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题