Pyrogenesis  trunk
io.h
Go to the documentation of this file.
1 /* Copyright (C) 2022 Wildfire Games.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining
4  * a copy of this software and associated documentation files (the
5  * "Software"), to deal in the Software without restriction, including
6  * without limitation the rights to use, copy, modify, merge, publish,
7  * distribute, sublicense, and/or sell copies of the Software, and to
8  * permit persons to whom the Software is furnished to do so, subject to
9  * the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included
12  * in all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
17  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
18  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
19  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
20  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21  */
22 
23 /*
24  * provides asynchronous and synchronous file I/O, with hooks that allow
25  * overlapped processing or progress reporting.
26  */
27 
28 #ifndef INCLUDED_IO
29 #define INCLUDED_IO
30 
31 #include "lib/config2.h"
32 #include "lib/alignment.h"
33 #include "lib/bits.h"
34 #include "lib/timer.h"
35 #include "lib/file/file.h"
36 #include "lib/sysdep/rtl.h"
37 #include "lib/sysdep/filesystem.h" // wtruncate
38 #include "lib/posix/posix_aio.h" // LIO_READ, LIO_WRITE
39 
// status code contributed by the I/O module.
// NOTE(review): presumably reported when an I/O request fails; nothing in this
// header returns it directly - confirm against the implementation file.
40 namespace ERR
41 {
42  const Status IO = -110301;
43 }
44 
45 namespace io {
46 
48 {
    // deleter call operator: hands the allocation back to rtl_FreeAligned.
49  void operator()(void* pointer) { rtl_FreeAligned(pointer); }
50 };
51 
// owning pointer to a byte buffer; uses FreeAligned as its deleter, so the
// memory is released through rtl_FreeAligned when the pointer is destroyed.
52 using BufferPtr = std::unique_ptr<u8, FreeAligned>;
53 
54 // allocate an owning, aligned buffer intended for use with the I/O routines in this header.
55 // @param size requested byte count; the allocation is rounded up to a multiple of alignment.
56 // @param alignment must be a power of two; it is raised to allocationAlignment if smaller.
57 // @return BufferPtr whose deleter (FreeAligned) releases the memory.
58 // use this instead of the file cache for write buffers that are
59 // never reused (avoids displacing other items).
60 static inline io::BufferPtr Allocate(size_t size, size_t alignment = maxSectorSize)
61 {
62  ENSURE(is_pow2(alignment));
    // never go below the allocator's own minimum alignment.
63  alignment = std::max(alignment, allocationAlignment);
64 
    // NOTE(review): the pointer returned by rtl_AllocateAligned is not checked
    // here - confirm how allocation failure is reported to callers.
65  u8* p = static_cast<u8*>(rtl_AllocateAligned(round_up(size, alignment), alignment));
66 
67  return {p, FreeAligned{}};
68 }
69 
70 
71 #pragma pack(push, 1)
72 
73 // required information for any I/O (this is basically the same as aiocb,
74 // but also applies to synchronous I/O and has shorter/nicer names.)
75 struct Operation
76 {
    // @param file supplies the descriptor and the open flags; if O_WRONLY is set
    // the operation is a write (LIO_WRITE), otherwise a read (LIO_READ).
    // @param size number of bytes to transfer (stored in m_Size).
    // @param offset file offset at which the transfer starts (stored in m_Offset).
77  // @param buf (stored in m_Buffer) can be 0, in which case temporary block buffers are allocated.
78  // otherwise, it must be aligned and padded to the I/O alignment, e.g. via
79  // io::Allocate.
80  Operation(const File& file, void* buf, off_t size, off_t offset = 0)
81  : m_FileDescriptor(file.Descriptor()), m_OpenFlag((file.Flags() & O_WRONLY)? LIO_WRITE : LIO_READ)
82  , m_Offset(offset), m_Size(size), m_Buffer(buf)
83  {
84  }
85 
    // sanity-check the stored fields via ENSURE: valid descriptor, a known
    // opcode, and non-negative offset and size.
86  void Validate() const
87  {
88  ENSURE(m_FileDescriptor >= 0);
89  ENSURE(m_OpenFlag == LIO_READ || m_OpenFlag == LIO_WRITE);
90 
91  ENSURE(m_Offset >= 0);
92  ENSURE(m_Size >= 0);
93  // m_Buffer can legitimately be 0 (see the constructor comment)
94  }
95 
98 
    // caller-supplied buffer, or 0 to request temporary block buffers.
101  void* m_Buffer;
102 };
103 
104 
105 // optional information on how an Operation is to be carried out
107 {
108  // default to single blocking I/Os
110  : alignment(1) // no alignment requirements
111  , blockSize(0) // do not split into blocks
112  , queueDepth(1) // disable aio
113  {
114  }
115 
116  // parameters for asynchronous I/O that maximize throughput on current drives
    // (OverlappedTag is an empty type used only to select this constructor;
    // see the IO_OVERLAPPED macro.)
117  struct OverlappedTag {};
119  : alignment(maxSectorSize), blockSize(128*KiB), queueDepth(32)
120  {
121  }
122 
    // fully custom parameters; see Validate for the constraints on each value.
123  Parameters(size_t blockSize, size_t queueDepth, off_t alignment = maxSectorSize)
124  : alignment(alignment), blockSize(blockSize), queueDepth(queueDepth)
125  {
126  }
127 
    // check (via ENSURE) that these parameters are self-consistent and that
    // op's offset and buffer address satisfy the requested alignment.
128  void Validate(const Operation& op) const
129  {
130  ENSURE(is_pow2(alignment));
131  ENSURE(alignment > 0);
132 
    // blockSize == 0 means "one big block" and is exempt from the checks below.
133  if(blockSize != 0)
134  {
135  ENSURE(is_pow2(blockSize));
136  ENSURE(g_PageSize <= blockSize); // (don't bother checking an upper bound)
137  }
138 
139  ENSURE(1 <= queueDepth && queueDepth <= maxQueueDepth);
140 
141  ENSURE(IsAligned(op.m_Offset, alignment));
142  // op.m_Size doesn't need to be aligned
143  ENSURE(IsAligned(op.m_Buffer, alignment));
144  }
145 
146  // (ATTO only allows 10, which improves upon 8)
    // upper bound for queueDepth; also the number of control blocks held by
    // ControlBlockRingBuffer.
147  static const size_t maxQueueDepth = 32;
148 
150 
151  size_t blockSize; // 0 for one big "block"
152 
    // how many block I/Os Run keeps issued ahead of the one it waits for.
153  size_t queueDepth;
154 };
155 
// shorthand for a Parameters object built by the OverlappedTag constructor
// (sector alignment, 128 KiB blocks, queue depth 32).
156 #define IO_OVERLAPPED io::Parameters(io::Parameters::OverlappedTag())
157 
158 
160 {
161  /**
162  * called after a block I/O has completed.
163  * @return Status (see RETURN_STATUS_FROM_CALLBACK).
164  *
165  * allows progress notification and processing data while waiting for
166  * previous I/Os to complete.
167  **/
    // default implementation: ignores both arguments and reports success.
168  Status operator()(const u8* UNUSED(block), size_t UNUSED(blockSize)) const
169  {
170  return INFO::OK;
171  }
172 };
173 
174 
176 {
177  /**
178  * called before a block I/O is issued.
179  * @return Status (see RETURN_STATUS_FROM_CALLBACK).
180  *
181  * allows generating the data to write while waiting for
182  * previous I/Os to complete.
183  **/
185  {
    // default implementation: nothing to prepare, report success.
186  return INFO::OK;
187  }
188 };
189 
190 
191 // ring buffer of partially initialized aiocb that can be passed
192 // directly to aio_write etc. after setting offset and buffer.
194 {
195 public:
    // pre-fills every control block with the byte count and opcode taken from
    // op/p; if op has no buffer, also allocates temporary buffers and assigns
    // one slice per control block.
197  : controlBlocks() // zero-initialize
198  {
    // with blockSize == 0 the whole operation is treated as a single block.
199  const size_t blockSize = p.blockSize? p.blockSize : static_cast<size_t>(op.m_Size);
200 
201  const bool temporaryBuffersRequested = (op.m_Buffer == 0);
    // NOTE(review): the allocation below spans blockSize * p.queueDepth bytes,
    // but the loop further down assigns aio_buf = base + i * blockSize for all
    // Parameters::maxQueueDepth control blocks, and operator[] wraps modulo the
    // array size rather than p.queueDepth. When p.queueDepth < maxQueueDepth,
    // entries with i >= p.queueDepth point past the end of the allocation.
    // Confirm whether such entries are reachable (Run indexes by block number)
    // and, if so, index the slice by i % p.queueDepth or size the allocation
    // for maxQueueDepth blocks.
202  if(temporaryBuffersRequested)
203  buffers = io::Allocate(blockSize * p.queueDepth, p.alignment);
204 
205  for(size_t i = 0; i < ARRAY_SIZE(controlBlocks); i++)
206  {
207  aiocb& cb = operator[](i);
209  cb.aio_nbytes = blockSize;
210  cb.aio_lio_opcode = op.m_OpenFlag;
    // caller-supplied buffers are filled in later by Run, per block.
211  if(temporaryBuffersRequested)
212  cb.aio_buf = (volatile void*)(uintptr_t(buffers.get()) + i * blockSize);
213  }
214  }
215 
    // @param counter any monotonically increasing block number; it is reduced
    // modulo the number of control blocks.
216  INLINE aiocb& operator[](size_t counter)
217  {
218  return controlBlocks[counter % ARRAY_SIZE(controlBlocks)];
219  }
220 
221 private:
223  aiocb controlBlocks[Parameters::maxQueueDepth];
224 };
225 
226 #pragma pack(pop)
227 
228 
// declared here, defined outside this header. Run calls Issue on each control
// block once its offset/buffer are set, and later calls WaitUntilComplete on
// the same control block before handing the data to the completed-hook.
// NOTE(review): exact blocking/error semantics are not visible in this file.
229 Status Issue(aiocb& cb, size_t queueDepth);
230 Status WaitUntilComplete(aiocb& cb, size_t queueDepth);
231 
232 
233 //-----------------------------------------------------------------------------
234 // Run
235 
// throughput reporting inside Run is compiled in only when this is non-zero;
// it defaults to off unless defined before this point.
236 #ifndef ENABLE_IO_STATS
237 #define ENABLE_IO_STATS 0
238 #endif
239 
// carry out op block by block, keeping up to p.queueDepth blocks issued ahead
// of the one currently being waited for.
// @param op what to transfer (descriptor, direction, offset, size, buffer).
// @param p how to transfer it (alignment, block size, queue depth).
// @param completedHook called with each block's buffer and byte count after it completes.
// @param issueHook called with each block's control block just before it is issued.
// @return INFO::OK after all blocks completed; otherwise the status propagated
// by the RETURN_STATUS_* macros from a hook, Issue or WaitUntilComplete.
240 // (hooks must be passed by const reference to allow passing rvalues.
241 // functors with non-const member data can mark them as mutable.)
242 template<class CompletedHook, class IssueHook>
243 static inline Status Run(const Operation& op, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
244 {
245  op.Validate();
246  p.Validate(op);
247 
248  ControlBlockRingBuffer controlBlockRingBuffer(op, p);
249 
250 #if ENABLE_IO_STATS
251  const double t0 = timer_Time();
253 #endif
254 
    // blockSize == 0 means the whole operation is a single block.
255  size_t numBlocks = p.blockSize? DivideRoundUp(static_cast<size_t>(op.m_Size), p.blockSize) : 1;
256  for(size_t blocksIssued = 0, blocksCompleted = 0; blocksCompleted < numBlocks; blocksCompleted++)
257  {
    // top up the queue: issue further blocks until either all are issued or
    // queueDepth blocks are outstanding beyond blocksCompleted.
258  for(; blocksIssued != numBlocks && blocksIssued < blocksCompleted + (off_t)p.queueDepth; blocksIssued++)
259  {
260  aiocb& cb = controlBlockRingBuffer[blocksIssued];
261  cb.aio_offset = op.m_Offset + blocksIssued * p.blockSize;
    // caller-supplied buffer: point at this block's slice of it (temporary
    // buffers were already assigned by ControlBlockRingBuffer).
262  if(op.m_Buffer)
263  cb.aio_buf = (volatile void*)(uintptr_t(op.m_Buffer) + blocksIssued * p.blockSize);
    // the final block covers only the remaining bytes, rounded up to the
    // alignment - hence the padding requirement on caller-supplied buffers.
264  if(blocksIssued == numBlocks-1)
265  cb.aio_nbytes = round_up(size_t(op.m_Size - blocksIssued * p.blockSize), size_t(p.alignment));
266 
267  RETURN_STATUS_FROM_CALLBACK(issueHook(cb));
268 
269  RETURN_STATUS_IF_ERR(Issue(cb, p.queueDepth));
270  }
271 
    // blocks are waited for in the order they were issued.
    // NOTE(review): the RETURN_STATUS_* macros presumably return early on
    // failure; in that case already-issued blocks are not waited for before
    // controlBlockRingBuffer goes out of scope - confirm this is safe.
272  aiocb& cb = controlBlockRingBuffer[blocksCompleted];
273  RETURN_STATUS_IF_ERR(WaitUntilComplete(cb, p.queueDepth));
274 
275  RETURN_STATUS_FROM_CALLBACK(completedHook((u8*)cb.aio_buf, cb.aio_nbytes));
276  }
277 
278 #if ENABLE_IO_STATS
280  const double t1 = timer_Time();
    // counts whole blocks, so the padded tail of the last block is included.
281  const off_t totalSize = p.blockSize? numBlocks*p.blockSize : op.m_Size;
282  debug_printf("IO: %.2f MB/s (%.2f)\n", totalSize/(t1-t0)/1e6, (t1-t0)*1e3);
283 #endif
284 
285  return INFO::OK;
286 }
287 
288 // (overloads allow omitting parameters without requiring a template argument list)
// this one takes a completed-hook only and supplies DefaultIssueHook.
289 template<class CompletedHook>
290 static inline Status Run(const Operation& op, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
291 {
292  return Run(op, p, completedHook, DefaultIssueHook());
293 }
294 
// hook-less overload: runs op with DefaultCompletedHook and DefaultIssueHook.
295 static inline Status Run(const Operation& op, const Parameters& p = Parameters())
296 {
297  return Run(op, p, DefaultCompletedHook(), DefaultIssueHook());
298 }
299 
300 
301 //-----------------------------------------------------------------------------
302 // Store
303 
304 // efficient writing requires preallocation; the resulting file is
305 // padded to the sector size and needs to be truncated afterwards.
306 // this function takes care of both.
// @param pathname file to write.
// @param data, size source bytes and their count; the file is truncated to
// exactly size bytes after writing.
// @param p I/O parameters; O_DIRECT is requested whenever p.queueDepth != 1.
// @param completedHook, issueHook forwarded unchanged to io::Run.
// @return INFO::OK, or the status propagated from Open, Run or wtruncate.
307 template<class CompletedHook, class IssueHook>
308 static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
309 {
310  File file;
311  int oflag = O_WRONLY;
312  if(p.queueDepth != 1)
313  oflag |= O_DIRECT;
314  RETURN_STATUS_IF_ERR(file.Open(pathname, oflag));
    // NOTE(review): the cast discards const because Operation takes void*;
    // the data is presumably only read, since the file is opened O_WRONLY.
315  io::Operation op(file, (void*)data, size);
316 
317 #if OS_WIN
    // best effort: the preallocation result is deliberately ignored.
318  UNUSED2(waio_Preallocate(op.m_FileDescriptor, (off_t)size));
319 #endif
320 
    // NOTE(review): if Run fails, Close and wtruncate below are skipped, so
    // the file may keep its padded length; whether File closes itself on
    // destruction is not visible here.
321  RETURN_STATUS_IF_ERR(io::Run(op, p, completedHook, issueHook));
322 
323  file.Close(); // (required by wtruncate)
324 
    // cut the alignment padding written with the last block.
325  RETURN_STATUS_IF_ERR(wtruncate(pathname, size));
326 
327  return INFO::OK;
328 }
329 
// overload taking a completed-hook only; supplies DefaultIssueHook.
330 template<class CompletedHook>
331 static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
332 {
333  return Store(pathname, data, size, p, completedHook, DefaultIssueHook());
334 }
335 
// hook-less overload: stores data with DefaultCompletedHook and DefaultIssueHook.
336 static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters())
337 {
338  return Store(pathname, data, size, p, DefaultCompletedHook(), DefaultIssueHook());
339 }
340 
341 
342 //-----------------------------------------------------------------------------
343 // Load
344 
345 // convenience function provided for symmetry with Store.
// @param pathname file to read.
// @param buf, size destination and number of bytes to read, starting at file
// offset 0. buf may be 0, in which case Run uses temporary block buffers and
// the data is only seen by completedHook; otherwise it must be aligned and
// padded as described for Operation.
// @param p I/O parameters; O_DIRECT is requested whenever p.queueDepth != 1.
// @param completedHook, issueHook forwarded unchanged to io::Run.
// @return the status of Open if it fails, otherwise the status of io::Run.
346 template<class CompletedHook, class IssueHook>
347 static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
348 {
349  File file;
350  int oflag = O_RDONLY;
351  if(p.queueDepth != 1)
352  oflag |= O_DIRECT;
353  RETURN_STATUS_IF_ERR(file.Open(pathname, oflag));
354  io::Operation op(file, buf, size);
355  return io::Run(op, p, completedHook, issueHook);
356 }
357 
// overload taking a completed-hook only; supplies DefaultIssueHook.
358 template<class CompletedHook>
359 static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
360 {
361  return Load(pathname, buf, size, p, completedHook, DefaultIssueHook());
362 }
363 
// hook-less overload: loads into buf with DefaultCompletedHook and DefaultIssueHook.
364 static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters())
365 {
366  return Load(pathname, buf, size, p, DefaultCompletedHook(), DefaultIssueHook());
367 }
368 
369 } // namespace io
370 
371 #endif // #ifndef INCLUDED_IO
#define RETURN_STATUS_FROM_CALLBACK(expression)
Definition: status.h:336
void * rtl_AllocateAligned(size_t size, size_t align)
Definition: gcc.cpp:66
int m_OpenFlag
Definition: io.h:97
off_t m_Offset
Definition: io.h:99
#define O_DIRECT
Definition: filesystem.h:67
ControlBlockRingBuffer(const Operation &op, const Parameters &p)
Definition: io.h:196
Definition: io.h:175
#define UNUSED(param)
mark a function parameter as unused and avoid the corresponding compiler warning. ...
Definition: code_annotation.h:38
size_t aio_nbytes
Definition: waio.h:68
#define COMPILER_FENCE
prevent the compiler from reordering loads or stores across this point.
Definition: code_annotation.h:281
Status waio_Preallocate(int fd, off_t size)
Definition: waio.cpp:439
T DivideRoundUp(T dividend, T divisor)
low-level aka "lib"
Definition: lib.h:68
size_t queueDepth
Definition: io.h:153
const Status OK
Definition: status.h:384
int wtruncate(const OsPath &pathname, off_t length)
Definition: ufilesystem.cpp:123
const Status IO
Definition: io.h:42
Definition: io.h:47
Definition: io.h:75
static const uintptr_t maxSectorSize
Definition: alignment.h:115
off_t alignment
Definition: io.h:149
T round_up(T n, T multiple)
round number up/down to the next given multiple.
Definition: bits.h:265
static io::BufferPtr Allocate(size_t size, size_t alignment=maxSectorSize)
Definition: io.h:60
static const size_t g_PageSize
Definition: alignment.h:91
static Status Store(const OsPath &pathname, const void *data, size_t size, const Parameters &p=Parameters())
Definition: io.h:336
Parameters()
Definition: io.h:109
void * m_Buffer
Definition: io.h:101
void Close()
Definition: file.h:73
void operator()(void *pointer)
Definition: io.h:49
Definition: io.h:159
uint8_t u8
Definition: types.h:37
std::unique_ptr< u8, FreeAligned > BufferPtr
Definition: io.h:52
Definition: waio.h:92
volatile void * aio_buf
Definition: waio.h:67
#define ARRAY_SIZE(name)
Definition: code_annotation.h:348
bool IsAligned(T t, uintptr_t multiple)
Definition: alignment.h:30
Definition: waio.h:93
Definition: io.cpp:33
#define ENSURE(expr)
ensure the expression <expr> evaluates to non-zero.
Definition: debug.h:290
#define UNUSED2(param)
mark a function local variable or parameter as unused and avoid the corresponding compiler warning...
Definition: code_annotation.h:56
INLINE aiocb & operator[](size_t counter)
Definition: io.h:216
Status Open(const OsPath &pathName, int openFlag)
Definition: file.h:63
int aio_lio_opcode
Definition: waio.h:71
__int64 off_t
Definition: wposix_types.h:91
Definition: path.h:79
static const size_t KiB
Definition: alignment.h:104
size_t blockSize
Definition: io.h:151
io::BufferPtr buffers
Definition: io.h:222
i64 Status
Error handling system.
Definition: status.h:169
void debug_printf(const char *fmt,...)
write a formatted string to the debug channel, subject to filtering (see below).
Definition: debug.cpp:148
double timer_Time()
Definition: timer.cpp:130
Definition: io.h:117
Status operator()(aiocb &cb) const
called before a block I/O is issued.
Definition: io.h:184
Introduction
Definition: debug.h:407
Parameters(OverlappedTag)
Definition: io.h:118
Status operator()(const u8 *block, size_t blockSize) const
called after a block I/O has completed.
Definition: io.h:168
Definition: waio.h:63
Definition: io.h:193
bool is_pow2(T n)
Definition: bits.h:164
off_t aio_offset
Definition: waio.h:66
void rtl_FreeAligned(void *alignedPointer)
Definition: gcc.cpp:93
Parameters(size_t blockSize, size_t queueDepth, off_t alignment=maxSectorSize)
Definition: io.h:123
Operation(const File &file, void *buf, off_t size, off_t offset=0)
Definition: io.h:80
void Validate(const Operation &op) const
Definition: io.h:128
Definition: io.h:106
static Status Run(const Operation &op, const Parameters &p=Parameters())
Definition: io.h:295
Definition: file.h:45
void Validate() const
Definition: io.h:86
Status WaitUntilComplete(aiocb &cb, size_t queueDepth)
Definition: io.cpp:67
static int Issue(aiocb *cb)
Definition: waio.cpp:496
static Status Load(const OsPath &pathname, void *buf, size_t size, const Parameters &p=Parameters())
Definition: io.h:364
off_t m_Size
Definition: io.h:100
int m_FileDescriptor
Definition: io.h:96
static const size_t allocationAlignment
Definition: alignment.h:102
int aio_fildes
Definition: waio.h:65
#define RETURN_STATUS_IF_ERR(expression)
Definition: status.h:274
#define INLINE
Definition: code_annotation.h:375
static Status Run(const Operation &op, const Parameters &p=Parameters(), const CompletedHook &completedHook=CompletedHook(), const IssueHook &issueHook=IssueHook())
Definition: io.h:243