Browse Source

trigger: rework timeout handling and command queueing

Instead of queueing the full json_script, only queue actual script calls
issued by it. This fixes a long standing issue where trigger events were
dropped, triggered by the following scenario:

- Set up a trigger with timeout and condition check in the script.
- Fire an event that matches the condition.
- Fire another event that does not match the condition.

This series of events will fire the delay timer of the trigger, but the
second event will replace the trigger event data. When the timer
expires, the json_script is run, but no script call is issued

Signed-off-by: Felix Fietkau <nbd@nbd.name>
Felix Fietkau 7 years ago
parent
commit
2c9f5d4af1
1 changed files with 84 additions and 121 deletions
  1. 84 121
      service/trigger.c

+ 84 - 121
service/trigger.c

@@ -21,6 +21,8 @@
 #include <libubox/runqueue.h>
 #include <libubox/ustream.h>
 #include <libubox/uloop.h>
+#include <libubox/avl.h>
+#include <libubox/avl-cmp.h>
 
 #include <fcntl.h>
 #include <unistd.h>
@@ -32,37 +34,30 @@
 struct trigger {
 	struct list_head list;
 
+	void *id;
 	char *type;
-
-	int pending;
-	int remove;
 	int timeout;
 
-	void *id;
-
 	struct blob_attr *rule;
 	struct blob_attr *data;
-	struct uloop_timeout delay;
 
 	struct json_script_ctx jctx;
 };
 
-struct job;
-struct cmd {
-	char *name;
-	void (*handler)(struct job *job, struct blob_attr *exec, struct blob_attr *env);
-};
+struct trigger_command {
+	struct avl_node avl;
+	struct uloop_timeout delay;
+	bool requeue;
 
-struct job {
 	struct runqueue_process proc;
-	struct cmd *cmd;
-	struct trigger *trigger;
-	struct blob_attr *exec;
-	struct blob_attr *env;
+	struct json_script_ctx jctx;
+
+	struct blob_attr data[];
 };
 
 static LIST_HEAD(triggers);
 static RUNQUEUE(q, 1);
+static AVL_TREE(trigger_pending, avl_blobcmp, false, NULL);
 
 static const char* rule_handle_var(struct json_script_ctx *ctx, const char *name, struct blob_attr *vars)
 {
@@ -80,81 +75,45 @@ rule_load_script(struct json_script_ctx *ctx, const char *name)
 	return json_script_file_from_blobmsg(t->type, t->rule, blob_pad_len(t->rule));
 }
 
-static void q_job_run(struct runqueue *q, struct runqueue_task *t)
-{
-	struct job *j = container_of(t, struct job, proc.task);
-
-	DEBUG(4, "handle event %s\n", j->cmd->name);
-	j->cmd->handler(j, j->exec, j->env);
-}
-
 static void trigger_free(struct trigger *t)
 {
 	json_script_free(&t->jctx);
-	uloop_timeout_cancel(&t->delay);
 	free(t->data);
 	list_del(&t->list);
 	free(t);
 }
 
-static void q_job_complete(struct runqueue *q, struct runqueue_task *p)
+static void trigger_command_complete(struct runqueue *q, struct runqueue_task *p)
 {
-	struct job *j = container_of(p, struct job, proc.task);
+	struct trigger_command *cmd = container_of(p, struct trigger_command, proc.task);
 
-	if (j->trigger->remove) {
-		trigger_free(j->trigger);
-	} else {
-		j->trigger->pending = 0;
+	if (cmd->requeue) {
+		cmd->requeue = false;
+		runqueue_task_add(q, p, false);
+		return;
 	}
-	free(j);
-}
 
-static void add_job(struct trigger *t, struct cmd *cmd, struct blob_attr *exec, struct blob_attr *data)
-{
-	static const struct runqueue_task_type job_type = {
-		.run = q_job_run,
-		.cancel = runqueue_process_cancel_cb,
-		.kill = runqueue_process_kill_cb,
-	};
-	struct blob_attr *d, *e;
-	struct job *j = calloc_a(sizeof(*j), &e, blob_pad_len(exec), &d, blob_pad_len(data));
-
-	j->env = d;
-	j->exec = e;
-	j->cmd = cmd;
-	j->trigger = t;
-	j->proc.task.type = &job_type;
-	j->proc.task.complete = q_job_complete;
-	t->pending = 1;
-
-	memcpy(j->exec, exec, blob_pad_len(exec));
-	memcpy(j->env, data, blob_pad_len(data));
-
-	runqueue_task_add(&q, &j->proc.task, false);
+	avl_delete(&trigger_pending, &cmd->avl);
+	free(cmd);
 }
 
-static void _setenv(const char *key, const char *val)
+static void trigger_command_run(struct runqueue *q, struct runqueue_task *t)
 {
-	char _key[32];
-
-	snprintf(_key, sizeof(_key), "PARAM_%s", key);
-	setenv(_key, val, 1);
-}
-
-static void handle_run_script(struct job *j, struct blob_attr *exec, struct blob_attr *env)
-{
-	char *argv[8];
+	struct trigger_command *cmd = container_of(t, struct trigger_command, proc.task);
 	struct blob_attr *cur;
-	int rem;
-	int i = 0;
+	char **argv;
 	pid_t pid;
+	int n = 0;
+	int rem;
 
 	pid = fork();
-	if (pid < 0)
+	if (pid < 0) {
+		trigger_command_complete(q, t);
 		return;
+	}
 
 	if (pid) {
-		runqueue_process_add(&q, &j->proc, pid);
+		runqueue_process_add(q, &cmd->proc, pid);
 		return;
 	}
 
@@ -164,46 +123,75 @@ static void handle_run_script(struct job *j, struct blob_attr *exec, struct blob
 		close(STDERR_FILENO);
 	}
 
-	_setenv("type", j->trigger->type);
-	blobmsg_for_each_attr(cur, j->env, rem)
-		_setenv(blobmsg_name(cur), blobmsg_data(cur));
+	blobmsg_for_each_attr(cur, cmd->data, rem)
+		n++;
 
-	blobmsg_for_each_attr(cur, j->exec, rem) {
-		argv[i] = blobmsg_data(cur);
-		i++;
-		if (i == 7)
-			break;
-	}
+	argv = alloca((n + 1) * sizeof(*argv));
+	n = 0;
+	blobmsg_for_each_attr(cur, cmd->data, rem)
+		argv[n++] = blobmsg_get_string(cur);
+	argv[n] = NULL;
 
-	if (i > 0) {
-		argv[i] = NULL;
+	if (n > 0)
 		execvp(argv[0], &argv[0]);
-	}
 
 	exit(1);
 }
 
-static struct cmd handlers[] = {
-	{
-		.name = "run_script",
-		.handler = handle_run_script,
-	},
-};
+static void trigger_command_start(struct uloop_timeout *timeout)
+{
+	static const struct runqueue_task_type trigger_command_type = {
+		.run = trigger_command_run,
+		.cancel = runqueue_process_cancel_cb,
+		.kill = runqueue_process_kill_cb,
+	};
+	struct trigger_command *cmd = container_of(timeout, struct trigger_command, delay);
+
+	cmd->proc.task.type = &trigger_command_type;
+	cmd->proc.task.complete = trigger_command_complete;
+	runqueue_task_add(&q, &cmd->proc.task, false);
+}
+
+static void trigger_command_add(struct trigger *t, struct blob_attr *data)
+{
+	struct trigger_command *cmd;
+	int remaining;
+
+	cmd = avl_find_element(&trigger_pending, data, cmd, avl);
+	if (cmd) {
+		/* Command currently running? */
+		if (!cmd->delay.pending) {
+			cmd->requeue = true;
+			return;
+		}
+
+		/* Extend timer if trigger timeout is bigger than remaining time */
+		remaining = uloop_timeout_remaining(&cmd->delay);
+		if (remaining < t->timeout)
+			uloop_timeout_set(&cmd->delay, t->timeout);
+
+		return;
+	}
+
+	cmd = calloc(1, sizeof(*cmd) + blob_pad_len(data));
+	if (!cmd)
+		return;
+
+	cmd->avl.key = cmd->data;
+	cmd->delay.cb = trigger_command_start;
+	memcpy(cmd->data, data, blob_pad_len(data));
+	avl_insert(&trigger_pending, &cmd->avl);
+	uloop_timeout_set(&cmd->delay, t->timeout > 0 ? t->timeout : 1);
+}
 
 static void rule_handle_command(struct json_script_ctx *ctx, const char *name,
 				struct blob_attr *exec, struct blob_attr *vars)
 {
 	struct trigger *t = container_of(ctx, struct trigger, jctx);
-	int i;
 
-	if (t->pending)
+	if (!strcmp(name, "run_script")) {
+		trigger_command_add(t, exec);
 		return;
-
-	for (i = 0; i < ARRAY_SIZE(handlers); i++) {
-		if (!strcmp(handlers[i].name, name)) {
-			add_job(t, &handlers[i], exec, vars);
-			break;
-		}
 	}
 }
 
@@ -217,15 +205,6 @@ static void rule_handle_error(struct json_script_ctx *ctx, const char *msg,
 	free(s);
 }
 
-static void trigger_delay_cb(struct uloop_timeout *tout)
-{
-	struct trigger *t = container_of(tout, struct trigger, delay);
-
-	json_script_run(&t->jctx, t->type, t->data);
-	free(t->data);
-	t->data = NULL;
-}
-
 static struct trigger* _trigger_add(char *type, struct blob_attr *rule, int timeout, void *id)
 {
 	char *_t;
@@ -234,10 +213,7 @@ static struct trigger* _trigger_add(char *type, struct blob_attr *rule, int time
 
 	t->type = _t;
 	t->rule = _r;
-	t->delay.cb = trigger_delay_cb;
 	t->timeout = timeout;
-	t->pending = 0;
-	t->remove = 0;
 	t->id = id;
 	t->jctx.handle_var = rule_handle_var,
 	t->jctx.handle_error = rule_handle_error,
@@ -303,11 +279,6 @@ void trigger_del(void *id)
 		if (t->id != id)
 			continue;
 
-		if (t->pending) {
-			t->remove = 1;
-			continue;
-		}
-
 		trigger_free(t);
 	}
 }
@@ -325,16 +296,8 @@ void trigger_event(const char *type, struct blob_attr *data)
 	struct trigger *t;
 
 	list_for_each_entry(t, &triggers, list) {
-		if (t->remove)
-			continue;
 		if (!trigger_match(type, t->type))
 			continue;
-		if (t->timeout) {
-			free(t->data);
-			t->data = blob_memdup(data);
-			uloop_timeout_set(&t->delay, t->timeout);
-		} else {
-			json_script_run(&t->jctx, t->type, data);
-		}
+		json_script_run(&t->jctx, t->type, data);
 	}
 }