select.h 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. #ifndef DASYNQ_SELECT_H_
  2. #define DASYNQ_SELECT_H_
  3. #include <system_error>
  4. #include <vector>
  5. #include <atomic>
  6. #include <sys/time.h>
  7. #include <sys/types.h>
  8. #include <sys/wait.h>
  9. #include <sys/stat.h>
  10. #include <sys/select.h>
  11. #include <unistd.h>
  12. #include <csignal>
  13. #include <csetjmp>
  14. #include "config.h"
  15. #include "signal.h"
  16. // "select"-based event loop mechanism.
  17. //
  18. namespace dasynq {
  19. namespace dprivate {
  20. class proc_status; // forward declaration
  21. }
  22. template <class Base> class select_events;
  23. class select_traits : public signal_traits
  24. {
  25. public:
  26. class fd_r;
  27. // File descriptor optional storage. If the mechanism can return the file descriptor, this
  28. // class will be empty, otherwise it can hold a file descriptor.
  29. class fd_s {
  30. public:
  31. fd_s(int fd) noexcept { }
  32. DASYNQ_EMPTY_BODY
  33. };
  34. // File descriptor reference (passed to event callback). If the mechanism can return the
  35. // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
  36. // must be stored in an fd_s instance.
  37. class fd_r {
  38. int fd;
  39. public:
  40. int getFd(fd_s ss)
  41. {
  42. return fd;
  43. }
  44. fd_r(int nfd) : fd(nfd)
  45. {
  46. }
  47. };
  48. using proc_status_t = dprivate::proc_status;
  49. constexpr static bool has_bidi_fd_watch = false;
  50. constexpr static bool has_separate_rw_fd_watches = true;
  51. // requires interrupt after adding/enabling an fd:
  52. constexpr static bool interrupt_after_fd_add = true;
  53. constexpr static bool interrupt_after_signal_add = true;
  54. constexpr static bool supports_non_oneshot_fd = false;
  55. };
  56. template <class Base> class select_events : public signal_events<Base, true>
  57. {
  58. fd_set read_set;
  59. fd_set write_set;
  60. int max_fd = -1; // highest fd in any of the sets, -1 if not initialised
  61. // userdata pointers in read and write respectively, for each fd:
  62. std::vector<void *> rd_udata;
  63. std::vector<void *> wr_udata;
  64. // Base contains:
  65. // lock - a lock that can be used to protect internal structure.
  66. // receive*() methods will be called with lock held.
  67. // receive_signal(sigdata_t &, user *) noexcept
  68. // receive_fd_event(fd_r, user *, int flags) noexcept
  69. using fd_r = typename select_traits::fd_r;
  70. void process_events(fd_set *read_set_p, fd_set *write_set_p, fd_set *error_set_p)
  71. {
  72. std::lock_guard<decltype(Base::lock)> guard(Base::lock);
  73. // Note: if error is set, we expect read or write is also set.
  74. for (int i = 0; i <= max_fd; i++) {
  75. if (FD_ISSET(i, read_set_p)) {
  76. if (FD_ISSET(i, &read_set) && rd_udata[i] != nullptr) {
  77. // report read
  78. int events = IN_EVENTS | (FD_ISSET(i, error_set_p) ? ERR_EVENTS : 0);
  79. auto r = Base::receive_fd_event(*this, fd_r(i), rd_udata[i], events);
  80. if (std::get<0>(r) == 0) {
  81. FD_CLR(i, &read_set);
  82. }
  83. }
  84. }
  85. }
  86. for (int i = 0; i <= max_fd; i++) {
  87. if (FD_ISSET(i, write_set_p)) {
  88. if (FD_ISSET(i, &write_set) && wr_udata[i] != nullptr) {
  89. // report write
  90. int events = OUT_EVENTS | (FD_ISSET(i, error_set_p) ? ERR_EVENTS : 0);
  91. auto r = Base::receive_fd_event(*this, fd_r(i), wr_udata[i], events);
  92. if (std::get<0>(r) == 0) {
  93. FD_CLR(i, &write_set);
  94. }
  95. }
  96. }
  97. }
  98. }
  99. public:
  100. /**
  101. * select_events constructor.
  102. *
  103. * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
  104. */
  105. select_events()
  106. {
  107. init();
  108. }
  109. select_events(typename Base::delayed_init d) noexcept
  110. {
  111. // delayed initialisation
  112. }
  113. void init()
  114. {
  115. max_fd = 0;
  116. FD_ZERO(&read_set);
  117. FD_ZERO(&write_set);
  118. Base::init(this);
  119. }
  120. ~select_events() noexcept
  121. {
  122. if (max_fd != -1) {
  123. Base::cleanup();
  124. }
  125. }
  126. // fd: file descriptor to watch
  127. // userdata: data to associate with descriptor
  128. // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT
  129. // (only one of IN_EVENTS/OUT_EVENTS can be specified)
  130. // soft_fail: true if unsupported file descriptors should fail by returning false instead
  131. // of throwing an exception
  132. // returns: true on success; false if file descriptor type isn't supported and emulate == true
  133. // throws: std::system_error or std::bad_alloc on failure
  134. bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool soft_fail = false)
  135. {
  136. if (fd >= FD_SETSIZE) {
  137. throw std::system_error(EMFILE, std::system_category());
  138. }
  139. if (flags & IN_EVENTS) {
  140. FD_SET(fd, &read_set);
  141. if (size_t(fd) >= rd_udata.size()) {
  142. rd_udata.resize(fd + 1);
  143. }
  144. rd_udata[fd] = userdata;
  145. }
  146. else {
  147. FD_SET(fd, &write_set);
  148. if (size_t(fd) >= wr_udata.size()) {
  149. wr_udata.resize(fd + 1);
  150. }
  151. wr_udata[fd] = userdata;
  152. }
  153. max_fd = std::max(fd, max_fd);
  154. return true;
  155. }
  156. // returns: 0 on success
  157. // IN_EVENTS if in watch requires emulation
  158. // OUT_EVENTS if out watch requires emulation
  159. int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false)
  160. {
  161. if (fd >= FD_SETSIZE) {
  162. throw std::system_error(EMFILE, std::system_category());
  163. }
  164. if (flags & IN_EVENTS) {
  165. FD_SET(fd, &read_set);
  166. if (size_t(fd) >= rd_udata.size()) {
  167. rd_udata.resize(fd + 1);
  168. }
  169. rd_udata[fd] = userdata;
  170. }
  171. if (flags & OUT_EVENTS) {
  172. FD_SET(fd, &write_set);
  173. if (size_t(fd) >= wr_udata.size()) {
  174. wr_udata.resize(fd + 1);
  175. }
  176. wr_udata[fd] = userdata;
  177. }
  178. max_fd = std::max(fd, max_fd);
  179. return 0;
  180. }
  181. // flags specifies which watch to remove; ignored if the loop doesn't support
  182. // separate read/write watches.
  183. void remove_fd_watch_nolock(int fd, int flags)
  184. {
  185. if (flags & IN_EVENTS) {
  186. FD_CLR(fd, &read_set);
  187. rd_udata[fd] = nullptr;
  188. }
  189. if (flags & OUT_EVENTS) {
  190. FD_CLR(fd, &write_set);
  191. wr_udata[fd] = nullptr;
  192. }
  193. }
  194. void remove_fd_watch(int fd, int flags)
  195. {
  196. std::lock_guard<decltype(Base::lock)> guard(Base::lock);
  197. remove_fd_watch_nolock(fd, flags);
  198. }
  199. void remove_bidi_fd_watch(int fd) noexcept
  200. {
  201. FD_CLR(fd, &read_set);
  202. FD_CLR(fd, &write_set);
  203. }
  204. void enable_fd_watch_nolock(int fd, void *userdata, int flags)
  205. {
  206. if (flags & IN_EVENTS) {
  207. FD_SET(fd, &read_set);
  208. }
  209. else {
  210. FD_SET(fd, &write_set);
  211. }
  212. }
  213. void enable_fd_watch(int fd, void *userdata, int flags)
  214. {
  215. std::lock_guard<decltype(Base::lock)> guard(Base::lock);
  216. enable_fd_watch_nolock(fd, userdata, flags);
  217. }
  218. void disable_fd_watch_nolock(int fd, int flags)
  219. {
  220. if (flags & IN_EVENTS) {
  221. FD_CLR(fd, &read_set);
  222. }
  223. else {
  224. FD_CLR(fd, &write_set);
  225. }
  226. }
  227. void disable_fd_watch(int fd, int flags)
  228. {
  229. std::lock_guard<decltype(Base::lock)> guard(Base::lock);
  230. disable_fd_watch_nolock(fd, flags);
  231. }
  232. // If events are pending, process an unspecified number of them.
  233. // If no events are pending, wait until one event is received and
  234. // process this event (and possibly any other events received
  235. // simultaneously).
  236. // If processing an event removes a watch, there is a possibility
  237. // that the watched event will still be reported (if it has
  238. // occurred) before pull_events() returns.
  239. //
  240. // do_wait - if false, returns immediately if no events are
  241. // pending.
  242. void pull_events(bool do_wait) noexcept
  243. {
  244. struct timeval ts;
  245. struct timeval *wait_ts = nullptr;
  246. Base::lock.lock();
  247. // Check whether any timers are pending, and what the next timeout is.
  248. // Check whether any timers are pending, and what the next timeout is.
  249. this->process_monotonic_timers(do_wait, ts, wait_ts);
  250. fd_set read_set_c;
  251. fd_set write_set_c;
  252. fd_set err_set; // "exceptional conditions" eg urgent data
  253. read_set_c = read_set;
  254. write_set_c = write_set;
  255. err_set = read_set;
  256. const sigset_t &active_sigmask = this->get_active_sigmask();
  257. int nfds = max_fd + 1;
  258. Base::lock.unlock();
  259. // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
  260. // received during polling, it will longjmp back to here:
  261. if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
  262. this->process_signal();
  263. do_wait = false;
  264. }
  265. if (!do_wait) {
  266. ts.tv_sec = 0;
  267. ts.tv_usec = 0;
  268. wait_ts = &ts;
  269. }
  270. std::atomic_signal_fence(std::memory_order::memory_order_release);
  271. this->sigmaskf(SIG_UNBLOCK, &active_sigmask, nullptr);
  272. int r = select(nfds, &read_set_c, &write_set_c, &err_set, wait_ts);
  273. // Note, a signal may be received here and the handler may perform siglongjmp to the above
  274. // established jmpbuf; that means we will execute the select statement again, but that's fine.
  275. this->sigmaskf(SIG_BLOCK, &active_sigmask, nullptr);
  276. if (r == -1 || r == 0) {
  277. // signal or no events
  278. if (r == 0 && do_wait) {
  279. // timeout:
  280. Base::lock.lock();
  281. this->process_monotonic_timers();
  282. Base::lock.unlock();
  283. }
  284. return;
  285. }
  286. process_events(&read_set_c, &write_set_c, &err_set);
  287. }
  288. };
  289. } // namespace dasynq
  290. #endif /* DASYNQ_SELECT_H_ */