123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386 |
- /*
- This file is part of GNUnet
- Copyright (C) 2008--2013 GNUnet e.V.
- GNUnet is free software: you can redistribute it and/or modify it
- under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation, either version 3 of the License,
- or (at your option) any later version.
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- SPDX-License-Identifier: AGPL3.0-or-later
- */
- /**
- * @file testbed/testbed_api_operations.c
- * @brief functions to manage operation queues
- * @author Christian Grothoff
- * @author Sree Harsha Totakura
- */
- #include "platform.h"
- #include "testbed_api_operations.h"
- #include "testbed_api_sd.h"
- /**
- * The number of readings containing past operation's timing information that we
- * keep track of for adaptive queues
- */
- #define ADAPTIVE_QUEUE_DEFAULT_HISTORY 40
- /**
- * The number of parallel opeartions we start with by default for adaptive
- * queues
- */
- #define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4
- /**
- * An entry in the operation queue
- */
- struct QueueEntry
- {
- /**
- * The next DLL pointer
- */
- struct QueueEntry *next;
- /**
- * The prev DLL pointer
- */
- struct QueueEntry *prev;
- /**
- * The operation this entry holds
- */
- struct GNUNET_TESTBED_Operation *op;
- /**
- * How many units of resources does the operation need
- */
- unsigned int nres;
- };
- /**
- * Queue of operations where we can only support a certain
- * number of concurrent operations of a particular type.
- */
- struct OperationQueue;
- /**
- * A slot to record time taken by an operation
- */
- struct TimeSlot
- {
- /**
- * DLL next pointer
- */
- struct TimeSlot *next;
- /**
- * DLL prev pointer
- */
- struct TimeSlot *prev;
- /**
- * This operation queue to which this time slot belongs to
- */
- struct OperationQueue *queue;
- /**
- * The operation to which this timeslot is currently allocated to
- */
- struct GNUNET_TESTBED_Operation *op;
- /**
- * Accumulated time
- */
- struct GNUNET_TIME_Relative tsum;
- /**
- * Number of timing values accumulated
- */
- unsigned int nvals;
- };
- /**
- * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE
- */
- struct FeedbackCtx
- {
- /**
- * Handle for calculating standard deviation
- */
- struct SDHandle *sd;
- /**
- * Head for DLL of time slots which are free to be allocated to operations
- */
- struct TimeSlot *alloc_head;
- /**
- * Tail for DLL of time slots which are free to be allocated to operations
- */
- struct TimeSlot *alloc_tail;
- /**
- * Pointer to the chunk of time slots. Free all time slots at a time using
- * this pointer.
- */
- struct TimeSlot *tslots_freeptr;
- /**
- * Number of time slots filled so far
- */
- unsigned int tslots_filled;
- /**
- * Bound on the maximum number of operations which can be active
- */
- unsigned int max_active_bound;
- /**
- * Number of operations that have failed
- */
- unsigned int nfailed;
- };
- /**
- * Queue of operations where we can only support a certain
- * number of concurrent operations of a particular type.
- */
- struct OperationQueue
- {
- /**
- * DLL head for the wait queue. Operations which are waiting for this
- * operation queue are put here
- */
- struct QueueEntry *wq_head;
- /**
- * DLL tail for the wait queue.
- */
- struct QueueEntry *wq_tail;
- /**
- * DLL head for the ready queue. Operations which are in this operation queue
- * and are in ready state are put here
- */
- struct QueueEntry *rq_head;
- /**
- * DLL tail for the ready queue
- */
- struct QueueEntry *rq_tail;
- /**
- * DLL head for the active queue. Operations which are in this operation
- * queue and are currently active are put here
- */
- struct QueueEntry *aq_head;
- /**
- * DLL tail for the active queue.
- */
- struct QueueEntry *aq_tail;
- /**
- * DLL head for the inactive queue. Operations which are inactive and can be
- * evicted if the queues it holds are maxed out and another operation begins
- * to wait on them.
- */
- struct QueueEntry *nq_head;
- /**
- * DLL tail for the inactive queue.
- */
- struct QueueEntry *nq_tail;
- /**
- * Feedback context; only relevant for adaptive operation queues. NULL for
- * fixed operation queues
- */
- struct FeedbackCtx *fctx;
- /**
- * The type of this opeartion queue
- */
- enum OperationQueueType type;
- /**
- * Number of operations that are currently active in this queue.
- */
- unsigned int active;
- /**
- * Max number of operations which can be active at any time in this queue.
- * This value can be changed either by calling
- * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
- * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE
- */
- unsigned int max_active;
- /**
- * The number of resources occupied by failed operations in the current shot.
- * This is only relavant if the operation queue is of type
- * #OPERATION_QUEUE_TYPE_ADAPTIVE
- */
- unsigned int overload;
- /**
- * Is this queue marked for expiry?
- */
- unsigned int expired;
- };
- /**
- * Operation state
- */
- enum OperationState
- {
- /**
- * The operation is just created and is in initial state
- */
- OP_STATE_INIT,
- /**
- * The operation is currently waiting for resources
- */
- OP_STATE_WAITING,
- /**
- * The operation is ready to be started
- */
- OP_STATE_READY,
- /**
- * The operation has started and is active
- */
- OP_STATE_ACTIVE,
- /**
- * The operation is inactive. It still holds resources on the operation
- * queues. However, this operation will be evicted when another operation
- * requires resources from the maxed out queues this operation is holding
- * resources from.
- */
- OP_STATE_INACTIVE
- };
- /**
- * An entry in the ready queue (implemented as DLL)
- */
- struct ReadyQueueEntry
- {
- /**
- * next ptr for DLL
- */
- struct ReadyQueueEntry *next;
- /**
- * prev ptr for DLL
- */
- struct ReadyQueueEntry *prev;
- /**
- * The operation associated with this entry
- */
- struct GNUNET_TESTBED_Operation *op;
- };
- /**
- * Opaque handle to an abstract operation to be executed by the testing framework.
- */
- struct GNUNET_TESTBED_Operation
- {
- /**
- * Function to call when we have the resources to begin the operation.
- */
- OperationStart start;
- /**
- * Function to call to clean up after the operation (which may or may
- * not have been started yet).
- */
- OperationRelease release;
- /**
- * Closure for callbacks.
- */
- void *cb_cls;
- /**
- * Array of operation queues this Operation belongs to.
- */
- struct OperationQueue **queues;
- /**
- * Array of operation queue entries corresponding to this operation in
- * operation queues for this operation
- */
- struct QueueEntry **qentries;
- /**
- * Array of number of resources an operation need from each queue. The numbers
- * in this array should correspond to the queues array
- */
- unsigned int *nres;
- /**
- * Entry corresponding to this operation in ready queue. Will be NULL if the
- * operation is not marked as READY
- */
- struct ReadyQueueEntry *rq_entry;
- /**
- * Head pointer for DLL of tslots allocated to this operation
- */
- struct TimeSlot *tslots_head;
- /**
- * Tail pointer for DLL of tslots allocated to this operation
- */
- struct TimeSlot *tslots_tail;
- /**
- * The time at which the operation is started
- */
- struct GNUNET_TIME_Absolute tstart;
- /**
- * Number of queues in the operation queues array
- */
- unsigned int nqueues;
- /**
- * The state of the operation
- */
- enum OperationState state;
- /**
- * Is this a failed operation?
- */
- int failed;
- };
- /**
- * DLL head for the ready queue
- */
- static struct ReadyQueueEntry *rq_head;
- /**
- * DLL tail for the ready queue
- */
- static struct ReadyQueueEntry *rq_tail;
- /**
- * Array of operation queues which are to be destroyed
- */
- static struct OperationQueue **expired_opqs;
- /**
- * Number of expired operation queues in the above array
- */
- static unsigned int n_expired_opqs;
- /**
- * The id of the task to process the ready queue
- */
- struct GNUNET_SCHEDULER_Task *process_rq_task_id;
- /**
- * Assigns the given operation a time slot from the given operation queue
- *
- * @param op the operation
- * @param queue the operation queue
- * @return the timeslot
- */
- static void
- assign_timeslot (struct GNUNET_TESTBED_Operation *op,
- struct OperationQueue *queue)
- {
- struct FeedbackCtx *fctx = queue->fctx;
- struct TimeSlot *tslot;
- GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
- tslot = fctx->alloc_head;
- GNUNET_assert (NULL != tslot);
- GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
- GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot);
- tslot->op = op;
- }
- /**
- * Removes a queue entry of an operation from one of the operation queues' lists
- * depending on the state of the operation
- *
- * @param op the operation whose entry has to be removed
- * @param index the index of the entry in the operation's array of queue entries
- */
- static void
- remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
- {
- struct OperationQueue *opq;
- struct QueueEntry *entry;
- opq = op->queues[index];
- entry = op->qentries[index];
- switch (op->state)
- {
- case OP_STATE_INIT:
- GNUNET_assert (0);
- break;
- case OP_STATE_WAITING:
- GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
- break;
- case OP_STATE_READY:
- GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
- break;
- case OP_STATE_ACTIVE:
- GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
- break;
- case OP_STATE_INACTIVE:
- GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
- break;
- }
- }
- /**
- * Changes the state of the operation while moving its associated queue entries
- * in the operation's operation queues
- *
- * @param op the operation whose state has to be changed
- * @param state the state the operation should have. It cannot be OP_STATE_INIT
- */
- static void
- change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
- {
- struct QueueEntry *entry;
- struct OperationQueue *opq;
- unsigned int cnt;
- unsigned int s;
- GNUNET_assert (OP_STATE_INIT != state);
- GNUNET_assert (NULL != op->queues);
- GNUNET_assert (NULL != op->nres);
- GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
- GNUNET_assert (op->state != state);
- for (cnt = 0; cnt < op->nqueues; cnt++)
- {
- if (OP_STATE_INIT == op->state)
- {
- entry = GNUNET_new (struct QueueEntry);
- entry->op = op;
- entry->nres = op->nres[cnt];
- s = cnt;
- GNUNET_array_append (op->qentries, s, entry);
- }
- else
- {
- entry = op->qentries[cnt];
- remove_queue_entry (op, cnt);
- }
- opq = op->queues[cnt];
- switch (state)
- {
- case OP_STATE_INIT:
- GNUNET_assert (0);
- break;
- case OP_STATE_WAITING:
- GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
- break;
- case OP_STATE_READY:
- GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
- break;
- case OP_STATE_ACTIVE:
- GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
- break;
- case OP_STATE_INACTIVE:
- GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
- break;
- }
- }
- op->state = state;
- }
- /**
- * Removes an operation from the ready queue. Also stops the 'process_rq_task'
- * if the given operation is the last one in the queue.
- *
- * @param op the operation to be removed
- */
- static void
- rq_remove (struct GNUNET_TESTBED_Operation *op)
- {
- GNUNET_assert (NULL != op->rq_entry);
- GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
- GNUNET_free (op->rq_entry);
- op->rq_entry = NULL;
- if ((NULL == rq_head) && (NULL != process_rq_task_id))
- {
- GNUNET_SCHEDULER_cancel (process_rq_task_id);
- process_rq_task_id = NULL;
- }
- }
- /**
- * Processes the ready queue by calling the operation start callback of the
- * operation at the head. The operation is then removed from the queue. The
- * task is scheduled to run again immediately until no more operations are in
- * the ready queue.
- *
- * @param cls NULL
- */
- static void
- process_rq_task (void *cls)
- {
- struct GNUNET_TESTBED_Operation *op;
- struct OperationQueue *queue;
- unsigned int cnt;
- process_rq_task_id = NULL;
- GNUNET_assert (NULL != rq_head);
- GNUNET_assert (NULL != (op = rq_head->op));
- rq_remove (op);
- if (NULL != rq_head)
- process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
- change_state (op, OP_STATE_ACTIVE);
- for (cnt = 0; cnt < op->nqueues; cnt++)
- {
- queue = op->queues[cnt];
- if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
- assign_timeslot (op, queue);
- }
- op->tstart = GNUNET_TIME_absolute_get ();
- if (NULL != op->start)
- op->start (op->cb_cls);
- }
- /**
- * Adds the operation to the ready queue and starts the 'process_rq_task'
- *
- * @param op the operation to be queued
- */
- static void
- rq_add (struct GNUNET_TESTBED_Operation *op)
- {
- struct ReadyQueueEntry *rq_entry;
- GNUNET_assert (NULL == op->rq_entry);
- rq_entry = GNUNET_new (struct ReadyQueueEntry);
- rq_entry->op = op;
- GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
- op->rq_entry = rq_entry;
- if (NULL == process_rq_task_id)
- process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
- }
- /**
- * Checks if the given operation queue is empty or not
- *
- * @param opq the operation queue
- * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO
- * otherwise
- */
- static int
- is_queue_empty (struct OperationQueue *opq)
- {
- if ((NULL != opq->wq_head)
- || (NULL != opq->rq_head)
- || (NULL != opq->aq_head)
- || (NULL != opq->nq_head))
- return GNUNET_NO;
- return GNUNET_YES;
- }
- /**
- * Checks if the given operation queue has enough resources to provide for the
- * operation of the given queue entry. It also checks if any inactive
- * operations are to be released in order to accommodate the needed resources
- * and returns them as an array.
- *
- * @param opq the operation queue to check for resource accommodation
- * @param entry the operation queue entry whose operation's resources are to be
- * accommodated
- * @param ops_ pointer to return the array of operations which are to be released
- * in order to accommodate the new operation. Can be NULL
- * @param n_ops_ the number of operations in ops_
- * @return GNUNET_YES if the given entry's operation can be accommodated in this
- * queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
- * be set to NULL and 0 respectively.
- */
- static int
- decide_capacity (struct OperationQueue *opq,
- struct QueueEntry *entry,
- struct GNUNET_TESTBED_Operation ***ops_,
- unsigned int *n_ops_)
- {
- struct QueueEntry **evict_entries;
- struct GNUNET_TESTBED_Operation **ops;
- struct GNUNET_TESTBED_Operation *op;
- unsigned int n_ops;
- unsigned int n_evict_entries;
- unsigned int need;
- unsigned int max;
- int deficit;
- int rval;
- GNUNET_assert (NULL != (op = entry->op));
- GNUNET_assert (0 < (need = entry->nres));
- ops = NULL;
- n_ops = 0;
- evict_entries = NULL;
- n_evict_entries = 0;
- rval = GNUNET_YES;
- if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type)
- {
- GNUNET_assert (NULL != opq->fctx);
- GNUNET_assert (opq->max_active >= opq->overload);
- max = opq->max_active - opq->overload;
- }
- else
- max = opq->max_active;
- if (opq->active > max)
- {
- rval = GNUNET_NO;
- goto ret;
- }
- if ((opq->active + need) <= max)
- goto ret;
- deficit = need - (max - opq->active);
- for (entry = opq->nq_head;
- (0 < deficit) && (NULL != entry);
- entry = entry->next)
- {
- GNUNET_array_append (evict_entries, n_evict_entries, entry);
- deficit -= entry->nres;
- }
- if (0 < deficit)
- {
- rval = GNUNET_NO;
- goto ret;
- }
- for (n_ops = 0; n_ops < n_evict_entries;)
- {
- op = evict_entries[n_ops]->op;
- GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
- }
- ret:
- GNUNET_free_non_null (evict_entries);
- if (NULL != ops_)
- *ops_ = ops;
- else
- GNUNET_free (ops);
- if (NULL != n_ops_)
- *n_ops_ = n_ops;
- return rval;
- }
- /**
- * Merges an array of operations into another, eliminating duplicates. No
- * ordering is guaranteed.
- *
- * @param old the array into which the merging is done.
- * @param n_old the number of operations in old array
- * @param new the array from which operations are to be merged
- * @param n_new the number of operations in new array
- */
- static void
- merge_ops (struct GNUNET_TESTBED_Operation ***old,
- unsigned int *n_old,
- struct GNUNET_TESTBED_Operation **new,
- unsigned int n_new)
- {
- struct GNUNET_TESTBED_Operation **cur;
- unsigned int i;
- unsigned int j;
- unsigned int n_cur;
- GNUNET_assert (NULL != old);
- n_cur = *n_old;
- cur = *old;
- for (i = 0; i < n_new; i++)
- {
- for (j = 0; j < *n_old; j++)
- {
- if (new[i] == cur[j])
- break;
- }
- if (j < *n_old)
- continue;
- GNUNET_array_append (cur, n_cur, new[j]);
- }
- *old = cur;
- *n_old = n_cur;
- }
- /**
- * Checks for the readiness of an operation and schedules a operation start task
- *
- * @param op the operation
- */
- static int
- check_readiness (struct GNUNET_TESTBED_Operation *op)
- {
- struct GNUNET_TESTBED_Operation **evict_ops;
- struct GNUNET_TESTBED_Operation **ops;
- unsigned int n_ops;
- unsigned int n_evict_ops;
- unsigned int i;
- GNUNET_assert (NULL == op->rq_entry);
- GNUNET_assert (OP_STATE_WAITING == op->state);
- evict_ops = NULL;
- n_evict_ops = 0;
- for (i = 0; i < op->nqueues; i++)
- {
- ops = NULL;
- n_ops = 0;
- if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
- &ops, &n_ops))
- {
- GNUNET_free_non_null (evict_ops);
- return GNUNET_NO;
- }
- if (NULL == ops)
- continue;
- merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
- GNUNET_free (ops);
- }
- if (NULL != evict_ops)
- {
- for (i = 0; i < n_evict_ops; i++)
- GNUNET_TESTBED_operation_release_ (evict_ops[i]);
- GNUNET_free (evict_ops);
- evict_ops = NULL;
- /* Evicting the operations should schedule this operation */
- GNUNET_assert (OP_STATE_READY == op->state);
- return GNUNET_YES;
- }
- for (i = 0; i < op->nqueues; i++)
- op->queues[i]->active += op->nres[i];
- change_state (op, OP_STATE_READY);
- rq_add (op);
- return GNUNET_YES;
- }
- /**
- * Defers a ready to be executed operation back to waiting
- *
- * @param op the operation to defer
- */
- static void
- defer (struct GNUNET_TESTBED_Operation *op)
- {
- unsigned int i;
- GNUNET_assert (OP_STATE_READY == op->state);
- rq_remove (op);
- for (i = 0; i < op->nqueues; i++)
- {
- GNUNET_assert (op->queues[i]->active >= op->nres[i]);
- op->queues[i]->active -= op->nres[i];
- }
- change_state (op, OP_STATE_WAITING);
- }
- /**
- * Cleanups the array of timeslots of an operation queue. For each time slot in
- * the array, if it is allocated to an operation, it will be deallocated from
- * the operation
- *
- * @param queue the operation queue
- */
- static void
- cleanup_tslots (struct OperationQueue *queue)
- {
- struct FeedbackCtx *fctx = queue->fctx;
- struct TimeSlot *tslot;
- struct GNUNET_TESTBED_Operation *op;
- unsigned int cnt;
- GNUNET_assert (NULL != fctx);
- for (cnt = 0; cnt < queue->max_active; cnt++)
- {
- tslot = &fctx->tslots_freeptr[cnt];
- op = tslot->op;
- if (NULL == op)
- continue;
- GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
- }
- GNUNET_free_non_null (fctx->tslots_freeptr);
- fctx->tslots_freeptr = NULL;
- fctx->alloc_head = NULL;
- fctx->alloc_tail = NULL;
- fctx->tslots_filled = 0;
- }
- /**
- * Cleansup the existing timing slots and sets new timing slots in the given
- * queue to accommodate given number of max active operations.
- *
- * @param queue the queue
- * @param n the number of maximum active operations. If n is greater than the
- * maximum limit set while creating the queue, then the minimum of these two
- * will be selected as n
- */
- static void
- adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
- {
- struct FeedbackCtx *fctx = queue->fctx;
- struct TimeSlot *tslot;
- unsigned int cnt;
- cleanup_tslots (queue);
- n = GNUNET_MIN (n, fctx->max_active_bound);
- fctx->tslots_freeptr = GNUNET_malloc (n * sizeof(struct TimeSlot));
- fctx->nfailed = 0;
- for (cnt = 0; cnt < n; cnt++)
- {
- tslot = &fctx->tslots_freeptr[cnt];
- tslot->queue = queue;
- GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
- tslot);
- }
- GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
- }
- /**
- * Adapts parallelism in an adaptive queue by using the statistical data from
- * the feedback context.
- *
- * @param queue the queue
- */
- static void
- adapt_parallelism (struct OperationQueue *queue)
- {
- struct GNUNET_TIME_Relative avg;
- struct FeedbackCtx *fctx;
- struct TimeSlot *tslot;
- int sd;
- unsigned int nvals;
- unsigned int cnt;
- unsigned int parallelism;
- avg = GNUNET_TIME_UNIT_ZERO;
- nvals = 0;
- fctx = queue->fctx;
- for (cnt = 0; cnt < queue->max_active; cnt++)
- {
- tslot = &fctx->tslots_freeptr[cnt];
- avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
- nvals += tslot->nvals;
- }
- GNUNET_assert (nvals >= queue->max_active);
- GNUNET_assert (fctx->nfailed <= nvals);
- nvals -= fctx->nfailed;
- if (0 == nvals)
- {
- if (1 == queue->max_active)
- adaptive_queue_set_max_active (queue, 1);
- else
- adaptive_queue_set_max_active (queue, queue->max_active / 2);
- return;
- }
- avg = GNUNET_TIME_relative_divide (avg, nvals);
- GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
- if (GNUNET_SYSERR ==
- GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
- (unsigned int) avg.rel_value_us,
- &sd))
- {
- adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
- return;
- }
- parallelism = 0;
- if (-1 == sd)
- parallelism = queue->max_active + 1;
- if (sd <= -2)
- parallelism = queue->max_active * 2;
- if (1 == sd)
- parallelism = queue->max_active - 1;
- if (2 <= sd)
- parallelism = queue->max_active / 2;
- parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
- adaptive_queue_set_max_active (queue, parallelism);
- #if 0
- /* old algorithm */
- if (sd < 0)
- sd = 0;
- GNUNET_assert (0 <= sd);
- // GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
- if (0 == sd)
- {
- adaptive_queue_set_max_active (queue, queue->max_active * 2);
- return;
- }
- if (1 == sd)
- {
- adaptive_queue_set_max_active (queue, queue->max_active + 1);
- return;
- }
- if (1 == queue->max_active)
- {
- adaptive_queue_set_max_active (queue, 1);
- return;
- }
- if (2 == sd)
- {
- adaptive_queue_set_max_active (queue, queue->max_active - 1);
- return;
- }
- adaptive_queue_set_max_active (queue, queue->max_active / 2);
- #endif
- }
- /**
- * update tslots with the operation's completion time. Additionally, if
- * updating a timeslot makes all timeslots filled in an adaptive operation
- * queue, call adapt_parallelism() for that queue.
- *
- * @param op the operation
- */
- static void
- update_tslots (struct GNUNET_TESTBED_Operation *op)
- {
- struct OperationQueue *queue;
- struct GNUNET_TIME_Relative t;
- struct TimeSlot *tslot;
- struct FeedbackCtx *fctx;
- unsigned int i;
- t = GNUNET_TIME_absolute_get_duration (op->tstart);
- while (NULL != (tslot = op->tslots_head)) /* update time slots */
- {
- queue = tslot->queue;
- fctx = queue->fctx;
- GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
- tslot->op = NULL;
- GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
- tslot);
- if (op->failed)
- {
- fctx->nfailed++;
- for (i = 0; i < op->nqueues; i++)
- if (queue == op->queues[i])
- break;
- GNUNET_assert (i != op->nqueues);
- op->queues[i]->overload += op->nres[i];
- }
- tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
- if (0 != tslot->nvals++)
- continue;
- fctx->tslots_filled++;
- if (queue->max_active == fctx->tslots_filled)
- adapt_parallelism (queue);
- }
- }
- /**
- * Create an 'operation' to be performed.
- *
- * @param cls closure for the callbacks
- * @param start function to call to start the operation
- * @param release function to call to close down the operation
- * @return handle to the operation
- */
- struct GNUNET_TESTBED_Operation *
- GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
- OperationRelease release)
- {
- struct GNUNET_TESTBED_Operation *op;
- op = GNUNET_new (struct GNUNET_TESTBED_Operation);
- op->start = start;
- op->state = OP_STATE_INIT;
- op->release = release;
- op->cb_cls = cls;
- return op;
- }
- /**
- * Create an operation queue.
- *
- * @param type the type of operation queue
- * @param max_active maximum number of operations in this
- * queue that can be active in parallel at the same time
- * @return handle to the queue
- */
- struct OperationQueue *
- GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
- unsigned int max_active)
- {
- struct OperationQueue *queue;
- struct FeedbackCtx *fctx;
- queue = GNUNET_new (struct OperationQueue);
- queue->type = type;
- if (OPERATION_QUEUE_TYPE_FIXED == type)
- {
- queue->max_active = max_active;
- }
- else
- {
- fctx = GNUNET_new (struct FeedbackCtx);
- fctx->max_active_bound = max_active;
- fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY);
- queue->fctx = fctx;
- adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
- }
- return queue;
- }
- /**
- * Cleanup the given operation queue.
- *
- * @param queue the operation queue to destroy
- */
- static void
- queue_destroy (struct OperationQueue *queue)
- {
- struct FeedbackCtx *fctx;
- if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
- {
- cleanup_tslots (queue);
- fctx = queue->fctx;
- GNUNET_TESTBED_SD_destroy_ (fctx->sd);
- GNUNET_free (fctx);
- }
- GNUNET_free (queue);
- }
- /**
- * Destroys an operation queue. If the queue is still in use by operations it
- * is marked as expired and its resources are released in the destructor
- * GNUNET_TESTBED_operations_fini().
- *
- * @param queue queue to destroy
- */
- void
- GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
- {
- if (GNUNET_YES != is_queue_empty (queue))
- {
- GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */
- queue->expired = 1;
- GNUNET_array_append (expired_opqs, n_expired_opqs, queue);
- return;
- }
- queue_destroy (queue);
- }
- /**
- * Destroys the operation queue if it is empty. If not empty return GNUNET_NO.
- *
- * @param queue the queue to destroy if empty
- * @return GNUNET_YES if the queue is destroyed. GNUNET_NO if not (because it
- * is not empty)
- */
- int
- GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
- {
- if (GNUNET_NO == is_queue_empty (queue))
- return GNUNET_NO;
- GNUNET_TESTBED_operation_queue_destroy_ (queue);
- return GNUNET_YES;
- }
- /**
- * Rechecks if any of the operations in the given operation queue's waiting list
- * can be made active
- *
- * @param opq the operation queue
- */
- static void
- recheck_waiting (struct OperationQueue *opq)
- {
- struct QueueEntry *entry;
- struct QueueEntry *entry2;
- entry = opq->wq_head;
- while (NULL != entry)
- {
- entry2 = entry->next;
- if (GNUNET_NO == check_readiness (entry->op))
- break;
- entry = entry2;
- }
- }
- /**
- * Function to reset the maximum number of operations in the given queue. If
- * max_active is lesser than the number of currently active operations, the
- * active operations are not stopped immediately.
- *
- * @param queue the operation queue which has to be modified
- * @param max_active the new maximum number of active operations
- */
- void
- GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
- unsigned int max_active)
- {
- struct QueueEntry *entry;
- queue->max_active = max_active;
- queue->overload = 0;
- while ((queue->active > queue->max_active)
- && (NULL != (entry = queue->rq_head)))
- defer (entry->op);
- recheck_waiting (queue);
- }
- /**
- * Add an operation to a queue. An operation can be in multiple queues at
- * once. Once the operation is inserted into all the queues
- * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
- * waiting for the operation to become active.
- *
- * @param queue queue to add the operation to
- * @param op operation to add to the queue
- * @param nres the number of units of the resources of queue needed by the
- * operation. Should be greater than 0.
- */
- void
- GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
- struct GNUNET_TESTBED_Operation *op,
- unsigned int nres)
- {
- unsigned int qsize;
- GNUNET_assert (0 < nres);
- qsize = op->nqueues;
- GNUNET_array_append (op->queues, op->nqueues, queue);
- GNUNET_array_append (op->nres, qsize, nres);
- GNUNET_assert (qsize == op->nqueues);
- }
- /**
- * Add an operation to a queue. An operation can be in multiple queues at
- * once. Once the operation is inserted into all the queues
- * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
- * waiting for the operation to become active. The operation is assumed to take
- * 1 queue resource. Use GNUNET_TESTBED_operation_queue_insert2_() if it
- * requires more than 1
- *
- * @param queue queue to add the operation to
- * @param op operation to add to the queue
- */
- void
- GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
- struct GNUNET_TESTBED_Operation *op)
- {
- return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
- }
- /**
- * Marks the given operation as waiting on the queues. Once all queues permit
- * the operation to become active, the operation will be activated. The actual
- * activation will occur in a separate task (thus allowing multiple queue
- * insertions to be made without having the first one instantly trigger the
- * operation if the first queue has sufficient resources).
- *
- * @param op the operation to marks as waiting
- */
- void
- GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
- {
- GNUNET_assert (NULL == op->rq_entry);
- change_state (op, OP_STATE_WAITING);
- (void) check_readiness (op);
- }
- /**
- * Marks an active operation as inactive - the operation will be kept in a
- * ready-to-be-released state and continues to hold resources until another
- * operation contents for them.
- *
- * @param op the operation to be marked as inactive. The operation start
- * callback should have been called before for this operation to mark
- * it as inactive.
- */
- void
- GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
- {
- struct OperationQueue **queues;
- size_t ms;
- unsigned int nqueues;
- unsigned int i;
- GNUNET_assert (OP_STATE_ACTIVE == op->state);
- change_state (op, OP_STATE_INACTIVE);
- nqueues = op->nqueues;
- ms = sizeof(struct OperationQueue *) * nqueues;
- queues = GNUNET_malloc (ms);
- /* Cloning is needed as the operation be released by waiting operations and
- hence its nqueues memory ptr will be freed */
- GNUNET_memcpy (queues, op->queues, ms);
- for (i = 0; i < nqueues; i++)
- recheck_waiting (queues[i]);
- GNUNET_free (queues);
- }
- /**
- * Marks and inactive operation as active. This fuction should be called to
- * ensure that the oprelease callback will not be called until it is either
- * marked as inactive or released.
- *
- * @param op the operation to be marked as active
- */
- void
- GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
- {
- GNUNET_assert (OP_STATE_INACTIVE == op->state);
- change_state (op, OP_STATE_ACTIVE);
- }
- /**
- * An operation is 'done' (was cancelled or finished); remove
- * it from the queues and release associated resources.
- *
- * @param op operation that finished
- */
- void
- GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
- {
- struct QueueEntry *entry;
- struct OperationQueue *opq;
- unsigned int i;
- if (OP_STATE_INIT == op->state)
- {
- GNUNET_free (op);
- return;
- }
- if (OP_STATE_READY == op->state)
- rq_remove (op);
- if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
- GNUNET_TESTBED_operation_activate_ (op);
- if (OP_STATE_ACTIVE == op->state)
- update_tslots (op);
- GNUNET_assert (NULL != op->queues);
- GNUNET_assert (NULL != op->qentries);
- for (i = 0; i < op->nqueues; i++)
- {
- entry = op->qentries[i];
- remove_queue_entry (op, i);
- opq = op->queues[i];
- switch (op->state)
- {
- case OP_STATE_INIT:
- case OP_STATE_INACTIVE:
- GNUNET_assert (0);
- break;
- case OP_STATE_WAITING:
- break;
- case OP_STATE_ACTIVE:
- case OP_STATE_READY:
- GNUNET_assert (0 != opq->active);
- GNUNET_assert (opq->active >= entry->nres);
- opq->active -= entry->nres;
- recheck_waiting (opq);
- break;
- }
- GNUNET_free (entry);
- }
- GNUNET_free_non_null (op->qentries);
- GNUNET_free (op->queues);
- GNUNET_free (op->nres);
- if (NULL != op->release)
- op->release (op->cb_cls);
- GNUNET_free (op);
- }
- /**
- * Marks an operation as failed
- *
- * @param op the operation to be marked as failed
- */
- void
- GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
- {
- op->failed = GNUNET_YES;
- }
- /**
- * Cleanup expired operation queues. While doing so, also check for any
- * operations which are not completed and warn about them.
- */
- void __attribute__ ((destructor))
- GNUNET_TESTBED_operations_fini ()
- {
- struct OperationQueue *queue;
- unsigned int i;
- int warn = 0;
- for (i = 0; i < n_expired_opqs; i++)
- {
- queue = expired_opqs[i];
- if (GNUNET_NO == is_queue_empty (queue))
- warn = 1;
- queue_destroy (queue);
- }
- GNUNET_free_non_null (expired_opqs);
- n_expired_opqs = 0;
- if (warn)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Be disciplined. Some operations were not marked as done.\n");
- }
- /* end of testbed_api_operations.c */
|