LCOV - code coverage report
Current view: top level - source/ps - TaskManager.cpp (source / functions) Hit Total Coverage
Test: 0 A.D. test coverage report Lines: 120 127 94.5 %
Date: 2022-06-14 00:41:00 Functions: 18 22 81.8 %

          Line data    Source code
       1             : /* Copyright (C) 2022 Wildfire Games.
       2             :  * This file is part of 0 A.D.
       3             :  *
       4             :  * 0 A.D. is free software: you can redistribute it and/or modify
       5             :  * it under the terms of the GNU General Public License as published by
       6             :  * the Free Software Foundation, either version 2 of the License, or
       7             :  * (at your option) any later version.
       8             :  *
       9             :  * 0 A.D. is distributed in the hope that it will be useful,
      10             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      12             :  * GNU General Public License for more details.
      13             :  *
      14             :  * You should have received a copy of the GNU General Public License
      15             :  * along with 0 A.D.  If not, see <http://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "precompiled.h"
      19             : 
      20             : #include "TaskManager.h"
      21             : 
      22             : #include "lib/debug.h"
      23             : #include "maths/MathUtil.h"
      24             : #include "ps/CLogger.h"
      25             : #include "ps/ConfigDB.h"
      26             : #include "ps/Threading.h"
      27             : #include "ps/ThreadUtil.h"
      28             : #include "ps/Profiler2.h"
      29             : 
      30             : #include <condition_variable>
      31             : #include <deque>
      32             : #include <functional>
      33             : #include <memory>
      34             : #include <mutex>
      35             : #include <thread>
      36             : 
      37             : namespace Threading
      38             : {
/**
 * Minimum number of TaskManager workers.
 * The worker count requested at construction is clamped up to this value.
 */
static constexpr size_t MIN_THREADS = 3;

/**
 * Maximum number of TaskManager workers.
 * The worker count requested at construction is clamped down to this value.
 */
static constexpr size_t MAX_THREADS = 32;

// Global task manager instance: created on demand by TaskManager::Initialise()
// and handed out by TaskManager::Instance().
std::unique_ptr<TaskManager> g_TaskManager;

// Forward declaration; the class itself is defined further down in this file.
class Thread;

// Element type of the task queues: a type-erased callable taking no arguments and returning nothing.
using QueueItem = std::function<void()>;
      54             : 
      55             : /**
      56             :  * Light wrapper around std::thread. Ensures Join has been called.
      57             :  */
      58             : class Thread
      59             : {
      60             : public:
      61           3 :     Thread() = default;
      62             :     Thread(const Thread&) = delete;
      63             :     Thread(Thread&&) = delete;
      64             : 
      65             :     template<typename T, void(T::* callable)()>
      66           3 :     void Start(T* object)
      67             :     {
      68           3 :         m_Thread = std::thread(HandleExceptions<DoStart<T, callable>>::Wrapper, object);
      69           3 :     }
      70             :     template<typename T, void(T::* callable)()>
      71             :     static void DoStart(T* object);
      72             : 
      73             : protected:
      74           3 :     ~Thread()
      75           3 :     {
      76           6 :         ENSURE(!m_Thread.joinable());
      77           3 :     }
      78             : 
      79             :     std::thread m_Thread;
      80             :     std::atomic<bool> m_Kill = false;
      81             : };
      82             : 
      83             : /**
      84             :  * Worker thread: process the taskManager queues until killed.
      85             :  */
      86             : class WorkerThread : public Thread
      87             : {
      88             : public:
      89             :     WorkerThread(TaskManager::Impl& taskManager);
      90             :     ~WorkerThread();
      91             : 
      92             :     /**
      93             :      * Wake the worker.
      94             :      */
      95             :     void Wake();
      96             : 
      97             : protected:
      98             :     void RunUntilDeath();
      99             : 
     100             :     std::mutex m_Mutex;
     101             :     std::condition_variable m_ConditionVariable;
     102             : 
     103             :     TaskManager::Impl& m_TaskManager;
     104             : };
     105             : 
     106             : /**
     107             :  * PImpl-ed implementation of the Task manager.
     108             :  *
     109             :  * The normal priority queue is processed first, the low priority only if there are no higher-priority tasks
     110             :  */
     111             : class TaskManager::Impl
     112             : {
     113             :     friend class TaskManager;
     114             :     friend class WorkerThread;
     115             : public:
     116             :     Impl(TaskManager& backref);
     117           1 :     ~Impl()
     118           1 :     {
     119           1 :         ClearQueue();
     120           1 :         m_Workers.clear();
     121           1 :     }
     122             : 
     123             :     /**
     124             :      * 2-phase init to avoid having to think too hard about the order of class members.
     125             :      */
     126             :     void SetupWorkers(size_t numberOfWorkers);
     127             : 
     128             :     /**
     129             :      * Push a task on the global queue.
     130             :      * Takes ownership of @a task.
     131             :      * May be called from any thread.
     132             :      */
     133             :     void PushTask(std::function<void()>&& task, TaskPriority priority);
     134             : 
     135             : protected:
     136             :     void ClearQueue();
     137             : 
     138             :     template<TaskPriority Priority>
     139             :     bool PopTask(std::function<void()>& taskOut);
     140             : 
     141             :     // Back reference (keep this first).
     142             :     TaskManager& m_TaskManager;
     143             : 
     144             :     std::atomic<bool> m_HasWork = false;
     145             :     std::atomic<bool> m_HasLowPriorityWork = false;
     146             :     std::mutex m_GlobalMutex;
     147             :     std::mutex m_GlobalLowPriorityMutex;
     148             :     std::deque<QueueItem> m_GlobalQueue;
     149             :     std::deque<QueueItem> m_GlobalLowPriorityQueue;
     150             : 
     151             :     // Ideally this would be a vector, since it does get iterated, but that requires movable types.
     152             :     std::deque<WorkerThread> m_Workers;
     153             : 
     154             :     // Round-robin counter for GetWorker.
     155             :     mutable size_t m_RoundRobinIdx = 0;
     156             : };
     157             : 
     158           1 : TaskManager::TaskManager() : TaskManager(std::thread::hardware_concurrency() - 1)
     159             : {
     160           0 : }
     161             : 
     162           1 : TaskManager::TaskManager(size_t numberOfWorkers)
     163             : {
     164           1 :     m = std::make_unique<Impl>(*this);
     165           1 :     numberOfWorkers = Clamp<size_t>(numberOfWorkers, MIN_THREADS, MAX_THREADS);
     166           2 :     m->SetupWorkers(numberOfWorkers);
     167           1 : }
     168             : 
// Defaulted here, in the translation unit where Impl is a complete type.
// NOTE(review): presumably required because m is a smart pointer to Impl - confirm against TaskManager.h.
TaskManager::~TaskManager() = default;
     170             : 
     171           1 : TaskManager::Impl::Impl(TaskManager& backref)
     172           5 :     : m_TaskManager(backref)
     173             : {
     174           1 : }
     175             : 
     176           0 : void TaskManager::Impl::SetupWorkers(size_t numberOfWorkers)
     177             : {
     178           4 :     for (size_t i = 0; i < numberOfWorkers; ++i)
     179           3 :         m_Workers.emplace_back(*this);
     180           0 : }
     181             : 
     182           2 : void TaskManager::ClearQueue() { m->ClearQueue(); }
     183           2 : void TaskManager::Impl::ClearQueue()
     184             : {
     185           2 :     {
     186           2 :         std::lock_guard<std::mutex> lock(m_GlobalMutex);
     187           2 :         m_GlobalQueue.clear();
     188             :     }
     189           2 :     {
     190           2 :         std::lock_guard<std::mutex> lock(m_GlobalLowPriorityMutex);
     191           2 :         m_GlobalLowPriorityQueue.clear();
     192             :     }
     193           2 : }
     194             : 
     195           4 : size_t TaskManager::GetNumberOfWorkers() const
     196             : {
     197          12 :     return m->m_Workers.size();
     198             : }
     199             : 
     200       99070 : void TaskManager::DoPushTask(std::function<void()>&& task, TaskPriority priority)
     201             : {
     202      198140 :     m->PushTask(std::move(task), priority);
     203       98587 : }
     204             : 
     205       99632 : void TaskManager::Impl::PushTask(std::function<void()>&& task, TaskPriority priority)
     206             : {
     207       99632 :     std::mutex& mutex = priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
     208       99632 :     std::deque<QueueItem>& queue = priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
     209       99632 :     std::atomic<bool>& hasWork = priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork;
     210       99632 :     {
     211       99632 :         std::lock_guard<std::mutex> lock(mutex);
     212       98863 :         queue.emplace_back(std::move(task));
     213      198624 :         hasWork = true;
     214             :     }
     215             : 
     216      985168 :     for (WorkerThread& worker : m_Workers)
     217      295428 :         worker.Wake();
     218       98939 : }
     219             : 
     220             : template<TaskPriority Priority>
     221      133094 : bool TaskManager::Impl::PopTask(std::function<void()>& taskOut)
     222             : {
     223      133094 :     std::mutex& mutex = Priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
     224      133094 :     std::deque<QueueItem>& queue = Priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
     225      133094 :     std::atomic<bool>& hasWork = Priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork;
     226             : 
     227             :     // Particularly critical section since we're locking the global queue.
     228      266454 :     std::lock_guard<std::mutex> globalLock(mutex);
     229      266720 :     if (!queue.empty())
     230             :     {
     231      200018 :         taskOut = std::move(queue.front());
     232      100009 :         queue.pop_front();
     233      200018 :         hasWork = !queue.empty();
     234      100009 :         return true;
     235             :     }
     236             :     return false;
     237             : }
     238       33324 : 
     239             : void TaskManager::Initialise()
     240       33324 : {
     241       33324 :     if (!g_TaskManager)
     242       33324 :         g_TaskManager = std::make_unique<TaskManager>();
     243             : }
     244             : 
     245       66667 : TaskManager& TaskManager::Instance()
     246       66686 : {
     247             :     ENSURE(g_TaskManager);
     248       66670 :     return *g_TaskManager;
     249       33335 : }
     250       66670 : 
     251       33335 : // Thread definition
     252             : 
     253             : WorkerThread::WorkerThread(TaskManager::Impl& taskManager)
     254             :     : m_TaskManager(taskManager)
     255       99770 : {
     256             :     Start<WorkerThread, &WorkerThread::RunUntilDeath>(this);
     257       99770 : }
     258       99770 : 
     259       99770 : WorkerThread::~WorkerThread()
     260             : {
     261             :     m_Kill = true;
     262      199787 :     m_ConditionVariable.notify_one();
     263      200034 :     if (m_Thread.joinable())
     264             :         m_Thread.join();
     265      133348 : }
     266       66674 : 
     267      133348 : void WorkerThread::Wake()
     268       66674 : {
     269             :     m_ConditionVariable.notify_one();
     270             : }
     271             : 
     272             : void WorkerThread::RunUntilDeath()
     273           1 : {
     274             :     // The profiler does better if the names are unique.
     275           2 :     static std::atomic<int> n = 0;
     276           1 :     std::string name = "Task Mgr #" + std::to_string(n++);
     277           1 :     debug_SetThreadName(name.c_str());
     278             :     g_Profiler2.RegisterCurrentThread(name);
     279           7 : 
     280             : 
     281          14 :     std::function<void()> task;
     282          14 :     bool hasTask = false;
     283             :     std::unique_lock<std::mutex> lock(m_Mutex, std::defer_lock);
     284             :     while (!m_Kill)
     285             :     {
     286             :         lock.lock();
     287           3 :         m_ConditionVariable.wait(lock, [this](){
     288           9 :             return m_Kill || m_TaskManager.m_HasWork || m_TaskManager.m_HasLowPriorityWork;
     289             :         });
     290           3 :         lock.unlock();
     291           3 : 
     292             :         if (m_Kill)
     293           3 :             break;
     294             : 
     295           3 :         // Fetch work from the global queues.
     296           3 :         hasTask = m_TaskManager.PopTask<TaskPriority::NORMAL>(task);
     297           6 :         if (!hasTask)
     298           3 :             hasTask = m_TaskManager.PopTask<TaskPriority::LOW>(task);
     299           3 :         if (hasTask)
     300             :             task();
     301           0 :     }
     302             : }
     303      295067 : 
     304           0 : // Defined here - needs access to derived types.
     305             : template<typename T, void(T::* callable)()>
     306           3 : void Thread::DoStart(T* object)
     307             : {
     308             :     std::invoke(callable, object);
     309           3 : }
     310           6 : 
     311           6 : } // namespace Threading

Generated by: LCOV version 1.13