[Spread-cvs] commit: r582 - in branches/experimental-4.3: . daemon docs libspread-util/include libspread-util/src

jschultz at spread.org
Thu Jul 18 14:21:26 EDT 2013


Author: jschultz
Date: 2013-07-18 14:21:26 -0400 (Thu, 18 Jul 2013)
New Revision: 582

Added:
   branches/experimental-4.3/release_announcement_4.3.txt
   branches/experimental-4.3/release_announcement_accelerated_ring.txt
Modified:
   branches/experimental-4.3/Readme.txt
   branches/experimental-4.3/daemon/Makefile.in
   branches/experimental-4.3/daemon/config_gram.l
   branches/experimental-4.3/daemon/config_parse.y
   branches/experimental-4.3/daemon/configuration.c
   branches/experimental-4.3/daemon/configuration.h
   branches/experimental-4.3/daemon/flow_control.c
   branches/experimental-4.3/daemon/flow_control.h
   branches/experimental-4.3/daemon/membership.c
   branches/experimental-4.3/daemon/monitor.c
   branches/experimental-4.3/daemon/net_types.h
   branches/experimental-4.3/daemon/network.c
   branches/experimental-4.3/daemon/prot_body.h
   branches/experimental-4.3/daemon/protocol.c
   branches/experimental-4.3/daemon/protocol.h
   branches/experimental-4.3/daemon/spread.c
   branches/experimental-4.3/daemon/spread_params.h
   branches/experimental-4.3/daemon/status.h
   branches/experimental-4.3/docs/sample.spread.conf
   branches/experimental-4.3/libspread-util/include/spu_events.h
   branches/experimental-4.3/libspread-util/include/spu_objects_local.h
   branches/experimental-4.3/libspread-util/src/events.c
Log:
Updating 4_3 release to be experimental version


Modified: branches/experimental-4.3/Readme.txt
===================================================================
--- branches/experimental-4.3/Readme.txt	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/Readme.txt	2013-07-18 18:21:26 UTC (rev 582)
@@ -2,7 +2,7 @@
 -----------------------------------------------------------
 
 /=============================================================================\
-| The Spread Group Communication Toolkit                                      |
+| The Spread Group Communication Toolkit - Accelerated Ring Research Version  |
 | Copyright (c) 1993-2013 Spread Concepts LLC                                 |
 | All rights reserved.                                                        |
 |                                                                             |
@@ -23,6 +23,7 @@
 |    John Schultz            jschultz at spreadconcepts.com		      |
 |                                                                             |
 | Major Contributors:                                                         |
+|    Amy Babay               babay at cs.jhu.edu - accelerated ring protocol     |
 |    Ryan Caudy              rcaudy at gmail.com - contribution to process groups|
 |    Claudiu Danilov	     claudiu at acm.org - scalable, wide-area support    |
 |    Cristina Nita-Rotaru    crisn at cs.purdue.edu - GC security                |
@@ -48,9 +49,17 @@
 | WWW    : http://www.spread.org  and  http://www.spreadconcepts.com          |
 | Contact: info at spreadconcepts.com                                            |
 |                                                                             |
-| Version 4.3.0  built 11/June/2013                                           |
+| Version 4.3.0  Accelerated Ring Experimental version built 2/July/2013      |
 \=============================================================================/
 
+July 2, 2013 Ver 4.3.0 Final
+-----------------------------
+Features:
+ - Accelerated Ring Experimental protocol tailored for data center networks.
+   This protocol provides 30%-50% higher throughput and 20-35% lower latency 
+   in modern local area networks. Both the original protocol and the accelerated
+   ring experimental protocol are supported by this version.
+
 June 11, 2013 Ver 4.3.0 Final
 -----------------------------
 Features:

Modified: branches/experimental-4.3/daemon/Makefile.in
===================================================================
--- branches/experimental-4.3/daemon/Makefile.in	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/Makefile.in	2013-07-18 18:21:26 UTC (rev 582)
@@ -83,7 +83,7 @@
 	$(LD) -o $@ $(LDFLAGS) $(SPREADOBJS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a ../stdutil/lib/libstdutil-threaded-release.a $(LIBS)
 
 spmonitor$(EXEEXT): $(MONITOR_OBJS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
-	$(LD) -o $@ $(LDFLAGS) $(MONITOR_OBJS) $(LIBS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
+	$(LD) -o $@ $(LDFLAGS) $(MONITOR_OBJS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a $(LIBS)
 
 sptmonitor$(EXEEXT): $(TMONITOR_OBJS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
 	$(LD) $(THLDFLAGS) -o $@ $(TMONITOR_OBJS) $(THLIBS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
@@ -91,10 +91,10 @@
 testprog: spsend$(EXEEXT) sprecv$(EXEEXT)
 
 spsend$(EXEEXT): s.o $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
-	$(LD) -o $@ $(LDFLAGS) s.o $(LIBS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
+	$(LD) -o $@ $(LDFLAGS) s.o $(LIBSPREADUTIL_DIR)/lib/libspread-util.a $(LIBS)
 
 sprecv$(EXEEXT): r.o $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
-	$(LD) -o $@ $(LDFLAGS) r.o $(LIBS) $(LIBSPREADUTIL_DIR)/lib/libspread-util.a
+	$(LD) -o $@ $(LDFLAGS) r.o $(LIBSPREADUTIL_DIR)/lib/libspread-util.a $(LIBS)
 
 clean:
 	rm -f *.lo *.tlo *.to *.o *.a *.dylib $(TARGETS) spsimple_user

Modified: branches/experimental-4.3/daemon/config_gram.l
===================================================================
--- branches/experimental-4.3/daemon/config_gram.l	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/config_gram.l	2013-07-18 18:21:26 UTC (rev 582)
@@ -100,6 +100,8 @@
 RouteMatrix                     { return ROUTEMATRIX; }
 Window				{ return WINDOW; }
 PersonalWindow			{ return PERSONALWINDOW; }
+AcceleratedRing                 { return ACCELERATEDRING; }
+AcceleratedWindow               { return ACCELERATEDWINDOW; }
 TokenTimeout                    { return TOKENTIMEOUT; }
 HurryTimeout                    { return HURRYTIMEOUT; }
 AliveTimeout                    { return ALIVETIMEOUT; }
@@ -136,7 +138,7 @@
 M                               { yylval.mask = IFTYPE_MONITOR; return IMONITOR; }
 C                               { yylval.mask = IFTYPE_CLIENT; return ICLIENT; }
 D                               { yylval.mask = IFTYPE_DAEMON; return IDAEMON; }
-pDEBUG                           { yylval.number = 1; return PDEBUG; }
+pDEBUG                          { yylval.number = 1; return PDEBUG; }
 INFO                            { yylval.number = 2; return PINFO; }
 WARNING                         { yylval.number = 3; return PWARNING; }
 ERROR                           { yylval.number = 4; return PERROR; }

Modified: branches/experimental-4.3/daemon/config_parse.y
===================================================================
--- branches/experimental-4.3/daemon/config_parse.y	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/config_parse.y	2013-07-18 18:21:26 UTC (rev 582)
@@ -327,7 +327,7 @@
 %token DEBUGINITIALSEQUENCE
 %token DANGEROUSMONITOR SOCKETPORTREUSE RUNTIMEDIR SPUSER SPGROUP ALLOWEDAUTHMETHODS REQUIREDAUTHMETHODS ACCESSCONTROLPOLICY
 %token MAXSESSIONMESSAGES
-%token WINDOW PERSONALWINDOW
+%token WINDOW PERSONALWINDOW ACCELERATEDRING ACCELERATEDWINDOW
 %token TOKENTIMEOUT HURRYTIMEOUT ALIVETIMEOUT JOINTIMEOUT REPTIMEOUT SEGTIMEOUT GATHERTIMEOUT FORMTIMEOUT LOOKUPTIMEOUT
 %token SP_BOOL SP_TRIVAL LINKPROTOCOL PHOP PTCPHOP
 %token IMONITOR ICLIENT IDAEMON
@@ -560,6 +560,14 @@
 			{
 			    Conf_set_personal_window($3.number);
 			}
+		|	ACCELERATEDRING EQUALS SP_BOOL
+			{
+			    Conf_set_accelerated_ring($3.boolean);
+			}
+		|	ACCELERATEDWINDOW EQUALS NUMBER
+			{
+			    Conf_set_accelerated_window($3.number);
+			}
 		|	TOKENTIMEOUT EQUALS NUMBER
 			{
 			    Conf_set_token_timeout($3.number);

Modified: branches/experimental-4.3/daemon/configuration.c
===================================================================
--- branches/experimental-4.3/daemon/configuration.c	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/configuration.c	2013-07-18 18:21:26 UTC (rev 582)
@@ -106,6 +106,9 @@
 static  int     Window = DEFAULT_WINDOW;
 static  int     PersonalWindow = DEFAULT_PERSONAL_WINDOW;
 
+static  bool    AcceleratedRing = FALSE;
+static  int     AcceleratedWindow = 0;
+
 enum 
 {
   TOKEN_TIMEOUT_CONF  = (0x1 << 0),
@@ -432,6 +435,14 @@
             Alarmp( SPLOG_FATAL, CONF_SYS, "Failed to update string with version number!\n");
         ConfStringLen += added_len;
 
+	/* append whether we are running with accelerated token or not */
+
+	if (ConfStringLen >= MAX_CONF_STRING) {
+	  Alarmp( SPLOG_FATAL, CONF_SYS, "Failed to update string with accelerated ring type!\n");
+	}
+
+	ConfStringRep[ConfStringLen++] = '0' + AcceleratedRing;
+	
         /* calculate hash value of configuration. 
          * This daemon will only work with other daemons who have an identical hash value.
          */
@@ -486,6 +497,15 @@
 	}
 
 	Conf_id_to_str( My.id, ip );
+
+	if (PersonalWindow > Window) {
+	  Alarmp(SPLOG_FATAL, CONF_SYS, "Conf_load_conf_file: PersonalWindow (%d) > Window (%d)!\n", PersonalWindow, Window);
+	}
+
+	if (AcceleratedRing && AcceleratedWindow > PersonalWindow) {
+	  Alarmp(SPLOG_FATAL, CONF_SYS, "Conf_load_conf_file: AcceleratedWindow (%d) > PersonalWindow (%d)!\n", AcceleratedWindow, PersonalWindow);
+	}
+
 	Alarm( CONF_SYS, "Conf_load_conf_file: My name: %s, id: %s, port: %hd\n",
 		My.name, ip, My.port );
 
@@ -704,6 +724,45 @@
 	return( -1 );
 }
 
+int32u Conf_previous( configuration *config )
+{
+        segment *seg_ptr;
+        segment *prev_seg_ptr;
+        int index_in_seg;
+        int i;
+
+        seg_ptr = &(config->segments[My.seg_index]);
+        index_in_seg = Conf_id_in_seg(seg_ptr, My.id);
+
+        if( index_in_seg > 0 )
+        {
+                /* I am not first in my segment; previous is previous proc in segment */
+                return seg_ptr->procs[index_in_seg-1]->id;
+        }
+
+        for( i = 0; i < My.seg_index; i++ )
+        {
+                if( config->segments[i].num_procs > 0 )
+                {
+                        /* There is a segment before mine; previous is last in that segment */
+                        prev_seg_ptr = &(config->segments[i]);
+                        return prev_seg_ptr->procs[prev_seg_ptr->num_procs-1]->id;
+                }
+        }
+
+        /* I am first in first segment; previous is last in last segment */
+        for( i = config->num_segments-1; i >= My.seg_index; i-- )
+        {
+                if( config->segments[i].num_procs > 0 )
+                {
+                        prev_seg_ptr = &(config->segments[i]);
+                        return prev_seg_ptr->procs[prev_seg_ptr->num_procs-1]->id;
+                }
+        }
+        Alarm( EXIT, "Conf_previous: No process found\n" );
+        return( -1 );
+}
+
 int32u	Conf_seg_leader( configuration *config, int16 seg_index )
 {
 	if( config->segments[seg_index].num_procs > 0 )
@@ -939,6 +998,30 @@
 	return PersonalWindow;
 }
 
+void Conf_set_accelerated_ring(bool new_state)
+{
+  AcceleratedRing = new_state;
+}
+
+bool Conf_get_accelerated_ring(void)
+{
+  return AcceleratedRing;
+}
+
+void Conf_set_accelerated_window(int pwindow)
+{
+        if (pwindow < 0) {
+	    Alarmp(SPLOG_FATAL, CONF_SYS, "Conf_set_accelerated_window: Attempt to set window to negative (%d)!\n", pwindow);
+        }
+        Alarmp(SPLOG_DEBUG, CONF_SYS, "Conf_set_accelerated_window: Set Accelerated Window to %d\n", pwindow);
+	AcceleratedWindow = pwindow;
+}
+
+int Conf_get_accelerated_window(void)
+{
+  return AcceleratedWindow;
+}
+
 int Conf_memb_timeouts_set(void)
 {
   return TimeoutMask != 0;

Modified: branches/experimental-4.3/daemon/configuration.h
===================================================================
--- branches/experimental-4.3/daemon/configuration.h	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/configuration.h	2013-07-18 18:21:26 UTC (rev 582)
@@ -107,6 +107,7 @@
 int             Conf_num_segments( configuration *config );
 int32u		Conf_leader( configuration *config );
 int32u		Conf_last( configuration *config );
+int32u		Conf_previous( configuration *config );
 int32u		Conf_seg_leader( configuration *config, int16 seg_index );
 int32u		Conf_seg_last( configuration *config, int16 seg_index );
 int             Conf_append_id_to_seg( segment *seg, int32u id);
@@ -144,7 +145,10 @@
 int		Conf_get_window(void);
 void		Conf_set_personal_window(int pwindow);
 int		Conf_get_personal_window(void);
-
+void		Conf_set_accelerated_ring(bool new_state);
+bool		Conf_get_accelerated_ring(void);
+void		Conf_set_accelerated_window(int pwindow);
+int		Conf_get_accelerated_window(void);
 int             Conf_memb_timeouts_set(void);
 int             Conf_all_memb_timeouts_set(void);
 void		Conf_set_token_timeout(int timeout);
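
Conf_previous returns the id of the daemon that precedes the caller in ring order: the previous process in the same segment, otherwise the last process of an earlier non-empty segment, otherwise (wrapping around) the last process of the last non-empty segment. The following is a simplified sketch of such a predecessor lookup, not the committed code. The types sketch_segment and sketch_ring and the function sketch_previous are hypothetical stand-ins for the daemon's segment and configuration structures. The sketch scans backward from the caller's segment so that the nearest non-empty earlier segment is chosen; that direction is a choice made here and may differ from the committed loop.

```c
#include <stdint.h>

#define SKETCH_MAX_SEGMENTS 4
#define SKETCH_MAX_PROCS    4

/* Simplified stand-ins for the daemon's segment and configuration types. */
typedef struct {
    int      num_procs;
    uint32_t proc_ids[SKETCH_MAX_PROCS];
} sketch_segment;

typedef struct {
    int            num_segments;
    sketch_segment segments[SKETCH_MAX_SEGMENTS];
} sketch_ring;

/* Return the id of the process preceding (my_seg, my_index) in ring order,
 * or 0 if no segment holds any process. */
uint32_t sketch_previous(const sketch_ring *ring, int my_seg, int my_index)
{
    int step, seg;

    /* Not first in my segment: the predecessor is my neighbour. */
    if (my_index > 0)
        return ring->segments[my_seg].proc_ids[my_index - 1];

    /* Walk backward with wrap-around; the last step revisits my own segment,
     * so a daemon that is alone in the ring is its own predecessor. */
    for (step = 1; step <= ring->num_segments; step++) {
        seg = (my_seg - step + ring->num_segments) % ring->num_segments;
        if (ring->segments[seg].num_procs > 0)
            return ring->segments[seg].proc_ids[ring->segments[seg].num_procs - 1];
    }
    return 0;
}
```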

Modified: branches/experimental-4.3/daemon/flow_control.c
===================================================================
--- branches/experimental-4.3/daemon/flow_control.c	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/flow_control.c	2013-07-18 18:21:26 UTC (rev 582)
@@ -42,14 +42,20 @@
 
 static	int16	Window;
 static	int16	Personal_window;
+static  int16   Accelerated_window;
+static  bool    Accelerated_ring;
 
 void	FC_init( )
 {
 	Window = Conf_get_window();
 	Personal_window = Conf_get_personal_window();
+	Accelerated_ring = Conf_get_accelerated_ring();
+	Accelerated_window = Conf_get_accelerated_window();
 
 	GlobalStatus.window = Window;
 	GlobalStatus.personal_window = Personal_window;
+	GlobalStatus.accelerated_ring = Accelerated_ring;
+	GlobalStatus.accelerated_window = Accelerated_window;
 }
 
 void	FC_new_configuration( )
@@ -72,14 +78,24 @@
 	return(allowed);
 }
 
+bool    FC_accelerated_ring(void)
+{
+	return Accelerated_ring;
+}
+
+int     FC_accelerated_window(void)
+{
+        return Accelerated_window;
+}
+
 void	FC_handle_message( sys_scatter *scat )
 {
 
-	int16		*cur_fc_buf;
+        int16		(*cur_fc_buf)[2];
 	packet_header	*pack_ptr;
 	proc		dummy_proc;
 	int		my_index;
-	int16		temp_window,temp_personal_window;
+	int16		temp_window,temp_personal_window, temp_accelerated_window;
         configuration   *Cn;
 
         Cn = Conf_ref();
@@ -96,24 +112,29 @@
 			"FC_handle_message: Illegal monitor request\n");
 		return;
 	}
-	cur_fc_buf = (int16 *)scat->elements[1].buf;
+	cur_fc_buf = (int16 (*)[2])scat->elements[1].buf;
 
 	my_index = Conf_proc_by_id( Conf_my().id, &dummy_proc );
 
 	if( Same_endian( pack_ptr->type ) ) {
-		temp_window = cur_fc_buf[Conf_num_procs( Cn )];
-		temp_personal_window = cur_fc_buf[my_index];
+		temp_window = cur_fc_buf[Conf_num_procs( Cn )][0];
+		temp_personal_window = cur_fc_buf[my_index][0];
+		temp_accelerated_window = cur_fc_buf[my_index][1];
 	}else{
-		temp_window = Flip_int16( cur_fc_buf[Conf_num_procs( Cn )] );
-		temp_personal_window = Flip_int16( cur_fc_buf[my_index] );
+		temp_window = Flip_int16( cur_fc_buf[Conf_num_procs( Cn )][0] );
+		temp_personal_window = Flip_int16( cur_fc_buf[my_index][0] );
+		temp_accelerated_window = Flip_int16( cur_fc_buf[my_index][1] );
 	}
 	if( temp_window != -1 ) Window = temp_window;
 	if( temp_personal_window != -1 ) Personal_window = temp_personal_window;
+	if( temp_accelerated_window != -1) Accelerated_window = temp_accelerated_window;
+
 	GlobalStatus.window = Window;
 	GlobalStatus.personal_window = Personal_window;
+	GlobalStatus.accelerated_window = Accelerated_window;
 	Alarm( FLOW_CONTROL, 
-		"FC_handle_message: Got monitor mess, Window %d Personal %d\n", 
-			Window, Personal_window );
+		"FC_handle_message: Got monitor mess, Window %d Personal %d Accelerated %d\n", 
+	        Window, Personal_window, Accelerated_window );
 }
 
 void FC_signal_conf_reload( )
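
The flow-control buffer sent by spmonitor now carries two int16 values per row. For each daemon index, column 0 holds the personal window and column 1 the accelerated window; the extra row at index Conf_num_procs holds the global window in column 0. A value of -1 means the setting is left unchanged. A minimal sketch of how a daemon could apply such a buffer follows. The names sketch_fc_state, sketch_fc_apply, SKETCH_MAX_PROCS and FC_NO_CHANGE are hypothetical, and the byte-order flipping done by FC_handle_message is deliberately left out.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_MAX_PROCS 8    /* stand-in for MAX_PROCS_RING */
#define FC_NO_CHANGE     (-1) /* "leave this setting alone" */

typedef struct {
    int16_t window;
    int16_t personal_window;
    int16_t accelerated_window;
} sketch_fc_state;

/* Apply a monitor flow-control buffer to one daemon's settings.
 * buf must have at least num_procs + 1 rows. */
void sketch_fc_apply(sketch_fc_state *st, int16_t buf[][2],
                     int num_procs, int my_index)
{
    assert(num_procs < SKETCH_MAX_PROCS);
    assert(my_index >= 0 && my_index < num_procs);

    /* Global window lives in the row just past the last daemon. */
    if (buf[num_procs][0] != FC_NO_CHANGE)
        st->window = buf[num_procs][0];
    /* Per-daemon values live in this daemon's own row. */
    if (buf[my_index][0] != FC_NO_CHANGE)
        st->personal_window = buf[my_index][0];
    if (buf[my_index][1] != FC_NO_CHANGE)
        st->accelerated_window = buf[my_index][1];
}
```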

Modified: branches/experimental-4.3/daemon/flow_control.h
===================================================================
--- branches/experimental-4.3/daemon/flow_control.h	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/flow_control.h	2013-07-18 18:21:26 UTC (rev 582)
@@ -41,6 +41,8 @@
 void	FC_init( );
 void	FC_new_configuration( );
 int	FC_allowed( int flow_control, int num_retrans );
+bool    FC_accelerated_ring(void);
+int     FC_accelerated_window(void);
 void	FC_handle_message( sys_scatter *scat );
 void    FC_signal_conf_reload();
 

Modified: branches/experimental-4.3/daemon/membership.c
===================================================================
--- branches/experimental-4.3/daemon/membership.c	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/membership.c	2013-07-18 18:21:26 UTC (rev 582)
@@ -1960,6 +1960,7 @@
                         Alarm( EXIT, "Read_form2: BUG2 no such id %u\n", p.id);
 	}
 	Net_set_membership( Future_membership );
+        Prot_set_prev_proc(&Future_membership);
 	FC_new_configuration( );
 
 	/* get my ring info */
@@ -2014,7 +2015,7 @@
 	}
 
         /* The token circulates in conf order, which also defines the order
-         * by which we choose "leaders."  So, if noone else has set the id
+         * by which we choose "leaders."  So, if no one else has set the id
          * for my ring, I get to, and I'll be leader. */
         if( !my_rg_info->trans_time )
         {
@@ -2059,6 +2060,7 @@
 	Token_alive = 1;
 	E_queue( Memb_token_loss, 0, NULL, Token_timeout );
     	
+        Received_token_rounds = 0; /* ### Used for priority switching*/
 	Last_token->type = 0;
 	Last_token->seq  = 0;
 	Last_token->aru  = 0;

Modified: branches/experimental-4.3/daemon/monitor.c
===================================================================
--- branches/experimental-4.3/daemon/monitor.c	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/monitor.c	2013-07-18 18:21:26 UTC (rev 582)
@@ -81,8 +81,8 @@
 static	int16		Partition[MAX_PROCS_RING];
 static	int16		Work_partition[MAX_PROCS_RING];
 
-static	int16		Fc_buf[MAX_PROCS_RING];
-static	int16		Work_fc_buf[MAX_PROCS_RING];
+static	int16		Fc_buf[MAX_PROCS_RING][2];
+static	int16		Work_fc_buf[MAX_PROCS_RING][2];
 
 static	int		Status_vector[MAX_PROCS_RING];
 
@@ -109,7 +109,7 @@
 
 static	void	Define_flow_control();
 static	void	Send_flow_control();
-static	void	Print_flow_control( int16 fc_buf[MAX_PROCS_RING] );
+static	void	Print_flow_control( int16 fc_buf[MAX_PROCS_RING][2] );
 
 static  void	Activate_status();
 static	void	Send_status_query();
@@ -385,8 +385,10 @@
 			break;
 
 		case '6':
-			for( i=0; i < Conf_num_procs( &Cn )+1; i++ )
-				Fc_buf[i] = Work_fc_buf[i];
+		        for( i=0; i < Conf_num_procs( &Cn )+1; i++ ) {
+				Fc_buf[i][0] = Work_fc_buf[i][0];
+				Fc_buf[i][1] = Work_fc_buf[i][1];
+			}
 			Send_flow_control();
 
 			printf("\n");
@@ -588,7 +590,7 @@
 
 #endif	/* _REENTRANT */
 
-static	void	Print_flow_control( int16 fc_buf[MAX_PROCS_RING] )
+static	void	Print_flow_control( int16 fc_buf[MAX_PROCS_RING][2] )
 {
 	int32	proc_id;
 	proc	p;
@@ -600,7 +602,7 @@
 	printf("Flow Control Parameters:\n");
 	printf("------------------------\n");
 	printf("\n");
-	printf("Window size:  %d\n",fc_buf[ Conf_num_procs( &Cn )]);
+	printf("Window size:  %d\n",fc_buf[ Conf_num_procs( &Cn )][0]);
 	printf("\n");
 	for( i=0; i < Cn.num_segments ; i++ )
 	{
@@ -608,7 +610,11 @@
 	    {
 		proc_id = Cn.segments[i].procs[j]->id;
 		proc_index = Conf_proc_by_id( proc_id, &p );
-		printf("\t%s\t%d\n", p.name, fc_buf[proc_index] );
+		printf("\t%s personal window\t%d\n", p.name, fc_buf[proc_index][0] );
+
+		if (Conf_get_accelerated_ring()) {
+		  printf("\t%s accelerated window\t%d\n", p.name, fc_buf[proc_index][1] );
+		}
 	    }
 	    printf("\n");
 	}
@@ -642,11 +648,11 @@
 		exit(0);
 	    }
 	    ret = sscanf(str, "%d", &temp );
-	    Work_fc_buf[Conf_num_procs( &Cn )] = temp;
+	    Work_fc_buf[Conf_num_procs( &Cn )][0] = temp;
 	    if( ret > 0 ) legal = 1;
 	    else if( ret == -1 ){
 		legal = 1;
-		Work_fc_buf[Conf_num_procs( &Cn )] = -1;
+		Work_fc_buf[Conf_num_procs( &Cn )][0] = -1;
 	    }else printf("Please enter a number\n");
 	}
 	printf("\n");
@@ -658,7 +664,7 @@
 		proc_index = Conf_proc_by_id( proc_id, &p );
 		for( legal=0; !legal; )
 		{
-		    printf("\t%s\t", p.name);
+		    printf("\t%s personal window\t", p.name);
 
 		    if( fgets( str, 70, stdin ) == NULL )
 		    {
@@ -666,13 +672,34 @@
 			exit(0);
 		    }
 		    ret = sscanf(str, "%d", &temp);
-		    Work_fc_buf[proc_index] = temp;
+		    Work_fc_buf[proc_index][0] = temp;
 		    if( ret > 0 ) legal = 1;
 		    else if( ret == -1 ){
 			legal = 1;
-			Work_fc_buf[proc_index] = -1;
+			Work_fc_buf[proc_index][0] = -1;
 		    }else printf("Please enter a number\n");
 		}
+
+		if (Conf_get_accelerated_ring()) {
+
+		  for( legal=0; !legal; )
+		    {
+		      printf("\t%s accelerated window\t", p.name);
+
+		      if( fgets( str, 70, stdin ) == NULL )
+			{
+			  printf("Bye.\n");
+			  exit(0);
+			}
+		      ret = sscanf(str, "%d", &temp);
+		      Work_fc_buf[proc_index][1] = temp;
+		      if( ret > 0 ) legal = 1;
+		      else if( ret == -1 ){
+			legal = 1;
+			Work_fc_buf[proc_index][1] = -1;
+		      }else printf("Please enter a number\n");
+		    }
+		}
 	    }
 	    printf("\n");
 	}
@@ -957,6 +984,8 @@
 		GlobalStatus.num_segments	= Flip_int16( GlobalStatus.num_segments );
 		GlobalStatus.window		= Flip_int16( GlobalStatus.window );
 		GlobalStatus.personal_window	= Flip_int16( GlobalStatus.personal_window );
+		GlobalStatus.accelerated_ring	= Flip_int16( GlobalStatus.accelerated_ring );
+		GlobalStatus.accelerated_window	= Flip_int16( GlobalStatus.accelerated_window );
 		GlobalStatus.num_sessions	= Flip_int16( GlobalStatus.num_sessions );
 		GlobalStatus.num_groups	        = Flip_int16( GlobalStatus.num_groups );
 		GlobalStatus.major_version		= Flip_int16( GlobalStatus.major_version );
@@ -976,19 +1005,22 @@
                GlobalStatus.major_version,GlobalStatus.minor_version,GlobalStatus.patch_version, 
                GlobalStatus.state, GlobalStatus.gstate, GlobalStatus.sec);
 	if( ret2 < 0 )
-	     printf("Membership  :  %d  procs in %d segments, leader is %d\n",
+	     printf("Membership  :  %d  procs in %d segments, leader is %d, ",
 			GlobalStatus.num_procs,GlobalStatus.num_segments,GlobalStatus.leader_id);
-	else printf("Membership  :  %d  procs in %d segments, leader is %s\n",
+	else printf("Membership  :  %d  procs in %d segments, leader is %s, ",
 		GlobalStatus.num_procs,GlobalStatus.num_segments,leader_p.name);
+        if (GlobalStatus.accelerated_ring == 0)
+            printf("regular protocol\n");
+        else
+            printf("accelerated protocol\n");
 
 	printf("rounds   : %7d\ttok_hurry : %7d\tmemb change: %7d\n",GlobalStatus.token_rounds,GlobalStatus.token_hurry,GlobalStatus.membership_changes);
 	printf("sent pack: %7d\trecv pack : %7d\tretrans    : %7d\n",GlobalStatus.packet_sent,GlobalStatus.packet_recv,GlobalStatus.retrans);
 	printf("u retrans: %7d\ts retrans : %7d\tb retrans  : %7d\n",GlobalStatus.u_retrans,GlobalStatus.s_retrans,GlobalStatus.b_retrans);
 	printf("My_aru   : %7d\tAru       : %7d\tHighest seq: %7d\n",GlobalStatus.my_aru,GlobalStatus.aru, GlobalStatus.highest_seq);
 	printf("Sessions : %7d\tGroups    : %7d\tWindow     : %7d\n",GlobalStatus.num_sessions,GlobalStatus.num_groups,GlobalStatus.window);
-	printf("Deliver M: %7d\tDeliver Pk: %7d\tPers Window: %7d\n",GlobalStatus.message_delivered,GlobalStatus.packet_delivered,GlobalStatus.personal_window);
-	printf("Delta Mes: %7d\tDelta Pack: %7d\tDelta sec  : %7d\n",
-			GlobalStatus.message_delivered - last_mes, GlobalStatus.aru - last_aru, GlobalStatus.sec - last_sec );
+	printf("Deliver M: %7d\tDeliver Pk: %7d\tP/A Window : %7d/%d\n",GlobalStatus.message_delivered,GlobalStatus.packet_delivered,GlobalStatus.personal_window,GlobalStatus.accelerated_window);
+	printf("Delta Mes: %7d\tDelta Pk  : %7d\tDelta sec  : %7d\n",GlobalStatus.message_delivered - last_mes,GlobalStatus.aru - last_aru,GlobalStatus.sec - last_sec);
 	printf("==================================\n");
 
 	printf("\n");
@@ -1034,8 +1066,10 @@
 	{
             Partition[i] = 0;
             Work_partition[i] = 0;
-            Fc_buf[i] = 0;
-            Work_fc_buf[i] = 0;
+            Fc_buf[i][0] = 0;
+            Fc_buf[i][1] = 0;
+            Work_fc_buf[i][0] = 0;
+            Work_fc_buf[i][1] = 0;
             Status_vector[i] = 0;
 	}
         Mutex_lock( &Partition_mutex );

Modified: branches/experimental-4.3/daemon/net_types.h
===================================================================
--- branches/experimental-4.3/daemon/net_types.h	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/net_types.h	2013-07-18 18:21:26 UTC (rev 582)
@@ -107,16 +107,22 @@
 
 #define MONITOR_HASH    1100    /* Conf_hash code for packets from spmonitor program */
 
+typedef	struct	dummy_fragment_header {
+	int16		fragment_index;
+	int16		fragment_len;
+} fragment_header;
+
 typedef	struct	dummy_packet_header {
 	int32		type;
 	int32		transmiter_id;
 	int32		proc_id;
 	membership_id	memb_id;
 	int32		seq;
-	int32		fifo_seq;
-	int16		packet_index;
+	int32		token_round; /* ### changed from fifo_seq */
+        int32           conf_hash;
 	int16		data_len;
-        int32           conf_hash;
+	int16           padding;
+        fragment_header first_frag_header;
 } packet_header;
 
 typedef	char       packet_body[MAX_PACKET_SIZE-sizeof(packet_header)];
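
The reworked packet_header replaces fifo_seq with token_round, drops packet_index, and ends with an explicit int16 padding field followed by an embedded fragment_header. Net_recv in this revision accepts a packet only when the received byte count equals sizeof( packet_header ) plus data_len. The sketch below restates the layout with fixed-width types and shows that length check. All sketch_ names are hypothetical, and the two-field layout of membership_id is an assumption, since that type is not shown in this diff.

```c
#include <stddef.h>
#include <stdint.h>

/* Assumed layout; the real membership_id is defined elsewhere in the daemon. */
typedef struct {
    int32_t proc_id;
    int32_t time;
} sketch_membership_id;

typedef struct {
    int16_t fragment_index;
    int16_t fragment_len;
} sketch_fragment_header;

typedef struct {
    int32_t                type;
    int32_t                transmiter_id;  /* spelling follows the source */
    int32_t                proc_id;
    sketch_membership_id   memb_id;
    int32_t                seq;
    int32_t                token_round;
    int32_t                conf_hash;
    int16_t                data_len;
    int16_t                padding;
    sketch_fragment_header first_frag_header;
} sketch_packet_header;

/* Return 1 when the datagram length matches header size plus payload length. */
int sketch_packet_len_ok(const sketch_packet_header *hdr, size_t received_bytes)
{
    if (hdr->data_len < 0)
        return 0; /* a negative payload length can never be valid */
    return received_bytes == sizeof(*hdr) + (size_t)hdr->data_len;
}
```

Under the assumed membership_id layout, every field sits at an offset that is a multiple of its own size, so on common ABIs the compiler should not need to insert hidden gaps.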

Modified: branches/experimental-4.3/daemon/network.c
===================================================================
--- branches/experimental-4.3/daemon/network.c	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/network.c	2013-07-18 18:21:26 UTC (rev 582)
@@ -62,12 +62,6 @@
 static  int32		Send_address[MAX_SEGMENTS];
 static	int16		Send_ports[MAX_SEGMENTS];
 
-/* ### Pack: 3 lines */
-/* Global in function so both Net_queue_bcast and Net_flush_bcast can access them */
-static  sys_scatter     Queue_scat;
-static  int             Queued_bytes = 0;
-static  const char      align_padding[4] = "padd";
-
 /* address for token sending - which is always needed */
 static	int32		Token_address;
 static	int16		Token_port;
@@ -276,114 +270,6 @@
 	return( ret );
 }
 
-/* ### Pack: 2 routines */
-int	Net_queue_bcast( sys_scatter *scat )
-{
-	packet_header	*pack_ptr;
-        int             new_bytes;
-	int 		i, j;
-        int             ret;
-        int             align_bytes, align_num_scatter;
-
-        /* This line is redundent because of static initialization to 0 */
-        if ( Queued_bytes == 0 ) Queue_scat.num_elements = 0;
-
-        ret = 0;
-        new_bytes = 0;
-
-        for ( i=0; i < scat->num_elements; i++) {
-                new_bytes += scat->elements[i].len;
-        }
-        /* Fix alignment of packed messages  so they will each begin on a 4 byte alignment 
-         * This is needed for Sparc, might need enhancement if other archs 
-         * have more extensive alignement rules
-         */
-        align_bytes = 0;
-        align_num_scatter = 0;
-        switch(Queued_bytes % 4) {
-        case 1:
-                align_bytes++;
-        case 2:
-                align_bytes++;
-        case 3:
-                align_bytes++;
-                align_num_scatter = 1;
-        case 0:
-                /* nothing since already aligned */
-                break; 
-        }
-
-        if ( ( (Queued_bytes + new_bytes + align_bytes)  >  MAX_PACKET_SIZE ) ||
-             ( (Queue_scat.num_elements + scat->num_elements + align_num_scatter) > ARCH_SCATTER_SIZE ) )
-        {
-                ret = Net_flush_bcast();
-                align_bytes = 0;
-                align_num_scatter = 0;
-        }
-
-        if ( Queued_bytes == 0 ) {
-                /* routing on channels if needed according to membership */
-                pack_ptr = (packet_header *)scat->elements[0].buf;
-                pack_ptr->type  = Set_routed( pack_ptr->type );
-                pack_ptr->type  = Set_endian( pack_ptr->type );
-                pack_ptr->conf_hash = Cn->hash_code;
-                pack_ptr->transmiter_id = My.id;
-        }
-
-        if ( align_bytes > 0 )
-        {
-                Queue_scat.elements[Queue_scat.num_elements].len = align_bytes;
-                Queue_scat.elements[Queue_scat.num_elements].buf = (char *)align_padding;
-
-                Queued_bytes += align_bytes;
-                Queue_scat.num_elements += 1;
-                Alarm(NETWORK, "Net_queue_bcast: Inserted padding of %d bytes to message of size %d\n", align_bytes, new_bytes );
-        }
-
-        /* Add new packet to Queue_scat to be sent as packed packet */
-        for ( i=0, j=Queue_scat.num_elements; i < scat->num_elements; i++, j++) {
-                Queue_scat.elements[j].len = scat->elements[i].len;
-                Queue_scat.elements[j].buf = scat->elements[i].buf;
-        }
-        Queued_bytes += new_bytes;
-        Queue_scat.num_elements += scat->num_elements ;
-        
-	return( ret );
-}
-
-int     Net_flush_bcast(void)
-{
-        packet_header   *pack_ptr;
-        int             i;
-        int             ret;
-
-        if (Queued_bytes == 0 ) return( 0 );
-        
-        Alarm(NETWORK, "Net_flush_bcast: Flushing with Queued_bytes = %d; num_elements in scat = %d; size of scat0,1 = %d %d\n", Queued_bytes, Queue_scat.num_elements, Queue_scat.elements[0].len, Queue_scat.elements[1].len);
-        
-        ret = 0;
-
-        for ( i=0; i< Num_send_needed; i++ )
-        {
-                ret = DL_send( Send_channel, Send_address[i], Send_ports[i], &Queue_scat );
-        }
-        pack_ptr = (packet_header *)Queue_scat.elements[0].buf; 
-	pack_ptr->type = Clear_routed( pack_ptr->type );
-
-        /* broadcasting if needed according to configuration */
-        if( Bcast_needed )
-        {	
-                ret = DL_send( Send_channel, Bcast_address, Bcast_port, &Queue_scat );
-        }
-
-        if( !Bcast_needed && (Num_send_needed == 0) )
-            ret = 1; /* No actual send is needed, but 'packet' can be considered 'sent' */
-
-        Queue_scat.num_elements = 0;
-        Queued_bytes = 0;
-        return( ret );
-}
-
 int	Net_scast( int16 seg_index, sys_scatter *scat )
 {
 	packet_header	*pack_ptr;
@@ -446,9 +332,7 @@
 static	scatter		save;
 	packet_header	*pack_ptr;
 	int		bytes_left;
-	int		received_bytes, body_offset;
-	int		processed_bytes;
-	int		pack_same_endian;
+	int		received_bytes;
 	int		i;
         bool            ch_found;
 
@@ -476,7 +360,7 @@
 		return( -1 );
 	}
 
-	/* Fliping packet header to my form if needed */
+	/* Flipping packet header to my form if needed */
 	if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
 
         /* First reject any message whose daemon has a different configuration */
@@ -529,6 +413,13 @@
 	if( pack_ptr->transmiter_id == My.id )
 		return( 0 );
 
+        /* packet validity check */
+	if( received_bytes != sizeof( packet_header ) + pack_ptr->data_len) {
+                Alarm( PRINT, "Net_recv: Received invalid packet - received bytes (%d) != expected length (%d)\n", 
+                       received_bytes, sizeof( packet_header ) + pack_ptr->data_len );
+                return( -1 );
+        }
+
 	if( Bcast_needed && Is_routed( pack_ptr->type ) )
 	{
 		if( !Segment_leader ) Alarm( NETWORK, 
@@ -575,61 +466,6 @@
 	 */
 	pack_ptr->type = Clear_routed ( pack_ptr->type );
 
-	/* 
-	 * Check validity of packet size and flip every packet header 
-	 * other than first header (which is already flipped).
-	 * If packet size is not valid, return -1, otherwise 
-	 * return size of received packet.
-	 */
-	processed_bytes = sizeof( packet_header ) + pack_ptr->data_len;
-	pack_same_endian = Same_endian( pack_ptr->type );
-        /* ignore any alignment padding */
-        if ( processed_bytes < received_bytes ) {
-                switch(processed_bytes % 4)
-                {
-                case 1:
-                        processed_bytes++;
-                case 2:
-                        processed_bytes++;
-                case 3:
-                        processed_bytes++;
-                case 0:
-                        /* already aligned */
-                        break;
-                }
-        }
-	while( processed_bytes < received_bytes )
-	{
-                body_offset = processed_bytes - sizeof(packet_header); 
-                pack_ptr = (packet_header *)&scat->elements[1].buf[body_offset];
-
-                /* flip contigues packet header */
-		if( !pack_same_endian  ) {
-                        Flip_pack( pack_ptr );
-		}
-
-		processed_bytes += sizeof( packet_header ) + pack_ptr->data_len;
-                /* ignore any alignment padding */
-                if ( processed_bytes < received_bytes ) {
-                        switch(processed_bytes % 4)
-                        {
-                        case 1:
-                                processed_bytes++;
-                        case 2:
-                                processed_bytes++;
-                        case 3:
-                                processed_bytes++;
-                        case 0:
-                                /* already aligned */
-                                break;
-                        }
-                }
-	}                
-        Alarm( NETWORK, "Net_recv: Received Packet - packet length(%d), packed message length(%d)\n", received_bytes, processed_bytes);
-	if( processed_bytes != received_bytes ) {
-                Alarm( PRINT, "Net_recv: Received Packet - packet length(%d) != packed message length(%d)\n", received_bytes, processed_bytes);
-                return( -1 );
-        }
 	return( received_bytes );
 }
 
@@ -785,6 +621,9 @@
 	return( Partition[Partition_my_index] == Partition[proc_index] );
 }
 
+/* The first fragment header is not flipped here, even though it is
+   part of the packet header. It will be flipped with the other
+   fragment headers in Prot_handle_bcast */
 void	Flip_pack( packet_header *pack_ptr )
 {
 	pack_ptr->type		  = Flip_int32( pack_ptr->type );
@@ -793,10 +632,9 @@
 	pack_ptr->memb_id.proc_id = Flip_int32( pack_ptr->memb_id.proc_id );
 	pack_ptr->memb_id.time	  = Flip_int32( pack_ptr->memb_id.time );
 	pack_ptr->seq		  = Flip_int32( pack_ptr->seq );
-	pack_ptr->fifo_seq	  = Flip_int32( pack_ptr->fifo_seq );
-	pack_ptr->packet_index	  = Flip_int16( pack_ptr->packet_index );
-	pack_ptr->data_len	  = Flip_int16( pack_ptr->data_len );
+	pack_ptr->token_round	  = Flip_int32( pack_ptr->token_round ); /* fifo_seq changed to token_round */
 	pack_ptr->conf_hash	  = Flip_int32( pack_ptr->conf_hash );
+	pack_ptr->data_len	  = Flip_int16( pack_ptr->data_len );
 }
 
 void	Flip_token( token_header *token_ptr )
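
Note on the network.c hunks above: the removed Net_recv loop walked every packed header and skipped alignment padding with a fall-through switch, while the new check only requires that the datagram is exactly one header plus data_len bytes. The following is a minimal standalone sketch of both calculations, not the daemon's code. The names demo_header, pad_to_4 and packet_len_ok are hypothetical and exist only for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the daemon's packet_header;
 * only the field needed for the length check is modeled. */
typedef struct {
    short data_len;   /* payload length in bytes */
} demo_header;

/* Bytes needed to round n up to the next multiple of 4.
 * Equivalent to the fall-through switch on (n % 4) in the diff:
 * remainder 1 -> 3, 2 -> 2, 3 -> 1, 0 -> 0. */
static int pad_to_4(int n)
{
    assert(n >= 0);
    return (4 - n % 4) % 4;
}

/* Returns 1 when the datagram is exactly one header plus data_len
 * bytes, mirroring the validity check added to Net_recv. */
static int packet_len_ok(int received_bytes, const demo_header *h)
{
    if (received_bytes < 0) return 0;
    return (size_t)received_bytes == sizeof(demo_header) + (size_t)h->data_len;
}
```

Comparing in size_t after rejecting negative values avoids the signed/unsigned surprise that a bare int-versus-sizeof comparison can cause.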

Modified: branches/experimental-4.3/daemon/prot_body.h
===================================================================
--- branches/experimental-4.3/daemon/prot_body.h	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/prot_body.h	2013-07-18 18:21:26 UTC (rev 582)
@@ -75,6 +75,7 @@
 ext	int32		Last_delivered;
 ext	int32		Last_seq;
 ext	int32		Token_rounds;
+ext     int32           Received_token_rounds; /* ### Added to determine when to switch priorities*/
 ext	token_header	*Last_token;
 
 ext	int		Transitional;
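
Note on Received_token_rounds: the protocol.c changes in this commit use it, together with Prev_proc_id and Token_has_priority, to decide when token channels should outrank bcast channels. The sketch below models only that decision as a small state machine, assuming the conditions shown in Prot_handle_bcast and Prot_handle_token. The type ring_state and the functions on_bcast and on_token are hypothetical names, and the actual E_attach_fd / E_detach_fd_priority calls are deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical container for the three protocol variables involved. */
typedef struct {
    int32_t prev_proc_id;           /* ring predecessor (sends me the token) */
    int32_t received_token_rounds;  /* number of tokens handled so far */
    bool    token_has_priority;     /* true while token channels rank higher */
} ring_state;

/* Called for each data packet. Returns true when the caller should
 * raise the token channels above the bcast channels: the packet comes
 * from the predecessor and is stamped with a later round than the
 * last token processed, so that token must already be waiting. */
static bool on_bcast(ring_state *s, int32_t transmitter_id, int32_t token_round)
{
    if (!s->token_has_priority &&
        token_round > s->received_token_rounds &&
        transmitter_id == s->prev_proc_id) {
        s->token_has_priority = true;
        return true;
    }
    return false;
}

/* Called for each token. Returns true when the caller should give the
 * bcast channels the higher priority again. */
static bool on_token(ring_state *s)
{
    s->received_token_rounds++;
    if (s->token_has_priority) {
        s->token_has_priority = false;
        return true;
    }
    return false;
}
```

Keeping the decision separate from the event-system calls makes the two transitions easy to check in isolation.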

Modified: branches/experimental-4.3/daemon/protocol.c
===================================================================
--- branches/experimental-4.3/daemon/protocol.c	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/protocol.c	2013-07-18 18:21:26 UTC (rev 582)
@@ -51,10 +51,30 @@
 #include "spu_alarm.h"
 #include "sess_types.h" /* for message_header */
 
+typedef struct dummy_queue_link{
+        sys_scatter *packet;
+        struct dummy_queue_link *next;
+} queue_link;
+
+typedef struct dummy_packet_queue{
+        int num_packets;
+        queue_link *first;
+        queue_link *last;
+} packet_queue;
+
+typedef struct dummy_message_queue{
+        int num_messages;
+        message_link *first;
+        message_link *last;
+} message_queue;
+
 /* Prot variables */
 static	proc		My;
 static	int		My_index;
 
+static  int32           Prev_proc_id; /* predecessor in ring (i.e. process that sends me the token) */
+static  bool            Token_has_priority; /* true when token channels have higher priority than bcast channels */
+
 static	int32		Set_aru;
 static	int		Token_counter;
 
@@ -64,7 +84,7 @@
 /* Used ONLY in Prot_handle_token and grurot, inited in Prot_init */
 static	sys_scatter	New_token;
 static	token_header	*Token;
-static	sys_scatter	Send_pack;
+static  packet_queue    Send_pack_queue;
 
 static	packet_header	*Hurry_head;
 static	sys_scatter	Hurry_pack;
@@ -73,20 +93,20 @@
 /* Used to indicate a need to reload configuration at end of current membership */
 static  bool            Prot_Need_Conf_Reload  = FALSE;
 
-/* ### Pack: 1 line */
-static	packet_info	Buffered_packets[ARCH_SCATTER_SIZE];
-
 static	down_queue	Protocol_down_queue[2]; /* only used in spread3 */
 
 static	void	Prot_handle_bcast();
 static	void	Prot_handle_token();
 static	int	Answer_retrans( int *ret_new_ptr, int32 *proc_id, int16 *seg_index );
 static	int	Send_new_packets( int num_allowed );
+static  int     Prot_queue_bcast( sys_scatter *send_pack_ptr, packet_queue *pack_queue );
+static  int     Prot_flush_bcast( packet_queue *pack_queue );
 static	int	Is_token_hold();
 static	int	To_hold_token();
 static	void	Handle_hurry( packet_header *pack_ptr );
 static	void	Deliver_packet( int pack_entry, int to_copy );
 static	void	Flip_token_body( char *buf, token_header *token_ptr );
+static  void    Flip_frag( fragment_header *frag_ptr );
 static	void	Deliver_reliable_packets( int32 start_seq, int num_packets );
 static	void	Deliver_agreed_packets();
 
@@ -103,11 +123,14 @@
 	Mem_init_object( TOKEN_HEAD_OBJ, "token_head", sizeof( token_header ), 10, 0 );
 	Mem_init_object( TOKEN_BODY_OBJ, "token_body", sizeof( token_body ), 10, 0 );
 	Mem_init_object( SCATTER, "scatter", sizeof( scatter ), 200+MAX_PROCS_RING, 0 );
+	Mem_init_object( SYS_SCATTER, "sys_scatter", sizeof( sys_scatter ), 200, 0 );
+	Mem_init_object( QUEUE_LINK, "queue_link", sizeof( queue_link ), 200, 0 );
 
 	My = Conf_my();
 	My_index = Conf_proc_by_id( My.id, &My );
 	GlobalStatus.my_id = My.id;
 	GlobalStatus.packet_delivered = 0;
+        Prev_proc_id = 0;
 
 	for( i=0; i < MAX_PROCS_RING+1; i++ )
 		Up_queue[i].exist = 0;
@@ -133,6 +156,10 @@
             Last_delivered	 = 0;
         }
 
+        Send_pack_queue.num_packets = 0;
+        Send_pack_queue.first = NULL;
+        Send_pack_queue.last = NULL;
+
 	New_pack.num_elements = 2;
 	New_pack.elements[0].len = sizeof(packet_header);
 	New_pack.elements[0].buf = (char *) new(PACK_HEAD_OBJ);
@@ -145,9 +172,6 @@
 	New_token.elements[1].len = sizeof(token_body);
 	New_token.elements[1].buf = (char *) new(TOKEN_BODY_OBJ);
 
-	Send_pack.num_elements = 2;
-	Send_pack.elements[0].len = sizeof(packet_header);
-
 	Token = (token_header *)New_token.elements[0].buf;
 	Last_token = new(TOKEN_HEAD_OBJ);
 	Last_token->type = 0; 
@@ -176,6 +200,7 @@
             E_attach_fd( *token_channels, READ_FD, Prot_handle_token, 0, NULL, MEDIUM_PRIORITY );
             token_channels++;
         }
+        Token_has_priority = FALSE;
 
 	FC_init( );
 	Memb_init();
@@ -243,14 +268,18 @@
 static  void	Prot_handle_bcast(channel fd, int dummy, void *dummy_p)
 {
 	packet_header	*pack_ptr;
+        packet_body     *pack_body_ptr;
+        fragment_header *frag_ptr;
 	int		pack_entry;
 	proc		p;
 	int		received_bytes;
-        int             total_bytes_processed;
-        int             num_buffered_packets;
-	int		i;
-	int32		j;
+        int             processed_bytes;
+        int             padding_bytes;
+	int		i, ret;
 	/* int		r1,r2; */
+        int             num_bcast, num_token;
+        channel         *bcast_channels;
+        channel         *token_channels;
 
 	received_bytes = Net_recv( fd, &New_pack );
 	/* My own packet or from another monitor component */
@@ -260,15 +289,6 @@
 
 	pack_ptr = (packet_header *)New_pack.elements[0].buf;
 
-	/* ### Pack, this has to move down to network.c
-	 * if( pack_ptr->data_len +sizeof(packet_header) != received_bytes )
-	 * {
-	 *	Alarm( PRINT, "Prot_handle_bcast: received %d, should be %d\n",
-	 *	received_bytes, pack_ptr->data_len+sizeof(packet_header) );
-	 * 	return; 
-	 * }
-	 */
-
 	if( Is_status( pack_ptr->type ) )
 	{
 		Stat_handle_message( &New_pack );
@@ -286,7 +306,7 @@
                 Prot_handle_conf_reload( &New_pack );
                 return;
         }
-	/* delete random 
+	/* delete random  
 	r1 = ((-My.id)%17)+3;
 	r2 = get_rand() % (r1+3 );
 	if ( r2 == 0 ) return; */
@@ -321,98 +341,126 @@
                 }
         }
 
-	/* ### Pack: next 70 lines (almost till the end of the routine) have changed */
-        Buffered_packets[0].head = pack_ptr;
-        Buffered_packets[0].body = (packet_body *)New_pack.elements[1].buf;
-	received_bytes -= sizeof(packet_header);
-        total_bytes_processed = pack_ptr->data_len;
-        /* ignore any alignment padding */
-        switch(total_bytes_processed % 4)
-        {
-        case 1:
-                total_bytes_processed++;
-        case 2:
-                total_bytes_processed++;
-        case 3:
-                total_bytes_processed++;
-        case 0:
-                /* already aligned */
-                break;
-        }
-        for( i = 1; received_bytes > total_bytes_processed; i++ )
-        {                
-        	/* copy into each of the elements after the first element*/
-                Buffered_packets[i].head = (packet_header *)new(PACK_HEAD_OBJ);
-                Buffered_packets[i].body = (packet_body *)new(PACKET_BODY);
-                if (Buffered_packets[i].head == NULL) 
-                        Alarm(EXIT, "Prot_handle_bcast: Memory allocation failed for PACK_HEAD_OBJ\n");
-                if (Buffered_packets[i].body == NULL) 
-                        Alarm(EXIT, "Prot_handle_bcast: Memory allocation failed for PACKET_BODY\n");
-                        
-                pack_ptr = (packet_header *)&New_pack.elements[1].buf[total_bytes_processed];
-                memcpy( Buffered_packets[i].head, pack_ptr, sizeof( packet_header ) );
-                total_bytes_processed += sizeof(packet_header);
-                memcpy( Buffered_packets[i].body, &New_pack.elements[1].buf[total_bytes_processed], pack_ptr->data_len);
-                total_bytes_processed += pack_ptr->data_len;
-                /* ignore any alignment padding */
-                switch(total_bytes_processed % 4)
-                {
-                case 1:
-                        total_bytes_processed++;
-                case 2:
-                        total_bytes_processed++;
-                case 3:
-                        total_bytes_processed++;
-                case 0:
-                        /* already aligned */
-                        break;
-                }
-        }
-        num_buffered_packets = i;
-
-        for( i = 0; i < num_buffered_packets; i++)
-        {
-                pack_ptr = Buffered_packets[i].head;
-                 
                 /* do we have this packet */
                 if( pack_ptr->seq <= Aru )
                 {
                         Alarm( PROTOCOL, "Prot_handle_bcast: delayed packet %d already delivered (Aru %d)\n",
                                pack_ptr->seq, Aru );
-                        dispose(Buffered_packets[i].head);
-                        dispose(Buffered_packets[i].body);
-                        continue;
+                return;
                 }
                 pack_entry = pack_ptr->seq & PACKET_MASK;
                 if( Packets[pack_entry].exist ) 
                 {
                         Alarm( PROTOCOL, "Prot_handle_bcast: packet %d already exist\n",
                                pack_ptr->seq );
-                        dispose(Buffered_packets[i].head);
-                        dispose(Buffered_packets[i].body);
-                        continue;
+                return;
                 }
 
                 Packets[pack_entry].proc_index = Conf_proc_by_id( pack_ptr->proc_id, &p );
                 if( Packets[pack_entry].proc_index < 0 )
                 {
                         Alarm( PROTOCOL, "Prot_handle_bcast: unknown proc %d\n", pack_ptr->proc_id );
-                        dispose(Buffered_packets[i].head);
-                        dispose(Buffered_packets[i].body);
-                        continue;
+                return;
                 }
+
+        pack_body_ptr = (packet_body *)New_pack.elements[1].buf;
+        frag_ptr = &(pack_ptr->first_frag_header);
+        if( !Same_endian(pack_ptr->type) ) 
+        {
+                Flip_frag(frag_ptr);
+                processed_bytes = frag_ptr->fragment_len;
+                while( processed_bytes < pack_ptr->data_len )
+                {
+                        padding_bytes = 0; 
+                        switch (processed_bytes % 4)
+                        {
+                                case 1:
+                                        padding_bytes++;
+                                case 2:
+                                        padding_bytes++;
+                                case 3:
+                                        padding_bytes++;
+                                case 0:
+                                        /* already aligned */
+                                        break;
+                        }
+                        processed_bytes += padding_bytes;
+                        
+                        /* Sanity check for packet validity */
+                        if( processed_bytes + sizeof(fragment_header) > pack_ptr->data_len )
+                        {
+                                Alarm( PRINT, "Prot_handle_bcast: invalid packet with seq %d from %d, fragments exceed data_len %d %d\n", 
+                                        pack_ptr->seq, pack_ptr->transmiter_id, processed_bytes, pack_ptr->data_len );
+                                break;
+                        }
+                        frag_ptr = (fragment_header *) &pack_body_ptr[processed_bytes];
+                        Flip_frag(frag_ptr);
+                        processed_bytes += sizeof(fragment_header) + frag_ptr->fragment_len;
+                }
+                if( processed_bytes != pack_ptr->data_len )
+                {
+                        Alarm( PRINT, "Prot_handle_bcast: invalid packet with seq %d from %d, processed bytes not equal data_len %d %d\n", 
+                                pack_ptr->seq, pack_ptr->transmiter_id, processed_bytes, pack_ptr->data_len );
+                        /*  
+                         * This is a malformed packet, but we decide to keep it instead of throwing it away. 
+                         * Note that a packet with the same endianness will not even get here 
+                         */
+                }
+        }
+
                 /* insert new packet */
                 Packets[pack_entry].head  = pack_ptr;
-                Packets[pack_entry].body  = Buffered_packets[i].body;
+        Packets[pack_entry].body  = (packet_body *)New_pack.elements[1].buf;
                 Packets[pack_entry].exist = 1;
 
+        /*  If this packet was from my predecessor in the ring
+         *  (pack_ptr->transmiter_id == Prev_proc_id), and this packet is
+         *  marked with a token round greater than the last round in which I
+         *  received a token (pack_ptr->token_round > Received_token_rounds),
+         *  then I can conclude that I've processed all bcast packets from the
+         *  previous round, so it is safe to process the next token.
+         *  My predecessor has already sent me the token for the round
+         *  marked on this packet, but I haven't processed it yet, so I should
+         *  try to process that token before handling more bcasts */
+        if( !Token_has_priority && 
+            pack_ptr->token_round > Received_token_rounds &&
+            pack_ptr->transmiter_id == Prev_proc_id )
+        {
+                bcast_channels = Net_bcast_channel();
+                token_channels = Net_token_channel();
+                Net_num_channels( &num_bcast, &num_token);
+                for ( i = 0; i < num_bcast; i++ ) {
+                    ret = E_detach_fd_priority( *bcast_channels, READ_FD, HIGH_PRIORITY );
+                    if( ret < 0 ) {
+                        Alarm( EXIT, "Prot_handle_bcast: bcast_channel being detached was not found\n");
+                    }
+                    ret = E_attach_fd( *bcast_channels, READ_FD, Prot_handle_bcast, 0, NULL, MEDIUM_PRIORITY );
+                    if( ret < 0 ) {
+                        Alarm( EXIT, "Prot_handle_bcast: bcast_channel could not be attached\n");
+                    }
+                    bcast_channels++;
+                }
+                for ( i = 0; i < num_token; i++ ) {
+                    ret = E_detach_fd_priority( *token_channels, READ_FD, MEDIUM_PRIORITY );
+                    if( ret < 0 ) {
+                        Alarm( EXIT, "Prot_handle_bcast: token_channel being detached was not found\n");
+                    }
+                    ret = E_attach_fd( *token_channels, READ_FD, Prot_handle_token, 0, NULL, HIGH_PRIORITY );
+                    if( ret < 0 ) {
+                        Alarm( EXIT, "Prot_handle_bcast: token_channel could not be attached\n");
+                    }
+                    token_channels++;
+                }
+                Token_has_priority = TRUE;
+        }
+
                 /* update variables */
                 if( Highest_seq < pack_ptr->seq ) Highest_seq = pack_ptr->seq;
                 if( pack_ptr->seq == My_aru+1 )
                 {
-                        for( j=pack_ptr->seq; j <= Highest_seq; j++ )
+                for( i=pack_ptr->seq; i <= Highest_seq; i++ )
                         {
-                                if( ! Packets[j & PACKET_MASK].exist ) break;
+                        if( ! Packets[i & PACKET_MASK].exist ) break;
                                 My_aru++;
                         }
                         Deliver_agreed_packets();
@@ -420,7 +468,6 @@
 
                 Alarm( PROTOCOL, "Prot_handle_bcast: packet %d inserted\n",
                        pack_ptr->seq );
-        }      /* END OF LOOP */
 
         GlobalStatus.packet_recv++;
 	GlobalStatus.my_aru = My_aru;
@@ -442,7 +489,11 @@
 	int16		rtr_seg_index;
 	int32		val;
 	int		retrans_allowed; /* how many of my retrans are allowed on token */
+	int		max_rtr_seq;
 	int		i, ret;
+        int             num_bcast, num_token;
+        channel         *bcast_channels;
+        channel         *token_channels;
 
 	/*	int		r1,r2;*/
 
@@ -532,6 +583,40 @@
 	}
 	if( Highest_seq < Token->seq ) Highest_seq = Token->seq;
 		
+        /* I don't want to process my next token until I have processed
+        *  all bcast packets sent in the previous round, so I should give 
+        *  bcast channels a higher priority until then */
+        Received_token_rounds++;
+        if( Token_has_priority == TRUE )
+        {
+                bcast_channels = Net_bcast_channel();
+                token_channels = Net_token_channel();
+                Net_num_channels( &num_bcast, &num_token );
+                for ( i = 0; i < num_bcast; i++ ) {
+                    ret = E_detach_fd_priority( *bcast_channels, READ_FD, MEDIUM_PRIORITY );
+                    if( ret < 0 ) {
+                        Alarm( EXIT, "Prot_handle_token: bcast_channel being detached was not found\n");
+                    }
+                    ret = E_attach_fd( *bcast_channels, READ_FD, Prot_handle_bcast, 0, NULL, HIGH_PRIORITY );
+                    if( ret < 0 ) {
+                        Alarm( EXIT, "Prot_handle_token: bcast_channel could not be attached\n");
+                    }
+                    bcast_channels++;
+                }
+                for ( i = 0; i < num_token; i++ ) {
+                    ret = E_detach_fd_priority( *token_channels, READ_FD, HIGH_PRIORITY );
+                    if( ret < 0 ) {
+                        Alarm( EXIT, "Prot_handle_token: token_channel being detached was not found\n");
+                    }
+                    ret = E_attach_fd( *token_channels, READ_FD, Prot_handle_token, 0, NULL, MEDIUM_PRIORITY );
+                    if( ret < 0 ) {
+                        Alarm( EXIT, "Prot_handle_token: token_channel could not be attached\n");
+                    }
+                    token_channels++;
+                }
+                Token_has_priority = FALSE;
+        }
+		
 	/* Handle retransmissions */
 	num_retrans = Answer_retrans( &new_ptr, &rtr_proc_id, &rtr_seg_index );
 	GlobalStatus.retrans += num_retrans;
@@ -560,7 +645,18 @@
 	}
 	GlobalStatus.my_aru = My_aru;
 
-	if( My_aru < Highest_seq )
+	/* Determine sequence up through which retransmissions should be
+         * requested: If Accelerated_ring is true, packets with sequence
+         * numbers between the sequence on the last token this process sent
+         * and the sequence on the token it just received may still be in
+         * transit or not yet sent, so they should not be requested at this point*/
+	if( FC_accelerated_ring() == TRUE && Memb_state() != EVS ){
+		max_rtr_seq = Last_token->seq;
+	}else{
+		max_rtr_seq = Highest_seq;
+	}
+
+	if( My_aru < max_rtr_seq )
 	{
 	    /* Compute how many of my retransmission requests are possible to fit */
 	    retrans_allowed = ( sizeof( token_body ) - new_ptr - sizeof( ring_rtr ) ) / sizeof( int32 );
@@ -572,7 +668,7 @@
 	    	ring_rtr_ptr->seg_index	= rtr_seg_index;
 	    	ring_rtr_ptr->num_seq	= 0;
 	    	new_ptr += sizeof(ring_rtr);
-	    	for( i=My_aru+1; i <= Highest_seq && retrans_allowed > 0; i++ )
+	    	for( i=My_aru+1; i <= max_rtr_seq && retrans_allowed > 0; i++ )
 	    	{
 			if( ! Packets[i & PACKET_MASK].exist ) 
 			{
@@ -651,6 +747,8 @@
 
 	E_queue( Memb_token_loss, 0, NULL, Token_timeout );
 
+        /* Send any packets remaining in queue */
+        Prot_flush_bcast(&Send_pack_queue);
 
 	/* calculating Aru */
 	if( Token->aru > Last_token->aru )
@@ -762,7 +860,7 @@
 
         /* update protocol variables with new conf */
         My = Conf_my();
-	My_index = Conf_proc_by_id( My.id, &My );
+        My_index = Conf_proc_by_id( My.id, &My );
 
         if (need_memb_partition) {
                 /* make partition */
@@ -810,11 +908,11 @@
 	}
 }
 
-/* ### Pack: this routine has changed */
 static	int	Answer_retrans( int *ret_new_ptr, 
 				int32 *proc_id, int16 *seg_index )
 {
 	int		num_retrans;
+	sys_scatter	*send_pack_ptr;
 	char		*rtr;
 	int		old_ptr,new_ptr;
 	ring_rtr	*ring_rtr_ptr;
@@ -851,42 +949,40 @@
 
                         if( Packets[pack_entry].exist )
                         {
+                                send_pack_ptr = new(SYS_SCATTER);
+                                send_pack_ptr->num_elements = 2;
+                                send_pack_ptr->elements[0].len = sizeof(packet_header);
                                 pack_ptr = Packets[pack_entry].head;
-                                Send_pack.elements[0].buf = 
+                                send_pack_ptr->elements[0].buf = 
                                     (char *)Packets[pack_entry].head;
-                                Send_pack.elements[1].buf = 
+                                send_pack_ptr->elements[1].buf = 
                                     (char *)Packets[pack_entry].body;
-                                Send_pack.elements[1].len = 
+                                send_pack_ptr->elements[1].len = 
                                     pack_ptr->data_len; 
 
                                 if( ring_rtr_ptr->proc_id != -1 )
                                 {
-                                    ret = Net_ucast ( ring_rtr_ptr->proc_id, &Send_pack );
+                                    ret = Net_ucast ( ring_rtr_ptr->proc_id, send_pack_ptr );
+                                    dispose(send_pack_ptr);
 				    GlobalStatus.u_retrans++;
 
                                     Alarm( PROTOCOL, 
         "Answer_retrans: retransmit to proc %d\n", ring_rtr_ptr->proc_id );
                                 }else if( ring_rtr_ptr->seg_index != -1 ) {
-                                    ret = Net_scast ( ring_rtr_ptr->seg_index, &Send_pack );
+                                    ret = Net_scast ( ring_rtr_ptr->seg_index, send_pack_ptr );
+                                    dispose(send_pack_ptr);
 				    GlobalStatus.s_retrans++;
 
                                     Alarm( PROTOCOL, 
         "Answer_retrans: retransmit to seg %d\n", ring_rtr_ptr->seg_index );
                                 }else{
-#if 1
-                                    ret = Net_queue_bcast ( &Send_pack );
-#else
-                                    ret = Net_bcast ( &Send_pack );
-#endif
-				    if( ret > 0 ) GlobalStatus.b_retrans++;
+                                    ret = Prot_queue_bcast ( send_pack_ptr, &Send_pack_queue );
+				    GlobalStatus.b_retrans++;
 
                                     Alarm( PROTOCOL, 
         "Answer_retrans: retransmit to all\n");
                                 }
-				if( ret > 0 )
-				{
                                 	num_retrans++;
-				}
                         }else{
                                 *proc_id = -1;
                                 if( ring_rtr_ptr->seg_index != My.seg_index )
@@ -909,87 +1005,165 @@
             }
         }
 	*ret_new_ptr = new_ptr;
-
-	ret = Net_flush_bcast();
-	if( ret > 0 )
-	{
-		GlobalStatus.b_retrans++;
-		num_retrans++;
-	}
 	return (num_retrans);
 }
 
-/* ### Pack: this routine has changed */
 static	int	Send_new_packets( int num_allowed )
 {
-	packet_header	*pack_ptr;
-	scatter	        *scat_ptr;
-	int		pack_entry;
-	int		num_sent;
-	int		ret;
+        packet_header   *pack_ptr;
+        scatter         *scat_ptr;
+        sys_scatter     *send_pack_ptr;
+        fragment_header *frag_ptr;
+        char            *body_ptr;
+        int             pack_entry;
+        int             num_sent;
+        int             padding_bytes;
+        int             available_bytes;
+        int             ret;
 
-	num_sent = 0;
+        num_sent = 0;
         while( num_sent < num_allowed )
         {
-		/* check if down queue is empty */
-		if( Down_queue_ptr->num_mess == 0 ) break;
+                /* check if down queue is empty */
+                if( Down_queue_ptr->num_mess == 0 ) break;
 
-		/* initialize packet_header */
+                /* initialize packet_header */
                 pack_ptr =  new(PACK_HEAD_OBJ);
 
-		scat_ptr = Down_queue_ptr->first->mess;
+                scat_ptr = Down_queue_ptr->first->mess;
 
                 pack_ptr->type = Down_queue_ptr->first->type;
                 pack_ptr->proc_id = My.id;
                 pack_ptr->memb_id = Memb_id();
                 pack_ptr->seq = Highest_seq+1;
                 Highest_seq++;
-                pack_ptr->fifo_seq = Highest_fifo_seq+1;
-		Highest_fifo_seq++;
+                /*pack_ptr->fifo_seq = Highest_fifo_seq+1; ### Commented out because fifo_seq was replaced with token_round*/
+                Highest_fifo_seq++;
                 pack_ptr->data_len = scat_ptr->elements[
-				Down_queue_ptr->cur_element].len;
+                                Down_queue_ptr->cur_element].len;
+                pack_ptr->first_frag_header.fragment_len = scat_ptr->elements[
+                                Down_queue_ptr->cur_element].len;
 
-                Send_pack.elements[1].buf = scat_ptr->elements[
-					Down_queue_ptr->cur_element].buf;
+                send_pack_ptr = new(SYS_SCATTER); /* send_pack_ptr is freed when the packet is sent (after Net_bcast is called)  */
+                send_pack_ptr->num_elements = 2;
+                send_pack_ptr->elements[0].len = sizeof(packet_header);
+                send_pack_ptr->elements[1].buf = scat_ptr->elements[
+                                        Down_queue_ptr->cur_element].buf;
+                body_ptr = send_pack_ptr->elements[1].buf;
 
-		Down_queue_ptr->cur_element++;
-		if( Down_queue_ptr->cur_element < scat_ptr->num_elements )
-		{
-			/* not last packet in message */
-			pack_ptr->packet_index = Down_queue_ptr->cur_element;
-		}else if( Down_queue_ptr->cur_element == scat_ptr->num_elements ){
-			down_link	*tmp_down;
+                /* Set frag_ptr to point to the fragment header in the packet header */
+                frag_ptr = &(pack_ptr->first_frag_header); 
 
-			/* last packet in message */
-			pack_ptr->packet_index = -scat_ptr->num_elements;
+                /* Loop for filling the packet:
+                 *
+                 * Advance the down queue and set the fragment index for the fragment
+                 * just added to the packet.
+                 *
+                 * Break if another fragment cannot be added to this packet.
+                 *    This happens in 3 cases:
+                 *    1. The down queue is empty.
+                 *    2. The next fragment in the down queue is too big to fit in the packet.
+                 *    3. The next message does not have a type compatible with the current
+                 *       packet type.
+                 *
+                 * If the next fragment can be added (i.e. you didn't break for one of the 3
+                 * reasons above), set the frag_ptr to point after the end of the last fragment
+                 * in the packet body and copy the fragment from the down queue into the packet
+                 * body.
+                 */
+                for(;;)
+                {
+                        /* Advance the down queue and set the fragment index for the fragment
+                         * just added to the packet. */
+                        Down_queue_ptr->cur_element++;
+                        if( Down_queue_ptr->cur_element < scat_ptr->num_elements )
+                        {
+                                /* not the last fragment of the message */
+                                frag_ptr->fragment_index = Down_queue_ptr->cur_element;
+                        }else if( Down_queue_ptr->cur_element == scat_ptr->num_elements ){
+                                down_link	*tmp_down;
 
-			tmp_down = Down_queue_ptr->first;
-			Down_queue_ptr->first = Down_queue_ptr->first->next;
-			Down_queue_ptr->cur_element = 0;
-			Down_queue_ptr->num_mess--;
-			dispose( tmp_down->mess );
-			dispose( tmp_down );
-			if( Down_queue_ptr->num_mess < WATER_MARK ) 
-				Sess_unblock_users_level();
-		}else{
-			Alarm( EXIT, 
-                          "Send_new_packets: error in packet index: %d %d\n",
-			  Down_queue_ptr->cur_element,scat_ptr->num_elements );
-		}
+                                /* last fragment of the message */
+                                frag_ptr->fragment_index = -scat_ptr->num_elements;
 
-                Send_pack.elements[0].buf = (char *) pack_ptr;
-                Send_pack.elements[1].len = pack_ptr->data_len;
+                                tmp_down = Down_queue_ptr->first;
+                                Down_queue_ptr->first = Down_queue_ptr->first->next;
+                                Down_queue_ptr->cur_element = 0;
+                                Down_queue_ptr->num_mess--;
+                                dispose( tmp_down->mess );
+                                dispose( tmp_down );
+                        if( Down_queue_ptr->num_mess < WATER_MARK ) 
+                                Sess_unblock_users_level();
+                        }else{
+                                Alarm( EXIT, 
+                                        "Send_new_packets: error in packet index: %d %d\n",
+                                        Down_queue_ptr->cur_element,scat_ptr->num_elements );
+                        }
+                        /* Break if another fragment cannot be added to this packet.
+                         *    This happens in 3 cases:
+                         *    1. The down queue is empty.
+                         *    2. The next fragment in the down queue is too big to fit in the packet.
+                         *    3. The next message does not have a type compatible with the current
+                         *       packet type.
+                         */
+                        if( Down_queue_ptr->num_mess == 0 ) break;
 
-#if 1
-                ret = Net_queue_bcast( &Send_pack );
-#else
-                ret = Net_bcast( &Send_pack );
-#endif
-		if( ret > 0 )
-		{
-			num_sent++;
-		}
+                        padding_bytes = 0; 
+                        switch (pack_ptr->data_len % 4)
+                        {
+                                case 1:
+                                        padding_bytes++;
+                                case 2:
+                                        padding_bytes++;
+                                case 3:
+                                        padding_bytes++;
+                                case 0:
+                                        /* already aligned */
+                                        break;
+                        }
+                        available_bytes = sizeof(packet_body) - pack_ptr->data_len - padding_bytes - sizeof(fragment_header);
+                        scat_ptr = Down_queue_ptr->first->mess;
+                        /* 
+                         * The comparison needs the (int) cast because len is size_t and available_bytes is int.
+                         * size_t is an unsigned type, and available_bytes can be negative. Without the cast,
+                         * available_bytes would be implicitly converted to size_t, turning a negative value
+                         * into a very large number, so the break below would not be taken. Therefore we
+                         * cast the left side of the comparison to int.
+                         */ 
+                        if( ( (int) scat_ptr->elements[Down_queue_ptr->cur_element].len ) > available_bytes ) break;
 
+                        if( pack_ptr->type != Down_queue_ptr->first->type )
+                        {
+                                if( !(Is_fifo(pack_ptr->type) || Is_agreed(pack_ptr->type)) ||
+                                            !(Is_fifo(Down_queue_ptr->first->type) || Is_agreed(Down_queue_ptr->first->type)) )
+                                               break;
+                        }
+                        /* 
+                         * If the next fragment can be added (i.e. we didn't break for one of the 3
+                         * reasons above), set the frag_ptr to point after the end of the last fragment
+                         * in the packet body and copy the fragment from the down queue into the packet
+                         * body.
+                         */
+                        frag_ptr = (fragment_header *) &body_ptr[ pack_ptr->data_len + padding_bytes ];
+                        frag_ptr->fragment_len = scat_ptr->elements[Down_queue_ptr->cur_element].len;
+                        pack_ptr->data_len += padding_bytes + sizeof(fragment_header);
+                        memcpy(&body_ptr[pack_ptr->data_len], scat_ptr->elements[Down_queue_ptr->cur_element].buf, frag_ptr->fragment_len);
+                        pack_ptr->data_len += frag_ptr->fragment_len;
+                        /* 
+                         * Dispose buf for copied fragment. Note that this is only possible because
+                         * copied fragments are guaranteed to be whole messages (so we can't have
+                         * a scenario in which it is part of a partially sent message). The first fragment
+                         * will be disposed when the packet is discarded/the message is delivered 
+                         * */
+                        dispose( (packet_body *) (scat_ptr->elements[Down_queue_ptr->cur_element].buf) );
+                }
+
+                send_pack_ptr->elements[0].buf = (char *) pack_ptr;
+                send_pack_ptr->elements[1].len = pack_ptr->data_len;
+
+                ret = Prot_queue_bcast( send_pack_ptr, &Send_pack_queue );
+                num_sent++;
+ 
                 pack_entry = pack_ptr->seq & PACKET_MASK;
                 if( Packets[pack_entry].exist ) 
                     Alarm( EXIT, 
@@ -998,37 +1172,195 @@
 
                 /* insert new created packet */
                 Packets[pack_entry].head       = pack_ptr;
-                Packets[pack_entry].body       = (packet_body *)Send_pack.elements[1].buf;
+                Packets[pack_entry].body       = (packet_body *)send_pack_ptr->elements[1].buf;
                 Packets[pack_entry].exist      = 1;
                 Packets[pack_entry].proc_index = My_index;
                 Alarm( PROTOCOL, 
-			"Send_new_packets: packet %d sent and inserted \n",
-			 pack_ptr->seq );
+                        "Send_new_packets: packet %d sent and inserted \n",
+                         pack_ptr->seq );
         }
-	ret = Net_flush_bcast();
-	if( ret > 0 )
-	{
-		num_sent++;
-	}
-	return ( num_sent );
+        return ( num_sent );
 }
  
+static  int     Prot_queue_bcast( sys_scatter *send_pack_ptr, packet_queue *pack_queue )
+{
+        sys_scatter *scat_ptr;
+        queue_link *link_ptr;
+        packet_header *pack_ptr; /* Added to set token_rounds before sending */
+        int num_pack_to_send;
+        int ret;
+        int i; 
+
+        /* To queue or send a new packet:
+         * 1. Determine what you need to send from queue.
+         *    If Accelerated_ring is not used, send everything. (unless the Accelerated_ring
+         *    option is changed dynamically, there should not be anything in the queue)
+         *    Otherwise, send the difference between the number of packets in the queue
+         *      and the max packets in the queue, plus one to account for the new packet.
+         *      (unless max_packets is changed dynamically, this should be 1, if the queue
+         *      is full, or 0, if the queue is not full)
+         * 2. Send the number of packets that you calculated.
+         * 3. If Accelerated_ring is not used, or the max packets in the queue is 0, send
+         *    the new packet.
+         *    Otherwise, add the new packet to the queue.
+         */
+        if( FC_accelerated_ring() == FALSE || Memb_state() == EVS)
+        {
+                num_pack_to_send = pack_queue->num_packets;
+        }else{
+                num_pack_to_send = pack_queue->num_packets - FC_accelerated_window() + 1;
+                if( num_pack_to_send < 0 ) num_pack_to_send = 0;
+                /* The case below can happen if accelerated window changes dynamically */
+                if( num_pack_to_send > pack_queue->num_packets ) num_pack_to_send = pack_queue->num_packets;
+        }
+
+        for( i = 0; i < num_pack_to_send; i++ )
+        {
+                scat_ptr = pack_queue->first->packet;
+                link_ptr = pack_queue->first;
+                pack_queue->first = pack_queue->first->next;
+                pack_queue->num_packets--;
+                dispose(link_ptr);
+                pack_ptr = (packet_header *) scat_ptr->elements[0].buf;
+                pack_ptr->token_round = Token_rounds;
+                Net_bcast(scat_ptr);
+                dispose(scat_ptr);
+        }
+        ret = num_pack_to_send;
+
+        if( FC_accelerated_ring() == FALSE || FC_accelerated_window() == 0 || Memb_state() == EVS )
+        {
+                pack_ptr = (packet_header *) send_pack_ptr->elements[0].buf;
+                pack_ptr->token_round = Token_rounds;
+                Net_bcast(send_pack_ptr);
+                dispose(send_pack_ptr);
+                ret++;
+        }else{
+                link_ptr = new(QUEUE_LINK);
+                link_ptr->packet = send_pack_ptr;
+                link_ptr->next = NULL;
+
+                if( pack_queue->num_packets == 0 )
+                {
+                        pack_queue->first = link_ptr;
+                }else{
+                        pack_queue->last->next = link_ptr;
+                }
+                pack_queue->last = link_ptr;
+                pack_queue->num_packets++;
+        }
+
+        return ( ret );
+}
+
+static  int     Prot_flush_bcast( packet_queue *pack_queue )
+{
+        sys_scatter *scat_ptr;
+        queue_link *link_ptr;
+        packet_header *pack_ptr;
+        int ret;
+
+        ret = pack_queue->num_packets;
+        while( pack_queue->num_packets > 0 )
+        {
+                scat_ptr = pack_queue->first->packet;
+                link_ptr = pack_queue->first;
+                pack_queue->first = pack_queue->first->next;
+                pack_queue->num_packets--;
+                dispose(link_ptr);
+                pack_ptr = (packet_header *) scat_ptr->elements[0].buf;
+                pack_ptr->token_round = Token_rounds;
+                Net_bcast(scat_ptr);
+                dispose(scat_ptr);
+        }
+        return( ret );
+}
+ 
 static	void	Deliver_packet( int pack_entry, int to_copy )
 {	
 	int		proc_index;
 	up_queue	*up_ptr;
 	packet_header	*pack_ptr;
 	message_link	*mess_link;
+        fragment_header *frag_ptr;
+        char            *pack_body_ptr;
+        message_queue   mess_queue;
+        int             processed_bytes;
+        int             padding_bytes;
 	int		index;
 
 	pack_ptr = Packets[pack_entry].head;
 
+        /* 
+         * For multi-fragment packets, the following observations can be made:
+         *     1. The first fragment is either the last fragment in a multi-packet message
+         *        or a whole message.
+         *     2. Each fragment after the first must be a whole message.
+         *
+         * For any fragment beyond the first, we create a message (with a new packet body)
+         * and put it in a queue. We then process the first fragment, including the
+         * decision whether to copy it or not. Finally, we deliver to session all queued
+         * messages coming from the additional fragments.
+         */
+
+        mess_queue.num_messages = 0;
+        mess_queue.first = NULL;
+        mess_queue.last = NULL;
+        pack_body_ptr = (char *) Packets[pack_entry].body;
+        frag_ptr = &(pack_ptr->first_frag_header);
+        processed_bytes = frag_ptr->fragment_len;
+        while( processed_bytes < pack_ptr->data_len )
+        {
+                padding_bytes = 0;
+                switch (processed_bytes % 4)
+                {
+                        case 1:
+                                padding_bytes++;
+                        case 2:
+                                padding_bytes++;
+                        case 3:
+                                padding_bytes++;
+                        case 0:
+                                /* already aligned */
+                                break;
+                }
+                processed_bytes += padding_bytes;
+                frag_ptr = (fragment_header *) &pack_body_ptr[processed_bytes];
+                processed_bytes += sizeof(fragment_header);
+                if( processed_bytes + frag_ptr->fragment_len > pack_ptr->data_len )
+                {
+                        Alarm( PRINT, "Deliver_packet: invalid packet with seq %d from %d, fragments exceed data_len %d %d %d\n", 
+                                pack_ptr->seq, pack_ptr->transmiter_id, processed_bytes, frag_ptr->fragment_len, pack_ptr->data_len );
+                        break;
+                }
+
+                /* Creating new message */
+                mess_link = new(MESSAGE_LINK);
+                mess_link->next = NULL;
+                mess_link->mess = new(SCATTER);
+                mess_link->mess->num_elements = 1;
+                mess_link->mess->elements[0].len = frag_ptr->fragment_len;
+                mess_link->mess->elements[0].buf = new(PACKET_BODY);
+                memcpy(mess_link->mess->elements[0].buf, &pack_body_ptr[processed_bytes], frag_ptr->fragment_len);
+                processed_bytes += frag_ptr->fragment_len;
+                if( mess_queue.num_messages == 0 ) {
+                        mess_queue.first = mess_link;
+                        mess_queue.last = mess_link;
+                } else {
+                        mess_queue.last->next = mess_link;
+                        mess_queue.last = mess_link;
+                }
+                mess_queue.num_messages++;
+        }
+
 	if( Is_reliable( pack_ptr->type ) &&
-	    pack_ptr->packet_index == -1 ) 
+	    pack_ptr->first_frag_header.fragment_index == -1 ) 
 	{
 		/*
 		 * for reliable single-packets message : deliver regardless
 		 * of what is already in the queue.
+                 * This also applies to reliable multi-fragment packets where all
+                 * fragments have fragment index -1.
 		 */
 		proc_index = MAX_PROCS_RING;
 	}else{
@@ -1046,20 +1378,20 @@
 	}
 
 	/* validity check */
-	index = pack_ptr->packet_index;
+	index = pack_ptr->first_frag_header.fragment_index;
 	if( index < 0 ) index = -index;
 	if( up_ptr->mess->num_elements+1 != index )
 	{
 		Alarm( EXIT, "Deliver_packet: sequence error: sec is %d, should be %d\n",
-			pack_ptr->packet_index,
+			pack_ptr->first_frag_header.fragment_index,
 			up_ptr->mess->num_elements+1 );
 	}
 
 	/* chain this packet */
 	up_ptr->mess->num_elements++;
-	up_ptr->mess->elements[index-1].len = Packets[pack_entry].head->data_len;
+	up_ptr->mess->elements[index-1].len = Packets[pack_entry].head->first_frag_header.fragment_len;
 	up_ptr->mess->elements[index-1].buf = (char *)Packets[pack_entry].body;
-	if( to_copy)
+	if( to_copy )
 	{
 		/* 
 		 * copy the packet.
@@ -1078,7 +1410,7 @@
 
 	GlobalStatus.packet_delivered++;
 
-	if( pack_ptr->packet_index < 0 )
+	if( pack_ptr->first_frag_header.fragment_index < 0 )
 	{
 		/* end of message */
 		/* Push up big_scatter. i.e. up_ptr->mess */
@@ -1087,6 +1419,15 @@
 		up_ptr->exist = 0;
 		Sess_deliver_message( mess_link );
 	}
+
+        /* Delivering all messages coming from non-first fragments */
+        while( mess_queue.num_messages > 0 )
+        {
+                mess_link = mess_queue.first;
+                mess_queue.first = mess_queue.first->next;
+                mess_queue.num_messages--;
+                Sess_deliver_message(mess_link);
+        }
 }
 
 static	void	Deliver_reliable_packets( int32 start_seq, int num_packets )
@@ -1105,7 +1446,7 @@
 		if( Packets[pack_entry].exist == 1 )
 		{
 			if( Is_reliable( Packets[pack_entry].head->type ) &&
-			    Packets[pack_entry].head->packet_index == -1 )
+			    Packets[pack_entry].head->first_frag_header.fragment_index == -1 )
 			{
 				Deliver_packet( pack_entry, 1 );
 				Alarm( PROTOCOL, "Deliver_reliable_packets: packet %d was delivered\n", i );
@@ -1427,6 +1768,12 @@
 		Get_arq(Last_token->type), Get_retrans(Last_token->type) );
 }
 
+void    Prot_set_prev_proc(configuration *memb)
+{
+        Prev_proc_id = Conf_previous(memb);
+        Alarm( PROTOCOL, "Prev_proc_id: %d, My.id: %d\n", Prev_proc_id, My.id );
+}
+
 void	Flip_token_body( char *buf, token_header *token_ptr )
 {
 /*
@@ -1462,3 +1809,9 @@
 	    }	
 	}
 }
+
+void Flip_frag( fragment_header *frag_ptr )
+{
+        frag_ptr->fragment_index = Flip_int16( frag_ptr->fragment_index );
+        frag_ptr->fragment_len   = Flip_int16( frag_ptr->fragment_len );
+}
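
Note on the packing arithmetic in protocol.c above: both Send_new_packets and Deliver_packet pad each fragment to a 4-byte boundary with a fall-through switch, and Send_new_packets compares a size_t length against an int that may be negative. The following is only a sketch of those two checks. The helper names pad_to_4 and fragment_fits are hypothetical and do not appear in the commit, and fragment_fits tests the sign first instead of casting the length to int as the commit does.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper (not part of the commit): number of bytes needed to
 * round len up to the next multiple of 4. Gives the same result as the
 * fall-through switch in the diff: remainder 1 -> 3, 2 -> 2, 3 -> 1, 0 -> 0. */
static int pad_to_4( int len )
{
        assert( len >= 0 );
        return ( 4 - ( len % 4 ) ) % 4;
}

/* Hypothetical helper (not part of the commit): returns 1 if a fragment of
 * frag_len bytes fits in `available` bytes, 0 otherwise. `available` may be
 * negative, so its sign is checked before any unsigned comparison; comparing
 * a size_t directly against a negative int would convert the int to a very
 * large unsigned value. */
static int fragment_fits( size_t frag_len, int available )
{
        if( available < 0 ) return 0;
        return frag_len <= (size_t) available;
}
```

For example, pad_to_4( 5 ) is 3, and fragment_fits( 0, -4 ) is 0 even though the fragment is empty, because the header itself no longer fits.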

Modified: branches/experimental-4.3/daemon/protocol.h
===================================================================
--- branches/experimental-4.3/daemon/protocol.h	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/protocol.h	2013-07-18 18:21:26 UTC (rev 582)
@@ -64,5 +64,6 @@
 void    Prot_Destroy_Local_Session(session *old_sess);
 down_link       *Prot_Create_Down_Link(message_obj *msg, int type, int mbox, int cur_element);
 void    Prot_kill_session(message_obj *msg);
+void	Prot_set_prev_proc(configuration *memb);
 
 #endif	/* INC_PROTOCOL */ 

Modified: branches/experimental-4.3/daemon/spread.c
===================================================================
--- branches/experimental-4.3/daemon/spread.c	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/spread.c	2013-07-18 18:21:26 UTC (rev 582)
@@ -24,6 +24,7 @@
  *
  * Major Contributor(s):
  * ---------------
+ *    Amy Babay            babay at cs.jhu.edu - accelerated ring protocol.
  *    Ryan Caudy           rcaudy at gmail.com - contributions to process groups.
  *    Claudiu Danilov      claudiu at acm.org - scalable wide area support.
  *    Cristina Nita-Rotaru crisn at cs.purdue.edu - group communication security.
@@ -93,7 +94,7 @@
         Alarm_set_priority( SPLOG_INFO );
 
 	Alarmp( SPLOG_PRINT, SYSTEM, "/===========================================================================\\\n");
-	Alarmp( SPLOG_PRINT, SYSTEM, "| The Spread Toolkit.                                                       |\n");
+	Alarmp( SPLOG_PRINT, SYSTEM, "| The Spread Toolkit - Accelerated Ring Experimental Research Version       |\n");
 	Alarmp( SPLOG_PRINT, SYSTEM, "| Copyright (c) 1993-2013 Spread Concepts LLC                               |\n"); 
 	Alarmp( SPLOG_PRINT, SYSTEM, "| All rights reserved.                                                      |\n");
 	Alarmp( SPLOG_PRINT, SYSTEM, "|                                                                           |\n");
@@ -115,6 +116,7 @@
         Alarmp( SPLOG_PRINT, SYSTEM, "|    John Schultz          jschultz at spreadconcepts.com                      |\n");
 	Alarmp( SPLOG_PRINT, SYSTEM, "|                                                                           |\n");
 	Alarmp( SPLOG_PRINT, SYSTEM, "| Major Contributors:                                                       |\n");
+        Alarmp( SPLOG_PRINT, SYSTEM, "|    Amy Babay            babay at cs.jhu.edu - accelerated ring protocol.     |\n");
         Alarmp( SPLOG_PRINT, SYSTEM, "|    Ryan Caudy           rcaudy at gmail.com - contribution to process groups.|\n");
         Alarmp( SPLOG_PRINT, SYSTEM, "|    Claudiu Danilov      claudiu at acm.org - scalable, wide-area support.    |\n");
         Alarmp( SPLOG_PRINT, SYSTEM, "|    Cristina Nita-Rotaru crisn at cs.purdue.edu - GC security.                |\n");
@@ -134,7 +136,7 @@
 	Alarmp( SPLOG_PRINT, SYSTEM, "| WWW:     www.spread.org     www.spreadconcepts.com                        |\n");
 	Alarmp( SPLOG_PRINT, SYSTEM, "| Contact: info at spreadconcepts.com                                          |\n");
 	Alarmp( SPLOG_PRINT, SYSTEM, "|                                                                           |\n");
-	Alarmp( SPLOG_PRINT, SYSTEM, "| Version %d.%02d.%02d Built %-17s                                   |\n", 
+	Alarmp( SPLOG_PRINT, SYSTEM, "| Version %d.%02d.%02d Built %-17s -accelerated ring research version|\n", 
 		(int)SP_MAJOR_VERSION, (int)SP_MINOR_VERSION, (int)SP_PATCH_VERSION, Spread_build_date );
 	Alarmp( SPLOG_PRINT, SYSTEM, "\\===========================================================================/\n");
 

Modified: branches/experimental-4.3/daemon/spread_params.h
===================================================================
--- branches/experimental-4.3/daemon/spread_params.h	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/spread_params.h	2013-07-18 18:21:26 UTC (rev 582)
@@ -41,7 +41,7 @@
 #define         SP_PATCH_VERSION        0
 #define         SPREAD_PROTOCOL         3
 
-#define         SPREAD_BUILD_DATE       "11/June/2013"
+#define         SPREAD_BUILD_DATE       "02/July/2013"
 
 #define		DEFAULT_SPREAD_PORT	4803
 

Modified: branches/experimental-4.3/daemon/status.h
===================================================================
--- branches/experimental-4.3/daemon/status.h	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/daemon/status.h	2013-07-18 18:21:26 UTC (rev 582)
@@ -63,6 +63,8 @@
 	int16	num_segments;
 	int16	window;
 	int16	personal_window;
+	int16	accelerated_ring;
+	int16	accelerated_window;
 	int16	num_sessions;
 	int16	num_groups;
 	int16	major_version;

Modified: branches/experimental-4.3/docs/sample.spread.conf
===================================================================
--- branches/experimental-4.3/docs/sample.spread.conf	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/docs/sample.spread.conf	2013-07-18 18:21:26 UTC (rev 582)
@@ -103,7 +103,7 @@
 #If option is set to true then all monitor commands are enabled. 
 #   THIS IS A SECURITY RISK IF YOUR NETWORK IS NOT PROTECTED!
 
-#DangerousMonitor = false
+DangerousMonitor = true
 
 #Set handling of SO_REUSEADDR socket option for the daemon's TCP
 # listener.  This is useful for facilitating quick daemon restarts (OSes
@@ -235,10 +235,20 @@
 # networks. If no letter is listed before the interface address then ALL types of 
 # events are handled on that interface.
 
+# AcceleratedRing flag indicates which protocol to use. If false, the normal Spread
+# ring protocol is used. If true, a currently experimental accelerated ring protocol
+# is used. Note that daemons can only talk with daemons that run the same protocol - they
+# will not agree to talk with daemons that run the other protocol. The accelerated ring
+# protocol uses a flow control algorithm similar to the normal protocol, with the
+# exception that up to AcceleratedWindow packets out of the PersonalWindow packets
+# are sent after the token is sent (the token still reflects these packets).
+AcceleratedRing = true
+
 # Flow Control Parameters
 #
-#Window = 100
-#PersonalWindow = 20
+Window = 100
+PersonalWindow = 20
+AcceleratedWindow = 20
 
 # Membership Timeouts (in terms of milliseconds)
 # 
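
Note on AcceleratedWindow: Prot_queue_bcast in protocol.c holds back up to AcceleratedWindow packets and sends only the overflow before queueing a new one. The following is a rough stand-alone sketch of that count. The function name packets_to_send_now and its parameters are hypothetical; the `accelerated` flag stands in for the commit's combined test of FC_accelerated_ring() and Memb_state().

```c
#include <assert.h>

/* Hypothetical stand-alone version of the count computed at the top of
 * Prot_queue_bcast: how many already-queued packets are sent before the
 * new packet is handled. */
static int packets_to_send_now( int queued, int accel_window, int accelerated )
{
        int n;

        assert( queued >= 0 && accel_window >= 0 );

        /* Original ring protocol: nothing is held back, drain the queue. */
        if( !accelerated ) return queued;

        /* Send the overflow, plus one to make room for the new packet. */
        n = queued - accel_window + 1;
        if( n < 0 ) n = 0;
        /* Can exceed the queue length if the window was lowered at runtime. */
        if( n > queued ) n = queued;
        return n;
}
```

With the sample settings (AcceleratedWindow = 20), a full queue of 20 packets yields 1 and a queue of 5 yields 0.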

Modified: branches/experimental-4.3/libspread-util/include/spu_events.h
===================================================================
--- branches/experimental-4.3/libspread-util/include/spu_events.h	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/libspread-util/include/spu_events.h	2013-07-18 18:21:26 UTC (rev 582)
@@ -89,6 +89,7 @@
 		     void (* func)( int fd, int code, void *data), int code,
 		     void *data, int priority );
 int 	E_detach_fd( int fd, int fd_type );
+int 	E_detach_fd_priority( int fd, int fd_type, int priority );
 int 	E_set_active_threshold( int priority );
 int     E_activate_fd( int fd, int fd_type );
 int     E_deactivate_fd( int fd, int fd_type );

Modified: branches/experimental-4.3/libspread-util/include/spu_objects_local.h
===================================================================
--- branches/experimental-4.3/libspread-util/include/spu_objects_local.h	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/libspread-util/include/spu_objects_local.h	2013-07-18 18:21:26 UTC (rev 582)
@@ -82,6 +82,7 @@
 #define MEMBER                  32
 #define MSG_LIST_ENTRY          33
 #define SESS_SEQ_ENTRY          34
+#define QUEUE_LINK              35
 
 #define ROUTE_WEIGHTS           36
 #define PROF_FUNCT              37

Modified: branches/experimental-4.3/libspread-util/src/events.c
===================================================================
--- branches/experimental-4.3/libspread-util/src/events.c	2013-07-18 17:30:58 UTC (rev 581)
+++ branches/experimental-4.3/libspread-util/src/events.c	2013-07-18 18:21:26 UTC (rev 582)
@@ -617,7 +617,7 @@
 
 int 	E_detach_fd( int fd, int fd_type )
 {
-	int	i,j;
+	int	i;
 	int	found;
 
 	if( fd_type < 0 || fd_type > NUM_FDTYPES )
@@ -628,21 +628,45 @@
 
 	found = 0;
 	for( i=0; i < NUM_PRIORITY; i++ )
-	    for( j=0; j < Fd_queue[i].num_fds; j++ )
+        {
+            if( E_detach_fd_priority( fd, fd_type, i ) == 0 )
+            {
+                found = 1;
+            }
+	}
+
+	if( ! found ) return( -1 );
+
+	return( 0 );
+}
+
+int 	E_detach_fd_priority( int fd, int fd_type, int priority )
+{
+	int	i;
+	int	found;
+
+	if( fd_type < 0 || fd_type > NUM_FDTYPES )
+	{
+		Alarm( PRINT, "E_detach_fd_priority: invalid fd_type %d for fd %d\n", fd_type, fd );
+		return( -1 );
+	}
+
+	found = 0;
+	for( i=0; i < Fd_queue[priority].num_fds; i++ )
+	{
+	    if( ( Fd_queue[priority].events[i].fd == fd ) && ( Fd_queue[priority].events[i].fd_type == fd_type ) )
 	    {
-		if( ( Fd_queue[i].events[j].fd == fd ) && ( Fd_queue[i].events[j].fd_type == fd_type ) )
-		{
-                        if (Fd_queue[i].events[j].active)
-                                Fd_queue[i].num_active_fds--;
-			Fd_queue[i].num_fds--;
-			Fd_queue[i].events[j] = Fd_queue[i].events[Fd_queue[i].num_fds];
+                    if (Fd_queue[priority].events[i].active)
+                            Fd_queue[priority].num_active_fds--;
+	            Fd_queue[priority].num_fds--;
+		    Fd_queue[priority].events[i] = Fd_queue[priority].events[Fd_queue[priority].num_fds];
 
-			FD_CLR( fd, &Fd_mask[fd_type] );
-			found = 1;
+		    FD_CLR( fd, &Fd_mask[fd_type] );
+		    found = 1;
 
-			break; /* from the j for only */
-		}
-	    }
+		    break;
+            }
+	}
 
 	if( ! found ) return( -1 );
 

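Note on the removal step in E_detach_fd_priority above: the matching entry is overwritten with the last entry of the queue and the count is decremented, so removal does not shift the array but also does not preserve order. The following is a minimal sketch of that idiom on its own. The struct fd_entry type and the detach_entry function are hypothetical simplifications, not the types used in events.c.

```c
#include <assert.h>

/* Hypothetical miniature of one queue entry; the real entries in events.c
 * carry more fields (callback, code, data, active flag). */
struct fd_entry { int fd; int fd_type; };

/* Remove the first entry matching (fd, fd_type) by overwriting it with the
 * last entry. Returns 0 if an entry was removed, -1 if none matched. */
static int detach_entry( struct fd_entry *events, int *num, int fd, int fd_type )
{
        int i;

        assert( *num >= 0 );
        for( i = 0; i < *num; i++ )
        {
                if( events[i].fd == fd && events[i].fd_type == fd_type )
                {
                        (*num)--;
                        events[i] = events[*num];   /* order is not preserved */
                        return 0;
                }
        }
        return -1;
}
```

Calling this once per priority level and remembering whether any call returned 0 mirrors how E_detach_fd now delegates to E_detach_fd_priority.
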
Added: branches/experimental-4.3/release_announcement_4.3.txt
===================================================================
--- branches/experimental-4.3/release_announcement_4.3.txt	                        (rev 0)
+++ branches/experimental-4.3/release_announcement_4.3.txt	2013-07-18 18:21:26 UTC (rev 582)
@@ -0,0 +1,49 @@
+This version is an experimental version based on the Spread
+Toolkit 4.3. The file release_announcement_accelerated_ring.txt contains
+the release announcement relevant to the experimental version.
+Below is the original announcement of the Spread Toolkit version 4.3.0.
+
+Spread 4.3.0  http://www.spread.org
+
+Spread Concepts LLC is happy to announce the release of a 
+new stable version, 4.3.0, of the Spread toolkit. 
+
+The Spread 4.3 release is an important release focusing on improved out-of-the-box
+performance for the Spread toolkit. 
+This release has a large number of small changes which include bugfixes,
+very specific performance improvements, and general improvements to the code 
+and build process.
+
+The main new features of this release are:
+
+1) Performance improvements to membership timing and system throughput. 
+2) Provide runtime configurability for membership timers and flow control 
+parameters via the spread.conf file. 
+3) Redesign of C library locking to improve throughput and remove blocking. 
+4) Smarter autodetection of both the best client-server connection method 
+(local system unix sockets or tcp) and the local hostname when starting daemons.
+
+It also includes a number of bug fixes. For details check the Readme.txt file. 
+
+This release does not include any API changes, so applications should
+be able to be relinked or recompiled with the new Spread library without
+changes. 
+
+The Spread toolkit provides a high performance messaging service
+that is resilient to faults across local and wide area networks.
+Spread functions as a unified message bus for distributed applications, 
+and provides highly tuned application-level multicast, group communication, 
+and point to point support. Spread services range from reliable messaging 
+to fully ordered messages with virtual synchrony delivery guarantees, even in case 
+of computer failures and network partitions.
+
+Please be aware, that under the Spread Open Source License, the toolkit may 
+be freely used only under some conditions. For example, the license includes 
+the requirement that all advertising materials (including web pages) 
+mentioning software that uses Spread display a specific acknowledgment. 
+Please review the license agreement for more details.
+http://www.spread.org/license/
+
+Other commercial licenses or other licensing arrangements are available. 
+Please contact info at spreadconcepts.com. 
+

Added: branches/experimental-4.3/release_announcement_accelerated_ring.txt
===================================================================
--- branches/experimental-4.3/release_announcement_accelerated_ring.txt	                        (rev 0)
+++ branches/experimental-4.3/release_announcement_accelerated_ring.txt	2013-07-18 18:21:26 UTC (rev 582)
@@ -0,0 +1,21 @@
+The Spread Toolkit Accelerated Ring Experimental release 4.3 is a research version 
+of the Spread toolkit based on the Spread Toolkit 4.3 public release available 
+at www.spread.org. 
+
+This experimental release features an experimental protocol tailored for
+data center networks that can provide 30%-50% higher throughput and 20%-35% lower
+latency in modern local area networks. Of course, the version also supports 
+the original Ring protocol.
+
+The new features of this experimental release are:
+
+1) Accelerated Ring protocol to improve both throughput and latency in local area networks.
+
+2) Redesigned message-packing, reducing overhead for small messages.
+
+To use the experimental protocol, the AcceleratedRing parameter must be set to
+"true" in the spread.conf file, and an AcceleratedWindow with a value between 0
+and the value of the PersonalWindow parameter should be specified as a flow
+control parameter in the spread.conf file. The sample.spread.conf file included
+in this experimental release has appropriate settings for these parameters that
+enable the Accelerated Ring protocol, as well as a description of their functions.



