libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

Coverage: 79.9% lines (397/497), 89.6% functions (43/48), 66.7% branches (202/303)

//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include "src/detail/epoll/scheduler.hpp"
#include "src/detail/epoll/op.hpp"
#include "src/detail/timer_service.hpp"
#include "src/detail/make_err.hpp"
#include "src/detail/posix/resolver_service.hpp"
#include "src/detail/posix/signals.hpp"

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <chrono>
#include <limits>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

/*
epoll Scheduler - Single Reactor Model
======================================

This scheduler uses a single-reactor thread coordination strategy that
provides handler parallelism while avoiding the thundering-herd problem.
Instead of all threads blocking in epoll_wait(), one thread becomes the
"reactor" while the others wait on a condition variable for handler work.

Thread Model
------------
- ONE thread runs epoll_wait() at a time (the reactor thread)
- OTHER threads wait on cond_ (condition variable) for handlers
- When work is posted, exactly one waiting thread wakes via notify_one()
- This matches Windows IOCP semantics where N posted items wake N threads
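
For illustration, a hypothetical four-thread pool driving one scheduler
(user-side sketch; the setup code is illustrative, not part of this file):

    epoll_scheduler sched{ctx, 0};
    std::vector<std::thread> pool;
    for (int i = 0; i < 4; ++i)
        pool.emplace_back([&]{ sched.run(); });
    // Inside run(), one thread becomes the reactor; the other three
    // park on cond_ until handlers are queued for them.
    for (auto& t : pool)
        t.join();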

Event Loop Structure (do_one)
-----------------------------
1. Lock mutex, try to pop a handler from the queue
2. If a handler was popped: execute it (unlocked), return
3. If the queue is empty and no reactor is running: become the reactor
   - Run epoll_wait (unlocked), queue I/O completions, loop back
4. If the queue is empty and a reactor is running: wait on the condvar

The task_running_ flag ensures only one thread owns epoll_wait().
After the reactor queues I/O completions, it loops back to try to pop
a handler, giving priority to handler execution over more I/O polling.
A condensed sketch of this loop follows.
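
Condensed pseudocode of do_one() (the real loop below also threads a
task sentinel op through the queue and performs work accounting):

    for (;;)
    {
        if (stopped_)
            return 0;
        if (auto* op = pop_handler())       // steps 1-2
        {
            unlock();
            (*op)();
            return 1;
        }
        if (!reactor_running())             // step 3
            run_task(lock, ctx);            // epoll_wait + enqueue, loop back
        else                                // step 4
            wait_for_signal(lock);
    }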

Signaling State (state_)
------------------------
The state_ variable encodes two pieces of information:
- Bit 0: signaled flag (1 = signaled, persists until cleared)
- Upper bits: waiter count (each waiter adds 2 before blocking)

This allows efficient coordination:
- Signalers only call notify when waiters exist (state_ > 1)
- Waiters check if already signaled before blocking (fast path)
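
Concretely:

    state_ == 0   no waiters, not signaled
    state_ == 1   signaled, no waiters (next waiter returns immediately)
    state_ == 4   two waiters blocked, not signaled
    state_ == 5   signaled while two waiters are still registered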

Wake Coordination (wake_one_thread_and_unlock)
----------------------------------------------
When posting work:
- If waiters exist (state_ > 1): signal and notify_one()
- Else if the reactor is running: interrupt it via an eventfd write
- Else: no-op (the thread will find the work when it checks the queue)

This avoids waking threads unnecessarily. With cascading wakes,
each handler execution wakes at most one additional thread if
more work exists in the queue.
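
The decision, condensed from wake_one_thread_and_unlock() below:

    state_ |= 1;                                  // latch the signal
    if (state_ > 1)                               // somebody is waiting
    {
        unlock();
        cond_.notify_one();
    }
    else if (task_running_ && !task_interrupted_)
    {
        task_interrupted_ = true;
        unlock();
        write(event_fd_, 1);                      // wake epoll_wait()
    }
    else
    {
        unlock();                                 // work is found on the
    }                                             // next queue check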

Work Counting
-------------
outstanding_work_ tracks pending operations. When it hits zero, run()
returns. Each operation increments on start, decrements on completion.
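
For example, an async read is counted when initiated (work_started())
and decremented again once its completion handler has run (via
work_cleanup below), so a context with one pending read keeps run()
alive exactly until that read's handler finishes.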

Timer Integration
-----------------
Timers are handled by timer_service. The reactor arms a timerfd
(registered with epoll) for the nearest expiry rather than adjusting
the epoll_wait timeout. When a new timer is scheduled earlier than the
current nearest expiry, timer_service marks the timerfd stale and calls
interrupt_reactor() so the reactor re-arms it.
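
The sequence, roughly (schedule() is an illustrative name, not this
file's API):

    user thread: timer_service::schedule(earlier_deadline)
      -> on_earliest_changed callback:
           timerfd_stale_ = true; interrupt_reactor()   // eventfd write
      -> reactor returns from epoll_wait()
      -> next run_task() pass calls update_timerfd() to re-arm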
*/

namespace boost::corosio::detail {

struct scheduler_context
{
    epoll_scheduler const* key;
    scheduler_context* next;
    op_queue private_queue;
    long private_outstanding_work;
    int inline_budget;

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
        , inline_budget(0)
    {
    }
};

namespace {

corosio::detail::thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

} // namespace

void
epoll_scheduler::
reset_inline_budget() const noexcept
{
    if (auto* ctx = find_context(this))
        ctx->inline_budget = max_inline_budget_;
}

bool
epoll_scheduler::
try_consume_inline_budget() const noexcept
{
    if (auto* ctx = find_context(this))
    {
        if (ctx->inline_budget > 0)
        {
            --ctx->inline_budget;
            return true;
        }
    }
    return false;
}
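
// Runs as a queued scheduler_op once the reactor has observed readiness.
// The reactor only latches event bits (see run_task()); the per-descriptor
// work - error retrieval, perform_io(), completion dispatch - happens here,
// outside the epoll_wait() hot path.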
void
descriptor_state::
operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                read_ready = true;
            }
        }

        if (ev & EPOLLOUT)
        {
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                connect_op = nullptr;
                local_ops.push(cn);
            }

            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }

            if (!had_write_op)
                write_ready = true;
        }

        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute the first handler inline - the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}

epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) {
                auto* self = static_cast<epoll_scheduler*>(p);
                self->timerfd_stale_.store(true, std::memory_order_release);
                if (self->task_running_.load(std::memory_order_acquire))
                    self->interrupt_reactor();
            }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}

epoll_scheduler::
~epoll_scheduler()
{
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}

void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    outstanding_work_.store(0, std::memory_order_release);

    if (event_fd_ >= 0)
        interrupt_reactor();
}

void
epoll_scheduler::
post(std::coroutine_handle<> h) const
{
    struct post_handler final
        : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit
        post_handler(std::coroutine_handle<> h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        void operator()() override
        {
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
post(scheduler_op* h) const
{
    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(h);
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(h);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
on_work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
on_work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

bool
epoll_scheduler::
running_in_this_thread() const noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

void
epoll_scheduler::
stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}

bool
epoll_scheduler::
stopped() const noexcept
{
    std::unique_lock lock(mutex_);
    return stopped_;
}

void
epoll_scheduler::
restart()
{
    std::unique_lock lock(mutex_);
    stopped_ = false;
}

std::size_t
epoll_scheduler::
run()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, -1, &ctx.frame_);
}

std::size_t
epoll_scheduler::
wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}

std::size_t
epoll_scheduler::
poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, 0, &ctx.frame_);
}
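
// Descriptors are registered once, edge-triggered, for both read and
// write interest; readiness observed later is latched in descriptor_state
// (read_ready / write_ready), so no per-operation epoll_ctl() rearming
// is needed.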
void
epoll_scheduler::
register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd = fd;
    desc->scheduler_ = this;

    std::lock_guard lock(desc->mutex);
    desc->read_ready = false;
    desc->write_ready = false;
}

void
epoll_scheduler::
deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}

void
epoll_scheduler::
work_started() const noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
work_finished() const noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
    {
        // Last work item completed - wake all threads so they can exit.
        // signal_all() wakes threads waiting on the condvar.
        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
        // Both are needed because they target different blocking mechanisms.
        std::unique_lock lock(mutex_);
        signal_all(lock);
        if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
        {
            task_interrupted_ = true;
            lock.unlock();
            interrupt_reactor();
        }
    }
}

void
epoll_scheduler::
compensating_work_started() const noexcept
{
    auto* ctx = find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}

void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}

void
epoll_scheduler::
post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
interrupt_reactor() const
{
    // Only write if not already armed, to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(expected, true,
        std::memory_order_release, std::memory_order_relaxed))
    {
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}

void
epoll_scheduler::
signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}

bool
epoll_scheduler::
maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}

void
epoll_scheduler::
unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
}

void
epoll_scheduler::
clear_signal() const
{
    state_ &= ~std::size_t(1);
}
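
// Waiters add 2 to state_ while blocked so signalers can tell whether
// notify_one()/notify_all() has an audience; bit 0 is the latched signal
// that lets a late waiter return without sleeping.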
void
epoll_scheduler::
wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}

void
epoll_scheduler::
wait_for_signal_for(
    std::unique_lock<std::mutex>& lock,
    long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}

void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}

/** RAII guard for handler execution work accounting.

    Handler consumes 1 work item, may produce N new items via fast-path posts.
    Net change = N - 1:
    - If N > 1: add (N - 1) to global (more work produced than consumed)
    - If N == 1: net zero, do nothing
    - If N < 1: call work_finished() (work consumed, may trigger stop)

    Also drains the private queue to global for other threads to process.
*/
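// Example: a handler that fast-path posts two continuations leaves
// produced == 2, so 1 is added to the global count (net +1); a handler
// that posts nothing leaves produced == 0 and work_finished() runs,
// possibly stopping the scheduler.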
struct work_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~work_cleanup()
    {
        if (ctx)
        {
            long produced = ctx->private_outstanding_work;
            if (produced > 1)
                scheduler->outstanding_work_.fetch_add(
                    produced - 1, std::memory_order_relaxed);
            else if (produced < 1)
                scheduler->work_finished();
            // produced == 1: net zero, handler consumed what it produced
            ctx->private_outstanding_work = 0;

            if (!ctx->private_queue.empty())
            {
                lock->lock();
                scheduler->completed_ops_.splice(ctx->private_queue);
            }
        }
        else
        {
            // No thread context - slow-path op was already counted globally
            scheduler->work_finished();
        }
    }
};

/** RAII guard for reactor work accounting.

    Reactor only produces work via timer/signal callbacks posting handlers.
    Unlike handler execution which consumes 1, the reactor consumes nothing.
    All produced work must be flushed to the global counter.
*/
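// Example: if timer expiry posted three handlers during run_task(),
// private_outstanding_work == 3 is flushed to the global counter and the
// three ops are spliced into completed_ops_ for any thread to run.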
struct task_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~task_cleanup()
    {
        if (!ctx)
            return;

        if (ctx->private_outstanding_work > 0)
        {
            scheduler->outstanding_work_.fetch_add(
                ctx->private_outstanding_work, std::memory_order_relaxed);
            ctx->private_outstanding_work = 0;
        }

        if (!ctx->private_queue.empty())
        {
            if (!lock->owns_lock())
                lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
};

void
epoll_scheduler::
update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by leaving the (relative) value at zero
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - a zero value disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now).count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if the duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}
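
// One reactor pass: re-arm the timerfd if a callback marked it stale,
// block in epoll_wait() (zero timeout when interrupted), latch readiness
// into descriptor_state objects, expire timers, then splice everything
// harvested into completed_ops_ under the mutex.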
void
epoll_scheduler::
run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
{
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without the mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue the descriptor.
        // No per-descriptor mutex locking in the reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when the timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
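
// Executes at most one queued handler, running the reactor in between
// when its sentinel op is popped. Returns 1 if a handler ran, else 0
// (stopped, no outstanding work, or a zero/elapsed timeout).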
std::size_t
epoll_scheduler::
do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle the reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or the caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle an operation
        if (op != nullptr)
        {
            if (!completed_ops_.empty())
                unlock_and_signal_one(lock);
            else
                lock.unlock();

            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or the caller requested a non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}

} // namespace boost::corosio::detail

#endif