[Spread-users] Multipath Spread #3
Marc Zyngier
Marc.Zyngier at evidian.com
Tue Oct 23 11:53:39 EDT 2001
Hello all,
This is the third release of the multipath patch for Spread (made
against CVS as of Oct 23rd). The purpose of patch is to send Spread
traffic on multiple networks, to support network failures in a high
avaibility environment.
As of this version, Spread traffic (both broadcasts and token
circulation) is replicated on all networks. There is no code to
handle primary/backup links yet, although we already introduced some
state information in this patch.
What's new :
* Nothing.
This release is just a synchronisation point with the current CVS
version. If you don't have CVS access, your best bet is certainly to
use the previous release (or even better, to ask for CVS access...).
Some API changes that were seen as unnecessary have been reverted to
their original state (data_link). This patch still includes the
(so-called) ARQ patch, which seems to efficiently narrow a race
condition on our test setup.
As for the previous versions, we would be very happy to get any
feedback from both Spread creators and users.
Thanks,
Marc Zyngier
Evidian - SafeKit Project
http://www.evidian.com
<std disclaimer>
Copyright (C) 2001 Evidian.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
</std disclaimer>
Index: LINUX_makefile
===================================================================
RCS file: /storage/cvsroot/spread/daemon/LINUX_makefile,v
retrieving revision 1.2
diff -u -r1.2 LINUX_makefile
--- LINUX_makefile 22 Aug 2001 21:33:39 -0000 1.2
+++ LINUX_makefile 23 Oct 2001 10:13:06 -0000
@@ -15,7 +15,7 @@
CFLAGS = -O2 -ansi -c -D_GNU_SOURCE -Wall
TCFLAGS = -O2 -c -D_GNU_SOURCE -Wall # ansi flag cannot be used because of an error when compiling sp.to
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
OBJS = alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o acp-permit.o auth-null.o auth-ip.o
# add to spread build line if enabling password
AUTHLIBS = -lcrypt
Index: SOLARIS2.5_makefile
===================================================================
RCS file: /storage/cvsroot/spread/daemon/SOLARIS2.5_makefile,v
retrieving revision 1.1
diff -u -r1.1 SOLARIS2.5_makefile
--- SOLARIS2.5_makefile 1 Sep 2001 13:41:19 -0000 1.1
+++ SOLARIS2.5_makefile 23 Oct 2001 10:13:06 -0000
@@ -11,7 +11,7 @@
CFLAGS = -O2 -ansi -Wall -DNEED_SOCKLEN_T -c
TCFLAGS = -O2 -Wall -DNEED_SOCKLEN_T -c # ansi flag cannot be used because of an error when compiling sp.to
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
OBJS = alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o acp-permit.o auth-null.o auth-ip.o
all: spread spmonitor spuser sptuser simple_user spflooder
Index: SOLARIS8_makefile
===================================================================
RCS file: /storage/cvsroot/spread/daemon/SOLARIS8_makefile,v
retrieving revision 1.1
diff -u -r1.1 SOLARIS8_makefile
--- SOLARIS8_makefile 1 Sep 2001 13:41:19 -0000 1.1
+++ SOLARIS8_makefile 23 Oct 2001 10:13:07 -0000
@@ -11,7 +11,7 @@
CFLAGS = -O2 -ansi -Wall -c
TCFLAGS = -O2 -Wall -c # ansi flag cannot be used because of an error when compiling sp.to
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
OBJS = alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o acp-permit.o auth-null.o auth-ip.o
all: spread spmonitor spuser sptuser simple_user spflooder
Index: SOLARIS_makefile
===================================================================
RCS file: /storage/cvsroot/spread/daemon/SOLARIS_makefile,v
retrieving revision 1.1.1.1
diff -u -r1.1.1.1 SOLARIS_makefile
--- SOLARIS_makefile 21 Aug 2001 14:28:21 -0000 1.1.1.1
+++ SOLARIS_makefile 23 Oct 2001 10:13:07 -0000
@@ -10,7 +10,7 @@
CFLAGS = -O2 -ansi -Wall -c
TCFLAGS = -O2 -Wall -c # ansi flag cannot be used because of an error when compiling sp.to
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
OBJS = alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o acp-permit.o auth-null.o auth-ip.o
all: spread spmonitor spuser sptuser simple_user spflooder
Index: config_gram.l
===================================================================
RCS file: /storage/cvsroot/spread/daemon/config_gram.l,v
retrieving revision 1.1.1.1
diff -u -r1.1.1.1 config_gram.l
--- config_gram.l 21 Aug 2001 14:28:21 -0000 1.1.1.1
+++ config_gram.l 23 Oct 2001 10:13:07 -0000
@@ -61,6 +61,7 @@
no [Nn][Oo]
ipaddr [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}
ipport [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}:[0-9]{1,5}
+mpport [Mm][Pp]:[0-9]{1,5}
%option noyywrap
%%
#.* {} /* Comments */
@@ -151,6 +152,9 @@
yylval.ip.port = 0;
return IPADDR;
}
+{mpport} { yylval.ip.port = (unsigned short) atoi (yytext + 3);
+ return MPPORT;
+ }
([0-9]{1,3}[ \t]*)+[ \t]* {
int fcost, i, done;
char *c;
Index: config_parse.y
===================================================================
RCS file: /storage/cvsroot/spread/daemon/config_parse.y,v
retrieving revision 1.1.1.1
diff -u -r1.1.1.1 config_parse.y
--- config_parse.y 21 Aug 2001 14:28:21 -0000 1.1.1.1
+++ config_parse.y 23 Oct 2001 10:13:08 -0000
@@ -51,6 +51,7 @@
#endif /* ARCH_PC_WIN95 */
#include "alarm.h"
+#include "multipath.h"
#include "configuration.h"
#include "skiplist.h"
#include "memory.h"
@@ -67,22 +68,60 @@
static int num_procs = 0;
static int segment_procs = 0;
static int segments = 0;
+ static int networks = 0;
static int rvec_num = 0;
static int procs_interfaces = 0;
static proc *new_proc;
+ static proc *current_proc = NULL;
static int32 temp_seg_list[MAX_PROCS_SEGMENT];
+ static struct mp_net *current_net = NULL;
static int authentication_configured = 0;
+static void create_intfs_current_net (proc *p)
+{
+ int i;
+
+ if (!current_net)
+ {
+ current_net = (struct mp_net *) malloc (sizeof (struct mp_net));
+ memset ((void *) current_net, 0, sizeof (struct mp_net));
+ sprintf (current_net->net_name, "network_%d", networks);
+ current_net->segment = &Config.segments[segments];
+ }
+
+ for (i = 0; i < p->num_if; i++)
+ if (Is_IfType_Daemon(p->ifc[i].type) || Is_IfType_Any(p->ifc[i].type))
+ intf_add (p->node, current_net, &p->ifc[i]);
+}
+
+static void attach_to_current_net (struct mp_node *n, struct if_info *ifc)
+{
+ if (!current_net)
+ {
+ current_net = (struct mp_net *) malloc (sizeof (struct mp_net));
+ memset ((void *) current_net, 0, sizeof (struct mp_net));
+ sprintf (current_net->net_name, "network_%d", networks);
+ current_net->segment = &Config.segments[segments];
+ }
+
+ intf_add (n, current_net, ifc);
+}
+
static char *segment2str(int seg) {
- static char ipstr[40];
- int id = Config.segments[seg].bcast_address;
- sprintf(ipstr, "%d.%d.%d.%d:%d",
- (id & 0xff000000)>>24,
- (id & 0xff0000)>>16,
- (id & 0xff00)>>8,
- (id & 0xff),
- Config.segments[seg].port);
+ static char ipstr[256];
+ struct mp_net *net;
+ struct list *tmp;
+ int i = 0;
+
+ for (tmp = Config.segments[seg].net_head; tmp; tmp = tmp->next)
+ {
+ net = (struct mp_net *) tmp->data;
+ i += sprintf(ipstr + i, "%s:%d/",
+ inet_ntoa (net->bcast),
+ Config.segments[seg].port);
+ }
+ ipstr[i-1] = '\0';
return ipstr;
}
static void alarm_print_proc(proc *p, int port) {
@@ -191,6 +230,7 @@
%token SP_BOOL LINKPROTOCOL PHOP PTCPHOP
%token IMONITOR ICLIENT IDAEMON
%token ROUTEMATRIX LINKCOST
+%token MPPORT
%%
Config : ConfigStructs
{
@@ -342,8 +382,33 @@
SEGMENT_CHECK( segments, inet_ntoa($2.ip.addr) );
Config.segments[segments].num_procs = segment_procs;
Config.segments[segments].port = $2.ip.port;
- Config.segments[segments].bcast_address =
- $2.ip.addr.s_addr;
+ current_net->bcast.s_addr = htonl ($2.ip.addr.s_addr);
+ net_insert (current_net);
+ current_net = NULL;
+ if(Config.segments[segments].port == 0)
+ Config.segments[segments].port = DEFAULT_SPREAD_PORT;
+ Alarm(CONF, "Successfully configured Segment %d [%s] with %d procs:\n",
+ segments,
+ segment2str(segments),
+ segment_procs);
+ for(i=(num_procs - segment_procs); i<num_procs; i++) {
+ /* This '1' is to keep each proc with the same port as the segment.*/
+ if( 1 || Config_procs[i].port==0) {
+ Config_procs[i].port=
+ Config.segments[segments].port;
+ }
+ alarm_print_proc(&Config_procs[i],
+ Config.segments[segments].port);
+ }
+ segments++;
+ networks++;
+ segment_procs = 0;
+ }
+ | SEGMENT MPPORT OPENBRACE NetDescs CLOSEBRACE
+ {
+ int i;
+ Config.segments[segments].num_procs = segment_procs;
+ Config.segments[segments].port = $2.ip.port;
if(Config.segments[segments].port == 0)
Config.segments[segments].port = DEFAULT_SPREAD_PORT;
Alarm(CONF, "Successfully configured Segment %d [%s] with %d procs:\n",
@@ -369,13 +434,15 @@
;
Segmentparam : STRING IPADDR OPENBRACE Interfaceparams CLOSEBRACE
- {
- PROCS_CHECK( num_procs, $1.string );
+ {
+ PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
if (procs_interfaces == 0)
yyerror("Interfaces section declared but no actual interface addresses defined\n");
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id = $2.ip.addr.s_addr;
Config_procs[num_procs].port = $2.ip.port;
Config_procs[num_procs].seg_index = segments;
@@ -383,18 +450,21 @@
Config_procs[num_procs].num_if = procs_interfaces;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
}
| STRING OPENBRACE Interfaceparams CLOSEBRACE
- {
+ {
PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
if (procs_interfaces == 0)
yyerror("Interfaces section declared but no actual interface addresses defined\n");
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id =
name2ip(Config_procs[num_procs].name);
Config_procs[num_procs].port = 0;
@@ -403,16 +473,19 @@
Config_procs[num_procs].num_if = procs_interfaces;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
}
| STRING IPADDR
- {
+ {
PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id = $2.ip.addr.s_addr;
Config_procs[num_procs].port = $2.ip.port;
Config_procs[num_procs].seg_index = segments;
@@ -423,6 +496,7 @@
Config_procs[num_procs].ifc[0].type = IFTYPE_ALL | IFTYPE_ANY;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
@@ -433,6 +507,8 @@
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id =
name2ip(Config_procs[num_procs].name);
Config_procs[num_procs].port = 0;
@@ -444,12 +520,89 @@
Config_procs[num_procs].ifc[0].type = IFTYPE_ALL | IFTYPE_ANY;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
}
;
+NetDescs : Networks MpTypedIntfs
+ ;
+
+Networks : Network Networks
+ |
+ ;
+
+Network : IPADDR OPENBRACE MpIntfs CLOSEBRACE
+ {
+ current_net->bcast.s_addr = htonl ($1.ip.addr.s_addr);
+ net_insert (current_net);
+ current_net = NULL;
+ networks++;
+ }
+ ;
+
+
+MpIntfs : MpIntf MpIntfs
+ |
+ ;
+
+MpIntf : STRING IfTypeComp IPADDR
+ {
+ struct mp_node *node;
+ PROCS_CHECK( num_procs, $1.string );
+ SEGMENT_CHECK( segments, $1.string );
+ SEGMENT_SIZE_CHECK( segment_procs, $1.string );
+ if (!(node = node_get ($1.string)))
+ {
+ strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
+ Config_procs[num_procs].id = $3.ip.addr.s_addr;
+ Config_procs[num_procs].port = $3.ip.port;
+ Config_procs[num_procs].seg_index = segments;
+ Config_procs[num_procs].index_in_seg = segment_procs;
+ Config_procs[num_procs].num_if = 0;
+ Config.segments[segments].procs[segment_procs] =
+ &Config_procs[num_procs];
+ node = Config_procs[num_procs].node;
+ num_procs++;
+ segment_procs++;
+ }
+ node->proc->ifc[node->proc->num_if].ip = $3.ip.addr.s_addr;
+ node->proc->ifc[node->proc->num_if].port = $3.ip.port;
+ if (!$2.mask)
+ node->proc->ifc[node->proc->num_if].type = IFTYPE_ALL;
+ else
+ node->proc->ifc[node->proc->num_if].type = $2.mask;
+ attach_to_current_net (node,
+ &node->proc->ifc[node->proc->num_if]);
+ node->proc->num_if++;
+ procs_interfaces = 0;
+ }
+ ;
+
+MpTypedIntfs : MpTypedIntf MpTypedIntfs
+ |
+ ;
+
+MpTypedIntf : STRING MpIfParms
+ {
+ struct mp_node *node;
+ int i;
+ if (!(node = node_get ($1.string)))
+ yyerror ("Unknown proc, cannot add interfaces\n");
+ if (!current_proc)
+ yyerror ("No interfaces for proc ??\n");
+ for (i = 0; i < current_proc->num_if; i++)
+ node->proc->ifc[node->proc->num_if + i] = current_proc->ifc[i];
+ node->proc->num_if += current_proc->num_if;
+ free (current_proc);
+ current_proc = NULL;
+ }
+ ;
+
IfType : IMONITOR { $$ = $1; }
| ICLIENT { $$ = $1; }
| IDAEMON { $$ = $1; }
@@ -467,7 +620,7 @@
;
Interfaceparam : IfTypeComp IPADDR
- {
+ { /* Is $1.string really what you think it is ?? */
PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
@@ -479,6 +632,28 @@
else
Config_procs[num_procs].ifc[procs_interfaces].type = $1.mask;
procs_interfaces++;
+ }
+ ;
+
+MpIfParms : MpIfParm MpIfParms
+ |
+ ;
+
+MpIfParm : IfTypeComp IPADDR
+ {
+ if (!current_proc)
+ {
+ current_proc = (proc *) malloc (sizeof (proc));
+ memset ((void *) current_proc, 0, sizeof (proc));
+ }
+ INTERFACE_NUM_CHECK(current_proc->num_if, "MpIfParm");
+ current_proc->ifc[current_proc->num_if].ip = $2.ip.addr.s_addr;
+ current_proc->ifc[current_proc->num_if].port = $2.ip.port;
+ if ($1.mask == 0)
+ current_proc->ifc[current_proc->num_if].type = IFTYPE_ALL;
+ else
+ current_proc->ifc[current_proc->num_if].type = $1.mask;
+ current_proc->num_if++;
}
;
Index: configuration.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/configuration.c,v
retrieving revision 1.1.1.1
diff -u -r1.1.1.1 configuration.c
--- configuration.c 21 Aug 2001 14:28:21 -0000 1.1.1.1
+++ configuration.c 23 Oct 2001 10:13:09 -0000
@@ -479,13 +479,15 @@
int s,p,ret;
char ip[16];
proc pr;
+ struct mp_net *net;
Alarm( PRINT, "--------------------\n" );
Alarm( PRINT, "Configuration at %s is:\n", My.name );
Alarm( PRINT, "Num Segments %d\n",config->num_segments );
for ( s=0; s < config->num_segments; s++ )
{
- Conf_id_to_str( config->segments[s].bcast_address, ip );
+ net = (struct mp_net *) config->segments[s].net_head->data;
+ Conf_id_to_str( ntohl (net->bcast.s_addr), ip );
Alarm( PRINT, "\t%d\t%-16s %hd\n",
config->segments[s].num_procs, ip,
config->segments[s].port );
Index: configuration.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/configuration.h,v
retrieving revision 1.1.1.1
diff -u -r1.1.1.1 configuration.h
--- configuration.h 21 Aug 2001 14:28:21 -0000 1.1.1.1
+++ configuration.h 23 Oct 2001 10:13:09 -0000
@@ -36,6 +36,7 @@
#include "arch.h"
#include "spread_params.h"
+#include "multipath.h"
/* For what spread services should listen on what interfaces */
@@ -61,15 +62,16 @@
int16 seg_index;
int16 index_in_seg;
int32u id;
+ struct mp_node *node;
int num_if;
struct if_info ifc[MAX_INTERFACES_PROC];
} proc;
typedef struct dummy_segment{
- int32 bcast_address;
int16 port;
int num_procs;
proc *procs[MAX_PROCS_SEGMENT];
+ struct list *net_head;
} segment;
typedef struct dummy_configuration{
Index: multipath.c
===================================================================
RCS file: multipath.c
diff -N multipath.c
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ multipath.c 23 Oct 2001 10:13:09 -0000
@@ -0,0 +1,204 @@
+#include <stdlib.h>
+#include <string.h>
+#include "arch.h"
+
+#ifndef ARCH_PC_WIN95
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/uio.h>
+/* for select */
+#include <sys/time.h>
+#include <unistd.h>
+
+#else /* ARCH_PC_WIN95 */
+
+#include <winsock.h>
+
+#endif /* ARCH_PC_WIN95 */
+
+#include "multipath.h"
+#include "data_link.h"
+#include "configuration.h"
+
+static struct list *net_head = NULL;
+static struct list *node_head = NULL;
+
+static void list_add (struct list **head, struct list *el)
+{
+ el->prev = NULL;
+ el->next = *head;
+ if (*head)
+ (*head)->prev = el;
+ *head = el;
+}
+
+/* Warning : If node have multiple interface on a net, intf_get only
+ returns the first... */
+struct mp_intf *intf_get (struct mp_node *node, struct mp_net *net)
+{
+ struct list *tmp;
+ struct mp_intf *intf;
+
+ for (tmp = net->intf_head; tmp; tmp = tmp->next)
+ {
+ intf = (struct mp_intf *) tmp->data;
+ if (intf->node == node)
+ return intf;
+ }
+
+ return NULL;
+}
+
+struct mp_net *net_get (char name[])
+{
+ struct list *tmp;
+ struct mp_net *net;
+
+ for (tmp = net_head; tmp; tmp = tmp->next)
+ {
+ net = (struct mp_net *) tmp->data;
+ if (!strcmp (name, net->net_name))
+ return net;
+ }
+
+ return NULL;
+}
+
+struct mp_node *node_get (char name[])
+{
+ struct list *tmp;
+ struct mp_node *node;
+
+ for (tmp = node_head; tmp; tmp = tmp->next)
+ {
+ node = (struct mp_node *) tmp->data;
+ if (!strcmp (name, node->node_name))
+ return node;
+ }
+
+ return NULL;
+}
+
+struct mp_intf *intf_add (struct mp_node *node,
+ struct mp_net *net,
+ struct if_info *ifc)
+{
+ struct mp_intf *intf;
+
+ intf = intf_get (node, net);
+
+ if (!intf)
+ {
+ intf = (struct mp_intf *) malloc (sizeof (struct mp_intf));
+ memset ((void *) intf, 0, sizeof (struct mp_intf));
+ intf->node = node;
+ intf->net = net;
+ intf->intf_this_net.data = intf->intf_this_node.data = (void *) intf;
+ list_add (&node->intf_head, &intf->intf_this_node);
+ list_add (&net->intf_head, &intf->intf_this_net);
+ }
+
+ if (ifc)
+ {
+ intf->ifc = ifc;
+ intf->addr.s_addr = htonl (ifc->ip);
+ }
+
+ return intf;
+}
+
+void net_insert (struct mp_net *net)
+{
+ net->net_this_segment.data = net->global_list.data = (void *) net;
+ list_add (&net_head, &net->global_list);
+ list_add (&net->segment->net_head, &net->net_this_segment);
+}
+
+struct mp_net *net_add (char name[],
+ struct in_addr bcast,
+ struct in_addr netmask,
+ struct dummy_segment *seg)
+{
+ struct mp_net *net;
+
+ net = net_get (name);
+
+ if (!net)
+ {
+ net = (struct mp_net *) malloc (sizeof (struct mp_net));
+ memset ((void *) net, 0, sizeof (struct mp_net));
+ strcpy (net->net_name, name);
+ net->segment = seg;
+ net_insert (net);
+ }
+
+ if (bcast.s_addr)
+ {
+ net->bcast = bcast;
+ net->netmask = netmask;
+ }
+
+ return net;
+}
+
+struct mp_node *node_add (char name[], struct dummy_proc *proc)
+{
+ struct mp_node *node;
+
+ node = node_get (name);
+
+ if (!node)
+ {
+ node = (struct mp_node *) malloc (sizeof (struct mp_node));
+ memset ((void *) node, 0, sizeof (struct mp_node));
+ strcpy (node->node_name, name);
+ node->proc = proc;
+ node->global_list.data = (void *) node;
+ list_add (&node_head, &node->global_list);
+ }
+
+ return node;
+}
+
+int mp_send (channel chan,
+ struct mp_node *mynode,
+ struct mp_node *target,
+ unsigned short target_port,
+ sys_scatter *scat)
+{
+ struct mp_net *net;
+ struct mp_intf *intf;
+ struct list *tmp;
+ int ret = -1, r = 0;
+
+ /* For each of my own interfaces, find the corresponding network. If
+ the target node have an interface on this network, send message
+ to this interface. If no target has been specified, just
+ broadcast once per network */
+
+ for (tmp = mynode->intf_head; tmp; tmp = tmp->next)
+ {
+ intf = (struct mp_intf *) tmp->data;
+ net = intf->net;
+
+ if (target)
+ {
+ if ((intf = intf_get (target, net)))
+ {
+ r = DL_send (chan, ntohl (intf->addr.s_addr), target_port, scat);
+ }
+ }
+ else
+ {
+ r = DL_send (chan, ntohl (net->bcast.s_addr), target_port, scat);
+ }
+
+ ret = MAX (ret, r);
+ }
+
+ return ret;
+}
+
Index: multipath.h
===================================================================
RCS file: multipath.h
diff -N multipath.h
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ multipath.h 23 Oct 2001 10:13:09 -0000
@@ -0,0 +1,86 @@
+#ifndef _MULTIPATH_H
+#define _MULTIPATH_H
+
+#include "arch.h"
+#include "scatter.h"
+
+#ifndef ARCH_PC_WIN95
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+#else /* ARCH_PC_WIN95 */
+
+#include <winsock.h>
+
+#endif /* ARCH_PC_WIN95 */
+
+struct mp_net;
+struct mp_node;
+struct dummy_segment;
+struct dummy_proc;
+struct if_info;
+
+struct list
+{
+ struct list *prev;
+ struct list *next;
+ void *data;
+};
+
+struct mp_intf
+{
+ struct in_addr addr; /* Unicast address for directed send */
+ struct if_info *ifc;
+ struct mp_net *net;
+ struct mp_node *node;
+ struct list intf_this_net;
+ struct list intf_this_node;
+};
+
+struct mp_net
+{
+ char net_name[256];
+ struct in_addr bcast;
+ struct in_addr netmask;
+ unsigned int state;
+ struct dummy_segment *segment;
+ struct list *intf_head;
+ struct list net_this_segment;
+ struct list global_list;
+};
+
+struct mp_node
+{
+ char node_name[256];
+ unsigned short port; /* Port for directed send */
+ struct dummy_proc *proc;
+ struct list *intf_head;
+ struct list global_list;
+};
+
+struct mp_intf *intf_get (struct mp_node *node, struct mp_net *net);
+struct mp_net *net_get (char name[]);
+struct mp_node *node_get (char name[]);
+struct mp_intf *intf_add (struct mp_node *node,
+ struct mp_net *net,
+ struct if_info *ifc);
+void net_insert (struct mp_net *net);
+struct mp_net *net_add (char name[],
+ struct in_addr bcast,
+ struct in_addr netmask,
+ struct dummy_segment *seg);
+struct mp_node *node_add (char name[], struct dummy_proc *);
+struct mp_node *node_get (char name[]);
+
+int mp_send (channel chan,
+ struct mp_node *mynode,
+ struct mp_node *target,
+ unsigned short target_port,
+ sys_scatter *scat);
+
+#undef MAX
+#define MAX(a, b) (((a) > (b)) ? (a) : (b))
+
+#endif
Index: net_types.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/net_types.h,v
retrieving revision 1.2
diff -u -r1.2 net_types.h
--- net_types.h 31 Aug 2001 03:06:49 -0000 1.2
+++ net_types.h 23 Oct 2001 10:13:09 -0000
@@ -59,13 +59,13 @@
#define FORM2_TYPE 0x00002000
#define FORM_TYPE 0x00003000
-#define ARQ_TYPE 0x000f0000
-#define RETRANS_TYPE 0x00f00000
+#define ARQ_TYPE 0x00ff0000
+#define RETRANS_TYPE 0x0f000000
-#define STATUS_TYPE 0x01000000
-#define PARTITION_TYPE 0x02000000
-#define FC_TYPE 0x04000000
-#define CONTROL_TYPE 0x0f000000
+#define STATUS_TYPE 0x10000000
+#define PARTITION_TYPE 0x20000000
+#define FC_TYPE 0x40000000
+#define CONTROL_TYPE 0xf0000000
#define Is_unreliable( type ) ( type & UNRELIABLE_TYPE )
@@ -92,8 +92,8 @@
#define Get_arq( type ) ( (type & ARQ_TYPE) >> 16)
#define Set_arq( type, val ) ( (type & ~ARQ_TYPE) | ((val << 16)&ARQ_TYPE) )
-#define Get_retrans( type ) ( (type & RETRANS_TYPE) >> 20)
-#define Set_retrans( type, val) ( (type & ~RETRANS_TYPE) | ((val << 20)&RETRANS_TYPE) )
+#define Get_retrans( type ) ( (type & RETRANS_TYPE) >> 24)
+#define Set_retrans( type, val) ( (type & ~RETRANS_TYPE) | ((val << 24)&RETRANS_TYPE) )
#define Is_status( type ) ( type & STATUS_TYPE )
#define Is_partition( type ) ( type & PARTITION_TYPE )
Index: network.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/network.c,v
retrieving revision 1.5
diff -u -r1.5 network.c
--- network.c 17 Oct 2001 14:01:07 -0000 1.5
+++ network.c 23 Oct 2001 10:13:10 -0000
@@ -50,11 +50,10 @@
static int Num_token_channels;
static int Bcast_needed;
-static int32 Bcast_address;
static int16 Bcast_port;
static int Num_send_needed;
-static int32 Send_address[MAX_SEGMENTS];
+static struct mp_node *Send_address[MAX_SEGMENTS];
static int16 Send_ports[MAX_SEGMENTS];
/* ### Pack: 3 lines */
@@ -64,7 +63,7 @@
static const char align_padding[4] = "padd";
/* address for token sending - which is always needed */
-static int32 Token_address;
+static struct mp_node *Token_address;
static int16 Token_port;
static configuration Net_membership;
@@ -87,8 +86,9 @@
void Net_init()
{
proc dummy_proc;
- int32u interface_addr;
- int i;
+ int32u interface_addr, Bcast_address;
+ struct mp_intf *intf;
+ struct list *tmp;
Cn = Conf();
My = Conf_my();
@@ -100,14 +100,12 @@
{
/* I am not allone in segment */
Bcast_needed = 1;
- Bcast_address = Cn.segments[My.seg_index].bcast_address;
Bcast_port = My.port;
- Alarm( NETWORK, "Net_init: Bcast needed to address (%d, %d)\n",
- Bcast_address, Bcast_port );
+ Alarm( NETWORK, "Net_init: Bcast needed to port %d\n",
+ Bcast_port );
}else{
Bcast_needed = 0;
- Bcast_address = 0;
Alarm( NETWORK, "Net_init: Bcast is not needed\n" );
}
@@ -116,21 +114,29 @@
* address on the interface as well as the unicast interface. That is
* what the double bind of the bcast_address does.
*/
- for ( i=0; i < My.num_if; i++)
- {
- if (Is_IfType_Daemon( My.ifc[i].type ) || Is_IfType_Any( My.ifc[i].type ) )
- {
- if (Is_IfType_Any( My.ifc[i].type ) )
- interface_addr = 0;
- else {
- interface_addr = My.ifc[i].ip;
- Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, Bcast_address );
- }
- Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, interface_addr );
- Token_channel[Num_token_channels++] = DL_init_channel( RECV_CHANNEL, My.port+1, 0, interface_addr );
- }
- }
+ for (tmp = My.node->intf_head; tmp; tmp = tmp->next)
+ {
+ /* We know for sure that this interface is ANY or DAEMON */
+ intf = (struct mp_intf *) tmp->data;
+
+ if (Is_IfType_Any (intf->ifc->type))
+ {
+ interface_addr = 0;
+ Bcast_address = 0;
+ }
+ else
+ {
+ interface_addr = intf->ifc->ip;
+ Bcast_address = ntohl (intf->net->bcast.s_addr);
+#ifndef ARCH_PC_WIN95
+ Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, Bcast_address );
+#endif
+ }
+ Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, interface_addr );
+ Token_channel[Num_token_channels++] = DL_init_channel( RECV_CHANNEL, My.port+1, 0, interface_addr );
+ }
+
Send_channel = DL_init_channel( SEND_CHANNEL, My.port+2, 0, My.id );
Num_send_needed = 0;
@@ -169,7 +175,7 @@
} else if( Net_membership.segments[i].num_procs > 0 ) {
- Send_address[Num_send_needed] = Net_membership.segments[i].procs[0]->id;
+ Send_address[Num_send_needed] = Net_membership.segments[i].procs[0]->node;
Send_ports [Num_send_needed] = Net_membership.segments[i].port;
Num_send_needed++;
@@ -182,10 +188,10 @@
i, Send_address[i], Send_ports[i] );
/* Calculate where to send the token */
- Token_address = 0;
+ Token_address = NULL;
if( my_index_in_seg < My_seg.num_procs-1 )
{
- Token_address = My_seg.procs[my_index_in_seg+1]->id;
+ Token_address = My_seg.procs[my_index_in_seg+1]->node;
Token_port = My_seg.port+1;
}else{
/* I am last in my segment */
@@ -195,7 +201,7 @@
* My segment is the only segment
* sending token to the first in my segment
*/
- Token_address = My_seg.procs[0]->id;
+ Token_address = My_seg.procs[0]->node;
Token_port = My_seg.port+1;
} else if( Num_send_needed == my_next_index ) {
/*
@@ -214,8 +220,8 @@
}
}
- Alarm( NETWORK, "Net_set_membership: Token_address : (%d, %d)\n",
- Token_address, Token_port );
+ Alarm( NETWORK, "Net_set_membership: Token_address : (%s, %d)\n",
+ Token_address->node_name, Token_port );
}
int Net_bcast( sys_scatter *scat )
@@ -232,14 +238,14 @@
pack_ptr->transmiter_id = My.id;
for ( i=0; i< Num_send_needed; i++ )
{
- ret = DL_send( Send_channel, Send_address[i], Send_ports[i], scat );
+ ret = mp_send( Send_channel, My.node, Send_address[i], Send_ports[i], scat );
}
pack_ptr->type = Clear_routed( pack_ptr->type );
/* broadcasting if needed according to configuration */
if( Bcast_needed )
{
- ret = DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+ ret = mp_send( Send_channel, My.node, NULL, Bcast_port, scat );
}
return( ret );
}
@@ -332,7 +338,7 @@
for ( i=0; i< Num_send_needed; i++ )
{
- ret = DL_send( Send_channel, Send_address[i], Send_ports[i], &Queue_scat );
+ ret = mp_send( Send_channel, My.node, Send_address[i], Send_ports[i], &Queue_scat );
}
pack_ptr = (packet_header *)Queue_scat.elements[0].buf;
pack_ptr->type = Clear_routed( pack_ptr->type );
@@ -340,7 +346,7 @@
/* broadcasting if needed according to configuration */
if( Bcast_needed )
{
- ret = DL_send( Send_channel, Bcast_address, Bcast_port, &Queue_scat );
+ ret = mp_send( Send_channel, My.node, NULL, Bcast_port, &Queue_scat );
}
Queue_scat.num_elements = 0;
Queued_bytes = 0;
@@ -360,14 +366,14 @@
{
if( Bcast_needed )
{
- ret = DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+ ret = mp_send( Send_channel, My.node, NULL, Bcast_port, scat );
}
}else{
if( Net_membership.segments[seg_index].num_procs > 0 )
{
pack_ptr->type = Set_routed( pack_ptr->type );
- ret = DL_send( Send_channel,
- Net_membership.segments[seg_index].procs[0]->id,
+ ret = mp_send( Send_channel, My.node,
+ Net_membership.segments[seg_index].procs[0]->node,
Net_membership.segments[seg_index].port,
scat );
pack_ptr->type = Clear_routed( pack_ptr->type );
@@ -391,7 +397,7 @@
Alarm( PRINT, "Net_ucast: non existing proc_id %d\n",proc_id );
return( ret );
}
- ret = DL_send( Send_channel, proc_id, p.port, scat );
+ ret = mp_send( Send_channel, My.node, p.node, p.port, scat );
return( ret );
}
@@ -418,7 +424,6 @@
if (ch_found == FALSE) {
Alarm(EXIT, "Net_recv: Listening and received packet on un-used interface %d\n", fd);
}
-
received_bytes = DL_recv( fd, scat );
if( received_bytes <= 0 ) return( received_bytes );
@@ -496,7 +501,7 @@
/* fliping to original form */
if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
- DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+ mp_send( Send_channel, My.node, NULL, Bcast_port, scat );
/* re-fliping to my form */
if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
@@ -580,7 +585,7 @@
token_ptr = (token_header *)scat->elements[0].buf;
token_ptr->type = Set_endian( token_ptr->type );
token_ptr->transmiter_id = My.id;
- ret = DL_send( Send_channel, Token_address, Token_port, scat );
+ ret = mp_send( Send_channel, My.node, Token_address, Token_port, scat );
return ( ret );
}
Index: protocol.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/protocol.c,v
retrieving revision 1.3
diff -u -r1.3 protocol.c
--- protocol.c 16 Oct 2001 15:58:42 -0000 1.3
+++ protocol.c 23 Oct 2001 10:13:12 -0000
@@ -469,7 +469,7 @@
return;
}
}else{
- if( Get_arq(Token->type) == Get_arq(Last_token->type) )
+ if( Get_arq(Token->type) != ((Get_arq(Last_token->type)+1)%0x100) )
{
if( Get_retrans(Token->type) > Get_retrans(Last_token->type) )
{
@@ -578,7 +578,7 @@
if( Conf_leader( Memb_active_ptr() ) == My.id )
{
val = Get_arq( Token->type );
- val = (val + 1)% 0x10;
+ val = (val + 1)% 0x100;
Token->type = Set_arq( Token->type, val );
Token->type = Set_retrans( Token->type, 0 );
}
Index: session.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/session.c,v
retrieving revision 1.2
diff -u -r1.2 session.c
--- session.c 22 Aug 2001 17:41:13 -0000 1.2
+++ session.c 23 Oct 2001 10:13:14 -0000
@@ -134,7 +134,7 @@
{
struct sockaddr_in inet_addr;
int16 port;
- int ret, i;
+ int ret, i, on = 1;
mailbox mbox;
#ifndef ARCH_PC_WIN95
@@ -190,6 +190,9 @@
inet_addr.sin_addr.s_addr = INADDR_ANY;
else
inet_addr.sin_addr.s_addr = htonl(My.ifc[i].ip);
+ if (setsockopt (mbox, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)))
+ Alarm (EXIT, "Sess_init: SO_REUSEADDR failed\n");
+
if( bind( mbox, (struct sockaddr *)&inet_addr, sizeof(inet_addr) ) == -1)
{
Alarm( PRINT, "Sess_init: INET unable to bind to port %d, already running \n" ,port );
--
And don't forget you'll never get a dog to walk upright
Just 'cause you've got the power, that don't mean you've got the right.
More information about the Spread-users
mailing list