LCOV - code coverage report
Current view: top level - source/ps - TaskManager.cpp (source / functions) Hit Total Coverage
Test: 0 A.D. test coverage report Lines: 107 107 100.0 %
Date: 2023-01-19 00:18:29 Functions: 26 26 100.0 %

          Line data    Source code
       1             : /* Copyright (C) 2022 Wildfire Games.
       2             :  * This file is part of 0 A.D.
       3             :  *
       4             :  * 0 A.D. is free software: you can redistribute it and/or modify
       5             :  * it under the terms of the GNU General Public License as published by
       6             :  * the Free Software Foundation, either version 2 of the License, or
       7             :  * (at your option) any later version.
       8             :  *
       9             :  * 0 A.D. is distributed in the hope that it will be useful,
      10             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      12             :  * GNU General Public License for more details.
      13             :  *
      14             :  * You should have received a copy of the GNU General Public License
      15             :  * along with 0 A.D.  If not, see <http://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "precompiled.h"
      19             : 
      20             : #include "TaskManager.h"
      21             : 
      22             : #include "lib/debug.h"
      23             : #include "maths/MathUtil.h"
      24             : #include "ps/CLogger.h"
      25             : #include "ps/ConfigDB.h"
      26             : #include "ps/Threading.h"
      27             : #include "ps/ThreadUtil.h"
      28             : #include "ps/Profiler2.h"
      29             : 
      30             : #include <condition_variable>
      31             : #include <deque>
      32             : #include <functional>
      33             : #include <memory>
      34             : #include <mutex>
      35             : #include <thread>
      36             : 
      37             : namespace Threading
      38             : {
      39             : 
      40             : namespace
      41             : {
      42             : /**
      43             :  * Minimum number of TaskManager workers.
      44             :  */
      45             : constexpr size_t MIN_WORKERS = 3;
      46             : 
      47             : /**
      48             :  * Maximum number of TaskManager workers.
      49             :  */
      50             : constexpr size_t MAX_WORKERS = 32;
      51             : 
      52           1 : size_t GetDefaultNumberOfWorkers()
      53             : {
      54           1 :     const size_t hardware_concurrency = std::thread::hardware_concurrency();
      55           1 :     return hardware_concurrency ? Clamp(hardware_concurrency - 1, MIN_WORKERS, MAX_WORKERS) : MIN_WORKERS;
      56             : }
      57             : 
      58             : } // anonymous namespace
      59             : 
// Global task manager instance. Created by TaskManager::Initialise() and
// handed out by TaskManager::Instance().
std::unique_ptr<TaskManager> g_TaskManager;

// Forward declaration; the class is defined just below.
class Thread;

// Type of the entries stored in the task queues: a callable taking no
// arguments and returning nothing.
using QueueItem = std::function<void()>;
      65             : 
/**
 * Light wrapper around std::thread. Ensures Join has been called:
 * the destructor checks (via ENSURE) that the thread is no longer joinable,
 * so derived classes have to join it before this base is destroyed.
 */
class Thread
{
public:
    Thread() = default;
    // Neither copyable nor movable.
    Thread(const Thread&) = delete;
    Thread(Thread&&) = delete;

    /**
     * Launch the underlying std::thread.
     * The thread entry point is DoStart<T, callable>, wrapped by HandleExceptions,
     * and receives @a object as its argument.
     * @param object instance on which the member function @a callable is run.
     */
    template<typename T, void(T::* callable)()>
    void Start(T* object)
    {
        m_Thread = std::thread(HandleExceptions<DoStart<T, callable>>::Wrapper, object);
    }
    /**
     * Thread entry point: calls @a callable on @a object.
     * Only declared here; defined at the end of the file.
     */
    template<typename T, void(T::* callable)()>
    static void DoStart(T* object);

protected:
    // Protected and non-virtual: instances are destroyed through the derived type only.
    ~Thread()
    {
        ENSURE(!m_Thread.joinable());
    }

    std::thread m_Thread;
    // Stop request flag: set by the WorkerThread destructor, polled by WorkerThread::RunUntilDeath.
    std::atomic<bool> m_Kill = false;
};
      93             : 
/**
 * Worker thread: process the taskManager queues until killed.
 */
class WorkerThread : public Thread
{
public:
    /**
     * Keeps a reference to @a taskManager and starts the thread.
     */
    WorkerThread(TaskManager::Impl& taskManager);
    /**
     * Requests the thread to stop and joins it.
     */
    ~WorkerThread();

    /**
     * Wake the worker (notifies its condition variable).
     */
    void Wake();

protected:
    /**
     * Thread main loop: waits for work, then pops and runs tasks until m_Kill is set.
     */
    void RunUntilDeath();

    // Mutex and condition variable the worker sleeps on while there is nothing to do.
    std::mutex m_Mutex;
    std::condition_variable m_ConditionVariable;

    // Task manager implementation whose queues this worker drains.
    TaskManager::Impl& m_TaskManager;
};
     116             : 
/**
 * PImpl-ed implementation of the Task manager.
 *
 * The normal priority queue is processed first, the low priority only if there are no higher-priority tasks
 */
class TaskManager::Impl
{
    friend class TaskManager;
    friend class WorkerThread;
public:
    Impl() = default;
    /**
     * Drops the queued tasks, then destroys the workers
     * (each WorkerThread destructor joins its thread).
     */
    ~Impl()
    {
        ClearQueue();
        m_Workers.clear();
    }

    /**
     * 2-phase init to avoid having to think too hard about the order of class members.
     * Creates @a numberOfWorkers workers, each referencing this object.
     */
    void SetupWorkers(size_t numberOfWorkers);

    /**
     * Push a task on the global queue.
     * Takes ownership of @a task.
     * May be called from any thread.
     * @param priority selects the normal or the low-priority queue.
     */
    void PushTask(std::function<void()>&& task, TaskPriority priority);

protected:
    /**
     * Remove every task from both queues.
     */
    void ClearQueue();

    /**
     * Pop the front task of the queue selected by @a Priority into @a taskOut.
     * @return true if a task was popped, false if that queue was empty.
     */
    template<TaskPriority Priority>
    bool PopTask(std::function<void()>& taskOut);

    // Flags read by the workers' wait predicate; written by PushTask and PopTask
    // while holding the mutex of the matching queue.
    std::atomic<bool> m_HasWork = false;
    std::atomic<bool> m_HasLowPriorityWork = false;
    // One mutex per queue.
    std::mutex m_GlobalMutex;
    std::mutex m_GlobalLowPriorityMutex;
    std::deque<QueueItem> m_GlobalQueue;
    std::deque<QueueItem> m_GlobalLowPriorityQueue;

    // Ideally this would be a vector, since it does get iterated, but that requires movable types.
    std::deque<WorkerThread> m_Workers;
};
     162             : 
     163           1 : TaskManager::TaskManager() : TaskManager(GetDefaultNumberOfWorkers())
     164             : {
     165           1 : }
     166             : 
     167           1 : TaskManager::TaskManager(size_t numberOfWorkers)
     168           1 :     : m{std::make_unique<Impl>()}
     169             : {
     170           1 :     numberOfWorkers = Clamp<size_t>(numberOfWorkers, MIN_WORKERS, MAX_WORKERS);
     171           1 :     m->SetupWorkers(numberOfWorkers);
     172           1 : }
     173             : 
     174             : TaskManager::~TaskManager() = default;
     175             : 
     176           1 : void TaskManager::Impl::SetupWorkers(size_t numberOfWorkers)
     177             : {
     178           4 :     for (size_t i = 0; i < numberOfWorkers; ++i)
     179           3 :         m_Workers.emplace_back(*this);
     180           1 : }
     181             : 
     182           1 : void TaskManager::ClearQueue() { m->ClearQueue(); }
     183           2 : void TaskManager::Impl::ClearQueue()
     184             : {
     185             :     {
     186           4 :         std::lock_guard<std::mutex> lock(m_GlobalMutex);
     187           2 :         m_GlobalQueue.clear();
     188             :     }
     189             :     {
     190           4 :         std::lock_guard<std::mutex> lock(m_GlobalLowPriorityMutex);
     191           2 :         m_GlobalLowPriorityQueue.clear();
     192             :     }
     193           2 : }
     194             : 
     195           4 : size_t TaskManager::GetNumberOfWorkers() const
     196             : {
     197           4 :     return m->m_Workers.size();
     198             : }
     199             : 
     200       99993 : void TaskManager::DoPushTask(std::function<void()>&& task, TaskPriority priority)
     201             : {
     202       99993 :     m->PushTask(std::move(task), priority);
     203       99185 : }
     204             : 
     205       99904 : void TaskManager::Impl::PushTask(std::function<void()>&& task, TaskPriority priority)
     206             : {
     207       99904 :     std::mutex& mutex = priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
     208       99904 :     std::deque<QueueItem>& queue = priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
     209       99904 :     std::atomic<bool>& hasWork = priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork;
     210             :     {
     211      199898 :         std::lock_guard<std::mutex> lock(mutex);
     212       99913 :         queue.emplace_back(std::move(task));
     213       99749 :         hasWork = true;
     214             :     }
     215             : 
     216      395587 :     for (WorkerThread& worker : m_Workers)
     217      295637 :         worker.Wake();
     218       99176 : }
     219             : 
     220             : template<TaskPriority Priority>
     221      133287 : bool TaskManager::Impl::PopTask(std::function<void()>& taskOut)
     222             : {
     223      133287 :     std::mutex& mutex = Priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
     224      133287 :     std::deque<QueueItem>& queue = Priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
     225      133287 :     std::atomic<bool>& hasWork = Priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork;
     226             : 
     227             :     // Particularly critical section since we're locking the global queue.
     228      266647 :     std::lock_guard<std::mutex> globalLock(mutex);
     229      133360 :     if (!queue.empty())
     230             :     {
     231      100009 :         taskOut = std::move(queue.front());
     232      100009 :         queue.pop_front();
     233      100009 :         hasWork = !queue.empty();
     234      100009 :         return true;
     235             :     }
     236       33351 :     return false;
     237             : }
     238             : 
     239           1 : void TaskManager::Initialise()
     240             : {
     241           1 :     if (!g_TaskManager)
     242           1 :         g_TaskManager = std::make_unique<TaskManager>();
     243           1 : }
     244             : 
     245           7 : TaskManager& TaskManager::Instance()
     246             : {
     247           7 :     ENSURE(g_TaskManager);
     248           7 :     return *g_TaskManager;
     249             : }
     250             : 
// WorkerThread definition

/**
 * Store a reference to the owning task manager implementation and start the
 * thread right away; the thread runs RunUntilDeath() on this object.
 */
WorkerThread::WorkerThread(TaskManager::Impl& taskManager)
    : m_TaskManager(taskManager)
{
    Start<WorkerThread, &WorkerThread::RunUntilDeath>(this);
}
     258             : 
     259           6 : WorkerThread::~WorkerThread()
     260             : {
     261           3 :     m_Kill = true;
     262           3 :     m_ConditionVariable.notify_one();
     263           3 :     if (m_Thread.joinable())
     264           3 :         m_Thread.join();
     265           3 : }
     266             : 
     267      295635 : void WorkerThread::Wake()
     268             : {
     269      295635 :     m_ConditionVariable.notify_one();
     270      294381 : }
     271             : 
     272           3 : void WorkerThread::RunUntilDeath()
     273             : {
     274             :     // The profiler does better if the names are unique.
     275             :     static std::atomic<int> n = 0;
     276           6 :     std::string name = "Task Mgr #" + std::to_string(n++);
     277           3 :     debug_SetThreadName(name.c_str());
     278           3 :     g_Profiler2.RegisterCurrentThread(name);
     279             : 
     280             : 
     281           6 :     std::function<void()> task;
     282           3 :     bool hasTask = false;
     283           6 :     std::unique_lock<std::mutex> lock(m_Mutex, std::defer_lock);
     284      200033 :     while (!m_Kill)
     285             :     {
     286      100020 :         lock.lock();
     287      333684 :         m_ConditionVariable.wait(lock, [this](){
     288      233678 :             return m_Kill || m_TaskManager.m_HasWork || m_TaskManager.m_HasLowPriorityWork;
     289      100058 :         });
     290       99967 :         lock.unlock();
     291             : 
     292      100013 :         if (m_Kill)
     293           3 :             break;
     294             : 
     295             :         // Fetch work from the global queues.
     296       99941 :         hasTask = m_TaskManager.PopTask<TaskPriority::NORMAL>(task);
     297       99865 :         if (!hasTask)
     298       33343 :             hasTask = m_TaskManager.PopTask<TaskPriority::LOW>(task);
     299       99807 :         if (hasTask)
     300       99798 :             task();
     301             :     }
     302           3 : }
     303             : 
     304             : // Defined here - needs access to derived types.
     305             : template<typename T, void(T::* callable)()>
     306           3 : void Thread::DoStart(T* object)
     307             : {
     308           3 :     std::invoke(callable, object);
     309           3 : }
     310             : 
     311           3 : } // namespace Threading

Generated by: LCOV version 1.13