Line data Source code
1 : /* Copyright (C) 2022 Wildfire Games.
2 : *
3 : * Permission is hereby granted, free of charge, to any person obtaining
4 : * a copy of this software and associated documentation files (the
5 : * "Software"), to deal in the Software without restriction, including
6 : * without limitation the rights to use, copy, modify, merge, publish,
7 : * distribute, sublicense, and/or sell copies of the Software, and to
8 : * permit persons to whom the Software is furnished to do so, subject to
9 : * the following conditions:
10 : *
11 : * The above copyright notice and this permission notice shall be included
12 : * in all copies or substantial portions of the Software.
13 : *
14 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15 : * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 : * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
17 : * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
18 : * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
19 : * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
20 : * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 : */
22 :
23 : /*
24 : * provide asynchronous and synchronous I/O with hooks to allow
25 : * overlapped processing or progress reporting.
26 : */
27 :
28 : #ifndef INCLUDED_IO
29 : #define INCLUDED_IO
30 :
31 : #include "lib/config2.h"
32 : #include "lib/alignment.h"
33 : #include "lib/bits.h"
34 : #include "lib/timer.h"
35 : #include "lib/file/file.h"
36 : #include "lib/sysdep/rtl.h"
37 : #include "lib/sysdep/filesystem.h" // wtruncate
38 : #include "lib/posix/posix_aio.h" // LIO_READ, LIO_WRITE
39 :
namespace ERR
{
// status code named for I/O errors. NOTE(review): presumably returned by
// io::Issue / io::WaitUntilComplete when a request fails - their
// definitions are not part of this header, so confirm there.
const Status IO = -110301;
}
44 :
45 : namespace io {
46 :
47 : struct FreeAligned
48 : {
49 2 : void operator()(void* pointer) { rtl_FreeAligned(pointer); }
50 : };
51 :
52 : using BufferPtr = std::unique_ptr<u8, FreeAligned>;
53 :
54 : // @return memory suitable for use as an I/O buffer (address is a
55 : // multiple of alignment, size is rounded up to a multiple of alignment)
56 : // @param alignment is automatically increased if required.
57 : //
58 : // use this instead of the file cache for write buffers that are
59 : // never reused (avoids displacing other items).
60 2 : static inline io::BufferPtr Allocate(size_t size, size_t alignment = maxSectorSize)
61 : {
62 2 : ENSURE(is_pow2(alignment));
63 2 : alignment = std::max(alignment, allocationAlignment);
64 :
65 2 : u8* p = static_cast<u8*>(rtl_AllocateAligned(round_up(size, alignment), alignment));
66 :
67 2 : return {p, FreeAligned{}};
68 : }
69 :
70 :
71 : #pragma pack(push, 1)
72 :
73 : // required information for any I/O (this is basically the same as aiocb,
74 : // but also applies to synchronous I/O and has shorter/nicer names.)
75 : struct Operation
76 : {
77 : // @param m_Buffer can be 0, in which case temporary block buffers are allocated.
78 : // otherwise, it must be aligned and padded to the I/O alignment, e.g. via
79 : // io::Allocate.
80 2935 : Operation(const File& file, void* buf, off_t size, off_t offset = 0)
81 5870 : : m_FileDescriptor(file.Descriptor()), m_OpenFlag((file.Flags() & O_WRONLY)? LIO_WRITE : LIO_READ)
82 5870 : , m_Offset(offset), m_Size(size), m_Buffer(buf)
83 : {
84 2935 : }
85 :
86 2935 : void Validate() const
87 : {
88 2935 : ENSURE(m_FileDescriptor >= 0);
89 2935 : ENSURE(m_OpenFlag == LIO_READ || m_OpenFlag == LIO_WRITE);
90 :
91 2935 : ENSURE(m_Offset >= 0);
92 2935 : ENSURE(m_Size >= 0);
93 : // m_Buffer can legitimately be 0 (see above)
94 2935 : }
95 :
96 : int m_FileDescriptor;
97 : int m_OpenFlag;
98 :
99 : off_t m_Offset;
100 : off_t m_Size;
101 : void* m_Buffer;
102 : };
103 :
104 :
105 : // optional information how an Operation is to be carried out
106 : struct Parameters
107 : {
108 : // default to single blocking I/Os
109 2935 : Parameters()
110 2935 : : alignment(1) // no alignment requirements
111 : , blockSize(0) // do not split into blocks
112 2935 : , queueDepth(1) // disable aio
113 : {
114 2935 : }
115 :
116 : // parameters for asynchronous I/O that maximize throughput on current drives
117 : struct OverlappedTag {};
118 : Parameters(OverlappedTag)
119 : : alignment(maxSectorSize), blockSize(128*KiB), queueDepth(32)
120 : {
121 : }
122 :
123 : Parameters(size_t blockSize, size_t queueDepth, off_t alignment = maxSectorSize)
124 : : alignment(alignment), blockSize(blockSize), queueDepth(queueDepth)
125 : {
126 : }
127 :
128 2935 : void Validate(const Operation& op) const
129 : {
130 2935 : ENSURE(is_pow2(alignment));
131 2935 : ENSURE(alignment > 0);
132 :
133 2935 : if(blockSize != 0)
134 : {
135 0 : ENSURE(is_pow2(blockSize));
136 0 : ENSURE(g_PageSize <= blockSize); // (don't bother checking an upper bound)
137 : }
138 :
139 2935 : ENSURE(1 <= queueDepth && queueDepth <= maxQueueDepth);
140 :
141 2935 : ENSURE(IsAligned(op.m_Offset, alignment));
142 : // op.size doesn't need to be aligned
143 2935 : ENSURE(IsAligned(op.m_Buffer, alignment));
144 2935 : }
145 :
146 : // (ATTO only allows 10, which improves upon 8)
147 : static const size_t maxQueueDepth = 32;
148 :
149 : off_t alignment;
150 :
151 : size_t blockSize; // 0 for one big "block"
152 :
153 : size_t queueDepth;
154 : };
155 :
156 : #define IO_OVERLAPPED io::Parameters(io::Parameters::OverlappedTag())
157 :
158 :
159 : struct DefaultCompletedHook
160 : {
161 : /**
162 : * called after a block I/O has completed.
163 : * @return Status (see RETURN_STATUS_FROM_CALLBACK).
164 : *
165 : * allows progress notification and processing data while waiting for
166 : * previous I/Os to complete.
167 : **/
168 2935 : Status operator()(const u8* UNUSED(block), size_t UNUSED(blockSize)) const
169 : {
170 2935 : return INFO::OK;
171 : }
172 : };
173 :
174 :
175 : struct DefaultIssueHook
176 : {
177 : /**
178 : * called before a block I/O is issued.
179 : * @return Status (see RETURN_STATUS_FROM_CALLBACK).
180 : *
181 : * allows generating the data to write while waiting for
182 : * previous I/Os to complete.
183 : **/
184 2935 : Status operator()(aiocb& UNUSED(cb)) const
185 : {
186 2935 : return INFO::OK;
187 : }
188 : };
189 :
190 :
191 : // ring buffer of partially initialized aiocb that can be passed
192 : // directly to aio_write etc. after setting offset and buffer.
193 2935 : class ControlBlockRingBuffer
194 : {
195 : public:
196 2935 : ControlBlockRingBuffer(const Operation& op, const Parameters& p)
197 2935 : : controlBlocks() // zero-initialize
198 : {
199 2935 : const size_t blockSize = p.blockSize? p.blockSize : static_cast<size_t>(op.m_Size);
200 :
201 2935 : const bool temporaryBuffersRequested = (op.m_Buffer == 0);
202 2935 : if(temporaryBuffersRequested)
203 0 : buffers = io::Allocate(blockSize * p.queueDepth, p.alignment);
204 :
205 96855 : for(size_t i = 0; i < ARRAY_SIZE(controlBlocks); i++)
206 : {
207 93920 : aiocb& cb = operator[](i);
208 93920 : cb.aio_fildes = op.m_FileDescriptor;
209 93920 : cb.aio_nbytes = blockSize;
210 93920 : cb.aio_lio_opcode = op.m_OpenFlag;
211 93920 : if(temporaryBuffersRequested)
212 0 : cb.aio_buf = (volatile void*)(uintptr_t(buffers.get()) + i * blockSize);
213 : }
214 2935 : }
215 :
216 99790 : INLINE aiocb& operator[](size_t counter)
217 : {
218 99790 : return controlBlocks[counter % ARRAY_SIZE(controlBlocks)];
219 : }
220 :
221 : private:
222 : io::BufferPtr buffers;
223 : aiocb controlBlocks[Parameters::maxQueueDepth];
224 : };
225 :
226 : #pragma pack(pop)
227 :
228 :
// (declared here, defined elsewhere. io::Run calls Issue once per block
// after filling in the control block, and WaitUntilComplete before
// handing that block to the completed hook; both receive
// Parameters::queueDepth. NOTE(review): presumably queueDepth == 1
// selects synchronous I/O - confirm against the definitions.)
Status Issue(aiocb& cb, size_t queueDepth);
Status WaitUntilComplete(aiocb& cb, size_t queueDepth);
231 :
232 :
233 : //-----------------------------------------------------------------------------
234 : // Run
235 :
236 : #ifndef ENABLE_IO_STATS
237 : #define ENABLE_IO_STATS 0
238 : #endif
239 :
240 : // (hooks must be passed by const reference to allow passing rvalues.
241 : // functors with non-const member data can mark them as mutable.)
242 : template<class CompletedHook, class IssueHook>
243 2935 : static inline Status Run(const Operation& op, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
244 : {
245 2935 : op.Validate();
246 2935 : p.Validate(op);
247 :
248 5870 : ControlBlockRingBuffer controlBlockRingBuffer(op, p);
249 :
250 : #if ENABLE_IO_STATS
251 : const double t0 = timer_Time();
252 : COMPILER_FENCE;
253 : #endif
254 :
255 2935 : size_t numBlocks = p.blockSize? DivideRoundUp(static_cast<size_t>(op.m_Size), p.blockSize) : 1;
256 5870 : for(size_t blocksIssued = 0, blocksCompleted = 0; blocksCompleted < numBlocks; blocksCompleted++)
257 : {
258 8805 : for(; blocksIssued != numBlocks && blocksIssued < blocksCompleted + (off_t)p.queueDepth; blocksIssued++)
259 : {
260 2935 : aiocb& cb = controlBlockRingBuffer[blocksIssued];
261 2935 : cb.aio_offset = op.m_Offset + blocksIssued * p.blockSize;
262 2935 : if(op.m_Buffer)
263 2935 : cb.aio_buf = (volatile void*)(uintptr_t(op.m_Buffer) + blocksIssued * p.blockSize);
264 2935 : if(blocksIssued == numBlocks-1)
265 2935 : cb.aio_nbytes = round_up(size_t(op.m_Size - blocksIssued * p.blockSize), size_t(p.alignment));
266 :
267 2935 : RETURN_STATUS_FROM_CALLBACK(issueHook(cb));
268 :
269 2935 : RETURN_STATUS_IF_ERR(Issue(cb, p.queueDepth));
270 : }
271 :
272 2935 : aiocb& cb = controlBlockRingBuffer[blocksCompleted];
273 2935 : RETURN_STATUS_IF_ERR(WaitUntilComplete(cb, p.queueDepth));
274 :
275 2935 : RETURN_STATUS_FROM_CALLBACK(completedHook((u8*)cb.aio_buf, cb.aio_nbytes));
276 : }
277 :
278 : #if ENABLE_IO_STATS
279 : COMPILER_FENCE;
280 : const double t1 = timer_Time();
281 : const off_t totalSize = p.blockSize? numBlocks*p.blockSize : op.m_Size;
282 : debug_printf("IO: %.2f MB/s (%.2f)\n", totalSize/(t1-t0)/1e6, (t1-t0)*1e3);
283 : #endif
284 :
285 2935 : return INFO::OK;
286 : }
287 :
288 : // (overloads allow omitting parameters without requiring a template argument list)
289 : template<class CompletedHook>
290 0 : static inline Status Run(const Operation& op, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
291 : {
292 0 : return Run(op, p, completedHook, DefaultIssueHook());
293 : }
294 :
295 2 : static inline Status Run(const Operation& op, const Parameters& p = Parameters())
296 : {
297 2 : return Run(op, p, DefaultCompletedHook(), DefaultIssueHook());
298 : }
299 :
300 :
301 : //-----------------------------------------------------------------------------
302 : // Store
303 :
304 : // efficient writing requires preallocation; the resulting file is
305 : // padded to the sector size and needs to be truncated afterwards.
306 : // this function takes care of both.
307 : template<class CompletedHook, class IssueHook>
308 97 : static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
309 : {
310 194 : File file;
311 97 : int oflag = O_WRONLY;
312 97 : if(p.queueDepth != 1)
313 0 : oflag |= O_DIRECT;
314 97 : RETURN_STATUS_IF_ERR(file.Open(pathname, oflag));
315 97 : io::Operation op(file, (void*)data, size);
316 :
317 : #if OS_WIN
318 : UNUSED2(waio_Preallocate(op.m_FileDescriptor, (off_t)size));
319 : #endif
320 :
321 97 : RETURN_STATUS_IF_ERR(io::Run(op, p, completedHook, issueHook));
322 :
323 97 : file.Close(); // (required by wtruncate)
324 :
325 97 : RETURN_STATUS_IF_ERR(wtruncate(pathname, size));
326 :
327 97 : return INFO::OK;
328 : }
329 :
330 : template<class CompletedHook>
331 : static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
332 : {
333 : return Store(pathname, data, size, p, completedHook, DefaultIssueHook());
334 : }
335 :
336 97 : static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters())
337 : {
338 97 : return Store(pathname, data, size, p, DefaultCompletedHook(), DefaultIssueHook());
339 : }
340 :
341 :
342 : //-----------------------------------------------------------------------------
343 : // Load
344 :
345 : // convenience function provided for symmetry with Store.
346 : template<class CompletedHook, class IssueHook>
347 2836 : static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
348 : {
349 5672 : File file;
350 2836 : int oflag = O_RDONLY;
351 2836 : if(p.queueDepth != 1)
352 0 : oflag |= O_DIRECT;
353 2836 : RETURN_STATUS_IF_ERR(file.Open(pathname, oflag));
354 2836 : io::Operation op(file, buf, size);
355 2836 : return io::Run(op, p, completedHook, issueHook);
356 : }
357 :
358 : template<class CompletedHook>
359 : static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
360 : {
361 : return Load(pathname, buf, size, p, completedHook, DefaultIssueHook());
362 : }
363 :
364 2836 : static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters())
365 : {
366 2836 : return Load(pathname, buf, size, p, DefaultCompletedHook(), DefaultIssueHook());
367 : }
368 :
369 : } // namespace io
370 :
371 : #endif // #ifndef INCLUDED_IO
|