[Spread-cvs] cvs commit: spread/daemon Makefile.in defines.h monitor.c

jonathan at spread.org jonathan at spread.org
Thu Sep 23 13:12:53 EDT 2004


jonathan    04/09/23 13:12:53

  Modified:    daemon   Makefile.in defines.h monitor.c
  Log:
  Create new sptmonitor program that works as a multi-threaded monitor. This
  verison will work on both POSIX thread systems and Win32 systems. Thus,
  the sptmonitor program can be built and used on Windows systems.
  This has been lightly tested and the VC++ project file has not been updated
  yet to build the windows version in this patch. That will be committed later
  tonight.
  
  Revision  Changes    Path
  1.10      +10 -4     spread/daemon/Makefile.in
  
  Index: Makefile.in
  ===================================================================
  RCS file: /storage/cvsroot/spread/daemon/Makefile.in,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- Makefile.in	12 Nov 2003 21:42:54 -0000	1.9
  +++ Makefile.in	23 Sep 2004 17:12:52 -0000	1.10
  @@ -65,7 +65,9 @@
   
   SPREADOBJS= spread.o protocol.o session.o groups.o alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o lex.yy.o y.tab.o configuration.o skiplist.o acm.o acp-permit.o auth-null.o auth-ip.o
   
  -MONITOROBJS= monitor.o  alarm.o events.o memory.o data_link.o lex.yy.o y.tab.o configuration.o skiplist.o acm.o
  +MONITOR_OBJS= monitor.o  alarm.o events.o memory.o data_link.o lex.yy.o y.tab.o configuration.o skiplist.o acm.o
  +
  +TMONITOR_OBJS= monitor.to  alarm.to events.to memory.to data_link.to lex.yy.to y.tab.to configuration.to skiplist.to acm.to
   
   MANPAGES	= docs/SP_connect.3.out docs/SP_disconnect.3.out docs/SP_equal_group_ids.3.out docs/SP_error.3.out docs/SP_join.3.out docs/SP_leave.3.out docs/SP_multicast.3.out docs/SP_multigroup_multicast.3.out docs/SP_multigroup_scat_multicast.3.out docs/SP_poll.3.out docs/SP_receive.3.out docs/SP_scat_multicast.3.out docs/SP_scat_receive.3.out docs/libsp.3.out docs/spread.1.out docs/spuser.1.out docs/sptuser.1.out docs/spmonitor.1.out docs/spflooder.1.out
   MANPAGES_IN	= docs/SP_connect.3 docs/SP_disconnect.3 docs/SP_equal_group_ids.3 docs/SP_error.3 docs/SP_join.3 docs/SP_leave.3 docs/SP_multicast.3 docs/SP_multigroup_multicast.3 docs/SP_multigroup_scat_multicast.3 docs/SP_poll.3 docs/SP_receive.3 docs/SP_scat_multicast.3 docs/SP_scat_receive.3 docs/libsp.3 docs/spread.1 docs/spuser.1 docs/sptuser.1 docs/spmonitor.1 docs/spflooder.1
  @@ -80,7 +82,8 @@
   all: $(TARGETS) $(MANPAGES)
   
   $(SPREADOBJS): config.h
  -$(MONITOROBJS): config.h
  +$(MONITOR_OBJS): config.h
  +$(TMONITOR_OBJS): config.h
   
   .c.o:
   	$(CC) $(CFLAGS) $(CPPFLAGS) -c $<
  @@ -159,8 +162,11 @@
   spflooder$(EXEEXT): libspread.a flooder.o
   	$(LD) -o $@ flooder.o $(LDFLAGS) libspread.a $(LIBS)
   
  -spmonitor$(EXEEXT): $(MONITOROBJS)
  -	$(LD) -o $@ $(MONITOROBJS) $(LDFLAGS) $(LIBS) 
  +spmonitor$(EXEEXT): $(MONITOR_OBJS)
  +	$(LD) -o $@ $(MONITOR_OBJS) $(LDFLAGS) $(LIBS) 
  +
  +sptmonitor$(EXEEXT): $(TMONITOR_OBJS)
  +	$(LD) $(THLDFLAGS) -o $@ $(TMONITOR_OBJS) $(LDFLAGS) $(LIBS) $(THLIBS) 
   
   sptuser$(EXEEXT): user.to libtspread.a
   	$(LD) $(THLDFLAGS) -o $@ user.to libtspread.a $(LDFLAGS) $(LIBS) $(THLIBS)
  
  
  
  1.4       +3 -0      spread/daemon/defines.h
  
  Index: defines.h
  ===================================================================
  RCS file: /storage/cvsroot/spread/daemon/defines.h,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- defines.h	5 Mar 2004 00:32:46 -0000	1.3
  +++ defines.h	23 Sep 2004 17:12:52 -0000	1.4
  @@ -53,6 +53,9 @@
   #ifdef HAVE_LIMITS_H
   # include <limits.h>
   #endif
  +#ifdef HAVE_ERRNO_H
  +# include <errno.h>
  +#endif
   #ifdef HAVE_SYS_TIME_H
   # include <sys/time.h>
   #endif
  
  
  
  1.14      +221 -11   spread/daemon/monitor.c
  
  Index: monitor.c
  ===================================================================
  RCS file: /storage/cvsroot/spread/daemon/monitor.c,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -r1.13 -r1.14
  --- monitor.c	16 Apr 2004 17:58:44 -0000	1.13
  +++ monitor.c	23 Sep 2004 17:12:52 -0000	1.14
  @@ -37,8 +37,31 @@
   #include <stdio.h>
   #include <string.h>
   #include <stdlib.h>
  +#include <errno.h>
   
   #include "arch.h"
  +#include "mutex.h"
  +
  +#ifdef _REENTRANT
  +
  +#ifndef 	ARCH_PC_WIN95
  +
  +#include        <sys/types.h>
  +#include        <sys/socket.h>
  +#include 	<pthread.h>
  +
  +#else		/* ARCH_PC_WIN95 */
  +
  +#include	<windows.h>
  +#include        <winsock.h>
  +#define ioctl   ioctlsocket
  +WSADATA		WSAData;
  +
  +#endif		/* ARCH_PC_WIN95 */
  +
  +#endif /* _REENTRANT */
  +
  +
   #include "scatter.h"
   #include "configuration.h"
   #include "sp_events.h"
  @@ -50,6 +73,8 @@
   static	channel		SendChan;
   static	sp_time		Send_partition_timeout = { 25, 0 };
   static	sp_time		Send_status_timeout;
  +static  int             Status_active = 0;
  +static  int             Partition_active = 0;
   static	sys_scatter	Pack_scat;
   static	packet_header	Pack;
   static	int16		Partition[MAX_PROCS_RING];
  @@ -71,6 +96,7 @@
   
   static	sys_scatter	Report_scat;
   static	packet_header	Report_pack;
  +static  channel         Report_socket;
   
   static	void	Print_menu();
   
  @@ -89,13 +115,41 @@
   
   static	void	Report_message();
   
  +#ifdef	_REENTRANT
  +
  +#ifndef     ARCH_PC_WIN95
  +    static      mutex_type	Init_mutex = MUTEX_STATIC_INIT;
  +    static      pthread_t	Read_thread;
  +    static      pthread_t	Status_thread;
  +    static      pthread_t	Partition_thread;
  +    static      void            *Read_thread_routine();
  +    static      void            *Status_send_thread_routine();
  +    static      void            *Partition_send_thread_routine();
  +#else		/* ARCH_PC_WIN95 */
  +    static	mutex_type	Init_mutex = {MUTEX_STATIC_INIT};
  +    static	HANDLE		Read_thread;
  +    static	HANDLE		Status_thread;
  +    static	HANDLE		Partition_thread;
  +    static	DWORD WINAPI    Read_thread_routine( void *);
  +    static	DWORD WINAPI    Status_send_thread_routine( void *);
  +    static	DWORD WINAPI    Partition_send_thread_routine( void *);
  +#endif		/* ARCH_PC_WIN95 */
  +
  +static	mutex_type	Status_mutex;
  +static  mutex_type      Partition_mutex;
  +
  +#endif /* _REENTRANT */
  +
  +
   static  void    Usage( int argc, char *argv[] );
  +static  void    initialize_locks(void);
   
   int main( int argc, char *argv[] )
   {
   	int	i;
  -        channel ch;
  -
  +#ifdef _REENTRANT
  +        int     ret;
  +#endif
   	fclose(stderr);
   
   	Alarm_set_types( NONE ); 
  @@ -151,6 +205,14 @@
           Alarm_clear_types(ALL);
           Alarm_set_types(PRINT | EXIT );
   
  +#ifdef ARCH_PC_WIN95
  +        ret = WSAStartup( MAKEWORD(2,0), &WSAData );
  +        if( ret != 0 )
  +            Alarm( EXIT, "sptmonitor: main: winsock initialization error %d\n", ret );
  +#endif	/* ARCH_PC_WIN95 */
  +
  +        initialize_locks();
  +
   	for( i=0; i < Conf_num_procs( &Cn ); i++ )
   		Partition[i] = 0;
   
  @@ -164,10 +226,6 @@
   	Pack.seq = My_port;
   	Pack.memb_id.proc_id = 15051963;
   
  -	E_init();
  -	E_attach_fd( 0, READ_FD, User_command, 0, NULL, LOW_PRIORITY );
  -
  -
   	Report_scat.num_elements = 2;
   	Report_scat.elements[0].buf = (char *)&Report_pack;
   	Report_scat.elements[0].len = sizeof(packet_header);
  @@ -176,16 +234,80 @@
   
           SendChan = DL_init_channel( SEND_CHANNEL , My_port, 0, 0 );
   
  -        ch = DL_init_channel( RECV_CHANNEL, My_port, 0, 0 );
  -        E_attach_fd( ch, READ_FD, Report_message, 0, NULL, HIGH_PRIORITY );
  +        Report_socket = DL_init_channel( RECV_CHANNEL, My_port, 0, 0 );
  +
  +	E_init(); /* both reentrent and non-reentrant code uses events */
  +
  +#ifndef	_REENTRANT
  +	E_attach_fd( 0, READ_FD, User_command, 0, NULL, LOW_PRIORITY );
  +
  +        E_attach_fd( Report_socket, READ_FD, Report_message, 0, NULL, HIGH_PRIORITY );
  +#endif	/* _REENTRANT */
  +
           
   	Print_menu();
   
  +#ifdef	_REENTRANT
  +
  +#ifndef	        ARCH_PC_WIN95
  +	ret = pthread_create( &Read_thread, NULL, Read_thread_routine, 0 );
  +	ret = pthread_create( &Status_thread, NULL, Status_send_thread_routine, 0 );
  +	ret = pthread_create( &Partition_thread, NULL, Partition_send_thread_routine, 0 );
  +#else		/* ARCH_PC_WIN95 */
  +	Read_thread = CreateThread( NULL, 0, Read_thread_routine, NULL, 0, &ret );
  +	Status_thread = CreateThread( NULL, 0, Status_thread_routine, NULL, 0, &ret );
  +	Partition_thread = CreateThread( NULL, 0, Partition_thread_routine, NULL, 0, &ret );
  +#endif		/* ARCH_PC_WIN95 */
  +
  +	for(;;)
  +	{
  +		User_command();
  +	}
  +
  +#else	/*! _REENTRANT */
  +
   	E_handle_events();
   
  +#endif	/* _REENTRANT */
  +
   	return 0;
   }
   
  +static  void    initialize_locks(void)
  +{
  +        int ret;
  +
  +	ret = Mutex_trylock( &Init_mutex );
  +	if( ret == 0 )
  +	{
  +		/* 
  +		 * we managed to lock the Init_mutex. This means we are the first thread
  +		 * to get here.
  +		 */
  +
  +		Mutex_init( &Status_mutex );
  +		Mutex_init( &Partition_mutex );
  +        }
  +        return;
  +}
  +
  +#ifdef	_REENTRANT
  +
  +#ifndef 	ARCH_PC_WIN95
  +static	void	*Read_thread_routine()
  +#else		/* ARCH_PC_WIN95 */
  +static	DWORD WINAPI    Read_thread_routine( void *dummy)
  +#endif		/* ARCH_PC_WIN95 */
  +{
  +	for(;;)
  +	{
  +            Report_message( Report_socket, 0, NULL);
  +	}
  +	return( 0 );
  +}
  +
  +#endif	/* _REENTRANT */
  +
   static	void	User_command()
   {
   	char	command[80];
  @@ -217,7 +339,12 @@
   		case '2':
   			for( i=0; i < Conf_num_procs( &Cn ); i++ )
   				Partition[i] = Work_partition[i];
  +                        Mutex_lock( &Partition_mutex );
  +                        Partition_active = 1;
  +                        Mutex_unlock( &Partition_mutex );
  +#ifndef _REENTRANT
   			Send_partition();
  +#endif
   
   			printf("\n");
   			printf("Monitor> ");
  @@ -236,9 +363,14 @@
   				Partition[i] = 0;
   				Work_partition[i] = 0;
   			}
  +                        Mutex_lock( &Partition_mutex );
  +                        Partition_active = 0;
  +                        Mutex_unlock( &Partition_mutex );
  +
   			Send_partition();
  +#ifndef _REENTRANT
   			E_dequeue( Send_partition, 0, NULL );
  -
  +#endif
   			printf("\n");
   			printf("Monitor> ");
   			fflush(stdout);
  @@ -410,9 +542,42 @@
   		DL_send( SendChan, p.id, p.port, &Pack_scat );
   	    }
   	}
  +#ifndef _REENTRANT
   	E_queue( Send_partition, 0, NULL, Send_partition_timeout );
  +#endif
  +}
  +
  +#ifdef	_REENTRANT
  +
  +#ifndef 	ARCH_PC_WIN95
  +static	void	*Partition_send_thread_routine()
  +#else		/* ARCH_PC_WIN95 */
  +static	DWORD WINAPI    Partition_send_thread_routine( void *dummy)
  +#endif		/* ARCH_PC_WIN95 */
  +{
  +    sp_time onesecond_time = { 1, 0};
  +    sp_time send_interval;
  +    int active_p;
  +
  +    for(;;)
  +    {
  +        Mutex_lock( &Partition_mutex );
  +        active_p = Partition_active;
  +        send_interval = Send_partition_timeout;
  +        Mutex_unlock( &Partition_mutex );
  +        if (active_p) {
  +            Send_partition();
  +
  +            E_delay(send_interval); 
  +        } else {
  +            E_delay(onesecond_time);
  +        }
  +    }
  +    return( 0 );
   }
   
  +#endif	/* _REENTRANT */
  +
   static	void	Print_flow_control( int16 fc_buf[MAX_PROCS_RING] )
   {
   	int32	proc_id;
  @@ -578,15 +743,24 @@
   			}else printf("Please! enter a legal proc name, none, or all\n");
   		}
   	}
  +#ifndef _REENTRANT
   	E_dequeue( Send_status_query, 0, NULL );
  +#endif
  +        Mutex_lock( &Status_mutex );
  +        Status_active = 0;
   	for( i=0; i < Conf_num_procs( &Cn ); i++ )
   	{
   		if( Status_vector[i] )
   		{
  -			Send_status_query();
  +                    Status_active = 1;
   			break;
   		}
   	}
  +        Mutex_unlock( &Status_mutex );
  +#ifndef _REENTRANT
  +        if (Status_active)
  +            Send_status_query();
  +#endif
   }
   
   static	void 	Send_status_query()
  @@ -616,9 +790,42 @@
   		}
   	    }
   	}
  +#ifndef _REENTRANT
   	E_queue( Send_status_query, 0, NULL, Send_status_timeout );
  +#endif
   }
   
  +#ifdef	_REENTRANT
  +
  +#ifndef 	ARCH_PC_WIN95
  +static	void	*Status_send_thread_routine()
  +#else		/* ARCH_PC_WIN95 */
  +static	DWORD WINAPI    Status_send_thread_routine( void *dummy)
  +#endif		/* ARCH_PC_WIN95 */
  +{
  +    sp_time onesecond_time = { 1, 0};
  +    sp_time send_interval;
  +    int active_p;
  +
  +    for(;;)
  +    {
  +        Mutex_lock( &Status_mutex );
  +        active_p = Status_active;
  +        send_interval = Send_status_timeout;
  +        Mutex_unlock( &Status_mutex );
  +        if (active_p) {
  +            Send_status_query();
  +
  +            E_delay(send_interval); 
  +        } else {
  +            E_delay(onesecond_time);
  +        }
  +    }
  +    return( 0 );
  +}
  +
  +#endif	/* _REENTRANT */
  +
   static	void	Kill_spreads()
   {
   	int16	Kill_partition[MAX_PROCS_RING];
  @@ -707,7 +914,10 @@
   	last_sec = GlobalStatus.sec;
   
   	ret = DL_recv( fd, &Report_scat );
  -	if( ret <= 0 ) return;
  +	if( ret <= 0 ) {
  +            Alarm( DEBUG, "Report_messsage: DL_recv failed with ret %d, errno %d\n", ret, sock_errno);
  +            return;
  +        }
   
   	if( !Same_endian( Report_pack.type ) )
   	{
  
  
  




More information about the Spread-cvs mailing list