C 0x 没有信号量?如何同步线程?

新手上路,请多包涵

C++0x 真的会没有信号量吗? Stack Overflow 上已经有一些关于信号量使用的问题。我一直使用它们(posix 信号量)让一个线程等待另一个线程中的某个事件:

 void thread0(...)
{
  doSomething0();

  event1.wait();

  ...
}

void thread1(...)
{
  doSomething1();

  event1.post();

  ...
}

如果我会用互斥锁来做到这一点:

 void thread0(...)
{
  doSomething0();

  event1.lock(); event1.unlock();

  ...
}

void thread1(...)
{
  event1.lock();

  doSomethingth1();

  event1.unlock();

  ...
}

问题:很难看,并且不能保证thread1首先锁定互斥锁(鉴于同一个线程应该锁定和解锁互斥锁,您也不能在thread0和thread1启动之前锁定event1)。

因此,既然 boost 也没有信号量,那么实现上述目标的最简单方法是什么?

原文由 tauran 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 439
2 个回答

您可以轻松地从互斥锁和条件变量构建一个:

 #include <mutex>
#include <condition_variable>

class semaphore {
    std::mutex mutex_;
    std::condition_variable condition_;
    unsigned long count_ = 0; // Initialized as locked.

public:
    void release() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void acquire() {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        while(!count_) // Handle spurious wake-ups.
            condition_.wait(lock);
        --count_;
    }

    bool try_acquire() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if(count_) {
            --count_;
            return true;
        }
        return false;
    }
};

原文由 Maxim Egorushkin 发布,翻译遵循 CC BY-SA 4.0 许可协议

有老问题,但我想提供另一种解决方案。看来您需要的不是信号量,而是像 Windows 事件这样的事件。非常有效的事件可以像下面这样完成:

 #ifdef _MSC_VER
  #include <concrt.h>
#else
  // pthread implementation
  #include <cstddef>
  #include <cstdint>
  #include <shared_mutex>

namespace Concurrency
{
const unsigned int COOPERATIVE_TIMEOUT_INFINITE = (unsigned int)-1;
const size_t COOPERATIVE_WAIT_TIMEOUT = SIZE_MAX;

class event
{
public:
    event();
    ~event();

    size_t wait(unsigned int timeout = COOPERATIVE_TIMEOUT_INFINITE);
    void set();
    void reset();
    static size_t wait_for_multiple(event** _PPEvents, size_t _Count, bool _FWaitAll, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);

    static const unsigned int timeout_infinite = COOPERATIVE_TIMEOUT_INFINITE;

private:
    int d;
    std::shared_mutex guard;
};

};

namespace concurrency = Concurrency;

#include <unistd.h>
#include <errno.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

#include <chrono>

#include "../HandleHolder.h"

typedef CommonHolder<int, close> fd_holder;

namespace Concurrency
{
    int watch(int ep_fd, int fd)
    {
        epoll_event ep_event;
        ep_event.events = EPOLLIN;
        ep_event.data.fd = fd;

        return epoll_ctl(ep_fd, EPOLL_CTL_ADD, fd, &ep_event);
    }

    event::event()
        : d(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
    {
    }

    event::~event()
    {
        std::unique_lock<std::shared_mutex> lock(guard);
        close(d);
        d = -1;
    }

    size_t event::wait(unsigned int timeout)
    {
        fd_holder ep_fd(epoll_create1(EPOLL_CLOEXEC));
        {
            std::shared_lock<std::shared_mutex> lock(guard);
            if (d == -1 || watch(ep_fd.GetHandle(), d) < 0)
                return COOPERATIVE_WAIT_TIMEOUT;
        }

        epoll_event ep_event;
        return epoll_wait(ep_fd.GetHandle(), &ep_event, 1, timeout) == 1 && (ep_event.events & EPOLLIN) ? 0 : COOPERATIVE_WAIT_TIMEOUT;
    }

    void event::set()
    {
        uint64_t count = 1;
        write(d, &count, sizeof(count));
    }

    void event::reset()
    {
        uint64_t count;
        read(d, &count, sizeof(count));
    }

    size_t event::wait_for_multiple(event** _PPEvents, size_t _Count, bool _FWaitAll, unsigned int _Timeout)
    {
        if (_FWaitAll) // not implemented
            std::abort();

        const auto deadline = _Timeout != COOPERATIVE_TIMEOUT_INFINITE ? std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count() + _Timeout : COOPERATIVE_TIMEOUT_INFINITE;

        fd_holder ep_fd(epoll_create1(EPOLL_CLOEXEC));
        int fds[_Count];
        for (int i = 0; i < _Count; ++i)
        {
            std::shared_lock<std::shared_mutex> lock(_PPEvents[i]->guard);
            fds[i] = _PPEvents[i]->d;
            if (fds[i] != -1 && watch(ep_fd.GetHandle(), fds[i]) < 0)
                fds[i] = -1;
        }

        epoll_event ep_events[_Count];

        // Вызов epoll_wait может быть прерван сигналом. Ждём весь тайм-аут, так же, как в Windows
        int res = 0;
        while (true)
        {
            res = epoll_wait(ep_fd.GetHandle(), &ep_events[0], _Count, _Timeout);
            if (res == -1 && errno == EINTR && std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count() < deadline)
                continue;
            break;
        }

        for (int i = 0; i < _Count; ++i)
        {
            if (fds[i] == -1)
                continue;

            for (int j = 0; j < res; ++j)
                if (ep_events[j].data.fd == fds[i] && (ep_events[j].events & EPOLLIN))
                    return i;
        }

        return COOPERATIVE_WAIT_TIMEOUT;
    }
};
#endif

然后只需使用 concurrency::event

原文由 Andrey 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题