Browse Source

Update bundled Dasynq to 1.2.0

Davin McCall 3 years ago
parent
commit
d8b9ee4045

+ 5 - 0
src/dasynq/dasynq-daryheap.h

@@ -309,6 +309,11 @@ class dary_heap
         }
     }
 
+    size_t size() noexcept
+    {
+        return hvec.size();
+    }
+
     dary_heap() { }
 
     dary_heap(const dary_heap &) = delete;

+ 27 - 11
src/dasynq/dasynq-epoll.h

@@ -94,8 +94,8 @@ class epoll_traits
 
 template <class Base> class epoll_loop : public Base
 {
-    int epfd; // epoll fd
-    int sigfd; // signalfd fd; -1 if not initialised
+    int epfd = -1; // epoll fd
+    int sigfd = -1; // signalfd fd; -1 if not initialised
     sigset_t sigmask;
 
     std::unordered_map<int, void *> sigdataMap;
@@ -153,17 +153,33 @@ template <class Base> class epoll_loop : public Base
      *
      * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
      */
-    epoll_loop() : sigfd(-1)
+    epoll_loop()
+    {
+        init();
+    }
+
+    epoll_loop(typename Base::delayed_init d) noexcept
+    {
+        // delayed initialisation
+    }
+
+    void init()
     {
         epfd = epoll_create1(EPOLL_CLOEXEC);
         if (epfd == -1) {
             throw std::system_error(errno, std::system_category());
         }
         sigemptyset(&sigmask);
-        Base::init(this);
+        try {
+            Base::init(this);
+        }
+        catch (...) {
+            close(epfd);
+            throw;
+        }
     }
     
-    ~epoll_loop()
+    ~epoll_loop() noexcept
     {
         close(epfd);
         if (sigfd != -1) {
@@ -199,7 +215,7 @@ template <class Base> class epoll_loop : public Base
             if (soft_fail && errno == EPERM) {
                 return false;
             }
-            throw new std::system_error(errno, std::system_category());        
+            throw std::system_error(errno, std::system_category());
         }
         return true;
     }
@@ -249,11 +265,11 @@ template <class Base> class epoll_loop : public Base
         
         if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) {
             // Shouldn't be able to fail
-            // throw new std::system_error(errno, std::system_category());
+            // throw std::system_error(errno, std::system_category());
         }
     }
     
-    void enable_fd_watch_nolock(int fd, void *userdata, int flags)
+    void enable_fd_watch_nolock(int fd, void *userdata, int flags) noexcept
     {
         enable_fd_watch(fd, userdata, flags);
     }
@@ -270,7 +286,7 @@ template <class Base> class epoll_loop : public Base
         // EPOLLIN is set.
         if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) {
             // Let's assume that this can't fail.
-            // throw new std::system_error(errno, std::system_category());
+            // throw std::system_error(errno, std::system_category());
         }
     }
     
@@ -296,7 +312,7 @@ template <class Base> class epoll_loop : public Base
         sigaddset(&sigmask, signo);
         sigfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
         if (sigfd == -1) {
-            throw new std::system_error(errno, std::system_category());
+            throw std::system_error(errno, std::system_category());
         }
         
         if (was_no_sigfd) {
@@ -309,7 +325,7 @@ template <class Base> class epoll_loop : public Base
             if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) {
                 close(sigfd);
                 sigfd = -1;
-                throw new std::system_error(errno, std::system_category());        
+                throw std::system_error(errno, std::system_category());
             }
         }
     }

+ 28 - 12
src/dasynq/dasynq-kqueue-macos.h

@@ -72,7 +72,7 @@ class macos_kqueue_traits : public signal_traits
 
 template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
 {
-    int kqfd; // kqueue fd
+    int kqfd = -1; // kqueue fd
 
     // Base contains:
     //   lock - a lock that can be used to protect internal structure.
@@ -129,20 +129,36 @@ template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
      * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
      */
     macos_kqueue_loop()
+    {
+        init();
+    }
+
+    macos_kqueue_loop(typename Base::delayed_init d) noexcept
+    {
+        // delayed initialisation
+    }
+
+    void init()
     {
         kqfd = kqueue();
         if (kqfd == -1) {
             throw std::system_error(errno, std::system_category());
         }
-        Base::init(this);
+        try {
+            Base::init(this);
+        }
+        catch (...) {
+            close(kqfd);
+            throw;
+        }
     }
 
-    ~macos_kqueue_loop()
+    ~macos_kqueue_loop() noexcept
     {
         close(kqfd);
     }
 
-    void set_filter_enabled(short filterType, uintptr_t ident, void *udata, bool enable)
+    void set_filter_enabled(short filterType, uintptr_t ident, void *udata, bool enable) noexcept
     {
         // Note, on OpenBSD enabling or disabling filter will not alter the filter parameters (udata etc);
         // on OS X however, it will. Therefore we set udata here (to the same value as it was originally
@@ -153,7 +169,7 @@ template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
         kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
     }
 
-    void remove_filter(short filterType, uintptr_t ident)
+    void remove_filter(short filterType, uintptr_t ident) noexcept
     {
         struct kevent kev;
         EV_SET(&kev, ident, filterType, EV_DELETE, 0, 0, 0);
@@ -176,7 +192,7 @@ template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
             // We can't request poll semantics, so check for regular file:
             struct stat statbuf;
             if (fstat(fd, &statbuf) == -1) {
-                throw new std::system_error(errno, std::system_category());
+                throw std::system_error(errno, std::system_category());
             }
             if ((statbuf.st_mode & S_IFMT) == S_IFREG) {
                 // Regular file: emulation required
@@ -193,7 +209,7 @@ template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
             if (filter == EVFILT_WRITE && errno == EINVAL && emulate) {
                 return false; // emulate
             }
-            throw new std::system_error(errno, std::system_category());
+            throw std::system_error(errno, std::system_category());
         }
         return true;
     }
@@ -214,7 +230,7 @@ template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
         int r = kevent(kqfd, kev, 2, kev_r, 2, nullptr);
 
         if (r == -1) {
-            throw new std::system_error(errno, std::system_category());
+            throw std::system_error(errno, std::system_category());
         }
 
         // Some possibilities:
@@ -225,7 +241,7 @@ template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
 
         if (kev_r[0].data != 0) {
             // read failed
-            throw new std::system_error(kev_r[0].data, std::system_category());
+            throw std::system_error(kev_r[0].data, std::system_category());
         }
 
         if (kev_r[1].data != 0) {
@@ -245,7 +261,7 @@ template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
             EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
             kevent(kqfd, kev, 1, nullptr, 0, nullptr);
             // throw exception
-            throw new std::system_error(kev_r[1].data, std::system_category());
+            throw std::system_error(kev_r[1].data, std::system_category());
         }
 
         return 0;
@@ -260,7 +276,7 @@ template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
         int r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
 
         if (r == -1) {
-            throw new std::system_error(errno, std::system_category());
+            throw std::system_error(errno, std::system_category());
         }
 
         EV_SET(&kev[0], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
@@ -275,7 +291,7 @@ template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
             EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
             kevent(kqfd, kev, 1, nullptr, 0, nullptr);
             // throw exception
-            throw new std::system_error(errno, std::system_category());
+            throw std::system_error(errno, std::system_category());
         }
 
         return 0;

+ 27 - 11
src/dasynq/dasynq-kqueue.h

@@ -278,15 +278,31 @@ template <class Base> class kqueue_loop : public Base
      * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
      */
     kqueue_loop()
+    {
+        init();
+    }
+
+    kqueue_loop(typename Base::delayed_init d) noexcept
+    {
+        // delayed initialisation
+    }
+
+    void init()
     {
         kqfd = kqueue();
         if (kqfd == -1) {
             throw std::system_error(errno, std::system_category());
         }
-        Base::init(this);
+        try {
+            Base::init(this);
+        }
+        catch (...) {
+            close(kqfd);
+            throw;
+        }
     }
     
-    ~kqueue_loop()
+    ~kqueue_loop() noexcept
     {
         close(kqfd);
     }
@@ -325,7 +341,7 @@ template <class Base> class kqueue_loop : public Base
             // We can't request poll semantics, so check for regular file:
             struct stat statbuf;
             if (fstat(fd, &statbuf) == -1) {
-                throw new std::system_error(errno, std::system_category());
+                throw std::system_error(errno, std::system_category());
             }
             if ((statbuf.st_mode & S_IFMT) == S_IFREG) {
                 // Regular file: emulation required
@@ -342,7 +358,7 @@ template <class Base> class kqueue_loop : public Base
             if (filter == EVFILT_WRITE && errno == EINVAL && emulate) {
                 return false; // emulate
             }
-            throw new std::system_error(errno, std::system_category());
+            throw std::system_error(errno, std::system_category());
         }
         return true;
     }
@@ -363,7 +379,7 @@ template <class Base> class kqueue_loop : public Base
         int r = kevent(kqfd, kev, 2, kev_r, 2, nullptr);
 
         if (r == -1) {
-            throw new std::system_error(errno, std::system_category());
+            throw std::system_error(errno, std::system_category());
         }
 
         // Some possibilities:
@@ -374,7 +390,7 @@ template <class Base> class kqueue_loop : public Base
 
         if (kev_r[0].data != 0) {
             // read failed
-            throw new std::system_error(kev_r[0].data, std::system_category());
+            throw std::system_error(kev_r[0].data, std::system_category());
         }
 
         if (kev_r[1].data != 0) {
@@ -394,7 +410,7 @@ template <class Base> class kqueue_loop : public Base
             EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
             kevent(kqfd, kev, 1, nullptr, 0, nullptr);
             // throw exception
-            throw new std::system_error(kev_r[1].data, std::system_category());
+            throw std::system_error(kev_r[1].data, std::system_category());
         }
 
         return 0;
@@ -409,7 +425,7 @@ template <class Base> class kqueue_loop : public Base
         int r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
 
         if (r == -1) {
-            throw new std::system_error(errno, std::system_category());
+            throw std::system_error(errno, std::system_category());
         }
 
         EV_SET(&kev[0], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
@@ -424,7 +440,7 @@ template <class Base> class kqueue_loop : public Base
             EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
             kevent(kqfd, kev, 1, nullptr, 0, nullptr);
             // throw exception
-            throw new std::system_error(errno, std::system_category());
+            throw std::system_error(errno, std::system_category());
         }
 
         return 0;
@@ -489,7 +505,7 @@ template <class Base> class kqueue_loop : public Base
         struct kevent evt;
         EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD | EV_DISABLE, 0, 0, userdata);
         if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) {
-            throw new std::system_error(errno, std::system_category());
+            throw std::system_error(errno, std::system_category());
         }
         // TODO use EV_DISPATCH if available (not on OpenBSD/OS X)
         
@@ -502,7 +518,7 @@ template <class Base> class kqueue_loop : public Base
         if (enable_filt) {
             evt.flags = EV_ENABLE;
             if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) {
-                throw new std::system_error(errno, std::system_category());
+                throw std::system_error(errno, std::system_category());
             }
         }
     }

+ 11 - 1
src/dasynq/dasynq-pselect.h

@@ -61,13 +61,23 @@ template <class Base> class pselect_events : public signal_events<Base, false>
      * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
      */
     pselect_events()
+    {
+        init();
+    }
+
+    pselect_events(typename Base::delayed_init d) noexcept
+    {
+        // delayed initialisation
+    }
+
+    void init()
     {
         FD_ZERO(&read_set);
         FD_ZERO(&write_set);
         Base::init(this);
     }
 
-    ~pselect_events()
+    ~pselect_events() noexcept
     {
     }
 

+ 10 - 0
src/dasynq/dasynq-select.h

@@ -121,6 +121,16 @@ template <class Base> class select_events : public signal_events<Base, true>
      * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
      */
     select_events()
+    {
+        init();
+    }
+
+    select_events(typename Base::delayed_init d) noexcept
+    {
+        // delayed initialisation
+    }
+
+    void init()
     {
         FD_ZERO(&read_set);
         FD_ZERO(&write_set);

+ 6 - 0
src/dasynq/dasynq-stableheap.h

@@ -10,6 +10,7 @@
 
 #include <functional>
 #include <utility>
+
 #include <cstdint>
 
 namespace dasynq {
@@ -117,6 +118,11 @@ class stable_heap : private H<T,stable_prio<P>,compare_stable_prio<P,C>>
     {
         return Base::empty();
     }
+
+    unsigned size()
+    {
+        return Base::size();
+    }
 };
 
 } // namespace dasynq

+ 2 - 1
src/dasynq/dasynq-util.h

@@ -1,9 +1,10 @@
 #ifndef DASYNQ_UTIL_H_INCLUDED
 #define DASYNQ_UTIL_H_INCLUDED 1
 
-#include <dasynq-config.h>
 #include <unistd.h>
 
+#include "dasynq-config.h"
+
 namespace dasynq {
 
 // Define pipe2, if it's not present in the sytem library. pipe2 is like pipe with an additional flags

+ 34 - 8
src/dasynq/dasynq.h

@@ -76,7 +76,7 @@
 //   full_timer_support
 //              - boolean indicating that the monotonic and system clocks are actually different clocks and
 //                that timers against the system clock will work correctly if the system clock time is
-//                adjusted. If false, the monotic clock may not be present at all (monotonic clock will map
+//                adjusted. If false, the monotonic clock may not be present at all (monotonic clock will map
 //                to system clock), and timers against either clock are not guaranteed to work correctly if
 //                the system clock is adjusted.
 
@@ -169,6 +169,11 @@ enum class rearm
     REQUEUE
 };
 
+// Tag type to specify that initialisation should be delayed
+class delayed_init {
+    DASYNQ_EMPTY_BODY
+};
+
 namespace dprivate {
 
     // Classes for implementing a fair(ish) wait queue.
@@ -205,7 +210,7 @@ namespace dprivate {
         void wait(std::unique_lock<null_mutex> &ul) { }
         void signal() { }
         
-        DASYNQ_EMPTY_BODY;
+        DASYNQ_EMPTY_BODY
     };
 
     template <typename T_Mutex> class waitqueue_node
@@ -425,6 +430,7 @@ namespace dprivate {
         public:
         using mutex_t = typename LoopTraits::mutex_t;
         using traits_t = Traits;
+        using delayed_init = dasynq::delayed_init;
 
         private:
 
@@ -545,7 +551,7 @@ namespace dprivate {
         
         // Pull a single event from the queue; returns nullptr if the queue is empty.
         // Call with lock held.
-        base_watcher * pull_event() noexcept
+        base_watcher * pull_queued_event() noexcept
         {
             if (event_queue.empty()) {
                 return nullptr;
@@ -557,6 +563,11 @@ namespace dprivate {
             return r;
         }
         
+        size_t num_queued_events() noexcept
+        {
+            return event_queue.size();
+        }
+
         // Queue a watcher for removal, or issue "removed" callback to it.
         // Call with lock free.
         void issue_delete(base_watcher *watcher) noexcept
@@ -1410,7 +1421,15 @@ class event_loop
             return false;
         }
         
-        base_watcher * pqueue = loop_mech.pull_event();
+        // limit processing to the number of events currently queued, to avoid prolonged processing
+        // of watchers which requeueu themselves immediately (including file watchers which are using
+        // emulation for watching regular files)
+        //
+        // If limit is -1 (no limit) we rely on this being always larger than/equal to the number of
+        // queued events when cast to size_t (which is unsigned).
+        limit = std::min(size_t(limit), loop_mech.num_queued_events());
+
+        base_watcher * pqueue = loop_mech.pull_queued_event();
         bool active = false;
         
         while (pqueue != nullptr) {
@@ -1438,16 +1457,16 @@ class event_loop
 
                 // issue a secondary dispatch:
                 bbfw->dispatch_second(this);
-                pqueue = loop_mech.pull_event();
-                continue;
+            }
+            else {
+                pqueue->dispatch(this);
             }
 
-            pqueue->dispatch(this);
             if (limit > 0) {
                 limit--;
                 if (limit == 0) break;
             }
-            pqueue = loop_mech.pull_event();
+            pqueue = loop_mech.pull_queued_event();
         }
         
         loop_mech.lock.unlock();
@@ -1515,7 +1534,14 @@ class event_loop
     }
 
     event_loop() { }
+    event_loop(delayed_init d) noexcept : loop_mech(d) { }
     event_loop(const event_loop &other) = delete;
+
+    // Perform delayed initialisation, if constructed with delayed_init
+    void init()
+    {
+        loop_mech.init();
+    }
 };
 
 typedef event_loop<null_mutex> event_loop_n;