ecor
Loading...
Searching...
No Matches
ecor.hpp
1
22
23#pragma once
24
25#include <atomic>
26#include <bit>
27#include <concepts>
28#include <coroutine>
29#include <cstddef>
30#include <cstdint>
31#include <cstring>
32#include <iterator>
33#include <limits>
34#include <memory>
35#include <new>
36#include <optional>
37#include <span>
38#include <type_traits>
39#include <utility>
40#include <variant>
41#include <zll.hpp>
42
43#ifdef ECOR_DEFAULT_ASSERT
44
45#include <cassert>
46#define ECOR_ASSERT( expr ) assert( expr )
47
48#else
49
50#ifndef ECOR_ASSERT
51#define ECOR_ASSERT( expr ) ( (void) ( ( expr ) ) )
52#endif
53
54#endif
55
56namespace ecor
57{
58
59template < typename T >
60struct _tag
61{
62};
63
64template < typename T, typename... Ts >
65struct _contains_type;
66
67template < typename T, typename U, typename... Ts >
68 requires( std::same_as< T, U > )
69struct _contains_type< T, U, Ts... >
70{
71 static constexpr bool value = true;
72};
73
74template < typename T, typename U, typename... Ts >
75 requires( !std::same_as< T, U > )
76struct _contains_type< T, U, Ts... > : _contains_type< T, Ts... >
77{
78};
79
80template < typename T >
81struct _contains_type< T >
82{
83 static constexpr bool value = false;
84};
85
88{
89};
90
93{
94};
95
98{
99};
100
103{
104};
105
108{
109};
110
113{
114};
115
117struct _get_stopped_t
118{
119};
120
123template < typename... S >
125{
126};
127
129template < template < typename... > class C, typename T >
130struct _apply_to_sigs;
131
132template < template < typename... > class C, typename... Ts >
133struct _apply_to_sigs< C, completion_signatures< Ts... > >
134{
135 using type = C< Ts... >;
136};
137
138template < template < typename... > class C, typename T >
139using _apply_to_sigs_t = typename _apply_to_sigs< C, T >::type;
140
143{
144};
145
148{
149};
150
153template < typename S, typename R >
154using connect_type = decltype( std::move( std::declval< S >() ).connect( std::declval< R >() ) );
155
156
158template < typename E >
160{
161 using type = std::remove_cvref_t< E >;
162 E error;
163
164 explicit with_error( E e )
165 : error( std::move( e ) )
166 {
167 }
168};
169
170template < typename T >
171struct _is_signature : std::false_type
172{
173};
174
175template < typename... Args >
176struct _is_signature< set_value_t( Args... ) > : std::true_type
177{
178};
179template < typename Err >
180struct _is_signature< set_error_t( Err ) > : std::true_type
181{
182};
183template <>
184struct _is_signature< set_stopped_t() > : std::true_type
185{
186};
187template <>
188struct _is_signature< _get_stopped_t() > : std::true_type
189{
190};
191
195template < typename T >
196concept signature = _is_signature< T >::value;
197
198template < typename T >
199struct _value_setter
200{
201 using type = set_value_t( T );
202};
203
204template <>
205struct _value_setter< void >
206{
207 using type = set_value_t();
208};
209
212template < typename T >
213using _value_setter_t = typename _value_setter< T >::type;
214
215template < typename... S >
216struct _multivalue_value_signature : std::false_type
217{
218};
219
220template < typename T, typename U, typename... Ts >
221struct _multivalue_value_signature< set_value_t( T, U, Ts... ) > : std::true_type
222{
223};
224
227template < typename... S >
228concept all_value_signatures_singular = ( ... && !_multivalue_value_signature< S >::value );
229
230template < typename T >
231struct _is_all_singular;
232
233template < typename... S >
234struct _is_all_singular< completion_signatures< S... > >
235{
236 static constexpr bool value = all_value_signatures_singular< S... >;
237};
238
241template < typename Sigs >
242static constexpr bool is_all_singular_v = _is_all_singular< Sigs >::value;
243
244
245// ------------------------------------------------------------------------------
246
247template < typename T >
248constexpr T _align_idx( uint8_t const* buff, T idx, std::size_t align ) noexcept
249{
250 auto* p = buff + idx;
251 auto a = ( (uintptr_t) p ) % align;
252 if ( a )
253 p += align - a;
254 return (T) ( p - buff );
255}
256
257template < std::size_t N >
258constexpr auto _pick_index_type() noexcept
259{
260 if constexpr ( N <= 0xff )
261 return uint8_t{};
262 else if constexpr ( N <= 0xffff )
263 return uint16_t{};
264 else if constexpr ( N <= 0xffffffff )
265 return uint32_t{};
266 else
267 return;
268}
269
271template < std::size_t N >
272using smallest_index_type = decltype( _pick_index_type< N >() );
273
274
275struct _allocate_t
276{
277 template < typename T >
278 requires( requires( T x ) { x.allocate( std::size_t{}, std::size_t{} ); } )
279 void* operator()( T& x, std::size_t bytes, std::size_t align ) const
280 noexcept( noexcept( x.allocate( bytes, align ) ) )
281 {
282 return x.allocate( bytes, align );
283 }
284};
286static constexpr _allocate_t allocate{};
287
288
289struct _deallocate_t
290{
291 template < typename T >
292 requires( requires( T x, void* p, std::size_t b, std::size_t a ) {
293 x.deallocate( p, b, a );
294 } )
295 void operator()( T& x, void* p, std::size_t bytes, std::size_t align ) const
296 noexcept( noexcept( x.deallocate( p, bytes, align ) ) )
297 {
298 x.deallocate( p, bytes, align );
299 }
300};
302static constexpr _deallocate_t deallocate{};
303
305template < typename T >
306concept memory_resource = requires( T a, std::size_t bytes, std::size_t align, void* p ) {
307 { allocate( a, bytes, align ) } -> std::same_as< void* >;
308 { deallocate( a, p, bytes, align ) } -> std::same_as< void >;
309};
310
311
315template <
316 typename T,
317 std::size_t Extent = std::dynamic_extent,
318 typename Deleter = std::default_delete< T[] > >
319struct unique_span : std::span< T, Extent >
320{
321 using span_type = std::span< T, Extent >;
322
323 unique_span() noexcept = default;
324
326 unique_span( T* data, std::size_t size, Deleter deleter = Deleter{} ) noexcept(
327 std::is_nothrow_move_constructible_v< Deleter > )
328 : span_type( data, size )
329 , _deleter( std::move( deleter ) )
330 {
331 }
332
333 unique_span( unique_span const& ) = delete;
334 unique_span& operator=( unique_span const& ) = delete;
335
337 unique_span( unique_span&& other ) noexcept
338 : span_type( other.data(), other.size() )
339 , _deleter( std::move( other._deleter ) )
340 {
341 *(span_type*) &other = span_type{};
342 }
343
345 unique_span& operator=( unique_span&& other ) noexcept
346 {
347 if ( this != &other ) {
348 if ( this->data() )
349 _deleter( this->data() );
350 *(span_type*) this = span_type( other.data(), other.size() );
351 _deleter = std::move( other._deleter );
352 *(span_type*) &other = span_type{};
353 }
354 return *this;
355 }
356
359 std::span< T, Extent > release() noexcept
360 {
361 T* data = this->data();
362 std::size_t size = this->size();
363 *(span_type*) this = span_type{};
364 return span_type( data, size );
365 }
366
369 {
370 if ( this->data() )
371 _deleter( this->data() );
372 }
373
374private:
375 [[no_unique_address]] Deleter _deleter;
376};
377
396template < typename IndexType, typename Base = noop_base >
398{
399 static_assert(
400 std::is_unsigned< IndexType >::value,
401 "IndexType must be an unsigned integer type" );
402
404 using index_type = IndexType;
405
406 template < std::size_t N >
407 requires( std::numeric_limits< IndexType >::max() >= N )
408 constexpr circular_buffer_memory( std::span< uint8_t, N > b ) noexcept
409 : _buff( std::move( b ) )
410 {
411 }
412 template < std::size_t N >
413 requires( std::numeric_limits< IndexType >::max() >= N )
414 constexpr circular_buffer_memory( uint8_t ( &b )[N] ) noexcept
415 : _buff( b )
416 {
417 }
418
419 struct _deleter
420 {
422
423 template < typename T >
424 void operator()( T* p ) noexcept
425 {
426 if ( p ) {
427 p->~T();
428 mem->deallocate( p );
429 }
430 }
431 };
432
435 template < typename T >
436 using uptr = std::unique_ptr< T, _deleter >;
437
445 template < typename T, typename... Args >
446 uptr< T > make( Args&&... args )
447 {
448 void* p = allocate( sizeof( T ), alignof( T ) );
449 if ( !p )
450 return { nullptr, _deleter{ this } };
451 T* t = new ( p ) T( (Args&&) args... );
452 return { t, _deleter{ this } };
453 }
454
457 template < typename T, std::size_t Extent = std::dynamic_extent >
459
466 template < typename T >
467 uspan< T > make_span( std::size_t n )
468 {
469 void* p = allocate( sizeof( T ) * n, alignof( T ) );
470 if ( !p )
471 return uspan< T >{ nullptr, 0, _deleter{ this } };
472 auto* pp = new ( p ) T[n]();
473 return uspan< T >{ pp, n, _deleter{ this } };
474 }
475
478 template < typename T, std::size_t Extent >
480 {
481 constexpr std::size_t n = Extent;
482 auto sp = make_span< T >( n );
483 return sp;
484 }
485
492 [[nodiscard]] void* allocate( std::size_t bytes, std::size_t align ) noexcept
493 {
494 auto* p = _allocate( _buff, bytes, align );
495 return p;
496 }
497
499 void deallocate( void* p, std::size_t bytes, std::size_t align ) noexcept
500 {
501 std::ignore = bytes;
502 std::ignore = align;
503 _deallocate( _buff.data(), (uint8_t*) p );
504 }
505
508 void deallocate( void* p ) noexcept
509 {
510 _deallocate( _buff.data(), (uint8_t*) p );
511 }
512
514 [[nodiscard]] std::size_t capacity() const noexcept
515 {
516 return _buff.size();
517 }
518
520 [[nodiscard]] std::size_t used_bytes() const noexcept
521 {
522 if ( _first == npos )
523 return 0;
524 if ( _first <= _last )
525 return ( _next - _first );
526 return ( _buff.size() - _first ) + _next;
527 }
528
530 static constexpr index_type npos = std::numeric_limits< index_type >::max();
531
537 struct _node
538 {
539 index_type next_idx = npos;
540 index_type prev_idx = npos;
541 };
542
543private:
546
547 // Pick index where to allocate `bytes` with `align`, returns npos if no space
548 // is available. The returned index points to the node header, the actual data
549 // starts after sizeof(node).
550 [[nodiscard]] index_type _pick_index(
551 std::span< uint8_t > buff,
552 std::size_t bytes,
553 std::size_t align ) const noexcept
554 {
555 index_type idx = npos;
556 int capacity = 0;
557 if ( _last == npos ) {
558 // Empty buffer
559 ECOR_ASSERT( _first == npos && _last == npos );
560 idx = _align_idx( buff.data(), sizeof( _node ), align );
561 capacity = buff.size() - idx;
562 } else if ( _first <= _next ) {
563 // Non-overflow state: [ ][x][x][x][ ][ ]
564 {
565 idx = _align_idx( buff.data(), _next + sizeof( _node ), align );
566 capacity = buff.size() - idx;
567 if ( idx < buff.size() && capacity >= (int) bytes )
568 return idx - sizeof( _node );
569 }
570 // Non-overflow state, but overflow triggered: [ ][ ][ ][x][x][x]
571 {
572 idx = _align_idx( buff.data(), sizeof( _node ), align );
573 capacity = idx - _first;
574 }
575
576 } else { // _first > _next
577 // Overflow state: [x][x][ ][ ][x][x]
578 idx = _align_idx( buff.data(), _next + sizeof( _node ), align );
579 capacity = _first - idx;
580 }
581 if ( idx < buff.size() && capacity >= (int) bytes )
582 return idx - sizeof( _node );
583 return npos;
584 }
585
586 void _set_node( uint8_t* buff, index_type idx, _node const& n ) noexcept
587 {
588 std::memcpy( buff + idx, &n, sizeof( _node ) );
589 }
590
591 void* _allocate( std::span< uint8_t > buff, std::size_t bytes, std::size_t align ) noexcept
592 {
593 auto idx = _pick_index( buff, bytes, align );
594 if ( idx == npos )
595 return nullptr;
596 auto* p = buff.data() + idx;
597 if ( _first == npos ) {
598 _first = idx;
599 } else {
600 auto nn = _get_node( buff.data(), _last );
601 nn.next_idx = idx;
602 _set_node( buff.data(), _last, nn );
603 }
604 _node const n{
605 .next_idx = npos,
606 .prev_idx = _last,
607 };
608 _last = idx;
609 _next = idx + sizeof( _node ) + bytes;
610 _set_node( buff.data(), idx, n );
611 return p + sizeof( _node );
612 }
613
614 void _deallocate( uint8_t* buff, void* p ) noexcept
615 {
616 auto* pp = (uint8_t*) p - sizeof( _node );
617 index_type const idx = pp - buff;
618 _node const n = _get_node( buff, idx );
619
620 if ( _first == idx ) {
621 _first = n.next_idx;
622 } else {
623 auto pn = _get_node( buff, n.prev_idx );
624 pn.next_idx = n.next_idx;
625 _set_node( buff, n.prev_idx, pn );
626 }
627
628 if ( _last == idx ) {
629 _last = n.prev_idx;
630 _next = idx;
631 } else {
632 auto nn = _get_node( buff, n.next_idx );
633 nn.prev_idx = n.prev_idx;
634 _set_node( buff, n.next_idx, nn );
635 }
636
637 // If buffer is now empty, reset _next to npos as well
638 if ( _first == npos && _last == npos )
639 _next = npos;
640
641 _set_node( buff, idx, _node{ .next_idx = 0, .prev_idx = 0 } );
642 }
643
644public:
649 _node _get_node( uint8_t* buff, index_type idx ) const noexcept
650 {
651 _node n;
652 std::memcpy( &n, buff + idx, sizeof( _node ) );
653 return n;
654 }
655
658 index_type _first = npos;
661 index_type _last = npos;
664 index_type _next = npos;
665
666private:
667 std::span< uint8_t > _buff;
668};
669
679template < typename T, typename IndexType, typename Base >
681{
682 using value_type = T;
683 using size_type = std::size_t;
684 using difference_type = std::ptrdiff_t;
685
686 // Required for C++17 allocator requirements
687 template < typename U >
688 struct rebind
689 {
691 };
692
696 : _buffer( &buffer )
697 {
698 }
699
703
706 template < typename U >
709 : _buffer( other._buffer )
710 {
711 }
712
720 T* allocate( std::size_t n )
721 {
722 if ( n == 0 )
723 return nullptr;
724
725 std::size_t bytes = n * sizeof( T );
726 void* ptr = _buffer->allocate( bytes, alignof( T ) );
727
728 if ( !ptr )
729 throw std::bad_alloc();
730
731 return static_cast< T* >( ptr );
732 }
733
738 void deallocate( T* ptr, std::size_t n )
739 {
740 if ( ptr )
741 _buffer->deallocate( ptr, n * sizeof( T ), alignof( T ) );
742 }
743
746 bool operator==( circular_buffer_allocator const& other ) const noexcept
747 {
748 return _buffer == other._buffer;
749 }
750
752 bool operator!=( circular_buffer_allocator const& other ) const noexcept
753 {
754 return !( *this == other );
755 }
756
759 template < typename U, typename IdxType, typename Base2 >
761
762private:
764};
765
766// ------------------------------------------------------------------------------
767
768struct _get_env_t
769{
770 template < typename T >
771 decltype( auto ) operator()( T const& o ) const noexcept
772 {
773 static constexpr bool v = requires( T const& o ) {
774 { o.get_env() };
775 };
776 if constexpr ( v )
777 return o.get_env();
778 else
779 return empty_env{};
780 }
781};
784inline constexpr _get_env_t get_env{};
785
788template < typename T >
789using env_type = decltype( get_env( std::declval< T const& >() ) );
790
791struct _get_completion_signatures_t
792{
793 template < typename S, typename E >
794 decltype( auto ) operator()( S& s, E& e ) const
795 {
796 static constexpr bool has_member = requires( S& s, E& e ) {
797 { s.get_completion_signatures( e ) };
798 };
799 if constexpr ( has_member )
800 return s.get_completion_signatures( e );
801 else {
802 using sigs = typename S::completion_signatures;
803 return sigs{};
804 }
805 }
806};
811inline constexpr _get_completion_signatures_t get_completion_signatures{};
812
814template < typename S, typename Env >
815using _sender_completions_t =
816 decltype( get_completion_signatures( std::declval< S& >(), std::declval< Env& >() ) );
817
821template < typename T >
822concept queryable = std::is_object_v< T >;
823
827template < typename T >
828concept sender = std::derived_from< typename std::remove_cvref_t< T >::sender_concept, sender_t > &&
829 requires( T const& s ) {
830 { get_env( s ) } -> queryable;
831 } && std::move_constructible< std::remove_cvref_t< T > > &&
832 std::constructible_from< std::remove_cvref_t< T >, T >;
833
837template < typename T >
838concept receiver =
839 std::derived_from< typename std::remove_cvref_t< T >::receiver_concept, receiver_t > &&
840 requires( T const& r ) {
841 { get_env( r ) } -> queryable;
842 } && std::move_constructible< std::remove_cvref_t< T > > &&
843 std::constructible_from< std::remove_cvref_t< T >, T >;
844
845template < typename S, typename R >
846concept _connectable = sender< S > && receiver< R > && requires( S&& s, R&& r ) {
847 { std::move( s ).connect( std::move( r ) ) };
848};
849
850struct _connect_t
851{
852 template < typename S, typename R >
853 requires _connectable< S, R >
854 auto operator()( S&& s, R&& r ) const
855 noexcept( noexcept( std::move( s ).connect( std::move( r ) ) ) )
856 {
857 return std::move( s ).connect( std::move( r ) );
858 }
859};
861inline constexpr _connect_t connect{};
862
863template < typename R, typename T >
864struct _receiver_sig_callable;
865
866template < typename R, typename... Args >
867struct _receiver_sig_callable< R, set_value_t( Args... ) >
868{
869 static constexpr bool value =
870 requires( R& r, Args&&... args ) { r.set_value( (Args&&) args... ); };
871};
872
873template < typename R, typename Err >
874struct _receiver_sig_callable< R, set_error_t( Err ) >
875{
876 static constexpr bool value = requires( R& r, Err&& err ) { r.set_error( (Err&&) err ); };
877};
878
879template < typename R >
880struct _receiver_sig_callable< R, set_stopped_t() >
881{
882 static constexpr bool value = requires( R& r ) { r.set_stopped(); };
883};
884
885template < typename R, typename Sigs >
886struct _receiver_for_impl;
887
888template < typename R, typename... S >
889struct _receiver_for_impl< R, completion_signatures< S... > >
890{
891 static constexpr bool value = ( ... && _receiver_sig_callable< R, S >::value );
892};
893
898template < typename R, typename S >
900 receiver< R > && _receiver_for_impl< R, _sender_completions_t< S, env_type< R > > >::value;
901
908template < typename R, typename... Sig >
909concept receiver_for_sigs = receiver< R > && ( ... && _receiver_sig_callable< R, Sig >::value );
910
918template < signature S >
919struct _sig_vtable_row;
920
921template < typename... Args >
922struct _sig_vtable_row< set_value_t( Args... ) >
923{
924
925 using f_t = void ( * )( void*, Args... );
926 f_t _set_value;
927
928 template < typename D, typename B >
929 constexpr _sig_vtable_row( _tag< D >, _tag< B > ) noexcept
930 : _set_value{ +[]( void* self, Args... args ) {
931 static_cast< D* >( static_cast< B* >( self ) )->set_value( (Args&&) args... );
932 } }
933 {
934 }
935
936 void invoke( set_value_t, void* self, Args... args ) const
937 {
938 _set_value( self, (Args&&) args... );
939 }
940};
941
942template < typename... Args >
943struct _sig_vtable_row< set_error_t( Args... ) >
944{
945 using f_t = void ( * )( void*, Args... );
946 f_t _set_error;
947
948 template < typename D, typename B >
949 constexpr _sig_vtable_row( _tag< D >, _tag< B > ) noexcept
950 : _set_error{ +[]( void* self, Args... args ) {
951 static_cast< D* >( static_cast< B* >( self ) )->set_error( (Args&&) args... );
952 } }
953 {
954 }
955
956 void invoke( set_error_t, void* self, Args... args ) const
957 {
958 _set_error( self, (Args&&) args... );
959 }
960};
961
962template <>
963struct _sig_vtable_row< set_stopped_t() >
964{
965 using f_t = void ( * )( void* );
966 f_t _set_stopped;
967
968 template < typename D, typename B >
969 constexpr _sig_vtable_row( _tag< D >, _tag< B > ) noexcept
970 : _set_stopped{ +[]( void* self ) {
971 static_cast< D* >( static_cast< B* >( self ) )->set_stopped();
972 } }
973 {
974 }
975
976 void invoke( set_stopped_t, void* self ) const
977 {
978 _set_stopped( self );
979 }
980};
981
982template <>
983struct _sig_vtable_row< _get_stopped_t() >
984{
985 using f_t = bool ( * )( void* );
986 f_t _get_stopped;
987
988 template < typename D, typename B >
989 constexpr _sig_vtable_row( _tag< D >, _tag< B > ) noexcept
990 : _get_stopped{ +[]( void* self ) {
991 return static_cast< D* >( static_cast< B* >( self ) )->get_stopped();
992 } }
993 {
994 }
995
996 bool invoke( _get_stopped_t, void* self ) const
997 {
998 return _get_stopped( self );
999 }
1000};
1001
1005template < signature... S >
1006struct _sig_vtable : _sig_vtable_row< S >...
1007{
1008 template < typename D, typename B >
1009 constexpr _sig_vtable( _tag< D > dt, _tag< B > bt ) noexcept
1010 : _sig_vtable_row< S >( dt, bt )...
1011 {
1012 }
1013
1014 using _sig_vtable_row< S >::invoke...;
1015};
1016
1019template < typename D, typename B, typename VTable >
1020static constexpr VTable _vtable_of = { _tag< D >{}, _tag< B >{} };
1021
1023template < typename VTable, typename... Args >
1024concept _vtable_can_call_value = requires( VTable v, void* self, Args&&... args ) {
1025 ( v.invoke( set_value_t{}, self, (Args&&) args... ) );
1026};
1027
1029template < typename VTable, typename Arg >
1030concept _vtable_can_call_error = requires( VTable v, void* self, Arg&& arg ) {
1031 ( v.invoke( set_error_t{}, self, (Arg&&) arg ) );
1032};
1033
1035template < typename VTable >
1036concept _vtable_can_call_stopped =
1037 requires( VTable v, void* self ) { ( v.invoke( set_stopped_t{}, self ) ); };
1038
1042template < signature... S >
1043struct _vtable_mixin
1044{
1045 using _vtable = _sig_vtable< S... >;
1046
1047 template < typename D >
1048 _vtable_mixin( _tag< D > ) noexcept
1049 : vtable( _vtable_of< D, _vtable_mixin< S... >, _vtable > )
1050 {
1051 static_assert(
1052 std::derived_from< D, _vtable_mixin< S... > >,
1053 "Derived type must derive from _vtable_mixin with the same signature list" );
1054 }
1055
1056 template < typename... Args >
1057 void _set_value( Args... args )
1058 {
1059 vtable.invoke( set_value_t{}, this, (Args&&) args... );
1060 }
1061
1062 template < typename... Args >
1063 void _set_error( Args... args )
1064 {
1065 vtable.invoke( set_error_t{}, this, (Args&&) args... );
1066 }
1067
1068 void _set_stopped()
1069 {
1070 vtable.invoke( set_stopped_t{}, this );
1071 }
1072
1073 bool _get_stopped() noexcept
1074 {
1075 return vtable.invoke( _get_stopped_t{}, this );
1076 }
1077
1078private:
1079 _vtable const& vtable;
1080};
1081
1083
1084
1100template < typename TL, typename Tag, typename Sigs, template < class... > class Tuple >
1101struct _filter_map_tag;
1102
1103template <
1104 template < class... >
1105 class Variant,
1106 typename... S,
1107 typename Tag,
1108 template < class... >
1109 class Tuple >
1110struct _filter_map_tag< Variant< S... >, Tag, completion_signatures<>, Tuple >
1111{
1112 using type = Variant< S... >;
1113};
1114
1115template <
1116 template < class... >
1117 class Variant,
1118 typename... S,
1119 typename Tag,
1120 typename... Us,
1121 typename... Ts,
1122 template < class... >
1123 class Tuple >
1124struct _filter_map_tag< Variant< S... >, Tag, completion_signatures< Tag( Us... ), Ts... >, Tuple >
1125 : _filter_map_tag< Variant< S..., Tuple< Us... > >, Tag, completion_signatures< Ts... >, Tuple >
1126{
1127};
1128
1129template <
1130 template < class... >
1131 class Variant,
1132 typename... S,
1133 typename Tag,
1134 typename T,
1135 typename... Ts,
1136 template < class... >
1137 class Tuple >
1138struct _filter_map_tag< Variant< S... >, Tag, completion_signatures< T, Ts... >, Tuple >
1139 : _filter_map_tag< Variant< S... >, Tag, completion_signatures< Ts... >, Tuple >
1140{
1141};
1142
1143template < typename T >
1144using _type_identity_t = T;
1145
1148template < typename Tag >
1149struct _sig_generator_t
1150{
1151 template < typename... Args >
1152 using type = Tag( Args... );
1153};
1154
1155template < typename Sigs >
1156using _filter_values_as_variant_tuple_t =
1157 typename _filter_map_tag< std::variant<>, set_value_t, Sigs, std::tuple >::type;
1158
1159template < typename Sigs >
1160using _filter_erros_as_variant_t =
1161 typename _filter_map_tag< std::variant<>, set_error_t, Sigs, _type_identity_t >::type;
1162
1163template < typename Sigs >
1164using _filter_values_of_t = typename _filter_map_tag<
1165 completion_signatures<>,
1166 set_value_t,
1167 Sigs,
1168 _sig_generator_t< set_value_t >::type >::type;
1169
1170template < typename Sigs >
1171using _filter_errors_of_t = typename _filter_map_tag<
1172 completion_signatures<>,
1173 set_error_t,
1174 Sigs,
1175 _sig_generator_t< set_error_t >::type >::type;
1176
1177template < typename Sigs >
1178using _filter_stopped_of_t = typename _filter_map_tag<
1179 completion_signatures<>,
1180 set_stopped_t,
1181 Sigs,
1182 _sig_generator_t< set_stopped_t >::type >::type;
1183
1184template < typename T, typename U >
1185struct _sigs_contains_type;
1186
1187template < typename T, typename... Ts >
1188struct _sigs_contains_type< T, completion_signatures< Ts... > > : _contains_type< T, Ts... >
1189{
1190};
1191
1193template < typename U >
1194concept _sigs_contains_set_stopped = _sigs_contains_type< set_stopped_t(), U >::value;
1195
1196template < typename Sigs1, typename Sigs2 >
1197struct _sig_is_underset_of_impl;
1198
1199template < typename... S1, typename... S2 >
1200struct _sig_is_underset_of_impl< completion_signatures< S1... >, completion_signatures< S2... > >
1201{
1202 static constexpr bool value = ( _contains_type< S1, S2... >::value && ... && true );
1203};
1204
1206template < typename Sigs1, typename Sigs2 >
1207concept _sig_is_underset_of = _sig_is_underset_of_impl< Sigs1, Sigs2 >::value;
1208
1209
1210template < typename O, typename... Ts >
1211struct _sigs_merge_impl;
1212
1213template < typename... S1, typename S, typename... S2, typename... Ts >
1214 requires( !_contains_type< S, S1... >::value )
1215struct _sigs_merge_impl< completion_signatures< S1... >, completion_signatures< S, S2... >, Ts... >
1216 : _sigs_merge_impl< completion_signatures< S1..., S >, completion_signatures< S2... >, Ts... >
1217{
1218};
1219
1220template < typename... S1, typename S, typename... S2, typename... Ts >
1221 requires( _contains_type< S, S1... >::value )
1222struct _sigs_merge_impl< completion_signatures< S1... >, completion_signatures< S, S2... >, Ts... >
1223 : _sigs_merge_impl< completion_signatures< S1... >, completion_signatures< S2... >, Ts... >
1224{
1225};
1226
1227template < typename... S1, typename... Ts >
1228struct _sigs_merge_impl< completion_signatures< S1... >, completion_signatures<>, Ts... >
1229 : _sigs_merge_impl< completion_signatures< S1... >, Ts... >
1230{
1231};
1232template < typename... S1 >
1233struct _sigs_merge_impl< completion_signatures< S1... > >
1234{
1235 using type = completion_signatures< S1... >;
1236};
1237
1240template < typename... S >
1241using _sigs_merge_t = typename _sigs_merge_impl< completion_signatures<>, S... >::type;
1242
1243template < typename S, typename... Ts >
1244struct _sigs_append_impl;
1245
1246template < typename... S, typename... Ts >
1247struct _sigs_append_impl< completion_signatures< S... >, Ts... >
1248{
1249 using type = completion_signatures< S..., Ts... >;
1250};
1251
1253template < typename Sigs, typename... Ts >
1254using _sigs_append_t = typename _sigs_append_impl< Sigs, Ts... >::type;
1255
1256template < typename S, typename... Ts >
1257struct _sigs_concat_impl;
1258
1259template < typename... S >
1260struct _sigs_concat_impl< completion_signatures< S... > >
1261{
1262 using type = completion_signatures< S... >;
1263};
1264
1265template < typename... S1, typename... S2, typename... Ts >
1266struct _sigs_concat_impl< completion_signatures< S1... >, completion_signatures< S2... >, Ts... >
1267 : _sigs_concat_impl< completion_signatures< S1..., S2... >, Ts... >
1268{
1269};
1270
1273template < typename... Sigs >
1274using _sigs_concat_t = typename _sigs_concat_impl< completion_signatures<>, Sigs... >::type;
1275
1277
1284template < class Token >
1285concept stoppable_token = requires( Token const tok ) {
1286 { tok.stop_requested() } noexcept -> std::same_as< bool >;
1287 { tok.stop_possible() } noexcept -> std::same_as< bool >;
1288 { Token( tok ) } noexcept;
1289} && std::copyable< Token > && std::equality_comparable< Token > && std::swappable< Token >;
1290
1293template < class Token >
1295 stoppable_token< Token > && requires { requires( !Token{}.stop_possible() ); };
1296
1302template < class Source >
1303concept stoppable_source = requires( Source& src, Source const csrc ) {
1304 { csrc.get_token() } -> stoppable_token;
1305 { csrc.stop_possible() } noexcept -> std::same_as< bool >;
1306 { csrc.stop_requested() } noexcept -> std::same_as< bool >;
1307 { src.request_stop() } -> std::same_as< bool >;
1308};
1309
1312struct inplace_stop_token;
1313
1317template < typename CallbackFn >
1319
1320struct _inplace_stop_callback_base : zll::ll_base< _inplace_stop_callback_base >
1321{
1322 virtual void _execute() = 0;
1323
1324 virtual ~_inplace_stop_callback_base() = default;
1325};
1326
1332{
1333 inplace_stop_source() noexcept = default;
1334
1335 inplace_stop_source( inplace_stop_source const& ) = delete;
1336 inplace_stop_source& operator=( inplace_stop_source const& ) = delete;
1337
1341 [[nodiscard]] inplace_stop_token get_token() const noexcept;
1342
1345 [[nodiscard]] bool stop_possible() const noexcept
1346 {
1347 return true;
1348 }
1349
1352 [[nodiscard]] bool stop_requested() const noexcept
1353 {
1354 return _stopped;
1355 }
1356
1361 {
1362 if ( _stopped )
1363 return false;
1364 _stopped = true;
1365
1366 while ( !_callbacks.empty() ) {
1367 auto& cb = _callbacks.front();
1368 cb._execute();
1369 if ( !_callbacks.empty() && &_callbacks.front() == &cb )
1370 zll::detach( cb );
1371 }
1372
1373 return true;
1374 }
1375
1376private:
1377 bool _stopped = false;
1378
1379 // XXX: veve oficially hates that standard requires this to be mutable
1380 mutable zll::ll_list< _inplace_stop_callback_base > _callbacks;
1381
1382 template < typename CallbackFn >
1383 friend struct inplace_stop_callback;
1384};
1385
1387{
1388 inplace_stop_token() noexcept = default;
1389
1393 [[nodiscard]] bool stop_requested() const noexcept
1394 {
1395 return _source && _source->stop_requested();
1396 }
1397
1401 [[nodiscard]] constexpr bool stop_possible() const noexcept
1402 {
1403 return _source != nullptr;
1404 }
1405
1408 friend bool
1409 operator==( inplace_stop_token const& lhs, inplace_stop_token const& rhs ) noexcept
1410 {
1411 return lhs._source == rhs._source;
1412 }
1413
1415 friend bool
1416 operator!=( inplace_stop_token const& lhs, inplace_stop_token const& rhs ) noexcept
1417 {
1418 return lhs._source != rhs._source;
1419 }
1420
1421 template < typename CallbackFn >
1422 using callback_type = inplace_stop_callback< CallbackFn >;
1423
1424private:
1425 inplace_stop_source const* _source = nullptr;
1426
1427 inplace_stop_token( inplace_stop_source const* source ) noexcept
1428 : _source( source )
1429 {
1430 }
1431
1432 friend struct inplace_stop_source;
1433
1434 template < typename CallbackFn >
1435 friend struct inplace_stop_callback;
1436};
1437
1438inline inplace_stop_token inplace_stop_source::get_token() const noexcept
1439{
1440 return inplace_stop_token{ this };
1441}
1442
1453template < typename CallbackFn >
1454struct inplace_stop_callback : _inplace_stop_callback_base
1455{
1456 template < typename Initializer >
1457 explicit inplace_stop_callback( inplace_stop_token st, Initializer&& init ) noexcept(
1458 std::is_nothrow_constructible_v< CallbackFn, Initializer > &&
1459 std::is_nothrow_invocable_v< CallbackFn > )
1460 : _callback_fn( (Initializer&&) init )
1461 , _source( st._source )
1462 {
1463 if ( _source && _source->stop_possible() ) {
1464 if ( _source->stop_requested() )
1465 ( (CallbackFn&&) _callback_fn )();
1466 else
1467 _source->_callbacks.link_back( *this );
1468 }
1469 }
1470
1473 inplace_stop_callback& operator=( inplace_stop_callback&& ) = delete;
1474 inplace_stop_callback& operator=( inplace_stop_callback const& ) = delete;
1475
1476 void _execute() override
1477 {
1478 ( (CallbackFn&&) _callback_fn )();
1479 }
1480
1481private:
1482 CallbackFn _callback_fn;
1483 inplace_stop_source const* _source;
1484};
1485
1486template < typename CallbackFn >
1488
1494{
1495 struct cb_type
1496 {
1497 explicit cb_type( never_stop_token, auto&& ) noexcept
1498 {
1499 }
1500 };
1501
1502public:
1503 template < class >
1504 using callback_type = cb_type;
1505
1506 static constexpr bool stop_requested() noexcept
1507 {
1508 return false;
1509 }
1510 static constexpr bool stop_possible() noexcept
1511 {
1512 return false;
1513 }
1514
1515 bool operator==( never_stop_token const& ) const = default;
1516};
1517
1519{
1520 template < typename T >
1521 decltype( auto ) operator()( T const& env ) const noexcept
1522 {
1523 static constexpr bool v = requires( get_stop_token_t t ) {
1524 { env.query( t ) };
1525 }; // namespace ecor
1526 if constexpr ( v )
1527 return env.query( *this );
1528 else
1529 return never_stop_token{};
1530 }
1531};
1534inline constexpr get_stop_token_t get_stop_token{};
1535
1539template < typename Token = inplace_stop_token >
1541{
1542 Token _token;
1543
1544 [[nodiscard]] decltype( auto ) query( get_stop_token_t ) const noexcept
1545 {
1546 return _token;
1547 }
1548};
1549
1551
1553template < signature... S >
1554struct _ll_entry : _vtable_mixin< S... >, zll::ll_base< _ll_entry< S... > >
1555{
1556 template < typename D >
1557 _ll_entry( _tag< D > ) noexcept
1558 : _vtable_mixin< S... >( _tag< D >{} )
1559 {
1560 }
1561};
1562
1566template < typename R, signature... S >
1567struct _ll_op : _ll_entry< S... >, R
1568{
1569 using operation_state_concept = operation_state_t;
1570
1571 _ll_op( auto& list, R receiver ) noexcept( std::is_nothrow_move_constructible_v< R > )
1572 : _ll_entry< S... >( _tag< _ll_op >{} )
1573 , R( std::move( receiver ) )
1574 , _list( list )
1575 {
1576 }
1577
1578 void start() noexcept
1579 {
1580 _list.link_back( *this );
1581 }
1582
1583private:
1584 zll::ll_list< _ll_entry< S... > >& _list;
1585};
1586
1588template < typename K, signature... S >
1589struct _sh_entry : _vtable_mixin< S... >, zll::sh_base< _sh_entry< K, S... > >
1590{
1591 K key;
1592
1593 template < typename D >
1594 _sh_entry( K k, _tag< D > ) noexcept( std::is_nothrow_move_constructible_v< K > )
1595 : _vtable_mixin< S... >{ _tag< D >{} }
1596 , key( std::move( k ) )
1597 {
1598 }
1599
1600 constexpr bool operator<( _sh_entry const& o ) const
1601 noexcept( noexcept( std::declval< K const& >() < std::declval< K const& >() ) )
1602 {
1603 return key < o.key;
1604 }
1605};
1606
1610template < typename R, typename K, signature... S >
1611struct _sh_op : _sh_entry< K, S... >, R
1612{
1613 using operation_state_concept = operation_state_t;
1614
1615 _sh_op( auto& heap, K key, R receiver ) noexcept(
1616 std::is_nothrow_move_constructible_v< R > && std::is_nothrow_move_constructible_v< K > )
1617 : _sh_entry< K, S... >( key, _tag< _sh_op >{} )
1618 , R( std::move( receiver ) )
1619 , _heap( heap )
1620 {
1621 }
1622
1623 void start() noexcept
1624 {
1625 _heap.link( *this );
1626 }
1627
1628private:
1629 zll::sh_heap< _sh_entry< K, S... > >& _heap;
1630};
1631
1635template < signature... S >
1636struct _ll_sender
1637{
1638 using sender_concept = sender_t;
1639
1640 _ll_sender( zll::ll_list< _ll_entry< S... > >& ll ) noexcept
1641 : _ll( ll )
1642 {
1643 }
1644
1645 using completion_signatures = ecor::completion_signatures< S... >;
1646
1647 template < receiver R >
1648 _ll_op< R, S... > connect( R receiver ) noexcept(
1649 noexcept( _ll_op< R, S... >{ _ll, std::move( receiver ) } ) )
1650 {
1651 static_assert(
1652 receiver_for_sigs< R, S... >,
1653 "Receiver does not satisfy the requirements for the sender's completion signatures" );
1654 return { _ll, std::move( receiver ) };
1655 }
1656
1657private:
1658 zll::ll_list< _ll_entry< S... > >& _ll;
1659};
1660
1664template < typename K, signature... S >
1665struct _sh_sender
1666{
1667 using sender_concept = sender_t;
1668
1669 _sh_sender( K key, zll::sh_heap< _sh_entry< K, S... > >& sh ) noexcept(
1670 std::is_nothrow_move_constructible_v< K > )
1671 : _key( key )
1672 , _sh( sh )
1673 {
1674 }
1675
1676 using completion_signatures = ecor::completion_signatures< S... >;
1677
1678 template < receiver R >
1679 _sh_op< R, K, S... > connect( R receiver ) && noexcept(
1680 noexcept( _sh_op< R, K, S... >{ _sh, std::move( _key ), std::move( receiver ) } ) )
1681 {
1682 static_assert(
1683 receiver_for_sigs< R, S... >,
1684 "Receiver does not satisfy the requirements for the sender's completion signatures" );
1685 return { _sh, std::move( _key ), std::move( receiver ) };
1686 }
1687
1688 template < receiver R >
1689 _sh_op< R, K, S... > connect( R receiver ) const& noexcept(
1690 noexcept( _sh_op< R, K, S... >{ _sh, _key, std::move( receiver ) } ) )
1691 {
1692 static_assert(
1693 receiver_for_sigs< R, S... >,
1694 "Receiver does not satisfy the requirements for the sender's completion signatures" );
1695 return { _sh, _key, std::move( receiver ) };
1696 }
1697
1698private:
1699 K _key;
1700 zll::sh_heap< _sh_entry< K, S... > >& _sh;
1701};
1702
1703
1709template < signature... S >
1711{
1712 using sender_type = _ll_sender< S... >;
1713
1715 _ll_sender< S... > schedule() noexcept
1716 {
1717 return ( _list );
1718 }
1719
1726 template < typename... Args >
1727 void set_value( Args&&... args )
1728 {
1729 static_assert(
1730 _vtable_can_call_value< _sig_vtable< S... >, Args... >,
1731 "Completion signatures do not contain set_value_t(Args...)" );
1732 for_each( [&]( auto& n ) {
1733 n._set_value( args... );
1734 } );
1735 }
1736
1743 template < typename E >
1744 void set_error( E&& err )
1745 {
1746 static_assert(
1747 _vtable_can_call_error< _sig_vtable< S... >, E >,
1748 "Completion signatures do not contain set_error_t(E)" );
1749 for_each( [&]( auto& n ) {
1750 n._set_error( err );
1751 } );
1752 }
1753
1759 {
1760 static_assert(
1761 _vtable_can_call_stopped< _sig_vtable< S... > >,
1762 "Completion signatures do not contain set_stopped_t()" );
1763 for_each( [&]( auto& n ) {
1764 n._set_stopped();
1765 } );
1766 }
1767
1768private:
1769 void for_each( auto&& f )
1770 {
1771 auto l = std::move( _list );
1772 if ( l.empty() )
1773 return;
1774
1775 auto& n = l.front();
1776 using Acc = typename _ll_entry< S... >::access;
1777 for ( _ll_entry< S... >* m = &n; m; m = _node( Acc::get( *m ).next ) )
1778 f( *m );
1779 }
1780
1781 zll::ll_list< _ll_entry< S... > > _list;
1782};
1783
1784
1792template < signature... S >
1794{
1795 using completion_sigs = completion_signatures< S... >;
1796 using sender_type = _ll_sender< S... >;
1797
1799 _ll_sender< S... > schedule() noexcept
1800 {
1801 return ( _ll );
1802 }
1803
1810 template < typename... V >
1811 void set_value( V&&... value )
1812 {
1813 static_assert(
1814 _vtable_can_call_value< _sig_vtable< S... >, V... >,
1815 "Completion signatures do not contain set_value_t(Args...)" );
1816 do_f( [&]( auto& n ) {
1817 n._set_value( (V&&) value... );
1818 } );
1819 }
1820
1827 template < typename E1 >
1828 void set_error( E1&& err )
1829 {
1830 static_assert(
1831 _vtable_can_call_error< _sig_vtable< S... >, E1 >,
1832 "Completion signatures do not contain set_error_t(E1)" );
1833 do_f( [&]( auto& n ) {
1834 n._set_error( (E1&&) err );
1835 } );
1836 }
1837
1843 {
1844 static_assert(
1845 _vtable_can_call_stopped< _sig_vtable< S... > >,
1846 "Completion signatures do not contain set_stopped_t()" );
1847 do_f( [&]( auto& n ) {
1848 n._set_stopped();
1849 } );
1850 }
1851
1852private:
1853 void do_f( auto f )
1854 {
1855 if ( _ll.empty() )
1856 return;
1857
1858 auto& n = _ll.take_front();
1859 f( n );
1860 }
1861
1862
1863 zll::ll_list< _ll_entry< S... > > _ll;
1864};
1865
1871template < typename K, signature... S >
1873{
1874 using sender_type = _sh_sender< K, S... >;
1875
1879 _sh_sender< K, S... >
1880 schedule( K key ) noexcept( noexcept( _sh_sender< K, S... >{ key, _sh } ) )
1881 {
1882 return { key, _sh };
1883 }
1884
1891 template < typename... V >
1892 void set_value( V&&... value )
1893 {
1894 static_assert(
1895 _vtable_can_call_value< _sig_vtable< S... >, V... >,
1896 "Completion signatures do not contain set_value_t(V)" );
1897 do_f( [&]( auto& n ) {
1898 n._set_value( (V&&) value... );
1899 } );
1900 }
1901
1908 template < typename E1 >
1909 void set_error( E1&& err )
1910 {
1911 static_assert(
1912 _vtable_can_call_error< _sig_vtable< S... >, E1 >,
1913 "Completion signatures do not contain set_error_t(E1)" );
1914 // XXX: should this treat all entries or just the front one?
1915 do_f( [&]( auto& n ) {
1916 n._set_error( (E1&&) err );
1917 } );
1918 }
1919
1925 {
1926 static_assert(
1927 _vtable_can_call_stopped< _sig_vtable< S... > >,
1928 "Completion signatures do not contain set_stopped_t()" );
1929 // XXX: should this treat all entries or just the front one?
1930 do_f( [&]( auto& n ) {
1931 n._set_stopped();
1932 } );
1933 }
1934
1938 [[nodiscard]] bool empty() const
1939 {
1940 return _sh.empty();
1941 }
1942
1947 [[nodiscard]] _sh_entry< K, S... > const& front() const
1948 {
1949 return *_sh.top;
1950 }
1951
1952private:
1953 void do_f( auto f )
1954 {
1955 if ( _sh.empty() )
1956 return;
1957
1958 auto& n = _sh.take();
1959 f( n );
1960 }
1961
1962 zll::sh_heap< _sh_entry< K, S... > > _sh;
1963};
1964
1965// ------------------------------------------------------------------------------
1966
1967
1968template < typename T >
1969struct _just_error
1970{
1971 using sender_concept = sender_t;
1972
1973 T err;
1974
1975 template < receiver R >
1976 struct _op
1977 {
1978 using operation_state_concept = operation_state_t;
1979
1980 T err;
1981 R receiver;
1982
1983 void start()
1984 {
1985 receiver.set_error( std::move( err ) );
1986 }
1987 };
1988
1989 template < receiver R >
1990 auto connect( R receiver ) && noexcept(
1991 std::is_nothrow_move_constructible_v< T > && std::is_nothrow_move_constructible_v< R > )
1992 {
1993 static_assert(
1994 receiver_for< R, _just_error< T > >,
1995 "Receiver does not satisfy the requirements for the sender's completion signatures" );
1996 return _op{ std::move( err ), std::move( receiver ) };
1997 }
1998
1999 using completion_signatures = ecor::completion_signatures< set_error_t( T ) >;
2000};
2001
2006template < typename T >
2007_just_error< T > just_error( T err )
2008{
2009 return _just_error< T >{ std::move( err ) };
2010}
2011
2012// ------------------------------------------------------------------------------
2013
2014enum class _awaitable_state_e : uint8_t
2015{
2016 empty,
2017 value
2018};
2019
2022struct _schedulable : zll::ll_base< _schedulable >
2023{
2024 virtual void resume() = 0;
2025};
2026
2032{
2037 {
2038 if ( _ready_tasks.empty() )
2039 return false;
2040 auto& t = _ready_tasks.front();
2041 _ready_tasks.detach_front();
2042 t.resume();
2043 return true;
2044 }
2045
2048 void run_n( std::size_t n )
2049 {
2050 for ( std::size_t i = 0; i < n; ++i )
2051 if ( !run_once() )
2052 break;
2053 }
2054
2057 void reschedule( _schedulable& op ) noexcept
2058 {
2059 _ready_tasks.link_back( op );
2060 }
2061
2062private:
2063 zll::ll_list< _schedulable > _ready_tasks;
2064};
2065
2066template < typename T >
2067struct _awaitable_expected
2068{
2069 _awaitable_expected() noexcept
2070 {
2071 }
2072
2073 _awaitable_state_e state = _awaitable_state_e::empty;
2074 union
2075 {
2076 T val;
2077 };
2078
2079 template < typename... Args >
2080 void set_value( Args&&... args ) noexcept( std::is_nothrow_constructible_v< T, Args... > )
2081 {
2082 new ( (void*) &val ) T( (Args&&) args... );
2083 state = _awaitable_state_e::value;
2084 }
2085
2086 ~_awaitable_expected() noexcept
2087 {
2088 if ( state == _awaitable_state_e::value )
2089 val.~T();
2090 }
2091};
2092
2093template <>
2094struct _awaitable_expected< void >
2095{
2096 _awaitable_expected() noexcept = default;
2097
2098 _awaitable_state_e state = _awaitable_state_e::empty;
2099
2100 void set_value() noexcept
2101 {
2102 state = _awaitable_state_e::value;
2103 }
2104};
2105
2109template < typename T >
2110struct _awaitable_extract_type;
2111
2112template < typename... U >
2113struct _awaitable_extract_type< std::variant< std::tuple< U... > > >
2114{
2115 using type = std::tuple< U... >;
2116};
2117
2118template < typename U >
2119struct _awaitable_extract_type< std::variant< std::tuple< U > > >
2120{
2121 using type = U;
2122};
2123
2124template <>
2125struct _awaitable_extract_type< std::variant< std::tuple<> > >
2126{
2127 using type = void;
2128};
2129
2130template <>
2131struct _awaitable_extract_type< std::variant<> >
2132{
2133 using type = void;
2134};
2135
2142template < typename PromiseType, typename S >
2143struct _task_awaitable
2144{
2145 _task_awaitable( S sender )
2146 : _exp()
2147 , _op( std::move( sender ).connect( _receiver{ ._self = this } ) )
2148 {
2149 }
2150
2152 [[nodiscard]] bool await_ready() const noexcept
2153 {
2154 return false;
2155 }
2156
2158 void await_suspend( std::coroutine_handle< PromiseType > ch ) noexcept
2159 {
2160 _promise = &ch.promise();
2161 _op.start();
2162 }
2163
2167 decltype( auto ) await_resume() noexcept
2168 {
2169 ECOR_ASSERT( _exp.state == _awaitable_state_e::value );
2170 if constexpr ( std::same_as< value_type, void > )
2171 return;
2172 else
2173 return std::move( _exp.val );
2174 }
2175
2176 using _env = env_type< PromiseType >;
2177 using _completions = _sender_completions_t< std::remove_cvref_t< S >, _env >;
2178 using _values = _filter_values_as_variant_tuple_t< _completions >;
2179 static_assert(
2180 std::variant_size_v< _values > <= 1,
2181 "Multiple set_value completions in awaitable not supported" );
2182
2184 using value_type = typename _awaitable_extract_type< _values >::type;
2185
2186 struct _receiver
2187 {
2188 using receiver_concept = receiver_t;
2189
2190 _task_awaitable* _self;
2191
2192 template < typename... Ts >
2193 void set_value( Ts&&... vals ) noexcept(
2194 std::is_nothrow_constructible_v< value_type, Ts... > ||
2195 std::same_as< value_type, void > )
2196 {
2197 _self->_exp.set_value( (Ts&&) vals... );
2198 _self->_promise->core.reschedule( *_self->_promise );
2199 }
2200
2201 template < typename E >
2202 void set_error( E&& err )
2203 {
2204 _self->_promise->invoke_set_error( (E&&) err );
2205 }
2206
2207 void set_stopped()
2208 {
2209 _self->_promise->invoke_set_stopped();
2210 }
2211
2212 [[nodiscard]] decltype( auto ) get_env() const noexcept
2213 {
2214 return _self->_promise->get_env();
2215 }
2216 };
2217
2218
2219private:
2220 PromiseType* _promise;
2221 _awaitable_expected< value_type > _exp;
2222
2223 using _op_t = connect_type< S, _receiver >;
2224 _op_t _op;
2225};
2226
2227template < class Alloc >
2228concept simple_allocator = requires( Alloc alloc, size_t n ) {
2229 { *alloc.allocate( n ) } -> std::same_as< typename Alloc::value_type& >;
2230 { alloc.deallocate( alloc.allocate( n ), n ) };
2231} && std::copy_constructible< Alloc > && std::equality_comparable< Alloc >;
2232
2233
2244{
2245 template < typename M >
2246 task_memory_resource( M& m ) noexcept
2247 : mem( &m )
2248 {
2249 alloc = +[]( void* mem, std::size_t const sz, std::size_t const align ) {
2250 return ecor::allocate( *( (M*) mem ), sz, align );
2251 };
2252 dealloc = +[]( void* mem, void* p, std::size_t const sz, std::size_t const align ) {
2253 ecor::deallocate( *( (M*) mem ), p, sz, align );
2254 };
2255 }
2256
2257 [[nodiscard]] void* allocate( std::size_t const sz, std::size_t const align ) const
2258 {
2259 return alloc( mem, sz, align );
2260 }
2261
2262 void deallocate( void* p, std::size_t const sz, std::size_t const align ) const
2263 {
2264 dealloc( mem, p, sz, align );
2265 }
2266
2267 void* ( *alloc )( void*, std::size_t const, std::size_t const ) = nullptr;
2268 void ( *dealloc )( void*, void*, std::size_t const, std::size_t const ) = nullptr;
2269 void* mem = nullptr;
2270};
2271
2273{
2274 template < typename T >
2275 decltype( auto ) operator()( T& t ) const noexcept
2276 {
2277 return t.query( get_task_core_t{} );
2278 }
2279};
2281inline constexpr get_task_core_t get_task_core{};
2282
2284{
2285 template < typename T >
2286 decltype( auto ) operator()( T& t ) const noexcept
2287 {
2288 return t.query( get_memory_resource_t{} );
2289 }
2290};
2292inline constexpr get_memory_resource_t get_memory_resource{};
2293
2301{
2303 task_core core;
2304
2305 task_ctx( auto& mem )
2306 : alloc( mem )
2307 {
2308 }
2309
2310 auto& query( get_task_core_t ) noexcept
2311 {
2312 return core;
2313 }
2314
2315 auto& query( get_memory_resource_t ) noexcept
2316 {
2317 return alloc;
2318 }
2319};
2320
2324template < typename T >
2325concept task_context = requires( T t ) {
2326 { get_task_core( t ) } -> std::same_as< task_core& >;
2327 { get_memory_resource( t ) } -> std::same_as< task_memory_resource& >;
2328};
2329
2332{
2334};
2335
2341template < typename T >
2342concept task_config = requires() { typename T::extra_error_signatures; };
2343
2344template < typename T, task_config CFG = task_default_cfg >
2345struct task;
2346
2348enum class task_error : uint8_t
2349{
2350 none = 1,
2351 task_unfinished = 2,
2352 task_allocation_failure = 3,
2353 task_already_started = 4,
2354 task_unhandled_exception = 5,
2355 task_missing = 6,
2356};
2357
2361struct _promise_base : _schedulable
2362{
2363 static constexpr std::size_t align = alignof( std::max_align_t );
2364 static constexpr std::size_t spacing = align > sizeof( void* ) ? align : sizeof( void* );
2365
2368 static void* _alloc( std::size_t sz, task_memory_resource& mem )
2369 {
2370 sz += spacing;
2371 void* const vp = allocate( mem, sz, align );
2372 if ( !vp )
2373 return nullptr;
2374 auto* pmem = &mem;
2375 std::memcpy( vp, (void const*) &pmem, sizeof( void* ) );
2376 return ( (char*) vp ) + spacing;
2377 }
2378
2382 static void _dealloc( void* const ptr, std::size_t const sz )
2383 {
2384 void* beg = ( (char*) ptr ) - spacing;
2385 task_memory_resource* mem = nullptr;
2386 std::memcpy( (void*) &mem, beg, sizeof( void* ) );
2387 deallocate( *mem, beg, sz + spacing, align );
2388 }
2389
2390 void* operator new( std::size_t const sz, task_context auto&& ctx, auto&&... ) noexcept
2391 {
2393 task_memory_resource& a = get_memory_resource( ctx );
2394 return _alloc( sz, a );
2395 }
2396
2397 void operator delete( void* const ptr, std::size_t const sz ) noexcept
2398 {
2400 _dealloc( ptr, sz );
2401 }
2402
2403 _promise_base( _promise_base const& ) = delete;
2404 _promise_base& operator=( _promise_base const& ) = delete;
2405
2408 _promise_base( task_context auto&& ctx, auto&&... )
2409 : core( get_task_core( ctx ) )
2410 , token()
2411 {
2412 }
2413
2415 std::suspend_always initial_suspend() noexcept
2416 {
2417 return {};
2418 }
2419
2420 task_core& core;
2421 inplace_stop_token token;
2422
2423 using _env = stop_token_env< inplace_stop_token& >;
2424
2425 _env get_env() noexcept
2426 {
2427 return { token };
2428 }
2429};
2430
2431template < typename Task, typename CFG >
2432struct _promise_type;
2433
2435template < typename Task, typename CFG, typename Val = typename Task::value_type >
2436struct _promise_return_mixin
2437{
2438 void return_value( Val v )
2439 {
2440 static_cast< _promise_type< Task, CFG >* >( this )->invoke_set_value(
2441 std::move( v ) );
2442 }
2443};
2444
2445template < typename Task, typename CFG >
2446struct _promise_return_mixin< Task, CFG, void >
2447{
2448 void return_void()
2449 {
2450 static_cast< _promise_type< Task, CFG >* >( this )->invoke_set_value();
2451 }
2452};
2453
2467template < typename Task, typename CFG >
2468struct _promise_type : _promise_base, _promise_return_mixin< Task, CFG, typename Task::value_type >
2469{
2470 using value_type = typename Task::value_type;
2471 using _promise_base::_promise_base;
2472 using vtable = _apply_to_sigs_t< _sig_vtable, typename Task::completion_signatures >;
2473
2474 _promise_type( _promise_type const& ) = delete;
2475 _promise_type& operator=( _promise_type const& ) = delete;
2476
2477 static Task get_return_object_on_allocation_failure()
2478 {
2479 return {
2480 std::coroutine_handle< _promise_type >{}, task_error::task_allocation_failure };
2481 }
2482
2483 Task get_return_object()
2484 {
2485 return { std::coroutine_handle< _promise_type >::from_promise( *this ) };
2486 }
2487
2488 template < typename E >
2489 requires( _vtable_can_call_error< vtable, E > )
2490 auto yield_value( with_error< E > error )
2491 {
2492 return await_transform( just_error( std::move( error.error ) ) );
2493 }
2494
2495 template < typename T >
2496 decltype( auto ) await_transform( T&& x ) noexcept
2497 {
2498 using U = std::remove_cvref_t< T >;
2499 if constexpr ( sender< U > ) {
2500 using compls = _sender_completions_t< U, typename _promise_base::_env >;
2501 using errs = _filter_errors_of_t< compls >;
2502 static_assert(
2503 _sig_is_underset_of< errs, typename Task::_error_completions >,
2504 "Error signatures are not compatible" );
2505 using vals = _filter_values_as_variant_tuple_t< compls >;
2506 static_assert(
2507 std::variant_size_v< vals > <= 1,
2508 "Sender used in co_await must only complete with a single set_value signature" );
2509 return _task_awaitable< _promise_type, T >{ (T&&) x };
2510 } else {
2511 return (T&&) x;
2512 }
2513 }
2514
2515 void unhandled_exception() noexcept
2516 {
2517 this->invoke_set_error( task_error::task_unhandled_exception );
2518 }
2519
2520 void resume() override
2521 {
2522 auto h = std::coroutine_handle< _promise_type >::from_promise( *this );
2523 h.resume();
2524 }
2525
2526 template < typename U >
2527 void invoke_set_error( U&& err )
2528 {
2529 ECOR_ASSERT( this->_recv );
2530 if ( this->_recv ) {
2531 auto* r = std::exchange( this->_recv, nullptr );
2532 r->invoke( set_error_t{}, _obj, (U&&) err );
2533 }
2534 }
2535
2536 template < typename... U >
2537 void invoke_set_value( U&&... v )
2538 {
2539 ECOR_ASSERT( this->_recv );
2540 if ( this->_recv ) {
2541 auto* r = std::exchange( this->_recv, nullptr );
2542 r->invoke( set_value_t{}, this->_obj, (U&&) v... );
2543 }
2544 }
2545
2546 void invoke_set_stopped()
2547 {
2548 ECOR_ASSERT( this->_recv );
2549 if ( this->_recv ) {
2550 auto* r = std::exchange( this->_recv, nullptr );
2551 r->invoke( set_stopped_t{}, this->_obj );
2552 }
2553 }
2554
2555 std::suspend_always final_suspend() noexcept
2556 {
2557 if ( _recv ) {
2558 auto* r = std::exchange( this->_recv, nullptr );
2559 r->invoke( set_error_t{}, _obj, task_error::task_unfinished );
2560 }
2561 return {};
2562 }
2563
2564 template < typename U >
2565 void setup_continuable( U& p )
2566 {
2567 this->_recv = &_vtable_of< U, U, vtable >;
2568 this->_obj = static_cast< void* >( &p );
2569 }
2570
2571 ~_promise_type() noexcept
2572 {
2573 if ( _recv )
2574 this->_recv->invoke( set_stopped_t{}, _obj );
2575 }
2576
2577 vtable const* _recv = nullptr;
2578 void* _obj = nullptr;
2579};
2580
2587template < typename T, task_config CFG, typename R >
2588struct _task_op
2589{
2590 using promise_type = _promise_type< task< T, CFG >, CFG >;
2591 using operation_state_concept = operation_state_t;
2592
2593 _task_op() noexcept = default;
2594 _task_op( task_error err, std::coroutine_handle< promise_type > h, R r ) noexcept(
2595 std::is_nothrow_move_constructible_v< R > )
2596 : _h( h )
2597 , _recv( std::move( r ) )
2598 , _error( err )
2599 {
2600 }
2601
2602 _task_op( _task_op const& ) = delete;
2603 _task_op& operator=( _task_op const& ) = delete;
2604 _task_op( _task_op&& other ) noexcept
2605 : _h( std::exchange( other._h, nullptr ) )
2606 , _recv( std::move( other._recv ) )
2607 , _error( std::exchange( other._error, task_error::none ) )
2608 {
2609 }
2610
2611 _task_op& operator=( _task_op&& other ) noexcept
2612 {
2613 if ( this == &other )
2614 return *this;
2615 _task_op cpy{ std::move( other ) };
2616 std::swap( _h, cpy._h );
2617 std::swap( _recv, cpy._recv );
2618 std::swap( _error, cpy._error );
2619 return *this;
2620 }
2621
2633 void start()
2634 {
2635 if ( _error != task_error::none ) {
2636 _recv.set_error( _error );
2637 return;
2638 }
2639 if ( !_h ) {
2640 _recv.set_error( task_error::task_missing );
2641 return;
2642 }
2643 if ( _h.done() ) {
2644 _recv.set_error( task_error::task_already_started );
2645 return;
2646 }
2647 if constexpr ( !std::same_as<
2648 decltype( ecor::get_stop_token( ecor::get_env( _recv ) ) ),
2649 never_stop_token > )
2650 _h.promise().token = ecor::get_stop_token( ecor::get_env( _recv ) );
2651 _h.promise().setup_continuable( _recv );
2652 _h.promise().core.reschedule( _h.promise() );
2653 }
2654
2655 void clear()
2656 {
2657 if ( _h )
2658 _h.destroy();
2659 _h = {};
2660 _recv = {};
2661 _error = task_error::none;
2662 }
2663
2664 operator bool()
2665 {
2666 return this->_h && !this->_h.done();
2667 }
2668
2674 ~_task_op()
2675 {
2676 if ( this->_h )
2677 this->_h.destroy();
2678 }
2679
2680 std::coroutine_handle< promise_type > _h;
2681 R _recv;
2682 task_error _error = task_error::none;
2683};
2684
2724template < typename T, task_config CFG >
2725struct task
2726{
2727 using sender_concept = sender_t;
2728 using value_type = T;
2729 using promise_type = _promise_type< task, CFG >;
2730
2731 using _error_completions =
2732 _sigs_append_t< typename CFG::extra_error_signatures, set_error_t( task_error ) >;
2733
2734 using completion_signatures =
2735 _sigs_append_t< _error_completions, _value_setter_t< T >, set_stopped_t() >;
2736
2737 static_assert(
2738 alignof( promise_type ) <= alignof( std::max_align_t ),
2739 "Unsupported alignment" );
2740
2743 std::coroutine_handle< promise_type > handle,
2744 task_error error = task_error::none ) noexcept
2745 : _h( handle )
2746 , _error( error )
2747 {
2748 }
2749
2750 task( task const& ) = delete;
2751 task& operator=( task const& ) = delete;
2752 task( task&& other ) noexcept
2753 : _h( std::exchange( other._h, nullptr ) )
2754 , _error( std::exchange( other._error, task_error::none ) )
2755 {
2756 }
2757
2758 task& operator=( task&& other ) noexcept
2759 {
2760 if ( this != &other ) {
2761 task tmp = std::move( other );
2762 std::swap( _h, tmp._h );
2763 std::swap( _error, tmp._error );
2764 }
2765 return *this;
2766 }
2767
2774 template < receiver R >
2775 auto connect( R receiver ) &&
2776 {
2777 static_assert(
2779 "Receiver does not satisfy the requirements for the task's completion signatures" );
2780 auto tmp = _h;
2781 _h = nullptr;
2782 return _task_op< T, CFG, R >{ _error, std::move( tmp ), std::move( receiver ) };
2783 }
2784
2785 ~task()
2786 {
2787 if ( _h && !_h.done() )
2788 _h.destroy();
2789 }
2790
2791private:
2792 std::coroutine_handle< promise_type > _h;
2793 task_error _error = task_error::none;
2794};
2795
2797
2798template < typename S1, typename S2, typename R >
2799struct _or_op
2800{
2801 using operation_state_concept = operation_state_t;
2802
2803 _or_op( S1 s1, S2 s2, R r )
2804 : _recv( std::move( r ) )
2805 , _op1( std::move( s1 ).connect( _r{ ._fired = _fired, ._recv = _recv } ) )
2806 , _op2( std::move( s2 ).connect( _r{ ._fired = _fired, ._recv = _recv } ) )
2807 {
2808 }
2809
2810 // XXX: cancel of the other one shall be used instead
2811 void start()
2812 {
2813 _op1.start();
2814 _op2.start();
2815 }
2816
2817 struct _r
2818 {
2819 using receiver_concept = receiver_t;
2820
2821 bool& _fired;
2822 R& _recv;
2823
2824 template < typename... Args >
2825 void set_value( Args&&... v )
2826 {
2827 if ( _fired )
2828 return;
2829 _fired = true;
2830 _recv.set_value( (Args&&) v... );
2831 }
2832
2833 template < typename T >
2834 void set_error( T&& e )
2835 {
2836 if ( _fired )
2837 return;
2838 _fired = true;
2839 _recv.set_error( (T&&) e );
2840 }
2841
2842 // XXX: investigate whenever this should cancel the other operation
2843 void set_stopped()
2844 {
2845 if ( _fired )
2846 return;
2847 _fired = true;
2848 _recv.set_stopped();
2849 }
2850
2851 decltype( auto ) get_env() noexcept
2852 {
2853 return ecor::get_env( _recv );
2854 }
2855 };
2856
2857 R _recv;
2858 bool _fired = false;
2859
2860 connect_type< S1, _r > _op1{};
2861 connect_type< S2, _r > _op2{};
2862};
2863
2864template < typename S1, typename S2 >
2865struct _or_sender
2866{
2867 using sender_concept = sender_t;
2868
2869 _or_sender( S1 s1, S2 s2 )
2870 : _s1( std::move( s1 ) )
2871 , _s2( std::move( s2 ) )
2872 {
2873 }
2874
2875 template < typename Env >
2876 using _completions =
2877 _sigs_merge_t< _sender_completions_t< S1, Env >, _sender_completions_t< S2, Env > >;
2878
2879 template < typename Env >
2880 _completions< Env > get_completion_signatures( Env&& ) noexcept
2881 {
2882 return {};
2883 }
2884
2885
2886 template < receiver R >
2887 _or_op< S1, S2, R > connect( R receiver )
2888 {
2889 static_assert(
2890 receiver_for< R, _or_sender >,
2891 "Receiver does not satisfy the requirements for the combined completion signatures of the two senders" );
2892 return { std::move( _s1 ), std::move( _s2 ), std::move( receiver ) };
2893 }
2894
2895private:
2896 S1 _s1;
2897 S2 _s2;
2898};
2899
2904template < typename S1, typename S2 >
2905_or_sender< S1, S2 > operator||( S1 s1, S2 s2 )
2906{
2907 return { std::move( s1 ), std::move( s2 ) };
2908}
2909
2911
2912template < sender S >
2913struct _as_variant
2914{
2915 using sender_concept = sender_t;
2916
2917 template < typename Env >
2918 using _s_completions = _sender_completions_t< S, Env >;
2919
2922 template < typename Env >
2923 using _values = typename _filter_map_tag<
2924 std::variant<>,
2925 set_value_t,
2926 _s_completions< Env >,
2927 _type_identity_t >::type;
2928
2929 template < typename Env >
2930 using _error = _filter_errors_of_t< _s_completions< Env > >;
2931
2932 template < typename Env >
2933 using _stopped = _filter_stopped_of_t< _s_completions< Env > >;
2934
2935 template < typename Env >
2936 using _completions = _sigs_concat_t<
2937 completion_signatures< set_value_t( _values< Env > ) >,
2938 _error< Env >,
2939 _stopped< Env > >;
2940
2941 _as_variant( S s )
2942 : _s( std::move( s ) )
2943 {
2944 }
2945
2946 template < typename Env >
2947 _completions< Env > get_completion_signatures( Env&& ) noexcept
2948 {
2949 return {};
2950 }
2951
2952
2953 template < typename R >
2954 struct _recv : R
2955 {
2956 using receiver_concept = receiver_t;
2957
2958 _recv( R r )
2959 : R( std::move( r ) )
2960 {
2961 }
2962
2963 using _env = env_type< R >;
2964 using value_type = _values< _env >;
2965
2966 template < typename T >
2967 void set_value( T&& val ) noexcept
2968 {
2969 R::set_value( value_type{ (T&&) val } );
2970 }
2971 };
2972
2973 template < receiver R >
2974 auto connect( R receiver ) &&
2975 {
2976 static_assert(
2977 is_all_singular_v< _s_completions< env_type< R > > >,
2978 "All set_value signatures of sender must be singular (exactly one argument) to use as_variant" );
2979 return std::move( _s ).connect( _recv< R >{ std::move( receiver ) } );
2980 }
2981
2982private:
2983 S _s;
2984};
2985
2995[[maybe_unused]] static inline struct as_variant_t
2996{
2997 template < sender S >
2998 auto operator()( S s ) const
2999 {
3000 return _as_variant< S >{ std::move( s ) };
3001 }
3002
3003} as_variant;
3004
3007auto operator|( auto s, as_variant_t )
3008{
3009 return as_variant( std::move( s ) );
3010}
3011
3013
3014template < sender S >
3015struct _err_to_val
3016{
3017 using sender_concept = sender_t;
3018
3019 template < typename Env >
3020 using _s_completions = _sender_completions_t< S, Env >;
3021
3022 template < typename Env >
3023 using _s_values = _filter_values_of_t< _s_completions< Env > >;
3024
3025 template < typename Env >
3026 using _stopped = _filter_stopped_of_t< _s_completions< Env > >;
3027
3028 template < typename Env >
3029 using _s_errors_as_val = typename _filter_map_tag<
3030 completion_signatures<>,
3031 set_error_t,
3032 _s_completions< Env >,
3033 _sig_generator_t< set_value_t >::type >::type;
3034
3035 template < typename Env >
3036 using _completions =
3037 _sigs_concat_t< _s_values< Env >, _s_errors_as_val< Env >, _stopped< Env > >;
3038
3039 template < typename Env >
3040 _completions< Env > get_completion_signatures( Env&& ) noexcept
3041 {
3042 return {};
3043 }
3044
3045 _err_to_val( S s )
3046 : _s( std::move( s ) )
3047 {
3048 }
3049
3050 template < typename R >
3051 struct _recv : R
3052 {
3053 using receiver_concept = receiver_t;
3054
3055 _recv( R r )
3056 : R( std::move( r ) )
3057 {
3058 }
3059
3060 template < typename... Ts >
3061 void set_error( Ts&&... errs )
3062 {
3063 R::set_value( (Ts&&) errs... );
3064 }
3065 };
3066
3067 template < receiver R >
3068 auto connect( R receiver ) &&
3069 {
3070 static_assert(
3071 receiver_for< R, _err_to_val >,
3072 "Receiver does not satisfy the requirements for the err_to_val transformation" );
3073 return std::move( _s ).connect( _recv< R >{ std::move( receiver ) } );
3074 }
3075
3076private:
3077 S _s;
3078};
3079
3082[[maybe_unused]] static inline struct err_to_val_t
3083{
3084 template < sender S >
3085 auto operator()( S s ) const
3086 {
3087 return _err_to_val< S >{ std::move( s ) };
3088 }
3089
3090} err_to_val;
3091
3094auto operator|( auto s, err_to_val_t )
3095{
3096 return err_to_val( std::move( s ) );
3097}
3098
3100
3101template < sender S >
3102struct _sink_err
3103{
3104 using sender_concept = sender_t;
3105
3106 template < typename Env >
3107 using _s_completions = _sender_completions_t< S, Env >;
3108
3109 template < typename Env >
3110 using _s_errors = typename _filter_map_tag<
3111 std::variant<>,
3112 set_error_t,
3113 _s_completions< Env >,
3114 _type_identity_t >::type;
3115
3116 template < typename Env >
3117 using _s_errors_as_val = std::optional< _s_errors< Env > >;
3118
3119 template < typename Env >
3120 using _s_values = _filter_values_of_t< _s_completions< Env > >;
3121
3122 template < typename Env >
3123 using _s_stopped = _filter_stopped_of_t< _s_completions< Env > >;
3124
3125 template < typename Env >
3126 using _completions = _sigs_merge_t<
3127 completion_signatures< set_value_t( _s_errors_as_val< Env > ) >,
3128 _s_stopped< Env > >;
3129
3130 template < typename Env >
3131 _completions< Env > get_completion_signatures( Env&& ) noexcept
3132 {
3133 return {};
3134 }
3135
3136 template < typename R >
3137 struct _recv : R
3138 {
3139 using receiver_concept = receiver_t;
3140
3141 _recv( R r )
3142 : R( std::move( r ) )
3143 {
3144 }
3145
3146 using _env = env_type< R >;
3147 using value_t = _s_errors_as_val< _env >;
3148
3149 void set_value() noexcept
3150 {
3151 R::set_value( value_t{} );
3152 }
3153
3154 template < typename T >
3155 void set_error( T&& err ) noexcept
3156 {
3157 R::set_value( value_t{ (T&&) err } );
3158 }
3159 };
3160
3161 template < receiver R >
3162 auto connect( R receiver ) &&
3163 {
3164 using _env = env_type< R >;
3165 static_assert(
3166 std::variant_size_v< _s_errors< _env > > > 0,
3167 "Sender used with sink_err must have at least one error type" );
3168 static_assert(
3169 std::same_as< _s_values< _env >, completion_signatures<> > ||
3170 std::same_as< _s_values< _env >, completion_signatures< set_value_t() > >,
3171 "Sender used with sink_err must not complete with set_value, or there has to be only one set_value of shape set_value_t()" );
3172 static_assert(
3173 receiver_for< R, _sink_err< S > >,
3174 "Receiver does not satisfy the requirements for the sink_err sender's completion signatures" );
3175 return std::move( _s ).connect( _recv< R >{ std::move( receiver ) } );
3176 }
3177
3178 S _s;
3179};
3180
3186[[maybe_unused]] static inline struct sink_err_t
3187{
3188
3189 auto operator()( sender auto s ) const noexcept
3190 {
3191 return _sink_err{ std::move( s ) };
3192 }
3193
3194} sink_err;
3195
3198auto operator|( sender auto s, sink_err_t ) noexcept
3199{
3200 return _sink_err{ std::move( s ) };
3201}
3202
3204
3207template < typename Ret >
3208struct _then_value_sig_ret
3209{
3210 using type = set_value_t( Ret );
3211};
3212
3213template <>
3214struct _then_value_sig_ret< void >
3215{
3216 using type = set_value_t();
3217};
3218
3219template < typename F >
3220struct _then_value_sig
3221{
3222 template < typename... Args >
3223 using type = typename _then_value_sig_ret< std::invoke_result_t< F, Args... > >::type;
3224};
3225
3226template < sender S, typename F >
3227struct _then
3228{
3229 using sender_concept = sender_t;
3230
3231 template < typename Env >
3232 using _s_completions = _sender_completions_t< S, Env >;
3233
3234 template < typename Env >
3235 using _s_errors = _filter_errors_of_t< _s_completions< Env > >;
3236
3237 template < typename Env >
3238 using _s_stopped = _filter_stopped_of_t< _s_completions< Env > >;
3239
3240 template < typename Env >
3241 using _s_mapped_values = typename _filter_map_tag<
3242 completion_signatures<>,
3243 set_value_t,
3244 _s_completions< Env >,
3245 _then_value_sig< F >::template type >::type;
3246
3247 template < typename Env >
3248 using _completions =
3249 _sigs_merge_t< _s_mapped_values< Env >, _s_errors< Env >, _s_stopped< Env > >;
3250
3251 template < typename Env >
3252 _completions< Env > get_completion_signatures( Env&& ) noexcept
3253 {
3254 return {};
3255 }
3256
3257 template < typename R >
3258 struct _recv : R
3259 {
3260 using receiver_concept = receiver_t;
3261
3262 F _f;
3263
3264 _recv( R r, F f )
3265 : R( std::move( r ) )
3266 , _f( std::move( f ) )
3267 {
3268 }
3269
3270 template < typename... Args >
3271 void set_value( Args&&... args )
3272 {
3273 if constexpr ( std::is_void_v< std::invoke_result_t< F, Args... > > ) {
3274 std::move( _f )( (Args&&) args... );
3275 R::set_value();
3276 } else {
3277 R::set_value( std::move( _f )( (Args&&) args... ) );
3278 }
3279 }
3280 };
3281
3282 template < receiver R >
3283 auto connect( R receiver ) &&
3284 {
3285 static_assert(
3286 receiver_for< R, _then >,
3287 "Receiver does not satisfy the requirements for the then sender's completion signatures" );
3288 return std::move( _s ).connect(
3289 _recv< R >{ std::move( receiver ), std::move( _f ) } );
3290 }
3291
3292 S _s;
3293 F _f;
3294};
3295
3297template < typename F >
3298struct _then_closure
3299{
3300 F _f;
3301};
3302
3303template < sender S, typename F >
3304auto operator|( S s, _then_closure< F > c )
3305{
3306 return _then< S, F >{ std::move( s ), std::move( c._f ) };
3307}
3308
3314[[maybe_unused]] static inline struct then_t
3315{
3316 template < typename F >
3317 auto operator()( F f ) const
3318 {
3319 return _then_closure< F >{ std::move( f ) };
3320 }
3321
3322 template < sender S, typename F >
3323 auto operator()( S s, F f ) const
3324 {
3325 return _then< S, F >{ std::move( s ), std::move( f ) };
3326 }
3327
3328} then;
3329
3331
3336template < task_config CFG = task_default_cfg >
3337struct _task_holder_base : _schedulable
3338{
3339 using _task_t = task< void, CFG >;
3340
3341 _task_holder_base( _task_holder_base const& ) = delete;
3342 _task_holder_base& operator=( _task_holder_base const& ) = delete;
3343 _task_holder_base( _task_holder_base&& ) = delete;
3344 _task_holder_base& operator=( _task_holder_base&& ) = delete;
3345
3346 explicit _task_holder_base( task_core& core )
3347 : _core( core )
3348 {
3349 }
3350
3353 void start()
3354 {
3355 _core.reschedule( *this );
3356 }
3357
3360 _ll_sender< set_value_t() > stop()
3361 {
3362 _stop_src.request_stop();
3363 return _done_src.schedule();
3364 }
3365
3366protected:
3368 virtual _task_t _make_task() = 0;
3369
3370private:
3371 struct _recv
3372 {
3373 using receiver_concept = receiver_t;
3374
3375 _task_holder_base* _holder;
3376
3377 template < typename... Args >
3378 void set_value( Args&&... ) noexcept
3379 {
3380 _holder->_core.reschedule( *_holder );
3381 }
3382
3383 template < typename E >
3384 void set_error( E&& ) noexcept
3385 {
3386 _holder->_core.reschedule( *_holder );
3387 }
3388
3389 void set_stopped() noexcept
3390 {
3391 _holder->_core.reschedule( *_holder );
3392 }
3393
3394 [[nodiscard]] stop_token_env< inplace_stop_token > get_env() const noexcept
3395 {
3396 return { _holder->_stop_src.get_token() };
3397 }
3398 };
3399
3400 using _op_t = connect_type< _task_t, _recv >;
3401
3402 void resume() override
3403 {
3404 if ( _op.has_value() ) {
3405 _op->clear(); // explicitly free the done coroutine frame
3406 _op.reset();
3407 }
3408 if ( _stop_src.stop_requested() ) {
3409 _done_src.set_value();
3410 } else {
3411 _op.emplace( _make_task().connect( _recv{ this } ) );
3412 _op->start();
3413 }
3414 }
3415
3416 task_core& _core;
3417 inplace_stop_source _stop_src;
3418 broadcast_source< set_value_t() > _done_src;
3419 std::optional< _op_t > _op;
3420};
3421
3437template <
3438 task_config CFG = task_default_cfg,
3439 task_context Ctx = task_ctx,
3440 typename Factory = void >
3441struct task_holder : _task_holder_base< CFG >
3442{
3443 using base = _task_holder_base< CFG >;
3444
3445 task_holder( Ctx& ctx, Factory factory )
3446 : _task_holder_base< CFG >( get_task_core( ctx ) )
3447 , _ctx( ctx )
3448 , _factory( std::move( factory ) )
3449 {
3450 }
3451
3452 using base::start;
3453 using base::stop;
3454
3455private:
3456 task< void, CFG > _make_task() override
3457 {
3458 return _factory( _ctx );
3459 }
3460
3461 Ctx& _ctx;
3462 Factory _factory;
3463};
3464
3467template < task_context Ctx, typename Factory >
3469
3471
3472struct _wait_until_stopped
3473{
3474 using sender_concept = sender_t;
3475
3477
3478 template < typename R >
3479 struct _op
3480 {
3481 using operation_state_concept = operation_state_t;
3482
3483 _op( R r )
3484 : _r( std::move( r ) )
3485 {
3486 }
3487
3488 void start()
3489 {
3490 auto st = get_stop_token( ecor::get_env( _r ) );
3491 static_assert(
3492 !std::same_as< decltype( st ), never_stop_token >,
3493 "Receiver must be stoppable" );
3494 _cb.emplace( st, cb_t{ .op = this } );
3495 }
3496
3497 R _r;
3498
3499 struct cb_t
3500 {
3501 void operator()() noexcept
3502 {
3503 op->_r.set_value();
3504 }
3505 _op* op;
3506 };
3507
3508 std::optional< inplace_stop_callback< cb_t > > _cb;
3509 };
3510
3511 template < receiver R >
3512 auto connect( R receiver ) &&
3513 {
3514 static_assert(
3515 receiver_for< R, _wait_until_stopped >,
3516 "Receiver does not satisfy the requirements for wait_until_stopped's completion signatures" );
3517 return _op{ std::move( receiver ) };
3518 }
3519};
3520
3524[[maybe_unused]]
3525static inline _wait_until_stopped wait_until_stopped;
3526
3528
3535struct _suspend_awaiter
3536{
3537 [[nodiscard]] bool await_ready() const noexcept
3538 {
3539 return false;
3540 }
3541
3542 template < typename Promise >
3543 void await_suspend( std::coroutine_handle< Promise > h ) noexcept
3544 {
3545 auto& p = h.promise();
3546 if ( p.token.stop_requested() ) {
3547 p.invoke_set_stopped();
3548 return;
3549 }
3550 p.core.reschedule( p );
3551 }
3552
3553 void await_resume() noexcept
3554 {
3555 }
3556};
3557
3568[[maybe_unused]]
3569static inline _suspend_awaiter suspend;
3570
3573
3574template < typename T >
3575concept _has_member_async_destroy = requires( T& obj ) {
3576 { obj.async_destroy() } -> sender;
3577};
3578
3579template < typename Ctx, typename T >
3580concept _has_adl_async_destroy = requires( Ctx& ctx, T& obj ) {
3581 { async_destroy( ctx, obj ) } -> sender;
3582};
3583
3593struct _async_destroy_t
3594{
3595 template < typename Ctx, typename T >
3596 ecor::sender auto operator()( Ctx& ctx, T& obj ) const
3597 {
3598 if constexpr ( _has_adl_async_destroy< Ctx, T > )
3599 return async_destroy( ctx, obj );
3600 else
3601 return obj.async_destroy();
3602 }
3603};
3604inline constexpr _async_destroy_t async_destroy{};
3605
3606enum class _cb_state : uint8_t
3607{
3608 alive,
3609 queued,
3610 destroying,
3611};
3612
3613struct _async_arena_cb_base : zll::ll_base< _async_arena_cb_base >
3614{
3615 virtual void start_destroy() = 0;
3616 virtual void finish_cleanup() = 0;
3617
3618protected:
3619 ~_async_arena_cb_base() = default;
3620};
3621
3622struct _async_arena_core_base : _schedulable
3623{
3624 explicit _async_arena_core_base( task_core& core ) noexcept
3625 : _core( core )
3626 {
3627 }
3628
3629 void _enqueue_destroy( _async_arena_cb_base& cb ) noexcept
3630 {
3631 _destroy_queue.link_back( cb );
3632 if ( !_active && zll::detached( static_cast< _schedulable& >( *this ) ) )
3633 _core.reschedule( *this );
3634 }
3635
3636 void _on_destroy_complete() noexcept
3637 {
3638 ECOR_ASSERT( _active );
3639 ECOR_ASSERT( zll::detached( static_cast< _schedulable& >( *this ) ) );
3640 _core.reschedule( *this );
3641 }
3642
3643 void resume() override
3644 {
3645 if ( _active ) {
3646 _active->finish_cleanup();
3647 _active = nullptr;
3648 }
3649
3650 if ( !_destroy_queue.empty() ) {
3651 _active = &_destroy_queue.take_front();
3652 _active->start_destroy();
3653 } else if ( _stopped && _alive_list.empty() ) {
3654 _done_src.set_value();
3655 }
3656 }
3657
3658 task_core& _core;
3659 zll::ll_list< _async_arena_cb_base > _alive_list;
3660 zll::ll_list< _async_arena_cb_base > _destroy_queue;
3661 _async_arena_cb_base* _active = nullptr;
3662 bool _stopped = false;
3663 broadcast_source< set_value_t() > _done_src;
3664
3665 struct _destroy_receiver
3666 {
3667 using receiver_concept = receiver_t;
3668
3669 _async_arena_core_base* _core;
3670
3671 void set_value( auto&&... ) noexcept
3672 {
3673 _core->_on_destroy_complete();
3674 }
3675
3676 void set_error( auto&& ) noexcept
3677 {
3678 _core->_on_destroy_complete();
3679 }
3680
3681 void set_stopped() noexcept
3682 {
3683 _core->_on_destroy_complete();
3684 }
3685 };
3686};
3687
3688struct _async_arena_cb_counted : _async_arena_cb_base
3689{
3690 explicit _async_arena_cb_counted( _async_arena_core_base& arena ) noexcept
3691 : _arena( arena )
3692 {
3693 }
3694
3695 void add_ref() noexcept
3696 {
3697 ECOR_ASSERT( _refs > 0 );
3698 ECOR_ASSERT( _state == _cb_state::alive );
3699 ++_refs;
3700 }
3701
3702 void release() noexcept
3703 {
3704 ECOR_ASSERT( _refs > 0 );
3705 if ( --_refs == 0 ) {
3706 _state = _cb_state::queued;
3707 _arena._enqueue_destroy( *this );
3708 }
3709 }
3710
3711 _async_arena_core_base& _arena;
3712 uint32_t _refs = 1;
3713 _cb_state _state = _cb_state::alive;
3714
3715protected:
3716 ~_async_arena_cb_counted() = default;
3717};
3718
3719template < typename Ctx, typename Mem >
3720struct async_arena;
3721
3722template < typename Ctx, typename Mem >
3723struct _async_arena_core : _async_arena_core_base
3724{
3725 _async_arena_core( Ctx& ctx, Mem& mem ) noexcept
3726 : _async_arena_core_base( get_task_core( ctx ) )
3727 , _ctx( ctx )
3728 , _mem( mem )
3729 {
3730 }
3731
3732 Ctx& _ctx;
3733 Mem& _mem;
3734};
3735
3736template < typename T, typename Ctx, typename Mem >
3737struct _async_arena_control_block final : _async_arena_cb_counted
3738{
3739 using _destroy_sender_t =
3740 decltype( ecor::async_destroy( std::declval< Ctx& >(), std::declval< T& >() ) );
3741
3742 using _destroy_receiver = _async_arena_core_base::_destroy_receiver;
3743 using _destroy_op_t = connect_type< _destroy_sender_t, _destroy_receiver >;
3744
3745 _destroy_op_t* _destroy_op = nullptr;
3746 T _object;
3747
3748 template < typename... Args >
3749 _async_arena_control_block( _async_arena_core< Ctx, Mem >& arena, Args&&... a )
3750 : _async_arena_cb_counted( arena )
3751 , _object( (Args&&) a... )
3752 {
3753 }
3754
3755 void start_destroy() override
3756 {
3757 ECOR_ASSERT( _state == _cb_state::queued );
3758 _state = _cb_state::destroying;
3759
3760 auto& arena = static_cast< _async_arena_core< Ctx, Mem >& >( _arena );
3761
3762 using op_t = _destroy_op_t;
3763
3764 auto s = ecor::async_destroy( arena._ctx, _object );
3765 void* p = ecor::allocate( arena._mem, sizeof( op_t ), alignof( op_t ) );
3766 ECOR_ASSERT( p );
3767 _destroy_op =
3768 ::new ( p ) op_t( std::move( s ).connect( _destroy_receiver{ &_arena } ) );
3769 _destroy_op->start();
3770 }
3771
3772 void finish_cleanup() override
3773 {
3774 auto& arena = static_cast< _async_arena_core< Ctx, Mem >& >( _arena );
3775
3776 using op_t = _destroy_op_t;
3777 _destroy_op->~op_t();
3778 ecor::deallocate( arena._mem, _destroy_op, sizeof( op_t ), alignof( op_t ) );
3779 _destroy_op = nullptr;
3780 auto& mem = arena._mem;
3781 this->~_async_arena_control_block();
3782 ecor::deallocate(
3783 mem,
3784 this,
3785 sizeof( _async_arena_control_block ),
3786 alignof( _async_arena_control_block ) );
3787 }
3788};
3789
3796template < typename T, typename Ctx, typename Mem >
3798{
3799 using _cb_t = _async_arena_control_block< T, Ctx, Mem >;
3800
3802 async_ptr() noexcept = default;
3803
3805 async_ptr( std::nullptr_t ) noexcept
3806 {
3807 }
3808
3810 async_ptr( async_ptr const& o ) noexcept
3811 : _cb( o._cb )
3812 {
3813 if ( _cb )
3814 _cb->add_ref();
3815 }
3816
3819 async_ptr( async_ptr&& o ) noexcept
3820 : _cb( o._cb )
3821 {
3822 o._cb = nullptr;
3823 }
3824
3828 async_ptr& operator=( async_ptr const& o ) noexcept
3829 {
3830 if ( this != &o ) {
3831 _reset();
3832 _cb = o._cb;
3833 if ( _cb )
3834 _cb->add_ref();
3835 }
3836 return *this;
3837 }
3838
3843 {
3844 if ( this != &o ) {
3845 _reset();
3846 _cb = o._cb;
3847 o._cb = nullptr;
3848 }
3849 return *this;
3850 }
3851
3855 {
3856 _reset();
3857 }
3858
3860 explicit operator bool() const noexcept
3861 {
3862 return _cb != nullptr;
3863 }
3864
3867 T& operator*() const noexcept
3868 {
3869 ECOR_ASSERT( _cb );
3870 return _cb->_object;
3871 }
3872
3875 T* operator->() const noexcept
3876 {
3877 ECOR_ASSERT( _cb );
3878 return &_cb->_object;
3879 }
3880
3882 T* get() const noexcept
3883 {
3884 return _cb ? &_cb->_object : nullptr;
3885 }
3886
3889 void reset() noexcept
3890 {
3891 _reset();
3892 }
3893
3897 friend bool operator==( async_ptr const& a, async_ptr const& b ) noexcept
3898 {
3899 return a._cb == b._cb;
3900 }
3901
3905 friend bool operator!=( async_ptr const& a, async_ptr const& b ) noexcept
3906 {
3907 return a._cb != b._cb;
3908 }
3909
3912 friend bool operator==( async_ptr const& a, std::nullptr_t ) noexcept
3913 {
3914 return a._cb == nullptr;
3915 }
3916
3919 friend bool operator!=( async_ptr const& a, std::nullptr_t ) noexcept
3920 {
3921 return a._cb != nullptr;
3922 }
3923
3924private:
3925 template < typename, typename >
3926 friend struct async_arena;
3927
3928 explicit async_ptr( _cb_t* cb ) noexcept
3929 : _cb( cb )
3930 {
3931 }
3932
3933 void _reset() noexcept
3934 {
3935 if ( _cb ) {
3936 _cb->release();
3937 _cb = nullptr;
3938 }
3939 }
3940
3941 _cb_t* _cb = nullptr;
3942};
3943
3969template < typename Ctx, typename Mem >
3971{
3973 async_arena( Ctx& ctx, Mem& mem ) noexcept
3974 : _core( ctx, mem )
3975 {
3976 }
3977
3978 async_arena( async_arena const& ) = delete;
3979 async_arena& operator=( async_arena const& ) = delete;
3980 async_arena( async_arena&& ) = delete;
3981 async_arena& operator=( async_arena&& ) = delete;
3982
3985 template < typename T, typename... Args >
3987 {
3988 ECOR_ASSERT( !_core._stopped );
3989 using cb_t = _async_arena_control_block< T, Ctx, Mem >;
3990 void* p = ecor::allocate( _core._mem, sizeof( cb_t ), alignof( cb_t ) );
3991 if ( !p )
3992 return { nullptr };
3993 auto* cb = ::new ( p ) cb_t( _core, (Args&&) args... );
3994 _core._alive_list.link_back( *cb );
3995 return async_ptr< T, Ctx, Mem >{ cb };
3996 }
3997
4000 _ll_sender< set_value_t() > async_destroy() noexcept
4001 {
4002 _core._stopped = true;
4003 if ( _core._alive_list.empty() && _core._destroy_queue.empty() &&
4004 zll::detached( static_cast< _schedulable& >( _core ) ) && !_core._active )
4005 _core._core.reschedule( _core );
4006 return _core._done_src.schedule();
4007 }
4008
4009private:
4010 _async_arena_core< Ctx, Mem > _core;
4011};
4012
4015
4016template < typename T >
4017struct _trnx_vtable_mixin;
4018
4019template < signature... S >
4020 requires( !_contains_type< set_stopped_t(), S... >::value )
4021struct _trnx_vtable_mixin< completion_signatures< S... > >
4022{
4023 using type = _vtable_mixin< S... >;
4024};
4025
4026template < signature... S >
4027 requires( _contains_type< set_stopped_t(), S... >::value )
4028struct _trnx_vtable_mixin< completion_signatures< S... > >
4029{
4030 using type = _vtable_mixin< S..., _get_stopped_t() >;
4031};
4032
4033template < typename T >
4034using _trnx_vtable_mixin_t =
4035 typename _trnx_vtable_mixin< _sender_completions_t< T, empty_env > >::type;
4036
4043template < typename T >
4044struct trnx_entry : _trnx_vtable_mixin_t< T >, zll::ll_base< trnx_entry< T > >
4045{
4046 using base = _trnx_vtable_mixin_t< T >;
4047 using _vtable = typename base::_vtable;
4048 static constexpr bool _has_stop_sig =
4049 _sigs_contains_set_stopped< _sender_completions_t< T, empty_env > >;
4050
4051 T data;
4052
4055 template < typename Derived >
4056 trnx_entry( _tag< Derived >, T data )
4057 : base( _tag< Derived >{} )
4058 , data( std::move( data ) )
4059 {
4060 }
4061
4062 template < typename... Args >
4063 void set_value( Args&&... args )
4064 {
4065 this->_set_value( (Args&&) args... );
4066 }
4067
4068 template < typename E >
4069 void set_error( E&& e )
4070 {
4071 this->_set_error( (E&&) e );
4072 }
4073
4074 void set_stopped()
4075 {
4076 this->_set_stopped();
4077 }
4078
4079 bool get_stopped() const noexcept
4080 {
4081 return this->get_stopped();
4082 }
4083};
4084template < typename T, typename R >
4085struct _trnx_controller_op : trnx_entry< T >, R
4086{
4087 using _vtable = typename trnx_entry< T >::_vtable;
4088 static constexpr bool _has_stop_sig = trnx_entry< T >::_has_stop_sig;
4089
4090 using R::set_error;
4091 using R::set_stopped;
4092 using R::set_value;
4093
4094 _trnx_controller_op( T data, R r, zll::ll_list< trnx_entry< T > >& pending )
4095 : trnx_entry< T >{ _tag< _trnx_controller_op >{}, std::move( data ) }
4096 , R( std::move( r ) )
4097 , _pending( pending )
4098 {
4099 }
4100
4101 void start()
4102 {
4103 _pending.link_front( *this );
4104 }
4105
4106 [[nodiscard]] bool get_stopped() const noexcept
4107 {
4108 if constexpr ( _has_stop_sig ) {
4109 bool res = get_stop_token( get_env( (R&) *this ) ).stop_requested();
4110 return res;
4111 } else
4112 return false;
4113 }
4114
4115private:
4116 zll::ll_list< trnx_entry< T > >& _pending;
4117};
4118
4119template < typename T >
4120struct _trnx_controller_sender
4121{
4122 using sender_concept = sender_t;
4123
4124 template < typename Env >
4125 constexpr auto get_completion_signatures( Env&& e ) noexcept
4126 {
4127 return ecor::get_completion_signatures( val, (Env&&) e );
4128 }
4129
4130 template < receiver R >
4131 auto connect( R receiver ) &&
4132 {
4133 static_assert(
4134 receiver_for< R, _trnx_controller_sender >,
4135 "Receiver does not satisfy the requirements for the transaction sender's completion signatures" );
4136
4137 return _trnx_controller_op< T, R >{
4138 std::move( val ), std::move( receiver ), _pending };
4139 }
4140
4141 T val;
4142 zll::ll_list< trnx_entry< T > >& _pending;
4143};
4144
4156template < typename T, size_t N >
4158{
4159 static_assert( std::has_single_bit( N ), "Size of circular buffer must be a power of 2" );
4160 static_assert(
4161 N < std::numeric_limits< uint16_t >::max(),
4162 "Size of circular buffer must be less than 65536" );
4163 static_assert( std::is_trivial_v< T > );
4164
4165 bool full() const noexcept
4166 {
4167 return ( enqueue - deliver ) == N;
4168 }
4169
4170 void push( T val ) noexcept
4171 {
4172 _data[enqueue++ % N] = val;
4173 }
4174
4175 bool empty() const noexcept
4176 {
4177 return deliver == enqueue;
4178 }
4179
4180 bool has_tx() const noexcept
4181 {
4182 return tx != enqueue;
4183 }
4184
4185 T& tx_front() noexcept
4186 {
4187 return _data[tx % N];
4188 }
4189
4190 void tx_done() noexcept
4191 {
4192 tx++;
4193 }
4194
4195 bool has_rx() const noexcept
4196 {
4197 return rx != tx;
4198 }
4199
4200 T& rx_front() noexcept
4201 {
4202 return _data[rx % N];
4203 }
4204
4205 void rx_done() noexcept
4206 {
4207 rx++;
4208 }
4209
4210 bool has_deliver() const noexcept
4211 {
4212 return deliver != rx;
4213 }
4214
4215 T& deliver_front() noexcept
4216 {
4217 return _data[deliver % N];
4218 }
4219
4220 void pop() noexcept
4221 {
4222 deliver++;
4223 }
4224
4225 std::atomic< uint16_t > deliver = 0;
4226 std::atomic< uint16_t > rx = 0;
4227 std::atomic< uint16_t > tx = 0;
4228 std::atomic< uint16_t > enqueue = 0;
4229
4230private:
4231 T _data[N];
4232};
4233
4264template < typename T >
4266{
4267 using sender_type = _trnx_controller_sender< T >;
4268 static constexpr bool _has_stop_sig = trnx_entry< T >::_has_stop_sig;
4269
4272 sender_type schedule( T val )
4273 {
4274 return { std::move( val ), _pending_tx };
4275 }
4276
4282 {
4283 if constexpr ( _has_stop_sig ) {
4284 _pending_tx.remove_if( []( trnx_entry< T >& tx ) {
4285 if ( !tx._get_stopped() )
4286 return false;
4287
4288 tx._set_stopped();
4289 return true;
4290 } );
4291 }
4292 if ( _pending_tx.empty() )
4293 return nullptr;
4294 return &_pending_tx.take_back();
4295 }
4296
4297private:
4298 zll::ll_list< trnx_entry< T > > _pending_tx;
4299};
4300
4301} // namespace ecor
Helper concept to check if all set_value signatures in a sender's completion signatures are singular,...
Definition: ecor.hpp:228
Type is memory resource if it can be used with allocate and deallocate CPOs.
Definition: ecor.hpp:306
Concept for types that can be used as queryable objects in the environment.
Definition: ecor.hpp:822
Concept for receivers that can handle an explicitly listed set of completion signatures.
Definition: ecor.hpp:909
Concept for receivers that can handle a specific sender's completion signatures.
Definition: ecor.hpp:899
Concept for receivers, requires that the member type receiver_concept derives from receiver_t,...
Definition: ecor.hpp:838
Concept for senders, requires that the member type sneder_concept derives from sender_t,...
Definition: ecor.hpp:828
Type is isgnature if it is one of the valid completion signatures (set_value, set_error,...
Definition: ecor.hpp:196
Concept for stoppable sources, requires that the type has get_token, stop_possible,...
Definition: ecor.hpp:1303
Concept for stoppable tokens, requires that the type has stop_requested and stop_possible member func...
Definition: ecor.hpp:1285
Concept for the task configuration.
Definition: ecor.hpp:2342
Concept for the task context.
Definition: ecor.hpp:2325
Concept for unstoppable tokens, which are a special case of stoppable tokens that cannot be stopped.
Definition: ecor.hpp:1294
Async arena.
Definition: ecor.hpp:3971
_ll_sender< set_value_t() > async_destroy() noexcept
Signal that no new objects will be created and return a sender that completes with set_value_t() when...
Definition: ecor.hpp:4000
async_arena(Ctx &ctx, Mem &mem) noexcept
Construct an async arena with the provided context and memory resource.
Definition: ecor.hpp:3973
async_ptr< T, Ctx, Mem > make(Args &&... args)
Create a new managed object of type T.
Definition: ecor.hpp:3986
Reference-counted smart pointer to an object managed by an async_arena.
Definition: ecor.hpp:3798
async_ptr & operator=(async_ptr &&o) noexcept
Move assignment operator: release the current object (if any), transfer the control block pointer fro...
Definition: ecor.hpp:3842
T * get() const noexcept
Get the raw pointer to the managed object, or nullptr if the async_ptr is null.
Definition: ecor.hpp:3882
friend bool operator!=(async_ptr const &a, async_ptr const &b) noexcept
Comparison operators compare the control block pointers for inequality.
Definition: ecor.hpp:3905
friend bool operator!=(async_ptr const &a, std::nullptr_t) noexcept
Comparison operators with nullptr compare the control block pointer to null.
Definition: ecor.hpp:3919
async_ptr(async_ptr const &o) noexcept
Copy constructor and copy assignment operator increment the reference count.
Definition: ecor.hpp:3810
friend bool operator==(async_ptr const &a, std::nullptr_t) noexcept
Comparison operators with nullptr compare the control block pointer to null.
Definition: ecor.hpp:3912
async_ptr() noexcept=default
Default constructor creates a null async_ptr.
async_ptr & operator=(async_ptr const &o) noexcept
Copy assignment operator: release the current object (if any), copy the control block pointer from th...
Definition: ecor.hpp:3828
T & operator*() const noexcept
Dereference operators return the managed object.
Definition: ecor.hpp:3867
~async_ptr()
Destructor releases the current object (if any) by decrementing the reference count and initiating as...
Definition: ecor.hpp:3854
friend bool operator==(async_ptr const &a, async_ptr const &b) noexcept
Comparison operators compare the control block pointers for equality.
Definition: ecor.hpp:3897
async_ptr(async_ptr &&o) noexcept
Move constructor and move assignment operator transfer ownership without modifying the reference coun...
Definition: ecor.hpp:3819
void reset() noexcept
Reset the async_ptr to null, releasing the current object (if any) by decrementing the reference coun...
Definition: ecor.hpp:3889
T * operator->() const noexcept
Arrow operator returns a pointer to the managed object.
Definition: ecor.hpp:3875
Broadcast source that implements a scheduler that allows multiple receivers to be scheduled with the ...
Definition: ecor.hpp:1711
_ll_sender< S... > schedule() noexcept
Schedule a new receiver with this source.
Definition: ecor.hpp:1715
void set_value(Args &&... args)
Send a set_value signal to all scheduled receivers.
Definition: ecor.hpp:1727
void set_error(E &&err)
Send a set_error signal to all scheduled receivers.
Definition: ecor.hpp:1744
void set_stopped()
Send a set_stopped signal to all scheduled receivers.
Definition: ecor.hpp:1758
Allocator that uses reference to circular_buffer_memory for allocating memory.
Definition: ecor.hpp:681
circular_buffer_allocator(circular_buffer_allocator< U, IndexType, Base > const &other) noexcept
Template copy constructor, allows copying from an allocator of a different type.
Definition: ecor.hpp:707
circular_buffer_allocator(circular_buffer_allocator const &) noexcept=default
Copy constructor, allows copying the allocator.
void deallocate(T *ptr, std::size_t n)
Deallocate memory previously allocated by allocate().
Definition: ecor.hpp:738
circular_buffer_allocator(circular_buffer_memory< IndexType, Base > &buffer) noexcept
Construct an allocator that uses the given circular_buffer_memory for allocations.
Definition: ecor.hpp:694
bool operator!=(circular_buffer_allocator const &other) const noexcept
Inequality comparison operator, defined in terms of operator==.
Definition: ecor.hpp:752
T * allocate(std::size_t n)
Allocate memory for n objects of type T, returns pointer to the allocated memory.
Definition: ecor.hpp:720
bool operator==(circular_buffer_allocator const &other) const noexcept
Equality comparison operators, allocators are equal if they refer to the same circular_buffer_memory.
Definition: ecor.hpp:746
Internal node structure for tracking allocated blocks.
Definition: ecor.hpp:538
Circular buffer memory resource, manages a provided memory block as a circular buffer for dynamic all...
Definition: ecor.hpp:398
std::size_t used_bytes() const noexcept
Get total used bytes in the buffer.
Definition: ecor.hpp:520
std::size_t capacity() const noexcept
Get total capacity of the buffer in bytes.
Definition: ecor.hpp:514
uspan< T > make_span(std::size_t n)
Allocate and construct an array of type T with given size, returning a unique span that manages the a...
Definition: ecor.hpp:467
std::unique_ptr< T, _deleter > uptr
Unique pointer type for objects allocated from this memory resource, with automatic deallocation.
Definition: ecor.hpp:436
void * allocate(std::size_t bytes, std::size_t align) noexcept
Allocate bytes with align, returns nullptr if no space is available.
Definition: ecor.hpp:492
void deallocate(void *p, std::size_t bytes, std::size_t align) noexcept
Deallocate pointer previously allocated by allocate()
Definition: ecor.hpp:499
static constexpr index_type npos
Special value indicating no position.
Definition: ecor.hpp:530
uptr< T > make(Args &&... args)
Allocate and construct an object of type T with given arguments, returning a unique pointer that mana...
Definition: ecor.hpp:446
IndexType index_type
Type of the index used for tracking allocations.
Definition: ecor.hpp:404
uspan< T, Extent > make_span()
Overload for fixed-size arrays, deduces the size from the template parameter.
Definition: ecor.hpp:479
void deallocate(void *p) noexcept
Deallocate pointer previously allocated by allocate(), overload without size and alignment.
Definition: ecor.hpp:508
Type container for completion signatures, used to specify the set of possible completion signals.
Definition: ecor.hpp:125
Standard empty environment.
Definition: ecor.hpp:143
FIFO source that implements a scheduler that allows multiple receivers to be scheduled with the same ...
Definition: ecor.hpp:1794
_ll_sender< S... > schedule() noexcept
Schedule a new sender with this scheduler.
Definition: ecor.hpp:1799
void set_stopped()
Send a set_stopped signal to the front scheduled receiver.
Definition: ecor.hpp:1842
void set_error(E1 &&err)
Send a set_error signal to the front scheduled receiver.
Definition: ecor.hpp:1828
void set_value(V &&... value)
Send a set_value signal to the front scheduled receiver.
Definition: ecor.hpp:1811
Definition: ecor.hpp:2284
Definition: ecor.hpp:1519
Definition: ecor.hpp:2273
In-place stop callback, which is a callback that can be registered with an inplace_stop_token to be i...
Definition: ecor.hpp:1455
In-place stop source, which is a simple implementation of a stoppable source that is designed to be u...
Definition: ecor.hpp:1332
bool stop_requested() const noexcept
Check if a stop has been requested.
Definition: ecor.hpp:1352
bool stop_possible() const noexcept
Check if stopping is possible.
Definition: ecor.hpp:1345
bool request_stop()
Request a stop.
Definition: ecor.hpp:1360
Definition: ecor.hpp:1387
friend bool operator==(inplace_stop_token const &lhs, inplace_stop_token const &rhs) noexcept
Equality comparison operators, two inplace_stop_tokens are equal if they are associated with the same...
Definition: ecor.hpp:1409
friend bool operator!=(inplace_stop_token const &lhs, inplace_stop_token const &rhs) noexcept
Inequality comparison operator, defined in terms of operator==.
Definition: ecor.hpp:1416
constexpr bool stop_possible() const noexcept
Check if stopping is possible.
Definition: ecor.hpp:1401
bool stop_requested() const noexcept
Check if a stop has been requested.
Definition: ecor.hpp:1393
Definition: ecor.hpp:1496
Never stop token, which is a simple implementation of an unstoppable token that can be used as a defa...
Definition: ecor.hpp:1494
Empty base class for base-class customization point.
Definition: ecor.hpp:148
Operation state concept tag, used for marking types as operation states.
Definition: ecor.hpp:98
Receiver concept tag, used for marking types as receivers.
Definition: ecor.hpp:93
Sender concept tag, used for marking types as senders.
Definition: ecor.hpp:88
Keyed source that implements a scheduler that allows multiple receivers to be scheduled with the same...
Definition: ecor.hpp:1873
void set_stopped()
Send a set_stopped signal to the scheduled receiver with the smallest key.
Definition: ecor.hpp:1924
_sh_sender< K, S... > schedule(K key) noexcept(noexcept(_sh_sender< K, S... >{ key, _sh }))
Schedule a new sender with this scheduler, using the provided key for ordering.
Definition: ecor.hpp:1880
bool empty() const
Check if there are any scheduled receivers.
Definition: ecor.hpp:1938
void set_value(V &&... value)
Send a set_value signal to the scheduled receiver with the smallest key.
Definition: ecor.hpp:1892
void set_error(E1 &&err)
Send a set_error signal to the scheduled receiver with the smallest key.
Definition: ecor.hpp:1909
_sh_entry< K, S... > const & front() const
Get a reference to the scheduled receiver with the smallest key.
Definition: ecor.hpp:1947
Type tag for set_error completion signal.
Definition: ecor.hpp:108
Type tag for set_stopped completion signal.
Definition: ecor.hpp:113
Type tag for set_value completion signal.
Definition: ecor.hpp:103
A minimal environment that carries a stop token and satisfies get_stop_token queries.
Definition: ecor.hpp:1541
Maintains a list of ready tasks that can be resumed.
Definition: ecor.hpp:2032
void reschedule(_schedulable &op) noexcept
Reschedule a task by adding it's promise to the list of ready tasks.
Definition: ecor.hpp:2057
void run_n(std::size_t n)
Run up to n ready tasks.
Definition: ecor.hpp:2048
bool run_once()
Run a single ready task if there is one.
Definition: ecor.hpp:2036
Basic task context that provides the task with necessary resources: task_core and task_memory_resourc...
Definition: ecor.hpp:2301
Task can be configured by passing type with configuration information. This is default value.
Definition: ecor.hpp:2332
Owns a task<void, CFG> that restarts automatically, it is provided with a factory to create new task ...
Definition: ecor.hpp:3442
Task memory resource that provides allocation and deallocation functions for tasks.
Definition: ecor.hpp:2244
Task type that represents an asynchronous operation that is implemented as a coroutine and is a sende...
Definition: ecor.hpp:2726
auto connect(R receiver) &&
Connect the task to a receiver.
Definition: ecor.hpp:2775
task(std::coroutine_handle< promise_type > handle, task_error error=task_error::none) noexcept
Constructor used by promise_type for coroutine creation.
Definition: ecor.hpp:2742
ISR-safe 4-cursor circular buffer for managing in-flight transactions.
Definition: ecor.hpp:4158
Source for scheduling transactions in a request-reply protocol (e.g.
Definition: ecor.hpp:4266
sender_type schedule(T val)
Schedule a new transaction with the given user data.
Definition: ecor.hpp:4272
trnx_entry< T > * query_next_trnx()
Retrieve the next pending transaction entry for processing by the driver.
Definition: ecor.hpp:4281
Type-erased linked-list node for a pending transaction.
Definition: ecor.hpp:4045
trnx_entry(_tag< Derived >, T data)
Construct a transaction entry with the given data.
Definition: ecor.hpp:4056
Unique span with custom deleter, useful for managing memory from custom memory resources.
Definition: ecor.hpp:320
std::span< T, Extent > release() noexcept
Release ownership of the managed span and return it.
Definition: ecor.hpp:359
unique_span & operator=(unique_span &&other) noexcept
Move assignment operator - transfers ownership from other to this unique_span.
Definition: ecor.hpp:345
unique_span(T *data, std::size_t size, Deleter deleter=Deleter{}) noexcept(std::is_nothrow_move_constructible_v< Deleter >)
Construct a unique_span with given data, size, and deleter.
Definition: ecor.hpp:326
~unique_span()
Destructor - calls the deleter on the managed span if it is not empty.
Definition: ecor.hpp:368
unique_span(unique_span &&other) noexcept
Move constructor - transfers ownership from other to this unique_span.
Definition: ecor.hpp:337
Thin wrapper over error value, used to tag value as an error for ecor library.
Definition: ecor.hpp:160