[Spread-users] Multipath Spread #3

Marc Zyngier Marc.Zyngier at evidian.com
Tue Oct 23 11:53:39 EDT 2001


Hello all,

This is the third release of the multipath patch for Spread (made
against CVS as of Oct 23rd). The purpose of the patch is to send Spread
traffic over multiple networks, so that Spread tolerates network
failures in a high-availability environment.

As of this version, Spread traffic (both broadcasts and token
circulation) is replicated on all networks. There is no code to
handle primary/backup links yet, although we already introduced some
state information in this patch.
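
To illustrate the replication described above, here is a minimal,
self-contained sketch in C. It is not the code from the patch (the
patch's mp_send() walks linked lists of interfaces and calls DL_send());
the struct node layout, fake_send() and replicate_send() are
hypothetical names used only for this illustration. The idea is the
same: for every network the sender is attached to, send one copy, either
to the target's address on that network or, if no target is given, to
that network's broadcast address.

```c
#include <stddef.h>
#include <stdint.h>

#define MAX_NETS 4

/* Hypothetical, simplified model: one address slot per network.
   An address of 0 means "no interface on this network". */
struct node {
    uint32_t addr[MAX_NETS];
};

/* Stand-in for the real socket layer: just records destinations. */
static uint32_t sent_to[16];
static size_t   sent_count = 0;

static int fake_send(uint32_t dest)
{
    if (sent_count >= sizeof sent_to / sizeof sent_to[0])
        return -1;
    sent_to[sent_count++] = dest;
    return 0;
}

/* Send one copy per network that `me` is attached to.
   With a target, only networks shared with the target are used;
   without one (target == NULL), each network's broadcast address is used.
   Returns the number of copies sent (unlike mp_send() in the patch,
   which returns the largest DL_send() result). */
static int replicate_send(const struct node *me,
                          const struct node *target,
                          const uint32_t bcast[MAX_NETS])
{
    int copies = 0;
    size_t n;

    for (n = 0; n < MAX_NETS; n++) {
        uint32_t dest;

        if (me->addr[n] == 0)
            continue;               /* sender not on this network */
        dest = target ? target->addr[n] : bcast[n];
        if (dest == 0)
            continue;               /* target not on this network */
        if (fake_send(dest) == 0)
            copies++;
    }
    return copies;
}
```

With a sender attached to two networks, a broadcast produces two copies,
while a directed send to a node that sits on only one of those networks
produces a single copy.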

What's new:

* Nothing.

This release is just a synchronisation point with the current CVS
version. If you don't have CVS access, your best bet is certainly to
use the previous release (or even better, to ask for CVS access...).

Some API changes that were seen as unnecessary have been reverted to
their original state (data_link). This patch still includes the
(so-called) ARQ patch, which appears to narrow the window of a race
condition effectively on our test setup.
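
As a rough illustration of the bit layout involved, the sketch below
restates the masks from the net_types.h hunk of this patch as small C
functions. It assumes that this hunk is the ARQ part of the patch; the
lower-case function names are hypothetical (the patch itself uses the
macros Get_arq, Set_arq, Get_retrans and Set_retrans). The ARQ field
grows from 4 bits to 8 bits (bits 16..23) and the retransmission field
moves up to bits 24..27.

```c
#include <stdint.h>

/* Masks as they appear after the patch is applied. */
#define ARQ_TYPE     0x00ff0000u   /* 8-bit ARQ counter, bits 16..23 */
#define RETRANS_TYPE 0x0f000000u   /* 4-bit retrans counter, bits 24..27 */

/* Function equivalents of the Get_/Set_ macros, on unsigned values. */
static uint32_t get_arq(uint32_t type)
{
    return (type & ARQ_TYPE) >> 16;
}

static uint32_t set_arq(uint32_t type, uint32_t val)
{
    return (type & ~ARQ_TYPE) | ((val << 16) & ARQ_TYPE);
}

static uint32_t get_retrans(uint32_t type)
{
    return (type & RETRANS_TYPE) >> 24;
}

static uint32_t set_retrans(uint32_t type, uint32_t val)
{
    return (type & ~RETRANS_TYPE) | ((val << 24) & RETRANS_TYPE);
}
```

With the wider mask, an ARQ value such as 200 fits in the field, whereas
the previous 4-bit mask (0x000f0000) could hold at most 15. Values
larger than 255 are truncated to their low 8 bits by the mask.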

As with the previous versions, we would be very happy to get any
feedback from both Spread creators and users.

Thanks,

Marc Zyngier
Evidian - SafeKit Project
http://www.evidian.com

<std disclaimer>
Copyright (C) 2001 Evidian.

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
</std disclaimer>

Index: LINUX_makefile
===================================================================
RCS file: /storage/cvsroot/spread/daemon/LINUX_makefile,v
retrieving revision 1.2
diff -u -r1.2 LINUX_makefile
--- LINUX_makefile	22 Aug 2001 21:33:39 -0000	1.2
+++ LINUX_makefile	23 Oct 2001 10:13:06 -0000
@@ -15,7 +15,7 @@
 CFLAGS = -O2 -ansi -c -D_GNU_SOURCE -Wall
 TCFLAGS = -O2 -c -D_GNU_SOURCE -Wall # ansi flag cannot be used because of an error when compiling sp.to
 
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o 
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
 OBJS =  alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o acp-permit.o auth-null.o auth-ip.o
 # add to spread build line if enabling password
 AUTHLIBS = -lcrypt
Index: SOLARIS2.5_makefile
===================================================================
RCS file: /storage/cvsroot/spread/daemon/SOLARIS2.5_makefile,v
retrieving revision 1.1
diff -u -r1.1 SOLARIS2.5_makefile
--- SOLARIS2.5_makefile	1 Sep 2001 13:41:19 -0000	1.1
+++ SOLARIS2.5_makefile	23 Oct 2001 10:13:06 -0000
@@ -11,7 +11,7 @@
 CFLAGS = -O2 -ansi -Wall -DNEED_SOCKLEN_T -c 
 TCFLAGS = -O2 -Wall -DNEED_SOCKLEN_T -c 	# ansi flag cannot be used because of an error when compiling sp.to
 
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o 
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
 OBJS =  alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o  acp-permit.o auth-null.o auth-ip.o
 
 all: spread spmonitor spuser sptuser simple_user spflooder
Index: SOLARIS8_makefile
===================================================================
RCS file: /storage/cvsroot/spread/daemon/SOLARIS8_makefile,v
retrieving revision 1.1
diff -u -r1.1 SOLARIS8_makefile
--- SOLARIS8_makefile	1 Sep 2001 13:41:19 -0000	1.1
+++ SOLARIS8_makefile	23 Oct 2001 10:13:07 -0000
@@ -11,7 +11,7 @@
 CFLAGS = -O2 -ansi -Wall -c 
 TCFLAGS = -O2 -Wall -c 	# ansi flag cannot be used because of an error when compiling sp.to
 
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o 
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
 OBJS =  alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o  acp-permit.o auth-null.o auth-ip.o
 
 all: spread spmonitor spuser sptuser simple_user spflooder
Index: SOLARIS_makefile
===================================================================
RCS file: /storage/cvsroot/spread/daemon/SOLARIS_makefile,v
retrieving revision 1.1.1.1
diff -u -r1.1.1.1 SOLARIS_makefile
--- SOLARIS_makefile	21 Aug 2001 14:28:21 -0000	1.1.1.1
+++ SOLARIS_makefile	23 Oct 2001 10:13:07 -0000
@@ -10,7 +10,7 @@
 CFLAGS = -O2 -ansi -Wall -c 
 TCFLAGS = -O2 -Wall -c 	# ansi flag cannot be used because of an error when compiling sp.to
 
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o 
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
 OBJS =  alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o  acp-permit.o auth-null.o auth-ip.o
 
 all: spread spmonitor spuser sptuser simple_user spflooder
Index: config_gram.l
===================================================================
RCS file: /storage/cvsroot/spread/daemon/config_gram.l,v
retrieving revision 1.1.1.1
diff -u -r1.1.1.1 config_gram.l
--- config_gram.l	21 Aug 2001 14:28:21 -0000	1.1.1.1
+++ config_gram.l	23 Oct 2001 10:13:07 -0000
@@ -61,6 +61,7 @@
 no      [Nn][Oo]
 ipaddr  [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}
 ipport  [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}:[0-9]{1,5}
+mpport  [Mm][Pp]:[0-9]{1,5}
 %option noyywrap
 %%
 #.*                             {} /* Comments */
@@ -151,6 +152,9 @@
                                   yylval.ip.port = 0;
                                   return IPADDR;
                                 }
+{mpport}			{ yylval.ip.port = (unsigned short) atoi (yytext + 3);
+				  return MPPORT;
+				}
 ([0-9]{1,3}[ \t]*)+[ \t]*    { 
                                   int fcost, i, done;
                                   char *c;
Index: config_parse.y
===================================================================
RCS file: /storage/cvsroot/spread/daemon/config_parse.y,v
retrieving revision 1.1.1.1
diff -u -r1.1.1.1 config_parse.y
--- config_parse.y	21 Aug 2001 14:28:21 -0000	1.1.1.1
+++ config_parse.y	23 Oct 2001 10:13:08 -0000
@@ -51,6 +51,7 @@
 #endif /* ARCH_PC_WIN95 */
 
 #include "alarm.h"
+#include "multipath.h"
 #include "configuration.h"
 #include "skiplist.h"
 #include "memory.h"
@@ -67,22 +68,60 @@
  static int     num_procs = 0;
  static int     segment_procs = 0;
  static int     segments = 0;
+ static int	networks = 0;
  static int     rvec_num = 0;
  static int     procs_interfaces = 0;
  static proc    *new_proc;
+ static proc    *current_proc = NULL;
  static int32   temp_seg_list[MAX_PROCS_SEGMENT];
+ static struct mp_net *current_net = NULL;
 
  static int     authentication_configured = 0;
 
+static void create_intfs_current_net (proc *p)
+{
+  int i;
+
+  if (!current_net)
+  {
+    current_net = (struct mp_net *) malloc (sizeof (struct mp_net));
+    memset ((void *) current_net, 0, sizeof (struct mp_net));
+    sprintf (current_net->net_name, "network_%d", networks);
+    current_net->segment = &Config.segments[segments];
+  }
+
+  for (i = 0; i < p->num_if; i++)
+    if (Is_IfType_Daemon(p->ifc[i].type) || Is_IfType_Any(p->ifc[i].type))
+      intf_add (p->node, current_net, &p->ifc[i]);
+}
+
+static void attach_to_current_net (struct mp_node *n, struct if_info *ifc)
+{
+  if (!current_net)
+  {
+    current_net = (struct mp_net *) malloc (sizeof (struct mp_net));
+    memset ((void *) current_net, 0, sizeof (struct mp_net));
+    sprintf (current_net->net_name, "network_%d", networks);
+    current_net->segment = &Config.segments[segments];
+  }
+
+  intf_add (n, current_net, ifc);
+}
+
 static char *segment2str(int seg) {
-  static char ipstr[40];
-  int id = Config.segments[seg].bcast_address;
-  sprintf(ipstr, "%d.%d.%d.%d:%d",
-  	(id & 0xff000000)>>24,
-  	(id & 0xff0000)>>16,
-  	(id & 0xff00)>>8,
-  	(id & 0xff),
-	Config.segments[seg].port);
+  static char ipstr[256];
+  struct mp_net *net;
+  struct list *tmp;
+  int i = 0;
+
+  for (tmp = Config.segments[seg].net_head; tmp; tmp = tmp->next)
+  {
+    net = (struct mp_net *) tmp->data;
+    i += sprintf(ipstr + i, "%s:%d/",
+		 inet_ntoa (net->bcast),
+		 Config.segments[seg].port);
+  }
+  ipstr[i-1] = '\0';
   return ipstr;
 }
 static void alarm_print_proc(proc *p, int port) {
@@ -191,6 +230,7 @@
 %token SP_BOOL LINKPROTOCOL PHOP PTCPHOP
 %token IMONITOR ICLIENT IDAEMON
 %token ROUTEMATRIX LINKCOST
+%token MPPORT
 %%
 Config		:	ConfigStructs
                         {
@@ -342,8 +382,33 @@
                           SEGMENT_CHECK( segments, inet_ntoa($2.ip.addr) );
 			  Config.segments[segments].num_procs = segment_procs;
 			  Config.segments[segments].port = $2.ip.port;
-			  Config.segments[segments].bcast_address =
-			    $2.ip.addr.s_addr;
+			  current_net->bcast.s_addr = htonl ($2.ip.addr.s_addr);
+			  net_insert (current_net);
+			  current_net = NULL;
+			  if(Config.segments[segments].port == 0)
+			    Config.segments[segments].port = DEFAULT_SPREAD_PORT;
+			  Alarm(CONF, "Successfully configured Segment %d [%s] with %d procs:\n",
+				segments,
+				segment2str(segments),
+				segment_procs);
+			  for(i=(num_procs - segment_procs); i<num_procs; i++) {
+                                  /* This '1' is to keep each proc with the same port as the segment.*/
+			    if( 1 || Config_procs[i].port==0)  {
+			      Config_procs[i].port=
+				Config.segments[segments].port;
+			    }
+			    alarm_print_proc(&Config_procs[i],
+			    	Config.segments[segments].port);
+			  }
+			  segments++;
+			  networks++;
+			  segment_procs = 0;
+			}
+		|	SEGMENT MPPORT OPENBRACE NetDescs CLOSEBRACE
+			{
+			  int i;
+			  Config.segments[segments].num_procs = segment_procs;
+			  Config.segments[segments].port = $2.ip.port;
 			  if(Config.segments[segments].port == 0)
 			    Config.segments[segments].port = DEFAULT_SPREAD_PORT;
 			  Alarm(CONF, "Successfully configured Segment %d [%s] with %d procs:\n",
@@ -369,13 +434,15 @@
 		;
 
 Segmentparam	:	STRING IPADDR OPENBRACE Interfaceparams CLOSEBRACE
-                        { 
-                          PROCS_CHECK( num_procs, $1.string );
+                        {
+			  PROCS_CHECK( num_procs, $1.string );
                           SEGMENT_CHECK( segments, $1.string );
                           SEGMENT_SIZE_CHECK( segment_procs, $1.string );
                           if (procs_interfaces == 0)
                                   yyerror("Interfaces section declared but no actual interface addresses defined\n");
                           strcpy(Config_procs[num_procs].name, $1.string);
+			  Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+								   &Config_procs[num_procs]);
                           Config_procs[num_procs].id = $2.ip.addr.s_addr;
  		          Config_procs[num_procs].port = $2.ip.port;
 			  Config_procs[num_procs].seg_index = segments;
@@ -383,18 +450,21 @@
                           Config_procs[num_procs].num_if = procs_interfaces;
 			  Config.segments[segments].procs[segment_procs] = 
                                   &Config_procs[num_procs];
+			  create_intfs_current_net (&Config_procs[num_procs]);
 			  num_procs++;
 			  segment_procs++;
                           procs_interfaces = 0;
 			}
 		|	STRING OPENBRACE Interfaceparams CLOSEBRACE
-			{ 
+			{
                           PROCS_CHECK( num_procs, $1.string );
                           SEGMENT_CHECK( segments, $1.string );
                           SEGMENT_SIZE_CHECK( segment_procs, $1.string );
                           if (procs_interfaces == 0)
                                   yyerror("Interfaces section declared but no actual interface addresses defined\n");
                           strcpy(Config_procs[num_procs].name, $1.string);
+			  Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+								   &Config_procs[num_procs]);
                           Config_procs[num_procs].id =
 			    name2ip(Config_procs[num_procs].name);
  		          Config_procs[num_procs].port = 0;
@@ -403,16 +473,19 @@
                           Config_procs[num_procs].num_if = procs_interfaces;
 			  Config.segments[segments].procs[segment_procs] = 
                                   &Config_procs[num_procs];
+			  create_intfs_current_net (&Config_procs[num_procs]);
 			  num_procs++;
 			  segment_procs++;
                           procs_interfaces = 0;
 			}
                 |       STRING IPADDR
-			{ 
+			{
                           PROCS_CHECK( num_procs, $1.string );
                           SEGMENT_CHECK( segments, $1.string );
                           SEGMENT_SIZE_CHECK( segment_procs, $1.string );
                           strcpy(Config_procs[num_procs].name, $1.string);
+			  Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+								   &Config_procs[num_procs]);
                           Config_procs[num_procs].id = $2.ip.addr.s_addr;
  		          Config_procs[num_procs].port = $2.ip.port;
 			  Config_procs[num_procs].seg_index = segments;
@@ -423,6 +496,7 @@
                           Config_procs[num_procs].ifc[0].type = IFTYPE_ALL | IFTYPE_ANY;
 			  Config.segments[segments].procs[segment_procs] = 
                                   &Config_procs[num_procs];
+			  create_intfs_current_net (&Config_procs[num_procs]);
 			  num_procs++;
 			  segment_procs++;
                           procs_interfaces = 0;
@@ -433,6 +507,8 @@
                           SEGMENT_CHECK( segments, $1.string );
                           SEGMENT_SIZE_CHECK( segment_procs, $1.string );
                           strcpy(Config_procs[num_procs].name, $1.string);
+			  Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+								   &Config_procs[num_procs]);
                           Config_procs[num_procs].id =
 			    name2ip(Config_procs[num_procs].name);
  		          Config_procs[num_procs].port = 0;
@@ -444,12 +520,89 @@
                           Config_procs[num_procs].ifc[0].type = IFTYPE_ALL | IFTYPE_ANY;
 			  Config.segments[segments].procs[segment_procs] = 
                                   &Config_procs[num_procs];
+			  create_intfs_current_net (&Config_procs[num_procs]);
 			  num_procs++;
 			  segment_procs++;
                           procs_interfaces = 0;
 			}
 		;
 
+NetDescs	:	Networks MpTypedIntfs
+		;
+
+Networks	:	Network Networks
+		|
+		;
+
+Network		:	IPADDR OPENBRACE MpIntfs CLOSEBRACE
+			{
+			  current_net->bcast.s_addr = htonl ($1.ip.addr.s_addr);
+			  net_insert (current_net);
+			  current_net = NULL;
+			  networks++;
+			}
+		;
+
+
+MpIntfs		:	MpIntf MpIntfs
+		|
+		;
+
+MpIntf		:	STRING IfTypeComp IPADDR
+			{
+			  struct mp_node *node;
+                          PROCS_CHECK( num_procs, $1.string );
+                          SEGMENT_CHECK( segments, $1.string );
+                          SEGMENT_SIZE_CHECK( segment_procs, $1.string );
+			  if (!(node = node_get ($1.string)))
+			  {
+                            strcpy(Config_procs[num_procs].name, $1.string);
+			    Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+								     &Config_procs[num_procs]);
+                            Config_procs[num_procs].id = $3.ip.addr.s_addr;
+ 		            Config_procs[num_procs].port = $3.ip.port;
+			    Config_procs[num_procs].seg_index = segments;
+			    Config_procs[num_procs].index_in_seg = segment_procs;
+                            Config_procs[num_procs].num_if = 0;
+			    Config.segments[segments].procs[segment_procs] = 
+                                  &Config_procs[num_procs];
+			    node = Config_procs[num_procs].node;
+			    num_procs++;
+			    segment_procs++;
+			  }
+                          node->proc->ifc[node->proc->num_if].ip = $3.ip.addr.s_addr;
+                          node->proc->ifc[node->proc->num_if].port = $3.ip.port;
+			  if (!$2.mask)
+                          	  node->proc->ifc[node->proc->num_if].type = IFTYPE_ALL;
+			  else
+	                          node->proc->ifc[node->proc->num_if].type = $2.mask;
+			  attach_to_current_net (node,
+						 &node->proc->ifc[node->proc->num_if]);
+			  node->proc->num_if++;
+                          procs_interfaces = 0;
+			}
+		;
+
+MpTypedIntfs	:	MpTypedIntf MpTypedIntfs
+		|
+		;
+
+MpTypedIntf	:	STRING MpIfParms
+			{
+			  struct mp_node *node;
+			  int i;
+			  if (!(node = node_get ($1.string)))
+			    yyerror ("Unknown proc, cannot add interfaces\n");
+			  if (!current_proc)
+			    yyerror ("No interfaces for proc ??\n");
+			  for (i = 0; i < current_proc->num_if; i++)
+			    node->proc->ifc[node->proc->num_if + i] = current_proc->ifc[i];
+			  node->proc->num_if += current_proc->num_if;
+			  free (current_proc);
+			  current_proc = NULL;
+			}
+		;
+
 IfType  	:	IMONITOR { $$ = $1; }
 		|	ICLIENT { $$ = $1; }
 		|	IDAEMON { $$ = $1; }
@@ -467,7 +620,7 @@
 		;
 
 Interfaceparam	:	IfTypeComp IPADDR
-			{ 
+			{ /* Is $1.string really what you think it is ?? */
                           PROCS_CHECK( num_procs, $1.string );
                           SEGMENT_CHECK( segments, $1.string );
                           SEGMENT_SIZE_CHECK( segment_procs, $1.string );
@@ -479,6 +632,28 @@
                           else 
                                   Config_procs[num_procs].ifc[procs_interfaces].type = $1.mask;
                           procs_interfaces++;
+			}
+		;
+
+MpIfParms	:	MpIfParm MpIfParms
+		|
+		;
+
+MpIfParm	:	IfTypeComp IPADDR
+			{
+			  if (!current_proc)
+			  {
+			    current_proc = (proc *) malloc (sizeof (proc));
+			    memset ((void *) current_proc, 0, sizeof (proc));
+			  }
+			  INTERFACE_NUM_CHECK(current_proc->num_if, "MpIfParm");
+			  current_proc->ifc[current_proc->num_if].ip = $2.ip.addr.s_addr;
+			  current_proc->ifc[current_proc->num_if].port = $2.ip.port;
+			  if ($1.mask == 0)
+				  current_proc->ifc[current_proc->num_if].type = IFTYPE_ALL;
+			  else
+				  current_proc->ifc[current_proc->num_if].type = $1.mask;
+			  current_proc->num_if++;
 			}
 		;
 
Index: configuration.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/configuration.c,v
retrieving revision 1.1.1.1
diff -u -r1.1.1.1 configuration.c
--- configuration.c	21 Aug 2001 14:28:21 -0000	1.1.1.1
+++ configuration.c	23 Oct 2001 10:13:09 -0000
@@ -479,13 +479,15 @@
 	int 	s,p,ret;
 	char	ip[16];
 	proc	pr;
+	struct mp_net *net;
 
 	Alarm( PRINT, "--------------------\n" );
 	Alarm( PRINT, "Configuration at %s is:\n", My.name );
 	Alarm( PRINT, "Num Segments %d\n",config->num_segments );
 	for ( s=0; s < config->num_segments; s++ )
 	{
-		Conf_id_to_str( config->segments[s].bcast_address, ip );
+		net = (struct mp_net *) config->segments[s].net_head->data;
+		Conf_id_to_str( ntohl (net->bcast.s_addr), ip );
 		Alarm( PRINT, "\t%d\t%-16s  %hd\n",
 			config->segments[s].num_procs, ip,
 			config->segments[s].port );
Index: configuration.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/configuration.h,v
retrieving revision 1.1.1.1
diff -u -r1.1.1.1 configuration.h
--- configuration.h	21 Aug 2001 14:28:21 -0000	1.1.1.1
+++ configuration.h	23 Oct 2001 10:13:09 -0000
@@ -36,6 +36,7 @@
 
 #include "arch.h"
 #include "spread_params.h"
+#include "multipath.h"
 
 /* For what spread services should listen on what interfaces */
 
@@ -61,15 +62,16 @@
 	int16	seg_index;
 	int16	index_in_seg;
 	int32u	id;
+        struct mp_node *node;
         int     num_if;
         struct if_info ifc[MAX_INTERFACES_PROC];
 } proc;
 
 typedef struct dummy_segment{
-	int32 	bcast_address;
 	int16	port;
 	int	num_procs;
 	proc    *procs[MAX_PROCS_SEGMENT];
+	struct list *net_head;
 } segment;
 
 typedef struct dummy_configuration{
Index: multipath.c
===================================================================
RCS file: multipath.c
diff -N multipath.c
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ multipath.c	23 Oct 2001 10:13:09 -0000
@@ -0,0 +1,204 @@
+#include <stdlib.h>
+#include <string.h>
+#include "arch.h"
+
+#ifndef ARCH_PC_WIN95
+
+#include        <sys/types.h>
+#include        <sys/socket.h>
+#include        <netinet/in.h>
+#include        <arpa/inet.h>
+#include        <sys/uio.h>
+/* for select */
+#include        <sys/time.h>
+#include        <unistd.h>
+
+#else   /* ARCH_PC_WIN95 */
+
+#include        <winsock.h>
+
+#endif  /* ARCH_PC_WIN95 */
+
+#include "multipath.h"
+#include "data_link.h"
+#include "configuration.h"
+
+static struct list *net_head = NULL;
+static struct list *node_head = NULL;
+
+static void list_add (struct list **head, struct list *el)
+{
+  el->prev = NULL;
+  el->next = *head;
+  if (*head)
+    (*head)->prev = el;
+  *head = el;
+}
+
+/* Warning : If node have multiple interface on a net, intf_get only
+   returns the first... */
+struct mp_intf *intf_get (struct mp_node *node, struct mp_net *net)
+{
+  struct list    *tmp;
+  struct mp_intf *intf;
+
+  for (tmp = net->intf_head; tmp; tmp = tmp->next)
+  {
+    intf = (struct mp_intf *) tmp->data;
+    if (intf->node == node)
+      return intf;
+  }
+
+  return NULL;
+}
+
+struct mp_net *net_get (char name[])
+{
+  struct list *tmp;
+  struct mp_net *net;
+
+  for (tmp = net_head; tmp; tmp = tmp->next)
+  {
+    net = (struct mp_net *) tmp->data;
+    if (!strcmp (name, net->net_name))
+      return net;
+  }
+
+  return NULL;
+}
+
+struct mp_node *node_get (char name[])
+{
+  struct list *tmp;
+  struct mp_node *node;
+
+  for (tmp = node_head; tmp; tmp = tmp->next)
+  {
+    node = (struct mp_node *) tmp->data;
+    if (!strcmp (name, node->node_name))
+      return node;
+  }
+
+  return NULL;
+}
+
+struct mp_intf *intf_add (struct mp_node *node,
+			  struct mp_net *net,
+			  struct if_info *ifc)
+{
+  struct mp_intf *intf;
+
+  intf = intf_get (node, net);
+
+  if (!intf)
+  {
+    intf = (struct mp_intf *) malloc (sizeof (struct mp_intf));
+    memset ((void *) intf, 0, sizeof (struct mp_intf));
+    intf->node = node;
+    intf->net = net;
+    intf->intf_this_net.data = intf->intf_this_node.data = (void *) intf;
+    list_add (&node->intf_head, &intf->intf_this_node);
+    list_add (&net->intf_head, &intf->intf_this_net);
+  }
+
+  if (ifc)
+  {
+    intf->ifc = ifc;
+    intf->addr.s_addr = htonl (ifc->ip);
+  }
+
+  return intf;
+}
+
+void net_insert (struct mp_net *net)
+{
+  net->net_this_segment.data = net->global_list.data = (void *) net;
+  list_add (&net_head, &net->global_list);
+  list_add (&net->segment->net_head, &net->net_this_segment);
+}
+
+struct mp_net *net_add (char name[],
+			struct in_addr bcast,
+			struct in_addr netmask,
+			struct dummy_segment *seg)
+{
+  struct mp_net *net;
+
+  net = net_get (name);
+
+  if (!net)
+  {
+    net = (struct mp_net *) malloc (sizeof (struct mp_net));
+    memset ((void *) net, 0, sizeof (struct mp_net));
+    strcpy (net->net_name, name);
+    net->segment = seg;
+    net_insert (net);
+  }
+
+  if (bcast.s_addr)
+  {
+    net->bcast = bcast;
+    net->netmask = netmask;
+  }
+
+  return net;
+}
+
+struct mp_node *node_add (char name[], struct dummy_proc *proc)
+{
+  struct mp_node *node;
+
+  node = node_get (name);
+
+  if (!node)
+  {
+    node = (struct mp_node *) malloc (sizeof (struct mp_node));
+    memset ((void *) node, 0, sizeof (struct mp_node));
+    strcpy (node->node_name, name);
+    node->proc = proc;
+    node->global_list.data = (void *) node;
+    list_add (&node_head, &node->global_list);
+  }
+
+  return node;
+}
+
+int mp_send (channel chan,
+	     struct mp_node *mynode,
+	     struct mp_node *target,
+	     unsigned short target_port,
+	     sys_scatter *scat)
+{
+  struct mp_net *net;
+  struct mp_intf *intf;
+  struct list *tmp;
+  int ret = -1, r = 0;
+
+  /* For each of my own interfaces, find the corresponding network. If
+     the target node have an interface on this network, send message
+     to this interface. If no target has been specified, just
+     broadcast once per network */
+  
+  for (tmp = mynode->intf_head; tmp; tmp = tmp->next)
+  {
+    intf = (struct mp_intf *) tmp->data;
+    net = intf->net;
+
+    if (target)
+    {
+      if ((intf = intf_get (target, net)))
+      {
+	r = DL_send (chan, ntohl (intf->addr.s_addr), target_port, scat);
+      }
+    }
+    else
+    {
+      r = DL_send (chan, ntohl (net->bcast.s_addr), target_port, scat);
+    }
+
+    ret = MAX (ret, r);
+  }
+
+  return ret;
+}
+
Index: multipath.h
===================================================================
RCS file: multipath.h
diff -N multipath.h
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ multipath.h	23 Oct 2001 10:13:09 -0000
@@ -0,0 +1,86 @@
+#ifndef _MULTIPATH_H
+#define _MULTIPATH_H
+
+#include "arch.h"
+#include "scatter.h"
+
+#ifndef	ARCH_PC_WIN95
+
+#include        <sys/types.h>
+#include        <sys/socket.h>
+#include        <netinet/in.h>
+
+#else	/* ARCH_PC_WIN95 */
+
+#include	<winsock.h>
+
+#endif	/* ARCH_PC_WIN95 */
+
+struct mp_net;
+struct mp_node;
+struct dummy_segment;
+struct dummy_proc;
+struct if_info;
+
+struct list
+{
+  struct list    *prev;
+  struct list    *next;
+  void           *data;
+};
+
+struct mp_intf
+{
+  struct in_addr  addr;		/* Unicast address for directed send */
+  struct if_info *ifc;
+  struct mp_net  *net;
+  struct mp_node *node;
+  struct list     intf_this_net;
+  struct list     intf_this_node;
+};
+
+struct mp_net
+{
+  char            net_name[256];
+  struct in_addr  bcast;
+  struct in_addr  netmask;
+  unsigned int    state;
+  struct dummy_segment *segment;
+  struct list    *intf_head;
+  struct list     net_this_segment;
+  struct list     global_list;
+};
+
+struct mp_node
+{
+  char            node_name[256];
+  unsigned short  port;		/* Port for directed send */
+  struct dummy_proc *proc;
+  struct list    *intf_head;
+  struct list     global_list;
+};
+
+struct mp_intf *intf_get (struct mp_node *node, struct mp_net *net);
+struct mp_net *net_get (char name[]);
+struct mp_node *node_get (char name[]);
+struct mp_intf *intf_add (struct mp_node *node,
+			  struct mp_net *net,
+			  struct if_info *ifc);
+void net_insert (struct mp_net *net);
+struct mp_net *net_add (char name[],
+			struct in_addr bcast,
+			struct in_addr netmask,
+			struct dummy_segment *seg);
+struct mp_node *node_add (char name[], struct dummy_proc *);
+struct mp_node *node_get (char name[]);
+
+int mp_send (channel chan,
+	     struct mp_node *mynode,
+	     struct mp_node *target,
+	     unsigned short target_port,
+	     sys_scatter *scat);
+
+#undef MAX
+#define MAX(a, b)  (((a) > (b)) ? (a) : (b))
+
+#endif
Index: net_types.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/net_types.h,v
retrieving revision 1.2
diff -u -r1.2 net_types.h
--- net_types.h	31 Aug 2001 03:06:49 -0000	1.2
+++ net_types.h	23 Oct 2001 10:13:09 -0000
@@ -59,13 +59,13 @@
 #define		FORM2_TYPE		0x00002000
 #define		FORM_TYPE		0x00003000
 
-#define		ARQ_TYPE		0x000f0000
-#define	        RETRANS_TYPE		0x00f00000
+#define		ARQ_TYPE		0x00ff0000
+#define	        RETRANS_TYPE		0x0f000000
 
-#define		STATUS_TYPE		0x01000000
-#define		PARTITION_TYPE		0x02000000
-#define		FC_TYPE			0x04000000
-#define		CONTROL_TYPE		0x0f000000
+#define		STATUS_TYPE		0x10000000
+#define		PARTITION_TYPE		0x20000000
+#define		FC_TYPE			0x40000000
+#define		CONTROL_TYPE		0xf0000000
 
 
 #define		Is_unreliable( type )	( type &  UNRELIABLE_TYPE )
@@ -92,8 +92,8 @@
 
 #define		Get_arq( type )		( (type &  ARQ_TYPE) >> 16)
 #define		Set_arq( type, val )	( (type & ~ARQ_TYPE) | ((val << 16)&ARQ_TYPE) )
-#define		Get_retrans( type )	( (type &  RETRANS_TYPE) >> 20)
-#define		Set_retrans( type, val) ( (type & ~RETRANS_TYPE) | ((val << 20)&RETRANS_TYPE) )
+#define		Get_retrans( type )	( (type &  RETRANS_TYPE) >> 24)
+#define		Set_retrans( type, val) ( (type & ~RETRANS_TYPE) | ((val << 24)&RETRANS_TYPE) )
 
 #define		Is_status( type )	( type &  STATUS_TYPE     )
 #define		Is_partition( type )	( type &  PARTITION_TYPE  )
Index: network.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/network.c,v
retrieving revision 1.5
diff -u -r1.5 network.c
--- network.c	17 Oct 2001 14:01:07 -0000	1.5
+++ network.c	23 Oct 2001 10:13:10 -0000
@@ -50,11 +50,10 @@
 static  int             Num_token_channels;
 
 static	int		Bcast_needed;
-static	int32		Bcast_address;
 static	int16		Bcast_port;
 
 static	int		Num_send_needed;
-static  int32		Send_address[MAX_SEGMENTS];
+static  struct mp_node *Send_address[MAX_SEGMENTS];
 static	int16		Send_ports[MAX_SEGMENTS];
 
 /* ### Pack: 3 lines */
@@ -64,7 +63,7 @@
 static  const char      align_padding[4] = "padd";
 
 /* address for token sending - which is always needed */
-static	int32		Token_address;
+static	struct mp_node *Token_address;
 static	int16		Token_port;
 
 static	configuration	Net_membership;
@@ -87,8 +86,9 @@
 void	Net_init()
 {
 	proc		dummy_proc;
-        int32u          interface_addr;
-        int             i;
+	int32u		interface_addr, Bcast_address;
+	struct mp_intf *intf;
+	struct list    *tmp;
 
 	Cn = Conf();
 	My = Conf_my();
@@ -100,14 +100,12 @@
 	{
 		/* I am not allone in segment */
 		Bcast_needed  = 1;
-		Bcast_address = Cn.segments[My.seg_index].bcast_address;
 		Bcast_port    = My.port;
 
-		Alarm( NETWORK, "Net_init: Bcast needed to address (%d, %d)\n",
-			Bcast_address, Bcast_port );
+		Alarm( NETWORK, "Net_init: Bcast needed to port %d\n",
+			Bcast_port );
 	}else{
 		Bcast_needed  = 0;
-		Bcast_address = 0;
 		Alarm( NETWORK, "Net_init: Bcast is not needed\n" );
 	}
 
@@ -116,21 +114,29 @@
          * address on the interface as well as the unicast interface. That is 
          * what the double bind of the bcast_address does.
          */
-        for ( i=0; i < My.num_if; i++)
-        {
-                if (Is_IfType_Daemon( My.ifc[i].type ) || Is_IfType_Any( My.ifc[i].type ) )
-                {
-                        if (Is_IfType_Any( My.ifc[i].type ) )
-                                interface_addr = 0;
-                        else {
-                                interface_addr = My.ifc[i].ip;
-                                Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, Bcast_address );
-                        }
-                        Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, interface_addr );
-                        Token_channel[Num_token_channels++] = DL_init_channel( RECV_CHANNEL, My.port+1, 0, interface_addr );
-                }
-        }
+	for (tmp = My.node->intf_head; tmp; tmp = tmp->next)
+	{
+	  /* We know for sure that this interface is ANY or DAEMON */
+	  intf = (struct mp_intf *) tmp->data;
+
+	  if (Is_IfType_Any (intf->ifc->type))
+	  {
+	    interface_addr = 0;
+	    Bcast_address = 0;
+	  }
+	  else
+	  {
+	    interface_addr = intf->ifc->ip;
+	    Bcast_address = ntohl (intf->net->bcast.s_addr);
+#ifndef ARCH_PC_WIN95
+	    Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, Bcast_address );
+#endif
+	  }
 
+	  Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, interface_addr );
+	  Token_channel[Num_token_channels++] = DL_init_channel( RECV_CHANNEL, My.port+1, 0, interface_addr );
+	}
+	
 	Send_channel  = DL_init_channel( SEND_CHANNEL, My.port+2, 0, My.id );
 
 	Num_send_needed = 0;
@@ -169,7 +175,7 @@
 
 	    } else if( Net_membership.segments[i].num_procs > 0 ) {
 
-		Send_address[Num_send_needed] = Net_membership.segments[i].procs[0]->id;
+		Send_address[Num_send_needed] = Net_membership.segments[i].procs[0]->node;
 		Send_ports  [Num_send_needed] = Net_membership.segments[i].port;
 
 		Num_send_needed++;
@@ -182,10 +188,10 @@
 			i, Send_address[i], Send_ports[i] );
 
 	/* Calculate where to send the token */
-	Token_address = 0;
+	Token_address = NULL;
 	if( my_index_in_seg < My_seg.num_procs-1 )
 	{
-		Token_address = My_seg.procs[my_index_in_seg+1]->id;
+		Token_address = My_seg.procs[my_index_in_seg+1]->node;
 		Token_port    = My_seg.port+1;
 	}else{
 		/* I am last in my segment */
@@ -195,7 +201,7 @@
 			 * My segment is the only segment
 			 * sending token to the first in my segment 
 			 */
-			Token_address = My_seg.procs[0]->id;
+			Token_address = My_seg.procs[0]->node;
 			Token_port    = My_seg.port+1;
 		} else if( Num_send_needed == my_next_index ) {
 			/* 
@@ -214,8 +220,8 @@
 		}
 
 	}
-	Alarm( NETWORK, "Net_set_membership: Token_address : (%d, %d)\n",
-			Token_address, Token_port );
+	Alarm( NETWORK, "Net_set_membership: Token_address : (%s, %d)\n",
+			Token_address->node_name, Token_port );
 }
 
 int	Net_bcast( sys_scatter *scat )
@@ -232,14 +238,14 @@
 	pack_ptr->transmiter_id = My.id;
 	for ( i=0; i< Num_send_needed; i++ )
 	{
-	    ret = DL_send( Send_channel, Send_address[i], Send_ports[i], scat );
+	    ret = mp_send( Send_channel, My.node, Send_address[i], Send_ports[i], scat );
 	}
 	pack_ptr->type = Clear_routed( pack_ptr->type );
 
 	/* broadcasting if needed according to configuration */
 	if( Bcast_needed )
 	{	
-	    ret = DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+	    ret = mp_send( Send_channel, My.node, NULL, Bcast_port, scat );
 	}
 	return( ret );
 }
@@ -332,7 +338,7 @@
 
         for ( i=0; i< Num_send_needed; i++ )
         {
-                ret = DL_send( Send_channel, Send_address[i], Send_ports[i], &Queue_scat );
+                ret = mp_send( Send_channel, My.node, Send_address[i], Send_ports[i], &Queue_scat );
         }
         pack_ptr = (packet_header *)Queue_scat.elements[0].buf; 
 	pack_ptr->type = Clear_routed( pack_ptr->type );
@@ -340,7 +346,7 @@
         /* broadcasting if needed according to configuration */
         if( Bcast_needed )
         {	
-                ret = DL_send( Send_channel, Bcast_address, Bcast_port, &Queue_scat );
+                ret = mp_send( Send_channel, My.node, NULL, Bcast_port, &Queue_scat );
         }
         Queue_scat.num_elements = 0;
         Queued_bytes = 0;
@@ -360,14 +366,14 @@
 	{
 	    if( Bcast_needed )
 	    {
-	    	ret = DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+	    	ret = mp_send( Send_channel, My.node, NULL, Bcast_port, scat );
 	    }
 	}else{
 	    if( Net_membership.segments[seg_index].num_procs > 0 )
 	    {
 		pack_ptr->type = Set_routed( pack_ptr->type );
-	    	ret = DL_send( Send_channel, 
-			Net_membership.segments[seg_index].procs[0]->id,
+	    	ret = mp_send( Send_channel, My.node,
+			Net_membership.segments[seg_index].procs[0]->node,
 			Net_membership.segments[seg_index].port,
 			scat );
 		pack_ptr->type = Clear_routed( pack_ptr->type );
@@ -391,7 +397,7 @@
 		Alarm( PRINT, "Net_ucast: non existing proc_id %d\n",proc_id );
 		return( ret );
 	}
-	ret = DL_send( Send_channel, proc_id, p.port, scat );
+	ret = mp_send( Send_channel, My.node, p.node, p.port, scat );
 	return( ret );
 }
 
@@ -418,7 +424,6 @@
         if (ch_found == FALSE) {
             Alarm(EXIT, "Net_recv: Listening and received packet on un-used interface %d\n", fd);
         }
-
 	received_bytes = DL_recv( fd, scat ); 
 
 	if( received_bytes <= 0 ) return( received_bytes );
@@ -496,7 +501,7 @@
 
 		/* fliping to original form */
 		if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
-		DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+		mp_send( Send_channel, My.node, NULL, Bcast_port, scat );
 		/* re-fliping to my form */
 		if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
 
@@ -580,7 +585,7 @@
 	token_ptr = (token_header *)scat->elements[0].buf;
 	token_ptr->type = Set_endian( token_ptr->type );
 	token_ptr->transmiter_id = My.id;
-	ret = DL_send( Send_channel, Token_address, Token_port, scat );
+	ret = mp_send( Send_channel, My.node, Token_address, Token_port, scat );
 	return ( ret );
 }
 
Index: protocol.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/protocol.c,v
retrieving revision 1.3
diff -u -r1.3 protocol.c
--- protocol.c	16 Oct 2001 15:58:42 -0000	1.3
+++ protocol.c	23 Oct 2001 10:13:12 -0000
@@ -469,7 +469,7 @@
 		    return; 
 		}
 	}else{
-		if( Get_arq(Token->type) == Get_arq(Last_token->type) )
+		if( Get_arq(Token->type) != ((Get_arq(Last_token->type)+1)%0x100) )
 		{
 		    if( Get_retrans(Token->type) > Get_retrans(Last_token->type) )
 		    {
@@ -578,7 +578,7 @@
 	if( Conf_leader( Memb_active_ptr() ) == My.id )
 	{
 	    	val = Get_arq( Token->type );
-		val = (val + 1)% 0x10;
+		val = (val + 1)% 0x100;
 		Token->type = Set_arq(     Token->type, val );
 		Token->type = Set_retrans( Token->type, 0   );
 	}
Index: session.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/session.c,v
retrieving revision 1.2
diff -u -r1.2 session.c
--- session.c	22 Aug 2001 17:41:13 -0000	1.2
+++ session.c	23 Oct 2001 10:13:14 -0000
@@ -134,7 +134,7 @@
 {
 	struct	sockaddr_in	inet_addr;
 	int16			port;
-	int			ret, i;
+	int			ret, i, on = 1;
 	mailbox			mbox;
 
 #ifndef ARCH_PC_WIN95
@@ -190,6 +190,9 @@
                                 inet_addr.sin_addr.s_addr = INADDR_ANY;
                         else
                                 inet_addr.sin_addr.s_addr = htonl(My.ifc[i].ip);
+			if (setsockopt (mbox, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)))
+				Alarm (EXIT, "Sess_init: SO_REUSEADDR failed\n");
+			
                         if( bind( mbox,  (struct sockaddr *)&inet_addr, sizeof(inet_addr) ) == -1) 
                         {
                                 Alarm( PRINT, "Sess_init: INET unable to bind to port %d, already running \n" ,port );
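
A note on the protocol.c hunks above: the ARQ counter now wraps at 0x100 instead of 0x10, and the receive-side test changes from "same ARQ as the last token" to "not the successor of the last token's ARQ". The sketch below only illustrates that wraparound arithmetic. The helper names (arq_next, arq_is_successor) and the ARQ_MODULUS constant are hypothetical and do not exist in Spread; the real code goes through the Get_arq/Set_arq macros, whose bit layout is not shown in this patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumption: the ARQ field holds 8 bits, matching the "% 0x100" in the patch. */
#define ARQ_MODULUS 0x100u

/* Next ARQ value after val, wrapping 0xff -> 0x00. */
static unsigned arq_next(unsigned val)
{
    assert(val < ARQ_MODULUS);  /* callers pass an already-extracted field */
    return (val + 1u) % ARQ_MODULUS;
}

/* True when token_arq is exactly the successor of last_arq.
 * The patched condition in protocol.c is the negation of this test. */
static bool arq_is_successor(unsigned token_arq, unsigned last_arq)
{
    return token_arq == arq_next(last_arq);
}
```

With these helpers, arq_is_successor(0x00, 0xff) holds across the wrap, while a repeated value such as arq_is_successor(0x05, 0x05) or a skipped one such as arq_is_successor(0x07, 0x05) does not, so both of those cases fall into the retransmission check.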

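A note on the session.c hunk above: SO_REUSEADDR has to be set on the socket before bind() is called, which is why the setsockopt() call is placed just ahead of it. The following is a minimal standalone sketch of that ordering, assuming a POSIX system and a TCP socket. The function bind_reusable is hypothetical and not part of Spread; the daemon itself reports failures through Alarm() rather than returning -1.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper (not part of Spread): create a TCP socket, enable
 * SO_REUSEADDR, then bind it to ip:port (both given in host byte order).
 * Returns the descriptor, or -1 on failure. */
static int bind_reusable(uint32_t ip, uint16_t port)
{
    struct sockaddr_in addr;
    int on = 1;
    int fd = socket(AF_INET, SOCK_STREAM, 0);

    if (fd < 0)
        return -1;

    /* The option only influences bind() if it is set beforehand. */
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0) {
        close(fd);
        return -1;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(ip);
    addr.sin_port = htons(port);

    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```

A call such as bind_reusable(INADDR_LOOPBACK, 0) binds to an ephemeral port on the loopback interface; the caller is responsible for close() on the returned descriptor.
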
-- 
And don't forget you'll never get a dog to walk upright
Just 'cause you've got the power, that don't mean you've got the right.




