MRPT  2.0.4
WorkerThreadsPool.h
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | https://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2020, Individual contributors, see AUTHORS file |
6  | See: https://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See: https://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #pragma once
11 
12 #include <atomic>
13 #include <condition_variable>
14 #include <cstdint>
15 #include <functional>
16 #include <future>
17 #include <mutex>
18 #include <queue>
19 #include <thread>
20 #include <vector>
21 
22 namespace mrpt::system
23 {
24 /**
25  * @brief A simple thread pool
26  *
27  * \note Partly based on: https://github.com/progschj/ThreadPool (ZLib license)
28  */
30 {
31  public:
32  enum queue_policy_t : uint8_t
33  {
34  /** Default policy: all tasks are executed in FIFO order */
36  /** If a task arrives and there are more pending tasks than worker
37  threads, drop previous tasks. */
39  };
40 
41  WorkerThreadsPool() = default;
42  WorkerThreadsPool(std::size_t num_threads, queue_policy_t p = POLICY_FIFO)
43  : policy_(p)
44  {
45  resize(num_threads);
46  }
48  void resize(std::size_t num_threads);
49  void clear(); //!< Stops all working jobs
50 
51  /** Enqueue one new working item, to be executed by threads when any is
52  * available. */
53  template <class F, class... Args>
54  auto enqueue(F&& f, Args&&... args)
55  -> std::future<typename std::result_of<F(Args...)>::type>;
56 
57  /** Returns the number of enqueued tasks, currently waiting for a free
58  * working thread to process them. */
59  std::size_t pendingTasks() const noexcept;
60 
61  private:
62  std::vector<std::thread> threads_;
63  std::atomic_bool do_stop_{false};
64  std::mutex queue_mutex_;
65  std::condition_variable condition_;
66  std::queue<std::function<void()>> tasks_;
68 };
69 
70 template <class F, class... Args>
71 auto WorkerThreadsPool::enqueue(F&& f, Args&&... args)
72  -> std::future<typename std::result_of<F(Args...)>::type>
73 {
74  using return_type = typename std::result_of<F(Args...)>::type;
75 
76  auto task = std::make_shared<std::packaged_task<return_type()>>(
77  std::bind(std::forward<F>(f), std::forward<Args>(args)...));
78 
79  std::future<return_type> res = task->get_future();
80  {
81  std::unique_lock<std::mutex> lock(queue_mutex_);
82 
83  // don't allow enqueueing after stopping the pool
84  if (do_stop_) throw std::runtime_error("enqueue on stopped ThreadPool");
85 
86  // policy check: drop pending tasks if we have more tasks than threads
87  if (policy_ == POLICY_DROP_OLD)
88  {
89  while (tasks_.size() >= threads_.size())
90  {
91  tasks_.pop();
92  }
93  }
94  // Enqeue the new task:
95  tasks_.emplace([task]() { (*task)(); });
96  }
97  condition_.notify_one();
98  return res;
99 }
100 } // namespace mrpt::system
If a task arrives and there are more pending tasks than worker threads, drop previous tasks...
auto enqueue(F &&f, Args &&... args) -> std::future< typename std::result_of< F(Args...)>::type >
Enqueue one new working item, to be executed by threads when any is available.
STL namespace.
WorkerThreadsPool(std::size_t num_threads, queue_policy_t p=POLICY_FIFO)
std::condition_variable condition_
std::vector< std::thread > threads_
std::queue< std::function< void()> > tasks_
void resize(std::size_t num_threads)
void clear()
Stops all working jobs.
std::size_t pendingTasks() const noexcept
Returns the number of enqueued tasks, currently waiting for a free working thread to process them...
Default policy: all tasks are executed in FIFO order.



Page generated by Doxygen 1.8.14 for MRPT 2.0.4 Git: 33de1d0ad Sat Jun 20 11:02:42 2020 +0200 at sáb jun 20 17:35:17 CEST 2020