testbed_api_operations.c 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386
  1. /*
  2. This file is part of GNUnet
  3. Copyright (C) 2008--2013 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file testbed/testbed_api_operations.c
  18. * @brief functions to manage operation queues
  19. * @author Christian Grothoff
  20. * @author Sree Harsha Totakura
  21. */
  22. #include "platform.h"
  23. #include "testbed_api_operations.h"
  24. #include "testbed_api_sd.h"
  25. /**
  26. * The number of readings containing past operation's timing information that we
  27. * keep track of for adaptive queues
  28. */
  29. #define ADAPTIVE_QUEUE_DEFAULT_HISTORY 40
  30. /**
  31. * The number of parallel opeartions we start with by default for adaptive
  32. * queues
  33. */
  34. #define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4
  35. /**
  36. * An entry in the operation queue
  37. */
  38. struct QueueEntry
  39. {
  40. /**
  41. * The next DLL pointer
  42. */
  43. struct QueueEntry *next;
  44. /**
  45. * The prev DLL pointer
  46. */
  47. struct QueueEntry *prev;
  48. /**
  49. * The operation this entry holds
  50. */
  51. struct GNUNET_TESTBED_Operation *op;
  52. /**
  53. * How many units of resources does the operation need
  54. */
  55. unsigned int nres;
  56. };
  57. /**
  58. * Queue of operations where we can only support a certain
  59. * number of concurrent operations of a particular type.
  60. */
  61. struct OperationQueue;
  62. /**
  63. * A slot to record time taken by an operation
  64. */
  65. struct TimeSlot
  66. {
  67. /**
  68. * DLL next pointer
  69. */
  70. struct TimeSlot *next;
  71. /**
  72. * DLL prev pointer
  73. */
  74. struct TimeSlot *prev;
  75. /**
  76. * This operation queue to which this time slot belongs to
  77. */
  78. struct OperationQueue *queue;
  79. /**
  80. * The operation to which this timeslot is currently allocated to
  81. */
  82. struct GNUNET_TESTBED_Operation *op;
  83. /**
  84. * Accumulated time
  85. */
  86. struct GNUNET_TIME_Relative tsum;
  87. /**
  88. * Number of timing values accumulated
  89. */
  90. unsigned int nvals;
  91. };
  92. /**
  93. * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE
  94. */
  95. struct FeedbackCtx
  96. {
  97. /**
  98. * Handle for calculating standard deviation
  99. */
  100. struct SDHandle *sd;
  101. /**
  102. * Head for DLL of time slots which are free to be allocated to operations
  103. */
  104. struct TimeSlot *alloc_head;
  105. /**
  106. * Tail for DLL of time slots which are free to be allocated to operations
  107. */
  108. struct TimeSlot *alloc_tail;
  109. /**
  110. * Pointer to the chunk of time slots. Free all time slots at a time using
  111. * this pointer.
  112. */
  113. struct TimeSlot *tslots_freeptr;
  114. /**
  115. * Number of time slots filled so far
  116. */
  117. unsigned int tslots_filled;
  118. /**
  119. * Bound on the maximum number of operations which can be active
  120. */
  121. unsigned int max_active_bound;
  122. /**
  123. * Number of operations that have failed
  124. */
  125. unsigned int nfailed;
  126. };
  127. /**
  128. * Queue of operations where we can only support a certain
  129. * number of concurrent operations of a particular type.
  130. */
  131. struct OperationQueue
  132. {
  133. /**
  134. * DLL head for the wait queue. Operations which are waiting for this
  135. * operation queue are put here
  136. */
  137. struct QueueEntry *wq_head;
  138. /**
  139. * DLL tail for the wait queue.
  140. */
  141. struct QueueEntry *wq_tail;
  142. /**
  143. * DLL head for the ready queue. Operations which are in this operation queue
  144. * and are in ready state are put here
  145. */
  146. struct QueueEntry *rq_head;
  147. /**
  148. * DLL tail for the ready queue
  149. */
  150. struct QueueEntry *rq_tail;
  151. /**
  152. * DLL head for the active queue. Operations which are in this operation
  153. * queue and are currently active are put here
  154. */
  155. struct QueueEntry *aq_head;
  156. /**
  157. * DLL tail for the active queue.
  158. */
  159. struct QueueEntry *aq_tail;
  160. /**
  161. * DLL head for the inactive queue. Operations which are inactive and can be
  162. * evicted if the queues it holds are maxed out and another operation begins
  163. * to wait on them.
  164. */
  165. struct QueueEntry *nq_head;
  166. /**
  167. * DLL tail for the inactive queue.
  168. */
  169. struct QueueEntry *nq_tail;
  170. /**
  171. * Feedback context; only relevant for adaptive operation queues. NULL for
  172. * fixed operation queues
  173. */
  174. struct FeedbackCtx *fctx;
  175. /**
  176. * The type of this opeartion queue
  177. */
  178. enum OperationQueueType type;
  179. /**
  180. * Number of operations that are currently active in this queue.
  181. */
  182. unsigned int active;
  183. /**
  184. * Max number of operations which can be active at any time in this queue.
  185. * This value can be changed either by calling
  186. * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
  187. * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE
  188. */
  189. unsigned int max_active;
  190. /**
  191. * The number of resources occupied by failed operations in the current shot.
  192. * This is only relavant if the operation queue is of type
  193. * #OPERATION_QUEUE_TYPE_ADAPTIVE
  194. */
  195. unsigned int overload;
  196. /**
  197. * Is this queue marked for expiry?
  198. */
  199. unsigned int expired;
  200. };
  201. /**
  202. * Operation state
  203. */
  204. enum OperationState
  205. {
  206. /**
  207. * The operation is just created and is in initial state
  208. */
  209. OP_STATE_INIT,
  210. /**
  211. * The operation is currently waiting for resources
  212. */
  213. OP_STATE_WAITING,
  214. /**
  215. * The operation is ready to be started
  216. */
  217. OP_STATE_READY,
  218. /**
  219. * The operation has started and is active
  220. */
  221. OP_STATE_ACTIVE,
  222. /**
  223. * The operation is inactive. It still holds resources on the operation
  224. * queues. However, this operation will be evicted when another operation
  225. * requires resources from the maxed out queues this operation is holding
  226. * resources from.
  227. */
  228. OP_STATE_INACTIVE
  229. };
  230. /**
  231. * An entry in the ready queue (implemented as DLL)
  232. */
  233. struct ReadyQueueEntry
  234. {
  235. /**
  236. * next ptr for DLL
  237. */
  238. struct ReadyQueueEntry *next;
  239. /**
  240. * prev ptr for DLL
  241. */
  242. struct ReadyQueueEntry *prev;
  243. /**
  244. * The operation associated with this entry
  245. */
  246. struct GNUNET_TESTBED_Operation *op;
  247. };
  248. /**
  249. * Opaque handle to an abstract operation to be executed by the testing framework.
  250. */
  251. struct GNUNET_TESTBED_Operation
  252. {
  253. /**
  254. * Function to call when we have the resources to begin the operation.
  255. */
  256. OperationStart start;
  257. /**
  258. * Function to call to clean up after the operation (which may or may
  259. * not have been started yet).
  260. */
  261. OperationRelease release;
  262. /**
  263. * Closure for callbacks.
  264. */
  265. void *cb_cls;
  266. /**
  267. * Array of operation queues this Operation belongs to.
  268. */
  269. struct OperationQueue **queues;
  270. /**
  271. * Array of operation queue entries corresponding to this operation in
  272. * operation queues for this operation
  273. */
  274. struct QueueEntry **qentries;
  275. /**
  276. * Array of number of resources an operation need from each queue. The numbers
  277. * in this array should correspond to the queues array
  278. */
  279. unsigned int *nres;
  280. /**
  281. * Entry corresponding to this operation in ready queue. Will be NULL if the
  282. * operation is not marked as READY
  283. */
  284. struct ReadyQueueEntry *rq_entry;
  285. /**
  286. * Head pointer for DLL of tslots allocated to this operation
  287. */
  288. struct TimeSlot *tslots_head;
  289. /**
  290. * Tail pointer for DLL of tslots allocated to this operation
  291. */
  292. struct TimeSlot *tslots_tail;
  293. /**
  294. * The time at which the operation is started
  295. */
  296. struct GNUNET_TIME_Absolute tstart;
  297. /**
  298. * Number of queues in the operation queues array
  299. */
  300. unsigned int nqueues;
  301. /**
  302. * The state of the operation
  303. */
  304. enum OperationState state;
  305. /**
  306. * Is this a failed operation?
  307. */
  308. int failed;
  309. };
  310. /**
  311. * DLL head for the ready queue
  312. */
  313. static struct ReadyQueueEntry *rq_head;
  314. /**
  315. * DLL tail for the ready queue
  316. */
  317. static struct ReadyQueueEntry *rq_tail;
  318. /**
  319. * Array of operation queues which are to be destroyed
  320. */
  321. static struct OperationQueue **expired_opqs;
  322. /**
  323. * Number of expired operation queues in the above array
  324. */
  325. static unsigned int n_expired_opqs;
  326. /**
  327. * The id of the task to process the ready queue
  328. */
  329. struct GNUNET_SCHEDULER_Task *process_rq_task_id;
  330. /**
  331. * Assigns the given operation a time slot from the given operation queue
  332. *
  333. * @param op the operation
  334. * @param queue the operation queue
  335. * @return the timeslot
  336. */
  337. static void
  338. assign_timeslot (struct GNUNET_TESTBED_Operation *op,
  339. struct OperationQueue *queue)
  340. {
  341. struct FeedbackCtx *fctx = queue->fctx;
  342. struct TimeSlot *tslot;
  343. GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
  344. tslot = fctx->alloc_head;
  345. GNUNET_assert (NULL != tslot);
  346. GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
  347. GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot);
  348. tslot->op = op;
  349. }
  350. /**
  351. * Removes a queue entry of an operation from one of the operation queues' lists
  352. * depending on the state of the operation
  353. *
  354. * @param op the operation whose entry has to be removed
  355. * @param index the index of the entry in the operation's array of queue entries
  356. */
  357. static void
  358. remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
  359. {
  360. struct OperationQueue *opq;
  361. struct QueueEntry *entry;
  362. opq = op->queues[index];
  363. entry = op->qentries[index];
  364. switch (op->state)
  365. {
  366. case OP_STATE_INIT:
  367. GNUNET_assert (0);
  368. break;
  369. case OP_STATE_WAITING:
  370. GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
  371. break;
  372. case OP_STATE_READY:
  373. GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
  374. break;
  375. case OP_STATE_ACTIVE:
  376. GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
  377. break;
  378. case OP_STATE_INACTIVE:
  379. GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
  380. break;
  381. }
  382. }
  383. /**
  384. * Changes the state of the operation while moving its associated queue entries
  385. * in the operation's operation queues
  386. *
  387. * @param op the operation whose state has to be changed
  388. * @param state the state the operation should have. It cannot be OP_STATE_INIT
  389. */
  390. static void
  391. change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
  392. {
  393. struct QueueEntry *entry;
  394. struct OperationQueue *opq;
  395. unsigned int cnt;
  396. unsigned int s;
  397. GNUNET_assert (OP_STATE_INIT != state);
  398. GNUNET_assert (NULL != op->queues);
  399. GNUNET_assert (NULL != op->nres);
  400. GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
  401. GNUNET_assert (op->state != state);
  402. for (cnt = 0; cnt < op->nqueues; cnt++)
  403. {
  404. if (OP_STATE_INIT == op->state)
  405. {
  406. entry = GNUNET_new (struct QueueEntry);
  407. entry->op = op;
  408. entry->nres = op->nres[cnt];
  409. s = cnt;
  410. GNUNET_array_append (op->qentries, s, entry);
  411. }
  412. else
  413. {
  414. entry = op->qentries[cnt];
  415. remove_queue_entry (op, cnt);
  416. }
  417. opq = op->queues[cnt];
  418. switch (state)
  419. {
  420. case OP_STATE_INIT:
  421. GNUNET_assert (0);
  422. break;
  423. case OP_STATE_WAITING:
  424. GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
  425. break;
  426. case OP_STATE_READY:
  427. GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
  428. break;
  429. case OP_STATE_ACTIVE:
  430. GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
  431. break;
  432. case OP_STATE_INACTIVE:
  433. GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
  434. break;
  435. }
  436. }
  437. op->state = state;
  438. }
  439. /**
  440. * Removes an operation from the ready queue. Also stops the 'process_rq_task'
  441. * if the given operation is the last one in the queue.
  442. *
  443. * @param op the operation to be removed
  444. */
  445. static void
  446. rq_remove (struct GNUNET_TESTBED_Operation *op)
  447. {
  448. GNUNET_assert (NULL != op->rq_entry);
  449. GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
  450. GNUNET_free (op->rq_entry);
  451. op->rq_entry = NULL;
  452. if ((NULL == rq_head) && (NULL != process_rq_task_id))
  453. {
  454. GNUNET_SCHEDULER_cancel (process_rq_task_id);
  455. process_rq_task_id = NULL;
  456. }
  457. }
  458. /**
  459. * Processes the ready queue by calling the operation start callback of the
  460. * operation at the head. The operation is then removed from the queue. The
  461. * task is scheduled to run again immediately until no more operations are in
  462. * the ready queue.
  463. *
  464. * @param cls NULL
  465. */
  466. static void
  467. process_rq_task (void *cls)
  468. {
  469. struct GNUNET_TESTBED_Operation *op;
  470. struct OperationQueue *queue;
  471. unsigned int cnt;
  472. process_rq_task_id = NULL;
  473. GNUNET_assert (NULL != rq_head);
  474. GNUNET_assert (NULL != (op = rq_head->op));
  475. rq_remove (op);
  476. if (NULL != rq_head)
  477. process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
  478. change_state (op, OP_STATE_ACTIVE);
  479. for (cnt = 0; cnt < op->nqueues; cnt++)
  480. {
  481. queue = op->queues[cnt];
  482. if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
  483. assign_timeslot (op, queue);
  484. }
  485. op->tstart = GNUNET_TIME_absolute_get ();
  486. if (NULL != op->start)
  487. op->start (op->cb_cls);
  488. }
  489. /**
  490. * Adds the operation to the ready queue and starts the 'process_rq_task'
  491. *
  492. * @param op the operation to be queued
  493. */
  494. static void
  495. rq_add (struct GNUNET_TESTBED_Operation *op)
  496. {
  497. struct ReadyQueueEntry *rq_entry;
  498. GNUNET_assert (NULL == op->rq_entry);
  499. rq_entry = GNUNET_new (struct ReadyQueueEntry);
  500. rq_entry->op = op;
  501. GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
  502. op->rq_entry = rq_entry;
  503. if (NULL == process_rq_task_id)
  504. process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
  505. }
  506. /**
  507. * Checks if the given operation queue is empty or not
  508. *
  509. * @param opq the operation queue
  510. * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO
  511. * otherwise
  512. */
  513. static int
  514. is_queue_empty (struct OperationQueue *opq)
  515. {
  516. if ((NULL != opq->wq_head)
  517. || (NULL != opq->rq_head)
  518. || (NULL != opq->aq_head)
  519. || (NULL != opq->nq_head))
  520. return GNUNET_NO;
  521. return GNUNET_YES;
  522. }
  523. /**
  524. * Checks if the given operation queue has enough resources to provide for the
  525. * operation of the given queue entry. It also checks if any inactive
  526. * operations are to be released in order to accommodate the needed resources
  527. * and returns them as an array.
  528. *
  529. * @param opq the operation queue to check for resource accommodation
  530. * @param entry the operation queue entry whose operation's resources are to be
  531. * accommodated
  532. * @param ops_ pointer to return the array of operations which are to be released
  533. * in order to accommodate the new operation. Can be NULL
  534. * @param n_ops_ the number of operations in ops_
  535. * @return GNUNET_YES if the given entry's operation can be accommodated in this
  536. * queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
  537. * be set to NULL and 0 respectively.
  538. */
  539. static int
  540. decide_capacity (struct OperationQueue *opq,
  541. struct QueueEntry *entry,
  542. struct GNUNET_TESTBED_Operation ***ops_,
  543. unsigned int *n_ops_)
  544. {
  545. struct QueueEntry **evict_entries;
  546. struct GNUNET_TESTBED_Operation **ops;
  547. struct GNUNET_TESTBED_Operation *op;
  548. unsigned int n_ops;
  549. unsigned int n_evict_entries;
  550. unsigned int need;
  551. unsigned int max;
  552. int deficit;
  553. int rval;
  554. GNUNET_assert (NULL != (op = entry->op));
  555. GNUNET_assert (0 < (need = entry->nres));
  556. ops = NULL;
  557. n_ops = 0;
  558. evict_entries = NULL;
  559. n_evict_entries = 0;
  560. rval = GNUNET_YES;
  561. if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type)
  562. {
  563. GNUNET_assert (NULL != opq->fctx);
  564. GNUNET_assert (opq->max_active >= opq->overload);
  565. max = opq->max_active - opq->overload;
  566. }
  567. else
  568. max = opq->max_active;
  569. if (opq->active > max)
  570. {
  571. rval = GNUNET_NO;
  572. goto ret;
  573. }
  574. if ((opq->active + need) <= max)
  575. goto ret;
  576. deficit = need - (max - opq->active);
  577. for (entry = opq->nq_head;
  578. (0 < deficit) && (NULL != entry);
  579. entry = entry->next)
  580. {
  581. GNUNET_array_append (evict_entries, n_evict_entries, entry);
  582. deficit -= entry->nres;
  583. }
  584. if (0 < deficit)
  585. {
  586. rval = GNUNET_NO;
  587. goto ret;
  588. }
  589. for (n_ops = 0; n_ops < n_evict_entries;)
  590. {
  591. op = evict_entries[n_ops]->op;
  592. GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
  593. }
  594. ret:
  595. GNUNET_free_non_null (evict_entries);
  596. if (NULL != ops_)
  597. *ops_ = ops;
  598. else
  599. GNUNET_free (ops);
  600. if (NULL != n_ops_)
  601. *n_ops_ = n_ops;
  602. return rval;
  603. }
  604. /**
  605. * Merges an array of operations into another, eliminating duplicates. No
  606. * ordering is guaranteed.
  607. *
  608. * @param old the array into which the merging is done.
  609. * @param n_old the number of operations in old array
  610. * @param new the array from which operations are to be merged
  611. * @param n_new the number of operations in new array
  612. */
  613. static void
  614. merge_ops (struct GNUNET_TESTBED_Operation ***old,
  615. unsigned int *n_old,
  616. struct GNUNET_TESTBED_Operation **new,
  617. unsigned int n_new)
  618. {
  619. struct GNUNET_TESTBED_Operation **cur;
  620. unsigned int i;
  621. unsigned int j;
  622. unsigned int n_cur;
  623. GNUNET_assert (NULL != old);
  624. n_cur = *n_old;
  625. cur = *old;
  626. for (i = 0; i < n_new; i++)
  627. {
  628. for (j = 0; j < *n_old; j++)
  629. {
  630. if (new[i] == cur[j])
  631. break;
  632. }
  633. if (j < *n_old)
  634. continue;
  635. GNUNET_array_append (cur, n_cur, new[j]);
  636. }
  637. *old = cur;
  638. *n_old = n_cur;
  639. }
  640. /**
  641. * Checks for the readiness of an operation and schedules a operation start task
  642. *
  643. * @param op the operation
  644. */
  645. static int
  646. check_readiness (struct GNUNET_TESTBED_Operation *op)
  647. {
  648. struct GNUNET_TESTBED_Operation **evict_ops;
  649. struct GNUNET_TESTBED_Operation **ops;
  650. unsigned int n_ops;
  651. unsigned int n_evict_ops;
  652. unsigned int i;
  653. GNUNET_assert (NULL == op->rq_entry);
  654. GNUNET_assert (OP_STATE_WAITING == op->state);
  655. evict_ops = NULL;
  656. n_evict_ops = 0;
  657. for (i = 0; i < op->nqueues; i++)
  658. {
  659. ops = NULL;
  660. n_ops = 0;
  661. if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
  662. &ops, &n_ops))
  663. {
  664. GNUNET_free_non_null (evict_ops);
  665. return GNUNET_NO;
  666. }
  667. if (NULL == ops)
  668. continue;
  669. merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
  670. GNUNET_free (ops);
  671. }
  672. if (NULL != evict_ops)
  673. {
  674. for (i = 0; i < n_evict_ops; i++)
  675. GNUNET_TESTBED_operation_release_ (evict_ops[i]);
  676. GNUNET_free (evict_ops);
  677. evict_ops = NULL;
  678. /* Evicting the operations should schedule this operation */
  679. GNUNET_assert (OP_STATE_READY == op->state);
  680. return GNUNET_YES;
  681. }
  682. for (i = 0; i < op->nqueues; i++)
  683. op->queues[i]->active += op->nres[i];
  684. change_state (op, OP_STATE_READY);
  685. rq_add (op);
  686. return GNUNET_YES;
  687. }
  688. /**
  689. * Defers a ready to be executed operation back to waiting
  690. *
  691. * @param op the operation to defer
  692. */
  693. static void
  694. defer (struct GNUNET_TESTBED_Operation *op)
  695. {
  696. unsigned int i;
  697. GNUNET_assert (OP_STATE_READY == op->state);
  698. rq_remove (op);
  699. for (i = 0; i < op->nqueues; i++)
  700. {
  701. GNUNET_assert (op->queues[i]->active >= op->nres[i]);
  702. op->queues[i]->active -= op->nres[i];
  703. }
  704. change_state (op, OP_STATE_WAITING);
  705. }
  706. /**
  707. * Cleanups the array of timeslots of an operation queue. For each time slot in
  708. * the array, if it is allocated to an operation, it will be deallocated from
  709. * the operation
  710. *
  711. * @param queue the operation queue
  712. */
  713. static void
  714. cleanup_tslots (struct OperationQueue *queue)
  715. {
  716. struct FeedbackCtx *fctx = queue->fctx;
  717. struct TimeSlot *tslot;
  718. struct GNUNET_TESTBED_Operation *op;
  719. unsigned int cnt;
  720. GNUNET_assert (NULL != fctx);
  721. for (cnt = 0; cnt < queue->max_active; cnt++)
  722. {
  723. tslot = &fctx->tslots_freeptr[cnt];
  724. op = tslot->op;
  725. if (NULL == op)
  726. continue;
  727. GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
  728. }
  729. GNUNET_free_non_null (fctx->tslots_freeptr);
  730. fctx->tslots_freeptr = NULL;
  731. fctx->alloc_head = NULL;
  732. fctx->alloc_tail = NULL;
  733. fctx->tslots_filled = 0;
  734. }
  735. /**
  736. * Cleansup the existing timing slots and sets new timing slots in the given
  737. * queue to accommodate given number of max active operations.
  738. *
  739. * @param queue the queue
  740. * @param n the number of maximum active operations. If n is greater than the
  741. * maximum limit set while creating the queue, then the minimum of these two
  742. * will be selected as n
  743. */
  744. static void
  745. adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
  746. {
  747. struct FeedbackCtx *fctx = queue->fctx;
  748. struct TimeSlot *tslot;
  749. unsigned int cnt;
  750. cleanup_tslots (queue);
  751. n = GNUNET_MIN (n, fctx->max_active_bound);
  752. fctx->tslots_freeptr = GNUNET_malloc (n * sizeof(struct TimeSlot));
  753. fctx->nfailed = 0;
  754. for (cnt = 0; cnt < n; cnt++)
  755. {
  756. tslot = &fctx->tslots_freeptr[cnt];
  757. tslot->queue = queue;
  758. GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
  759. tslot);
  760. }
  761. GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
  762. }
  763. /**
  764. * Adapts parallelism in an adaptive queue by using the statistical data from
  765. * the feedback context.
  766. *
  767. * @param queue the queue
  768. */
  769. static void
  770. adapt_parallelism (struct OperationQueue *queue)
  771. {
  772. struct GNUNET_TIME_Relative avg;
  773. struct FeedbackCtx *fctx;
  774. struct TimeSlot *tslot;
  775. int sd;
  776. unsigned int nvals;
  777. unsigned int cnt;
  778. unsigned int parallelism;
  779. avg = GNUNET_TIME_UNIT_ZERO;
  780. nvals = 0;
  781. fctx = queue->fctx;
  782. for (cnt = 0; cnt < queue->max_active; cnt++)
  783. {
  784. tslot = &fctx->tslots_freeptr[cnt];
  785. avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
  786. nvals += tslot->nvals;
  787. }
  788. GNUNET_assert (nvals >= queue->max_active);
  789. GNUNET_assert (fctx->nfailed <= nvals);
  790. nvals -= fctx->nfailed;
  791. if (0 == nvals)
  792. {
  793. if (1 == queue->max_active)
  794. adaptive_queue_set_max_active (queue, 1);
  795. else
  796. adaptive_queue_set_max_active (queue, queue->max_active / 2);
  797. return;
  798. }
  799. avg = GNUNET_TIME_relative_divide (avg, nvals);
  800. GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
  801. if (GNUNET_SYSERR ==
  802. GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
  803. (unsigned int) avg.rel_value_us,
  804. &sd))
  805. {
  806. adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
  807. return;
  808. }
  809. parallelism = 0;
  810. if (-1 == sd)
  811. parallelism = queue->max_active + 1;
  812. if (sd <= -2)
  813. parallelism = queue->max_active * 2;
  814. if (1 == sd)
  815. parallelism = queue->max_active - 1;
  816. if (2 <= sd)
  817. parallelism = queue->max_active / 2;
  818. parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
  819. adaptive_queue_set_max_active (queue, parallelism);
  820. #if 0
  821. /* old algorithm */
  822. if (sd < 0)
  823. sd = 0;
  824. GNUNET_assert (0 <= sd);
  825. // GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
  826. if (0 == sd)
  827. {
  828. adaptive_queue_set_max_active (queue, queue->max_active * 2);
  829. return;
  830. }
  831. if (1 == sd)
  832. {
  833. adaptive_queue_set_max_active (queue, queue->max_active + 1);
  834. return;
  835. }
  836. if (1 == queue->max_active)
  837. {
  838. adaptive_queue_set_max_active (queue, 1);
  839. return;
  840. }
  841. if (2 == sd)
  842. {
  843. adaptive_queue_set_max_active (queue, queue->max_active - 1);
  844. return;
  845. }
  846. adaptive_queue_set_max_active (queue, queue->max_active / 2);
  847. #endif
  848. }
  849. /**
  850. * update tslots with the operation's completion time. Additionally, if
  851. * updating a timeslot makes all timeslots filled in an adaptive operation
  852. * queue, call adapt_parallelism() for that queue.
  853. *
  854. * @param op the operation
  855. */
  856. static void
  857. update_tslots (struct GNUNET_TESTBED_Operation *op)
  858. {
  859. struct OperationQueue *queue;
  860. struct GNUNET_TIME_Relative t;
  861. struct TimeSlot *tslot;
  862. struct FeedbackCtx *fctx;
  863. unsigned int i;
  864. t = GNUNET_TIME_absolute_get_duration (op->tstart);
  865. while (NULL != (tslot = op->tslots_head)) /* update time slots */
  866. {
  867. queue = tslot->queue;
  868. fctx = queue->fctx;
  869. GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
  870. tslot->op = NULL;
  871. GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
  872. tslot);
  873. if (op->failed)
  874. {
  875. fctx->nfailed++;
  876. for (i = 0; i < op->nqueues; i++)
  877. if (queue == op->queues[i])
  878. break;
  879. GNUNET_assert (i != op->nqueues);
  880. op->queues[i]->overload += op->nres[i];
  881. }
  882. tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
  883. if (0 != tslot->nvals++)
  884. continue;
  885. fctx->tslots_filled++;
  886. if (queue->max_active == fctx->tslots_filled)
  887. adapt_parallelism (queue);
  888. }
  889. }
  890. /**
  891. * Create an 'operation' to be performed.
  892. *
  893. * @param cls closure for the callbacks
  894. * @param start function to call to start the operation
  895. * @param release function to call to close down the operation
  896. * @return handle to the operation
  897. */
  898. struct GNUNET_TESTBED_Operation *
  899. GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
  900. OperationRelease release)
  901. {
  902. struct GNUNET_TESTBED_Operation *op;
  903. op = GNUNET_new (struct GNUNET_TESTBED_Operation);
  904. op->start = start;
  905. op->state = OP_STATE_INIT;
  906. op->release = release;
  907. op->cb_cls = cls;
  908. return op;
  909. }
  910. /**
  911. * Create an operation queue.
  912. *
  913. * @param type the type of operation queue
  914. * @param max_active maximum number of operations in this
  915. * queue that can be active in parallel at the same time
  916. * @return handle to the queue
  917. */
  918. struct OperationQueue *
  919. GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
  920. unsigned int max_active)
  921. {
  922. struct OperationQueue *queue;
  923. struct FeedbackCtx *fctx;
  924. queue = GNUNET_new (struct OperationQueue);
  925. queue->type = type;
  926. if (OPERATION_QUEUE_TYPE_FIXED == type)
  927. {
  928. queue->max_active = max_active;
  929. }
  930. else
  931. {
  932. fctx = GNUNET_new (struct FeedbackCtx);
  933. fctx->max_active_bound = max_active;
  934. fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY);
  935. queue->fctx = fctx;
  936. adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
  937. }
  938. return queue;
  939. }
  940. /**
  941. * Cleanup the given operation queue.
  942. *
  943. * @param queue the operation queue to destroy
  944. */
  945. static void
  946. queue_destroy (struct OperationQueue *queue)
  947. {
  948. struct FeedbackCtx *fctx;
  949. if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
  950. {
  951. cleanup_tslots (queue);
  952. fctx = queue->fctx;
  953. GNUNET_TESTBED_SD_destroy_ (fctx->sd);
  954. GNUNET_free (fctx);
  955. }
  956. GNUNET_free (queue);
  957. }
  958. /**
  959. * Destroys an operation queue. If the queue is still in use by operations it
  960. * is marked as expired and its resources are released in the destructor
  961. * GNUNET_TESTBED_operations_fini().
  962. *
  963. * @param queue queue to destroy
  964. */
  965. void
  966. GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
  967. {
  968. if (GNUNET_YES != is_queue_empty (queue))
  969. {
  970. GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */
  971. queue->expired = 1;
  972. GNUNET_array_append (expired_opqs, n_expired_opqs, queue);
  973. return;
  974. }
  975. queue_destroy (queue);
  976. }
  977. /**
  978. * Destroys the operation queue if it is empty. If not empty return GNUNET_NO.
  979. *
  980. * @param queue the queue to destroy if empty
  981. * @return GNUNET_YES if the queue is destroyed. GNUNET_NO if not (because it
  982. * is not empty)
  983. */
  984. int
  985. GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
  986. {
  987. if (GNUNET_NO == is_queue_empty (queue))
  988. return GNUNET_NO;
  989. GNUNET_TESTBED_operation_queue_destroy_ (queue);
  990. return GNUNET_YES;
  991. }
  992. /**
  993. * Rechecks if any of the operations in the given operation queue's waiting list
  994. * can be made active
  995. *
  996. * @param opq the operation queue
  997. */
  998. static void
  999. recheck_waiting (struct OperationQueue *opq)
  1000. {
  1001. struct QueueEntry *entry;
  1002. struct QueueEntry *entry2;
  1003. entry = opq->wq_head;
  1004. while (NULL != entry)
  1005. {
  1006. entry2 = entry->next;
  1007. if (GNUNET_NO == check_readiness (entry->op))
  1008. break;
  1009. entry = entry2;
  1010. }
  1011. }
  1012. /**
  1013. * Function to reset the maximum number of operations in the given queue. If
  1014. * max_active is lesser than the number of currently active operations, the
  1015. * active operations are not stopped immediately.
  1016. *
  1017. * @param queue the operation queue which has to be modified
  1018. * @param max_active the new maximum number of active operations
  1019. */
  1020. void
  1021. GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
  1022. unsigned int max_active)
  1023. {
  1024. struct QueueEntry *entry;
  1025. queue->max_active = max_active;
  1026. queue->overload = 0;
  1027. while ((queue->active > queue->max_active)
  1028. && (NULL != (entry = queue->rq_head)))
  1029. defer (entry->op);
  1030. recheck_waiting (queue);
  1031. }
  1032. /**
  1033. * Add an operation to a queue. An operation can be in multiple queues at
  1034. * once. Once the operation is inserted into all the queues
  1035. * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
  1036. * waiting for the operation to become active.
  1037. *
  1038. * @param queue queue to add the operation to
  1039. * @param op operation to add to the queue
  1040. * @param nres the number of units of the resources of queue needed by the
  1041. * operation. Should be greater than 0.
  1042. */
  1043. void
  1044. GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
  1045. struct GNUNET_TESTBED_Operation *op,
  1046. unsigned int nres)
  1047. {
  1048. unsigned int qsize;
  1049. GNUNET_assert (0 < nres);
  1050. qsize = op->nqueues;
  1051. GNUNET_array_append (op->queues, op->nqueues, queue);
  1052. GNUNET_array_append (op->nres, qsize, nres);
  1053. GNUNET_assert (qsize == op->nqueues);
  1054. }
  1055. /**
  1056. * Add an operation to a queue. An operation can be in multiple queues at
  1057. * once. Once the operation is inserted into all the queues
  1058. * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
  1059. * waiting for the operation to become active. The operation is assumed to take
  1060. * 1 queue resource. Use GNUNET_TESTBED_operation_queue_insert2_() if it
  1061. * requires more than 1
  1062. *
  1063. * @param queue queue to add the operation to
  1064. * @param op operation to add to the queue
  1065. */
  1066. void
  1067. GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
  1068. struct GNUNET_TESTBED_Operation *op)
  1069. {
  1070. return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
  1071. }
  1072. /**
  1073. * Marks the given operation as waiting on the queues. Once all queues permit
  1074. * the operation to become active, the operation will be activated. The actual
  1075. * activation will occur in a separate task (thus allowing multiple queue
  1076. * insertions to be made without having the first one instantly trigger the
  1077. * operation if the first queue has sufficient resources).
  1078. *
  1079. * @param op the operation to marks as waiting
  1080. */
  1081. void
  1082. GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
  1083. {
  1084. GNUNET_assert (NULL == op->rq_entry);
  1085. change_state (op, OP_STATE_WAITING);
  1086. (void) check_readiness (op);
  1087. }
  1088. /**
  1089. * Marks an active operation as inactive - the operation will be kept in a
  1090. * ready-to-be-released state and continues to hold resources until another
  1091. * operation contents for them.
  1092. *
  1093. * @param op the operation to be marked as inactive. The operation start
  1094. * callback should have been called before for this operation to mark
  1095. * it as inactive.
  1096. */
  1097. void
  1098. GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
  1099. {
  1100. struct OperationQueue **queues;
  1101. size_t ms;
  1102. unsigned int nqueues;
  1103. unsigned int i;
  1104. GNUNET_assert (OP_STATE_ACTIVE == op->state);
  1105. change_state (op, OP_STATE_INACTIVE);
  1106. nqueues = op->nqueues;
  1107. ms = sizeof(struct OperationQueue *) * nqueues;
  1108. queues = GNUNET_malloc (ms);
  1109. /* Cloning is needed as the operation be released by waiting operations and
  1110. hence its nqueues memory ptr will be freed */
  1111. GNUNET_memcpy (queues, op->queues, ms);
  1112. for (i = 0; i < nqueues; i++)
  1113. recheck_waiting (queues[i]);
  1114. GNUNET_free (queues);
  1115. }
  1116. /**
  1117. * Marks and inactive operation as active. This fuction should be called to
  1118. * ensure that the oprelease callback will not be called until it is either
  1119. * marked as inactive or released.
  1120. *
  1121. * @param op the operation to be marked as active
  1122. */
  1123. void
  1124. GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
  1125. {
  1126. GNUNET_assert (OP_STATE_INACTIVE == op->state);
  1127. change_state (op, OP_STATE_ACTIVE);
  1128. }
  1129. /**
  1130. * An operation is 'done' (was cancelled or finished); remove
  1131. * it from the queues and release associated resources.
  1132. *
  1133. * @param op operation that finished
  1134. */
  1135. void
  1136. GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
  1137. {
  1138. struct QueueEntry *entry;
  1139. struct OperationQueue *opq;
  1140. unsigned int i;
  1141. if (OP_STATE_INIT == op->state)
  1142. {
  1143. GNUNET_free (op);
  1144. return;
  1145. }
  1146. if (OP_STATE_READY == op->state)
  1147. rq_remove (op);
  1148. if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
  1149. GNUNET_TESTBED_operation_activate_ (op);
  1150. if (OP_STATE_ACTIVE == op->state)
  1151. update_tslots (op);
  1152. GNUNET_assert (NULL != op->queues);
  1153. GNUNET_assert (NULL != op->qentries);
  1154. for (i = 0; i < op->nqueues; i++)
  1155. {
  1156. entry = op->qentries[i];
  1157. remove_queue_entry (op, i);
  1158. opq = op->queues[i];
  1159. switch (op->state)
  1160. {
  1161. case OP_STATE_INIT:
  1162. case OP_STATE_INACTIVE:
  1163. GNUNET_assert (0);
  1164. break;
  1165. case OP_STATE_WAITING:
  1166. break;
  1167. case OP_STATE_ACTIVE:
  1168. case OP_STATE_READY:
  1169. GNUNET_assert (0 != opq->active);
  1170. GNUNET_assert (opq->active >= entry->nres);
  1171. opq->active -= entry->nres;
  1172. recheck_waiting (opq);
  1173. break;
  1174. }
  1175. GNUNET_free (entry);
  1176. }
  1177. GNUNET_free_non_null (op->qentries);
  1178. GNUNET_free (op->queues);
  1179. GNUNET_free (op->nres);
  1180. if (NULL != op->release)
  1181. op->release (op->cb_cls);
  1182. GNUNET_free (op);
  1183. }
  1184. /**
  1185. * Marks an operation as failed
  1186. *
  1187. * @param op the operation to be marked as failed
  1188. */
  1189. void
  1190. GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
  1191. {
  1192. op->failed = GNUNET_YES;
  1193. }
  1194. /**
  1195. * Cleanup expired operation queues. While doing so, also check for any
  1196. * operations which are not completed and warn about them.
  1197. */
  1198. void __attribute__ ((destructor))
  1199. GNUNET_TESTBED_operations_fini ()
  1200. {
  1201. struct OperationQueue *queue;
  1202. unsigned int i;
  1203. int warn = 0;
  1204. for (i = 0; i < n_expired_opqs; i++)
  1205. {
  1206. queue = expired_opqs[i];
  1207. if (GNUNET_NO == is_queue_empty (queue))
  1208. warn = 1;
  1209. queue_destroy (queue);
  1210. }
  1211. GNUNET_free_non_null (expired_opqs);
  1212. n_expired_opqs = 0;
  1213. if (warn)
  1214. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1215. "Be disciplined. Some operations were not marked as done.\n");
  1216. }
  1217. /* end of testbed_api_operations.c */