[Spread-cvs] cvs commit: spread/daemon Makefile.in defines.h monitor.c
jonathan at spread.org
jonathan at spread.org
Thu Sep 23 13:12:53 EDT 2004
jonathan 04/09/23 13:12:53
Modified: daemon Makefile.in defines.h monitor.c
Log:
Create new sptmonitor program that works as a multi-threaded monitor. This
version will work on both POSIX thread systems and Win32 systems. Thus,
the sptmonitor program can be built and used on Windows systems.
This has been lightly tested, and the VC++ project file has not yet been updated
to build the Windows version in this patch. That will be committed later
tonight.
Revision Changes Path
1.10 +10 -4 spread/daemon/Makefile.in
Index: Makefile.in
===================================================================
RCS file: /storage/cvsroot/spread/daemon/Makefile.in,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- Makefile.in 12 Nov 2003 21:42:54 -0000 1.9
+++ Makefile.in 23 Sep 2004 17:12:52 -0000 1.10
@@ -65,7 +65,9 @@
SPREADOBJS= spread.o protocol.o session.o groups.o alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o lex.yy.o y.tab.o configuration.o skiplist.o acm.o acp-permit.o auth-null.o auth-ip.o
-MONITOROBJS= monitor.o alarm.o events.o memory.o data_link.o lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+MONITOR_OBJS= monitor.o alarm.o events.o memory.o data_link.o lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+
+TMONITOR_OBJS= monitor.to alarm.to events.to memory.to data_link.to lex.yy.to y.tab.to configuration.to skiplist.to acm.to
MANPAGES = docs/SP_connect.3.out docs/SP_disconnect.3.out docs/SP_equal_group_ids.3.out docs/SP_error.3.out docs/SP_join.3.out docs/SP_leave.3.out docs/SP_multicast.3.out docs/SP_multigroup_multicast.3.out docs/SP_multigroup_scat_multicast.3.out docs/SP_poll.3.out docs/SP_receive.3.out docs/SP_scat_multicast.3.out docs/SP_scat_receive.3.out docs/libsp.3.out docs/spread.1.out docs/spuser.1.out docs/sptuser.1.out docs/spmonitor.1.out docs/spflooder.1.out
MANPAGES_IN = docs/SP_connect.3 docs/SP_disconnect.3 docs/SP_equal_group_ids.3 docs/SP_error.3 docs/SP_join.3 docs/SP_leave.3 docs/SP_multicast.3 docs/SP_multigroup_multicast.3 docs/SP_multigroup_scat_multicast.3 docs/SP_poll.3 docs/SP_receive.3 docs/SP_scat_multicast.3 docs/SP_scat_receive.3 docs/libsp.3 docs/spread.1 docs/spuser.1 docs/sptuser.1 docs/spmonitor.1 docs/spflooder.1
@@ -80,7 +82,8 @@
all: $(TARGETS) $(MANPAGES)
$(SPREADOBJS): config.h
-$(MONITOROBJS): config.h
+$(MONITOR_OBJS): config.h
+$(TMONITOR_OBJS): config.h
.c.o:
$(CC) $(CFLAGS) $(CPPFLAGS) -c $<
@@ -159,8 +162,11 @@
spflooder$(EXEEXT): libspread.a flooder.o
$(LD) -o $@ flooder.o $(LDFLAGS) libspread.a $(LIBS)
-spmonitor$(EXEEXT): $(MONITOROBJS)
- $(LD) -o $@ $(MONITOROBJS) $(LDFLAGS) $(LIBS)
+spmonitor$(EXEEXT): $(MONITOR_OBJS)
+ $(LD) -o $@ $(MONITOR_OBJS) $(LDFLAGS) $(LIBS)
+
+sptmonitor$(EXEEXT): $(TMONITOR_OBJS)
+ $(LD) $(THLDFLAGS) -o $@ $(TMONITOR_OBJS) $(LDFLAGS) $(LIBS) $(THLIBS)
sptuser$(EXEEXT): user.to libtspread.a
$(LD) $(THLDFLAGS) -o $@ user.to libtspread.a $(LDFLAGS) $(LIBS) $(THLIBS)
1.4 +3 -0 spread/daemon/defines.h
Index: defines.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/defines.h,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- defines.h 5 Mar 2004 00:32:46 -0000 1.3
+++ defines.h 23 Sep 2004 17:12:52 -0000 1.4
@@ -53,6 +53,9 @@
#ifdef HAVE_LIMITS_H
# include <limits.h>
#endif
+#ifdef HAVE_ERRNO_H
+# include <errno.h>
+#endif
#ifdef HAVE_SYS_TIME_H
# include <sys/time.h>
#endif
1.14 +221 -11 spread/daemon/monitor.c
Index: monitor.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/monitor.c,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -r1.13 -r1.14
--- monitor.c 16 Apr 2004 17:58:44 -0000 1.13
+++ monitor.c 23 Sep 2004 17:12:52 -0000 1.14
@@ -37,8 +37,31 @@
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
+#include <errno.h>
#include "arch.h"
+#include "mutex.h"
+
+#ifdef _REENTRANT
+
+#ifndef ARCH_PC_WIN95
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <pthread.h>
+
+#else /* ARCH_PC_WIN95 */
+
+#include <windows.h>
+#include <winsock.h>
+#define ioctl ioctlsocket
+WSADATA WSAData;
+
+#endif /* ARCH_PC_WIN95 */
+
+#endif /* _REENTRANT */
+
+
#include "scatter.h"
#include "configuration.h"
#include "sp_events.h"
@@ -50,6 +73,8 @@
static channel SendChan;
static sp_time Send_partition_timeout = { 25, 0 };
static sp_time Send_status_timeout;
+static int Status_active = 0;
+static int Partition_active = 0;
static sys_scatter Pack_scat;
static packet_header Pack;
static int16 Partition[MAX_PROCS_RING];
@@ -71,6 +96,7 @@
static sys_scatter Report_scat;
static packet_header Report_pack;
+static channel Report_socket;
static void Print_menu();
@@ -89,13 +115,41 @@
static void Report_message();
+#ifdef _REENTRANT
+
+#ifndef ARCH_PC_WIN95
+ static mutex_type Init_mutex = MUTEX_STATIC_INIT;
+ static pthread_t Read_thread;
+ static pthread_t Status_thread;
+ static pthread_t Partition_thread;
+ static void *Read_thread_routine();
+ static void *Status_send_thread_routine();
+ static void *Partition_send_thread_routine();
+#else /* ARCH_PC_WIN95 */
+ static mutex_type Init_mutex = {MUTEX_STATIC_INIT};
+ static HANDLE Read_thread;
+ static HANDLE Status_thread;
+ static HANDLE Partition_thread;
+ static DWORD WINAPI Read_thread_routine( void *);
+ static DWORD WINAPI Status_send_thread_routine( void *);
+ static DWORD WINAPI Partition_send_thread_routine( void *);
+#endif /* ARCH_PC_WIN95 */
+
+static mutex_type Status_mutex;
+static mutex_type Partition_mutex;
+
+#endif /* _REENTRANT */
+
+
static void Usage( int argc, char *argv[] );
+static void initialize_locks(void);
int main( int argc, char *argv[] )
{
int i;
- channel ch;
-
+#ifdef _REENTRANT
+ int ret;
+#endif
fclose(stderr);
Alarm_set_types( NONE );
@@ -151,6 +205,14 @@
Alarm_clear_types(ALL);
Alarm_set_types(PRINT | EXIT );
+#ifdef ARCH_PC_WIN95
+ ret = WSAStartup( MAKEWORD(2,0), &WSAData );
+ if( ret != 0 )
+ Alarm( EXIT, "sptmonitor: main: winsock initialization error %d\n", ret );
+#endif /* ARCH_PC_WIN95 */
+
+ initialize_locks();
+
for( i=0; i < Conf_num_procs( &Cn ); i++ )
Partition[i] = 0;
@@ -164,10 +226,6 @@
Pack.seq = My_port;
Pack.memb_id.proc_id = 15051963;
- E_init();
- E_attach_fd( 0, READ_FD, User_command, 0, NULL, LOW_PRIORITY );
-
-
Report_scat.num_elements = 2;
Report_scat.elements[0].buf = (char *)&Report_pack;
Report_scat.elements[0].len = sizeof(packet_header);
@@ -176,16 +234,80 @@
SendChan = DL_init_channel( SEND_CHANNEL , My_port, 0, 0 );
- ch = DL_init_channel( RECV_CHANNEL, My_port, 0, 0 );
- E_attach_fd( ch, READ_FD, Report_message, 0, NULL, HIGH_PRIORITY );
+ Report_socket = DL_init_channel( RECV_CHANNEL, My_port, 0, 0 );
+
+ E_init(); /* both reentrent and non-reentrant code uses events */
+
+#ifndef _REENTRANT
+ E_attach_fd( 0, READ_FD, User_command, 0, NULL, LOW_PRIORITY );
+
+ E_attach_fd( Report_socket, READ_FD, Report_message, 0, NULL, HIGH_PRIORITY );
+#endif /* _REENTRANT */
+
Print_menu();
+#ifdef _REENTRANT
+
+#ifndef ARCH_PC_WIN95
+ ret = pthread_create( &Read_thread, NULL, Read_thread_routine, 0 );
+ ret = pthread_create( &Status_thread, NULL, Status_send_thread_routine, 0 );
+ ret = pthread_create( &Partition_thread, NULL, Partition_send_thread_routine, 0 );
+#else /* ARCH_PC_WIN95 */
+ Read_thread = CreateThread( NULL, 0, Read_thread_routine, NULL, 0, &ret );
+ Status_thread = CreateThread( NULL, 0, Status_thread_routine, NULL, 0, &ret );
+ Partition_thread = CreateThread( NULL, 0, Partition_thread_routine, NULL, 0, &ret );
+#endif /* ARCH_PC_WIN95 */
+
+ for(;;)
+ {
+ User_command();
+ }
+
+#else /*! _REENTRANT */
+
E_handle_events();
+#endif /* _REENTRANT */
+
return 0;
}
+static void initialize_locks(void)
+{
+ int ret;
+
+ ret = Mutex_trylock( &Init_mutex );
+ if( ret == 0 )
+ {
+ /*
+ * we managed to lock the Init_mutex. This means we are the first thread
+ * to get here.
+ */
+
+ Mutex_init( &Status_mutex );
+ Mutex_init( &Partition_mutex );
+ }
+ return;
+}
+
+#ifdef _REENTRANT
+
+#ifndef ARCH_PC_WIN95
+static void *Read_thread_routine()
+#else /* ARCH_PC_WIN95 */
+static DWORD WINAPI Read_thread_routine( void *dummy)
+#endif /* ARCH_PC_WIN95 */
+{
+ for(;;)
+ {
+ Report_message( Report_socket, 0, NULL);
+ }
+ return( 0 );
+}
+
+#endif /* _REENTRANT */
+
static void User_command()
{
char command[80];
@@ -217,7 +339,12 @@
case '2':
for( i=0; i < Conf_num_procs( &Cn ); i++ )
Partition[i] = Work_partition[i];
+ Mutex_lock( &Partition_mutex );
+ Partition_active = 1;
+ Mutex_unlock( &Partition_mutex );
+#ifndef _REENTRANT
Send_partition();
+#endif
printf("\n");
printf("Monitor> ");
@@ -236,9 +363,14 @@
Partition[i] = 0;
Work_partition[i] = 0;
}
+ Mutex_lock( &Partition_mutex );
+ Partition_active = 0;
+ Mutex_unlock( &Partition_mutex );
+
Send_partition();
+#ifndef _REENTRANT
E_dequeue( Send_partition, 0, NULL );
-
+#endif
printf("\n");
printf("Monitor> ");
fflush(stdout);
@@ -410,9 +542,42 @@
DL_send( SendChan, p.id, p.port, &Pack_scat );
}
}
+#ifndef _REENTRANT
E_queue( Send_partition, 0, NULL, Send_partition_timeout );
+#endif
+}
+
+#ifdef _REENTRANT
+
+#ifndef ARCH_PC_WIN95
+static void *Partition_send_thread_routine()
+#else /* ARCH_PC_WIN95 */
+static DWORD WINAPI Partition_send_thread_routine( void *dummy)
+#endif /* ARCH_PC_WIN95 */
+{
+ sp_time onesecond_time = { 1, 0};
+ sp_time send_interval;
+ int active_p;
+
+ for(;;)
+ {
+ Mutex_lock( &Partition_mutex );
+ active_p = Partition_active;
+ send_interval = Send_partition_timeout;
+ Mutex_unlock( &Partition_mutex );
+ if (active_p) {
+ Send_partition();
+
+ E_delay(send_interval);
+ } else {
+ E_delay(onesecond_time);
+ }
+ }
+ return( 0 );
}
+#endif /* _REENTRANT */
+
static void Print_flow_control( int16 fc_buf[MAX_PROCS_RING] )
{
int32 proc_id;
@@ -578,15 +743,24 @@
}else printf("Please! enter a legal proc name, none, or all\n");
}
}
+#ifndef _REENTRANT
E_dequeue( Send_status_query, 0, NULL );
+#endif
+ Mutex_lock( &Status_mutex );
+ Status_active = 0;
for( i=0; i < Conf_num_procs( &Cn ); i++ )
{
if( Status_vector[i] )
{
- Send_status_query();
+ Status_active = 1;
break;
}
}
+ Mutex_unlock( &Status_mutex );
+#ifndef _REENTRANT
+ if (Status_active)
+ Send_status_query();
+#endif
}
static void Send_status_query()
@@ -616,9 +790,42 @@
}
}
}
+#ifndef _REENTRANT
E_queue( Send_status_query, 0, NULL, Send_status_timeout );
+#endif
}
+#ifdef _REENTRANT
+
+#ifndef ARCH_PC_WIN95
+static void *Status_send_thread_routine()
+#else /* ARCH_PC_WIN95 */
+static DWORD WINAPI Status_send_thread_routine( void *dummy)
+#endif /* ARCH_PC_WIN95 */
+{
+ sp_time onesecond_time = { 1, 0};
+ sp_time send_interval;
+ int active_p;
+
+ for(;;)
+ {
+ Mutex_lock( &Status_mutex );
+ active_p = Status_active;
+ send_interval = Send_status_timeout;
+ Mutex_unlock( &Status_mutex );
+ if (active_p) {
+ Send_status_query();
+
+ E_delay(send_interval);
+ } else {
+ E_delay(onesecond_time);
+ }
+ }
+ return( 0 );
+}
+
+#endif /* _REENTRANT */
+
static void Kill_spreads()
{
int16 Kill_partition[MAX_PROCS_RING];
@@ -707,7 +914,10 @@
last_sec = GlobalStatus.sec;
ret = DL_recv( fd, &Report_scat );
- if( ret <= 0 ) return;
+ if( ret <= 0 ) {
+ Alarm( DEBUG, "Report_messsage: DL_recv failed with ret %d, errno %d\n", ret, sock_errno);
+ return;
+ }
if( !Same_endian( Report_pack.type ) )
{
More information about the Spread-cvs
mailing list