[Spread-cvs] cvs commit: spread/daemon Readme.txt flooder.c user.c

jonathan at spread.org jonathan at spread.org
Thu Sep 23 20:30:08 EDT 2004


jonathan    04/09/23 20:30:08

  Modified:    daemon   Readme.txt flooder.c user.c
  Log:
  Make both user.c and flooder.c use the MAX_MESSLEN constant for all
  message size tests, so that larger messages can easily be used by
  recompiling.
  
  Add a new -n option to flooder which sets the number of members that will
  be active for a specific flooder run. This activates a multi-sender
  flow control algorithm which allows tests with many senders.
  
  Revision  Changes    Path
  1.59      +11 -0     spread/daemon/Readme.txt
  
  Index: Readme.txt
  ===================================================================
  RCS file: /storage/cvsroot/spread/daemon/Readme.txt,v
  retrieving revision 1.58
  retrieving revision 1.59
  diff -u -r1.58 -r1.59
  --- Readme.txt	23 Sep 2004 23:15:18 -0000	1.58
  +++ Readme.txt	24 Sep 2004 00:30:08 -0000	1.59
  @@ -74,6 +74,17 @@
         assume the name is truncated. 
   8) Cleanup compile warnings where E_queue() used with no-parameter functions 
      (not all uses fixed) and fix incorrect use of signed int with strlen().
  +9) Fix few cases in flooder.c and user.c that did not use the defined MAX_MESSLEN
  +   constant. Tested to verify that increasing MAX_SCATTER_ELEMENTS in scatter.h
  +   and the MAX_MESSLEN defines in user.c and flooder.c is sufficient to support
  +   arbitrarily large message sizes with Spread. This is NOT recommended, but
  +   several people do it anyway :-)
  +10) Add new option to spflooder. The -n option allows a fixed "number of members"
  +    to be set. This then activates a multi-sender flow control algorithm to allow
  +    flooding tests with several senders. All of the processes need to join the
  +    group (i.e. they cannot be -wo (write-only)) but not all processes have to send.
  +    This allows easy testing with differing numbers of senders (just change how
  +    many spflooders start with -ro and how many do not).
   
   SOURCE INSTALL:
   ---------------
  
  
  
  1.7       +141 -17   spread/daemon/flooder.c
  
  Index: flooder.c
  ===================================================================
  RCS file: /storage/cvsroot/spread/daemon/flooder.c,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- flooder.c	16 Apr 2004 17:58:44 -0000	1.6
  +++ flooder.c	24 Sep 2004 00:30:08 -0000	1.7
  @@ -35,18 +35,33 @@
   #include <stdio.h>
   #include <stdlib.h>
   #include <string.h>
  +
  +/* Not positive if portable JRS 5/2004 */
  +#include <limits.h>
  +
   #include "sp.h"
   
  -#define	MAX_BYTES	100000
  +#define	MAX_MESSLEN	100000
  +static	char            mess[MAX_MESSLEN] ;
  +static	char            recv_mess[MAX_MESSLEN] ;
   
  +#define FLOODER_MAX_GROUPS      100
   static	char	User[80];
   static	char	Spread_name[80];
   static	char	Private_group[MAX_GROUP_NAME];
   static	mailbox	Mbox;
   static	int	Num_bytes;
   static	int	Num_messages;
  +static	int	Num_members;
   static	int	Read_only, Write_only;
   
  +static  char	ret_groups[FLOODER_MAX_GROUPS][MAX_GROUP_NAME];
  +
  +static  int     Send_Counter;
  +static  int     Recv_Counters[FLOODER_MAX_GROUPS];
  +static  int     Lowest_Recv_Counter;
  +static  int     My_Counter_index;
  +
   static	void	Usage( int argc, char *argv[] );
   static  void    Print_help();
   
  @@ -54,19 +69,24 @@
   {
   	int		ret;
   	int		service_type, num_groups;
  -	char		ret_groups[5][MAX_GROUP_NAME];
   	char		sender[MAX_GROUP_NAME];
  -	char            mess[MAX_BYTES] ;
  -	char            recv_mess[MAX_BYTES] ;
  -	int16		dummy_mess_type;
  +	int16		mess_type;
   	int		dummy_endian_mismatch;
  -	int		i;
  +        int             joined_members;
  +        int             sender_index;
  +	int		i,j;
   
   	Usage( argc, argv );
   
  -	/* connecting to the relevant Spread daemon, no need for group info */
  -	printf("flooder: connecting to %s\n", Spread_name );
  -        ret = SP_connect( Spread_name, User, 0, 0, &Mbox, Private_group ) ;
  +        if (Num_members == 0) {
  +            /* connecting to the relevant Spread daemon, no need for group info */
  +            printf("flooder: connecting to %s\n", Spread_name );
  +            ret = SP_connect( Spread_name, User, 0, 0, &Mbox, Private_group ) ;
  +        } else {
  +            /* connecting to the relevant Spread daemon, DO need group info */
  +            printf("flooder: connecting to %s with group membership\n", Spread_name );
  +            ret = SP_connect( Spread_name, User, 0, 1, &Mbox, Private_group ) ;
  +        }
           if(ret < 0) 
   	{
   		SP_error( ret );
  @@ -88,12 +108,77 @@
   		SP_join( Mbox, "flooder" );
   		printf("flooder: starting  multicast of %d messages, %d bytes each.\n", Num_messages, Num_bytes);
   	}
  +
  +        /* Wait for all members to join */
  +        joined_members = 0;
  +        while( joined_members < Num_members)
  +        {
  +            service_type = 0;
  +            ret = SP_receive( Mbox, &service_type, sender, FLOODER_MAX_GROUPS, &num_groups, ret_groups, 
  +                              &mess_type, &dummy_endian_mismatch, sizeof(recv_mess), recv_mess );
  +            if( ret < 0 ) 
  +            {
  +                if ( (ret == GROUPS_TOO_SHORT) || (ret == BUFFER_TOO_SHORT) ) {
  +                    printf("\n========Buffers or Groups too Short=======\n");
  +                    printf("Should NOT happen in wait for members! Program quitting\n");
  +                    exit(1);
  +                }       
  +            }
  +
  +            if( Is_regular_mess( service_type ) )
  +            {
  +		mess[ret] = 0;
  +		if     ( Is_unreliable_mess( service_type ) ) printf("received UNRELIABLE ");
  +		else if( Is_reliable_mess(   service_type ) ) printf("received RELIABLE ");
  +		else if( Is_fifo_mess(       service_type ) ) printf("received FIFO ");
  +		else if( Is_causal_mess(     service_type ) ) printf("received CAUSAL ");
  +		else if( Is_agreed_mess(     service_type ) ) printf("received AGREED ");
  +		else if( Is_safe_mess(       service_type ) ) printf("received SAFE ");
  +		printf("message during wait for members, from %s, of type %d, (endian %d) to %d groups \n(%d bytes): %s\n",
  +			sender, mess_type, dummy_endian_mismatch, num_groups, ret, recv_mess );
  +            }else if( Is_membership_mess( service_type ) )
  +            {
  +		if     ( Is_reg_memb_mess( service_type ) )
  +		{
  +			printf("Received REGULAR membership for group %s with %d members, where I am member %d:\n",
  +				sender, num_groups, mess_type );
  +			for( i=0; i < num_groups; i++ )
  +				printf("\t%s\n", &ret_groups[i][0] );
  +                        /* update count of joined members */
  +                        joined_members = num_groups;
  +
  +		}else if( Is_transition_mess(   service_type ) ) {
  +			printf("received TRANSITIONAL membership for group %s\n", sender );
  +		}else if( Is_caused_leave_mess( service_type ) ){
  +			printf("received membership message that left group %s\n", sender );
  +		}else printf("received incorrecty membership message of type 0x%x\n", service_type );
  +            } else if ( Is_reject_mess( service_type ) )
  +            {
  +		printf("REJECTED message from %s, of servicetype 0x%x messtype %d, (endian %d) to %d groups \n(%d bytes): %s\n",
  +			sender, service_type, mess_type, dummy_endian_mismatch, num_groups, ret, recv_mess );
  +            }else printf("received message of unknown message type 0x%x with ret %d\n", service_type, ret);
  +           
  +        } /* joined_members < Num_members */
  +
  +        /* Update My_Counter_index field based on location of my name in last membership message */
  +        if (Num_members)
  +        {
  +            My_Counter_index = mess_type;
  +            memcpy(&mess[0], &My_Counter_index, sizeof(int));
  +        }
  +
   	for( i=1; i <= Num_messages; i++ )
   	{
   		/* multicast a message unless Read_only */
   		if( !Read_only )
   		{
  -		    ret = SP_multicast( Mbox, RELIABLE_MESS, "flooder", 0, Num_bytes, mess );
  +                    if (Num_members) 
  +                    {
  +                        ret = SP_multicast( Mbox, FIFO_MESS, "flooder", 0, Num_bytes, mess );
  +                        Send_Counter++;
  +                    } else {
  +                        ret = SP_multicast( Mbox, RELIABLE_MESS, "flooder", 0, Num_bytes, mess );
  +                    }
   		    if( ret != Num_bytes ) 
   		    {
   			if( ret < 0 )
  @@ -108,27 +193,54 @@
   		/* receive a message (Read_only) or one of my messages */
   		if( Read_only || ( i > 200 && !Write_only ) )
   		{
  +                    int notdone;
  +
   		    do{
                           service_type = 0;
   
  -			ret = SP_receive( Mbox, &service_type, sender, 5, &num_groups, ret_groups, 
  -					  &dummy_mess_type, &dummy_endian_mismatch, sizeof(recv_mess), recv_mess );
  +			ret = SP_receive( Mbox, &service_type, sender, FLOODER_MAX_GROUPS, &num_groups, ret_groups, 
  +					  &mess_type, &dummy_endian_mismatch, sizeof(recv_mess), recv_mess );
                           if( ret < 0 ) 
                           {
                                   if ( (ret == GROUPS_TOO_SHORT) || (ret == BUFFER_TOO_SHORT) ) {
                                           service_type = DROP_RECV;
                                           printf("\n========Buffers or Groups too Short=======\n");
  -                                        ret = SP_receive( Mbox, &service_type, sender, 5, &num_groups, ret_groups, 
  -                                                          &dummy_mess_type, &dummy_endian_mismatch, sizeof(recv_mess), recv_mess );
  +                                        ret = SP_receive( Mbox, &service_type, sender, FLOODER_MAX_GROUPS, &num_groups, ret_groups, 
  +                                                          &mess_type, &dummy_endian_mismatch, sizeof(recv_mess), recv_mess );
                                   }
                           }
  -                        
   			if( ret < 0 )
   			{
   				SP_error( ret );
   				exit(1);
   			}
  -		    } while( strcmp( sender, Private_group ) != 0  && !Read_only );
  +                        if (Num_members) {
  +                            /* update counters of received messages */
  +                            memcpy(&sender_index, &recv_mess[0], sizeof(int));
  +                            Recv_Counters[sender_index]++;
  +                            /* 
  +                               printf("DEBUG: updated counter %d to value %d\n", sender_index, Recv_Counters[sender_index]);
  +                            */
  +                            if (Recv_Counters[sender_index] == (Lowest_Recv_Counter + 1)) {
  +                                /* Update Lowest_Recv_Counter */
  +                                Lowest_Recv_Counter = INT_MAX;
  +                                for (j=0; j < Num_members; j++) {
  +                                    if (Recv_Counters[j] == 0) continue;
  +                                    if (Recv_Counters[j] < Lowest_Recv_Counter)
  +                                        Lowest_Recv_Counter = Recv_Counters[j];
  +                                }
  +                                if (Lowest_Recv_Counter == INT_MAX) 
  +                                    Lowest_Recv_Counter = 0;
  +                            }
  +                            /* Read loop is done if we send messages and we have received all 
  +                             * other senders' messages up to 200 fewer than our current send count
  +                             */
  +                            notdone = ( Lowest_Recv_Counter < (Send_Counter - 200) && !Read_only );
  +                        } else {
  +                            notdone = (strcmp( sender, Private_group ) != 0 && !Read_only);
  +                        }
  +
  +		    } while( notdone );
   		}
   
   		/* report some progress... */
  @@ -147,6 +259,7 @@
           sprintf( Spread_name, "4803 at localhost");
   	Num_bytes    =  1000;
   	Num_messages = 10000;
  +        Num_members = 0;
   	Read_only    = 0;
   	Write_only   = 0;
   
  @@ -167,6 +280,10 @@
                           if (argc < 2) Print_help();
   			sscanf(argv[1], "%d", &Num_messages );
                           argc--; argv++;
  +                }else if( !strncmp( *argv, "-n", 2 ) ){
  +                        if (argc < 2) Print_help();
  +			sscanf(argv[1], "%d", &Num_members );
  +                        argc--; argv++;
                   }else if( !strncmp( *argv, "-s", 2 ) ){
                           if (argc < 2) Print_help();
                           strcpy( Spread_name, argv[1] ); 
  @@ -181,12 +298,19 @@
                       Print_help();
                   }
   	}
  +
  +        if (Num_members > FLOODER_MAX_GROUPS)
  +        {
  +            printf("Too many members. The max is %d\n", FLOODER_MAX_GROUPS);
  +            Print_help();
  +        }
   }
   static  void    Print_help()
   {
  -    printf( "Usage: spflooder\n%s\n%s\n%s\n%s\n%s\n%s\n",
  +    printf( "Usage: spflooder\n%s\n%s\n%s\n%s\n%s\n%s\n%s\n",
               "\t[-u <user name>]     : unique (in this machine) user name",
               "\t[-m <num messages>]  : number of messages",
  +            "\t[-n <num members>]   : number of group members to wait for (also turn on multi-sender FC)",
               "\t[-b <num bytes>]     : number of bytes per message 1-100,000",
               "\t[-s <spread name>]   : either port or port at machine",
               "\t[-ro]   		: read  only (no multicast)",
  
  
  
  1.9       +1 -1      spread/daemon/user.c
  
  Index: user.c
  ===================================================================
  RCS file: /storage/cvsroot/spread/daemon/user.c,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- user.c	23 Sep 2004 23:15:18 -0000	1.8
  +++ user.c	24 Sep 2004 00:30:08 -0000	1.9
  @@ -377,7 +377,7 @@
   static	void	Read_message()
   {
   
  -static	char		mess[102400];
  +static	char		mess[MAX_MESSLEN];
   	char		sender[MAX_GROUP_NAME];
   	char		target_groups[100][MAX_GROUP_NAME];
   	group_id	*grp_id;
  
  
  




More information about the Spread-cvs mailing list