// select.h
  1. #ifndef DASYNQ_SELECT_H_
  2. #define DASYNQ_SELECT_H_
  3. #include <system_error>
  4. #include <vector>
  5. #include <atomic>
  6. #include <sys/time.h>
  7. #include <sys/types.h>
  8. #include <sys/wait.h>
  9. #include <sys/stat.h>
  10. #include <sys/select.h>
  11. #include <unistd.h>
  12. #include <csignal>
  13. #include <csetjmp>
  14. #include "config.h"
  15. #include "signal.h"
  16. // "select"-based event loop mechanism.
  17. //
  18. namespace dasynq {
  19. template <class Base> class select_events;
  20. class select_traits : public signal_traits
  21. {
  22. public:
  23. class fd_r;
  24. // File descriptor optional storage. If the mechanism can return the file descriptor, this
  25. // class will be empty, otherwise it can hold a file descriptor.
  26. class fd_s {
  27. public:
  28. fd_s(int fd) noexcept { }
  29. DASYNQ_EMPTY_BODY
  30. };
  31. // File descriptor reference (passed to event callback). If the mechanism can return the
  32. // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
  33. // must be stored in an fd_s instance.
  34. class fd_r {
  35. int fd;
  36. public:
  37. int getFd(fd_s ss)
  38. {
  39. return fd;
  40. }
  41. fd_r(int nfd) : fd(nfd)
  42. {
  43. }
  44. };
  45. constexpr static bool has_bidi_fd_watch = false;
  46. constexpr static bool has_separate_rw_fd_watches = true;
  47. // requires interrupt after adding/enabling an fd:
  48. constexpr static bool interrupt_after_fd_add = true;
  49. constexpr static bool interrupt_after_signal_add = true;
  50. constexpr static bool supports_non_oneshot_fd = false;
  51. };
  52. template <class Base> class select_events : public signal_events<Base, true>
  53. {
  54. fd_set read_set;
  55. fd_set write_set;
  56. //fd_set error_set; // logical OR of both the above
  57. int max_fd = -1; // highest fd in any of the sets, -1 if not initialised
  58. // userdata pointers in read and write respectively, for each fd:
  59. std::vector<void *> rd_udata;
  60. std::vector<void *> wr_udata;
  61. // Base contains:
  62. // lock - a lock that can be used to protect internal structure.
  63. // receive*() methods will be called with lock held.
  64. // receive_signal(sigdata_t &, user *) noexcept
  65. // receive_fd_event(fd_r, user *, int flags) noexcept
  66. using fd_r = typename select_traits::fd_r;
  67. void process_events(fd_set *read_set_p, fd_set *write_set_p, fd_set *error_set_p)
  68. {
  69. std::lock_guard<decltype(Base::lock)> guard(Base::lock);
  70. // Note: if error is set, we expect read or write is also set.
  71. for (int i = 0; i <= max_fd; i++) {
  72. if (FD_ISSET(i, read_set_p)) {
  73. if (FD_ISSET(i, &read_set) && rd_udata[i] != nullptr) {
  74. // report read
  75. int events = IN_EVENTS | (FD_ISSET(i, error_set_p) ? ERR_EVENTS : 0);
  76. auto r = Base::receive_fd_event(*this, fd_r(i), rd_udata[i], events);
  77. if (std::get<0>(r) == 0) {
  78. FD_CLR(i, &read_set);
  79. }
  80. }
  81. }
  82. }
  83. for (int i = 0; i <= max_fd; i++) {
  84. if (FD_ISSET(i, write_set_p)) {
  85. if (FD_ISSET(i, &write_set) && wr_udata[i] != nullptr) {
  86. // report write
  87. int events = OUT_EVENTS | (FD_ISSET(i, error_set_p) ? ERR_EVENTS : 0);
  88. auto r = Base::receive_fd_event(*this, fd_r(i), wr_udata[i], events);
  89. if (std::get<0>(r) == 0) {
  90. FD_CLR(i, &write_set);
  91. }
  92. }
  93. }
  94. }
  95. }
  96. public:
  97. /**
  98. * select_events constructor.
  99. *
  100. * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
  101. */
  102. select_events()
  103. {
  104. init();
  105. }
  106. select_events(typename Base::delayed_init d) noexcept
  107. {
  108. // delayed initialisation
  109. }
  110. void init()
  111. {
  112. max_fd = 0;
  113. FD_ZERO(&read_set);
  114. FD_ZERO(&write_set);
  115. Base::init(this);
  116. }
  117. ~select_events() noexcept
  118. {
  119. if (max_fd != -1) {
  120. Base::cleanup();
  121. }
  122. }
  123. // fd: file descriptor to watch
  124. // userdata: data to associate with descriptor
  125. // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT
  126. // (only one of IN_EVENTS/OUT_EVENTS can be specified)
  127. // soft_fail: true if unsupported file descriptors should fail by returning false instead
  128. // of throwing an exception
  129. // returns: true on success; false if file descriptor type isn't supported and emulate == true
  130. // throws: std::system_error or std::bad_alloc on failure
  131. bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool soft_fail = false)
  132. {
  133. if (fd >= FD_SETSIZE) {
  134. throw std::system_error(EMFILE, std::system_category());
  135. }
  136. if (flags & IN_EVENTS) {
  137. FD_SET(fd, &read_set);
  138. if (size_t(fd) >= rd_udata.size()) {
  139. rd_udata.resize(fd + 1);
  140. }
  141. rd_udata[fd] = userdata;
  142. }
  143. else {
  144. FD_SET(fd, &write_set);
  145. if (size_t(fd) >= wr_udata.size()) {
  146. wr_udata.resize(fd + 1);
  147. }
  148. wr_udata[fd] = userdata;
  149. }
  150. max_fd = std::max(fd, max_fd);
  151. return true;
  152. }
  153. // returns: 0 on success
  154. // IN_EVENTS if in watch requires emulation
  155. // OUT_EVENTS if out watch requires emulation
  156. int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false)
  157. {
  158. if (fd >= FD_SETSIZE) {
  159. throw std::system_error(EMFILE, std::system_category());
  160. }
  161. if (flags & IN_EVENTS) {
  162. FD_SET(fd, &read_set);
  163. if (size_t(fd) >= rd_udata.size()) {
  164. rd_udata.resize(fd + 1);
  165. }
  166. rd_udata[fd] = userdata;
  167. }
  168. if (flags & OUT_EVENTS) {
  169. FD_SET(fd, &write_set);
  170. if (size_t(fd) >= wr_udata.size()) {
  171. wr_udata.resize(fd + 1);
  172. }
  173. wr_udata[fd] = userdata;
  174. }
  175. max_fd = std::max(fd, max_fd);
  176. return 0;
  177. }
  178. // flags specifies which watch to remove; ignored if the loop doesn't support
  179. // separate read/write watches.
  180. void remove_fd_watch_nolock(int fd, int flags)
  181. {
  182. if (flags & IN_EVENTS) {
  183. FD_CLR(fd, &read_set);
  184. rd_udata[fd] = nullptr;
  185. }
  186. if (flags & OUT_EVENTS) {
  187. FD_CLR(fd, &write_set);
  188. wr_udata[fd] = nullptr;
  189. }
  190. }
  191. void remove_fd_watch(int fd, int flags)
  192. {
  193. std::lock_guard<decltype(Base::lock)> guard(Base::lock);
  194. remove_fd_watch_nolock(fd, flags);
  195. }
  196. void remove_bidi_fd_watch(int fd) noexcept
  197. {
  198. FD_CLR(fd, &read_set);
  199. FD_CLR(fd, &write_set);
  200. }
  201. void enable_fd_watch_nolock(int fd, void *userdata, int flags)
  202. {
  203. if (flags & IN_EVENTS) {
  204. FD_SET(fd, &read_set);
  205. }
  206. else {
  207. FD_SET(fd, &write_set);
  208. }
  209. }
  210. void enable_fd_watch(int fd, void *userdata, int flags)
  211. {
  212. std::lock_guard<decltype(Base::lock)> guard(Base::lock);
  213. enable_fd_watch_nolock(fd, userdata, flags);
  214. }
  215. void disable_fd_watch_nolock(int fd, int flags)
  216. {
  217. if (flags & IN_EVENTS) {
  218. FD_CLR(fd, &read_set);
  219. }
  220. else {
  221. FD_CLR(fd, &write_set);
  222. }
  223. }
  224. void disable_fd_watch(int fd, int flags)
  225. {
  226. std::lock_guard<decltype(Base::lock)> guard(Base::lock);
  227. disable_fd_watch_nolock(fd, flags);
  228. }
  229. // If events are pending, process an unspecified number of them.
  230. // If no events are pending, wait until one event is received and
  231. // process this event (and possibly any other events received
  232. // simultaneously).
  233. // If processing an event removes a watch, there is a possibility
  234. // that the watched event will still be reported (if it has
  235. // occurred) before pull_events() returns.
  236. //
  237. // do_wait - if false, returns immediately if no events are
  238. // pending.
  239. void pull_events(bool do_wait) noexcept
  240. {
  241. struct timeval ts;
  242. struct timeval *wait_ts = nullptr;
  243. Base::lock.lock();
  244. // Check whether any timers are pending, and what the next timeout is.
  245. // Check whether any timers are pending, and what the next timeout is.
  246. this->process_monotonic_timers(do_wait, ts, wait_ts);
  247. volatile fd_set read_set_c;
  248. volatile fd_set write_set_c;
  249. volatile fd_set err_set;
  250. read_set_c = read_set;
  251. write_set_c = write_set;
  252. err_set = read_set;
  253. const sigset_t &active_sigmask = this->get_active_sigmask();
  254. int nfds = max_fd + 1;
  255. Base::lock.unlock();
  256. // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
  257. // received during polling, it will longjmp back to here:
  258. if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
  259. this->process_signal();
  260. do_wait = false;
  261. }
  262. if (! do_wait) {
  263. ts.tv_sec = 0;
  264. ts.tv_usec = 0;
  265. wait_ts = &ts;
  266. }
  267. this->sigmaskf(SIG_UNBLOCK, &active_sigmask, nullptr);
  268. int r = select(nfds, &read_set_c, &write_set_c, &err_set, wait_ts);
  269. // Note, a signal may be received here and the handler may perform siglongjmp to the above
  270. // established jmpbuf; that means we will execute the select statement again, but that's fine.
  271. this->sigmaskf(SIG_BLOCK, &active_sigmask, nullptr);
  272. if (r == -1 || r == 0) {
  273. // signal or no events
  274. if (r == 0 && do_wait) {
  275. // timeout:
  276. Base::lock.lock();
  277. this->process_monotonic_timers();
  278. Base::lock.unlock();
  279. }
  280. return;
  281. }
  282. process_events(&read_set_c, &write_set_c, &err_set);
  283. }
  284. };
  285. } // namespace dasynq
  286. #endif /* DASYNQ_SELECT_H_ */