libs/corosio/src/corosio/src/detail/timer_service.cpp

87.2% Lines (321/368) 93.3% Functions (42/45) 73.3% Branches (126/172)
libs/corosio/src/corosio/src/detail/timer_service.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include "src/detail/timer_service.hpp"
11 #include "src/detail/scheduler_impl.hpp"
12
13 #include <boost/corosio/basic_io_context.hpp>
14 #include <boost/corosio/detail/thread_local_ptr.hpp>
15 #include "src/detail/scheduler_op.hpp"
16 #include "src/detail/intrusive.hpp"
17 #include <boost/capy/error.hpp>
18 #include <boost/capy/ex/executor_ref.hpp>
19 #include <system_error>
20
21 #include <atomic>
22 #include <coroutine>
23 #include <limits>
24 #include <mutex>
25 #include <optional>
26 #include <stop_token>
27 #include <vector>
28
29 /*
30 Timer Service
31 =============
32
33 The public timer class holds an opaque timer_impl* and forwards
34 all operations through extern free functions defined at the bottom
35 of this file.
36
37 Data Structures
38 ---------------
39 waiter_node holds per-waiter state: coroutine handle, executor,
40 error output, stop_token, embedded completion_op. Each concurrent
41 co_await t.wait() allocates one waiter_node.
42
43 timer_impl holds per-timer state: expiry, heap index, and an
44 intrusive_list of waiter_nodes. Multiple coroutines can wait on
45 the same timer simultaneously.
46
47 timer_service_impl owns a min-heap of active timers, a free list
48 of recycled impls, and a free list of recycled waiter_nodes. The
49 heap is ordered by expiry time; the scheduler queries
50 nearest_expiry() to set the epoll/timerfd timeout.
51
52 Optimization Strategy
53 ---------------------
54 The common timer lifecycle is: construct, set expiry, cancel or
55 wait, destroy. Several optimizations target this path:
56
57 1. Deferred heap insertion — expires_after() stores the expiry
58 but does not insert into the heap. Insertion happens in
59 wait(). If the timer is cancelled or destroyed before wait(),
60 the heap is never touched and no mutex is taken. This also
61 enables the already-expired fast path: when wait() sees
62 expiry <= now before inserting, it posts the coroutine
63 handle to the executor and returns noop_coroutine — no
64 heap, no mutex, no epoll. This is only possible because
65 the coroutine API guarantees wait() always follows
66 expires_after(); callback APIs cannot assume this call
67 order.
68
69 2. Thread-local impl cache — A single-slot per-thread cache of
70 timer_impl avoids the mutex on create/destroy for the common
71 create-then-destroy-on-same-thread pattern. On pop, if the
72 cached impl's svc_ doesn't match the current service, the
73 stale impl is deleted eagerly rather than reused.
74
75 3. Embedded completion_op — Each waiter_node embeds a
76 scheduler_op subclass, eliminating heap allocation per
77 fire/cancel. Its destroy() is a no-op since the waiter_node
78 owns the lifetime.
79
80 4. Cached nearest expiry — An atomic<int64_t> mirrors the heap
81 root's time, updated under the lock. nearest_expiry() and
82 empty() read the atomic without locking.
83
84 5. might_have_pending_waits_ flag — Set on wait(), cleared on
85 cancel. Lets cancel_timer() return without locking when no
86 wait was ever issued.
87
88 6. Thread-local waiter cache — Single-slot per-thread cache of
89 waiter_node avoids the free-list mutex for the common
90 wait-then-complete-on-same-thread pattern.
91
92 With all fast paths hit (idle timer, same thread), the
93 schedule/cancel cycle takes zero mutex locks.
94
95 Concurrency
96 -----------
97 stop_token callbacks can fire from any thread. The impl_
98 pointer on waiter_node is used as a "still in list" marker:
99 set to nullptr under the mutex when a waiter is removed by
100 cancel_timer() or process_expired(). cancel_waiter() checks
101 this under the mutex to avoid double-removal races.
102
103 Multiple io_contexts in the same program are safe. The
104 service pointer is obtained directly from the scheduler,
105 and TL-cached impls are validated by comparing svc_ against
106 the current service pointer. Waiter nodes have no service
107 affinity and can safely migrate between contexts.
108 */
109
110 namespace boost::corosio::detail {
111
112 class timer_service_impl;
113 struct timer_impl;
114 struct waiter_node;
115
116 void timer_service_invalidate_cache() noexcept;
117
// Per-wait state for one co_await on a timer. One waiter_node exists
// per concurrent wait; several nodes may be linked into a single
// timer_impl's intrusive waiter list at once.
struct waiter_node
    : intrusive_list<waiter_node>::node
{
    // Embedded completion op — avoids heap allocation per fire/cancel
    struct completion_op final : scheduler_op
    {
        // Back-pointer to the owning waiter; set once in waiter_node's ctor.
        waiter_node* waiter_ = nullptr;

        // Trampoline used by the scheduler queue; forwards to operator().
        static void do_complete(
            void* owner,
            scheduler_op* base,
            std::uint32_t,
            std::uint32_t);

        completion_op() noexcept
            : scheduler_op(&do_complete)
        {
        }

        void operator()() override;
        // No-op — lifetime owned by waiter_node, not the scheduler queue
        void destroy() override {}
    };

    // Per-waiter stop_token cancellation
    struct canceller
    {
        waiter_node* waiter_;
        void operator()() const;
    };

    // nullptr once removed from timer's waiter list (concurrency marker)
    timer_impl* impl_ = nullptr;
    // Owning service; used by the stop callback and on completion.
    timer_service_impl* svc_ = nullptr;
    // Coroutine to resume when the wait completes or is canceled.
    std::coroutine_handle<> h_;
    // Executor the coroutine handle is posted to on completion.
    capy::executor_ref d_;
    // Caller-provided error output; receives ec_value_ on completion.
    std::error_code* ec_out_ = nullptr;
    std::stop_token token_;
    // Engaged only when token_.stop_possible(); runs canceller on stop.
    std::optional<std::stop_callback<canceller>> stop_cb_;
    completion_op op_;
    // Result staged by cancel/expiry, copied to *ec_out_ on completion.
    std::error_code ec_value_;
    // Linkage used while the node sits on the service's free list.
    waiter_node* next_free_ = nullptr;

    waiter_node() noexcept
    {
        op_.waiter_ = this;
    }
};
166
// Per-timer state, referenced by the public timer class through the
// opaque timer::timer_impl base and returned to the service via
// release().
struct timer_impl
    : timer::timer_impl
{
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;
    using duration = clock_type::duration;

    // Owning service; set at construction and on recycle.
    timer_service_impl* svc_ = nullptr;
    // Absolute expiry; heap insertion is deferred until wait().
    time_point expiry_;
    // Index into the service min-heap, or size_t max when not in it.
    std::size_t heap_index_ = (std::numeric_limits<std::size_t>::max)();
    // Lets cancel_timer() skip the lock when no wait() was ever issued
    bool might_have_pending_waits_ = false;
    // All coroutines currently waiting on this timer.
    intrusive_list<waiter_node> waiters_;

    // Free list linkage (reused when impl is on free_list)
    timer_impl* next_free_ = nullptr;

    explicit timer_impl(timer_service_impl& svc) noexcept;

    // Hands the impl back to the service (TL cache or free list).
    void release() override;

    // Suspends the awaiting coroutine until expiry or cancellation.
    std::coroutine_handle<> wait(
        std::coroutine_handle<>,
        capy::executor_ref,
        std::stop_token,
        std::error_code*) override;
};
195
196 timer_impl* try_pop_tl_cache(timer_service_impl*) noexcept;
197 bool try_push_tl_cache(timer_impl*) noexcept;
198 waiter_node* try_pop_waiter_tl_cache() noexcept;
199 bool try_push_waiter_tl_cache(waiter_node*) noexcept;
200
// Owns the min-heap of active timers plus two free lists (impls and
// waiter nodes). All shared state is guarded by mutex_; completions
// are always posted to the scheduler outside the lock.
class timer_service_impl : public timer_service
{
public:
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;
    using key_type = timer_service;

private:
    // One heap slot: expiry time plus back-pointer to the timer,
    // whose heap_index_ mirrors this entry's position.
    struct heap_entry
    {
        time_point time_;
        timer_impl* timer_;
    };

    scheduler* sched_ = nullptr;
    mutable std::mutex mutex_;
    // Binary min-heap ordered by time_; root is the nearest expiry.
    std::vector<heap_entry> heap_;
    timer_impl* free_list_ = nullptr;
    waiter_node* waiter_free_list_ = nullptr;
    callback on_earliest_changed_;
    // Avoids mutex in nearest_expiry() and empty()
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
        (std::numeric_limits<std::int64_t>::max)()};

public:
    timer_service_impl(capy::execution_context&, scheduler& sched)
        : timer_service()
        , sched_(&sched)
    {
    }

    scheduler& get_scheduler() noexcept { return *sched_; }

    ~timer_service_impl() = default;

    timer_service_impl(timer_service_impl const&) = delete;
    timer_service_impl& operator=(timer_service_impl const&) = delete;

    void set_on_earliest_changed(callback cb) override
    {
        on_earliest_changed_ = cb;
    }

    // Context teardown: destroys waiting coroutine frames, heap-held
    // impls, and everything on both free lists. No lock is taken —
    // presumably shutdown runs single-threaded; verify against caller.
    void shutdown() override
    {
        timer_service_invalidate_cache();

        // Cancel waiting timers still in the heap
        for (auto& entry : heap_)
        {
            auto* impl = entry.timer_;
            while (auto* w = impl->waiters_.pop_front())
            {
                w->stop_cb_.reset();
                // Destroy the suspended coroutine frame rather than
                // resuming it — the context is going away.
                w->h_.destroy();
                sched_->on_work_finished();
                delete w;
            }
            impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
            delete impl;
        }
        heap_.clear();
        cached_nearest_ns_.store(
            (std::numeric_limits<std::int64_t>::max)(),
            std::memory_order_release);

        // Delete free-listed impls
        while (free_list_)
        {
            auto* next = free_list_->next_free_;
            delete free_list_;
            free_list_ = next;
        }

        // Delete free-listed waiters
        while (waiter_free_list_)
        {
            auto* next = waiter_free_list_->next_free_;
            delete waiter_free_list_;
            waiter_free_list_ = next;
        }
    }

    // Allocation order: thread-local cache (no lock), then the locked
    // free list, then operator new.
    timer::timer_impl* create_impl() override
    {
        timer_impl* impl = try_pop_tl_cache(this);
        if (impl)
        {
            // Reset recycled state to "fresh, not in heap".
            impl->svc_ = this;
            impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
            impl->might_have_pending_waits_ = false;
            return impl;
        }

        std::lock_guard lock(mutex_);
        if (free_list_)
        {
            impl = free_list_;
            free_list_ = impl->next_free_;
            impl->next_free_ = nullptr;
            impl->svc_ = this;
            impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
            impl->might_have_pending_waits_ = false;
        }
        else
        {
            impl = new timer_impl(*this);
        }
        return impl;
    }

    // Cancels outstanding waits, detaches from the heap if needed,
    // then recycles the impl (TL cache first, free list as fallback).
    void destroy_impl(timer_impl& impl)
    {
        cancel_timer(impl);

        if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
        {
            std::lock_guard lock(mutex_);
            remove_timer_impl(impl);
            refresh_cached_nearest();
        }

        if (try_push_tl_cache(&impl))
            return;

        std::lock_guard lock(mutex_);
        impl.next_free_ = free_list_;
        free_list_ = &impl;
    }

    // Same tiering as create_impl: TL cache, locked free list, new.
    waiter_node* create_waiter()
    {
        if (auto* w = try_pop_waiter_tl_cache())
            return w;

        std::lock_guard lock(mutex_);
        if (waiter_free_list_)
        {
            auto* w = waiter_free_list_;
            waiter_free_list_ = w->next_free_;
            w->next_free_ = nullptr;
            return w;
        }

        return new waiter_node();
    }

    void destroy_waiter(waiter_node* w)
    {
        if (try_push_waiter_tl_cache(w))
            return;

        std::lock_guard lock(mutex_);
        w->next_free_ = waiter_free_list_;
        waiter_free_list_ = w;
    }

    // Heap insertion deferred to wait() — avoids lock when timer is idle
    // Moving the expiry cancels all pending waits; returns how many
    // waiters were canceled.
    std::size_t update_timer(timer_impl& impl, time_point new_time)
    {
        bool in_heap =
            (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
        // Idle-timer fast path: nothing in the heap, nobody waiting.
        if (!in_heap && impl.waiters_.empty())
            return 0;

        bool notify = false;
        intrusive_list<waiter_node> canceled;

        {
            std::lock_guard lock(mutex_);

            // Detach every waiter; impl_ = nullptr marks "no longer
            // in the list" for racing cancel_waiter() calls.
            while (auto* w = impl.waiters_.pop_front())
            {
                w->impl_ = nullptr;
                canceled.push_back(w);
            }

            if (impl.heap_index_ < heap_.size())
            {
                time_point old_time = heap_[impl.heap_index_].time_;
                heap_[impl.heap_index_].time_ = new_time;

                // Re-heapify in the direction the key moved.
                if (new_time < old_time)
                    up_heap(impl.heap_index_);
                else
                    down_heap(impl.heap_index_);

                // Root changed => scheduler timeout must be refreshed.
                notify = (impl.heap_index_ == 0);
            }

            refresh_cached_nearest();
        }

        // Post completions outside the lock.
        std::size_t count = 0;
        while (auto* w = canceled.pop_front())
        {
            w->ec_value_ = make_error_code(capy::error::canceled);
            sched_->post(&w->op_);
            ++count;
        }

        if (notify)
            on_earliest_changed_();

        return count;
    }

    // Inserts timer into heap if needed and pushes waiter, all under
    // one lock to prevent races with cancel_waiter/process_expired
    void insert_waiter(timer_impl& impl, waiter_node* w)
    {
        bool notify = false;
        {
            std::lock_guard lock(mutex_);
            if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
            {
                impl.heap_index_ = heap_.size();
                heap_.push_back({impl.expiry_, &impl});
                up_heap(heap_.size() - 1);
                // heap_index_ was updated by up_heap's swaps.
                notify = (impl.heap_index_ == 0);
                refresh_cached_nearest();
            }
            impl.waiters_.push_back(w);
        }
        if (notify)
            on_earliest_changed_();
    }

    // Cancels every pending wait on this timer; returns the count.
    std::size_t cancel_timer(timer_impl& impl)
    {
        // Flag is only set by wait(); lets idle timers skip the lock.
        if (!impl.might_have_pending_waits_)
            return 0;

        // Not in heap and no waiters — just clear the flag
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)()
            && impl.waiters_.empty())
        {
            impl.might_have_pending_waits_ = false;
            return 0;
        }

        intrusive_list<waiter_node> canceled;

        {
            std::lock_guard lock(mutex_);
            remove_timer_impl(impl);
            while (auto* w = impl.waiters_.pop_front())
            {
                w->impl_ = nullptr;
                canceled.push_back(w);
            }
            refresh_cached_nearest();
        }

        impl.might_have_pending_waits_ = false;

        // Post completions outside the lock.
        std::size_t count = 0;
        while (auto* w = canceled.pop_front())
        {
            w->ec_value_ = make_error_code(capy::error::canceled);
            sched_->post(&w->op_);
            ++count;
        }

        return count;
    }

    // Cancel a single waiter (called from stop_token callback, any thread)
    void cancel_waiter(waiter_node* w)
    {
        {
            std::lock_guard lock(mutex_);
            // Already removed by cancel_timer or process_expired
            if (!w->impl_)
                return;
            auto* impl = w->impl_;
            w->impl_ = nullptr;
            impl->waiters_.remove(w);
            // Last waiter gone — the timer no longer needs heap space.
            if (impl->waiters_.empty())
            {
                remove_timer_impl(*impl);
                impl->might_have_pending_waits_ = false;
            }
            refresh_cached_nearest();
        }

        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
    }

    // Cancel front waiter only (FIFO), return 0 or 1
    std::size_t cancel_one_waiter(timer_impl& impl)
    {
        if (!impl.might_have_pending_waits_)
            return 0;

        waiter_node* w = nullptr;

        {
            std::lock_guard lock(mutex_);
            w = impl.waiters_.pop_front();
            if (!w)
                return 0;
            w->impl_ = nullptr;
            if (impl.waiters_.empty())
            {
                remove_timer_impl(impl);
                impl.might_have_pending_waits_ = false;
            }
            refresh_cached_nearest();
        }

        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
        return 1;
    }

    // Lock-free: reads the mirrored heap-root time.
    bool empty() const noexcept override
    {
        return cached_nearest_ns_.load(std::memory_order_acquire)
            == (std::numeric_limits<std::int64_t>::max)();
    }

    time_point nearest_expiry() const noexcept override
    {
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
        return time_point(time_point::duration(ns));
    }

    // Pops every timer whose expiry <= now, gathers its waiters, and
    // posts their completions (success, empty error code) outside the
    // lock. Returns the number of completions posted.
    std::size_t process_expired() override
    {
        intrusive_list<waiter_node> expired;

        {
            std::lock_guard lock(mutex_);
            auto now = clock_type::now();

            while (!heap_.empty() && heap_[0].time_ <= now)
            {
                timer_impl* t = heap_[0].timer_;
                remove_timer_impl(*t);
                while (auto* w = t->waiters_.pop_front())
                {
                    w->impl_ = nullptr;
                    w->ec_value_ = {};
                    expired.push_back(w);
                }
                t->might_have_pending_waits_ = false;
            }

            refresh_cached_nearest();
        }

        std::size_t count = 0;
        while (auto* w = expired.pop_front())
        {
            sched_->post(&w->op_);
            ++count;
        }

        return count;
    }

private:
    // Mirrors the heap root into the atomic; call under mutex_ after
    // any heap mutation. int64 max encodes "no timers".
    void refresh_cached_nearest() noexcept
    {
        auto ns = heap_.empty()
            ? (std::numeric_limits<std::int64_t>::max)()
            : heap_[0].time_.time_since_epoch().count();
        cached_nearest_ns_.store(ns, std::memory_order_release);
    }

    // Removes impl from the heap (if present) and marks it detached.
    void remove_timer_impl(timer_impl& impl)
    {
        std::size_t index = impl.heap_index_;
        if (index >= heap_.size())
            return; // Not in heap

        if (index == heap_.size() - 1)
        {
            // Last element, just pop
            impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
            heap_.pop_back();
        }
        else
        {
            // Swap with last and reheapify
            swap_heap(index, heap_.size() - 1);
            impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
            heap_.pop_back();

            // The element moved into `index` may violate the heap
            // property in either direction.
            if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
                up_heap(index);
            else
                down_heap(index);
        }
    }

    // Sift entry at `index` toward the root while smaller than parent.
    void up_heap(std::size_t index)
    {
        while (index > 0)
        {
            std::size_t parent = (index - 1) / 2;
            if (!(heap_[index].time_ < heap_[parent].time_))
                break;
            swap_heap(index, parent);
            index = parent;
        }
    }

    // Sift entry at `index` toward the leaves while larger than a child.
    void down_heap(std::size_t index)
    {
        std::size_t child = index * 2 + 1;
        while (child < heap_.size())
        {
            std::size_t min_child = (child + 1 == heap_.size() ||
                heap_[child].time_ < heap_[child + 1].time_)
                ? child : child + 1;

            if (heap_[index].time_ < heap_[min_child].time_)
                break;

            swap_heap(index, min_child);
            index = min_child;
            child = index * 2 + 1;
        }
    }

    // Exchanges two heap slots and keeps both timers' heap_index_
    // back-pointers in sync.
    void swap_heap(std::size_t i1, std::size_t i2)
    {
        heap_entry tmp = heap_[i1];
        heap_[i1] = heap_[i2];
        heap_[i2] = tmp;
        heap_[i1].timer_->heap_index_ = i1;
        heap_[i2].timer_->heap_index_ = i2;
    }
};
638
// Binds the impl to its owning service; every other member uses its
// in-class initializer.
timer_impl::
timer_impl(timer_service_impl& svc) noexcept
    : svc_(&svc)
{
}
644
// Invoked by std::stop_callback (potentially from any thread);
// delegates to the service so list removal is serialized under the
// service mutex.
void
waiter_node::canceller::
operator()() const
{
    waiter_->svc_->cancel_waiter(waiter_);
}
651
652 void
653 waiter_node::completion_op::
654 do_complete(
655 void* owner,
656 scheduler_op* base,
657 std::uint32_t,
658 std::uint32_t)
659 {
660 if (!owner)
661 return;
662 static_cast<completion_op*>(base)->operator()();
663 }
664
// Runs on the scheduler after a waiter is posted by expiry or
// cancellation: publishes the staged error code, recycles the
// waiter_node, then resumes the coroutine on its executor.
void
waiter_node::completion_op::
operator()()
{
    auto* w = waiter_;
    // Tear down the stop callback first so a racing stop request
    // cannot fire against a node that is about to be recycled.
    w->stop_cb_.reset();
    if (w->ec_out_)
        *w->ec_out_ = w->ec_value_;

    // Copy everything we still need out of the node before
    // destroy_waiter() makes it available for reuse.
    auto h = w->h_;
    auto d = w->d_;
    auto* svc = w->svc_;
    auto& sched = svc->get_scheduler();

    svc->destroy_waiter(w);

    d.post(h);
    // Balances the on_work_started() issued in wait().
    sched.on_work_finished();
}
684
685 void
686 8775 timer_impl::
687 release()
688 {
689 8775 svc_->destroy_impl(*this);
690 8775 }
691
// Suspends coroutine h until the timer fires or is canceled. Returns
// noop_coroutine in all paths — resumption always happens through the
// executor, never inline.
std::coroutine_handle<>
timer_impl::
wait(
    std::coroutine_handle<> h,
    capy::executor_ref d,
    std::stop_token token,
    std::error_code* ec)
{
    // Already-expired fast path — no waiter_node, no mutex.
    // Post instead of dispatch so the coroutine yields to the
    // scheduler, allowing other queued work to run.
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
    {
        if (expiry_ <= clock_type::now())
        {
            if (ec)
                *ec = {};
            d.post(h);
            return std::noop_coroutine();
        }
    }

    // Slow path: allocate (or recycle) a waiter and register it.
    auto* w = svc_->create_waiter();
    w->impl_ = this;
    w->svc_ = svc_;
    w->h_ = h;
    w->d_ = std::move(d);
    w->token_ = std::move(token);
    w->ec_out_ = ec;

    // Heap insertion and waiter registration happen under one lock.
    svc_->insert_waiter(*this, w);
    might_have_pending_waits_ = true;
    // Keeps the context alive until the completion runs
    // (balanced by on_work_finished() in completion_op).
    svc_->get_scheduler().on_work_started();

    // Arm the stop callback last — it may fire immediately on an
    // already-stopped token, and the waiter must be fully registered
    // before cancel_waiter() can see it.
    if (w->token_.stop_possible())
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});

    return std::noop_coroutine();
}
731
732 // Extern free functions called from timer.cpp
733 //
734 // Two thread-local caches avoid hot-path mutex acquisitions:
735 //
736 // 1. Impl cache — single-slot, validated by comparing svc_ on the
737 // impl against the current service pointer.
738 //
739 // 2. Waiter cache — single-slot, no service affinity.
740 //
741 // The service pointer is obtained from the scheduler_impl's
742 // timer_svc_ member, avoiding find_service() on the hot path.
743 // All caches are cleared by timer_service_invalidate_cache()
744 // during shutdown.
745
// Single-slot per-thread caches for the impl and waiter fast paths;
// cleared by timer_service_invalidate_cache() at shutdown.
thread_local_ptr<timer_impl> tl_cached_impl;
thread_local_ptr<waiter_node> tl_cached_waiter;
748
749 timer_impl*
750 8775 try_pop_tl_cache(timer_service_impl* svc) noexcept
751 {
752 8775 auto* impl = tl_cached_impl.get();
753
2/2
✓ Branch 0 taken 8602 times.
✓ Branch 1 taken 173 times.
8775 if (impl)
754 {
755 8602 tl_cached_impl.set(nullptr);
756
1/2
✓ Branch 0 taken 8602 times.
✗ Branch 1 not taken.
8602 if (impl->svc_ == svc)
757 8602 return impl;
758 // Stale impl from a destroyed service
759 delete impl;
760 }
761 173 return nullptr;
762 }
763
764 bool
765 8775 try_push_tl_cache(timer_impl* impl) noexcept
766 {
767
2/2
✓ Branch 1 taken 8727 times.
✓ Branch 2 taken 48 times.
8775 if (!tl_cached_impl.get())
768 {
769 8727 tl_cached_impl.set(impl);
770 8727 return true;
771 }
772 48 return false;
773 }
774
775 waiter_node*
776 8539 try_pop_waiter_tl_cache() noexcept
777 {
778 8539 auto* w = tl_cached_waiter.get();
779
2/2
✓ Branch 0 taken 8397 times.
✓ Branch 1 taken 142 times.
8539 if (w)
780 {
781 8397 tl_cached_waiter.set(nullptr);
782 8397 return w;
783 }
784 142 return nullptr;
785 }
786
787 bool
788 8539 try_push_waiter_tl_cache(waiter_node* w) noexcept
789 {
790
2/2
✓ Branch 1 taken 8481 times.
✓ Branch 2 taken 58 times.
8539 if (!tl_cached_waiter.get())
791 {
792 8481 tl_cached_waiter.set(w);
793 8481 return true;
794 }
795 58 return false;
796 }
797
798 void
799 336 timer_service_invalidate_cache() noexcept
800 {
801
2/2
✓ Branch 1 taken 125 times.
✓ Branch 2 taken 211 times.
336 delete tl_cached_impl.get();
802 336 tl_cached_impl.set(nullptr);
803
804
2/2
✓ Branch 1 taken 84 times.
✓ Branch 2 taken 252 times.
336 delete tl_cached_waiter.get();
805 336 tl_cached_waiter.set(nullptr);
806 336 }
807
// Befriended accessor: exposes basic_io_context's private scheduler
// pointer to this translation unit without widening the public API.
struct timer_service_access
{
    static scheduler_impl& get_scheduler(basic_io_context& ctx) noexcept
    {
        return static_cast<scheduler_impl&>(*ctx.sched_);
    }
};
815
// Entry point used by the public timer: resolves the timer service
// straight from the scheduler (skipping find_service on the hot
// path) and returns a new or recycled impl. Reports failure through
// throw_logic_error() when ctx is not a basic_io_context or the
// scheduler carries no timer service.
timer::timer_impl*
timer_service_create(capy::execution_context& ctx)
{
    if (!ctx.target<basic_io_context>())
        detail::throw_logic_error();
    auto& ioctx = static_cast<basic_io_context&>(ctx);
    auto* svc = static_cast<timer_service_impl*>(
        timer_service_access::get_scheduler(ioctx).timer_svc_);
    if (!svc)
        detail::throw_logic_error();
    return svc->create_impl();
}
828
829 void
830 8775 timer_service_destroy(timer::timer_impl& base) noexcept
831 {
832 8775 static_cast<timer_impl&>(base).release();
833 8775 }
834
835 timer::time_point
836 34 timer_service_expiry(timer::timer_impl& base) noexcept
837 {
838 34 return static_cast<timer_impl&>(base).expiry_;
839 }
840
841 std::size_t
842 18 timer_service_expires_at(timer::timer_impl& base, timer::time_point t)
843 {
844 18 auto& impl = static_cast<timer_impl&>(base);
845 18 impl.expiry_ = t;
846 18 return impl.svc_->update_timer(impl, t);
847 }
848
849 std::size_t
850 8764 timer_service_expires_after(timer::timer_impl& base, timer::duration d)
851 {
852 8764 auto& impl = static_cast<timer_impl&>(base);
853
1/1
✓ Branch 2 taken 8764 times.
8764 impl.expiry_ = timer::clock_type::now() + d;
854 8764 return impl.svc_->update_timer(impl, impl.expiry_);
855 }
856
857 std::size_t
858 20 timer_service_cancel(timer::timer_impl& base) noexcept
859 {
860 20 auto& impl = static_cast<timer_impl&>(base);
861 20 return impl.svc_->cancel_timer(impl);
862 }
863
864 std::size_t
865 4 timer_service_cancel_one(timer::timer_impl& base) noexcept
866 {
867 4 auto& impl = static_cast<timer_impl&>(base);
868 4 return impl.svc_->cancel_one_waiter(impl);
869 }
870
871 timer_service&
872 336 get_timer_service(capy::execution_context& ctx, scheduler& sched)
873 {
874 336 return ctx.make_service<timer_service_impl>(sched);
875 }
876
877 } // namespace boost::corosio::detail
878