[Spread-cvs] commit: r629 - in trunk: . daemon docs libspread-util/include libspread-util/src
jschultz at spread.org
jschultz at spread.org
Tue Dec 17 12:17:17 EST 2013
Author: jschultz
Date: 2013-12-17 12:17:17 -0500 (Tue, 17 Dec 2013)
New Revision: 629
Added:
trunk/release_announcement_4.3.txt
trunk/release_announcement_accelerated_ring.txt
Modified:
trunk/Readme.txt
trunk/daemon/Makefile.in
trunk/daemon/config_gram.l
trunk/daemon/config_parse.y
trunk/daemon/configuration.c
trunk/daemon/configuration.h
trunk/daemon/flow_control.c
trunk/daemon/flow_control.h
trunk/daemon/membership.c
trunk/daemon/monitor.c
trunk/daemon/net_types.h
trunk/daemon/network.c
trunk/daemon/prot_body.h
trunk/daemon/protocol.c
trunk/daemon/protocol.h
trunk/daemon/spread.c
trunk/daemon/spread_params.h
trunk/daemon/status.h
trunk/docs/sample.spread.conf
trunk/libspread-util/include/spu_events.h
trunk/libspread-util/include/spu_objects_local.h
trunk/libspread-util/src/events.c
Log:
Merging experimental protocol into trunk:
svn merge -r581:582 branches/experimental-4.3 trunk
Modified: trunk/Readme.txt
===================================================================
--- trunk/Readme.txt 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/Readme.txt 2013-12-17 17:17:17 UTC (rev 629)
@@ -2,7 +2,7 @@
-----------------------------------------------------------
/=============================================================================\
-| The Spread Group Communication Toolkit |
+| The Spread Group Communication Toolkit - Accelerated Ring Research Version |
| Copyright (c) 1993-2013 Spread Concepts LLC |
| All rights reserved. |
| |
@@ -23,6 +23,7 @@
| John Schultz jschultz at spreadconcepts.com |
| |
| Major Contributors: |
+| Amy Babay babay at cs.jhu.edu - accelerated ring protocol |
| Ryan Caudy rcaudy at gmail.com - contribution to process groups|
| Claudiu Danilov claudiu at acm.org - scalable, wide-area support |
| Cristina Nita-Rotaru crisn at cs.purdue.edu - GC security |
@@ -48,9 +49,17 @@
| WWW : http://www.spread.org and http://www.spreadconcepts.com |
| Contact: info at spreadconcepts.com |
| |
-| Version 4.3.0 built 11/June/2013 |
+| Version 4.3.0 Accelerated Ring Experimental version built 2/July/2013 |
\=============================================================================/
+July 2, 2013 Ver 4.3.0 Final
+-----------------------------
+Features:
+ - Accelerated Ring Experimental protocol tailored for data center networks.
+ This protocol provides 30-50% higher throughput and 20-35% lower latency
+ in modern local area networks. This version supports both the original
+ protocol and the accelerated ring experimental protocol.
+
June 11, 2013 Ver 4.3.0 Final
-----------------------------
Features:
Modified: trunk/daemon/Makefile.in
===================================================================
--- trunk/daemon/Makefile.in 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/Makefile.in 2013-12-17 17:17:17 UTC (rev 629)
@@ -83,7 +83,7 @@
$(LD) -o $@ $(LDFLAGS) $(SPREADOBJS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a ../stdutil/lib/libstdutil-threaded-release.a $(LIBS)
spmonitor$(EXEEXT): $(MONITOR_OBJS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
- $(LD) -o $@ $(LDFLAGS) $(MONITOR_OBJS) $(LIBS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
+ $(LD) -o $@ $(LDFLAGS) $(MONITOR_OBJS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a $(LIBS)
sptmonitor$(EXEEXT): $(TMONITOR_OBJS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
$(LD) $(THLDFLAGS) -o $@ $(TMONITOR_OBJS) $(THLIBS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
@@ -91,10 +91,10 @@
testprog: spsend$(EXEEXT) sprecv$(EXEEXT)
spsend$(EXEEXT): s.o $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
- $(LD) -o $@ $(LDFLAGS) s.o $(LIBS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
+ $(LD) -o $@ $(LDFLAGS) s.o $(LIBSPREADUTIL_DIR)/lib/libspread-util.a $(LIBS)
sprecv$(EXEEXT): r.o $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
- $(LD) -o $@ $(LDFLAGS) r.o $(LIBS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
+ $(LD) -o $@ $(LDFLAGS) r.o $(LIBSPREADUTIL_DIR)/lib/libspread-util.a $(LIBS)
clean:
rm -f *.lo *.tlo *.to *.o *.a *.dylib $(TARGETS) spsimple_user
Modified: trunk/daemon/config_gram.l
===================================================================
--- trunk/daemon/config_gram.l 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/config_gram.l 2013-12-17 17:17:17 UTC (rev 629)
@@ -100,6 +100,8 @@
RouteMatrix { return ROUTEMATRIX; }
Window { return WINDOW; }
PersonalWindow { return PERSONALWINDOW; }
+AcceleratedRing { return ACCELERATEDRING; }
+AcceleratedWindow { return ACCELERATEDWINDOW; }
TokenTimeout { return TOKENTIMEOUT; }
HurryTimeout { return HURRYTIMEOUT; }
AliveTimeout { return ALIVETIMEOUT; }
@@ -136,7 +138,7 @@
M { yylval.mask = IFTYPE_MONITOR; return IMONITOR; }
C { yylval.mask = IFTYPE_CLIENT; return ICLIENT; }
D { yylval.mask = IFTYPE_DAEMON; return IDAEMON; }
-pDEBUG { yylval.number = 1; return PDEBUG; }
+pDEBUG { yylval.number = 1; return PDEBUG; }
INFO { yylval.number = 2; return PINFO; }
WARNING { yylval.number = 3; return PWARNING; }
ERROR { yylval.number = 4; return PERROR; }
Modified: trunk/daemon/config_parse.y
===================================================================
--- trunk/daemon/config_parse.y 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/config_parse.y 2013-12-17 17:17:17 UTC (rev 629)
@@ -327,7 +327,7 @@
%token DEBUGINITIALSEQUENCE
%token DANGEROUSMONITOR SOCKETPORTREUSE RUNTIMEDIR SPUSER SPGROUP ALLOWEDAUTHMETHODS REQUIREDAUTHMETHODS ACCESSCONTROLPOLICY
%token MAXSESSIONMESSAGES
-%token WINDOW PERSONALWINDOW
+%token WINDOW PERSONALWINDOW ACCELERATEDRING ACCELERATEDWINDOW
%token TOKENTIMEOUT HURRYTIMEOUT ALIVETIMEOUT JOINTIMEOUT REPTIMEOUT SEGTIMEOUT GATHERTIMEOUT FORMTIMEOUT LOOKUPTIMEOUT
%token SP_BOOL SP_TRIVAL LINKPROTOCOL PHOP PTCPHOP
%token IMONITOR ICLIENT IDAEMON
@@ -560,6 +560,14 @@
{
Conf_set_personal_window($3.number);
}
+ | ACCELERATEDRING EQUALS SP_BOOL
+ {
+ Conf_set_accelerated_ring($3.boolean);
+ }
+ | ACCELERATEDWINDOW EQUALS NUMBER
+ {
+ Conf_set_accelerated_window($3.number);
+ }
| TOKENTIMEOUT EQUALS NUMBER
{
Conf_set_token_timeout($3.number);
Modified: trunk/daemon/configuration.c
===================================================================
--- trunk/daemon/configuration.c 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/configuration.c 2013-12-17 17:17:17 UTC (rev 629)
@@ -106,6 +106,9 @@
static int Window = DEFAULT_WINDOW;
static int PersonalWindow = DEFAULT_PERSONAL_WINDOW;
+static bool AcceleratedRing = FALSE;
+static int AcceleratedWindow = 0;
+
enum
{
TOKEN_TIMEOUT_CONF = (0x1 << 0),
@@ -432,6 +435,14 @@
Alarmp( SPLOG_FATAL, CONF_SYS, "Failed to update string with version number!\n");
ConfStringLen += added_len;
+ /* append whether we are running with accelerated token or not */
+
+ if (ConfStringLen >= MAX_CONF_STRING) {
+ Alarmp( SPLOG_FATAL, CONF_SYS, "Failed to update string with accelerated ring type!\n");
+ }
+
+ ConfStringRep[ConfStringLen++] = '0' + AcceleratedRing;
+
/* calculate hash value of configuration.
* This daemon will only work with other daemons who have an identical hash value.
*/
@@ -486,6 +497,15 @@
}
Conf_id_to_str( My.id, ip );
+
+ if (PersonalWindow > Window) {
+ Alarmp(SPLOG_FATAL, CONF_SYS, "Conf_load_conf_file: PersonalWindow (%d) > Window (%d)!\n", PersonalWindow, Window);
+ }
+
+ if (AcceleratedRing && AcceleratedWindow > PersonalWindow) {
+ Alarmp(SPLOG_FATAL, CONF_SYS, "Conf_load_conf_file: AcceleratedWindow (%d) > PersonalWindow (%d)!\n", AcceleratedWindow, PersonalWindow);
+ }
+
Alarm( CONF_SYS, "Conf_load_conf_file: My name: %s, id: %s, port: %hu\n",
My.name, ip, My.port );
@@ -704,6 +724,45 @@
return( -1 );
}
+/* Conf_previous: return the id of my predecessor in the ring order of
+ * config. Ring order is segment order, then order within a segment,
+ * wrapping from the first process back to the last one. If no process
+ * is found, raises Alarm( EXIT ) and returns -1. */
+int32u Conf_previous( configuration *config )
+{
+ segment *seg_ptr;
+ segment *prev_seg_ptr;
+ int index_in_seg;
+ int i;
+
+ seg_ptr = &(config->segments[My.seg_index]);
+ index_in_seg = Conf_id_in_seg(seg_ptr, My.id);
+
+ if( index_in_seg > 0 )
+ {
+ /* I am not first in my segment; previous is previous proc in segment */
+ return seg_ptr->procs[index_in_seg-1]->id;
+ }
+
+ /* Scan backwards from the segment just before mine so the NEAREST
+ * non-empty preceding segment is chosen (scanning up from segment 0
+ * would pick the first non-empty segment instead) */
+ for( i = My.seg_index-1; i >= 0; i-- )
+ {
+ if( config->segments[i].num_procs > 0 )
+ {
+ /* There is a segment before mine; previous is last in that segment */
+ prev_seg_ptr = &(config->segments[i]);
+ return prev_seg_ptr->procs[prev_seg_ptr->num_procs-1]->id;
+ }
+ }
+
+ /* I am first in the first non-empty segment; wrap around: previous is
+ * last proc of the last non-empty segment (possibly my own segment) */
+ for( i = config->num_segments-1; i >= My.seg_index; i-- )
+ {
+ if( config->segments[i].num_procs > 0 )
+ {
+ prev_seg_ptr = &(config->segments[i]);
+ return prev_seg_ptr->procs[prev_seg_ptr->num_procs-1]->id;
+ }
+ }
+ Alarm( EXIT, "Conf_previous: No process found\n" );
+ return( -1 );
+}
+
int32u Conf_seg_leader( configuration *config, int16 seg_index )
{
if( config->segments[seg_index].num_procs > 0 )
@@ -939,6 +998,30 @@
return PersonalWindow;
}
+/* Enable (TRUE) or disable (FALSE) the accelerated ring protocol. */
+void Conf_set_accelerated_ring(bool new_state)
+{
+ AcceleratedRing = new_state;
+}
+
+/* Return whether the accelerated ring protocol is configured. */
+bool Conf_get_accelerated_ring(void)
+{
+ return AcceleratedRing;
+}
+
+/* Set the accelerated window. 0 is a legal value (it is the default), so
+ * only negative values are rejected (fatal alarm). */
+void Conf_set_accelerated_window(int pwindow)
+{
+ if (pwindow < 0) {
+ Alarmp(SPLOG_FATAL, CONF_SYS, "Conf_set_accelerated_window: Attempt to set window to negative (%d)!\n", pwindow);
+ }
+ Alarmp(SPLOG_DEBUG, CONF_SYS, "Conf_set_accelerated_window: Set AcceleratedWindow to %d\n", pwindow);
+ AcceleratedWindow = pwindow;
+}
+
+/* Return the configured accelerated window. */
+int Conf_get_accelerated_window(void)
+{
+ return AcceleratedWindow;
+}
+
int Conf_memb_timeouts_set(void)
{
return TimeoutMask != 0;
Modified: trunk/daemon/configuration.h
===================================================================
--- trunk/daemon/configuration.h 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/configuration.h 2013-12-17 17:17:17 UTC (rev 629)
@@ -107,6 +107,7 @@
int Conf_num_segments( configuration *config );
int32u Conf_leader( configuration *config );
int32u Conf_last( configuration *config );
+int32u Conf_previous( configuration *config );
int32u Conf_seg_leader( configuration *config, int16 seg_index );
int32u Conf_seg_last( configuration *config, int16 seg_index );
int Conf_append_id_to_seg( segment *seg, int32u id);
@@ -144,7 +145,10 @@
int Conf_get_window(void);
void Conf_set_personal_window(int pwindow);
int Conf_get_personal_window(void);
-
+void Conf_set_accelerated_ring(bool new_state);
+bool Conf_get_accelerated_ring(void);
+void Conf_set_accelerated_window(int pwindow);
+int Conf_get_accelerated_window(void);
int Conf_memb_timeouts_set(void);
int Conf_all_memb_timeouts_set(void);
void Conf_set_token_timeout(int timeout);
Modified: trunk/daemon/flow_control.c
===================================================================
--- trunk/daemon/flow_control.c 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/flow_control.c 2013-12-17 17:17:17 UTC (rev 629)
@@ -42,14 +42,20 @@
static int16 Window;
static int16 Personal_window;
+static int16 Accelerated_window;
+static bool Accelerated_ring;
void FC_init( )
{
Window = Conf_get_window();
Personal_window = Conf_get_personal_window();
+ Accelerated_ring = Conf_get_accelerated_ring();
+ Accelerated_window = Conf_get_accelerated_window();
GlobalStatus.window = Window;
GlobalStatus.personal_window = Personal_window;
+ GlobalStatus.accelerated_ring = Accelerated_ring;
+ GlobalStatus.accelerated_window = Accelerated_window;
}
void FC_new_configuration( )
@@ -72,14 +78,24 @@
return(allowed);
}
+/* FC_accelerated_ring: accelerated-ring flag cached from the
+ * configuration by FC_init. */
+bool FC_accelerated_ring(void)
+{
+ bool enabled = Accelerated_ring;
+ return( enabled );
+}
+
+/* FC_accelerated_window: current accelerated window, initialised by
+ * FC_init and overridable by monitor messages in FC_handle_message. */
+int FC_accelerated_window(void)
+{
+ return( (int) Accelerated_window );
+}
+
void FC_handle_message( sys_scatter *scat )
{
- int16 *cur_fc_buf;
+ int16 (*cur_fc_buf)[2];
packet_header *pack_ptr;
proc dummy_proc;
int my_index;
- int16 temp_window,temp_personal_window;
+ int16 temp_window,temp_personal_window, temp_accelerated_window;
configuration *Cn;
Cn = Conf_ref();
@@ -96,24 +112,29 @@
"FC_handle_message: Illegal monitor request\n");
return;
}
- cur_fc_buf = (int16 *)scat->elements[1].buf;
+ cur_fc_buf = (int16 (*)[2])scat->elements[1].buf;
my_index = Conf_proc_by_id( Conf_my().id, &dummy_proc );
if( Same_endian( pack_ptr->type ) ) {
- temp_window = cur_fc_buf[Conf_num_procs( Cn )];
- temp_personal_window = cur_fc_buf[my_index];
+ temp_window = cur_fc_buf[Conf_num_procs( Cn )][0];
+ temp_personal_window = cur_fc_buf[my_index][0];
+ temp_accelerated_window = cur_fc_buf[my_index][1];
}else{
- temp_window = Flip_int16( cur_fc_buf[Conf_num_procs( Cn )] );
- temp_personal_window = Flip_int16( cur_fc_buf[my_index] );
+ temp_window = Flip_int16( cur_fc_buf[Conf_num_procs( Cn )][0] );
+ temp_personal_window = Flip_int16( cur_fc_buf[my_index][0] );
+ temp_accelerated_window = Flip_int16( cur_fc_buf[my_index][1] );
}
if( temp_window != -1 ) Window = temp_window;
if( temp_personal_window != -1 ) Personal_window = temp_personal_window;
+ if( temp_accelerated_window != -1) Accelerated_window = temp_accelerated_window;
+
GlobalStatus.window = Window;
GlobalStatus.personal_window = Personal_window;
+ GlobalStatus.accelerated_window = Accelerated_window;
Alarm( FLOW_CONTROL,
- "FC_handle_message: Got monitor mess, Window %d Personal %d\n",
- Window, Personal_window );
+ "FC_handle_message: Got monitor mess, Window %d Personal %d Accelerated %d\n",
+ Window, Personal_window, Accelerated_window );
}
void FC_signal_conf_reload( )
Modified: trunk/daemon/flow_control.h
===================================================================
--- trunk/daemon/flow_control.h 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/flow_control.h 2013-12-17 17:17:17 UTC (rev 629)
@@ -41,6 +41,8 @@
void FC_init( );
void FC_new_configuration( );
int FC_allowed( int flow_control, int num_retrans );
+bool FC_accelerated_ring(void);
+int FC_accelerated_window(void);
void FC_handle_message( sys_scatter *scat );
void FC_signal_conf_reload();
Modified: trunk/daemon/membership.c
===================================================================
--- trunk/daemon/membership.c 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/membership.c 2013-12-17 17:17:17 UTC (rev 629)
@@ -1972,6 +1972,7 @@
Alarm( EXIT, "Read_form2: BUG2 no such id %u\n", p.id);
}
Net_set_membership( Future_membership );
+ Prot_set_prev_proc(&Future_membership);
FC_new_configuration( );
/* get my ring info */
@@ -2029,7 +2030,7 @@
}
/* The token circulates in conf order, which also defines the order
- * by which we choose "leaders." So, if noone else has set the id
+ * by which we choose "leaders." So, if no one else has set the id
* for my ring, I get to, and I'll be leader. */
if( !my_rg_info->trans_time )
{
@@ -2075,6 +2076,7 @@
Token_alive = 1;
E_queue( Memb_token_loss, 0, NULL, Token_timeout );
+ Received_token_rounds = 0; /* ### Used for priority switching*/
Last_token->type = 0;
Last_token->seq = 0;
Last_token->aru = 0;
Modified: trunk/daemon/monitor.c
===================================================================
--- trunk/daemon/monitor.c 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/monitor.c 2013-12-17 17:17:17 UTC (rev 629)
@@ -81,8 +81,8 @@
static int16 Partition[MAX_PROCS_RING];
static int16 Work_partition[MAX_PROCS_RING];
-static int16 Fc_buf[MAX_PROCS_RING];
-static int16 Work_fc_buf[MAX_PROCS_RING];
+static int16 Fc_buf[MAX_PROCS_RING][2];
+static int16 Work_fc_buf[MAX_PROCS_RING][2];
static int Status_vector[MAX_PROCS_RING];
@@ -109,7 +109,7 @@
static void Define_flow_control();
static void Send_flow_control();
-static void Print_flow_control( int16 fc_buf[MAX_PROCS_RING] );
+static void Print_flow_control( int16 fc_buf[MAX_PROCS_RING][2] );
static void Activate_status();
static void Send_status_query();
@@ -385,8 +385,10 @@
break;
case '6':
- for( i=0; i < Conf_num_procs( &Cn )+1; i++ )
- Fc_buf[i] = Work_fc_buf[i];
+ for( i=0; i < Conf_num_procs( &Cn )+1; i++ ) {
+ Fc_buf[i][0] = Work_fc_buf[i][0];
+ Fc_buf[i][1] = Work_fc_buf[i][1];
+ }
Send_flow_control();
printf("\n");
@@ -588,7 +590,7 @@
#endif /* _REENTRANT */
-static void Print_flow_control( int16 fc_buf[MAX_PROCS_RING] )
+static void Print_flow_control( int16 fc_buf[MAX_PROCS_RING][2] )
{
int32 proc_id;
proc p;
@@ -600,7 +602,7 @@
printf("Flow Control Parameters:\n");
printf("------------------------\n");
printf("\n");
- printf("Window size: %d\n",fc_buf[ Conf_num_procs( &Cn )]);
+ printf("Window size: %d\n",fc_buf[ Conf_num_procs( &Cn )][0]);
printf("\n");
for( i=0; i < Cn.num_segments ; i++ )
{
@@ -608,7 +610,11 @@
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
- printf("\t%s\t%d\n", p.name, fc_buf[proc_index] );
+ printf("\t%s personal window\t%d\n", p.name, fc_buf[proc_index][0] );
+
+ if (Conf_get_accelerated_ring()) {
+ printf("\t%s accelerated window\t%d\n", p.name, fc_buf[proc_index][1] );
+ }
}
printf("\n");
}
@@ -642,11 +648,11 @@
exit(0);
}
ret = sscanf(str, "%d", &temp );
- Work_fc_buf[Conf_num_procs( &Cn )] = temp;
+ Work_fc_buf[Conf_num_procs( &Cn )][0] = temp;
if( ret > 0 ) legal = 1;
else if( ret == -1 ){
legal = 1;
- Work_fc_buf[Conf_num_procs( &Cn )] = -1;
+ Work_fc_buf[Conf_num_procs( &Cn )][0] = -1;
}else printf("Please enter a number\n");
}
printf("\n");
@@ -658,7 +664,7 @@
proc_index = Conf_proc_by_id( proc_id, &p );
for( legal=0; !legal; )
{
- printf("\t%s\t", p.name);
+ printf("\t%s personal window\t", p.name);
if( fgets( str, 70, stdin ) == NULL )
{
@@ -666,13 +672,34 @@
exit(0);
}
ret = sscanf(str, "%d", &temp);
- Work_fc_buf[proc_index] = temp;
+ Work_fc_buf[proc_index][0] = temp;
if( ret > 0 ) legal = 1;
else if( ret == -1 ){
legal = 1;
- Work_fc_buf[proc_index] = -1;
+ Work_fc_buf[proc_index][0] = -1;
}else printf("Please enter a number\n");
}
+
+ if (Conf_get_accelerated_ring()) {
+
+ for( legal=0; !legal; )
+ {
+ printf("\t%s accelerated window\t", p.name);
+
+ if( fgets( str, 70, stdin ) == NULL )
+ {
+ printf("Bye.\n");
+ exit(0);
+ }
+ ret = sscanf(str, "%d", &temp);
+ Work_fc_buf[proc_index][1] = temp;
+ if( ret > 0 ) legal = 1;
+ else if( ret == -1 ){
+ legal = 1;
+ Work_fc_buf[proc_index][1] = -1;
+ }else printf("Please enter a number\n");
+ }
+ }
}
printf("\n");
}
@@ -957,6 +984,8 @@
GlobalStatus.num_segments = Flip_int16( GlobalStatus.num_segments );
GlobalStatus.window = Flip_int16( GlobalStatus.window );
GlobalStatus.personal_window = Flip_int16( GlobalStatus.personal_window );
+ GlobalStatus.accelerated_ring = Flip_int16( GlobalStatus.accelerated_ring );
+ GlobalStatus.accelerated_window = Flip_int16( GlobalStatus.accelerated_window );
GlobalStatus.num_sessions = Flip_int16( GlobalStatus.num_sessions );
GlobalStatus.num_groups = Flip_int16( GlobalStatus.num_groups );
GlobalStatus.major_version = Flip_int16( GlobalStatus.major_version );
@@ -976,19 +1005,22 @@
GlobalStatus.major_version,GlobalStatus.minor_version,GlobalStatus.patch_version,
GlobalStatus.state, GlobalStatus.gstate, GlobalStatus.sec);
if( ret2 < 0 )
- printf("Membership : %d procs in %d segments, leader is %d\n",
+ printf("Membership : %d procs in %d segments, leader is %d, ",
GlobalStatus.num_procs,GlobalStatus.num_segments,GlobalStatus.leader_id);
- else printf("Membership : %d procs in %d segments, leader is %s\n",
+ else printf("Membership : %d procs in %d segments, leader is %s, ",
GlobalStatus.num_procs,GlobalStatus.num_segments,leader_p.name);
+ if (GlobalStatus.accelerated_ring == 0)
+ printf("regular protocol\n");
+ else
+ printf("accelerated protocol\n");
printf("rounds : %7d\ttok_hurry : %7d\tmemb change: %7d\n",GlobalStatus.token_rounds,GlobalStatus.token_hurry,GlobalStatus.membership_changes);
printf("sent pack: %7d\trecv pack : %7d\tretrans : %7d\n",GlobalStatus.packet_sent,GlobalStatus.packet_recv,GlobalStatus.retrans);
printf("u retrans: %7d\ts retrans : %7d\tb retrans : %7d\n",GlobalStatus.u_retrans,GlobalStatus.s_retrans,GlobalStatus.b_retrans);
printf("My_aru : %7d\tAru : %7d\tHighest seq: %7d\n",GlobalStatus.my_aru,GlobalStatus.aru, GlobalStatus.highest_seq);
printf("Sessions : %7d\tGroups : %7d\tWindow : %7d\n",GlobalStatus.num_sessions,GlobalStatus.num_groups,GlobalStatus.window);
- printf("Deliver M: %7d\tDeliver Pk: %7d\tPers Window: %7d\n",GlobalStatus.message_delivered,GlobalStatus.packet_delivered,GlobalStatus.personal_window);
- printf("Delta Mes: %7d\tDelta Pack: %7d\tDelta sec : %7d\n",
- GlobalStatus.message_delivered - last_mes, GlobalStatus.aru - last_aru, GlobalStatus.sec - last_sec );
+ printf("Deliver M: %7d\tDeliver Pk: %7d\tP/A Window : %7d/%d\n",GlobalStatus.message_delivered,GlobalStatus.packet_delivered,GlobalStatus.personal_window,GlobalStatus.accelerated_window);
+ printf("Delta Mes: %7d\tDelta Pk : %7d\tDelta sec : %7d\n",GlobalStatus.message_delivered - last_mes,GlobalStatus.aru - last_aru,GlobalStatus.sec - last_sec);
printf("==================================\n");
printf("\n");
@@ -1034,8 +1066,10 @@
{
Partition[i] = 0;
Work_partition[i] = 0;
- Fc_buf[i] = 0;
- Work_fc_buf[i] = 0;
+ Fc_buf[i][0] = 0;
+ Fc_buf[i][1] = 0;
+ Work_fc_buf[i][0] = 0;
+ Work_fc_buf[i][1] = 0;
Status_vector[i] = 0;
}
Mutex_lock( &Partition_mutex );
Modified: trunk/daemon/net_types.h
===================================================================
--- trunk/daemon/net_types.h 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/net_types.h 2013-12-17 17:17:17 UTC (rev 629)
@@ -107,16 +107,22 @@
#define MONITOR_HASH 1100 /* Conf_hash code for packets from spmonitor program */
+/* Fragment header. packet_header embeds the first one (first_frag_header);
+ * Flip_pack intentionally leaves it unflipped so it is flipped together
+ * with the other fragment headers in Prot_handle_bcast. */
+typedef struct dummy_fragment_header {
+ int16 fragment_index; /* presumably index of this fragment in its message - TODO confirm */
+ int16 fragment_len; /* presumably byte length of this fragment's data - TODO confirm */
+} fragment_header;
+
typedef struct dummy_packet_header {
int32 type;
int32 transmiter_id;
int32 proc_id;
membership_id memb_id;
int32 seq;
- int32 fifo_seq;
- int16 packet_index;
+ int32 token_round; /* ### changed from fifo_seq */
+ int32 conf_hash;
int16 data_len;
- int32 conf_hash;
+ int16 padding;
+ fragment_header first_frag_header;
} packet_header;
typedef char packet_body[MAX_PACKET_SIZE-sizeof(packet_header)];
Modified: trunk/daemon/network.c
===================================================================
--- trunk/daemon/network.c 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/network.c 2013-12-17 17:17:17 UTC (rev 629)
@@ -62,12 +62,6 @@
static int32 Send_address[MAX_SEGMENTS];
static int16 Send_ports[MAX_SEGMENTS];
-/* ### Pack: 3 lines */
-/* Global in function so both Net_queue_bcast and Net_flush_bcast can access them */
-static sys_scatter Queue_scat;
-static int Queued_bytes = 0;
-static const char align_padding[4] = "padd";
-
/* address for token sending - which is always needed */
static int32 Token_address;
static int16 Token_port;
@@ -276,114 +270,6 @@
return( ret );
}
-/* ### Pack: 2 routines */
-int Net_queue_bcast( sys_scatter *scat )
-{
- packet_header *pack_ptr;
- int new_bytes;
- int i, j;
- int ret;
- int align_bytes, align_num_scatter;
-
- /* This line is redundent because of static initialization to 0 */
- if ( Queued_bytes == 0 ) Queue_scat.num_elements = 0;
-
- ret = 0;
- new_bytes = 0;
-
- for ( i=0; i < scat->num_elements; i++) {
- new_bytes += scat->elements[i].len;
- }
- /* Fix alignment of packed messages so they will each begin on a 4 byte alignment
- * This is needed for Sparc, might need enhancement if other archs
- * have more extensive alignement rules
- */
- align_bytes = 0;
- align_num_scatter = 0;
- switch(Queued_bytes % 4) {
- case 1:
- align_bytes++;
- case 2:
- align_bytes++;
- case 3:
- align_bytes++;
- align_num_scatter = 1;
- case 0:
- /* nothing since already aligned */
- break;
- }
-
- if ( ( (Queued_bytes + new_bytes + align_bytes) > MAX_PACKET_SIZE ) ||
- ( (Queue_scat.num_elements + scat->num_elements + align_num_scatter) > ARCH_SCATTER_SIZE ) )
- {
- ret = Net_flush_bcast();
- align_bytes = 0;
- align_num_scatter = 0;
- }
-
- if ( Queued_bytes == 0 ) {
- /* routing on channels if needed according to membership */
- pack_ptr = (packet_header *)scat->elements[0].buf;
- pack_ptr->type = Set_routed( pack_ptr->type );
- pack_ptr->type = Set_endian( pack_ptr->type );
- pack_ptr->conf_hash = Cn->hash_code;
- pack_ptr->transmiter_id = My.id;
- }
-
- if ( align_bytes > 0 )
- {
- Queue_scat.elements[Queue_scat.num_elements].len = align_bytes;
- Queue_scat.elements[Queue_scat.num_elements].buf = (char *)align_padding;
-
- Queued_bytes += align_bytes;
- Queue_scat.num_elements += 1;
- Alarm(NETWORK, "Net_queue_bcast: Inserted padding of %d bytes to message of size %d\n", align_bytes, new_bytes );
- }
-
- /* Add new packet to Queue_scat to be sent as packed packet */
- for ( i=0, j=Queue_scat.num_elements; i < scat->num_elements; i++, j++) {
- Queue_scat.elements[j].len = scat->elements[i].len;
- Queue_scat.elements[j].buf = scat->elements[i].buf;
- }
- Queued_bytes += new_bytes;
- Queue_scat.num_elements += scat->num_elements ;
-
- return( ret );
-}
-
-int Net_flush_bcast(void)
-{
- packet_header *pack_ptr;
- int i;
- int ret;
-
- if (Queued_bytes == 0 ) return( 0 );
-
- Alarm(NETWORK, "Net_flush_bcast: Flushing with Queued_bytes = %d; num_elements in scat = %d; size of scat0,1 = %d %d\n", Queued_bytes, Queue_scat.num_elements, Queue_scat.elements[0].len, Queue_scat.elements[1].len);
-
- ret = 0;
-
- for ( i=0; i< Num_send_needed; i++ )
- {
- ret = DL_send( Send_channel, Send_address[i], Send_ports[i], &Queue_scat );
- }
- pack_ptr = (packet_header *)Queue_scat.elements[0].buf;
- pack_ptr->type = Clear_routed( pack_ptr->type );
-
- /* broadcasting if needed according to configuration */
- if( Bcast_needed )
- {
- ret = DL_send( Send_channel, Bcast_address, Bcast_port, &Queue_scat );
- }
-
- if( !Bcast_needed && (Num_send_needed == 0) )
- ret = 1; /* No actual send is needed, but 'packet' can be considered 'sent' */
-
- Queue_scat.num_elements = 0;
- Queued_bytes = 0;
- return( ret );
-}
-
int Net_scast( int16 seg_index, sys_scatter *scat )
{
packet_header *pack_ptr;
@@ -446,9 +332,7 @@
static scatter save;
packet_header *pack_ptr;
int bytes_left;
- int received_bytes, body_offset;
- int processed_bytes;
- int pack_same_endian;
+ int received_bytes;
int i;
bool ch_found;
@@ -476,7 +360,7 @@
return( -1 );
}
- /* Fliping packet header to my form if needed */
+ /* Flipping packet header to my form if needed */
if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
/* First reject any message whose daemon has a different configuration */
@@ -529,6 +413,13 @@
if( pack_ptr->transmiter_id == My.id )
return( 0 );
+ /* validity check: exactly one header + data_len bytes (cast: %d expects int, not size_t) */
+ if( received_bytes != (int)( sizeof( packet_header ) + pack_ptr->data_len ) ) {
+ Alarm( PRINT, "Net_recv: Received invalid packet - received bytes (%d) != expected length (%d)\n",
+ received_bytes, (int)( sizeof( packet_header ) + pack_ptr->data_len ) );
+ return( -1 );
+ }
+
if( Bcast_needed && Is_routed( pack_ptr->type ) )
{
if( !Segment_leader ) Alarm( NETWORK,
@@ -575,61 +466,6 @@
*/
pack_ptr->type = Clear_routed ( pack_ptr->type );
- /*
- * Check validity of packet size and flip every packet header
- * other than first header (which is already flipped).
- * If packet size is not valid, return -1, otherwise
- * return size of received packet.
- */
- processed_bytes = sizeof( packet_header ) + pack_ptr->data_len;
- pack_same_endian = Same_endian( pack_ptr->type );
- /* ignore any alignment padding */
- if ( processed_bytes < received_bytes ) {
- switch(processed_bytes % 4)
- {
- case 1:
- processed_bytes++;
- case 2:
- processed_bytes++;
- case 3:
- processed_bytes++;
- case 0:
- /* already aligned */
- break;
- }
- }
- while( processed_bytes < received_bytes )
- {
- body_offset = processed_bytes - sizeof(packet_header);
- pack_ptr = (packet_header *)&scat->elements[1].buf[body_offset];
-
- /* flip contigues packet header */
- if( !pack_same_endian ) {
- Flip_pack( pack_ptr );
- }
-
- processed_bytes += sizeof( packet_header ) + pack_ptr->data_len;
- /* ignore any alignment padding */
- if ( processed_bytes < received_bytes ) {
- switch(processed_bytes % 4)
- {
- case 1:
- processed_bytes++;
- case 2:
- processed_bytes++;
- case 3:
- processed_bytes++;
- case 0:
- /* already aligned */
- break;
- }
- }
- }
- Alarm( NETWORK, "Net_recv: Received Packet - packet length(%d), packed message length(%d)\n", received_bytes, processed_bytes);
- if( processed_bytes != received_bytes ) {
- Alarm( PRINT, "Net_recv: Received Packet - packet length(%d) != packed message length(%d)\n", received_bytes, processed_bytes);
- return( -1 );
- }
return( received_bytes );
}
@@ -791,6 +627,9 @@
return( Partition[Partition_my_index] == Partition[proc_index] );
}
+/* The first fragment header is not flipped here, even though it is
+ part of the packet header. It will be flipped with the other
+ fragment headers in Prot_handle_bcast */
void Flip_pack( packet_header *pack_ptr )
{
pack_ptr->type = Flip_int32( pack_ptr->type );
@@ -799,10 +638,9 @@
pack_ptr->memb_id.proc_id = Flip_int32( pack_ptr->memb_id.proc_id );
pack_ptr->memb_id.time = Flip_int32( pack_ptr->memb_id.time );
pack_ptr->seq = Flip_int32( pack_ptr->seq );
- pack_ptr->fifo_seq = Flip_int32( pack_ptr->fifo_seq );
- pack_ptr->packet_index = Flip_int16( pack_ptr->packet_index );
- pack_ptr->data_len = Flip_int16( pack_ptr->data_len );
+ pack_ptr->token_round = Flip_int32( pack_ptr->token_round ); /* fifo_seq changed to token_round */
pack_ptr->conf_hash = Flip_int32( pack_ptr->conf_hash );
+ pack_ptr->data_len = Flip_int16( pack_ptr->data_len );
}
void Flip_token( token_header *token_ptr )
Modified: trunk/daemon/prot_body.h
===================================================================
--- trunk/daemon/prot_body.h 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/prot_body.h 2013-12-17 17:17:17 UTC (rev 629)
@@ -75,6 +75,7 @@
ext int32 Last_delivered;
ext int32 Last_seq;
ext int32 Token_rounds;
+ext int32 Received_token_rounds; /* ### Added to determine when to switch priorities*/
ext token_header *Last_token;
ext int Transitional;
Modified: trunk/daemon/protocol.c
===================================================================
--- trunk/daemon/protocol.c 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/protocol.c 2013-12-17 17:17:17 UTC (rev 629)
@@ -51,10 +51,30 @@
#include "spu_alarm.h"
#include "sess_types.h" /* for message_header */
+/* One node of a singly linked packet list; nodes are QUEUE_LINK memory
+ * objects (see Mem_init_object in Prot_init). */
+typedef struct dummy_queue_link{
+ sys_scatter *packet; /* the queued packet */
+ struct dummy_queue_link *next; /* following node */
+} queue_link;
+
+/* List of queued packets with head and tail pointers; Send_pack_queue is
+ * one, started empty (0/NULL/NULL) in Prot_init and passed to
+ * Prot_queue_bcast / Prot_flush_bcast. */
+typedef struct dummy_packet_queue{
+ int num_packets;
+ queue_link *first;
+ queue_link *last;
+} packet_queue;
+
+/* Same shape as packet_queue, but chaining message_link nodes
+ * (message_link is defined outside this view). */
+typedef struct dummy_message_queue{
+ int num_messages;
+ message_link *first;
+ message_link *last;
+} message_queue;
+
/* Prot variables */
static proc My;
static int My_index;
+static int32 Prev_proc_id; /* predecessor in ring (i.e. process that sends me the token) */
+static bool Token_has_priority; /* true when token channels have higher priority than bcast channels */
+
static int32 Set_aru;
static int Token_counter;
@@ -64,7 +84,7 @@
/* Used ONLY in Prot_handle_token and grurot, inited in Prot_init */
static sys_scatter New_token;
static token_header *Token;
-static sys_scatter Send_pack;
+static packet_queue Send_pack_queue;
static packet_header *Hurry_head;
static sys_scatter Hurry_pack;
@@ -73,22 +93,23 @@
/* Used to indicate a need to reload configuration at end of current membership */
static bool Prot_Need_Conf_Reload = FALSE;
-static bool Just_Installed = FALSE; /* tracks if we just installed a reg memb due to last token we sent */
+/* Used to indicate if we just installed a reg memb due to last token we sent */
+static bool Just_Installed = FALSE;
-/* ### Pack: 1 line */
-static packet_info Buffered_packets[ARCH_SCATTER_SIZE];
-
static down_queue Protocol_down_queue[2]; /* only used in spread3 */
static void Prot_handle_bcast();
static void Prot_handle_token();
static int Answer_retrans( int *ret_new_ptr, int32 *proc_id, int16 *seg_index );
static int Send_new_packets( int num_allowed );
+static int Prot_queue_bcast( sys_scatter *send_pack_ptr, packet_queue *pack_queue );
+static int Prot_flush_bcast( packet_queue *pack_queue );
static int Is_token_hold();
static int To_hold_token();
static void Handle_hurry( packet_header *pack_ptr );
static void Deliver_packet( int pack_entry, int to_copy );
static void Flip_token_body( char *buf, token_header *token_ptr );
+static void Flip_frag( fragment_header *frag_ptr );
static void Deliver_reliable_packets( int32 start_seq, int num_packets );
static void Deliver_agreed_packets();
@@ -105,11 +126,14 @@
Mem_init_object( TOKEN_HEAD_OBJ, "token_head", sizeof( token_header ), 10, 0 );
Mem_init_object( TOKEN_BODY_OBJ, "token_body", sizeof( token_body ), 10, 0 );
Mem_init_object( SCATTER, "scatter", sizeof( scatter ), 200+MAX_PROCS_RING, 0 );
+ Mem_init_object( SYS_SCATTER, "sys_scatter", sizeof( sys_scatter ), 200, 0 );
+ Mem_init_object( QUEUE_LINK, "queue_link", sizeof( queue_link ), 200, 0 );
My = Conf_my();
My_index = Conf_proc_by_id( My.id, &My );
GlobalStatus.my_id = My.id;
GlobalStatus.packet_delivered = 0;
+ Prev_proc_id = 0;
for( i=0; i < MAX_PROCS_RING+1; i++ )
Up_queue[i].exist = 0;
@@ -135,6 +159,10 @@
Last_delivered = 0;
}
+ Send_pack_queue.num_packets = 0;
+ Send_pack_queue.first = NULL;
+ Send_pack_queue.last = NULL;
+
New_pack.num_elements = 2;
New_pack.elements[0].len = sizeof(packet_header);
New_pack.elements[0].buf = (char *) new(PACK_HEAD_OBJ);
@@ -147,9 +175,6 @@
New_token.elements[1].len = sizeof(token_body);
New_token.elements[1].buf = (char *) new(TOKEN_BODY_OBJ);
- Send_pack.num_elements = 2;
- Send_pack.elements[0].len = sizeof(packet_header);
-
Token = (token_header *)New_token.elements[0].buf;
Last_token = new(TOKEN_HEAD_OBJ);
Last_token->type = 0;
@@ -178,6 +203,7 @@
E_attach_fd( *token_channels, READ_FD, Prot_handle_token, 0, NULL, MEDIUM_PRIORITY );
token_channels++;
}
+ Token_has_priority = FALSE;
FC_init( );
Memb_init();
@@ -245,14 +271,18 @@
static void Prot_handle_bcast(channel fd, int dummy, void *dummy_p)
{
packet_header *pack_ptr;
+ packet_body *pack_body_ptr;
+ fragment_header *frag_ptr;
int pack_entry;
proc p;
int received_bytes;
- int total_bytes_processed;
- int num_buffered_packets;
- int i;
- int32 j;
+ int processed_bytes;
+ int padding_bytes;
+ int i, ret;
/* int r1,r2; */
+ int num_bcast, num_token;
+ channel *bcast_channels;
+ channel *token_channels;
received_bytes = Net_recv( fd, &New_pack );
/* My own packet or from another monitor component */
@@ -262,15 +292,6 @@
pack_ptr = (packet_header *)New_pack.elements[0].buf;
- /* ### Pack, this has to move down to network.c
- * if( pack_ptr->data_len +sizeof(packet_header) != received_bytes )
- * {
- * Alarm( PRINT, "Prot_handle_bcast: received %d, should be %d\n",
- * received_bytes, pack_ptr->data_len+sizeof(packet_header) );
- * return;
- * }
- */
-
if( Is_status( pack_ptr->type ) )
{
Stat_handle_message( &New_pack );
@@ -288,7 +309,7 @@
Prot_handle_conf_reload( &New_pack );
return;
}
- /* delete random
+ /* delete random
r1 = ((-My.id)%17)+3;
r2 = get_rand() % (r1+3 );
if ( r2 == 0 ) return; */
@@ -323,98 +344,126 @@
}
}
- /* ### Pack: next 70 lines (almost till the end of the routine) have changed */
- Buffered_packets[0].head = pack_ptr;
- Buffered_packets[0].body = (packet_body *)New_pack.elements[1].buf;
- received_bytes -= sizeof(packet_header);
- total_bytes_processed = pack_ptr->data_len;
- /* ignore any alignment padding */
- switch(total_bytes_processed % 4)
- {
- case 1:
- total_bytes_processed++;
- case 2:
- total_bytes_processed++;
- case 3:
- total_bytes_processed++;
- case 0:
- /* already aligned */
- break;
- }
- for( i = 1; received_bytes > total_bytes_processed; i++ )
- {
- /* copy into each of the elements after the first element*/
- Buffered_packets[i].head = (packet_header *)new(PACK_HEAD_OBJ);
- Buffered_packets[i].body = (packet_body *)new(PACKET_BODY);
- if (Buffered_packets[i].head == NULL)
- Alarm(EXIT, "Prot_handle_bcast: Memory allocation failed for PACK_HEAD_OBJ\n");
- if (Buffered_packets[i].body == NULL)
- Alarm(EXIT, "Prot_handle_bcast: Memory allocation failed for PACKET_BODY\n");
-
- pack_ptr = (packet_header *)&New_pack.elements[1].buf[total_bytes_processed];
- memcpy( Buffered_packets[i].head, pack_ptr, sizeof( packet_header ) );
- total_bytes_processed += sizeof(packet_header);
- memcpy( Buffered_packets[i].body, &New_pack.elements[1].buf[total_bytes_processed], pack_ptr->data_len);
- total_bytes_processed += pack_ptr->data_len;
- /* ignore any alignment padding */
- switch(total_bytes_processed % 4)
- {
- case 1:
- total_bytes_processed++;
- case 2:
- total_bytes_processed++;
- case 3:
- total_bytes_processed++;
- case 0:
- /* already aligned */
- break;
- }
- }
- num_buffered_packets = i;
-
- for( i = 0; i < num_buffered_packets; i++)
- {
- pack_ptr = Buffered_packets[i].head;
-
/* do we have this packet */
if( pack_ptr->seq <= Aru )
{
Alarm( PROTOCOL, "Prot_handle_bcast: delayed packet %d already delivered (Aru %d)\n",
pack_ptr->seq, Aru );
- dispose(Buffered_packets[i].head);
- dispose(Buffered_packets[i].body);
- continue;
+ return;
}
pack_entry = pack_ptr->seq & PACKET_MASK;
if( Packets[pack_entry].exist )
{
Alarm( PROTOCOL, "Prot_handle_bcast: packet %d already exist\n",
pack_ptr->seq );
- dispose(Buffered_packets[i].head);
- dispose(Buffered_packets[i].body);
- continue;
+ return;
}
Packets[pack_entry].proc_index = Conf_proc_by_id( pack_ptr->proc_id, &p );
if( Packets[pack_entry].proc_index < 0 )
{
Alarm( PROTOCOL, "Prot_handle_bcast: unknown proc %d\n", pack_ptr->proc_id );
- dispose(Buffered_packets[i].head);
- dispose(Buffered_packets[i].body);
- continue;
+ return;
}
+
+    pack_body_ptr = (packet_body *)New_pack.elements[1].buf;
+    frag_ptr = &(pack_ptr->first_frag_header);
+    /*
+     * Packet built on a host of the other endianness: convert every fragment
+     * header to host order. The first fragment header is part of the packet
+     * header; each later one is embedded in the body at the next 4-byte
+     * aligned offset after the previous fragment's data.
+     */
+    if( !Same_endian(pack_ptr->type) )
+    {
+        Flip_frag(frag_ptr);
+        processed_bytes = frag_ptr->fragment_len;
+        while( processed_bytes < pack_ptr->data_len )
+        {
+            /* Skip alignment padding up to the next 4-byte boundary */
+            padding_bytes = 0;
+            switch (processed_bytes % 4)
+            {
+            case 1:
+                padding_bytes++;
+            case 2:
+                padding_bytes++;
+            case 3:
+                padding_bytes++;
+            case 0:
+                /* already aligned */
+                break;
+            }
+            processed_bytes += padding_bytes;
+
+            /* Sanity check for packet validity */
+            if( processed_bytes + sizeof(fragment_header) > pack_ptr->data_len )
+            {
+                Alarm( PRINT, "Prot_handle_bcast: invalid packet with seq %d from %d, fragments exceed data_len %d %d\n",
+                       pack_ptr->seq, pack_ptr->transmiter_id, processed_bytes, pack_ptr->data_len );
+                break;
+            }
+            /* pack_body_ptr is a packet_body pointer, so address the body as
+             * raw bytes: &pack_body_ptr[n] would step over n whole bodies. */
+            frag_ptr = (fragment_header *) ((char *) pack_body_ptr + processed_bytes);
+            Flip_frag(frag_ptr);
+            /* fragment_len is flipped with Flip_int16 and may be negative in a
+             * corrupt packet; that would stall or rewind processed_bytes
+             * (possibly looping forever), so reject it. */
+            if( frag_ptr->fragment_len < 0 )
+            {
+                Alarm( PRINT, "Prot_handle_bcast: invalid packet with seq %d from %d, negative fragment_len %d\n",
+                       pack_ptr->seq, pack_ptr->transmiter_id, frag_ptr->fragment_len );
+                break;
+            }
+            processed_bytes += sizeof(fragment_header) + frag_ptr->fragment_len;
+        }
+        if( processed_bytes != pack_ptr->data_len )
+        {
+            Alarm( PRINT, "Prot_handle_bcast: invalid packet with seq %d from %d, processed bytes not equal data_len %d %d\n",
+                   pack_ptr->seq, pack_ptr->transmiter_id, processed_bytes, pack_ptr->data_len );
+            /*
+             * This is a malformed packet, but we decide to keep it instead of throwing it away.
+             * Note that a packet with the same endianess will not even get here
+             */
+        }
+    }
+
/* insert new packet */
Packets[pack_entry].head = pack_ptr;
- Packets[pack_entry].body = Buffered_packets[i].body;
+ Packets[pack_entry].body = (packet_body *)New_pack.elements[1].buf;
Packets[pack_entry].exist = 1;
+ /* If this packet was from my predecessor in the ring
+ * (pack_ptr->transmiter_id == Prev_proc_id), and this packet is
+ * marked with a token round greater than the last round in which I
+ * received a token (pack_ptr->token_round > Received_token_rounds),
+ * then I can conclude that I've processed all bcast packets from the
+ * previous round, so it is safe to process the next token.
+ * My predecessor has already sent me the token for the round
+ * marked on this packet, but I haven't processed it yet, so I should
+ * try to process that token before handling more bcasts */
+    if( !Token_has_priority &&
+        pack_ptr->token_round > Received_token_rounds &&
+        pack_ptr->transmiter_id == Prev_proc_id )
+    {
+        bcast_channels = Net_bcast_channel();
+        token_channels = Net_token_channel();
+        Net_num_channels( &num_bcast, &num_token);
+        /* Demote bcast channels from HIGH to MEDIUM priority */
+        for ( i = 0; i < num_bcast; i++ ) {
+            ret = E_detach_fd_priority( *bcast_channels, READ_FD, HIGH_PRIORITY );
+            if( ret < 0 ) {
+                Alarm( EXIT, "Prot_handle_bcast: bcast_channel being detached was not found\n");
+            }
+            ret = E_attach_fd( *bcast_channels, READ_FD, Prot_handle_bcast, 0, NULL, MEDIUM_PRIORITY );
+            if( ret < 0 ) {
+                Alarm( EXIT, "Prot_handle_bcast: bcast_channel could not be attached\n");
+            }
+            bcast_channels++;
+        }
+        /* Promote token channels from MEDIUM to HIGH priority */
+        for ( i = 0; i < num_token; i++ ) {
+            ret = E_detach_fd_priority( *token_channels, READ_FD, MEDIUM_PRIORITY );
+            if( ret < 0 ) {
+                Alarm( EXIT, "Prot_handle_bcast: token_channel being detached was not found\n");
+            }
+            /* Keep the attach result so the check below tests this call,
+             * not the preceding detach. */
+            ret = E_attach_fd( *token_channels, READ_FD, Prot_handle_token, 0, NULL, HIGH_PRIORITY );
+            if( ret < 0 ) {
+                Alarm( EXIT, "Prot_handle_bcast: token_channel could not be attached\n");
+            }
+            token_channels++;
+        }
+        Token_has_priority = TRUE;
+    }
+
/* update variables */
if( Highest_seq < pack_ptr->seq ) Highest_seq = pack_ptr->seq;
if( pack_ptr->seq == My_aru+1 )
{
- for( j=pack_ptr->seq; j <= Highest_seq; j++ )
+ for( i=pack_ptr->seq; i <= Highest_seq; i++ )
{
- if( ! Packets[j & PACKET_MASK].exist ) break;
+ if( ! Packets[i & PACKET_MASK].exist ) break;
My_aru++;
}
Deliver_agreed_packets();
@@ -422,7 +471,6 @@
Alarm( PROTOCOL, "Prot_handle_bcast: packet %d inserted\n",
pack_ptr->seq );
- } /* END OF LOOP */
GlobalStatus.packet_recv++;
GlobalStatus.my_aru = My_aru;
@@ -444,7 +492,11 @@
int16 rtr_seg_index;
int32 val;
int retrans_allowed; /* how many of my retrans are allowed on token */
+ int max_rtr_seq;
int i, ret;
+ int num_bcast, num_token;
+ channel *bcast_channels;
+ channel *token_channels;
/* int r1,r2;*/
@@ -549,6 +601,40 @@
if( Highest_seq < Token->seq ) Highest_seq = Token->seq;
+    /* I don't want to process my next token until I have processed
+     * all bcast packets sent in the previous round, so I should give
+     * bcast channels a higher priority until then */
+    Received_token_rounds++;
+    if( Token_has_priority == TRUE )
+    {
+        bcast_channels = Net_bcast_channel();
+        token_channels = Net_token_channel();
+        Net_num_channels( &num_bcast, &num_token );
+        /* Promote bcast channels from MEDIUM to HIGH priority */
+        for ( i = 0; i < num_bcast; i++ ) {
+            ret = E_detach_fd_priority( *bcast_channels, READ_FD, MEDIUM_PRIORITY );
+            if( ret < 0 ) {
+                Alarm( EXIT, "Prot_handle_token: bcast_channel being detached was not found\n");
+            }
+            /* Keep the attach result so the check below tests this call,
+             * not the preceding detach. */
+            ret = E_attach_fd( *bcast_channels, READ_FD, Prot_handle_bcast, 0, NULL, HIGH_PRIORITY );
+            if( ret < 0 ) {
+                Alarm( EXIT, "Prot_handle_token: bcast_channel could not be attached\n");
+            }
+            bcast_channels++;
+        }
+        /* Demote token channels from HIGH to MEDIUM priority */
+        for ( i = 0; i < num_token; i++ ) {
+            ret = E_detach_fd_priority( *token_channels, READ_FD, HIGH_PRIORITY );
+            if( ret < 0 ) {
+                Alarm( EXIT, "Prot_handle_token: token_channel being detached was not found\n");
+            }
+            ret = E_attach_fd( *token_channels, READ_FD, Prot_handle_token, 0, NULL, MEDIUM_PRIORITY );
+            if( ret < 0 ) {
+                Alarm( EXIT, "Prot_handle_token: token_channel could not be attached\n");
+            }
+            token_channels++;
+        }
+        Token_has_priority = FALSE;
+    }
+
/* Handle retransmissions */
num_retrans = Answer_retrans( &new_ptr, &rtr_proc_id, &rtr_seg_index );
GlobalStatus.retrans += num_retrans;
@@ -577,7 +663,18 @@
}
GlobalStatus.my_aru = My_aru;
- if( My_aru < Highest_seq )
+ /* Determine sequence up through which retransmissions should be
+ * requested: If Accelerated_ring is true, packets with sequence
+ * numbers between the sequence on the last token this process sent
+ * and the sequence on the token it just received may still be in
+ * transit or not yet sent, so they should not be requested at this point*/
+ if( FC_accelerated_ring() == TRUE && Memb_state() != EVS ){
+ max_rtr_seq = Last_token->seq;
+ }else{
+ max_rtr_seq = Highest_seq;
+ }
+
+ if( My_aru < max_rtr_seq )
{
/* Compute how many of my retransmission requests are possible to fit */
retrans_allowed = ( sizeof( token_body ) - new_ptr - sizeof( ring_rtr ) ) / sizeof( int32 );
@@ -589,7 +686,7 @@
ring_rtr_ptr->seg_index = rtr_seg_index;
ring_rtr_ptr->num_seq = 0;
new_ptr += sizeof(ring_rtr);
- for( i=My_aru+1; i <= Highest_seq && retrans_allowed > 0; i++ )
+ for( i=My_aru+1; i <= max_rtr_seq && retrans_allowed > 0; i++ )
{
if( ! Packets[i & PACKET_MASK].exist )
{
@@ -668,6 +765,9 @@
E_queue( Memb_token_loss, 0, NULL, Token_timeout );
+ /* Send any packets remaining in queue */
+ Prot_flush_bcast(&Send_pack_queue);
+
/* calculating Aru */
if( Token->aru > Last_token->aru ) {
/*Alarmp( SPLOG_INFO, PROTOCOL, "Prot_handle_token: updating Aru from Last_token; Aru %d -> %d\n", Aru, Last_token->aru );*/
@@ -784,7 +884,7 @@
/* update protocol variables with new conf */
My = Conf_my();
- My_index = Conf_proc_by_id( My.id, &My );
+ My_index = Conf_proc_by_id( My.id, &My );
if (need_memb_partition) {
/* make partition */
@@ -832,11 +932,11 @@
}
}
-/* ### Pack: this routine has changed */
static int Answer_retrans( int *ret_new_ptr,
int32 *proc_id, int16 *seg_index )
{
int num_retrans;
+ sys_scatter *send_pack_ptr;
char *rtr;
int old_ptr,new_ptr;
ring_rtr *ring_rtr_ptr;
@@ -871,38 +971,36 @@
if( Packets[pack_entry].exist )
{
+ send_pack_ptr = new(SYS_SCATTER);
+ send_pack_ptr->num_elements = 2;
+ send_pack_ptr->elements[0].len = sizeof(packet_header);
pack_ptr = Packets[pack_entry].head;
- Send_pack.elements[0].buf = (char *)Packets[pack_entry].head;
- Send_pack.elements[1].buf = (char *)Packets[pack_entry].body;
- Send_pack.elements[1].len = pack_ptr->data_len;
+ send_pack_ptr->elements[0].buf = (char *)Packets[pack_entry].head;
+ send_pack_ptr->elements[1].buf = (char *)Packets[pack_entry].body;
+ send_pack_ptr->elements[1].len = pack_ptr->data_len;
if( ring_rtr_ptr->proc_id != -1 )
{
- ret = Net_ucast ( ring_rtr_ptr->proc_id, &Send_pack );
+ ret = Net_ucast ( ring_rtr_ptr->proc_id, send_pack_ptr );
+ dispose(send_pack_ptr);
GlobalStatus.u_retrans++;
Alarmp( SPLOG_INFO, PROTOCOL, "Answer_retrans: retransmit %d to proc 0x%08X\n", *req_seq, ring_rtr_ptr->proc_id );
}else if( ring_rtr_ptr->seg_index != -1 ) {
- ret = Net_scast ( ring_rtr_ptr->seg_index, &Send_pack );
+ ret = Net_scast ( ring_rtr_ptr->seg_index, send_pack_ptr );
+ dispose(send_pack_ptr);
GlobalStatus.s_retrans++;
Alarmp( SPLOG_INFO, PROTOCOL, "Answer_retrans: retransmit %d to seg 0x%08X\n", *req_seq, ring_rtr_ptr->seg_index );
}else{
-#if 1
- ret = Net_queue_bcast ( &Send_pack );
-#else
- ret = Net_bcast ( &Send_pack );
-#endif
- if( ret > 0 ) GlobalStatus.b_retrans++;
+ ret = Prot_queue_bcast ( send_pack_ptr, &Send_pack_queue );
+ GlobalStatus.b_retrans++;
Alarmp( SPLOG_INFO, PROTOCOL, "Answer_retrans: retransmit %d to all\n", *req_seq);
}
- if( ret > 0 )
- {
num_retrans++;
- }
}else{
*proc_id = -1;
if( ring_rtr_ptr->seg_index != My.seg_index )
@@ -925,87 +1023,165 @@
}
}
*ret_new_ptr = new_ptr;
-
- ret = Net_flush_bcast();
- if( ret > 0 )
- {
- GlobalStatus.b_retrans++;
- num_retrans++;
- }
return (num_retrans);
}
-/* ### Pack: this routine has changed */
static int Send_new_packets( int num_allowed )
{
- packet_header *pack_ptr;
- scatter *scat_ptr;
- int pack_entry;
- int num_sent;
- int ret;
+ packet_header *pack_ptr;
+ scatter *scat_ptr;
+ sys_scatter *send_pack_ptr;
+ fragment_header *frag_ptr;
+ char *body_ptr;
+ int pack_entry;
+ int num_sent;
+ int padding_bytes;
+ int available_bytes;
+ int ret;
- num_sent = 0;
+ num_sent = 0;
while( num_sent < num_allowed )
{
- /* check if down queue is empty */
- if( Down_queue_ptr->num_mess == 0 ) break;
+ /* check if down queue is empty */
+ if( Down_queue_ptr->num_mess == 0 ) break;
- /* initialize packet_header */
+ /* initialize packet_header */
pack_ptr = new(PACK_HEAD_OBJ);
- scat_ptr = Down_queue_ptr->first->mess;
+ scat_ptr = Down_queue_ptr->first->mess;
pack_ptr->type = Down_queue_ptr->first->type;
pack_ptr->proc_id = My.id;
pack_ptr->memb_id = Memb_id();
pack_ptr->seq = Highest_seq+1;
Highest_seq++;
- pack_ptr->fifo_seq = Highest_fifo_seq+1;
- Highest_fifo_seq++;
+ /*pack_ptr->fifo_seq = Highest_fifo_seq+1; ### Commented out because fifo_seq was replaced with token_round*/
+ Highest_fifo_seq++;
pack_ptr->data_len = scat_ptr->elements[
- Down_queue_ptr->cur_element].len;
+ Down_queue_ptr->cur_element].len;
+ pack_ptr->first_frag_header.fragment_len = scat_ptr->elements[
+ Down_queue_ptr->cur_element].len;
- Send_pack.elements[1].buf = scat_ptr->elements[
- Down_queue_ptr->cur_element].buf;
+ send_pack_ptr = new(SYS_SCATTER); /* send_pack_ptr is freed when the packet is sent (after Net_bcast is called) */
+ send_pack_ptr->num_elements = 2;
+ send_pack_ptr->elements[0].len = sizeof(packet_header);
+ send_pack_ptr->elements[1].buf = scat_ptr->elements[
+ Down_queue_ptr->cur_element].buf;
+ body_ptr = send_pack_ptr->elements[1].buf;
- Down_queue_ptr->cur_element++;
- if( Down_queue_ptr->cur_element < scat_ptr->num_elements )
- {
- /* not last packet in message */
- pack_ptr->packet_index = Down_queue_ptr->cur_element;
- }else if( Down_queue_ptr->cur_element == scat_ptr->num_elements ){
- down_link *tmp_down;
+ /* Set frag_ptr to point to the fragment header in the packet header */
+ frag_ptr = &(pack_ptr->first_frag_header);
- /* last packet in message */
- pack_ptr->packet_index = -scat_ptr->num_elements;
+ /* Loop for filling the packet:
+ *
+ * Advance the down queue and set the fragment index for the fragment
+ * just added to the packet.
+ *
+ * Break if another fragment cannot be added to this packet.
+ * This happens in 3 cases:
+ * 1. The down queue is empty.
+ * 2. The next fragment in the down queue is too big to fit in the packet.
+ * 3. The next message does not have a type compatible with the current
+ * packet type.
+ *
+ * If the next fragment can be added (i.e. you didn't break for one of the 3
+ * reasons above), set the frag_ptr to point after the end of the last fragment
+ * in the packet body and copy the fragment from the down queue into the packet
+ * body.
+ */
+ for(;;)
+ {
+ /* Advance the down queue and set the fragment index for the fragment
+ * just added to the packet. */
+ Down_queue_ptr->cur_element++;
+ if( Down_queue_ptr->cur_element < scat_ptr->num_elements )
+ {
+ /* not last packet in message */
+ frag_ptr->fragment_index = Down_queue_ptr->cur_element;
+ }else if( Down_queue_ptr->cur_element == scat_ptr->num_elements ){
+ down_link *tmp_down;
- tmp_down = Down_queue_ptr->first;
- Down_queue_ptr->first = Down_queue_ptr->first->next;
- Down_queue_ptr->cur_element = 0;
- Down_queue_ptr->num_mess--;
- dispose( tmp_down->mess );
- dispose( tmp_down );
- if( Down_queue_ptr->num_mess < WATER_MARK )
- Sess_unblock_users_level();
- }else{
- Alarm( EXIT,
- "Send_new_packets: error in packet index: %d %d\n",
- Down_queue_ptr->cur_element,scat_ptr->num_elements );
- }
+ /* last packet in message */
+ frag_ptr->fragment_index = -scat_ptr->num_elements;
- Send_pack.elements[0].buf = (char *) pack_ptr;
- Send_pack.elements[1].len = pack_ptr->data_len;
+ tmp_down = Down_queue_ptr->first;
+ Down_queue_ptr->first = Down_queue_ptr->first->next;
+ Down_queue_ptr->cur_element = 0;
+ Down_queue_ptr->num_mess--;
+ dispose( tmp_down->mess );
+ dispose( tmp_down );
+ if( Down_queue_ptr->num_mess < WATER_MARK )
+ Sess_unblock_users_level();
+ }else{
+ Alarm( EXIT,
+ "Send_new_packets: error in packet index: %d %d\n",
+ Down_queue_ptr->cur_element,scat_ptr->num_elements );
+ }
+ /* Break if another fragment cannot be added to this packet.
+ * This happens in 3 cases:
+ * 1. The down queue is empty.
+ * 2. The next fragment in the down queue is too big to fit in the packet.
+ * 3. The next message does not have a type compatible with the current
+ * packet type.
+ */
+ if( Down_queue_ptr->num_mess == 0 ) break;
-#if 1
- ret = Net_queue_bcast( &Send_pack );
-#else
- ret = Net_bcast( &Send_pack );
-#endif
- if( ret > 0 )
- {
- num_sent++;
- }
+ padding_bytes = 0;
+ switch (pack_ptr->data_len % 4)
+ {
+ case 1:
+ padding_bytes++;
+ case 2:
+ padding_bytes++;
+ case 3:
+ padding_bytes++;
+ case 0:
+ /* already aligned */
+ break;
+ }
+ available_bytes = sizeof(packet_body) - pack_ptr->data_len - padding_bytes - sizeof(fragment_header);
+ scat_ptr = Down_queue_ptr->first->mess;
+ /*
+ * The comparison doesn't work without the (int) cast because len is size_t and available_bytes is int
+ * It is probable that size_t is defined as unsigned. Note that available_bytes can be negative.
+ * Without the casting of size_t to int, available_bytes will be automatically casted to size_t
+ * making it a very large number and causing a bug. Therefore, we have to cast the left side
+ * of the equation to int
+ */
+ if( ( (int) scat_ptr->elements[Down_queue_ptr->cur_element].len ) > available_bytes ) break;
+ if( pack_ptr->type != Down_queue_ptr->first->type )
+ {
+ if( !(Is_fifo(pack_ptr->type) || Is_agreed(pack_ptr->type)) ||
+ !(Is_fifo(Down_queue_ptr->first->type) || Is_agreed(Down_queue_ptr->first->type)) )
+ break;
+ }
+ /*
+ * If the next fragment can be added (i.e. we didn't break for one of the 3
+ * reasons above), set the frag_ptr to point after the end of the last fragment
+ * in the packet body and copy the fragment from the down queue into the packet
+ * body.
+ */
+ frag_ptr = (fragment_header *) &body_ptr[ pack_ptr->data_len + padding_bytes ];
+ frag_ptr->fragment_len = scat_ptr->elements[Down_queue_ptr->cur_element].len;
+ pack_ptr->data_len += padding_bytes + sizeof(fragment_header);
+ memcpy(&body_ptr[pack_ptr->data_len], scat_ptr->elements[Down_queue_ptr->cur_element].buf, frag_ptr->fragment_len);
+ pack_ptr->data_len += frag_ptr->fragment_len;
+ /*
+ * Dispose buf for copied fragment. Note that this is only possible because
+ * copied fragments are guaranteed to be whole messages (so we can't have
+ * a scenario in which it is part of a partially sent message). The first fragment
+ * will be disposed when the packet is discarded/the message is delivered
+ * */
+ dispose( (packet_body *) (scat_ptr->elements[Down_queue_ptr->cur_element].buf) );
+ }
+
+ send_pack_ptr->elements[0].buf = (char *) pack_ptr;
+ send_pack_ptr->elements[1].len = pack_ptr->data_len;
+
+ ret = Prot_queue_bcast( send_pack_ptr, &Send_pack_queue );
+ num_sent++;
+
pack_entry = pack_ptr->seq & PACKET_MASK;
if( Packets[pack_entry].exist )
Alarm( EXIT,
@@ -1014,37 +1190,195 @@
/* insert new created packet */
Packets[pack_entry].head = pack_ptr;
- Packets[pack_entry].body = (packet_body *)Send_pack.elements[1].buf;
+ Packets[pack_entry].body = (packet_body *)send_pack_ptr->elements[1].buf;
Packets[pack_entry].exist = 1;
Packets[pack_entry].proc_index = My_index;
Alarm( PROTOCOL,
- "Send_new_packets: packet %d sent and inserted \n",
- pack_ptr->seq );
+ "Send_new_packets: packet %d sent and inserted \n",
+ pack_ptr->seq );
}
- ret = Net_flush_bcast();
- if( ret > 0 )
- {
- num_sent++;
- }
- return ( num_sent );
+ return ( num_sent );
}
+/*
+ * Prot_queue_bcast: broadcast a newly built packet now, or hold it back.
+ *
+ * When the accelerated ring is off, or the membership state is EVS, every
+ * packet already in pack_queue is broadcast and then send_pack_ptr is
+ * broadcast immediately. Otherwise just enough of the oldest queued packets
+ * are broadcast that appending send_pack_ptr leaves at most
+ * FC_accelerated_window() packets queued; with a window of 0 the new packet
+ * is broadcast immediately instead of being queued.
+ *
+ * Each packet actually broadcast is stamped with the current Token_rounds,
+ * and its sys_scatter is disposed after Net_bcast. A queued sys_scatter is
+ * owned by the queue until it is sent.
+ *
+ * Returns the number of packets broadcast by this call.
+ */
+static int Prot_queue_bcast( sys_scatter *send_pack_ptr, packet_queue *pack_queue )
+{
+    sys_scatter   *scat_ptr;
+    queue_link    *link_ptr;
+    packet_header *pack_ptr;   /* used to stamp token_round just before sending */
+    int            num_pack_to_send;
+    int            ret;
+    int            i;
+
+    /* 1. Decide how many already-queued packets must go out now. */
+    if( FC_accelerated_ring() == FALSE || Memb_state() == EVS)
+    {
+        /* Not accelerating: drain everything (normally already empty unless
+         * the Accelerated_ring option was changed dynamically). */
+        num_pack_to_send = pack_queue->num_packets;
+    }else{
+        /* Make room for the new packet within the accelerated window. */
+        num_pack_to_send = pack_queue->num_packets - FC_accelerated_window() + 1;
+        if( num_pack_to_send < 0 ) num_pack_to_send = 0;
+        /* Exceeds the queue length when the window is below 1 (e.g. 0),
+         * which can happen if the accelerated window changes dynamically */
+        if( num_pack_to_send > pack_queue->num_packets ) num_pack_to_send = pack_queue->num_packets;
+    }
+
+    /* 2. Broadcast that many packets from the head of the queue. */
+    for( i = 0; i < num_pack_to_send; i++ )
+    {
+        link_ptr = pack_queue->first;
+        scat_ptr = link_ptr->packet;
+        pack_queue->first = link_ptr->next;
+        pack_queue->num_packets--;
+        dispose(link_ptr);
+        pack_ptr = (packet_header *) scat_ptr->elements[0].buf;
+        pack_ptr->token_round = Token_rounds;
+        Net_bcast(scat_ptr);
+        dispose(scat_ptr);
+    }
+    /* Do not leave the tail pointing at a disposed link. */
+    if( pack_queue->num_packets == 0 ) pack_queue->last = NULL;
+    ret = num_pack_to_send;
+
+    /* 3. Send the new packet immediately, or append it to the queue tail. */
+    if( FC_accelerated_ring() == FALSE || FC_accelerated_window() == 0 || Memb_state() == EVS )
+    {
+        pack_ptr = (packet_header *) send_pack_ptr->elements[0].buf;
+        pack_ptr->token_round = Token_rounds;
+        Net_bcast(send_pack_ptr);
+        dispose(send_pack_ptr);
+        ret++;
+    }else{
+        link_ptr = new(QUEUE_LINK);
+        link_ptr->packet = send_pack_ptr;
+        link_ptr->next = NULL;
+
+        if( pack_queue->num_packets == 0 )
+        {
+            pack_queue->first = link_ptr;
+        }else{
+            pack_queue->last->next = link_ptr;
+        }
+        pack_queue->last = link_ptr;
+        pack_queue->num_packets++;
+    }
+
+    return ( ret );
+}
+
+/*
+ * Prot_flush_bcast: broadcast every packet still held in pack_queue, oldest
+ * first. Each packet is stamped with the current Token_rounds, and its queue
+ * link and sys_scatter are disposed. The queue is left empty, with first and
+ * last reset to NULL.
+ *
+ * Returns the number of packets broadcast.
+ */
+static int Prot_flush_bcast( packet_queue *pack_queue )
+{
+    sys_scatter   *scat_ptr;
+    queue_link    *link_ptr;
+    packet_header *pack_ptr;
+    int            ret;
+
+    ret = pack_queue->num_packets;
+    while( pack_queue->num_packets > 0 )
+    {
+        link_ptr = pack_queue->first;
+        scat_ptr = link_ptr->packet;
+        pack_queue->first = link_ptr->next;
+        pack_queue->num_packets--;
+        dispose(link_ptr);
+        pack_ptr = (packet_header *) scat_ptr->elements[0].buf;
+        pack_ptr->token_round = Token_rounds;
+        Net_bcast(scat_ptr);
+        dispose(scat_ptr);
+    }
+    /* Reset both ends so no pointer to a disposed link survives. */
+    pack_queue->first = NULL;
+    pack_queue->last = NULL;
+    return( ret );
+}
+
static void Deliver_packet( int pack_entry, int to_copy )
{
int proc_index;
up_queue *up_ptr;
packet_header *pack_ptr;
message_link *mess_link;
+ fragment_header *frag_ptr;
+ char *pack_body_ptr;
+ message_queue mess_queue;
+ int processed_bytes;
+ int padding_bytes;
int index;
pack_ptr = Packets[pack_entry].head;
+ /*
+ * For multi-fragment packets, the following observations can be made:
+ * 1. The first fragment is either the last fragment in a multi-packet message
+ * or a whole message.
+ * 2. Each fragment after the first must be a whole message.
+ *
+ * For any fragment beyond the first, we create a message (with a new packet body)
+ * and put it in a queue. We then process the first fragment, including the
+ * decision whether to copy it or not. Finally, we deliver to session all queued
+ * messages coming from the additional fragments.
+ */
+
+    mess_queue.num_messages = 0;
+    mess_queue.first = NULL;
+    mess_queue.last = NULL;
+    pack_body_ptr = (char *) Packets[pack_entry].body;
+    frag_ptr = &(pack_ptr->first_frag_header);
+    processed_bytes = frag_ptr->fragment_len;
+    while( processed_bytes < pack_ptr->data_len )
+    {
+        /* Skip alignment padding up to the next 4-byte boundary */
+        padding_bytes = 0;
+        switch (processed_bytes % 4)
+        {
+        case 1:
+            padding_bytes++;
+        case 2:
+            padding_bytes++;
+        case 3:
+            padding_bytes++;
+        case 0:
+            /* already aligned */
+            break;
+        }
+        processed_bytes += padding_bytes;
+
+        /* The embedded fragment header must itself lie inside data_len;
+         * processed_bytes is negative if the first fragment_len was. */
+        if( processed_bytes < 0 ||
+            processed_bytes + (int) sizeof(fragment_header) > pack_ptr->data_len )
+        {
+            Alarm( PRINT, "Deliver_packet: invalid packet with seq %d from %d, fragment header exceeds data_len %d %d\n",
+                   pack_ptr->seq, pack_ptr->transmiter_id, processed_bytes, pack_ptr->data_len );
+            break;
+        }
+        frag_ptr = (fragment_header *) &pack_body_ptr[processed_bytes];
+        processed_bytes += sizeof(fragment_header);
+        /* A negative length would pass the bounds test and become a huge
+         * memcpy size, so reject it too. */
+        if( frag_ptr->fragment_len < 0 ||
+            processed_bytes + frag_ptr->fragment_len > pack_ptr->data_len )
+        {
+            Alarm( PRINT, "Deliver_packet: invalid packet with seq %d from %d, fragments exceed data_len %d %d %d\n",
+                   pack_ptr->seq, pack_ptr->transmiter_id, processed_bytes, frag_ptr->fragment_len, pack_ptr->data_len );
+            break;
+        }
+
+        /* Copy this fragment into a new single-element message */
+        mess_link = new(MESSAGE_LINK);
+        mess_link->next = NULL;
+        mess_link->mess = new(SCATTER);
+        mess_link->mess->num_elements = 1;
+        mess_link->mess->elements[0].len = frag_ptr->fragment_len;
+        mess_link->mess->elements[0].buf = new(PACKET_BODY);
+        memcpy(mess_link->mess->elements[0].buf, &pack_body_ptr[processed_bytes], frag_ptr->fragment_len);
+        processed_bytes += frag_ptr->fragment_len;
+        /* Append to the local FIFO; delivered after the first fragment */
+        if( mess_queue.num_messages == 0 ) {
+            mess_queue.first = mess_link;
+            mess_queue.last = mess_link;
+        } else {
+            mess_queue.last->next = mess_link;
+            mess_queue.last = mess_link;
+        }
+        mess_queue.num_messages++;
+    }
+
if( Is_reliable( pack_ptr->type ) &&
- pack_ptr->packet_index == -1 )
+ pack_ptr->first_frag_header.fragment_index == -1 )
{
/*
* for reliable single-packets message : deliver regardless
* of what is already in the queue.
+ * This also applies to reliable multi-fragment packets where all
+ * fragments have fragment index -1.
*/
proc_index = MAX_PROCS_RING;
}else{
@@ -1062,20 +1396,20 @@
}
/* validity check */
- index = pack_ptr->packet_index;
+ index = pack_ptr->first_frag_header.fragment_index;
if( index < 0 ) index = -index;
if( up_ptr->mess->num_elements+1 != index )
{
Alarm( EXIT, "Deliver_packet: sequence error: sec is %d, should be %d\n",
- pack_ptr->packet_index,
+ pack_ptr->first_frag_header.fragment_index,
up_ptr->mess->num_elements+1 );
}
/* chain this packet */
up_ptr->mess->num_elements++;
- up_ptr->mess->elements[index-1].len = Packets[pack_entry].head->data_len;
+ up_ptr->mess->elements[index-1].len = Packets[pack_entry].head->first_frag_header.fragment_len;
up_ptr->mess->elements[index-1].buf = (char *)Packets[pack_entry].body;
- if( to_copy)
+ if( to_copy )
{
/*
* copy the packet.
@@ -1094,7 +1428,7 @@
GlobalStatus.packet_delivered++;
- if( pack_ptr->packet_index < 0 )
+ if( pack_ptr->first_frag_header.fragment_index < 0 )
{
/* end of message */
/* Push up big_scatter. i.e. up_ptr->mess */
@@ -1103,6 +1437,15 @@
up_ptr->exist = 0;
Sess_deliver_message( mess_link );
}
+
+ /* Delivering all messages coming from non-first fragments */
+ while( mess_queue.num_messages > 0 )
+ {
+ mess_link = mess_queue.first;
+ mess_queue.first = mess_queue.first->next;
+ mess_queue.num_messages--;
+ Sess_deliver_message(mess_link);
+ }
}
static void Deliver_reliable_packets( int32 start_seq, int num_packets )
@@ -1121,7 +1464,7 @@
if( Packets[pack_entry].exist == 1 )
{
if( Is_reliable( Packets[pack_entry].head->type ) &&
- Packets[pack_entry].head->packet_index == -1 )
+ Packets[pack_entry].head->first_frag_header.fragment_index == -1 )
{
Deliver_packet( pack_entry, 1 );
Alarm( PROTOCOL, "Deliver_reliable_packets: packet %d was delivered\n", i );
@@ -1445,6 +1788,12 @@
Get_arq(Last_token->type), Get_retrans(Last_token->type) );
}
+/*
+ * Prot_set_prev_proc: record this daemon's ring predecessor for the
+ * membership memb, as reported by Conf_previous. Prot_handle_bcast compares
+ * incoming packets' transmiter_id against this value.
+ */
+void Prot_set_prev_proc(configuration *memb)
+{
+    int32 predecessor;
+
+    predecessor = Conf_previous( memb );
+    Prev_proc_id = predecessor;
+    Alarm( PROTOCOL, "Prev_proc_id: %d, My.id: %d\n", Prev_proc_id, My.id );
+}
+
void Flip_token_body( char *buf, token_header *token_ptr )
{
/*
@@ -1480,3 +1829,9 @@
}
}
}
+
+/*
+ * Flip_frag: convert a fragment header between byte orders in place by
+ * applying Flip_int16 to both of its fields. It is declared static above,
+ * and the definition now says so explicitly.
+ */
+static void Flip_frag( fragment_header *frag_ptr )
+{
+    frag_ptr->fragment_index = Flip_int16( frag_ptr->fragment_index );
+    frag_ptr->fragment_len   = Flip_int16( frag_ptr->fragment_len );
+}
Modified: trunk/daemon/protocol.h
===================================================================
--- trunk/daemon/protocol.h 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/protocol.h 2013-12-17 17:17:17 UTC (rev 629)
@@ -64,5 +64,6 @@
void Prot_Destroy_Local_Session(session *old_sess);
down_link *Prot_Create_Down_Link(message_obj *msg, int type, int mbox, int cur_element);
void Prot_kill_session(message_obj *msg);
+void Prot_set_prev_proc(configuration *memb);
#endif /* INC_PROTOCOL */
Modified: trunk/daemon/spread.c
===================================================================
--- trunk/daemon/spread.c 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/spread.c 2013-12-17 17:17:17 UTC (rev 629)
@@ -24,6 +24,7 @@
*
* Major Contributor(s):
* ---------------
+ * Amy Babay babay at cs.jhu.edu - accelerated ring protocol.
* Ryan Caudy rcaudy at gmail.com - contributions to process groups.
* Claudiu Danilov claudiu at acm.org - scalable wide area support.
* Cristina Nita-Rotaru crisn at cs.purdue.edu - group communication security.
@@ -91,7 +92,7 @@
Alarm_set_priority( SPLOG_INFO );
Alarmp( SPLOG_PRINT, SYSTEM, "/===========================================================================\\\n");
- Alarmp( SPLOG_PRINT, SYSTEM, "| The Spread Toolkit. |\n");
+ Alarmp( SPLOG_PRINT, SYSTEM, "| The Spread Toolkit - Accelerated Ring Experimental Research Version |\n");
Alarmp( SPLOG_PRINT, SYSTEM, "| Copyright (c) 1993-2013 Spread Concepts LLC |\n");
Alarmp( SPLOG_PRINT, SYSTEM, "| All rights reserved. |\n");
Alarmp( SPLOG_PRINT, SYSTEM, "| |\n");
@@ -113,6 +114,7 @@
Alarmp( SPLOG_PRINT, SYSTEM, "| John Schultz jschultz at spreadconcepts.com |\n");
Alarmp( SPLOG_PRINT, SYSTEM, "| |\n");
Alarmp( SPLOG_PRINT, SYSTEM, "| Major Contributors: |\n");
+ Alarmp( SPLOG_PRINT, SYSTEM, "| Amy Babay babay at cs.jhu.edu - accelerated ring protocol. |\n");
Alarmp( SPLOG_PRINT, SYSTEM, "| Ryan Caudy rcaudy at gmail.com - contribution to process groups.|\n");
Alarmp( SPLOG_PRINT, SYSTEM, "| Claudiu Danilov claudiu at acm.org - scalable, wide-area support. |\n");
Alarmp( SPLOG_PRINT, SYSTEM, "| Cristina Nita-Rotaru crisn at cs.purdue.edu - GC security. |\n");
@@ -132,7 +134,7 @@
Alarmp( SPLOG_PRINT, SYSTEM, "| WWW: www.spread.org www.spreadconcepts.com |\n");
Alarmp( SPLOG_PRINT, SYSTEM, "| Contact: info at spreadconcepts.com |\n");
Alarmp( SPLOG_PRINT, SYSTEM, "| |\n");
- Alarmp( SPLOG_PRINT, SYSTEM, "| Version %d.%02d.%02d Built %-17s |\n",
+ Alarmp( SPLOG_PRINT, SYSTEM, "| Version %d.%02d.%02d Built %-17s -accelerated ring research version|\n",
(int)SP_MAJOR_VERSION, (int)SP_MINOR_VERSION, (int)SP_PATCH_VERSION, Spread_build_date );
Alarmp( SPLOG_PRINT, SYSTEM, "\\===========================================================================/\n");
Modified: trunk/daemon/spread_params.h
===================================================================
--- trunk/daemon/spread_params.h 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/spread_params.h 2013-12-17 17:17:17 UTC (rev 629)
@@ -41,7 +41,7 @@
#define SP_PATCH_VERSION 0
#define SPREAD_PROTOCOL 3
-#define SPREAD_BUILD_DATE "11/June/2013"
+#define SPREAD_BUILD_DATE "02/July/2013"
#define DEFAULT_SPREAD_PORT 4803
Modified: trunk/daemon/status.h
===================================================================
--- trunk/daemon/status.h 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/daemon/status.h 2013-12-17 17:17:17 UTC (rev 629)
@@ -63,6 +63,8 @@
int16 num_segments;
int16 window;
int16 personal_window;
+ int16 accelerated_ring;
+ int16 accelerated_window;
int16 num_sessions;
int16 num_groups;
int16 major_version;
Modified: trunk/docs/sample.spread.conf
===================================================================
--- trunk/docs/sample.spread.conf 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/docs/sample.spread.conf 2013-12-17 17:17:17 UTC (rev 629)
@@ -103,7 +103,7 @@
#If option is set to true then all monitor commands are enabled.
# THIS IS A SECURTIY RISK IF YOUR NETWORK IS NOT PROTECTED!
-#DangerousMonitor = false
+DangerousMonitor = true
#Set handling of SO_REUSEADDR socket option for the daemon's TCP
# listener. This is useful for facilitating quick daemon restarts (OSes
@@ -235,10 +235,20 @@
# networks. If no letter is listed before the interface address then ALL types of
# events are handled on that interface.
+# AcceleratedRing flag indicates which protocol to use. If false, the normal Spread
+# ring protocol is used. If true, a currently experimental accelerated ring protocol
+# is used. Note that daemons can only talk with daemons that run the same protocol - they
+# will not agree to talk with daemons that run the other protocol. The accelerated ring
+# protocol uses a flow control algorithm similar to the normal protocol, with the
+# exception that up to AcceleratedWindow packets out of the PersonalWindow packets
+# are sent after the token is sent (the token still reflects these packets).
+AcceleratedRing = true
+
# Flow Control Parameters
#
-#Window = 100
-#PersonalWindow = 20
+Window = 100
+PersonalWindow = 20
+AcceleratedWindow = 20
# Membership Timeouts (in terms of milliseconds)
#
Modified: trunk/libspread-util/include/spu_events.h
===================================================================
--- trunk/libspread-util/include/spu_events.h 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/libspread-util/include/spu_events.h 2013-12-17 17:17:17 UTC (rev 629)
@@ -89,6 +89,7 @@
void (* func)( int fd, int code, void *data), int code,
void *data, int priority );
int E_detach_fd( int fd, int fd_type );
+int E_detach_fd_priority( int fd, int fd_type, int priority );
int E_set_active_threshold( int priority );
int E_activate_fd( int fd, int fd_type );
int E_deactivate_fd( int fd, int fd_type );
Modified: trunk/libspread-util/include/spu_objects_local.h
===================================================================
--- trunk/libspread-util/include/spu_objects_local.h 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/libspread-util/include/spu_objects_local.h 2013-12-17 17:17:17 UTC (rev 629)
@@ -82,6 +82,7 @@
#define MEMBER 32
#define MSG_LIST_ENTRY 33
#define SESS_SEQ_ENTRY 34
+#define QUEUE_LINK 35
#define ROUTE_WEIGHTS 36
#define PROF_FUNCT 37
Modified: trunk/libspread-util/src/events.c
===================================================================
--- trunk/libspread-util/src/events.c 2013-11-18 19:56:46 UTC (rev 628)
+++ trunk/libspread-util/src/events.c 2013-12-17 17:17:17 UTC (rev 629)
@@ -617,7 +617,7 @@
int E_detach_fd( int fd, int fd_type )
{
- int i,j;
+ int i;
int found;
if( fd_type < 0 || fd_type > NUM_FDTYPES )
@@ -628,21 +628,45 @@
found = 0;
for( i=0; i < NUM_PRIORITY; i++ )
- for( j=0; j < Fd_queue[i].num_fds; j++ )
+ {
+ if( E_detach_fd_priority( fd, fd_type, i ) == 0 )
+ {
+ found = 1;
+ }
+ }
+
+ if( ! found ) return( -1 );
+
+ return( 0 );
+}
+
+int E_detach_fd_priority( int fd, int fd_type, int priority )
+{
+ int i;
+ int found;
+
+ if( fd_type < 0 || fd_type > NUM_FDTYPES )
+ {
+ Alarm( PRINT, "E_detach_fd: invalid fd_type %d for fd %d\n", fd_type, fd );
+ return( -1 );
+ }
+
+ found = 0;
+ for( i=0; i < Fd_queue[priority].num_fds; i++ )
+ {
+ if( ( Fd_queue[priority].events[i].fd == fd ) && ( Fd_queue[priority].events[i].fd_type == fd_type ) )
{
- if( ( Fd_queue[i].events[j].fd == fd ) && ( Fd_queue[i].events[j].fd_type == fd_type ) )
- {
- if (Fd_queue[i].events[j].active)
- Fd_queue[i].num_active_fds--;
- Fd_queue[i].num_fds--;
- Fd_queue[i].events[j] = Fd_queue[i].events[Fd_queue[i].num_fds];
+ if (Fd_queue[priority].events[i].active)
+ Fd_queue[priority].num_active_fds--;
+ Fd_queue[priority].num_fds--;
+ Fd_queue[priority].events[i] = Fd_queue[priority].events[Fd_queue[priority].num_fds];
- FD_CLR( fd, &Fd_mask[fd_type] );
- found = 1;
+ FD_CLR( fd, &Fd_mask[fd_type] );
+ found = 1;
- break; /* from the j for only */
- }
- }
+ break;
+ }
+ }
if( ! found ) return( -1 );
Copied: trunk/release_announcement_4.3.txt (from rev 582, branches/experimental-4.3/release_announcement_4.3.txt)
===================================================================
--- trunk/release_announcement_4.3.txt (rev 0)
+++ trunk/release_announcement_4.3.txt 2013-12-17 17:17:17 UTC (rev 629)
@@ -0,0 +1,49 @@
+This version is an experimental version based on the Spread
+Toolkit 4.3. The file release_announcement_accelerated_ring.txt contains
+the release announcement relevant to the experimental version.
+Below is the original announcement of the Spread Toolkit version 4.3.0.
+
+Spread 4.3.0 http://www.spread.org
+
+Spread Concepts LLC is happy to announce the release of a
+new stable version, 4.3.0, of the Spread toolkit.
+
+The Spread 4.3 release is an important release focusing on improved out-of-the-box
+performance for the Spread toolkit.
+This release has a large number of small changes which include bugfixes,
+very specific performance improvements, and general improvements to the code
+and build process.
+
+The main new features of this release are:
+
+1) Performance improvements to membership timing and system throughput.
+2) Provide runtime configurability for membership timers and flow control
+parameters via the spread.conf file.
+3) Redesign of C library locking to improve throughput and remove blocking.
+4) Smarter autodetection of both the best client-server connection method
+(local system unix sockets or tcp) and the local hostname when starting daemons.
+
+It also includes a number of bug fixes. For details check the Readme.txt file.
+
+This release does not include any API changes, so applications should
+be able to be relinked or recompiled with the new Spread library without
+changes.
+
+The Spread toolkit provides a high performance messaging service
+that is resilient to faults across local and wide area networks.
+Spread functions as a unified message bus for distributed applications,
+and provides highly tuned application-level multicast, group communication,
+and point to point support. Spread services range from reliable messaging
+to fully ordered messages with virtual synchrony delivery guarantees, even in case
+of computer failures and network partitions.
+
+Please be aware that under the Spread Open Source License, the toolkit may
+be freely used only under some conditions. For example, the license includes
+the requirement that all advertising materials (including web pages)
+mentioning software that uses Spread display a specific acknowledgment.
+Please review the license agreement for more details.
+http://www.spread.org/license/
+
+Other commercial licenses or other licensing arrangements are available.
+Please contact info at spreadconcepts.com.
+
Copied: trunk/release_announcement_accelerated_ring.txt (from rev 582, branches/experimental-4.3/release_announcement_accelerated_ring.txt)
===================================================================
--- trunk/release_announcement_accelerated_ring.txt (rev 0)
+++ trunk/release_announcement_accelerated_ring.txt 2013-12-17 17:17:17 UTC (rev 629)
@@ -0,0 +1,21 @@
+The Spread Toolkit Accelerated Ring Experimental release 4.3 is a research version
+of the Spread toolkit based on the Spread Toolkit 4.3 public release available
+at www.spread.org.
+
+This experimental release features an experimental protocol tailored for
+data center networks that can provide 30%-50% higher throughput and 20%-35% lower
+latency in modern local area networks. This version also still supports
+the original Ring protocol.
+
+The new features of this experimental release are:
+
+1) Accelerated Ring protocol to improve both throughput and latency in local area networks.
+
+2) Redesigned message-packing, reducing overhead for small messages.
+
+To use the experimental protocol, the AcceleratedRing parameter must be set to
+"true" in the spread.conf file, and an AcceleratedWindow with a value between 0
+and the value of the PersonalWindow parameter should be specified as a flow
+control parameter in the spread.conf file. The sample.spread.conf file included
+in this experimental release has appropriate settings for these parameters that
+enable the Accelerated Ring protocol, as well as a description of their functions.
More information about the Spread-cvs
mailing list