[Spread-users] Multipath Spread
Marc Zyngier
Marc.Zyngier at evidian.com
Thu Oct 4 11:31:19 EDT 2001
Hello all,
We are currently considering using Spread in a high-availability
environment. One of our main requirements is the ability to handle
multiple networks attached to each machine, and to use those networks
as redundant links.
So we came, we saw, we hacked (famous last words...).
The basic idea is to describe the network topology in the
configuration file:
Spread_Segment mp:4803
{
192.168.110.255
{
{ foo D 192.168.110.1 }
{ bar 192.168.110.2 }
{ fubar 192.168.110.3 }
}
192.168.111.255
{
{ foo 192.168.111.1 }
{ fubar 192.168.111.3 }
}
foo { C 127.0.0.1 }
}
The 'mp:4803' token indicates that we are describing a "multipath"
configuration. In this sample configuration, we have two networks
(192.168.110.255 and 192.168.111.255). foo and fubar each have an
interface on both networks. bar only belongs to the first. Moreover,
foo will listen to clients on 127.0.0.1.
The patch changes the DL_send interface, so that we do not send a
packet to an ip address, but to a proc. Another layer (multipath) is
responsible for the conversion from proc to a list of addresses. Thus,
it is quite invasive in both network.c and datalink.c. It changes the
configuration parser quite a bit too...
Please note that this patch is a work in progress. It is not as nicely
integrated as it should be, and probably introduces quite a lot of
problems (we saw quite a few crashes with req_seq < Aru...). We'd like
to get as much feedback as possible from Spread creators and users, so
we can fix problems as fast as possible.
This patch is against 3.16.1beta1, and has been tested on Linux,
Solaris and Windows 2000.
Thanks a lot.
Marc Zyngier
Evidian - SafeKit Project
http://www.evidian.com
<std disclaimer>
Copyright (C) 2001 Evidian.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
</std disclaimer>
Index: LINUX_makefile
===================================================================
RCS file: /CVSROOT/src/components/spread/LINUX_makefile,v
retrieving revision 1.1.1.2
retrieving revision 1.4
diff -u -r1.1.1.2 -r1.4
--- LINUX_makefile 2001/10/01 08:51:18 1.1.1.2
+++ LINUX_makefile 2001/10/03 13:53:12 1.4
@@ -15,7 +15,7 @@
CFLAGS = -O2 -ansi -c -D_GNU_SOURCE -Wall
TCFLAGS = -O2 -c -D_GNU_SOURCE -Wall # ansi flag cannot be used because of an error when compiling sp.to
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
OBJS = alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o acp-permit.o auth-null.o auth-ip.o
# add to spread build line if enabling password
AUTHLIBS = -lcrypt
Index: SOLARIS_makefile
===================================================================
RCS file: /CVSROOT/src/components/spread/SOLARIS_makefile,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- SOLARIS_makefile 2001/09/24 15:26:53 1.1.1.1
+++ SOLARIS_makefile 2001/09/28 07:42:09 1.2
@@ -10,7 +10,7 @@
CFLAGS = -O2 -ansi -Wall -c
TCFLAGS = -O2 -Wall -c # ansi flag cannot be used because of an error when compiling sp.to
-COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o
+COBJS = lex.yy.o y.tab.o configuration.o skiplist.o acm.o multipath.o
OBJS = alarm.o events.o memory.o membership.o data_link.o network.o status.o log.o flow_control.o message.o acp-permit.o auth-null.o auth-ip.o
all: spread spmonitor spuser sptuser simple_user spflooder
Index: config_gram.l
===================================================================
RCS file: /CVSROOT/src/components/spread/config_gram.l,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- config_gram.l 2001/09/24 15:27:04 1.1.1.1
+++ config_gram.l 2001/09/27 10:01:52 1.2
@@ -61,6 +61,7 @@
no [Nn][Oo]
ipaddr [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}
ipport [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}:[0-9]{1,5}
+mpport [Mm][Pp]:[0-9]{1,5}
%option noyywrap
%%
#.* {} /* Comments */
@@ -151,6 +152,9 @@
yylval.ip.port = 0;
return IPADDR;
}
+{mpport} { yylval.ip.port = (unsigned short) atoi (yytext + 3);
+ return MPPORT;
+ }
([0-9]{1,3}[ \t]*)+[ \t]* {
int fcost, i, done;
char *c;
Index: config_parse.y
===================================================================
RCS file: /CVSROOT/src/components/spread/config_parse.y,v
retrieving revision 1.1.1.1
retrieving revision 1.6
diff -u -r1.1.1.1 -r1.6
--- config_parse.y 2001/09/24 15:27:05 1.1.1.1
+++ config_parse.y 2001/10/04 12:40:45 1.6
@@ -51,6 +51,7 @@
#endif /* ARCH_PC_WIN95 */
#include "alarm.h"
+#include "multipath.h"
#include "configuration.h"
#include "skiplist.h"
#include "memory.h"
@@ -67,22 +68,60 @@
static int num_procs = 0;
static int segment_procs = 0;
static int segments = 0;
+ static int networks = 0;
static int rvec_num = 0;
static int procs_interfaces = 0;
static proc *new_proc;
+ static proc *current_proc = NULL;
static int32 temp_seg_list[MAX_PROCS_SEGMENT];
+ static struct mp_net *current_net = NULL;
static int authentication_configured = 0;
+static void create_intfs_current_net (proc *p)
+{
+ int i;
+
+ if (!current_net)
+ {
+ current_net = (struct mp_net *) malloc (sizeof (struct mp_net));
+ memset ((void *) current_net, 0, sizeof (struct mp_net));
+ sprintf (current_net->net_name, "network_%d", networks);
+ current_net->segment = &Config.segments[segments];
+ }
+
+ for (i = 0; i < p->num_if; i++)
+ if (Is_IfType_Daemon(p->ifc[i].type) || Is_IfType_Any(p->ifc[i].type))
+ intf_add (p->node, current_net, &p->ifc[i]);
+}
+
+static void attach_to_current_net (struct mp_node *n, struct if_info *ifc)
+{
+ if (!current_net)
+ {
+ current_net = (struct mp_net *) malloc (sizeof (struct mp_net));
+ memset ((void *) current_net, 0, sizeof (struct mp_net));
+ sprintf (current_net->net_name, "network_%d", networks);
+ current_net->segment = &Config.segments[segments];
+ }
+
+ intf_add (n, current_net, ifc);
+}
+
static char *segment2str(int seg) {
- static char ipstr[40];
- int id = Config.segments[seg].bcast_address;
- sprintf(ipstr, "%d.%d.%d.%d:%d",
- (id & 0xff000000)>>24,
- (id & 0xff0000)>>16,
- (id & 0xff00)>>8,
- (id & 0xff),
- Config.segments[seg].port);
+ static char ipstr[256];
+ struct mp_net *net;
+ struct list *tmp;
+ int i = 0;
+
+ for (tmp = Config.segments[seg].net_head; tmp; tmp = tmp->next)
+ {
+ net = (struct mp_net *) tmp->data;
+ i += sprintf(ipstr + i, "%s:%d/",
+ inet_ntoa (net->bcast),
+ Config.segments[seg].port);
+ }
+ ipstr[i-1] = '\0';
return ipstr;
}
static void alarm_print_proc(proc *p, int port) {
@@ -191,6 +230,7 @@
%token SP_BOOL LINKPROTOCOL PHOP PTCPHOP
%token IMONITOR ICLIENT IDAEMON
%token ROUTEMATRIX LINKCOST
+%token MPPORT
%%
Config : ConfigStructs
{
@@ -342,8 +382,9 @@
SEGMENT_CHECK( segments, inet_ntoa($2.ip.addr) );
Config.segments[segments].num_procs = segment_procs;
Config.segments[segments].port = $2.ip.port;
- Config.segments[segments].bcast_address =
- $2.ip.addr.s_addr;
+ current_net->bcast.s_addr = htonl ($2.ip.addr.s_addr);
+ net_insert (current_net);
+ current_net = NULL;
if(Config.segments[segments].port == 0)
Config.segments[segments].port = DEFAULT_SPREAD_PORT;
Alarm(CONF, "Successfully configured Segment %d [%s] with %d procs:\n",
@@ -360,8 +401,32 @@
Config.segments[segments].port);
}
segments++;
+ networks++;
segment_procs = 0;
}
+ | SEGMENT MPPORT OPENBRACE NetDescs CLOSEBRACE
+ {
+ int i;
+ Config.segments[segments].num_procs = segment_procs;
+ Config.segments[segments].port = $2.ip.port;
+ if(Config.segments[segments].port == 0)
+ Config.segments[segments].port = DEFAULT_SPREAD_PORT;
+ Alarm(CONF, "Successfully configured Segment %d [%s] with %d procs:\n",
+ segments,
+ segment2str(segments),
+ segment_procs);
+ for(i=(num_procs - segment_procs); i<num_procs; i++) {
+ /* This '1' is to keep each proc with the same port as the segment.*/
+ if( 1 || Config_procs[i].port==0) {
+ Config_procs[i].port=
+ Config.segments[segments].port;
+ }
+ alarm_print_proc(&Config_procs[i],
+ Config.segments[segments].port);
+ }
+ segments++;
+ segment_procs = 0;
+ }
;
Segmentparams : Segmentparam Segmentparams
@@ -369,13 +434,15 @@
;
Segmentparam : STRING IPADDR OPENBRACE Interfaceparams CLOSEBRACE
- {
- PROCS_CHECK( num_procs, $1.string );
+ {
+ PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
if (procs_interfaces == 0)
yyerror("Interfaces section declared but no actual interface addresses defined\n");
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id = $2.ip.addr.s_addr;
Config_procs[num_procs].port = $2.ip.port;
Config_procs[num_procs].seg_index = segments;
@@ -383,18 +450,21 @@
Config_procs[num_procs].num_if = procs_interfaces;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
}
| STRING OPENBRACE Interfaceparams CLOSEBRACE
- {
+ {
PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
if (procs_interfaces == 0)
yyerror("Interfaces section declared but no actual interface addresses defined\n");
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id =
name2ip(Config_procs[num_procs].name);
Config_procs[num_procs].port = 0;
@@ -403,16 +473,19 @@
Config_procs[num_procs].num_if = procs_interfaces;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
}
| STRING IPADDR
- {
+ {
PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id = $2.ip.addr.s_addr;
Config_procs[num_procs].port = $2.ip.port;
Config_procs[num_procs].seg_index = segments;
@@ -423,6 +496,7 @@
Config_procs[num_procs].ifc[0].type = IFTYPE_ALL | IFTYPE_ANY;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
@@ -433,6 +507,8 @@
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
strcpy(Config_procs[num_procs].name, $1.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
Config_procs[num_procs].id =
name2ip(Config_procs[num_procs].name);
Config_procs[num_procs].port = 0;
@@ -444,12 +520,89 @@
Config_procs[num_procs].ifc[0].type = IFTYPE_ALL | IFTYPE_ANY;
Config.segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
+ create_intfs_current_net (&Config_procs[num_procs]);
num_procs++;
segment_procs++;
procs_interfaces = 0;
}
;
+NetDescs : Networks MpTypedIntfs
+ ;
+
+Networks : Network Networks
+ |
+ ;
+
+Network : IPADDR OPENBRACE MpIntfs CLOSEBRACE
+ {
+ current_net->bcast.s_addr = htonl ($1.ip.addr.s_addr);
+ net_insert (current_net);
+ current_net = NULL;
+ networks++;
+ }
+ ;
+
+
+MpIntfs : MpIntf MpIntfs
+ |
+ ;
+
+MpIntf : OPENBRACE STRING IfTypeComp IPADDR CLOSEBRACE
+ {
+ struct mp_node *node;
+ PROCS_CHECK( num_procs, $2.string );
+ SEGMENT_CHECK( segments, $2.string );
+ SEGMENT_SIZE_CHECK( segment_procs, $2.string );
+ if (!(node = node_get ($2.string)))
+ {
+ strcpy(Config_procs[num_procs].name, $2.string);
+ Config_procs[num_procs].node = node_add (Config_procs[num_procs].name,
+ &Config_procs[num_procs]);
+ Config_procs[num_procs].id = $4.ip.addr.s_addr;
+ Config_procs[num_procs].port = $4.ip.port;
+ Config_procs[num_procs].seg_index = segments;
+ Config_procs[num_procs].index_in_seg = segment_procs;
+ Config_procs[num_procs].num_if = 0;
+ Config.segments[segments].procs[segment_procs] =
+ &Config_procs[num_procs];
+ node = Config_procs[num_procs].node;
+ num_procs++;
+ segment_procs++;
+ }
+ node->proc->ifc[node->proc->num_if].ip = $4.ip.addr.s_addr;
+ node->proc->ifc[node->proc->num_if].port = $4.ip.port;
+ if ($3.mask == 0)
+ node->proc->ifc[node->proc->num_if].type = IFTYPE_ALL;
+ else
+ node->proc->ifc[node->proc->num_if].type = $3.mask;
+ attach_to_current_net (node,
+ &node->proc->ifc[node->proc->num_if]);
+ node->proc->num_if++;
+ procs_interfaces = 0; /* XXX FIXME, pb with multiple spread ifc */
+ }
+ ;
+
+MpTypedIntfs : MpTypedIntf MpTypedIntfs
+ |
+ ;
+
+MpTypedIntf : STRING OPENBRACE MpIfParms CLOSEBRACE
+ {
+ struct mp_node *node;
+ int i;
+ if (!(node = node_get ($1.string)))
+ yyerror ("Unknown proc, cannot add interfaces\n");
+ if (!current_proc)
+ yyerror ("No interfaces for proc ??\n");
+ for (i = 0; i < current_proc->num_if; i++)
+ node->proc->ifc[node->proc->num_if + i] = current_proc->ifc[i];
+ node->proc->num_if += current_proc->num_if;
+ free (current_proc);
+ current_proc = NULL;
+ }
+ ;
+
IfType : IMONITOR { $$ = $1; }
| ICLIENT { $$ = $1; }
| IDAEMON { $$ = $1; }
@@ -467,7 +620,7 @@
;
Interfaceparam : IfTypeComp IPADDR
- {
+ { /* Is $1.string really what you think it is ?? */
PROCS_CHECK( num_procs, $1.string );
SEGMENT_CHECK( segments, $1.string );
SEGMENT_SIZE_CHECK( segment_procs, $1.string );
@@ -479,6 +632,28 @@
else
Config_procs[num_procs].ifc[procs_interfaces].type = $1.mask;
procs_interfaces++;
+ }
+ ;
+
+MpIfParms : MpIfParm MpIfParms
+ |
+ ;
+
+MpIfParm : IfTypeComp IPADDR
+ {
+ if (!current_proc)
+ {
+ current_proc = (proc *) malloc (sizeof (proc));
+ memset ((void *) current_proc, 0, sizeof (proc));
+ }
+ INTERFACE_NUM_CHECK(current_proc->num_if, "MpIfParm");
+ current_proc->ifc[current_proc->num_if].ip = $2.ip.addr.s_addr;
+ current_proc->ifc[current_proc->num_if].port = $2.ip.port;
+ if ($1.mask == 0)
+ current_proc->ifc[current_proc->num_if].type = IFTYPE_ALL;
+ else
+ current_proc->ifc[current_proc->num_if].type = $1.mask;
+ current_proc->num_if++;
}
;
Index: configuration.c
===================================================================
RCS file: /CVSROOT/src/components/spread/configuration.c,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- configuration.c 2001/09/24 15:26:54 1.1.1.1
+++ configuration.c 2001/09/24 17:03:06 1.2
@@ -479,13 +479,15 @@
int s,p,ret;
char ip[16];
proc pr;
+ struct mp_net *net;
Alarm( PRINT, "--------------------\n" );
Alarm( PRINT, "Configuration at %s is:\n", My.name );
Alarm( PRINT, "Num Segments %d\n",config->num_segments );
for ( s=0; s < config->num_segments; s++ )
{
- Conf_id_to_str( config->segments[s].bcast_address, ip );
+ net = (struct mp_net *) config->segments[s].net_head->data;
+ Conf_id_to_str( ntohl (net->bcast.s_addr), ip );
Alarm( PRINT, "\t%d\t%-16s %hd\n",
config->segments[s].num_procs, ip,
config->segments[s].port );
Index: configuration.h
===================================================================
RCS file: /CVSROOT/src/components/spread/configuration.h,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- configuration.h 2001/09/24 15:26:54 1.1.1.1
+++ configuration.h 2001/09/24 17:03:06 1.2
@@ -36,6 +36,7 @@
#include "arch.h"
#include "spread_params.h"
+#include "multipath.h"
/* For what spread services should listen on what interfaces */
@@ -61,15 +62,16 @@
int16 seg_index;
int16 index_in_seg;
int32u id;
+ struct mp_node *node;
int num_if;
struct if_info ifc[MAX_INTERFACES_PROC];
} proc;
typedef struct dummy_segment{
- int32 bcast_address;
int16 port;
int num_procs;
proc *procs[MAX_PROCS_SEGMENT];
+ struct list *net_head;
} segment;
typedef struct dummy_configuration{
Index: data_link.c
===================================================================
RCS file: /CVSROOT/src/components/spread/data_link.c,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- data_link.c 2001/09/24 15:26:54 1.1.1.1
+++ data_link.c 2001/09/24 17:03:06 1.2
@@ -58,13 +58,16 @@
#include "status.h"
#include "alarm.h"
#include "sp_events.h" /* for sp_time */
+#include "multipath.h"
-channel DL_init_channel( int32 channel_type, int16 port, int32 mcast_address, int32 interface_address )
+channel DL_init_channel( int32 channel_type, unsigned short port,
+ struct in_addr mcast_address, struct in_addr interface_address )
{
channel chan;
struct sockaddr_in soc_addr;
int on=1;
int i1,i2,i3,i4;
+ int32 mcast_addr;
unsigned char ttl_val;
if((chan = socket(AF_INET, SOCK_DGRAM, 0)) == -1)
@@ -93,30 +96,29 @@
{
soc_addr.sin_family = AF_INET;
soc_addr.sin_port = htons(port);
- if (interface_address == 0)
- soc_addr.sin_addr.s_addr= INADDR_ANY;
- else
- soc_addr.sin_addr.s_addr= htonl(interface_address);
+ soc_addr.sin_addr = interface_address;
if(bind( chan, (struct sockaddr *) &soc_addr,
sizeof(soc_addr)) == -1)
{
- Alarm( PRINT, "DL_init_channel: bind error for port %d, already running \n",port);
+ Alarm( PRINT, "DL_init_channel: bind error for port %d, already running \n", port);
exit(0);
}
Alarm( DATA_LINK, "DL_init_channel: bind for recv_channel for port %d with chan %d ok\n",
port, chan);
- i1 = (mcast_address >> 24) & 0x000000ff;
- i2 = (mcast_address >> 16) & 0x000000ff;
- i3 = (mcast_address >> 8) & 0x000000ff;
- i4 = mcast_address & 0x000000ff;
+ mcast_addr = ntohl (mcast_address.s_addr);
+
+ i1 = (mcast_addr >> 24) & 0x000000ff;
+ i2 = (mcast_addr >> 16) & 0x000000ff;
+ i3 = (mcast_addr >> 8) & 0x000000ff;
+ i4 = mcast_addr & 0x000000ff;
if( i1 >=224 && i1 < 240 )
{
#ifndef ARCH_SPARC_SUNOS /* no support for IP multicast in old SunOS */
struct ip_mreq mreq;
- mreq.imr_multiaddr.s_addr = htonl( mcast_address );
+ mreq.imr_multiaddr = mcast_address;
/* the interface could be changed to a specific interface if needed */
mreq.imr_interface.s_addr = INADDR_ANY;
@@ -124,7 +126,7 @@
if (setsockopt(chan, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void *)&mreq,
sizeof(mreq)) < 0)
{
- Alarm( EXIT, "DL_init_channel: problem in setsockopt to multicast address %d\n", mcast_address );
+ Alarm( EXIT, "DL_init_channel: problem in setsockopt to multicast address %d\n", mcast_addr );
}
Alarm( DATA_LINK, "DL_init_channel: Joining multicast address %d.%d.%d.%d went ok\n",i1,i2,i3,i4);
@@ -147,7 +149,9 @@
}
}
-int DL_send( channel chan, int32 address, int16 port, sys_scatter *scat )
+
+int DL_send_one ( channel chan, struct in_addr address,
+ unsigned short port, sys_scatter *scat )
{
#ifndef ARCH_SCATTER_NONE
@@ -168,7 +172,7 @@
assert(scat->num_elements <= ARCH_SCATTER_SIZE);
soc_addr.sin_family = AF_INET;
- soc_addr.sin_addr.s_addr= htonl(address);
+ soc_addr.sin_addr = address;
soc_addr.sin_port = htons(port);
#ifdef ARCH_PC_HOME
@@ -216,26 +220,65 @@
if(ret < 0) {
/* delay for a short while */
send_errormsg = strerror(errno);
- Alarm( DATA_LINK, "DL_send: delaying after failure in send to %d.%d.%d.%d, ret is %d\n",
- IP1(address), IP2(address), IP3(address), IP4(address), ret);
+ Alarm( DATA_LINK, "DL_send_one: delaying after failure in send to %s, ret is %d\n",
+ inet_ntoa (address), ret);
select( 0, 0, 0, 0, (struct timeval *)&select_delay );
}
}
if (ret < 0)
{
for( i=0; i < scat->num_elements; i++)
- Alarm( DATA_LINK, "DL_send: element[%d]: %d bytes\n",
+ Alarm( DATA_LINK, "DL_send_one: element[%d]: %d bytes\n",
i,scat->elements[i].len);
- Alarm( DATA_LINK, "DL_send: error: %s\n sending %d bytes on channel %d to address %d.%d.%d.%d\n",
- send_errormsg, total_len,chan,IP1(address), IP2(address), IP3(address), IP4(address) );
+ Alarm( DATA_LINK, "DL_send_one: error: %s\n sending %d bytes on channel %d to address %s\n",
+ send_errormsg, total_len,chan,inet_ntoa (address) );
}else if(ret < total_len){
- Alarm( DATA_LINK, "DL_send: partial sending %d out of %d\n",
+ Alarm( DATA_LINK, "DL_send_one: partial sending %d out of %d\n",
ret,total_len);
}
- Alarm( DATA_LINK, "DL_send: sent a message of %d bytes to (%d.%d.%d.%d,%d) on channel %d\n",
- ret,IP1(address), IP2(address),IP3(address), IP4(address),port,chan);
+ Alarm( DATA_LINK, "DL_send_one: sent a message of %d bytes to (%s) on channel %d\n",
+ ret,inet_ntoa (address),port,chan);
return(ret);
+}
+
+int DL_send (channel chan,
+ struct mp_node *mynode,
+ struct mp_node *target,
+ unsigned short target_port,
+ sys_scatter *scat)
+{
+ struct mp_net *net;
+ struct mp_intf *intf;
+ struct list *tmp;
+ int ret = -1, r = 0;
+
+ /* For each of my own interfaces, find the corresponding network. If
+ the target node have an interface on this network, send message
+ to this interface. If no target has been specified, just
+ broadcast once per network */
+
+ for (tmp = mynode->intf_head; tmp; tmp = tmp->next)
+ {
+ intf = (struct mp_intf *) tmp->data;
+ net = intf->net;
+
+ if (target)
+ {
+ if ((intf = intf_get (target, net)))
+ {
+ r = DL_send_one (chan, intf->addr, target_port, scat);
+ }
+ }
+ else
+ {
+ r = DL_send_one (chan, net->bcast, target_port, scat);
+ }
+
+ ret = MAX (ret, r);
+ }
+
+ return ret;
}
int DL_recv( channel chan, sys_scatter *scat )
Index: data_link.h
===================================================================
RCS file: /CVSROOT/src/components/spread/data_link.h,v
retrieving revision 1.1.1.2
retrieving revision 1.3
diff -u -r1.1.1.2 -r1.3
--- data_link.h 2001/10/01 08:51:21 1.1.1.2
+++ data_link.h 2001/10/03 13:53:12 1.3
@@ -36,15 +36,20 @@
#include "arch.h"
#include "scatter.h"
+#include "multipath.h"
#define MAX_PACKET_SIZE 1472 /*1472 = 1536-64 (of udp)*/
#define SEND_CHANNEL 0x00000001
#define RECV_CHANNEL 0x00000002
-channel DL_init_channel( int32 channel_type, int16 port, int32 mcast_address, int32 interface_address );
+channel DL_init_channel( int32 channel_type, unsigned short port,
+ struct in_addr mcast_address, struct in_addr interface_address );
void DL_close_channel(channel chan);
-int DL_send( channel chan, int32 address, int16 port, sys_scatter *scat );
+int DL_send_one ( channel chan, struct in_addr address,
+ unsigned short port, sys_scatter *scat );
+int DL_send (channel chan, struct mp_node *mynode,
+ struct mp_node *target, unsigned short target_port, sys_scatter *scat);
int DL_recv( channel chan, sys_scatter *scat );
#endif /* INC_DATA_LINK */
Index: groups.c
===================================================================
RCS file: /CVSROOT/src/components/spread/groups.c,v
retrieving revision 1.1.1.2
retrieving revision 1.3
diff -u -r1.1.1.2 -r1.3
--- groups.c 2001/10/01 08:51:23 1.1.1.2
+++ groups.c 2001/10/03 13:53:12 1.3
@@ -683,7 +683,7 @@
if( grp == NULL )
{
new_grp = new( GROUP );
- memset( new_grp->name, MAX_GROUP_NAME, 0 );
+ memset( new_grp->name, 0, MAX_GROUP_NAME );
strcpy( new_grp->name, group_name );
sl_init( &new_grp->MembersList );
sl_set_compare( &new_grp->MembersList,
@@ -711,7 +711,7 @@
}
/* Add a new member as ESTABLISHED (might change later on depending on the situation */
new_mbr = new( MEMBER );
- memset( new_mbr->private_name, MAX_GROUP_NAME, 0 );
+ memset( new_mbr->private_name, 0, MAX_GROUP_NAME );
strcpy( new_mbr->private_name, private_group_name );
new_mbr->proc_id = new_p.id;
new_mbr->status = ESTABLISHED_MEMBER;
@@ -1469,7 +1469,7 @@
if( orig_grp == NULL )
{
new_grp = new( GROUP );
- memset( new_grp->name, MAX_GROUP_NAME, 0 );
+ memset( new_grp->name, 0, MAX_GROUP_NAME );
strcpy( new_grp->name, this_group->name );
new_grp->grp_id = this_group->grp_id;
Index: monitor.c
===================================================================
RCS file: /CVSROOT/src/components/spread/monitor.c,v
retrieving revision 1.1.1.2
retrieving revision 1.3
diff -u -r1.1.1.2 -r1.3
--- monitor.c 2001/10/01 08:51:26 1.1.1.2
+++ monitor.c 2001/10/03 13:53:13 1.3
@@ -94,6 +94,7 @@
{
int i;
channel ch;
+ struct in_addr dummy_addr;
fclose(stderr);
@@ -171,9 +172,11 @@
Report_scat.elements[1].buf = (char *)&GlobalStatus;
Report_scat.elements[1].len = sizeof(status);
- SendChan = DL_init_channel( SEND_CHANNEL , My_port, 0, 0 );
+ dummy_addr.s_addr = INADDR_ANY;
+
+ SendChan = DL_init_channel( SEND_CHANNEL , My_port, dummy_addr, dummy_addr );
- ch = DL_init_channel( RECV_CHANNEL, My_port, 0, 0 );
+ ch = DL_init_channel( RECV_CHANNEL, My_port, dummy_addr, dummy_addr );
E_attach_fd( ch, READ_FD, Report_message, 0, NULL, HIGH_PRIORITY );
Print_menu();
@@ -403,8 +406,8 @@
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
+ DL_send( SendChan, My.node, p.node, p.port, &Pack_scat );
+ DL_send( SendChan, My.node, p.node, p.port, &Pack_scat );
}
}
E_queue( Send_partition, 0, NULL, Send_partition_timeout );
@@ -522,8 +525,8 @@
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
+ DL_send( SendChan, My.node, p.node, p.port, &Pack_scat );
+ DL_send( SendChan, My.node, p.node, p.port, &Pack_scat );
}
}
}
@@ -609,7 +612,7 @@
proc_index = Conf_proc_by_id( proc_id, &p );
if( Status_vector[proc_index] )
{
- DL_send( SendChan, p.id, p.port, &Pack_scat );
+ DL_send( SendChan, My.node, p.node, p.port, &Pack_scat );
}
}
}
@@ -682,8 +685,8 @@
if( Kill_partition[proc_index] == -1 )
{
Alarm( PRINT , "Monitor: Terminating %s\n", p.name );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
- DL_send( SendChan, p.id, p.port, &Pack_scat );
+ DL_send( SendChan, My.node, p.node, p.port, &Pack_scat );
+ DL_send( SendChan, My.node, p.node, p.port, &Pack_scat );
}
}
}
Index: multipath.c
===================================================================
RCS file: multipath.c
diff -N multipath.c
--- /dev/null Thu Oct 4 14:48:12 2001
+++ multipath.c Thu Oct 4 14:49:06 2001
@@ -0,0 +1,163 @@
+#include <stdlib.h>
+#include <string.h>
+#include "arch.h"
+
+#ifndef ARCH_PC_WIN95
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/uio.h>
+/* for select */
+#include <sys/time.h>
+#include <unistd.h>
+
+#else /* ARCH_PC_WIN95 */
+
+#include <winsock.h>
+
+#endif /* ARCH_PC_WIN95 */
+
+#include "multipath.h"
+#include "configuration.h"
+
+static struct list *net_head = NULL;
+static struct list *node_head = NULL;
+
+static void list_add (struct list **head, struct list *el)
+{
+ el->prev = NULL;
+ el->next = *head;
+ if (*head)
+ (*head)->prev = el;
+ *head = el;
+}
+
+/* Warning: if a node has multiple interfaces on a net, intf_get only
+ returns the first one found. */
+struct mp_intf *intf_get (struct mp_node *node, struct mp_net *net)
+{
+ struct list *tmp;
+ struct mp_intf *intf;
+
+ for (tmp = net->intf_head; tmp; tmp = tmp->next)
+ {
+ intf = (struct mp_intf *) tmp->data;
+ if (intf->node == node)
+ return intf;
+ }
+
+ return NULL;
+}
+
+struct mp_net *net_get (char name[])
+{
+ struct list *tmp;
+ struct mp_net *net;
+
+ for (tmp = net_head; tmp; tmp = tmp->next)
+ {
+ net = (struct mp_net *) tmp->data;
+ if (!strcmp (name, net->net_name))
+ return net;
+ }
+
+ return NULL;
+}
+
+struct mp_node *node_get (char name[])
+{
+ struct list *tmp;
+ struct mp_node *node;
+
+ for (tmp = node_head; tmp; tmp = tmp->next)
+ {
+ node = (struct mp_node *) tmp->data;
+ if (!strcmp (name, node->node_name))
+ return node;
+ }
+
+ return NULL;
+}
+
+struct mp_intf *intf_add (struct mp_node *node,
+ struct mp_net *net,
+ struct if_info *ifc)
+{
+ struct mp_intf *intf;
+
+ intf = intf_get (node, net);
+
+ if (!intf)
+ {
+ intf = (struct mp_intf *) malloc (sizeof (struct mp_intf));
+ memset ((void *) intf, 0, sizeof (struct mp_intf));
+ intf->node = node;
+ intf->net = net;
+ intf->intf_this_net.data = intf->intf_this_node.data = (void *) intf;
+ list_add (&node->intf_head, &intf->intf_this_node);
+ list_add (&net->intf_head, &intf->intf_this_net);
+ }
+
+ if (ifc)
+ {
+ intf->ifc = ifc;
+ intf->addr.s_addr = htonl (ifc->ip);
+ }
+
+ return intf;
+}
+
+void net_insert (struct mp_net *net)
+{
+ net->net_this_segment.data = net->global_list.data = (void *) net;
+ list_add (&net_head, &net->global_list);
+ list_add (&net->segment->net_head, &net->net_this_segment);
+}
+
+struct mp_net *net_add (char name[],
+ struct in_addr bcast,
+ struct in_addr netmask,
+ struct dummy_segment *seg)
+{
+ struct mp_net *net;
+
+ net = net_get (name);
+
+ if (!net)
+ {
+ net = (struct mp_net *) malloc (sizeof (struct mp_net));
+ memset ((void *) net, 0, sizeof (struct mp_net));
+ strcpy (net->net_name, name);
+ net->segment = seg;
+ net_insert (net);
+ }
+
+ if (bcast.s_addr)
+ {
+ net->bcast = bcast;
+ net->netmask = netmask;
+ }
+
+ return net;
+}
+
+struct mp_node *node_add (char name[], struct dummy_proc *proc)
+{
+ struct mp_node *node;
+
+ node = node_get (name);
+
+ if (!node)
+ {
+ node = (struct mp_node *) malloc (sizeof (struct mp_node));
+ memset ((void *) node, 0, sizeof (struct mp_node));
+ strcpy (node->node_name, name);
+ node->proc = proc;
+ node->global_list.data = (void *) node;
+ list_add (&node_head, &node->global_list);
+ }
+
+ return node;
+}
Index: multipath.h
===================================================================
RCS file: multipath.h
diff -N multipath.h
--- /dev/null Thu Oct 4 14:48:12 2001
+++ multipath.h Thu Oct 4 14:49:06 2001
@@ -0,0 +1,78 @@
+#ifndef _MULTIPATH_H
+#define _MULTIPATH_H
+
+#include "arch.h"
+
+#ifndef ARCH_PC_WIN95
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+#else /* ARCH_PC_WIN95 */
+
+#include <winsock.h>
+
+#endif /* ARCH_PC_WIN95 */
+
+struct mp_net;
+struct mp_node;
+struct dummy_segment;
+struct dummy_proc;
+struct if_info;
+
+struct list
+{
+ struct list *prev;
+ struct list *next;
+ void *data;
+};
+
+struct mp_intf
+{
+ struct in_addr addr; /* Unicast address for directed send */
+ struct if_info *ifc;
+ struct mp_net *net;
+ struct mp_node *node;
+ struct list intf_this_net;
+ struct list intf_this_node;
+};
+
+struct mp_net
+{
+ char net_name[256];
+ struct in_addr bcast;
+ struct in_addr netmask;
+ struct dummy_segment *segment;
+ struct list *intf_head;
+ struct list net_this_segment;
+ struct list global_list;
+};
+
+struct mp_node
+{
+ char node_name[256];
+ unsigned short port; /* Port for directed send */
+ struct dummy_proc *proc;
+ struct list *intf_head;
+ struct list global_list;
+};
+
+struct mp_intf *intf_get (struct mp_node *node, struct mp_net *net);
+struct mp_net *net_get (char name[]);
+struct mp_node *node_get (char name[]);
+struct mp_intf *intf_add (struct mp_node *node,
+ struct mp_net *net,
+ struct if_info *ifc);
+void net_insert (struct mp_net *net);
+struct mp_net *net_add (char name[],
+ struct in_addr bcast,
+ struct in_addr netmask,
+ struct dummy_segment *seg);
+struct mp_node *node_add (char name[], struct dummy_proc *);
+struct mp_node *node_get (char name[]);
+
+#undef MAX
+#define MAX(a, b) (((a) > (b)) ? (a) : (b))
+
+#endif
Index: network.c
===================================================================
RCS file: /CVSROOT/src/components/spread/network.c,v
retrieving revision 1.1.1.2
retrieving revision 1.4
diff -u -r1.1.1.2 -r1.4
--- network.c 2001/10/01 08:51:27 1.1.1.2
+++ network.c 2001/10/04 12:48:09 1.4
@@ -49,12 +49,11 @@
static int Num_bcast_channels;
static int Bcast_needed;
-static int32 Bcast_address;
static int16 Bcast_port;
static int Num_send_needed;
-static int32 Send_address[MAX_SEGMENTS];
-static int16 Send_ports[MAX_SEGMENTS];
+static struct mp_node *Send_address[MAX_SEGMENTS];
+static unsigned short Send_ports[MAX_SEGMENTS];
/* ### Pack: 3 lines */
/* Global in function so both Net_queue_bcast and Net_flush_bcast can access them */
@@ -63,7 +62,7 @@
static const char align_padding[4] = "padd";
/* address for token sending - which is always needed */
-static int32 Token_address;
+static struct mp_node *Token_address;
static int16 Token_port;
static configuration Net_membership;
@@ -86,9 +85,12 @@
void Net_init()
{
proc dummy_proc;
- int32u interface_addr;
- int i;
+ struct in_addr interface_addr, addr_any;
+ struct mp_intf *intf;
+ struct list *tmp;
+ int i = 0;
+ addr_any.s_addr = INADDR_ANY;
Cn = Conf();
My = Conf_my();
@@ -99,32 +101,29 @@
{
/* I am not allone in segment */
Bcast_needed = 1;
- Bcast_address = Cn.segments[My.seg_index].bcast_address;
Bcast_port = My.port;
-
- Alarm( NETWORK, "Net_init: Bcast needed to address (%d, %d)\n",
- Bcast_address, Bcast_port );
+ Alarm( NETWORK, "Net_init: Bcast needed to port %d\n",
+ Bcast_port );
}else{
Bcast_needed = 0;
- Bcast_address = 0;
Alarm( NETWORK, "Net_init: Bcast is not needed\n" );
}
- for ( i=0; i < My.num_if; i++)
- {
- if (Is_IfType_Daemon( My.ifc[i].type ) || Is_IfType_Any( My.ifc[i].type ) )
- {
- if (Is_IfType_Any( My.ifc[i].type ) )
- interface_addr = 0;
- else
- interface_addr = My.ifc[i].ip;
-
- Bcast_channel[i] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, interface_addr );
- Token_channel[i] = DL_init_channel( RECV_CHANNEL, My.port+1, 0, interface_addr );
- Num_bcast_channels++;
- }
- }
+ for (tmp = My.node->intf_head; tmp; tmp = tmp->next)
+ {
+ /* We know for sure that this interface is ANY or DAEMON */
+ intf = (struct mp_intf *) tmp->data;
+ if (Is_IfType_Any (intf->ifc->type))
+ interface_addr.s_addr = INADDR_ANY;
+ else
+ interface_addr = intf->addr;
+
+ Bcast_channel[i] = DL_init_channel( RECV_CHANNEL, My.port, intf->net->bcast, interface_addr );
+ Token_channel[i] = DL_init_channel( RECV_CHANNEL, My.port+1, addr_any, interface_addr );
+ i++;
+ Num_bcast_channels++;
+ }
- Send_channel = DL_init_channel( SEND_CHANNEL, My.port+2, 0, My.id );
+ Send_channel = DL_init_channel( SEND_CHANNEL, My.port + 2, addr_any, addr_any );
Num_send_needed = 0;
}
@@ -162,7 +161,7 @@
} else if( Net_membership.segments[i].num_procs > 0 ) {
- Send_address[Num_send_needed] = Net_membership.segments[i].procs[0]->id;
+ Send_address[Num_send_needed] = Net_membership.segments[i].procs[0]->node;
Send_ports [Num_send_needed] = Net_membership.segments[i].port;
Num_send_needed++;
@@ -175,10 +174,10 @@
i, Send_address[i], Send_ports[i] );
/* Calculate where to send the token */
- Token_address = 0;
+ Token_address = NULL;
if( my_index_in_seg < My_seg.num_procs-1 )
{
- Token_address = My_seg.procs[my_index_in_seg+1]->id;
+ Token_address = My_seg.procs[my_index_in_seg+1]->node;
Token_port = My_seg.port+1;
}else{
/* I am last in my segment */
@@ -188,7 +187,7 @@
* My segment is the only segment
* sending token to the first in my segment
*/
- Token_address = My_seg.procs[0]->id;
+ Token_address = My_seg.procs[0]->node;
Token_port = My_seg.port+1;
} else if( Num_send_needed == my_next_index ) {
/*
@@ -207,8 +206,8 @@
}
}
- Alarm( NETWORK, "Net_set_membership: Token_address : (%d, %d)\n",
- Token_address, Token_port );
+ Alarm( NETWORK, "Net_set_membership: Token_address : (%s, %d)\n",
+ Token_address->node_name, Token_port );
}
int Net_bcast( sys_scatter *scat )
@@ -225,14 +224,14 @@
pack_ptr->transmiter_id = My.id;
for ( i=0; i< Num_send_needed; i++ )
{
- ret = DL_send( Send_channel, Send_address[i], Send_ports[i], scat );
+ ret = DL_send( Send_channel, My.node, Send_address[i], Send_ports[i], scat );
}
pack_ptr->type = Clear_routed( pack_ptr->type );
/* broadcasting if needed according to configuration */
if( Bcast_needed )
{
- ret = DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+ ret = DL_send( Send_channel, My.node, NULL, Bcast_port, scat );
}
return( ret );
}
@@ -325,7 +324,7 @@
for ( i=0; i< Num_send_needed; i++ )
{
- ret = DL_send( Send_channel, Send_address[i], Send_ports[i], &Queue_scat );
+ ret = DL_send( Send_channel, My.node, Send_address[i], Send_ports[i], &Queue_scat );
}
pack_ptr = (packet_header *)Queue_scat.elements[0].buf;
pack_ptr->type = Clear_routed( pack_ptr->type );
@@ -333,7 +332,7 @@
/* broadcasting if needed according to configuration */
if( Bcast_needed )
{
- ret = DL_send( Send_channel, Bcast_address, Bcast_port, &Queue_scat );
+ ret = DL_send( Send_channel, My.node, NULL, Bcast_port, &Queue_scat );
}
Queue_scat.num_elements = 0;
Queued_bytes = 0;
@@ -353,14 +352,14 @@
{
if( Bcast_needed )
{
- ret = DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+ ret = DL_send( Send_channel, My.node, NULL, Bcast_port, scat );
}
}else{
if( Net_membership.segments[seg_index].num_procs > 0 )
{
pack_ptr->type = Set_routed( pack_ptr->type );
- ret = DL_send( Send_channel,
- Net_membership.segments[seg_index].procs[0]->id,
+ ret = DL_send( Send_channel, My.node,
+ Net_membership.segments[seg_index].procs[0]->node,
Net_membership.segments[seg_index].port,
scat );
pack_ptr->type = Clear_routed( pack_ptr->type );
@@ -384,7 +383,7 @@
Alarm( PRINT, "Net_ucast: non existing proc_id %d\n",proc_id );
return( ret );
}
- ret = DL_send( Send_channel, proc_id, p.port, scat );
+ ret = DL_send( Send_channel, My.node, p.node, p.port, scat );
return( ret );
}
@@ -411,7 +410,6 @@
if (ch_found == FALSE) {
Alarm(EXIT, "Net_recv: Listening and received packet on un-used interface %d\n", fd);
}
-
received_bytes = DL_recv( fd, scat );
if( received_bytes <= 0 ) return( received_bytes );
@@ -489,7 +487,7 @@
/* fliping to original form */
if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
- DL_send( Send_channel, Bcast_address, Bcast_port, scat );
+ DL_send( Send_channel, My.node, NULL, Bcast_port, scat );
/* re-fliping to my form */
if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
@@ -573,7 +571,7 @@
token_ptr = (token_header *)scat->elements[0].buf;
token_ptr->type = Set_endian( token_ptr->type );
token_ptr->transmiter_id = My.id;
- ret = DL_send( Send_channel, Token_address, Token_port, scat );
+ ret = DL_send( Send_channel, My.node, Token_address, Token_port, scat );
return ( ret );
}
@@ -626,7 +624,7 @@
proc_id );
return( ret );
}
- ret = DL_send( Send_channel, proc_id, p.port+1, scat );
+ ret = DL_send( Send_channel, My.node, p.node, p.port+1, scat );
return( ret );
}
int Net_num_channels()
Index: status.c
===================================================================
RCS file: /CVSROOT/src/components/spread/status.c,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- status.c 2001/09/24 15:27:03 1.1.1.1
+++ status.c 2001/09/24 17:03:06 1.2
@@ -38,6 +38,7 @@
#include "spread_params.h"
#include "scatter.h"
#include "net_types.h"
+#include "multipath.h"
#include "data_link.h"
#include "configuration.h"
#include "status.h"
@@ -54,10 +55,12 @@
void Stat_init()
{
int16 dummy_port = 0;
+ struct in_addr addr;
Start_time = E_get_time();
- Report_channel = DL_init_channel( SEND_CHANNEL, dummy_port, 0, Conf_my().id );
+ addr.s_addr = INADDR_ANY;
+ Report_channel = DL_init_channel( SEND_CHANNEL, dummy_port, addr, addr );
Report_scat.num_elements = 2;
Report_scat.elements[0].len = sizeof( packet_header );
@@ -87,6 +90,7 @@
packet_header *pack_ptr;
proc p;
int ret;
+ struct in_addr addr;
pack_ptr = (packet_header *)scat->elements[0].buf;
if( ! ( pack_ptr->memb_id.proc_id == 15051963 && Conf_id_in_conf( &Cn, pack_ptr->proc_id ) != -1 ) )
@@ -98,8 +102,8 @@
now = E_get_time();
delta = E_sub_time( now, Start_time );
GlobalStatus.sec = delta.sec;
-
- DL_send( Report_channel, pack_ptr->proc_id, pack_ptr->seq, &Report_scat );
+ addr.s_addr = ntohl (pack_ptr->proc_id);
+ DL_send_one ( Report_channel, addr, pack_ptr->seq, &Report_scat );
ret = Conf_proc_by_id( pack_ptr->proc_id, &p );
if( ret < 0 )
Alarm( STATUS,
--
And don't forget you'll never get a dog to walk upright
Just 'cause you've got the power, that don't mean you've got the right.
More information about the Spread-users
mailing list