scheduler.c 78 KB


  1. /*
  2. This file is part of GNUnet
  3. Copyright (C) 2009-2017 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file util/scheduler.c
  18. * @brief schedule computations using continuation passing style
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_util_lib.h"
  23. #include "disk.h"
  24. // DEBUG
  25. #include <inttypes.h>
  26. #define LOG(kind, ...) GNUNET_log_from (kind, "util-scheduler", __VA_ARGS__)
  27. #define LOG_STRERROR(kind, syscall) GNUNET_log_from_strerror (kind, \
  28. "util-scheduler", \
  29. syscall)
  30. #if HAVE_EXECINFO_H
  31. #include "execinfo.h"
  32. /**
  33. * Use lsof to generate file descriptor reports on select error?
  34. * (turn off for stable releases).
  35. */
  36. #define USE_LSOF GNUNET_NO
  37. /**
  38. * Obtain trace information for all scheduler calls that schedule tasks.
  39. */
  40. #define EXECINFO GNUNET_NO
  41. /**
  42. * Check each file descriptor before adding
  43. */
  44. #define DEBUG_FDS GNUNET_NO
  45. /**
  46. * Depth of the traces collected via EXECINFO.
  47. */
  48. #define MAX_TRACE_DEPTH 50
  49. #endif
  50. /**
  51. * Should we figure out which tasks are delayed for a while
  52. * before they are run? (Consider using in combination with EXECINFO).
  53. */
  54. #define PROFILE_DELAYS GNUNET_NO
  55. /**
  56. * Task that were in the queue for longer than this are reported if
  57. * PROFILE_DELAYS is active.
  58. */
  59. #define DELAY_THRESHOLD GNUNET_TIME_UNIT_SECONDS
  60. /**
  61. * Argument to be passed from the driver to
  62. * #GNUNET_SCHEDULER_do_work(). Contains the
  63. * scheduler's internal state.
  64. */
  65. struct GNUNET_SCHEDULER_Handle
  66. {
  67. /**
  68. * Passed here to avoid constantly allocating/deallocating
  69. * this element, but generally we want to get rid of this.
  70. * @deprecated
  71. */
  72. struct GNUNET_NETWORK_FDSet *rs;
  73. /**
  74. * Passed here to avoid constantly allocating/deallocating
  75. * this element, but generally we want to get rid of this.
  76. * @deprecated
  77. */
  78. struct GNUNET_NETWORK_FDSet *ws;
  79. /**
  80. * context of the SIGINT handler
  81. */
  82. struct GNUNET_SIGNAL_Context *shc_int;
  83. /**
  84. * context of the SIGTERM handler
  85. */
  86. struct GNUNET_SIGNAL_Context *shc_term;
  87. #if (SIGTERM != GNUNET_TERM_SIG)
  88. /**
  89. * context of the TERM_SIG handler
  90. */
  91. struct GNUNET_SIGNAL_Context *shc_gterm;
  92. #endif
  93. /**
  94. * context of the SIGQUIT handler
  95. */
  96. struct GNUNET_SIGNAL_Context *shc_quit;
  97. /**
  98. * context of the SIGHUP handler
  99. */
  100. struct GNUNET_SIGNAL_Context *shc_hup;
  101. /**
  102. * context of the SIGPIPE handler
  103. */
  104. struct GNUNET_SIGNAL_Context *shc_pipe;
  105. };
  106. /**
  107. * Entry in list of pending tasks.
  108. */
  109. struct GNUNET_SCHEDULER_Task
  110. {
  111. /**
  112. * This is a linked list.
  113. */
  114. struct GNUNET_SCHEDULER_Task *next;
  115. /**
  116. * This is a linked list.
  117. */
  118. struct GNUNET_SCHEDULER_Task *prev;
  119. /**
  120. * Function to run when ready.
  121. */
  122. GNUNET_SCHEDULER_TaskCallback callback;
  123. /**
  124. * Closure for the @e callback.
  125. */
  126. void *callback_cls;
  127. /**
  128. * Information about which FDs are ready for this task (and why).
  129. */
  130. struct GNUNET_SCHEDULER_FdInfo *fds;
  131. /**
  132. * Storage location used for @e fds if we want to avoid
  133. * a separate malloc() call in the common case that this
  134. * task is only about a single FD.
  135. */
  136. struct GNUNET_SCHEDULER_FdInfo fdx;
  137. /**
  138. * Size of the @e fds array.
  139. */
  140. unsigned int fds_len;
  141. /**
  142. * Do we own the network and file handles referenced by the FdInfo
  143. * structs in the fds array. This will only be GNUNET_YES if the
  144. * task was created by the #GNUNET_SCHEDULER_add_select function.
  145. */
  146. int own_handles;
  147. /**
  148. * Absolute timeout value for the task, or
  149. * #GNUNET_TIME_UNIT_FOREVER_ABS for "no timeout".
  150. */
  151. struct GNUNET_TIME_Absolute timeout;
  152. #if PROFILE_DELAYS
  153. /**
  154. * When was the task scheduled?
  155. */
  156. struct GNUNET_TIME_Absolute start_time;
  157. #endif
  158. /**
  159. * Why is the task ready? Set after task is added to ready queue.
  160. * Initially set to zero. All reasons that have already been
  161. * satisfied (i.e. read or write ready) will be set over time.
  162. */
  163. enum GNUNET_SCHEDULER_Reason reason;
  164. /**
  165. * Task priority.
  166. */
  167. enum GNUNET_SCHEDULER_Priority priority;
  168. /**
  169. * Set if we only wait for reading from a single FD, otherwise -1.
  170. */
  171. int read_fd;
  172. /**
  173. * Set if we only wait for writing to a single FD, otherwise -1.
  174. */
  175. int write_fd;
  176. /**
  177. * Should the existence of this task in the queue be counted as
  178. * reason to not shutdown the scheduler?
  179. */
  180. int lifeness;
  181. /**
  182. * Is this task run on shutdown?
  183. */
  184. int on_shutdown;
  185. /**
  186. * Is this task in the ready list?
  187. */
  188. int in_ready_list;
  189. #if EXECINFO
  190. /**
  191. * Array of strings which make up a backtrace from the point when this
  192. * task was scheduled (essentially, who scheduled the task?)
  193. */
  194. char **backtrace_strings;
  195. /**
  196. * Size of the backtrace_strings array
  197. */
  198. int num_backtrace_strings;
  199. #endif
  200. /**
  201. * Asynchronous scope of the task that scheduled this scope,
  202. */
  203. struct GNUNET_AsyncScopeSave scope;
  204. };
  205. /**
  206. * A struct representing an event the select driver is waiting for
  207. */
  208. struct Scheduled
  209. {
  210. struct Scheduled *prev;
  211. struct Scheduled *next;
  212. /**
  213. * the task, the event is related to
  214. */
  215. struct GNUNET_SCHEDULER_Task *task;
  216. /**
  217. * information about the network socket / file descriptor where
  218. * the event is expected to occur
  219. */
  220. struct GNUNET_SCHEDULER_FdInfo *fdi;
  221. /**
  222. * the event types (multiple event types can be ORed) the select
  223. * driver is expected to wait for
  224. */
  225. enum GNUNET_SCHEDULER_EventType et;
  226. };
  227. /**
  228. * Driver context used by GNUNET_SCHEDULER_run
  229. */
  230. struct DriverContext
  231. {
  232. /**
  233. * the head of a DLL containing information about the events the
  234. * select driver is waiting for
  235. */
  236. struct Scheduled *scheduled_head;
  237. /**
  238. * the tail of a DLL containing information about the events the
  239. * select driver is waiting for
  240. */
  241. struct Scheduled *scheduled_tail;
  242. /**
  243. * the time when the select driver will wake up again (after
  244. * calling select)
  245. */
  246. struct GNUNET_TIME_Absolute timeout;
  247. };
  248. /**
  249. * The driver used for the event loop. Will be handed over to
  250. * the scheduler in #GNUNET_SCHEDULER_do_work(), persisted
  251. * there in this variable for later use in functions like
  252. * #GNUNET_SCHEDULER_add_select(), #add_without_sets() and
  253. * #GNUNET_SCHEDULER_cancel().
  254. */
  255. static const struct GNUNET_SCHEDULER_Driver *scheduler_driver;
  256. /**
  257. * Head of list of tasks waiting for an event.
  258. */
  259. static struct GNUNET_SCHEDULER_Task *pending_head;
  260. /**
  261. * Tail of list of tasks waiting for an event.
  262. */
  263. static struct GNUNET_SCHEDULER_Task *pending_tail;
  264. /**
  265. * Head of list of tasks waiting for shutdown.
  266. */
  267. static struct GNUNET_SCHEDULER_Task *shutdown_head;
  268. /**
  269. * Tail of list of tasks waiting for shutdown.
  270. */
  271. static struct GNUNET_SCHEDULER_Task *shutdown_tail;
  272. /**
  273. * List of tasks waiting ONLY for a timeout event.
  274. * Sorted by timeout (earliest first). Used so that
  275. * we do not traverse the list of these tasks when
  276. * building select sets (we just look at the head
  277. * to determine the respective timeout ONCE).
  278. */
  279. static struct GNUNET_SCHEDULER_Task *pending_timeout_head;
  280. /**
  281. * List of tasks waiting ONLY for a timeout event.
  282. * Sorted by timeout (earliest first). Used so that
  283. * we do not traverse the list of these tasks when
  284. * building select sets (we just look at the head
  285. * to determine the respective timeout ONCE).
  286. */
  287. static struct GNUNET_SCHEDULER_Task *pending_timeout_tail;
  288. /**
  289. * Last inserted task waiting ONLY for a timeout event.
  290. * Used to (heuristically) speed up insertion.
  291. */
  292. static struct GNUNET_SCHEDULER_Task *pending_timeout_last;
  293. /**
  294. * ID of the task that is running right now.
  295. */
  296. static struct GNUNET_SCHEDULER_Task *active_task;
  297. /**
  298. * Head of list of tasks ready to run right now, grouped by importance.
  299. */
  300. static struct
  301. GNUNET_SCHEDULER_Task *ready_head[GNUNET_SCHEDULER_PRIORITY_COUNT];
  302. /**
  303. * Tail of list of tasks ready to run right now, grouped by importance.
  304. */
  305. static struct
  306. GNUNET_SCHEDULER_Task *ready_tail[GNUNET_SCHEDULER_PRIORITY_COUNT];
  307. /**
  308. * Task for installing parent control handlers (it might happen that the
  309. * scheduler is shutdown before this task is executed, so
  310. * GNUNET_SCHEDULER_shutdown must cancel it in that case)
  311. */
  312. static struct GNUNET_SCHEDULER_Task *install_parent_control_task;
  313. /**
  314. * Task for reading from a pipe that signal handlers will use to initiate
  315. * shutdown
  316. */
  317. static struct GNUNET_SCHEDULER_Task *shutdown_pipe_task;
  318. /**
  319. * Number of tasks on the ready list.
  320. */
  321. static unsigned int ready_count;
  322. /**
  323. * Priority of the task running right now. Only
  324. * valid while a task is running.
  325. */
  326. static enum GNUNET_SCHEDULER_Priority current_priority;
  327. /**
  328. * Priority of the highest task added in the current select
  329. * iteration.
  330. */
  331. static enum GNUNET_SCHEDULER_Priority max_priority_added;
  332. /**
  333. * Value of the 'lifeness' flag for the current task.
  334. */
  335. static int current_lifeness;
  336. /**
  337. * Priority used currently in #GNUNET_SCHEDULER_do_work().
  338. */
  339. static enum GNUNET_SCHEDULER_Priority work_priority;
  340. /**
  341. * Function to use as a select() in the scheduler.
  342. * If NULL, we use GNUNET_NETWORK_socket_select().
  343. */
  344. static GNUNET_SCHEDULER_select scheduler_select;
  345. /**
  346. * Task context of the current task.
  347. */
  348. static struct GNUNET_SCHEDULER_TaskContext tc;
  349. /**
  350. * Closure for #scheduler_select.
  351. */
  352. static void *scheduler_select_cls;
  353. /**
  354. * Sets the select function to use in the scheduler (scheduler_select).
  355. *
  356. * @param new_select new select function to use
  357. * @param new_select_cls closure for @a new_select
  358. * @return previously used select function, NULL for default
  359. */
  360. void
  361. GNUNET_SCHEDULER_set_select (GNUNET_SCHEDULER_select new_select,
  362. void *new_select_cls)
  363. {
  364. scheduler_select = new_select;
  365. scheduler_select_cls = new_select_cls;
  366. }
  367. /**
  368. * Check that the given priority is legal (and return it).
  369. *
  370. * @param p priority value to check
  371. * @return p on success, 0 on error
  372. */
  373. static enum GNUNET_SCHEDULER_Priority
  374. check_priority (enum GNUNET_SCHEDULER_Priority p)
  375. {
  376. if ((p >= 0) && (p < GNUNET_SCHEDULER_PRIORITY_COUNT))
  377. return p;
  378. GNUNET_assert (0);
  379. return 0; /* make compiler happy */
  380. }
  381. /**
  382. * chooses the nearest timeout from all pending tasks, to be used
  383. * to tell the driver the next wakeup time (using its set_wakeup
  384. * callback)
  385. */
  386. struct GNUNET_TIME_Absolute
  387. get_timeout ()
  388. {
  389. struct GNUNET_SCHEDULER_Task *pos;
  390. struct GNUNET_TIME_Absolute now;
  391. struct GNUNET_TIME_Absolute timeout;
  392. pos = pending_timeout_head;
  393. now = GNUNET_TIME_absolute_get ();
  394. timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
  395. if (NULL != pos)
  396. {
  397. if (0 != pos->reason)
  398. {
  399. return now;
  400. }
  401. else
  402. {
  403. timeout = pos->timeout;
  404. }
  405. }
  406. for (pos = pending_head; NULL != pos; pos = pos->next)
  407. {
  408. if (0 != pos->reason)
  409. {
  410. return now;
  411. }
  412. else if ((pos->timeout.abs_value_us !=
  413. GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us) &&
  414. (timeout.abs_value_us > pos->timeout.abs_value_us))
  415. {
  416. timeout = pos->timeout;
  417. }
  418. }
  419. return timeout;
  420. }
  421. /**
  422. * Put a task that is ready for execution into the ready queue.
  423. *
  424. * @param task task ready for execution
  425. */
  426. static void
  427. queue_ready_task (struct GNUNET_SCHEDULER_Task *task)
  428. {
  429. enum GNUNET_SCHEDULER_Priority p = check_priority (task->priority);
  430. GNUNET_CONTAINER_DLL_insert_tail (ready_head[p],
  431. ready_tail[p],
  432. task);
  433. task->in_ready_list = GNUNET_YES;
  434. ready_count++;
  435. }
  436. /**
  437. * Request the shutdown of a scheduler. Marks all tasks
  438. * awaiting shutdown as ready. Note that tasks
  439. * scheduled with #GNUNET_SCHEDULER_add_shutdown() AFTER this call
  440. * will be delayed until the next shutdown signal.
  441. */
  442. void
  443. GNUNET_SCHEDULER_shutdown ()
  444. {
  445. struct GNUNET_SCHEDULER_Task *pos;
  446. LOG (GNUNET_ERROR_TYPE_DEBUG,
  447. "GNUNET_SCHEDULER_shutdown\n");
  448. if (NULL != install_parent_control_task)
  449. {
  450. GNUNET_SCHEDULER_cancel (install_parent_control_task);
  451. install_parent_control_task = NULL;
  452. }
  453. if (NULL != shutdown_pipe_task)
  454. {
  455. GNUNET_SCHEDULER_cancel (shutdown_pipe_task);
  456. shutdown_pipe_task = NULL;
  457. }
  458. while (NULL != (pos = shutdown_head))
  459. {
  460. GNUNET_CONTAINER_DLL_remove (shutdown_head,
  461. shutdown_tail,
  462. pos);
  463. pos->reason |= GNUNET_SCHEDULER_REASON_SHUTDOWN;
  464. queue_ready_task (pos);
  465. }
  466. }
  467. /**
  468. * Output stack trace of task @a t.
  469. *
  470. * @param t task to dump stack trace of
  471. */
  472. static void
  473. dump_backtrace (struct GNUNET_SCHEDULER_Task *t)
  474. {
  475. #if EXECINFO
  476. for (unsigned int i = 0; i < t->num_backtrace_strings; i++)
  477. LOG (GNUNET_ERROR_TYPE_WARNING,
  478. "Task %p trace %u: %s\n",
  479. t,
  480. i,
  481. t->backtrace_strings[i]);
  482. #else
  483. (void) t;
  484. #endif
  485. }
  486. /**
  487. * Destroy a task (release associated resources)
  488. *
  489. * @param t task to destroy
  490. */
  491. static void
  492. destroy_task (struct GNUNET_SCHEDULER_Task *t)
  493. {
  494. LOG (GNUNET_ERROR_TYPE_DEBUG,
  495. "destroying task %p\n",
  496. t);
  497. if (GNUNET_YES == t->own_handles)
  498. {
  499. for (unsigned int i = 0; i != t->fds_len; ++i)
  500. {
  501. const struct GNUNET_NETWORK_Handle *fd = t->fds[i].fd;
  502. const struct GNUNET_DISK_FileHandle *fh = t->fds[i].fh;
  503. if (fd)
  504. {
  505. GNUNET_NETWORK_socket_free_memory_only_ (
  506. (struct GNUNET_NETWORK_Handle *) fd);
  507. }
  508. if (fh)
  509. {
  510. // FIXME: on WIN32 this is not enough! A function
  511. // GNUNET_DISK_file_free_memory_only would be nice
  512. GNUNET_free_nz ((void *) fh);
  513. }
  514. }
  515. }
  516. if (t->fds_len > 1)
  517. {
  518. GNUNET_array_grow (t->fds, t->fds_len, 0);
  519. }
  520. #if EXECINFO
  521. GNUNET_free (t->backtrace_strings);
  522. #endif
  523. GNUNET_free (t);
  524. }
  525. /**
  526. * Pipe used to communicate shutdown via signal.
  527. */
  528. static struct GNUNET_DISK_PipeHandle *shutdown_pipe_handle;
  529. /**
  530. * Process ID of this process at the time we installed the various
  531. * signal handlers.
  532. */
  533. static pid_t my_pid;
  534. /**
  535. * Signal handler called for SIGPIPE.
  536. */
  537. static void
  538. sighandler_pipe ()
  539. {
  540. return;
  541. }
  542. ///**
  543. // * Wait for a short time.
  544. // * Sleeps for @a ms ms (as that should be long enough for virtually all
  545. // * modern systems to context switch and allow another process to do
  546. // * some 'real' work).
  547. // *
  548. // * @param ms how many ms to wait
  549. // */
  550. // static void
  551. // short_wait (unsigned int ms)
  552. // {
  553. // struct GNUNET_TIME_Relative timeout;
  554. //
  555. // timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, ms);
  556. // (void) GNUNET_NETWORK_socket_select (NULL, NULL, NULL, timeout);
  557. // }
  558. /**
  559. * Signal handler called for signals that should cause us to shutdown.
  560. */
  561. static void
  562. sighandler_shutdown ()
  563. {
  564. static char c;
  565. int old_errno = errno; /* backup errno */
  566. if (getpid () != my_pid)
  567. _exit (1); /* we have fork'ed since the signal handler was created,
  568. * ignore the signal, see https://gnunet.org/vfork discussion */
  569. GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle
  570. (shutdown_pipe_handle, GNUNET_DISK_PIPE_END_WRITE),
  571. &c, sizeof(c));
  572. errno = old_errno;
  573. }
  574. static void
  575. shutdown_if_no_lifeness ()
  576. {
  577. struct GNUNET_SCHEDULER_Task *t;
  578. if (ready_count > 0)
  579. return;
  580. for (t = pending_head; NULL != t; t = t->next)
  581. if (GNUNET_YES == t->lifeness)
  582. return;
  583. for (t = shutdown_head; NULL != t; t = t->next)
  584. if (GNUNET_YES == t->lifeness)
  585. return;
  586. for (t = pending_timeout_head; NULL != t; t = t->next)
  587. if (GNUNET_YES == t->lifeness)
  588. return;
  589. /* No lifeness! */
  590. GNUNET_SCHEDULER_shutdown ();
  591. }
  592. static int
  593. select_loop (struct GNUNET_SCHEDULER_Handle *sh,
  594. struct DriverContext *context);
  595. /**
  596. * Initialize and run scheduler. This function will return when all
  597. * tasks have completed. On systems with signals, receiving a SIGTERM
  598. * (and other similar signals) will cause #GNUNET_SCHEDULER_shutdown()
  599. * to be run after the active task is complete. As a result, SIGTERM
  600. * causes all active tasks to be scheduled with reason
  601. * #GNUNET_SCHEDULER_REASON_SHUTDOWN. (However, tasks added
  602. * afterwards will execute normally!). Note that any particular signal
  603. * will only shut down one scheduler; applications should always only
  604. * create a single scheduler.
  605. *
  606. * @param task task to run immediately
  607. * @param task_cls closure of @a task
  608. */
  609. void
  610. GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_TaskCallback task,
  611. void *task_cls)
  612. {
  613. struct GNUNET_SCHEDULER_Handle *sh;
  614. struct GNUNET_SCHEDULER_Driver *driver;
  615. struct DriverContext context = {
  616. .scheduled_head = NULL,
  617. .scheduled_tail = NULL,
  618. .timeout = GNUNET_TIME_absolute_get ()
  619. };
  620. driver = GNUNET_SCHEDULER_driver_select ();
  621. driver->cls = &context;
  622. sh = GNUNET_SCHEDULER_driver_init (driver);
  623. GNUNET_SCHEDULER_add_with_reason_and_priority (task,
  624. task_cls,
  625. GNUNET_SCHEDULER_REASON_STARTUP,
  626. GNUNET_SCHEDULER_PRIORITY_DEFAULT);
  627. select_loop (sh,
  628. &context);
  629. GNUNET_SCHEDULER_driver_done (sh);
  630. GNUNET_free (driver);
  631. }
  632. /**
  633. * Obtain the task context, giving the reason why the current task was
  634. * started.
  635. *
  636. * @return current tasks' scheduler context
  637. */
  638. const struct GNUNET_SCHEDULER_TaskContext *
  639. GNUNET_SCHEDULER_get_task_context ()
  640. {
  641. GNUNET_assert (NULL != active_task);
  642. return &tc;
  643. }
  644. /**
  645. * Get information about the current load of this scheduler. Use this
  646. * function to determine if an elective task should be added or simply
  647. * dropped (if the decision should be made based on the number of
  648. * tasks ready to run).
  649. *
  650. * @param p priority level to look at
  651. * @return number of tasks pending right now
  652. */
  653. unsigned int
  654. GNUNET_SCHEDULER_get_load (enum GNUNET_SCHEDULER_Priority p)
  655. {
  656. unsigned int ret;
  657. GNUNET_assert (NULL != active_task);
  658. if (p == GNUNET_SCHEDULER_PRIORITY_COUNT)
  659. return ready_count;
  660. if (p == GNUNET_SCHEDULER_PRIORITY_KEEP)
  661. p = current_priority;
  662. ret = 0;
  663. for (struct GNUNET_SCHEDULER_Task *pos = ready_head[check_priority (p)];
  664. NULL != pos;
  665. pos = pos->next)
  666. ret++;
  667. return ret;
  668. }
  669. void
  670. init_fd_info (struct GNUNET_SCHEDULER_Task *t,
  671. const struct GNUNET_NETWORK_Handle *const *read_nh,
  672. unsigned int read_nh_len,
  673. const struct GNUNET_NETWORK_Handle *const *write_nh,
  674. unsigned int write_nh_len,
  675. const struct GNUNET_DISK_FileHandle *const *read_fh,
  676. unsigned int read_fh_len,
  677. const struct GNUNET_DISK_FileHandle *const *write_fh,
  678. unsigned int write_fh_len)
  679. {
  680. // FIXME: if we have exactly two network handles / exactly two file handles
  681. // and they are equal, we can make one FdInfo with both
  682. // GNUNET_SCHEDULER_ET_IN and GNUNET_SCHEDULER_ET_OUT set.
  683. struct GNUNET_SCHEDULER_FdInfo *fdi;
  684. t->fds_len = read_nh_len + write_nh_len + read_fh_len + write_fh_len;
  685. if (1 == t->fds_len)
  686. {
  687. fdi = &t->fdx;
  688. t->fds = fdi;
  689. if (1 == read_nh_len)
  690. {
  691. GNUNET_assert (NULL != read_nh);
  692. GNUNET_assert (NULL != *read_nh);
  693. fdi->fd = *read_nh;
  694. fdi->et = GNUNET_SCHEDULER_ET_IN;
  695. fdi->sock = GNUNET_NETWORK_get_fd (*read_nh);
  696. t->read_fd = fdi->sock;
  697. t->write_fd = -1;
  698. }
  699. else if (1 == write_nh_len)
  700. {
  701. GNUNET_assert (NULL != write_nh);
  702. GNUNET_assert (NULL != *write_nh);
  703. fdi->fd = *write_nh;
  704. fdi->et = GNUNET_SCHEDULER_ET_OUT;
  705. fdi->sock = GNUNET_NETWORK_get_fd (*write_nh);
  706. t->read_fd = -1;
  707. t->write_fd = fdi->sock;
  708. }
  709. else if (1 == read_fh_len)
  710. {
  711. GNUNET_assert (NULL != read_fh);
  712. GNUNET_assert (NULL != *read_fh);
  713. fdi->fh = *read_fh;
  714. fdi->et = GNUNET_SCHEDULER_ET_IN;
  715. fdi->sock = (*read_fh)->fd; // FIXME: does not work under WIN32
  716. t->read_fd = fdi->sock;
  717. t->write_fd = -1;
  718. }
  719. else
  720. {
  721. GNUNET_assert (NULL != write_fh);
  722. GNUNET_assert (NULL != *write_fh);
  723. fdi->fh = *write_fh;
  724. fdi->et = GNUNET_SCHEDULER_ET_OUT;
  725. fdi->sock = (*write_fh)->fd; // FIXME: does not work under WIN32
  726. t->read_fd = -1;
  727. t->write_fd = fdi->sock;
  728. }
  729. }
  730. else
  731. {
  732. fdi = GNUNET_new_array (t->fds_len, struct GNUNET_SCHEDULER_FdInfo);
  733. t->fds = fdi;
  734. t->read_fd = -1;
  735. t->write_fd = -1;
  736. unsigned int i;
  737. for (i = 0; i != read_nh_len; ++i)
  738. {
  739. fdi->fd = read_nh[i];
  740. GNUNET_assert (NULL != fdi->fd);
  741. fdi->et = GNUNET_SCHEDULER_ET_IN;
  742. fdi->sock = GNUNET_NETWORK_get_fd (read_nh[i]);
  743. ++fdi;
  744. }
  745. for (i = 0; i != write_nh_len; ++i)
  746. {
  747. fdi->fd = write_nh[i];
  748. GNUNET_assert (NULL != fdi->fd);
  749. fdi->et = GNUNET_SCHEDULER_ET_OUT;
  750. fdi->sock = GNUNET_NETWORK_get_fd (write_nh[i]);
  751. ++fdi;
  752. }
  753. for (i = 0; i != read_fh_len; ++i)
  754. {
  755. fdi->fh = read_fh[i];
  756. GNUNET_assert (NULL != fdi->fh);
  757. fdi->et = GNUNET_SCHEDULER_ET_IN;
  758. fdi->sock = (read_fh[i])->fd; // FIXME: does not work under WIN32
  759. ++fdi;
  760. }
  761. for (i = 0; i != write_fh_len; ++i)
  762. {
  763. fdi->fh = write_fh[i];
  764. GNUNET_assert (NULL != fdi->fh);
  765. fdi->et = GNUNET_SCHEDULER_ET_OUT;
  766. fdi->sock = (write_fh[i])->fd; // FIXME: does not work under WIN32
  767. ++fdi;
  768. }
  769. }
  770. }
  771. /**
  772. * calls the given function @a func on each FdInfo related to @a t.
  773. * Optionally updates the event type field in each FdInfo after calling
  774. * @a func.
  775. *
  776. * @param t the task
  777. * @param driver_func the function to call with each FdInfo contained in
  778. * in @a t
  779. * @param if_not_ready only call @a driver_func on FdInfos that are not
  780. * ready
  781. * @param et the event type to be set in each FdInfo after calling
  782. * @a driver_func on it, or -1 if no updating not desired.
  783. */
  784. static void
  785. driver_add_multiple (struct GNUNET_SCHEDULER_Task *t)
  786. {
  787. struct GNUNET_SCHEDULER_FdInfo *fdi;
  788. int success = GNUNET_YES;
  789. for (unsigned int i = 0; i != t->fds_len; ++i)
  790. {
  791. fdi = &t->fds[i];
  792. success = scheduler_driver->add (scheduler_driver->cls,
  793. t,
  794. fdi) && success;
  795. fdi->et = GNUNET_SCHEDULER_ET_NONE;
  796. }
  797. if (GNUNET_YES != success)
  798. {
  799. LOG (GNUNET_ERROR_TYPE_ERROR,
  800. "driver could not add task\n");
  801. }
  802. }
  803. static void
  804. install_parent_control_handler (void *cls)
  805. {
  806. (void) cls;
  807. install_parent_control_task = NULL;
  808. GNUNET_OS_install_parent_control_handler (NULL);
  809. }
  810. static void
  811. shutdown_pipe_cb (void *cls)
  812. {
  813. char c;
  814. const struct GNUNET_DISK_FileHandle *pr;
  815. (void) cls;
  816. shutdown_pipe_task = NULL;
  817. pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle,
  818. GNUNET_DISK_PIPE_END_READ);
  819. GNUNET_assert (! GNUNET_DISK_handle_invalid (pr));
  820. /* consume the signal */
  821. GNUNET_DISK_file_read (pr, &c, sizeof(c));
  822. /* mark all active tasks as ready due to shutdown */
  823. GNUNET_SCHEDULER_shutdown ();
  824. shutdown_pipe_task =
  825. GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
  826. pr,
  827. &shutdown_pipe_cb,
  828. NULL);
  829. }
  830. /**
  831. * Cancel the task with the specified identifier.
  832. * The task must not yet have run. Only allowed to be called as long as the
  833. * scheduler is running, that is one of the following conditions is met:
  834. *
  835. * - #GNUNET_SCHEDULER_run has been called and has not returned yet
  836. * - #GNUNET_SCHEDULER_driver_init has been run and
  837. * #GNUNET_SCHEDULER_driver_done has not been called yet
  838. *
  839. * @param task id of the task to cancel
  840. * @return original closure of the task
  841. */
  842. void *
  843. GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Task *task)
  844. {
  845. enum GNUNET_SCHEDULER_Priority p;
  846. int is_fd_task;
  847. void *ret;
  848. LOG (GNUNET_ERROR_TYPE_DEBUG,
  849. "canceling task %p\n",
  850. task);
  851. /* scheduler must be running */
  852. GNUNET_assert (NULL != scheduler_driver);
  853. is_fd_task = (NULL != task->fds);
  854. if (is_fd_task)
  855. {
  856. int del_result = scheduler_driver->del (scheduler_driver->cls, task);
  857. if (GNUNET_OK != del_result)
  858. {
  859. LOG (GNUNET_ERROR_TYPE_ERROR,
  860. "driver could not delete task\n");
  861. GNUNET_assert (0);
  862. }
  863. }
  864. if (! task->in_ready_list)
  865. {
  866. if (is_fd_task)
  867. {
  868. GNUNET_CONTAINER_DLL_remove (pending_head,
  869. pending_tail,
  870. task);
  871. }
  872. else if (GNUNET_YES == task->on_shutdown)
  873. {
  874. GNUNET_CONTAINER_DLL_remove (shutdown_head,
  875. shutdown_tail,
  876. task);
  877. }
  878. else
  879. {
  880. GNUNET_CONTAINER_DLL_remove (pending_timeout_head,
  881. pending_timeout_tail,
  882. task);
  883. if (pending_timeout_last == task)
  884. pending_timeout_last = NULL;
  885. }
  886. }
  887. else
  888. {
  889. p = check_priority (task->priority);
  890. GNUNET_CONTAINER_DLL_remove (ready_head[p],
  891. ready_tail[p],
  892. task);
  893. ready_count--;
  894. }
  895. ret = task->callback_cls;
  896. destroy_task (task);
  897. return ret;
  898. }
  899. /**
  900. * Initialize backtrace data for task @a t
  901. *
  902. * @param t task to initialize
  903. */
  904. static void
  905. init_backtrace (struct GNUNET_SCHEDULER_Task *t)
  906. {
  907. #if EXECINFO
  908. void *backtrace_array[MAX_TRACE_DEPTH];
  909. t->num_backtrace_strings
  910. = backtrace (backtrace_array, MAX_TRACE_DEPTH);
  911. t->backtrace_strings =
  912. backtrace_symbols (backtrace_array,
  913. t->num_backtrace_strings);
  914. dump_backtrace (t);
  915. #else
  916. (void) t;
  917. #endif
  918. }
  919. /**
  920. * Continue the current execution with the given function. This is
  921. * similar to the other "add" functions except that there is no delay
  922. * and the reason code can be specified.
  923. *
  924. * @param task main function of the task
  925. * @param task_cls closure for @a task
  926. * @param reason reason for task invocation
  927. * @param priority priority to use for the task
  928. */
  929. void
  930. GNUNET_SCHEDULER_add_with_reason_and_priority (GNUNET_SCHEDULER_TaskCallback
  931. task,
  932. void *task_cls,
  933. enum GNUNET_SCHEDULER_Reason
  934. reason,
  935. enum GNUNET_SCHEDULER_Priority
  936. priority)
  937. {
  938. struct GNUNET_SCHEDULER_Task *t;
  939. /* scheduler must be running */
  940. GNUNET_assert (NULL != scheduler_driver);
  941. GNUNET_assert (NULL != task);
  942. t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
  943. t->read_fd = -1;
  944. t->write_fd = -1;
  945. t->callback = task;
  946. t->callback_cls = task_cls;
  947. #if PROFILE_DELAYS
  948. t->start_time = GNUNET_TIME_absolute_get ();
  949. #endif
  950. t->reason = reason;
  951. t->priority = check_priority (priority);
  952. t->lifeness = current_lifeness;
  953. LOG (GNUNET_ERROR_TYPE_DEBUG,
  954. "Adding continuation task %p\n",
  955. t);
  956. init_backtrace (t);
  957. queue_ready_task (t);
  958. }
  959. /**
  960. * Schedule a new task to be run at the specified time. The task
  961. * will be scheduled for execution at time @a at.
  962. *
  963. * @param at time when the operation should run
  964. * @param priority priority to use for the task
  965. * @param task main function of the task
  966. * @param task_cls closure of @a task
  967. * @return unique task identifier for the job
  968. * only valid until @a task is started!
  969. */
  970. struct GNUNET_SCHEDULER_Task *
  971. GNUNET_SCHEDULER_add_at_with_priority (struct GNUNET_TIME_Absolute at,
  972. enum GNUNET_SCHEDULER_Priority priority,
  973. GNUNET_SCHEDULER_TaskCallback task,
  974. void *task_cls)
  975. {
  976. struct GNUNET_SCHEDULER_Task *t;
  977. struct GNUNET_SCHEDULER_Task *pos;
  978. struct GNUNET_SCHEDULER_Task *prev;
  979. struct GNUNET_TIME_Relative left;
  980. /* scheduler must be running */
  981. GNUNET_assert (NULL != scheduler_driver);
  982. GNUNET_assert (NULL != task);
  983. t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
  984. GNUNET_async_scope_get (&t->scope);
  985. t->callback = task;
  986. t->callback_cls = task_cls;
  987. t->read_fd = -1;
  988. t->write_fd = -1;
  989. #if PROFILE_DELAYS
  990. t->start_time = GNUNET_TIME_absolute_get ();
  991. #endif
  992. t->timeout = at;
  993. t->priority = check_priority (priority);
  994. t->lifeness = current_lifeness;
  995. init_backtrace (t);
  996. left = GNUNET_TIME_absolute_get_remaining (at);
  997. if (0 == left.rel_value_us)
  998. {
  999. queue_ready_task (t);
  1000. if (priority > work_priority)
  1001. work_priority = priority;
  1002. return t;
  1003. }
  1004. /* try tail first (optimization in case we are
  1005. * appending to a long list of tasks with timeouts) */
  1006. if ((NULL == pending_timeout_head) ||
  1007. (at.abs_value_us < pending_timeout_head->timeout.abs_value_us))
  1008. {
  1009. GNUNET_CONTAINER_DLL_insert (pending_timeout_head,
  1010. pending_timeout_tail,
  1011. t);
  1012. }
  1013. else
  1014. {
  1015. /* first move from heuristic start backwards to before start time */
  1016. prev = pending_timeout_last;
  1017. while ((NULL != prev) &&
  1018. (prev->timeout.abs_value_us > t->timeout.abs_value_us))
  1019. prev = prev->prev;
  1020. /* now, move from heuristic start (or head of list) forward to insertion point */
  1021. if (NULL == prev)
  1022. pos = pending_timeout_head;
  1023. else
  1024. pos = prev->next;
  1025. while ((NULL != pos) && (pos->timeout.abs_value_us <=
  1026. t->timeout.abs_value_us))
  1027. {
  1028. prev = pos;
  1029. pos = pos->next;
  1030. }
  1031. GNUNET_CONTAINER_DLL_insert_after (pending_timeout_head,
  1032. pending_timeout_tail,
  1033. prev,
  1034. t);
  1035. }
  1036. /* finally, update heuristic insertion point to last insertion... */
  1037. pending_timeout_last = t;
  1038. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1039. "Adding task %p\n",
  1040. t);
  1041. return t;
  1042. }
  1043. /**
  1044. * Schedule a new task to be run with a specified delay. The task
  1045. * will be scheduled for execution once the delay has expired.
  1046. *
  1047. * @param delay when should this operation time out?
  1048. * @param priority priority to use for the task
  1049. * @param task main function of the task
  1050. * @param task_cls closure of @a task
  1051. * @return unique task identifier for the job
  1052. * only valid until @a task is started!
  1053. */
  1054. struct GNUNET_SCHEDULER_Task *
  1055. GNUNET_SCHEDULER_add_delayed_with_priority (struct GNUNET_TIME_Relative delay,
  1056. enum GNUNET_SCHEDULER_Priority
  1057. priority,
  1058. GNUNET_SCHEDULER_TaskCallback task,
  1059. void *task_cls)
  1060. {
  1061. return GNUNET_SCHEDULER_add_at_with_priority (
  1062. GNUNET_TIME_relative_to_absolute (delay),
  1063. priority,
  1064. task,
  1065. task_cls);
  1066. }
  1067. /**
  1068. * Schedule a new task to be run with a specified priority.
  1069. *
  1070. * @param prio how important is the new task?
  1071. * @param task main function of the task
  1072. * @param task_cls closure of @a task
  1073. * @return unique task identifier for the job
  1074. * only valid until @a task is started!
  1075. */
  1076. struct GNUNET_SCHEDULER_Task *
  1077. GNUNET_SCHEDULER_add_with_priority (enum GNUNET_SCHEDULER_Priority prio,
  1078. GNUNET_SCHEDULER_TaskCallback task,
  1079. void *task_cls)
  1080. {
  1081. return GNUNET_SCHEDULER_add_delayed_with_priority (GNUNET_TIME_UNIT_ZERO,
  1082. prio,
  1083. task,
  1084. task_cls);
  1085. }
  1086. /**
  1087. * Schedule a new task to be run at the specified time. The task
  1088. * will be scheduled for execution once specified time has been
  1089. * reached. It will be run with the DEFAULT priority.
  1090. *
  1091. * @param at time at which this operation should run
  1092. * @param task main function of the task
  1093. * @param task_cls closure of @a task
  1094. * @return unique task identifier for the job
  1095. * only valid until @a task is started!
  1096. */
  1097. struct GNUNET_SCHEDULER_Task *
  1098. GNUNET_SCHEDULER_add_at (struct GNUNET_TIME_Absolute at,
  1099. GNUNET_SCHEDULER_TaskCallback task,
  1100. void *task_cls)
  1101. {
  1102. return GNUNET_SCHEDULER_add_at_with_priority (at,
  1103. GNUNET_SCHEDULER_PRIORITY_DEFAULT,
  1104. task,
  1105. task_cls);
  1106. }
  1107. /**
  1108. * Schedule a new task to be run with a specified delay. The task
  1109. * will be scheduled for execution once the delay has expired. It
  1110. * will be run with the DEFAULT priority.
  1111. *
  1112. * @param delay when should this operation time out?
  1113. * @param task main function of the task
  1114. * @param task_cls closure of @a task
  1115. * @return unique task identifier for the job
  1116. * only valid until @a task is started!
  1117. */
  1118. struct GNUNET_SCHEDULER_Task *
  1119. GNUNET_SCHEDULER_add_delayed (struct GNUNET_TIME_Relative delay,
  1120. GNUNET_SCHEDULER_TaskCallback task,
  1121. void *task_cls)
  1122. {
  1123. return GNUNET_SCHEDULER_add_delayed_with_priority (delay,
  1124. GNUNET_SCHEDULER_PRIORITY_DEFAULT,
  1125. task,
  1126. task_cls);
  1127. }
  1128. /**
  1129. * Schedule a new task to be run as soon as possible. Note that this
  1130. * does not guarantee that this will be the next task that is being
  1131. * run, as other tasks with higher priority (or that are already ready
  1132. * to run) might get to run first. Just as with delays, clients must
  1133. * not rely on any particular order of execution between tasks
  1134. * scheduled concurrently.
  1135. *
  1136. * The task will be run with the DEFAULT priority.
  1137. *
  1138. * @param task main function of the task
  1139. * @param task_cls closure of @a task
  1140. * @return unique task identifier for the job
  1141. * only valid until @a task is started!
  1142. */
  1143. struct GNUNET_SCHEDULER_Task *
  1144. GNUNET_SCHEDULER_add_now (GNUNET_SCHEDULER_TaskCallback task,
  1145. void *task_cls)
  1146. {
  1147. struct GNUNET_SCHEDULER_Task *t;
  1148. t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
  1149. GNUNET_async_scope_get (&t->scope);
  1150. t->callback = task;
  1151. t->callback_cls = task_cls;
  1152. t->read_fd = -1;
  1153. t->write_fd = -1;
  1154. #if PROFILE_DELAYS
  1155. t->start_time = GNUNET_TIME_absolute_get ();
  1156. #endif
  1157. t->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
  1158. t->priority = current_priority;
  1159. t->on_shutdown = GNUNET_YES;
  1160. t->lifeness = current_lifeness;
  1161. queue_ready_task (t);
  1162. init_backtrace (t);
  1163. return t;
  1164. }
  1165. /**
  1166. * Schedule a new task to be run on shutdown, that is when a CTRL-C
  1167. * signal is received, or when #GNUNET_SCHEDULER_shutdown() is being
  1168. * invoked.
  1169. *
  1170. * @param task main function of the task
  1171. * @param task_cls closure of @a task
  1172. * @return unique task identifier for the job
  1173. * only valid until @a task is started!
  1174. */
  1175. struct GNUNET_SCHEDULER_Task *
  1176. GNUNET_SCHEDULER_add_shutdown (GNUNET_SCHEDULER_TaskCallback task,
  1177. void *task_cls)
  1178. {
  1179. struct GNUNET_SCHEDULER_Task *t;
  1180. /* scheduler must be running */
  1181. GNUNET_assert (NULL != scheduler_driver);
  1182. GNUNET_assert (NULL != task);
  1183. t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
  1184. GNUNET_async_scope_get (&t->scope);
  1185. t->callback = task;
  1186. t->callback_cls = task_cls;
  1187. t->read_fd = -1;
  1188. t->write_fd = -1;
  1189. #if PROFILE_DELAYS
  1190. t->start_time = GNUNET_TIME_absolute_get ();
  1191. #endif
  1192. t->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
  1193. t->priority = GNUNET_SCHEDULER_PRIORITY_SHUTDOWN;
  1194. t->on_shutdown = GNUNET_YES;
  1195. t->lifeness = GNUNET_NO;
  1196. GNUNET_CONTAINER_DLL_insert (shutdown_head,
  1197. shutdown_tail,
  1198. t);
  1199. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1200. "Adding shutdown task %p\n",
  1201. t);
  1202. init_backtrace (t);
  1203. return t;
  1204. }
  1205. /**
  1206. * Schedule a new task to be run as soon as possible with the
  1207. * (transitive) ignore-shutdown flag either explicitly set or
  1208. * explicitly enabled. This task (and all tasks created from it,
  1209. * other than by another call to this function) will either count or
  1210. * not count for the "lifeness" of the process. This API is only
  1211. * useful in a few special cases.
  1212. *
  1213. * @param lifeness #GNUNET_YES if the task counts for lifeness, #GNUNET_NO if not.
  1214. * @param task main function of the task
  1215. * @param task_cls closure of @a task
  1216. * @return unique task identifier for the job
  1217. * only valid until @a task is started!
  1218. */
  1219. struct GNUNET_SCHEDULER_Task *
  1220. GNUNET_SCHEDULER_add_now_with_lifeness (int lifeness,
  1221. GNUNET_SCHEDULER_TaskCallback task,
  1222. void *task_cls)
  1223. {
  1224. struct GNUNET_SCHEDULER_Task *ret;
  1225. ret = GNUNET_SCHEDULER_add_now (task, task_cls);
  1226. ret->lifeness = lifeness;
  1227. return ret;
  1228. }
  1229. #if DEBUG_FDS
  1230. /**
  1231. * check a raw file descriptor and abort if it is bad (for debugging purposes)
  1232. *
  1233. * @param t the task related to the file descriptor
  1234. * @param raw_fd the raw file descriptor to check
  1235. */
  1236. void
  1237. check_fd (struct GNUNET_SCHEDULER_Task *t, int raw_fd)
  1238. {
  1239. if (-1 != raw_fd)
  1240. {
  1241. int flags = fcntl (raw_fd, F_GETFD);
  1242. if ((flags == -1) && (errno == EBADF))
  1243. {
  1244. LOG (GNUNET_ERROR_TYPE_ERROR,
  1245. "Got invalid file descriptor %d!\n",
  1246. raw_fd);
  1247. init_backtrace (t);
  1248. GNUNET_assert (0);
  1249. }
  1250. }
  1251. }
  1252. #endif
  1253. /**
  1254. * Schedule a new task to be run with a specified delay or when any of
  1255. * the specified file descriptor sets is ready. The delay can be used
  1256. * as a timeout on the socket(s) being ready. The task will be
  1257. * scheduled for execution once either the delay has expired or any of
  1258. * the socket operations is ready. This is the most general
  1259. * function of the "add" family. Note that the "prerequisite_task"
  1260. * must be satisfied in addition to any of the other conditions. In
  1261. * other words, the task will be started when
  1262. * <code>
  1263. * (prerequisite-run)
  1264. * && (delay-ready
  1265. * || any-rs-ready
  1266. * || any-ws-ready)
  1267. * </code>
  1268. *
  1269. * @param delay how long should we wait?
  1270. * @param priority priority to use
  1271. * @param rfd file descriptor we want to read (can be -1)
  1272. * @param wfd file descriptors we want to write (can be -1)
  1273. * @param task main function of the task
  1274. * @param task_cls closure of @a task
  1275. * @return unique task identifier for the job
  1276. * only valid until @a task is started!
  1277. */
  1278. static struct GNUNET_SCHEDULER_Task *
  1279. add_without_sets (struct GNUNET_TIME_Relative delay,
  1280. enum GNUNET_SCHEDULER_Priority priority,
  1281. const struct GNUNET_NETWORK_Handle *read_nh,
  1282. const struct GNUNET_NETWORK_Handle *write_nh,
  1283. const struct GNUNET_DISK_FileHandle *read_fh,
  1284. const struct GNUNET_DISK_FileHandle *write_fh,
  1285. GNUNET_SCHEDULER_TaskCallback task,
  1286. void *task_cls)
  1287. {
  1288. struct GNUNET_SCHEDULER_Task *t;
  1289. /* scheduler must be running */
  1290. GNUNET_assert (NULL != scheduler_driver);
  1291. GNUNET_assert (NULL != task);
  1292. t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
  1293. GNUNET_async_scope_get (&t->scope);
  1294. init_fd_info (t,
  1295. &read_nh,
  1296. read_nh ? 1 : 0,
  1297. &write_nh,
  1298. write_nh ? 1 : 0,
  1299. &read_fh,
  1300. read_fh ? 1 : 0,
  1301. &write_fh,
  1302. write_fh ? 1 : 0);
  1303. t->callback = task;
  1304. t->callback_cls = task_cls;
  1305. #if DEBUG_FDS
  1306. check_fd (t, NULL != read_nh ? GNUNET_NETWORK_get_fd (read_nh) : -1);
  1307. check_fd (t, NULL != write_nh ? GNUNET_NETWORK_get_fd (write_nh) : -1);
  1308. check_fd (t, NULL != read_fh ? read_fh->fd : -1);
  1309. check_fd (t, NULL != write_fh ? write_fh->fd : -1);
  1310. #endif
  1311. #if PROFILE_DELAYS
  1312. t->start_time = GNUNET_TIME_absolute_get ();
  1313. #endif
  1314. t->timeout = GNUNET_TIME_relative_to_absolute (delay);
  1315. t->priority = check_priority ((priority == GNUNET_SCHEDULER_PRIORITY_KEEP) ?
  1316. current_priority : priority);
  1317. t->lifeness = current_lifeness;
  1318. GNUNET_CONTAINER_DLL_insert (pending_head,
  1319. pending_tail,
  1320. t);
  1321. driver_add_multiple (t);
  1322. max_priority_added = GNUNET_MAX (max_priority_added,
  1323. t->priority);
  1324. init_backtrace (t);
  1325. return t;
  1326. }
  1327. /**
  1328. * Schedule a new task to be run with a specified delay or when the
  1329. * specified file descriptor is ready for reading. The delay can be
  1330. * used as a timeout on the socket being ready. The task will be
  1331. * scheduled for execution once either the delay has expired or the
  1332. * socket operation is ready. It will be run with the DEFAULT priority.
  1333. * Only allowed to be called as long as the scheduler is running, that
  1334. * is one of the following conditions is met:
  1335. *
  1336. * - #GNUNET_SCHEDULER_run has been called and has not returned yet
  1337. * - #GNUNET_SCHEDULER_driver_init has been run and
  1338. * #GNUNET_SCHEDULER_driver_done has not been called yet
  1339. *
  1340. * @param delay when should this operation time out?
  1341. * @param rfd read file-descriptor
  1342. * @param task main function of the task
  1343. * @param task_cls closure of @a task
  1344. * @return unique task identifier for the job
  1345. * only valid until @a task is started!
  1346. */
  1347. struct GNUNET_SCHEDULER_Task *
  1348. GNUNET_SCHEDULER_add_read_net (struct GNUNET_TIME_Relative delay,
  1349. struct GNUNET_NETWORK_Handle *rfd,
  1350. GNUNET_SCHEDULER_TaskCallback task,
  1351. void *task_cls)
  1352. {
  1353. return GNUNET_SCHEDULER_add_read_net_with_priority (delay,
  1354. GNUNET_SCHEDULER_PRIORITY_DEFAULT,
  1355. rfd, task, task_cls);
  1356. }
  1357. /**
  1358. * Schedule a new task to be run with a specified priority and to be
  1359. * run after the specified delay or when the specified file descriptor
  1360. * is ready for reading. The delay can be used as a timeout on the
  1361. * socket being ready. The task will be scheduled for execution once
  1362. * either the delay has expired or the socket operation is ready. It
  1363. * will be run with the DEFAULT priority.
  1364. * Only allowed to be called as long as the scheduler is running, that
  1365. * is one of the following conditions is met:
  1366. *
  1367. * - #GNUNET_SCHEDULER_run has been called and has not returned yet
  1368. * - #GNUNET_SCHEDULER_driver_init has been run and
  1369. * #GNUNET_SCHEDULER_driver_done has not been called yet
  1370. *
  1371. * @param delay when should this operation time out?
  1372. * @param priority priority to use for the task
  1373. * @param rfd read file-descriptor
  1374. * @param task main function of the task
  1375. * @param task_cls closure of @a task
  1376. * @return unique task identifier for the job
  1377. * only valid until @a task is started!
  1378. */
  1379. struct GNUNET_SCHEDULER_Task *
  1380. GNUNET_SCHEDULER_add_read_net_with_priority (struct GNUNET_TIME_Relative delay,
  1381. enum GNUNET_SCHEDULER_Priority
  1382. priority,
  1383. struct GNUNET_NETWORK_Handle *rfd,
  1384. GNUNET_SCHEDULER_TaskCallback task,
  1385. void *task_cls)
  1386. {
  1387. return GNUNET_SCHEDULER_add_net_with_priority (delay, priority,
  1388. rfd,
  1389. GNUNET_YES,
  1390. GNUNET_NO,
  1391. task, task_cls);
  1392. }
  1393. /**
  1394. * Schedule a new task to be run with a specified delay or when the
  1395. * specified file descriptor is ready for writing. The delay can be
  1396. * used as a timeout on the socket being ready. The task will be
  1397. * scheduled for execution once either the delay has expired or the
  1398. * socket operation is ready. It will be run with the priority of
  1399. * the calling task.
  1400. * Only allowed to be called as long as the scheduler is running, that
  1401. * is one of the following conditions is met:
  1402. *
  1403. * - #GNUNET_SCHEDULER_run has been called and has not returned yet
  1404. * - #GNUNET_SCHEDULER_driver_init has been run and
  1405. * #GNUNET_SCHEDULER_driver_done has not been called yet
  1406. *
  1407. * @param delay when should this operation time out?
  1408. * @param wfd write file-descriptor
  1409. * @param task main function of the task
  1410. * @param task_cls closure of @a task
  1411. * @return unique task identifier for the job
  1412. * only valid until @a task is started!
  1413. */
  1414. struct GNUNET_SCHEDULER_Task *
  1415. GNUNET_SCHEDULER_add_write_net (struct GNUNET_TIME_Relative delay,
  1416. struct GNUNET_NETWORK_Handle *wfd,
  1417. GNUNET_SCHEDULER_TaskCallback task,
  1418. void *task_cls)
  1419. {
  1420. return GNUNET_SCHEDULER_add_net_with_priority (delay,
  1421. GNUNET_SCHEDULER_PRIORITY_DEFAULT,
  1422. wfd,
  1423. GNUNET_NO, GNUNET_YES,
  1424. task, task_cls);
  1425. }
  1426. /**
  1427. * Schedule a new task to be run with a specified delay or when the
  1428. * specified file descriptor is ready. The delay can be
  1429. * used as a timeout on the socket being ready. The task will be
  1430. * scheduled for execution once either the delay has expired or the
  1431. * socket operation is ready.
  1432. * Only allowed to be called as long as the scheduler is running, that
  1433. * is one of the following conditions is met:
  1434. *
  1435. * - #GNUNET_SCHEDULER_run has been called and has not returned yet
  1436. * - #GNUNET_SCHEDULER_driver_init has been run and
  1437. * #GNUNET_SCHEDULER_driver_done has not been called yet
  1438. *
  1439. * @param delay when should this operation time out?
  1440. * @param priority priority of the task
  1441. * @param fd file-descriptor
  1442. * @param on_read whether to poll the file-descriptor for readability
  1443. * @param on_write whether to poll the file-descriptor for writability
  1444. * @param task main function of the task
  1445. * @param task_cls closure of task
  1446. * @return unique task identifier for the job
  1447. * only valid until "task" is started!
  1448. */
  1449. struct GNUNET_SCHEDULER_Task *
  1450. GNUNET_SCHEDULER_add_net_with_priority (struct GNUNET_TIME_Relative delay,
  1451. enum GNUNET_SCHEDULER_Priority priority,
  1452. struct GNUNET_NETWORK_Handle *fd,
  1453. int on_read,
  1454. int on_write,
  1455. GNUNET_SCHEDULER_TaskCallback task,
  1456. void *task_cls)
  1457. {
  1458. /* scheduler must be running */
  1459. GNUNET_assert (NULL != scheduler_driver);
  1460. GNUNET_assert (on_read || on_write);
  1461. GNUNET_assert (GNUNET_NETWORK_get_fd (fd) >= 0);
  1462. return add_without_sets (delay, priority,
  1463. on_read ? fd : NULL,
  1464. on_write ? fd : NULL,
  1465. NULL,
  1466. NULL,
  1467. task, task_cls);
  1468. }
  1469. /**
  1470. * Schedule a new task to be run with a specified delay or when the
  1471. * specified file descriptor is ready for reading. The delay can be
  1472. * used as a timeout on the socket being ready. The task will be
  1473. * scheduled for execution once either the delay has expired or the
  1474. * socket operation is ready. It will be run with the DEFAULT priority.
  1475. * Only allowed to be called as long as the scheduler is running, that
  1476. * is one of the following conditions is met:
  1477. *
  1478. * - #GNUNET_SCHEDULER_run has been called and has not returned yet
  1479. * - #GNUNET_SCHEDULER_driver_init has been run and
  1480. * #GNUNET_SCHEDULER_driver_done has not been called yet
  1481. *
  1482. * @param delay when should this operation time out?
  1483. * @param rfd read file-descriptor
  1484. * @param task main function of the task
  1485. * @param task_cls closure of @a task
  1486. * @return unique task identifier for the job
  1487. * only valid until @a task is started!
  1488. */
  1489. struct GNUNET_SCHEDULER_Task *
  1490. GNUNET_SCHEDULER_add_read_file (struct GNUNET_TIME_Relative delay,
  1491. const struct GNUNET_DISK_FileHandle *rfd,
  1492. GNUNET_SCHEDULER_TaskCallback task,
  1493. void *task_cls)
  1494. {
  1495. return GNUNET_SCHEDULER_add_file_with_priority (
  1496. delay, GNUNET_SCHEDULER_PRIORITY_DEFAULT,
  1497. rfd, GNUNET_YES, GNUNET_NO,
  1498. task, task_cls);
  1499. }
  1500. /**
  1501. * Schedule a new task to be run with a specified delay or when the
  1502. * specified file descriptor is ready for writing. The delay can be
  1503. * used as a timeout on the socket being ready. The task will be
  1504. * scheduled for execution once either the delay has expired or the
  1505. * socket operation is ready. It will be run with the DEFAULT priority.
  1506. * Only allowed to be called as long as the scheduler is running, that
  1507. * is one of the following conditions is met:
  1508. *
  1509. * - #GNUNET_SCHEDULER_run has been called and has not returned yet
  1510. * - #GNUNET_SCHEDULER_driver_init has been run and
  1511. * #GNUNET_SCHEDULER_driver_done has not been called yet
  1512. *
  1513. * @param delay when should this operation time out?
  1514. * @param wfd write file-descriptor
  1515. * @param task main function of the task
  1516. * @param task_cls closure of @a task
  1517. * @return unique task identifier for the job
  1518. * only valid until @a task is started!
  1519. */
  1520. struct GNUNET_SCHEDULER_Task *
  1521. GNUNET_SCHEDULER_add_write_file (struct GNUNET_TIME_Relative delay,
  1522. const struct GNUNET_DISK_FileHandle *wfd,
  1523. GNUNET_SCHEDULER_TaskCallback task,
  1524. void *task_cls)
  1525. {
  1526. return GNUNET_SCHEDULER_add_file_with_priority (
  1527. delay, GNUNET_SCHEDULER_PRIORITY_DEFAULT,
  1528. wfd, GNUNET_NO, GNUNET_YES,
  1529. task, task_cls);
  1530. }
  1531. /**
  1532. * Schedule a new task to be run with a specified delay or when the
  1533. * specified file descriptor is ready. The delay can be
  1534. * used as a timeout on the socket being ready. The task will be
  1535. * scheduled for execution once either the delay has expired or the
  1536. * socket operation is ready.
  1537. * Only allowed to be called as long as the scheduler is running, that
  1538. * is one of the following conditions is met:
  1539. *
  1540. * - #GNUNET_SCHEDULER_run has been called and has not returned yet
  1541. * - #GNUNET_SCHEDULER_driver_init has been run and
  1542. * #GNUNET_SCHEDULER_driver_done has not been called yet
  1543. *
  1544. * @param delay when should this operation time out?
  1545. * @param priority priority of the task
  1546. * @param fd file-descriptor
  1547. * @param on_read whether to poll the file-descriptor for readability
  1548. * @param on_write whether to poll the file-descriptor for writability
  1549. * @param task main function of the task
  1550. * @param task_cls closure of @a task
  1551. * @return unique task identifier for the job
  1552. * only valid until @a task is started!
  1553. */
  1554. struct GNUNET_SCHEDULER_Task *
  1555. GNUNET_SCHEDULER_add_file_with_priority (struct GNUNET_TIME_Relative delay,
  1556. enum GNUNET_SCHEDULER_Priority
  1557. priority,
  1558. const struct
  1559. GNUNET_DISK_FileHandle *fd,
  1560. int on_read, int on_write,
  1561. GNUNET_SCHEDULER_TaskCallback task,
  1562. void *task_cls)
  1563. {
  1564. /* scheduler must be running */
  1565. GNUNET_assert (NULL != scheduler_driver);
  1566. GNUNET_assert (on_read || on_write);
  1567. GNUNET_assert (fd->fd >= 0);
  1568. return add_without_sets (delay, priority,
  1569. NULL,
  1570. NULL,
  1571. on_read ? fd : NULL,
  1572. on_write ? fd : NULL,
  1573. task, task_cls);
  1574. }
  1575. void
  1576. extract_handles (const struct GNUNET_NETWORK_FDSet *fdset,
  1577. const struct GNUNET_NETWORK_Handle ***ntarget,
  1578. unsigned int *extracted_nhandles,
  1579. const struct GNUNET_DISK_FileHandle ***ftarget,
  1580. unsigned int *extracted_fhandles)
  1581. {
  1582. // FIXME: this implementation only works for unix, for WIN32 the file handles
  1583. // in fdset must be handled separately
  1584. const struct GNUNET_NETWORK_Handle **nhandles;
  1585. const struct GNUNET_DISK_FileHandle **fhandles;
  1586. unsigned int nhandles_len;
  1587. unsigned int fhandles_len;
  1588. nhandles = NULL;
  1589. fhandles = NULL;
  1590. nhandles_len = 0;
  1591. fhandles_len = 0;
  1592. for (int sock = 0; sock != fdset->nsds; ++sock)
  1593. {
  1594. if (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (fdset, sock))
  1595. {
  1596. struct GNUNET_NETWORK_Handle *nhandle;
  1597. struct GNUNET_DISK_FileHandle *fhandle;
  1598. nhandle = GNUNET_NETWORK_socket_box_native (sock);
  1599. if (NULL != nhandle)
  1600. {
  1601. GNUNET_array_append (nhandles, nhandles_len, nhandle);
  1602. }
  1603. else
  1604. {
  1605. fhandle = GNUNET_DISK_get_handle_from_int_fd (sock);
  1606. if (NULL != fhandle)
  1607. {
  1608. GNUNET_array_append (fhandles, fhandles_len, fhandle);
  1609. }
  1610. else
  1611. {
  1612. GNUNET_assert (0);
  1613. }
  1614. }
  1615. }
  1616. }
  1617. *ntarget = nhandles_len > 0 ? nhandles : NULL;
  1618. *ftarget = fhandles_len > 0 ? fhandles : NULL;
  1619. *extracted_nhandles = nhandles_len;
  1620. *extracted_fhandles = fhandles_len;
  1621. }
  1622. /**
  1623. * Schedule a new task to be run with a specified delay or when any of
  1624. * the specified file descriptor sets is ready. The delay can be used
  1625. * as a timeout on the socket(s) being ready. The task will be
  1626. * scheduled for execution once either the delay has expired or any of
  1627. * the socket operations is ready. This is the most general
  1628. * function of the "add" family. Note that the "prerequisite_task"
  1629. * must be satisfied in addition to any of the other conditions. In
  1630. * other words, the task will be started when
  1631. * <code>
  1632. * (prerequisite-run)
  1633. * && (delay-ready
  1634. * || any-rs-ready
  1635. * || any-ws-ready) )
  1636. * </code>
  1637. * Only allowed to be called as long as the scheduler is running, that
  1638. * is one of the following conditions is met:
  1639. *
  1640. * - #GNUNET_SCHEDULER_run has been called and has not returned yet
  1641. * - #GNUNET_SCHEDULER_driver_init has been run and
  1642. * #GNUNET_SCHEDULER_driver_done has not been called yet
  1643. *
  1644. * @param prio how important is this task?
  1645. * @param delay how long should we wait?
  1646. * @param rs set of file descriptors we want to read (can be NULL)
  1647. * @param ws set of file descriptors we want to write (can be NULL)
  1648. * @param task main function of the task
  1649. * @param task_cls closure of @a task
  1650. * @return unique task identifier for the job
  1651. * only valid until @a task is started!
  1652. */
  1653. struct GNUNET_SCHEDULER_Task *
  1654. GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio,
  1655. struct GNUNET_TIME_Relative delay,
  1656. const struct GNUNET_NETWORK_FDSet *rs,
  1657. const struct GNUNET_NETWORK_FDSet *ws,
  1658. GNUNET_SCHEDULER_TaskCallback task,
  1659. void *task_cls)
  1660. {
  1661. struct GNUNET_SCHEDULER_Task *t;
  1662. const struct GNUNET_NETWORK_Handle **read_nhandles = NULL;
  1663. const struct GNUNET_NETWORK_Handle **write_nhandles = NULL;
  1664. const struct GNUNET_DISK_FileHandle **read_fhandles = NULL;
  1665. const struct GNUNET_DISK_FileHandle **write_fhandles = NULL;
  1666. unsigned int read_nhandles_len = 0;
  1667. unsigned int write_nhandles_len = 0;
  1668. unsigned int read_fhandles_len = 0;
  1669. unsigned int write_fhandles_len = 0;
  1670. /* scheduler must be running */
  1671. GNUNET_assert (NULL != scheduler_driver);
  1672. GNUNET_assert (NULL != task);
  1673. int no_rs = (NULL == rs);
  1674. int no_ws = (NULL == ws);
  1675. int empty_rs = (NULL != rs) && (0 == rs->nsds);
  1676. int empty_ws = (NULL != ws) && (0 == ws->nsds);
  1677. int no_fds = (no_rs && no_ws) ||
  1678. (empty_rs && empty_ws) ||
  1679. (no_rs && empty_ws) ||
  1680. (no_ws && empty_rs);
  1681. if (! no_fds)
  1682. {
  1683. if (NULL != rs)
  1684. {
  1685. extract_handles (rs,
  1686. &read_nhandles,
  1687. &read_nhandles_len,
  1688. &read_fhandles,
  1689. &read_fhandles_len);
  1690. }
  1691. if (NULL != ws)
  1692. {
  1693. extract_handles (ws,
  1694. &write_nhandles,
  1695. &write_nhandles_len,
  1696. &write_fhandles,
  1697. &write_fhandles_len);
  1698. }
  1699. }
  1700. /**
  1701. * here we consider the case that a GNUNET_NETWORK_FDSet might be empty
  1702. * although its maximum FD number (nsds) is greater than 0. We handle
  1703. * this case gracefully because some libraries such as libmicrohttpd
  1704. * only provide a hint what the maximum FD number in an FD set might be
  1705. * and not the exact FD number (see e.g. gnunet-rest-service.c)
  1706. */int no_fds_extracted = (0 == read_nhandles_len) &&
  1707. (0 == read_fhandles_len) &&
  1708. (0 == write_nhandles_len) &&
  1709. (0 == write_fhandles_len);
  1710. if (no_fds || no_fds_extracted)
  1711. return GNUNET_SCHEDULER_add_delayed_with_priority (delay,
  1712. prio,
  1713. task,
  1714. task_cls);
  1715. t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
  1716. GNUNET_async_scope_get (&t->scope);
  1717. init_fd_info (t,
  1718. read_nhandles,
  1719. read_nhandles_len,
  1720. write_nhandles,
  1721. write_nhandles_len,
  1722. read_fhandles,
  1723. read_fhandles_len,
  1724. write_fhandles,
  1725. write_fhandles_len);
  1726. t->callback = task;
  1727. t->callback_cls = task_cls;
  1728. t->own_handles = GNUNET_YES;
  1729. /* free the arrays of pointers to network / file handles, the actual
  1730. * handles will be freed in destroy_task */
  1731. GNUNET_array_grow (read_nhandles, read_nhandles_len, 0);
  1732. GNUNET_array_grow (write_nhandles, write_nhandles_len, 0);
  1733. GNUNET_array_grow (read_fhandles, read_fhandles_len, 0);
  1734. GNUNET_array_grow (write_fhandles, write_fhandles_len, 0);
  1735. #if PROFILE_DELAYS
  1736. t->start_time = GNUNET_TIME_absolute_get ();
  1737. #endif
  1738. t->timeout = GNUNET_TIME_relative_to_absolute (delay);
  1739. t->priority =
  1740. check_priority ((prio ==
  1741. GNUNET_SCHEDULER_PRIORITY_KEEP) ? current_priority :
  1742. prio);
  1743. t->lifeness = current_lifeness;
  1744. GNUNET_CONTAINER_DLL_insert (pending_head,
  1745. pending_tail,
  1746. t);
  1747. driver_add_multiple (t);
  1748. max_priority_added = GNUNET_MAX (max_priority_added,
  1749. t->priority);
  1750. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1751. "Adding task %p\n",
  1752. t);
  1753. init_backtrace (t);
  1754. return t;
  1755. }
  1756. /**
  1757. * Function used by event-loop implementations to signal the scheduler
  1758. * that a particular @a task is ready due to an event specified in the
  1759. * et field of @a fdi.
  1760. *
  1761. * This function will then queue the task to notify the application
  1762. * that the task is ready (with the respective priority).
  1763. *
  1764. * @param task the task that is ready
  1765. * @param fdi information about the related FD
  1766. */
  1767. void
  1768. GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task,
  1769. struct GNUNET_SCHEDULER_FdInfo *fdi)
  1770. {
  1771. enum GNUNET_SCHEDULER_Reason reason;
  1772. reason = task->reason;
  1773. if ((0 == (reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
  1774. (0 != (GNUNET_SCHEDULER_ET_IN & fdi->et)))
  1775. reason |= GNUNET_SCHEDULER_REASON_READ_READY;
  1776. if ((0 == (reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
  1777. (0 != (GNUNET_SCHEDULER_ET_OUT & fdi->et)))
  1778. reason |= GNUNET_SCHEDULER_REASON_WRITE_READY;
  1779. reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE;
  1780. task->reason = reason;
  1781. if (GNUNET_NO == task->in_ready_list)
  1782. {
  1783. GNUNET_CONTAINER_DLL_remove (pending_head,
  1784. pending_tail,
  1785. task);
  1786. queue_ready_task (task);
  1787. }
  1788. }
  1789. /**
  1790. * Function called by external event loop implementations to tell the
  1791. * scheduler to run some of the tasks that are ready. Must be called
  1792. * only after #GNUNET_SCHEDULER_driver_init has been called and before
  1793. * #GNUNET_SCHEDULER_driver_done is called.
  1794. * This function may return even though there are tasks left to run
  1795. * just to give other tasks a chance as well. If we return #GNUNET_YES,
  1796. * the event loop implementation should call this function again as
  1797. * soon as possible, while if we return #GNUNET_NO it must block until
  1798. * either the operating system has more work (the scheduler has no more
  1799. * work to do right now) or the timeout set by the scheduler (using the
  1800. * set_wakeup callback) is reached.
  1801. *
  1802. * @param sh scheduler handle that was returned by
  1803. * #GNUNET_SCHEDULER_driver_init
  1804. * @return #GNUNET_YES if there are more tasks that are ready,
  1805. * and thus we would like to run more (yield to avoid
  1806. * blocking other activities for too long) #GNUNET_NO
  1807. * if we are done running tasks (yield to block)
  1808. */
  1809. int
  1810. GNUNET_SCHEDULER_do_work (struct GNUNET_SCHEDULER_Handle *sh)
  1811. {
  1812. struct GNUNET_SCHEDULER_Task *pos;
  1813. struct GNUNET_TIME_Absolute now;
  1814. /* check for tasks that reached the timeout! */
  1815. now = GNUNET_TIME_absolute_get ();
  1816. pos = pending_timeout_head;
  1817. while (NULL != pos)
  1818. {
  1819. struct GNUNET_SCHEDULER_Task *next = pos->next;
  1820. if (now.abs_value_us >= pos->timeout.abs_value_us)
  1821. pos->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT;
  1822. if (0 == pos->reason)
  1823. break;
  1824. GNUNET_CONTAINER_DLL_remove (pending_timeout_head,
  1825. pending_timeout_tail,
  1826. pos);
  1827. if (pending_timeout_last == pos)
  1828. pending_timeout_last = NULL;
  1829. queue_ready_task (pos);
  1830. pos = next;
  1831. }
  1832. pos = pending_head;
  1833. while (NULL != pos)
  1834. {
  1835. struct GNUNET_SCHEDULER_Task *next = pos->next;
  1836. if (now.abs_value_us >= pos->timeout.abs_value_us)
  1837. {
  1838. pos->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT;
  1839. GNUNET_CONTAINER_DLL_remove (pending_head,
  1840. pending_tail,
  1841. pos);
  1842. queue_ready_task (pos);
  1843. }
  1844. pos = next;
  1845. }
  1846. if (0 == ready_count)
  1847. {
  1848. struct GNUNET_TIME_Absolute timeout = get_timeout ();
  1849. if (timeout.abs_value_us > now.abs_value_us)
  1850. {
  1851. /**
  1852. * The event loop called this function before the current timeout was
  1853. * reached (and no FD tasks are ready). This is acceptable if
  1854. *
  1855. * - the system time was changed while the driver was waiting for
  1856. * the timeout
  1857. * - an external event loop called GNUnet API functions outside of
  1858. * the callbacks called in GNUNET_SCHEDULER_do_work and thus
  1859. * wasn't notified about the new timeout
  1860. *
  1861. * It might also mean we are busy-waiting because of a programming
  1862. * error in the external event loop.
  1863. */LOG (GNUNET_ERROR_TYPE_DEBUG,
  1864. "GNUNET_SCHEDULER_do_work did not find any ready "
  1865. "tasks and timeout has not been reached yet.\n");
  1866. }
  1867. else
  1868. {
  1869. /**
  1870. * the current timeout was reached but no ready tasks were found,
  1871. * internal scheduler error!
  1872. */
  1873. GNUNET_assert (0);
  1874. }
  1875. }
  1876. else
  1877. {
  1878. /* find out which task priority level we are going to
  1879. process this time */
  1880. max_priority_added = GNUNET_SCHEDULER_PRIORITY_KEEP;
  1881. GNUNET_assert (NULL == ready_head[GNUNET_SCHEDULER_PRIORITY_KEEP]);
  1882. /* yes, p>0 is correct, 0 is "KEEP" which should
  1883. * always be an empty queue (see assertion)! */
  1884. for (work_priority = GNUNET_SCHEDULER_PRIORITY_COUNT - 1;
  1885. work_priority > 0;
  1886. work_priority--)
  1887. {
  1888. pos = ready_head[work_priority];
  1889. if (NULL != pos)
  1890. break;
  1891. }
  1892. GNUNET_assert (NULL != pos); /* ready_count wrong? */
  1893. /* process all tasks at this priority level, then yield */
  1894. while (NULL != (pos = ready_head[work_priority]))
  1895. {
  1896. GNUNET_CONTAINER_DLL_remove (ready_head[work_priority],
  1897. ready_tail[work_priority],
  1898. pos);
  1899. ready_count--;
  1900. current_priority = pos->priority;
  1901. current_lifeness = pos->lifeness;
  1902. active_task = pos;
  1903. #if PROFILE_DELAYS
  1904. if (GNUNET_TIME_absolute_get_duration (pos->start_time).rel_value_us >
  1905. DELAY_THRESHOLD.rel_value_us)
  1906. {
  1907. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1908. "Task %p took %s to be scheduled\n",
  1909. pos,
  1910. GNUNET_STRINGS_relative_time_to_string (
  1911. GNUNET_TIME_absolute_get_duration (pos->start_time),
  1912. GNUNET_YES));
  1913. }
  1914. #endif
  1915. tc.reason = pos->reason;
  1916. GNUNET_NETWORK_fdset_zero (sh->rs);
  1917. GNUNET_NETWORK_fdset_zero (sh->ws);
  1918. // FIXME: do we have to remove FdInfos from fds if they are not ready?
  1919. tc.fds_len = pos->fds_len;
  1920. tc.fds = pos->fds;
  1921. for (unsigned int i = 0; i != pos->fds_len; ++i)
  1922. {
  1923. struct GNUNET_SCHEDULER_FdInfo *fdi = &pos->fds[i];
  1924. if (0 != (GNUNET_SCHEDULER_ET_IN & fdi->et))
  1925. {
  1926. GNUNET_NETWORK_fdset_set_native (sh->rs,
  1927. fdi->sock);
  1928. }
  1929. if (0 != (GNUNET_SCHEDULER_ET_OUT & fdi->et))
  1930. {
  1931. GNUNET_NETWORK_fdset_set_native (sh->ws,
  1932. fdi->sock);
  1933. }
  1934. }
  1935. tc.read_ready = sh->rs;
  1936. tc.write_ready = sh->ws;
  1937. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1938. "Running task %p\n",
  1939. pos);
  1940. GNUNET_assert (NULL != pos->callback);
  1941. {
  1942. struct GNUNET_AsyncScopeSave old_scope;
  1943. if (pos->scope.have_scope)
  1944. GNUNET_async_scope_enter (&pos->scope.scope_id, &old_scope);
  1945. else
  1946. GNUNET_async_scope_get (&old_scope);
  1947. pos->callback (pos->callback_cls);
  1948. GNUNET_async_scope_restore (&old_scope);
  1949. }
  1950. if (NULL != pos->fds)
  1951. {
  1952. int del_result = scheduler_driver->del (scheduler_driver->cls, pos);
  1953. if (GNUNET_OK != del_result)
  1954. {
  1955. LOG (GNUNET_ERROR_TYPE_ERROR,
  1956. "driver could not delete task %p\n", pos);
  1957. GNUNET_assert (0);
  1958. }
  1959. }
  1960. active_task = NULL;
  1961. dump_backtrace (pos);
  1962. destroy_task (pos);
  1963. }
  1964. }
  1965. shutdown_if_no_lifeness ();
  1966. if (0 == ready_count)
  1967. {
  1968. scheduler_driver->set_wakeup (scheduler_driver->cls,
  1969. get_timeout ());
  1970. return GNUNET_NO;
  1971. }
  1972. scheduler_driver->set_wakeup (scheduler_driver->cls,
  1973. GNUNET_TIME_absolute_get ());
  1974. return GNUNET_YES;
  1975. }
  1976. /**
  1977. * Function called by external event loop implementations to initialize
  1978. * the scheduler. An external implementation has to provide @a driver
  1979. * which contains callbacks for the scheduler (see definition of struct
  1980. * #GNUNET_SCHEDULER_Driver). The callbacks are used to instruct the
  1981. * external implementation to watch for events. If it detects any of
  1982. * those events it is expected to call #GNUNET_SCHEDULER_do_work to let
  1983. * the scheduler handle it. If an event is related to a specific task
  1984. * (e.g. the scheduler gave instructions to watch a file descriptor),
  1985. * the external implementation is expected to mark that task ready
  1986. * before by calling #GNUNET_SCHEDULER_task_ready.
  1987. * This function has to be called before any tasks are scheduled and
  1988. * before GNUNET_SCHEDULER_do_work is called for the first time. It
  1989. * allocates resources that have to be freed again by calling
  1990. * #GNUNET_SCHEDULER_driver_done.
  1991. *
  1992. * This function installs the same signal handlers as
  1993. * #GNUNET_SCHEDULER_run. This means SIGTERM (and other similar signals)
  1994. * will induce a call to #GNUNET_SCHEDULER_shutdown during the next
  1995. * call to #GNUNET_SCHEDULER_do_work. As a result, SIGTERM causes all
  1996. * active tasks to be scheduled with reason
  1997. * #GNUNET_SCHEDULER_REASON_SHUTDOWN. (However, tasks added afterwards
  1998. * will execute normally!). Note that any particular signal will only
  1999. * shut down one scheduler; applications should always only create a
  2000. * single scheduler.
  2001. *
  2002. * @param driver to use for the event loop
  2003. * @return handle to be passed to #GNUNET_SCHEDULER_do_work and
  2004. * #GNUNET_SCHEDULER_driver_done
  2005. */
  2006. struct GNUNET_SCHEDULER_Handle *
  2007. GNUNET_SCHEDULER_driver_init (const struct GNUNET_SCHEDULER_Driver *driver)
  2008. {
  2009. struct GNUNET_SCHEDULER_Handle *sh;
  2010. const struct GNUNET_DISK_FileHandle *pr;
  2011. /* scheduler must not be running */
  2012. GNUNET_assert (NULL == scheduler_driver);
  2013. GNUNET_assert (NULL == shutdown_pipe_handle);
  2014. /* general set-up */
  2015. sh = GNUNET_new (struct GNUNET_SCHEDULER_Handle);
  2016. shutdown_pipe_handle = GNUNET_DISK_pipe (GNUNET_DISK_PF_NONE);
  2017. GNUNET_assert (NULL != shutdown_pipe_handle);
  2018. pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle,
  2019. GNUNET_DISK_PIPE_END_READ);
  2020. my_pid = getpid ();
  2021. scheduler_driver = driver;
  2022. /* install signal handlers */
  2023. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2024. "Registering signal handlers\n");
  2025. sh->shc_int = GNUNET_SIGNAL_handler_install (SIGINT,
  2026. &sighandler_shutdown);
  2027. sh->shc_term = GNUNET_SIGNAL_handler_install (SIGTERM,
  2028. &sighandler_shutdown);
  2029. #if (SIGTERM != GNUNET_TERM_SIG)
  2030. sh->shc_gterm = GNUNET_SIGNAL_handler_install (GNUNET_TERM_SIG,
  2031. &sighandler_shutdown);
  2032. #endif
  2033. sh->shc_pipe = GNUNET_SIGNAL_handler_install (SIGPIPE,
  2034. &sighandler_pipe);
  2035. sh->shc_quit = GNUNET_SIGNAL_handler_install (SIGQUIT,
  2036. &sighandler_shutdown);
  2037. sh->shc_hup = GNUNET_SIGNAL_handler_install (SIGHUP,
  2038. &sighandler_shutdown);
  2039. /* Setup initial tasks */
  2040. current_priority = GNUNET_SCHEDULER_PRIORITY_DEFAULT;
  2041. current_lifeness = GNUNET_NO;
  2042. /* ensure this task runs first, by using a priority level reserved for
  2043. the scheduler (not really shutdown, but start-up ;-) */
  2044. install_parent_control_task =
  2045. GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_SHUTDOWN,
  2046. &install_parent_control_handler,
  2047. NULL);
  2048. shutdown_pipe_task =
  2049. GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
  2050. pr,
  2051. &shutdown_pipe_cb,
  2052. NULL);
  2053. current_lifeness = GNUNET_YES;
  2054. scheduler_driver->set_wakeup (scheduler_driver->cls,
  2055. get_timeout ());
  2056. /* begin main event loop */
  2057. sh->rs = GNUNET_NETWORK_fdset_create ();
  2058. sh->ws = GNUNET_NETWORK_fdset_create ();
  2059. GNUNET_NETWORK_fdset_handle_set (sh->rs, pr);
  2060. return sh;
  2061. }
  2062. /**
  2063. * Counter-part of #GNUNET_SCHEDULER_driver_init. Has to be called
  2064. * by external event loop implementations after the scheduler has
  2065. * shut down. This is the case if both of the following conditions
  2066. * are met:
  2067. *
  2068. * - all tasks the scheduler has added through the driver's add
  2069. * callback have been removed again through the driver's del
  2070. * callback
  2071. * - the timeout the scheduler has set through the driver's
  2072. * add_wakeup callback is FOREVER
  2073. *
  2074. * @param sh the handle returned by #GNUNET_SCHEDULER_driver_init
  2075. */
  2076. void
  2077. GNUNET_SCHEDULER_driver_done (struct GNUNET_SCHEDULER_Handle *sh)
  2078. {
  2079. GNUNET_assert (NULL == pending_head);
  2080. GNUNET_assert (NULL == pending_timeout_head);
  2081. GNUNET_assert (NULL == shutdown_head);
  2082. for (int i = 0; i != GNUNET_SCHEDULER_PRIORITY_COUNT; ++i)
  2083. {
  2084. GNUNET_assert (NULL == ready_head[i]);
  2085. }
  2086. GNUNET_NETWORK_fdset_destroy (sh->rs);
  2087. GNUNET_NETWORK_fdset_destroy (sh->ws);
  2088. /* uninstall signal handlers */
  2089. GNUNET_SIGNAL_handler_uninstall (sh->shc_int);
  2090. GNUNET_SIGNAL_handler_uninstall (sh->shc_term);
  2091. #if (SIGTERM != GNUNET_TERM_SIG)
  2092. GNUNET_SIGNAL_handler_uninstall (sh->shc_gterm);
  2093. #endif
  2094. GNUNET_SIGNAL_handler_uninstall (sh->shc_pipe);
  2095. GNUNET_SIGNAL_handler_uninstall (sh->shc_quit);
  2096. GNUNET_SIGNAL_handler_uninstall (sh->shc_hup);
  2097. GNUNET_DISK_pipe_close (shutdown_pipe_handle);
  2098. shutdown_pipe_handle = NULL;
  2099. scheduler_driver = NULL;
  2100. GNUNET_free (sh);
  2101. }
  2102. static int
  2103. select_loop (struct GNUNET_SCHEDULER_Handle *sh,
  2104. struct DriverContext *context)
  2105. {
  2106. struct GNUNET_NETWORK_FDSet *rs;
  2107. struct GNUNET_NETWORK_FDSet *ws;
  2108. int select_result;
  2109. GNUNET_assert (NULL != context);
  2110. rs = GNUNET_NETWORK_fdset_create ();
  2111. ws = GNUNET_NETWORK_fdset_create ();
  2112. while ((NULL != context->scheduled_head) ||
  2113. (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us !=
  2114. context->timeout.abs_value_us))
  2115. {
  2116. struct GNUNET_TIME_Relative time_remaining;
  2117. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2118. "select timeout = %s\n",
  2119. GNUNET_STRINGS_absolute_time_to_string (context->timeout));
  2120. GNUNET_NETWORK_fdset_zero (rs);
  2121. GNUNET_NETWORK_fdset_zero (ws);
  2122. for (struct Scheduled *pos = context->scheduled_head;
  2123. NULL != pos;
  2124. pos = pos->next)
  2125. {
  2126. if (0 != (GNUNET_SCHEDULER_ET_IN & pos->et))
  2127. {
  2128. GNUNET_NETWORK_fdset_set_native (rs, pos->fdi->sock);
  2129. }
  2130. if (0 != (GNUNET_SCHEDULER_ET_OUT & pos->et))
  2131. {
  2132. GNUNET_NETWORK_fdset_set_native (ws, pos->fdi->sock);
  2133. }
  2134. }
  2135. time_remaining = GNUNET_TIME_absolute_get_remaining (context->timeout);
  2136. if (0 < ready_count)
  2137. time_remaining = GNUNET_TIME_UNIT_ZERO;
  2138. if (NULL == scheduler_select)
  2139. {
  2140. select_result = GNUNET_NETWORK_socket_select (rs,
  2141. ws,
  2142. NULL,
  2143. time_remaining);
  2144. }
  2145. else
  2146. {
  2147. select_result = scheduler_select (scheduler_select_cls,
  2148. rs,
  2149. ws,
  2150. NULL,
  2151. time_remaining);
  2152. }
  2153. if (select_result == GNUNET_SYSERR)
  2154. {
  2155. if (errno == EINTR)
  2156. continue;
  2157. LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR,
  2158. "select");
  2159. #if USE_LSOF
  2160. char lsof[512];
  2161. snprintf (lsof,
  2162. sizeof(lsof),
  2163. "lsof -p %d",
  2164. getpid ());
  2165. (void) close (1);
  2166. (void) dup2 (2, 1);
  2167. if (0 != system (lsof))
  2168. LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING,
  2169. "system");
  2170. #endif
  2171. #if DEBUG_FDS
  2172. for (struct Scheduled *s = context->scheduled_head;
  2173. NULL != s;
  2174. s = s->next)
  2175. {
  2176. int flags = fcntl (s->fdi->sock,
  2177. F_GETFD);
  2178. if ((flags == -1) &&
  2179. (EBADF == errno))
  2180. {
  2181. LOG (GNUNET_ERROR_TYPE_ERROR,
  2182. "Got invalid file descriptor %d!\n",
  2183. s->fdi->sock);
  2184. #if EXECINFO
  2185. dump_backtrace (s->task);
  2186. #endif
  2187. }
  2188. }
  2189. #endif
  2190. GNUNET_assert (0);
  2191. GNUNET_NETWORK_fdset_destroy (rs);
  2192. GNUNET_NETWORK_fdset_destroy (ws);
  2193. return GNUNET_SYSERR;
  2194. }
  2195. if (select_result > 0)
  2196. {
  2197. for (struct Scheduled *pos = context->scheduled_head;
  2198. NULL != pos;
  2199. pos = pos->next)
  2200. {
  2201. int is_ready = GNUNET_NO;
  2202. if ((0 != (GNUNET_SCHEDULER_ET_IN & pos->et)) &&
  2203. (GNUNET_YES ==
  2204. GNUNET_NETWORK_fdset_test_native (rs,
  2205. pos->fdi->sock)) )
  2206. {
  2207. pos->fdi->et |= GNUNET_SCHEDULER_ET_IN;
  2208. is_ready = GNUNET_YES;
  2209. }
  2210. if ((0 != (GNUNET_SCHEDULER_ET_OUT & pos->et)) &&
  2211. (GNUNET_YES ==
  2212. GNUNET_NETWORK_fdset_test_native (ws,
  2213. pos->fdi->sock)) )
  2214. {
  2215. pos->fdi->et |= GNUNET_SCHEDULER_ET_OUT;
  2216. is_ready = GNUNET_YES;
  2217. }
  2218. if (GNUNET_YES == is_ready)
  2219. {
  2220. GNUNET_SCHEDULER_task_ready (pos->task,
  2221. pos->fdi);
  2222. }
  2223. }
  2224. }
  2225. if (GNUNET_YES == GNUNET_SCHEDULER_do_work (sh))
  2226. {
  2227. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2228. "scheduler has more tasks ready!\n");
  2229. }
  2230. }
  2231. GNUNET_NETWORK_fdset_destroy (rs);
  2232. GNUNET_NETWORK_fdset_destroy (ws);
  2233. return GNUNET_OK;
  2234. }
  2235. static int
  2236. select_add (void *cls,
  2237. struct GNUNET_SCHEDULER_Task *task,
  2238. struct GNUNET_SCHEDULER_FdInfo *fdi)
  2239. {
  2240. struct DriverContext *context = cls;
  2241. GNUNET_assert (NULL != context);
  2242. GNUNET_assert (NULL != task);
  2243. GNUNET_assert (NULL != fdi);
  2244. GNUNET_assert (0 != (GNUNET_SCHEDULER_ET_IN & fdi->et) ||
  2245. 0 != (GNUNET_SCHEDULER_ET_OUT & fdi->et));
  2246. if (! ((NULL != fdi->fd) ^ (NULL != fdi->fh)) || (fdi->sock < 0))
  2247. {
  2248. /* exactly one out of {fd, hf} must be != NULL and the OS handle must be valid */
  2249. return GNUNET_SYSERR;
  2250. }
  2251. struct Scheduled *scheduled = GNUNET_new (struct Scheduled);
  2252. scheduled->task = task;
  2253. scheduled->fdi = fdi;
  2254. scheduled->et = fdi->et;
  2255. GNUNET_CONTAINER_DLL_insert (context->scheduled_head,
  2256. context->scheduled_tail,
  2257. scheduled);
  2258. return GNUNET_OK;
  2259. }
  2260. static int
  2261. select_del (void *cls,
  2262. struct GNUNET_SCHEDULER_Task *task)
  2263. {
  2264. struct DriverContext *context;
  2265. struct Scheduled *pos;
  2266. int ret;
  2267. GNUNET_assert (NULL != cls);
  2268. context = cls;
  2269. ret = GNUNET_SYSERR;
  2270. pos = context->scheduled_head;
  2271. while (NULL != pos)
  2272. {
  2273. struct Scheduled *next = pos->next;
  2274. if (pos->task == task)
  2275. {
  2276. GNUNET_CONTAINER_DLL_remove (context->scheduled_head,
  2277. context->scheduled_tail,
  2278. pos);
  2279. GNUNET_free (pos);
  2280. ret = GNUNET_OK;
  2281. }
  2282. pos = next;
  2283. }
  2284. return ret;
  2285. }
  2286. static void
  2287. select_set_wakeup (void *cls,
  2288. struct GNUNET_TIME_Absolute dt)
  2289. {
  2290. struct DriverContext *context = cls;
  2291. GNUNET_assert (NULL != context);
  2292. context->timeout = dt;
  2293. }
  2294. /**
  2295. * Obtain the driver for using select() as the event loop.
  2296. *
  2297. * @return NULL on error
  2298. */
  2299. struct GNUNET_SCHEDULER_Driver *
  2300. GNUNET_SCHEDULER_driver_select ()
  2301. {
  2302. struct GNUNET_SCHEDULER_Driver *select_driver;
  2303. select_driver = GNUNET_new (struct GNUNET_SCHEDULER_Driver);
  2304. select_driver->add = &select_add;
  2305. select_driver->del = &select_del;
  2306. select_driver->set_wakeup = &select_set_wakeup;
  2307. return select_driver;
  2308. }
  2309. /**
  2310. * Change the async scope for the currently executing task and (transitively)
  2311. * for all tasks scheduled by the current task after calling this function.
  2312. * Nested tasks can begin their own nested async scope.
  2313. *
  2314. * Once the current task is finished, the async scope ID is reset to
  2315. * its previous value.
  2316. *
  2317. * Must only be called from a running task.
  2318. *
  2319. * @param aid the asynchronous scope id to enter
  2320. */
  2321. void
  2322. GNUNET_SCHEDULER_begin_async_scope (struct GNUNET_AsyncScopeId *aid)
  2323. {
  2324. struct GNUNET_AsyncScopeSave dummy_old_scope;
  2325. GNUNET_assert (NULL != active_task);
  2326. /* Since we're in a task, the context will be automatically
  2327. restored by the scheduler. */
  2328. GNUNET_async_scope_enter (aid, &dummy_old_scope);
  2329. }
  2330. /* end of scheduler.c */