[Spread-cvs] cvs commit: spread/daemon membership.c membership.h network.c spread_params.h Readme.txt

jonathan at spread.org jonathan at spread.org
Sat Oct 2 18:42:28 EDT 2004


jonathan    04/10/02 18:42:28

  Modified:    daemon   membership.c membership.h network.c spread_params.h
                        Readme.txt
  Log:
  Fix bug where more than 22 daemons starting up at once would cause some to
  crash or not complete membership. The fix involves better checking to
  prevent FORM token messages that are longer than they should be from
  being sent. It also requires that in fill_form1() if a new ring_info
  entry needs to be added to the current FORM token and there is not room
  on the token, the new ring_info entry is not added and the current daemon
  removes itself from the token. This way it will NOT be added to the
  current membership on this iteration of the algorithm, but once that
  completes, it will start a new membership add to merge with the rest.
  This prevents the FORM token from ever being larger than 1 packet.
  
  Revision  Changes    Path
  1.6       +224 -29   spread/daemon/membership.c
  
  Index: membership.c
  ===================================================================
  RCS file: /storage/cvsroot/spread/daemon/membership.c,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- membership.c	5 Mar 2004 00:32:46 -0000	1.5
  +++ membership.c	2 Oct 2004 22:42:28 -0000	1.6
  @@ -1048,6 +1048,13 @@
   	    {
   		return( 0 );
   	    }
  +
  +       if (m->num_members == MAX_PROCS_RING) 
  +       {
  +            /* members structure is full -- so we ignore this new member */
  +            Alarmp( SPLOG_WARNING, MEMB, "Insert_member: members structure full (%u members) so ignore new member (ID %u.%u.%u.%u)\n", m->num_members, IP1(proc_id), IP2(proc_id), IP3(proc_id), IP4(proc_id) );
  +            return( 0 );
  +        }
   	m->members[m->num_members] = proc_id;
   	m->num_members++;
   	return( 1 );
  @@ -1127,6 +1134,12 @@
   	    }
   	}else Alarm( EXIT, "Insert_rep: unknown rep.type %d \n", rep.type );
   
  +        if (r->num_reps == MAX_REPS) 
  +        {
  +            /* reps structure is full -- so we ignore this new rep */
  +            Alarmp( SPLOG_WARNING, MEMB, "Insert_rep: reps structure full (%u reps) so ignore new rep (Type %d SegIndex %u ID %u.%u.%u.%u)\n", r->num_reps, rep.type, rep.seg_index, IP1(rep.proc_id), IP2(rep.proc_id), IP3(rep.proc_id), IP4(rep.proc_id) );
  +            return( 0 );
  +        }
   	r->reps[r->num_reps] = rep;
   	r->num_reps++;
   	return( 1 );
  @@ -1427,6 +1440,7 @@
   	rep_info	temp_rep;
   	int		i,j,k;
   	int             cur_num_members;
  +        int             num_to_copy;
           members_info    valid_members;
   	
   	num_bytes  = 0;
  @@ -1568,7 +1582,7 @@
   	*new_num_rings = 0;
   	num_bytes += sizeof(int32);
   
  -	my_rg_info = 0;
  +	my_rg_info = NULL;
   	my_holes_procs_ptr = NULL; /* optimiser thinks it may be used
   				     uninitialised (its wrong, but it
   				     is too subtle for it) */
  @@ -1583,7 +1597,6 @@
   		my_holes_procs_ptr = (int32 *)&c_ptr[sizeof(ring_info)];
                   old_rg_info = (ring_info *)&c_ptr[bytes_to_copy];
   	    }else{
  -
   		new_rg_info= (ring_info    *)&rg_info_buf[num_bytes];
   		memmove((char *)new_rg_info, (char *)old_rg_info, bytes_to_copy );
   		c_ptr = (char *) old_rg_info;
  @@ -1605,42 +1618,71 @@
   	new_rg_info->aru	 = Aru;
   	new_rg_info->highest_seq = Highest_seq;
   
  -	if( my_rg_info == 0 )
  +	if( my_rg_info == NULL )
   	{
  -	    for( index = Last_discarded+1; index <= Highest_seq; index++ )
  -	    {
  -		pack_entry = index & PACKET_MASK;
  -		if( ! Packets[pack_entry].exist )
  -		{
  +
  +            if ( *new_num_rings > MAX_FORM_REPS ) 
  +            {
  +                /* This ring_info entry will NOT fit in the FORM token packet.
  +                 * So since the ring_info is needed for me (this daemon) to be
  +                 * included in the current membership ring, we will have
  +                 * to remove ourselves from the m_info list and not
  +                 * create this ring_info 
  +                 */
  +                Alarmp( SPLOG_WARNING, MEMB, "Fill_form1: ring_info entry for %u.%u.%u.%u will not fit in FORM token. Removing self from current membership attempt by removing IP from m_info list\n", IP1(My.id), IP2(My.id), IP3(My.id), IP4(My.id));
  +                /* since new ring is always at the end, we just decrease current byte count */
  +                num_bytes = num_bytes - sizeof(ring_info);
  +                (*new_num_rings)--;
  +
  +                /* Remove ourselves from m_info */
  +                for ( i=0; i < m_info->num_members; i++) 
  +                {
  +                    if( m_info->members[i] == My.id )
  +                    {
  +                        num_to_copy = m_info->num_members + m_info->num_pending - i - 1;
  +                        memmove(&m_info->members[i], &m_info->members[i+1], num_to_copy * sizeof(int32));
  +                        break;
  +                    }
  +                }
  +                m_info->num_members--;
  +
  +            } else {
  +                /* New ring_info will fit, so create it */
  +                for( index = Last_discarded+1; index <= Highest_seq; index++ )
  +                {
  +                    pack_entry = index & PACKET_MASK;
  +                    if( ! Packets[pack_entry].exist )
  +                    {
   			*new_holes_procs_ptr = index;
   			Alarm( MEMB , "INSERT HOLE 2 IS %d\n",index);
   			new_holes_procs_ptr++;
   			num_bytes     += sizeof(int32);
   			new_rg_info->num_holes++;
  -		}
  -	    }
  +                    }
  +                }
   
  -	    /* update commit-trans procs */
  +                /* update commit-trans procs */
   
  -	    /* insert self in trans and commit */
  -	    new_rg_info->num_commit	= 1;
  -	    new_rg_info->num_trans	= 1;
  -	    *new_holes_procs_ptr = My.id;
  -	    new_holes_procs_ptr++;
  -	    num_bytes += sizeof(int32);
  +                /* insert self in trans and commit */
  +                new_rg_info->num_commit	= 1;
  +                new_rg_info->num_trans	= 1;
  +                *new_holes_procs_ptr = My.id;
  +                new_holes_procs_ptr++;
  +                num_bytes += sizeof(int32);
   
  -	    /* insert other members of commit set */
  -	    for( i=0; i < Commit_set.num_members; i++ )
  -	    {
  -		/* skipping self, because already there */
  -		if( Commit_set.members[i] == My.id ) continue;
  +                /* insert other members of commit set */
  +                for( i=0; i < Commit_set.num_members; i++ )
  +                {
  +                    /* skipping self, because already there */
  +                    if( Commit_set.members[i] == My.id ) continue;
   
  -		/* insert this member */
  -		*new_holes_procs_ptr = Commit_set.members[i];
  -		new_holes_procs_ptr++;
  -		num_bytes += sizeof(int32);
  -		new_rg_info->num_commit++;
  -	    }
  +                    /* insert this member */
  +                    *new_holes_procs_ptr = Commit_set.members[i];
  +                    new_holes_procs_ptr++;
  +                    num_bytes += sizeof(int32);
  +                    new_rg_info->num_commit++;
  +                }
  +            }
   	}else{
   	
   	    members_info temp_set;
  @@ -1668,6 +1710,7 @@
   	    }
   
   	    if( my_rg_info->highest_seq < Highest_seq )
  +            {
   		for( index = my_rg_info->highest_seq+1; 
   			index <= Highest_seq; index++ )
   		{
  @@ -1681,7 +1724,7 @@
   			new_rg_info->num_holes++;
   		    }
   		} 
  -
  +            }
   	    /* setting temp_set to be trans members only */
   	    temp_set.num_members = 0;
   	    for( i = 0; i < my_rg_info->num_trans; i++ )
  @@ -1960,6 +2003,158 @@
   
   	State = EVS;
   	GlobalStatus.state = EVS;
  +}
  +
  +void	Memb_print_form_token( sys_scatter *scat )
  +{
  +	token_header	*form_token;
  +	members_info	*m_info;
  +	reps_info	*r_info = NULL; /* avoids compile warning -- gcc not detect initialization */
  +        membership_id   *m_id_info = NULL; /* avoids compile warning -- gcc not detect initialization */
  +	ring_info	*rg_info;
  +	int32		*num_rings;
  +	int32		*commit_id;
  +	char		*c_ptr;
  +	int		num_bytes, bytes_to_skip;
  +	int		i,j, scat_index;
  +        int             is_form1 = 0;
  +        char            form_name[10];
  +
  +	num_bytes  = 0;
  +        scat_index = 1;
  +	form_token = (token_header *)scat->elements[0].buf;
  +
  +	m_info	   = (members_info *)&scat->elements[scat_index].buf[num_bytes];
  +	num_bytes  += sizeof(members_info);
  +
  +        if (num_bytes == scat->elements[scat_index].len )
  +        {
  +            num_bytes = 0;
  +            scat_index++;
  +        }
  +
  +        if ( Is_form1( form_token->type ) )
  +	{
  +            r_info	= (reps_info    *)&scat->elements[scat_index].buf[num_bytes];
  +            num_bytes  += sizeof(reps_info);
  +            is_form1 = 1;
  +	} else if( Is_form2( form_token->type ) )
  +        {
  +            m_id_info   = (membership_id *)&scat->elements[scat_index].buf[num_bytes];
  +            num_bytes  += sizeof(membership_id);
  +            is_form1 = 0;
  +        } else {
  +            Alarm( EXIT, "Invalid token type received: 0x%x\n", form_token->type);
  +            return;
  +        }
  +
  +        if (num_bytes == scat->elements[scat_index].len )
  +        {
  +            num_bytes = 0;
  +            scat_index++;
  +        }
  +            
  +	num_rings  = (int32        *)&scat->elements[scat_index].buf[num_bytes];
  +	num_bytes  += sizeof(int32);
  +
  +	rg_info= (ring_info    *)&scat->elements[scat_index].buf[num_bytes];
  +
  +        /* Print form_token_header */
  +
  +        Alarmp( SPLOG_PRINT, PRINT, "=========== Form Token ==========\n");
  +
  +        if ( is_form1 )
  +            snprintf(&form_name[0], 10, "FORM 1");
  +        else 
  +            snprintf(&form_name[0], 10, "FORM 2");
  +
  +        Alarmp( SPLOG_PRINT, PRINT, "%s Token, sent by %u.%u.%u.%u. Seq: %d\n", form_name, IP1(form_token->transmiter_id), IP2(form_token->transmiter_id), IP3(form_token->transmiter_id), IP4(form_token->transmiter_id), form_token->seq);
  +        Alarmp( SPLOG_PRINT, PRINT, "ProcID: %u.%u.%u.%u\t ARU: %d, ARU LastID: %u.%u.%u.%u\n", IP1(form_token->proc_id), IP2(form_token->proc_id), IP3(form_token->proc_id), IP4(form_token->proc_id), form_token->aru, IP1(form_token->aru_last_id), IP2(form_token->aru_last_id), IP3(form_token->aru_last_id), IP4(form_token->aru_last_id) );
  +        Alarmp( SPLOG_PRINT, PRINT, "FlowControl: %hd\tRTR Len: %hd\n", form_token->flow_control, form_token->rtr_len);
  +        /* Print members list */
  +
  +        Alarmp( SPLOG_PRINT, PRINT, "Form Token members list -- Active (%hd) Pending (%hd)\n", m_info->num_members, m_info->num_pending);
  +        for (i=0; i < m_info->num_members; i++) 
  +        {
  +            Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) );
  +            if ( (i % 3) == 2 )
  +                Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
  +        }
  +
  +        Alarmp( SPLOG_PRINT_NODATE, PRINT, "\nPending Members:\n");
  +
  +        for (i= m_info->num_members; i < ( m_info->num_members + m_info->num_pending); i++) 
  +        {
  +            Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) );
  +            if ( (i % 3) == 2 )
  +                Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
  +        }
  +        Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
  +
  +        if ( is_form1 )
  +        {
  +            /* Print reps list */
  +            Alarmp( SPLOG_PRINT, PRINT, "Form Token reps list -- Count (%hd) index (%hd)\n", r_info->num_reps, r_info->rep_index);
  +            for (i=0; i < r_info->num_reps; i++) 
  +            {
  +                Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u (T %hd SegInd %hd) ", i, IP1(r_info->reps[i].proc_id), IP2(r_info->reps[i].proc_id), IP3(r_info->reps[i].proc_id), IP4(r_info->reps[i].proc_id), r_info->reps[i].type, r_info->reps[i].seg_index );
  +                if ( (i % 3) == 2 )
  +                    Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
  +            }
  +            Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
  +        } else  /* so is FORM2 type */ 
  +        {
  +            Alarmp( SPLOG_PRINT, PRINT, "Form Token Membership ID %u.%u.%u.%u : %d\n", IP1(m_id_info->proc_id), IP2(m_id_info->proc_id), IP3(m_id_info->proc_id), IP4(m_id_info->proc_id), m_id_info->time );
  +        }
  +
  +        /* Print ring info */
  +        Alarmp( SPLOG_PRINT, PRINT, "Form Token RING list -- Count (%d)\n", *num_rings);
  +        for (i=0; i < *num_rings; i++) 
  +        {
  +            bytes_to_skip = sizeof(ring_info) + ( rg_info->num_holes + rg_info->num_commit ) * sizeof(int32);
  +            c_ptr = (char *) rg_info;
  +
  +            Alarmp( SPLOG_PRINT, PRINT, "Ring %u: MembID %u.%u.%u.%u - %u\tTransTime %u\n", i, IP1(rg_info->memb_id.proc_id), IP2(rg_info->memb_id.proc_id), IP3(rg_info->memb_id.proc_id), IP4(rg_info->memb_id.proc_id), rg_info->memb_id.time, rg_info->trans_time);
  +            Alarmp( SPLOG_PRINT, PRINT, "\tARU: %d\tHighSeq: %d\tNumHoles: %d\n", rg_info->aru, rg_info->highest_seq, rg_info->num_holes);
  +            Alarmp( SPLOG_PRINT, PRINT, "\tNumCommit: %hd\tNumTrans: %hd\n", rg_info->num_commit, rg_info->num_trans);
  +            /* Now print all missing messages from this ring (holes) */
  +            commit_id = (int32 *) &c_ptr[sizeof(ring_info)];
  +
  +            Alarmp( SPLOG_PRINT, PRINT, "\tMessage Holes:");
  +            for (j=0; j < rg_info->num_holes; j++) 
  +            {
  +                Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u ", *commit_id);
  +                commit_id++;
  +            }
  +            Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
  +
  +            /* Now print transitional member list */
  +            Alarmp( SPLOG_PRINT, PRINT, "\tTrans List:");
  +            for (j=0; j < rg_info->num_trans; j++) 
  +            {
  +                Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) );
  +                if ( (j % 3) == 2 )
  +                    Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
  +                commit_id++;
  +            }
  +            Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
  +
  +            /* Now print commit list. This follows the trans list with no gaps. */
  +            Alarmp( SPLOG_PRINT, PRINT, "\tCommit List:");
  +            for (j=rg_info->num_trans; j < rg_info->num_commit; j++) 
  +            {
  +                Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) );
  +                if ( (j % 3) == 2 )
  +                    Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
  +                commit_id++;
  +            }
  +            Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
  +
  +            /* next ring */
  +	    rg_info = (ring_info *)&c_ptr[bytes_to_skip];
  +        }
  +
  +        Alarmp( SPLOG_PRINT, PRINT, "====================================================\n");
   }
   
   static	void	Backoff_membership()
  
  
  
  1.5       +1 -0      spread/daemon/membership.h
  
  Index: membership.h
  ===================================================================
  RCS file: /storage/cvsroot/spread/daemon/membership.h,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- membership.h	5 Mar 2004 00:32:46 -0000	1.4
  +++ membership.h	2 Oct 2004 22:42:28 -0000	1.5
  @@ -60,5 +60,6 @@
   void		Memb_commit();
   void		Memb_transitional();
   void		Memb_regular();
  +void	        Memb_print_form_token( sys_scatter *scat );
   
   #endif /* INC_MEMBERSHIP */
  
  
  
  1.13      +17 -0     spread/daemon/network.c
  
  Index: network.c
  ===================================================================
  RCS file: /storage/cvsroot/spread/daemon/network.c,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- network.c	23 Sep 2004 23:15:18 -0000	1.12
  +++ network.c	2 Oct 2004 22:42:28 -0000	1.13
  @@ -43,6 +43,9 @@
   #include "alarm.h"
   #include "configuration.h"
   
  +/* for Memb_print_form_token() */
  +#include "membership.h"
  +
   static	channel		Bcast_channel[MAX_INTERFACES_PROC];
   static  channel		Token_channel[MAX_INTERFACES_PROC];
   static	channel		Send_channel;
  @@ -600,6 +603,14 @@
   	token_ptr = (token_header *)scat->elements[0].buf;
   	token_ptr->type = Set_endian( token_ptr->type );
   	token_ptr->transmiter_id = My.id;
  +
  +        if ( token_ptr->rtr_len > (MAX_PACKET_SIZE - sizeof(token_header) ) )
  +        {
  +            if ( Is_form( token_ptr->type ) )
  +                Memb_print_form_token( scat );
  +            Alarmp( SPLOG_FATAL, PRINT, "Net_send_token: Token too long for packet!\n");
  +        }
  +
   	ret = DL_send( Send_channel, Token_address, Token_port, scat );
   	return ( ret );
   }
  @@ -653,6 +664,12 @@
   			proc_id );
   		return( ret );
   	}
  +        if ( token_ptr->rtr_len > (MAX_PACKET_SIZE - sizeof(token_header) ) )
  +        {
  +            Memb_print_form_token( scat );
  +            Alarmp( SPLOG_FATAL, PRINT, "Net_ucast_token: Token too long for packet!\n");
  +        }
  +
   	ret = DL_send( Send_channel, proc_id, p.port+1, scat );
   	return( ret );
   }
  
  
  
  1.10      +1 -0      spread/daemon/spread_params.h
  
  Index: spread_params.h
  ===================================================================
  RCS file: /storage/cvsroot/spread/daemon/spread_params.h,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- spread_params.h	5 Mar 2004 00:32:46 -0000	1.9
  +++ spread_params.h	2 Oct 2004 22:42:28 -0000	1.10
  @@ -60,6 +60,7 @@
   #define         MAX_INTERFACES_PROC      10
   
   #define         MAX_REPS                 25
  +#define         MAX_FORM_REPS            20
   
   #define		MAX_PACKETS_IN_STRUCT 	8192
   #define		PACKET_MASK		0x00001fff
  
  
  
  1.63      +3 -1      spread/daemon/Readme.txt
  
  Index: Readme.txt
  ===================================================================
  RCS file: /storage/cvsroot/spread/daemon/Readme.txt,v
  retrieving revision 1.62
  retrieving revision 1.63
  diff -u -r1.62 -r1.63
  --- Readme.txt	2 Oct 2004 22:16:00 -0000	1.62
  +++ Readme.txt	2 Oct 2004 22:42:28 -0000	1.63
  @@ -92,7 +92,9 @@
       provided with Spread is added to. This check will cause the daemon to exit 
       immediately after parsing the config file.
   13) Add new Alarm priority flag to print a line with no datestamp (for multi-line output).
  -
  +14) Fix bug where if more then 22 daemons start at the same time, some will crash or the 
  +    membership will not complete correctly. This bug was reported by several people 
  +    including Jesse Noller.  
   
   SOURCE INSTALL:
   ---------------
  
  
  




More information about the Spread-cvs mailing list