Bläddra i källkod

First attempt at a Rust based Event_socketRead implementation

Caleb James DeLisle 5 månader sedan
förälder
incheckning
e99eb87b06

+ 9 - 0
rust/cjdns_sys/Rffi.h

@@ -9,6 +9,8 @@ typedef struct RTypes_CryptoAuth2_t RTypes_CryptoAuth2_t;
 
 typedef struct Rffi_EventLoop Rffi_EventLoop;
 
+typedef struct Rffi_FdReadableTx Rffi_FdReadableTx;
+
 /**
  * The guard of an acquired [`GCL`].
  */
@@ -155,6 +157,13 @@ int Rffi_isTimeoutActive(const Rffi_TimerTx *timer_tx);
  */
 void Rffi_clearAllTimeouts(Rffi_EventLoop *event_loop);
 
+void Rffi_pollFdReadable(Rffi_FdReadableTx **out,
+                         void (*cb)(void*),
+                         void *cb_context,
+                         int fd,
+                         Rffi_EventLoop *event_loop,
+                         Allocator_t *alloc);
+
 /**
  * Convert IPv4 and IPv6 addresses from binary to text form.
  */

+ 84 - 0
rust/cjdns_sys/src/rffi/event_loop/fd_readable.rs

@@ -0,0 +1,84 @@
+use super::{Rffi_EventLoop, GCL};
+use crate::cffi::{Allocator_t, Allocator__onFree, Allocator_OnFreeJob};
+use crate::rffi::allocator;
+use std::ffi::c_void;
+use std::os::raw::c_int;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+
+/// Used only internally, to send `*mut c_void` into a tokio task.
+struct FdReadableCallback {
+    cb: unsafe extern "C" fn(*mut c_void),
+    cb_context: *mut c_void,
+}
+
+// SAFETY: this only holds if the C code is thread-safe, or the tokio Runtime uses only a single thread.
+unsafe impl Send for FdReadableCallback {}
+
+struct FdReadable {
+    _kill: tokio::sync::mpsc::UnboundedSender<()>,
+    active: Arc<AtomicBool>,
+}
+
+pub struct Rffi_FdReadableTx(Arc<FdReadable>);
+
+pub extern "C" fn fd_readable_on_free(j: *mut Allocator_OnFreeJob) -> c_int {
+    let timer_tx = unsafe { &*((*j).userData as *mut Rffi_FdReadableTx) };
+    timer_tx.0.active.store(false, Ordering::Relaxed);
+    0
+}
+
+#[no_mangle]
+pub extern "C" fn Rffi_pollFdReadable(
+    out: *mut *mut Rffi_FdReadableTx,
+    cb: unsafe extern "C" fn(*mut c_void),
+    cb_context: *mut c_void,
+    fd: c_int,
+    event_loop: *mut Rffi_EventLoop,
+    alloc: *mut Allocator_t,
+) {
+    // let cb_int = cb_context as u64;
+    let frc = FdReadableCallback{ cb, cb_context };
+
+    let (kill, mut rx_kill) = tokio::sync::mpsc::unbounded_channel();
+    let rtx = Arc::new(FdReadable {
+        _kill: kill,
+        active: Arc::new(AtomicBool::new(true)),
+    });
+
+    let event_loop = unsafe { (&*event_loop).arc_clone() };
+    let active = Arc::clone(&rtx.active);
+
+    unsafe {
+        let event_tx = allocator::adopt(alloc, Rffi_FdReadableTx(rtx));
+        // Note: we must close the event in the allocator onFree rather than in the drop
+        // because the drop only happens later, when Rust wants to.
+        Allocator__onFree(alloc,
+            Some(fd_readable_on_free),
+            event_tx as *mut c_void,
+            ("fd_readable.rs\0").as_bytes().as_ptr() as *const ::std::os::raw::c_char,
+            line!() as std::os::raw::c_int);
+        *out = event_tx;
+    }
+
+    event_loop.arc_clone().event_job(async move {
+        let fd = tokio::io::unix::AsyncFd::new(fd).unwrap();
+        loop {
+            tokio::select! {
+                _r = fd.readable() => {
+                    let _guard = GCL.lock();
+                    if active.load(Ordering::Relaxed) {
+                        unsafe { (frc.cb)(frc.cb_context); }
+                    } else {
+                        log::info!("Rffi_pollFdReadable down: active = false");
+                        break;
+                    }
+                }
+                _ = rx_kill.recv() => {
+                    log::info!("Rffi_pollFdReadable down: rx_kill received");
+                    break;
+                }
+            }
+        }
+    });
+}

+ 1 - 0
rust/cjdns_sys/src/rffi/event_loop/mod.rs

@@ -11,6 +11,7 @@ use tokio::sync::broadcast;
 
 mod process;
 mod timeout;
+mod fd_readable;
 
 /// The guard of an acquired [`GCL`].
 pub struct Rffi_Glock_guard(ReentrantMutexGuard<'static, ()>);

+ 5 - 10
util/events/Event.h

@@ -20,15 +20,10 @@
 #include "util/Linker.h"
 Linker_require("util/events/libuv/Event.c")
 
-struct Event
-{
-    int unused;
-};
-
-struct Event* Event_socketRead(void (* const callback)(void* callbackContext),
-                               void* const callbackContext,
-                               int s,
-                               struct EventBase* base,
-                               struct Allocator* alloc);
+void Event_socketRead(void (* const callback)(void* callbackContext),
+                      void* const callbackContext,
+                      int s,
+                      struct EventBase* base,
+                      struct Allocator* alloc);
 
 #endif

+ 60 - 49
util/events/libuv/Event.c

@@ -12,7 +12,8 @@
  * You should have received a copy of the GNU General Public License
  * along with this program.  If not, see <https://www.gnu.org/licenses/>.
  */
-#include "util/events/libuv/UvWrapper.h"
+#include "rust/cjdns_sys/Rffi.h"
+// #include "util/events/libuv/UvWrapper.h"
 #include "memory/Allocator.h"
 #include "util/events/libuv/EventBase_pvt.h"
 #include "util/events/Event.h"
@@ -21,64 +22,74 @@
 #include <stddef.h>
 #include <stdint.h>
 
-struct Event_pvt
-{
-    struct Event pub;
-    void (* const callback)(void* callbackContext);
-    void* const callbackContext;
-    uv_poll_t handler;
-    struct Allocator* alloc;
-    Identity
-};
+// struct Event_pvt
+// {
+//     struct Event pub;
+//     void (* const callback)(void* callbackContext);
+//     void* const callbackContext;
+//     uv_poll_t handler;
+//     struct Allocator* alloc;
+//     Identity
+// };
 
-static void handleEvent(uv_poll_t* handle, int status, int events)
-{
-    struct Event_pvt* event =
-        Identity_check((struct Event_pvt*) (((char*)handle) - offsetof(struct Event_pvt, handler)));
+// static void handleEvent(uv_poll_t* handle, int status, int events)
+// {
+//     struct Event_pvt* event =
+//         Identity_check((struct Event_pvt*) (((char*)handle) - offsetof(struct Event_pvt, handler)));
 
-    if ((status == 0) && (events & UV_READABLE)) {
-        event->callback(event->callbackContext);
-    }
-}
+//     if ((status == 0) && (events & UV_READABLE)) {
+//         event->callback(event->callbackContext);
+//     }
+// }
 
-static void onClose(uv_handle_t* handle)
-{
-    struct Event_pvt* event = Identity_check((struct Event_pvt*) handle->data);
-    Allocator_free(event->alloc);
-}
+// static void onClose(uv_handle_t* handle)
+// {
+//     struct Event_pvt* event = Identity_check((struct Event_pvt*) handle->data);
+//     Allocator_free(event->alloc);
+// }
 
-static int onFree(struct Allocator_OnFreeJob* job)
-{
-    struct Event_pvt* event = Identity_check((struct Event_pvt*) job->userData);
-    event->handler.data = event;
-    uv_close((uv_handle_t*) &event->handler, onClose);
-    return 0;
-}
+// static int onFree(struct Allocator_OnFreeJob* job)
+// {
+//     struct Event_pvt* event = Identity_check((struct Event_pvt*) job->userData);
+//     event->handler.data = event;
+//     uv_close((uv_handle_t*) &event->handler, onClose);
+//     return 0;
+// }
 
-struct Event* Event_socketRead(void (* const callback)(void* callbackContext),
-                               void* const callbackContext,
-                               int s,
-                               struct EventBase* eventBase,
-                               struct Allocator* userAlloc)
+void Event_socketRead(void (* const callback)(void* callbackContext),
+                      void* const callbackContext,
+                      int s,
+                      struct EventBase* eventBase,
+                      struct Allocator* userAlloc)
 {
     struct EventBase_pvt* base = EventBase_privatize(eventBase);
-    struct Allocator* alloc = Allocator_child(base->alloc);
-    struct Event_pvt* out = Allocator_clone(alloc, (&(struct Event_pvt) {
-        .callback = callback,
-        .callbackContext = callbackContext,
-        .alloc = alloc
-    }));
-    Identity_set(out);
+    Rffi_FdReadableTx* out = NULL;
+    Rffi_pollFdReadable(
+        &out,
+        callback,
+        callbackContext,
+        s,
+        base->rffi_loop,
+        userAlloc);
+
+    // struct EventBase_pvt* base = EventBase_privatize(eventBase);
+    // struct Allocator* alloc = Allocator_child(base->alloc);
+    // struct Event_pvt* out = Allocator_clone(alloc, (&(struct Event_pvt) {
+    //     .callback = callback,
+    //     .callbackContext = callbackContext,
+    //     .alloc = alloc
+    // }));
+    // Identity_set(out);
 
-    // != 0 check, removed because uv_poll_init always returns 0
-    uv_poll_init(base->loop, &out->handler, s);
+    // // != 0 check, removed because uv_poll_init always returns 0
+    // uv_poll_init(base->loop, &out->handler, s);
 
-    // == -1 check, removed because uv_poll_start always returns 0
-    uv_poll_start(&out->handler, UV_READABLE, handleEvent);
+    // // == -1 check, removed because uv_poll_start always returns 0
+    // uv_poll_start(&out->handler, UV_READABLE, handleEvent);
 
-    out->handler.data = out;
+    // out->handler.data = out;
 
-    Allocator_onFree(userAlloc, onFree, out);
+    // Allocator_onFree(userAlloc, onFree, out);
 
-    return &out->pub;
+    // return &out->pub;
 }