[Spread-users] Multipath Spread #2
Marc Zyngier
Marc.Zyngier at evidian.com
Tue Oct 9 06:16:39 EDT 2001
Hello all,
This is the second release of the multipath patch for Spread
3.16.1beta1, and the first that actually does something useful. The
purpose of the patch is to send Spread traffic over multiple networks,
so that Spread can tolerate network failures in a high-availability
environment.
As of this version, Spread traffic (both broadcasts and token
circulation) is replicated on all networks. There is no code to
handle primary/backup links yet, although we already introduced some
state information in this patch.
What's new since last week:
* Broadcast and bound sockets don't mix...
Among the new things introduced in 3.16.1beta1, the way broadcast
reception sockets are bound has changed. If a specific interface is
used for daemon communication, both the broadcast and token sockets are
bound to this interface, which is quite nice. Unfortunately, some OSs
don't deliver broadcast messages to a UDP socket bound to anything
but INADDR_ANY (W2K is OK, but neither Linux 2.4.x nor Solaris 8
delivers broadcasts). To work around this, we now have both an
INADDR_ANY-bound socket to receive broadcasts and a per-interface
socket.
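To make the workaround concrete, here is a minimal sketch of opening such
a pair of sockets. It is not the patch's DL_init_channel code; the helper
names open_bound_udp and open_recv_pair are hypothetical, and the sketch
assumes a BSD-sockets platform where two UDP sockets with SO_REUSEADDR
set may share a port.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>

/* Hypothetical helper (not part of the patch): open a UDP socket with
   SO_REUSEADDR set and bind it to addr:port. Returns the descriptor,
   or -1 on error. */
static int open_bound_udp(struct in_addr addr, unsigned short port)
{
    struct sockaddr_in sa;
    int on = 1;
    int fd = socket(AF_INET, SOCK_DGRAM, 0);

    if (fd < 0)
        return -1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
                   (char *) &on, sizeof(on)) < 0) {
        close(fd);
        return -1;
    }
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    sa.sin_addr = addr;
    if (bind(fd, (struct sockaddr *) &sa, sizeof(sa)) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}

/* Hypothetical helper: fds[0] is bound to INADDR_ANY (so broadcasts are
   delivered), fds[1] is bound to one specific interface. If port is 0,
   the kernel picks a port for fds[0] and fds[1] reuses it.
   Returns 0 on success, -1 on error. */
static int open_recv_pair(struct in_addr ifaddr, unsigned short port,
                          int fds[2])
{
    struct in_addr any;
    struct sockaddr_in sa;
    socklen_t len = sizeof(sa);

    assert(fds != NULL);
    any.s_addr = htonl(INADDR_ANY);
    fds[0] = open_bound_udp(any, port);
    if (fds[0] < 0)
        return -1;
    /* Find out which port the wildcard socket actually got. */
    if (getsockname(fds[0], (struct sockaddr *) &sa, &len) < 0) {
        close(fds[0]);
        return -1;
    }
    fds[1] = open_bound_udp(ifaddr, ntohs(sa.sin_port));
    if (fds[1] < 0) {
        close(fds[0]);
        return -1;
    }
    return 0;
}
```

A caller would then watch both descriptors: broadcasts arrive on fds[0],
and traffic addressed to the interface can arrive on fds[1].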
* Avoiding late tokens...
One of the problems that bit us was Spread crashing under high load
with a message like:
Answer_retrans: retrans of 8253 requested while Aru is 8411
This happens when two networks are *very* out of sync, and an old
token arrives carrying retransmission requests for messages we no
longer have. Too bad. To reduce the window of opportunity for
this to happen, we did two things:
* Expand the ARQ field from 4 to 8 bits,
* Tighten the way tokens are accepted when proc is not leader.
With these modifications, our test machines, which would usually crash
before processing 10000 messages, survived a 32-million-packet test
tonight.
Note that this race condition still exists. It is now *very* unlikely
to happen, but it could still occur under very specific conditions.
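As a rough illustration of why a wider ARQ field helps, the sketch below
assumes the field behaves like a small wrapping counter carried in the
token. That is an assumption made for this example, and the names
ARQ_BITS_OLD, ARQ_BITS_NEW, arq_wire and arq_aliases are hypothetical;
the real masks live in the Spread sources.

```c
#include <assert.h>

/* Hypothetical field widths, taken from the "4 to 8 bits" change. */
#define ARQ_BITS_OLD 4
#define ARQ_BITS_NEW 8

/* Value that ends up on the wire when a counter is truncated to
   `bits` bits. */
static unsigned int arq_wire(unsigned int counter, unsigned int bits)
{
    assert(bits > 0 && bits < 32);
    return counter & ((1u << bits) - 1u);
}

/* Nonzero if two different counters look identical on the wire, i.e.
   an old token could be mistaken for a current one. */
static int arq_aliases(unsigned int a, unsigned int b, unsigned int bits)
{
    return a != b && arq_wire(a, bits) == arq_wire(b, bits);
}
```

With 4 bits, counters 16 apart collide (3 and 19 both map to 3); with 8
bits they only collide when 256 apart, so a stale token has to be much
older before it can be confused with a fresh one.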
* Configuration simplification
Following Yair Amir's suggestion, the configuration is a little bit
simpler (it saves some braces...). This is the same configuration as in
the first patch:
Spread_Segment mp:4803
{
        192.168.110.255
        {
                foo     D 192.168.110.1
                bar       192.168.110.2
                fubar     192.168.110.3
        }
        192.168.111.255
        {
                foo       192.168.111.1
                fubar     192.168.111.3
        }
        foo     C 127.0.0.1
}
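For readers who want to see how the "mp:4803" token is meant to be read,
here is a small stand-alone sketch. It follows the lexer pattern from the
patch ([Mm][Pp]:[0-9]{1,5}, with the port taken from the text after the
third character). The function name parse_mpport is hypothetical, and the
range check on the port is an extra that the lexer rule itself does not
perform.

```c
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: parse a token of the form "mp:<port>".
   Returns 0 and stores the port on success, -1 on a malformed token. */
static int parse_mpport(const char *tok, unsigned short *port)
{
    size_t ndigits, i;
    long value;

    assert(tok != NULL && port != NULL);
    if (strlen(tok) < 4)
        return -1;
    /* Case-insensitive "mp" prefix followed by a colon. */
    if (tolower((unsigned char) tok[0]) != 'm' ||
        tolower((unsigned char) tok[1]) != 'p' ||
        tok[2] != ':')
        return -1;
    /* One to five decimal digits, as in the lexer pattern. */
    ndigits = strlen(tok + 3);
    if (ndigits < 1 || ndigits > 5)
        return -1;
    for (i = 0; i < ndigits; i++)
        if (!isdigit((unsigned char) tok[3 + i]))
            return -1;
    value = atol(tok + 3);
    if (value > 65535)      /* extra check, not in the lexer rule */
        return -1;
    *port = (unsigned short) value;
    return 0;
}
```

In the patch's grammar, a port of 0 is later replaced by
DEFAULT_SPREAD_PORT, so "mp:0" would select the default port.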
* No more DL_send messing...
Most of the data_link.[ch] hacks have been removed, and multipath.[ch]
now acts as a layer between network.[ch] and data_link.[ch].
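The idea of that layer can be sketched as follows. This is a deliberately
simplified stand-in, not the patch's mp_send: it uses fixed-size arrays
instead of linked lists, a callback in place of DL_send, and the names
sketch_node, sketch_mp_send, send_fn, count_send and MAX_NETS are all
hypothetical.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_NETS 4   /* hypothetical limit for this sketch */

/* Simplified node: at most one address per network; 0 means the node
   has no interface on that network. */
struct sketch_node {
    unsigned long addr[MAX_NETS];
};

/* Callback standing in for DL_send; returns bytes sent or -1. */
typedef int (*send_fn)(unsigned long addr, unsigned short port, void *ctx);

/* Stub sender that only counts calls; ctx points to an int counter. */
static int count_send(unsigned long addr, unsigned short port, void *ctx)
{
    (void) addr;
    (void) port;
    (*(int *) ctx)++;
    return 1;
}

/* For each network we are attached to: send to the target's address on
   that network, or to the network's broadcast address if target is
   NULL. Returns the best result of all sends, -1 if none succeeded. */
static int sketch_mp_send(const struct sketch_node *me,
                          const struct sketch_node *target,
                          const unsigned long bcast[MAX_NETS],
                          unsigned short port, send_fn do_send, void *ctx)
{
    int ret = -1, r, n;

    assert(me != NULL && bcast != NULL && do_send != NULL);
    for (n = 0; n < MAX_NETS; n++) {
        if (me->addr[n] == 0)
            continue;               /* we are not on this network */
        if (target) {
            if (target->addr[n] == 0)
                continue;           /* target is not on this network */
            r = do_send(target->addr[n], port, ctx);
        } else {
            r = do_send(bcast[n], port, ctx);
        }
        if (r > ret)
            ret = r;
    }
    return ret;
}
```

With a node on two networks and a peer on only one of them, a directed
send goes out once, while a broadcast (target NULL) goes out once per
network the sender is attached to.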
As with the first version, we would be very happy to get feedback
from both the Spread creators and users. So far, only Yair Amir has
sent remarks, which have made their way into the code. We're
looking forward to more feedback on what we have done so far.
Thanks,
Marc Zyngier
Evidian - SafeKit Project
http://www.evidian.com
<std disclaimer>
Copyright (C) 2001 Evidian.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
</std disclaimer>
Index: LINUX_makefile
===================================================================
RCS file: /CVSROOT/src/components/spread/LINUX_makefile,v
retrieving revision 1.1.1.2
retrieving revision 1.4
diff -u -r1.1.1.2 -r1.4
--- LINUX_makefile 2001/10/01 08:51:18 1.1.1.2
+++ LINUX_makefile 2001/10/03 13:53:12 1.4
@@ -15,7 +15,7 @@
CFLAGS = -O2 -ansi -c -D_GNU_SOURCE -Wall
TCFLAGS = -O2 -c -D_GNU_SOURCE -Wall # ansi flag cannot be used because of an error when compiling sp.to
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
OBJS = alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o acp-permit.o auth-null.o auth-ip.o
# add to spread build line if enabling password
AUTHLIBS = -lcrypt
Index: SOLARIS2.5_makefile
===================================================================
RCS file: /CVSROOT/src/components/spread/SOLARIS2.5_makefile,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- SOLARIS2.5_makefile 2001/10/01 08:51:19 1.1.1.1
+++ SOLARIS2.5_makefile 2001/10/08 12:45:15 1.2
@@ -11,7 +11,7 @@
CFLAGS = -O2 -ansi -Wall -DNEED_SOCKLEN_T -c
TCFLAGS = -O2 -Wall -DNEED_SOCKLEN_T -c # ansi flag cannot be used because of an error when compiling sp.to
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
OBJS = alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o acp-permit.o auth-null.o auth-ip.o
all: spread spmonitor spuser sptuser simple_user spflooder
Index: SOLARIS8_makefile
===================================================================
RCS file: /CVSROOT/src/components/spread/SOLARIS8_makefile,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- SOLARIS8_makefile 2001/10/01 08:51:19 1.1.1.1
+++ SOLARIS8_makefile 2001/10/08 12:45:15 1.2
@@ -11,7 +11,7 @@
CFLAGS = -O2 -ansi -Wall -c
TCFLAGS = -O2 -Wall -c # ansi flag cannot be used because of an error when compiling sp.to
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
OBJS = alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o acp-permit.o auth-null.o auth-ip.o
all: spread spmonitor spuser sptuser simple_user spflooder
Index: SOLARIS_makefile
===================================================================
RCS file: /CVSROOT/src/components/spread/SOLARIS_makefile,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- SOLARIS_makefile 2001/09/24 15:26:53 1.1.1.1
+++ SOLARIS_makefile 2001/09/28 07:42:09 1.2
@@ -10,7 +10,7 @@
CFLAGS = -O2 -ansi -Wall -c
TCFLAGS = -O2 -Wall -c # ansi flag cannot be used because of an error when compiling sp.to
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
OBJS = alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o acp-permit.o auth-null.o auth-ip.o
all: spread spmonitor spuser sptuser simple_user spflooder
Index: config_gram.l
===================================================================
RCS file: /CVSROOT/src/components/spread/config_gram.l,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- config_gram.l 2001/09/24 15:27:04 1.1.1.1
+++ config_gram.l 2001/09/27 10:01:52 1.2
@@ -61,6 +61,7 @@
no [Nn][Oo]
ipaddr [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}
ipport [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}:[0-9]{1,5}
+mpport [Mm][Pp]:[0-9]{1,5}
%option noyywrap
%%
#.* {} /* Comments */
@@ -151,6 +152,9 @@
yylval.ip.port = 0;
return IPADDR;
}
+{mpport} { yylval.ip.port = (unsigned short) atoi (yytext + 3);
+ return MPPORT;
+ }
([0-9]{1,3}[ \t]*)+[ \t]* {
int fcost, i, done;
char *c;
Index: config_parse.y
===================================================================
RCS file: /CVSROOT/src/components/spread/config_parse.y,v
retrieving revision 1.1.1.1
retrieving revision 1.8
diff -u -r1.1.1.1 -r1.8
--- config_parse.y 2001/09/24 15:27:05 1.1.1.1
+++ config_parse.y 2001/10/09 09:57:27 1.8
@@ -51,6 +51,7 @@
#endif /* ARCH_PC_WIN95 */
#include "alarm.h"
+#include "multipath.h"
#include "configuration.h"
#include "skiplist.h"
#include "memory.h"
@@ -67,22 +68,60 @@
static int num_procs = 0;
static int segment_procs = 0;
static int segments = 0;
+ static int networks = 0;
static int rvec_num = 0;
static int procs_interfaces = 0;
static proc *new_proc;
+ static proc *current_proc = NULL;
static int32 temp_seg_list[MAX_PROCS_SEGMENT];
+ static struct mp_net *current_net = NULL;
static int authentication_configured = 0;
+static void create_intfs_current_net (proc *p)
+{
+ int i;
+
+ if (!current_net)
+ {
+ current_net = (struct mp_net *) malloc (sizeof (struct mp_net));
+ memset ((void *) current_net, 0, sizeof (struct mp_net));
+ sprintf (current_net->net_name, "network_%d", networks);
+ current_net->segment = &Config.segments[segments];
+ }
+
+ for (i = 0; i < p->num_if; i++)
+ if (Is_IfType_Daemon(p->ifc[i].type) || Is_IfType_Any(p->ifc[i].type))
+ intf_add (p->node, current_net, &p->ifc[i]);
+}
+
+static void attach_to_current_net (struct mp_node *n, struct if_info *ifc)
+{
+ if (!current_net)
+ {
+ current_net = (struct mp_net *) malloc (sizeof (struct mp_net));
+ memset ((void *) current_net, 0, sizeof (struct mp_net));
+ sprintf (current_net->net_name, "network_%d", networks);
+ current_net->segment = &Config.segments[segments];
+ }
+
+ intf_add (n, current_net, ifc);
+}
+
static char *segment2str(int seg) {
- static char ipstr[40];
- int id = Config.segments[seg].bcast_address;
- sprintf(ipstr, "%d.%d.%d.%d:%d",
- (id & 0xff000000)>>24,
- (id & 0xff0000)>>16,
- (id & 0xff00)>>8,
- (id & 0xff),
- Config.segments[seg].port);
+ static char ipstr[256];
+ struct mp_net *net;
+ struct list *tmp;
+ int i = 0;
+
+ for (tmp = Config.segments[seg].net_head; tmp; tmp = tmp->next)
+ {
+ net = (struct mp_net *) tmp->data;
+ i += sprintf(ipstr + i, "%s:%d/",
+ inet_ntoa (net->bcast),
+ Config.segments[seg].port);
+ }
+ ipstr[i-1] = '\0';
return ipstr;
}
static void alarm_print_proc(proc *p, int port) {
@@ -191,6 +230,7 @@
%token SP_BOOL LINKPROTOCOL PHOP PTCPHOP
%token IMONITOR ICLIENT IDAEMON
%token ROUTEMATRIX LINKCOST
+%token MPPORT
%%
Config : ConfigStructs
{
@@ -342,8 +382,9 @@
SEGMENT_CHECK( segments, inet_ntoa($2.ip.addr) );
Config.segments[segments].num_procs = segment_procs;
Config.segments[segments].port = $2.ip.port;
- Config.segments[segments].bcast_address =
- $2.ip.addr.s_addr;
+ current_net->bcast.s_addr = htonl ($2.ip.addr.s_addr);
+ net_insert (current_net);
+ current_net = NULL;
if(Config.segments[segments].port == 0)
Config.segments[segments].port = DEFAULT_SPREAD_PORT;
Alarm(CONF, "Successfully configured Segment %d [%s] with %d procs:\n",
@@ -360,8 +401,32 @@
Config.segments[segments].port);
}
segments++;
+ networks++;
segment_procs = 0;
}
+ | SEGMENT MPPORT OPENBRACE NetDescs CLOSEBRACE
+ {
+ int i;
+ Config.segments[segments].num_procs = segment_procs;
+ Config.segments[segments].port = $2.ip.port;
+ if(Config.segments[segments].port == 0)
+ Config.segments[segments].port = DEFAULT_SPREAD_PORT;
+ Alarm(CONF, "Successfully configured Segment %d [%s] with %d procs:\n",
+ segments,
+ segment2str(segments),
+ segment_procs);
+ for(i=(num_procs - segment_procs); i<num_procs; i++) {
+ /* This '1' is to keep each proc with the same port as the segment.*/
+ if( 1 || Config_procs[i].port==0) {
+ Config_procs[i].port=
+ Config.segments[segments].port;
+ }
+ alarm_print_proc(&Config_procs[i],
+ Config.segments[segments].port);
+ }
+ segments++;
+ segment_procs = 0;
+ }
;
Segmentparams : Segmentparam Segmentparams
@@ -369,13 +434,15 @@
;
Segmentparam : STRING IPADDR OPENBRACE Interfaceparams CLOSEBRACE
- {
- PROCS_CHECK( num_procs, $1.string );
+ {
+ PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
if (procs_interfaces == 0)
yyerror("Interfaces section declared but no actual interface addresses defined\n");
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id = $2.ip.addr.s_addr;
Config_procs[num_procs].port = $2.ip.port;
Config_procs[num_procs].seg_index = segments;
@@ -383,18 +450,21 @@
Config_procs[num_procs].num_if = procs_interfaces;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
}
| STRING OPENBRACE Interfaceparams CLOSEBRACE
- {
+ {
PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
if (procs_interfaces == 0)
yyerror("Interfaces section declared but no actual interface addresses defined\n");
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id =
name2ip(Config_procs[num_procs].name);
Config_procs[num_procs].port = 0;
@@ -403,16 +473,19 @@
Config_procs[num_procs].num_if = procs_interfaces;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
}
| STRING IPADDR
- {
+ {
PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id = $2.ip.addr.s_addr;
Config_procs[num_procs].port = $2.ip.port;
Config_procs[num_procs].seg_index = segments;
@@ -423,6 +496,7 @@
Config_procs[num_procs].ifc[0].type = IFTYPE_ALL | IFTYPE_ANY;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
@@ -433,6 +507,8 @@
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id =
name2ip(Config_procs[num_procs].name);
Config_procs[num_procs].port = 0;
@@ -444,12 +520,89 @@
Config_procs[num_procs].ifc[0].type = IFTYPE_ALL | IFTYPE_ANY;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
}
;
+NetDescs : Networks MpTypedIntfs
+ ;
+
+Networks : Network Networks
+ |
+ ;
+
+Network : IPADDR OPENBRACE MpIntfs CLOSEBRACE
+ {
+ current_net->bcast.s_addr = htonl ($1.ip.addr.s_addr);
+ net_insert (current_net);
+ current_net = NULL;
+ networks++;
+ }
+ ;
+
+
+MpIntfs : MpIntf MpIntfs
+ |
+ ;
+
+MpIntf : STRING IfTypeComp IPADDR
+ {
+ struct mp_node *node;
+ PROCS_CHECK( num_procs, $1.string );
+ SEGMENT_CHECK( segments, $1.string );
+ SEGMENT_SIZE_CHECK( segment_procs, $1.string );
+ if (!(node = node_get ($1.string)))
+ {
+ strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
+ Config_procs[num_procs].id = $3.ip.addr.s_addr;
+ Config_procs[num_procs].port = $3.ip.port;
+ Config_procs[num_procs].seg_index = segments;
+ Config_procs[num_procs].index_in_seg = segment_procs;
+ Config_procs[num_procs].num_if = 0;
+ Config.segments[segments].procs[segment_procs] =
+ &Config_procs[num_procs];
+ node = Config_procs[num_procs].node;
+ num_procs++;
+ segment_procs++;
+ }
+ node->proc->ifc[node->proc->num_if].ip = $3.ip.addr.s_addr;
+ node->proc->ifc[node->proc->num_if].port = $3.ip.port;
+ if (!$2.mask)
+ node->proc->ifc[node->proc->num_if].type = IFTYPE_ALL;
+ else
+ node->proc->ifc[node->proc->num_if].type = $2.mask;
+ attach_to_current_net (node,
+ &node->proc->ifc[node->proc->num_if]);
+ node->proc->num_if++;
+ procs_interfaces = 0;
+ }
+ ;
+
+MpTypedIntfs : MpTypedIntf MpTypedIntfs
+ |
+ ;
+
+MpTypedIntf : STRING MpIfParms
+ {
+ struct mp_node *node;
+ int i;
+ if (!(node = node_get ($1.string)))
+ yyerror ("Unknown proc, cannot add interfaces\n");
+ if (!current_proc)
+ yyerror ("No interfaces for proc ??\n");
+ for (i = 0; i < current_proc->num_if; i++)
+ node->proc->ifc[node->proc->num_if + i] = current_proc->ifc[i];
+ node->proc->num_if += current_proc->num_if;
+ free (current_proc);
+ current_proc = NULL;
+ }
+ ;
+
IfType : IMONITOR { $$ = $1; }
| ICLIENT { $$ = $1; }
| IDAEMON { $$ = $1; }
@@ -467,7 +620,7 @@
;
Interfaceparam : IfTypeComp IPADDR
- {
+ { /* Is $1.string really what you think it is ?? */
PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
@@ -479,6 +632,28 @@
else
Config_procs[num_procs].ifc[procs_interfaces].type = $1.mask;
procs_interfaces++;
+ }
+ ;
+
+MpIfParms : MpIfParm MpIfParms
+ |
+ ;
+
+MpIfParm : IfTypeComp IPADDR
+ {
+ if (!current_proc)
+ {
+ current_proc = (proc *) malloc (sizeof (proc));
+ memset ((void *) current_proc, 0, sizeof (proc));
+ }
+ INTERFACE_NUM_CHECK(current_proc->num_if, "MpIfParm");
+ current_proc->ifc[current_proc->num_if].ip = $2.ip.addr.s_addr;
+ current_proc->ifc[current_proc->num_if].port = $2.ip.port;
+ if ($1.mask == 0)
+ current_proc->ifc[current_proc->num_if].type = IFTYPE_ALL;
+ else
+ current_proc->ifc[current_proc->num_if].type = $1.mask;
+ current_proc->num_if++;
}
;
Index: configuration.c
===================================================================
RCS file: /CVSROOT/src/components/spread/configuration.c,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- configuration.c 2001/09/24 15:26:54 1.1.1.1
+++ configuration.c 2001/09/24 17:03:06 1.2
@@ -479,13 +479,15 @@
int s,p,ret;
char ip[16];
proc pr;
+ struct mp_net *net;
Alarm( PRINT, "--------------------\n" );
Alarm( PRINT, "Configuration at %s is:\n", My.name );
Alarm( PRINT, "Num Segments %d\n",config->num_segments );
for ( s=0; s < config->num_segments; s++ )
{
- Conf_id_to_str( config->segments[s].bcast_address, ip );
+ net = (struct mp_net *) config->segments[s].net_head->data;
+ Conf_id_to_str( ntohl (net->bcast.s_addr), ip );
Alarm( PRINT, "\t%d\t%-16s %hd\n",
config->segments[s].num_procs, ip,
config->segments[s].port );
Index: configuration.h
===================================================================
RCS file: /CVSROOT/src/components/spread/configuration.h,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- configuration.h 2001/09/24 15:26:54 1.1.1.1
+++ configuration.h 2001/09/24 17:03:06 1.2
@@ -36,6 +36,7 @@
#include "arch.h"
#include "spread_params.h"
+#include "multipath.h"
/* For what spread services should listen on what interfaces */
@@ -61,15 +62,16 @@
int16 seg_index;
int16 index_in_seg;
int32u id;
+ struct mp_node *node;
int num_if;
struct if_info ifc[MAX_INTERFACES_PROC];
} proc;
typedef struct dummy_segment{
- int32 bcast_address;
int16 port;
int num_procs;
proc *procs[MAX_PROCS_SEGMENT];
+ struct list *net_head;
} segment;
typedef struct dummy_configuration{
Index: data_link.c
===================================================================
RCS file: /CVSROOT/src/components/spread/data_link.c,v
retrieving revision 1.1.1.1
retrieving revision 1.4
diff -u -r1.1.1.1 -r1.4
--- data_link.c 2001/09/24 15:26:54 1.1.1.1
+++ data_link.c 2001/10/08 12:45:15 1.4
@@ -59,12 +59,14 @@
#include "alarm.h"
#include "sp_events.h" /* for sp_time */
-channel DL_init_channel( int32 channel_type, int16 port, int32 mcast_address, int32 interface_address )
+channel DL_init_channel( int32 channel_type, unsigned short port,
+ struct in_addr mcast_address, struct in_addr interface_address )
{
channel chan;
struct sockaddr_in soc_addr;
int on=1;
int i1,i2,i3,i4;
+ int32 mcast_addr;
unsigned char ttl_val;
if((chan = socket(AF_INET, SOCK_DGRAM, 0)) == -1)
@@ -93,30 +95,33 @@
{
soc_addr.sin_family = AF_INET;
soc_addr.sin_port = htons(port);
- if (interface_address == 0)
- soc_addr.sin_addr.s_addr= INADDR_ANY;
- else
- soc_addr.sin_addr.s_addr= htonl(interface_address);
+ soc_addr.sin_addr = interface_address;
+ if (setsockopt(chan, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
+ sizeof(on)) < 0)
+ Alarm( EXIT, "DL_init_channel: setsockopt error for port %d\n",port);
+
if(bind( chan, (struct sockaddr *) &soc_addr,
sizeof(soc_addr)) == -1)
{
- Alarm( PRINT, "DL_init_channel: bind error for port %d, already running \n",port);
+ Alarm( PRINT, "DL_init_channel: bind error for port %d, already running \n", port);
exit(0);
}
Alarm( DATA_LINK, "DL_init_channel: bind for recv_channel for port %d with chan %d ok\n",
port, chan);
- i1 = (mcast_address >> 24) & 0x000000ff;
- i2 = (mcast_address >> 16) & 0x000000ff;
- i3 = (mcast_address >> 8) & 0x000000ff;
- i4 = mcast_address & 0x000000ff;
+ mcast_addr = ntohl (mcast_address.s_addr);
+
+ i1 = (mcast_addr >> 24) & 0x000000ff;
+ i2 = (mcast_addr >> 16) & 0x000000ff;
+ i3 = (mcast_addr >> 8) & 0x000000ff;
+ i4 = mcast_addr & 0x000000ff;
if( i1 >=224 && i1 < 240 )
{
#ifndef ARCH_SPARC_SUNOS /* no support for IP multicast in old SunOS */
struct ip_mreq mreq;
- mreq.imr_multiaddr.s_addr = htonl( mcast_address );
+ mreq.imr_multiaddr = mcast_address;
/* the interface could be changed to a specific interface if needed */
mreq.imr_interface.s_addr = INADDR_ANY;
@@ -124,7 +129,7 @@
if (setsockopt(chan, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void *)&mreq,
sizeof(mreq)) < 0)
{
- Alarm( EXIT, "DL_init_channel: problem in setsockopt to multicast address %d\n", mcast_address );
+ Alarm( EXIT, "DL_init_channel: problem in setsockopt to multicast address %d\n", mcast_addr );
}
Alarm( DATA_LINK, "DL_init_channel: Joining multicast address %d.%d.%d.%d went ok\n",i1,i2,i3,i4);
@@ -147,7 +152,9 @@
}
}
-int DL_send( channel chan, int32 address, int16 port, sys_scatter *scat )
+
+int DL_send ( channel chan, struct in_addr address,
+ unsigned short port, sys_scatter *scat )
{
#ifndef ARCH_SCATTER_NONE
@@ -168,7 +175,7 @@
assert(scat->num_elements <= ARCH_SCATTER_SIZE);
soc_addr.sin_family = AF_INET;
- soc_addr.sin_addr.s_addr= htonl(address);
+ soc_addr.sin_addr = address;
soc_addr.sin_port = htons(port);
#ifdef ARCH_PC_HOME
@@ -216,8 +223,8 @@
if(ret < 0) {
/* delay for a short while */
send_errormsg = strerror(errno);
- Alarm( DATA_LINK, "DL_send: delaying after failure in send to %d.%d.%d.%d, ret is %d\n",
- IP1(address), IP2(address), IP3(address), IP4(address), ret);
+ Alarm( DATA_LINK, "DL_send: delaying after failure in send to %s, ret is %d\n",
+ inet_ntoa (address), ret);
select( 0, 0, 0, 0, (struct timeval *)&select_delay );
}
}
@@ -226,14 +233,14 @@
for( i=0; i < scat->num_elements; i++)
Alarm( DATA_LINK, "DL_send: element[%d]: %d bytes\n",
i,scat->elements[i].len);
- Alarm( DATA_LINK, "DL_send: error: %s\n sending %d bytes on channel %d to address %d.%d.%d.%d\n",
- send_errormsg, total_len,chan,IP1(address), IP2(address), IP3(address), IP4(address) );
+ Alarm( DATA_LINK, "DL_send: error: %s\n sending %d bytes on channel %d to address %s\n",
+ send_errormsg, total_len,chan,inet_ntoa (address) );
}else if(ret < total_len){
Alarm( DATA_LINK, "DL_send: partial sending %d out of %d\n",
ret,total_len);
}
- Alarm( DATA_LINK, "DL_send: sent a message of %d bytes to (%d.%d.%d.%d,%d) on channel %d\n",
- ret,IP1(address), IP2(address),IP3(address), IP4(address),port,chan);
+ Alarm( DATA_LINK, "DL_send: sent a message of %d bytes to (%s,%d) on channel %d\n",
+ ret,inet_ntoa (address),port,chan);
return(ret);
}
Index: data_link.h
===================================================================
RCS file: /CVSROOT/src/components/spread/data_link.h,v
retrieving revision 1.1.1.2
retrieving revision 1.4
diff -u -r1.1.1.2 -r1.4
--- data_link.h 2001/10/01 08:51:21 1.1.1.2
+++ data_link.h 2001/10/05 12:52:32 1.4
@@ -37,14 +37,28 @@
#include "arch.h"
#include "scatter.h"
+#ifndef ARCH_PC_WIN95
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+#else /* ARCH_PC_WIN95 */
+
+#include <winsock.h>
+
+#endif /* ARCH_PC_WIN95 */
+
#define MAX_PACKET_SIZE 1472 /*1472 = 1536-64 (of udp)*/
#define SEND_CHANNEL 0x00000001
#define RECV_CHANNEL 0x00000002
-channel DL_init_channel( int32 channel_type, int16 port, int32 mcast_address, int32 interface_address );
+channel DL_init_channel( int32 channel_type, unsigned short port,
+ struct in_addr mcast_address, struct in_addr interface_address );
void DL_close_channel(channel chan);
-int DL_send( channel chan, int32 address, int16 port, sys_scatter *scat );
+int DL_send ( channel chan, struct in_addr address,
+ unsigned short port, sys_scatter *scat );
int DL_recv( channel chan, sys_scatter *scat );
#endif /* INC_DATA_LINK */
Index: groups.c
===================================================================
RCS file: /CVSROOT/src/components/spread/groups.c,v
retrieving revision 1.1.1.2
retrieving revision 1.3
diff -u -r1.1.1.2 -r1.3
--- groups.c 2001/10/01 08:51:23 1.1.1.2
+++ groups.c 2001/10/03 13:53:12 1.3
@@ -683,7 +683,7 @@
if( grp == NULL )
{
new_grp = new( GROUP );
- memset( new_grp->name, MAX_GROUP_NAME, 0 );
+ memset( new_grp->name, 0, MAX_GROUP_NAME );
strcpy( new_grp->name, group_name );
sl_init( &new_grp->MembersList );
sl_set_compare( &new_grp->MembersList,
@@ -711,7 +711,7 @@
}
/* Add a new member as ESTABLISHED (might change later on depending on the situation */
new_mbr = new( MEMBER );
- memset( new_mbr->private_name, MAX_GROUP_NAME, 0 );
+ memset( new_mbr->private_name, 0, MAX_GROUP_NAME );
strcpy( new_mbr->private_name, private_group_name );
new_mbr->proc_id = new_p.id;
new_mbr->status = ESTABLISHED_MEMBER;
@@ -1469,7 +1469,7 @@
if( orig_grp == NULL )
{
new_grp = new( GROUP );
- memset( new_grp->name, MAX_GROUP_NAME, 0 );
+ memset( new_grp->name, 0, MAX_GROUP_NAME );
strcpy( new_grp->name, this_group->name );
new_grp->grp_id = this_group->grp_id;
Index: monitor.c
===================================================================
RCS file: /CVSROOT/src/components/spread/monitor.c,v
retrieving revision 1.1.1.2
retrieving revision 1.4
diff -u -r1.1.1.2 -r1.4
--- monitor.c 2001/10/01 08:51:26 1.1.1.2
+++ monitor.c 2001/10/05 12:52:32 1.4
@@ -94,6 +94,7 @@
{
int i;
channel ch;
+ struct in_addr dummy_addr;
fclose(stderr);
@@ -171,9 +172,11 @@
Report_scat.elements[1].buf = (char *)&GlobalStatus;
Report_scat.elements[1].len = sizeof(status);
- SendChan = DL_init_channel( SEND_CHANNEL , My_port, 0, 0 );
+ dummy_addr.s_addr = INADDR_ANY;
+
+ SendChan = DL_init_channel( SEND_CHANNEL , My_port, dummy_addr, dummy_addr );
- ch = DL_init_channel( RECV_CHANNEL, My_port, 0, 0 );
+ ch = DL_init_channel( RECV_CHANNEL, My_port, dummy_addr, dummy_addr );
E_attach_fd( ch, READ_FD, Report_message, 0, NULL, HIGH_PRIORITY );
Print_menu();
@@ -403,8 +406,8 @@
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
+ mp_send( SendChan, My.node, p.node, p.port, &Pack_scat );
+ mp_send( SendChan, My.node, p.node, p.port, &Pack_scat );
}
}
E_queue( Send_partition, 0, NULL, Send_partition_timeout );
@@ -522,8 +525,8 @@
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
+ mp_send( SendChan, My.node, p.node, p.port, &Pack_scat );
+ mp_send( SendChan, My.node, p.node, p.port, &Pack_scat );
}
}
}
@@ -609,7 +612,7 @@
proc_index = Conf_proc_by_id( proc_id, &p );
if( Status_vector[proc_index] )
{
- DL_send( SendChan, p.id, p.port, &Pack_scat );
+ mp_send( SendChan, My.node, p.node, p.port, &Pack_scat );
}
}
}
@@ -682,8 +685,8 @@
if( Kill_partition[proc_index] == -1 )
{
Alarm( PRINT , "Monitor: Terminating %s\n", p.name );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
+ mp_send( SendChan, My.node, p.node, p.port, &Pack_scat );
+ mp_send( SendChan, My.node, p.node, p.port, &Pack_scat );
}
}
}
Index: multipath.c
===================================================================
RCS file: multipath.c
diff -N multipath.c
--- /dev/null Tue Oct 9 12:13:01 2001
+++ multipath.c Tue Oct 9 12:13:19 2001
@@ -0,0 +1,204 @@
+#include <stdlib.h>
+#include <string.h>
+#include "arch.h"
+
+#ifndef ARCH_PC_WIN95
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/uio.h>
+/* for select */
+#include <sys/time.h>
+#include <unistd.h>
+
+#else /* ARCH_PC_WIN95 */
+
+#include <winsock.h>
+
+#endif /* ARCH_PC_WIN95 */
+
+#include "multipath.h"
+#include "data_link.h"
+#include "configuration.h"
+
+static struct list *net_head = NULL;
+static struct list *node_head = NULL;
+
+static void list_add (struct list **head, struct list *el)
+{
+ el->prev = NULL;
+ el->next = *head;
+ if (*head)
+ (*head)->prev = el;
+ *head = el;
+}
+
+/* Warning : If node have multiple interface on a net, intf_get only
+ returns the first... */
+struct mp_intf *intf_get (struct mp_node *node, struct mp_net *net)
+{
+ struct list *tmp;
+ struct mp_intf *intf;
+
+ for (tmp = net->intf_head; tmp; tmp = tmp->next)
+ {
+ intf = (struct mp_intf *) tmp->data;
+ if (intf->node == node)
+ return intf;
+ }
+
+ return NULL;
+}
+
+struct mp_net *net_get (char name[])
+{
+ struct list *tmp;
+ struct mp_net *net;
+
+ for (tmp = net_head; tmp; tmp = tmp->next)
+ {
+ net = (struct mp_net *) tmp->data;
+ if (!strcmp (name, net->net_name))
+ return net;
+ }
+
+ return NULL;
+}
+
+struct mp_node *node_get (char name[])
+{
+ struct list *tmp;
+ struct mp_node *node;
+
+ for (tmp = node_head; tmp; tmp = tmp->next)
+ {
+ node = (struct mp_node *) tmp->data;
+ if (!strcmp (name, node->node_name))
+ return node;
+ }
+
+ return NULL;
+}
+
+struct mp_intf *intf_add (struct mp_node *node,
+ struct mp_net *net,
+ struct if_info *ifc)
+{
+ struct mp_intf *intf;
+
+ intf = intf_get (node, net);
+
+ if (!intf)
+ {
+ intf = (struct mp_intf *) malloc (sizeof (struct mp_intf));
+ memset ((void *) intf, 0, sizeof (struct mp_intf));
+ intf->node = node;
+ intf->net = net;
+ intf->intf_this_net.data = intf->intf_this_node.data = (void *) intf;
+ list_add (&node->intf_head, &intf->intf_this_node);
+ list_add (&net->intf_head, &intf->intf_this_net);
+ }
+
+ if (ifc)
+ {
+ intf->ifc = ifc;
+ intf->addr.s_addr = htonl (ifc->ip);
+ }
+
+ return intf;
+}
+
+void net_insert (struct mp_net *net)
+{
+ net->net_this_segment.data = net->global_list.data = (void *) net;
+ list_add (&net_head, &net->global_list);
+ list_add (&net->segment->net_head, &net->net_this_segment);
+}
+
+struct mp_net *net_add (char name[],
+ struct in_addr bcast,
+ struct in_addr netmask,
+ struct dummy_segment *seg)
+{
+ struct mp_net *net;
+
+ net = net_get (name);
+
+ if (!net)
+ {
+ net = (struct mp_net *) malloc (sizeof (struct mp_net));
+ memset ((void *) net, 0, sizeof (struct mp_net));
+ strcpy (net->net_name, name);
+ net->segment = seg;
+ net_insert (net);
+ }
+
+ if (bcast.s_addr)
+ {
+ net->bcast = bcast;
+ net->netmask = netmask;
+ }
+
+ return net;
+}
+
+struct mp_node *node_add (char name[], struct dummy_proc *proc)
+{
+ struct mp_node *node;
+
+ node = node_get (name);
+
+ if (!node)
+ {
+ node = (struct mp_node *) malloc (sizeof (struct mp_node));
+ memset ((void *) node, 0, sizeof (struct mp_node));
+ strcpy (node->node_name, name);
+ node->proc = proc;
+ node->global_list.data = (void *) node;
+ list_add (&node_head, &node->global_list);
+ }
+
+ return node;
+}
+
+int mp_send (channel chan,
+ struct mp_node *mynode,
+ struct mp_node *target,
+ unsigned short target_port,
+ sys_scatter *scat)
+{
+ struct mp_net *net;
+ struct mp_intf *intf;
+ struct list *tmp;
+ int ret = -1, r = 0;
+
+ /* For each of my own interfaces, find the corresponding network. If
+ the target node has an interface on this network, send the message
+ to that interface. If no target has been specified, just
+ broadcast once per network. */
+
+ for (tmp = mynode->intf_head; tmp; tmp = tmp->next)
+ {
+ intf = (struct mp_intf *) tmp->data;
+ net = intf->net;
+
+ if (target)
+ {
+ if ((intf = intf_get (target, net)))
+ {
+ r = DL_send (chan, intf->addr, target_port, scat);
+ }
+ }
+ else
+ {
+ r = DL_send (chan, net->bcast, target_port, scat);
+ }
+
+ ret = MAX (ret, r);
+ }
+
+ return ret;
+}
+
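To make the fan-out rule in mp_send easier to follow, here is a minimal
standalone sketch of the same decision. It is not part of the patch: the
toy_* types and functions are hypothetical, flattened stand-ins for
mp_node / mp_intf / intf_get, and instead of calling DL_send the sketch
only collects the destination addresses that would be used.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_INTF 4

/* Hypothetical, flattened stand-ins for mp_intf and mp_node. */
struct toy_intf { int net_id; unsigned addr; };
struct toy_node { struct toy_intf intf[MAX_INTF]; int num_intf; };

/* Return the node's interface on net_id, or NULL (plays the role of intf_get). */
static const struct toy_intf *toy_intf_get(const struct toy_node *node, int net_id)
{
    int i;

    for (i = 0; i < node->num_intf; i++)
        if (node->intf[i].net_id == net_id)
            return &node->intf[i];
    return NULL;
}

/* Fill dest[] with one address per usable network and return the count.
   bcast[net_id] is the broadcast address of that network. */
static int toy_plan_send(const struct toy_node *me,
                         const struct toy_node *target,
                         const unsigned *bcast,
                         unsigned *dest)
{
    int i, n = 0;

    for (i = 0; i < me->num_intf; i++) {
        int net_id = me->intf[i].net_id;

        if (target) {
            /* Directed send: only networks the target is also attached to. */
            const struct toy_intf *t = toy_intf_get(target, net_id);
            if (t)
                dest[n++] = t->addr;
        } else {
            /* No target: broadcast once on each of my networks. */
            dest[n++] = bcast[net_id];
        }
    }
    return n;
}
```

With a local node on networks 0 and 1 and a peer on network 1 only, a
directed send yields a single destination, while a broadcast yields one
destination per local network.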
Index: multipath.h
===================================================================
RCS file: multipath.h
diff -N multipath.h
--- /dev/null Tue Oct 9 12:13:01 2001
+++ multipath.h Tue Oct 9 12:13:19 2001
@@ -0,0 +1,86 @@
+#ifndef _MULTIPATH_H
+#define _MULTIPATH_H
+
+#include "arch.h"
+#include "scatter.h"
+
+#ifndef ARCH_PC_WIN95
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+#else /* ARCH_PC_WIN95 */
+
+#include <winsock.h>
+
+#endif /* ARCH_PC_WIN95 */
+
+struct mp_net;
+struct mp_node;
+struct dummy_segment;
+struct dummy_proc;
+struct if_info;
+
+struct list
+{
+ struct list *prev;
+ struct list *next;
+ void *data;
+};
+
+struct mp_intf
+{
+ struct in_addr addr; /* Unicast address for directed send */
+ struct if_info *ifc;
+ struct mp_net *net;
+ struct mp_node *node;
+ struct list intf_this_net;
+ struct list intf_this_node;
+};
+
+struct mp_net
+{
+ char net_name[256];
+ struct in_addr bcast;
+ struct in_addr netmask;
+ unsigned int state;
+ struct dummy_segment *segment;
+ struct list *intf_head;
+ struct list net_this_segment;
+ struct list global_list;
+};
+
+struct mp_node
+{
+ char node_name[256];
+ unsigned short port; /* Port for directed send */
+ struct dummy_proc *proc;
+ struct list *intf_head;
+ struct list global_list;
+};
+
+struct mp_intf *intf_get (struct mp_node *node, struct mp_net *net);
+struct mp_net *net_get (char name[]);
+struct mp_node *node_get (char name[]);
+struct mp_intf *intf_add (struct mp_node *node,
+ struct mp_net *net,
+ struct if_info *ifc);
+void net_insert (struct mp_net *net);
+struct mp_net *net_add (char name[],
+ struct in_addr bcast,
+ struct in_addr netmask,
+ struct dummy_segment *seg);
+struct mp_node *node_add (char name[], struct dummy_proc *);
+struct mp_node *node_get (char name[]);
+
+int mp_send (channel chan,
+ struct mp_node *mynode,
+ struct mp_node *target,
+ unsigned short target_port,
+ sys_scatter *scat);
+
+#undef MAX
+#define MAX(a, b) (((a) > (b)) ? (a) : (b))
+
+#endif
Index: net_types.h
===================================================================
RCS file: /CVSROOT/src/components/spread/net_types.h,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.2.1
diff -u -r1.1.1.2 -r1.1.1.2.2.1
--- net_types.h 2001/10/01 08:51:26 1.1.1.2
+++ net_types.h 2001/10/08 14:37:10 1.1.1.2.2.1
@@ -59,13 +59,13 @@
#define FORM2_TYPE 0x00002000
#define FORM_TYPE 0x00003000
-#define ARQ_TYPE 0x000f0000
-#define RETRANS_TYPE 0x00f00000
+#define ARQ_TYPE 0x00ff0000
+#define RETRANS_TYPE 0x0f000000
-#define STATUS_TYPE 0x01000000
-#define PARTITION_TYPE 0x02000000
-#define FC_TYPE 0x04000000
-#define CONTROL_TYPE 0x0f000000
+#define STATUS_TYPE 0x10000000
+#define PARTITION_TYPE 0x20000000
+#define FC_TYPE 0x40000000
+#define CONTROL_TYPE 0xf0000000
#define Is_unreliable( type ) ( type & UNRELIABLE_TYPE )
@@ -92,8 +92,8 @@
#define Get_arq( type ) ( (type & ARQ_TYPE) >> 16)
#define Set_arq( type, val ) ( (type & ~ARQ_TYPE) | ((val << 16)&ARQ_TYPE) )
-#define Get_retrans( type ) ( (type & RETRANS_TYPE) >> 20)
-#define Set_retrans( type, val) ( (type & ~RETRANS_TYPE) | ((val << 20)&RETRANS_TYPE) )
+#define Get_retrans( type ) ( (type & RETRANS_TYPE) >> 24)
+#define Set_retrans( type, val) ( (type & ~RETRANS_TYPE) | ((val << 24)&RETRANS_TYPE) )
#define Is_status( type ) ( type & STATUS_TYPE )
#define Is_partition( type ) ( type & PARTITION_TYPE )
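The net_types.h change above widens the ARQ field from 4 to 8 bits and
shifts the retrans field up to make room. The following sketch restates
the new layout as small functions so the wrap-around can be checked in
isolation. The two masks are copied from the patch; the function names
are hypothetical (the patch itself uses the Get_arq / Set_arq macros).

```c
#include <assert.h>
#include <stdint.h>

/* Masks as defined by the patched net_types.h. */
#define ARQ_TYPE     0x00ff0000u
#define RETRANS_TYPE 0x0f000000u

/* Hypothetical function equivalents of Get_arq / Set_arq. */
static uint32_t get_arq(uint32_t type)
{
    return (type & ARQ_TYPE) >> 16;
}

static uint32_t set_arq(uint32_t type, uint32_t val)
{
    return (type & ~ARQ_TYPE) | ((val << 16) & ARQ_TYPE);
}

/* Hypothetical function equivalents of Get_retrans / Set_retrans. */
static uint32_t get_retrans(uint32_t type)
{
    return (type & RETRANS_TYPE) >> 24;
}

static uint32_t set_retrans(uint32_t type, uint32_t val)
{
    return (type & ~RETRANS_TYPE) | ((val << 24) & RETRANS_TYPE);
}

/* Advance the ARQ counter the way the leader does: modulo 0x100. */
static uint32_t next_arq(uint32_t type)
{
    return set_arq(type, (get_arq(type) + 1) % 0x100);
}
```

Because the two masks do not overlap, bumping the ARQ counter from 0xff
back to 0 leaves the retrans field untouched.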
Index: network.c
===================================================================
RCS file: /CVSROOT/src/components/spread/network.c,v
retrieving revision 1.1.1.2
retrieving revision 1.7
diff -u -r1.1.1.2 -r1.7
--- network.c 2001/10/01 08:51:27 1.1.1.2
+++ network.c 2001/10/08 14:55:09 1.7
@@ -47,14 +47,14 @@
static channel Send_channel;
static int Num_bcast_channels;
+static int Num_token_channels;
static int Bcast_needed;
-static int32 Bcast_address;
static int16 Bcast_port;
static int Num_send_needed;
-static int32 Send_address[MAX_SEGMENTS];
-static int16 Send_ports[MAX_SEGMENTS];
+static struct mp_node *Send_address[MAX_SEGMENTS];
+static unsigned short Send_ports[MAX_SEGMENTS];
/* ### Pack: 3 lines */
/* Global in function so both Net_queue_bcast and Net_flush_bcast can access them */
@@ -63,7 +63,7 @@
static const char align_padding[4] = "padd";
/* address for token sending - which is always needed */
-static int32 Token_address;
+static struct mp_node *Token_address;
static int16 Token_port;
static configuration Net_membership;
@@ -86,9 +86,11 @@
void Net_init()
{
proc dummy_proc;
- int32u interface_addr;
- int i;
+ struct in_addr interface_addr, addr_any;
+ struct mp_intf *intf;
+ struct list *tmp;
+ addr_any.s_addr = INADDR_ANY;
Cn = Conf();
My = Conf_my();
@@ -99,32 +101,37 @@
{
/* I am not allone in segment */
Bcast_needed = 1;
- Bcast_address = Cn.segments[My.seg_index].bcast_address;
Bcast_port = My.port;
-
- Alarm( NETWORK, "Net_init: Bcast needed to address (%d, %d)\n",
- Bcast_address, Bcast_port );
+ Alarm( NETWORK, "Net_init: Bcast needed to port %d\n",
+ Bcast_port );
}else{
Bcast_needed = 0;
- Bcast_address = 0;
Alarm( NETWORK, "Net_init: Bcast is not needed\n" );
}
- for ( i=0; i < My.num_if; i++)
- {
- if (Is_IfType_Daemon( My.ifc[i].type ) || Is_IfType_Any( My.ifc[i].type ) )
- {
- if (Is_IfType_Any( My.ifc[i].type ) )
- interface_addr = 0;
- else
- interface_addr = My.ifc[i].ip;
-
- Bcast_channel[i] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, interface_addr );
- Token_channel[i] = DL_init_channel( RECV_CHANNEL, My.port+1, 0, interface_addr );
- Num_bcast_channels++;
- }
- }
- Send_channel = DL_init_channel( SEND_CHANNEL, My.port+2, 0, My.id );
+ /* We have a bit of a problem here. We'd like to bind each
+ broadcast channel to an interface and be done with it,
+ but neither Linux 2.4.x nor Solaris 8 seems to be able to
+ receive a broadcast message on a bound socket... So we
+ create an extra INADDR_ANY channel that only acts as a
+ catch-all. Anyone with a brighter idea? */
+
+ Bcast_channel[0] = DL_init_channel( RECV_CHANNEL, My.port, addr_any, addr_any );
+ Num_bcast_channels = 1;
+
+ for (tmp = My.node->intf_head; tmp; tmp = tmp->next)
+ {
+ /* We know for sure that this interface is ANY or DAEMON */
+ intf = (struct mp_intf *) tmp->data;
+ if (Is_IfType_Any (intf->ifc->type))
+ continue; /* ANY is already taken care of with first socket */
+
+ interface_addr = intf->addr;
+
+ Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, intf->net->bcast, interface_addr );
+ Token_channel[Num_token_channels++] = DL_init_channel( RECV_CHANNEL, My.port+1, addr_any, interface_addr );
+ }
+ Send_channel = DL_init_channel( SEND_CHANNEL, My.port + 2, addr_any, addr_any );
Num_send_needed = 0;
}
@@ -162,7 +169,7 @@
} else if( Net_membership.segments[i].num_procs > 0 ) {
- Send_address[Num_send_needed] = Net_membership.segments[i].procs[0]->id;
+ Send_address[Num_send_needed] = Net_membership.segments[i].procs[0]->node;
Send_ports [Num_send_needed] = Net_membership.segments[i].port;
Num_send_needed++;
@@ -175,10 +182,10 @@
i, Send_address[i], Send_ports[i] );
/* Calculate where to send the token */
- Token_address = 0;
+ Token_address = NULL;
if( my_index_in_seg < My_seg.num_procs-1 )
{
- Token_address = My_seg.procs[my_index_in_seg+1]->id;
+ Token_address = My_seg.procs[my_index_in_seg+1]->node;
Token_port = My_seg.port+1;
}else{
/* I am last in my segment */
@@ -188,7 +195,7 @@
* My segment is the only segment
* sending token to the first in my segment
*/
- Token_address = My_seg.procs[0]->id;
+ Token_address = My_seg.procs[0]->node;
Token_port = My_seg.port+1;
} else if( Num_send_needed == my_next_index ) {
/*
@@ -207,8 +214,8 @@
}
}
- Alarm( NETWORK, "Net_set_membership: Token_address : (%d, %d)\n",
- Token_address, Token_port );
+ Alarm( NETWORK, "Net_set_membership: Token_address : (%s, %d)\n",
+ Token_address->node_name, Token_port );
}
int Net_bcast( sys_scatter *scat )
@@ -225,14 +232,14 @@
pack_ptr->transmiter_id = My.id;
for ( i=0; i< Num_send_needed; i++ )
{
- ret = DL_send( Send_channel, Send_address[i], Send_ports[i], scat );
+ ret = mp_send( Send_channel, My.node, Send_address[i], Send_ports[i], scat );
}
pack_ptr->type = Clear_routed( pack_ptr->type );
/* broadcasting if needed according to configuration */
if( Bcast_needed )
{
- ret = DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+ ret = mp_send( Send_channel, My.node, NULL, Bcast_port, scat );
}
return( ret );
}
@@ -325,7 +332,7 @@
for ( i=0; i< Num_send_needed; i++ )
{
- ret = DL_send( Send_channel, Send_address[i], Send_ports[i], &Queue_scat );
+ ret = mp_send( Send_channel, My.node, Send_address[i], Send_ports[i], &Queue_scat );
}
pack_ptr = (packet_header *)Queue_scat.elements[0].buf;
pack_ptr->type = Clear_routed( pack_ptr->type );
@@ -333,7 +340,7 @@
/* broadcasting if needed according to configuration */
if( Bcast_needed )
{
- ret = DL_send( Send_channel, Bcast_address, Bcast_port, &Queue_scat );
+ ret = mp_send( Send_channel, My.node, NULL, Bcast_port, &Queue_scat );
}
Queue_scat.num_elements = 0;
Queued_bytes = 0;
@@ -353,14 +360,14 @@
{
if( Bcast_needed )
{
- ret = DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+ ret = mp_send( Send_channel, My.node, NULL, Bcast_port, scat );
}
}else{
if( Net_membership.segments[seg_index].num_procs > 0 )
{
pack_ptr->type = Set_routed( pack_ptr->type );
- ret = DL_send( Send_channel,
- Net_membership.segments[seg_index].procs[0]->id,
+ ret = mp_send( Send_channel, My.node,
+ Net_membership.segments[seg_index].procs[0]->node,
Net_membership.segments[seg_index].port,
scat );
pack_ptr->type = Clear_routed( pack_ptr->type );
@@ -384,7 +391,7 @@
Alarm( PRINT, "Net_ucast: non existing proc_id %d\n",proc_id );
return( ret );
}
- ret = DL_send( Send_channel, proc_id, p.port, scat );
+ ret = mp_send( Send_channel, My.node, p.node, p.port, scat );
return( ret );
}
@@ -411,7 +418,6 @@
if (ch_found == FALSE) {
Alarm(EXIT, "Net_recv: Listening and received packet on un-used interface %d\n", fd);
}
-
received_bytes = DL_recv( fd, scat );
if( received_bytes <= 0 ) return( received_bytes );
@@ -489,7 +495,7 @@
/* fliping to original form */
if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
- DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+ mp_send( Send_channel, My.node, NULL, Bcast_port, scat );
/* re-fliping to my form */
if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
@@ -573,7 +579,7 @@
token_ptr = (token_header *)scat->elements[0].buf;
token_ptr->type = Set_endian( token_ptr->type );
token_ptr->transmiter_id = My.id;
- ret = DL_send( Send_channel, Token_address, Token_port, scat );
+ ret = mp_send( Send_channel, My.node, Token_address, Token_port, scat );
return ( ret );
}
@@ -626,12 +632,13 @@
proc_id );
return( ret );
}
- ret = DL_send( Send_channel, proc_id, p.port+1, scat );
+ ret = mp_send( Send_channel, My.node, p.node, p.port+1, scat );
return( ret );
}
-int Net_num_channels()
+void Net_num_channels(int *bcast, int *token)
{
- return( Num_bcast_channels );
+ *bcast = Num_bcast_channels;
+ *token = Num_token_channels;
}
channel *Net_bcast_channel()
{
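For readers unfamiliar with the workaround used in Net_init, here is a
rough POSIX-only sketch of what "one INADDR_ANY socket plus one socket
per interface" looks like at the socket level. It is an illustration,
not the DL_init_channel implementation: open_udp_recv is a hypothetical
helper, and the use of SO_REUSEADDR so that several sockets can share
the port is an assumption of this sketch.

```c
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

/* Hypothetical helper: open a UDP socket bound to (bind_addr, port).
   bind_addr is expected in network byte order.
   Returns the descriptor, or -1 on failure. */
static int open_udp_recv(struct in_addr bind_addr, unsigned short port)
{
    struct sockaddr_in sa;
    int on = 1;
    int fd = socket(AF_INET, SOCK_DGRAM, 0);

    if (fd < 0)
        return -1;

    /* Assumption: the wildcard socket and the per-interface sockets
       share one port, so address reuse is requested on each of them. */
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
        close(fd);
        return -1;
    }

    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    sa.sin_addr = bind_addr;

    if (bind(fd, (struct sockaddr *) &sa, sizeof(sa)) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```

A caller would open the wildcard socket first (bind_addr set to
htonl(INADDR_ANY)) to catch broadcasts, then call the helper once per
configured interface address for the directed traffic.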
Index: network.h
===================================================================
RCS file: /CVSROOT/src/components/spread/network.h,v
retrieving revision 1.1.1.2
retrieving revision 1.2
diff -u -r1.1.1.2 -r1.2
--- network.h 2001/10/01 08:51:27 1.1.1.2
+++ network.h 2001/10/08 12:45:15 1.2
@@ -51,6 +51,6 @@
int Net_ucast_token( int32 proc_id, sys_scatter *scat );
channel *Net_bcast_channel();
channel *Net_token_channel();
-int Net_num_channels();
+void Net_num_channels(int *, int*);
#endif /* INC_NETWORK */
Index: protocol.c
===================================================================
RCS file: /CVSROOT/src/components/spread/protocol.c,v
retrieving revision 1.1.1.2
retrieving revision 1.2.2.1
diff -u -r1.1.1.2 -r1.2.2.1
--- protocol.c 2001/10/01 08:51:28 1.1.1.2
+++ protocol.c 2001/10/08 14:37:10 1.2.2.1
@@ -87,7 +87,7 @@
void Prot_init(void)
{
- int i;
+ int i, num_bcast, num_token;
channel *bcast_channels;
channel *token_channels;
@@ -149,10 +149,13 @@
bcast_channels = Net_bcast_channel();
token_channels = Net_token_channel();
- for ( i = 0; i < Net_num_channels(); i++) {
+ Net_num_channels (&num_bcast, &num_token);
+ for ( i = 0; i < num_bcast; i++) {
E_attach_fd( *bcast_channels, READ_FD, Prot_handle_bcast, 0, NULL, HIGH_PRIORITY );
- E_attach_fd( *token_channels, READ_FD, Prot_handle_token, 0, NULL, MEDIUM_PRIORITY );
bcast_channels++;
+ }
+ for ( i = 0; i < num_token; i++) {
+ E_attach_fd( *token_channels, READ_FD, Prot_handle_token, 0, NULL, MEDIUM_PRIORITY );
token_channels++;
}
@@ -466,7 +469,7 @@
return;
}
}else{
- if( Get_arq(Token->type) == Get_arq(Last_token->type) )
+ if( Get_arq(Token->type) != ((Get_arq(Last_token->type)+1)%0x100) )
{
if( Get_retrans(Token->type) > Get_retrans(Last_token->type) )
{
@@ -575,7 +578,7 @@
if( Conf_leader( Memb_active_ptr() ) == My.id )
{
val = Get_arq( Token->type );
- val = (val + 1)% 0x10;
+ val = (val + 1)% 0x100;
Token->type = Set_arq( Token->type, val );
Token->type = Set_retrans( Token->type, 0 );
}
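The two protocol.c hunks above go together: the leader now advances the
ARQ counter modulo 0x100, and a non-leader treats a token as the
expected one only when its ARQ is exactly the last accepted ARQ plus
one. The sketch below isolates that arithmetic. The helper names are
hypothetical and the code only models the comparison, not the rest of
the token handling.

```c
#include <assert.h>

#define ARQ_MODULUS 0x100 /* 8-bit ARQ field, as in the patch */

/* Hypothetical helper: non-zero when arq directly follows last_arq. */
static int arq_is_successor(unsigned last_arq, unsigned arq)
{
    return arq == (last_arq + 1) % ARQ_MODULUS;
}

/* Number of ARQ increments separating a token from the last accepted
   one; 0 means the token carries the same ARQ as the last one. */
static unsigned arq_distance(unsigned last_arq, unsigned arq)
{
    return (arq + ARQ_MODULUS - last_arq) % ARQ_MODULUS;
}
```

With the former modulus of 0x10, two tokens 16 increments apart carried
the same ARQ value; with 0x100 the counter only repeats after 256
increments, which is what narrows the window for a late token arriving
from the slower network.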
Index: session.c
===================================================================
RCS file: /CVSROOT/src/components/spread/session.c,v
retrieving revision 1.1.1.2
retrieving revision 1.2
diff -u -r1.1.1.2 -r1.2
--- session.c 2001/10/01 08:51:30 1.1.1.2
+++ session.c 2001/10/08 12:45:16 1.2
@@ -134,7 +134,7 @@
{
struct sockaddr_in inet_addr;
int16 port;
- int ret, i;
+ int ret, i, on = 1;
mailbox mbox;
#ifndef ARCH_PC_WIN95
@@ -190,6 +190,9 @@
inet_addr.sin_addr.s_addr = INADDR_ANY;
else
inet_addr.sin_addr.s_addr = htonl(My.ifc[i].ip);
+ if (setsockopt (mbox, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)))
+ Alarm (EXIT, "Sess_init: SO_REUSEADDR failed\n");
+
if( bind( mbox, (struct sockaddr *)&inet_addr, sizeof(inet_addr) ) == -1)
{
Alarm( PRINT, "Sess_init: INET unable to bind to port %d, already running \n" ,port );
Index: status.c
===================================================================
RCS file: /CVSROOT/src/components/spread/status.c,v
retrieving revision 1.1.1.1
retrieving revision 1.4
diff -u -r1.1.1.1 -r1.4
--- status.c 2001/09/24 15:27:03 1.1.1.1
+++ status.c 2001/10/05 12:52:33 1.4
@@ -54,10 +54,12 @@
void Stat_init()
{
int16 dummy_port = 0;
+ struct in_addr addr;
Start_time = E_get_time();
- Report_channel = DL_init_channel( SEND_CHANNEL, dummy_port, 0, Conf_my().id );
+ addr.s_addr = INADDR_ANY;
+ Report_channel = DL_init_channel( SEND_CHANNEL, dummy_port, addr, addr );
Report_scat.num_elements = 2;
Report_scat.elements[0].len = sizeof( packet_header );
@@ -87,6 +89,7 @@
packet_header *pack_ptr;
proc p;
int ret;
+ struct in_addr addr;
pack_ptr = (packet_header *)scat->elements[0].buf;
if( ! ( pack_ptr->memb_id.proc_id == 15051963 && Conf_id_in_conf( &Cn, pack_ptr->proc_id ) != -1 ) )
@@ -98,8 +101,8 @@
now = E_get_time();
delta = E_sub_time( now, Start_time );
GlobalStatus.sec = delta.sec;
-
- DL_send( Report_channel, pack_ptr->proc_id, pack_ptr->seq, &Report_scat );
+ addr.s_addr = htonl (pack_ptr->proc_id);
+ DL_send ( Report_channel, addr, pack_ptr->seq, &Report_scat );
ret = Conf_proc_by_id( pack_ptr->proc_id, &p );
if( ret < 0 )
Alarm( STATUS,
--
And don't forget you'll never get a dog to walk upright
Just 'cause you've got the power, that don't mean you've got the right.