//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/corosio/detail/tcp_service.hpp>
18  
#include <boost/corosio/detail/tcp_service.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/select/select_tcp_socket.hpp>
20  
#include <boost/corosio/native/detail/select/select_tcp_socket.hpp>
21  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
21  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
22  
#include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
22  
#include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23  

23  

24  
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
24  
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25  

25  

26  
#include <coroutine>
26  
#include <coroutine>
27  
#include <mutex>
27  
#include <mutex>
28  

28  

29  
#include <errno.h>
29  
#include <errno.h>
30  
#include <fcntl.h>
30  
#include <fcntl.h>
31  
#include <netinet/in.h>
31  
#include <netinet/in.h>
32  
#include <netinet/tcp.h>
32  
#include <netinet/tcp.h>
33  
#include <sys/select.h>
33  
#include <sys/select.h>
34  
#include <sys/socket.h>
34  
#include <sys/socket.h>
35  
#include <unistd.h>
35  
#include <unistd.h>
36  

36  

/*
    Each I/O op tries the syscall speculatively; only registers with
    the reactor on EAGAIN. Fd is registered once at open time and
    stays registered until close. The reactor only marks ready_events_;
    actual I/O happens in invoke_deferred_io(). cancel() captures
    shared_from_this() into op.impl_ptr to keep the impl alive.
*/
44  

44  

45  
namespace boost::corosio::detail {
45  
namespace boost::corosio::detail {
46  

46  

47  
/** select TCP service implementation.
47  
/** select TCP service implementation.
48  

48  

49  
    Inherits from tcp_service to enable runtime polymorphism.
49  
    Inherits from tcp_service to enable runtime polymorphism.
50  
    Uses key_type = tcp_service for service lookup.
50  
    Uses key_type = tcp_service for service lookup.
51  
*/
51  
*/
52  
class BOOST_COROSIO_DECL select_tcp_service final
52  
class BOOST_COROSIO_DECL select_tcp_service final
53  
    : public reactor_socket_service<
53  
    : public reactor_socket_service<
54  
          select_tcp_service,
54  
          select_tcp_service,
55  
          tcp_service,
55  
          tcp_service,
56  
          select_scheduler,
56  
          select_scheduler,
57  
          select_tcp_socket>
57  
          select_tcp_socket>
58  
{
58  
{
59  
public:
59  
public:
60  
    explicit select_tcp_service(capy::execution_context& ctx)
60  
    explicit select_tcp_service(capy::execution_context& ctx)
61  
        : reactor_socket_service(ctx)
61  
        : reactor_socket_service(ctx)
62  
    {
62  
    {
63  
    }
63  
    }
64  

64  

65  
    std::error_code open_socket(
65  
    std::error_code open_socket(
66  
        tcp_socket::implementation& impl,
66  
        tcp_socket::implementation& impl,
67  
        int family,
67  
        int family,
68  
        int type,
68  
        int type,
69  
        int protocol) override;
69  
        int protocol) override;
70  

70  

71  
    std::error_code
71  
    std::error_code
72  
    bind_socket(tcp_socket::implementation& impl, endpoint ep) override;
72  
    bind_socket(tcp_socket::implementation& impl, endpoint ep) override;
73  
};
73  
};
74  

74  

75  
inline void
75  
inline void
76  
select_connect_op::cancel() noexcept
76  
select_connect_op::cancel() noexcept
77  
{
77  
{
78  
    if (socket_impl_)
78  
    if (socket_impl_)
79  
        socket_impl_->cancel_single_op(*this);
79  
        socket_impl_->cancel_single_op(*this);
80  
    else
80  
    else
81  
        request_cancel();
81  
        request_cancel();
82  
}
82  
}
83  

83  

84  
inline void
84  
inline void
85  
select_read_op::cancel() noexcept
85  
select_read_op::cancel() noexcept
86  
{
86  
{
87  
    if (socket_impl_)
87  
    if (socket_impl_)
88  
        socket_impl_->cancel_single_op(*this);
88  
        socket_impl_->cancel_single_op(*this);
89  
    else
89  
    else
90  
        request_cancel();
90  
        request_cancel();
91  
}
91  
}
92  

92  

93  
inline void
93  
inline void
94  
select_write_op::cancel() noexcept
94  
select_write_op::cancel() noexcept
95  
{
95  
{
96  
    if (socket_impl_)
96  
    if (socket_impl_)
97  
        socket_impl_->cancel_single_op(*this);
97  
        socket_impl_->cancel_single_op(*this);
98  
    else
98  
    else
99  
        request_cancel();
99  
        request_cancel();
100  
}
100  
}
101  

101  

102  
inline void
102  
inline void
103  
select_op::operator()()
103  
select_op::operator()()
104  
{
104  
{
105  
    complete_io_op(*this);
105  
    complete_io_op(*this);
106  
}
106  
}
107  

107  

108  
inline void
108  
inline void
109  
select_connect_op::operator()()
109  
select_connect_op::operator()()
110  
{
110  
{
111  
    complete_connect_op(*this);
111  
    complete_connect_op(*this);
112  
}
112  
}
113  

113  

114  
inline select_tcp_socket::select_tcp_socket(select_tcp_service& svc) noexcept
114  
inline select_tcp_socket::select_tcp_socket(select_tcp_service& svc) noexcept
115  
    : reactor_stream_socket(svc)
115  
    : reactor_stream_socket(svc)
116  
{
116  
{
117  
}
117  
}
118  

118  

119  
inline select_tcp_socket::~select_tcp_socket() = default;
119  
inline select_tcp_socket::~select_tcp_socket() = default;
120  

120  

121  
inline std::coroutine_handle<>
121  
inline std::coroutine_handle<>
122  
select_tcp_socket::connect(
122  
select_tcp_socket::connect(
123  
    std::coroutine_handle<> h,
123  
    std::coroutine_handle<> h,
124  
    capy::executor_ref ex,
124  
    capy::executor_ref ex,
125  
    endpoint ep,
125  
    endpoint ep,
126  
    std::stop_token token,
126  
    std::stop_token token,
127  
    std::error_code* ec)
127  
    std::error_code* ec)
128  
{
128  
{
129  
    auto result = do_connect(h, ex, ep, token, ec);
129  
    auto result = do_connect(h, ex, ep, token, ec);
130  
    // Rebuild fd_sets so select() watches for writability
130  
    // Rebuild fd_sets so select() watches for writability
131  
    if (result == std::noop_coroutine())
131  
    if (result == std::noop_coroutine())
132  
        svc_.scheduler().notify_reactor();
132  
        svc_.scheduler().notify_reactor();
133  
    return result;
133  
    return result;
134  
}
134  
}
135  

135  

136  
inline std::coroutine_handle<>
136  
inline std::coroutine_handle<>
137  
select_tcp_socket::read_some(
137  
select_tcp_socket::read_some(
138  
    std::coroutine_handle<> h,
138  
    std::coroutine_handle<> h,
139  
    capy::executor_ref ex,
139  
    capy::executor_ref ex,
140  
    buffer_param param,
140  
    buffer_param param,
141  
    std::stop_token token,
141  
    std::stop_token token,
142  
    std::error_code* ec,
142  
    std::error_code* ec,
143  
    std::size_t* bytes_out)
143  
    std::size_t* bytes_out)
144  
{
144  
{
145  
    return do_read_some(h, ex, param, token, ec, bytes_out);
145  
    return do_read_some(h, ex, param, token, ec, bytes_out);
146  
}
146  
}
147  

147  

148  
inline std::coroutine_handle<>
148  
inline std::coroutine_handle<>
149  
select_tcp_socket::write_some(
149  
select_tcp_socket::write_some(
150  
    std::coroutine_handle<> h,
150  
    std::coroutine_handle<> h,
151  
    capy::executor_ref ex,
151  
    capy::executor_ref ex,
152  
    buffer_param param,
152  
    buffer_param param,
153  
    std::stop_token token,
153  
    std::stop_token token,
154  
    std::error_code* ec,
154  
    std::error_code* ec,
155  
    std::size_t* bytes_out)
155  
    std::size_t* bytes_out)
156  
{
156  
{
157  
    auto result = do_write_some(h, ex, param, token, ec, bytes_out);
157  
    auto result = do_write_some(h, ex, param, token, ec, bytes_out);
158  
    // Rebuild fd_sets so select() watches for writability
158  
    // Rebuild fd_sets so select() watches for writability
159  
    if (result == std::noop_coroutine())
159  
    if (result == std::noop_coroutine())
160  
        svc_.scheduler().notify_reactor();
160  
        svc_.scheduler().notify_reactor();
161  
    return result;
161  
    return result;
162  
}
162  
}
163  

163  

164  
inline void
164  
inline void
165  
select_tcp_socket::cancel() noexcept
165  
select_tcp_socket::cancel() noexcept
166  
{
166  
{
167  
    do_cancel();
167  
    do_cancel();
168  
}
168  
}
169  

169  

170  
inline void
170  
inline void
171  
select_tcp_socket::close_socket() noexcept
171  
select_tcp_socket::close_socket() noexcept
172  
{
172  
{
173  
    do_close_socket();
173  
    do_close_socket();
174  
}
174  
}
175  

175  

176  
inline std::error_code
176  
inline std::error_code
177  
select_tcp_service::open_socket(
177  
select_tcp_service::open_socket(
178  
    tcp_socket::implementation& impl, int family, int type, int protocol)
178  
    tcp_socket::implementation& impl, int family, int type, int protocol)
179  
{
179  
{
180  
    auto* select_impl = static_cast<select_tcp_socket*>(&impl);
180  
    auto* select_impl = static_cast<select_tcp_socket*>(&impl);
181  
    select_impl->close_socket();
181  
    select_impl->close_socket();
182  

182  

183  
    int fd = ::socket(family, type, protocol);
183  
    int fd = ::socket(family, type, protocol);
184  
    if (fd < 0)
184  
    if (fd < 0)
185  
        return make_err(errno);
185  
        return make_err(errno);
186  

186  

187  
    if (family == AF_INET6)
187  
    if (family == AF_INET6)
188  
    {
188  
    {
189  
        int one = 1;
189  
        int one = 1;
190  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
190  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
191  
    }
191  
    }
192  

192  

193  
    int flags = ::fcntl(fd, F_GETFL, 0);
193  
    int flags = ::fcntl(fd, F_GETFL, 0);
194  
    if (flags == -1)
194  
    if (flags == -1)
195  
    {
195  
    {
196  
        int errn = errno;
196  
        int errn = errno;
197  
        ::close(fd);
197  
        ::close(fd);
198  
        return make_err(errn);
198  
        return make_err(errn);
199  
    }
199  
    }
200  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
200  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
201  
    {
201  
    {
202  
        int errn = errno;
202  
        int errn = errno;
203  
        ::close(fd);
203  
        ::close(fd);
204  
        return make_err(errn);
204  
        return make_err(errn);
205  
    }
205  
    }
206  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
206  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
207  
    {
207  
    {
208  
        int errn = errno;
208  
        int errn = errno;
209  
        ::close(fd);
209  
        ::close(fd);
210  
        return make_err(errn);
210  
        return make_err(errn);
211  
    }
211  
    }
212  

212  

213  
    if (fd >= FD_SETSIZE)
213  
    if (fd >= FD_SETSIZE)
214  
    {
214  
    {
215  
        ::close(fd);
215  
        ::close(fd);
216  
        return make_err(EMFILE);
216  
        return make_err(EMFILE);
217  
    }
217  
    }
218  

218  

219  
#ifdef SO_NOSIGPIPE
219  
#ifdef SO_NOSIGPIPE
220  
    {
220  
    {
221  
        int one = 1;
221  
        int one = 1;
222  
        ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
222  
        ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
223  
    }
223  
    }
224  
#endif
224  
#endif
225  

225  

226  
    select_impl->fd_ = fd;
226  
    select_impl->fd_ = fd;
227  

227  

228  
    select_impl->desc_state_.fd = fd;
228  
    select_impl->desc_state_.fd = fd;
229  
    {
229  
    {
230  
        std::lock_guard lock(select_impl->desc_state_.mutex);
230  
        std::lock_guard lock(select_impl->desc_state_.mutex);
231  
        select_impl->desc_state_.read_op    = nullptr;
231  
        select_impl->desc_state_.read_op    = nullptr;
232  
        select_impl->desc_state_.write_op   = nullptr;
232  
        select_impl->desc_state_.write_op   = nullptr;
233  
        select_impl->desc_state_.connect_op = nullptr;
233  
        select_impl->desc_state_.connect_op = nullptr;
234  
    }
234  
    }
235  
    scheduler().register_descriptor(fd, &select_impl->desc_state_);
235  
    scheduler().register_descriptor(fd, &select_impl->desc_state_);
236  

236  

237  
    return {};
237  
    return {};
238  
}
238  
}
239  

239  

240  
inline std::error_code
240  
inline std::error_code
241  
select_tcp_service::bind_socket(
241  
select_tcp_service::bind_socket(
242  
    tcp_socket::implementation& impl, endpoint ep)
242  
    tcp_socket::implementation& impl, endpoint ep)
243  
{
243  
{
244  
    return static_cast<select_tcp_socket*>(&impl)->do_bind(ep);
244  
    return static_cast<select_tcp_socket*>(&impl)->do_bind(ep);
245  
}
245  
}
246  

246  

247  
} // namespace boost::corosio::detail
247  
} // namespace boost::corosio::detail
248  

248  

249  
#endif // BOOST_COROSIO_HAS_SELECT
249  
#endif // BOOST_COROSIO_HAS_SELECT
250  

250  

251  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
251  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP