如何在 C 中创建高效的多线程任务调度程序?

新手上路,请多包涵

我想用 C++ 创建一个非常高效的任务调度系统。

基本思想是这样的:

 class Task {
    public:
        virtual void run() = 0;
};

class Scheduler {
    public:
        void add(Task &task, double delayToRun);
};

Scheduler 后面应该有一个固定大小的线程池,用来运行任务(我不想为每个任务创建一个线程)。 delayToRun means that the task doesn’t get executed immediately, but delayToRun seconds later (measuring from the point it was added into the Scheduler )。

( delayToRun 当然是一个“至少”的值。如果系统被加载,或者如果我们向调度器询问不可能,它将无法处理我们的请求。但它应该尽力而为)

这是我的问题。如何有效地实现 delayToRun 功能?我试图通过使用互斥锁和条件变量来解决这个问题。

我看到两种方法:

带经理线程

调度程序包含两个队列: allTasksQueuetasksReadyToRunQueue 。一个任务被添加到 allTasksQueueScheduler::add 。有一个管理器线程,它等待的时间最短,因此它可以将任务从 allTasksQueuetasksReadyToRunQueue 。工作线程等待 tasksReadyToRunQueue 中可用的任务。

如果 Scheduler::addallTasksQueue 前面添加一个任务(一个任务,它的值为 delayToRun 所以它应该在当前任务之前最快运行),则需要唤醒管理器任务,以便更新等待时间。

这种方法可以被认为是低效的,因为它需要两个队列,并且它需要两个 condvar.signals 来使任务运行(一个用于 allTasksQueue -> tasksReadyToRunQueue ,一个用于发送信号工作线程来实际运行任务)

没有经理线程

调度程序中有一个队列。一个任务在 Scheduler::add 被添加到这个队列中。工作线程检查队列。如果它是空的,它会在没有时间限制的情况下等待。如果不为空,则等待最快的任务。

  1. 如果工作线程等待的条件变量只有一个:这种方法可以认为是低效的,因为如果队列前面增加了一个任务(前面的意思是,如果有N个工作线程,那么任务索引)然后需要唤醒 所有 工作线程以更新他们正在等待的时间。

  2. 如果每个线程有单独的条件变量,那么我们可以控制唤醒哪个线程,所以这种情况下我们不需要唤醒所有线程(只需要唤醒等待时间最长的线程) ,所以我们需要管理这个值)。我目前正在考虑实现这一点,但确定确切的细节很复杂。对这种方法有什么建议/想法/文件吗?


这个问题有更好的解决方案吗?我正在尝试使用标准 C++ 功能,但如果它们提供更好的解决方案,我也愿意使用平台相关(我的主要平台是 linux)工具(如 pthreads),甚至是 linux 特定工具(如 futexes)。

原文由 geza 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 329
2 个回答

通过使用单个池线程等待“下一个运行”任务的设计,您可以避免拥有单独的“管理器”线程,并且在下一个运行的任务更改时必须唤醒大量任务(如果有)一个条件变量,其余的池线程无限期地等待第二个条件变量。

池线程将按照以下几行执行伪代码:

 pthread_mutex_lock(&queue_lock);

while (running)
{
    if (head task is ready to run)
    {
        dequeue head task;
        if (task_thread == 1)
            pthread_cond_signal(&task_cv);
        else
            pthread_cond_signal(&queue_cv);

        pthread_mutex_unlock(&queue_lock);
        run dequeued task;
        pthread_mutex_lock(&queue_lock);
    }
    else if (!queue_empty && task_thread == 0)
    {
        task_thread = 1;
        pthread_cond_timedwait(&task_cv, &queue_lock, time head task is ready to run);
        task_thread = 0;
    }
    else
    {
        pthread_cond_wait(&queue_cv, &queue_lock);
    }
}

pthread_mutex_unlock(&queue_lock);

如果您更改要运行的下一个任务,则执行:

 if (task_thread == 1)
    pthread_cond_signal(&task_cv);
else
    pthread_cond_signal(&queue_cv);

持有 queue_lock

在这种方案下,所有唤醒都直接在一个线程上,只有一个优先级任务队列,不需要管理线程。

原文由 caf 发布,翻译遵循 CC BY-SA 3.0 许可协议

你的规范有点太强了:

delayToRun 表示任务不会立即执行,但是 delayToRun 秒后

您忘记添加“至少”:

  • 该任务现在没有执行,但 至少 delayToRun 秒后

关键是,如果一万个任务都以 0.1 delayToRun 进行调度,那么它们实际上肯定无法同时运行。

通过这样的更正,您只需维护一些(预定开始时间,关闭运行)的队列(或议程),保持该队列排序,然后开始 N (一些固定数量)线程原子地弹出议程的第一个元素并运行它。

然后需要唤醒所有工作线程以更新他们正在等待的时间。

不, 一些 工作线程会被唤醒。

阅读条件变量和广播。

您还可以使用 POSIX 计时器,请参阅 timer_create(2) 或 Linux 特定的 fd 计时器,请参阅 timerfd_create(2)

你可能会避免在你的线程中运行阻塞系统调用,并让一些 中央 线程使用一些事件循环来管理它们(参见 poll(2) …);否则,如果您有一百个任务正在运行 sleep(100) 并且一个任务计划在半秒内运行,它不会在一百秒之前运行。

您可能想阅读有关 延续传递风格 编程的内容(它与 -CPS- 高度相关)。阅读 Juliusz Chroboczek 关于 继续传递 C 的论文。

还要查看 Qt 线程

你也可以考虑在 Go 中编码(使用它的 Goroutines)。

原文由 Basile Starynkevitch 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题