From ee801e2047508581d9c3026b46d1bdfe510040ea Mon Sep 17 00:00:00 2001 From: Antonio Sanchez Date: Tue, 28 Feb 2023 13:45:03 +0100 Subject: [PATCH 1/2] Allow more spinning threads in threadpool. This can help reduce latency, at the cost of additional wasted spin cycles. Coordinating with ARM/Intel to agree on an approach. Both have requested this to reduce latency in certain models. --- Eigen/src/ThreadPool/NonBlockingThreadPool.h | 46 ++++++++++++++++---- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/Eigen/src/ThreadPool/NonBlockingThreadPool.h b/Eigen/src/ThreadPool/NonBlockingThreadPool.h index efa6ef571..a0db8f214 100644 --- a/Eigen/src/ThreadPool/NonBlockingThreadPool.h +++ b/Eigen/src/ThreadPool/NonBlockingThreadPool.h @@ -21,12 +21,20 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { typedef typename Environment::Task Task; typedef RunQueue Queue; - ThreadPoolTempl(int num_threads, Environment env = Environment()) : ThreadPoolTempl(num_threads, true, env) {} + ThreadPoolTempl(int num_threads, Environment env = Environment()) + : ThreadPoolTempl(num_threads, /*max_spinning_threads=*/1, env) {} - ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env = Environment()) + // This constructor is for backward-compatibility only. + EIGEN_DEPRECATED + ThreadPoolTempl(int num_threads, bool allow_spinning, + Environment env = Environment()) + : ThreadPoolTempl(num_threads, allow_spinning ? 
1 : 0, env) {} + + ThreadPoolTempl(int num_threads, int max_spinning_threads, + Environment env = Environment()) : env_(env), num_threads_(num_threads), - allow_spinning_(allow_spinning), + max_spinning_threads_(max_spinning_threads), thread_data_(num_threads), all_coprimes_(num_threads), waiters_(num_threads), @@ -226,13 +234,13 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { Environment env_; const int num_threads_; - const bool allow_spinning_; + const int max_spinning_threads_; MaxSizeVector thread_data_; MaxSizeVector> all_coprimes_; MaxSizeVector waiters_; unsigned global_steal_partition_; std::atomic blocked_; - std::atomic spinning_; + std::atomic spinning_; std::atomic done_; std::atomic cancelled_; EventCount ec_; @@ -242,6 +250,24 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { std::unordered_map> per_thread_map_; #endif + EIGEN_ALWAYS_INLINE bool MaybeStartSpinning() { + int num_spinning = spinning_.load(std::memory_order_relaxed); + while (num_spinning < max_spinning_threads_) { + if (spinning_.compare_exchange_weak(num_spinning, num_spinning + 1, + std::memory_order_relaxed)) { + return true; + } + } + return false; + } + + EIGEN_ALWAYS_INLINE void StopSpinning() { + int num_spinning = spinning_.load(std::memory_order_relaxed); + while (!spinning_.compare_exchange_weak(num_spinning, num_spinning - 1, + std::memory_order_relaxed)) { + } + } + // Main worker thread loop. void WorkerLoop(int thread_id) { #ifndef EIGEN_THREAD_LOCAL @@ -264,7 +290,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // proportional to num_threads_ and we assume that new work is scheduled at // a constant rate, so we set spin_count to 5000 / num_threads_. The // constant was picked based on a fair dice roll, tune it. - const int spin_count = allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0; + const int spin_count = + max_spinning_threads_ > 0 && num_threads_ > 0 ? 
5000 / num_threads_ : 0; if (num_threads_ == 1) { // For num_threads_ == 1 there is no point in going through the expensive // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the @@ -296,8 +323,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { if (!t.f) { t = GlobalSteal(); if (!t.f) { - // Leave one thread spinning. This reduces latency. - if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) { + // Maybe leave thread spinning to reduce latency. + const bool start_spinning = MaybeStartSpinning(); + if (start_spinning) { for (int i = 0; i < spin_count && !t.f; i++) { if (!cancelled_.load(std::memory_order_relaxed)) { t = GlobalSteal(); @@ -305,7 +333,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { return; } } - spinning_ = false; + StopSpinning(); } if (!t.f) { if (!WaitForWork(waiter, &t)) { -- GitLab From 92ef7960d4afda9c9fd4575343ae5181ecaada56 Mon Sep 17 00:00:00 2001 From: Antonio Sanchez Date: Thu, 30 Mar 2023 10:03:13 -0700 Subject: [PATCH 2/2] Include some changes from Intel. Add an optional minimum spinning duration (min_spinning_duration_milliseconds), factor the spin loops into TryGrabbingTask, skip the spinning counter when max_spinning_threads_ >= num_threads_, and assert on constructor arguments. --- Eigen/src/ThreadPool/NonBlockingThreadPool.h | 79 +++++++++++++------- test/threads_non_blocking_thread_pool.cpp | 2 +- 2 files changed, 52 insertions(+), 29 deletions(-) diff --git a/Eigen/src/ThreadPool/NonBlockingThreadPool.h b/Eigen/src/ThreadPool/NonBlockingThreadPool.h index a0db8f214..08fe0da9c 100644 --- a/Eigen/src/ThreadPool/NonBlockingThreadPool.h +++ b/Eigen/src/ThreadPool/NonBlockingThreadPool.h @@ -24,17 +24,15 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { ThreadPoolTempl(int num_threads, Environment env = Environment()) : ThreadPoolTempl(num_threads, /*max_spinning_threads=*/1, env) {} - // This constructor is for backward-compatibility only. - EIGEN_DEPRECATED - ThreadPoolTempl(int num_threads, bool allow_spinning, - Environment env = Environment()) - : ThreadPoolTempl(num_threads, allow_spinning ?
1 : 0, env) {} + ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env = Environment()) + : ThreadPoolTempl(num_threads, allow_spinning ? 1 : 0, 0, env) {} - ThreadPoolTempl(int num_threads, int max_spinning_threads, + ThreadPoolTempl(int num_threads, int max_spinning_threads, int min_spinning_duration_milliseconds = 0, Environment env = Environment()) : env_(env), num_threads_(num_threads), max_spinning_threads_(max_spinning_threads), + min_spinning_duration_milliseconds_(min_spinning_duration_milliseconds), thread_data_(num_threads), all_coprimes_(num_threads), waiters_(num_threads), @@ -44,6 +42,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { done_(false), cancelled_(false), ec_(waiters_) { + eigen_assert(num_threads_ > 0 && "The number of threads must be > 0"); + eigen_assert(max_spinning_threads_ >= 0 && "The maximum number of spinning threads must be >= 0"); + eigen_assert(min_spinning_duration_milliseconds_ >= 0 && "The minimum spinning duration must be >= 0"); waiters_.resize(num_threads_); // Calculate coprimes of all numbers [1, num_threads]. 
// Coprimes are used for random walks over all threads in Steal @@ -235,6 +236,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { Environment env_; const int num_threads_; const int max_spinning_threads_; + const int min_spinning_duration_milliseconds_; + MaxSizeVector thread_data_; MaxSizeVector> all_coprimes_; MaxSizeVector waiters_; @@ -251,10 +254,12 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { #endif EIGEN_ALWAYS_INLINE bool MaybeStartSpinning() { + if (max_spinning_threads_ >= num_threads_) { + return true; + } int num_spinning = spinning_.load(std::memory_order_relaxed); while (num_spinning < max_spinning_threads_) { - if (spinning_.compare_exchange_weak(num_spinning, num_spinning + 1, - std::memory_order_relaxed)) { + if (spinning_.compare_exchange_weak(num_spinning, num_spinning + 1, std::memory_order_relaxed)) { return true; } } @@ -262,10 +267,41 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { } EIGEN_ALWAYS_INLINE void StopSpinning() { - int num_spinning = spinning_.load(std::memory_order_relaxed); - while (!spinning_.compare_exchange_weak(num_spinning, num_spinning - 1, - std::memory_order_relaxed)) { + if (max_spinning_threads_ < num_threads_) { + int num_spinning = spinning_.load(std::memory_order_relaxed); + while (num_spinning > 0 && + !spinning_.compare_exchange_weak(num_spinning, num_spinning - 1, std::memory_order_relaxed)) { + } + } + } + + template + EIGEN_ALWAYS_INLINE Task TryGrabbingTask(int spin_count, GrabTaskFunctor grab_task) { + Task t = grab_task(); + if (!t.f && max_spinning_threads_ > 0) { + // To reduce sys-call overhead, only call clock routines when necessary. + std::chrono::time_point spin_start; + int spin_duration = 0; + if (min_spinning_duration_milliseconds_ > 0) { + spin_start = std::chrono::steady_clock::now(); + } + do { + // Iteratively try a small fixed number of times - further reducing sys-calls. 
+ for (int i = 0; i < spin_count; i++) { + t = grab_task(); + if (t.f || cancelled_.load(std::memory_order_relaxed)) { + return t; + } + } + // Check current spin duration. + if (min_spinning_duration_milliseconds_ > 0) { + auto spin_stop = std::chrono::steady_clock::now(); + spin_duration = + static_cast(std::chrono::duration_cast(spin_stop - spin_start).count()); + } + } while (spin_duration < min_spinning_duration_milliseconds_); } + return t; } // Main worker thread loop. @@ -290,8 +326,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // proportional to num_threads_ and we assume that new work is scheduled at // a constant rate, so we set spin_count to 5000 / num_threads_. The // constant was picked based on a fair dice roll, tune it. - const int spin_count = - max_spinning_threads_ > 0 && num_threads_ > 0 ? 5000 / num_threads_ : 0; + const int spin_count = num_threads_ > 0 ? 5000 / num_threads_ : 0; if (num_threads_ == 1) { // For num_threads_ == 1 there is no point in going through the expensive // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the @@ -300,12 +335,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // counter-productive for the types of I/O workloads the single thread // pools tend to be used for. while (!cancelled_) { - Task t = q.PopFront(); - for (int i = 0; i < spin_count && !t.f; i++) { - if (!cancelled_.load(std::memory_order_relaxed)) { - t = q.PopFront(); - } - } + Task t = TryGrabbingTask(spin_count, [&q]() { return q.PopFront(); }); if (!t.f) { if (!WaitForWork(waiter, &t)) { return; @@ -324,15 +354,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { t = GlobalSteal(); if (!t.f) { // Maybe leave thread spinning to reduce latency. 
- const bool start_spinning = MaybeStartSpinning(); - if (start_spinning) { - for (int i = 0; i < spin_count && !t.f; i++) { - if (!cancelled_.load(std::memory_order_relaxed)) { - t = GlobalSteal(); - } else { - return; - } - } + if (MaybeStartSpinning()) { + t = TryGrabbingTask(spin_count, [this]() { return this->GlobalSteal(); }); StopSpinning(); } if (!t.f) { diff --git a/test/threads_non_blocking_thread_pool.cpp b/test/threads_non_blocking_thread_pool.cpp index e805cf2c4..d9a07f165 100644 --- a/test/threads_non_blocking_thread_pool.cpp +++ b/test/threads_non_blocking_thread_pool.cpp @@ -15,7 +15,7 @@ static void test_create_destroy_empty_pool() { // Just create and destroy the pool. This will wind up and tear down worker // threads. Ensure there are no issues in that logic. - for (int i = 0; i < 16; ++i) { + for (int i = 1; i < 16; ++i) { ThreadPool tp(i); } } -- GitLab