[Spread-cvs] commit: r589 - in branches/experimental-4.3-threaded/libspread-util: include src

jschultz at spread.org jschultz at spread.org
Tue Sep 17 15:17:50 EDT 2013


Author: jschultz
Date: 2013-09-17 15:17:50 -0400 (Tue, 17 Sep 2013)
New Revision: 589

Modified:
   branches/experimental-4.3-threaded/libspread-util/include/spu_events.h
   branches/experimental-4.3-threaded/libspread-util/src/events.c
Log:
Changes to new events to export events loop object + fcns.


Modified: branches/experimental-4.3-threaded/libspread-util/include/spu_events.h
===================================================================
--- branches/experimental-4.3-threaded/libspread-util/include/spu_events.h	2013-09-17 17:17:05 UTC (rev 588)
+++ branches/experimental-4.3-threaded/libspread-util/include/spu_events.h	2013-09-17 19:17:50 UTC (rev 589)
@@ -35,6 +35,10 @@
 #ifndef INC_SP_EVENTS
 #define INC_SP_EVENTS
 
+#include "stdutil/stddefines.h"
+#include "stdutil/stddll.h"
+#include "stdutil/stdskl.h"
+
 /* Raise MAX_FD_EVENTS AND RECOMPILE events.c to handle more active
    fd's.  This number can limit the number of connections that can be
    handled.
@@ -62,6 +66,7 @@
 /* Time routines */
 
 sp_time E_get_time( void );
+sp_time E_neg_time( sp_time t );
 sp_time E_add_time( sp_time t, sp_time delta_t );
 sp_time E_sub_time( sp_time t, sp_time delta_t );
 int     E_compare_time( sp_time t1, sp_time t2 );
@@ -89,13 +94,103 @@
 int     E_activate_fd( int fd, int fd_type );
 int     E_deactivate_fd( int fd, int fd_type );
 
-sp_time E_get_loop_time( void );
-sp_time E_update_loop_time( void );
-
 int     E_set_elevate_count( int priority, unsigned count );
 int     E_get_elevate_count( int priority, unsigned *count );
 
 int     E_handle_events( void );
 void    E_exit_events( void );
 
+/***************************************************************************************************************************
+ * TODO: Hide implementation details in a private .h file.
+ ***************************************************************************************************************************/
+
+#define E_MIN_PRIORITY        0
+
+#define E_DELTA_TIMER         3
+#define E_ABSOLUTE_TIMER      4
+#define E_PERIODIC_TIMER      5
+
+typedef int      E_priority;
+typedef int      E_watch_type;
+typedef stdint32 E_watch_id;
+typedef void   (*E_callback_fcn)();
+typedef void   (*E_fd_callback_fcn)(int fd, int code, void *data);
+typedef void   (*E_time_callback_fcn)(int code, void *data);
+
+typedef enum
+{
+  E_BACKEND_UNSPECIFIED,
+  E_BACKEND_SELECT,
+  E_BACKEND_EPOLL,
+
+} E_backend_type;
+
+typedef struct
+{
+  E_priority priority;           /* priority associated with this lvl */
+  stddll     notices;            /* <E_watch*>: watches at this priority that have triggered / been noticed */
+
+  unsigned   starve_thresh;      /* threshold that determines if/when this priority elevates to "preempt" execution */
+  unsigned   starve_cnter;       /* increments once for every non-preempt notice dispatch from a different priority */
+
+  int        num_active_fd_watches;  /* track how many active fd watches are registered at this particular priority */
+
+  void      *backend_lvl_info;   /* backend specific information for this priority */
+
+} E_priority_info;
+
+typedef struct 
+{
+  stdbool          keep_running;       /* whether or not the current call to E_events_handle_events() should keep looping */
+  E_priority       max_priority;       /* highest priority lvl in the system */
+  E_priority       active_threshold;   /* minimum priority lvl that is currently being monitored */
+  
+  sp_time          last_clock_time;    /* most recent time returned from the clock for this system */
+  sp_time          last_loop_time;     /* most recent time when we returned from an OS query */
+  
+  stdskl           time_watches;       /* <E_time_watch* -> nil>: ordered by E_time_watch_ptr_cmp: [fcn, code, data] */
+  stdskl           fd_watches;         /* <int -> E_fd_watch*>: fds ordered by E_fd_cmp */
+  
+  E_priority_info *priorities;         /* array of size max_priority: notification Qs and other per priority lvl info */
+  
+  /* notification generation mechanisms */
+  
+  stdskl           timeouts;           /* <sp_time -> E_time_watch*>: absolute timeouts ordered by E_sp_time_cmp */
+
+  E_backend_type   backend_type;       /* method used by this sytem to query fds */
+  void            *backend_info;       /* backend specific info for this system */
+
+} E_events;
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int  E_events_init(E_events *e);
+void E_events_fini(E_events *e);
+
+int  E_events_in_queue(E_events *e,       E_time_callback_fcn fcn, int code, void *data);
+int  E_events_queue(E_events *e,          E_time_callback_fcn fcn, int code, void *data, sp_time delta_time);
+int  E_events_queue_absolute(E_events *e, E_time_callback_fcn fcn, int code, void *data, sp_time absolute_time);
+int  E_events_queue_periodic(E_events *e, E_time_callback_fcn fcn, int code, void *data, sp_time periodic_time);
+int  E_events_dequeue(E_events *e,        E_time_callback_fcn fcn, int code, void *data);
+
+int  E_events_attach_fd(E_events *e,          int fd, E_watch_type fd_type, E_fd_callback_fcn f, int code, void *d, E_priority p);
+int  E_events_detach_fd(E_events *e,          int fd, E_watch_type fd_type);
+int  E_events_detach_fd_priority(E_events *e, int fd, E_watch_type fd_type, E_priority priority);
+
+int  E_events_set_active_threshold(E_events *e, E_priority priority);
+int  E_events_num_active(E_events *e,           E_priority priority);
+
+int  E_events_activate_fd(E_events *e,   int fd, E_watch_type fd_type);
+int  E_events_deactivate_fd(E_events *e, int fd, E_watch_type fd_type);
+
+int  E_events_set_elevate_count(E_events *e, E_priority priority, unsigned count);
+int  E_events_get_elevate_count(E_events *e, E_priority priority, unsigned *count);
+
+int  E_events_handle_events(E_events *e);
+void E_events_exit_events(E_events *e);
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
 #endif

Modified: branches/experimental-4.3-threaded/libspread-util/src/events.c
===================================================================
--- branches/experimental-4.3-threaded/libspread-util/src/events.c	2013-09-17 17:17:05 UTC (rev 588)
+++ branches/experimental-4.3-threaded/libspread-util/src/events.c	2013-09-17 19:17:50 UTC (rev 589)
@@ -24,21 +24,11 @@
 
 #endif
 
-#include <stdutil/stddefines.h>
-#include <stdutil/stddll.h>
-#include <stdutil/stdskl.h>
-
 #include "spu_events.h"
 
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-#define E_MIN_PRIORITY        0
-
-#define E_DELTA_TIMER         3
-#define E_ABSOLUTE_TIMER      4
-#define E_PERIODIC_TIMER      5
-
 #define E_EPOLL_INITIAL_SIZE 32
 
 enum 
@@ -55,30 +45,11 @@
 
   E_UNKNOWN = -1,
   E_REPOPULATE = 1,
-
 };
 
-typedef enum
-{
-  E_BACKEND_UNSPECIFIED,
-  E_BACKEND_SELECT,
-  E_BACKEND_EPOLL,
-
-} E_backend_type;
-
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-typedef stdint32 E_watch_id;
-typedef int      E_watch_type;
-typedef int      E_priority;
-typedef void   (*E_callback_fcn)();
-typedef void   (*E_fd_callback_fcn)(int fd, int code, void *data);
-typedef void   (*E_time_callback_fcn)(int code, void *data);
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
 typedef struct
 {
   E_watch_type   watch_type;
@@ -125,23 +96,6 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-typedef struct
-{
-  E_priority priority;           /* priority associated with this lvl */
-  stddll     notices;            /* <E_watch*>: watches at this priority that have triggered / been noticed */
-
-  unsigned   starve_thresh;      /* threshold that determines if/when this priority elevates to "preempt" execution */
-  unsigned   starve_cnter;       /* increments once for every non-preempt notice dispatch from a different priority */
-
-  int        num_active_fd_watches;  /* track how many active fd watches are registered at this particular priority */
-
-  void      *backend_lvl_info;   /* backend specific information for this priority */
-
-} E_priority_info;
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
 #ifdef HAVE_LINUX
 
 typedef struct
@@ -179,50 +133,19 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-typedef struct 
-{
-  stdbool          keep_running;       /* whether or not the current call to E_events_handle_events() should keep looping */
-  E_priority       max_priority;       /* highest priority lvl in the system */
-  E_priority       active_threshold;   /* minimum priority lvl that is currently being monitored */
-  
-  sp_time          last_clock_time;    /* most recent time returned from the clock for this system */
-  sp_time          last_loop_time;     /* most recent time when we returned from an OS query */
-  
-  stdskl           time_watches;       /* <E_time_watch* -> nil>: ordered by E_time_watch_ptr_cmp: [fcn, code, data] */
-  stdskl           fd_watches;         /* <int -> E_fd_watch*>: fds ordered by E_fd_cmp */
-  
-  E_priority_info *priorities;         /* array of size max_priority: notification Qs and other per priority lvl info */
-  
-  /* notification generation mechanisms */
-  
-  stdskl           timeouts;           /* <sp_time -> E_time_watch*>: absolute timeouts ordered by E_sp_time_cmp */
-
-  E_backend_type   backend_type;       /* method used by this sytem to query fds */
-  void            *backend_info;       /* backend specific info for this system */
-
-} E_events;
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
 static int     E_fd_cmp(const void *left, const void *right);
 static int     E_time_watch_ptr_cmp(const void *left, const void *right);
 static int     E_sp_time_cmp(const void *left, const void *right);
 static sp_time E_get_time_low(void);
 
-static int     E_events_init(E_events *events);
-static void    E_events_fini(E_events *events);
-
 static stdbool E_events_is_valid(const E_events *events);
 
-static void    E_events_exit_events(E_events *events);
 static E_fd_watch *E_events_get_fd_watch(E_events *events, int fd, E_watch_type fd_type, stdit *sit);
 
 static sp_time E_events_get_time(E_events *events);
 static sp_time E_events_get_loop_time(E_events *events);
 static sp_time E_events_update_loop_time(E_events *events);
 
-static int     E_events_handle_events(E_events *events);
 static int     E_events_populate_priority_queues(E_events *events, E_priority dispatch_lvl);
 static int     E_events_handle_starvation(E_events *events);
 static int     E_events_dispatch_notice(E_events *events, E_priority dispatch_lvl);
@@ -231,22 +154,6 @@
 static int     E_events_attach_fd(E_events *e, int fd, E_watch_type t, E_fd_callback_fcn f, int code, void *d, E_priority p);
 static void    E_events_cancel_watch(E_events *events, E_watch *watch);
 
-static int     E_events_in_queue(E_events *events,       E_time_callback_fcn fcn, int code, void *data);
-static int     E_events_queue(E_events *events,          E_time_callback_fcn fcn, int code, void *data, sp_time delta_time);
-static int     E_events_queue_absolute(E_events *events, E_time_callback_fcn fcn, int code, void *data, sp_time absolute_time);
-static int     E_events_queue_periodic(E_events *events, E_time_callback_fcn fcn, int code, void *data, sp_time periodic_time);
-static int     E_events_dequeue(E_events *events,        E_time_callback_fcn fcn, int code, void *data);
-static int     E_events_detach_fd(E_events *events, int fd, E_watch_type fd_type);
-static int     E_events_detach_fd_priority(E_events *events, int fd, E_watch_type fd_type, E_priority priority);
-
-static int     E_events_set_active_threshold(E_events *events, E_priority priority);
-static int     E_events_num_active(E_events *events, E_priority priority);
-static int     E_events_activate_fd(E_events *events, int fd, E_watch_type fd_type);
-static int     E_events_deactivate_fd(E_events *events, int fd, E_watch_type fd_type);
-
-static int     E_events_set_elevate_count(E_events *events, E_priority priority, unsigned count);
-static int     E_events_get_elevate_count(E_events *events, E_priority priority, unsigned *count);
-
 static int     E_events_select_init(E_events *events);
 static void    E_events_select_fini(E_events *events);
 static int     E_events_select_activate_watch(E_events *events, E_fd_watch *fd_watch);
@@ -336,165 +243,6 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static int E_events_init(E_events *events)
-{
-  E_priority_info *info;
-  E_priority        priority;
-  E_backend_type    backend_type;
-  int               ret = 0;
-
-  memset(events, 0, sizeof(*events));
-
-  events->keep_running     = STDFALSE;       /* NOTE: only set in E_events_handle_events; indicates loop is running */
-  events->max_priority     = HIGH_PRIORITY;
-  events->active_threshold = LOW_PRIORITY;
-
-  events->last_loop_time = events->last_clock_time = E_get_time_low();
-
-  if (stdskl_construct(&events->time_watches, sizeof(E_time_watch*), 0, E_time_watch_ptr_cmp) != 0) {
-    ret = E_ALLOC_FAILURE;
-    goto FAIL;
-  }
-
-  if (stdskl_construct(&events->fd_watches, sizeof(int), sizeof(E_fd_watch*), E_fd_cmp) != 0) {
-    ret = E_ALLOC_FAILURE;
-    goto FAIL_TIME_WATCHES;
-  }
-
-  if ((events->priorities = (E_priority_info*) calloc(events->max_priority + 1, sizeof(E_priority_info))) == NULL) {
-    ret = E_ALLOC_FAILURE;
-    goto FAIL_FD_WATCHES;
-  }
-
-  for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
-
-    info           = &events->priorities[priority];
-    info->priority = priority;
-
-    if (stddll_construct(&info->notices, sizeof(E_watch*)) != 0) {
-      ret = E_ALLOC_FAILURE;
-      goto FAIL_PRIORITIES;
-    }
-
-    info->starve_thresh         = 0;
-    info->starve_cnter          = 0;
-    info->num_active_fd_watches = 0;
-    info->backend_lvl_info      = NULL;
-  }
-
-  if (stdskl_construct(&events->timeouts, sizeof(sp_time), sizeof(E_time_watch*), E_sp_time_cmp) != 0) {
-    ret = E_ALLOC_FAILURE;
-    goto FAIL_PRIORITIES;
-  }
-
-  events->backend_type = E_BACKEND_UNSPECIFIED;
-  
-#ifdef HAVE_LINUX
-  backend_type = E_BACKEND_EPOLL;
-#else
-  backend_type = E_BACKEND_SELECT;
-#endif
-
-  switch (backend_type) {
-
-  case E_BACKEND_SELECT:
-    ret = E_events_select_init(events);
-    break;
-
-  case E_BACKEND_EPOLL:
-    ret = E_events_epoll_init(events);
-    break;
-
-  default:
-    assert(0);
-    ret = E_BUG;
-    break;
-  }
-
-  if (ret != 0) {
-    goto FAIL_TIMEOUTS;
-  }
-
-  if (!E_events_is_valid(events)) {
-    assert(0);
-  }
-
-  assert(ret == 0);
-  goto END;
-
-  /* error handling and return */
-
- FAIL_TIMEOUTS:
-  stdskl_destruct(&events->timeouts);
-
- FAIL_PRIORITIES:
-  while (priority-- != E_MIN_PRIORITY) {
-    stddll_destruct(&events->priorities[priority].notices);
-  }
-
-  free(events->priorities);
-
- FAIL_FD_WATCHES:
-  stdskl_destruct(&events->fd_watches);
-
- FAIL_TIME_WATCHES:
-  stdskl_destruct(&events->time_watches);
-
- FAIL:
-  assert(ret != 0);
-
- END:
-  return ret;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static void E_events_fini(E_events *events)
-{
-  E_priority priority;
-  stdit      sit;
-  
-  switch (events->backend_type) {
-
-  case E_BACKEND_SELECT:
-    E_events_select_fini(events);
-    break;
-
-  case E_BACKEND_EPOLL:
-    E_events_epoll_fini(events);
-    break;
-
-  default:
-    assert(0);
-    break;
-  }
-
-  stdskl_destruct(&events->timeouts);
-
-  for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
-    stddll_destruct(&events->priorities[priority].notices);
-  }
-
-  free(events->priorities);
-
-  for (stdskl_begin(&events->fd_watches, &sit); !stdskl_is_end(&events->fd_watches, &sit); stdskl_it_next(&sit)) {
-    free(*(E_fd_watch**) stdskl_it_val(&sit));
-  }
-
-  stdskl_destruct(&events->fd_watches);
-
-  for (stdskl_begin(&events->time_watches, &sit); !stdskl_is_end(&events->time_watches, &sit); stdskl_it_next(&sit)) {
-    free(*(E_time_watch**) stdskl_it_key(&sit));
-  }
-
-  stdskl_destruct(&events->time_watches);
-  memset(events, 0, sizeof(*events));
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
 static stdbool E_events_is_valid(const E_events *events)
 {
   return STDTRUE;
@@ -503,14 +251,6 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static void E_events_exit_events(E_events *events)
-{
-  events->keep_running = STDFALSE;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
 static E_fd_watch *E_events_get_fd_watch(E_events    *events,
                                          int          fd,
                                          E_watch_type fd_type,
@@ -561,89 +301,6 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static int E_events_handle_events(E_events *events)
-{
-  int        ret          = 0;
-  E_priority dispatch_lvl = events->max_priority;
-  E_priority starve_lvl;
-
-  events->keep_running = STDTRUE;
-
-  do {
-
-    /* efficiently query watches for new notices */
-
-    if ((ret = E_events_populate_priority_queues(events, dispatch_lvl)) != 0) {
-      goto FAIL;
-    }
-
-    /* handle any "left over" starvation from previous iterations */
-
-    if ((ret = E_events_handle_starvation(events)) != 0) {
-
-      if (ret == E_REPOPULATE) {  /* need to repopulate priority queues to ensure we aren't starving empty levels */
-        ret = 0;
-        continue;                 /* NOTE: should only happen here if in a starved callback the user lowered active_threshold */
-      }
-
-      goto FAIL;
-    }
-
-    /* find the highest non-empty priority */
-
-    for (dispatch_lvl = events->max_priority; 
-         dispatch_lvl >= events->active_threshold && stddll_empty(&events->priorities[dispatch_lvl].notices); 
-         --dispatch_lvl);
-
-    /* dispatch some notices from dispatch_lvl */
-
-    while (events->keep_running && dispatch_lvl >= events->active_threshold && !stddll_empty(&events->priorities[dispatch_lvl].notices)) {
-
-      if ((ret = E_events_dispatch_notice(events, dispatch_lvl)) != 0) {
-        goto FAIL;
-      }
-
-      /* increment starvation counts for other priorities */
-
-      for (starve_lvl = events->max_priority; starve_lvl >= events->active_threshold; --starve_lvl) {
-
-        if (starve_lvl != dispatch_lvl && 
-            events->priorities[starve_lvl].starve_thresh != 0 &&
-            events->priorities[starve_lvl].num_active_fd_watches != 0) {
-          ++events->priorities[starve_lvl].starve_cnter;
-        }
-      }
-
-      /* handle any starvation */
-
-      if ((ret = E_events_handle_starvation(events)) != 0) {
-
-        if (ret == E_REPOPULATE) {  /* need to repopulate priority queues to ensure we aren't starving empty levels */
-          ret = 0;
-          continue;
-        }
-
-        goto FAIL;
-      }
-    }
-
-  } while (events->keep_running);
-
-  assert(ret == 0);
-  goto END;
-
-  /* error handling and return */
-
- FAIL:
-  assert(ret != 0 && ret != E_REPOPULATE);
-
- END:
-  return ret;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
 static int E_events_populate_priority_queues(E_events *events, 
                                              E_priority dispatch_lvl)
 {
@@ -960,8 +617,8 @@
 {
   E_time_watch *time_watch   = NULL;
   E_time_watch *tmp_watch    = NULL;
-  sp_time        next_timeout = { 0, 0 };
-  int            ret          = 0;
+  sp_time       next_timeout = { 0, 0 };
+  int           ret          = 0;
 
   /* validate input */
 
@@ -1062,116 +719,6 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static int E_events_attach_fd(E_events *       events,
-                              int               fd,
-                              E_watch_type      fd_type,
-                              E_fd_callback_fcn fcn,
-                              int               code,
-                              void *           data,
-                              int               priority)
-{
-  E_fd_watch *fd_watch;
-  E_fd_watch *tmp_watch;
-  int          ret = 0;
-
-  /* validate input */
-
-  if (fd < 0 ||
-      priority > events->max_priority ||
-      priority < E_MIN_PRIORITY || 
-      fd < 0) {
-    ret = E_INVALID_PARAM;
-    goto FAIL;
-  }
-
-  switch (fd_type) {
-
-  case READ_FD:
-  case WRITE_FD:
-  case EXCEPT_FD:
-    break;
-
-  default:
-    ret = E_INVALID_PARAM;
-    goto FAIL;
-  }
-
-  /* allocate and initialize new fd_watch */
-
-  if ((fd_watch = (E_fd_watch*) calloc(1, sizeof(E_fd_watch))) == NULL) {
-    ret = E_ALLOC_FAILURE;
-    goto FAIL;
-  }
-
-  fd_watch->w.watch_type    = fd_type;
-  fd_watch->w.priority      = priority;
-  fd_watch->w.callback_fcn  = fcn;
-  fd_watch->w.callback_code = code;
-  fd_watch->w.callback_data = data;
-  fd_watch->w.noticed       = STDFALSE;
-  fd_watch->fd              = fd;
-  fd_watch->active          = STDFALSE;  /* updated below */
-
-  /* find insertion point and cancel any previously existing watch with same parameters */
-
-  if ((tmp_watch = E_events_get_fd_watch(events, fd, fd_type, &fd_watch->fd_watches_it)) != NULL) {
-    stdskl_it_next(&fd_watch->fd_watches_it);            /* NOTE: advance iterator here so it remains valid after cancellation */
-    E_events_cancel_watch(events, (E_watch*) tmp_watch);
-  }
-
-  /* insert into events->fd_watches using events->fd_watches_it as a hint */
-
-  if (stdskl_insert(&events->fd_watches, &fd_watch->fd_watches_it, &fd, &fd_watch, STDTRUE) != 0) { 
-    ret = E_ALLOC_FAILURE;
-    goto FAIL_FD_WATCH;
-  }
-
-  /* insert into backend specific monitoring mechanism */
-
-  switch (events->backend_type) {
-
-  case E_BACKEND_SELECT:
-    ret = E_events_select_activate_watch(events, fd_watch);
-    break;
-
-  case E_BACKEND_EPOLL:
-    ret = E_events_epoll_activate_watch(events, fd_watch);
-    break;
-
-  default:
-    assert(0);
-    ret = E_BUG;
-    break;
-  }
-
-  if (ret != 0) {
-    goto FAIL_FD_WATCHES;
-  }
-
-  fd_watch->active = STDTRUE;
-  ++events->priorities[priority].num_active_fd_watches;
-
-  assert(ret == 0);
-  goto END;
-
-  /* error handling and return */
-
- FAIL_FD_WATCHES:
-  stdskl_erase(&events->fd_watches, &fd_watch->fd_watches_it);
-
- FAIL_FD_WATCH:
-  free(fd_watch);
-
- FAIL:
-  assert(ret != 0);
-
- END:
-  return ret;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
 static void E_events_cancel_watch(E_events *events, 
                                   E_watch * watch)
 {
@@ -1245,293 +792,13 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static int E_events_in_queue(E_events           *events,
-			     E_time_callback_fcn fcn,
-			     int                 code,
-			     void               *data)
-{
-  E_time_watch  time_watch_impl;
-  E_time_watch *time_watch = &time_watch_impl;
-  stdit         sit;
-
-  /* look up any existing time watch and cancel it */
-
-  time_watch->w.callback_fcn  = fcn;
-  time_watch->w.callback_code = code;
-  time_watch->w.callback_data = data;
-
-  return !stdskl_is_end(&events->time_watches, stdskl_find(&events->time_watches, &sit, &time_watch));
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_queue(E_events           *events,
-                          E_time_callback_fcn fcn,
-                          int                 code, 
-                          void               *data,
-                          sp_time             delta_time)
-{
-  return E_events_create_timer(events, delta_time, E_DELTA_TIMER, fcn, code, data, LOW_PRIORITY);
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_queue_absolute(E_events           *events,
-                                   E_time_callback_fcn fcn,
-                                   int                 code,
-                                   void               *data,
-                                   sp_time             absolute_time)
-{
-  return E_events_create_timer(events, absolute_time, E_ABSOLUTE_TIMER, fcn, code, data, LOW_PRIORITY);
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_queue_periodic(E_events           *events,
-                                   E_time_callback_fcn fcn,
-                                   int                 code,
-                                   void               *data,
-                                   sp_time             periodic_time)
-{
-  return E_events_create_timer(events, periodic_time, E_PERIODIC_TIMER, fcn, code, data, LOW_PRIORITY);
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_dequeue(E_events           *events, 
-                            E_time_callback_fcn fcn,
-                            int                 code,
-                            void               *data)
-{
-  int           ret = -1;                       /* NOTE: backwards compatability */
-  E_time_watch  time_watch_impl;
-  E_time_watch *time_watch = &time_watch_impl;
-  stdit         sit;
-
-  /* look up any existing time watch and cancel it */
-
-  time_watch->w.callback_fcn  = fcn;
-  time_watch->w.callback_code = code;
-  time_watch->w.callback_data = data;
-
-  if (!stdskl_is_end(&events->time_watches, stdskl_find(&events->time_watches, &sit, &time_watch))) {
-    E_events_cancel_watch(events, *(E_watch**) stdskl_it_key(&sit));
-    ret = 0;
-  }
-
-  return ret;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_detach_fd(E_events *  events, 
-                              int          fd, 
-                              E_watch_type fd_type)
-{
-  int         ret = -1;  /* NOTE: backwards compatability */
-  E_fd_watch *fd_watch;
-  stdit       sit;
-
-  /* cancel any previously existing watch */
-
-  if ((fd_watch = E_events_get_fd_watch(events, fd, fd_type, &sit)) != NULL) {
-    E_events_cancel_watch(events, (E_watch*) fd_watch);
-    ret = 0;
-  }
-
-  return ret;
-
-#if 0
-  /* TODO: implement specific priority levels? */
-
-  int        ret = -1;  /* NOTE: backwards compatability */
-  E_priority priority;
-
-  /* cancel any previously existing watch */
-
-  for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
-
-    if (E_events_detach_fd(events, fd, fd_type, priority) == 0) {
-      ret = 0;
-    }
-  }
-
-  return ret;
-#endif
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_detach_fd_priority(E_events    *events, 
-				       int          fd, 
-				       E_watch_type fd_type,
-				       E_priority   priority)
-{
-  return E_events_detach_fd(events, fd, fd_type);  /* TODO: implement specific priority lvls? */
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_set_active_threshold(E_events *events,
-                                         E_priority priority)
-{
-  if (priority < E_MIN_PRIORITY || priority > events->max_priority) {
-    return -1;      /* backwards compatability */
-  }
-
-  events->active_threshold = priority;
-
-  return priority;  /* backwards compatability */
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_num_active(E_events *events,
-                               E_priority priority)
-{
-  if (priority < E_MIN_PRIORITY || priority > events->max_priority) {
-    return -1;  /* backwards compatability */
-  }
-
-  return events->priorities[priority].num_active_fd_watches;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_activate_fd(E_events *  events,
-                                int          fd, 
-                                E_watch_type fd_type)
-{
-  E_fd_watch *fd_watch;
-  stdit        sit;
-  int          ret = -1;  /* backwards compatability */
-
-  if ((fd_watch = E_events_get_fd_watch(events, fd, fd_type, &sit)) != NULL) {
-
-    ret = 0;
-
-    if (!fd_watch->active) {
-      
-      switch (events->backend_type) {
-        
-      case E_BACKEND_SELECT:
-        ret = E_events_select_activate_watch(events, fd_watch);
-        break;
-        
-      case E_BACKEND_EPOLL:
-        ret = E_events_epoll_activate_watch(events, fd_watch);
-        break;
-        
-      default:
-        assert(0);
-        ret = E_BUG;
-        break;
-      }
-
-      if (ret == 0) {
-        fd_watch->active = STDTRUE;
-        ++events->priorities[fd_watch->w.priority].num_active_fd_watches;
-      }
-    }
-  }
-
-  return ret;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_deactivate_fd(E_events *  events,
-                                  int          fd, 
-                                  E_watch_type fd_type)
-{
-  E_fd_watch *fd_watch;
-  stdit        sit;
-  int          ret = -1;  /* backwards compatability */
-
-  if ((fd_watch = E_events_get_fd_watch(events, fd, fd_type, &sit)) != NULL) {
-
-    ret = 0;
-
-    if (fd_watch->active) {
-      
-      switch (events->backend_type) {
-        
-      case E_BACKEND_SELECT:
-        E_events_select_deactivate_watch(events, fd_watch);  /* TODO: should we look at return value or not? */
-        break;
-        
-      case E_BACKEND_EPOLL:
-        E_events_epoll_deactivate_watch(events, fd_watch);   /* TODO: should we look at return value or not? */
-        break;
-        
-      default:
-        assert(0);
-        ret = E_BUG;
-        break;
-      }
-
-      --events->priorities[fd_watch->w.priority].num_active_fd_watches;
-      fd_watch->active = STDFALSE;
-    }
-  }
-
-  return ret;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_set_elevate_count(E_events *events,
-                                      E_priority priority, 
-                                      unsigned   count)
-{
-  if (priority < E_MIN_PRIORITY || priority > events->max_priority) {
-    return E_INVALID_PARAM;
-  }
-
-  if ((events->priorities[priority].starve_thresh = count) == 0) {
-    events->priorities[priority].starve_cnter = 0;
-  }
-
-  return 0;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static int E_events_get_elevate_count(E_events *events,
-                                      E_priority priority,
-                                      unsigned *count)
-{
-  if (priority < E_MIN_PRIORITY || priority > events->max_priority) {
-    return E_INVALID_PARAM;
-  }
-
-  *count = events->priorities[priority].starve_thresh;
-
-  return 0;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
 static int E_events_select_init(E_events *events)
 {
-  int                 priority;
-  E_select_info *    info;
+  int                priority;
+  E_select_info     *info;
   E_select_lvl_info *lvl_info;
-  E_watch_type        fd_type;
-  int                 ret = 0;
+  E_watch_type       fd_type;
+  int                ret = 0;
 
   assert(events->backend_type == E_BACKEND_UNSPECIFIED);
 
@@ -1604,10 +871,10 @@
 
 static void E_events_select_fini(E_events *events)
 {
-  E_priority          priority;
-  E_select_info *    info;
+  E_priority         priority;
+  E_select_info     *info;
   E_select_lvl_info *lvl_info;
-  E_watch_type        fd_type;
+  E_watch_type       fd_type;
 
   assert(events->backend_type == E_BACKEND_SELECT && events->backend_info != NULL);
 
@@ -1637,10 +904,10 @@
 static int E_events_select_activate_watch(E_events *  events, 
                                           E_fd_watch *fd_watch)
 {
-  E_priority          lvl;
+  E_priority         lvl;
   E_select_lvl_info *lvl_info;
-  stdit               sit;
-  int                 ret = 0;
+  stdit              sit;
+  int                ret = 0;
 
   assert(fd_watch == E_events_get_fd_watch(events, fd_watch->fd, fd_watch->w.watch_type, &sit));
   assert(fd_watch->w.priority >= E_MIN_PRIORITY && fd_watch->w.priority <= events->max_priority);
@@ -1685,10 +952,10 @@
 static int E_events_select_deactivate_watch(E_events *  events, 
                                             E_fd_watch *fd_watch)
 {
-  E_priority          lvl;
+  E_priority         lvl;
   E_select_lvl_info *lvl_info;
 #ifndef NDEBUG
-  stdit               sit;
+  stdit              sit;
 #endif
 
   assert(fd_watch == E_events_get_fd_watch(events, fd_watch->fd, fd_watch->w.watch_type, &sit));
@@ -1725,22 +992,22 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static int E_events_select_query(E_events *     events, 
-                                 E_priority      query_lvl, 
+static int E_events_select_query(E_events      *events, 
+                                 E_priority     query_lvl, 
                                  const sp_time *delta_timeout, 
-                                 E_priority *   queried_lvl)
+                                 E_priority    *queried_lvl)
 {
-  int                 ret         = 0;
-  E_select_info *    info        = (E_select_info*) events->backend_info;
+  int                ret         = 0;
+  E_select_info     *info        = (E_select_info*) events->backend_info;
   E_select_lvl_info *lvl_info    = (E_select_lvl_info*) events->priorities[query_lvl].backend_lvl_info;
-  int                 nfds        = 0;
-  fd_set *           fd_sets[3]  = { NULL, NULL, NULL };
-  struct timeval      timeout_dmy = { 0, 0 };
-  struct timeval *   timeout     = &timeout_dmy;
-  E_watch_type        fd_type;
-  E_fd_watch *       fd_watch;
-  int                 tmp_fd;
-  stdit               sit;
+  int                nfds        = 0;
+  fd_set            *fd_sets[3]  = { NULL, NULL, NULL };
+  struct timeval     timeout_dmy = { 0, 0 };
+  struct timeval    *timeout     = &timeout_dmy;
+  E_watch_type       fd_type;
+  E_fd_watch        *fd_watch;
+  int                tmp_fd;
+  stdit              sit;
 
   /* set up select's inputs: nfds, fd_sets, timeout */
 
@@ -1856,10 +1123,10 @@
 static int E_events_epoll_init(E_events *events)
 #ifdef HAVE_LINUX
 {
-  int                priority;
-  E_epoll_info *    info;
+  int               priority;
+  E_epoll_info     *info;
   E_epoll_lvl_info *lvl_info;
-  int                ret = 0;
+  int               ret = 0;
 
   assert(events->backend_type == E_BACKEND_UNSPECIFIED);
   assert(E_EPOLL_INITIAL_SIZE > 0);
@@ -1963,8 +1230,8 @@
 static void E_events_epoll_fini(E_events *events)
 #ifdef HAVE_LINUX
 {
-  E_priority         priority;
-  E_epoll_info *    info;
+  E_priority        priority;
+  E_epoll_info     *info;
   E_epoll_lvl_info *lvl_info;
 
   assert(events->backend_type == E_BACKEND_EPOLL && events->backend_info != NULL);
@@ -1995,13 +1262,13 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static int E_events_epoll_activate_watch(E_events *  events,
+static int E_events_epoll_activate_watch(E_events   *events,
                                          E_fd_watch *fd_watch)
 #ifdef HAVE_LINUX
 {
   E_priority         lvl;
-  E_epoll_lvl_info *lvl_info;
-  E_fd_watch *      tmp_watch;
+  E_epoll_lvl_info  *lvl_info;
+  E_fd_watch        *tmp_watch;
   int                num_events;
   struct epoll_event epoll_evnt;
   struct epoll_event epoll_evnt_cpy;
@@ -2168,13 +1435,13 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static int E_events_epoll_deactivate_watch(E_events *  events,
+static int E_events_epoll_deactivate_watch(E_events   *events,
                                            E_fd_watch *fd_watch)
 #ifdef HAVE_LINUX
 {
   E_priority         lvl;
-  E_epoll_lvl_info *lvl_info;
-  E_fd_watch *      tmp_watch;
+  E_epoll_lvl_info  *lvl_info;
+  E_fd_watch        *tmp_watch;
   int                num_events;
   struct epoll_event epoll_evnt;
   int                epoll_op;
@@ -2292,21 +1559,21 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static int E_events_epoll_query(E_events *     events,
-                                E_priority      query_lvl, 
+static int E_events_epoll_query(E_events      *events,
+                                E_priority     query_lvl, 
                                 const sp_time *delta_timeout,
-                                E_priority *   queried_lvl)
+                                E_priority    *queried_lvl)
 #ifdef HAVE_LINUX
 {
-  E_epoll_info *      info        = (E_epoll_info*) events->backend_info;
-  E_epoll_lvl_info *  lvl_info    = (E_epoll_lvl_info*) events->priorities[query_lvl].backend_lvl_info;
-  double               tmp_timeout;
-  int                  timeout;
+  E_epoll_info       *info     = (E_epoll_info*) events->backend_info;
+  E_epoll_lvl_info   *lvl_info = (E_epoll_lvl_info*) events->priorities[query_lvl].backend_lvl_info;
+  double              tmp_timeout;
+  int                 timeout;
   struct epoll_event *evnt;
   struct epoll_event *evnts_end;
-  E_fd_watch *        fd_watch;
-  stdit                sit;
-  int                  ret         = 0;
+  E_fd_watch         *fd_watch;
+  stdit               sit;
+  int                 ret      = 0;
 
   /* convert the timeout to epoll's input */
 
@@ -2456,20 +1723,13 @@
 }
 
 /***************************************************************************************************************************
- * NOTE: Bad implementation kept for backwards compatability -- should allow for negative times
  ***************************************************************************************************************************/
 
-sp_time E_sub_time(sp_time t, 
-                   sp_time delta_t)
+sp_time E_neg_time(sp_time t)
 {
-  t.sec  -= delta_t.sec;
-  t.usec -= delta_t.usec;
+  t.sec  = -t.sec;
+  t.usec = -t.usec;
 
-  if (t.usec < 0) {
-    t.usec += 1000000;
-    --t.sec;
-  } 
-
   return t;
 }
 
@@ -2485,14 +1745,48 @@
   if (t.usec > 1000000) {
     t.usec -= 1000000;
     ++t.sec;
+
+  } else if (t.usec < 1000000) {
+    t.usec += 1000000;
+    --t.sec;
   }
 
+  assert(-1000000 < t.usec && t.usec < 1000000);
+
+  if (t.sec > 0) {
+
+    if (t.usec < 0) {
+      t.usec += 1000000;
+      --t.sec;
+    }
+
+    assert(0 <= t.usec && t.usec < 1000000);
+
+  } else if (t.sec < 0) {
+
+    if (t.usec > 0) {
+      t.usec -= 1000000;
+      ++t.sec;
+    }
+
+    assert(-1000000 < t.usec && t.usec <= 0);
+  }
+
   return t;
 }
 
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
+sp_time E_sub_time(sp_time t, 
+                   sp_time delta_t)
+{
+  return E_add_time(t, E_neg_time(delta_t));
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
 int E_compare_time(sp_time t1, 
                    sp_time t2)
 {
@@ -2544,22 +1838,6 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-sp_time E_get_loop_time(void)
-{
-  return E_events_get_loop_time(&Events);
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-sp_time E_update_loop_time(void)
-{
-  return E_events_update_loop_time(&Events);
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
 int E_in_queue(E_time_callback_fcn fcn, int code, void *data)
 {
   return E_events_in_queue(&Events, fcn, code, data);
@@ -2687,3 +1965,643 @@
 
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
+
+int E_events_init(E_events *events)
+{
+  E_priority_info *info;
+  E_priority       priority;
+  E_backend_type   backend_type;
+  int              ret = 0;
+
+  memset(events, 0, sizeof(*events));
+
+  events->keep_running     = STDFALSE;       /* NOTE: only set in E_events_handle_events; indicates loop is running */
+  events->max_priority     = HIGH_PRIORITY;
+  events->active_threshold = LOW_PRIORITY;
+
+  events->last_loop_time = events->last_clock_time = E_get_time_low();
+
+  if (stdskl_construct(&events->time_watches, sizeof(E_time_watch*), 0, E_time_watch_ptr_cmp) != 0) {
+    ret = E_ALLOC_FAILURE;
+    goto FAIL;
+  }
+
+  if (stdskl_construct(&events->fd_watches, sizeof(int), sizeof(E_fd_watch*), E_fd_cmp) != 0) {
+    ret = E_ALLOC_FAILURE;
+    goto FAIL_TIME_WATCHES;
+  }
+
+  if ((events->priorities = (E_priority_info*) calloc(events->max_priority + 1, sizeof(E_priority_info))) == NULL) {
+    ret = E_ALLOC_FAILURE;
+    goto FAIL_FD_WATCHES;
+  }
+
+  for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
+
+    info           = &events->priorities[priority];
+    info->priority = priority;
+
+    if (stddll_construct(&info->notices, sizeof(E_watch*)) != 0) {
+      ret = E_ALLOC_FAILURE;
+      goto FAIL_PRIORITIES;
+    }
+
+    info->starve_thresh         = 0;
+    info->starve_cnter          = 0;
+    info->num_active_fd_watches = 0;
+    info->backend_lvl_info      = NULL;
+  }
+
+  if (stdskl_construct(&events->timeouts, sizeof(sp_time), sizeof(E_time_watch*), E_sp_time_cmp) != 0) {
+    ret = E_ALLOC_FAILURE;
+    goto FAIL_PRIORITIES;
+  }
+
+  events->backend_type = E_BACKEND_UNSPECIFIED;
+  
+#ifdef HAVE_LINUX
+  backend_type = E_BACKEND_EPOLL;
+#else
+  backend_type = E_BACKEND_SELECT;
+#endif
+
+  switch (backend_type) {
+
+  case E_BACKEND_SELECT:
+    ret = E_events_select_init(events);
+    break;
+
+  case E_BACKEND_EPOLL:
+    ret = E_events_epoll_init(events);
+    break;
+
+  default:
+    assert(0);
+    ret = E_BUG;
+    break;
+  }
+
+  if (ret != 0) {
+    goto FAIL_TIMEOUTS;
+  }
+
+  if (!E_events_is_valid(events)) {
+    assert(0);
+  }
+
+  assert(ret == 0);
+  goto END;
+
+  /* error handling and return */
+
+ FAIL_TIMEOUTS:
+  stdskl_destruct(&events->timeouts);
+
+ FAIL_PRIORITIES:
+  while (priority-- != E_MIN_PRIORITY) {
+    stddll_destruct(&events->priorities[priority].notices);
+  }
+
+  free(events->priorities);
+
+ FAIL_FD_WATCHES:
+  stdskl_destruct(&events->fd_watches);
+
+ FAIL_TIME_WATCHES:
+  stdskl_destruct(&events->time_watches);
+
+ FAIL:
+  assert(ret != 0);
+
+ END:
+  return ret;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+void E_events_fini(E_events *events)
+{
+  E_priority priority;
+  stdit      sit;
+  
+  switch (events->backend_type) {
+
+  case E_BACKEND_SELECT:
+    E_events_select_fini(events);
+    break;
+
+  case E_BACKEND_EPOLL:
+    E_events_epoll_fini(events);
+    break;
+
+  default:
+    assert(0);
+    break;
+  }
+
+  stdskl_destruct(&events->timeouts);
+
+  for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
+    stddll_destruct(&events->priorities[priority].notices);
+  }
+
+  free(events->priorities);
+
+  for (stdskl_begin(&events->fd_watches, &sit); !stdskl_is_end(&events->fd_watches, &sit); stdskl_it_next(&sit)) {
+    free(*(E_fd_watch**) stdskl_it_val(&sit));
+  }
+
+  stdskl_destruct(&events->fd_watches);
+
+  for (stdskl_begin(&events->time_watches, &sit); !stdskl_is_end(&events->time_watches, &sit); stdskl_it_next(&sit)) {
+    free(*(E_time_watch**) stdskl_it_key(&sit));
+  }
+
+  stdskl_destruct(&events->time_watches);
+  memset(events, 0, sizeof(*events));
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_in_queue(E_events           *events,
+		      E_time_callback_fcn fcn,
+		      int                 code,
+		      void               *data)
+{
+  E_time_watch  time_watch_impl;
+  E_time_watch *time_watch = &time_watch_impl;
+  stdit         sit;
+
+  /* look up any existing time watch and cancel it */
+
+  time_watch->w.callback_fcn  = fcn;
+  time_watch->w.callback_code = code;
+  time_watch->w.callback_data = data;
+
+  return !stdskl_is_end(&events->time_watches, stdskl_find(&events->time_watches, &sit, &time_watch));
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_queue(E_events           *events,
+		   E_time_callback_fcn fcn,
+		   int                 code, 
+		   void               *data,
+		   sp_time             delta_time)
+{
+  return E_events_create_timer(events, delta_time, E_DELTA_TIMER, fcn, code, data, LOW_PRIORITY);
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_queue_absolute(E_events           *events,
+			    E_time_callback_fcn fcn,
+			    int                 code,
+			    void               *data,
+			    sp_time             absolute_time)
+{
+  return E_events_create_timer(events, absolute_time, E_ABSOLUTE_TIMER, fcn, code, data, LOW_PRIORITY);
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_queue_periodic(E_events           *events,
+			    E_time_callback_fcn fcn,
+			    int                 code,
+			    void               *data,
+			    sp_time             periodic_time)
+{
+  return E_events_create_timer(events, periodic_time, E_PERIODIC_TIMER, fcn, code, data, LOW_PRIORITY);
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_dequeue(E_events           *events, 
+		     E_time_callback_fcn fcn,
+		     int                 code,
+		     void               *data)
+{
+  int           ret = -1;                       /* NOTE: backwards compatability */
+  E_time_watch  time_watch_impl;
+  E_time_watch *time_watch = &time_watch_impl;
+  stdit         sit;
+
+  /* look up any existing time watch and cancel it */
+
+  time_watch->w.callback_fcn  = fcn;
+  time_watch->w.callback_code = code;
+  time_watch->w.callback_data = data;
+
+  if (!stdskl_is_end(&events->time_watches, stdskl_find(&events->time_watches, &sit, &time_watch))) {
+    E_events_cancel_watch(events, *(E_watch**) stdskl_it_key(&sit));
+    ret = 0;
+  }
+
+  return ret;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_attach_fd(E_events         *events,
+		       int               fd,
+		       E_watch_type      fd_type,
+		       E_fd_callback_fcn fcn,
+		       int               code,
+		       void             *data,
+		       int               priority)
+{
+  E_fd_watch *fd_watch;
+  E_fd_watch *tmp_watch;
+  int         ret = 0;
+
+  /* validate input */
+
+  if (fd < 0 ||
+      priority > events->max_priority ||
+      priority < E_MIN_PRIORITY || 
+      fd < 0) {
+    ret = E_INVALID_PARAM;
+    goto FAIL;
+  }
+
+  switch (fd_type) {
+
+  case READ_FD:
+  case WRITE_FD:
+  case EXCEPT_FD:
+    break;
+
+  default:
+    ret = E_INVALID_PARAM;
+    goto FAIL;
+  }
+
+  /* allocate and initialize new fd_watch */
+
+  if ((fd_watch = (E_fd_watch*) calloc(1, sizeof(E_fd_watch))) == NULL) {
+    ret = E_ALLOC_FAILURE;
+    goto FAIL;
+  }
+
+  fd_watch->w.watch_type    = fd_type;
+  fd_watch->w.priority      = priority;
+  fd_watch->w.callback_fcn  = fcn;
+  fd_watch->w.callback_code = code;
+  fd_watch->w.callback_data = data;
+  fd_watch->w.noticed       = STDFALSE;
+  fd_watch->fd              = fd;
+  fd_watch->active          = STDFALSE;  /* updated below */
+
+  /* find insertion point and cancel any previously existing watch with same parameters */
+
+  if ((tmp_watch = E_events_get_fd_watch(events, fd, fd_type, &fd_watch->fd_watches_it)) != NULL) {
+    stdskl_it_next(&fd_watch->fd_watches_it);            /* NOTE: advance iterator here so it remains valid after cancellation */
+    E_events_cancel_watch(events, (E_watch*) tmp_watch);
+  }
+
+  /* insert into events->fd_watches using events->fd_watches_it as a hint */
+
+  if (stdskl_insert(&events->fd_watches, &fd_watch->fd_watches_it, &fd, &fd_watch, STDTRUE) != 0) { 
+    ret = E_ALLOC_FAILURE;
+    goto FAIL_FD_WATCH;
+  }
+
+  /* insert into backend specific monitoring mechanism */
+
+  switch (events->backend_type) {
+
+  case E_BACKEND_SELECT:
+    ret = E_events_select_activate_watch(events, fd_watch);
+    break;
+
+  case E_BACKEND_EPOLL:
+    ret = E_events_epoll_activate_watch(events, fd_watch);
+    break;
+
+  default:
+    assert(0);
+    ret = E_BUG;
+    break;
+  }
+
+  if (ret != 0) {
+    goto FAIL_FD_WATCHES;
+  }
+
+  fd_watch->active = STDTRUE;
+  ++events->priorities[priority].num_active_fd_watches;
+
+  assert(ret == 0);
+  goto END;
+
+  /* error handling and return */
+
+ FAIL_FD_WATCHES:
+  stdskl_erase(&events->fd_watches, &fd_watch->fd_watches_it);
+
+ FAIL_FD_WATCH:
+  free(fd_watch);
+
+ FAIL:
+  assert(ret != 0);
+
+ END:
+  return ret;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_detach_fd(E_events    *events, 
+		       int          fd, 
+		       E_watch_type fd_type)
+{
+  int         ret = -1;  /* NOTE: backwards compatability */
+  E_fd_watch *fd_watch;
+  stdit       sit;
+
+  /* cancel any previously existing watch */
+
+  if ((fd_watch = E_events_get_fd_watch(events, fd, fd_type, &sit)) != NULL) {
+    E_events_cancel_watch(events, (E_watch*) fd_watch);
+    ret = 0;
+  }
+
+  return ret;
+
+#if 0
+  /* TODO: implement specific priority levels? */
+
+  int        ret = -1;  /* NOTE: backwards compatability */
+  E_priority priority;
+
+  /* cancel any previously existing watch */
+
+  for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
+
+    if (E_events_detach_fd(events, fd, fd_type, priority) == 0) {
+      ret = 0;
+    }
+  }
+
+  return ret;
+#endif
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_detach_fd_priority(E_events    *events, 
+				int          fd, 
+				E_watch_type fd_type,
+				E_priority   priority)
+{
+  return E_events_detach_fd(events, fd, fd_type);  /* TODO: implement specific priority lvls? */
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_set_active_threshold(E_events  *events,
+				  E_priority priority)
+{
+  if (priority < E_MIN_PRIORITY || priority > events->max_priority) {
+    return -1;      /* backwards compatability */
+  }
+
+  events->active_threshold = priority;
+
+  return priority;  /* backwards compatability */
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_num_active(E_events  *events,
+			E_priority priority)
+{
+  if (priority < E_MIN_PRIORITY || priority > events->max_priority) {
+    return -1;  /* backwards compatability */
+  }
+
+  return events->priorities[priority].num_active_fd_watches;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_activate_fd(E_events    *events,
+			 int          fd, 
+			 E_watch_type fd_type)
+{
+  E_fd_watch *fd_watch;
+  stdit       sit;
+  int         ret = -1;  /* backwards compatability */
+
+  if ((fd_watch = E_events_get_fd_watch(events, fd, fd_type, &sit)) != NULL) {
+
+    ret = 0;
+
+    if (!fd_watch->active) {
+      
+      switch (events->backend_type) {
+        
+      case E_BACKEND_SELECT:
+        ret = E_events_select_activate_watch(events, fd_watch);
+        break;
+        
+      case E_BACKEND_EPOLL:
+        ret = E_events_epoll_activate_watch(events, fd_watch);
+        break;
+        
+      default:
+        assert(0);
+        ret = E_BUG;
+        break;
+      }
+
+      if (ret == 0) {
+        fd_watch->active = STDTRUE;
+        ++events->priorities[fd_watch->w.priority].num_active_fd_watches;
+      }
+    }
+  }
+
+  return ret;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_deactivate_fd(E_events    *events,
+			   int          fd, 
+			   E_watch_type fd_type)
+{
+  E_fd_watch *fd_watch;
+  stdit       sit;
+  int         ret = -1;  /* backwards compatability */
+
+  if ((fd_watch = E_events_get_fd_watch(events, fd, fd_type, &sit)) != NULL) {
+
+    ret = 0;
+
+    if (fd_watch->active) {
+      
+      switch (events->backend_type) {
+        
+      case E_BACKEND_SELECT:
+        E_events_select_deactivate_watch(events, fd_watch);  /* TODO: should we look at return value or not? */
+        break;
+        
+      case E_BACKEND_EPOLL:
+        E_events_epoll_deactivate_watch(events, fd_watch);   /* TODO: should we look at return value or not? */
+        break;
+        
+      default:
+        assert(0);
+        ret = E_BUG;
+        break;
+      }
+
+      --events->priorities[fd_watch->w.priority].num_active_fd_watches;
+      fd_watch->active = STDFALSE;
+    }
+  }
+
+  return ret;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_set_elevate_count(E_events  *events,
+			       E_priority priority, 
+			       unsigned   count)
+{
+  if (priority < E_MIN_PRIORITY || priority > events->max_priority) {
+    return E_INVALID_PARAM;
+  }
+
+  if ((events->priorities[priority].starve_thresh = count) == 0) {
+    events->priorities[priority].starve_cnter = 0;
+  }
+
+  return 0;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_get_elevate_count(E_events  *events,
+			       E_priority priority,
+			       unsigned  *count)
+{
+  if (priority < E_MIN_PRIORITY || priority > events->max_priority) {
+    return E_INVALID_PARAM;
+  }
+
+  *count = events->priorities[priority].starve_thresh;
+
+  return 0;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int E_events_handle_events(E_events *events)
+{
+  int        ret          = 0;
+  E_priority dispatch_lvl = events->max_priority;
+  E_priority starve_lvl;
+
+  events->keep_running = STDTRUE;
+
+  do {
+
+    /* efficiently query watches for new notices */
+
+    if ((ret = E_events_populate_priority_queues(events, dispatch_lvl)) != 0) {
+      goto FAIL;
+    }
+
+    /* handle any "left over" starvation from previous iterations */
+
+    if ((ret = E_events_handle_starvation(events)) != 0) {
+
+      if (ret == E_REPOPULATE) {  /* need to repopulate priority queues to ensure we aren't starving empty levels */
+        ret = 0;
+        continue;                 /* NOTE: should only happen here if in a starved callback the user lowered active_threshold */
+      }
+
+      goto FAIL;
+    }
+
+    /* find the highest non-empty priority */
+
+    for (dispatch_lvl = events->max_priority; 
+         dispatch_lvl >= events->active_threshold && stddll_empty(&events->priorities[dispatch_lvl].notices); 
+         --dispatch_lvl);
+
+    /* dispatch some notices from dispatch_lvl */
+
+    while (events->keep_running && dispatch_lvl >= events->active_threshold && !stddll_empty(&events->priorities[dispatch_lvl].notices)) {
+
+      if ((ret = E_events_dispatch_notice(events, dispatch_lvl)) != 0) {
+        goto FAIL;
+      }
+
+      /* increment starvation counts for other priorities */
+
+      for (starve_lvl = events->max_priority; starve_lvl >= events->active_threshold; --starve_lvl) {
+
+        if (starve_lvl != dispatch_lvl && 
+            events->priorities[starve_lvl].starve_thresh != 0 &&
+            events->priorities[starve_lvl].num_active_fd_watches != 0) {
+          ++events->priorities[starve_lvl].starve_cnter;
+        }
+      }
+
+      /* handle any starvation */
+
+      if ((ret = E_events_handle_starvation(events)) != 0) {
+
+        if (ret == E_REPOPULATE) {  /* need to repopulate priority queues to ensure we aren't starving empty levels */
+          ret = 0;
+          continue;
+        }
+
+        goto FAIL;
+      }
+    }
+
+  } while (events->keep_running);
+
+  assert(ret == 0);
+  goto END;
+
+  /* error handling and return */
+
+ FAIL:
+  assert(ret != 0 && ret != E_REPOPULATE);
+
+ END:
+  return ret;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+void E_events_exit_events(E_events *events)
+{
+  events->keep_running = STDFALSE;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/




More information about the Spread-cvs mailing list