[Spread-cvs] commit: r413 - in branches/events_testing: daemon include

jschultz at spread.org jschultz at spread.org
Fri May 15 21:38:32 EDT 2009


Author: jschultz
Date: 2009-05-15 21:38:32 -0400 (Fri, 15 May 2009)
New Revision: 413

Modified:
   branches/events_testing/daemon/events.c
   branches/events_testing/include/sp_events.h
Log:
Added more comments, rearranged the order of some code, and stored E_fd_watch* pointers as the values in the select
backend's lvl_info->fd_skls so that select queries no longer need to look up the watch.


Modified: branches/events_testing/daemon/events.c
===================================================================
--- branches/events_testing/daemon/events.c	2009-05-15 21:54:18 UTC (rev 412)
+++ branches/events_testing/daemon/events.c	2009-05-16 01:38:32 UTC (rev 413)
@@ -129,15 +129,15 @@
 
 typedef struct
 {
-  E_priority priority;
+  E_priority priority;           /* priority associated with this lvl */
   stddll     notices;            /* <E_watch*>: watches at this priority that have triggered / been noticed */
 
   unsigned   starve_thresh;      /* threshold that determines if/when this priority elevates to "preempt" execution */
   unsigned   starve_cnter;       /* increments once for every non-preempt notice dispatch from a different priority */
 
-  int        num_active_fd_watches;
+  int        num_active_fd_watches;  /* track how many active fd watches are registered at this particular priority */
 
-  void *     backend_lvl_info;
+  void *     backend_lvl_info;   /* backend specific information for this priority */
 
 } E_priority_info;
 
@@ -148,15 +148,15 @@
 
 typedef struct
 {
-  struct epoll_event * events;
-  int                  num_events;
+  struct epoll_event * events;         /* array into which to receive results from epoll_wait */
+  int                  num_events;     /* size of events array */
 
 } E_epoll_info;
 
 typedef struct
 {
-  int epoll_fd;
-  int num_poll_fds;
+  int epoll_fd;                        /* epoll fd that is tracking all fds with priority >= associated lvl */
+  int num_poll_fds;                    /* # of fds we currently have registered with epoll_fd */
 
 } E_epoll_lvl_info;
 
@@ -167,14 +167,14 @@
 
 typedef struct
 {
-  fd_set tmp_fd_sets[NUM_FDTYPES];
+  fd_set tmp_fd_sets[NUM_FDTYPES];     /* select sets used as a scratch pad when calling select */
 
 } E_select_info;
 
 typedef struct
 {
-  fd_set fd_sets[NUM_FDTYPES];
-  stdskl fd_skls[NUM_FDTYPES];   /* <int -> nil>: ordered by E_int_cmp */
+  fd_set fd_sets[NUM_FDTYPES];         /* select sets of all fds with priority >= associated lvl of the associated type */
+  stdskl fd_skls[NUM_FDTYPES];         /* <int -> E_fd_watch*>: sparse representation of same fds (ordered by E_int_cmp) */
 
 } E_select_lvl_info;
 
@@ -191,13 +191,13 @@
   sp_time           last_loop_time;    /* most recent time when we returned from an OS query */
   
   stdskl            time_watches;      /* <E_time_watch* -> nil>: ordered by E_time_watch_ptr_cmp: [fcn, code, data] */
-  stdskl            fd_watches;        /* <int -> E_fd_watch*>: clustered, not ordered, by [fd] */
+  stdskl            fd_watches;        /* <int -> E_fd_watch*>: fds ordered by E_int_cmp */
   
   E_priority_info * priorities;        /* array of size max_priority: notification Qs and other per priority lvl info */
   
   /* notification generation mechanisms */
   
-  stdskl            timeouts;          /* <sp_time -> E_time_watch*>: ordered by absolute time out */
+  stdskl            timeouts;          /* <sp_time -> E_time_watch*>: absolute timeouts ordered by E_sp_time_cmp */
 
   E_backend_type    backend_type;      /* method used by this sytem to query fds */
   void *            backend_info;      /* backend specific info for this system */
@@ -210,17 +210,20 @@
 static int     E_int_cmp(const void * left, const void * right);
 static int     E_time_watch_ptr_cmp(const void * left, const void * right);
 static int     E_sp_time_cmp(const void * left, const void * right);
+static sp_time E_get_time_low(void);
 
+static int     E_events_init(E_events * events);
+static void    E_events_fini(E_events * events);
+
+static stdbool E_events_is_valid(const E_events * events);
+
+static void    E_events_exit_events(E_events * events);
 static E_fd_watch * E_events_get_fd_watch(E_events * events, int fd, E_watch_type fd_type, stdit * sit);
 
 static sp_time E_events_get_time(E_events * events);
 static sp_time E_events_get_loop_time(E_events * events);
 static sp_time E_events_update_loop_time(E_events * events);
 
-static int     E_events_init(E_events * events);
-static void    E_events_fini(E_events * events);
-
-static void    E_events_exit_events(E_events * events);
 static int     E_events_handle_events(E_events * events);
 static int     E_events_populate_priority_queues(E_events * events, E_priority dispatch_lvl);
 static int     E_events_handle_starvation(E_events * events);
@@ -303,78 +306,36 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static E_fd_watch * E_events_get_fd_watch(E_events *   events,
-                                          int          fd,
-                                          E_watch_type fd_type,
-                                          stdit *      sit)
-{
-  E_fd_watch * fd_watch;
-
-  for (stdskl_lowerb(&events->fd_watches, sit, &fd); !stdskl_is_end(&events->fd_watches, sit); stdskl_it_next(sit)) {
-
-    fd_watch = *(E_fd_watch**) stdskl_it_val(sit);
-
-    if (fd_watch->fd != fd) {                 /* no such watch */
-      break;
-    }
-
-    if (fd_watch->w.watch_type == fd_type) {  /* exact match */
-      return fd_watch;
-    }
-  }
-
-  return NULL;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static sp_time E_events_get_time(E_events * events)
+static sp_time E_get_time_low(void)
 #ifdef ARCH_PC_WIN95
 {
+  sp_time       ret;
   struct _timeb t;
 
   _ftime(&t);
 
-  events->last_clock_time.sec  = t.time;
-  events->last_clock_time.usec = (long) t.millitm * 1000;
+  ret.sec  = t.time;
+  ret.usec = (long) t.millitm * 1000;
 
-  return events->last_clock_time;
+  return ret;
 }
 #else
 {
+  sp_time        ret;
   struct timeval t;
 
   gettimeofday(&t, NULL);
 
-  events->last_clock_time.sec  = t.tv_sec;
-  events->last_clock_time.usec = t.tv_usec;
+  ret.sec  = t.tv_sec;
+  ret.usec = t.tv_usec;
 
-  return events->last_clock_time;
+  return ret;
 }
 #endif
 
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static sp_time E_events_get_loop_time(E_events * events)
-{
-  return events->last_loop_time;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
-static sp_time E_events_update_loop_time(E_events * events)
-{
-  events->last_loop_time = E_events_get_time(events);
-
-  return events->last_loop_time;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
 static int E_events_init(E_events * events)
 {
   E_priority_info * info;
@@ -387,7 +348,7 @@
   events->max_priority     = HIGH_PRIORITY;
   events->active_threshold = LOW_PRIORITY;
 
-  E_events_update_loop_time(events);         /* sets events->last_clock_time and events->last_loop_time */
+  events->last_loop_time = events->last_clock_time = E_get_time_low();
 
   if (stdskl_construct(&events->time_watches, sizeof(E_time_watch*), 0, E_time_watch_ptr_cmp) != 0) {
     ret = E_ALLOC_FAILURE;
@@ -512,6 +473,56 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
+static E_fd_watch * E_events_get_fd_watch(E_events *   events,
+                                          int          fd,
+                                          E_watch_type fd_type,
+                                          stdit *      sit)
+{
+  E_fd_watch * fd_watch;
+
+  for (stdskl_lowerb(&events->fd_watches, sit, &fd); !stdskl_is_end(&events->fd_watches, sit); stdskl_it_next(sit)) {
+
+    fd_watch = *(E_fd_watch**) stdskl_it_val(sit);
+
+    if (fd_watch->fd != fd) {                 /* no such watch */
+      break;
+    }
+
+    if (fd_watch->w.watch_type == fd_type) {  /* exact match */
+      return fd_watch;
+    }
+  }
+
+  return NULL;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+static sp_time E_events_get_time(E_events * events)
+{
+  return events->last_clock_time = E_get_time_low();
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+static sp_time E_events_get_loop_time(E_events * events)
+{
+  return events->last_loop_time;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+static sp_time E_events_update_loop_time(E_events * events)
+{
+  return events->last_loop_time = E_events_get_time(events);
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
 static int E_events_handle_events(E_events * events)
 {
   int        ret          = 0;
@@ -558,7 +569,7 @@
 
       for (starve_lvl = events->max_priority; starve_lvl >= events->active_threshold; --starve_lvl) {
 
-        if (events->priorities[starve_lvl].starve_thresh != 0 && starve_lvl != dispatch_lvl) {
+        if (starve_lvl != dispatch_lvl && events->priorities[starve_lvl].starve_thresh != 0) {
           ++events->priorities[starve_lvl].starve_cnter;
         }
       }
@@ -598,24 +609,23 @@
 static int E_events_populate_priority_queues(E_events * events, 
                                              E_priority dispatch_lvl)
 {
-  E_priority starve_lvl;
-  E_priority query_lvl;
-  E_priority queried_lvl = events->max_priority;
-  E_watch *  watch;
-  sp_time    timeout;
-  sp_time *  timeout_ptr = &timeout;
-  sp_time *  abs_timeout;
-  E_priority tmp_lvl;
-  stdit      sit;
-  stdit      qit;
-  int        ret;
+  E_priority      starve_lvl;
+  E_priority      query_lvl;
+  E_priority      queried_lvl = events->max_priority;
+  E_watch *       watch;
+  sp_time         timeout_dmy;
+  sp_time *       timeout     = &timeout_dmy;
+  const sp_time * abs_timeout;
+  E_priority      tmp_lvl;
+  stdit           sit;
+  stdit           qit;
+  int             ret;
 
   /* We postpone querying a non-empty priority level's (and all lower
-     levels too) fds until its dispatch queue is completey drained.
-     This ensures we do not query fds (and process any redundant
-     results) for which notifications are already waiting in the
-     dispatch queues.  This approach also allows us to use the
-     smallest query sets possible, which often speeds up queries.
+     levels too) fds until its notification queue is completely
+     drained.  This approach also allows us to use the smallest query
+     sets possible, which often speeds up queries and processing their
+     results.
 
      However, we may lower the level to which we query in order to
      check for potentially starving notices at those lower levels.  We
@@ -639,16 +649,12 @@
   }
 
   /* dispatch_lvl was the highest non-empty priority the last time we dispatched notices */
+  /* if we didn't fully drain dispatch_lvl then just query higher priorities */
 
-  /* NOTE: dispatch_lvl can go below active_threshold if user changes
-     the threshold in a callback; can also go to E_MIN_PRIORITY - 1
-     when the active_threshold is E_MIN_PRIORITY and there are no
-     notices to dispatch
+  /* NOTE: dispatch_lvl can go below active_threshold if user raises
+     the threshold in a callback or if there are no notices left
   */
 
-  /* if we didn't drain dispatch_lvl then just query higher events->priorities */
-  /* query the minimum between that and what starvation requires */
-
   assert(dispatch_lvl <= events->max_priority && dispatch_lvl >= E_MIN_PRIORITY - 1);  
 
   if (dispatch_lvl < events->active_threshold) {
@@ -658,8 +664,9 @@
     query_lvl = (!stddll_empty(&events->priorities[dispatch_lvl].notices) ? dispatch_lvl + 1 : dispatch_lvl);
   }
 
+  /* query the minimum between what dispatch_lvl indicates and what starvation requires */
+
   query_lvl = STDMIN(query_lvl, starve_lvl);
-
   assert(query_lvl <= events->max_priority && query_lvl >= events->active_threshold && query_lvl >= E_MIN_PRIORITY);
 
   /* figure out an appropriate (delta) timeout and query backend implementation */
@@ -670,42 +677,42 @@
 
     if (query_lvl != events->active_threshold) {
 
-      timeout_ptr->sec  = 0;
-      timeout_ptr->usec = 0;
+      timeout->sec  = 0;
+      timeout->usec = 0;
 
-    } else if (!stdskl_empty(&events->timeouts)) {  /* else calcualte an appropriate timeout */
+    } else if (!stdskl_empty(&events->timeouts)) {  /* else calculate an appropriate timeout */
 
-      abs_timeout = (sp_time*) stdskl_it_key(stdskl_begin(&events->timeouts, &sit));
+      abs_timeout = (const sp_time*) stdskl_it_key(stdskl_begin(&events->timeouts, &sit));
 
       E_events_update_loop_time(events);
 
-      timeout_ptr->sec  = abs_timeout->sec - events->last_loop_time.sec;
-      timeout_ptr->usec = abs_timeout->usec - events->last_loop_time.usec;
+      timeout->sec  = abs_timeout->sec  - events->last_loop_time.sec;
+      timeout->usec = abs_timeout->usec - events->last_loop_time.usec;
       
-      if (timeout_ptr->usec < 0) {
-        timeout_ptr->usec += 1000000;
-        --timeout_ptr->sec;
+      if (timeout->usec < 0) {
+        timeout->usec += 1000000;
+        --timeout->sec;
       }
 
-      assert(timeout_ptr->usec >= 0 && timeout_ptr->usec < 1000000);
+      assert(timeout->usec >= 0 && timeout->usec < 1000000);
       
-      if (timeout_ptr->sec < 0) {  /* earliest timeout has already expired */
-        timeout_ptr->sec  = 0;
-        timeout_ptr->usec = 0;
+      if (timeout->sec < 0) {  /* earliest timeout has already expired */
+        timeout->sec  = 0;
+        timeout->usec = 0;
       }
 
-    } else {  /* NOTE: backend query mechanism must return an error if not watching any fds and timeout_ptr == NULL to avoid sleeping forever */
-      timeout_ptr = NULL;
+    } else {  /* NOTE: backend query mechanism must return an error if not watching any fds and timeout == NULL to avoid sleeping forever */
+      timeout = NULL;
     }
 
     switch (events->backend_type) {
 
     case E_BACKEND_SELECT:
-      ret = E_events_select_query(events, query_lvl, timeout_ptr, &queried_lvl);
+      ret = E_events_select_query(events, query_lvl, timeout, &queried_lvl);
       break;
       
     case E_BACKEND_EPOLL:
-      ret = E_events_epoll_query(events, query_lvl, timeout_ptr, &queried_lvl);
+      ret = E_events_epoll_query(events, query_lvl, timeout, &queried_lvl);
       break;
 
     default:
@@ -730,7 +737,6 @@
   for (stdskl_begin(&events->timeouts, &sit); !stdskl_it_eq(&sit, &qit); stdskl_it_next(&sit)) {
 
     watch = *(E_watch**) stdskl_it_val(&sit);
-
     assert(!watch->noticed);
 
     if (stddll_push_back(&events->priorities[watch->priority].notices, &watch)) {
@@ -808,7 +814,6 @@
                                     E_priority dispatch_lvl)
 {
   int       ret = 0;
-  E_watch   watch_cpy;
   E_watch * watch;
   stdit     dit;
 
@@ -826,22 +831,28 @@
   events->priorities[dispatch_lvl].starve_cnter = 0;  /* reset starvation counter on this level as we are about to "feed" it */
 
   /* execute the callback depending on watch type */
-  
-  if (watch->watch_type == READ_FD || watch->watch_type == WRITE_FD || watch->watch_type == EXCEPT_FD) {
 
+  switch (watch->watch_type) {
+
+  case READ_FD:
+  case WRITE_FD:
+  case EXCEPT_FD:
     ((E_fd_callback_fcn) watch->callback_fcn)(((E_fd_watch*) watch)->fd, watch->callback_code, watch->callback_data);
+    break;
 
-  } else {
+  case E_DELTA_TIMER:
+  case E_PERIODIC_TIMER:
+  case E_ABSOLUTE_TIMER: {
     E_time_watch * t_watch = (E_time_watch*) watch;
+    E_watch        watch_cpy;
 
-    assert(watch->watch_type == E_DELTA_TIMER || watch->watch_type == E_ABSOLUTE_TIMER || watch->watch_type == E_PERIODIC_TIMER);
     assert(!t_watch->in_timeouts);
 
     if (watch->watch_type != E_PERIODIC_TIMER) {
 
-      watch_cpy = *watch;                                                                   /* copy so cb params remain valid */
+      watch_cpy = *watch;                                                                   /* copy cb params */
       E_events_cancel_watch(events, watch);                                                 /* one shot watch: remove it */
-      watch = &watch_cpy;
+      watch = &watch_cpy;                                                                   /* watch no longer valid -> cpy is valid */
 
     } else {
       /* TODO: should we use E_events_get_time(),
@@ -862,8 +873,15 @@
     }
 
     ((E_time_callback_fcn) watch->callback_fcn)(watch->callback_code, watch->callback_data);
+    break;
   }
 
+  default:
+    assert(0);
+    ret = E_BUG;
+    goto FAIL;
+  }
+  
   assert(ret == 0);
   goto END;
 
@@ -894,7 +912,7 @@
 
   /* validate input */
 
-  if (E_compare_time(next_timeout, timeout) > 0 ||  /* negative timeouts not allowed */
+  if (E_compare_time(next_timeout, timeout) > 0 ||  /* negative timeouts are illegal for all timer types */
       priority > events->max_priority ||
       priority < E_MIN_PRIORITY) {
     ret = E_INVALID_PARAM;
@@ -907,7 +925,9 @@
 
   case E_DELTA_TIMER:
   case E_PERIODIC_TIMER:
-    /* TODO: should we use E_events_get_time(), events->last_clock_time or events->last_loop_time? */
+    /* TODO: should we use E_events_get_time(), events->last_clock_time or events->last_loop_time? 
+       Remember about !events->keep_running case: before or not during an event loop who knows how old those last two are? 
+    */
     next_timeout = E_add_time(E_events_get_time(events), timeout);
     break;
 
@@ -1125,11 +1145,11 @@
       switch (events->backend_type) {
 
       case E_BACKEND_SELECT:
-        E_events_select_deactivate_watch(events, fd_watch);
+        E_events_select_deactivate_watch(events, fd_watch);  /* TODO: should we look at return value or not? */
         break;
         
       case E_BACKEND_EPOLL:
-        E_events_epoll_deactivate_watch(events, fd_watch);
+        E_events_epoll_deactivate_watch(events, fd_watch);   /* TODO: should we look at return value or not? */
         break;
         
       default:
@@ -1216,7 +1236,7 @@
   E_time_watch   time_watch_impl;
   E_time_watch * time_watch = &time_watch_impl;
   stdit          sit;
-  int            ret = -1;  /* backwards compatability */
+  int            ret        = -1;  /* backwards compatability */
 
   /* look up any existing time watch and cancel it */
 
@@ -1246,8 +1266,8 @@
   /* cancel any previously existing watch */
 
   if ((fd_watch = E_events_get_fd_watch(events, fd, fd_type, &sit)) != NULL) {
+    E_events_cancel_watch(events, (E_watch*) fd_watch);
     ret = 0;
-    E_events_cancel_watch(events, (E_watch*) fd_watch);
   }
 
   return ret;
@@ -1260,7 +1280,7 @@
                                          E_priority priority)
 {
   if (priority < E_MIN_PRIORITY || priority > events->max_priority) {
-    return -1;  /* backwards compatability */
+    return -1;      /* backwards compatability */
   }
 
   events->active_threshold = priority;
@@ -1428,7 +1448,7 @@
 
       FD_ZERO(&lvl_info->fd_sets[fd_type]);
 
-      if (stdskl_construct(&lvl_info->fd_skls[fd_type], sizeof(int), 0, E_int_cmp) != 0) {
+      if (stdskl_construct(&lvl_info->fd_skls[fd_type], sizeof(int), sizeof(E_fd_watch*), E_int_cmp) != 0) {
         ret = E_ALLOC_FAILURE;
         goto FAIL_LVL_INFO;
       }
@@ -1533,7 +1553,7 @@
     assert(!FD_ISSET(fd_watch->fd, &lvl_info->fd_sets[fd_watch->w.watch_type]));
     assert(!stdskl_contains(&lvl_info->fd_skls[fd_watch->w.watch_type], &fd_watch->fd));
 
-    if (stdskl_put(&lvl_info->fd_skls[fd_watch->w.watch_type], &sit, &fd_watch->fd, NULL, STDFALSE) != 0) {
+    if (stdskl_put(&lvl_info->fd_skls[fd_watch->w.watch_type], &sit, &fd_watch->fd, &fd_watch, STDFALSE) != 0) {
       ret = E_ALLOC_FAILURE;
       goto FAIL;
     }
@@ -1600,6 +1620,132 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
+static int E_events_select_query(E_events *      events, 
+                                 E_priority      query_lvl, 
+                                 const sp_time * delta_timeout, 
+                                 E_priority *    queried_lvl)
+{
+  int                 ret         = 0;
+  E_select_info *     info        = (E_select_info*) events->backend_info;
+  E_select_lvl_info * lvl_info    = (E_select_lvl_info*) events->priorities[query_lvl].backend_lvl_info;
+  int                 nfds        = 0;
+  fd_set *            fd_sets[3]  = { NULL, NULL, NULL };
+  struct timeval      timeout_dmy = { 0, 0 };
+  struct timeval *    timeout     = &timeout_dmy;
+  E_watch_type        fd_type;
+  E_fd_watch *        fd_watch;
+  int                 tmp_fd;
+  stdit               sit;
+
+  /* set up select's inputs: nfds, fd_sets, timeout */
+
+  for (fd_type = 0; fd_type != NUM_FDTYPES; ++fd_type) {
+
+    if (!stdskl_empty(&lvl_info->fd_skls[fd_type])) {  /* make a tmp copy of lvl_info's fd set in info */
+
+      fd_sets[fd_type]           = &info->tmp_fd_sets[fd_type];
+      info->tmp_fd_sets[fd_type] = lvl_info->fd_sets[fd_type];
+      tmp_fd                     = 1 + *(int*) stdskl_it_key(stdskl_last(&lvl_info->fd_skls[fd_type], &sit));
+      nfds                       = STDMAX(nfds, tmp_fd);
+      assert(tmp_fd >= 1);
+    }
+  }
+
+  if (delta_timeout != NULL) {
+    timeout->tv_sec  = delta_timeout->sec;
+    timeout->tv_usec = delta_timeout->usec;
+
+  } else if (nfds != 0) {
+    timeout = NULL;
+
+  } else {
+    ret = E_DEADEND;                                /* error on infinite timeout and no fds to watch */
+    goto FAIL;
+  }
+
+  /* query select */
+
+  if ((ret = select(nfds, fd_sets[READ_FD], fd_sets[WRITE_FD], fd_sets[EXCEPT_FD], timeout)) < 0) {
+
+    ret = errno;
+    assert(ret != 0);
+
+    switch (ret) {
+
+    case EINTR:
+      ret = E_INTERRUPTED;    /* will cause this fcn to be called again */
+      goto FAIL;
+
+    case EBADF:
+      ret = E_INVALID_PARAM;  /* bad fd or fd w/ an error in a set */
+      goto FAIL;
+
+    case EINVAL:
+      assert(0);
+      ret = E_BUG;
+      goto FAIL;
+
+    default:
+      ret = E_SYS_FAILURE;
+      goto FAIL;
+    }
+  }
+
+  /* process the result sets */ 
+
+  /* TODO: this always processes in order by fd #, which ever so
+     slightly favors the lower # fds (remember we use fifo queues for
+     storing notifications across queries) -- should we randomize
+     where we start to make it more fair here somehow?
+  */
+
+  for (fd_type = 0; ret != 0 && fd_type != NUM_FDTYPES; ++fd_type) {
+
+    for (stdskl_begin(&lvl_info->fd_skls[fd_type], &sit); 
+         ret != 0 && !stdskl_is_end(&lvl_info->fd_skls[fd_type], &sit); 
+         stdskl_it_next(&sit)) {
+
+      tmp_fd = *(int*) stdskl_it_key(&sit);
+
+      if (!FD_ISSET(tmp_fd, fd_sets[fd_type])) {
+        continue;
+      }
+
+      --ret;  /* decrement count of fd bits set */
+
+      fd_watch = *(E_fd_watch**) stdskl_it_val(&sit);
+      assert(tmp_fd == fd_watch->fd && fd_type == fd_watch->w.watch_type);
+
+      if (fd_watch->w.noticed) {  /* already noticed and in notification queue */
+        continue;
+      }
+
+      if (stddll_insert(&events->priorities[fd_watch->w.priority].notices, 
+                        stddll_end(&events->priorities[fd_watch->w.priority].notices, &fd_watch->w.notice_it), 
+                        &fd_watch) != 0) {
+        ret = E_ALLOC_FAILURE;
+        goto FAIL;
+      }
+    
+      fd_watch->w.noticed = STDTRUE;      
+    }
+  }
+
+  assert(ret == 0);
+  goto END;
+
+  /* error handling and return */
+
+ FAIL:
+  assert(ret != 0);
+
+ END:
+  return ret;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
 static int E_events_epoll_init(E_events * events)
 #ifdef HAVE_LINUX
 {
@@ -1632,8 +1778,11 @@
 
     if ((lvl_info->epoll_fd = epoll_create(E_EPOLL_INITIAL_SIZE)) < 0) {
 
-      switch (errno) {
+      ret = errno;
+      assert(ret != 0);
 
+      switch (ret) {
+
       case EINVAL:
         assert(0);
         ret = E_BUG;
@@ -1752,6 +1901,9 @@
   stdit              sit;
   int                ret = 0;
 
+  assert(fd_watch == E_events_get_fd_watch(events, fd_watch->fd, fd_watch->w.watch_type, &sit));
+  assert(fd_watch->w.priority >= E_MIN_PRIORITY && fd_watch->w.priority <= events->max_priority);
+  assert(fd_watch->w.watch_type >= 0 && fd_watch->w.watch_type <= NUM_FDTYPES);
   assert(!fd_watch->active);
 
   epoll_evnt.data.fd = fd_watch->fd;
@@ -1846,7 +1998,7 @@
 
       case ENOENT:  /* can happen if watched fd's were close()'d, which epoll notices, without the user calling E_detach_fd() */
         assert(epoll_op == EPOLL_CTL_MOD);
-        epoll_op = EPOLL_CTL_ADD;  /* call again with ADD instead of MOD */
+        epoll_op = EPOLL_CTL_ADD;  /* call again with ADD instead of MOD */  /* TODO: user error -- should we just error out instead? */
         goto RETRY;
 
       case EEXIST:  /* bug either in epoll_ctl or in our code: shouldn't happen */
@@ -1855,7 +2007,7 @@
         ret = E_BUG;  
         goto FAIL;
 
-      case EBADF:  /* bad fd from user */
+      case EBADF:   /* bad fd from user */
       case EPERM:  
         ret = E_INVALID_PARAM;
         goto FAIL;
@@ -1916,6 +2068,14 @@
   stdit              sit;
   int                ret = 0;
 
+  assert(fd_watch == E_events_get_fd_watch(events, fd_watch->fd, fd_watch->w.watch_type, &sit));
+  assert(fd_watch->w.priority >= E_MIN_PRIORITY && fd_watch->w.priority <= events->max_priority);
+  assert(fd_watch->w.watch_type >= 0 && fd_watch->w.watch_type <= NUM_FDTYPES);
+
+  /* NOTE: we can't assert(fd_watch->active) bc we use this fcn to
+     clean up errors in activate_watch()
+  */
+
   epoll_evnt.data.fd = fd_watch->fd;
   stdskl_lowerb(&events->fd_watches, &fd_it, &fd_watch->fd);  /* find the beginning of all the watches on the fd */
 
@@ -1986,7 +2146,7 @@
       case EEXIST:  /* bug either in epoll_ctl or in our code: shouldn't happen */
       case ENOMEM:  /* shouldn't happen if Linux has any brains */
       default:
-        ret = 0;    /* ignore errors */
+        ret = 0;    /* ignore errors */  /* TODO: should we really swallow all errors? */
         break;
       }
     }
@@ -2044,7 +2204,7 @@
     assert(tmp_timeout >= 0.0);
 
     if (tmp_timeout <= INT_MAX) {
-      timeout = (int) tmp_timeout;
+      timeout = (int) tmp_timeout;                            /* TODO: should we round down like this? */
 
     } else {
       timeout = INT_MAX;
@@ -2069,6 +2229,7 @@
     
     info->events     = evnt;
     info->num_events = 2 * lvl_info->num_poll_fds;
+    assert(info->num_events >= lvl_info->num_poll_fds);
   }
 
   /* query epoll */
@@ -2166,125 +2327,6 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
-static int E_events_select_query(E_events *      events, 
-                                 E_priority      query_lvl, 
-                                 const sp_time * delta_timeout, 
-                                 E_priority *    queried_lvl)
-{
-  int                 ret         = 0;
-  E_select_info *     info        = (E_select_info*) events->backend_info;
-  E_select_lvl_info * lvl_info    = (E_select_lvl_info*) events->priorities[query_lvl].backend_lvl_info;
-  int                 nfds        = 0;
-  fd_set *            fd_sets[3]  = { NULL, NULL, NULL };
-  struct timeval      timeout_dmy = { 0, 0 };
-  struct timeval *    timeout     = &timeout_dmy;
-  E_watch_type        fd_type;
-  E_fd_watch *        fd_watch;
-  int                 tmp_fd;
-  stdit               sit;
-  stdit               tmp_it;
-
-  /* set up select's inputs: nfds, fd_sets, timeout */
-
-  for (fd_type = 0; fd_type != NUM_FDTYPES; ++fd_type) {
-
-    if (!stdskl_empty(&lvl_info->fd_skls[fd_type])) {  /* make a tmp copy of lvl_info's fd set in info */
-
-      fd_sets[fd_type]           = &info->tmp_fd_sets[fd_type];
-      info->tmp_fd_sets[fd_type] = lvl_info->fd_sets[fd_type];
-      tmp_fd                     = 1 + *(int*) stdskl_it_key(stdskl_last(&lvl_info->fd_skls[fd_type], &sit));
-      nfds                       = STDMAX(nfds, tmp_fd);
-      assert(tmp_fd > 0);
-    }
-  }
-
-  if (delta_timeout != NULL) {
-    timeout->tv_sec  = delta_timeout->sec;
-    timeout->tv_usec = delta_timeout->usec;
-
-  } else if (nfds != 0) {
-    timeout = NULL;
-
-  } else {
-    ret = E_DEADEND;                                /* error on infinite timeout and no fds to watch */
-    goto FAIL;
-  }
-
-  /* query select */
-
-  if ((ret = select(nfds, fd_sets[READ_FD], fd_sets[WRITE_FD], fd_sets[EXCEPT_FD], timeout)) < 0) {
-
-    ret = errno;
-    assert(ret != 0);
-
-    switch (ret) {
-
-    case EINTR:
-      ret = E_INTERRUPTED;    /* will cause this fcn to be called again */
-      goto FAIL;
-
-    case EBADF:
-      ret = E_INVALID_PARAM;  /* bad fd or fd w/ an error in a set */
-      goto FAIL;
-
-    case EINVAL:
-      assert(0);
-      ret = E_BUG;
-      goto FAIL;
-
-    default:
-      ret = E_SYS_FAILURE;
-      goto FAIL;
-    }
-  }
-
-  /* process the result sets */
-
-  for (fd_type = 0; ret != 0 && fd_type != NUM_FDTYPES; ++fd_type) {
-
-    for (stdskl_begin(&lvl_info->fd_skls[fd_type], &sit); ret != 0 && !stdskl_is_end(&lvl_info->fd_skls[fd_type], &sit); stdskl_it_next(&sit)) {
-
-      tmp_fd = *(int*) stdskl_it_key(&sit);
-
-      if (!FD_ISSET(tmp_fd, fd_sets[fd_type])) {
-        continue;
-      }
-
-      --ret;
-
-      fd_watch = E_events_get_fd_watch(events, tmp_fd, fd_type, &tmp_it);
-      assert(fd_watch != NULL);
-
-      if (fd_watch->w.noticed) {
-        continue;
-      }
-
-      if (stddll_insert(&events->priorities[fd_watch->w.priority].notices, 
-                        stddll_end(&events->priorities[fd_watch->w.priority].notices, &fd_watch->w.notice_it), 
-                        &fd_watch) != 0) {
-        ret = E_ALLOC_FAILURE;
-        goto FAIL;
-      }
-    
-      fd_watch->w.noticed = STDTRUE;      
-    }
-  }
-
-  assert(ret == 0);
-  goto END;
-
-  /* error handling and return */
-
- FAIL:
-  assert(ret != 0);
-
- END:
-  return ret;
-}
-
-/***************************************************************************************************************************
- ***************************************************************************************************************************/
-
 sp_time E_get_time(void)
 {
   return E_events_get_time(&Events);

Modified: branches/events_testing/include/sp_events.h
===================================================================
--- branches/events_testing/include/sp_events.h	2009-05-15 21:54:18 UTC (rev 412)
+++ branches/events_testing/include/sp_events.h	2009-05-16 01:38:32 UTC (rev 413)
@@ -46,12 +46,12 @@
 
 #define MAX_FD_EVENTS   2000
 
-#define NUM_PRIORITY    3
+#define NUM_PRIORITY    3  /* NOTE: the priority levels must remain 0 based */
 #define LOW_PRIORITY    0
 #define MEDIUM_PRIORITY 1
 #define HIGH_PRIORITY   2
 
-#define NUM_FDTYPES     3
+#define NUM_FDTYPES     3  /* NOTE: the fd types must remain 0 based */
 #define READ_FD         0
 #define WRITE_FD        1
 #define EXCEPT_FD       2




More information about the Spread-cvs mailing list