[Spread-cvs] commit: r236 - trunk/daemon
jonathan at spread.org
jonathan at spread.org
Mon Jun 13 01:11:53 EDT 2005
Author: jonathan
Date: 2005-06-13 01:11:53 -0400 (Mon, 13 Jun 2005)
New Revision: 236
Modified:
trunk/daemon/Changelog
trunk/daemon/conf_body.h
trunk/daemon/config_parse.y
trunk/daemon/configuration.c
trunk/daemon/configuration.h
trunk/daemon/flow_control.c
trunk/daemon/log.c
trunk/daemon/membership.c
trunk/daemon/membership.h
trunk/daemon/monitor.c
trunk/daemon/net_types.h
trunk/daemon/network.c
trunk/daemon/network.h
trunk/daemon/protocol.c
trunk/daemon/session.c
trunk/daemon/session.h
trunk/daemon/status.c
Log:
Initial dynamic configuration patch. This works but needs a bit of
tuning and a few tweaks still.
To try it out, start up several daemons with identical spread.conf
files, then while they are running, edit the spread.conf to add a new
machine and copy this new spread.conf to all nodes. Then using
spmonitor, send the "reload config" command. If you examine the logs for
the daemons you should find that they have reconfigured and now include
the new host. You can then start up a daemon on that host and it will
merge with the rest.
Modified: trunk/daemon/Changelog
===================================================================
--- trunk/daemon/Changelog 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/Changelog 2005-06-13 05:11:53 UTC (rev 236)
@@ -1,3 +1,53 @@
+Wed Jun 1 16:05:34 2005 Jonathan Stanton <jonathan at cnds.jhu.edu>
+
+ * session.c (Sess_init): Change ACTIVATE_PORT_REUSE() from
+ #define macro to regular function Sess_activate_port_reuse().
+
+Mon May 30 15:26:57 2005 Jonathan Stanton <jonathan at cnds.jhu.edu>
+
+ * monitor.c: Add "Reload" command to menu. This sends a packet
+ with type RELOAD_TYPE to all daemons to trigger a conf file
+ reload.
+
+ * log.c, membership.c, session.c:
+ Remove all static copies of the My struct from Conf_my()
+ to avoid old .seg_index and .index_in_seg values when conf changes.
+
+ * flow_control.c, log.c, network.c, membership.c, status.c:
+ Use dynamic refs to Conf instead of static copies.
+ Cleanup conf use to use functions instead of direct data
+ structure access when possible.
+
+ * configuration.c (Conf_reload_initiate): Add function to reload
+ configuration files and parse what type of changes were made
+ to the configuration to know if a singleton partition is needed.
+
+ * membership.c (Lookup_new_members): Change name to
+ Memb_lookup_new_members() and make public function.
+
+ * configuration.c, conf_body.h, config_parse.y: Make Config and
+ Config_procs pointers instead of static structures so they can be
+ reallocated upon config reload.
+
+ * configuration.c (Conf_in_reload_state): Add Conf_reload_state_*()
+ functions to query, set and reset the state. Should be true when
+ configuration is being reloaded to prevent other operations that
+ might interfere.
+
+ * conf_body.h: Remove skiplist.h and unused Skiplist declarations.
+
+ * configuration.c, configuration.h (Conf_init, Conf_load_conf_file):
+ Change return value to void since not used. Store filename and
+ hostname provided by user on command line in static buffer in
+ configuration so it can be used for later reloads of conf file.
+
+ * network.c, network.h (Net_signal_conf_reload): Add method to
+ signal network layer to configuration change.
+
+ * network.c, network.h (Net_recv): Make public Set_partition(),
+ Clear_partition() interface.
+
+
Thu Apr 7 23:47:37 2005 Jonathan Stanton <jonathan at cnds.jhu.edu>
* groups.c (G_handle_reg_memb): Fix misplaced closing } for
@@ -60,12 +110,12 @@
Fri Oct 29 01:18:05 2004 Jonathan Stanton <jonathan at cnds.jhu.edu>
- * groups.c, groups.h,sess_body.h,objects.h,sp.c,sp.h,user.c:
- Apply Ryan Caudy's major Group change to support multiple VS set reporting.
+ * groups.c, groups.h,sess_body.h,objects.h,sp.c,sp.h,user.c:
+ Apply Ryan Caudy's major Group change to support multiple VS set reporting.
- * javalib/Membership.java, javaapps/User.java, javaapps/recThread.java:
- Apply Ryan's change to java library to support VS sets. Change the
- example code of User to print out the new membership info.
+ * javalib/Membership.java, javaapps/User.java, javaapps/recThread.java:
+ Apply Ryan's change to java library to support VS sets. Change the
+ example code of User to print out the new membership info.
Fri May 07 10:53:03 2004 Ryan Caudy <rcaudy at gmail.com>
Modified: trunk/daemon/conf_body.h
===================================================================
--- trunk/daemon/conf_body.h 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/conf_body.h 2005-06-13 05:11:53 UTC (rev 236)
@@ -41,11 +41,11 @@
#define INC_CONF_BODY
#include "arch.h"
-#include "skiplist.h"
#include "configuration.h"
#include "spread_params.h"
int yyparse();
+void parser_init();
#undef ext
#ifndef ext_conf_body
@@ -54,12 +54,9 @@
#define ext
#endif
-ext configuration Config;
-ext int Num_procs;
+ext configuration *Config;
ext FILE *yyin;
-ext Skiplist ConfProcsbyname;
-ext Skiplist ConfProcsbyid;
-ext proc Config_procs[MAX_PROCS_RING];
+ext proc *Config_procs;
ext int LinkWeights[MAX_SEGMENTS][MAX_SEGMENTS];
#define MAX_CONF_STRING 20000
Modified: trunk/daemon/config_parse.y
===================================================================
--- trunk/daemon/config_parse.y 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/config_parse.y 2005-06-13 05:11:53 UTC (rev 236)
@@ -73,15 +73,24 @@
static int authentication_configured = 0;
+void parser_init()
+{
+ num_procs = 0;
+ segment_procs = 0;
+ segments = 0;
+ rvec_num = 0;
+ procs_interfaces = 0;
+}
+
static char *segment2str(int seg) {
static char ipstr[40];
- int id = Config.segments[seg].bcast_address;
+ int id = Config->segments[seg].bcast_address;
sprintf(ipstr, "%d.%d.%d.%d:%d",
(id & 0xff000000)>>24,
(id & 0xff0000)>>16,
(id & 0xff00)>>8,
(id & 0xff),
- Config.segments[seg].port);
+ Config->segments[seg].port);
return ipstr;
}
static void alarm_print_proc(proc *p, int port) {
@@ -316,8 +325,8 @@
%%
Config : ConfigStructs
{
- Config.num_segments = segments;
- Num_procs = num_procs;
+ Config->num_segments = segments;
+ Config->num_total_procs = num_procs;
Alarm(CONF, "Finished configuration file.\n");
Alarmp( SPLOG_INFO, CONF, "The full segment string is %d characters long:\n%s", strlen(ConfStringRep), ConfStringRep);
}
@@ -508,12 +517,12 @@
{ int i;
int added_len;
SEGMENT_CHECK( segments, inet_ntoa($2.ip.addr) );
- Config.segments[segments].num_procs = segment_procs;
- Config.segments[segments].port = $2.ip.port;
- Config.segments[segments].bcast_address =
+ Config->segments[segments].num_procs = segment_procs;
+ Config->segments[segments].port = $2.ip.port;
+ Config->segments[segments].bcast_address =
$2.ip.addr.s_addr;
- if(Config.segments[segments].port == 0)
- Config.segments[segments].port = DEFAULT_SPREAD_PORT;
+ if(Config->segments[segments].port == 0)
+ Config->segments[segments].port = DEFAULT_SPREAD_PORT;
Alarm(CONF, "Successfully configured Segment %d [%s] with %d procs:\n",
segments,
segment2str(segments),
@@ -522,13 +531,13 @@
/* This '1' is to keep each proc with the same port as the segment.*/
if( 1 || Config_procs[i].port==0) {
Config_procs[i].port=
- Config.segments[segments].port;
+ Config->segments[segments].port;
}
alarm_print_proc(&Config_procs[i],
- Config.segments[segments].port);
+ Config->segments[segments].port);
}
/* generate string representation of segment */
- added_len = convert_segment_to_string(&ConfStringRep[ConfStringLen], MAX_CONF_STRING - ConfStringLen, &Config.segments[segments] );
+ added_len = convert_segment_to_string(&ConfStringRep[ConfStringLen], MAX_CONF_STRING - ConfStringLen, &Config->segments[segments] );
if (added_len == -1 )
yyerror("Failed to update string with segment!\n");
ConfStringLen += added_len;
@@ -556,7 +565,7 @@
Config_procs[num_procs].seg_index = segments;
Config_procs[num_procs].index_in_seg = segment_procs;
Config_procs[num_procs].num_if = procs_interfaces;
- Config.segments[segments].procs[segment_procs] =
+ Config->segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
num_procs++;
segment_procs++;
@@ -577,7 +586,7 @@
Config_procs[num_procs].seg_index = segments;
Config_procs[num_procs].index_in_seg = segment_procs;
Config_procs[num_procs].num_if = procs_interfaces;
- Config.segments[segments].procs[segment_procs] =
+ Config->segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
num_procs++;
segment_procs++;
@@ -598,7 +607,7 @@
Config_procs[num_procs].ifc[0].ip = Config_procs[num_procs].id;
Config_procs[num_procs].ifc[0].port = Config_procs[num_procs].port;
Config_procs[num_procs].ifc[0].type = IFTYPE_ALL | IFTYPE_ANY;
- Config.segments[segments].procs[segment_procs] =
+ Config->segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
num_procs++;
segment_procs++;
@@ -620,7 +629,7 @@
Config_procs[num_procs].ifc[0].ip = Config_procs[num_procs].id;
Config_procs[num_procs].ifc[0].port = Config_procs[num_procs].port;
Config_procs[num_procs].ifc[0].type = IFTYPE_ALL | IFTYPE_ANY;
- Config.segments[segments].procs[segment_procs] =
+ Config->segments[segments].procs[segment_procs] =
&Config_procs[num_procs];
num_procs++;
segment_procs++;
Modified: trunk/daemon/configuration.c
===================================================================
--- trunk/daemon/configuration.c 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/configuration.c 2005-06-13 05:11:53 UTC (rev 236)
@@ -101,6 +101,15 @@
static int Link_Protocol;
+static bool Conf_Reload_State = FALSE;
+static configuration *Config_Previous;
+static proc *Config_Previous_Procs;
+static char Conf_FileName[80];
+static char Conf_MyName_buf[80];
+static char *Conf_MyName;
+
+static int Conf_prev_proc_by_id( int32u id, proc *p );
+
/* Hash function for string to 32 bit int */
static LOC_INLINE int32u conf_hash_string(const void * key, int32u key_len)
{
@@ -124,19 +133,141 @@
-int Conf_init( char *file_name, char *my_name )
+void Conf_init( char *file_name, char *my_name )
{
- int ret;
+ strncpy(Conf_FileName, file_name, 80);
+ if (my_name != NULL) {
+ strncpy(Conf_MyName_buf, my_name, 80);
+ Conf_MyName = &Conf_MyName_buf[0];
+ } else {
+ Conf_MyName = NULL;
+ }
- ret = Conf_load_conf_file( file_name, my_name);
+ Config = Mem_alloc( sizeof( configuration ) );
+ if (Config == NULL) {
+ Alarmp( SPLOG_FATAL, CONF, "Conf_init: Failed to allocate memory for configuration structure\n");
+ }
+ Config_procs = Mem_alloc( MAX_PROCS_RING * sizeof( proc ) );
+ if (Config_procs == NULL) {
+ Alarmp( SPLOG_FATAL, CONF, "Conf_init: Failed to allocate memory for configuration procs array\n");
+ }
- return(ret);
+ Conf_load_conf_file( file_name, my_name);
}
+bool Conf_in_reload_state(void)
+{
+ return(Conf_Reload_State);
+}
+void Conf_reload_state_begin(void)
+{
-int Conf_load_conf_file( char *file_name, char *my_name )
+ Conf_Reload_State = TRUE;
+}
+
+void Conf_reload_state_end(void)
{
+
+ Conf_Reload_State = FALSE;
+}
+
+/* Basic algorithm:
+ * 1) copy Config to oldConfig
+ * 2) load new spread.conf file into Config
+ * 3) Check if we shuold exit;
+ * 4) Check if this change is only add/sub or not. Return answer
+ */
+bool Conf_reload_initiate(void)
+{
+ bool need_partition = FALSE;
+ proc np, op;
+ int i, pi;
+
+ Config_Previous = Config;
+ Config_Previous_Procs = Config_procs;
+
+ Config = Mem_alloc( sizeof( configuration ) );
+ if (Config == NULL) {
+ Alarmp( SPLOG_FATAL, CONF, "Conf_reload_initiate: Failed to allocate memory for configuration structure\n");
+ }
+ Config_procs = Mem_alloc( MAX_PROCS_RING * sizeof( proc ) );
+ if (Config_procs == NULL) {
+ Alarmp( SPLOG_FATAL, CONF, "Conf_reload_initiate: Failed to allocate memory for configuration procs array\n");
+ }
+
+ Conf_load_conf_file( Conf_FileName, Conf_MyName );
+
+ /* Exit if:
+ * 1) I am no longer in config
+ * 2) My IP/Name has changed
+ * 3) My broadcast/netmask has changed
+ */
+ if ( Conf_proc_by_id( My.id, &np ) < 0 ) {
+ /* I am no longer in config */
+ Alarmp(SPLOG_FATAL, CONF, "Conf_reload_initiate: I (%d.%d.%d.%d) am no longer in config, so exiting.\n", IP1(My.id), IP2(My.id), IP3(My.id), IP4(My.id));
+ }
+ if ( Conf_prev_proc_by_id( My.id, &op ) < 0 ) {
+ Alarmp(SPLOG_FATAL, CONF, "Conf_reload_initiate: BUG! I (%d.%d.%d.%d) am not in previous config, so exiting.\n", IP1(My.id), IP2(My.id), IP3(My.id), IP4(My.id));
+ }
+
+ if ( strncmp( np.name, op.name, MAX_PROC_NAME ) ||
+ (np.num_if != op.num_if) ||
+ (Config->segments[np.seg_index].bcast_address != Config_Previous->segments[op.seg_index].bcast_address) )
+ {
+ /* My identity has changed so exit */
+ Alarmp( SPLOG_FATAL, CONF, "Conf_reload_initiate: My identity has changed: old name (%s), num_if: %d, bcast: %d.%d.%d.%d\t new name (%s), num_if: %d, bcast: %d.%d.%d.%d\n", op.name, op.num_if, IP1(Config_Previous->segments[op.seg_index].bcast_address), IP2(Config_Previous->segments[op.seg_index].bcast_address), IP3(Config_Previous->segments[op.seg_index].bcast_address), IP4(Config_Previous->segments[op.seg_index].bcast_address), np.name, np.num_if, IP1(Config->segments[np.seg_index].bcast_address), IP2(Config->segments[np.seg_index].bcast_address), IP3(Config->segments[np.seg_index].bcast_address), IP4(Config->segments[np.seg_index].bcast_address) );
+ }
+ /* Check interfaces are identical */
+ for (i = 0 ; i < np.num_if; i++) {
+ if ( (np.ifc[i].ip != op.ifc[i].ip) ||
+ (np.ifc[i].port != op.ifc[i].port) ||
+ (np.ifc[i].type != op.ifc[i].type) )
+ {
+ Alarmp( SPLOG_FATAL, CONF, "Conf_reload_initiate: My interface spec has changed so must exit: old (%d.%d.%d.%d:%d - %d) new (%d.%d.%d.%d:%d - %d)\n", IP1(op.ifc[i].ip), IP1(op.ifc[i].ip), IP1(op.ifc[i].ip), IP1(op.ifc[i].ip), op.ifc[i].port, op.ifc[i].type, IP1(np.ifc[i].ip), IP1(np.ifc[i].ip), IP1(np.ifc[i].ip), IP1(np.ifc[i].ip), np.ifc[i].port, np.ifc[i].type );
+ }
+ }
+ /* Check if only new configuration contains only additions and subtractions of daemons and no changes */
+ for ( pi=0; pi < Config->num_total_procs; pi++ )
+ {
+ np = Config_procs[pi];
+ if ( Conf_prev_proc_by_id( np.id, &op ) < 0 ) {
+ Alarmp( SPLOG_INFO, CONF, "Conf_reload_initiate: Config Added daemon at %d.%d.%d.%d \n", IP1(np.id), IP2(np.id), IP3(np.id), IP4(np.id));
+ } else {
+ /* compare proc entries to check if identical */
+ if ( strncmp( np.name, op.name, MAX_PROC_NAME ) ||
+ (np.num_if != op.num_if) ||
+ (Config->segments[np.seg_index].bcast_address != Config_Previous->segments[op.seg_index].bcast_address) )
+ {
+ need_partition = TRUE;
+ Alarmp( SPLOG_DEBUG, CONF, "Conf_reload_initiate: identity of daemon %d.%d.%d.%d has changed: old name (%s), num_if: %d, bcast: %d.%d.%d.%d\t new name (%s), num_if: %d, bcast: %d.%d.%d.%d\n", IP1(np.id), IP2(np.id), IP3(np.id), IP4(np.id), op.name, op.num_if, IP1(Config_Previous->segments[op.seg_index].bcast_address), IP2(Config_Previous->segments[op.seg_index].bcast_address), IP3(Config_Previous->segments[op.seg_index].bcast_address), IP4(Config_Previous->segments[op.seg_index].bcast_address), np.name, np.num_if, IP1(Config->segments[np.seg_index].bcast_address), IP2(Config->segments[np.seg_index].bcast_address), IP3(Config->segments[np.seg_index].bcast_address), IP4(Config->segments[np.seg_index].bcast_address) );
+ }
+ /* Check interfaces are identical */
+ for (i = 0 ; i < np.num_if; i++) {
+ if ( (np.ifc[i].ip != op.ifc[i].ip) ||
+ (np.ifc[i].port != op.ifc[i].port) ||
+ (np.ifc[i].type != op.ifc[i].type) )
+ {
+ need_partition = TRUE;
+ Alarmp( SPLOG_DEBUG, CONF, "Conf_reload_initiate: daemon interface spec for %d.%d.%d.%d has changed.: old (%d.%d.%d.%d:%d - %d) new (%d.%d.%d.%d:%d - %d)\n", IP1(np.id), IP2(np.id), IP3(np.id), IP4(np.id), IP1(op.ifc[i].ip), IP1(op.ifc[i].ip), IP1(op.ifc[i].ip), IP1(op.ifc[i].ip), op.ifc[i].port, op.ifc[i].type, IP1(np.ifc[i].ip), IP1(np.ifc[i].ip), IP1(np.ifc[i].ip), IP1(np.ifc[i].ip), np.ifc[i].port, np.ifc[i].type );
+ }
+ }
+ } /* else */
+ } /* for */
+
+ /* free old config structs and arrays since they will never be used again */
+ dispose( Config_Previous );
+ dispose( Config_Previous_Procs );
+
+ Config_Previous = NULL;
+ Config_Previous_Procs = NULL;
+
+ Alarmp( SPLOG_DEBUG, CONF, "Conf_reload_initiate: Return need_partition = %d\n", need_partition);
+ return(need_partition);
+}
+
+void Conf_load_conf_file( char *file_name, char *my_name )
+{
struct hostent *host_ptr;
char machine_name[256];
char ip[16];
@@ -144,7 +275,6 @@
unsigned int name_len;
char configfile_location[MAXPATHLEN];
- Num_procs = 0;
/* Initialize hash string */
ConfStringRep[0] = '\0';
ConfStringLen = 0;
@@ -157,17 +287,20 @@
strcat(configfile_location, "/spread.conf");
if (NULL != (yyin = fopen(file_name,"r")) )
- Alarm( PRINT, "Conf_init: using file: %s\n", file_name);
+ Alarm( PRINT, "Conf_load_conf_file: using file: %s\n", file_name);
if (yyin == NULL)
if (NULL != (yyin = fopen("./spread.conf", "r")) )
- Alarm( PRINT, "Conf_init: using file: ./spread.conf\n");
+ Alarm( PRINT, "Conf_load_conf_file: using file: ./spread.conf\n");
if (yyin == NULL)
if (NULL != (yyin = fopen(configfile_location, "r")) )
- Alarm( PRINT, "Conf_init: using file: %s\n", configfile_location);
+ Alarm( PRINT, "Conf_load_conf_file: using file: %s\n", configfile_location);
if (yyin == NULL)
- Alarm( EXIT, "Conf_init: error opening config file %s\n",
+ Alarm( EXIT, "Conf_load_conf_file: error opening config file %s\n",
file_name);
+ /* reinitialize all the variables in the yacc parser */
+ parser_init();
+
yyparse();
fclose(yyin);
@@ -175,11 +308,11 @@
/* Test for localhost segemnt defined with other non-localhost segments.
* That is an invalid configuration
*/
- if ( Config.num_segments > 1 ) {
+ if ( Config->num_segments > 1 ) {
int found_localhost = 0;
int found_nonlocal = 0;
- for ( i=0; i < Config.num_segments; i++) {
- if ( ((Config.segments[i].bcast_address & 0xff000000) >> 24) == 127 ) {
+ for ( i=0; i < Config->num_segments; i++) {
+ if ( ((Config->segments[i].bcast_address & 0xff000000) >> 24) == 127 ) {
found_localhost = 1;
} else {
found_nonlocal = 1;
@@ -187,30 +320,30 @@
}
if (found_nonlocal && found_localhost) {
/* Both localhost and non-localhost segments exist. This is a non-functional config.*/
- Alarmp( SPLOG_PRINT, PRINT, "Conf_init: Invalid configuration:\n");
- Conf_print( &Config );
+ Alarmp( SPLOG_PRINT, PRINT, "Conf_load_conf_file: Invalid configuration:\n");
+ Conf_print( Config );
Alarmp( SPLOG_PRINT, PRINT, "\n");
- Alarmp( SPLOG_FATAL, CONF, "Conf_init: Localhost segments can not be used along with regular network address segments.\nMost likely you need to remove or comment out the \nSpread_Segment 127.0.0.255 {...}\n section of your configuration file.\n");
+ Alarmp( SPLOG_FATAL, CONF, "Conf_load_conf_file: Localhost segments can not be used along with regular network address segments.\nMost likely you need to remove or comment out the \nSpread_Segment 127.0.0.255 {...}\n section of your configuration file.\n");
}
}
/* calculate hash value of configuration.
* This daemon will only work with other daemons who have an identical hash value.
*/
- Config.hash_code = conf_hash_string(ConfStringRep, ConfStringLen);
- Alarmp(SPLOG_DEBUG, CONF, "Hash value for this configuration is: %u\n", Config.hash_code);
+ Config->hash_code = conf_hash_string(ConfStringRep, ConfStringLen);
+ Alarmp(SPLOG_DEBUG, CONF, "Hash value for this configuration is: %u\n", Config->hash_code);
/* Match my IP address to entry in configuration file */
if( my_name == NULL ){
gethostname(machine_name,sizeof(machine_name));
host_ptr = gethostbyname(machine_name);
if( host_ptr == 0 )
- Alarm( EXIT, "Conf_init: could not get my ip address (my name is %s)\n",
+ Alarm( EXIT, "Conf_load_conf_file: could not get my ip address (my name is %s)\n",
machine_name );
if (host_ptr->h_addrtype != AF_INET)
- Alarm(EXIT, "Conf_init: Sorry, cannot handle addr types other than IPv4\n");
+ Alarm(EXIT, "Conf_load_conf_file: Sorry, cannot handle addr types other than IPv4\n");
if (host_ptr->h_length != 4)
- Alarm(EXIT, "Conf_init: Bad IPv4 address length\n");
+ Alarm(EXIT, "Conf_load_conf_file: Bad IPv4 address length\n");
i = -1; /* in case host_ptr->h_length == 0 */
for (j = 0; host_ptr->h_addr_list[j] != NULL; j++) {
@@ -220,14 +353,14 @@
if( i >= 0 ) break;
}
if( i < 0 ) Alarm( EXIT,
- "Conf_init: My proc id (%d.%d.%d.%d) is not in configuration\n", IP1(My.id),IP2(My.id),IP3(My.id),IP4(My.id) );
+ "Conf_load_conf_file: My proc id (%d.%d.%d.%d) is not in configuration\n", IP1(My.id),IP2(My.id),IP3(My.id),IP4(My.id) );
}else if( ! strcmp( my_name, "Monitor" ) ){
gethostname(machine_name,sizeof(machine_name));
host_ptr = gethostbyname(machine_name);
if( host_ptr == 0 )
- Alarm( EXIT, "Conf_init: no such monitor host %s\n",
+ Alarm( EXIT, "Conf_load_conf_file: no such monitor host %s\n",
machine_name );
memcpy(&My.id, host_ptr->h_addr_list[0],
@@ -237,32 +370,37 @@
name_len = strlen( machine_name );
if( name_len > sizeof(My.name) ) name_len = sizeof(My.name);
memcpy(My.name, machine_name, name_len );
- Alarm( CONF, "Conf_init: My name: %s, id: %d\n",
+ Alarm( CONF, "Conf_load_conf_file: My name: %s, id: %d\n",
My.name, My.id );
- return( 1 );
+ return;
}else{
name_len = strlen( my_name );
if( name_len > sizeof(My.name) ) name_len = sizeof(My.name);
memcpy(My.name, my_name, name_len );
i = Conf_proc_by_name( My.name, &My );
if( i < 0 ) Alarm( EXIT,
- "Conf_init: My proc %s is not in configuration \n",
+ "Conf_load_conf_file: My proc %s is not in configuration \n",
My.name);
}
Conf_id_to_str( My.id, ip );
- Alarm( CONF, "Conf_init: My name: %s, id: %s, port: %hd\n",
+ Alarm( CONF, "Conf_load_conf_file: My name: %s, id: %s, port: %hd\n",
My.name, ip, My.port );
- return( 0 );
+ return;
}
configuration Conf()
{
- return Config;
+ return *Config;
}
+configuration *Conf_ref(void)
+{
+ return Config;
+}
+
proc Conf_my()
{
return My;
@@ -287,7 +425,7 @@
{
int i,j;
- for ( i=0; i < Num_procs; i++ )
+ for ( i=0; i < Config->num_total_procs; i++ )
{
for ( j=0; j < Config_procs[i].num_if; j++)
{
@@ -305,7 +443,7 @@
{
int i;
- for ( i=0; i < Num_procs; i++ )
+ for ( i=0; i < Config->num_total_procs; i++ )
{
if ( strcmp( Config_procs[i].name, name ) == 0 )
{
@@ -334,7 +472,7 @@
{
int i,j;
- for ( i=0; i < Num_procs; i++ )
+ for ( i=0; i < Config->num_total_procs; i++ )
{
for ( j=0; j < Config_procs[i].num_if; j++)
{
@@ -348,6 +486,24 @@
return( -1 );
}
+static int Conf_prev_proc_by_id( int32u id, proc *p )
+{
+ int i,j;
+
+ for ( i=0; i < Config_Previous->num_total_procs; i++ )
+ {
+ for ( j=0; j < Config_Previous_Procs[i].num_if; j++)
+ {
+ if ( Config_Previous_Procs[i].ifc[j].ip == id )
+ {
+ *p = Config_Previous_Procs[i] ;
+ return( i );
+ }
+ }
+ }
+ return( -1 );
+}
+
int Conf_append_id_to_seg( segment *seg, int32u id)
{
proc *p;
@@ -380,6 +536,11 @@
return( ret );
}
+int Conf_num_segments( configuration *config )
+{
+ return( config->num_segments );
+}
+
int32u Conf_leader( configuration *config )
{
int i;
Modified: trunk/daemon/configuration.h
===================================================================
--- trunk/daemon/configuration.h 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/configuration.h 2005-06-13 05:11:53 UTC (rev 236)
@@ -76,6 +76,7 @@
typedef struct dummy_configuration{
int32u hash_code;
int num_segments;
+ int num_total_procs;
segment segments[MAX_SEGMENTS];
} configuration;
@@ -85,15 +86,17 @@
port_reuse_off
} port_reuse;
-int Conf_init( char *file_name, char *my_name );
-int Conf_load_conf_file( char *file_name, char *my_name );
+void Conf_init( char *file_name, char *my_name );
+void Conf_load_conf_file( char *file_name, char *my_name );
configuration Conf(void);
+configuration *Conf_ref(void);
proc Conf_my(void);
int Conf_proc_by_id( int32u id, proc *p );
int Conf_proc_by_name( char *name, proc *p );
int Conf_id_in_seg( segment *seg, int32u id );
int Conf_id_in_conf( configuration *config, int32u id );
int Conf_num_procs( configuration *config );
+int Conf_num_segments( configuration *config );
int32u Conf_leader( configuration *config );
int32u Conf_last( configuration *config );
int32u Conf_seg_leader( configuration *config, int16 seg_index );
@@ -102,6 +105,10 @@
int Conf_num_procs_in_seg( configuration *config, int16 seg_index );
void Conf_id_to_str( int32u id, char *str );
char Conf_print(configuration *config);
+bool Conf_in_reload_state(void);
+void Conf_reload_state_begin(void);
+void Conf_reload_state_end(void);
+bool Conf_reload_initiate(void);
bool Conf_get_dangerous_monitor_state(void);
void Conf_set_dangerous_monitor_state(bool new_state);
Modified: trunk/daemon/flow_control.c
===================================================================
--- trunk/daemon/flow_control.c 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/flow_control.c 2005-06-13 05:11:53 UTC (rev 236)
@@ -41,15 +41,12 @@
static int16 Window;
static int16 Personal_window;
-static configuration Cn;
void FC_init( )
{
Window = 60;
Personal_window = 15;
- Cn = Conf();
-
GlobalStatus.window = Window;
GlobalStatus.personal_window = Personal_window;
}
@@ -82,14 +79,17 @@
proc dummy_proc;
int my_index;
int16 temp_window,temp_personal_window;
+ configuration *Cn;
+ Cn = Conf_ref();
+
pack_ptr = (packet_header *)scat->elements[0].buf;
if ( ! Conf_get_dangerous_monitor_state() ) {
Alarm( FLOW_CONTROL, "FC_handle_message: Request to change flow control from (%d.%d.%d.%d) denied. Monitor in safe mode\n", IP1(pack_ptr->proc_id), IP2(pack_ptr->proc_id), IP3(pack_ptr->proc_id), IP4(pack_ptr->proc_id) );
return;
}
- if( ! ( pack_ptr->memb_id.proc_id == 15051963 && Conf_id_in_conf( &Cn, pack_ptr->proc_id ) != -1 ) )
+ if( ! ( pack_ptr->memb_id.proc_id == 15051963 && Conf_id_in_conf( Cn, pack_ptr->proc_id ) != -1 ) )
{
Alarm( FLOW_CONTROL,
"FC_handle_message: Illegal monitor request\n");
@@ -100,10 +100,10 @@
my_index = Conf_proc_by_id( Conf_my().id, &dummy_proc );
if( Same_endian( pack_ptr->type ) ) {
- temp_window = cur_fc_buf[Conf_num_procs( &Cn )];
+ temp_window = cur_fc_buf[Conf_num_procs( Cn )];
temp_personal_window = cur_fc_buf[my_index];
}else{
- temp_window = Flip_int16( cur_fc_buf[Conf_num_procs( &Cn )] );
+ temp_window = Flip_int16( cur_fc_buf[Conf_num_procs( Cn )] );
temp_personal_window = Flip_int16( cur_fc_buf[my_index] );
}
if( temp_window != -1 ) Window = temp_window;
Modified: trunk/daemon/log.c
===================================================================
--- trunk/daemon/log.c 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/log.c 2005-06-13 05:11:53 UTC (rev 236)
@@ -58,28 +58,30 @@
static sp_time alive_time;
static FILE *fd;
-static proc My;
+static char My_name[MAX_PROC_NAME];
static void Log_alive(int dummy, void *dummy_p);
void Log_init()
{
long start_file_pos;
+ proc my;
Is_inited = 1;
- My = Conf_my();
+ my = Conf_my();
+ strncpy( My_name, my.name, MAX_PROC_NAME);
- fd = fopen( My.name, "a" );
+ fd = fopen( My_name, "a" );
if( fd == NULL )
- Alarm( EXIT, "Log_init: error (%s) could not open file %s\n",strerror(errno), My.name );
+ Alarm( EXIT, "Log_init: error (%s) could not open file %s\n",strerror(errno), My_name );
start_file_pos = ftell(fd);
if (start_file_pos == -1)
- Alarm( EXIT, "Log_init: failed to find end of file %s\n", My.name );
+ Alarm( EXIT, "Log_init: failed to find end of file %s\n", My_name );
fclose(fd);
- fd = fopen( My.name, "r+" );
+ fd = fopen( My_name, "r+" );
if( fd == NULL )
- Alarm( EXIT, "Log_init: error (%s) could not open file %s\n",strerror(errno), My.name );
+ Alarm( EXIT, "Log_init: error (%s) could not open file %s\n",strerror(errno), My_name );
fseek( fd, start_file_pos, SEEK_SET );
alive_time.sec = 10;
@@ -101,23 +103,23 @@
fprintf( fd, "A %13ld \n",E_get_time().sec );
#endif
if( fseek( fd, -28, SEEK_CUR ) )
- Alarm( EXIT, "Log_alive: error (%s) in fseek -28 on %s\n", strerror(errno), My.name);
+ Alarm( EXIT, "Log_alive: error (%s) in fseek -28 on %s\n", strerror(errno), My_name);
file_pos = ftell(fd);
if( fseek( fd, 28, SEEK_CUR ) )
- Alarm( EXIT, "Log_alive: error (%s) in fseek 28 on %s\n", strerror(errno), My.name);
+ Alarm( EXIT, "Log_alive: error (%s) in fseek 28 on %s\n", strerror(errno), My_name);
fclose(fd);
- fd = fopen( My.name, "r+" );
+ fd = fopen( My_name, "r+" );
if( fd == NULL )
- Alarm( EXIT, "Log_alive: error (%s) could not open file %s\n",strerror(errno), My.name );
+ Alarm( EXIT, "Log_alive: error (%s) could not open file %s\n",strerror(errno), My_name );
if( fseek( fd, file_pos, SEEK_SET ) )
- Alarm( EXIT, "Log_alive: error (%s) in fseek file_pos (%ld) on %s\n", strerror(errno), file_pos, My.name);
+ Alarm( EXIT, "Log_alive: error (%s) in fseek file_pos (%ld) on %s\n", strerror(errno), file_pos, My_name);
E_queue( Log_alive, 0, NULL, alive_time );
}
void Log_membership()
{
- configuration Cn;
+ configuration *Cn;
int32 proc_id;
proc dummy_p;
int i,j;
@@ -132,12 +134,12 @@
E_get_time().sec, Memb_id_for_Network().time );
#endif
found = -1;
- Cn= Conf();
- for( i=0; i < Conf().num_segments; i++ )
+ Cn = Conf_ref();
+ for( i=0; i < Cn->num_segments; i++ )
{
- for( j=0; j < Cn.segments[i].num_procs; j++ )
+ for( j=0; j < Cn->segments[i].num_procs; j++ )
{
- proc_id = Cn.segments[i].procs[j]->id;
+ proc_id = Cn->segments[i].procs[j]->id;
if( Conf_id_in_conf( Memb_active_ptr(), proc_id ) != -1 )
{
if( found == -1 ) found = Conf_proc_by_id( proc_id, &dummy_p );
Modified: trunk/daemon/membership.c
===================================================================
--- trunk/daemon/membership.c 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/membership.c 2005-06-13 05:11:53 UTC (rev 236)
@@ -90,10 +90,9 @@
static int State;
static int Token_alive;
static proc My;
-static segment My_conf_seg;
static int32 My_seg_rep;
static int Foreign_found;
-static configuration Cn;
+static configuration *Cn;
static members_info F_members;
static reps_info F_reps;
@@ -119,7 +118,6 @@
static void Form_or_fail();
static void Scast_alive();
static void Send_join();
-static void Lookup_new_members();
static int Insert_member( members_info *m, int32 proc_id );
static int Insert_rep( reps_info *r, rep_info rep );
static int32 Smallest_member( members_info *m, int *index );
@@ -139,22 +137,23 @@
packet_header *pack_ptr;
int32 reference_subnet;
int32 current_subnet;
- int i;
+ int i, num_seg;
State = OP;
GlobalStatus.state = OP;
GlobalStatus.membership_changes = 0;
My = Conf_my();
- Cn = Conf();
+ Cn = Conf_ref();
+ num_seg = Conf_num_segments( Cn );
- reference_subnet = Cn.segments[0].procs[0]->id;
+ reference_subnet = Cn->segments[0].procs[0]->id;
reference_subnet = reference_subnet & 0xffff0000;
Wide_network = 0;
- for( i=1; i < Cn.num_segments; i++ )
+ for( i=1; i < num_seg; i++ )
{
- current_subnet = Cn.segments[i].procs[0]->id;
+ current_subnet = Cn->segments[i].procs[0]->id;
current_subnet = current_subnet & 0xffff0000;
if( current_subnet != reference_subnet )
{
@@ -191,12 +190,11 @@
/* Lookup timeout when only one segment exists can be longer,
* since a no remote segments need to be probed
*/
- if ( Cn.num_segments == 1 )
+ if ( num_seg == 1 )
Lookup_timeout.sec = 300;
Membership = Conf();
- My_conf_seg = Cn.segments[My.seg_index];
- for( i=0; i < Conf().num_segments; i++ )
+ for( i=0; i < num_seg; i++ )
Membership.segments[i].num_procs = 0;
Conf_append_id_to_seg( &Membership.segments[My.seg_index], My.id);
Membership_id.proc_id = My.id;
@@ -214,6 +212,11 @@
Memb_token_loss();
}
+void Memb_signal_conf_reload(void)
+{
+ My = Conf_my();
+
+}
configuration *Memb_active_ptr()
{
if( State == EVS ) return ( &Future_membership );
@@ -255,7 +258,7 @@
pack_ptr = (packet_header *)scat->elements[0].buf;
/* First reject any message whose daemon has a different configuration */
- if (pack_ptr->conf_hash != Cn.hash_code) {
+ if (pack_ptr->conf_hash != Cn->hash_code) {
Alarmp( SPLOG_WARNING, MEMB, "Memb_handle_message: Received message (pkthdr_len = %u) from host %d.%d.%d.%d with different spread configuration file (hash %u != local hash %u)\n",
scat->elements[0].len,
IP1(pack_ptr->proc_id),
@@ -263,7 +266,7 @@
IP3(pack_ptr->proc_id),
IP4(pack_ptr->proc_id),
pack_ptr->conf_hash,
- Cn.hash_code);
+ Cn->hash_code);
return;
}
@@ -313,14 +316,14 @@
token_ptr = (token_header *)scat->elements[0].buf;
/* First reject any token whose daemon has a different configuration */
- if (token_ptr->conf_hash != Cn.hash_code) {
+ if (token_ptr->conf_hash != Cn->hash_code) {
Alarmp( SPLOG_WARNING, MEMB, "Memb_handle_token: Received token from host %d.%d.%d.%d with different spread configuration file (hash %u != local hash %u)\n",
IP1(token_ptr->proc_id),
IP2(token_ptr->proc_id),
IP3(token_ptr->proc_id),
IP4(token_ptr->proc_id),
token_ptr->conf_hash,
- Cn.hash_code);
+ Cn->hash_code);
return;
}
@@ -473,7 +476,7 @@
* ignored because a real token loss will trigger an ALIVE broadcast
* packet which will force the membership change.
*/
- if ( Cn.num_segments > 1 )
+ if ( Conf_num_segments( Cn ) > 1 )
Memb_token_loss();
}
break;
@@ -632,7 +635,7 @@
}
if( Conf_leader( &Membership ) == My.id )
{
- Lookup_new_members();
+ Memb_lookup_new_members();
}else if( Conf_seg_leader( &Membership, My.seg_index ) == My.id &&
(!Foreign_found) ){
/*
@@ -770,7 +773,7 @@
Potential_reps.num_reps = 0;
temp_rep.type = POTENTIAL_REP;
- for( i=0; i < Conf().num_segments; i++ )
+ for( i=0; i < Conf_num_segments( Cn ); i++ )
{
if( i == My.seg_index ) continue;
temp_rep.seg_index = i;
@@ -778,7 +781,7 @@
{
temp_rep.proc_id = Membership.segments[i].procs[0]->id;
}else{
- temp_rep.proc_id = Cn.segments[i].procs[0]->id;
+ temp_rep.proc_id = Conf_seg_leader( Cn, i );
}
Insert_rep( &Potential_reps, temp_rep );
}
@@ -812,7 +815,7 @@
}
/* Next adding reps from my membership - they are less important */
- for( i=0; i < Conf().num_segments; i++ )
+ for( i=0; i < Conf_num_segments( Cn ); i++ )
{
if( i == My.seg_index ) continue;
temp_rep.seg_index = i;
@@ -820,7 +823,7 @@
{
temp_rep.proc_id = Membership.segments[i].procs[0]->id;
}else{
- temp_rep.proc_id = Cn.segments[i].procs[0]->id;
+ temp_rep.proc_id = Conf_seg_leader( Cn, i );
}
Insert_rep( &Potential_reps, temp_rep );
}
@@ -839,7 +842,7 @@
/* update potential according to future_membership */
Potential_reps.num_reps = 0;
temp_rep.type = POTENTIAL_REP;
- for( i=0; i < Conf().num_segments; i++ )
+ for( i=0; i < Conf_num_segments( Cn ); i++ )
{
if( i == My.seg_index ) continue;
temp_rep.seg_index = i;
@@ -848,7 +851,7 @@
temp_rep.proc_id =
Future_membership.segments[i].procs[0]->id;
}else{
- temp_rep.proc_id = Cn.segments[i].procs[0]->id;
+ temp_rep.proc_id = Conf_seg_leader( Cn, i );
}
Insert_rep( &Potential_reps, temp_rep );
}
@@ -862,12 +865,12 @@
E_dequeue( Send_join, 0, NULL );
E_dequeue( Form_or_fail, 0, NULL );
E_dequeue( Prot_token_hurry, 0, NULL );
- E_dequeue( Lookup_new_members, 0, NULL );
+ E_dequeue( Memb_lookup_new_members, 0, NULL );
Last_token->type = 0;
Last_token->seq = 0;
Last_token->aru = 0;
- for( i=0; i < Conf().num_segments; i++ )
+ for( i=0; i < Conf_num_segments( Cn ); i++ )
Membership.segments[i].num_procs = 0;
Conf_append_id_to_seg(&Membership.segments[My.seg_index], My.id);
}
@@ -911,7 +914,7 @@
else F_reps.reps[0].type = SEG_REP;
E_dequeue( Scast_alive, 0, NULL );
- E_dequeue( Lookup_new_members, 0, NULL );
+ E_dequeue( Memb_lookup_new_members, 0, NULL );
E_queue( Send_join, 0, NULL, Zero_timeout );
E_queue( Form_or_fail, 0, NULL, Gather_timeout );
}
@@ -939,7 +942,7 @@
{
/* clear everything and go back to op */
E_dequeue( Send_join, 0, NULL);
- E_queue( Lookup_new_members, 0, NULL, Lookup_timeout );
+ E_queue( Memb_lookup_new_members, 0, NULL, Lookup_timeout );
State = OP;
GlobalStatus.state = OP;
}else{
@@ -952,7 +955,7 @@
/* clear everything and go back to op */
Alarm( MEMB, "Form_or_fail:failed, return to OP\n");
E_dequeue( Send_join, 0, NULL );
- E_queue( Lookup_new_members, 0, NULL, Lookup_timeout );
+ E_queue( Memb_lookup_new_members, 0, NULL, Lookup_timeout );
State = OP;
GlobalStatus.state = OP;
}else{
@@ -984,7 +987,7 @@
pack_ptr = (packet_header *)Send_pack.elements[0].buf;
pack_ptr->type = ALIVE_TYPE;
- pack_ptr->conf_hash = Cn.hash_code;
+ pack_ptr->conf_hash = Cn->hash_code;
pack_ptr->data_len =
2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
@@ -1005,7 +1008,7 @@
pack_ptr = (packet_header *)Send_pack.elements[0].buf;
pack_ptr->type = JOIN_TYPE;
- pack_ptr->conf_hash = Cn.hash_code;
+ pack_ptr->conf_hash = Cn->hash_code;
Send_pack.elements[1].buf = (char *)&F_members;
Send_pack.elements[1].len =
2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
@@ -1028,7 +1031,7 @@
E_queue( Send_join, 0, NULL, Join_timeout );
}
-static void Lookup_new_members()
+void Memb_lookup_new_members()
{
packet_header *pack_ptr;
int num_missing;
@@ -1036,7 +1039,7 @@
if( State != OP )
{
- Alarm( MEMB, "Lookup_new_member: not in OP state, returning\n");
+ Alarm( MEMB, "Memb_lookup_new_member: not in OP state, returning\n");
return;
}
@@ -1051,7 +1054,7 @@
pack_ptr = (packet_header *)Send_pack.elements[0].buf;
pack_ptr->type = JOIN_TYPE;
- pack_ptr->conf_hash = Cn.hash_code;
+ pack_ptr->conf_hash = Cn->hash_code;
Send_pack.elements[1].buf = (char *)&F_members;
Send_pack.elements[1].len =
2*sizeof(int16) + (F_members.num_members)*sizeof(int32);
@@ -1064,18 +1067,18 @@
num_missing = 0;
/* For single segment configured, send local broadcast of join to entire segment -- current members will ignore */
- if ( Cn.num_segments == 1 ) {
+ if ( Conf_num_segments( Cn ) == 1 ) {
Net_scast( My.seg_index, &Send_pack );
num_missing++;
} else {
/* Send unicasts to each host that is not in the current membership. */
- for( i=0; i < Cn.num_segments; i++ )
+ for( i=0; i < Conf_num_segments( Cn ); i++ )
{
- for( j=0; j < Cn.segments[i].num_procs; j++ )
+ for( j=0; j < Conf_num_procs_in_seg( Cn, i ); j++ )
{
- if( Conf_id_in_conf( &Reg_membership, Cn.segments[i].procs[j]->id ) == -1 )
+ if( Conf_id_in_conf( &Reg_membership, Cn->segments[i].procs[j]->id ) == -1 )
{
- Net_ucast( Cn.segments[i].procs[j]->id, &Send_pack );
+ Net_ucast( Cn->segments[i].procs[j]->id, &Send_pack );
num_missing++;
}
}
@@ -1315,7 +1318,7 @@
form_token.seq = Highest_seq+3333;
form_token.proc_id = My.id;
form_token.type = FORM1_TYPE;
- form_token.conf_hash = Cn.hash_code;
+ form_token.conf_hash = Cn->hash_code;
/* if I am a ring leader - update my F_members */
if( F_reps.reps[0].type == RING_REP )
@@ -2239,18 +2242,22 @@
void Memb_transitional()
{
int i, j, k;
+ int num_seg, num_proc;
int32u proc_id;
Alarm( MEMB, "Memb_transitional\n");
+ num_seg = Conf_num_segments( Cn );
+
Transitional = 1;
- Trans_membership.num_segments = Cn.num_segments;
- for( i=0; i < Cn.num_segments; i++ )
+ Trans_membership.num_segments = num_seg;
+ for( i=0; i < num_seg; i++ )
{
+ num_proc = Conf_num_procs_in_seg( Cn, i );
Trans_membership.segments[i].num_procs = 0;
- for( j=0; j < Cn.segments[i].num_procs; j++ )
+ for( j=0; j < num_proc; j++ )
{
- proc_id = Cn.segments[i].procs[j]->id;
+ proc_id = Cn->segments[i].procs[j]->id;
for( k=0; k < Commit_set.num_pending; k++ )
{
if( Commit_set.members[k] == proc_id )
@@ -2266,13 +2273,14 @@
Trans_memb_id.proc_id = Conf_leader( &Trans_membership );
Trans_memb_id.time = F_trans_memb_time;
- Commit_membership.num_segments = Cn.num_segments;
- for( i=0; i < Cn.num_segments; i++ )
+ Commit_membership.num_segments = Conf_num_segments( Cn );
+ for( i=0; i < num_seg; i++ )
{
+ num_proc = Conf_num_procs_in_seg( Cn, i );
Commit_membership.segments[i].num_procs = 0;
- for( j=0; j < Cn.segments[i].num_procs; j++ )
+ for( j=0; j < num_proc; j++ )
{
- proc_id = Cn.segments[i].procs[j]->id;
+ proc_id = Cn->segments[i].procs[j]->id;
for( k=0; k < Commit_set.num_members; k++ )
{
if( Commit_set.members[k] == proc_id )
@@ -2313,7 +2321,7 @@
Foreign_found = 0;
if( Conf_leader( &Membership ) == My.id )
- E_queue( Lookup_new_members, 0, NULL, Lookup_timeout );
+ E_queue( Memb_lookup_new_members, 0, NULL, Lookup_timeout );
printf("Membership id is ( %d, %d)\n", Membership_id.proc_id, Membership_id.time );
printf("%c", Conf_print( &Membership ) );
}
Modified: trunk/daemon/membership.h
===================================================================
--- trunk/daemon/membership.h 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/membership.h 2005-06-13 05:11:53 UTC (rev 236)
@@ -57,6 +57,9 @@
void Memb_handle_message( sys_scatter *scat );
void Memb_handle_token( sys_scatter *scat );
void Memb_token_loss();
+void Memb_lookup_new_members();
+void Memb_signal_conf_reload();
+
void Memb_commit();
void Memb_transitional();
void Memb_regular();
Modified: trunk/daemon/monitor.c
===================================================================
--- trunk/daemon/monitor.c 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/monitor.c 2005-06-13 05:11:53 UTC (rev 236)
@@ -115,6 +115,8 @@
static void Report_message();
+static void Reload_Conf();
+
#ifdef _REENTRANT
#ifndef ARCH_PC_WIN95
@@ -143,6 +145,7 @@
static void Usage( int argc, char *argv[] );
static void initialize_locks(void);
+static void read_configuration(void);
int main( int argc, char *argv[] )
{
@@ -198,13 +201,8 @@
Usage( argc, argv );
Alarm_set_interactive();
- Conf_init( Config_file, My_name );
- Cn = Conf();
- My = Conf_my();
+ read_configuration();
- Alarm_clear_types(ALL);
- Alarm_set_types(PRINT | EXIT );
-
#ifdef ARCH_PC_WIN95
ret = WSAStartup( MAKEWORD(2,0), &WSAData );
if( ret != 0 )
@@ -273,6 +271,17 @@
return 0;
}
+static void read_configuration(void)
+{
+
+ Conf_init( Config_file, My_name );
+ Cn = Conf();
+ My = Conf_my();
+
+ Alarm_clear_types(ALL);
+ Alarm_set_types(PRINT | EXIT );
+
+}
static void initialize_locks(void)
{
int ret;
@@ -408,6 +417,12 @@
break;
+ case 'r':
+ /* trigger reload of spread configuration */
+ Reload_Conf();
+ printf("Reload Membership. \n");
+
+ break;
case '9':
case 'q':
printf("Bye.\n");
@@ -442,6 +457,8 @@
printf("\n");
printf("\t8. Terminate Spread Daemons {all, none, Proc, CR}\n");
printf("\n");
+ printf("\tr. Reload Configuration File\n");
+ printf("\n");
printf("\t9. Exit\n");
printf("\n");
printf("Monitor> ");
@@ -984,6 +1001,38 @@
}
+/* Send message to all daemons to reload their configuration files to change the set of daemons */
+static void Reload_Conf()
+{
+ int32 proc_id;
+ proc p;
+ int proc_index;
+ int i,j;
+
+ Pack.type = RELOAD_TYPE;
+ Pack.type = Set_endian( Pack.type );
+ Pack.data_len = 0;
+
+ Pack_scat.num_elements = 1;
+
+ Alarm( PRINT , "Monitor: send conf reload command\n");
+ for( i=0; i < Cn.num_segments ; i++ )
+ {
+ for( j=0; j < Cn.segments[i].num_procs; j++ )
+ {
+ proc_id = Cn.segments[i].procs[j]->id;
+ proc_index = Conf_proc_by_id( proc_id, &p );
+ DL_send( SendChan, p.id, p.port, &Pack_scat );
+ }
+ }
+ /* Now reload monitor's configuration
+ * and clear all Partition, Status, FC arrays as they are now inaccurate
+ */
+
+ read_configuration();
+
+}
+
static void Usage(int argc, char *argv[])
{
My_name = 0; /* NULL */
Modified: trunk/daemon/net_types.h
===================================================================
--- trunk/daemon/net_types.h 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/net_types.h 2005-06-13 05:11:53 UTC (rev 236)
@@ -66,6 +66,7 @@
#define STATUS_TYPE 0x01000000
#define PARTITION_TYPE 0x02000000
#define FC_TYPE 0x04000000
+#define RELOAD_TYPE 0x08000000
#define CONTROL_TYPE 0x0f000000
@@ -99,6 +100,7 @@
#define Is_status( type ) ( type & STATUS_TYPE )
#define Is_partition( type ) ( type & PARTITION_TYPE )
#define Is_fc( type ) ( type & FC_TYPE )
+#define Is_conf_reload( type ) ( type & RELOAD_TYPE )
#define Is_control( type ) ( type & CONTROL_TYPE )
Modified: trunk/daemon/network.c
===================================================================
--- trunk/daemon/network.c 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/network.c 2005-06-13 05:11:53 UTC (rev 236)
@@ -74,15 +74,14 @@
static configuration Net_membership;
static int Segment_leader;
-static configuration Cn;
+static configuration *Cn;
static proc My;
-static segment My_seg;
static int16 Partition[MAX_PROCS_RING];
static sp_time Partition_timeout = { 60, 0};
-static int My_index;
+static int Partition_my_index;
-static void Clear_partition(int dummy, void *dummy_p);
+static void Clear_partition_cb(int dummy, void *dummy_p);
static int In_my_component( int32 proc_id );
static void Flip_pack( packet_header *pack_ptr );
static void Flip_token( token_header *token_ptr );
@@ -95,17 +94,17 @@
int i;
bool bcast_bound = FALSE;
- Cn = Conf();
+ Cn = Conf_ref();
My = Conf_my();
- My_index = Conf_proc_by_id( My.id, &dummy_proc );
- Clear_partition(0, NULL);
+ Partition_my_index = Conf_proc_by_id( My.id, &dummy_proc );
+ Net_clear_partition();
- if( Cn.segments[My.seg_index].num_procs > 1 )
+ if( Cn->segments[My.seg_index].num_procs > 1 )
{
/* I am not allone in segment */
Bcast_needed = 1;
- Bcast_address = Cn.segments[My.seg_index].bcast_address;
+ Bcast_address = Cn->segments[My.seg_index].bcast_address;
Bcast_port = My.port;
Alarm( NETWORK, "Net_init: Bcast needed to address (%d, %d)\n",
@@ -143,16 +142,29 @@
Num_send_needed = 0;
}
+/* Called from above when configuration file is reloaded (potentially with changes to spread configuration
+ * Needs to update any static-scope variables that depend on current configuration
+ */
+void Net_signal_conf_reload(void)
+{
+ proc dummy_proc;
+ Partition_my_index = Conf_proc_by_id( My.id, &dummy_proc );
+
+ Cn = Conf_ref();
+ My = Conf_my();
+}
+
void Net_set_membership( configuration memb )
{
int i;
int my_index_in_seg;
int my_next_index;
+ segment my_seg;
Net_membership = memb;
- My_seg = Net_membership.segments[My.seg_index];
- my_index_in_seg = Conf_id_in_seg( &My_seg, My.id );
+ my_seg = Net_membership.segments[My.seg_index];
+ my_index_in_seg = Conf_id_in_seg( &my_seg, My.id );
if( my_index_in_seg < 0 )
Alarm( EXIT,"Net_set_membership: I am not in membership %c",
Conf_print( &Net_membership ) );
@@ -163,7 +175,7 @@
Num_send_needed = 0;
my_next_index = -1;
- for( i=0; i < Conf().num_segments; i++ )
+ for( i=0; i < Conf_num_segments( Cn ); i++ )
{
if( i == My.seg_index )
{
@@ -191,10 +203,10 @@
/* Calculate where to send the token */
Token_address = 0;
- if( my_index_in_seg < My_seg.num_procs-1 )
+ if( my_index_in_seg < my_seg.num_procs-1 )
{
- Token_address = My_seg.procs[my_index_in_seg+1]->id;
- Token_port = My_seg.port+1;
+ Token_address = my_seg.procs[my_index_in_seg+1]->id;
+ Token_port = my_seg.port+1;
}else{
/* I am last in my segment */
if( Num_send_needed == 0 )
@@ -203,8 +215,8 @@
* My segment is the only segment
* sending token to the first in my segment
*/
- Token_address = My_seg.procs[0]->id;
- Token_port = My_seg.port+1;
+ Token_address = my_seg.procs[0]->id;
+ Token_port = my_seg.port+1;
} else if( Num_send_needed == my_next_index ) {
/*
* My segment is the last segment
@@ -466,24 +478,26 @@
return( 0 );
}
- if( ! ( pack_ptr->memb_id.proc_id == 15051963 && Conf_id_in_conf( &Cn, pack_ptr->proc_id ) != -1 ) ) return( 0 );
+ if( ! ( pack_ptr->memb_id.proc_id == 15051963 && Conf_id_in_conf( Cn, pack_ptr->proc_id ) != -1 ) ) return( 0 );
cur_partition = (int16 *)scat->elements[1].buf;
- for( i=0; i < Conf_num_procs( &Cn ); i++ )
- if( Same_endian( pack_ptr->type ) ) Partition[i] = cur_partition[i];
- else Partition[i] = Flip_int16( cur_partition[i] );
+ for( i=0; i < Conf_num_procs( Cn ); i++ )
+ if( ! Same_endian( pack_ptr->type ) )
+ cur_partition[i] = Flip_int16( cur_partition[i] );
- E_queue( Clear_partition, 0, NULL, Partition_timeout );
- if( Partition[My_index] < 0 )
+ Net_set_partition(cur_partition);
+
+ E_queue( Clear_partition_cb, 0, NULL, Partition_timeout );
+ if( Partition[Partition_my_index] == -1 )
Alarm( EXIT, "Net_recv: Instructed to exit by monitor\n");
Alarm( PRINT , "Net_recv: Got monitor message, component %d\n",
- Partition[My_index] );
+ Partition[Partition_my_index] );
return( 0 );
}
- /* Monitor : droping packet from other monitor components */
+ /* Monitor : drop packet from daemon in different monitor-caused partition */
if( ! ( pack_ptr->memb_id.proc_id == 15051963 || In_my_component( pack_ptr->transmiter_id ) ) )
return( 0 );
@@ -641,7 +655,7 @@
/* Fliping token header to my form if needed */
if( !Same_endian( token_ptr->type ) ) Flip_token( token_ptr );
- /* Monitor : droping token from other monitor components */
+ /* Monitor : drop token from daemon in different monitor-caused partition */
if( !In_my_component( token_ptr->transmiter_id ) )
return( 0 );
@@ -690,14 +704,32 @@
return( &(Token_channel[0]) );
}
-static void Clear_partition(int dummy, void *dummy_p)
+void Net_set_partition(int16 *new_partition)
{
+ int i;
+
+ if ( Conf_in_reload_state() ) {
+ Alarmp(SPLOG_DEBUG, NETWORK, "Net_set_partition: Can not change partition since daemon configuration change in progress\n");
+ return;
+ }
+
+ for( i=0; i < Conf_num_procs( Cn ); i++ )
+ Partition[i] = new_partition[i];
+}
+
+void Net_clear_partition(void)
+{
int i;
- for( i=0; i < Conf_num_procs( &Cn ); i++ )
+ for( i=0; i < Conf_num_procs( Cn ); i++ )
Partition[i] = 0;
}
+static void Clear_partition_cb(int dummy, void *dummy_p)
+{
+ Net_clear_partition();
+}
+
static int In_my_component( int32 proc_id )
{
int proc_index;
@@ -712,7 +744,7 @@
return( 0 );
}
- return( Partition[My_index] == Partition[proc_index] );
+ return( Partition[Partition_my_index] == Partition[proc_index] );
}
void Flip_pack( packet_header *pack_ptr )
Modified: trunk/daemon/network.h
===================================================================
--- trunk/daemon/network.h 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/network.h 2005-06-13 05:11:53 UTC (rev 236)
@@ -41,6 +41,8 @@
void Net_init();
void Net_set_membership( configuration memb );
+void Net_signal_conf_reload(void);
+
int Net_bcast( sys_scatter *scat );
int Net_queue_bcast(sys_scatter *scat);
int Net_flush_bcast(void);
@@ -54,4 +56,7 @@
channel *Net_token_channel(void);
void Net_num_channels(int *num_bcast, int *num_token);
+void Net_set_partition(int16 *new_partition);
+void Net_clear_partition(void);
+
#endif /* INC_NETWORK */
Modified: trunk/daemon/protocol.c
===================================================================
--- trunk/daemon/protocol.c 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/protocol.c 2005-06-13 05:11:53 UTC (rev 236)
@@ -67,6 +67,7 @@
static packet_header *Hurry_head;
static sys_scatter Hurry_pack;
+static sp_time Zero_timeout = { 0, 0};
/* ### Pack: 1 line */
static packet_info Buffered_packets[ARCH_SCATTER_SIZE];
@@ -85,6 +86,8 @@
static void Deliver_reliable_packets( int32 start_seq, int num_packets );
static void Deliver_agreed_packets();
+static void Prot_handle_conf_reload(sys_scatter *scat);
+
void Prot_init(void)
{
int i, num_bcast, num_token;
@@ -264,6 +267,11 @@
return;
}
+ if( Is_conf_reload( pack_ptr->type ) )
+ {
+ Prot_handle_conf_reload( &New_pack );
+ return;
+ }
/* delete random
r1 = ((-My.id)%17)+3;
r2 = get_rand() % (r1+3 );
@@ -637,6 +645,43 @@
GlobalStatus.token_rounds++;
}
+/* Basic algorithm:
+ * 1) have configuration code load new conf file and check for modifications to conf.
+ * 2) If only add/sub of daemons, then initiate membership change with token_loss and return;
+ * 3) else, then set Conf_reload_state, create singleton partition, and schedule token_loss.
+ * 4) When membership completes in Discard_packets() cleanup partition and probe for new members.
+ */
+static void Prot_handle_conf_reload(sys_scatter *scat)
+{
+ bool need_memb_partition;
+ int16 singleton_partition[MAX_PROCS_RING];
+ int i;
+
+ need_memb_partition = Conf_reload_initiate();
+
+ /* Signal all subsystems to update Conf and My strucures */
+ Net_signal_conf_reload();
+ Memb_signal_conf_reload();
+ Sess_signal_conf_reload();
+
+ /* update protocol varialbes with new conf */
+ My = Conf_my();
+ My_index = Conf_proc_by_id( My.id, &My );
+
+ if (need_memb_partition) {
+ /* make partition */
+ for ( i = 0 ; i < Conf_num_procs( Conf_ref() ); i++ )
+ {
+ singleton_partition[i] = i;
+ }
+ Net_set_partition(singleton_partition);
+
+ Conf_reload_state_begin();
+ }
+ E_queue( Memb_token_loss, 0, NULL, Zero_timeout );
+}
+
+
void Prot_new_message( down_link *down_ptr, int not_used_in_spread3_p )
{
int32 leader_id;
@@ -1131,7 +1176,19 @@
reg_memb_id = Memb_id();
Sess_deliver_reg_memb( Reg_membership, reg_memb_id );
-
+ /* If in change conf mode; then if singleton (which should be true) and GOP state then:
+ * Remove partition
+ * Initiate Memb_lookup() to find other daemons
+ */
+ if( Conf_in_reload_state() ) {
+ /* GOP state equals value 1, but is private declaration in groups.c */
+ if ( (GlobalStatus.gstate != 1 ) || ( Conf_num_procs( &Reg_membership ) != 1 ) ) {
+ Alarmp( SPLOG_FATAL, MEMB, "Discard_packets: Failed to reload configuration - gstate: %d and num_procs in membership: %d\n", GlobalStatus.gstate, Conf_num_procs( &Reg_membership) );
+ }
+ Net_clear_partition();
+ E_queue( Memb_lookup_new_members, 0, NULL, Zero_timeout);
+ Conf_reload_state_end();
+ }
/* set variables for next membership */
Last_token->aru = 0;
Highest_seq = 0;
Modified: trunk/daemon/session.c
===================================================================
--- trunk/daemon/session.c 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/session.c 2005-06-13 05:11:53 UTC (rev 236)
@@ -119,11 +119,12 @@
static void Sess_create_reject_message ( message_obj *msg );
static int Sess_get_p2p_dests( int num_groups, char groups[][MAX_GROUP_NAME], char dests[][MAX_GROUP_NAME] );
-#define ACTIVATE_PORT_REUSE(mbox) do { \
- int on = 1; \
- if (setsockopt(mbox, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof(on)) < 0) \
- Alarm( EXIT, "Sess_init: Error setting SO_REUSEADDR socket option\n" ); \
-} while (0)
+static void Sess_activate_port_reuse(mailbox mbox)
+{
+ int on = 1;
+ if (setsockopt(mbox, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof(on)) < 0)
+ Alarm( EXIT, "Sess_activate_port_reuse: From Sess_init: Error setting SO_REUSEADDR socket option\n" );
+}
int Sess_get_session_index (int mbox)
{
@@ -369,14 +370,14 @@
Alarm( EXIT, "Sess_init: INET sock error\n" );
type = Conf_get_port_reuse_type();
if (type == port_reuse_on)
- ACTIVATE_PORT_REUSE(mbox);
+ Sess_activate_port_reuse(mbox);
if (Is_IfType_Any(My.ifc[i].type) )
inet_addr.sin_addr.s_addr = INADDR_ANY;
else
{
if (type == port_reuse_auto)
- ACTIVATE_PORT_REUSE(mbox);
+ Sess_activate_port_reuse(mbox);
inet_addr.sin_addr.s_addr = htonl(My.ifc[i].ip);
}
if( bind( mbox, (struct sockaddr *)&inet_addr, sizeof(inet_addr) ) == -1)
@@ -436,6 +437,11 @@
Alarm( SESSION, "Sess_init: ended ok\n" );
}
+void Sess_signal_conf_reload(void)
+{
+ My = Conf_my();
+}
+
void Sess_set_active_threshold()
{
/* This function is used only by the session (and groups) layer */
Modified: trunk/daemon/session.h
===================================================================
--- trunk/daemon/session.h 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/session.h 2005-06-13 05:11:53 UTC (rev 236)
@@ -76,6 +76,7 @@
} session;
void Sess_init(void);
+void Sess_signal_conf_reload(void);
void Sess_block_users_level(void);
void Sess_unblock_users_level(void);
void Sess_block_user(int xxx);
Modified: trunk/daemon/status.c
===================================================================
--- trunk/daemon/status.c 2005-06-13 04:41:15 UTC (rev 235)
+++ trunk/daemon/status.c 2005-06-13 05:11:53 UTC (rev 236)
@@ -49,7 +49,6 @@
static channel Report_channel;
static sys_scatter Report_scat;
static packet_header Pack;
-static configuration Cn;
void Stat_init()
@@ -71,8 +70,6 @@
Pack.data_len = sizeof( status );
Pack.proc_id = Conf_my().id;
- Cn = Conf();
-
GlobalStatus.major_version = SP_MAJOR_VERSION;
GlobalStatus.minor_version = SP_MINOR_VERSION;
GlobalStatus.patch_version = SP_PATCH_VERSION;
@@ -90,7 +87,7 @@
int ret;
pack_ptr = (packet_header *)scat->elements[0].buf;
- if( ! ( pack_ptr->memb_id.proc_id == 15051963 && Conf_id_in_conf( &Cn, pack_ptr->proc_id ) != -1 ) )
+ if( ! ( pack_ptr->memb_id.proc_id == 15051963 && Conf_id_in_conf( Conf_ref(), pack_ptr->proc_id ) != -1 ) )
{
Alarm( STATUS, "Stat_handle_message: Illegal monitor request\n");
return;
More information about the Spread-cvs
mailing list