Pyrogenesis HEAD
Pyrogenesis, a RTS Engine
io.h
Go to the documentation of this file.
1/* Copyright (C) 2022 Wildfire Games.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining
4 * a copy of this software and associated documentation files (the
5 * "Software"), to deal in the Software without restriction, including
6 * without limitation the rights to use, copy, modify, merge, publish,
7 * distribute, sublicense, and/or sell copies of the Software, and to
8 * permit persons to whom the Software is furnished to do so, subject to
9 * the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included
12 * in all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
17 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
18 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
19 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
20 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 */
22
23/*
24 * provide asynchronous and synchronous I/O with hooks to allow
25 * overlapped processing or progress reporting.
26 */
27
28#ifndef INCLUDED_IO
29#define INCLUDED_IO
30
31#include "lib/config2.h"
32#include "lib/alignment.h"
33#include "lib/bits.h"
34#include "lib/timer.h"
35#include "lib/file/file.h"
36#include "lib/sysdep/rtl.h"
37#include "lib/sysdep/filesystem.h" // wtruncate
38#include "lib/posix/posix_aio.h" // LIO_READ, LIO_WRITE
39
40namespace ERR
41{
42 const Status IO = -110301;
43}
44
45namespace io {
46
48{
49 void operator()(void* pointer) { rtl_FreeAligned(pointer); }
50};
51
52using BufferPtr = std::unique_ptr<u8, FreeAligned>;
53
54// @return memory suitable for use as an I/O buffer (address is a
55// multiple of alignment, size is rounded up to a multiple of alignment)
56// @param alignment is automatically increased if required.
57//
58// use this instead of the file cache for write buffers that are
59// never reused (avoids displacing other items).
60static inline io::BufferPtr Allocate(size_t size, size_t alignment = maxSectorSize)
61{
62 ENSURE(is_pow2(alignment));
63 alignment = std::max(alignment, allocationAlignment);
64
65 u8* p = static_cast<u8*>(rtl_AllocateAligned(round_up(size, alignment), alignment));
66
67 return {p, FreeAligned{}};
68}
69
70
71#pragma pack(push, 1)
72
73// required information for any I/O (this is basically the same as aiocb,
74// but also applies to synchronous I/O and has shorter/nicer names.)
76{
77 // @param m_Buffer can be 0, in which case temporary block buffers are allocated.
78 // otherwise, it must be aligned and padded to the I/O alignment, e.g. via
79 // io::Allocate.
80 Operation(const File& file, void* buf, off_t size, off_t offset = 0)
81 : m_FileDescriptor(file.Descriptor()), m_OpenFlag((file.Flags() & O_WRONLY)? LIO_WRITE : LIO_READ)
82 , m_Offset(offset), m_Size(size), m_Buffer(buf)
83 {
84 }
85
86 void Validate() const
87 {
90
91 ENSURE(m_Offset >= 0);
92 ENSURE(m_Size >= 0);
93 // m_Buffer can legitimately be 0 (see above)
94 }
95
98
101 void* m_Buffer;
102};
103
104
105// optional information on how an Operation is to be carried out
107{
108 // default to single blocking I/Os
110 : alignment(1) // no alignment requirements
111 , blockSize(0) // do not split into blocks
112 , queueDepth(1) // disable aio
113 {
114 }
115
116 // parameters for asynchronous I/O that maximize throughput on current drives
117 struct OverlappedTag {};
120 {
121 }
122
125 {
126 }
127
128 void Validate(const Operation& op) const
129 {
131 ENSURE(alignment > 0);
132
133 if(blockSize != 0)
134 {
136 ENSURE(g_PageSize <= blockSize); // (don't bother checking an upper bound)
137 }
138
140
142 // op.size doesn't need to be aligned
144 }
145
146 // (ATTO only allows 10, which improves upon 8)
147 static const size_t maxQueueDepth = 32;
148
150
151 size_t blockSize; // 0 for one big "block"
152
154};
155
156#define IO_OVERLAPPED io::Parameters(io::Parameters::OverlappedTag())
157
158
160{
161 /**
162 * called after a block I/O has completed.
163 * @return Status (see RETURN_STATUS_FROM_CALLBACK).
164 *
165 * allows progress notification and processing data while waiting for
166 * previous I/Os to complete.
167 **/
168 Status operator()(const u8* UNUSED(block), size_t UNUSED(blockSize)) const
169 {
170 return INFO::OK;
171 }
172};
173
174
176{
177 /**
178 * called before a block I/O is issued.
179 * @return Status (see RETURN_STATUS_FROM_CALLBACK).
180 *
181 * allows generating the data to write while waiting for
182 * previous I/Os to complete.
183 **/
185 {
186 return INFO::OK;
187 }
188};
189
190
191// ring buffer of partially initialized aiocb that can be passed
192// directly to aio_write etc. after setting offset and buffer.
194{
195public:
197 : controlBlocks() // zero-initialize
198 {
199 const size_t blockSize = p.blockSize? p.blockSize : static_cast<size_t>(op.m_Size);
200
201 const bool temporaryBuffersRequested = (op.m_Buffer == 0);
202 if(temporaryBuffersRequested)
203 buffers = io::Allocate(blockSize * p.queueDepth, p.alignment);
204
205 for(size_t i = 0; i < ARRAY_SIZE(controlBlocks); i++)
206 {
207 aiocb& cb = operator[](i);
209 cb.aio_nbytes = blockSize;
211 if(temporaryBuffersRequested)
212 cb.aio_buf = (volatile void*)(uintptr_t(buffers.get()) + i * blockSize);
213 }
214 }
215
216 INLINE aiocb& operator[](size_t counter)
217 {
218 return controlBlocks[counter % ARRAY_SIZE(controlBlocks)];
219 }
220
221private:
224};
225
226#pragma pack(pop)
227
228
229Status Issue(aiocb& cb, size_t queueDepth);
230Status WaitUntilComplete(aiocb& cb, size_t queueDepth);
231
232
233//-----------------------------------------------------------------------------
234// Run
235
236#ifndef ENABLE_IO_STATS
237#define ENABLE_IO_STATS 0
238#endif
239
240// (hooks must be passed by const reference to allow passing rvalues.
241// functors with non-const member data can mark them as mutable.)
//
// carry out the I/O described by op, split into blocks of p.blockSize
// (or one single block if p.blockSize is 0), keeping at most p.queueDepth
// blocks issued but not yet completed.
// @param op what to transfer (validated on entry).
// @param p how to transfer it (validated against op on entry).
// @param completedHook called with the buffer and byte count of each block
//   after WaitUntilComplete has returned for it.
// @param issueHook called with each block's aiocb just before Issue.
// @return INFO::OK once all blocks have completed; the RETURN_STATUS_*
//   macros may return earlier with a status from a hook, Issue or
//   WaitUntilComplete (their exact conditions are defined in status.h).
242template<class CompletedHook, class IssueHook>
243static inline Status Run(const Operation& op, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
244{
245 op.Validate();
246 p.Validate(op);
247
248 ControlBlockRingBuffer controlBlockRingBuffer(op, p);
249
250#if ENABLE_IO_STATS
251 const double t0 = timer_Time();
253#endif
254
 // a blockSize of 0 means the whole operation is treated as one block.
255 size_t numBlocks = p.blockSize? DivideRoundUp(static_cast<size_t>(op.m_Size), p.blockSize) : 1;
 // each outer iteration retires exactly one block (in issue order).
256 for(size_t blocksIssued = 0, blocksCompleted = 0; blocksCompleted < numBlocks; blocksCompleted++)
257 {
 // top up the queue: keep issuing until all blocks have been issued or
 // queueDepth blocks are ahead of the completed count.
258 for(; blocksIssued != numBlocks && blocksIssued < blocksCompleted + (off_t)p.queueDepth; blocksIssued++)
259 {
260 aiocb& cb = controlBlockRingBuffer[blocksIssued];
261 cb.aio_offset = op.m_Offset + blocksIssued * p.blockSize;
 // caller-supplied buffer: point the block at its slice of that buffer.
 // (otherwise aio_buf keeps the temporary buffer address assigned by
 // ControlBlockRingBuffer.)
262 if(op.m_Buffer)
263 cb.aio_buf = (volatile void*)(uintptr_t(op.m_Buffer) + blocksIssued * p.blockSize);
 // the final block only covers the remaining bytes, rounded up to the
 // alignment.
264 if(blocksIssued == numBlocks-1)
265 cb.aio_nbytes = round_up(size_t(op.m_Size - blocksIssued * p.blockSize), size_t(p.alignment));
266
267 RETURN_STATUS_FROM_CALLBACK(issueHook(cb));
268
269 RETURN_STATUS_IF_ERR(Issue(cb, p.queueDepth));
270 }
271
 // wait for the oldest block that has not yet been retired, then hand its
 // buffer and byte count to the completion hook.
272 aiocb& cb = controlBlockRingBuffer[blocksCompleted];
273 RETURN_STATUS_IF_ERR(WaitUntilComplete(cb, p.queueDepth));
274
275 RETURN_STATUS_FROM_CALLBACK(completedHook((u8*)cb.aio_buf, cb.aio_nbytes));
276 }
277
278#if ENABLE_IO_STATS
 // report throughput (MB/s) and elapsed time (ms) on the debug channel.
 // with a nonzero blockSize, the total counts whole blocks.
280 const double t1 = timer_Time();
281 const off_t totalSize = p.blockSize? numBlocks*p.blockSize : op.m_Size;
282 debug_printf("IO: %.2f MB/s (%.2f)\n", totalSize/(t1-t0)/1e6, (t1-t0)*1e3);
283#endif
284
285 return INFO::OK;
286}
287
288// (overloads allow omitting parameters without requiring a template argument list)
289template<class CompletedHook>
290static inline Status Run(const Operation& op, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
291{
292 return Run(op, p, completedHook, DefaultIssueHook());
293}
294
295static inline Status Run(const Operation& op, const Parameters& p = Parameters())
296{
297 return Run(op, p, DefaultCompletedHook(), DefaultIssueHook());
298}
299
300
301//-----------------------------------------------------------------------------
302// Store
303
304// efficient writing requires preallocation; the resulting file is
305// padded to the sector size and needs to be truncated afterwards.
306// this function takes care of both.
//
// @param pathname file to open for writing.
// @param data source buffer; passed on to io::Operation (cast to void*
//   because Operation stores a non-const pointer).
// @param size number of bytes to write; also the length the file is
//   truncated to afterwards.
// @param p I/O parameters; O_DIRECT is added to the open flags unless
//   p.queueDepth is 1.
// @param completedHook, issueHook forwarded unchanged to io::Run.
// @return INFO::OK on success; otherwise the status propagated by
//   RETURN_STATUS_IF_ERR from File::Open, io::Run or wtruncate.
307template<class CompletedHook, class IssueHook>
308static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
309{
310 File file;
311 int oflag = O_WRONLY;
312 if(p.queueDepth != 1)
313 oflag |= O_DIRECT;
314 RETURN_STATUS_IF_ERR(file.Open(pathname, oflag));
315 io::Operation op(file, (void*)data, size);
316
317#if OS_WIN
319#endif
320
321 RETURN_STATUS_IF_ERR(io::Run(op, p, completedHook, issueHook));
322
323 file.Close(); // (required by wtruncate)
324
 // cut the file back to the requested size (see the note on padding above).
325 RETURN_STATUS_IF_ERR(wtruncate(pathname, size));
326
327 return INFO::OK;
328}
329
330template<class CompletedHook>
331static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
332{
333 return Store(pathname, data, size, p, completedHook, DefaultIssueHook());
334}
335
336static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters())
337{
338 return Store(pathname, data, size, p, DefaultCompletedHook(), DefaultIssueHook());
339}
340
341
342//-----------------------------------------------------------------------------
343// Load
344
345// convenience function provided for symmetry with Store.
346template<class CompletedHook, class IssueHook>
347static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
348{
349 File file;
350 int oflag = O_RDONLY;
351 if(p.queueDepth != 1)
352 oflag |= O_DIRECT;
353 RETURN_STATUS_IF_ERR(file.Open(pathname, oflag));
354 io::Operation op(file, buf, size);
355 return io::Run(op, p, completedHook, issueHook);
356}
357
358template<class CompletedHook>
359static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
360{
361 return Load(pathname, buf, size, p, completedHook, DefaultIssueHook());
362}
363
364static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters())
365{
366 return Load(pathname, buf, size, p, DefaultCompletedHook(), DefaultIssueHook());
367}
368
369} // namespace io
370
371#endif // #ifndef INCLUDED_IO
static const size_t KiB
Definition: alignment.h:106
static const size_t g_PageSize
Definition: alignment.h:93
static const size_t allocationAlignment
Definition: alignment.h:104
bool IsAligned(T t, uintptr_t multiple)
Definition: alignment.h:32
static const uintptr_t maxSectorSize
Definition: alignment.h:117
bool is_pow2(T n)
Definition: bits.h:164
T round_up(T n, T multiple)
round number up/down to the next given multiple.
Definition: bits.h:265
Definition: file.h:46
Status Open(const OsPath &pathName, int openFlag)
Definition: file.h:63
void Close()
Definition: file.h:73
Definition: path.h:80
Definition: io.h:194
INLINE aiocb & operator[](size_t counter)
Definition: io.h:216
ControlBlockRingBuffer(const Operation &op, const Parameters &p)
Definition: io.h:196
io::BufferPtr buffers
Definition: io.h:222
aiocb controlBlocks[Parameters::maxQueueDepth]
Definition: io.h:223
#define INLINE
Definition: code_annotation.h:377
#define UNUSED(param)
mark a function parameter as unused and avoid the corresponding compiler warning.
Definition: code_annotation.h:40
#define UNUSED2(param)
mark a function local variable or parameter as unused and avoid the corresponding compiler warning.
Definition: code_annotation.h:58
#define ARRAY_SIZE(name)
Definition: code_annotation.h:350
#define COMPILER_FENCE
prevent the compiler from reordering loads or stores across this point.
Definition: code_annotation.h:283
void debug_printf(const char *fmt,...)
write a formatted string to the debug channel, subject to filtering (see below).
Definition: debug.cpp:143
#define ENSURE(expr)
ensure the expression <expr> evaluates to non-zero.
Definition: debug.h:277
int wtruncate(const OsPath &pathname, off_t length)
Definition: ufilesystem.cpp:123
#define O_DIRECT
Definition: filesystem.h:67
T DivideRoundUp(T dividend, T divisor)
Definition: lib.h:68
Definition: debug.h:395
const Status IO
Definition: io.h:42
const Status OK
Definition: status.h:388
Definition: io.cpp:33
Status WaitUntilComplete(aiocb &cb, size_t queueDepth)
Definition: io.cpp:67
std::unique_ptr< u8, FreeAligned > BufferPtr
Definition: io.h:52
static Status Store(const OsPath &pathname, const void *data, size_t size, const Parameters &p=Parameters(), const CompletedHook &completedHook=CompletedHook(), const IssueHook &issueHook=IssueHook())
Definition: io.h:308
static Status Load(const OsPath &pathname, void *buf, size_t size, const Parameters &p=Parameters(), const CompletedHook &completedHook=CompletedHook(), const IssueHook &issueHook=IssueHook())
Definition: io.h:347
static io::BufferPtr Allocate(size_t size, size_t alignment=maxSectorSize)
Definition: io.h:60
static Status Run(const Operation &op, const Parameters &p=Parameters(), const CompletedHook &completedHook=CompletedHook(), const IssueHook &issueHook=IssueHook())
Definition: io.h:243
Status Issue(aiocb &cb, size_t queueDepth)
Definition: io.cpp:39
void rtl_FreeAligned(void *alignedPointer)
Definition: gcc.cpp:93
void * rtl_AllocateAligned(size_t size, size_t alignment)
Definition: gcc.cpp:66
#define RETURN_STATUS_IF_ERR(expression)
Definition: status.h:278
#define RETURN_STATUS_FROM_CALLBACK(expression)
Definition: status.h:340
i64 Status
Error handling system.
Definition: status.h:173
Definition: waio.h:64
off_t aio_offset
Definition: waio.h:66
volatile void * aio_buf
Definition: waio.h:67
size_t aio_nbytes
Definition: waio.h:68
int aio_lio_opcode
Definition: waio.h:71
int aio_fildes
Definition: waio.h:65
Definition: io.h:160
Status operator()(const u8 *block, size_t blockSize) const
called after a block I/O has completed.
Definition: io.h:168
Definition: io.h:176
Status operator()(aiocb &cb) const
called before a block I/O is issued.
Definition: io.h:184
Definition: io.h:48
void operator()(void *pointer)
Definition: io.h:49
Definition: io.h:76
void Validate() const
Definition: io.h:86
int m_OpenFlag
Definition: io.h:97
int m_FileDescriptor
Definition: io.h:96
off_t m_Offset
Definition: io.h:99
void * m_Buffer
Definition: io.h:101
off_t m_Size
Definition: io.h:100
Operation(const File &file, void *buf, off_t size, off_t offset=0)
Definition: io.h:80
Definition: io.h:117
Definition: io.h:107
Parameters(OverlappedTag)
Definition: io.h:118
static const size_t maxQueueDepth
Definition: io.h:147
size_t blockSize
Definition: io.h:151
Parameters()
Definition: io.h:109
off_t alignment
Definition: io.h:149
Parameters(size_t blockSize, size_t queueDepth, off_t alignment=maxSectorSize)
Definition: io.h:123
size_t queueDepth
Definition: io.h:153
void Validate(const Operation &op) const
Definition: io.h:128
double timer_Time()
Definition: timer.cpp:130
uint8_t u8
Definition: types.h:37
Status waio_Preallocate(int fd, off_t size)
Definition: waio.cpp:437
@ LIO_WRITE
Definition: waio.h:93
@ LIO_READ
Definition: waio.h:92
__int64 off_t
Definition: wposix_types.h:91