Line data Source code
1 : /* Copyright (C) 2022 Wildfire Games.
2 : * This file is part of 0 A.D.
3 : *
4 : * 0 A.D. is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 2 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * 0 A.D. is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with 0 A.D. If not, see <http://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "precompiled.h"
19 :
20 : #include "TaskManager.h"
21 :
22 : #include "lib/debug.h"
23 : #include "maths/MathUtil.h"
24 : #include "ps/CLogger.h"
25 : #include "ps/ConfigDB.h"
26 : #include "ps/Threading.h"
27 : #include "ps/ThreadUtil.h"
28 : #include "ps/Profiler2.h"
29 :
30 : #include <condition_variable>
31 : #include <deque>
32 : #include <functional>
33 : #include <memory>
34 : #include <mutex>
35 : #include <thread>
36 :
37 : namespace Threading
38 : {
39 :
40 : namespace
41 : {
42 : /**
43 : * Minimum number of TaskManager workers.
44 : */
45 : constexpr size_t MIN_WORKERS = 3;
46 :
47 : /**
48 : * Maximum number of TaskManager workers.
49 : */
50 : constexpr size_t MAX_WORKERS = 32;
51 :
52 1 : size_t GetDefaultNumberOfWorkers()
53 : {
54 1 : const size_t hardware_concurrency = std::thread::hardware_concurrency();
55 1 : return hardware_concurrency ? Clamp(hardware_concurrency - 1, MIN_WORKERS, MAX_WORKERS) : MIN_WORKERS;
56 : }
57 :
58 : } // anonymous namespace
59 :
60 1 : std::unique_ptr<TaskManager> g_TaskManager;
61 :
62 : class Thread;
63 :
64 : using QueueItem = std::function<void()>;
65 :
66 : /**
67 : * Light wrapper around std::thread. Ensures Join has been called.
68 : */
69 : class Thread
70 : {
71 : public:
72 3 : Thread() = default;
73 : Thread(const Thread&) = delete;
74 : Thread(Thread&&) = delete;
75 :
76 : template<typename T, void(T::* callable)()>
77 3 : void Start(T* object)
78 : {
79 3 : m_Thread = std::thread(HandleExceptions<DoStart<T, callable>>::Wrapper, object);
80 3 : }
81 : template<typename T, void(T::* callable)()>
82 : static void DoStart(T* object);
83 :
84 : protected:
85 3 : ~Thread()
86 3 : {
87 3 : ENSURE(!m_Thread.joinable());
88 3 : }
89 :
90 : std::thread m_Thread;
91 : std::atomic<bool> m_Kill = false;
92 : };
93 :
94 : /**
95 : * Worker thread: process the taskManager queues until killed.
96 : */
97 : class WorkerThread : public Thread
98 : {
99 : public:
100 : WorkerThread(TaskManager::Impl& taskManager);
101 : ~WorkerThread();
102 :
103 : /**
104 : * Wake the worker.
105 : */
106 : void Wake();
107 :
108 : protected:
109 : void RunUntilDeath();
110 :
111 : std::mutex m_Mutex;
112 : std::condition_variable m_ConditionVariable;
113 :
114 : TaskManager::Impl& m_TaskManager;
115 : };
116 :
117 : /**
118 : * PImpl-ed implementation of the Task manager.
119 : *
120 : * The normal priority queue is processed first, the low priority only if there are no higher-priority tasks
121 : */
122 : class TaskManager::Impl
123 : {
124 : friend class TaskManager;
125 : friend class WorkerThread;
126 : public:
127 1 : Impl() = default;
128 1 : ~Impl()
129 1 : {
130 1 : ClearQueue();
131 1 : m_Workers.clear();
132 1 : }
133 :
134 : /**
135 : * 2-phase init to avoid having to think too hard about the order of class members.
136 : */
137 : void SetupWorkers(size_t numberOfWorkers);
138 :
139 : /**
140 : * Push a task on the global queue.
141 : * Takes ownership of @a task.
142 : * May be called from any thread.
143 : */
144 : void PushTask(std::function<void()>&& task, TaskPriority priority);
145 :
146 : protected:
147 : void ClearQueue();
148 :
149 : template<TaskPriority Priority>
150 : bool PopTask(std::function<void()>& taskOut);
151 :
152 : std::atomic<bool> m_HasWork = false;
153 : std::atomic<bool> m_HasLowPriorityWork = false;
154 : std::mutex m_GlobalMutex;
155 : std::mutex m_GlobalLowPriorityMutex;
156 : std::deque<QueueItem> m_GlobalQueue;
157 : std::deque<QueueItem> m_GlobalLowPriorityQueue;
158 :
159 : // Ideally this would be a vector, since it does get iterated, but that requires movable types.
160 : std::deque<WorkerThread> m_Workers;
161 : };
162 :
163 1 : TaskManager::TaskManager() : TaskManager(GetDefaultNumberOfWorkers())
164 : {
165 1 : }
166 :
167 1 : TaskManager::TaskManager(size_t numberOfWorkers)
168 1 : : m{std::make_unique<Impl>()}
169 : {
170 1 : numberOfWorkers = Clamp<size_t>(numberOfWorkers, MIN_WORKERS, MAX_WORKERS);
171 1 : m->SetupWorkers(numberOfWorkers);
172 1 : }
173 :
174 : TaskManager::~TaskManager() = default;
175 :
176 1 : void TaskManager::Impl::SetupWorkers(size_t numberOfWorkers)
177 : {
178 4 : for (size_t i = 0; i < numberOfWorkers; ++i)
179 3 : m_Workers.emplace_back(*this);
180 1 : }
181 :
182 1 : void TaskManager::ClearQueue() { m->ClearQueue(); }
183 2 : void TaskManager::Impl::ClearQueue()
184 : {
185 : {
186 4 : std::lock_guard<std::mutex> lock(m_GlobalMutex);
187 2 : m_GlobalQueue.clear();
188 : }
189 : {
190 4 : std::lock_guard<std::mutex> lock(m_GlobalLowPriorityMutex);
191 2 : m_GlobalLowPriorityQueue.clear();
192 : }
193 2 : }
194 :
195 4 : size_t TaskManager::GetNumberOfWorkers() const
196 : {
197 4 : return m->m_Workers.size();
198 : }
199 :
200 99993 : void TaskManager::DoPushTask(std::function<void()>&& task, TaskPriority priority)
201 : {
202 99993 : m->PushTask(std::move(task), priority);
203 99185 : }
204 :
205 99904 : void TaskManager::Impl::PushTask(std::function<void()>&& task, TaskPriority priority)
206 : {
207 99904 : std::mutex& mutex = priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
208 99904 : std::deque<QueueItem>& queue = priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
209 99904 : std::atomic<bool>& hasWork = priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork;
210 : {
211 199898 : std::lock_guard<std::mutex> lock(mutex);
212 99913 : queue.emplace_back(std::move(task));
213 99749 : hasWork = true;
214 : }
215 :
216 395587 : for (WorkerThread& worker : m_Workers)
217 295637 : worker.Wake();
218 99176 : }
219 :
220 : template<TaskPriority Priority>
221 133287 : bool TaskManager::Impl::PopTask(std::function<void()>& taskOut)
222 : {
223 133287 : std::mutex& mutex = Priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
224 133287 : std::deque<QueueItem>& queue = Priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
225 133287 : std::atomic<bool>& hasWork = Priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork;
226 :
227 : // Particularly critical section since we're locking the global queue.
228 266647 : std::lock_guard<std::mutex> globalLock(mutex);
229 133360 : if (!queue.empty())
230 : {
231 100009 : taskOut = std::move(queue.front());
232 100009 : queue.pop_front();
233 100009 : hasWork = !queue.empty();
234 100009 : return true;
235 : }
236 33351 : return false;
237 : }
238 :
239 1 : void TaskManager::Initialise()
240 : {
241 1 : if (!g_TaskManager)
242 1 : g_TaskManager = std::make_unique<TaskManager>();
243 1 : }
244 :
245 7 : TaskManager& TaskManager::Instance()
246 : {
247 7 : ENSURE(g_TaskManager);
248 7 : return *g_TaskManager;
249 : }
250 :
251 : // Thread definition
252 :
253 3 : WorkerThread::WorkerThread(TaskManager::Impl& taskManager)
254 3 : : m_TaskManager(taskManager)
255 : {
256 3 : Start<WorkerThread, &WorkerThread::RunUntilDeath>(this);
257 3 : }
258 :
259 6 : WorkerThread::~WorkerThread()
260 : {
261 3 : m_Kill = true;
262 3 : m_ConditionVariable.notify_one();
263 3 : if (m_Thread.joinable())
264 3 : m_Thread.join();
265 3 : }
266 :
267 295635 : void WorkerThread::Wake()
268 : {
269 295635 : m_ConditionVariable.notify_one();
270 294381 : }
271 :
272 3 : void WorkerThread::RunUntilDeath()
273 : {
274 : // The profiler does better if the names are unique.
275 : static std::atomic<int> n = 0;
276 6 : std::string name = "Task Mgr #" + std::to_string(n++);
277 3 : debug_SetThreadName(name.c_str());
278 3 : g_Profiler2.RegisterCurrentThread(name);
279 :
280 :
281 6 : std::function<void()> task;
282 3 : bool hasTask = false;
283 6 : std::unique_lock<std::mutex> lock(m_Mutex, std::defer_lock);
284 200033 : while (!m_Kill)
285 : {
286 100020 : lock.lock();
287 333684 : m_ConditionVariable.wait(lock, [this](){
288 233678 : return m_Kill || m_TaskManager.m_HasWork || m_TaskManager.m_HasLowPriorityWork;
289 100058 : });
290 99967 : lock.unlock();
291 :
292 100013 : if (m_Kill)
293 3 : break;
294 :
295 : // Fetch work from the global queues.
296 99941 : hasTask = m_TaskManager.PopTask<TaskPriority::NORMAL>(task);
297 99865 : if (!hasTask)
298 33343 : hasTask = m_TaskManager.PopTask<TaskPriority::LOW>(task);
299 99807 : if (hasTask)
300 99798 : task();
301 : }
302 3 : }
303 :
304 : // Defined here - needs access to derived types.
305 : template<typename T, void(T::* callable)()>
306 3 : void Thread::DoStart(T* object)
307 : {
308 3 : std::invoke(callable, object);
309 3 : }
310 :
311 3 : } // namespace Threading
|