include/boost/corosio/native/detail/epoll/epoll_tcp_service.hpp

80.0% Lines (48/60) 86.7% List of functions (13/15)
epoll_tcp_service.hpp
f(x) Functions (15)
Function Calls Lines Blocks
boost::corosio::detail::epoll_tcp_service::epoll_tcp_service(boost::capy::execution_context&) :99 334x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::cancel() :115 0 0.0% 0.0% boost::corosio::detail::epoll_read_op::cancel() :124 97x 80.0% 75.0% boost::corosio::detail::epoll_write_op::cancel() :133 0 0.0% 0.0% boost::corosio::detail::epoll_op::operator()() :142 44107x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::operator()() :148 3765x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::epoll_tcp_socket(boost::corosio::detail::epoll_tcp_service&) :153 11358x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::~epoll_tcp_socket() :158 11358x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token, std::error_code*) :161 3765x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :172 110011x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :184 109861x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::cancel() :196 94x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::close_socket() :202 34049x 100.0% 100.0% boost::corosio::detail::epoll_tcp_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :208 3785x 94.4% 94.0% boost::corosio::detail::epoll_tcp_service::bind_socket(boost::corosio::tcp_socket::implementation&, boost::corosio::endpoint) :240 6x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/tcp_service.hpp>
19
20 #include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
21 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22 #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23
24 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25
26 #include <coroutine>
27
28 #include <errno.h>
29 #include <netinet/in.h>
30 #include <netinet/tcp.h>
31 #include <sys/epoll.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 /*
36 epoll Socket Implementation
37 ===========================
38
39 Each I/O operation follows the same pattern:
40 1. Try the syscall immediately (non-blocking socket)
41 2. If it succeeds or fails with a real error, post to completion queue
42 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
43
44 This "try first" approach avoids unnecessary epoll round-trips for
45 operations that can complete immediately (common for small reads/writes
46 on fast local connections).
47
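One-Shot Registration
---------------------
We use one-shot epoll registration: each operation registers, waits for
one event, then unregisters. This simplifies the state machine since we
don't need to track whether an fd is currently registered or handle
re-arming. The tradeoff is slightly more epoll_ctl calls, but the
simplicity is worth it.

NOTE(review): open_socket() in this file registers the descriptor with
the scheduler once, at open time, and its comment says "edge-triggered
mode". That does not match the per-operation register/unregister scheme
described above; confirm which scheme is current and update this section.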
55
56 Cancellation
57 ------------
58 See op.hpp for the completion/cancellation race handling via the
59 `registered` atomic. cancel() must complete pending operations (post
60 them with cancelled flag) so coroutines waiting on them can resume.
61 close_socket() calls cancel() first to ensure this.
62
63 Impl Lifetime with shared_ptr
64 -----------------------------
65 Socket impls use enable_shared_from_this. The service owns impls via
66 shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
67 removal. When a user calls close(), we call cancel() which posts pending
68 ops to the scheduler.
69
70 CRITICAL: The posted ops must keep the impl alive until they complete.
71 Otherwise the scheduler would process a freed op (use-after-free). The
72 cancel() method captures shared_from_this() into op.impl_ptr before
73 posting. When the op completes, impl_ptr is cleared, allowing the impl
74 to be destroyed if no other references exist.
75
76 Service Ownership
77 -----------------
78 epoll_tcp_service owns all socket impls. destroy_impl() removes the
79 shared_ptr from the map, but the impl may survive if ops still hold
80 impl_ptr refs. shutdown() closes all sockets and clears the map; any
81 in-flight ops will complete and release their refs.
82 */
83
84 namespace boost::corosio::detail {
85
86 /** epoll TCP service implementation.
87
88 Inherits from tcp_service to enable runtime polymorphism.
89 Uses key_type = tcp_service for service lookup.
90 */
91 class BOOST_COROSIO_DECL epoll_tcp_service final
92 : public reactor_socket_service<
93 epoll_tcp_service,
94 tcp_service,
95 epoll_scheduler,
96 epoll_tcp_socket>
97 {
98 public:
99 334x explicit epoll_tcp_service(capy::execution_context& ctx)
100 334x : reactor_socket_service(ctx)
101 {
102 334x }
103
104 std::error_code open_socket(
105 tcp_socket::implementation& impl,
106 int family,
107 int type,
108 int protocol) override;
109
110 std::error_code
111 bind_socket(tcp_socket::implementation& impl, endpoint ep) override;
112 };
113
114 inline void
115 epoll_connect_op::cancel() noexcept
116 {
117 if (socket_impl_)
118 socket_impl_->cancel_single_op(*this);
119 else
120 request_cancel();
121 }
122
123 inline void
124 97x epoll_read_op::cancel() noexcept
125 {
126 97x if (socket_impl_)
127 97x socket_impl_->cancel_single_op(*this);
128 else
129 request_cancel();
130 97x }
131
132 inline void
133 epoll_write_op::cancel() noexcept
134 {
135 if (socket_impl_)
136 socket_impl_->cancel_single_op(*this);
137 else
138 request_cancel();
139 }
140
141 inline void
142 44107x epoll_op::operator()()
143 {
144 44107x complete_io_op(*this);
145 44107x }
146
147 inline void
148 3765x epoll_connect_op::operator()()
149 {
150 3765x complete_connect_op(*this);
151 3765x }
152
153 11358x inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
154 11358x : reactor_stream_socket(svc)
155 {
156 11358x }
157
158 11358x inline epoll_tcp_socket::~epoll_tcp_socket() = default;
159
160 inline std::coroutine_handle<>
161 3765x epoll_tcp_socket::connect(
162 std::coroutine_handle<> h,
163 capy::executor_ref ex,
164 endpoint ep,
165 std::stop_token token,
166 std::error_code* ec)
167 {
168 3765x return do_connect(h, ex, ep, token, ec);
169 }
170
171 inline std::coroutine_handle<>
172 110011x epoll_tcp_socket::read_some(
173 std::coroutine_handle<> h,
174 capy::executor_ref ex,
175 buffer_param param,
176 std::stop_token token,
177 std::error_code* ec,
178 std::size_t* bytes_out)
179 {
180 110011x return do_read_some(h, ex, param, token, ec, bytes_out);
181 }
182
183 inline std::coroutine_handle<>
184 109861x epoll_tcp_socket::write_some(
185 std::coroutine_handle<> h,
186 capy::executor_ref ex,
187 buffer_param param,
188 std::stop_token token,
189 std::error_code* ec,
190 std::size_t* bytes_out)
191 {
192 109861x return do_write_some(h, ex, param, token, ec, bytes_out);
193 }
194
195 inline void
196 94x epoll_tcp_socket::cancel() noexcept
197 {
198 94x do_cancel();
199 94x }
200
201 inline void
202 34049x epoll_tcp_socket::close_socket() noexcept
203 {
204 34049x do_close_socket();
205 34049x }
206
207 inline std::error_code
208 3785x epoll_tcp_service::open_socket(
209 tcp_socket::implementation& impl, int family, int type, int protocol)
210 {
211 3785x auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
212 3785x epoll_impl->close_socket();
213
214 3785x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
215 3785x if (fd < 0)
216 return make_err(errno);
217
218 3785x if (family == AF_INET6)
219 {
220 6x int one = 1;
221 6x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
222 }
223
224 3785x epoll_impl->fd_ = fd;
225
226 // Register fd with epoll (edge-triggered mode)
227 3785x epoll_impl->desc_state_.fd = fd;
228 {
229 3785x std::lock_guard lock(epoll_impl->desc_state_.mutex);
230 3785x epoll_impl->desc_state_.read_op = nullptr;
231 3785x epoll_impl->desc_state_.write_op = nullptr;
232 3785x epoll_impl->desc_state_.connect_op = nullptr;
233 3785x }
234 3785x scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
235
236 3785x return {};
237 }
238
239 inline std::error_code
240 6x epoll_tcp_service::bind_socket(
241 tcp_socket::implementation& impl, endpoint ep)
242 {
243 6x return static_cast<epoll_tcp_socket*>(&impl)->do_bind(ep);
244 }
245
246 } // namespace boost::corosio::detail
247
248 #endif // BOOST_COROSIO_HAS_EPOLL
249
250 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
251