Alexandria  2.27.0
SDC-CH common library for the Euclid project
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ThreadPool.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012-2022 Euclid Science Ground Segment
3  *
4  * This library is free software; you can redistribute it and/or modify it under
5  * the terms of the GNU Lesser General Public License as published by the Free
6  * Software Foundation; either version 3.0 of the License, or (at your option)
7  * any later version.
8  *
9  * This library is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12  * details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this library; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
27 #include <algorithm>
28 #include <numeric>
29 
30 namespace Euclid {
31 
32 namespace {
33 
34 class Worker {
35 
36 public:
37  Worker(std::mutex& queue_mutex, std::deque<ThreadPool::Task>& queue, std::atomic<bool>& run_flag,
38  std::atomic<bool>& sleeping_flag, std::atomic<bool>& done_flag, unsigned int empty_queue_wait_time,
39  std::exception_ptr& exception_ptr)
40  : m_queue_mutex(queue_mutex)
41  , m_queue(queue)
42  , m_run_flag(run_flag)
43  , m_sleeping_flag(sleeping_flag)
44  , m_done_flag(done_flag)
45  , m_empty_queue_wait_time(empty_queue_wait_time)
46  , m_exception_ptr(exception_ptr) {}
47 
48  void operator()() {
49  while (m_run_flag.get()) {
50  std::unique_ptr<ThreadPool::Task> task_ptr = nullptr;
51 
52  {
54  // If an exception was thrown, stop just here
55  if (m_exception_ptr != nullptr) {
56  break;
57  }
58  // Check if there is anything it the queue to be done and get it
59  if (!m_queue.get().empty()) {
60  task_ptr = Euclid::make_unique<ThreadPool::Task>(m_queue.get().front());
61  m_queue.get().pop_front();
62  }
63  }
64 
65  // If we have some work to do, do it. Otherwise sleep for some time.
66  if (task_ptr) {
67  try {
68  (*task_ptr)();
69  } catch (...) {
72  }
73  } else {
74  m_sleeping_flag.get() = true;
76  m_sleeping_flag.get() = false;
77  }
78  }
79  // Indicate that the worker is done
80  m_sleeping_flag.get() = true;
81  m_done_flag.get() = true;
82  m_run_flag.get() = false;
83  }
84 
85 private:
93 };
94 
95 } // end of anonymous namespace
96 
97 ThreadPool::ThreadPool(unsigned int thread_count, unsigned int empty_queue_wait_time)
98  : m_worker_run_flags(thread_count)
99  , m_worker_sleeping_flags(thread_count)
100  , m_worker_done_flags(thread_count)
101  , m_empty_queue_wait_time(empty_queue_wait_time) {
102  for (unsigned int i = 0; i < thread_count; ++i) {
103  m_worker_run_flags.at(i) = true;
104  m_worker_sleeping_flags.at(i) = false;
105  m_worker_done_flags.at(i) = false;
108  }
109 }
110 
111 namespace {
112 
113 void waitWorkers(std::vector<std::atomic<bool>>& worker_flags, unsigned int wait_time) {
114  // Now wait until all the workers have finish any current tasks
115  for (auto& flag : worker_flags) {
116  while (!flag) {
118  }
119  }
120 }
121 
122 } // namespace
123 
124 bool ThreadPool::checkForException(bool rethrow) {
126  if (m_exception_ptr) {
127  if (rethrow) {
129  } else {
130  return true;
131  }
132  }
133  return false;
134 }
135 
136 size_t ThreadPool::queued() const {
138  return m_queue.size();
139 }
140 
141 size_t ThreadPool::running() const {
144  return m_worker_sleeping_flags.size() - sleeping;
145 }
146 
149  return m_worker_done_flags.size() - done;
150 }
151 
152 void ThreadPool::block(bool throw_on_exception) {
153  // Wait for the queue to be empty
154  bool queue_is_empty = false;
155  while (!queue_is_empty) {
156  {
158  if (m_exception_ptr != nullptr) {
159  break;
160  }
161  queue_is_empty = m_queue.empty();
162  }
163  if (!queue_is_empty) {
165  }
166  }
167  // Wait for the workers to finish the currently executing tasks
169  // Check if any worker finished with an exception
170  checkForException(throw_on_exception);
171 }
172 
174  // Wait for the pool to be done with anything queued
175  block(false);
176  // Tell the threads to finish
178  // Now wait until all the workers have exited
179  for (auto& worker : m_workers) {
180  worker.join();
181  }
182 }
183 
186  if (m_worker_run_flags.empty()) {
187  task();
188  } else {
189  m_queue.emplace_back(std::move(task));
190  }
191 }
192 
193 } // namespace Euclid
std::deque< Task > m_queue
Definition: ThreadPool.h:114
void block(bool throw_on_exception=true)
Definition: ThreadPool.cpp:152
T empty(T...args)
void submit(Task task)
Submit a task to be executed.
Definition: ThreadPool.cpp:184
std::vector< std::atomic< bool > > m_worker_sleeping_flags
Definition: ThreadPool.h:111
T sleep_for(T...args)
T end(T...args)
size_t queued() const
Return the number of queued tasks.
Definition: ThreadPool.cpp:136
T current_exception(T...args)
std::vector< std::atomic< bool > > m_worker_run_flags
Definition: ThreadPool.h:110
T at(T...args)
ThreadPool(unsigned int thread_count=std::thread::hardware_concurrency(), unsigned int empty_queue_wait_time=50)
Constructs a new ThreadPool.
Definition: ThreadPool.cpp:97
std::mutex m_queue_mutex
Definition: ThreadPool.h:109
bool checkForException(bool rethrow=false)
Checks if any task has thrown an exception and optionally rethrows it.
Definition: ThreadPool.cpp:124
virtual ~ThreadPool()
Definition: ThreadPool.cpp:173
std::reference_wrapper< std::exception_ptr > m_exception_ptr
Definition: ThreadPool.cpp:92
STL class.
T lock(T...args)
std::reference_wrapper< std::mutex > m_queue_mutex
Definition: ThreadPool.cpp:86
T move(T...args)
std::vector< std::thread > m_workers
Definition: ThreadPool.h:113
T size(T...args)
STL class.
STL class.
size_t activeThreads() const
Return the number of active workers (either running or sleeping)
Definition: ThreadPool.cpp:147
T begin(T...args)
std::vector< std::atomic< bool > > m_worker_done_flags
Definition: ThreadPool.h:112
T fill(T...args)
unsigned int m_empty_queue_wait_time
Definition: ThreadPool.cpp:91
unsigned int m_empty_queue_wait_time
Definition: ThreadPool.h:115
std::reference_wrapper< std::atomic< bool > > m_done_flag
Definition: ThreadPool.cpp:90
T rethrow_exception(T...args)
T accumulate(T...args)
size_t running() const
Return the number of running tasks.
Definition: ThreadPool.cpp:141
std::exception_ptr m_exception_ptr
Definition: ThreadPool.h:116
std::reference_wrapper< std::atomic< bool > > m_run_flag
Definition: ThreadPool.cpp:88
std::reference_wrapper< std::atomic< bool > > m_sleeping_flag
Definition: ThreadPool.cpp:89
std::reference_wrapper< std::deque< ThreadPool::Task > > m_queue
Definition: ThreadPool.cpp:87
T emplace_back(T...args)