[Spread-cvs] commit: r414 - in branches/events_testing: daemon examples

jschultz at spread.org jschultz at spread.org
Sun May 17 02:08:51 EDT 2009


Author: jschultz
Date: 2009-05-17 02:08:51 -0400 (Sun, 17 May 2009)
New Revision: 414

Added:
   branches/events_testing/daemon/events.old.c
   branches/events_testing/daemon/microbench_events_pipes.c
Modified:
   branches/events_testing/daemon/Makefile.in
   branches/events_testing/daemon/events.c
   branches/events_testing/examples/Makefile.in
Log:
Adding old version of events back as events.old.c.  Adding new test/benchmark program for events. Significant 
bug fixes for epoll version of events.  Seems to be working now.


Modified: branches/events_testing/daemon/Makefile.in
===================================================================
--- branches/events_testing/daemon/Makefile.in	2009-05-16 01:38:32 UTC (rev 413)
+++ branches/events_testing/daemon/Makefile.in	2009-05-17 06:08:51 UTC (rev 414)
@@ -31,7 +31,7 @@
 CFLAGS=@CFLAGS@
 CPPFLAGS=-I. -I$(srcdir) -I$(top_srcdir)/include -I../stdutil/src -I$(top_srcdir)/stdutil/src @CPPFLAGS@ $(PATHS) @DEFS@
 LDFLAGS=@LDFLAGS@ 
-LIBS=@LIBS@ 
+LIBS=../stdutil/lib/libstdutil.a @LIBS@ 
 THLDFLAGS=@THLDFLAGS@
 THLIBS=@THLIBS@
 LEX=@LEX@
@@ -73,8 +73,8 @@
 #y.tab.c:        config_parse.y
 #	$(YACC) -d config_parse.y
 
-spread$(EXEEXT): $(SPREADOBJS) ../stdutil/lib/libstdutil-threaded-release.a
-	$(LD) -o $@ $(SPREADOBJS) ../stdutil/lib/libstdutil-threaded-release.a $(LDFLAGS) $(LIBS)
+spread$(EXEEXT): $(SPREADOBJS)
+	$(LD) -o $@ $(SPREADOBJS) $(LDFLAGS) $(LIBS)
 
 spmonitor$(EXEEXT): $(MONITOR_OBJS)
 	$(LD) -o $@ $(MONITOR_OBJS) $(LDFLAGS) $(LIBS) 
@@ -90,6 +90,12 @@
 sprecv$(EXEEXT): r.o alarm.o data_link.o
 	$(LD) -o $@ r.o alarm.o data_link.o $(LDFLAGS) $(LIBS)
 
+testevents$(EXEEXT): microbench_events_pipes.o events.o
+	$(LD) -o $@ microbench_events_pipes.o events.o $(LDFLAGS) $(LIBS)
+
+testevents_old$(EXEEXT): microbench_events_pipes.o events.old.o memory.o alarm.o
+	$(LD) -o $@ microbench_events_pipes.o events.old.o memory.o alarm.o $(LDFLAGS) $(LIBS)
+
 clean:
 	rm -f *.lo *.tlo *.to *.o *.a *.dylib $(TARGETS) spsimple_user
 	rm -f config.cache config.log docs/*.out core

Modified: branches/events_testing/daemon/events.c
===================================================================
--- branches/events_testing/daemon/events.c	2009-05-16 01:38:32 UTC (rev 413)
+++ branches/events_testing/daemon/events.c	2009-05-17 06:08:51 UTC (rev 414)
@@ -1,6 +1,6 @@
 #include "arch.h"
 
-/*#define HAVE_LINUX*/
+#define HAVE_LINUX
 
 #include <stdlib.h>
 #include <stdio.h>
@@ -340,6 +340,7 @@
 {
   E_priority_info * info;
   E_priority        priority;
+  E_backend_type    backend_type;
   int               ret = 0;
 
   memset(events, 0, sizeof(*events));
@@ -387,17 +388,37 @@
   }
 
   events->backend_type = E_BACKEND_UNSPECIFIED;
-
+  
 #ifdef HAVE_LINUX
-  ret = E_events_epoll_init(events);
+  backend_type = E_BACKEND_EPOLL;
 #else
-  ret = E_events_select_init(events);
+  backend_type = E_BACKEND_SELECT;
 #endif
 
+  switch (backend_type) {
+
+  case E_BACKEND_SELECT:
+    ret = E_events_select_init(events);
+    break;
+
+  case E_BACKEND_EPOLL:
+    ret = E_events_epoll_init(events);
+    break;
+
+  default:
+    assert(0);
+    ret = E_BUG;
+    break;
+  }
+
   if (ret != 0) {
     goto FAIL_TIMEOUTS;
   }
 
+  if (!E_events_is_valid(events)) {
+    assert(0);
+  }
+
   assert(ret == 0);
   goto END;
 
@@ -434,12 +455,21 @@
   E_priority priority;
   stdit      sit;
   
-#ifdef HAVE_LINUX
-  E_events_epoll_fini(events);
-#else
-  E_events_select_fini(events);
-#endif
+  switch (events->backend_type) {
 
+  case E_BACKEND_SELECT:
+    E_events_select_fini(events);
+    break;
+
+  case E_BACKEND_EPOLL:
+    E_events_epoll_fini(events);
+    break;
+
+  default:
+    assert(0);
+    break;
+  }
+
   stdskl_destruct(&events->timeouts);
 
   for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
@@ -465,6 +495,14 @@
 /***************************************************************************************************************************
  ***************************************************************************************************************************/
 
+static stdbool E_events_is_valid(const E_events * events)
+{
+  return STDTRUE;  /* placeholder: no checks are performed yet, every object is reported valid */
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
 static void E_events_exit_events(E_events * events)
 {
   events->keep_running = STDFALSE;
@@ -1437,7 +1475,7 @@
     goto FAIL;
   }
 
-  for (priority = E_MIN_PRIORITY; priority != events->max_priority; ++priority) {
+  for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
 
     if ((lvl_info = (E_select_lvl_info*) calloc(1, sizeof(E_select_lvl_info))) == NULL) {
       ret = E_ALLOC_FAILURE;
@@ -1508,7 +1546,7 @@
 
   assert(events->backend_type == E_BACKEND_SELECT && events->backend_info != NULL);
 
-  for (priority = E_MIN_PRIORITY; priority != events->max_priority; ++priority) {
+  for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
 
     lvl_info = (E_select_lvl_info*) events->priorities[priority].backend_lvl_info;
     assert(lvl_info != NULL);
@@ -1584,7 +1622,9 @@
 {
   E_priority          lvl;
   E_select_lvl_info * lvl_info;
+#ifndef NDEBUG
   stdit               sit;
+#endif
 
   assert(fd_watch == E_events_get_fd_watch(events, fd_watch->fd, fd_watch->w.watch_type, &sit));
   assert(fd_watch->w.priority >= E_MIN_PRIORITY && fd_watch->w.priority <= events->max_priority);
@@ -1740,6 +1780,8 @@
   assert(ret != 0);
 
  END:
+  *queried_lvl = query_lvl;
+
   return ret;
 }
 
@@ -1757,6 +1799,8 @@
   assert(events->backend_type == E_BACKEND_UNSPECIFIED);
   assert(E_EPOLL_INITIAL_SIZE > 0);
 
+  /*fprintf(stdout, "EPOLLIN = 0x%x; EPOLLOUT = 0x%x; EPOLLPRI = 0x%x\r\n", EPOLLIN, EPOLLOUT, EPOLLPRI);*/
+
   if ((info = (E_epoll_info*) calloc(1, sizeof(E_epoll_info))) == NULL) {
     ret = E_ALLOC_FAILURE;
     goto FAIL;
@@ -1769,7 +1813,7 @@
 
   info->num_events = E_EPOLL_INITIAL_SIZE;
 
-  for (priority = E_MIN_PRIORITY; priority != events->max_priority; ++priority) {
+  for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
 
     if ((lvl_info = (E_epoll_lvl_info*) calloc(1, sizeof(E_epoll_lvl_info))) == NULL) {
       ret = E_ALLOC_FAILURE;
@@ -1860,7 +1904,7 @@
 
   assert(events->backend_type == E_BACKEND_EPOLL && events->backend_info != NULL);
 
-  for (priority = E_MIN_PRIORITY; priority != events->max_priority; ++priority) {
+  for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
 
     lvl_info = (E_epoll_lvl_info*) events->priorities[priority].backend_lvl_info;
     assert(lvl_info != NULL);
@@ -1911,9 +1955,9 @@
 
   /* go down to the watch's priority lvl adding the new watch to each priority's epoll fd sets */
 
-  for (lvl = events->max_priority; lvl >= fd_watch->w.priority; --lvl) {
+  for (lvl = E_MIN_PRIORITY; lvl <= fd_watch->w.priority; ++lvl) {
 
-    lvl_info = (E_epoll_lvl_info*) events->priorities[fd_watch->w.priority].backend_lvl_info;
+    lvl_info = (E_epoll_lvl_info*) events->priorities[lvl].backend_lvl_info;
 
     /* calculate what events we should query for at this priority lvl for all the other watches on this fd */
 
@@ -1989,6 +2033,10 @@
   RETRY:                          /* allow a single retry w/ a different epoll_op in the case of ENOENT (see below) */
     epoll_evnt_cpy = epoll_evnt;  /* epoll_ctl's interface isn't const so make a copy in case they trash our input */
 
+    /*fprintf(stdout, "Adding fd(%d) to epoll_fd(%d) for priority(%d) w/ op(%s) and events(0x%x)\r\n", 
+              fd_watch->fd, lvl_info->epoll_fd, lvl, (epoll_op == EPOLL_CTL_ADD ? "EPOLL_CTL_ADD" : "EPOLL_CTL_MOD"), epoll_evnt.events);
+    */
+
     if ((ret = epoll_ctl(lvl_info->epoll_fd, epoll_op, fd_watch->fd, &epoll_evnt_cpy)) != 0) {
 
       ret = errno;
@@ -2081,9 +2129,9 @@
 
   /* go down to the watch's priority lvl removing the watch from each priority's epoll fd sets */
 
-  for (lvl = events->max_priority; lvl >= fd_watch->w.priority; --lvl) {
+  for (lvl = E_MIN_PRIORITY; lvl <= fd_watch->w.priority; ++lvl) {
 
-    lvl_info = (E_epoll_lvl_info*) events->priorities[fd_watch->w.priority].backend_lvl_info;
+    lvl_info = (E_epoll_lvl_info*) events->priorities[lvl].backend_lvl_info;
 
     /* calculate what events we should query for at this priority lvl for all the other watches on this fd */
 
@@ -2194,8 +2242,6 @@
   stdit                sit;
   int                  ret         = 0;
 
-  assert(info->num_events >= lvl_info->num_poll_fds);
-
   /* convert the timeout to epoll's input */
 
   if (delta_timeout != NULL) {
@@ -2260,15 +2306,18 @@
 
   /* process the result set */
 
-  for (evnt = info->events, evnts_end = info->events + ret; evnt != evnts_end; ++evnt) {
+  evnts_end = info->events + ret;
+  ret       = 0;
 
+  for (evnt = info->events; evnt != evnts_end; ++evnt) {
+
     /* look up the fd and process all watches on it */
 
     for (stdskl_lowerb(&events->fd_watches, &sit, &evnt->data.fd); !stdskl_is_end(&events->fd_watches, &sit); stdskl_it_next(&sit)) {
 
       stdbool generate_notice = STDFALSE;
 
-      fd_watch = (E_fd_watch*) stdskl_it_val(&sit);
+      fd_watch = *(E_fd_watch**) stdskl_it_val(&sit);
 
       if (fd_watch->fd != evnt->data.fd) {  /* we've moved onto the next fd */
         break;
@@ -2296,11 +2345,11 @@
       /* push a new notice onto the associated priority's notice queue and record where it is */
 
       if (generate_notice &&
-          stddll_insert(&events->priorities[fd_watch->w.priority].notices, 
-                        stddll_end(&events->priorities[fd_watch->w.priority].notices, &fd_watch->w.notice_it), 
-                        &fd_watch) != 0) {
-        ret = E_ALLOC_FAILURE;
-        goto FAIL;
+	  stddll_insert(&events->priorities[fd_watch->w.priority].notices, 
+			stddll_end(&events->priorities[fd_watch->w.priority].notices, &fd_watch->w.notice_it), 
+			&fd_watch) != 0) {
+	ret = E_ALLOC_FAILURE;
+	goto FAIL;
       }
 
       fd_watch->w.noticed = STDTRUE;      
@@ -2316,6 +2365,8 @@
   assert(ret != 0);
 
  END:
+  *queried_lvl = query_lvl;
+
   return ret;
 }
 #else

Added: branches/events_testing/daemon/events.old.c
===================================================================
--- branches/events_testing/daemon/events.old.c	                        (rev 0)
+++ branches/events_testing/daemon/events.old.c	2009-05-17 06:08:51 UTC (rev 414)
@@ -0,0 +1,764 @@
+/*
+ * The Spread Toolkit.
+ *     
+ * The contents of this file are subject to the Spread Open-Source
+ * License, Version 1.0 (the ``License''); you may not use
+ * this file except in compliance with the License.  You may obtain a
+ * copy of the License at:
+ *
+ * http://www.spread.org/license/
+ *
+ * or in the file ``license.txt'' found in this distribution.
+ *
+ * Software distributed under the License is distributed on an AS IS basis, 
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
+ * for the specific language governing rights and limitations under the 
+ * License.
+ *
+ * The Creators of Spread are:
+ *  Yair Amir, Michal Miskin-Amir, Jonathan Stanton, John Schultz.
+ *
+ *  Copyright (C) 1993-2006 Spread Concepts LLC <info at spreadconcepts.com>
+ *
+ *  All Rights Reserved.
+ *
+ * Major Contributor(s):
+ * ---------------
+ *    Ryan Caudy           rcaudy at gmail.com - contributions to process groups.
+ *    Claudiu Danilov      claudiu at acm.org - scalable wide area support.
+ *    Cristina Nita-Rotaru crisn at cs.purdue.edu - group communication security.
+ *    Theo Schlossnagle    jesus at omniti.com - Perl, autoconf, old skiplist.
+ *    Dan Schoenblum       dansch at cnds.jhu.edu - Java interface.
+ *
+ */
+
+
+#include "arch.h"
+
+/* undef redefined variables under windows */
+#ifdef ARCH_PC_WIN95
+#undef EINTR
+#undef EAGAIN
+#undef EWOULDBLOCK
+#undef EINPROGRESS
+#endif
+#include <errno.h>
+
+#ifndef	ARCH_PC_WIN95
+
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#else 	/* ARCH_PC_WIN95 */
+
+#include <winsock.h>
+#include <sys/timeb.h>
+
+#endif	/* ARCH_PC_WIN95 */
+
+#include <string.h>
+#include "sp_events.h"
+#include "objects.h"    /* For memory */
+#include "memory.h"     /* for memory */
+#include "alarm.h"
+/* Node of the singly linked timer list headed by Time_queue (E_queue keeps it in ascending time order). */
+typedef	struct dummy_t_event {
+	sp_time		t;	/* expiry time: E_queue stores E_get_time() + delta_time here */
+	void		(* func)( int code, void *data );	/* callback invoked by E_handle_events */
+        int             code;	/* first argument handed to func */
+        void            *data;	/* second argument handed to func */
+	struct dummy_t_event	*next;	/* following node, NULL at the tail */
+} time_event;
+/* One registered file-descriptor handler. */
+typedef struct dummy_fd_event {
+	int		fd;
+	int		fd_type;	/* index into Fd_mask */
+	void		(* func)( mailbox mbox, int code, void *data );	/* called with fd, code, data */
+	int		code;
+        void            *data;
+        int             active; /* true if active, false if inactive */
+} fd_event;
+/* Handler table for a single priority level. */
+typedef struct dummy_fd_queue {
+	int		num_fds;	/* events[0 .. num_fds-1] are in use */
+        int             num_active_fds;	/* how many of those have active set */
+	fd_event	events[MAX_FD_EVENTS];
+} fd_queue;
+/* File-scope state shared by all E_* functions in this file. */
+static	time_event	*Time_queue;	/* head of the timer list, NULL when empty */
+static	sp_time		Now;	/* time last sampled by E_get_time() */
+
+static	fd_queue	Fd_queue[NUM_PRIORITY];	/* indexed by priority */
+static	fd_set		Fd_mask[NUM_FDTYPES];	/* indexed by fd_type; copied and handed to select() */
+static	int		Active_priority;	/* fds below this priority are kept out of Fd_mask */
+static	int		Exit_events;	/* set by E_exit_events() to stop E_handle_events() */
+/* Reset the timer list, fd tables and select masks to empty; always returns 0. */
+int 	E_init(void)
+{
+	int	i,ret;
+	
+	Time_queue = NULL;
+
+        ret = Mem_init_object(TIME_EVENT, sizeof(time_event), 100,0);
+        if (ret < 0)
+        {
+                Alarm(EXIT, "E_Init: Failure to Initialize TIME_EVENT memory objects\n");
+        }
+
+	for ( i=0; i < NUM_PRIORITY; i++ )
+        {
+		Fd_queue[i].num_fds = 0;
+                Fd_queue[i].num_active_fds = 0;
+        }
+	for ( i=0; i < NUM_FDTYPES; i++ )
+        {
+		FD_ZERO( &Fd_mask[i] );
+        }
+	Active_priority = LOW_PRIORITY;
+	/* prime Now so it holds a sampled time before any event is queued */
+	E_get_time();
+
+	Alarm( EVENTS, "E_init: went ok\n");
+
+	return( 0 );
+}
+/* Sample the system clock into the file-scope Now and return a copy of it. */
+sp_time	E_get_time(void)
+{
+#ifndef	ARCH_PC_WIN95
+        struct timeval  read_time;
+
+#if HAVE_STRUCT_TIMEZONE
+        struct timezone dummy_tz;
+#else
+	sp_time		dummy_tz;	/* stand-in storage only; its contents are never read here */
+#endif
+	int		ret;
+
+	ret = gettimeofday( &read_time, &dummy_tz );
+	if ( ret < 0 ) Alarm( EXIT, "E_get_time: gettimeofday problems.\n" );
+        Now.sec = read_time.tv_sec;
+        Now.usec = read_time.tv_usec;
+
+#else	/* ARCH_PC_WIN95 */
+
+	struct _timeb timebuffer;
+
+	_ftime( &timebuffer );
+
+	Now.sec = timebuffer.time;
+	Now.usec= timebuffer.millitm;
+	Now.usec= Now.usec * 1000;	/* scale the millitm value by 1000 into the usec field */
+
+#endif	/* ARCH_PC_WIN95 */
+#if 0
+	Alarm( EVENTS, "E_get_time: time is (%d, %d)\n", Now.sec, Now.usec);
+#endif
+	return ( Now );
+}
+/* Return t - delta_t, borrowing one second when the usec field would go negative. */
+sp_time	E_sub_time( sp_time t, sp_time delta_t )
+{
+	sp_time	diff;
+
+	diff.usec = t.usec - delta_t.usec;
+	diff.sec  = t.sec  - delta_t.sec;
+	if ( diff.usec < 0 )
+	{
+		diff.usec += 1000000;
+		diff.sec  -= 1;
+	}
+	if ( diff.sec < 0 ) Alarm( EVENTS, "E_sub_time: negative time result.\n");	/* reported only, not corrected */
+	return ( diff );
+}
+/* Return t + delta_t, carrying one second out of the usec field when it reaches 1000000. */
+sp_time	E_add_time( sp_time t, sp_time delta_t )
+{
+	sp_time	res;
+
+	res.sec  = t.sec  + delta_t.sec;
+	res.usec = t.usec + delta_t.usec;
+	if ( res.usec >= 1000000 )	/* >= so that a sum of exactly 1000000 usec is carried too */
+	{
+		res.usec = res.usec - 1000000;
+		res.sec++;
+	}
+	return ( res );
+}
+/* Three-way compare of two times: 1 if t1 is later, -1 if t1 is earlier, 0 if equal. */
+int	E_compare_time( sp_time t1, sp_time t2 )
+{
+	if ( t1.sec != t2.sec )
+		return ( t1.sec > t2.sec ? 1 : -1 );	/* seconds decide first */
+	if ( t1.usec != t2.usec )
+		return ( t1.usec > t2.usec ? 1 : -1 );	/* then microseconds */
+	return ( 0 );
+}
+/* Schedule func(code, data) delta_time from now, disposing queued events with the same triple met on the way; returns 0. */
+int 	E_queue( void (* func)( int code, void *data ), int code, void *data,
+		 sp_time delta_time )
+{
+	time_event *t_pre;
+	time_event *t_post;
+	time_event *t_e;
+	int	   inserted;
+	int	   deleted;
+	int	   compare;
+
+	t_e       = new( TIME_EVENT );	/* NOTE(review): used unchecked -- confirm new() cannot return NULL */
+
+	t_e->t    = E_add_time( E_get_time(), delta_time );
+	t_e->func = func;
+        t_e->code = code;
+        t_e->data = data;
+	deleted   = 0;
+	inserted  = 0;
+
+	if( Time_queue != NULL )
+	{
+		/* the head itself is a duplicate of the new event: unlink and free it */
+		if( Time_queue->func == t_e->func && 
+                    Time_queue->data == t_e->data &&
+                    Time_queue->code == t_e->code )
+		{
+			t_pre = Time_queue;
+			Time_queue = Time_queue->next;
+			dispose( t_pre );
+			deleted = 1;
+			Alarm( EVENTS, "E_queue: dequeued a (first) simillar event\n" );
+		}
+	}
+	if( Time_queue == NULL )
+	{
+		/* list is (now) empty: the new event becomes the only node */
+		t_e->next  = NULL;
+		Time_queue = t_e;
+		Alarm( EVENTS, "E_queue: (only) event queued func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec, delta_time.usec );
+		return( 0 );
+	}else{
+		/* new event expires before the current head: it becomes the new head */
+		compare = E_compare_time ( t_e->t, Time_queue->t );
+		if( compare < 0 )
+		{
+			t_e->next   = Time_queue;
+			Time_queue  = t_e;
+			inserted    = 1;
+			Alarm( EVENTS, "E_queue: (first) event queued func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec,delta_time.usec );
+		}
+	}
+	/* walk the rest of the list until the event is both inserted and a duplicate has been removed */
+	t_pre    = Time_queue ; 
+	t_post   = Time_queue->next;
+	while ( t_post != NULL && ( !inserted || !deleted ) )
+	{
+		if( t_post->func == t_e->func && 
+                    t_post->data == t_e->data &&
+                    t_post->code == t_e->code )
+		{
+			t_pre->next = t_post->next;
+			dispose( t_post );
+			t_post = t_pre->next;
+			deleted = 1;
+			Alarm( EVENTS, "E_queue: dequeued a simillar event\n" );
+			continue;
+		}
+
+		if ( !inserted )
+		{
+			/* first node that expires later than the new event: link the new event in front of it */
+			compare = E_compare_time ( t_e->t, t_post->t );
+			if( compare < 0 )
+			{
+				t_pre->next = t_e;
+				t_e->next   = t_post;
+				inserted    = 1;
+				Alarm( EVENTS, "E_queue: event queued for func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec, delta_time.usec );
+			}
+		}
+
+		t_pre  = t_post;
+		t_post = t_post->next;
+	}
+
+	if( !inserted )
+	{
+		/* nothing in the list expires later: append at the tail */
+		t_pre->next = t_e;
+		t_e->next   = NULL;
+		Alarm( EVENTS, "E_queue: (last) event queued func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec,delta_time.usec );
+	}
+
+	return( 0 );
+}
+/* Remove the first queued event matching func/code/data; returns 0 if one was removed, -1 if none matched. */
+int 	E_dequeue( void (* func)( int code, void *data ), int code,
+		   void *data )
+{
+	time_event *t_pre;
+	time_event *t_ptr;
+
+	if( Time_queue == NULL )
+	{
+		Alarm( EVENTS, "E_dequeue: no such event\n" );
+		return( -1 );
+	}
+	/* the head is handled separately because removing it changes Time_queue itself */
+	if( Time_queue->func == func && 
+            Time_queue->data == data &&
+            Time_queue->code == code )
+	{
+		t_ptr = Time_queue;
+		Time_queue = Time_queue->next;
+		dispose( t_ptr );
+		Alarm( EVENTS, "E_dequeue: first event dequeued func 0x%x code %d data 0x%x\n",func,code, data);
+		return( 0 );
+	}
+	/* otherwise look one node ahead so the predecessor can be relinked */
+	t_pre = Time_queue;
+	while ( t_pre->next != NULL )
+	{
+		t_ptr = t_pre->next;
+		if( t_ptr->func == func && 
+                    t_ptr->data == data &&
+                    t_ptr->code == code )   
+		{
+			t_pre->next = t_ptr->next;
+			dispose( t_ptr );
+			Alarm( EVENTS, "E_dequeue: event dequeued func 0x%x code %d data 0x%x\n",func,code, data);
+			return( 0 );
+		}
+		t_pre = t_ptr;
+	}
+
+	Alarm( EVENTS, "E_dequeue: no such event\n" );
+	return( -1 );
+}
+/* Block the caller for roughly the duration t. */
+void	E_delay( sp_time t )
+{
+	struct timeval 	tmp_t;
+
+	tmp_t.tv_sec = t.sec;
+	tmp_t.tv_usec = t.usec;
+
+#ifndef ARCH_PC_WIN95
+        if (select(0, NULL, NULL, NULL, &tmp_t ) < 0)	/* select with no fd sets is used purely as a timer */
+        {
+                Alarm( EVENTS, "E_delay: select delay returned error: %s\n", strerror(errno));
+        }
+#else  /* ARCH_PC_WIN95 */
+        SleepEx( tmp_t.tv_sec*1000+tmp_t.tv_usec/1000, 0 );	/* argument is converted to milliseconds */
+#endif /* ARCH_PC_WIN95 */   
+
+}
+	
+int	E_attach_fd( int fd, int fd_type,
+		     void (* func)( mailbox mbox, int code, void *data ),
+		     int code, void *data, int priority )
+{
+	int	num_fds;
+	int	j;
+
+	if( priority < 0 || priority > NUM_PRIORITY )
+	{
+		Alarm( PRINT, "E_attach_fd: invalid priority %d for fd %d with fd_type %d\n", priority, fd, fd_type );
+		return( -1 );
+	}
+	if( fd_type < 0 || fd_type > NUM_FDTYPES )
+	{
+		Alarm( PRINT, "E_attach_fd: invalid fd_type %d for fd %d with priority %d\n", fd_type, fd, priority );
+		return( -1 );
+	}
+#ifndef	ARCH_PC_WIN95
+	/* Windows bug: Reports FD_SETSIZE of 64 but select works on all
+	 * fd's even ones with numbers greater then 64.
+	 */
+        if( fd < 0 || fd > FD_SETSIZE )
+        {
+                Alarm( PRINT, "E_attach_fd: invalid fd %d (max %d) with fd_type %d with priority %d\n", fd, FD_SETSIZE, fd_type, priority );
+                return( -1 );
+        }
+#endif
+	for( j=0; j < Fd_queue[priority].num_fds; j++ )
+	{
+		if( ( Fd_queue[priority].events[j].fd == fd ) && ( Fd_queue[priority].events[j].fd_type == fd_type ) )
+		{
+			Fd_queue[priority].events[j].func = func;
+			Fd_queue[priority].events[j].code = code;
+                        Fd_queue[priority].events[j].data = data;
+                        if ( !(Fd_queue[priority].events[j].active) )
+                                Fd_queue[priority].num_active_fds++;
+                        Fd_queue[priority].events[j].active = TRUE;
+			Alarm( PRINT, 
+				"E_attach_fd: fd %d with type %d exists & replaced & activated\n", fd, fd_type );
+			return( 1 );
+		}
+	}
+	num_fds = Fd_queue[priority].num_fds;
+
+        if ( num_fds == MAX_FD_EVENTS ) {
+                Alarm( PRINT, "E_attach_fd: Reached Maximum number of events. Recompile with larger MAX_FD_EVENTS\n");
+                return( -1 );
+        }
+	Fd_queue[priority].events[num_fds].fd	   = fd;
+	Fd_queue[priority].events[num_fds].fd_type = fd_type;
+	Fd_queue[priority].events[num_fds].func	   = func;
+	Fd_queue[priority].events[num_fds].code    = code;
+        Fd_queue[priority].events[num_fds].data    = data;
+        Fd_queue[priority].events[num_fds].active  = TRUE;
+	Fd_queue[priority].num_fds++;
+        Fd_queue[priority].num_active_fds++;
+	if( Active_priority <= priority ) FD_SET( fd, &Fd_mask[fd_type] );
+
+	Alarm( EVENTS, "E_attach_fd: fd %d, fd_type %d, code %d, data 0x%x, priority %d Active_priority %d\n",
+		fd, fd_type, code, data, priority, Active_priority );
+
+	return( 0 );
+}
+/* Unregister (fd, fd_type) from every priority level; returns 0 if an entry was removed, -1 otherwise. */
+int 	E_detach_fd( int fd, int fd_type )
+{
+	int	i,j;
+	int	found;
+
+	if( fd_type < 0 || fd_type >= NUM_FDTYPES )	/* Fd_mask has exactly NUM_FDTYPES slots */
+	{
+		Alarm( PRINT, "E_detach_fd: invalid fd_type %d for fd %d\n", fd_type, fd );
+		return( -1 );
+	}
+
+	found = 0;
+	for( i=0; i < NUM_PRIORITY; i++ )
+	    for( j=0; j < Fd_queue[i].num_fds; j++ )
+	    {
+		if( ( Fd_queue[i].events[j].fd == fd ) && ( Fd_queue[i].events[j].fd_type == fd_type ) )
+		{
+                        if (Fd_queue[i].events[j].active)
+                                Fd_queue[i].num_active_fds--;
+			Fd_queue[i].num_fds--;
+			Fd_queue[i].events[j] = Fd_queue[i].events[Fd_queue[i].num_fds];	/* fill the hole with the last entry */
+
+			FD_CLR( fd, &Fd_mask[fd_type] );
+			found = 1;
+
+			break; /* from the j for only */
+		}
+	    }
+
+	if( ! found ) return( -1 );
+
+	return( 0 );
+}
+/* Keep (fd, fd_type) registered but stop watching it; returns 0 if an entry was found, -1 otherwise. */
+int     E_deactivate_fd( int fd, int fd_type )
+{
+	int	i,j;
+	int	found;
+
+	if( fd_type < 0 || fd_type >= NUM_FDTYPES )	/* Fd_mask has exactly NUM_FDTYPES slots */
+	{
+		Alarm( PRINT, "E_deactivate_fd: invalid fd_type %d for fd %d\n", fd_type, fd );
+		return( -1 );
+	}
+
+	found = 0;
+
+	for( i=0; i < NUM_PRIORITY; i++ )
+	    for( j=0; j < Fd_queue[i].num_fds; j++ )
+	    {
+		if( ( Fd_queue[i].events[j].fd == fd ) && ( Fd_queue[i].events[j].fd_type == fd_type ) )
+		{
+                        if (Fd_queue[i].events[j].active)
+                                Fd_queue[i].num_active_fds--;	/* count only a real active -> inactive change */
+                        Fd_queue[i].events[j].active = FALSE;
+			FD_CLR( fd, &Fd_mask[fd_type] );
+			found = 1;
+
+			break; /* from the j for only */
+		}
+	    }
+
+	if( ! found ) return( -1 );
+	return( 0 );
+}
+/* Mark a registered (fd, fd_type) active again; returns 0 if an entry was found, -1 otherwise. */
+int     E_activate_fd( int fd, int fd_type )
+{
+	int	i,j;
+	int	found;
+
+	if( fd_type < 0 || fd_type >= NUM_FDTYPES )	/* Fd_mask has exactly NUM_FDTYPES slots */
+	{
+		Alarm( PRINT, "E_activate_fd: invalid fd_type %d for fd %d\n", fd_type, fd );
+		return( -1 );
+	}
+
+	found = 0;
+
+	for( i=0; i < NUM_PRIORITY; i++ )
+	    for( j=0; j < Fd_queue[i].num_fds; j++ )
+	    {
+		if( ( Fd_queue[i].events[j].fd == fd ) && ( Fd_queue[i].events[j].fd_type == fd_type ) )
+		{
+                        if ( !(Fd_queue[i].events[j].active) )
+                                Fd_queue[i].num_active_fds++;	/* count only a real inactive -> active change */
+                        Fd_queue[i].events[j].active = TRUE;
+			if( i >= Active_priority ) FD_SET( fd, &Fd_mask[ fd_type ] );	/* watched only at or above the threshold */
+			found = 1;
+
+			break; /* from the j for only */
+		}
+	    }
+
+	if( ! found ) return( -1 );
+	return( 0 );
+}
+/* Set the lowest priority whose active fds are watched and rebuild Fd_mask; returns priority, or -1 if out of range. */
+int 	E_set_active_threshold( int priority )
+{
+	int	fd_type;
+	int	i,j;
+
+	if( priority < 0 || priority > NUM_PRIORITY )	/* NUM_PRIORITY itself is accepted: the rebuild loop below then adds nothing */
+	{
+		Alarm( PRINT, "E_set_active_threshold: invalid priority %d\n", priority );
+		return( -1 );
+	}
+
+	if( priority == Active_priority ) return( priority );
+
+	Active_priority = priority;
+	for ( i=0; i < NUM_FDTYPES; i++ )
+        {
+		FD_ZERO( &Fd_mask[i] );
+        }
+	/* re-add every active fd registered at or above the new threshold */
+	for( i = priority; i < NUM_PRIORITY; i++ )
+	    for( j=0; j < Fd_queue[i].num_fds; j++ )
+	    {
+		fd_type = Fd_queue[i].events[j].fd_type;
+                if (Fd_queue[i].events[j].active)
+                	FD_SET( Fd_queue[i].events[j].fd, &Fd_mask[fd_type] );
+	    }
+
+	Alarm( EVENTS, "E_set_active_threshold: changed to %d\n",Active_priority);
+
+	return( priority );
+}
+/* Return the number of active fds registered at priority, or -1 if priority is out of range. */
+int	E_num_active( int priority )
+{
+	if( priority < 0 || priority >= NUM_PRIORITY )	/* Fd_queue has exactly NUM_PRIORITY slots */
+	{
+		Alarm( PRINT, "E_num_active: invalid priority %d\n", priority );
+		return( -1 );
+	}
+	return( Fd_queue[priority].num_active_fds );
+}
+/* Event loop: run due timers, select() on the watched fds, dispatch handlers by priority; returns 0 once Exit_events is set. */
+int 	E_handle_events(void)
+{
+static	int			Round_robin	= 0;
+static	const sp_time		long_timeout 	= { 10000,    0};
+static  const sp_time           zero_sec        = {     0,    0};
+#ifdef  BADCLOCK
+static	const sp_time		mili_sec 	= {     0, 1000};
+	int			clock_sync;
+#endif
+	int			num_set;
+	int			treated;
+	int			fd;
+	int			fd_type;
+	int			i,j;
+	sp_time			timeout;
+        struct timeval          sel_timeout, wait_timeout;
+	fd_set			current_mask[NUM_FDTYPES];
+	time_event		*temp_ptr;
+        int                     first=1;	/* while set, due timers are not executed (cleared below) */
+#ifdef TESTTIME
+        sp_time         	tmp_late,start,stop,req_time;       /* DEBUGGING */
+#endif
+#ifdef BADCLOCK
+    clock_sync = 0;
+#endif
+    for( Exit_events = 0 ; !Exit_events ; )
+    {
+	Alarm( EVENTS, "E_handle_events: next event \n");
+
+	/* Handle time events */
+	timeout = long_timeout;
+#ifdef TESTTIME
+        start = E_get_time();
+#endif
+	while( Time_queue != NULL )
+	{
+#ifdef BADCLOCK
+		if ( clock_sync >= 0 )
+		{
+		    E_get_time();
+		    clock_sync = -20;
+		}
+#else
+                E_get_time();
+#endif
+		if ( !first && E_compare_time( Now, Time_queue->t ) >= 0 )
+		{
+#ifdef TESTTIME
+                        tmp_late = E_sub_time( Now, Time_queue->t );
+#endif
+			/* unlink the head before calling it, so the callback sees a list without this event */
+			temp_ptr = Time_queue;
+			Time_queue = Time_queue->next;
+			Alarm( EVENTS, "E_handle_events: exec time event \n");
+#ifdef TESTTIME 
+                        Alarm( DEBUG, "Events: TimeEv is %d %d late\n",tmp_late.sec, tmp_late.usec); 
+#endif
+			temp_ptr->func( temp_ptr->code, temp_ptr->data );
+			dispose( temp_ptr );
+#ifdef BADCLOCK
+			Now = E_add_time( Now, mili_sec );
+			clock_sync++;
+#else
+                        E_get_time();
+#endif
+                        if (Exit_events) goto end_handler;
+		}else{
+			/* head is not due (or timers are held back): wait at most until it expires */
+			timeout = E_sub_time( Time_queue->t, Now );
+			break;
+		}
+	}
+        if (timeout.sec < 0 )
+                timeout.sec = timeout.usec = 0; /* this can happen until first is unset */
+#ifdef TESTTIME
+        stop = E_get_time();
+        tmp_late = E_sub_time(stop, start);
+        Alarm(DEBUG, "Events: TimeEv's took %d %d to handle\n", tmp_late.sec, tmp_late.usec); 
+#endif
+	/* Handle fd events   */
+	for( i=0; i < NUM_FDTYPES; i++ )
+	{
+		current_mask[i] = Fd_mask[i];
+	}
+	Alarm( EVENTS, "E_handle_events: poll select\n");
+#ifdef TESTTIME
+        req_time = zero_sec;
+#endif
+        wait_timeout.tv_sec = zero_sec.sec;
+        wait_timeout.tv_usec = zero_sec.usec;
+	/* first a zero-timeout select: a pure poll for fds that are already ready */
+	num_set = select( FD_SETSIZE, &current_mask[READ_FD], &current_mask[WRITE_FD], &current_mask[EXCEPT_FD], 
+			  &wait_timeout );
+	if (num_set == 0 && !Exit_events)
+	{
+#ifdef BADCLOCK
+		clock_sync = 0;
+#endif
+		/* nothing ready: restore the masks and select again, this time waiting up to timeout */
+		for( i=0; i < NUM_FDTYPES; i++ )
+		{
+			current_mask[i] = Fd_mask[i];
+		}
+		Alarm( EVENTS, "E_handle_events: select with timeout (%d, %d)\n",
+			timeout.sec,timeout.usec );
+#ifdef TESTTIME
+                req_time = E_add_time(req_time, timeout);
+#endif
+                sel_timeout.tv_sec = timeout.sec;
+                sel_timeout.tv_usec = timeout.usec;
+		num_set = select( FD_SETSIZE, &current_mask[READ_FD], &current_mask[WRITE_FD], 
+				  &current_mask[EXCEPT_FD], &sel_timeout );
+	}
+#ifdef TESTTIME
+        start = E_get_time();
+        tmp_late = E_sub_time(start, stop);
+        Alarm( DEBUG, "Events: Waiting for fd or timout took %d %d asked for %d %d\n", tmp_late.sec, tmp_late.usec, req_time.sec, req_time.usec);
+#endif
+	/* Handle all high and medium priority fd events */
+	for( i=NUM_PRIORITY-1,treated=0; 
+	     i > LOW_PRIORITY && num_set > 0 && !treated;
+	     i-- )
+	{
+	    for( j=0; j < Fd_queue[i].num_fds && num_set > 0; j++ )
+	    {
+		fd      = Fd_queue[i].events[j].fd;
+		fd_type = Fd_queue[i].events[j].fd_type;
+		if( FD_ISSET( fd, &current_mask[fd_type] ) )
+		{
+		    Alarm( EVENTS, "E_handle_events: exec handler for fd %d, fd_type %d, priority %d\n", 
+					fd, fd_type, i );
+		    Fd_queue[i].events[j].func( 
+				Fd_queue[i].events[j].fd,
+				Fd_queue[i].events[j].code,
+				Fd_queue[i].events[j].data );
+		    treated = 1;
+		    num_set--;
+#ifdef BADCLOCK
+		    Now = E_add_time( Now, mili_sec );
+		    clock_sync++;
+#else
+                    E_get_time();
+#endif
+                    if (Exit_events) goto end_handler;
+		}
+	    }
+	}
+        /* Don't handle timed events until all non-low-priority fd events have been handled 
+         * FIXME: This may or may not be right. If continual high priority events occur, then
+         * timed events will starve, I'm not sure if that is better than what we have. We 
+         * could also set first=0 no matter what after trying the high events once, then
+         * they will get a shot first, but after that timed events will also be handled.
+         */
+        if (!treated)
+                first = 0;
+
+#ifdef TESTTIME
+        stop = E_get_time();
+        tmp_late = E_sub_time(stop, start);
+        Alarm(DEBUG, "Events: High & Med took %d %d time to handle\n", tmp_late.sec, tmp_late.usec);
+#endif
+	/* Handle one low priority fd event. 
+           However, verify that Active_priority still allows LOW_PRIORITY events. 
+           Active_priority can change because of calls to E_set_active_threshold() during the current select loop.
+        */
+	for( i=0; i < Fd_queue[LOW_PRIORITY].num_fds 
+                     && num_set > 0
+                     && Active_priority == LOW_PRIORITY; 
+             i++ )
+	{
+	    j = ( i + Round_robin ) % Fd_queue[LOW_PRIORITY].num_fds;	/* start the scan where the previous pass stopped */
+	    fd      = Fd_queue[LOW_PRIORITY].events[j].fd;
+	    fd_type = Fd_queue[LOW_PRIORITY].events[j].fd_type;
+	    if( FD_ISSET( fd, &current_mask[fd_type] ) )
+	    {
+		Round_robin = ( j + 1 ) % Fd_queue[LOW_PRIORITY].num_fds;
+
+		Alarm( EVENTS , "E_handle_events: exec ext fd event \n");
+	 	Fd_queue[LOW_PRIORITY].events[j].func( 
+				Fd_queue[LOW_PRIORITY].events[j].fd,
+				Fd_queue[LOW_PRIORITY].events[j].code,
+				Fd_queue[LOW_PRIORITY].events[j].data );
+		num_set--;
+#ifdef BADCLOCK
+		Now = E_add_time( Now, mili_sec );
+		clock_sync++;
+#else
+                E_get_time();
+#endif
+                if (Exit_events) goto end_handler;
+		break;
+	    }
+	}	
+#ifdef TESTTIME
+        start = E_get_time();
+        tmp_late = E_sub_time(start, stop);
+        Alarm(DEBUG, "Events: Low priority took %d %d to handle\n", tmp_late.sec, tmp_late.usec);
+#endif
+    }
+ end_handler:
+    /* Clean up data structures for exit OR restart of handler loop */
+    /* Actually nothing needs to be cleaned up to allow E_handle_events()
+     * to be called again. The events are still registered (or not registered)
+     * and the only state for the actual events loop is Exit_events which is reset
+     * in the for loop.
+     */
+
+    return 0;
+}
+/* Ask E_handle_events() to return: it tests Exit_events in its loop condition and after each handler call. */
+void 	E_exit_events(void)
+{
+	Alarm( EVENTS, "E_exit_events:\n");
+	Exit_events = 1;
+}

Added: branches/events_testing/daemon/microbench_events_pipes.c
===================================================================
--- branches/events_testing/daemon/microbench_events_pipes.c	                        (rev 0)
+++ branches/events_testing/daemon/microbench_events_pipes.c	2009-05-17 06:08:51 UTC (rev 414)
@@ -0,0 +1,222 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <unistd.h>
+#include <assert.h>
+
+#include <sys/ioctl.h>
+
+#include <stdutil/stdskl.h>
+#include <sp_events.h>
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int     Num_Pipes     = 1000;    /* number of pipes created by main(); overridden by argv[1] */
+int     Num_Ops       = 100000;  /* total number of writes to issue and read back; overridden by argv[2] */
+int     Num_Pending   = 1;       /* number of writes issued before the event loop starts; overridden by argv[3] */
+int     Seed          = 0;       /* value handed to srand() in usage(); overridden by argv[4] */
+
+int     Num_Ops_Done  = 0;       /* writes issued so far (incremented by write_pipe()) */
+int     Num_Ops_Cmplt = 0;       /* reads completed so far (incremented by read_pipe()) */
+int     (*Pipes)[2]   = NULL;    /* Num_Pipes fd pairs filled in by pipe(): [0] is the read end, [1] the write end */
+
+stdskl  Active_Fds;         /* <int -> int>: read-side fd -> number of bytes written to it but not yet read */
+
+sp_time T1;                      /* taken just before E_handle_events() is called */
+sp_time T2;                      /* taken just after E_handle_events() returns */
+sp_time T3;                      /* T2 - T1: elapsed time of the event loop */
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int get_rand_pipe(void)  /* returns a pseudo-random index into Pipes, in [0, Num_Pipes) */
+{
+  return (int) ((rand() / (RAND_MAX + 1.0)) * Num_Pipes);  /* the quotient is in [0, 1) */
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+void write_pipe(int ping_pipe)
+{
+  /* Issue one operation on pipe index ping_pipe: count one more outstanding byte for its
+   * read fd in Active_Fds, then write one byte to its write fd.  Any failure aborts. */
+  int  *rd_fd;       /* points at Pipes[ping_pipe][0], the key used in Active_Fds */
+  int   num_bytes;
+  stdit pos;
+  int   absent;      /* non-zero when Active_Fds holds no entry for *rd_fd */
+
+  assert(Num_Ops_Done < Num_Ops);
+  rd_fd = &Pipes[ping_pipe][0];
+  /* presumably leaves pos at the first entry whose key is >= *rd_fd -- confirm against the stdskl docs */
+  absent = stdskl_is_end(&Active_Fds, stdskl_lowerb(&Active_Fds, &pos, rd_fd)) != 0;
+  if (!absent) {
+    absent = (*rd_fd != *(int*) stdskl_it_key(&pos));
+  }
+  if (absent) {      /* first outstanding byte for this fd: create its entry with a count of 1 */
+    num_bytes = 1;
+    if (stdskl_insert(&Active_Fds, &pos, rd_fd, &num_bytes, STDTRUE) != 0) {
+      perror("stdskl_put failed"), abort();
+    }
+  } else {           /* fd already has an entry: bump the stored count in place */
+    num_bytes = ++*(int*) stdskl_it_val(&pos);
+    assert(num_bytes > 1);
+  }
+
+  if (write(Pipes[ping_pipe][1], &ping_pipe, 1) != 1) {  /* payload: the first byte of ping_pipe */
+    perror("write() failed"), abort();
+  }
+  ++Num_Ops_Done;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+void read_pipe(int fd, int code, void * data)
+{
+  /* Callback attached in main() to every pipe's read fd.  Consumes one byte from fd,
+   * lowers the outstanding count kept for fd in Active_Fds, issues a replacement write
+   * while writes remain, and calls E_exit_events() once Num_Ops reads have completed.
+   * The code and data arguments are not used.
+   */
+  stdit pos;
+  int   num_bytes;
+  char  byte;
+  int  *count;       /* the outstanding-byte count stored for fd in Active_Fds */
+
+  /* activity is only expected on an fd that has an entry in Active_Fds */
+  if (stdskl_is_end(&Active_Fds, stdskl_find(&Active_Fds, &pos, &fd))) {
+    fprintf(stderr, "Got activity on an inative fd(%d)?!\r\n", fd), abort();
+  }
+
+  /* consume exactly one byte */
+  if (read(fd, &byte, 1) != 1) {
+    perror("read() failed"), abort();
+  }
+
+  /* one fewer byte outstanding; drop the entry once nothing is left */
+  count     = (int*) stdskl_it_val(&pos);
+  num_bytes = (*count -= 1);
+  assert(num_bytes >= 0);
+  if (num_bytes == 0) {
+    stdskl_erase(&Active_Fds, &pos);
+  }
+
+  /* replace the completed operation with a new one until Num_Ops writes have been issued */
+  assert(Num_Ops_Cmplt < Num_Ops_Done && Num_Ops_Done <= Num_Ops);
+  if (Num_Ops_Done != Num_Ops) {
+    write_pipe(get_rand_pipe());
+  }
+
+  Num_Ops_Cmplt += 1;
+  if ((Num_Ops_Cmplt & 0xffff) == 0) {   /* progress report every 65536 completions */
+    printf("read_pipe: completed %d operations!\r\n", Num_Ops_Cmplt);
+  }
+
+  if (Num_Ops_Cmplt == Num_Ops) {        /* every operation has been read back: stop the loop */
+    E_exit_events();
+  }
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+void usage(int argc, char ** argv)
+{
+  /* Reads the optional positional arguments Num_Pipes, Num_Ops, Num_Pending and Seed (in that
+   * order) into the globals of the same names, then allocates Pipes, constructs Active_Fds and
+   * seeds rand().  Exits on too many arguments or non-positive counts; aborts on failed setup. */
+  switch (argc) {
+  case 5:
+    Seed = atoi(argv[4]);
+    /* fallthrough */
+  case 4:
+    Num_Pending = atoi(argv[3]);
+    /* fallthrough */
+  case 3:
+    Num_Ops = atoi(argv[2]);
+    /* fallthrough */
+  case 2:
+    Num_Pipes = atoi(argv[1]);
+    /* fallthrough */
+  case 1:
+    break;
+  default:
+    fprintf(stderr, "Usage: %s [Num_Pipes [Num_Ops [Num_Pending [Seed] ] ] ]\r\n", argv[0]);
+    exit(-1);
+  }
+
+  if (Num_Pipes <= 0 || Num_Ops <= 0 || Num_Pending <= 0) {
+    fprintf(stderr, "Usage: Num_Pipes, Num_Ops and Num_Pending must be positive integers!\r\n"), exit(-1);
+  }
+
+  if ((Pipes = malloc(Num_Pipes * sizeof *Pipes)) == NULL) {  /* one int[2] fd pair per pipe */
+    perror("malloc() failed"), abort();
+  }
+  if (stdskl_construct(&Active_Fds, sizeof(int), sizeof(int), NULL) != 0) {
+    perror("stdskl_construct failed"), abort();
+  }
+  srand(Seed);
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int main(int argc, char ** argv)
+{
+  /* Benchmark driver: creates Num_Pipes non-blocking pipes, registers read_pipe() on every
+   * read end, issues the initial writes, runs E_handle_events() until it returns, and
+   * reports the elapsed time and the resulting operation rate.  Always returns 0.
+   */
+  int rc;
+  int on;
+  int end;
+  int n;
+
+  E_init();
+
+  usage(argc, argv);     /* reads the arguments; allocates Pipes and constructs Active_Fds */
+
+  /* set up pipes */
+  for (n = 0; n != Num_Pipes; ++n) {
+
+    if (pipe(Pipes[n])) {
+      perror("pipe() failed"), abort();
+    }
+
+    /* switch the read end ([0]) and then the write end ([1]) to non-blocking mode */
+    for (end = 0; end != 2; ++end) {
+      on = 1;
+      rc = ioctl(Pipes[n][end], FIONBIO, &on);
+      if (rc != 0) {
+        perror("ioctl failed"), abort();
+      }
+    }
+
+    E_attach_fd(Pipes[n][0], READ_FD, read_pipe, 0, NULL, HIGH_PRIORITY);
+  }
+
+  /* seed the system: Num_Pending initial writes, but never more than Num_Ops */
+  n = 0;
+  while (n != Num_Pending && n != Num_Ops) {
+    write_pipe(get_rand_pipe());
+    ++n;
+  }
+
+  /* time the event loop */
+
+  T1 = E_get_time();
+  rc = E_handle_events();
+  T2 = E_get_time();
+  T3 = E_sub_time(T2, T1);
+
+  printf("E_handle_events returned %d size of Active_Fds is %lu!\r\n", rc, (stdulong) stdskl_size(&Active_Fds));
+  printf("Took %ld.%06ld seconds for an average rate of %f ops/s!\r\n",
+	  T3.sec, T3.usec, Num_Ops / (T3.sec + T3.usec / 1000000.0));
+
+  return 0;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/

Modified: branches/events_testing/examples/Makefile.in
===================================================================
--- branches/events_testing/examples/Makefile.in	2009-05-16 01:38:32 UTC (rev 413)
+++ branches/events_testing/examples/Makefile.in	2009-05-17 06:08:51 UTC (rev 414)
@@ -31,7 +31,7 @@
 CFLAGS=@CFLAGS@
 CPPFLAGS=-I. -I$(srcdir) -I$(top_srcdir)/include @CPPFLAGS@ $(PATHS) @DEFS@
 LDFLAGS=@LDFLAGS@
-LIBS=@LIBS@
+LIBS=../stdutil/lib/libstdutil.a @LIBS@
 THLDFLAGS=@THLDFLAGS@
 THLIBS=@THLIBS@
 AR=@AR@




More information about the Spread-cvs mailing list