Selaa lähdekoodia

Implement adding dependencies via control protocol.

Davin McCall 5 vuotta sitten
vanhempi
commit
0ad34d9fbb
4 muutettua tiedostoa jossa 151 lisäystä ja 3 poistoa
  1. 117 0
      src/control.cc
  2. 3 1
      src/includes/control-cmds.h
  3. 2 0
      src/includes/control.h
  4. 29 2
      src/includes/service.h

+ 117 - 0
src/control.cc

@@ -1,3 +1,6 @@
+#include <algorithm>
+#include <unordered_set>
+
 #include "control.h"
 #include "service.h"
 
@@ -66,6 +69,9 @@ bool control_conn_t::process_packet()
     if (pktType == DINIT_CP_LISTSERVICES) {
         return list_services();
     }
+    if (pktType == DINIT_CP_ADD_DEP) {
+        return add_service_dep();
+    }
     else {
         // Unrecognized: give error response
         char outbuf[] = { DINIT_RP_BADREQ };
@@ -370,6 +376,117 @@ bool control_conn_t::list_services()
     }
 }
 
+// check for value in a set
+static inline bool contains(const int (&v)[3], int i)
+{
+    return std::find(std::begin(v), std::end(v), i) != std::end(v);
+}
+
+bool control_conn_t::add_service_dep()
+{
+    // 1 byte packet type
+    // 1 byte dependency type
+    // handle: "from"
+    // handle: "to"
+
+    constexpr int pkt_size = 2 + sizeof(handle_t) * 2;
+
+    if (rbuf.get_length() < pkt_size) {
+        chklen = pkt_size;
+        return true;
+    }
+
+    handle_t from_handle;
+    handle_t to_handle;
+    rbuf.extract((char *) &from_handle, 2, sizeof(from_handle));
+    rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle));
+
+    service_record *from_service = find_service_for_key(from_handle);
+    service_record *to_service = find_service_for_key(to_handle);
+    if (from_service == nullptr || to_service == nullptr || from_service == to_service) {
+        // Service handle is bad
+        char badreq_rep[] = { DINIT_RP_BADREQ };
+        if (! queue_packet(badreq_rep, 1)) return false;
+        bad_conn_close = true;
+        iob.set_watches(OUT_EVENTS);
+        return true;
+    }
+
+    // Check dependency type is valid:
+    int dep_type_int = rbuf[1];
+    if (! contains({(int)dependency_type::MILESTONE, (int)dependency_type::REGULAR,
+            (int)dependency_type::WAITS_FOR}, dep_type_int)) {
+        char badreqRep[] = { DINIT_RP_BADREQ };
+        if (! queue_packet(badreqRep, 1)) return false;
+        bad_conn_close = true;
+        iob.set_watches(OUT_EVENTS);
+    }
+    dependency_type dep_type = static_cast<dependency_type>(dep_type_int);
+
+    // Check current service states are valid for given dep type
+    if (dep_type == dependency_type::REGULAR) {
+        if (from_service->get_state() != service_state_t::STOPPED &&
+                to_service->get_state() != service_state_t::STARTED) {
+            // Cannot create dependency now since it would be contradicted:
+            char nak_rep[] = { DINIT_RP_NAK };
+            if (! queue_packet(nak_rep, 1)) return false;
+            rbuf.consume(pkt_size);
+            chklen = 0;
+            return true;
+        }
+    }
+
+    // Check for creation of circular dependency chain
+    std::unordered_set<service_record *> dep_marks;
+    std::vector<service_record *> dep_queue;
+    dep_queue.push_back(to_service);
+    while (! dep_queue.empty()) {
+        service_record * sr = dep_queue.back();
+        dep_queue.pop_back();
+        // iterate deps; if dep == from, abort; otherwise add to set/queue
+        // (only add to queue if not already in set)
+        for (auto &dep : sr->get_dependencies()) {
+            service_record * dep_to = dep.get_to();
+            if (dep_to == from_service) {
+                // fail, circular dependency!
+                char nak_rep[] = { DINIT_RP_NAK };
+                if (! queue_packet(nak_rep, 1)) return false;
+                rbuf.consume(pkt_size);
+                chklen = 0;
+                return true;
+            }
+            if (dep_marks.insert(dep_to).second) {
+                dep_queue.push_back(dep_to);
+            }
+        }
+    }
+    dep_marks.clear();
+    dep_queue.clear();
+
+    // Prevent creation of duplicate dependency:
+    for (auto &dep : from_service->get_dependencies()) {
+        service_record * dep_to = dep.get_to();
+        if (dep_to == to_service && dep.dep_type == dep_type) {
+            // Dependency already exists: return success
+            char ack_rep[] = { DINIT_RP_ACK };
+            if (! queue_packet(ack_rep, 1)) return false;
+            rbuf.consume(pkt_size);
+            chklen = 0;
+            return true;
+        }
+    }
+
+    // Create dependency:
+    from_service->add_dep(to_service, dep_type);
+    services->process_queues();
+
+    char ack_rep[] = { DINIT_RP_ACK };
+    if (! queue_packet(ack_rep, 1)) return false;
+    rbuf.consume(pkt_size);
+    chklen = 0;
+    return true;
+}
+
 control_conn_t::handle_t control_conn_t::allocate_service_handle(service_record *record)
 {
     // Try to find a unique handle (integer) in a single pass. Since the map is ordered, we can search until

+ 3 - 1
src/includes/control-cmds.h

@@ -29,7 +29,9 @@ constexpr static int DINIT_CP_UNLOADSERVICE = 9;
 constexpr static int DINIT_CP_SHUTDOWN = 10;
  // followed by 1-byte shutdown type
 
-
+// Add/remove dependency to existing service:
+constexpr static int DINIT_CP_ADD_DEP = 11;
+constexpr static int DINIT_CP_REM_DEP = 12;
 
 // Replies:
 

+ 2 - 0
src/includes/control.h

@@ -146,6 +146,8 @@ class control_conn_t : private service_listener
 
     bool list_services();
 
+    bool add_service_dep();
+
     // Notify that data is ready to be read from the socket. Returns true if the connection should
     // be closed.
     bool data_ready() noexcept;

+ 29 - 2
src/includes/service.h

@@ -202,12 +202,12 @@ class service_dep
             : from(from), to(to), waiting_on(false), holding_acq(false), dep_type(dep_type_p)
     {  }
 
-    service_record * get_from() noexcept
+    service_record * get_from() const noexcept
     {
         return from;
     }
 
-    service_record * get_to() noexcept
+    service_record * get_to() const noexcept
     {
         return to;
     }
@@ -646,6 +646,33 @@ class service_record
     {
         return 0;
     }
+
+    const dep_list & get_dependencies()
+    {
+        return depends_on;
+    }
+
+    // Add a dependency. Caller must ensure that the services are in an appropriate state and that
+    // a circular dependency chain is not created. Propagation queues should be processed after
+    // calling this. May throw std::bad_alloc.
+    void add_dep(service_record *to, dependency_type dep_type)
+    {
+        depends_on.emplace_back(this, to, dep_type);
+        try {
+            to->dependents.push_back(& depends_on.back());
+        }
+        catch (...) {
+            depends_on.pop_back();
+            throw;
+        }
+
+        if (dep_type == dependency_type::REGULAR) {
+            if (service_state == service_state_t::STARTING || service_state == service_state_t::STARTED) {
+                to->require();
+                depends_on.back().holding_acq = true;
+            }
+        }
+    }
 };
 
 inline auto extract_prop_queue(service_record *sr) -> decltype(sr->prop_queue_node) &