[Spread-cvs] cvs commit: spread/daemon membership.c membership.h network.c spread_params.h Readme.txt
jonathan at spread.org
jonathan at spread.org
Sat Oct 2 18:42:28 EDT 2004
jonathan 04/10/02 18:42:28
Modified: daemon membership.c membership.h network.c spread_params.h
Readme.txt
Log:
Fix bug where more than 22 daemons starting up at once would cause some to
crash or not complete membership. The fix involves better checking to
prevent FORM token messages that are longer than they should be from
being sent. It also requires that in fill_form1() if a new ring_info
entry needs to be added to the current FORM token and there is not room
on the token, the new ring_info entry is not added and the current daemon
removes itself from the token. This way it will NOT be added to the
current membership on this iteration of the algorithm, but once that
completes, it will start a new membership add to merge with the rest.
This prevents the FORM token from ever being larger than 1 packet.
Revision Changes Path
1.6 +224 -29 spread/daemon/membership.c
Index: membership.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/membership.c,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- membership.c 5 Mar 2004 00:32:46 -0000 1.5
+++ membership.c 2 Oct 2004 22:42:28 -0000 1.6
@@ -1048,6 +1048,13 @@
{
return( 0 );
}
+
+ if (m->num_members == MAX_PROCS_RING)
+ {
+ /* members structure is full -- so we ignore this new member */
+ Alarmp( SPLOG_WARNING, MEMB, "Insert_member: members structure full (%u members) so ignore new member (ID %u.%u.%u.%u)\n", m->num_members, IP1(proc_id), IP2(proc_id), IP3(proc_id), IP4(proc_id) );
+ return( 0 );
+ }
m->members[m->num_members] = proc_id;
m->num_members++;
return( 1 );
@@ -1127,6 +1134,12 @@
}
}else Alarm( EXIT, "Insert_rep: unknown rep.type %d \n", rep.type );
+ if (r->num_reps == MAX_REPS)
+ {
+ /* reps structure is full -- so we ignore this new rep */
+ Alarmp( SPLOG_WARNING, MEMB, "Insert_rep: reps structure full (%u reps) so ignore new rep (Type %d SegIndex %u ID %u.%u.%u.%u)\n", r->num_reps, rep.type, rep.seg_index, IP1(rep.proc_id), IP2(rep.proc_id), IP3(rep.proc_id), IP4(rep.proc_id) );
+ return( 0 );
+ }
r->reps[r->num_reps] = rep;
r->num_reps++;
return( 1 );
@@ -1427,6 +1440,7 @@
rep_info temp_rep;
int i,j,k;
int cur_num_members;
+ int num_to_copy;
members_info valid_members;
num_bytes = 0;
@@ -1568,7 +1582,7 @@
*new_num_rings = 0;
num_bytes += sizeof(int32);
- my_rg_info = 0;
+ my_rg_info = NULL;
my_holes_procs_ptr = NULL; /* optimiser thinks it may be used
uninitialised (its wrong, but it
is too subtle for it) */
@@ -1583,7 +1597,6 @@
my_holes_procs_ptr = (int32 *)&c_ptr[sizeof(ring_info)];
old_rg_info = (ring_info *)&c_ptr[bytes_to_copy];
}else{
-
new_rg_info= (ring_info *)&rg_info_buf[num_bytes];
memmove((char *)new_rg_info, (char *)old_rg_info, bytes_to_copy );
c_ptr = (char *) old_rg_info;
@@ -1605,42 +1618,71 @@
new_rg_info->aru = Aru;
new_rg_info->highest_seq = Highest_seq;
- if( my_rg_info == 0 )
+ if( my_rg_info == NULL )
{
- for( index = Last_discarded+1; index <= Highest_seq; index++ )
- {
- pack_entry = index & PACKET_MASK;
- if( ! Packets[pack_entry].exist )
- {
+
+ if ( *new_num_rings > MAX_FORM_REPS )
+ {
+ /* This ring_info entry will NOT fit in the FORM token packet.
+ * So since the ring_info is needed for me (this daemon) to be
+ * included in the current membership ring, we will have
+ * to remove ourselves from the m_info list and not
+ * create this ring_info
+ */
+ Alarmp( SPLOG_WARNING, MEMB, "Fill_form1: ring_info entry for %u.%u.%u.%u will not fit in FORM token. Removing self from current membership attempt by removing IP from m_info list\n", IP1(My.id), IP2(My.id), IP3(My.id), IP4(My.id));
+ /* since new ring is always at the end, we just decrease current byte count */
+ num_bytes = num_bytes - sizeof(ring_info);
+ (*new_num_rings)--;
+
+ /* Remove ourselves from m_info */
+ for ( i=0; i < m_info->num_members; i++)
+ {
+ if( m_info->members[i] == My.id )
+ {
+ num_to_copy = m_info->num_members + m_info->num_pending - i - 1;
+ memmove(&m_info->members[i], &m_info->members[i+1], num_to_copy * sizeof(int32));
+ break;
+ }
+ }
+ m_info->num_members--;
+
+ } else {
+ /* New ring_info will fit, so create it */
+ for( index = Last_discarded+1; index <= Highest_seq; index++ )
+ {
+ pack_entry = index & PACKET_MASK;
+ if( ! Packets[pack_entry].exist )
+ {
*new_holes_procs_ptr = index;
Alarm( MEMB , "INSERT HOLE 2 IS %d\n",index);
new_holes_procs_ptr++;
num_bytes += sizeof(int32);
new_rg_info->num_holes++;
- }
- }
+ }
+ }
- /* update commit-trans procs */
+ /* update commit-trans procs */
- /* insert self in trans and commit */
- new_rg_info->num_commit = 1;
- new_rg_info->num_trans = 1;
- *new_holes_procs_ptr = My.id;
- new_holes_procs_ptr++;
- num_bytes += sizeof(int32);
+ /* insert self in trans and commit */
+ new_rg_info->num_commit = 1;
+ new_rg_info->num_trans = 1;
+ *new_holes_procs_ptr = My.id;
+ new_holes_procs_ptr++;
+ num_bytes += sizeof(int32);
- /* insert other members of commit set */
- for( i=0; i < Commit_set.num_members; i++ )
- {
- /* skipping self, because already there */
- if( Commit_set.members[i] == My.id ) continue;
+ /* insert other members of commit set */
+ for( i=0; i < Commit_set.num_members; i++ )
+ {
+ /* skipping self, because already there */
+ if( Commit_set.members[i] == My.id ) continue;
- /* insert this member */
- *new_holes_procs_ptr = Commit_set.members[i];
- new_holes_procs_ptr++;
- num_bytes += sizeof(int32);
- new_rg_info->num_commit++;
- }
+ /* insert this member */
+ *new_holes_procs_ptr = Commit_set.members[i];
+ new_holes_procs_ptr++;
+ num_bytes += sizeof(int32);
+ new_rg_info->num_commit++;
+ }
+ }
}else{
members_info temp_set;
@@ -1668,6 +1710,7 @@
}
if( my_rg_info->highest_seq < Highest_seq )
+ {
for( index = my_rg_info->highest_seq+1;
index <= Highest_seq; index++ )
{
@@ -1681,7 +1724,7 @@
new_rg_info->num_holes++;
}
}
-
+ }
/* setting temp_set to be trans members only */
temp_set.num_members = 0;
for( i = 0; i < my_rg_info->num_trans; i++ )
@@ -1960,6 +2003,158 @@
State = EVS;
GlobalStatus.state = EVS;
+}
+
+void Memb_print_form_token( sys_scatter *scat )
+{
+ token_header *form_token;
+ members_info *m_info;
+ reps_info *r_info = NULL; /* avoids compile warning -- gcc not detect initialization */
+ membership_id *m_id_info = NULL; /* avoids compile warning -- gcc not detect initialization */
+ ring_info *rg_info;
+ int32 *num_rings;
+ int32 *commit_id;
+ char *c_ptr;
+ int num_bytes, bytes_to_skip;
+ int i,j, scat_index;
+ int is_form1 = 0;
+ char form_name[10];
+
+ num_bytes = 0;
+ scat_index = 1;
+ form_token = (token_header *)scat->elements[0].buf;
+
+ m_info = (members_info *)&scat->elements[scat_index].buf[num_bytes];
+ num_bytes += sizeof(members_info);
+
+ if (num_bytes == scat->elements[scat_index].len )
+ {
+ num_bytes = 0;
+ scat_index++;
+ }
+
+ if ( Is_form1( form_token->type ) )
+ {
+ r_info = (reps_info *)&scat->elements[scat_index].buf[num_bytes];
+ num_bytes += sizeof(reps_info);
+ is_form1 = 1;
+ } else if( Is_form2( form_token->type ) )
+ {
+ m_id_info = (membership_id *)&scat->elements[scat_index].buf[num_bytes];
+ num_bytes += sizeof(membership_id);
+ is_form1 = 0;
+ } else {
+ Alarm( EXIT, "Invalid token type received: 0x%x\n", form_token->type);
+ return;
+ }
+
+ if (num_bytes == scat->elements[scat_index].len )
+ {
+ num_bytes = 0;
+ scat_index++;
+ }
+
+ num_rings = (int32 *)&scat->elements[scat_index].buf[num_bytes];
+ num_bytes += sizeof(int32);
+
+ rg_info= (ring_info *)&scat->elements[scat_index].buf[num_bytes];
+
+ /* Print form_token_header */
+
+ Alarmp( SPLOG_PRINT, PRINT, "=========== Form Token ==========\n");
+
+ if ( is_form1 )
+ snprintf(&form_name[0], 10, "FORM 1");
+ else
+ snprintf(&form_name[0], 10, "FORM 2");
+
+ Alarmp( SPLOG_PRINT, PRINT, "%s Token, sent by %u.%u.%u.%u. Seq: %d\n", form_name, IP1(form_token->transmiter_id), IP2(form_token->transmiter_id), IP3(form_token->transmiter_id), IP4(form_token->transmiter_id), form_token->seq);
+ Alarmp( SPLOG_PRINT, PRINT, "ProcID: %u.%u.%u.%u\t ARU: %d, ARU LastID: %u.%u.%u.%u\n", IP1(form_token->proc_id), IP2(form_token->proc_id), IP3(form_token->proc_id), IP4(form_token->proc_id), form_token->aru, IP1(form_token->aru_last_id), IP2(form_token->aru_last_id), IP3(form_token->aru_last_id), IP4(form_token->aru_last_id) );
+ Alarmp( SPLOG_PRINT, PRINT, "FlowControl: %hd\tRTR Len: %hd\n", form_token->flow_control, form_token->rtr_len);
+ /* Print members list */
+
+ Alarmp( SPLOG_PRINT, PRINT, "Form Token members list -- Active (%hd) Pending (%hd)\n", m_info->num_members, m_info->num_pending);
+ for (i=0; i < m_info->num_members; i++)
+ {
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) );
+ if ( (i % 3) == 2 )
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
+ }
+
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\nPending Members:\n");
+
+ for (i= m_info->num_members; i < ( m_info->num_members + m_info->num_pending); i++)
+ {
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) );
+ if ( (i % 3) == 2 )
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
+ }
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
+
+ if ( is_form1 )
+ {
+ /* Print reps list */
+ Alarmp( SPLOG_PRINT, PRINT, "Form Token reps list -- Count (%hd) index (%hd)\n", r_info->num_reps, r_info->rep_index);
+ for (i=0; i < r_info->num_reps; i++)
+ {
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u (T %hd SegInd %hd) ", i, IP1(r_info->reps[i].proc_id), IP2(r_info->reps[i].proc_id), IP3(r_info->reps[i].proc_id), IP4(r_info->reps[i].proc_id), r_info->reps[i].type, r_info->reps[i].seg_index );
+ if ( (i % 3) == 2 )
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
+ }
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
+ } else /* so is FORM2 type */
+ {
+ Alarmp( SPLOG_PRINT, PRINT, "Form Token Membership ID %u.%u.%u.%u : %d\n", IP1(m_id_info->proc_id), IP2(m_id_info->proc_id), IP3(m_id_info->proc_id), IP4(m_id_info->proc_id), m_id_info->time );
+ }
+
+ /* Print ring info */
+ Alarmp( SPLOG_PRINT, PRINT, "Form Token RING list -- Count (%d)\n", *num_rings);
+ for (i=0; i < *num_rings; i++)
+ {
+ bytes_to_skip = sizeof(ring_info) + ( rg_info->num_holes + rg_info->num_commit ) * sizeof(int32);
+ c_ptr = (char *) rg_info;
+
+ Alarmp( SPLOG_PRINT, PRINT, "Ring %u: MembID %u.%u.%u.%u - %u\tTransTime %u\n", i, IP1(rg_info->memb_id.proc_id), IP2(rg_info->memb_id.proc_id), IP3(rg_info->memb_id.proc_id), IP4(rg_info->memb_id.proc_id), rg_info->memb_id.time, rg_info->trans_time);
+ Alarmp( SPLOG_PRINT, PRINT, "\tARU: %d\tHighSeq: %d\tNumHoles: %d\n", rg_info->aru, rg_info->highest_seq, rg_info->num_holes);
+ Alarmp( SPLOG_PRINT, PRINT, "\tNumCommit: %hd\tNumTrans: %hd\n", rg_info->num_commit, rg_info->num_trans);
+ /* Now print all missing messages from this ring (holes) */
+ commit_id = (int32 *) &c_ptr[sizeof(ring_info)];
+
+ Alarmp( SPLOG_PRINT, PRINT, "\tMessage Holes:");
+ for (j=0; j < rg_info->num_holes; j++)
+ {
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u ", *commit_id);
+ commit_id++;
+ }
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
+
+ /* Now print transitional member list */
+ Alarmp( SPLOG_PRINT, PRINT, "\tTrans List:");
+ for (j=0; j < rg_info->num_trans; j++)
+ {
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) );
+ if ( (j % 3) == 2 )
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
+ commit_id++;
+ }
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
+
+ /* Now print commit list. This follows the trans list with no gaps. */
+ Alarmp( SPLOG_PRINT, PRINT, "\tCommit List:");
+ for (j=rg_info->num_trans; j < rg_info->num_commit; j++)
+ {
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) );
+ if ( (j % 3) == 2 )
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
+ commit_id++;
+ }
+ Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n");
+
+ /* next ring */
+ rg_info = (ring_info *)&c_ptr[bytes_to_skip];
+ }
+
+ Alarmp( SPLOG_PRINT, PRINT, "====================================================\n");
}
static void Backoff_membership()
1.5 +1 -0 spread/daemon/membership.h
Index: membership.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/membership.h,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- membership.h 5 Mar 2004 00:32:46 -0000 1.4
+++ membership.h 2 Oct 2004 22:42:28 -0000 1.5
@@ -60,5 +60,6 @@
void Memb_commit();
void Memb_transitional();
void Memb_regular();
+void Memb_print_form_token( sys_scatter *scat );
#endif /* INC_MEMBERSHIP */
1.13 +17 -0 spread/daemon/network.c
Index: network.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/network.c,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- network.c 23 Sep 2004 23:15:18 -0000 1.12
+++ network.c 2 Oct 2004 22:42:28 -0000 1.13
@@ -43,6 +43,9 @@
#include "alarm.h"
#include "configuration.h"
+/* for Memb_print_form_token() */
+#include "membership.h"
+
static channel Bcast_channel[MAX_INTERFACES_PROC];
static channel Token_channel[MAX_INTERFACES_PROC];
static channel Send_channel;
@@ -600,6 +603,14 @@
token_ptr = (token_header *)scat->elements[0].buf;
token_ptr->type = Set_endian( token_ptr->type );
token_ptr->transmiter_id = My.id;
+
+ if ( token_ptr->rtr_len > (MAX_PACKET_SIZE - sizeof(token_header) ) )
+ {
+ if ( Is_form( token_ptr->type ) )
+ Memb_print_form_token( scat );
+ Alarmp( SPLOG_FATAL, PRINT, "Net_send_token: Token too long for packet!\n");
+ }
+
ret = DL_send( Send_channel, Token_address, Token_port, scat );
return ( ret );
}
@@ -653,6 +664,12 @@
proc_id );
return( ret );
}
+ if ( token_ptr->rtr_len > (MAX_PACKET_SIZE - sizeof(token_header) ) )
+ {
+ Memb_print_form_token( scat );
+ Alarmp( SPLOG_FATAL, PRINT, "Net_ucast_token: Token too long for packet!\n");
+ }
+
ret = DL_send( Send_channel, proc_id, p.port+1, scat );
return( ret );
}
1.10 +1 -0 spread/daemon/spread_params.h
Index: spread_params.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/spread_params.h,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- spread_params.h 5 Mar 2004 00:32:46 -0000 1.9
+++ spread_params.h 2 Oct 2004 22:42:28 -0000 1.10
@@ -60,6 +60,7 @@
#define MAX_INTERFACES_PROC 10
#define MAX_REPS 25
+#define MAX_FORM_REPS 20
#define MAX_PACKETS_IN_STRUCT 8192
#define PACKET_MASK 0x00001fff
1.63 +3 -1 spread/daemon/Readme.txt
Index: Readme.txt
===================================================================
RCS file: /storage/cvsroot/spread/daemon/Readme.txt,v
retrieving revision 1.62
retrieving revision 1.63
diff -u -r1.62 -r1.63
--- Readme.txt 2 Oct 2004 22:16:00 -0000 1.62
+++ Readme.txt 2 Oct 2004 22:42:28 -0000 1.63
@@ -92,7 +92,9 @@
provided with Spread is added to. This check will cause the daemon to exit
immediately after parsing the config file.
13) Add new Alarm priority flag to print a line with no datestamp (for multi-line output).
-
+14) Fix bug where if more then 22 daemons start at the same time, some will crash or the
+ membership will not complete correctly. This bug was reported by several people
+ including Jesse Noller.
SOURCE INSTALL:
---------------
More information about the Spread-cvs
mailing list