diff --git a/Eigen/src/ThreadPool/NonBlockingThreadPool.h b/Eigen/src/ThreadPool/NonBlockingThreadPool.h index efa6ef571b2c8c036961847a118cbb0357321cc0..08fe0da9c478362047dcc0dad09171b270e495da 100644 --- a/Eigen/src/ThreadPool/NonBlockingThreadPool.h +++ b/Eigen/src/ThreadPool/NonBlockingThreadPool.h @@ -21,12 +21,18 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { typedef typename Environment::Task Task; typedef RunQueue Queue; - ThreadPoolTempl(int num_threads, Environment env = Environment()) : ThreadPoolTempl(num_threads, true, env) {} + ThreadPoolTempl(int num_threads, Environment env = Environment()) + : ThreadPoolTempl(num_threads, /*max_spinning_threads=*/1, env) {} ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env = Environment()) + : ThreadPoolTempl(num_threads, allow_spinning ? 1 : 0, 0, env) {} + + ThreadPoolTempl(int num_threads, int max_spinning_threads, int min_spinning_duration_milliseconds = 0, + Environment env = Environment()) : env_(env), num_threads_(num_threads), - allow_spinning_(allow_spinning), + max_spinning_threads_(max_spinning_threads), + min_spinning_duration_milliseconds_(min_spinning_duration_milliseconds), thread_data_(num_threads), all_coprimes_(num_threads), waiters_(num_threads), @@ -36,6 +42,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { done_(false), cancelled_(false), ec_(waiters_) { + eigen_assert(num_threads_ > 0 && "The number of threads must be > 0"); + eigen_assert(max_spinning_threads_ >= 0 && "The maximum number of spinning threads must be >= 0"); + eigen_assert(min_spinning_duration_milliseconds_ >= 0 && "The minimum spinning duration must be >= 0"); waiters_.resize(num_threads_); // Calculate coprimes of all numbers [1, num_threads]. // Coprimes are used for random walks over all threads in Steal @@ -226,13 +235,15 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { Environment env_; const int num_threads_; - const bool allow_spinning_; + const int max_spinning_threads_; + const int min_spinning_duration_milliseconds_; + MaxSizeVector thread_data_; MaxSizeVector> all_coprimes_; MaxSizeVector waiters_; unsigned global_steal_partition_; std::atomic blocked_; - std::atomic spinning_; + std::atomic spinning_; std::atomic done_; std::atomic cancelled_; EventCount ec_; @@ -242,6 +253,57 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { std::unordered_map> per_thread_map_; #endif + EIGEN_ALWAYS_INLINE bool MaybeStartSpinning() { + if (max_spinning_threads_ >= num_threads_) { + return true; + } + int num_spinning = spinning_.load(std::memory_order_relaxed); + while (num_spinning < max_spinning_threads_) { + if (spinning_.compare_exchange_weak(num_spinning, num_spinning + 1, std::memory_order_relaxed)) { + return true; + } + } + return false; + } + + EIGEN_ALWAYS_INLINE void StopSpinning() { + if (max_spinning_threads_ < num_threads_) { + int num_spinning = spinning_.load(std::memory_order_relaxed); + while (num_spinning > 0 && + !spinning_.compare_exchange_weak(num_spinning, num_spinning - 1, std::memory_order_relaxed)) { + } + } + } + + template + EIGEN_ALWAYS_INLINE Task TryGrabbingTask(int spin_count, GrabTaskFunctor grab_task) { + Task t = grab_task(); + if (!t.f && max_spinning_threads_ > 0) { + // To reduce sys-call overhead, only call clock routines when necessary. + std::chrono::time_point spin_start; + int spin_duration = 0; + if (min_spinning_duration_milliseconds_ > 0) { + spin_start = std::chrono::steady_clock::now(); + } + do { + // Iteratively try a small fixed number of times - further reducing sys-calls. + for (int i = 0; i < spin_count; i++) { + t = grab_task(); + if (t.f || cancelled_.load(std::memory_order_relaxed)) { + return t; + } + } + // Check current spin duration. + if (min_spinning_duration_milliseconds_ > 0) { + auto spin_stop = std::chrono::steady_clock::now(); + spin_duration = + static_cast(std::chrono::duration_cast(spin_stop - spin_start).count()); + } + } while (spin_duration < min_spinning_duration_milliseconds_); + } + return t; + } + // Main worker thread loop. void WorkerLoop(int thread_id) { #ifndef EIGEN_THREAD_LOCAL @@ -264,7 +326,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // proportional to num_threads_ and we assume that new work is scheduled at // a constant rate, so we set spin_count to 5000 / num_threads_. The // constant was picked based on a fair dice roll, tune it. - const int spin_count = allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0; + const int spin_count = num_threads_ > 0 ? 5000 / num_threads_ : 0; if (num_threads_ == 1) { // For num_threads_ == 1 there is no point in going through the expensive // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the @@ -273,12 +335,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // counter-productive for the types of I/O workloads the single thread // pools tend to be used for. while (!cancelled_) { - Task t = q.PopFront(); - for (int i = 0; i < spin_count && !t.f; i++) { - if (!cancelled_.load(std::memory_order_relaxed)) { - t = q.PopFront(); - } - } + Task t = TryGrabbingTask(spin_count, [&q]() { return q.PopFront(); }); if (!t.f) { if (!WaitForWork(waiter, &t)) { return; @@ -296,16 +353,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { if (!t.f) { t = GlobalSteal(); if (!t.f) { - // Leave one thread spinning. This reduces latency. - if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) { - for (int i = 0; i < spin_count && !t.f; i++) { - if (!cancelled_.load(std::memory_order_relaxed)) { - t = GlobalSteal(); - } else { - return; - } - } - spinning_ = false; + // Maybe leave thread spinning to reduce latency. + if (MaybeStartSpinning()) { + t = TryGrabbingTask(spin_count, [this]() { return this->GlobalSteal(); }); + StopSpinning(); } if (!t.f) { if (!WaitForWork(waiter, &t)) { diff --git a/test/threads_non_blocking_thread_pool.cpp b/test/threads_non_blocking_thread_pool.cpp index e805cf2c422118d856c91a8b65670f8a5457571b..d9a07f16508c360273c393c59b7523ecb1731ae6 100644 --- a/test/threads_non_blocking_thread_pool.cpp +++ b/test/threads_non_blocking_thread_pool.cpp @@ -15,7 +15,7 @@ static void test_create_destroy_empty_pool() { // Just create and destroy the pool. This will wind up and tear down worker // threads. Ensure there are no issues in that logic. - for (int i = 0; i < 16; ++i) { + for (int i = 1; i < 16; ++i) { ThreadPool tp(i); } }