[Spread-cvs] commit: r414 - in branches/events_testing: daemon examples
jschultz at spread.org
jschultz at spread.org
Sun May 17 02:08:51 EDT 2009
Author: jschultz
Date: 2009-05-17 02:08:51 -0400 (Sun, 17 May 2009)
New Revision: 414
Added:
branches/events_testing/daemon/events.old.c
branches/events_testing/daemon/microbench_events_pipes.c
Modified:
branches/events_testing/daemon/Makefile.in
branches/events_testing/daemon/events.c
branches/events_testing/examples/Makefile.in
Log:
Adding old version of events back as events.old.c. Adding new test/benchmark program for events. Significant
bug fixes for epoll version of events. Seems to be working now.
Modified: branches/events_testing/daemon/Makefile.in
===================================================================
--- branches/events_testing/daemon/Makefile.in 2009-05-16 01:38:32 UTC (rev 413)
+++ branches/events_testing/daemon/Makefile.in 2009-05-17 06:08:51 UTC (rev 414)
@@ -31,7 +31,7 @@
CFLAGS=@CFLAGS@
CPPFLAGS=-I. -I$(srcdir) -I$(top_srcdir)/include -I../stdutil/src -I$(top_srcdir)/stdutil/src @CPPFLAGS@ $(PATHS) @DEFS@
LDFLAGS=@LDFLAGS@
-LIBS=@LIBS@
+LIBS=../stdutil/lib/libstdutil.a @LIBS@
THLDFLAGS=@THLDFLAGS@
THLIBS=@THLIBS@
LEX=@LEX@
@@ -73,8 +73,8 @@
#y.tab.c: config_parse.y
# $(YACC) -d config_parse.y
-spread$(EXEEXT): $(SPREADOBJS) ../stdutil/lib/libstdutil-threaded-release.a
- $(LD) -o $@ $(SPREADOBJS) ../stdutil/lib/libstdutil-threaded-release.a $(LDFLAGS) $(LIBS)
+spread$(EXEEXT): $(SPREADOBJS)
+ $(LD) -o $@ $(SPREADOBJS) $(LDFLAGS) $(LIBS)
spmonitor$(EXEEXT): $(MONITOR_OBJS)
$(LD) -o $@ $(MONITOR_OBJS) $(LDFLAGS) $(LIBS)
@@ -90,6 +90,12 @@
sprecv$(EXEEXT): r.o alarm.o data_link.o
$(LD) -o $@ r.o alarm.o data_link.o $(LDFLAGS) $(LIBS)
+testevents$(EXEEXT): microbench_events_pipes.o events.o
+ $(LD) -o $@ microbench_events_pipes.o events.o $(LDFLAGS) $(LIBS)
+
+testevents_old$(EXEEXT): microbench_events_pipes.o events.old.o memory.o alarm.o
+ $(LD) -o $@ microbench_events_pipes.o events.old.o memory.o alarm.o $(LDFLAGS) $(LIBS)
+
clean:
rm -f *.lo *.tlo *.to *.o *.a *.dylib $(TARGETS) spsimple_user
rm -f config.cache config.log docs/*.out core
Modified: branches/events_testing/daemon/events.c
===================================================================
--- branches/events_testing/daemon/events.c 2009-05-16 01:38:32 UTC (rev 413)
+++ branches/events_testing/daemon/events.c 2009-05-17 06:08:51 UTC (rev 414)
@@ -1,6 +1,6 @@
#include "arch.h"
-/*#define HAVE_LINUX*/
+#define HAVE_LINUX
#include <stdlib.h>
#include <stdio.h>
@@ -340,6 +340,7 @@
{
E_priority_info * info;
E_priority priority;
+ E_backend_type backend_type;
int ret = 0;
memset(events, 0, sizeof(*events));
@@ -387,17 +388,37 @@
}
events->backend_type = E_BACKEND_UNSPECIFIED;
-
+
#ifdef HAVE_LINUX
- ret = E_events_epoll_init(events);
+ backend_type = E_BACKEND_EPOLL;
#else
- ret = E_events_select_init(events);
+ backend_type = E_BACKEND_SELECT;
#endif
+ switch (backend_type) {
+
+ case E_BACKEND_SELECT:
+ ret = E_events_select_init(events);
+ break;
+
+ case E_BACKEND_EPOLL:
+ ret = E_events_epoll_init(events);
+ break;
+
+ default:
+ assert(0);
+ ret = E_BUG;
+ break;
+ }
+
if (ret != 0) {
goto FAIL_TIMEOUTS;
}
+ if (!E_events_is_valid(events)) {
+ assert(0);
+ }
+
assert(ret == 0);
goto END;
@@ -434,12 +455,21 @@
E_priority priority;
stdit sit;
-#ifdef HAVE_LINUX
- E_events_epoll_fini(events);
-#else
- E_events_select_fini(events);
-#endif
+ switch (events->backend_type) {
+ case E_BACKEND_SELECT:
+ E_events_select_fini(events);
+ break;
+
+ case E_BACKEND_EPOLL:
+ E_events_epoll_fini(events);
+ break;
+
+ default:
+ assert(0);
+ break;
+ }
+
stdskl_destruct(&events->timeouts);
for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
@@ -465,6 +495,14 @@
/***************************************************************************************************************************
***************************************************************************************************************************/
+static stdbool E_events_is_valid(const E_events * events)
+{
+ return STDTRUE;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
static void E_events_exit_events(E_events * events)
{
events->keep_running = STDFALSE;
@@ -1437,7 +1475,7 @@
goto FAIL;
}
- for (priority = E_MIN_PRIORITY; priority != events->max_priority; ++priority) {
+ for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
if ((lvl_info = (E_select_lvl_info*) calloc(1, sizeof(E_select_lvl_info))) == NULL) {
ret = E_ALLOC_FAILURE;
@@ -1508,7 +1546,7 @@
assert(events->backend_type == E_BACKEND_SELECT && events->backend_info != NULL);
- for (priority = E_MIN_PRIORITY; priority != events->max_priority; ++priority) {
+ for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
lvl_info = (E_select_lvl_info*) events->priorities[priority].backend_lvl_info;
assert(lvl_info != NULL);
@@ -1584,7 +1622,9 @@
{
E_priority lvl;
E_select_lvl_info * lvl_info;
+#ifndef NDEBUG
stdit sit;
+#endif
assert(fd_watch == E_events_get_fd_watch(events, fd_watch->fd, fd_watch->w.watch_type, &sit));
assert(fd_watch->w.priority >= E_MIN_PRIORITY && fd_watch->w.priority <= events->max_priority);
@@ -1740,6 +1780,8 @@
assert(ret != 0);
END:
+ *queried_lvl = query_lvl;
+
return ret;
}
@@ -1757,6 +1799,8 @@
assert(events->backend_type == E_BACKEND_UNSPECIFIED);
assert(E_EPOLL_INITIAL_SIZE > 0);
+ /*fprintf(stdout, "EPOLLIN = 0x%x; EPOLLOUT = 0x%x; EPOLLPRI = 0x%x\r\n", EPOLLIN, EPOLLOUT, EPOLLPRI);*/
+
if ((info = (E_epoll_info*) calloc(1, sizeof(E_epoll_info))) == NULL) {
ret = E_ALLOC_FAILURE;
goto FAIL;
@@ -1769,7 +1813,7 @@
info->num_events = E_EPOLL_INITIAL_SIZE;
- for (priority = E_MIN_PRIORITY; priority != events->max_priority; ++priority) {
+ for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
if ((lvl_info = (E_epoll_lvl_info*) calloc(1, sizeof(E_epoll_lvl_info))) == NULL) {
ret = E_ALLOC_FAILURE;
@@ -1860,7 +1904,7 @@
assert(events->backend_type == E_BACKEND_EPOLL && events->backend_info != NULL);
- for (priority = E_MIN_PRIORITY; priority != events->max_priority; ++priority) {
+ for (priority = E_MIN_PRIORITY; priority <= events->max_priority; ++priority) {
lvl_info = (E_epoll_lvl_info*) events->priorities[priority].backend_lvl_info;
assert(lvl_info != NULL);
@@ -1911,9 +1955,9 @@
/* go down to the watch's priority lvl adding the new watch to each priority's epoll fd sets */
- for (lvl = events->max_priority; lvl >= fd_watch->w.priority; --lvl) {
+ for (lvl = E_MIN_PRIORITY; lvl <= fd_watch->w.priority; ++lvl) {
- lvl_info = (E_epoll_lvl_info*) events->priorities[fd_watch->w.priority].backend_lvl_info;
+ lvl_info = (E_epoll_lvl_info*) events->priorities[lvl].backend_lvl_info;
/* calculate what events we should query for at this priority lvl for all the other watches on this fd */
@@ -1989,6 +2033,10 @@
RETRY: /* allow a single retry w/ a different epoll_op in the case of ENOENT (see below) */
epoll_evnt_cpy = epoll_evnt; /* epoll_ctl's interface isn't const so make a copy in case they trash our input */
+ /*fprintf(stdout, "Adding fd(%d) to epoll_fd(%d) for priority(%d) w/ op(%s) and events(0x%x)\r\n",
+ fd_watch->fd, lvl_info->epoll_fd, lvl, (epoll_op == EPOLL_CTL_ADD ? "EPOLL_CTL_ADD" : "EPOLL_CTL_MOD"), epoll_evnt.events);
+ */
+
if ((ret = epoll_ctl(lvl_info->epoll_fd, epoll_op, fd_watch->fd, &epoll_evnt_cpy)) != 0) {
ret = errno;
@@ -2081,9 +2129,9 @@
/* go down to the watch's priority lvl removing the watch from each priority's epoll fd sets */
- for (lvl = events->max_priority; lvl >= fd_watch->w.priority; --lvl) {
+ for (lvl = E_MIN_PRIORITY; lvl <= fd_watch->w.priority; ++lvl) {
- lvl_info = (E_epoll_lvl_info*) events->priorities[fd_watch->w.priority].backend_lvl_info;
+ lvl_info = (E_epoll_lvl_info*) events->priorities[lvl].backend_lvl_info;
/* calculate what events we should query for at this priority lvl for all the other watches on this fd */
@@ -2194,8 +2242,6 @@
stdit sit;
int ret = 0;
- assert(info->num_events >= lvl_info->num_poll_fds);
-
/* convert the timeout to epoll's input */
if (delta_timeout != NULL) {
@@ -2260,15 +2306,18 @@
/* process the result set */
- for (evnt = info->events, evnts_end = info->events + ret; evnt != evnts_end; ++evnt) {
+ evnts_end = info->events + ret;
+ ret = 0;
+ for (evnt = info->events; evnt != evnts_end; ++evnt) {
+
/* look up the fd and process all watches on it */
for (stdskl_lowerb(&events->fd_watches, &sit, &evnt->data.fd); !stdskl_is_end(&events->fd_watches, &sit); stdskl_it_next(&sit)) {
stdbool generate_notice = STDFALSE;
- fd_watch = (E_fd_watch*) stdskl_it_val(&sit);
+ fd_watch = *(E_fd_watch**) stdskl_it_val(&sit);
if (fd_watch->fd != evnt->data.fd) { /* we've moved onto the next fd */
break;
@@ -2296,11 +2345,11 @@
/* push a new notice onto the associated priority's notice queue and record where it is */
if (generate_notice &&
- stddll_insert(&events->priorities[fd_watch->w.priority].notices,
- stddll_end(&events->priorities[fd_watch->w.priority].notices, &fd_watch->w.notice_it),
- &fd_watch) != 0) {
- ret = E_ALLOC_FAILURE;
- goto FAIL;
+ stddll_insert(&events->priorities[fd_watch->w.priority].notices,
+ stddll_end(&events->priorities[fd_watch->w.priority].notices, &fd_watch->w.notice_it),
+ &fd_watch) != 0) {
+ ret = E_ALLOC_FAILURE;
+ goto FAIL;
}
fd_watch->w.noticed = STDTRUE;
@@ -2316,6 +2365,8 @@
assert(ret != 0);
END:
+ *queried_lvl = query_lvl;
+
return ret;
}
#else
Added: branches/events_testing/daemon/events.old.c
===================================================================
--- branches/events_testing/daemon/events.old.c (rev 0)
+++ branches/events_testing/daemon/events.old.c 2009-05-17 06:08:51 UTC (rev 414)
@@ -0,0 +1,764 @@
+/*
+ * The Spread Toolkit.
+ *
+ * The contents of this file are subject to the Spread Open-Source
+ * License, Version 1.0 (the ``License''); you may not use
+ * this file except in compliance with the License. You may obtain a
+ * copy of the License at:
+ *
+ * http://www.spread.org/license/
+ *
+ * or in the file ``license.txt'' found in this distribution.
+ *
+ * Software distributed under the License is distributed on an AS IS basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Creators of Spread are:
+ * Yair Amir, Michal Miskin-Amir, Jonathan Stanton, John Schultz.
+ *
+ * Copyright (C) 1993-2006 Spread Concepts LLC <info at spreadconcepts.com>
+ *
+ * All Rights Reserved.
+ *
+ * Major Contributor(s):
+ * ---------------
+ * Ryan Caudy rcaudy at gmail.com - contributions to process groups.
+ * Claudiu Danilov claudiu at acm.org - scalable wide area support.
+ * Cristina Nita-Rotaru crisn at cs.purdue.edu - group communication security.
+ * Theo Schlossnagle jesus at omniti.com - Perl, autoconf, old skiplist.
+ * Dan Schoenblum dansch at cnds.jhu.edu - Java interface.
+ *
+ */
+
+
+#include "arch.h"
+
+/* undef redefined variables under windows */
+#ifdef ARCH_PC_WIN95
+#undef EINTR
+#undef EAGAIN
+#undef EWOULDBLOCK
+#undef EINPROGRESS
+#endif
+#include <errno.h>
+
+#ifndef ARCH_PC_WIN95
+
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#else /* ARCH_PC_WIN95 */
+
+#include <winsock.h>
+#include <sys/timeb.h>
+
+#endif /* ARCH_PC_WIN95 */
+
+#include <string.h>
+#include "sp_events.h"
+#include "objects.h" /* For memory */
+#include "memory.h" /* for memory */
+#include "alarm.h"
+
+typedef struct dummy_t_event {
+ sp_time t;
+ void (* func)( int code, void *data );
+ int code;
+ void *data;
+ struct dummy_t_event *next;
+} time_event;
+
+typedef struct dummy_fd_event {
+ int fd;
+ int fd_type;
+ void (* func)( mailbox mbox, int code, void *data );
+ int code;
+ void *data;
+ int active; /* true if active, false if inactive */
+} fd_event;
+
+typedef struct dummy_fd_queue {
+ int num_fds;
+ int num_active_fds;
+ fd_event events[MAX_FD_EVENTS];
+} fd_queue;
+
+static time_event *Time_queue;
+static sp_time Now;
+
+static fd_queue Fd_queue[NUM_PRIORITY];
+static fd_set Fd_mask[NUM_FDTYPES];
+static int Active_priority;
+static int Exit_events;
+
+int E_init(void)
+{
+ int i,ret;
+
+ Time_queue = NULL;
+
+ ret = Mem_init_object(TIME_EVENT, sizeof(time_event), 100,0);
+ if (ret < 0)
+ {
+ Alarm(EXIT, "E_Init: Failure to Initialize TIME_EVENT memory objects\n");
+ }
+
+ for ( i=0; i < NUM_PRIORITY; i++ )
+ {
+ Fd_queue[i].num_fds = 0;
+ Fd_queue[i].num_active_fds = 0;
+ }
+ for ( i=0; i < NUM_FDTYPES; i++ )
+ {
+ FD_ZERO( &Fd_mask[i] );
+ }
+ Active_priority = LOW_PRIORITY;
+
+ E_get_time();
+
+ Alarm( EVENTS, "E_init: went ok\n");
+
+ return( 0 );
+}
+
+sp_time E_get_time(void)
+{
+#ifndef ARCH_PC_WIN95
+ struct timeval read_time;
+
+#if HAVE_STRUCT_TIMEZONE
+ struct timezone dummy_tz;
+#else
+ sp_time dummy_tz;
+#endif
+ int ret;
+
+ ret = gettimeofday( &read_time, &dummy_tz );
+ if ( ret < 0 ) Alarm( EXIT, "E_get_time: gettimeofday problems.\n" );
+ Now.sec = read_time.tv_sec;
+ Now.usec = read_time.tv_usec;
+
+#else /* ARCH_PC_WIN95 */
+
+ struct _timeb timebuffer;
+
+ _ftime( &timebuffer );
+
+ Now.sec = timebuffer.time;
+ Now.usec= timebuffer.millitm;
+ Now.usec= Now.usec * 1000;
+
+#endif /* ARCH_PC_WIN95 */
+#if 0
+ Alarm( EVENTS, "E_get_time: time is (%d, %d)\n", Now.sec, Now.usec);
+#endif
+ return ( Now );
+}
+
+sp_time E_sub_time( sp_time t, sp_time delta_t )
+{
+ sp_time res;
+
+ res.sec = t.sec - delta_t.sec;
+ res.usec = t.usec - delta_t.usec;
+ if ( res.usec < 0 )
+ {
+ res.usec = res.usec + 1000000;
+ res.sec--;
+ }
+ if ( res.sec < 0 ) Alarm( EVENTS, "E_sub_time: negative time result.\n");
+ return ( res );
+}
+
+sp_time E_add_time( sp_time t, sp_time delta_t )
+{
+ sp_time res;
+
+ res.sec = t.sec + delta_t.sec;
+ res.usec = t.usec + delta_t.usec;
+ if ( res.usec > 1000000 )
+ {
+ res.usec = res.usec - 1000000;
+ res.sec++;
+ }
+ return ( res );
+}
+
+int E_compare_time( sp_time t1, sp_time t2 )
+{
+ if ( t1.sec > t2.sec ) return ( 1 );
+ else if ( t1.sec < t2.sec ) return ( -1 );
+ else if ( t1.usec > t2.usec ) return ( 1 );
+ else if ( t1.usec < t2.usec ) return ( -1 );
+ else return ( 0 );
+}
+
+int E_queue( void (* func)( int code, void *data ), int code, void *data,
+ sp_time delta_time )
+{
+ time_event *t_pre;
+ time_event *t_post;
+ time_event *t_e;
+ int inserted;
+ int deleted;
+ int compare;
+
+ t_e = new( TIME_EVENT );
+
+ t_e->t = E_add_time( E_get_time(), delta_time );
+ t_e->func = func;
+ t_e->code = code;
+ t_e->data = data;
+ deleted = 0;
+ inserted = 0;
+
+ if( Time_queue != NULL )
+ {
+ if( Time_queue->func == t_e->func &&
+ Time_queue->data == t_e->data &&
+ Time_queue->code == t_e->code )
+ {
+ t_pre = Time_queue;
+ Time_queue = Time_queue->next;
+ dispose( t_pre );
+ deleted = 1;
+ Alarm( EVENTS, "E_queue: dequeued a (first) simillar event\n" );
+ }
+ }
+ if( Time_queue == NULL )
+ {
+ t_e->next = NULL;
+ Time_queue = t_e;
+ Alarm( EVENTS, "E_queue: (only) event queued func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec, delta_time.usec );
+ return( 0 );
+ }else{
+ compare = E_compare_time ( t_e->t, Time_queue->t );
+ if( compare < 0 )
+ {
+ t_e->next = Time_queue;
+ Time_queue = t_e;
+ inserted = 1;
+ Alarm( EVENTS, "E_queue: (first) event queued func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec,delta_time.usec );
+ }
+ }
+ t_pre = Time_queue ;
+ t_post = Time_queue->next;
+ while ( t_post != NULL && ( !inserted || !deleted ) )
+ {
+ if( t_post->func == t_e->func &&
+ t_post->data == t_e->data &&
+ t_post->code == t_e->code )
+ {
+ t_pre->next = t_post->next;
+ dispose( t_post );
+ t_post = t_pre->next;
+ deleted = 1;
+ Alarm( EVENTS, "E_queue: dequeued a simillar event\n" );
+ continue;
+ }
+
+ if ( !inserted )
+ {
+ compare = E_compare_time ( t_e->t, t_post->t );
+ if( compare < 0 )
+ {
+ t_pre->next = t_e;
+ t_e->next = t_post;
+ inserted = 1;
+ Alarm( EVENTS, "E_queue: event queued for func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec, delta_time.usec );
+ }
+ }
+
+ t_pre = t_post;
+ t_post = t_post->next;
+ }
+
+ if( !inserted )
+ {
+ t_pre->next = t_e;
+ t_e->next = NULL;
+ Alarm( EVENTS, "E_queue: (last) event queued func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec,delta_time.usec );
+ }
+
+ return( 0 );
+}
+
+int E_dequeue( void (* func)( int code, void *data ), int code,
+ void *data )
+{
+ time_event *t_pre;
+ time_event *t_ptr;
+
+ if( Time_queue == NULL )
+ {
+ Alarm( EVENTS, "E_dequeue: no such event\n" );
+ return( -1 );
+ }
+
+ if( Time_queue->func == func &&
+ Time_queue->data == data &&
+ Time_queue->code == code )
+ {
+ t_ptr = Time_queue;
+ Time_queue = Time_queue->next;
+ dispose( t_ptr );
+ Alarm( EVENTS, "E_dequeue: first event dequeued func 0x%x code %d data 0x%x\n",func,code, data);
+ return( 0 );
+ }
+
+ t_pre = Time_queue;
+ while ( t_pre->next != NULL )
+ {
+ t_ptr = t_pre->next;
+ if( t_ptr->func == func &&
+ t_ptr->data == data &&
+ t_ptr->code == code )
+ {
+ t_pre->next = t_ptr->next;
+ dispose( t_ptr );
+ Alarm( EVENTS, "E_dequeue: event dequeued func 0x%x code %d data 0x%x\n",func,code, data);
+ return( 0 );
+ }
+ t_pre = t_ptr;
+ }
+
+ Alarm( EVENTS, "E_dequeue: no such event\n" );
+ return( -1 );
+}
+
+void E_delay( sp_time t )
+{
+ struct timeval tmp_t;
+
+ tmp_t.tv_sec = t.sec;
+ tmp_t.tv_usec = t.usec;
+
+#ifndef ARCH_PC_WIN95
+ if (select(0, NULL, NULL, NULL, &tmp_t ) < 0)
+ {
+ Alarm( EVENTS, "E_delay: select delay returned error: %s\n", strerror(errno));
+ }
+#else /* ARCH_PC_WIN95 */
+ SleepEx( tmp_t.tv_sec*1000+tmp_t.tv_usec/1000, 0 );
+#endif /* ARCH_PC_WIN95 */
+
+}
+
+int E_attach_fd( int fd, int fd_type,
+ void (* func)( mailbox mbox, int code, void *data ),
+ int code, void *data, int priority )
+{
+ int num_fds;
+ int j;
+
+ if( priority < 0 || priority > NUM_PRIORITY )
+ {
+ Alarm( PRINT, "E_attach_fd: invalid priority %d for fd %d with fd_type %d\n", priority, fd, fd_type );
+ return( -1 );
+ }
+ if( fd_type < 0 || fd_type > NUM_FDTYPES )
+ {
+ Alarm( PRINT, "E_attach_fd: invalid fd_type %d for fd %d with priority %d\n", fd_type, fd, priority );
+ return( -1 );
+ }
+#ifndef ARCH_PC_WIN95
+ /* Windows bug: Reports FD_SETSIZE of 64 but select works on all
+ * fd's even ones with numbers greater then 64.
+ */
+ if( fd < 0 || fd > FD_SETSIZE )
+ {
+ Alarm( PRINT, "E_attach_fd: invalid fd %d (max %d) with fd_type %d with priority %d\n", fd, FD_SETSIZE, fd_type, priority );
+ return( -1 );
+ }
+#endif
+ for( j=0; j < Fd_queue[priority].num_fds; j++ )
+ {
+ if( ( Fd_queue[priority].events[j].fd == fd ) && ( Fd_queue[priority].events[j].fd_type == fd_type ) )
+ {
+ Fd_queue[priority].events[j].func = func;
+ Fd_queue[priority].events[j].code = code;
+ Fd_queue[priority].events[j].data = data;
+ if ( !(Fd_queue[priority].events[j].active) )
+ Fd_queue[priority].num_active_fds++;
+ Fd_queue[priority].events[j].active = TRUE;
+ Alarm( PRINT,
+ "E_attach_fd: fd %d with type %d exists & replaced & activated\n", fd, fd_type );
+ return( 1 );
+ }
+ }
+ num_fds = Fd_queue[priority].num_fds;
+
+ if ( num_fds == MAX_FD_EVENTS ) {
+ Alarm( PRINT, "E_attach_fd: Reached Maximum number of events. Recompile with larger MAX_FD_EVENTS\n");
+ return( -1 );
+ }
+ Fd_queue[priority].events[num_fds].fd = fd;
+ Fd_queue[priority].events[num_fds].fd_type = fd_type;
+ Fd_queue[priority].events[num_fds].func = func;
+ Fd_queue[priority].events[num_fds].code = code;
+ Fd_queue[priority].events[num_fds].data = data;
+ Fd_queue[priority].events[num_fds].active = TRUE;
+ Fd_queue[priority].num_fds++;
+ Fd_queue[priority].num_active_fds++;
+ if( Active_priority <= priority ) FD_SET( fd, &Fd_mask[fd_type] );
+
+ Alarm( EVENTS, "E_attach_fd: fd %d, fd_type %d, code %d, data 0x%x, priority %d Active_priority %d\n",
+ fd, fd_type, code, data, priority, Active_priority );
+
+ return( 0 );
+}
+
+int E_detach_fd( int fd, int fd_type )
+{
+ int i,j;
+ int found;
+
+ if( fd_type < 0 || fd_type > NUM_FDTYPES )
+ {
+ Alarm( PRINT, "E_detach_fd: invalid fd_type %d for fd %d\n", fd_type, fd );
+ return( -1 );
+ }
+
+ found = 0;
+ for( i=0; i < NUM_PRIORITY; i++ )
+ for( j=0; j < Fd_queue[i].num_fds; j++ )
+ {
+ if( ( Fd_queue[i].events[j].fd == fd ) && ( Fd_queue[i].events[j].fd_type == fd_type ) )
+ {
+ if (Fd_queue[i].events[j].active)
+ Fd_queue[i].num_active_fds--;
+ Fd_queue[i].num_fds--;
+ Fd_queue[i].events[j] = Fd_queue[i].events[Fd_queue[i].num_fds];
+
+ FD_CLR( fd, &Fd_mask[fd_type] );
+ found = 1;
+
+ break; /* from the j for only */
+ }
+ }
+
+ if( ! found ) return( -1 );
+
+ return( 0 );
+}
+
+int E_deactivate_fd( int fd, int fd_type )
+{
+ int i,j;
+ int found;
+
+ if( fd_type < 0 || fd_type > NUM_FDTYPES )
+ {
+ Alarm( PRINT, "E_deactivate_fd: invalid fd_type %d for fd %d\n", fd_type, fd );
+ return( -1 );
+ }
+
+ found = 0;
+
+ for( i=0; i < NUM_PRIORITY; i++ )
+ for( j=0; j < Fd_queue[i].num_fds; j++ )
+ {
+ if( ( Fd_queue[i].events[j].fd == fd ) && ( Fd_queue[i].events[j].fd_type == fd_type ) )
+ {
+ if (Fd_queue[i].events[j].active)
+ Fd_queue[i].num_active_fds--;
+ Fd_queue[i].events[j].active = FALSE;
+ FD_CLR( fd, &Fd_mask[fd_type] );
+ found = 1;
+
+ break; /* from the j for only */
+ }
+ }
+
+ if( ! found ) return( -1 );
+ return( 0 );
+}
+
+int E_activate_fd( int fd, int fd_type )
+{
+ int i,j;
+ int found;
+
+ if( fd_type < 0 || fd_type > NUM_FDTYPES )
+ {
+ Alarm( PRINT, "E_activate_fd: invalid fd_type %d for fd %d\n", fd_type, fd );
+ return( -1 );
+ }
+
+ found = 0;
+
+ for( i=0; i < NUM_PRIORITY; i++ )
+ for( j=0; j < Fd_queue[i].num_fds; j++ )
+ {
+ if( ( Fd_queue[i].events[j].fd == fd ) && ( Fd_queue[i].events[j].fd_type == fd_type ) )
+ {
+ if ( !(Fd_queue[i].events[j].active) )
+ Fd_queue[i].num_active_fds++;
+ Fd_queue[i].events[j].active = TRUE;
+ if( i >= Active_priority ) FD_SET( fd, &Fd_mask[ fd_type ] );
+ found = 1;
+
+ break; /* from the j for only */
+ }
+ }
+
+ if( ! found ) return( -1 );
+ return( 0 );
+}
+
+int E_set_active_threshold( int priority )
+{
+ int fd_type;
+ int i,j;
+
+ if( priority < 0 || priority > NUM_PRIORITY )
+ {
+ Alarm( PRINT, "E_set_active_threshold: invalid priority %d\n", priority );
+ return( -1 );
+ }
+
+ if( priority == Active_priority ) return( priority );
+
+ Active_priority = priority;
+ for ( i=0; i < NUM_FDTYPES; i++ )
+ {
+ FD_ZERO( &Fd_mask[i] );
+ }
+
+ for( i = priority; i < NUM_PRIORITY; i++ )
+ for( j=0; j < Fd_queue[i].num_fds; j++ )
+ {
+ fd_type = Fd_queue[i].events[j].fd_type;
+ if (Fd_queue[i].events[j].active)
+ FD_SET( Fd_queue[i].events[j].fd, &Fd_mask[fd_type] );
+ }
+
+ Alarm( EVENTS, "E_set_active_threshold: changed to %d\n",Active_priority);
+
+ return( priority );
+}
+
+int E_num_active( int priority )
+{
+ if( priority < 0 || priority > NUM_PRIORITY )
+ {
+ Alarm( PRINT, "E_num_active: invalid priority %d\n", priority );
+ return( -1 );
+ }
+ return( Fd_queue[priority].num_active_fds );
+}
+
+int E_handle_events(void)
+{
+static int Round_robin = 0;
+static const sp_time long_timeout = { 10000, 0};
+static const sp_time zero_sec = { 0, 0};
+#ifdef BADCLOCK
+static const sp_time mili_sec = { 0, 1000};
+ int clock_sync;
+#endif
+ int num_set;
+ int treated;
+ int fd;
+ int fd_type;
+ int i,j;
+ sp_time timeout;
+ struct timeval sel_timeout, wait_timeout;
+ fd_set current_mask[NUM_FDTYPES];
+ time_event *temp_ptr;
+ int first=1;
+#ifdef TESTTIME
+ sp_time tmp_late,start,stop,req_time; /* DEBUGGING */
+#endif
+#ifdef BADCLOCK
+ clock_sync = 0;
+#endif
+ for( Exit_events = 0 ; !Exit_events ; )
+ {
+ Alarm( EVENTS, "E_handle_events: next event \n");
+
+ /* Handle time events */
+ timeout = long_timeout;
+#ifdef TESTTIME
+ start = E_get_time();
+#endif
+ while( Time_queue != NULL )
+ {
+#ifdef BADCLOCK
+ if ( clock_sync >= 0 )
+ {
+ E_get_time();
+ clock_sync = -20;
+ }
+#else
+ E_get_time();
+#endif
+ if ( !first && E_compare_time( Now, Time_queue->t ) >= 0 )
+ {
+#ifdef TESTTIME
+ tmp_late = E_sub_time( Now, Time_queue->t );
+#endif
+ temp_ptr = Time_queue;
+ Time_queue = Time_queue->next;
+ Alarm( EVENTS, "E_handle_events: exec time event \n");
+#ifdef TESTTIME
+ Alarm( DEBUG, "Events: TimeEv is %d %d late\n",tmp_late.sec, tmp_late.usec);
+#endif
+ temp_ptr->func( temp_ptr->code, temp_ptr->data );
+ dispose( temp_ptr );
+#ifdef BADCLOCK
+ Now = E_add_time( Now, mili_sec );
+ clock_sync++;
+#else
+ E_get_time();
+#endif
+ if (Exit_events) goto end_handler;
+ }else{
+ timeout = E_sub_time( Time_queue->t, Now );
+ break;
+ }
+ }
+ if (timeout.sec < 0 )
+ timeout.sec = timeout.usec = 0; /* this can happen until first is unset */
+#ifdef TESTTIME
+ stop = E_get_time();
+ tmp_late = E_sub_time(stop, start);
+ Alarm(DEBUG, "Events: TimeEv's took %d %d to handle\n", tmp_late.sec, tmp_late.usec);
+#endif
+ /* Handle fd events */
+ for( i=0; i < NUM_FDTYPES; i++ )
+ {
+ current_mask[i] = Fd_mask[i];
+ }
+ Alarm( EVENTS, "E_handle_events: poll select\n");
+#ifdef TESTTIME
+ req_time = zero_sec;
+#endif
+ wait_timeout.tv_sec = zero_sec.sec;
+ wait_timeout.tv_usec = zero_sec.usec;
+ num_set = select( FD_SETSIZE, ¤t_mask[READ_FD], ¤t_mask[WRITE_FD], ¤t_mask[EXCEPT_FD],
+ &wait_timeout );
+ if (num_set == 0 && !Exit_events)
+ {
+#ifdef BADCLOCK
+ clock_sync = 0;
+#endif
+ for( i=0; i < NUM_FDTYPES; i++ )
+ {
+ current_mask[i] = Fd_mask[i];
+ }
+ Alarm( EVENTS, "E_handle_events: select with timeout (%d, %d)\n",
+ timeout.sec,timeout.usec );
+#ifdef TESTTIME
+ req_time = E_add_time(req_time, timeout);
+#endif
+ sel_timeout.tv_sec = timeout.sec;
+ sel_timeout.tv_usec = timeout.usec;
+ num_set = select( FD_SETSIZE, ¤t_mask[READ_FD], ¤t_mask[WRITE_FD],
+ ¤t_mask[EXCEPT_FD], &sel_timeout );
+ }
+#ifdef TESTTIME
+ start = E_get_time();
+ tmp_late = E_sub_time(start, stop);
+ Alarm( DEBUG, "Events: Waiting for fd or timout took %d %d asked for %d %d\n", tmp_late.sec, tmp_late.usec, req_time.sec, req_time.usec);
+#endif
+ /* Handle all high and medium priority fd events */
+ for( i=NUM_PRIORITY-1,treated=0;
+ i > LOW_PRIORITY && num_set > 0 && !treated;
+ i-- )
+ {
+ for( j=0; j < Fd_queue[i].num_fds && num_set > 0; j++ )
+ {
+ fd = Fd_queue[i].events[j].fd;
+ fd_type = Fd_queue[i].events[j].fd_type;
+ if( FD_ISSET( fd, ¤t_mask[fd_type] ) )
+ {
+ Alarm( EVENTS, "E_handle_events: exec handler for fd %d, fd_type %d, priority %d\n",
+ fd, fd_type, i );
+ Fd_queue[i].events[j].func(
+ Fd_queue[i].events[j].fd,
+ Fd_queue[i].events[j].code,
+ Fd_queue[i].events[j].data );
+ treated = 1;
+ num_set--;
+#ifdef BADCLOCK
+ Now = E_add_time( Now, mili_sec );
+ clock_sync++;
+#else
+ E_get_time();
+#endif
+ if (Exit_events) goto end_handler;
+ }
+ }
+ }
+ /* Don't handle timed events until all non-low-priority fd events have been handled
+ * FIXME: This may or may not be right. If continual high priority events occur, then
+ * timed events will starve, I'm not sure if that is better then what we have. We
+ * could also set first=0 no matter what after trying the high events once, then
+ * they will get a shot first, but after that timed events will also be handled.
+ */
+ if (!treated)
+ first = 0;
+
+#ifdef TESTTIME
+ stop = E_get_time();
+ tmp_late = E_sub_time(stop, start);
+ Alarm(DEBUG, "Events: High & Med took %d %d time to handle\n", tmp_late.sec, tmp_late.usec);
+#endif
+ /* Handle one low priority fd event.
+ However, verify that Active_priority still allows LOW_PRIORITY events.
+ Active_priority can change because of calls to E_set_threshold() during the current select loop.
+ */
+ for( i=0; i < Fd_queue[LOW_PRIORITY].num_fds
+ && num_set > 0
+ && Active_priority == LOW_PRIORITY;
+ i++ )
+ {
+ j = ( i + Round_robin ) % Fd_queue[LOW_PRIORITY].num_fds;
+ fd = Fd_queue[LOW_PRIORITY].events[j].fd;
+ fd_type = Fd_queue[LOW_PRIORITY].events[j].fd_type;
+ if( FD_ISSET( fd, ¤t_mask[fd_type] ) )
+ {
+ Round_robin = ( j + 1 ) % Fd_queue[LOW_PRIORITY].num_fds;
+
+ Alarm( EVENTS , "E_handle_events: exec ext fd event \n");
+ Fd_queue[LOW_PRIORITY].events[j].func(
+ Fd_queue[LOW_PRIORITY].events[j].fd,
+ Fd_queue[LOW_PRIORITY].events[j].code,
+ Fd_queue[LOW_PRIORITY].events[j].data );
+ num_set--;
+#ifdef BADCLOCK
+ Now = E_add_time( Now, mili_sec );
+ clock_sync++;
+#else
+ E_get_time();
+#endif
+ if (Exit_events) goto end_handler;
+ break;
+ }
+ }
+#ifdef TESTTIME
+ start = E_get_time();
+ tmp_late = E_sub_time(start, stop);
+ Alarm(DEBUG, "Events: Low priority took %d %d to handle\n", tmp_late.sec, tmp_late.usec);
+#endif
+ }
+ end_handler:
+ /* Clean up data structures for exit OR restart of handler loop */
+ /* Actually nothing needs to be cleaned up to allow E_handle_events()
+ * to be called again. The events are still registered (or not registered)
+ * and the only state for the actual events loop is Exit_events which is reset
+ * in the for loop.
+ */
+
+ return 0;
+}
+
+void E_exit_events(void)
+{
+ Alarm( EVENTS, "E_exit_events:\n");
+ Exit_events = 1;
+}
Added: branches/events_testing/daemon/microbench_events_pipes.c
===================================================================
--- branches/events_testing/daemon/microbench_events_pipes.c (rev 0)
+++ branches/events_testing/daemon/microbench_events_pipes.c 2009-05-17 06:08:51 UTC (rev 414)
@@ -0,0 +1,222 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <unistd.h>
+#include <assert.h>
+
+#include <sys/ioctl.h>
+
+#include <stdutil/stdskl.h>
+#include <sp_events.h>
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int Num_Pipes = 1000;
+int Num_Ops = 100000;
+int Num_Pending = 1;
+int Seed = 0;
+
+int Num_Ops_Done = 0;
+int Num_Ops_Cmplt = 0;
+int (*Pipes)[2] = NULL;
+
+stdskl Active_Fds; /* <int -> int>: fd -> num_bytes */
+
+sp_time T1;
+sp_time T2;
+sp_time T3;
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int get_rand_pipe(void)
+{
+ return (int) (Num_Pipes * (rand() / (RAND_MAX + 1.0)));
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+void write_pipe(int ping_pipe)
+{
+ int num_bytes;
+ stdit sit;
+
+ assert(Num_Ops_Done < Num_Ops);
+
+ /* track how many ops/bytes are outstanding on the read side of this pipe */
+
+ if (stdskl_is_end(&Active_Fds, stdskl_lowerb(&Active_Fds, &sit, &Pipes[ping_pipe][0])) ||
+ Pipes[ping_pipe][0] != *(int*) stdskl_it_key(&sit)) {
+
+ num_bytes = 1;
+
+ if (stdskl_insert(&Active_Fds, &sit, &Pipes[ping_pipe][0], &num_bytes, STDTRUE) != 0) {
+ perror("stdskl_put failed"), abort();
+ }
+
+ } else {
+ num_bytes = ++*(int*) stdskl_it_val(&sit); /* increment num_bytes */
+ assert(num_bytes > 1);
+ }
+
+ /* write a byte to the pipe */
+
+ if (write(Pipes[ping_pipe][1], &ping_pipe, 1) != 1) {
+ perror("write() failed"), abort();
+ }
+
+ ++Num_Ops_Done;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+void read_pipe(int fd, int code, void * data)
+{
+ stdit sit;
+ int num_bytes;
+ char c;
+
+ /* look up the fd in Active_Fds */
+
+ if (stdskl_is_end(&Active_Fds, stdskl_find(&Active_Fds, &sit, &fd))) {
+ fprintf(stderr, "Got activity on an inative fd(%d)?!\r\n", fd), abort();
+ }
+
+ /* read from the fd */
+
+ if (read(fd, &c, 1) != 1) {
+ perror("read() failed"), abort();
+ }
+
+ /* update the num ops/bytes outstanding on the fd */
+
+ num_bytes = --*(int*) stdskl_it_val(&sit);
+ assert(num_bytes >= 0);
+
+ if (num_bytes == 0) {
+ stdskl_erase(&Active_Fds, &sit);
+ }
+
+ /* generate a replacement event for this one */
+
+ assert(Num_Ops_Cmplt < Num_Ops_Done && Num_Ops_Done <= Num_Ops);
+
+ if (Num_Ops_Done != Num_Ops) {
+ write_pipe(get_rand_pipe());
+ }
+
+ ++Num_Ops_Cmplt;
+
+ if ((Num_Ops_Cmplt & 0xffff) == 0) {
+ printf("read_pipe: completed %d operations!\r\n", Num_Ops_Cmplt);
+ }
+
+ if (Num_Ops_Cmplt == Num_Ops) {
+ E_exit_events();
+ }
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+void usage(int argc, char ** argv)
+{
+ switch (argc) {
+ case 5:
+ Seed = atoi(argv[4]);
+
+ case 4:
+ Num_Pending = atoi(argv[3]);
+
+ case 3:
+ Num_Ops = atoi(argv[2]);
+
+ case 2:
+ Num_Pipes = atoi(argv[1]);
+
+ case 1:
+ break;
+
+ default:
+ fprintf(stderr, "Usage: %s [Num_Pipes [Num_Ops [Seed] ] ]\r\n", argv[0]);
+ exit(-1);
+ }
+
+ if (Num_Pipes <= 0 || Num_Ops <= 0 || Num_Pending <= 0) {
+ fprintf(stderr, "Usage: Num_Pipes, Num_Ops and Num_Pending must be positive integers!\r\n"), exit(-1);
+ }
+
+ if ((Pipes = (int(*)[2]) malloc(2 * sizeof(int) * Num_Pipes)) == NULL) {
+ perror("malloc() failed"), abort();
+ }
+
+ if (stdskl_construct(&Active_Fds, sizeof(int), sizeof(int), NULL) != 0) {
+ perror("stdskl_construct failed"), abort();
+ }
+
+ srand(Seed);
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
+
+int main(int argc, char ** argv)
+{
+ int ret;
+ int cmd;
+ int i;
+
+ E_init();
+
+ usage(argc, argv);
+
+ /* set up pipes */
+
+ for (i = 0; i != Num_Pipes; ++i) {
+
+ if (pipe(Pipes[i])) {
+ perror("pipe() failed"), abort();
+ }
+
+ cmd = 1;
+
+ if ((ret = ioctl(Pipes[i][0], FIONBIO, &cmd)) != 0) {
+ perror("ioctl failed"), abort();
+ }
+
+ cmd = 1;
+
+ if ((ret = ioctl(Pipes[i][1], FIONBIO, &cmd)) != 0) {
+ perror("ioctl failed"), abort();
+ }
+
+ E_attach_fd(Pipes[i][0], READ_FD, read_pipe, 0, NULL, HIGH_PRIORITY);
+ }
+
+ /* seed the system */
+
+ for (i = 0; i != Num_Pending && i != Num_Ops; ++i) {
+ write_pipe(get_rand_pipe());
+ }
+
+ /* loop handling events */
+
+ T1 = E_get_time();
+
+ ret = E_handle_events();
+
+ T2 = E_get_time();
+ T3 = E_sub_time(T2, T1);
+
+ fprintf(stdout, "E_handle_events returned %d size of Active_Fds is %lu!\r\n", ret, (stdulong) stdskl_size(&Active_Fds));
+ fprintf(stdout, "Took %ld.%06ld seconds for an average rate of %f ops/s!\r\n",
+ T3.sec, T3.usec, Num_Ops / (T3.sec + T3.usec / 1000000.0));
+
+ return 0;
+}
+
+/***************************************************************************************************************************
+ ***************************************************************************************************************************/
Modified: branches/events_testing/examples/Makefile.in
===================================================================
--- branches/events_testing/examples/Makefile.in 2009-05-16 01:38:32 UTC (rev 413)
+++ branches/events_testing/examples/Makefile.in 2009-05-17 06:08:51 UTC (rev 414)
@@ -31,7 +31,7 @@
CFLAGS=@CFLAGS@
CPPFLAGS=-I. -I$(srcdir) -I$(top_srcdir)/include @CPPFLAGS@ $(PATHS) @DEFS@
LDFLAGS=@LDFLAGS@
-LIBS=@LIBS@
+LIBS=../stdutil/lib/libstdutil.a @LIBS@
THLDFLAGS=@THLDFLAGS@
THLIBS=@THLIBS@
AR=@AR@
More information about the Spread-cvs
mailing list