[ VIGRA Homepage | Function Index | Class Index | Namespaces | File List | Main Page ]

threadpool.hxx VIGRA

1 /************************************************************************/
2 /* */
3 /* Copyright 2014-2015 by Thorsten Beier, Philip Schill */
4 /* and Ullrich Koethe */
5 /* */
6 /* This file is part of the VIGRA computer vision library. */
7 /* The VIGRA Website is */
8 /* http://hci.iwr.uni-heidelberg.de/vigra/ */
9 /* Please direct questions, bug reports, and contributions to */
10 /* ullrich.koethe@iwr.uni-heidelberg.de or */
11 /* vigra@informatik.uni-hamburg.de */
12 /* */
13 /* Permission is hereby granted, free of charge, to any person */
14 /* obtaining a copy of this software and associated documentation */
15 /* files (the "Software"), to deal in the Software without */
16 /* restriction, including without limitation the rights to use, */
17 /* copy, modify, merge, publish, distribute, sublicense, and/or */
18 /* sell copies of the Software, and to permit persons to whom the */
19 /* Software is furnished to do so, subject to the following */
20 /* conditions: */
21 /* */
22 /* The above copyright notice and this permission notice shall be */
23 /* included in all copies or substantial portions of the */
24 /* Software. */
25 /* */
26 /* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND */
27 /* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES */
28 /* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND */
29 /* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT */
30 /* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, */
31 /* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING */
32 /* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR */
33 /* OTHER DEALINGS IN THE SOFTWARE. */
34 /* */
35 /************************************************************************/
36 #ifndef VIGRA_THREADPOOL_HXX
37 #define VIGRA_THREADPOOL_HXX
38 
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <functional>
#include <iterator>
#include <memory>
#include <queue>
#include <stdexcept>
#include <utility>
#include <vector>
#include "mathutil.hxx"
#include "counting_iterator.hxx"
#include "threading.hxx"
46 
47 
48 namespace vigra
49 {
50 
51 /** \addtogroup ParallelProcessing Functions and classes for parallel processing.
52 */
53 
54 //@{
55 
56  /**\brief Option base class for parallel algorithms.
57 
58  <b>\#include</b> <vigra/threadpool.hxx><br>
59  Namespace: vigra
60  */
62 {
63  public:
64 
65  /** Constants for special settings.
66  */
67  enum {
68  Auto = -1, ///< Determine number of threads automatically (from <tt>threading::thread::hardware_concurrency()</tt>)
69  Nice = -2, ///< Use half as many threads as <tt>Auto</tt> would.
70  NoThreads = 0 ///< Switch off multi-threading (i.e. execute tasks sequentially)
71  };
72 
74  : numThreads_(actualNumThreads(Auto))
75  {}
76 
77  /** \brief Get desired number of threads.
78 
79  <b>Note:</b> This function may return 0, which means that multi-threading
80  shall be switched off entirely. If an algorithm receives this value,
81  it should revert to a sequential implementation. In contrast, if
82  <tt>numThread() == 1</tt>, the parallel algorithm version shall be
83  executed with a single thread.
84  */
85  int getNumThreads() const
86  {
87  return numThreads_;
88  }
89 
90  /** \brief Get desired number of threads.
91 
92  In contrast to <tt>numThread()</tt>, this will always return a value <tt>>=1</tt>.
93  */
94  int getActualNumThreads() const
95  {
96  return std::max(1,numThreads_);
97  }
98 
99  /** \brief Set the number of threads or one of the constants <tt>Auto</tt>,
100  <tt>Nice</tt> and <tt>NoThreads</tt>.
101 
102  Default: <tt>ParallelOptions::Auto</tt> (use system default)
103 
104  This setting is ignored if the preprocessor flag <tt>VIGRA_SINGLE_THREADED</tt>
105  is defined. Then, the number of threads is set to 0 and all tasks revert to
106  sequential algorithm implementations. The same can be achieved at runtime
107  by passing <tt>n = 0</tt> to this function. In contrast, passing <tt>n = 1</tt>
108  causes the parallel algorithm versions to be executed with a single thread.
109  Both possibilities are mainly useful for debugging.
110  */
112  {
113  numThreads_ = actualNumThreads(n);
114  return *this;
115  }
116 
117 
118  private:
119  // helper function to compute the actual number of threads
120  static size_t actualNumThreads(const int userNThreads)
121  {
122  #ifdef VIGRA_SINGLE_THREADED
123  return 0;
124  #else
125  return userNThreads >= 0
126  ? userNThreads
127  : userNThreads == Nice
128  ? threading::thread::hardware_concurrency() / 2
129  : threading::thread::hardware_concurrency();
130  #endif
131  }
132 
133  int numThreads_;
134 };
135 
136 /********************************************************/
137 /* */
138 /* ThreadPool */
139 /* */
140 /********************************************************/
141 
142  /**\brief Thread pool class to manage a set of parallel workers.
143 
144  <b>\#include</b> <vigra/threadpool.hxx><br>
145  Namespace: vigra
146  */
148 {
149  public:
150 
151  /** Create a thread pool from ParallelOptions. The constructor just launches
152  the desired number of workers. If the number of threads is zero,
153  no workers are started, and all tasks will be executed in synchronously
154  in the present thread.
155  */
156  ThreadPool(const ParallelOptions & options)
157  : stop(false)
158  {
159  init(options);
160  }
161 
162  /** Create a thread pool with n threads. The constructor just launches
163  the desired number of workers. If \arg n is <tt>ParallelOptions::Auto</tt>,
164  the number of threads is determined by <tt>threading::thread::hardware_concurrency()</tt>.
165  <tt>ParallelOptions::Nice</tt> will create half as many threads.
166  If <tt>n = 0</tt>, no workers are started, and all tasks will be executed
167  synchronously in the present thread. If the preprocessor flag
168  <tt>VIGRA_SINGLE_THREADED</tt> is defined, the number of threads is always set
169  to zero (i.e. synchronous execution), regardless of the value of \arg n. This
170  is useful for debugging.
171  */
172  ThreadPool(const int n)
173  : stop(false)
174  {
175  init(ParallelOptions().numThreads(n));
176  }
177 
178  /**
179  * The destructor joins all threads.
180  */
181  ~ThreadPool();
182 
183  /**
184  * Enqueue a task that will be executed by the thread pool.
185  * The task result can be obtained using the get() function of the returned future.
186  * If the task throws an exception, it will be raised on the call to get().
187  */
188  template<class F>
189  auto enqueueReturning(F&& f) -> threading::future<decltype(f(0))>;
190 
191  /**
192  * Enqueue function for tasks without return value.
193  * This is a special case of the enqueueReturning template function, but
194  * some compilers fail on <tt>std::result_of<F(int)>::type</tt> for void(int) functions.
195  */
196  template<class F>
197  threading::future<void> enqueue(F&& f) ;
198 
199  /**
200  * Block until all tasks are finished.
201  */
203  {
204  threading::unique_lock<threading::mutex> lock(queue_mutex);
205  finish_condition.wait(lock, [this](){ return tasks.empty() && (busy == 0); });
206  }
207 
208  /**
209  * Return the number of worker threads.
210  */
211  size_t nThreads() const
212  {
213  return workers.size();
214  }
215 
216 private:
217 
218  // helper function to init the thread pool
219  void init(const ParallelOptions & options);
220 
221  // need to keep track of threads so we can join them
222  std::vector<threading::thread> workers;
223 
224  // the task queue
225  std::queue<std::function<void(int)> > tasks;
226 
227  // synchronization
228  threading::mutex queue_mutex;
229  threading::condition_variable worker_condition;
230  threading::condition_variable finish_condition;
231  bool stop;
232  threading::atomic_long busy, processed;
233 };
234 
235 inline void ThreadPool::init(const ParallelOptions & options)
236 {
237  busy.store(0);
238  processed.store(0);
239 
240  const size_t actualNThreads = options.getNumThreads();
241  for(size_t ti = 0; ti<actualNThreads; ++ti)
242  {
243  workers.emplace_back(
244  [ti,this]
245  {
246  for(;;)
247  {
248  std::function<void(int)> task;
249  {
250  threading::unique_lock<threading::mutex> lock(this->queue_mutex);
251 
252  // will wait if : stop == false AND queue is empty
253  // if stop == true AND queue is empty thread function will return later
254  //
255  // so the idea of this wait, is : If where are not in the destructor
256  // (which sets stop to true, we wait here for new jobs)
257  this->worker_condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
258  if(!this->tasks.empty())
259  {
260  ++busy;
261  task = std::move(this->tasks.front());
262  this->tasks.pop();
263  lock.unlock();
264  task(ti);
265  ++processed;
266  --busy;
267  finish_condition.notify_one();
268  }
269  else if(stop)
270  {
271  return;
272  }
273  }
274  }
275  }
276  );
277  }
278 }
279 
281 {
282  {
283  threading::unique_lock<threading::mutex> lock(queue_mutex);
284  stop = true;
285  }
286  worker_condition.notify_all();
287  for(threading::thread &worker: workers)
288  worker.join();
289 }
290 
291 template<class F>
292 inline auto
293 ThreadPool::enqueueReturning(F&& f) -> threading::future<decltype(f(0))>
294 {
295  typedef decltype(f(0)) result_type;
296  typedef threading::packaged_task<result_type(int)> PackageType;
297 
298  auto task = std::make_shared<PackageType>(f);
299  auto res = task->get_future();
300 
301  if(workers.size()>0){
302  {
303  threading::unique_lock<threading::mutex> lock(queue_mutex);
304 
305  // don't allow enqueueing after stopping the pool
306  if(stop)
307  throw std::runtime_error("enqueue on stopped ThreadPool");
308 
309  tasks.emplace(
310  [task](int tid)
311  {
312  (*task)(std::move(tid));
313  }
314  );
315  }
316  worker_condition.notify_one();
317  }
318  else{
319  (*task)(0);
320  }
321 
322  return res;
323 }
324 
325 template<class F>
326 inline threading::future<void>
328 {
329 #if defined(USE_BOOST_THREAD) && \
330  !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
331  // Without variadic templates, boost:thread::packaged_task only
332  // supports the signature 'R()' (functions with no arguments).
333  // We bind the thread_id parameter to 0, so this parameter
334  // must NOT be used in function f (fortunately, this is the case
335  // for the blockwise versions of convolution, labeling and
336  // watersheds).
337  typedef threading::packaged_task<void()> PackageType;
338  auto task = std::make_shared<PackageType>(std::bind(f, 0));
339 #else
340  typedef threading::packaged_task<void(int)> PackageType;
341  auto task = std::make_shared<PackageType>(f);
342 #endif
343 
344  auto res = task->get_future();
345  if(workers.size()>0){
346  {
347  threading::unique_lock<threading::mutex> lock(queue_mutex);
348 
349  // don't allow enqueueing after stopping the pool
350  if(stop)
351  throw std::runtime_error("enqueue on stopped ThreadPool");
352 
353  tasks.emplace(
354  [task](int tid)
355  {
356 #if defined(USE_BOOST_THREAD) && \
357  !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
358  (*task)();
359 #else
360  (*task)(std::move(tid));
361 #endif
362  }
363  );
364  }
365  worker_condition.notify_one();
366  }
367  else{
368 #if defined(USE_BOOST_THREAD) && \
369  !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
370  (*task)();
371 #else
372  (*task)(0);
373 #endif
374  }
375  return res;
376 }
377 
378 /********************************************************/
379 /* */
380 /* parallel_foreach */
381 /* */
382 /********************************************************/
383 
384 // nItems must be either zero or std::distance(iter, end).
385 template<class ITER, class F>
386 inline void parallel_foreach_impl(
387  ThreadPool & pool,
388  const std::ptrdiff_t nItems,
389  ITER iter,
390  ITER end,
391  F && f,
392  std::random_access_iterator_tag
393 ){
394  std::ptrdiff_t workload = std::distance(iter, end);
395  vigra_precondition(workload == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
396  const float workPerThread = float(workload)/pool.nThreads();
397  const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(roundi(workPerThread/3.0), 1);
398 
399  std::vector<threading::future<void> > futures;
400  for( ;iter<end; iter+=chunkedWorkPerThread)
401  {
402  const size_t lc = std::min(workload, chunkedWorkPerThread);
403  workload-=lc;
404  futures.emplace_back(
405  pool.enqueue(
406  [&f, iter, lc]
407  (int id)
408  {
409  for(size_t i=0; i<lc; ++i)
410  f(id, iter[i]);
411  }
412  )
413  );
414  }
415  for (auto & fut : futures)
416  {
417  fut.get();
418  }
419 }
420 
421 
422 
423 // nItems must be either zero or std::distance(iter, end).
424 template<class ITER, class F>
425 inline void parallel_foreach_impl(
426  ThreadPool & pool,
427  const std::ptrdiff_t nItems,
428  ITER iter,
429  ITER end,
430  F && f,
431  std::forward_iterator_tag
432 ){
433  if (nItems == 0)
434  nItems = std::distance(iter, end);
435 
436  std::ptrdiff_t workload = nItems;
437  const float workPerThread = float(workload)/pool.nThreads();
438  const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(roundi(workPerThread/3.0), 1);
439 
440  std::vector<threading::future<void> > futures;
441  for(;;)
442  {
443  const size_t lc = std::min(chunkedWorkPerThread, workload);
444  workload -= lc;
445  futures.emplace_back(
446  pool.enqueue(
447  [&f, iter, lc]
448  (int id)
449  {
450  auto iterCopy = iter;
451  for(size_t i=0; i<lc; ++i){
452  f(id, *iterCopy);
453  ++iterCopy;
454  }
455  }
456  )
457  );
458  for (size_t i = 0; i < lc; ++i)
459  {
460  ++iter;
461  if (iter == end)
462  {
463  vigra_postcondition(workload == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
464  break;
465  }
466  }
467  if(workload==0)
468  break;
469  }
470  for (auto & fut : futures)
471  fut.get();
472 }
473 
474 
475 
476 // nItems must be either zero or std::distance(iter, end).
477 template<class ITER, class F>
478 inline void parallel_foreach_impl(
479  ThreadPool & pool,
480  const std::ptrdiff_t nItems,
481  ITER iter,
482  ITER end,
483  F && f,
484  std::input_iterator_tag
485 ){
486  size_t num_items = 0;
487  std::vector<threading::future<void> > futures;
488  for (; iter != end; ++iter)
489  {
490  auto item = *iter;
491  futures.emplace_back(
492  pool.enqueue(
493  [&f, &item](int id){
494  f(id, item);
495  }
496  )
497  );
498  ++num_items;
499  }
500  vigra_postcondition(num_items == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
501  for (auto & fut : futures)
502  fut.get();
503 }
504 
505 // Runs foreach on a single thread.
506 // Used for API compatibility when the numbe of threads is 0.
507 template<class ITER, class F>
508 inline void parallel_foreach_single_thread(
509  ITER begin,
510  ITER end,
511  F && f,
512  const std::ptrdiff_t nItems = 0
513 ){
514  size_t n = 0;
515  for (; begin != end; ++begin)
516  {
517  f(0, *begin);
518  ++n;
519  }
520  vigra_postcondition(n == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
521 }
522 
523 /** \brief Apply a functor to all items in a range in parallel.
524 
525  Create a thread pool (or use an existing one) to apply the functor \arg f
526  to all items in the range <tt>[begin, end)</tt> in parallel. \arg f must
527  be callable with two arguments of type <tt>size_t</tt> and <tt>T</tt>, where
528  the first argument is the thread index (starting at 0) and T is convertible
529  from the iterator's <tt>reference_type</tt> (i.e. the result of <tt>*begin</tt>).
530 
531  If the iterators are forward iterators (<tt>std::forward_iterator_tag</tt>), you
532  can provide the optional argument <tt>nItems</tt> to avoid a
533  <tt>std::distance(begin, end)</tt> call to compute the range's length.
534 
535  Parameter <tt>nThreads</tt> controls the number of threads. <tt>parallel_foreach</tt>
536  will split the work into about three times as many parallel tasks.
537  If <tt>nThreads = ParallelOptions::Auto</tt>, the number of threads is set to
538  the machine default (<tt>std::thread::hardware_concurrency()</tt>).
539 
540  If <tt>nThreads = 0</tt>, the function will not use threads,
541  but will call the functor sequentially. This can also be enforced by setting the
542  preprocessor flag <tt>VIGRA_SINGLE_THREADED</tt>, ignoring the value of
543  <tt>nThreads</tt> (useful for debugging).
544 
545  <b> Declarations:</b>
546 
547  \code
548  namespace vigra {
549  // pass the desired number of threads or ParallelOptions::Auto
550  // (creates an internal thread pool accordingly)
551  template<class ITER, class F>
552  void parallel_foreach(int64_t nThreads,
553  ITER begin, ITER end,
554  F && f,
555  const uint64_t nItems = 0);
556 
557  // use an existing thread pool
558  template<class ITER, class F>
559  void parallel_foreach(ThreadPool & pool,
560  ITER begin, ITER end,
561  F && f,
562  const uint64_t nItems = 0);
563 
564  // pass the integers from 0 ... (nItems-1) to the functor f,
565  // using the given number of threads or ParallelOptions::Auto
566  template<class F>
567  void parallel_foreach(int64_t nThreads,
568  uint64_t nItems,
569  F && f);
570 
571  // likewise with an existing thread pool
572  template<class F>
573  void parallel_foreach(ThreadPool & threadpool,
574  uint64_t nItems,
575  F && f);
576  }
577  \endcode
578 
579  <b>Usage:</b>
580 
581  \code
582  #include <iostream>
583  #include <algorithm>
584  #include <vector>
585  #include <vigra/threadpool.hxx>
586 
587  using namespace std;
588  using namespace vigra;
589 
590  int main()
591  {
592  size_t const n_threads = 4;
593  size_t const n = 2000;
594  vector<int> input(n);
595 
596  auto iter = input.begin(),
597  end = input.end();
598 
599  // fill input with 0, 1, 2, ...
600  iota(iter, end, 0);
601 
602  // compute the sum of the elements in the input vector.
603  // (each thread computes the partial sum of the items it sees
604  // and stores the sum at the appropriate index of 'results')
605  vector<int> results(n_threads, 0);
606  parallel_foreach(n_threads, iter, end,
607  // the functor to be executed, defined as a lambda function
608  // (first argument: thread ID, second argument: result of *iter)
609  [&results](size_t thread_id, int items)
610  {
611  results[thread_id] += items;
612  }
613  );
614 
615  // collect the partial sums of all threads
616  int sum = accumulate(results.begin(), results.end(), 0);
617 
618  cout << "The sum " << sum << " should be equal to " << (n*(n-1))/2 << endl;
619  }
620  \endcode
621  */
623 
624 template<class ITER, class F>
625 inline void parallel_foreach(
626  ThreadPool & pool,
627  ITER begin,
628  ITER end,
629  F && f,
630  const std::ptrdiff_t nItems = 0)
631 {
632  if(pool.nThreads()>1)
633  {
634  parallel_foreach_impl(pool,nItems, begin, end, f,
635  typename std::iterator_traits<ITER>::iterator_category());
636  }
637  else
638  {
639  parallel_foreach_single_thread(begin, end, f, nItems);
640  }
641 }
642 
643 template<class ITER, class F>
644 inline void parallel_foreach(
645  int64_t nThreads,
646  ITER begin,
647  ITER end,
648  F && f,
649  const std::ptrdiff_t nItems = 0)
650 {
651 
652  ThreadPool pool(nThreads);
653  parallel_foreach(pool, begin, end, f, nItems);
654 }
655 
656 template<class F>
657 inline void parallel_foreach(
658  int64_t nThreads,
659  std::ptrdiff_t nItems,
660  F && f)
661 {
662  auto iter = range(nItems);
663  parallel_foreach(nThreads, iter, iter.end(), f, nItems);
664 }
665 
666 
667 template<class F>
668 inline void parallel_foreach(
669  ThreadPool & threadpool,
670  std::ptrdiff_t nItems,
671  F && f)
672 {
673  auto iter = range(nItems);
674  parallel_foreach(threadpool, iter, iter.end(), f, nItems);
675 }
676 
677 //@}
678 
679 } // namespace vigra
680 
681 #endif // VIGRA_THREADPOOL_HXX
Int32 roundi(FixedPoint16< IntBits, OverflowHandling > v)
rounding to the nearest integer.
Definition: fixedpoint.hxx:1775
ParallelOptions & numThreads(const int n)
Set the number of threads or one of the constants Auto, Nice and NoThreads.
Definition: threadpool.hxx:111
Determine number of threads automatically (from threading::thread::hardware_concurrency()) ...
Definition: threadpool.hxx:68
Definition: accessor.hxx:43
doxygen_overloaded_function(template<... > void separableConvolveBlockwise) template< unsigned int N
Separated convolution on ChunkedArrays.
~ThreadPool()
Definition: threadpool.hxx:280
Thread pool class to manage a set of parallel workers.
Definition: threadpool.hxx:147
int getActualNumThreads() const
Get desired number of threads.
Definition: threadpool.hxx:94
auto enqueueReturning(F &&f) -> threading::future< decltype(f(0))>
Definition: threadpool.hxx:293
void parallel_foreach(...)
Apply a functor to all items in a range in parallel.
Option base class for parallel algorithms.
Definition: threadpool.hxx:61
size_t nThreads() const
Definition: threadpool.hxx:211
void waitFinished()
Definition: threadpool.hxx:202
threading::future< void > enqueue(F &&f)
Definition: threadpool.hxx:327
Use half as many threads as Auto would.
Definition: threadpool.hxx:69
Switch off multi-threading (i.e. execute tasks sequentially)
Definition: threadpool.hxx:70
ThreadPool(const int n)
Definition: threadpool.hxx:172
int getNumThreads() const
Get desired number of threads.
Definition: threadpool.hxx:85
ThreadPool(const ParallelOptions &options)
Definition: threadpool.hxx:156

© Ullrich Köthe (ullrich.koethe@iwr.uni-heidelberg.de)
Heidelberg Collaboratory for Image Processing, University of Heidelberg, Germany

html generated using doxygen and Python
vigra 1.11.0