[Spread-cvs] commit: r860 - branches/spread_5/daemon
jschultz at spread.org
jschultz at spread.org
Mon Nov 14 12:19:35 EST 2016
Author: jschultz
Date: 2016-11-14 12:19:30 -0500 (Mon, 14 Nov 2016)
New Revision: 860
Modified:
branches/spread_5/daemon/arch.h
branches/spread_5/daemon/configuration.c
branches/spread_5/daemon/configuration.h
branches/spread_5/daemon/membership.c
branches/spread_5/daemon/membership.h
branches/spread_5/daemon/net_types.h
branches/spread_5/daemon/network.c
branches/spread_5/daemon/prot_body.h
branches/spread_5/daemon/protocol.c
branches/spread_5/daemon/spread_params.h
Log:
Futher work
Modified: branches/spread_5/daemon/arch.h
===================================================================
--- branches/spread_5/daemon/arch.h 2016-11-03 14:51:42 UTC (rev 859)
+++ branches/spread_5/daemon/arch.h 2016-11-14 17:19:30 UTC (rev 860)
@@ -59,7 +59,14 @@
#undef INTSIZE64
#undef INTSIZE16
+#ifndef int8
+# define int8 signed char
+#endif
+#ifndef int8u
+# define int8u unsigned char
+#endif
+
#ifndef ARCH_PC_WIN95
/* If we aren't using windows... we can use autoconf */
Modified: branches/spread_5/daemon/configuration.c
===================================================================
--- branches/spread_5/daemon/configuration.c 2016-11-03 14:51:42 UTC (rev 859)
+++ branches/spread_5/daemon/configuration.c 2016-11-14 17:19:30 UTC (rev 860)
@@ -487,81 +487,121 @@
int Conf_proc_by_id( int32u id, proc *p )
{
- return( Conf_proc_by_id_in_conf( Config, id, p ));
+ return Conf_proc_by_id_in_conf( Config, id, p );
}
-int Conf_proc_ref_by_id( int32u id, proc **p )
+int Conf_proc_by_name( char *name, proc *p )
{
- return( Conf_proc_ref_by_id_in_conf( Config, id, p ));
+ return Conf_proc_by_name_in_conf( Config, name, p );
}
-int Conf_proc_by_name( char *name, proc *p )
+int Conf_proc_by_index( int index, proc *p )
{
- return( Conf_proc_by_name_in_conf( Config, name, p));
+ return Conf_proc_by_index_in_conf( Config, index, p );
}
+int Conf_proc_ref_by_id( int32u id, proc **p )
+{
+ return Conf_proc_ref_by_id_in_conf( Config, id, p );
+}
+
int Conf_proc_ref_by_name( char *name, proc **p )
{
- return( Conf_proc_ref_by_name_in_conf( Config, name, p));
+ return Conf_proc_ref_by_name_in_conf( Config, name, p );
}
+int Conf_proc_ref_by_index( int index, proc **p )
+{
+ return Conf_proc_ref_by_index_in_conf( Config, index, p );
+}
+
int Conf_proc_by_id_in_conf( configuration *config, int32u id, proc *p )
{
- int ret;
- proc *p2;
+ proc *p2 = NULL;
+ int ret = Conf_proc_ref_by_id_in_conf( config, id, &p2 );
- if ( (ret = Conf_proc_ref_by_id_in_conf( config, id, &p2 )) >= 0 )
+ if ( ret >= 0 )
*p = *p2;
- return( ret );
+ return ret;
}
+int Conf_proc_by_name_in_conf( configuration *config, char *name, proc *p )
+{
+ proc *p2 = NULL;
+ int ret = Conf_proc_ref_by_name_in_conf( config, name, &p2 );
+
+ if ( ret >= 0 )
+ *p = *p2;
+
+ return ret;
+}
+
+int Conf_proc_by_index_in_conf( configuration *config, int index, proc *p )
+{
+ proc *p2 = NULL;
+ int ret = Conf_proc_ref_by_index_in_conf( config, index, &p2 );
+
+ if ( ret >= 0 )
+ *p = *p2;
+
+ return ret;
+}
+
int Conf_proc_ref_by_id_in_conf( configuration *config, int32u id, proc **p )
{
int i;
- for ( i=0; i < config->num_total_procs; i++ )
+ for ( i = 0; i < config->num_total_procs; ++i )
{
if ( config->allprocs[i].id == id )
{
*p = &config->allprocs[i];
- return( i );
+ return i;
}
}
- return( -1 );
+
+ return -1;
}
-int Conf_proc_by_name_in_conf( configuration *config, char *name, proc *p )
-{
- int ret;
- proc *p2;
-
- if ( (ret = Conf_proc_ref_by_name_in_conf( config, name, &p2 )) >= 0 )
- *p = *p2;
-
- return( ret );
-}
-
int Conf_proc_ref_by_name_in_conf( configuration *config, char *name, proc **p )
{
- int i;
+ int i;
- for ( i=0; i < config->num_total_procs; i++ )
+ for ( i = 0; i < config->num_total_procs; ++i )
{
- if ( !strcmp( config->allprocs[i].name, name ) )
+ if ( !strncmp( config->allprocs[i].name, name, sizeof(config->allprocs[i].name) ) )
{
*p = &config->allprocs[i];
- return( i );
+ return i;
}
- }
- return( -1 );
+ }
+
+ return -1;
}
+int Conf_proc_ref_by_index_in_conf( configuration *config, int index, proc **p )
+{
+ assert( index >= 0 && index < config->num_total_procs );
+
+ if ( index < 0 || index >= config->num_total_procs )
+ return -1;
+
+ *p = &config->allprocs[index];
+
+ return index;
+}
+
const char *Conf_name_by_id( int32u id )
{
- return Conf_name_by_in_conf( Config, id );
+ return Conf_name_by_id_in_conf( Config, id );
}
+const char *Conf_name_by_index( int conf_index )
+{
+ return Conf_name_by_index_in_conf( Config, conf_index );
+}
+
const char *Conf_name_by_id_in_conf( configuration *config, int32u id )
{
const char *ret = "unknown_proc";
@@ -573,6 +613,18 @@
return ret;
}
+const char *Conf_name_by_index_in_conf( configuration *config, int conf_index )
+{
+ const char *ret = "unknown_proc";
+ proc *p = NULL;
+
+ if ( Conf_proc_ref_by_index_in_conf( config, conf_index, &p ) >= 0 )
+ ret = p->name;
+
+ return ret;
+}
+
+
int Conf_id_in_seg( segment *seg, int32u id )
{
int j;
@@ -686,7 +738,7 @@
index_in_seg = Conf_id_in_seg( &config->segments[My.seg_index], My.id );
if( index_in_seg > 0 )
- return config->segments[My.seg_index].procs[index_in_seg - 1]->id; /* I am not first in my segment; previous is previous proc in segment */
+ return config->segments[My.seg_index].procs[index_in_seg - 1]->id; /* I am not first in my segment; previous is previous proc in segment */
for( i = My.seg_index - 1; i >= 0; --i )
if( config->segments[i].num_procs > 0 )
@@ -751,12 +803,14 @@
/* NOTE: auto-generated vids must be a cross platform (in host byte order), deterministic function of proc.name only! */
- if (ret == 0) /* protect reserved proc id's: 0 ~1 in 2^32 chance */
+ if (ret == 0 || ret == (int32u) -1) /* protect reserved proc id's: 0 and -1: ~2 in 2^32 chance */
{
- ret = (int32u) tlen << 16; /* assume users will typically configure either most or least significant byte of VIDs; avoid them */
+ ret = (int32u) tlen << 16; /* assume users will typically configure either most or least significant byte of VIDs; avoid them */
- if (ret == 0) /* next to impossible: requires a tlen with lower 16 bits == 0 and that hashes to zero */
- ret = (int32u) 0x1 << 16; /* assume users will typically configure either most or least significant byte of VIDs; avoid them */
+ if (ret == 0) /* next to impossible: requires a tlen with lower 16 bits == 0 and that hashes to zero */
+ ret = (int32u) 0x1 << 16; /* assume users will typically configure either most or least significant byte of VIDs; avoid them */
+
+ assert( ret != (int32u) -1 ); /* -1 should be impossible based on above operations */
}
return ret;
Modified: branches/spread_5/daemon/configuration.h
===================================================================
--- branches/spread_5/daemon/configuration.h 2016-11-03 14:51:42 UTC (rev 859)
+++ branches/spread_5/daemon/configuration.h 2016-11-14 17:19:30 UTC (rev 860)
@@ -109,16 +109,29 @@
configuration Conf(void);
configuration *Conf_ref(void);
proc Conf_my(void);
+
int Conf_proc_by_id( int32u id, proc *p );
+int Conf_proc_by_name( char *name, proc *p );
+int Conf_proc_by_index( int conf_index, proc *p );
+
int Conf_proc_ref_by_id( int32u id, proc **p );
-int Conf_proc_by_name( char *name, proc *p );
int Conf_proc_ref_by_name( char *name, proc **p );
+int Conf_proc_ref_by_index( int conf_index, proc **p );
+
int Conf_proc_by_id_in_conf( configuration *config, int32u id, proc *p );
+int Conf_proc_by_name_in_conf( configuration *config, char *name, proc *p );
+int Conf_proc_by_index_in_conf( configuration *config, int conf_index, proc *p );
+
int Conf_proc_ref_by_id_in_conf( configuration *config, int32u id, proc **p );
-int Conf_proc_by_name_in_conf( configuration *config, char *name, proc *p );
int Conf_proc_ref_by_name_in_conf( configuration *config, char *name, proc **p );
+int Conf_proc_ref_by_index_in_conf( configuration *config, int conf_index, proc *p );
+
const char *Conf_name_by_id( int32u id );
+const char *Conf_name_by_index( int conf_index );
+
const char *Conf_name_by_id_in_conf( configuration *config, int32u id );
+const char *Conf_name_by_index_in_conf( configuration *config, int conf_index );
+
int Conf_id_in_seg( segment *seg, int32u id );
int Conf_id_in_conf( configuration *config, int32u id );
int Conf_num_procs( configuration *config );
Modified: branches/spread_5/daemon/membership.c
===================================================================
--- branches/spread_5/daemon/membership.c 2016-11-03 14:51:42 UTC (rev 859)
+++ branches/spread_5/daemon/membership.c 2016-11-14 17:19:30 UTC (rev 860)
@@ -33,6 +33,12 @@
*
*/
+/* TODO: We assume that Conf_proc_by_id et al always return the
+ * position of the daemon in the OVERALL static configuration, not
+ * some subset of it (e.g. - the current membership). Make sure this
+ * is true.
+ */
+
#include <string.h>
#include <stdio.h>
#include <assert.h>
@@ -51,32 +57,63 @@
#define SEG_REP 1
#define RING_REP 2
+/* NOTE: One reason we use signed version so that we can assign value
+ * from Conf_proc_by_id and then check if result is negative.
+ */
+
+typedef int8 encoded_id;
+
typedef struct dummy_members_info{
int16 num_members;
int16 num_pending;
int32 members[MAX_PROCS_RING];
} members_info;
+typedef struct
+{
+ int8 num_members;
+ int8 num_pending;
+ encoded_id members[MAX_PROCS_RING];
+
+} encoded_members_info;
+
typedef struct dummy_rep_info{
int32 proc_id;
int16 type;
int16 seg_index;
} rep_info;
+typedef struct
+{
+ encoded_id index_id;
+ int8 type;
+ int8 seg_index;
+
+} encoded_rep_info;
+
typedef struct dummy_reps_info{
int16 num_reps;
int16 rep_index;
rep_info reps[MAX_REPS];
} reps_info;
-typedef struct dummy_ring_info{
+typedef struct
+{
+ int8 num_reps;
+ int8 rep_index;
+ encoded_rep_info reps[MAX_REPS];
+
+} encoded_reps_info;
+
+typedef struct dummy_ring_info
+{
membership_id memb_id;
int32 trans_time;
int32 aru;
int32 highest_seq;
- int32 num_holes;
- int16 num_commit;
- int16 num_trans;
+ int16 num_holes;
+ int8 num_commit;
+ int8 num_trans;
} ring_info;
bool Memb_Just_Installed = FALSE; /* tracks if we just installed a reg memb due to last token we sent */
@@ -107,7 +144,7 @@
static sys_scatter Send_pack;
-static sp_time Zero_timeout = { 0, 0};
+static sp_time Zero_timeout = { 0, 0 };
static void Memb_handle_alive ( sys_scatter *scat );
static void Memb_handle_join ( sys_scatter *scat );
@@ -132,7 +169,7 @@
static int Insert_rep( reps_info *r, rep_info rep );
static int32 Smallest_member( members_info *m, int *index );
static int32 Smallest_rep( reps_info *r, int *index );
-static void Sort_members( members_info *m );
+static void Sort_members( encoded_members_info *m );
static void Sort_reps( reps_info *r );
static void Create_form1();
static void Fill_form1( sys_scatter *scat );
@@ -140,9 +177,11 @@
static void Backoff_membership();
static void Flip_members( members_info *members_ptr );
static void Flip_reps( reps_info *reps_ptr );
-static void Flip_rings( char *buf );
+static int Fix_form1_token( sys_scatter *scat, encoded_members_info **m_info_ptr, encoded_reps_info **r_info_ptr, int *num_rings_ptr, int *num_bytes_ptr );
+static int Fix_form2_token( sys_scatter *scat, encoded_members_info **m_info_ptr, membership_id **memb_info_ptr, int *num_rings_ptr, int *num_bytes_ptr );
+static int Fix_form_token( sys_scatter *scat, encoded_members_info **m_info_ptr, encoded_reps_info **r_info_ptr, membership_id **memb_info_ptr, int *num_rings_ptr, int *num_bytes_ptr );
-static const char *State_str(void)
+const char *Memb_state_str(void)
{
const char *ret = "UNKNOWN";
@@ -154,7 +193,7 @@
case SEG: ret = "SEG/Alive"; break;
case REPRESENTED: ret = "REPRESENTED/Alive"; break;
case GATHER: ret = "GATHER/Alive"; break;
- case FORM: ret = "FORM/Alive"; break; /* NOTE: shouldn't happen */
+ case FORM: ret = "FORM/Alive"; break; /* shouldn't happen */
case EVS: ret = "EVS/Alive"; break;
}
}
@@ -162,12 +201,12 @@
{
switch (State)
{
- case OP: ret = "OP/Dead"; break; /* NOTE: shouldn't happen */
+ case OP: ret = "OP/Dead"; break; /* shouldn't happen */
case SEG: ret = "SEG/Dead"; break;
case REPRESENTED: ret = "REPRESENTED/Dead"; break;
case GATHER: ret = "GATHER/Dead"; break;
case FORM: ret = "FORM/Dead"; break;
- case EVS: ret = "EVS/Dead"; break; /* NOTE: shouldn't happen */
+ case EVS: ret = "EVS/Dead"; break; /* shouldn't happen */
}
}
@@ -369,7 +408,7 @@
{
packet_header *pack_ptr = (packet_header *)scat->elements[0].buf;
- Alarmp( SPLOG_INFO, MEMB, "Memb_handle_alive from '%s', State is %s\n", Conf_name_by_id( pack_ptr->proc_id ), State_str() );
+ Alarmp( SPLOG_INFO, MEMB, "Memb_handle_alive from '%s', State is %s\n", Conf_name_by_id( pack_ptr->proc_id ), Memb_state_str() );
switch( State )
{
@@ -433,9 +472,8 @@
static void Memb_handle_join( sys_scatter *scat )
{
- packet_header *pack_ptr = (packet_header *) scat->elements[0].buf;
- members_info *members_ptr = (members_info *) scat->elements[1].buf;
- reps_info *reps_ptr;
+ packet_header *pack_ptr = (packet_header *) scat->elements[0].buf;
+ reps_info *reps_ptr = (reps_info *) scat->elements[1].buf;
packet_header refer_pack;
sys_scatter send_scat;
proc p;
@@ -443,15 +481,9 @@
int ret;
int dummy;
- Alarmp( SPLOG_INFO, MEMB, "Memb_handle_join from '%s', State is %s\n", Conf_name_by_id( pack_ptr->proc_id ), State_str() );
+ Alarmp( SPLOG_INFO, MEMB, "Memb_handle_join from '%s', State is %s\n", Conf_name_by_id( pack_ptr->proc_id ), Memb_state_str() );
if( !Same_endian( pack_ptr->type ) )
- Flip_members( members_ptr );
-
- i = 2 * sizeof(int16) + members_ptr->num_members * sizeof(int32);
- reps_ptr = (reps_info *) &scat->elements[1].buf[i];
-
- if( !Same_endian( pack_ptr->type ) )
Flip_reps( reps_ptr );
switch( State )
@@ -509,8 +541,6 @@
/* this guy is my representative */
My_seg_rep = pack_ptr->proc_id;
Shift_to_represented();
- for( i=0; i < members_ptr->num_members; i++ )
- if( members_ptr->members[i] == My.id ) break;
Scast_alive( 1 );
}else{
/* no need to remember this join */
@@ -530,8 +560,6 @@
My_seg_rep = pack_ptr->proc_id;
}
E_queue( Shift_to_seg_event, 0, NULL, Rep_timeout );
- for( i=0; i < members_ptr->num_members; i++ )
- if( members_ptr->members[i] == My.id ) break;
Scast_alive( 1 );
}
else
@@ -591,7 +619,7 @@
rep_info temp_rep;
int ret;
- Alarmp( SPLOG_INFO, MEMB, "Memb_handle_refer from '%s', State is %s\n", Conf_name_by_id( pack_ptr->proc_id ), State_str() );
+ Alarmp( SPLOG_INFO, MEMB, "Memb_handle_refer from '%s', State is %s\n", Conf_name_by_id( pack_ptr->proc_id ), Memb_state_str() );
switch( State )
{
@@ -633,7 +661,7 @@
proc p;
int ret;
- Alarmp( SPLOG_INFO, MEMB, "Memb_handle_foreign from '%s', State is %s\n", Conf_name_by_id( pack_ptr->proc_id ), State_str() );
+ Alarmp( SPLOG_INFO, MEMB, "Memb_handle_foreign from '%s', State is %s\n", Conf_name_by_id( pack_ptr->proc_id ), Memb_state_str() );
switch( State )
{
@@ -696,7 +724,7 @@
{
token_header *form_token = (token_header *) scat->elements[0].buf;
- Alarmp( SPLOG_INFO, MEMB, "Memb_handle_form1 from '%s', State is %s\n", Conf_name_by_id( form_token->proc_id ), State_str() );
+ Alarmp( SPLOG_INFO, MEMB, "Memb_handle_form1 from '%s', State is %s\n", Conf_name_by_id( form_token->proc_id ), Memb_state_str() );
switch( State )
{
@@ -728,7 +756,7 @@
{
token_header *form_token = (token_header *) scat->elements[0].buf;
- Alarmp( SPLOG_INFO, MEMB, "Memb_handle_form2 from '%s', State is %s\n", Conf_name_by_id( form_token->proc_id ), State_str() );
+ Alarmp( SPLOG_INFO, MEMB, "Memb_handle_form2 from '%s', State is %s\n", Conf_name_by_id( form_token->proc_id ), Memb_state_str() );
switch( State )
{
@@ -758,7 +786,7 @@
rep_info temp_rep;
int i;
- Alarmp( SPLOG_WARNING, MEMB, "Memb_token_loss: ############### I lost my token, State is %s\n", State_str() );
+ Alarmp( SPLOG_WARNING, MEMB, "Memb_token_loss: ############### I lost my token, State is %s\n", Memb_state_str() );
switch( State )
{
@@ -873,7 +901,7 @@
static void Shift_to_op( void )
{
- Alarmp( SPLOG_Info, MEMB, "Shift_to_op: State was %s\n", State_str() );
+ Alarmp( SPLOG_Info, MEMB, "Shift_to_op: State was %s\n", Memb_state_str() );
State = OP;
GlobalStatus.state = OP;
@@ -884,7 +912,7 @@
static void Shift_to_seg( void )
{
- Alarmp( SPLOG_Info, MEMB, "Shift_to_seg: State was %s\n", State_str() );
+ Alarmp( SPLOG_Info, MEMB, "Shift_to_seg: State was %s\n", Memb_state_str() );
State = SEG;
GlobalStatus.state = SEG;
@@ -907,7 +935,7 @@
int dummy;
Alarmp( SPLOG_Info, MEMB, "Gather_or_represented: State was %s; I am%s smallest.\n",
- State_str(), ( smallest != My.id ? " NOT" : "" ) );
+ Memb_state_str(), ( smallest != My.id ? " NOT" : "" ) );
My_seg_rep = -1;
@@ -926,7 +954,7 @@
static void Shift_to_gather( void )
{
- Alarmp( SPLOG_Info, MEMB, "Shift_to_gather: State was %s\n", State_str() );
+ Alarmp( SPLOG_Info, MEMB, "Shift_to_gather: State was %s\n", Memb_state_str() );
State = GATHER;
GlobalStatus.state = GATHER;
@@ -946,7 +974,7 @@
static void Shift_to_represented( void )
{
- Alarmp( SPLOG_Info, MEMB, "Shift_to_represented: State was %s\n", State_str() );
+ Alarmp( SPLOG_Info, MEMB, "Shift_to_represented: State was %s\n", Memb_state_str() );
State = REPRESENTED;
GlobalStatus.state = REPRESENTED;
@@ -963,7 +991,7 @@
int i;
int dummy;
- Alarmp( SPLOG_Info, MEMB, "Form_or_fail: State is %s\n", State_str() );
+ Alarmp( SPLOG_Info, MEMB, "Form_or_fail: State is %s\n", Memb_state_str() );
if( Smallest_rep( &F_reps, &dummy ) == My.id )
{
@@ -1022,16 +1050,12 @@
{
packet_header *pack_ptr;
- Alarmp( SPLOG_INFO, MEMB, "Scast_alive: State is %s\n", State_str() );
+ Alarmp( SPLOG_INFO, MEMB, "Scast_alive: State is %s\n", Memb_state_str() );
pack_ptr = (packet_header *)Send_pack.elements[0].buf;
pack_ptr->type = ALIVE_TYPE;
- pack_ptr->data_len =
- 2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
-
- Send_pack.elements[1].buf = (char *)&F_members;
- Send_pack.elements[1].len = pack_ptr->data_len;
- Send_pack.num_elements = 2;
+ pack_ptr->data_len = 0;
+ Send_pack.num_elements = 1;
Net_scast( My.seg_index, &Send_pack );
if( code == 0 )
E_queue( Scast_alive_event, 0, NULL, Alive_timeout );
@@ -1047,20 +1071,19 @@
packet_header *pack_ptr;
int i;
- Alarmp( SPLOG_INFO, MEMB, "Send_join: State is %s\n", State_str() );
+ Alarmp( SPLOG_INFO, MEMB, "Send_join: State is %s\n", Memb_state_str() );
pack_ptr = (packet_header *)Send_pack.elements[0].buf;
pack_ptr->type = JOIN_TYPE;
- Send_pack.elements[1].buf = (char *)&F_members;
- Send_pack.elements[1].len =
- 2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
- Send_pack.elements[2].buf = (char *)&F_reps;
- Send_pack.elements[2].len =
- 2*sizeof(int16) + (F_reps.num_reps)*sizeof(rep_info);
- Send_pack.num_elements = 3;
- pack_ptr->data_len = Send_pack.elements[1].len + Send_pack.elements[2].len;
+ assert(F_reps.num_reps >= 0 && F_reps.num_reps <= MAX_REPS);
+
+ Send_pack.elements[1].buf = (char *)&F_reps;
+ Send_pack.elements[1].len = sizeof(F_reps) - sizeof(F_reps.reps[0]) * (MAX_REPS - F_reps.num_reps);
+ Send_pack.num_elements = 2;
+ pack_ptr->data_len = Send_pack.elements[1].len;
+
if( !Token_alive )
{
Net_scast( My.seg_index, &Send_pack );
@@ -1084,11 +1107,11 @@
int num_missing;
int i,j;
- Alarmp( SPLOG_INFO, MEMB, "Memb_lookup_new_member: State is %s\n", State_str() );
+ Alarmp( SPLOG_INFO, MEMB, "Memb_lookup_new_member: State is %s\n", Memb_state_str() );
if( State != OP )
{
- Alarmp( SPLOG_INFO, MEMB, "Memb_lookup_new_member: State is not OP, returning\n", State_str() );
+ Alarmp( SPLOG_INFO, MEMB, "Memb_lookup_new_member: State is not OP, returning\n", Memb_state_str() );
return;
}
@@ -1103,15 +1126,14 @@
pack_ptr = (packet_header *)Send_pack.elements[0].buf;
pack_ptr->type = JOIN_TYPE;
- Send_pack.elements[1].buf = (char *)&F_members;
- Send_pack.elements[1].len =
- 2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
- Send_pack.elements[2].buf = (char *)&F_reps;
- Send_pack.elements[2].len =
- 2*sizeof(int16) + (F_reps.num_reps)*sizeof(rep_info);
- Send_pack.num_elements = 3;
+
+ assert(F_reps.num_reps >= 0 && F_reps.num_reps <= MAX_REPS);
+
+ Send_pack.elements[1].buf = (char *)&F_reps;
+ Send_pack.elements[1].len = sizeof(F_reps) - sizeof(F_reps.reps[0]) * (MAX_REPS - F_reps.num_reps);
+ Send_pack.num_elements = 2;
- pack_ptr->data_len = Send_pack.elements[1].len + Send_pack.elements[2].len;
+ pack_ptr->data_len = Send_pack.elements[1].len;
num_missing = 0;
/* For single segment configured, send local broadcast of join to entire segment -- current members will ignore */
@@ -1317,22 +1339,28 @@
return ( r->reps[current].proc_id );
}
-static void Sort_members( members_info *m )
+static void Sort_members( encoded_members_info *m )
{
- members_info temp_members;
- int index;
- int i;
- int32 dummy;
+ int i, j, curr_index;
+ encoded_id tmp;
- temp_members = *m;
- for( i=0; i < m->num_members; i++ )
- {
- dummy = Smallest_member( &temp_members, &index );
- m->members[i] = temp_members.members[index];
- temp_members.num_members--;
- temp_members.members[index] =
- temp_members.members[temp_members.num_members];
- }
+ assert( m->num_members <= MAX_PROCS_RING );
+
+ for ( i = 0; i < m->num_members; ++i )
+ {
+ curr_index = i;
+
+ for ( j = i + 1; j < m->num_members; ++j )
+ if ( m->members[j] < m->members[curr_index] )
+ curr_index = j;
+
+ if ( curr_index != i )
+ {
+ tmp = m->members[i];
+ m->members[i] = m->members[curr_index];
+ m->members[curr_index] = tmp;
+ }
+ }
}
static void Sort_reps( reps_info *r )
@@ -1353,198 +1381,316 @@
}
}
+
static void Create_form1( void )
{
token_header form_token = { 0 };
- ring_info *rg_info;
- int32 *num_rings;
- int32 *holes_procs_ptr;
+ ring_info *rg_info;
+ char *base_ptr;
+ int32 *num_rings;
+ int32 *holes_ptr;
int32 index;
- int pack_entry;
int num_bytes;
+ int tmp_bytes;
sys_scatter send_scat;
- char rg_info_buf[sizeof(token_body)];
rep_info temp_rep;
- int i,j;
+ int i, j, tmp;
int cur_num_members;
- members_info valid_members;
- Alarmp( SPLOG_INFO, MEMB, "Create_form1: State is %s\n", State_str() );
+ encoded_members_info *e_membs_info;
+ encoded_reps_info *e_reps_info;
+ encoded_id *id_ptr;
+ proc p;
+
+ union
+ {
+ int32 force_align32;
+ char tok_buf[sizeof(token_body)];
+
+ } u = { 0 };
+
+ Alarmp( SPLOG_INFO, MEMB, "Create_form1: State is %s\n", Memb_state_str() );
form_token.type = FORM1_TYPE;
form_token.proc_id = My.id;
form_token.memb_id.proc_id = My.id; /* NOTE: this memb_id is only used to ensure a FORM2 token matches up with the most recent FORM1 token we processed */
form_token.memb_id.time = E_get_time().sec;
- form_token.seq = Highest_seq+3333;
+ form_token.seq = Highest_seq + 3333;
if ( form_token.memb_id.time <= Last_time_used )
form_token.memb_id.time = Last_time_used + 1;
Last_time_used = form_token.memb_id.time;
+ Form1_memb_id = form_token.memb_id;
- Form1_memb_id = form_token.memb_id;
-
/* if I am a ring leader - update my F_members */
- if( F_reps.reps[0].type == RING_REP )
- {
- F_members.num_members = 0;
- for( i=0; i < Membership.num_segments; i++ )
- for( j=0; j < Membership.segments[i].num_procs; j++ )
- {
- F_members.members[F_members.num_members] =
- Membership.segments[i].procs[j]->id;
- F_members.num_members++;
- }
- }
- cur_num_members = 0;
- for ( i = 0; i < F_members.num_members; i++ ) {
- int invalid_member;
- invalid_member = 0;
+ if ( F_reps.reps[0].type == RING_REP )
+ for ( i = 0, F_members.num_members = 0; i < Membership.num_segments; ++i )
+ for ( j = 0; j < Membership.segments[i].num_procs; ++j )
+ F_members.members[F_members.num_members++] = Membership.segments[i].procs[j]->id;
+
+ /* Remove from F_members any members that are also in our F_reps (except myself) */
+
+ for ( i = 0, cur_num_members = 0; i < F_members.num_members; ++i )
+ {
+ for ( j = 0; j < F_reps.num_reps && ( F_members.members[i] != F_reps.reps[j].proc_id || F_members.members[i] == My.id ); ++j )
+ ;
- /* Remove from F_members any members that are also in OUR F_reps (except myself). */
- for ( j = 0; j < F_reps.num_reps; j++ ) {
- if ( (F_members.members[i] == F_reps.reps[j].proc_id ) &&
- (F_members.members[i] != My.id) ) {
- invalid_member = 1;
- break;
- }
- }
- if (!invalid_member) {
- valid_members.members[cur_num_members] = F_members.members[i];
- cur_num_members++;
- }
+ if ( j == F_reps.num_reps )
+ F_members.members[cur_num_members++] = F_members.members[i];
}
- memcpy( &F_members.members[0], &valid_members.members[0], cur_num_members * sizeof(int32) );
+
F_members.num_members = cur_num_members;
- /* I am the first in F_members. put the rest in pending */
+ /* I am the first in F_members. Put the rest in pending */
+
+ assert( F_members.num_members >= 1 && F_members.members[0] == My.id );
+
F_members.num_pending = F_members.num_members - 1;
F_members.num_members = 1;
Sort_reps( &F_reps );
F_reps.rep_index = 1;
+
/* update potential in case of failure */
- Potential_reps.num_reps = 0;
- for( i=0; i < F_reps.num_reps; i++ )
+
+ for ( i = 0, Potential_reps.num_reps = 0; i < F_reps.num_reps; i++ )
{
temp_rep = F_reps.reps[i];
- if( temp_rep.seg_index != My.seg_index )
+
+ if ( temp_rep.seg_index != My.seg_index )
{
temp_rep.type = POTENTIAL_REP;
Insert_rep( &Potential_reps, temp_rep );
}
}
- num_bytes = 0;
- num_rings = (int32 *)rg_info_buf;
- *num_rings= 1;
+ /* begin building form1 token to send */
+
+ num_bytes = 0;
+ send_scat.num_elements = 0;
+
+ send_scat.elements[send_scat.num_elements].buf = (char*) &form_token;
+ send_scat.elements[send_scat.num_elements].len = sizeof(token_header);
+ ++send_scat.num_elements;
+
+ /* fill in F_members */
+
+ base_ptr = &u.tok_buf[num_bytes];
+ e_membs_info = (encoded_members_info*) base_ptr;
+ num_bytes += sizeof(encoded_members_info);
+
+ if ( num_bytes > (int) sizeof( u.tok_buf ) || num_bytes <= 0 )
+ Alarmp( SPLOG_FATAL, MEMB, "Create_form1:%d: token too big; num_bytes (%d)\n", __LINE__, num_bytes );
+
+ e_membs_info->num_members = (int8) F_members.num_members;
+ assert( e_membs_info->num_members == F_members.num_members );
+
+ e_membs_info->num_pending = (int8) F_members.num_pending;
+ assert( e_membs_info->num_pending == F_members.num_pending );
+
+ assert( F_members.num_members > 0 && F_members.num_pending >= 0 && (long) F_members.num_members + F_members.num_pending <= MAX_PROCS_RING );
+
+ for ( i = 0; i < F_members.num_members + F_members.num_pending; ++i )
+ {
+ e_membs_info->members[i] = (encoded_id) Conf_proc_by_id( F_members.members[i], &p );
+ assert( e_membs_info->members[i] >= 0 );
+ }
+
+ if ( ( tmp_bytes = num_bytes % 4 ) != 0 ) /* realign to 32b boundary for simplicity + safety */
+ {
+ num_bytes += 4 - tmp_bytes;
+
+ if ( num_bytes > (int) sizeof( u.tok_buf ) || num_bytes <= 0 )
+ Alarmp( SPLOG_FATAL, MEMB, "Create_form1:%d: token too big; num_bytes (%d)\n", __LINE__, num_bytes );
+ }
+
+ send_scat.elements[send_scat.num_elements].buf = base_ptr;
+ send_scat.elements[send_scat.num_elements].len = &u.tok_buf[num_bytes] - base_ptr;
+ ++send_scat.num_elements;
+
+ /* fill in F_reps */
+
+ base_ptr = &u.tok_buf[num_bytes];
+ e_reps_info = (encoded_reps_info*) base_ptr;
+ num_bytes += sizeof(encoded_reps_info);
+
+ if ( num_bytes > (int) sizeof( u.tok_buf ) || num_bytes <= 0 )
+ Alarmp( SPLOG_FATAL, MEMB, "Create_form1:%d: token too big; num_bytes (%d)\n", __LINE__, num_bytes );
+
+ e_reps_info->num_reps = (int8) F_reps.num_reps;
+ assert( e_reps_info->num_reps == F_reps.num_reps );
+
+ e_reps_info->rep_index = (int8) F_reps.rep_index;
+ assert( e_reps_info->rep_index == F_reps.rep_index );
+
+ assert( F_reps.num_reps > 0 && F_reps.num_reps <= MAX_REPS );
+
+ for ( i = 0; i < F_reps.num_reps; ++i )
+ {
+ e_reps_info->reps[i].index_id = (encoded_id) Conf_proc_by_id( F_reps.reps[i].proc_id, &p );
+ assert( e_reps_info->reps[i].index_id >= 0 );
+
+ e_reps_info->reps[i].type = (int8) F_reps.reps[i].type;
+ assert( e_reps_info->reps[i].type == F_reps.reps[i].type );
+
+ e_reps_info->reps[i].seg_index = (int8) F_reps.reps[i].seg_index;
+ assert( e_reps_info->reps[i].seg_index == F_reps.reps[i].seg_index );
+ }
+
+ if ( ( tmp_bytes = num_bytes % 4 ) != 0 ) /* realign to 32b boundary for simplicity + safety */
+ {
+ num_bytes += 4 - tmp_bytes;
+
+ if ( num_bytes > (int) sizeof( u.tok_buf ) || num_bytes <= 0 )
+ Alarmp( SPLOG_FATAL, MEMB, "Create_form1:%d: token too big; num_bytes (%d)\n", __LINE__, num_bytes );
+ }
+
+ send_scat.elements[send_scat.num_elements].buf = base_ptr;
+ send_scat.elements[send_scat.num_elements].len = &u.tok_buf[num_bytes] - base_ptr;
+ ++send_scat.num_elements;
+
+ /* fill in ring info */
+
+ base_ptr = &u.tok_buf[num_bytes];
+ num_rings = (int32*) base_ptr;
num_bytes += sizeof(int32);
- rg_info = (ring_info *)&rg_info_buf[num_bytes];
+
+ if ( num_bytes > (int) sizeof( u.tok_buf ) || num_bytes <= 0 )
+ Alarmp( SPLOG_FATAL, MEMB, "Create_form1:%d: token too big; num_bytes (%d)\n", __LINE__, num_bytes );
+
+ *num_rings = 1;
+
+ rg_info = (ring_info*) &u.tok_buf[num_bytes];
num_bytes += sizeof(ring_info);
- holes_procs_ptr = (int32 *)&rg_info_buf[num_bytes];
+ if ( num_bytes > (int) sizeof( u.tok_buf ) || num_bytes <= 0 )
+ Alarmp( SPLOG_FATAL, MEMB, "Create_form1:%d: token too big; num_bytes (%d)\n", __LINE__, num_bytes );
+
rg_info->memb_id = Membership_id;
rg_info->trans_time = 0;
rg_info->aru = Aru;
rg_info->highest_seq = Highest_seq;
+ rg_info->num_holes = 0;
+ rg_info->num_commit = 1;
+ rg_info->num_trans = 1;
+
+ Alarmp(SPLOG_INFO, MEMB, "Create_form1: putting MID = (%ld (%s), %ld), Aru = %ld and Highest_Seq = %ld on rg_info form1 token\n",
+ (long) Membership_id.proc_id, Conf_name_by_id(Membership_id.proc_id), (long) Membership_id.time, (long) Aru, (long) Highest_seq);
- Alarmp(SPLOG_INFO, MEMB, "Create_form1: putting Aru = %d and Highest_Seq = %d on rg_info form1 token\n", Aru, Highest_seq);
-
/* update holes */
- rg_info->num_holes = 0;
-
- for( index = My_aru+1; index <= Highest_seq; index++ )
- {
- pack_entry = index & PACKET_MASK;
- if( ! Packets[pack_entry].exist )
+ holes_ptr = (int32*) &u.tok_buf[num_bytes];
+
+ for ( index = My_aru + 1; index <= Highest_seq; ++index )
+ if( ! Packets[index & PACKET_MASK].exist )
{
num_bytes += sizeof(int32);
rg_info->num_holes++;
- if ( num_bytes > (int) sizeof( rg_info_buf ) ) {
+ if ( num_bytes > (int) sizeof( u.tok_buf ) || num_bytes <= 0 )
Alarmp( SPLOG_FATAL, MEMB, "Create_form1:%d: token too big; num_bytes (%d); too many holes (%d)?\n", __LINE__, num_bytes, rg_info->num_holes );
- }
Alarmp( SPLOG_INFO, MEMB, "INSERT HOLE 1 IS %d My_aru is %d, Highest_seq is %d\n", index, My_aru, Highest_seq );
- *holes_procs_ptr = index;
- holes_procs_ptr++;
+ *holes_ptr = index;
+ holes_ptr++;
}
- }
- /* update commit-trans procs */
-
+ /* update commit-trans procs; NOTE: only needs single byte alignment */
/* insert self in trans and commit */
- rg_info->num_commit = 1;
- rg_info->num_trans = 1;
- num_bytes += sizeof(int32);
- if ( num_bytes > (int) sizeof( rg_info_buf ) ) {
+ id_ptr = (encoded_id*) &u.tok_buf[num_bytes];
+ num_bytes += sizeof(encoded_id);
+
+ if ( num_bytes > (int) sizeof( u.tok_buf ) || num_bytes <= 0 )
Alarmp( SPLOG_FATAL, MEMB, "Create_form1:%d: token too big; num_bytes (%d); too many holes (%d)?\n", __LINE__, num_bytes, rg_info->num_holes );
- }
+
+ *id_ptr = (encoded_id) Conf_proc_by_id( My.id, &p );
+ assert( *id_ptr >= 0 );
+ ++id_ptr;
- *holes_procs_ptr = My.id;
- holes_procs_ptr++;
-
/* insert other members of commit set */
- for( i=0; i < Commit_set.num_members; i++ )
+
+ for( i = 0; i < Commit_set.num_members; i++ )
{
- /* skipping self, because already there */
- if( Commit_set.members[i] == My.id ) continue;
+ if ( Commit_set.members[i] == My.id )
+ continue; /* skipping self, because already there */
- /* insert this member */
- num_bytes += sizeof(int32);
+ num_bytes += sizeof(encoded_id);
rg_info->num_commit++;
- if ( num_bytes > (int) sizeof( rg_info_buf ) ) {
+ if ( num_bytes > (int) sizeof( u.tok_buf ) || num_bytes <= 0 )
Alarmp( SPLOG_FATAL, MEMB, "Create_form1:%d: token too big; num_bytes (%d); too many holes (%d), too many num_commit (%d)?\n",
__LINE__, num_bytes, rg_info->num_holes, rg_info->num_commit );
- }
- *holes_procs_ptr = Commit_set.members[i];
- holes_procs_ptr++;
+ *id_ptr = (encoded_id) Conf_proc_by_id( Commit_set.members[i], &p );
+ assert(*id_ptr >= 0);
+ ++id_ptr;
}
-
- send_scat.num_elements = 4;
- send_scat.elements[0].buf = (char *)&form_token;
- send_scat.elements[0].len = sizeof(token_header);
- send_scat.elements[1].buf = (char *)&F_members;
- send_scat.elements[1].len = sizeof(members_info);
- send_scat.elements[2].buf = (char *)&F_reps;
- send_scat.elements[2].len = sizeof(reps_info);
- send_scat.elements[3].buf = rg_info_buf;
- send_scat.elements[3].len = num_bytes;
- form_token.rtr_len = send_scat.elements[1].len + send_scat.elements[2].len + send_scat.elements[3].len;
+ if ( ( tmp_bytes = num_bytes % 4 ) != 0 ) /* realign to 32b boundary for simplicity + safety */
+ {
+ num_bytes += 4 - tmp_bytes;
- /* compute whom to send to */
- if( F_members.num_pending > 0 )
+ if ( num_bytes > (int) sizeof( u.tok_buf ) || num_bytes <= 0 )
+ Alarmp( SPLOG_FATAL, MEMB, "Create_form1:%d: token too big; num_bytes (%d)\n", __LINE__, num_bytes );
+ }
+
+ send_scat.elements[send_scat.num_elements].buf = base_ptr;
+ send_scat.elements[send_scat.num_elements].len = &u.tok_buf[num_bytes] - base_ptr;
+ ++send_scat.num_elements;
+
+ /* fill in data length in token header */
+
+ for ( i = 1, form_token.rtr_len = 0; i < send_scat.num_elements; ++i )
+ form_token.rtr_len += send_scat.elements[i].len;
+
+ assert( send_scat.num_elements == 4 && form_token.rtr_len == num_bytes );
+
+ /* compute to whom to send */
+
+ if ( F_members.num_pending > 0 )
{
/* send to next member in pending list */
Net_ucast_token( F_members.members[F_members.num_members], &send_scat );
Net_ucast_token( F_members.members[F_members.num_members], &send_scat );
/*Net_ucast_token( F_members.members[F_members.num_members], &send_scat );*/
-
- }else if( F_reps.rep_index < F_reps.num_reps){
+ }
+ else if ( F_reps.rep_index < F_reps.num_reps)
+ {
/* send to next rep */
Net_ucast_token( F_reps.reps[F_reps.rep_index].proc_id, &send_scat );
Net_ucast_token( F_reps.reps[F_reps.rep_index].proc_id, &send_scat );
/*Net_ucast_token( F_reps.reps[F_reps.rep_index].proc_id, &send_scat );*/
+ }
+ else
+ {
+ /* singleton membership */
- }else{
- /* singleton membership */
- F_members.num_pending = 1;
- F_members.num_members = 0;
- form_token.type = FORM2_TYPE;
+ form_token.type = FORM2_TYPE;
+ F_members.num_pending = e_membs_info->num_pending = 1;
+ F_members.num_members = e_membs_info->num_members = 0;
+
+ num_bytes -= (int) send_scat.elements[2].len;
+
+ assert( send_scat.elements[2].len >= sizeof(membership_id) );
send_scat.elements[2].len = sizeof(membership_id);
- form_token.rtr_len = send_scat.elements[1].len + send_scat.elements[2].len + send_scat.elements[3].len;
+
+ num_bytes += (int) send_scat.elements[2].len;
+
+ for ( i = 1, form_token.rtr_len = 0; i < send_scat.num_elements; ++i )
+ form_token.rtr_len += send_scat.elements[i].len;
+
+ assert( send_scat.num_elements == 4 && form_token.rtr_len == num_bytes );
+
Net_ucast_token( My.id, &send_scat );
Net_ucast_token( My.id, &send_scat );
/*Net_ucast_token( My.id, &send_scat );*/
}
- if ( Alarm_get_priority() >= SPLOG_INFO && ( Alarm_get_types() & MEMB ) != 0 ) {
+ if ( Alarm_get_priority() >= SPLOG_INFO && ( Alarm_get_types() & MEMB ) != 0 )
+ {
Alarmp( SPLOG_INFO, MEMB, "Create_form1: SENT following token:\n" );
Memb_print_form_token( &send_scat );
}
@@ -1560,28 +1706,21 @@
GlobalStatus.state = FORM;
}
-/* TODO: there are all sorts of memory accesses offset from the token
- * based on information written in the token itself. This is highly
- * dangerous without careful bounds checking at the very least. A
- * malformed packet could easily cause a seg fault or worse.
- */
-
static void Fill_form1( sys_scatter *scat )
{
sys_scatter send_scat;
- token_header *form_token = (token_header *)scat->elements[0].buf;
- members_info *m_info;
- reps_info *r_info;
+ token_header *form_token = (token_header*) scat->elements[0].buf;
ring_info *old_rg_info, *new_rg_info;
ring_info *my_rg_info;
- int32 *old_num_rings, *new_num_rings;
+ int old_num_rings;
+ int32 *new_num_rings;
int32 *my_holes_procs_ptr, *new_holes_procs_ptr;
int32 index;
+ encoded_id *new_id_ptr, *my_id_ptr;
int pack_entry;
- char rg_info_buf[sizeof(token_body)];
char *c_ptr;
char *rings_buf;
- int num_bytes = 0;
+ int num_bytes, tmp_bytes;
int bytes_to_copy;
rep_info temp_rep;
int i,j,k;
@@ -1589,62 +1728,77 @@
int num_to_copy;
members_info valid_members;
- if ( scat->elements[0].len != (int) sizeof(token_header) )
+ encoded_members_info *m_info;
+ encoded_reps_info *r_info;
+ proc p;
+
+ union
{
- Alarmp( SPLOG_WARNING, MEMB, "Fill_form1: WARNING!!! Wrong size header %d (should be %d)\n", (int) scat->elements[0].len, (int) sizeof(token_header) );
- return;
- }
+ int32 force_align32;
+ char tok_buf[sizeof(token_body)];
+
+ } u = { 0 };
- Alarmp( SPLOG_INFO, MEMB, "Fill_form1 from %s, State is %s\n", Conf_name_by_id( form_token->proc_id ), State_str() );
-
- if ( Alarm_get_priority() >= SPLOG_DEBUG && ( Alarm_get_types() & MEMB ) != 0 )
+ assert( scat->num_elements == 2 && scat->elements[0].len == sizeof( token_header) && scat->elements[1].len == sizeof( token_body ) );
+
+ Alarmp( SPLOG_INFO, MEMB, "Fill_form1: transmitter = 0x%08X (%s); proc = 0x%08X (%s), State is %s\n",
+ (unsigned) form_token->transmiter_id, Conf_name_by_id( form_token->transmiter_id ), (unsigned) form_token->proc_id, Conf_name_by_id( form_token->proc_id ), Memb_state_str() );
+
+ /* integrity check + endian flip body of token */
+
+ if ( Fix_form1_token( scat, &m_info, &r_info, &num_rings, &old_rg_info ) )
{
- Alarmp( SPLOG_DEBUG, MEMB, "Fill_form1 received following token:\n" );
- Memb_print_form_token( scat );
+ Alarmp( SPLOG_ERROR, MEMB, "Fill_form1:%d: Bad token from 0x%08X (%s)! Ignoring!\n", (unsigned) form_token->proc_id, Conf_name_by_id( form_token->proc_id ) );
+ return;
}
-
- m_info = (members_info *)scat->elements[1].buf;
- num_bytes += sizeof(members_info);
- r_info = (reps_info *)&scat->elements[1].buf[num_bytes];
- num_bytes += sizeof(reps_info);
+ num_bytes = 0;
+ m_info = (encoded_members_info*) &scat->elements[1].buf[num_bytes];
- rings_buf = &scat->elements[1].buf[num_bytes];
- old_num_rings = (int32 *)&scat->elements[1].buf[num_bytes];
- num_bytes += sizeof(int32);
+
+ rings_buf = &scat->elements[1].buf[num_bytes];
+
+ if ( Fix_rings( rings_buf, (int) scat->elements[1].len - num_bytes, !Same_endian( form_token->type ) ) )
+ {
+ }
- old_rg_info= (ring_info *)&scat->elements[1].buf[num_bytes];
+ old_num_rings = (int32*) &scat->elements[1].buf[num_bytes];
+ num_bytes += sizeof(int32);
+ old_rg_info = (ring_info*) &scat->elements[1].buf[num_bytes];
- if( !Same_endian( form_token->type ) )
- {
- Flip_members( m_info );
- Flip_reps( r_info );
- Flip_rings( rings_buf );
- }
-
/* update header */
+
form_token->proc_id = My.id;
- if( form_token->seq < Highest_seq+3333 ) form_token->seq = Highest_seq+3333;
+
+ if( form_token->seq < Highest_seq+3333 )
+ form_token->seq = Highest_seq+3333;
/* update members and reps */
+
if( State == OP || State == REPRESENTED )
{
/* validity check */
- if( m_info->members[m_info->num_members] != My.id ||
- m_info->num_pending <= 0 ) return;
+ if( m_info->members[m_info->num_members] != Conf_proc_by_id( My.id, &p ) || m_info->num_pending <= 0 )
+ {
+ Alarmp( SPLOG_WARNING, MEMB, "Fill_form1:%d: Bad token; num_members = %d, num_pending = %d, members[num_members] = %d (%d)\n",
+ __LINE__, m_info->num_members, m_info->num_pending, m_info->members[m_info->num_members], Conf_proc_by_id( My.id, &p ) );
+ return;
+ }
m_info->num_members++;
m_info->num_pending--;
-
}
else if( State == GATHER )
{
/* validity check */
- if( r_info->reps[r_info->rep_index].proc_id != My.id ||
+ if( r_info->reps[r_info->rep_index].index_id != Conf_proc_by_id( My.id, &p ) ||
( Token_alive && r_info->reps[r_info->rep_index].type == SEG_REP ) ||
( !Token_alive && r_info->reps[r_info->rep_index].type == RING_REP ) ||
m_info->num_pending != 0 )
{
+ Alarmp( SPLOG_WARNING, MEMB, "Fill_form1:%d: Bad token; rep_index = %d, index_id = %d (%d), Token_alive = %d, type = %d\n",
+ __LINE__, r_info->rep_index, r_info->reps[r_info->rep_index].index_id, Conf_proc_by_id( My.id, &p ),
+ Token_alive, r_info->reps[r_info->rep_index].type );
return;
}
@@ -1672,8 +1826,9 @@
/* 2) Any members of r_info I just received in form1 (except myself). */
for ( j = 0; !invalid_member && j < r_info->num_reps; j++ ) {
- if ( (F_members.members[i] == r_info->reps[j].proc_id ) &&
- (F_members.members[i] != My.id) ) {
+ if ( Conf_proc_by_id( F_members.members[i], &p ) == r_info->reps[j].index_id &&
+ F_members.members[i] != My.id )
+ {
invalid_member = 1;
break;
}
@@ -1701,7 +1856,10 @@
/* Fill myself and my members */
i = m_info->num_members;
for( j=0; j < F_members.num_members; j++, i++ )
- m_info->members[i] = F_members.members[j];
+ {
+ m_info->members[i] = Conf_proc_by_id( F_members.members[j], &p );
+ assert( m_info->members[i] >= 0 );
+ }
m_info->num_pending = F_members.num_members - 1;
m_info->num_members += 1;
@@ -1711,9 +1869,12 @@
i = m_info->num_members;
for( j=0; j < Membership.num_segments; j++ )
for( k=0; k < Membership.segments[j].num_procs; k++, i++ )
- m_info->members[i] = Membership.segments[j].procs[k]->id;
+ {
+ m_info->members[i] = Conf_proc_by_id( Membership.segments[j].procs[k]->id, &p );
+ assert( m_info->members[i] >= 0 );
+ }
- m_info->num_pending = i - m_info->num_members -1;
+ m_info->num_pending = i - m_info->num_members - 1;
m_info->num_members += 1;
}else Alarmp( SPLOG_FATAL, EXIT, "Fill_form1: invalid rep type: %d\n", r_info->reps[r_info->rep_index].type );
@@ -1730,7 +1891,13 @@
Potential_reps.num_reps = 0;
for( i=0; i < r_info->num_reps; i++ )
{
- temp_rep = r_info->reps[i];
+ k = Conf_proc_by_index( r_info->reps[i].index_id, &p );
+ assert( k >= 0 );
+
+ temp_rep.proc_id = p.id;
+ temp_rep.type = r_info->reps[i].type;
+ temp_rep.seg_index = r_info->reps[i].seg_index;
+
if( temp_rep.seg_index != My.seg_index )
{
temp_rep.type = POTENTIAL_REP;
@@ -1738,8 +1905,10 @@
}
}
+ /* update rings info */
- /* update rings info */
+#error "Need some alignment here"
+
num_bytes = 0;
new_num_rings = (int32 *)&rg_info_buf[num_bytes];
*new_num_rings = 0;
@@ -1751,7 +1920,7 @@
is too subtle for it) */
for( i=0; i < *old_num_rings; i++ )
{
- bytes_to_copy = sizeof(ring_info) + ( old_rg_info->num_holes + old_rg_info->num_commit ) * sizeof(int32);
+ bytes_to_copy = sizeof(ring_info) + old_rg_info->num_holes * sizeof(int32) + old_rg_info->num_commit * sizeof(encoded_id);
if( Memb_is_equal( old_rg_info->memb_id, Membership_id ) )
{
@@ -1815,10 +1984,10 @@
/* Remove ourselves from m_info */
for ( i=0; i < m_info->num_members; i++)
{
- if( m_info->members[i] == My.id )
+ if( m_info->members[i] == Conf_proc_by_id( My.id, &p) )
{
num_to_copy = m_info->num_members + m_info->num_pending - i - 1;
- memmove(&m_info->members[i], &m_info->members[i+1], num_to_copy * sizeof(int32));
+ memmove(&m_info->members[i], &m_info->members[i+1], num_to_copy * sizeof(encoded_id));
break;
}
}
@@ -1849,15 +2018,17 @@
/* insert self in trans and commit */
new_rg_info->num_commit = 1;
new_rg_info->num_trans = 1;
- num_bytes += sizeof(int32);
+ new_id_ptr = (encoded_id*) &rg_info_buf[num_bytes];
+ num_bytes += sizeof(encoded_id);
- if ( num_bytes > (int) sizeof( rg_info_buf ) ) {
+ if ( num_bytes > (int) sizeof( rg_info_buf ) )
Alarmp( SPLOG_FATAL, MEMB, "Fill_form1:%d: token too big; num_bytes (%d)\n", __LINE__, num_bytes );
- }
- *new_holes_procs_ptr = My.id;
- new_holes_procs_ptr++;
+ *new_id_ptr = (encoded_id) Conf_proc_by_id( My.id, &p );
+ assert( *new_id_ptr >= 0 );
+ new_id_ptr++;
+
/* insert other members of commit set */
for( i=0; i < Commit_set.num_members; i++ )
{
@@ -1865,15 +2036,15 @@
if( Commit_set.members[i] == My.id ) continue;
/* insert this member */
- num_bytes += sizeof(int32);
+ num_bytes += sizeof(encoded_id);
new_rg_info->num_commit++;
- if ( num_bytes > (int) sizeof( rg_info_buf ) ) {
+ if ( num_bytes > (int) sizeof( rg_info_buf ) )
Alarmp( SPLOG_FATAL, MEMB, "Fill_form1:%d: token too big; num_bytes (%d)\n", __LINE__, num_bytes );
- }
- *new_holes_procs_ptr = Commit_set.members[i];
- new_holes_procs_ptr++;
+ *new_id_ptr = Conf_proc_by_id( Commit_set.members[i], &p );
+ assert( *new_id_ptr >= 0 );
+ new_id_ptr++;
}
}
}else{
@@ -1929,13 +2100,18 @@
}
}
}
-
+
/* setting temp_set to be trans members only */
temp_set.num_members = 0;
+#error "Pointer type aliasing danger?!"
+ my_id_ptr = (encoded_id*) my_holes_procs_ptr;
for( i = 0; i < my_rg_info->num_trans; i++ )
{
- Insert_member( &temp_set, *my_holes_procs_ptr);
- my_holes_procs_ptr++;
+ k = Conf_proc_by_index( *my_id_ptr, &p );
+ assert( k >= 0 );
+
+ Insert_member( &temp_set, p.id );
+ my_id_ptr++;
}
/* creating an updated temp_set based on my_rg_info and Commit_set */
@@ -1949,8 +2125,11 @@
/* adding rest of original commit set */
for( i = my_rg_info->num_trans; i < my_rg_info->num_commit; i++ )
{
- Insert_member( &temp_set, *my_holes_procs_ptr );
- my_holes_procs_ptr++;
+ k = Conf_proc_by_index( *my_id_ptr, &p);
+ assert( k >= 0 );
+
+ Insert_member( &temp_set, p.id );
+ my_id_ptr++;
}
/* adding my commit set */
@@ -1962,16 +2141,18 @@
/* writing my ring commit and trans information to new_rg_info */
new_rg_info->num_commit = temp_set.num_members;
new_rg_info->num_trans = temp_set.num_pending;
+#error "Pointer type aliasing danger?!"
+ new_id_ptr = (encoded_id*) new_holes_procs_ptr;
for( i = 0; i < temp_set.num_members; i++ )
{
- num_bytes += sizeof(int32);
+ num_bytes += sizeof(encoded_id);
- if ( num_bytes > (int) sizeof( rg_info_buf ) ) {
+ if ( num_bytes > (int) sizeof( rg_info_buf ) )
Alarmp( SPLOG_FATAL, MEMB, "Fill_form1:%d: token too big; num_bytes (%d)\n", __LINE__, num_bytes );
- }
- *new_holes_procs_ptr = temp_set.members[i];
- new_holes_procs_ptr++;
+ *new_id_ptr = (encoded_id) Conf_proc_by_id( temp_set.members[i], &p );
+ assert( *new_id_ptr >= 0 );
+ new_id_ptr++;
}
}
@@ -1979,27 +2160,34 @@
send_scat.elements[0].buf = (char *)form_token;
send_scat.elements[0].len = sizeof(token_header);
send_scat.elements[1].buf = (char *)m_info;
- send_scat.elements[1].len = sizeof(members_info);
+ send_scat.elements[1].len = sizeof(encoded_members_info);
send_scat.elements[2].buf = (char *)r_info;
- send_scat.elements[2].len = sizeof(reps_info);
+ send_scat.elements[2].len = sizeof(encoded_reps_info);
send_scat.elements[3].buf = rg_info_buf;
send_scat.elements[3].len = num_bytes;
form_token->rtr_len = send_scat.elements[1].len + send_scat.elements[2].len + send_scat.elements[3].len;
- /* compute whom to send to */
+ /* compute to whom to send */
if( m_info->num_pending > 0 )
{
+ k = Conf_proc_by_index( m_info->members[m_info->num_members], &p );
+ assert( k >= 0 );
+
/* send to next member in pending list */
- Net_ucast_token( m_info->members[m_info->num_members], &send_scat );
- Net_ucast_token( m_info->members[m_info->num_members], &send_scat );
- /*Net_ucast_token( m_info->members[m_info->num_members], &send_scat );*/
+ Net_ucast_token( p.id, &send_scat );
+ Net_ucast_token( p.id, &send_scat );
+ /*Net_ucast_token( p.id, &send_scat );*/
}else if( r_info->rep_index < r_info->num_reps){
+
+ k = Conf_proc_by_index( r_info->reps[r_info->rep_index].index_id, &p );
+ assert( k >= 0 );
+
/* send to next rep */
- Net_ucast_token( r_info->reps[r_info->rep_index].proc_id, &send_scat );
- Net_ucast_token( r_info->reps[r_info->rep_index].proc_id, &send_scat );
- /*Net_ucast_token( r_info->reps[r_info->rep_index].proc_id, &send_scat );*/
+ Net_ucast_token( p.id, &send_scat );
+ Net_ucast_token( p.id, &send_scat );
+ /*Net_ucast_token( p.id, &send_scat );*/
}else{
/* prepare form2 token */
@@ -2010,9 +2198,13 @@
/* this is the only difference between form1 and form2 tokens */
send_scat.elements[2].len = sizeof(membership_id);
form_token->rtr_len = send_scat.elements[1].len + send_scat.elements[2].len + send_scat.elements[3].len;
- Net_ucast_token( m_info->members[0], &send_scat );
- Net_ucast_token( m_info->members[0], &send_scat );
- /*Net_ucast_token( m_info->members[0], &send_scat );*/
+
+ k = Conf_proc_by_index( m_info->members[0], &p );
+ assert( k >= 0 );
+
+ Net_ucast_token( p.id, &send_scat );
+ Net_ucast_token( p.id, &send_scat );
+ /*Net_ucast_token( p.id, &send_scat );*/
}
if ( Alarm_get_priority() >= SPLOG_INFO && ( Alarm_get_types() & MEMB ) != 0 ) {
@@ -2037,7 +2229,6 @@
{
sys_scatter send_scat;
token_header *form_token = (token_header *)scat->elements[0].buf;
- members_info *m_info;
membership_id *m_id_info;
ring_info *rg_info;
ring_info *my_rg_info;
@@ -2046,7 +2237,7 @@
int pack_entry;
char *c_ptr;
char *rings_buf;
- int num_bytes = 0;
+ int num_bytes;
int bytes_to_skip;
int tot_len;
proc p;
@@ -2054,13 +2245,16 @@
int i;
int32 memb_time = 0;
+ encoded_members_info *m_info;
+ encoded_id *id_ptr;
+
if ( scat->elements[0].len != (int) sizeof(token_header) )
{
Alarmp( SPLOG_WARNING, MEMB, "Read_form2: WARNING!!! Wrong size header %d (should be %d)\n", (int) scat->elements[0].len, (int) sizeof(token_header) );
return;
}
- Alarmp( SPLOG_INFO, MEMB, "Fill_form1 from %s, State is %s\n", Conf_name_by_id( form_token->proc_id ), State_str() );
+ Alarmp( SPLOG_INFO, MEMB, "Fill_form1 from %s, State is %s\n", Conf_name_by_id( form_token->proc_id ), Memb_state_str() );
if ( Alarm_get_priority() >= SPLOG_DEBUG && ( Alarm_get_types() & MEMB ) != 0 )
{
@@ -2069,19 +2263,14 @@
}
tot_len = (int) scat->elements[1].len;
+ num_bytes = 0;
- m_info = (members_info *)scat->elements[1].buf;
- num_bytes += sizeof(members_info);
+ m_info = (encoded_members_info*) scat->elements[1].buf;
+ num_bytes += sizeof(encoded_members_info);
- m_id_info = (membership_id *)&scat->elements[1].buf[num_bytes];
- num_bytes += sizeof(membership_id);
+ m_id_info = (membership_id*) &scat->elements[1].buf[num_bytes];
+ num_bytes += sizeof(membership_id);
- rings_buf = &scat->elements[1].buf[num_bytes];
- num_rings = (int32 *)&scat->elements[1].buf[num_bytes];
- num_bytes += sizeof(int32);
-
- rg_info = (ring_info *)&scat->elements[1].buf[num_bytes];
-
if ( num_bytes > tot_len )
{
Alarmp( SPLOG_WARNING, MEMB, "Read_form2:%d: WARNING!!! Malformed packet; num_bytes (%d), tot_len (%d)\n", __LINE__, num_bytes, tot_len );
@@ -2090,15 +2279,25 @@
if( !Same_endian( form_token->type ) )
{
- Flip_members( m_info );
m_id_info->proc_id = Flip_int32( m_id_info->proc_id );
m_id_info->time = Flip_int32( m_id_info->time );
- Flip_rings( rings_buf );
}
- /* print */
- if ( Alarm_get_priority() >= SPLOG_INFO && ( Alarm_get_types() & MEMB ) != 0 )
+ rings_buf = &scat->elements[1].buf[num_bytes];
+
+ if ( Fix_rings( rings_buf, tot_len - num_bytes, !Same_endian( form_token->type ) ) )
{
+ Alarmp( SPLOG_WARNING, MEMB, "Read_form2:%d: WARNING!!! Malformed packet; num_bytes (%d), tot_len (%d)\n", __LINE__, num_bytes, tot_len );
+ return;
+ }
+
+ num_rings = (int32*) &scat->elements[1].buf[num_bytes];
+ num_bytes += sizeof(int32);
+
+ rg_info = (ring_info*) &scat->elements[1].buf[num_bytes];
+
+ if ( Alarm_get_priority() >= SPLOG_DEBUG && ( Alarm_get_types() & MEMB ) != 0 )
+ {
Alarmp( SPLOG_INFO, MEMB, "Read_form2: RECEIVED following token:\n" );
Memb_print_form_token( scat );
}
@@ -2112,11 +2311,14 @@
return;
}
- if( m_info->num_members < 0 || m_info->num_pending <= 0 || (int) m_info->num_members + m_info->num_pending >= MAX_PROCS_RING ||
- m_info->members[m_info->num_members] != My.id )
+ if( m_info->num_members < 0 || m_info->num_pending <= 0 || (int) m_info->num_members + m_info->num_pending > MAX_PROCS_RING ||
+ m_info->members[m_info->num_members] != Conf_proc_by_id( My.id, &p ) )
{
- Alarmp( SPLOG_WARNING, MEMB, "Read_form2:%d: WARNING!!! Malformed packet; num_members (%hd), num_pending (%hd), next (0x%08X) -- dropping!\n",
- __LINE__, m_info->num_members, m_info->num_pending, ( m_info->num_members < MAX_PROCS_RING ? m_info->members[m_info->num_members] : -1 ) );
+ Alarmp( SPLOG_WARNING, MEMB, "Read_form2:%d: WARNING!!! Malformed packet; num_members = %d, num_pending = %d, next = %d, me = %d -- dropping!\n",
+ __LINE__, m_info->num_members, m_info->num_pending,
+ ( m_info->num_members >= 0 && m_info->num_members < MAX_PROCS_RING ? m_info->members[m_info->num_members] : -10000 ),
+ Conf_proc_by_id( My.id, &p ) );
+
return;
}
@@ -2134,7 +2336,7 @@
return;
}
- bytes_to_skip = sizeof(ring_info) + ( rg_info->num_holes + rg_info->num_commit ) * sizeof(int32);
+ bytes_to_skip = sizeof(ring_info) + rg_info->num_holes * sizeof(int32) + rg_info->num_commit * sizeof(encoded_id);
if( Memb_is_equal( rg_info->memb_id, Membership_id ) )
{
my_rg_info = rg_info;
@@ -2190,10 +2392,10 @@
for( i=0; i < (int) m_info->num_members + m_info->num_pending; i++ )
{
- ret = Conf_proc_by_id( m_info->members[i], &p );
+ ret = Conf_proc_by_index( m_info->members[i], &p );
if( ret < 0 )
- Alarmp( SPLOG_FATAL, EXIT, "Read_form2: no such id %u\n", m_info->members[i] );
+ Alarmp( SPLOG_FATAL, EXIT, "Read_form2: no such index_id %d\n", m_info->members[i] );
if ( Conf_append_id_to_seg( &Future_membership.segments[p.seg_index], p.id) == -1)
Alarmp( SPLOG_FATAL, EXIT, "Read_form2: BUG2 no such id %u\n", p.id);
@@ -2255,10 +2457,14 @@
/* extract future commit set (and future trans membership) */
Future_commit_set.num_members = my_rg_info->num_commit;
Future_commit_set.num_pending = my_rg_info->num_trans;
+ id_ptr = (encoded_id*) my_holes_procs_ptr;
for( i=0; i < my_rg_info->num_commit; i++ )
{
- Future_commit_set.members[i] = *my_holes_procs_ptr;
- my_holes_procs_ptr++;
+ ret = Conf_proc_by_index( *id_ptr, &p );
+ assert( ret >= 0 );
+
+ Future_commit_set.members[i] = p.id;
+ id_ptr++
}
/* The token circulates in conf order, which also defines the order
@@ -2342,157 +2548,167 @@
GlobalStatus.state = EVS;
}
+/* NOTE: This function assumes that the token has already been
+ * completely checked for validity, that all of its references stay
+ * within appropriate memory bounds and has been endian corrected to
+ * local host form.
+ */
+
void Memb_print_form_token( sys_scatter *scat )
{
token_header *form_token;
- members_info *m_info;
- reps_info *r_info = NULL;
membership_id *m_id_info = NULL;
ring_info *rg_info;
int32 *num_rings;
+ int32 *holes_ptr;
int32 *commit_id;
char *c_ptr;
- int num_bytes, bytes_to_skip;
- int i,j, scat_index;
- int is_form1 = 0;
- char form_name[10];
+ int num_bytes, len_bytes, tmp_bytes, bytes_to_skip;
+ int i, j, scat_index, tmp;
+ int is_form1;
+ encoded_members_info *m_info;
+ encoded_reps_info *r_info = NULL;
+ encoded_id *commit_id;
+ proc p;
+
+ scat_index = 0;
+ form_token = (token_header*) scat->elements[scat_index++].buf;
+ is_form1 = Is_form1( form_token->type );
num_bytes = 0;
- scat_index = 1;
- form_token = (token_header *)scat->elements[0].buf;
+ len_bytes = 0;
- m_info = (members_info *)&scat->elements[scat_index].buf[num_bytes];
- num_bytes += sizeof(members_info);
+ /* Print form_token_header */
- if ( num_bytes == scat->elements[scat_index].len )
- {
- num_bytes = 0;
- scat_index++;
- }
+ Alarmp( SPLOG_PRINT, PRINT, "=========== Form %d Token ==========\n", ( is_form1 ? 1 : 2 ) );
+ Alarmp( SPLOG_PRINT, PRINT, "type = 0x%X, transmiter_id = 0x%X (%s), proc_id = 0x%X (%s)\n",
+ (unsigned) form_token->type, (unsigned) form_token->transmiter_id, Conf_name_by_id( form_token->transmiter_id ), (unsigned) form_token->proc_id, Conf_name_by_id( form_token->proc_id ) );
+ Alarmp( SPLOG_PRINT, PRINT, "memb_id = [ proc_id = 0x%X (%s), time = %ld ]\n", (unsigned) form_token->memb_id.proc_id, Conf_name_by_id( form_token->memb_id.proc_id ), (long) form_token->memb_id.time );
+ Alarmp( SPLOG_PRINT, PRINT, "seq = %ld, aru = %ld, aru_last_id = 0x%X (%s)\n", (long) form_token->seq, (long) form_token->aru, (unsigned) form_token->aru_last_id, Conf_name_by_id( form_token->aru_last_id ) );
+ Alarmp( SPLOG_PRINT, PRINT, "flow_control = %ld, rtr_len = %ld\n", (long) form_token->flow_control, (long) form_token->rtr_len );
+ Alarmp( SPLOG_PRINT, PRINT, "conf_hash = 0x%X (local hash = 0x%X)\n", (unsigned) form_token->conf_hash, (unsigned) Cn->hash_code );
+
+ m_info = (encoded_members_info*) &scat->elements[scat_index].buf[len_bytes];
+ num_bytes += sizeof(encoded_members_info);
+ len_bytes += sizeof(encoded_members_info);
- if ( Is_form1( form_token->type ) )
- {
- r_info = (reps_info *)&scat->elements[scat_index].buf[num_bytes];
- num_bytes += sizeof(reps_info);
- is_form1 = 1;
- }
- else if ( Is_form2( form_token->type ) )
+ if ( ( tmp_bytes = num_bytes % 4 ) != 0 ) /* realign to 32b boundary for simplicity + safety */
{
- m_id_info = (membership_id *)&scat->elements[scat_index].buf[num_bytes];
- num_bytes += sizeof(membership_id);
- is_form1 = 0;
+ num_bytes += 4 - tmp_bytes;
+ len_bytes += 4 - tmp_bytes;
}
- else
- {
- Alarmp( SPLOG_FATAL, EXIT, "Invalid token type received: 0x%x\n", form_token->type);
- return;
- }
- if ( num_bytes == scat->elements[scat_index].len )
+ if ( len_bytes == scat->elements[scat_index].len )
{
- num_bytes = 0;
- scat_index++;
+ len_bytes = 0;
+ ++scat_index;
}
-
- num_rings = (int32 *)&scat->elements[scat_index].buf[num_bytes];
- num_bytes += sizeof(int32);
+ else if ( len_bytes > scat->elements[scat_index].len )
+ Alarmp( SPLOG_FATAL, EXIT, "Memb_print_form_token:%d: bad token: %d > %d\n", __LINE__, len_bytes, (int) scat->elements[scat_index].len );
- rg_info= (ring_info *)&scat->elements[scat_index].buf[num_bytes];
+ /* Print token members list */
+
+ Alarmp( SPLOG_PRINT, PRINT, "Form Token members list: num_members = %ld, num_pending = %ld\n", (long) m_info->num_members, (long) m_info->num_pending);
+
+ for ( i = 0; i < m_info->num_members; ++i )
+ Alarmp( SPLOG_PRINT, PRINT, "\t%d: %ld (%s)\n", i, (long) m_info->members[i], Conf_name_by_index( m_info->members[i] ) );
- /* Print form_token_header */
+ Alarmp( SPLOG_PRINT, PRINT, "Pending Members:\n");
- Alarmp( SPLOG_PRINT, PRINT, "=========== Form Token ==========\n");
+ for ( i = m_info->num_members; i < m_info->num_members + m_info->num_pending; ++i )
+ Alarmp( SPLOG_PRINT, PRINT, "\t%d: %ld (%s)\n", i, (long) m_info->members[i], Conf_name_by_index( m_info->members[i] ) );
+ /* Print reps list or membership ID depending on token type */
+
if ( is_form1 )
- snprintf(&form_name[0], 10, "FORM 1");
- else
- snprintf(&form_name[0], 10, "FORM 2");
+ {
+ /* NOTE: encoded_reps_info only needs single byte alignment */
+
+ r_info = (encoded_reps_info*) &scat->elements[scat_index].buf[len_bytes];
+ num_bytes += sizeof(encoded_reps_info);
+ len_bytes += sizeof(encoded_reps_info);
- Alarmp( SPLOG_PRINT, PRINT, "%s Token, sent by %u.%u.%u.%u. Seq: %d\n", form_name, IP1(form_token->transmiter_id), IP2(form_token->transmiter_id), IP3(form_token->transmiter_id), IP4(form_token->transmiter_id), form_token->seq);
- Alarmp( SPLOG_PRINT, PRINT, "Configuration hash: %u (local hash %u)\n", form_token->conf_hash, Cn->hash_code);
- Alarmp( SPLOG_PRINT, PRINT, "ProcID: %u.%u.%u.%u\t ARU: %d, ARU LastID: %u.%u.%u.%u\n", IP1(form_token->proc_id), IP2(form_token->proc_id), IP3(form_token->proc_id), IP4(form_token->proc_id), form_token->aru, IP1(form_token->aru_last_id), IP2(form_token->aru_last_id), IP3(form_token->aru_last_id), IP4(form_token->aru_last_id) );
- Alarmp( SPLOG_PRINT, PRINT, "FlowControl: %hd\tRTR Len: %hd\n", form_token->flow_control, form_token->rtr_len);
- /* Print members list */
+ Alarmp( SPLOG_PRINT, PRINT, "Form1 Token reps list: num_reps = %ld, rep_index = %ld\n", (long) r_info->num_reps, (long) r_info->rep_index );
+
+ for ( i = 0; i < r_info->num_reps; ++i )
+ Alarmp( SPLOG_PRINT, PRINT, "\t%d: %ld (%s), type = %ld, seg_index = %ld\n", i, (long) r_info->reps[i].index_id,
+ Conf_name_by_index( r_info->reps[i].index_id ), (long) r_info->reps[i].type, (long) r_info->reps[i].seg_index );
+ }
+ else if ( Is_form2( form_token->type ) )
+ {
+ m_id_info = (membership_id*) &scat->elements[scat_index].buf[len_bytes];
+ num_bytes += sizeof(membership_id);
+ len_bytes += sizeof(membership_id);
- Alarmp( SPLOG_PRINT, PRINT, "Form Token members list -- Active (%hd) Pending (%hd)\n", m_info->num_members, m_info->num_pending);
- for (i=0; i < m_info->num_members; i++)
- {
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) );
- if ( (i % 3) == 2 )
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\n");
+ Alarmp( SPLOG_PRINT, PRINT, "Form2 Token Membership ID = [ proc_id = 0x%X (%s), time = %ld ]\n",
+ (unsigned) m_id_info->proc_id, Conf_name_by_id( m_id_info->proc_id ), (long) m_id_info->time );
}
+ else
+ Alarmp( SPLOG_FATAL, EXIT, "Memb_print_form_token:%d: Invalid token type: 0x%X\n", __LINE__, (unsigned) form_token->type);
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\nPending Members:\n");
-
- for (i= m_info->num_members; i < ( m_info->num_members + m_info->num_pending); i++)
+ if ( ( tmp_bytes = num_bytes % 4 ) != 0 ) /* realign to 32b boundary for simplicity + safety */
{
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) );
- if ( (i % 3) == 2 )
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\n");
+ num_bytes += 4 - tmp_bytes;
+ len_bytes += 4 - tmp_bytes;
}
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\n");
- if ( is_form1 )
+ if ( len_bytes == scat->elements[scat_index].len )
{
- /* Print reps list */
- Alarmp( SPLOG_PRINT, PRINT, "Form Token reps list -- Count (%hd) index (%hd)\n", r_info->num_reps, r_info->rep_index);
- for (i=0; i < r_info->num_reps; i++)
- {
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\t%u: %u.%u.%u.%u (T %hd SegInd %hd) ", i, IP1(r_info->reps[i].proc_id), IP2(r_info->reps[i].proc_id), IP3(r_info->reps[i].proc_id), IP4(r_info->reps[i].proc_id), r_info->reps[i].type, r_info->reps[i].seg_index );
- if ( (i % 3) == 2 )
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\n");
- }
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\n");
- } else /* so is FORM2 type */
- {
- Alarmp( SPLOG_PRINT, PRINT, "Form Token Membership ID %u.%u.%u.%u : %d\n", IP1(m_id_info->proc_id), IP2(m_id_info->proc_id), IP3(m_id_info->proc_id), IP4(m_id_info->proc_id), m_id_info->time );
+ len_bytes = 0;
+ ++scat_index;
}
+ else if ( len_bytes > scat->elements[scat_index].len )
+ Alarmp( SPLOG_FATAL, EXIT, "Memb_print_form_token:%d: bad token: %d > %d\n", __LINE__, len_bytes, (int) scat->elements[scat_index].len );
+ num_rings = (int32*) &scat->elements[scat_index].buf[len_bytes];
+ num_bytes += sizeof(int32);
+ len_bytes += sizeof(int32);
+
/* Print ring info */
- Alarmp( SPLOG_PRINT, PRINT, "Form Token RING list -- Count (%d)\n", *num_rings);
- for (i=0; i < *num_rings; i++)
+
+ Alarmp( SPLOG_PRINT, PRINT, "Form Token RING list: num_rings = %ld\n", (long) *num_rings);
+
+ for ( i = 0; i < *num_rings; ++i )
{
- bytes_to_skip = sizeof(ring_info) + ( rg_info->num_holes + rg_info->num_commit ) * sizeof(int32);
- c_ptr = (char *) rg_info;
-
- Alarmp( SPLOG_PRINT, PRINT, "Ring %u: MembID %u.%u.%u.%u - %u\tTransTime %u\n", i, IP1(rg_info->memb_id.proc_id), IP2(rg_info->memb_id.proc_id), IP3(rg_info->memb_id.proc_id), IP4(rg_info->memb_id.proc_id), rg_info->memb_id.time, rg_info->trans_time);
- Alarmp( SPLOG_PRINT, PRINT, "\tARU: %d\tHighSeq: %d\tNumHoles: %d\n", rg_info->aru, rg_info->highest_seq, rg_info->num_holes);
- Alarmp( SPLOG_PRINT, PRINT, "\tNumCommit: %hd\tNumTrans: %hd\n", rg_info->num_commit, rg_info->num_trans);
- /* Now print all missing messages from this ring (holes) */
- commit_id = (int32 *) &c_ptr[sizeof(ring_info)];
-
- Alarmp( SPLOG_PRINT, PRINT, "\tMessage Holes:");
- for (j=0; j < rg_info->num_holes; j++)
+ if ( ( tmp_bytes = num_bytes % 4 ) != 0 ) /* realign to 32b boundary for simplicity + safety */
{
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\t%u ", *commit_id);
- commit_id++;
+ num_bytes += 4 - tmp_bytes;
+ len_bytes += 4 - tmp_bytes;
}
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\n");
+
+ rg_info = (ring_info*) &scat->elements[scat_index].buf[len_bytes];
+ num_bytes += sizeof(ring_info) + rg_info->num_holes * sizeof(int32) + rg_info->num_commit * sizeof(encoded_id);
+ len_bytes += sizeof(ring_info) + rg_info->num_holes * sizeof(int32) + rg_info->num_commit * sizeof(encoded_id);
+
+ Alarmp( SPLOG_PRINT, PRINT, "Ring %d: memb_id = [ proc_id = 0x%X (%s), time = %ld ], trans_time = %ld\n", i, rg_info->memb_id.proc_id,
+ Conf_name_by_id( rg_info->memb_id.proc_id ), (long) rg_info->memb_id.time, (long) rg_info->trans_time );
+ Alarmp( SPLOG_PRINT, PRINT, "\t" "aru = %ld, highest_seq = %ld, num_holes = %ld\n", (long) rg_info->aru, (long) rg_info->highest_seq, (long) rg_info->num_holes );
+ Alarmp( SPLOG_PRINT, PRINT, "\t" "num_commit = %ld, num_trans = %ld\n", (long) rg_info->num_commit, (long) rg_info->num_trans );
+
+ /* Print all missing messages from this ring (holes) */
+
+ commit_id = (int32*) &scat->elements[scat_index].buf[len_bytes];
- /* Now print transitional member list */
- Alarmp( SPLOG_PRINT, PRINT, "\tTrans List:");
- for (j=0; j < rg_info->num_trans; j++)
- {
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) );
- if ( (j % 3) == 2 )
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\n");
- commit_id++;
- }
+ Alarmp( SPLOG_PRINT, PRINT, "\t" "Message Holes:" );
+
+ for ( j = 0; j < rg_info->num_holes; ++j, ++commit_id )
+ Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, " %ld", (long) *commit_id );
+
Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\n");
- /* Now print commit list. This follows the trans list with no gaps. */
- Alarmp( SPLOG_PRINT, PRINT, "\tCommit List:");
- for (j=rg_info->num_trans; j < rg_info->num_commit; j++)
- {
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) );
- if ( (j % 3) == 2 )
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\n");
- commit_id++;
- }
- Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\n");
+ /* Print transitional member list */
+
+ Alarmp( SPLOG_PRINT, PRINT, "\t" "Trans List:\n");
- /* next ring */
- rg_info = (ring_info *)&c_ptr[bytes_to_skip];
+ for ( j = 0, id_ptr = (encoded_id*) commit_id; j < rg_info->num_trans; ++j, ++id_ptr )
+ Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\t\t" "%d: %ld (%s)\n", j, (long) *id_ptr, Conf_name_by_index( *id_ptr ) );
+
+ /* Print commit list. This follows the trans list with no gaps. */
+
+ Alarmp( SPLOG_PRINT, PRINT, "\t" "Commit List:\n");
+
+ for ( ; j < rg_info->num_commit; ++j, ++id_ptr )
+ Alarmp( SPLOG_PRINT | SPLOG_NODATE, PRINT, "\t\t" "%d: %ld (%s)\n", j, (long) *id_ptr, Conf_name_by_index( *id_ptr ) );
}
Alarmp( SPLOG_PRINT, PRINT, "====================================================\n");
@@ -2661,46 +2877,227 @@
}
}
-void Flip_rings( char *buf )
+static int Fix_form1_token( sys_scatter *scat, encoded_members_info **m_info_ptr, encoded_reps_info **r_info_ptr, int *num_rings, int *num_bytes )
{
-/*
- * This routine can not be called twice beacuse of *num_rings
- * and of ring_info_ptr->num_holes
- */
- ring_info *ring_info_ptr;
- int32 *num_rings;
- int ptr;
- char *c_ptr;
- int32 *seq_or_proc;
- int i,j;
+ return Fix_form_token( scat, m_info_ptr, r_info_ptr, NULL, num_rings, num_bytes );
+}
- c_ptr = buf;
- ptr = 0;
- num_rings = (int32 *)&c_ptr[ptr];
+static int Fix_form2_token( sys_scatter *scat, encoded_members_info **m_info_ptr, membership_id **memb_info_ptr, int *num_rings, int *num_bytes )
+{
+ return Fix_form_token( scat, m_info_ptr, NULL, memb_info_ptr, num_rings, num_bytes );
+}
- *num_rings = Flip_int32( *num_rings );
- ptr += sizeof(int32);
+static int Fix_form_token( sys_scatter *scat, encoded_members_info **m_info_ptr, encoded_reps_info **r_info_ptr, membership_id **memb_info_ptr, int *num_rings_ptr, int *num_bytes_ptr )
+{
+ token_header *form_token;
+ ring_info *rp;
+ int32 *num_rings;
+ int num_bytes, tmp_bytes;
+ int32 *hole;
+ encoded_id *id;
+ proc p;
+ int i, j;
- for( i=0; i < *num_rings; i++ )
- {
- ring_info_ptr = (ring_info *)&c_ptr[ptr];
+ assert( scat->num_elements == 2 && scat->elements[0].len == sizeof( token_header) && scat->elements[1].len == sizeof( token_body ) );
+
+ form_token = (token_header*) scat->elements[0].buf;
+ endian_flip = !Same_endian( form->token_type );
+ num_bytes = 0;
- ring_info_ptr->memb_id.proc_id = Flip_int32( ring_info_ptr->memb_id.proc_id );
- ring_info_ptr->memb_id.time = Flip_int32( ring_info_ptr->memb_id.time );
- ring_info_ptr->trans_time = Flip_int32( ring_info_ptr->trans_time );
- ring_info_ptr->aru = Flip_int32( ring_info_ptr->aru );
- ring_info_ptr->highest_seq = Flip_int32( ring_info_ptr->highest_seq );
- ring_info_ptr->num_holes = Flip_int32( ring_info_ptr->num_holes );
- ring_info_ptr->num_commit = Flip_int16( ring_info_ptr->num_commit );
- ring_info_ptr->num_trans = Flip_int16( ring_info_ptr->num_trans );
+#error "Additional token intergrity checks?"
+
+ if ( !Is_form1( form_token->type) && !Is_form2( form_token->type ) )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: Bad token type: 0x%08X\n", __LINE__, form_token->type );
+ return -1;
+ }
- ptr += sizeof(ring_info);
+ *m_info_ptr = (encoded_members_info*) &scat->elements[1].buf[num_bytes];
+ num_bytes += sizeof(encoded_members_info);
- for( j=0; j < ( ring_info_ptr->num_holes + ring_info_ptr->num_commit ); j++ )
- {
- seq_or_proc = (int32 *)&c_ptr[ptr];
- *seq_or_proc = Flip_int32( *seq_or_proc );
- ptr += sizeof(int32);
- }
- }
+ if ( ( tmp_bytes = num_bytes % 4 ) != 0 ) /* realign to 32b boundary for simplicity + safety */
+ num_bytes += 4 - tmp_bytes;
+
+ if ( num_bytes > (int) scat->elements[1].len || num_bytes < 0 )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: buffer too small: num_bytes = %d, len = %ld\n", __LINE__, num_bytes, (long) scat->elements[1].len );
+ return -1;
+ }
+
+ /* NOTE: encoded_members_info doesn't require any endian correction */
+
+ if ( m_info->num_members < 0 || m_info->num_pending < 0 || (long) m_info->num_members + m_info->num_pending > MAX_PROCS_RING )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: Bad members info: num_members = %ld, num_pending = %ld, m+p = %ld, MAX_PROCS_RING = %ld\n", __LINE__,
+ (long) m_info->num_members, (long) m_info->num_pending, (long) m_info->num_members + m_info->num_pending, (long) MAX_PROCS_RING );
+ return -1;
+ }
+
+ if ( Is_form1( form_token->type ) )
+ {
+ assert( r_info_ptr != NULL && memb_info_ptr == NULL );
+
+ *r_info_ptr = (encoded_reps_info*) &scat->elements[1].buf[num_bytes];
+ num_bytes += sizeof(encoded_reps_info);
+
+ if ( ( tmp_bytes = num_bytes % 4 ) != 0 ) /* realign to 32b boundary for simplicity + safety */
+ num_bytes += 4 - tmp_bytes;
+
+ if ( num_bytes > (int) scat->elements[1].len || num_bytes < 0 )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: buffer too small: num_bytes = %d, len = %ld\n", __LINE__, num_bytes, (long) scat->elements[1].len );
+ return -1;
+ }
+
+ /* NOTE: encoded_reps_info doesn't require any endian correction */
+
+ #error "ensure that rep_index stays below num_reps"
+
+ if ( r_info->num_reps < 0 || r_info->rep_index < 0 || r_info->num_reps > MAX_REPS || r_info->rep_index >= r_info->num_reps )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: Bad reps info: num_reps = %ld, rep_index = %ld, MAX_REPS = %ld\n", __LINE__,
+ (long) r_info->num_reps, (long) r_info->rep_index, (long) MAX_REPS );
+ return -1;
+ }
+ }
+ else
+ {
+ assert( r_info_ptr == NULL && memb_info_ptr != NULL );
+
+ *memb_info_ptr = (membership_id*) &scat->elements[1].buf[num_bytes];
+ num_bytes += sizeof(membership_id);
+
+ if ( ( tmp_bytes = num_bytes % 4 ) != 0 ) /* realign to 32b boundary for simplicity + safety */
+ num_bytes += 4 - tmp_bytes;
+
+ if ( num_bytes > (int) scat->elements[1].len || num_bytes < 0 )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: buffer too small: num_bytes = %d, len = %ld\n", __LINE__, num_bytes, (long) scat->elements[1].len );
+ return -1;
+ }
+
+ if ( endian_flip )
+ {
+ (*memb_info_ptr)->proc_id = Flip_int32( (*memb_info_ptr)->proc_id );
+ (*memb_info_ptr)->time = Flip_int32( (*memb_info_ptr)->time );
+ }
+
+#error "Additional integrity checks possible?"
+ }
+
+ num_rings = (int32*) &scat->elements[1].buf[num_bytes];
+ num_bytes += sizeof(int32);
+
+ if ( num_bytes > (int) scat->elements[1].len || num_bytes < 0 )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: buffer too small: num_bytes = %d, len = %ld\n", __LINE__, num_bytes, (long) scat->elements[1].len );
+ return -1;
+ }
+
+ if ( endian_flip )
+ *num_rings = Flip_int32( *num_rings );
+
+ if ( *num_rings <= 0 || *num_rings > MAX_REPS )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: bad num_rings = %ld, MAX_REPS = %ld\n", __LINE__, (long) *num_rings, (long) MAX_REPS );
+ return -1;
+ }
+
+ *num_rings_ptr = (int) *num_rings;
+
+ for ( i = 0; i < *num_rings_ptr; ++i )
+ {
+ if ( ( tmp_bytes = num_bytes % 4 ) != 0 ) /* realign to 32b boundary for simplicity + safety */
+ num_bytes += 4 - tmp_bytes;
+
+ rp = (ring_info*) &scat->elements[1].buf[num_bytes];
+ num_bytes += sizeof(ring_info);
+
+ if ( num_bytes > (int) scat->elements[1].len || num_bytes < 0 )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: buffer too small: num_bytes = %d, len = %ld\n", __LINE__, num_bytes, (long) scat->elements[1].len );
+ return -1;
+ }
+
+ if ( endian_flip )
+ {
+ rp->memb_id.proc_id = Flip_int32( rp->memb_id.proc_id );
+ rp->memb_id.time = Flip_int32( rp->memb_id.time );
+ rp->trans_time = Flip_int32( rp->trans_time );
+ rp->aru = Flip_int32( rp->aru );
+ rp->highest_seq = Flip_int32( rp->highest_seq );
+ rp->num_holes = Flip_int16( rp->num_holes );
+ /*rp->num_commit = rp->num_commit;*/
+ /*rp->num_trans = rp->num_trans;*/
+ }
+
+#error "Extra assertions on memb_id trans_time?"
+#error "Double check these assertions are correct"
+
+ if ( rp->aru < 0 || rp->highest_seq < 0 || rp->num_holes < 0 || rp->num_commit < 0 || rp->num_trans < 0 )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: bad ring info: aru = %ld, highest_seq = %ld, num_holes = %ld, num_commit = %ld, num_trans = %ld\n", __LINE__,
+ (long) rp->aru, (long) rp->highest_seq, (long) rp->num_holes, (long) rp->num_commit, (long) rp->num_trans );
+ return -1;
+ }
+
+ if ( rp->aru > rp->highest_seq || rp->highest_seq - rp->aru > MAX_SEQ_GAP )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: bad aru: aru = %ld, highest_seq = %ld, diff = %ld, MAX_SEQ_GAP = %ld\n", __LINE__,
+ (long) rp->aru, (long) rp->highest_seq, (long) (rp->highest_seq - rp->aru), (long) MAX_SEQ_GAP );
+ return -1;
+ }
+
+ if ( rp->num_holes > rp->highest_seq - rp->aru )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: bad num holes: num_holes = %ld, aru = %ld, highest_seq = %ld, diff = %ld, MAX_SEQ_GAP = %ld\n", __LINE__,
+ (long) rp->num_holes, (long) rp->aru, (long) rp->highest_seq, (long) (rp->highest_seq - rp->aru), (long) MAX_SEQ_GAP );
+ return -1;
+ }
+
+ if ( rp->num_commit < rp->num_trans )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: bad commit list: num_commit = %ld < num_trans = %ld\n", __LINE__, (long) rp->num_commit, (long) rp->num_trans );
+ return -1;
+ }
+
+ hole = (int32*) &scat->elements[1].buf[num_bytes];
+ num_bytes += rp->num_holes * sizeof(int32);
+
+ if ( num_bytes > (int) scat->elements[1].len || num_bytes < 0 )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: buffer too small: num_bytes = %d, len = %ld, num_holes = %ld\n", __LINE__, num_bytes, (long) scat->elements[1].len, (long) rp->num_holes );
+ return -1;
+ }
+
+ for ( j = 0; j < rp->num_holes; ++j, ++hole )
+ {
+ if ( endian_flip )
+ *hole = Flip_int32( *hole );
+
+ if ( *hole < 0 || hole <= rp->aru || hole > rp->highest_seq )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: bad hole: aru = %ld, hole = %ld, highest_seq = %ld\n", __LINE__, (long) rp->aru, (long) *hole, (long) rp->highest_seq );
+ return -1;
+ }
+ }
+
+ id = (encoded_id*) &scat->elements[1].buf[num_bytes];
+ num_bytes += rp->num_commit * sizeof(encoded_id);
+
+ if ( num_bytes > (int) scat->elements[1].len || num_bytes < 0 )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: buffer too small: num_bytes = %d, len = %ld\n", __LINE__, num_bytes, (long) scat->elements[1].len );
+ return -1;
+ }
+
+ for ( j = 0; j < rp->num_commit; ++j, ++id )
+ if ( Conf_proc_by_index( *id, &p ) < 0 )
+ {
+ Alarmp( SPLOG_ERROR, MEMB, "Fix_form_token:%d: bad commit list entry: %ld\n", __LINE__, (long) *id );
+ return -1;
+ }
+ }
+
+ return 0;
}
Modified: branches/spread_5/daemon/membership.h
===================================================================
--- branches/spread_5/daemon/membership.h 2016-11-03 14:51:42 UTC (rev 859)
+++ branches/spread_5/daemon/membership.h 2016-11-14 17:19:30 UTC (rev 860)
@@ -49,6 +49,8 @@
#define EVS 6
+const char *Memb_state_str(void);
+
void Memb_init( void );
configuration *Memb_active_ptr( void );
membership_id Memb_active_id( void );
Modified: branches/spread_5/daemon/net_types.h
===================================================================
--- branches/spread_5/daemon/net_types.h 2016-11-03 14:51:42 UTC (rev 859)
+++ branches/spread_5/daemon/net_types.h 2016-11-14 17:19:30 UTC (rev 860)
@@ -74,38 +74,38 @@
#define CONTROL_TYPE 0x0f000000
-#define Is_unreliable( t ) ( (t) & UNRELIABLE_TYPE )
-#define Is_reliable( t ) ( (t) & RELIABLE_TYPE )
-#define Is_fifo( t ) ( (t) & FIFO_TYPE )
-#define Is_agreed( t ) ( (t) & AGREED_TYPE )
-#define Is_safe( t ) ( (t) & SAFE_TYPE )
-#define Is_regular( t ) ( (t) & REGULAR_TYPE )
+#define Is_unreliable( t ) ( ( (t) & REGULAR_TYPE ) == UNRELIABLE_TYPE )
+#define Is_reliable( t ) ( ( (t) & REGULAR_TYPE ) == RELIABLE_TYPE )
+#define Is_fifo( t ) ( ( (t) & REGULAR_TYPE ) == FIFO_TYPE )
+#define Is_agreed( t ) ( ( (t) & REGULAR_TYPE ) == AGREED_TYPE )
+#define Is_safe( t ) ( ( (t) & REGULAR_TYPE ) == SAFE_TYPE )
+#define Is_regular( t ) ( ( (t) & REGULAR_TYPE ) != 0 )
-#define Is_routed( t ) ( (t) & ROUTED_TYPE )
-#define Set_routed( t ) ( (t) | ROUTED_TYPE )
-#define Clear_routed( t ) ( (t) & ~ROUTED_TYPE )
+#define Is_routed( t ) ( (t) & ROUTED_TYPE )
+#define Set_routed( t ) ( (t) | ROUTED_TYPE )
+#define Clear_routed( t ) ( (t) & ~ROUTED_TYPE )
-#define Is_hurry( t ) ( (t) & HURRY_TYPE )
+#define Is_hurry( t ) ( (t) & HURRY_TYPE )
-#define Is_alive( t ) ( (t) & ALIVE_TYPE )
-#define Is_join( t ) ( (t) & JOIN_TYPE )
-#define Is_refer( t ) ( (t) & REFER_TYPE )
-#define Is_membership( t ) ( (t) & MEMBERSHIP_TYPE )
+#define Is_alive( t ) ( ( (t) & MEMBERSHIP_TYPE ) == ALIVE_TYPE )
+#define Is_join( t ) ( ( (t) & MEMBERSHIP_TYPE ) == JOIN_TYPE )
+#define Is_refer( t ) ( ( (t) & MEMBERSHIP_TYPE ) == REFER_TYPE )
+#define Is_membership( t ) ( ( (t) & MEMBERSHIP_TYPE ) != 0 )
-#define Is_form( t ) ( (t) & FORM_TYPE )
-#define Is_form1( t ) ( (t) & FORM1_TYPE )
-#define Is_form2( t ) ( (t) & FORM2_TYPE )
+#define Is_form1( t ) ( ( (t) & FORM_TYPE ) == FORM1_TYPE )
+#define Is_form2( t ) ( ( (t) & FORM_TYPE ) == FORM2_TYPE )
+#define Is_form( t ) ( ( (t) & FORM_TYPE ) != 0 )
#define Get_arq( t ) ( ( (t) & ARQ_TYPE ) >> 16 )
#define Set_arq( t, val ) ( ( (t) & ~ARQ_TYPE ) | ( ( (val) << 16) & ARQ_TYPE ) )
#define Get_retrans( t ) ( ( (t) & RETRANS_TYPE ) >> 20 )
#define Set_retrans( t, val) ( ( (t) & ~RETRANS_TYPE ) | ( ( (val) << 20) & RETRANS_TYPE ) )
-#define Is_status( t ) ( (t) & STATUS_TYPE )
-#define Is_partition( t ) ( (t) & PARTITION_TYPE )
-#define Is_fc( t ) ( (t) & FC_TYPE )
-#define Is_conf_reload( t ) ( (t) & RELOAD_TYPE )
-#define Is_control( t ) ( (t) & CONTROL_TYPE )
+#define Is_status( t ) ( ( (t) & CONTROL_TYPE ) == STATUS_TYPE )
+#define Is_partition( t ) ( ( (t) & CONTROL_TYPE ) == PARTITION_TYPE )
+#define Is_fc( t ) ( ( (t) & CONTROL_TYPE ) == FC_TYPE )
+#define Is_conf_reload( t ) ( ( (t) & CONTROL_TYPE ) == RELOAD_TYPE )
+#define Is_control( t ) ( ( (t) & CONTROL_TYPE ) != 0 )
typedef struct
{
Modified: branches/spread_5/daemon/network.c
===================================================================
--- branches/spread_5/daemon/network.c 2016-11-03 14:51:42 UTC (rev 859)
+++ branches/spread_5/daemon/network.c 2016-11-14 17:19:30 UTC (rev 860)
@@ -737,17 +737,12 @@
int received_bytes;
size_t i;
- if (scat->num_elements <= 0 || scat->elements[0].len < sizeof(token_header))
- Alarmp(SPLOG_FATAL, NETWORK, "Net_recv_token: BUG! Scatter is too small for token header!\n");
+ if ( scat->num_elements != 2 || scat->elements[0].len != sizeof( token_header ) || scat->elements[1].len != sizeof( token_body ) )
+ Alarmp( SPLOG_FATAL, NETWORK, "Net_recv_token: BUG! Scatter wrong size to receive token!\n" );
- for (i = 0; i < Num_token_channels && fd != Token_channel[i]; ++i);
-
- if (i == Num_token_channels)
- Alarm(EXIT, "Net_recv_token: Listening and received packet on non-token channel %d\n", fd);
-
received_bytes = DL_recvfrom_gen( fd, scat, &src_addr );
- if( received_bytes < 0 )
+ if ( received_bytes < 0 )
{
Alarmp( SPLOG_ERROR, NETWORK, "Net_recv_token: error: %d %d '%s' receiving on channel %d\n", received_bytes, sock_errno, sock_strerror(sock_errno), fd);
return( received_bytes );
@@ -755,27 +750,24 @@
if ( (size_t) received_bytes < sizeof( token_header ) )
{
- Alarmp( SPLOG_INFO, NETWORK, "Net_recv_token: ignoring token of size %d, smaller than token header size %lu from [%s]:%u on channel %d\n",
- received_bytes, (unsigned long) sizeof(token_header), SPU_ADDR_NTOP(&src_addr), (unsigned) spu_addr_ip_get_port(&src_addr), fd);
+ Alarmp( SPLOG_INFO, NETWORK, "Net_recv_token: ignoring token of size %d, smaller than token header size %ld from [%s]:%u on channel %d\n",
+ received_bytes, (long) sizeof(token_header), SPU_ADDR_NTOP(&src_addr), (unsigned) spu_addr_ip_get_port(&src_addr), fd);
return( 0 );
}
- if( !Same_endian( token_ptr->type ) )
+ if ( !Same_endian( token_ptr->type ) )
Flip_token( token_ptr );
- /* TODO: recv size integrity check?
-
if ( (size_t) received_bytes != sizeof( token_header ) + token_ptr->rtr_len)
{
- Alarmp( SPLOG_INFO, NETWORK, "Net_recv_token: Received invalid token: received bytes (%d) != expected length (%lu)\n",
- received_bytes, (unsigned long) (sizeof( token_header ) + token_ptr->rtr_len) );
+ Alarmp( SPLOG_INFO, NETWORK, "Net_recv_token: Received invalid token: received bytes (%d) != expected length (%ld)\n",
+ received_bytes, (long) ( sizeof( token_header ) + token_ptr->rtr_len ) );
return( 0 );
}
- */
- if (token_ptr->conf_hash != Cn->hash_code)
+ if ( token_ptr->conf_hash != Cn->hash_code )
{
- Alarmp( SPLOG_INFO, NETWORK, "Net_recv_token: ignoring token from different spread configuration; hash (%u) != local hash (%u); from [%s]:%u on channel %d\n",
+ Alarmp( SPLOG_INFO, NETWORK, "Net_recv_token: ignoring token with wrong spread configuration; hash (%u) != local hash (%u); from [%s]:%u on channel %d\n",
(unsigned) token_ptr->conf_hash, (unsigned) Cn->hash_code, SPU_ADDR_NTOP(&src_addr), (unsigned) spu_addr_ip_get_port(&src_addr), fd);
return( 0 );
}
@@ -787,10 +779,10 @@
return( 0 );
}
- if ( spu_addr_ip_cmp(&src_addr, &pp->proc_addr, FALSE) )
+ if ( spu_addr_ip_cmp( &src_addr, &pp->proc_addr, FALSE ) )
{
- Alarmp( SPLOG_WARNING, NETWORK, "Net_recv_token: ignoring token from transmitter (0x%08X) bc src_addr doesn't match my configuration; from [%s]:%u on channel %d\n",
- (unsigned) token_ptr->transmiter_id, SPU_ADDR_NTOP(&src_addr), (unsigned) spu_addr_ip_get_port(&src_addr), fd );
+ Alarmp( SPLOG_WARNING, NETWORK, "Net_recv_token: ignoring token from transmitter (0x%08X) bc src_addr [%s]:%u doesn't match my configuration [%s] on channel %d\n",
+ (unsigned) token_ptr->transmiter_id, SPU_ADDR_NTOP( &src_addr ), (unsigned) spu_addr_ip_get_port( &src_addr ), SPU_ADDR_NTOP2( &pp->proc_addr ), fd );
return( 0 );
}
@@ -807,13 +799,13 @@
"Net_recv_token: type = 0x%08X; transmiter_id = 0x%08X; proc_id = 0x%08X; "
"memb_id = { proc_id = 0x%08X, time = %ld }; "
"seq = %ld; aru = %ld; aru_last_id = 0x%08X; "
- "flow_control = %d; rtr_len = %u; conf_hash = %u; "
- "src_addr = [%s]:%u; received_bytes = %d\n",
+ "flow_control = %ld; rtr_len = %ld; conf_hash = %u; "
+ "src_addr = [%s]:%u; received_bytes = %ld\n",
(unsigned) token_ptr->type, (unsigned) token_ptr->transmiter_id, (unsigned) token_ptr->proc_id,
(unsigned) token_ptr->memb_id.proc_id, (long) token_ptr->memb_id.time,
(long) token_ptr->seq, (long) token_ptr->aru, (unsigned) token_ptr->aru_last_id,
- (int) token_ptr->flow_control, (unsigned) token_ptr->rtr_len, (unsigned) token_ptr->conf_hash,
- SPU_ADDR_NTOP(&src_addr), (unsigned) spu_addr_ip_get_port(&src_addr), received_bytes );
+ (long) token_ptr->flow_control, (long) token_ptr->rtr_len, (unsigned) token_ptr->conf_hash,
+ SPU_ADDR_NTOP(&src_addr), (unsigned) spu_addr_ip_get_port(&src_addr), (long) received_bytes );
return ( received_bytes );
}
@@ -844,7 +836,7 @@
else if ( send_len > MAX_PACKET_SIZE )
{
Alarmp( SPLOG_WARNING, PRINT, "Net_ucast_token: WARNING!!! Token is longer (%lu bytes) than a single fast ethernet MTU (%lu bytes)! "
- "IP fragmentation will likely occur and greatly increase the chance the token is lost!\n",
+ "IP fragmentation will likely occur and significantly increase the chance the token is lost!\n",
(unsigned long) send_len, (unsigned long) MAX_PACKET_SIZE );
}
Modified: branches/spread_5/daemon/prot_body.h
===================================================================
--- branches/spread_5/daemon/prot_body.h 2016-11-03 14:51:42 UTC (rev 859)
+++ branches/spread_5/daemon/prot_body.h 2016-11-14 17:19:30 UTC (rev 860)
@@ -68,6 +68,7 @@
ext packet_info Packets[MAX_PACKETS_IN_STRUCT];
+
ext int32 Aru;
ext int32 My_aru;
ext int32 Highest_seq;
Modified: branches/spread_5/daemon/protocol.c
===================================================================
--- branches/spread_5/daemon/protocol.c 2016-11-03 14:51:42 UTC (rev 859)
+++ branches/spread_5/daemon/protocol.c 2016-11-14 17:19:30 UTC (rev 860)
@@ -160,13 +160,13 @@
Send_pack_queue.first = NULL;
Send_pack_queue.last = NULL;
- New_pack.num_elements = 2;
+ New_pack.num_elements = 2;
New_pack.elements[0].len = sizeof(packet_header);
New_pack.elements[0].buf = (char *) new(PACK_HEAD_OBJ);
New_pack.elements[1].len = sizeof(packet_body);
New_pack.elements[1].buf = (char *) new(PACKET_BODY);
- New_token.num_elements = 2;
+ New_token.num_elements = 2;
New_token.elements[0].len = sizeof(token_header);
New_token.elements[0].buf = (char *) new(TOKEN_HEAD_OBJ);
New_token.elements[1].len = sizeof(token_body);
@@ -531,7 +531,9 @@
if ( ret == 0 )
{
- if ( ALARMP_NEEDED( SPLOG_DEBUG, PROTOCOL) ) Alarmp( SPLOG_DEBUG, PROTOCOL, "Prot_handle_token: ignoring token dropped by Net_recv_token.\n" );
+ if ( ALARMP_NEEDED( SPLOG_DEBUG, PROTOCOL) )
+ Alarmp( SPLOG_DEBUG, PROTOCOL, "Prot_handle_token: ignoring token dropped by Net_recv_token.\n" );
+
return;
}
Modified: branches/spread_5/daemon/spread_params.h
===================================================================
--- branches/spread_5/daemon/spread_params.h 2016-11-03 14:51:42 UTC (rev 859)
+++ branches/spread_5/daemon/spread_params.h 2016-11-14 17:19:30 UTC (rev 860)
@@ -73,11 +73,15 @@
#define MAX_REPS 25
#define MAX_FORM_REPS 20
-#define MAX_PACKETS_IN_STRUCT 8192
-#define PACKET_MASK 0x00001fff
+#define MAX_PACKETS_IN_STRUCT ( 0x1 << 11 )
+#define PACKET_MASK ( MAX_PACKETS_IN_STRUCT - 1 )
#define MAX_SEQ_GAP 1600 /* used in flow control to limit difference between highest_seq and aru */
+#if MAX_SEQ_GAP >= MAX_PACKETS_IN_STRUCT
+# error "MAX_SEQ_GAP must be less than MAX_PACKETS_IN_STRUCT!"
+#endif
+
#define MAX_EVS_ROUNDS 500 /* used in EVS state to limit total # of rounds to complete EVS */
#define WATER_MARK 500 /* used to limit incoming user messages */
More information about the Spread-cvs
mailing list