Pyrogenesis HEAD
Pyrogenesis, a RTS Engine
Future.h
Go to the documentation of this file.
1/* Copyright (C) 2024 Wildfire Games.
2 * This file is part of 0 A.D.
3 *
4 * 0 A.D. is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * 0 A.D. is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with 0 A.D. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#ifndef INCLUDED_FUTURE
19#define INCLUDED_FUTURE
20
21#include "ps/FutureForward.h"
22
23#include <atomic>
24#include <condition_variable>
25#include <exception>
26#include <functional>
27#include <mutex>
28#include <optional>
29#include <type_traits>
30
31template<typename Callback>
32class PackagedTask;
33
35{
36enum class Status
37{
38 PENDING,
39 STARTED,
40 DONE,
42};
43
/**
 * Storage type used for a task's result: std::optional<T> when the task
 * returns a value, and std::nullopt_t as a placeholder when T is void.
 */
template<typename T>
using ResultHolder = typename std::conditional<std::is_void<T>::value,
	std::nullopt_t, std::optional<T>>::type;
46
47/**
48 * Responsible for synchronization between the task and the receiving thread.
49 */
50template<typename ResultType>
52{
53 static constexpr bool VoidResult = std::is_same_v<ResultType, void>;
54public:
55 Receiver() = default;
57 {
59 }
60
61 Receiver(const Receiver&) = delete;
62 Receiver(Receiver&&) = delete;
63
64 bool IsDoneOrCanceled() const
65 {
67 }
68
69 void Wait()
70 {
71 // Fast path: we're already done.
72 if (IsDoneOrCanceled())
73 return;
74 // Slow path: we aren't done when we run the above check. Lock and wait until we are.
75 std::unique_lock<std::mutex> lock(m_Mutex);
76 m_ConditionVariable.wait(lock, [this]() -> bool { return IsDoneOrCanceled(); });
77 }
78
79 /**
80 * If the task is pending, cancel it: the status becomes CANCELED and if the task was completed, the result is destroyed.
81 * @return true if the task was indeed cancelled, false otherwise (the task is running or already done).
82 */
83 bool Cancel()
84 {
85 Status expected = Status::PENDING;
86 bool cancelled = m_Status.compare_exchange_strong(expected, Status::CANCELED);
87 // If we're done, invalidate, if we're pending, atomically cancel, otherwise fail.
88 if (cancelled || m_Status == Status::DONE)
89 {
92 if constexpr (!VoidResult)
93 std::get<ResultHolder<ResultType>>(m_Outcome).reset();
94 m_ConditionVariable.notify_all();
95 return cancelled;
96 }
97 return false;
98 }
99
100 /**
101 * Move the result away from the shared state, mark the future invalid.
102 */
103 ResultType GetResult()
104 {
105 // The caller must ensure that this is only called if we have a result.
106 if constexpr (!std::is_void_v<ResultType>)
107 ENSURE(std::get<ResultHolder<ResultType>>(m_Outcome).has_value() ||
108 std::get<std::exception_ptr>(m_Outcome));
109
111
112 if (std::get<std::exception_ptr>(m_Outcome))
113 std::rethrow_exception(std::get<std::exception_ptr>(m_Outcome));
114
115 if constexpr (std::is_void_v<ResultType>)
116 return;
117 else
118 {
119 ResultType ret = std::move(*std::get<ResultHolder<ResultType>>(m_Outcome));
120 std::get<ResultHolder<ResultType>>(m_Outcome).reset();
121 return ret;
122 }
123 }
124
125 std::atomic<Status> m_Status = Status::PENDING;
126 std::mutex m_Mutex;
127 std::condition_variable m_ConditionVariable;
128
129 // There can't be a result and an exception.
130 std::tuple<ResultHolder<ResultType>, std::exception_ptr> m_Outcome{std::nullopt,
131 std::exception_ptr{}};
132};
133
134/**
135 * The shared state between futures and packaged tasks.
136 */
137template<typename Callback>
139{
140 SharedState(Callback&& callbackFunc) :
141 callback{std::forward<Callback>(callbackFunc)}
142 {}
143
144 Callback callback;
146};
147
148} // namespace FutureSharedStateDetail
149
150/**
151 * Corresponds to std::future.
152 * Unlike std::future, Future can request the cancellation of the task that would produce the result.
153 * This makes it more similar to Java's CancellableTask or C#'s Task.
154 * The name Future was kept over Task so it would be more familiar to C++ users,
155 * but this all should be revised once Concurrency TS wraps up.
156 *
157 * Future is _not_ thread-safe. Call it from a single thread or ensure synchronization externally.
158 *
159 * The callback never runs after the @p Future is destroyed.
160 * TODO:
161 * - Handle exceptions.
162 */
163template<typename ResultType>
165{
166 template<typename T>
167 friend class PackagedTask;
168
169 static constexpr bool VoidResult = std::is_same_v<ResultType, void>;
170
172public:
173 Future() = default;
174 Future(const Future& o) = delete;
175 Future(Future&&) = default;
177 {
178 CancelOrWait();
179 m_Receiver = std::move(other.m_Receiver);
180 return *this;
181 }
183 {
184 CancelOrWait();
185 }
186
187 /**
188 * Make the future wait for the result of @a callback.
189 */
190 template<typename Callback>
191 PackagedTask<Callback> Wrap(Callback&& callback);
192
193 /**
194 * Move the result out of the future, and invalidate the future.
195 * If the future is not complete, calls Wait().
196 * If the future is canceled, asserts.
197 */
198 ResultType Get()
199 {
201
202 Wait();
203 ENSURE(m_Receiver->m_Status != Status::CANCELED);
204 // This marks the state invalid - can't call Get again.
205 return m_Receiver->GetResult();
206 }
207
208 /**
209 * @return true if the shared state is valid and has a result (i.e. Get can be called).
210 */
211 bool IsReady() const
212 {
213 return !!m_Receiver && m_Receiver->m_Status == Status::DONE;
214 }
215
216 /**
217 * @return true if the future has a shared state and it's not been invalidated, ie. pending, started or done.
218 */
219 bool Valid() const
220 {
221 return !!m_Receiver && m_Receiver->m_Status != Status::CANCELED;
222 }
223
224 void Wait()
225 {
226 if (Valid())
227 m_Receiver->Wait();
228 }
229
230 /**
231 * Cancels the task, waiting if the task is currently started.
232 * Use this function over Cancel() if you need to ensure determinism (i.e. in the simulation).
233 * @see Cancel.
234 */
236 {
237 if (!Valid())
238 return;
239 if (!m_Receiver->Cancel())
240 m_Receiver->Wait();
241 m_Receiver.reset();
242 }
243
244protected:
245 std::shared_ptr<FutureSharedStateDetail::Receiver<ResultType>> m_Receiver;
246};
247
248/**
249 * Corresponds somewhat to std::packaged_task.
250 * Like packaged_task, this holds a function acting as a promise.
251 * This type is mostly just the shared state and the call operator,
252 * handling the promise & continuation logic.
253 */
254template<typename Callback>
256{
257public:
258 PackagedTask() = delete;
260 m_SharedState(std::move(ss))
261 {}
262
264 {
266 if (!m_SharedState->receiver.m_Status.compare_exchange_strong(expected,
268 {
269 return;
270 }
271
272 try
273 {
274 using ResultType = std::invoke_result_t<Callback>;
275 if constexpr (std::is_void_v<ResultType>)
276 m_SharedState->callback();
277 else
278 std::get<FutureSharedStateDetail::ResultHolder<ResultType>>(
279 m_SharedState->receiver.m_Outcome).emplace(m_SharedState->callback());
280 }
281 catch(...)
282 {
283 std::get<std::exception_ptr>(m_SharedState->receiver.m_Outcome) =
284 std::current_exception();
285 }
286
287 // Because we might have threads waiting on us, we need to make sure that they either:
288 // - don't wait on our condition variable
289 // - receive the notification when we're done.
290 // This requires locking the mutex (@see Wait).
291 {
292 std::lock_guard<std::mutex> lock(m_SharedState->receiver.m_Mutex);
294 }
295
296 m_SharedState->receiver.m_ConditionVariable.notify_all();
297
298 // We no longer need the shared state, drop it immediately.
299 m_SharedState.reset();
300 }
301
302 void Cancel()
303 {
304 m_SharedState->Cancel();
305 m_SharedState.reset();
306 }
307
308private:
309 std::shared_ptr<FutureSharedStateDetail::SharedState<Callback>> m_SharedState;
310};
311
312template<typename ResultType>
313template<typename Callback>
315{
316 static_assert(std::is_same_v<std::invoke_result_t<Callback>, ResultType>,
317 "The return type of the wrapped function is not the same as the type the Future expects.");
318 CancelOrWait();
319 auto temp = std::make_shared<FutureSharedStateDetail::SharedState<Callback>>(std::move(callback));
320 m_Receiver = {temp, &temp->receiver};
321 return PackagedTask<Callback>(std::move(temp));
322}
323
324#endif // INCLUDED_FUTURE
Responsible for synchronization between the task and the receiving thread.
Definition: Future.h:52
std::condition_variable m_ConditionVariable
Definition: Future.h:127
static constexpr bool VoidResult
Definition: Future.h:53
~Receiver()
Definition: Future.h:56
bool Cancel()
If the task is pending, cancel it: the status becomes CANCELED and if the task was completed,...
Definition: Future.h:83
std::mutex m_Mutex
Definition: Future.h:126
Receiver(const Receiver &)=delete
std::atomic< Status > m_Status
Definition: Future.h:125
ResultType GetResult()
Move the result away from the shared state, mark the future invalid.
Definition: Future.h:103
bool IsDoneOrCanceled() const
Definition: Future.h:64
void Wait()
Definition: Future.h:69
std::tuple< ResultHolder< ResultType >, std::exception_ptr > m_Outcome
Definition: Future.h:130
Corresponds to std::future.
Definition: Future.h:165
static constexpr bool VoidResult
Definition: Future.h:169
bool Valid() const
Definition: Future.h:219
ResultType Get()
Move the result out of the future, and invalidate the future.
Definition: Future.h:198
~Future()
Definition: Future.h:182
PackagedTask< Callback > Wrap(Callback &&callback)
Make the future wait for the result of callback.
Definition: Future.h:314
Future(Future &&)=default
Future & operator=(Future &&other)
Definition: Future.h:176
void Wait()
Definition: Future.h:224
bool IsReady() const
Definition: Future.h:211
Future()=default
void CancelOrWait()
Cancels the task, waiting if the task is currently started.
Definition: Future.h:235
std::shared_ptr< FutureSharedStateDetail::Receiver< ResultType > > m_Receiver
Definition: Future.h:245
Future(const Future &o)=delete
Corresponds somewhat to std::packaged_task.
Definition: Future.h:256
PackagedTask()=delete
std::shared_ptr< FutureSharedStateDetail::SharedState< Callback > > m_SharedState
Definition: Future.h:309
void operator()()
Definition: Future.h:263
PackagedTask(std::shared_ptr< FutureSharedStateDetail::SharedState< Callback > > ss)
Definition: Future.h:259
void Cancel()
Definition: Future.h:302
#define ENSURE(expr)
ensure the expression <expr> evaluates to non-zero.
Definition: debug.h:277
Definition: Future.h:35
std::conditional_t< std::is_void_v< T >, std::nullopt_t, std::optional< T > > ResultHolder
Definition: Future.h:45
Status
Definition: Future.h:37
Definition: ShaderDefines.cpp:31
The shared state between futures and packaged state.
Definition: Future.h:139
Receiver< std::invoke_result_t< Callback > > receiver
Definition: Future.h:145
Callback callback
Definition: Future.h:144
SharedState(Callback &&callbackFunc)
Definition: Future.h:140