Procházet zdrojové kódy

Make Allocator_free() always synchronous. When async freeing is needed, chain allocators.

Caleb James DeLisle před 1 rokem
rodič
revize
e6160b456b

+ 98 - 30
Cargo.lock

@@ -74,9 +74,9 @@ dependencies = [
 
 [[package]]
 name = "autocfg"
-version = "1.0.1"
+version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
+checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
 
 [[package]]
 name = "backtrace"
@@ -413,7 +413,7 @@ checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
 dependencies = [
  "cfg-if 1.0.0",
  "libc",
- "wasi",
+ "wasi 0.10.2+wasi-snapshot-preview1",
 ]
 
 [[package]]
@@ -614,24 +614,14 @@ dependencies = [
 
 [[package]]
 name = "mio"
-version = "0.7.14"
+version = "0.8.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc"
+checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9"
 dependencies = [
  "libc",
  "log",
- "miow",
- "ntapi",
- "winapi",
-]
-
-[[package]]
-name = "miow"
-version = "0.3.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
-dependencies = [
- "winapi",
+ "wasi 0.11.0+wasi-snapshot-preview1",
+ "windows-sys 0.45.0",
 ]
 
 [[package]]
@@ -644,15 +634,6 @@ dependencies = [
  "version_check",
 ]
 
-[[package]]
-name = "ntapi"
-version = "0.3.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
-dependencies = [
- "winapi",
-]
-
 [[package]]
 name = "num-integer"
 version = "0.1.44"
@@ -1219,19 +1200,19 @@ dependencies = [
 
 [[package]]
 name = "tokio"
-version = "1.16.1"
+version = "1.25.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a"
+checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af"
 dependencies = [
+ "autocfg",
  "bytes",
  "libc",
  "mio",
  "num_cpus",
- "once_cell",
  "pin-project-lite",
  "signal-hook-registry",
  "tokio-macros",
- "winapi",
+ "windows-sys 0.42.0",
 ]
 
 [[package]]
@@ -1322,6 +1303,12 @@ version = "0.10.2+wasi-snapshot-preview1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
 
+[[package]]
+name = "wasi"
+version = "0.11.0+wasi-snapshot-preview1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
+
 [[package]]
 name = "wasm-bindgen"
 version = "0.2.70"
@@ -1425,3 +1412,84 @@ name = "winapi-x86_64-pc-windows-gnu"
 version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+
+[[package]]
+name = "windows-sys"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7"
+dependencies = [
+ "windows_aarch64_gnullvm",
+ "windows_aarch64_msvc",
+ "windows_i686_gnu",
+ "windows_i686_msvc",
+ "windows_x86_64_gnu",
+ "windows_x86_64_gnullvm",
+ "windows_x86_64_msvc",
+]
+
+[[package]]
+name = "windows-sys"
+version = "0.45.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
+dependencies = [
+ "windows-targets",
+]
+
+[[package]]
+name = "windows-targets"
+version = "0.42.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7"
+dependencies = [
+ "windows_aarch64_gnullvm",
+ "windows_aarch64_msvc",
+ "windows_i686_gnu",
+ "windows_i686_msvc",
+ "windows_x86_64_gnu",
+ "windows_x86_64_gnullvm",
+ "windows_x86_64_msvc",
+]
+
+[[package]]
+name = "windows_aarch64_gnullvm"
+version = "0.42.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
+
+[[package]]
+name = "windows_aarch64_msvc"
+version = "0.42.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
+
+[[package]]
+name = "windows_i686_gnu"
+version = "0.42.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
+
+[[package]]
+name = "windows_i686_msvc"
+version = "0.42.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
+
+[[package]]
+name = "windows_x86_64_gnu"
+version = "0.42.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
+
+[[package]]
+name = "windows_x86_64_gnullvm"
+version = "0.42.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
+
+[[package]]
+name = "windows_x86_64_msvc"
+version = "0.42.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"

+ 2 - 12
memory/Allocator.c

@@ -86,19 +86,10 @@ void Allocator__adopt(struct Allocator* adoptedParent,
     Rffi_allocator_adopt(adoptedParent, childToAdopt);
 }
 
-void Allocator_onFreeComplete(struct Allocator_OnFreeJob* onFreeJob)
-{
-    Identity_check(onFreeJob);
-    Rffi_allocator_onFreeComplete((OnFreeCtx*) onFreeJob->rContext);
-}
-
-static void onFree(void* voj, OnFreeCtx *complete)
+static void onFree(void* voj)
 {
     struct Allocator_OnFreeJob* oj = Identity_check((struct Allocator_OnFreeJob*) voj);
-    oj->rContext = (void*) complete;
-    if (oj->callback(oj) != Allocator_ONFREE_ASYNC) {
-        Rffi_allocator_onFreeComplete(complete);
-    }
+    oj->callback(oj);
 }
 
 struct Allocator_OnFreeJob* Allocator__onFree(struct Allocator* alloc,
@@ -111,7 +102,6 @@ struct Allocator_OnFreeJob* Allocator__onFree(struct Allocator* alloc,
     Identity_set(oj);
     oj->callback = callback;
     oj->userData = callbackContext;
-    oj->rContext = NULL;
     Rffi_allocator_onFree(alloc, onFree, oj, file, line);
     return oj;
 }

+ 0 - 15
memory/Allocator.h

@@ -33,17 +33,9 @@ struct Allocator_OnFreeJob
     /** Set by caller. */
     Allocator_OnFreeCallback callback;
     void* userData;
-    void* rContext;
     Identity
 };
 
-/**
- * If an onFree job needs to complete asynchronously, it should return this,
- * then when it is complete it must call job->complete(job) on the OnFreeJob
- * which was passed to it.
- */
-#define Allocator_ONFREE_ASYNC 10000
-
 /**
  * Allocator for structured memory management.
  * The objective of the allocator structure is to make manual memory management easier, specifically
@@ -237,13 +229,6 @@ struct Allocator_OnFreeJob* Allocator__onFree(struct Allocator* alloc,
  */
 int Allocator_cancelOnFree(struct Allocator_OnFreeJob* toRemove);
 
-/**
- * Tell the allocator that an asynchronous onFree() job has completed.
- *
- * @param job the return value from calling Allocator_onFree().
- */
-void Allocator_onFreeComplete(struct Allocator_OnFreeJob* onFreeJob);
-
 /**
  * Adopt an allocator.
  * This creates a child of parentAlloc which is an adopted parent of toAdopt.

+ 1 - 1
rust/cjdns_sys/Cargo.toml

@@ -17,7 +17,7 @@ anyhow = { version = "1.0", features = ["backtrace"] }
 boringtun = { git = "https://github.com/cjdelisle/boringtun", rev = "f288b2f461e7322a278b63b1ddc4ab705b5b7462", version = "0.3", features = ["additional_data"] }
 slog = { version = "2.7", features = ["release_max_level_trace"] }
 pnet = { version = "0.29" }
-tokio = { version = "1", features = ["macros","time","sync","fs","rt-multi-thread","process"], default-features = false }
+tokio = { version = "1.25", features = ["macros","time","sync","fs","rt-multi-thread","process"], default-features = false }
 once_cell = "1.9"
 async-recursion = "1"
 

+ 3 - 5
rust/cjdns_sys/Rffi.h

@@ -5,8 +5,6 @@
 
 #include "RffiPrefix.h"
 
-typedef struct OnFreeCtx OnFreeCtx;
-
 typedef struct RTypes_CryptoAuth2_t RTypes_CryptoAuth2_t;
 
 typedef struct Rffi_EventLoop Rffi_EventLoop;
@@ -34,7 +32,7 @@ typedef struct {
   Rffi_Address address;
 } Rffi_NetworkInterface;
 
-typedef void (*OnFreeFun)(void *ctx, OnFreeCtx *complete);
+typedef void (*OnFreeFun)(void *ctx);
 
 extern const uintptr_t Rffi_CURRENT_PROTOCOL;
 
@@ -99,6 +97,8 @@ Rffi_Glock_guard *Rffi_glock(void);
  */
 void Rffi_gunlock(Rffi_Glock_guard *guard);
 
+void Rffi_stopEventLoop(Rffi_EventLoop *event_loop);
+
 /**
  * Create a new EventLoop data repository.
  */
@@ -194,8 +194,6 @@ RTypes_Error_t *Rffi_error_fl(const char *msg, const char *file, int line, Alloc
 
 const char *Rffi_printError(RTypes_Error_t *e, Allocator_t *alloc);
 
-void Rffi_allocator_onFreeComplete(OnFreeCtx *c);
-
 /**
  * Create a root level allocator.
  */

+ 4 - 23
rust/cjdns_sys/src/rffi/allocator.rs

@@ -1,12 +1,10 @@
 use std::sync::Arc;
 use parking_lot::Mutex;
-use tokio::sync::oneshot;
 use std::ffi::{c_void, CStr};
 use std::os::raw::c_char;
 use std::any::Any;
 use crate::cffi::Allocator_t;
 use std::cell::{RefCell,Ref};
-use crate::gcl::GCL;
 
 struct Mem {
     loc: Vec<u128>,
@@ -93,7 +91,7 @@ fn get_to_free(
     v.push((Arc::clone(alloc), depth));
 }
 
-async fn free_allocs(mut allocs: Vec<(Arc<AllocatorInner>, i32)>) {
+fn free_allocs(mut allocs: Vec<(Arc<AllocatorInner>, i32)>) {
     allocs.sort_by(|a,b|b.1.cmp(&a.1));
     for (alloc, depth) in allocs.iter() {
         let jobs = {
@@ -107,13 +105,9 @@ async fn free_allocs(mut allocs: Vec<(Arc<AllocatorInner>, i32)>) {
             log::trace!("Freeing job {} {}/{} depth {}",
                 job.file_line.print(), i, jl, depth);
             i += 1;
-            let (tx, rx) = oneshot::channel();
-            let ofc = Arc::new(OnFreeCtx(Some(tx)));
             {
-                let _guard = GCL.lock();
-                (job.f)(job.c, &*ofc as *const OnFreeCtx as *mut OnFreeCtx);
+                (job.f)(job.c);
             }
-            rx.await.unwrap();
         }
     }
     for (alloc, _) in allocs {
@@ -141,9 +135,7 @@ pub struct Allocator {
 
 const MAGIC: u32 = 0xdeaffeed;
 
-pub struct OnFreeCtx(Option<oneshot::Sender<()>>);
-
-pub type OnFreeFun = extern "C" fn(ctx: *mut c_void, complete: *mut OnFreeCtx);
+pub type OnFreeFun = extern "C" fn(ctx: *mut c_void);
 
 pub struct FileLine{
     pub file_s: Option<&'static str>,
@@ -363,9 +355,7 @@ impl Allocator {
         let mut v = Vec::new();
         log::trace!("Freeing [{}] because [{}]", self.inner.ident.borrow(), source);
         get_to_free(parent.as_ref(), &self.inner, 0, &mut v, &self.inner.ident.borrow());
-        tokio::spawn(async {
-            free_allocs(v).await
-        });
+        free_allocs(v);
     }
 }
 
@@ -377,15 +367,6 @@ impl Drop for Allocator {
     }
 }
 
-#[no_mangle]
-pub extern "C" fn Rffi_allocator_onFreeComplete(c: *mut OnFreeCtx) {
-    if let Some(x) = unsafe { (*c).0.take() } {
-        x.send(()).unwrap();
-    } else {
-        panic!("onFreeComplete called twice");
-    }
-}
-
 /// Create a root level allocator.
 #[no_mangle]
 pub extern "C" fn Rffi_allocator_newRoot(file: *const c_char, line: usize) -> *mut Allocator_t {

+ 47 - 25
rust/cjdns_sys/src/rffi/event_loop/mod.rs

@@ -1,12 +1,13 @@
-use self::timeout::{Rffi_clearAllTimeouts, TimerTx};
+use self::timeout::TimerTx;
 use crate::cffi::{self,Allocator_t};
 use crate::rffi::allocator;
 use crate::gcl::GCL;
-use parking_lot::ReentrantMutexGuard;
+use parking_lot::{Mutex,ReentrantMutexGuard};
 use std::os::raw::{c_uint,c_void};
 use std::sync::atomic::{AtomicU32, Ordering};
-use std::sync::{Mutex, Weak};
-use std::sync::Arc;
+use std::sync::{Arc,Weak};
+use std::future::Future;
+use tokio::sync::broadcast;
 
 mod process;
 mod timeout;
@@ -36,51 +37,72 @@ pub struct EventLoop {
     ref_ctr: AtomicU32,
     /// Keep a loose track of all timers created, for clearAll.
     timers: Mutex<Vec<Weak<TimerTx>>>,
+    quit: broadcast::Receiver<()>,
+    quitter: Mutex<Option<broadcast::Sender<()>>>,
     /// Keep the event base from libuv so we can wake it up when needed
     base: *mut c_void,
 }
 unsafe impl Send for EventLoop {}
 unsafe impl Sync for EventLoop {}
+impl Drop for EventLoop {
+    fn drop(&mut self) {
+        log::trace!("EventLoop DROP");
+    }
+}
 pub struct Rffi_EventLoop(Arc<EventLoop>, u64);
 impl Rffi_EventLoop {
-    fn arc_clone(&self) -> Arc<EventLoop> {
+    fn arc_clone(&self) -> Rffi_EventLoop {
         assert!(self.1 == MAGIC);
-        Arc::clone(&self.0)
+        Rffi_EventLoop(Arc::clone(&self.0), MAGIC)
+    }
+    pub fn event_job<T>(&self, future: T) -> tokio::task::JoinHandle<()>
+    where
+        T: Future + Send + 'static,
+    {
+        let evl = self.arc_clone();
+        let mut quit = evl.0.quit.resubscribe();
+        evl.incr_ref();
+        tokio::spawn(async move {
+            tokio::select! {
+                _ = future => {},
+                _ = quit.recv() => {},
+            }
+            evl.decr_ref();
+        })
     }
-}
-
-impl EventLoop {
     fn incr_ref(&self) -> u32 {
-        self.ref_ctr.fetch_add(1, Ordering::Relaxed)
+        self.0.ref_ctr.fetch_add(1, Ordering::SeqCst)
     }
     fn decr_ref(&self) -> u32 {
-        let ret = self.ref_ctr.fetch_sub(1, Ordering::Relaxed);
-        if !self.base.is_null() && ret < (1_u32<<30) {
-            let _l = GCL.lock();
-            unsafe { cffi::EventBase_wakeup(self.base) };
-        }
+        let _l = GCL.lock();
+        let ret = self.0.ref_ctr.fetch_sub(1, Ordering::SeqCst);
+        unsafe { cffi::EventBase_wakeup(self.0.base) };
         ret
     }
     fn curr_ref(&self) -> u32 {
-        self.ref_ctr.load(Ordering::Relaxed)
+        self.0.ref_ctr.load(Ordering::SeqCst)
     }
 }
 
-impl Drop for Rffi_EventLoop {
-    fn drop(&mut self) {
-        println!("Rffi_EventLoop DROP");
-        self.0.ref_ctr.store(1_u32<<31, Ordering::Relaxed);
-        Rffi_clearAllTimeouts(self)
-    }
+#[no_mangle]
+pub extern "C" fn Rffi_stopEventLoop(event_loop: *mut Rffi_EventLoop) {
+    log::trace!("Rffi_stopEventLoop()");
+    let el = unsafe { (&*event_loop).arc_clone() };
+    let mut qg_l = el.0.quitter.lock();
+    let qg = qg_l.take();
+    drop(qg);
 }
 
 /// Create a new EventLoop data repository.
 #[no_mangle]
-pub extern "C" fn Rffi_mkEventLoop(alloc: *mut Allocator_t, base: *mut c_void) -> *mut Rffi_EventLoop {
-    let data = Arc::new(EventLoop {
+pub extern "C" fn Rffi_mkEventLoop<'a>(alloc: *mut Allocator_t, base: *mut c_void) -> *mut Rffi_EventLoop {
+    let (quitter, quit) = broadcast::channel(1);
+    let data = Arc::new(EventLoop{
         ref_ctr: AtomicU32::new(0),
         timers: Mutex::new(vec![]),
         base,
+        quit,
+        quitter: Mutex::new(Some(quitter)),
     });
     allocator::adopt(alloc, Rffi_EventLoop(data, MAGIC))
 }
@@ -91,5 +113,5 @@ pub extern "C" fn Rffi_eventLoopRefCtr(event_loop: *mut Rffi_EventLoop) -> c_uin
     unsafe {
         assert!((&*event_loop).1 == MAGIC);
         &*event_loop
-    }.0.curr_ref()
+    }.curr_ref()
 }

+ 1 - 3
rust/cjdns_sys/src/rffi/event_loop/process.rs

@@ -43,8 +43,7 @@ pub unsafe extern "C" fn Rffi_spawn(
     };
     let child_status = Command::new(file).args(&args).status();
 
-    let event_loop = (&*event_loop).arc_clone();
-    tokio::spawn(async move {
+    (&*event_loop).event_job(async move {
         match (child_status.await, cb) {
             (Ok(status), Some(callback)) => {
                 let _guard = GCL.lock();
@@ -56,7 +55,6 @@ pub unsafe extern "C" fn Rffi_spawn(
             (Ok(_), None) => {}
             (Err(err), _) => eprintln!("  error spawning child '{}': {:?}", file, err),
         }
-        drop(event_loop);
     });
     0
 }

+ 2 - 7
rust/cjdns_sys/src/rffi/event_loop/timeout.rs

@@ -30,7 +30,6 @@ pub struct Rffi_TimerTx(Arc<TimerTx>);
 pub struct TimerTx {
     tx: tokio::sync::mpsc::UnboundedSender<TimerCommand>,
     active: Arc<AtomicBool>,
-    cb_int: u64,
 }
 
 pub extern "C" fn timeout_on_free(j: *mut Allocator_OnFreeJob) -> c_int {
@@ -59,11 +58,10 @@ pub extern "C" fn Rffi_setTimeout(
     let rtx = Arc::new(TimerTx {
         tx,
         active: Arc::new(AtomicBool::new(true)),
-        cb_int,
     });
 
     let event_loop = unsafe { (&*event_loop).arc_clone() };
-    event_loop.timers.lock().unwrap().push(Arc::downgrade(&rtx));
+    event_loop.0.timers.lock().push(Arc::downgrade(&rtx));
 
     // do not clone rtx so we don't keep another tx around, so the "Allocator freed" detection works.
     // although with the new Drop impl for Rffi_EventLoop, the Cancel msg should get there first.
@@ -81,8 +79,7 @@ pub extern "C" fn Rffi_setTimeout(
         *out_timer_tx = timer_tx;
     }
 
-    event_loop.incr_ref();
-    tokio::spawn(async move {
+    event_loop.arc_clone().event_job(async move {
         let mut timeout_millis = timeout_millis;
         loop {
             tokio::select! {
@@ -122,7 +119,6 @@ pub extern "C" fn Rffi_setTimeout(
         }
         is_active.store(false, Ordering::Relaxed);
         //println!("({:#x}) timer task stopped", cb_int);
-        event_loop.decr_ref();
     });
 }
 
@@ -173,7 +169,6 @@ pub extern "C" fn Rffi_clearAllTimeouts(event_loop: *mut Rffi_EventLoop) {
         &*event_loop
     }.0.timers
         .lock()
-        .unwrap()
         .drain(..)
         .filter_map(|w| w.upgrade())
         .for_each(|timer_tx| {

+ 3 - 8
test/testcjdroute.c

@@ -252,12 +252,7 @@ static int main2(int argc, char** argv, struct Allocator* alloc, struct Random*
         now = runTest(TESTS[i].func, TESTS[i].name, now, argc, argv, quiet);
     }
     for (int i = 0; i < FUZZ_CASE_COUNT; i++) {
-        // TODO(cjd): Apparently a race condition in the allocator
-        // if you have async freeing in progress and then you come in and
-        // free the root allocator, you get an assertion.
-        //
-        //struct Allocator* child = Allocator_child(alloc);
-        struct Allocator* child = Allocator_new(1<<24);
+        struct Allocator* child = Allocator_child(alloc);
         now = runFuzzTestManual(child, detRand, FUZZ_CASES[i], now, quiet);
         Allocator_free(child);
     }
@@ -267,8 +262,6 @@ static int main2(int argc, char** argv, struct Allocator* alloc, struct Random*
                 (int)((now - startTime)/1000)%1000);
     }
 
-    // We need to drop the lock before we exit, otherwise the rust thread can't complete.
-    Glock_beginBlockingCall();
     return 0;
 }
 
@@ -283,5 +276,7 @@ int testcjdroute_main(int argc, char** argv)
     struct Random* detRand = Random_newWithSeed(alloc, NULL, rs, NULL);
     int out = main2(argc, argv, alloc, detRand);
     Allocator_free(alloc);
+    // We need to drop the lock before we exit, otherwise the rust thread can't complete.
+    Glock_beginBlockingCall();
     return out;
 }

+ 10 - 9
util/events/libuv/Event.c

@@ -41,27 +41,28 @@ static void handleEvent(uv_poll_t* handle, int status, int events)
     }
 }
 
-static void freeEvent2(uv_handle_t* handle)
+static void onClose(uv_handle_t* handle)
 {
-    Allocator_onFreeComplete((struct Allocator_OnFreeJob*)handle->data);
+    struct Event_pvt* event = Identity_check((struct Event_pvt*) handle->data);
+    Allocator_free(event->alloc);
 }
 
-static int freeEvent(struct Allocator_OnFreeJob* job)
+static int onFree(struct Allocator_OnFreeJob* job)
 {
     struct Event_pvt* event = Identity_check((struct Event_pvt*) job->userData);
-    event->handler.data = job;
-    uv_close((uv_handle_t*) &event->handler, freeEvent2);
-    return Allocator_ONFREE_ASYNC;
+    event->handler.data = event;
+    uv_close((uv_handle_t*) &event->handler, onClose);
+    return 0;
 }
 
 struct Event* Event_socketRead(void (* const callback)(void* callbackContext),
                                void* const callbackContext,
                                int s,
                                struct EventBase* eventBase,
-                               struct Allocator* allocator)
+                               struct Allocator* userAlloc)
 {
     struct EventBase_pvt* base = EventBase_privatize(eventBase);
-    struct Allocator* alloc = Allocator_child(allocator);
+    struct Allocator* alloc = Allocator_child(base->alloc);
     struct Event_pvt* out = Allocator_clone(alloc, (&(struct Event_pvt) {
         .callback = callback,
         .callbackContext = callbackContext,
@@ -77,7 +78,7 @@ struct Event* Event_socketRead(void (* const callback)(void* callbackContext),
 
     out->handler.data = out;
 
-    Allocator_onFree(alloc, freeEvent, out);
+    Allocator_onFree(userAlloc, onFree, out);
 
     return &out->pub;
 }

+ 54 - 38
util/events/libuv/EventBase.c

@@ -29,24 +29,38 @@ Js({ require("../util/events/libuv/libuv.js")(builder, js); })
     #include <sys/time.h>
 #endif
 
+static int onFree2(struct EventBase_pvt* ctx)
+{
+    if (!uv_is_closing((uv_handle_t*) &ctx->uvAwakener)) {
+        uv_close((uv_handle_t*) &ctx->uvAwakener, NULL);
+    }
+    if (!uv_is_closing((uv_handle_t*) &ctx->blockTimer)) {
+        uv_close((uv_handle_t*) &ctx->blockTimer, NULL);
+    }
+    uv_loop_delete(ctx->loop);
+    Allocator_free(ctx->alloc);
+    return 0;
+}
+
 static int onFree(struct Allocator_OnFreeJob* job)
 {
     struct EventBase_pvt* ctx = Identity_check((struct EventBase_pvt*) job->userData);
-    if (ctx->running) {
-        // The job will be completed in EventLoop_beginLoop()
-        ctx->onFree = job;
-        EventBase_endLoop((struct EventBase*) ctx);
-        return Allocator_ONFREE_ASYNC;
-    } else {
-        if (!uv_is_closing((uv_handle_t*) &ctx->uvAwakener)) {
-            uv_close((uv_handle_t*) &ctx->uvAwakener, NULL);
+    ctx->userAlloc = NULL;
+    Rffi_stopEventLoop(ctx->rffi_loop);
+    if (!ctx->running) {
+        if (Rffi_eventLoopRefCtr(ctx->rffi_loop)) {
+            // We cycle the loop once in order to let Rffi tear down
+            EventBase_beginLoop(&ctx->pub);
+        } else {
+            onFree2(ctx);
         }
-        if (!uv_is_closing((uv_handle_t*) &ctx->blockTimer)) {
-            uv_close((uv_handle_t*) &ctx->blockTimer, NULL);
-        }
-        uv_loop_delete(ctx->loop);
-        return 0;
+    } else if (ctx->running == 2) {
+        // Don't allow a request to quit when we are freeing
+        ctx->running = 1;
+    } else {
+        // Running and not freeing, continue...
     }
+    return 0;
 }
 
 static void calibrateTime(struct EventBase_pvt* base)
@@ -69,7 +83,7 @@ static void calibrateTime(struct EventBase_pvt* base)
     base->baseTime = (seconds * 1000) + milliseconds - uv_now(base->loop);
 }
 
-static void doNothing(uv_async_t* handle, int status)
+static void uvAwakener(uv_async_t* handle, int status)
 {
     struct EventBase_pvt* base = Identity_containerOf(handle, struct EventBase_pvt, uvAwakener);
     if (base->running == 2) {
@@ -86,18 +100,20 @@ static void blockTimer(uv_timer_t* timer, int status)
 
 struct EventBase* EventBase_new(struct Allocator* allocator)
 {
-    struct Allocator* alloc = Allocator_child(allocator);
-    struct EventBase_pvt* base = Allocator_calloc(alloc, sizeof(struct EventBase_pvt), 1);
+    struct Allocator* loopAlloc = Allocator_new(1<<24); // 4MB
+
+    struct EventBase_pvt* base = Allocator_calloc(loopAlloc, sizeof(struct EventBase_pvt), 1);
     base->loop = uv_loop_new();
-    base->rffi_loop = Rffi_mkEventLoop(alloc, &base->pub);
-    uv_async_init(base->loop, &base->uvAwakener, doNothing);
+    base->rffi_loop = Rffi_mkEventLoop(loopAlloc, &base->pub);
+    uv_async_init(base->loop, &base->uvAwakener, uvAwakener);
     uv_unref((uv_handle_t*) &base->uvAwakener);
     uv_timer_init(base->loop, &base->blockTimer);
     Assert_true(Rffi_eventLoopRefCtr(base->rffi_loop) == 0);
-    base->alloc = alloc;
+    base->alloc = loopAlloc;
+    base->userAlloc = allocator;
     Identity_set(base);
 
-    Allocator_onFree(alloc, onFree, base);
+    Allocator_onFree(allocator, onFree, base);
     calibrateTime(base);
     return &base->pub;
 }
@@ -109,38 +125,38 @@ void EventBase_beginLoop(struct EventBase* eventBase)
     Assert_true(!ctx->running); // double begin
     ctx->running = 1;
 
-    int ret = 0;
     do {
         uv_timer_start(&ctx->blockTimer, blockTimer, 1, 0);
         // start the loop.
         uv_run(ctx->loop, UV_RUN_DEFAULT);
-        if (ctx->onFree) { break; }
-        ret = Rffi_eventLoopRefCtr(ctx->rffi_loop);
-    } while (ret);
+        if (ctx->userAlloc == NULL) {
+            // Freeing = only exit if no more events left (they are being terminated)
+            if (Rffi_eventLoopRefCtr(ctx->rffi_loop) == 0) {
+                break;
+            }
+        } else if (ctx->running == 2) {
+            // Not freeing, request to exit
+            break;
+        } else if (Rffi_eventLoopRefCtr(ctx->rffi_loop) == 0) {
+            // Not freeing, no request to exit, cycle until no events left in Rust
+            break;
+        }
+    } while (1);
 
     ctx->running = 0;
 
-    if (ctx->onFree) {
-        if (!uv_is_closing((uv_handle_t*) &ctx->uvAwakener)) {
-            uv_close((uv_handle_t*) &ctx->uvAwakener, NULL);
-        }
-        if (!uv_is_closing((uv_handle_t*) &ctx->blockTimer)) {
-            uv_close((uv_handle_t*) &ctx->blockTimer, NULL);
-        }
-        // This can't be safely done because the loop might still be used
-        // until it is completely done with - and the memory is trashed.
-        //uv_loop_delete(ctx->loop);
-        Allocator_onFreeComplete(ctx->onFree);
-        return;
+    if (ctx->userAlloc == NULL) {
+        Assert_true(Rffi_eventLoopRefCtr(ctx->rffi_loop) == 0);
+        onFree2(ctx);
     }
 }
 
 void EventBase_endLoop(struct EventBase* eventBase)
 {
     struct EventBase_pvt* ctx = Identity_check((struct EventBase_pvt*) eventBase);
-    if (ctx->running == 0) { return; }
+    // End loop is a no-op when freeing, the loop will end when all events are shutdown.
+    if (ctx->running == 0 || ctx->userAlloc == NULL) { return; }
     ctx->running = 2;
-    Rffi_clearAllTimeouts(ctx->rffi_loop);
     uv_async_send(&ctx->uvAwakener);
 }
 

+ 5 - 6
util/events/libuv/EventBase_pvt.h

@@ -33,17 +33,16 @@ struct EventBase_pvt
 
     uv_timer_t blockTimer;
 
+    // This allocator is parent of all the event listeners registered with this loop
+    // It is freed only when the loop has no more events registered
     struct Allocator* alloc;
 
+    // This allocator comes from the user who creates the loop
+    struct Allocator* userAlloc;
+
     /** True if the loop is running. */
     int running;
 
-    /**
-     * The onFree job which is passed from onFree() to EventLoop_begin()
-     * so it can be completed after the loop has ended.
-     */
-    struct Allocator_OnFreeJob* onFree;
-
     /** Number of milliseconds since epoch when the clock was calibrated. */
     uint64_t baseTime;
 

+ 19 - 27
util/events/libuv/Pipe.c

@@ -39,12 +39,6 @@ struct Pipe_pvt
 
     uv_pipe_t peer;
 
-    /** Job to close the handles when the allocator is freed */
-    struct Allocator_OnFreeJob* closeHandlesOnFree;
-
-    /** Job which blocks the freeing until the callback completes */
-    struct Allocator_OnFreeJob* blockFreeInsideCallback;
-
     // true if we can pass file descriptors through this pipe
     bool ipc;
 
@@ -60,6 +54,7 @@ struct Pipe_pvt
     struct Pipe_WriteRequest_pvt* bufferedRequest;
 
     struct Allocator* alloc;
+    struct Allocator* userAlloc;
 
     struct Log* log;
 
@@ -132,6 +127,10 @@ static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
 {
     struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) iface);
 
+    if (pipe->userAlloc == NULL) {
+        return NULL;
+    }
+
     if (pipe->queueLen > 50000) {
         return Error(m, "OVERFLOW");
     }
@@ -176,8 +175,8 @@ static void onClose(uv_handle_t* handle)
     struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)handle->data);
     handle->data = NULL;
     Log_debug(pipe->log, "Pipe closed");
-    Assert_true(pipe->closeHandlesOnFree && !pipe->peer.data);
-    Allocator_onFreeComplete((struct Allocator_OnFreeJob*) pipe->closeHandlesOnFree);
+    Assert_true(!pipe->peer.data);
+    Allocator_free(pipe->alloc);
 }
 
 #if Pipe_PADDING_AMOUNT < 8
@@ -219,8 +218,8 @@ static void incoming(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
     }
 
     pipe->isInCallback = 0;
-    if (pipe->blockFreeInsideCallback) {
-        Allocator_onFreeComplete((struct Allocator_OnFreeJob*) pipe->blockFreeInsideCallback);
+    if (pipe->userAlloc == NULL) {
+        uv_close((uv_handle_t*) &pipe->peer, onClose);
     }
 }
 
@@ -256,6 +255,7 @@ static void connected(uv_connect_t* req, int status)
 {
     uv_stream_t* link = req->handle;
     struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) link->data);
+    if (pipe->userAlloc == NULL) { return; }
     Log_debug(pipe->log, "Pipe [%s] established connection", pipe->pub.fullName);
 
     int ret;
@@ -285,24 +285,16 @@ static void connected(uv_connect_t* req, int status)
     }
 }
 
-static int blockFreeInsideCallback(struct Allocator_OnFreeJob* job)
+static int onFree(struct Allocator_OnFreeJob* job)
 {
     struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)job->userData);
+    pipe->userAlloc = NULL;
     if (!pipe->isInCallback) {
-        return 0;
+        Assert_true(pipe->peer.data);
+        uv_close((uv_handle_t*) &pipe->peer, onClose);
+        EventBase_wakeup(pipe->base);        
     }
-    pipe->blockFreeInsideCallback = job;
-    return Allocator_ONFREE_ASYNC;
-}
-
-static int closeHandlesOnFree(struct Allocator_OnFreeJob* job)
-{
-    struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)job->userData);
-    pipe->closeHandlesOnFree = job;
-    Assert_true(pipe->peer.data);
-    uv_close((uv_handle_t*) &pipe->peer, onClose);
-    EventBase_wakeup(pipe->base);
-    return Allocator_ONFREE_ASYNC;
+    return 0;
 }
 
 static Er_DEFUN(struct Pipe_pvt* newPipeAny(struct EventBase* eb,
@@ -312,7 +304,7 @@ static Er_DEFUN(struct Pipe_pvt* newPipeAny(struct EventBase* eb,
                                   struct Allocator* userAlloc))
 {
     struct EventBase_pvt* ctx = EventBase_privatize(eb);
-    struct Allocator* alloc = Allocator_child(userAlloc);
+    struct Allocator* alloc = Allocator_child(ctx->alloc);
 
     struct Pipe_pvt* out = Allocator_clone(alloc, (&(struct Pipe_pvt) {
         .pub = {
@@ -322,6 +314,7 @@ static Er_DEFUN(struct Pipe_pvt* newPipeAny(struct EventBase* eb,
             .fullName = (fullPath) ? CString_strdup(fullPath, alloc) : NULL,
         },
         .alloc = alloc,
+        .userAlloc = userAlloc,
         .log = log,
         .ipc = ipc,
         .base = eb,
@@ -332,8 +325,7 @@ static Er_DEFUN(struct Pipe_pvt* newPipeAny(struct EventBase* eb,
         Er_raise(alloc, "uv_pipe_init() failed [%s]", uv_strerror(ret));
     }
 
-    Allocator_onFree(alloc, closeHandlesOnFree, out);
-    Allocator_onFree(alloc, blockFreeInsideCallback, out);
+    Allocator_onFree(userAlloc, onFree, out);
 
     out->peer.data = out;
     Identity_set(out);

+ 21 - 15
util/events/libuv/PipeServer.c

@@ -63,6 +63,7 @@ struct PipeServer_pvt
     struct Allocator_OnFreeJob* closeHandlesOnFree;
 
     struct Allocator* alloc;
+    struct Allocator* userAlloc;
 
     struct EventBase_pvt* base;
 
@@ -90,6 +91,7 @@ static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
 static Iface_DEFUN incomingFromClient(struct Message* msg, struct Iface* iface)
 {
     struct Client* cli = Identity_containerOf(iface, struct Client, iface);
+    if (!cli->psp) { return NULL; }
     struct PipeServer_pvt* psp = Identity_check(cli->psp);
     Er_assert(AddrIface_pushAddr(msg, &cli->addr));
     return Iface_next(psp->pub.iface.iface, msg);
@@ -99,10 +101,7 @@ static Iface_DEFUN incomingFromClient(struct Message* msg, struct Iface* iface)
 static void onClose(uv_handle_t* handle)
 {
     struct PipeServer_pvt* psp = Identity_check((struct PipeServer_pvt*)handle->data);
-    handle->data = NULL;
-    if (psp->closeHandlesOnFree && !psp->server.data) {
-        Allocator_onFreeComplete((struct Allocator_OnFreeJob*) psp->closeHandlesOnFree);
-    }
+    Allocator_free(psp->alloc);
 }
 
 static struct Pipe* getPipe(struct PipeServer_pvt* psp, struct Allocator* alloc)
@@ -120,11 +119,13 @@ static struct Pipe* getPipe(struct PipeServer_pvt* psp, struct Allocator* alloc)
 static int removeClientOnFree(struct Allocator_OnFreeJob* job)
 {
     struct Client* cli = Identity_check((struct Client*)job->userData);
-    struct PipeServer_pvt* psp = Identity_check(cli->psp);
-    uint32_t handle = Sockaddr_addrHandle(&cli->addr);
-    int idx = Map_Clients_indexForHandle(handle, &psp->clients);
-    if (idx > -1) {
-        Map_Clients_remove(idx, &psp->clients);
+    if (cli->psp != NULL) {
+        struct PipeServer_pvt* psp = Identity_check(cli->psp);
+        uint32_t handle = Sockaddr_addrHandle(&cli->addr);
+        int idx = Map_Clients_indexForHandle(handle, &psp->clients);
+        if (idx > -1) {
+            Map_Clients_remove(idx, &psp->clients);
+        }
     }
     return 0;
 }
@@ -148,7 +149,7 @@ static void listenCallback(uv_stream_t* server, int status)
                  psp->pub.fullName, uv_strerror(status));
         return;
     }
-    struct Allocator* pipeAlloc = Allocator_child(psp->alloc);
+    struct Allocator* pipeAlloc = Allocator_child(psp->userAlloc);
     struct Pipe* p = getPipe(psp, pipeAlloc);
     if (p == NULL) {
         Allocator_free(pipeAlloc);
@@ -179,14 +180,18 @@ static void listenCallback(uv_stream_t* server, int status)
     cli->pipe->onClose = pipeOnClose;
 }
 
-static int closeHandlesOnFree(struct Allocator_OnFreeJob* job)
+static int onFree(struct Allocator_OnFreeJob* job)
 {
     struct PipeServer_pvt* psp = Identity_check((struct PipeServer_pvt*)job->userData);
-    psp->closeHandlesOnFree = job;
+    for (uint32_t i = 0; i < psp->clients.count; i++) {
+        // The clients will expire in their own time, just cut them loose so they don't
+        // try to reference the mothership after it's gone.
+        psp->clients.values[i]->pipe->onClose = NULL;
+        psp->clients.values[i]->psp = NULL;
+    }
     if (psp->server.data) {
         uv_close((uv_handle_t*) &psp->server, onClose);
         EventBase_wakeup(psp->base);
-        return Allocator_ONFREE_ASYNC;
     }
     return 0;
 }
@@ -198,7 +203,7 @@ static struct PipeServer_pvt* newPipeAny(struct EventBase* eb,
                                   struct Allocator* userAlloc)
 {
     struct EventBase_pvt* ctx = EventBase_privatize(eb);
-    struct Allocator* alloc = Allocator_child(userAlloc);
+    struct Allocator* alloc = Allocator_child(ctx->alloc);
 
     struct PipeServer_pvt* psp = Allocator_clone(alloc, (&(struct PipeServer_pvt) {
         .pub = {
@@ -209,6 +214,7 @@ static struct PipeServer_pvt* newPipeAny(struct EventBase* eb,
         .clients = { .allocator = alloc },
         .base = ctx,
         .alloc = alloc,
+        .userAlloc = userAlloc,
         .log = log,
     }));
     psp->pub.iface.iface = &psp->iface;
@@ -218,7 +224,7 @@ static struct PipeServer_pvt* newPipeAny(struct EventBase* eb,
         Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
     }
 
-    Allocator_onFree(alloc, closeHandlesOnFree, psp);
+    Allocator_onFree(userAlloc, onFree, psp);
 
     psp->server.data = psp;
     //out->out = &out->peer;

+ 22 - 35
util/events/libuv/UDPAddrIface.c

@@ -29,16 +29,11 @@ struct UDPAddrIface_pvt
 {
     struct UDPAddrIface pub;
 
-    struct Allocator* allocator;
+    struct Allocator* userAlloc;
+    struct Allocator* alloc;
 
     struct Log* logger;
 
-    /** Job to close the handle when the allocator is freed */
-    struct Allocator_OnFreeJob* closeHandleOnFree;
-
-    /** Job which blocks the freeing until the callback completes */
-    struct Allocator_OnFreeJob* blockFreeInsideCallback;
-
     Iface_t iface;
 
     uv_udp_t uvHandle;
@@ -99,7 +94,7 @@ static Iface_DEFUN incomingFromIface(struct Message* m, struct Iface* iface)
     }
 
     // This allocator will hold the message allocator in existance after it is freed.
-    struct Allocator* reqAlloc = Allocator_child(context->allocator);
+    struct Allocator* reqAlloc = Allocator_child(context->alloc);
     Allocator_adopt(reqAlloc, Message_getAlloc(m));
 
     struct UDPAddrIface_WriteRequest_pvt* req =
@@ -134,6 +129,13 @@ static Iface_DEFUN incomingFromIface(struct Message* m, struct Iface* iface)
     return NULL;
 }
 
+static void onClosed(uv_handle_t* wasClosed)
+{
+    struct UDPAddrIface_pvt* context =
+        Identity_check((struct UDPAddrIface_pvt*) wasClosed->data);
+    Allocator_free(context->alloc);
+}
+
 #if UDPAddrIface_PADDING_AMOUNT < 8
     #error
 #endif
@@ -179,8 +181,8 @@ static void incoming(uv_udp_t* handle,
     }
 
     context->inCallback = 0;
-    if (context->blockFreeInsideCallback) {
-        Allocator_onFreeComplete((struct Allocator_OnFreeJob*) context->blockFreeInsideCallback);
+    if (context->userAlloc == NULL) {
+        uv_close((uv_handle_t*)&context->uvHandle, onClosed);
     }
 }
 
@@ -190,7 +192,7 @@ static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
 
     size = UDPAddrIface_BUFFER_CAP;
 
-    struct Allocator* child = Allocator_child(context->allocator);
+    struct Allocator* child = Allocator_child(context->alloc);
     struct Message* msg = Message_new(
         UDPAddrIface_BUFFER_CAP,
         UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen,
@@ -203,31 +205,15 @@ static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
     buf->len = size;
 }
 
-static void onClosed(uv_handle_t* wasClosed)
-{
-    struct UDPAddrIface_pvt* context =
-        Identity_check((struct UDPAddrIface_pvt*) wasClosed->data);
-    Allocator_onFreeComplete((struct Allocator_OnFreeJob*) context->closeHandleOnFree);
-}
-
-static int closeHandleOnFree(struct Allocator_OnFreeJob* job)
-{
-    struct UDPAddrIface_pvt* context =
-        Identity_check((struct UDPAddrIface_pvt*) job->userData);
-    context->closeHandleOnFree = job;
-    uv_close((uv_handle_t*)&context->uvHandle, onClosed);
-    return Allocator_ONFREE_ASYNC;
-}
-
-static int blockFreeInsideCallback(struct Allocator_OnFreeJob* job)
+static int onFree(struct Allocator_OnFreeJob* job)
 {
     struct UDPAddrIface_pvt* context =
         Identity_check((struct UDPAddrIface_pvt*) job->userData);
+    context->userAlloc = NULL;
     if (!context->inCallback) {
-        return 0;
+        uv_close((uv_handle_t*)&context->uvHandle, onClosed);
     }
-    context->blockFreeInsideCallback = job;
-    return Allocator_ONFREE_ASYNC;
+    return 0;
 }
 
 int UDPAddrIface_setDSCP(struct UDPAddrIface* iface, uint8_t dscp)
@@ -267,15 +253,17 @@ int UDPAddrIface_setBroadcast(struct UDPAddrIface* iface, bool enable)
 
 Er_DEFUN(struct UDPAddrIface* UDPAddrIface_new(struct EventBase* eventBase,
                                       struct Sockaddr* addr,
-                                      struct Allocator* alloc,
+                                      struct Allocator* userAlloc,
                                       struct Log* logger))
 {
     struct EventBase_pvt* base = EventBase_privatize(eventBase);
+    struct Allocator* alloc = Allocator_child(base->alloc);
 
     struct UDPAddrIface_pvt* context =
         Allocator_clone(alloc, (&(struct UDPAddrIface_pvt) {
             .logger = logger,
-            .allocator = alloc
+            .userAlloc = userAlloc,
+            .alloc = alloc,
         }));
     context->pub.generic.alloc = alloc;
     context->pub.generic.iface = &context->iface;
@@ -323,8 +311,7 @@ Er_DEFUN(struct UDPAddrIface* UDPAddrIface_new(struct EventBase* eventBase,
     context->pub.generic.addr = Sockaddr_clone(&ss.addr, alloc);
     Log_debug(logger, "Bound to address [%s]", Sockaddr_print(context->pub.generic.addr, alloc));
 
-    Allocator_onFree(alloc, closeHandleOnFree, context);
-    Allocator_onFree(alloc, blockFreeInsideCallback, context);
+    Allocator_onFree(userAlloc, onFree, context);
 
     Er_ret(&context->pub);
 }