[Spread-cvs] cvs commit: spread/daemon Changelog groups.c groups.h objects.h sess_body.h sp.c sp_func.h user.c
jonathan at spread.org
jonathan at spread.org
Tue Jul 12 09:50:34 EDT 2011
jonathan 04/10/29 17:13:09
Modified: daemon Changelog groups.c groups.h objects.h sess_body.h
sp.c sp_func.h user.c
Log:
Apply Ryan Caudy's rework of groups code to report multiple VS sets
in membership messages. This breaks the SP API, although some backwards
compatibility code is included. Also fixes the Java interface to support
new methods.
Revision Changes Path
1.6 +54 -0 spread/daemon/Changelog
Index: Changelog
===================================================================
RCS file: /storage/cvsroot/spread/daemon/Changelog,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- Changelog 29 Oct 2004 15:01:03 -0000 1.5
+++ Changelog 29 Oct 2004 21:13:07 -0000 1.6
@@ -1,3 +1,57 @@
+Fri Oct 29 01:18:05 2004 Jonathan Stanton <jonathan at cnds.jhu.edu>
+
+ * groups.c, groups.h,sess_body.h,objects.h,sp.c,sp.h,user.c:
+ Apply Ryan Caudy's major Group change to support multiple VS set reporting.
+
+ * javalib/Membership.java, javaapps/User.java, javaapps/recThread.java:
+ Apply Ryan's change to java library to support VS sets. Change the
+ example code of User to print out the new membership info.
+
+Fri May 07 10:53:03 2004 Ryan Caudy <rcaudy at gmail.com>
+
+ * Add multiple VS (virtual synchrony) sets to regular membership messages at
+ SP interface. These show all that is known about which members were
+ virtually synchronous with one another during their previous regular
+ memberships. ***Breaks client compatibility for regular membership
+ messages.***
+
+ * Use a representative based group message exchange.
+ This allows merging partial states if both parties are complete in
+ a cascading case. The two points this can occur are G_handle_reg_memb and
+ G_compute_and_notify.
+
+ * Stop throwing away useful information:
+ - No longer delete currently up-to-date groups state at G_Compute_notify.
+ This allows each daemon to not process groups messages from their
+ representative. Allows a much better performance for large numbers of
+ group memberships.
+ - If rep state (2) applies then do not drop groups state on cascade, but
+ instead process the messages and proceed under a single representative.
+ As a result the GROUPS message has changed and the heavy weight client
+ membership message format has changed. The lightweight client membership
+ message has not changed, except small syntax changes to be consistent.
+
+ * Fixes: include last fall's group_id fixes.
+
+ * Change the order of members in membership messages to be by daemon name,
+ then member name, where daemons are ordered as in conf file.
+
+ * Groups skiplist in memory now has 3 layers (group-daemon-member).
+
+ * Remove NEW state for groups -- it is obsolete.
+
+ * Lots of cleanups and creation of X_send_mess() functions that encapsulate
+ all the common sending code.
+
+ * Send heavyweight membership messages followed by transitional membership
+ messages for leaves and disconnects from changed groups in GTRANS, instead
+ of notifying in the next heavyweight membership message. This changes the
+ sending of these membership message types to be the same as that for joins.
+
+ * The C and Java library and example code have been updated to reflect the new
+ regular membership message format.
+
+
Fri Oct 29 10:53:03 2004 Jonathan Stanton <jonathan at cnds.jhu.edu>
* groups.c: Move assignment of this_group out of assert() call.
1.19 +1697 -1451 spread/daemon/groups.c
Index: groups.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/groups.c,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- groups.c 29 Oct 2004 15:01:03 -0000 1.18
+++ groups.c 29 Oct 2004 21:13:07 -0000 1.19
@@ -61,23 +61,49 @@
#define NULL 0
#endif /* NULL */
-#define ESTABLISHED_MEMBER 0
-#define NEW_MEMBER 1
-#define PARTITIONED_MEMBER 2
-#define Is_established_member( status ) ( status == ESTABLISHED_MEMBER )
-#define Is_new_member( status ) ( status == NEW_MEMBER )
-#define Is_partitioned_member( status ) ( status == PARTITIONED_MEMBER )
+/* An unknown membership id (on a daemon_members struct), defined as having a proc_id of -1,
+ * means that the daemon is "partitioned." "Established" is the complement.
+ * Since -1 == 255.255.255.255 is the universal broadcast address, this can never
+ * conflict with a *real* proc_id. */
+#define Is_unknown_memb_id( midp ) ( (midp)->proc_id == -1 )
+
+#define Is_established_daemon( dp ) ( !Is_unknown_memb_id( &((dp)->memb_id) ) )
+#define Is_partitioned_daemon( dp ) ( Is_unknown_memb_id( &((dp)->memb_id) ) )
+
+/* Flag values - working with a pointer to char, set it 0x01 or test if its 0x01 */
+#define Set_first_message( cp ) ( (*cp) = 0x01 )
+#define Set_later_message( cp ) ( (*cp) = 0x00 )
+#define Is_first_message( cp ) ( (*cp) == 0x01 )
+#define Is_later_message( cp ) ( (*cp) == 0x00 )
+
+/* IP should be a 32-bit integer, and
+ * STR should be a character array of size at least 16. */
+#define IP_to_STR( IP, STR ) snprintf( STR, 16, "%d.%d.%d.%d", \
+ IP1(IP), IP2(IP), IP3(IP), IP4(IP) )
+
+/* The representative of a synced set is the first member.
+ * The set is sorted by daemon order in conf. */
+#define Is_synced_set_leader( proc_id ) ( (proc_id) == MySyncedSet.proc_ids[0] )
typedef struct dummy_groups_buf_link {
- char buf[GROUPS_BUF_SIZE];
- int bytes;
- struct dummy_groups_buf_link *next;
+ char buf[GROUPS_BUF_SIZE];
+ int bytes;
+ struct dummy_groups_buf_link *next;
} groups_buf_link;
-
-struct worklist {
- char name[MAX_GROUP_NAME];
- Skiplist *groups;
-};
+
+typedef struct dummy_synced_set {
+ int32u size;
+ int32 proc_ids[MAX_PROCS_RING];
+} synced_set;
+
+/* This message link struct enables the messages from a given synced
+ * set to be kept together. */
+typedef struct dummy_groups_message_link {
+ int32 rep_proc_id;
+ int complete;
+ message_link *first;
+ struct dummy_groups_message_link *next;
+} groups_message_link;
char *printgroup(void *vgrp) {
group *grp = (group *)vgrp;
@@ -91,133 +117,155 @@
static membership_id Reg_memb_id;
static char Mess_buf[MAX_MESSAGE_BODY_LEN];
-static groups_buf_link *Groups_bufs;
-static int Num_mess_gathered;
-static int Num_daemons_gathered;
-static message_link Gathered; /* Groups messages */
+static groups_buf_link *Groups_bufs;
+static int Groups_bufs_fresh;
+static int Num_mess_gathered;
+static int Num_daemons_gathered;
+static groups_message_link Gathered; /* Groups messages */
-#if 0
-static group Work[MAX_PROCS_RING+1];
-#endif
static int Groups_control_down_queue;
-static int G_id_is_equal( group_id g1, group_id g2 );
-static void G_print_group_id( group_id g );
+static Skiplist GroupsList;
+static synced_set MySyncedSet;
+static membership_id unknown_memb_id = { -1, -1 }; /* See explanation above. */
+
+/* Unused function
+ *static int G_id_is_equal( group_id g1, group_id g2 );
+ */
+static void G_print_group_id( int priority, group_id g, char *func_name );
static group *G_get_group( char *group_name );
-static member *G_get_member( group *grp, char *private_group_name );
+static daemon_members *G_get_daemon( group *grp, int32u proc_id );
+static member *G_get_member( daemon_members *dmn, char *private_group_name );
+
+static void G_send_lightweight_memb( group *grp, int32 caused, char *private_group_name );
+static void G_send_self_leave( group *grp, int ses );
+static void G_send_heavyweight_memb( group *grp );
+static void G_send_heavyweight_join( group *grp, member *joiner, mailbox new_mbox );
+static void G_send_trans_memb( group *grp );
+static void G_compute_group_mask( group *grp, char *func_name );
+
static int G_build_memb_buf( group *grp, message_obj *msg,
- char buf[] );
+ char buf[], int32 caused );
static int G_build_memb_vs_buf( group *grp, message_obj *msg,
- char buf[], int32 caused );
+ char buf[], int32 caused, member *joiner );
static message_link *G_build_trans_mess( group *grp );
-static void G_stamp_groups_buf( char buf[] );
static void G_build_groups_msg_hdr( message_obj *msg, int groups_bytes );
-static int G_build_groups_buf( char buf[], struct skiplistnode **iter_ptr );
-static int G_mess_to_groups( message_link *mess_link, char *name,
- struct worklist *work );
-static int G_smallest_group_indices( Skiplist *work, struct worklist *indices[] );
+static int G_build_groups_buf( char buf[], struct skiplistnode **giter_ptr,
+ struct skiplistnode **diter_ptr );
+static void G_build_new_groups_bufs(void);
+static int G_send_groups_messages(void);
+static void G_stamp_groups_bufs(void);
+static void G_discard_groups_bufs(void);
+static int G_mess_to_groups( message_link *mess_link, synced_set *sset );
static void G_compute_and_notify(void);
static void G_print(void);
-static void G_empty_groups_bufs(void);
+static void G_update_daemon_memb_ids( group *grp );
-static Skiplist GroupsList;
+static void G_add_to_synced_set( synced_set *s );
+static int G_check_synced_set( synced_set *s, configuration *m );
+static void G_print_synced_set( int priority, synced_set *s, char *func_name );
+
+static int G_eliminate_partitioned_daemons( group *grp );
+static int G_check_if_changed_by_cascade( group *grp );
+static void G_remove_daemon( group *grp, daemon_members *dmn );
+static void G_remove_group( group *grp );
static int G_compare(void *, void *);
-static int G_compare(void *a, void *b)
+static int G_compare_memb_ids( const membership_id *mid1,
+ const membership_id *mid2 );
+static int G_compare_proc_ids_by_conf( int32 pid1, int32 pid2 );
+static int G_daemon_recordcompare( void *a, void *b );
+static int G_daemon_keycompare( void *key, void *b );
+static int G_daemon_vs_set_recordcompare( void *a, void *b );
+
+/* Comparator function for anything that starts with a null-terminated char[] which
+ * is also the key. This applies to group and member objects, and so this function
+ * is used as the comparator for their records and keys. */
+static int G_compare(void *a, void *b)
{
- /* Takes two Group records and compares them based on their keys (name) */
- /* This will work for any record type that has a char[LENGTH] as the first
- member of the struct and is the key */
assert(a);
assert(b);
return strcmp((char *)a, (char *)b);
}
-int G_member_recordcompare(void *a, void *b)
-{
- int compared;
- /* Takes two Member records and compares them based on their keys (name) */
- member *am = (member *)a, *bm = (member *)b;
- assert(a);
- assert(b);
- compared = strcmp( bm->private_name, am->private_name );
- if(compared > 0)
- return -1;
- if(compared == 0)
- return 0;
- return 1;
-}
-int G_member_keycompare(void *a, void *b)
-{
- int compared;
- /* Takes two Member records and compares them based on their keys (name) */
- member *bm = (member *)b;
- char *am = (char *)a;
- assert(a);
- assert(b);
- compared = strcmp( bm->private_name, am );
- if(compared > 0)
- return -1;
- if(compared == 0) {
- return 0;
- }
- return 1;
-}
-/* Take two worklist pointers and compare them by the ->groups pointers */
-/* We do not actually care about the order they are stored in the skiplist
- * that is why we are using meaningless pointers as keys.
- */
-static int G_work_groups_comp(void *a, void *b)
-{
- struct worklist *wA, *wB;
- assert(a);
- assert(b);
- wA = (struct worklist *)a;
- wB = (struct worklist *)b;
-
- if (wA->groups < wB->groups)
- return(-1);
- if (wA->groups == wB->groups)
- return(0);
- return(1);
-}
-static int G_work_groups_keycomp(void *key, void *b)
-{
- struct worklist *wB;
- Skiplist *wKey;
- assert(key);
- assert(b);
- wB = (struct worklist *)b;
- wKey = (Skiplist *)key;
- if (wKey < wB->groups)
- return(-1);
- if (wKey == wB->groups)
- return(0);
- return(1);
-}
-#if 0
-int G_group_revproc_comp(void *a, void *b) {
- int32 aid, bid;
- struct worklist *A=(struct worklist *)a, *B=(struct worklist *)b;
- aid=A->proc_index;
- bid=B->proc_index;
- return (aid>bid)?(-1):((aid==bid)?0:1);
-}
-int G_group_revproc_keycomp(void *a, void *b) {
- int32 aid, bid;
- struct worklist *B=(struct worklist *)b;
- bid=B->proc_index;
- aid = *(int32 *)a;
- return (aid>bid)?-1:((aid==bid)?0:1);
+/* Compares two memb_ids. Arbitrary, but deterministic. A memb_id with a proc_id
+ * of -1 compares after ANY other memb_id, by definition. */
+static int G_compare_memb_ids( const membership_id *mid1, const membership_id *mid2 )
+{
+ int unknown_ids = 0;
+ if( Is_unknown_memb_id( mid1 ) )
+ unknown_ids += 1;
+ if( Is_unknown_memb_id( mid2 ) )
+ unknown_ids += 2;
+ switch( unknown_ids ) {
+ case 0: break;
+ case 1: return 1;
+ case 2: return -1;
+ case 3: return 0;
+ }
+ if( mid1->proc_id != mid2->proc_id )
+ return mid1->proc_id - mid2->proc_id;
+ if( mid1->time != mid2->time )
+ return mid1->time - mid2->time;
+ return 0;
+}
+
+/* Compares two daemon proc_ids, by their order in the conf. */
+static int G_compare_proc_ids_by_conf( int32 pid1, int32 pid2 )
+{
+ int ia, ib;
+ proc dummy_proc;
+ ia = Conf_proc_by_id( pid1, &dummy_proc );
+ ib = Conf_proc_by_id( pid2, &dummy_proc );
+ assert( ia > -1 );
+ assert( ib > -1 );
+ return ia - ib;
+}
+
+/* Takes two daemons and compares them based on their indexes in the
+ * configuration. */
+static int G_daemon_recordcompare( void *a, void *b )
+{
+ daemon_members *da, *db;
+ da = (daemon_members *) a;
+ db = (daemon_members *) b;
+ assert(da);
+ assert(db);
+ return G_compare_proc_ids_by_conf( da->proc_id, db->proc_id );
+}
+
+/* Compares in the same way. */
+static int G_daemon_keycompare( void *key, void *b )
+{
+ int32 *pida = (int32 *)key;
+ daemon_members *db = (daemon_members *)b;
+ assert(pida);
+ assert(db);
+ return G_compare_proc_ids_by_conf( *pida, db->proc_id );
+}
+
+/* Compares two daemons by gid. Conf order is used to break ties. */
+static int G_daemon_vs_set_recordcompare( void *a, void *b )
+{
+ int compared;
+ daemon_members *da, *db;
+ da = (daemon_members *) a;
+ db = (daemon_members *) b;
+ assert(da);
+ assert(db);
+ compared = G_compare_memb_ids( &(da->memb_id), &(db->memb_id) );
+ if( compared )
+ return compared;
+ return G_compare_proc_ids_by_conf( da->proc_id, db->proc_id );
}
-#endif
+
void G_init()
{
int ret;
- Alarm( GROUPS, "G_init: \n" );
+ Alarmp( SPLOG_INFO, GROUPS, "G_init: \n" );
Num_groups = 0;
GlobalStatus.num_groups = Num_groups;
@@ -231,25 +279,36 @@
G_compare,
G_compare);
- /* Groups.next = NULL;
- Groups.prev = NULL; */
+ MySyncedSet.size = 1;
+ MySyncedSet.proc_ids[0] = My.id;
+ G_print_synced_set( SPLOG_INFO, &MySyncedSet, "G_init" );
- ret = Mem_init_object(GROUP, sizeof(group), 100, 0);
+ ret = Mem_init_object(GROUP, sizeof(group), 1000, 0);
if (ret < 0)
{
- Alarm(EXIT, "G_init: Failure to Initialize GROUP memory objects\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize GROUP memory objects\n");
}
-
- ret = Mem_init_object(MEMBER, sizeof(member), 200, 0);
+ ret = Mem_init_object(DAEMON_MEMBERS, sizeof(daemon_members), 2000, 0);
if (ret < 0)
{
- Alarm(EXIT, "G_init: Failure to Initialize MEMBER memory objects\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize DAEMON_MEMBERS memory objects\n");
+ }
+ ret = Mem_init_object(MEMBER, sizeof(member), 10000, 0);
+ if (ret < 0)
+ {
+ Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize MEMBER memory objects\n");
}
ret = Mem_init_object(GROUPS_BUF_LINK, sizeof(groups_buf_link), 1, 1);
if( ret < 0 )
{
- Alarm(EXIT, "G_init: Failure to Initialize GROUPS_BUF_LINK memory objects\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize GROUPS_BUF_LINK memory objects\n");
+ }
+ ret = Mem_init_object(GROUPS_MESSAGE_LINK, sizeof(groups_message_link), 1, 1);
+ if( ret < 0 )
+ {
+ Alarmp( SPLOG_FATAL, GROUPS, "G_init: Failure to Initialize GROUPS_MESSAGE_LINK memory objects\n");
}
+
#if ( SPREAD_PROTOCOL == 3 )
Groups_control_down_queue = 0;
#else
@@ -257,9 +316,11 @@
#endif
Groups_bufs = NULL;
+ Groups_bufs_fresh = 0;
Num_mess_gathered = 0;
Num_daemons_gathered = 0;
- Gathered.next = NULL;
+ Gathered.complete = 0;
+ Gathered.next = NULL;
Gstate = GOP;
GlobalStatus.gstate = Gstate;
@@ -267,28 +328,23 @@
void G_handle_reg_memb( configuration reg_memb, membership_id reg_memb_id )
{
- group *grp, *nextgroup;
- member *mbr, *nextmember;
- struct skiplistnode *giter, *iter, *passed_iter;
- groups_buf_link *grps_buf_link;
- message_link *mess_link;
- down_link *down_ptr;
- message_obj *msg;
- message_header *head_ptr;
- int num_bytes;
- int needed;
- int ses;
- int i;
-
-
- Alarm( GROUPS, "G_handle_reg_memb: with (%d.%d.%d.%d, %d) id\n",
- IP1(reg_memb_id.proc_id),IP2(reg_memb_id.proc_id),IP3(reg_memb_id.proc_id),
- IP4(reg_memb_id.proc_id), reg_memb_id.time );
+ group *grp, *nextgroup;
+ struct skiplistnode *giter;
+ groups_message_link *grp_mlink;
+ message_link *mess_link;
+ synced_set sset;
+ int ret;
+ char ip_string[16];
+ int group_changed, synced_set_changed;
+
+ IP_to_STR( reg_memb_id.proc_id, ip_string );
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_reg_memb: with (%s, %d) id\n",
+ ip_string, reg_memb_id.time );
switch( Gstate )
{
case GOP:
- Alarm( EXIT, "G_handle_reg_memb in GOP\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_reg_memb in GOP\n");
break;
@@ -296,321 +352,304 @@
/*
* Save reg_memb and reg_memb_id
* if previous Trans_memb is equal to reg_memb then:
+ * (Purely subtractive membership change!!)
* for every changed group
- * eliminate partitioned members
+ * eliminate partitioned daemons
* set Grp_id to (reg_memb_id, 1)
* notify local members of regular membership
* Shift to GOP
- * else
- * for every changed group
- * eliminate partitioned members
- * set Grp_id to (reg_memb_id, -1)
- * Replace protocol queue, raise event thershold
- * Build groups message -- only local members
- * Send groups message
- * Shift to GGATHER
- */
- Alarm( GROUPS, "G_handle_reg_memb in GTRANS\n");
+ * else
+ * for every changed group
+ * eliminate partitioned members
+ * Replace protocol queue, raise event thershold
+ * If I'm representative of synced set
+ * Build groups messages --
+ * contains only members local to daemons in SyncedSet
+ * Send groups messages
+ * Shift to GGATHER
+ */
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_reg_memb in GTRANS\n");
- Reg_memb = reg_memb;
+ Reg_memb = reg_memb;
Reg_memb_id = reg_memb_id;
if( Conf_num_procs( &Trans_memb ) == Conf_num_procs( &Reg_memb ) )
{
giter = sl_getlist( &GroupsList );
- grp = (giter)?(group *)giter->data:NULL;
+ grp = (giter) ? (group *)giter->data : NULL;
for( ; grp != NULL ; grp = nextgroup )
{
nextgroup = sl_next( &GroupsList, &giter );
if( grp->changed )
{
- /* The group has changed */
- /* eliminating partitioned members */
- iter = sl_getlist( &grp->MembersList );
- mbr = (iter)?(member *)iter->data:NULL;
- for( ; mbr != NULL ; mbr = nextmember )
- {
- nextmember = sl_next( &grp->MembersList, &iter );
- if( Is_partitioned_member( mbr->status ) )
- {
- /* discard this member - proc no longer in membership */
- sl_remove ( &grp->MembersList,mbr->private_name, dispose);
- grp->num_members--;
- }
- }
- if( grp->num_members == 0 )
- {
- /* discard this empty group */
- sl_destruct ( &grp->MembersList, dispose);
- sl_remove ( &GroupsList, grp->name, dispose);
- Num_groups--;
- GlobalStatus.num_groups = Num_groups;
- }else{
- Alarm( GROUPS, "G_handle_reg_memb: skipping state transfer for group %s.\n", grp->name );
- Alarm( DEBUG, "G_handle_reg_memb: changing group_id from: " );
- G_print_group_id( grp->grp_id );
- grp->grp_id.memb_id = Reg_memb_id;
- grp->grp_id.index = 1;
- grp->changed = 0;
- Alarm( DEBUG, " to: " );
- G_print_group_id( grp->grp_id );
- Alarm( DEBUG, "\n" );
-
- if( grp->num_local > 0 ){
- /* send members regular membership */
- msg = Message_new_message();
- num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK );
-
- /* create the mess_link */
- mess_link = new( MESSAGE_LINK );
- Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
- mess_link->mess = msg;
- Obj_Inc_Refcount(mess_link->mess);
-
- /* notify local members */
- needed = 0;
- for( i=0; i < grp->num_local; i++ )
+ /* The group has changed */
+ /* eliminating partitioned daemons */
+ G_eliminate_partitioned_daemons( grp );
+ if( grp->num_members == 0 )
+ {
+ /* discard this empty group */
+ G_remove_group( grp );
+ }else{
+ Alarmp( SPLOG_INFO, GROUPS,
+ "G_handle_reg_memb: no state transfer needed for group %s.\n",
+ grp->name );
+ Alarmp( SPLOG_DEBUG, GROUPS,
+ "G_handle_reg_memb: changing group_id for group %s\n", grp->name );
+ G_print_group_id( SPLOG_DEBUG, grp->grp_id, "G_handle_reg_memb" );
+ grp->grp_id.memb_id = Reg_memb_id;
+ grp->grp_id.index = 1;
+ grp->changed = 0;
+ G_print_group_id( SPLOG_DEBUG, grp->grp_id, "G_handle_reg_memb" );
+ G_update_daemon_memb_ids( grp );
+ if( grp->num_local > 0 )
{
- ses = Sess_get_session_index ( grp->mbox[i] );
- if( Is_memb_session( Sessions[ ses ].status ) )
- Sess_write( ses, mess_link, &needed );
+ G_send_heavyweight_memb( grp );
}
- Message_Dec_Refcount(msg);
- if( !needed ) Sess_dispose_message( mess_link );
}
- }
- }
- }
- Gstate = GOP;
- GlobalStatus.gstate = Gstate;
- }else{
- /*
- * else
- * for every changed group
- * eliminate partitioned members
- * set Grp_id to (reg_memb_id, -1)
- * Replace protocol queue, raise event thershold
- * build groups message -- only local members
- * Send groups message
- * Shift to GGATHER
- */
- giter = sl_getlist( &GroupsList );
- grp = (giter)?(group *)giter->data:NULL;
- for( ; grp != NULL ; grp = nextgroup )
- {
- nextgroup = sl_next( &GroupsList, &giter );
- if( grp->changed )
- {
- /* The group has changed */
- /* eliminating partitioned members */
- iter = sl_getlist( &grp->MembersList );
- mbr = (iter)?(member *)iter->data:NULL;
- for( ; mbr != NULL ; mbr = nextmember )
- {
- nextmember = sl_next( &grp->MembersList, &iter );
- if( Is_partitioned_member( mbr->status ) )
- {
- /* discard this member - proc no longer in membership */
- sl_remove ( &grp->MembersList,mbr->private_name, dispose);
- grp->num_members--;
- }
- }
- if( grp->num_members == 0 )
- {
- /* discard this empty group */
- sl_destruct ( &grp->MembersList, dispose);
- sl_remove ( &GroupsList, grp->name, dispose);
- Num_groups--;
- GlobalStatus.num_groups = Num_groups;
- }
}
}
- /* raise events threshold */
- Session_threshold = MEDIUM_PRIORITY;
- Sess_set_active_threshold();
-
- /* Replace down queue */
- Prot_set_down_queue( GROUPS_DOWNQUEUE );
-
- /* build and send Groups message */
- /* Nowadays, we can send multiple groups messages. No group has
- * data in more than one. As an optimization, only the last message is
- * AGREED, and all previous ones are RELIABLE. G_handle_groups uses this
- * knowledge when parsing Groups messages. */
- passed_iter = NULL;
- do {
- msg = Message_new_message();
- grps_buf_link = new( GROUPS_BUF_LINK );
- grps_buf_link->next = Groups_bufs;
- Groups_bufs = grps_buf_link;
- grps_buf_link->bytes = G_build_groups_buf(grps_buf_link->buf, &passed_iter);
- G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
- head_ptr = Message_get_message_header(msg);
- if( passed_iter )
- head_ptr->type |= RELIABLE_MESS;
- else
- head_ptr->type |= AGREED_MESS;
- Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );
-
- down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
- down_ptr->mess = msg;
- Obj_Inc_Refcount(down_ptr->mess);
- /* Use control queue--not normal session queues */
- Prot_new_message( down_ptr, Groups_control_down_queue );
- Message_Dec_Refcount(msg);
- Alarm( GROUPS, "G_handle_reg_memb: (%8s) GROUPS message sent in GTRANS with %d bytes\n",
- (passed_iter) ? "RELIABLE" : "AGREED", grps_buf_link->bytes );
- } while( passed_iter != NULL );
+ Gstate = GOP;
+ GlobalStatus.gstate = Gstate;
- Gstate = GGATHER;
- GlobalStatus.gstate = Gstate;
- }
- break;
+ } else {
+ /*
+ * else
+ * for every changed group
+ * eliminate partitioned members
+ * Replace protocol queue, raise event thershold
+ * If I'm representative of synced set
+ * Build groups messages --
+ * contains only members local to daemons in SyncedSet
+ * Send groups messages
+ * Shift to GGATHER
+ */
+
+ giter = sl_getlist( &GroupsList );
+ grp = (giter)?(group *)giter->data:NULL;
+ for( ; grp != NULL ; grp = nextgroup )
+ {
+ nextgroup = sl_next( &GroupsList, &giter );
+ if( grp->changed )
+ {
+ /* The group has changed */
+ /* eliminating partitioned members */
+ G_eliminate_partitioned_daemons( grp );
+ if( grp->num_members == 0 )
+ {
+ /* discard this empty group */
+ G_remove_group( grp );
+ }
+ }
+ }
+ /* raise events threshold */
+ Session_threshold = MEDIUM_PRIORITY;
+ Sess_set_active_threshold();
+
+ /* Replace down queue */
+ Prot_set_down_queue( GROUPS_DOWNQUEUE );
+
+ /* If I'm the head of my synced set, I send one or more GROUPS messages.
+ * No daemon's data about members for a given group is ever split across
+ * multiple messages. As an optimization, only the last message is sent
+ * AGREED, and all previous messages are sent RELIABLE. G_handle_groups depends
+ * on this to determine when it has all the messages it is waiting for. */
+ if( Is_synced_set_leader(My.id) ) {
+ G_build_new_groups_bufs();
+ Groups_bufs_fresh = 1;
+ ret = G_send_groups_messages();
+ Alarmp( SPLOG_INFO, GROUPS,
+ "G_handle_reg_memb: %d GROUPS messages sent in GTRANS\n", ret );
+ }
- case GGATHER:
- Alarm( EXIT, "G_handle_reg_memb in GGATHER\n");
+ Gstate = GGATHER;
+ GlobalStatus.gstate = Gstate;
+ break;
- break;
+ case GGATHER:
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_reg_memb in GGATHER\n");
- case GGT:
- /*
- * Save reg_memb and reg_memb_id
- * Clear all retained Groups messages
- * Stamp own Groups message with current membership id
- * Send group message
- * Shift to GGATHER
- */
- Alarm( GROUPS, "G_handle_reg_memb in GGT\n");
+ break;
- Reg_memb = reg_memb;
- Reg_memb_id = reg_memb_id;
+ case GGT:
+ /*
+ * Save reg_memb and reg_memb_id
+ * If I received all of my synced set's group messages
+ * For all synced sets for which I have all the Groups messages
+ * Extract group information from Groups messages
+ * Add the synced set to mine
+ * Clear all retained Groups messages
+ * Check the groups state against the last delivered Trans_memb
+ * Check my synced set against Trans_memb
+ * If I'm representative of synced set
+ * If Groups bufs are still good (i.e. we didn't change anything)
+ * Stamp Groups messages with current membership id
+ * Else
+ * Remove all Groups bufs
+ * Build groups messages --
+ * contains only members local to daemons in SyncedSet
+ * Send groups messages
+ * Shift to GGATHER
+ */
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_reg_memb in GGT\n");
+
+ Reg_memb = reg_memb;
+ Reg_memb_id = reg_memb_id;
+
+ /* If our messages have all arrived, then we can bring into our
+ * synced set anyone who's messages have all arrived.
+ * Regardless, we need to dispose of the groups messages. */
+ for( grp_mlink = Gathered.next; grp_mlink != NULL; )
+ {
+ for( mess_link = grp_mlink->first; mess_link != NULL; )
+ {
+ if( Gathered.complete && grp_mlink->complete )
+ {
+ ret = G_mess_to_groups( mess_link, &sset );
+ if( ret < 0 )
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_reg_memb:"
+ " G_mess_to_groups errored %d\n", ret );
+ Groups_bufs_fresh = 0;
+ }
+ grp_mlink->first = mess_link->next;
+ Sess_dispose_message( mess_link );
+ mess_link = grp_mlink->first;
+ }
+ if( Gathered.complete && grp_mlink->complete )
+ G_add_to_synced_set( &sset );
+ Gathered.next = grp_mlink->next;
+ dispose( grp_mlink );
+ grp_mlink = Gathered.next;
+ }
- /* Clear retained Groups messages in Gathered */
- for( i=0; i < Num_mess_gathered; i++ )
- {
- mess_link = Gathered.next;
- Gathered.next = mess_link->next;
- Sess_dispose_message( mess_link );
- }
- Num_mess_gathered = 0;
- Num_daemons_gathered = 0;
+ /* We put off really handling the transitional configuration until now
+ * so as to not deliver potentially inconsistent groups messages
+ * if we completed the old state exchange. Now, prepare for the next one.
+ */
+ giter = sl_getlist( &GroupsList );
+ grp = (giter)?(group *)giter->data:NULL;
+ for( ; grp != NULL ; grp = nextgroup )
+ {
+ nextgroup = sl_next( &GroupsList, &giter );
+ group_changed = G_eliminate_partitioned_daemons( grp );
+ if( group_changed )
+ {
+ Groups_bufs_fresh = 0;
+ if( grp->num_members == 0 )
+ {
+ /* discard this empty group */
+ G_remove_group( grp );
+ } else {
+ grp->changed = 1;
+ }
+ }
+ }
+ synced_set_changed = G_check_synced_set( &MySyncedSet, &Trans_memb );
+ G_print_synced_set( SPLOG_INFO, &MySyncedSet, "G_handle_reg_memb" );
+ /* Since one of the groups bufs holds the synced set... */
+ if( synced_set_changed )
+ Groups_bufs_fresh = 0;
+
+ Gathered.complete = 0;
+ Num_mess_gathered = 0;
+ Num_daemons_gathered = 0;
+
+ if( Is_synced_set_leader(My.id) ) {
+ /* Stamp own Groups message in buffer with current membership id */
+ if( Groups_bufs_fresh ) {
+ G_stamp_groups_bufs();
+ } else {
+ G_discard_groups_bufs();
+ G_build_new_groups_bufs();
+ Groups_bufs_fresh = 1;
+ }
+ ret = G_send_groups_messages();
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_reg_memb: %d GROUPS messages"
+ " sent in GGT\n", ret );
+ }
- for( grps_buf_link = Groups_bufs; grps_buf_link; grps_buf_link = grps_buf_link->next )
- {
- /* Stamp own Groups message in buffer with current membership id */
- G_stamp_groups_buf( grps_buf_link->buf );
+ Gstate = GGATHER;
+ GlobalStatus.gstate = Gstate;
- /* send Groups message */
- msg = Message_new_message();
- G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
- head_ptr = Message_get_message_header(msg);
- if( grps_buf_link->next )
- head_ptr->type |= RELIABLE_MESS;
- else
- head_ptr->type |= AGREED_MESS;
- Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );
-
- down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
- down_ptr->mess = msg;
- Obj_Inc_Refcount(down_ptr->mess);
- /* Use control queue--not normal session queues */
- Prot_new_message( down_ptr, Groups_control_down_queue );
- Message_Dec_Refcount(msg);
- Alarm( GROUPS, "G_handle_reg_memb: (%8s) GROUPS message sent in GGT with %d bytes\n",
- (grps_buf_link->next) ? "RELIABLE" : "AGREED", grps_buf_link->bytes );
+ break;
}
-
- Gstate = GGATHER;
- GlobalStatus.gstate = Gstate;
-
- break;
- }
+ }
}
void G_handle_trans_memb( configuration trans_memb, membership_id trans_memb_id )
{
- group *grp, *nextgroup;
- member *mbr, *nextmember;
- struct skiplistnode *giter, *iter;
- int group_changed;
- message_link *mess_link;
- int needed;
- int ses;
- int i;
-
- Alarm( GROUPS, "G_handle_trans_memb: \n" );
+ group *grp, *nextgroup;
+ daemon_members *dmn, *nextdaemon;
+ struct skiplistnode *giter, *diter;
+ int group_changed;
+ char ip_string[16];
+
+ IP_to_STR( trans_memb_id.proc_id, ip_string );
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_trans_memb: with (%s, %d) id\n",
+ ip_string, trans_memb_id.time );
switch( Gstate )
{
case GOP:
/*
* Save transitional membership
- * For every group that has members that are not in the trans_memb do:
- * mark group members that are not in trans_memb as partitioned.
+ * For every group that has daemons that are not in the trans_memb do:
+ * mark group daemons that are not in trans_memb as partitioned.
* notify local members with an empty transitional group mess.
- * mark group as changed (index = -1)
+ * mark group as changed
+ * update group ID and daemon membership IDs with Trans_memb_id
* Shift to GTRANS
*/
- Alarm( GROUPS, "G_handle_trans_memb in GOP\n");
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_trans_memb in GOP\n");
Trans_memb = trans_memb;
Trans_memb_id = trans_memb_id;
- Alarm( GROUPS, "G_handle_trans_memb: Received trans memb id of:"
- " {proc_id: %d "
- " time: %d}\n", Trans_memb_id.proc_id, Trans_memb_id.time );
giter = sl_getlist( &GroupsList );
- grp = (giter)?(group *)giter->data:NULL;
+ grp = (giter) ? (group *)giter->data : NULL;
for( ; grp != NULL ; grp = nextgroup )
- {
- nextgroup = sl_next( &GroupsList, &giter );
- group_changed = 0;
- iter = sl_getlist( &grp->MembersList );
- mbr = (iter)?(member *)iter->data:NULL;
- for( ; mbr != NULL ; mbr = nextmember )
- {
- nextmember = sl_next( &grp->MembersList, &iter );
- if( Conf_id_in_conf( &Trans_memb, mbr->proc_id ) == -1 )
- {
- /* mark this member as partitioned - proc no longer in membership */
- mbr->status = PARTITIONED_MEMBER;
+ {
+ nextgroup = sl_next( &GroupsList, &giter );
+ group_changed = 0;
+ diter = sl_getlist( &grp->DaemonsList );
+ dmn = (diter) ? (daemon_members *)diter->data : NULL;
+ for( ; dmn != NULL; dmn = nextdaemon ) {
+ nextdaemon = sl_next( &grp->DaemonsList, &diter );
+ if( Conf_id_in_conf( &Trans_memb, dmn->proc_id ) == -1 )
+ {
+ /* mark this daemon as partitioned - proc no longer in membership */
+ dmn->memb_id = unknown_memb_id;
group_changed = 1;
- }
+ }
}
- if( group_changed )
+ if( group_changed )
{
- if( grp->num_local > 0 )
- {
- /* send members transitional membership */
- mess_link = G_build_trans_mess( grp );
- needed = 0;
- for( i=0; i < grp->num_local; i++ )
- {
- ses = Sess_get_session_index ( grp->mbox[i] );
- if( Is_memb_session( Sessions[ ses ].status ) )
- Sess_write( ses, mess_link, &needed );
- }
- if( !needed ) Sess_dispose_message( mess_link );
+ if( grp->num_local > 0 )
+ {
+ G_send_trans_memb( grp );
}
- Alarm( DEBUG, "G_handle_trans_memb: changed group %s in GOP, change"
- " group id from: ", grp->name );
- G_print_group_id( grp->grp_id );
- grp->grp_id.memb_id = trans_memb_id;
- grp->grp_id.index = 1; /* Not technically needed, but not bad, either. */
+ Alarmp( SPLOG_DEBUG, GROUPS, "G_handle_trans_memb: changed group %s in GOP,"
+ " change group id \n", grp->name );
+ G_print_group_id( SPLOG_DEBUG, grp->grp_id, "G_handle_trans_memb" );
+ grp->grp_id.memb_id = Trans_memb_id;
+ grp->grp_id.index = 1;
grp->changed = 1;
- Alarm( DEBUG, " to: " );
- G_print_group_id( grp->grp_id );
- Alarm( DEBUG, "\n" );
+ G_print_group_id( SPLOG_DEBUG, grp->grp_id, "G_handle_trans_memb" );
+ /* Here, we mark everyone who we are synced with as having the new memb_id
+ * Specifically, that is, everyone who isn't partitioned from us now in the GroupsList */
+ G_update_daemon_memb_ids( grp );
}
}
Gstate = GTRANS;
GlobalStatus.gstate = Gstate;
+ G_check_synced_set( &MySyncedSet, &Trans_memb );
+ G_print_synced_set( SPLOG_INFO, &MySyncedSet, "G_handle_trans_memb" );
+
break;
case GTRANS:
- Alarm( EXIT, "G_handle_trans_memb in GTRANS\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_trans_memb in GTRANS\n");
break;
@@ -618,56 +657,43 @@
/*
* Save transitional membership
* For every group that has members that are not in the
- * trans_memb do:
- * discard group members that are not in trans_memb
- * if group is changed then mark it as changed (index = -1) (it might be already changed, but its ok).
+ * trans_memb do:
+ * If group has daemons not in the transitional conf
+ * mark it as changed (it might be already changed, but it's ok)
+ *
+ * In order to correctly handle the case in which we complete the state
+ * exchange from the *last* reg memb, don't throw out information here
+ * but rather when we get the next reg memb.
+ *
* Shift to GGT
*
* Note: there is no need to notify local members with a transitional group mess
- * becuase no message will come between the trans group memb and the next reg group memb.
+ * because no message will come between the trans group memb and the next reg group memb.
* Note: this cascading deletes of members that are not in transitional membership actually
* opens the door for implementation of the ERSADS97 algorithm.
*/
- Alarm( GROUPS, "G_handle_trans_memb in GGATHER\n");
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_trans_memb in GGATHER\n");
Trans_memb = trans_memb;
- Trans_memb_id = trans_memb_id; /* Need this because we deliver the transitional again if we complete
- * the state exchange during GGT. */
- Alarm( GROUPS, "G_handle_trans_memb: Received trans memb id of:"
- " {proc_id: %d "
- " time: %d}\n", Trans_memb_id.proc_id, Trans_memb_id.time );
+ Trans_memb_id = trans_memb_id; /* Need these either in G_handle_groups, or in
+ * G_handle_reg_memb, depending on whether we
+ * complete state transfer for the current Reg_memb */
+
+ /* We don't actually *need* to mark these groups as changed in the loop below.
+ * It will cause some groups that were not changed by the last membership
+ * to receive notifications anyway if we do complete the state exchange in GGT
+ * Pros: This avoids a small loss of information about who received which messages.
+ * Cons: This isn't strictly required by EVS.
+ */
giter = sl_getlist( &GroupsList );
grp = (giter)?(group *)giter->data:NULL;
for( ; grp != NULL ; grp = nextgroup )
{
nextgroup = sl_next( &GroupsList, &giter );
- group_changed = 0;
- iter = sl_getlist( &grp->MembersList );
- mbr = (iter)?(member *)iter->data:NULL;
- for( ; mbr != NULL ; mbr = nextmember )
- {
- nextmember = sl_next( &grp->MembersList, &iter );
- if( Conf_id_in_conf( &Trans_memb, mbr->proc_id ) == -1 )
- {
- /* discard this member - proc no longer in membership */
- sl_remove ( &grp->MembersList,mbr->private_name, dispose);
- grp->num_members--;
-
- group_changed = 1;
- }
- }
- if( grp->num_members == 0 )
- {
- /* discard this empty group */
- sl_destruct ( &grp->MembersList, dispose);
- sl_remove ( &GroupsList, grp->name, dispose);
- Num_groups--;
- GlobalStatus.num_groups = Num_groups;
-
- }else if( group_changed ) {
- grp->changed = 1;
- }
+ group_changed = G_check_if_changed_by_cascade( grp );
+ if( group_changed )
+ grp->changed = 1;
}
Gstate = GGT;
@@ -676,7 +702,7 @@
break;
case GGT:
- Alarm( EXIT, "G_handle_trans_memb in GGT\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_trans_memb in GGT\n");
break;
}
@@ -685,67 +711,56 @@
void G_handle_join( char *private_group_name, char *group_name )
{
group *grp, *new_grp;
+ daemon_members *dmn, *new_dmn;
member *mbr, *new_mbr;
- int needed;
- char *num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
- int num_bytes;
char proc_name[MAX_PROC_NAME];
char private_name[MAX_PRIVATE_NAME+1];
- int new_p_ind, p_ind1;
- proc new_p, p1;
+ int new_p_ind;
+ proc new_p;
int ses;
mailbox new_mbox;
- message_link *mess_link;
- message_header *head_ptr;
- message_obj *msg, *joiner_msg;
- char *vs_ptr; /* the virtual synchrony set */
- int i;
- int32u temp;
- Alarm( GROUPS, "G_handle_join: %s joins group %s\n", private_group_name, group_name );
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_join: %s joins group %s\n", private_group_name, group_name );
switch( Gstate )
{
- case GOP:
- case GTRANS:
+ case GOP:
+ case GTRANS:
- if (Gstate == GOP) Alarm( GROUPS, "G_handle_join in GOP\n");
- if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_join in GTRANS\n");
+ if (Gstate == GOP) Alarmp( SPLOG_INFO, GROUPS, "G_handle_join in GOP\n");
+ if (Gstate == GTRANS) Alarmp( SPLOG_INFO, GROUPS, "G_handle_join in GTRANS\n");
/*
- * if already in group then ignore
- * if the group is unchanged and new member is coming from alive daemon then:
- * Insert to group as established
- * Increment Grp_id
- * Notify all local members of a regular membership caused by join
- * else if group is changed and coming from alive daemon then
- * Insert to group as new
- * Increment Grp_id
- * if there are local members then
- * build a membership with all members, and vs set with all established members
- * notify all local established members with that membership (caused by network)
- * if new member is local
- * notify new member with membership and self vs set (caused by network)
- * notify all local members a transitional membership
- * mark new member as established
- * else (if new member is coming from a partitioned daemon then)
- * Insert to group as partitioned
- * Increment Grp_id, and mark group as changed if not already done
- * if there are local members then
- * build a membership with all members and vs set with all established members
- * notify all local members with that membership (caused by network)
- * notify all local members with transitional membership
- *
- * Note: remember that when delivering a regular message while in GTRANS, you should use the
- * mbox list of the group. You should still be cautious when delivering memberships to take
- * care of the fact that the new guy gets a different treatment.
+ * Find the group being joined
+ * If group doesn't exist, then add it [unchanged, with gid index 0]
+ * If in GOP, grp_id.memb_id is Reg_memb_id
+ * If in GTRANS, grp_id.memb_id is Trans_memb_id
+ * Find the daemon
+ * If daemon [for this group] doesn't exist, then add it
+ * Record partitioned status when adding, based on Gstate and Trans_memb
+ * Find or create the member
+ * If already in group then ignore (return)
+ * Initialize member object
+ * Increment group size
+ * Increment group ID
+ * Add mbox for local joiner
+ * Mark group as changed if daemon is partitioned
+ *
+ * If the group is changed
+ * Notify all local members of a regular membership caused by network
+ * Note: this regular membership lists the joiner in a separate vs_set from
+ * its daemon's other members
+ * Notify all local members of a transitional membership
+ * Else (if the group isn't changed)
+ * Notify all local members of a regular membership caused by join
+ * Update the group mask (not needed for Spread 3)
*/
G_private_to_names( private_group_name, private_name, proc_name );
new_p_ind = Conf_proc_by_name( proc_name, &new_p );
if( new_p_ind < 0 )
{
- Alarm( PRINT, "G_handle_join: illegal proc_name %s in private_group %s \n",
+ Alarmp( SPLOG_ERROR, GROUPS, "G_handle_join: illegal proc_name %s in private_group %s \n",
proc_name, private_group_name );
return;
}
@@ -755,261 +770,101 @@
new_grp = new( GROUP );
memset( new_grp->name, 0, MAX_GROUP_NAME );
strcpy( new_grp->name, group_name );
- sl_init( &new_grp->MembersList );
- sl_set_compare( &new_grp->MembersList,
- G_member_recordcompare,
- G_member_keycompare);
+ sl_init( &new_grp->DaemonsList );
+ sl_set_compare( &new_grp->DaemonsList,
+ G_daemon_recordcompare,
+ G_daemon_keycompare );
+ /* NOTE: Older versions of groups do mark a new group as changed if it's
+ * created in GTRANS. This is only needed if the joiner is partitioned
+ * from us [handled below]. */
+ new_grp->changed = 0;
if( Gstate == GOP) {
- new_grp->changed = 0;
new_grp->grp_id.memb_id = Reg_memb_id;
+
} else { /* Gtrans */
- new_grp->changed = 1;
new_grp->grp_id.memb_id = Trans_memb_id;
}
- new_grp->grp_id.index = 0; /* 0 because we will definitely increment it */
- Alarm( DEBUG, "G_handle_join: New group added with group id: " );
- G_print_group_id( new_grp->grp_id );
- Alarm( DEBUG, "\n" );
+ new_grp->grp_id.index = 0; /* This will be incremented to 1, below. */
+ Alarmp( SPLOG_DEBUG, GROUPS, "G_handle_join: New group added with group id:\n" );
+ G_print_group_id( SPLOG_DEBUG, new_grp->grp_id, "G_handle_join" );
new_grp->num_members = 0;
new_grp->num_local = 0;
sl_insert( &GroupsList, new_grp );
- Num_groups++; /*sl need this?*/
+ Num_groups++; /*sl need this? No, but its nice to have, kept global. */
GlobalStatus.num_groups = Num_groups;
grp = new_grp;
}
- mbr = G_get_member( grp, private_group_name );
+
+ dmn = G_get_daemon( grp, new_p.id );
+ if( dmn == NULL ) {
+ new_dmn = new( DAEMON_MEMBERS );
+ new_dmn->proc_id = new_p.id;
+ sl_init( &new_dmn->MembersList );
+ sl_set_compare( &new_dmn->MembersList, G_compare, G_compare );
+ sl_insert( &grp->DaemonsList, new_dmn );
+ /* Are we partitioned from this daemon? */
+ if( Gstate == GOP || ( Conf_id_in_conf( &Trans_memb, new_p.id ) != -1 ) ) {
+ new_dmn->memb_id = grp->grp_id.memb_id;
+ } else {
+ new_dmn->memb_id = unknown_memb_id;
+ }
+ dmn = new_dmn;
+ }
+
+ mbr = G_get_member( dmn, private_group_name );
if( mbr != NULL )
{
- Alarm( PRINT, "G_handle_join: %s is already in group %s\n",
+ Alarmp( SPLOG_ERROR, GROUPS, "G_handle_join: %s is already in group %s\n",
private_group_name, group_name );
return;
}
- /* Add a new member as ESTABLISHED (might change later on depending on the situation */
+ /* Add a new member */
new_mbr = new( MEMBER );
- memset( new_mbr->private_name, 0, MAX_GROUP_NAME );
- strcpy( new_mbr->private_name, private_group_name );
- new_mbr->proc_id = new_p.id;
- new_mbr->status = ESTABLISHED_MEMBER;
- new_mbr->p_ind = new_p_ind;
+ memset( new_mbr->name, 0, MAX_GROUP_NAME );
+ strcpy( new_mbr->name, private_group_name );
- sl_insert( &grp->MembersList, new_mbr );
+ sl_insert( &dmn->MembersList, new_mbr );
grp->num_members++;
+ grp->grp_id.index++;
/* if member is local then add mbox */
- if( new_mbr->proc_id == My.id )
+ if( dmn->proc_id == My.id )
{
ses = Sess_get_session( private_name );
- if( ses < 0 ) Alarm( EXIT, "G_handle_join: local session does not exist\n");
+ if( ses < 0 ) Alarmp( SPLOG_FATAL, GROUPS, "G_handle_join: local session does not exist\n" );
grp->mbox[ grp->num_local ] = Sessions[ ses ].mbox;
grp->num_local++;
new_mbox = Sessions[ ses ].mbox;
- }else new_mbox = -1;
-
- /* This is the meat */
- if( Gstate == GOP || ( Conf_id_in_conf( &Trans_memb, new_p.id ) != -1 ) )
- {
- /* new member is coming from alive daemon */
- if( !grp->changed )
- {
- /* group is unchanged */
- /* Increment group id */
- grp->grp_id.index++;
+ } else
+ new_mbox = -1;
- /* Notify local members */
- if( grp->num_local > 0 )
- {
- msg = Message_new_message();
- num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
- head_ptr = Message_get_message_header(msg);
-
- head_ptr->type |= CAUSED_BY_JOIN ;
-
- /* notify all local members */
- num_vs_ptr = &Mess_buf[ num_bytes ];
- num_bytes += sizeof( int32 );
- temp = 1;
- memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */
-
- vs_ptr = (char *)&Mess_buf[ num_bytes ];
- memcpy( vs_ptr, new_mbr->private_name, MAX_GROUP_NAME );
- num_bytes += MAX_GROUP_NAME;
-
- head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );
-
- mess_link = new( MESSAGE_LINK );
- Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
- mess_link->mess = msg;
- Obj_Inc_Refcount(mess_link->mess);
-
- needed = 0;
- for( i=0; i < grp->num_local; i++ )
- {
- ses = Sess_get_session_index ( grp->mbox[i] );
- if( Is_memb_session( Sessions[ ses ].status ) )
- Sess_write( ses, mess_link, &needed );
- }
- if ( !needed ) Sess_dispose_message( mess_link );
- Message_Dec_Refcount(msg);
- }
- }else{
- /* group is changed */
- /* mark new member as new */
- new_mbr->status = NEW_MEMBER;
-
- /* Increment group id */
- grp->grp_id.index++;
+ if( Is_partitioned_daemon( dmn ) && !grp->changed )
+ grp->changed = 1;
- if( grp->num_local > 0 )
- {
- /* build a membership with all members, and vs set with all established members */
- msg = Message_new_message();
- num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK );
-
- /* notify all non-new local members with that membership (caused by network) */
- mess_link = new( MESSAGE_LINK );
- Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
- mess_link->mess = msg;
- Obj_Inc_Refcount(mess_link->mess);
-
- needed = 0;
- for( i=0; i < grp->num_local; i++ )
- {
- /* if new member is local we do not notify it here */
- if( grp->mbox[i] == new_mbox ) continue;
-
- ses = Sess_get_session_index ( grp->mbox[i] );
- if( Is_memb_session( Sessions[ ses ].status ) )
- Sess_write( ses, mess_link, &needed );
- }
- if ( !needed ) Sess_dispose_message( mess_link );
- Message_Dec_Refcount(msg);
-
- /* notify new member if local */
- if( new_mbox != -1 )
- {
- /* build a membership with all members */
- joiner_msg = Message_new_message();
- num_bytes = G_build_memb_buf( grp, joiner_msg, Mess_buf );
- head_ptr = Message_get_message_header(joiner_msg);
- head_ptr->type |= CAUSED_BY_NETWORK ;
- /* build a self vs set */
- num_vs_ptr = &Mess_buf[ num_bytes ];
- num_bytes += sizeof( int32 );
- temp = 1;
- memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */
- vs_ptr = (char *)&Mess_buf[ num_bytes ];
- memcpy( vs_ptr, new_mbr->private_name, MAX_GROUP_NAME );
- num_bytes += MAX_GROUP_NAME;
- head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );
- mess_link = new( MESSAGE_LINK );
- Message_Buffer_to_Message_Fragments( joiner_msg, Mess_buf, num_bytes );
- mess_link->mess = joiner_msg;
- Obj_Inc_Refcount(mess_link->mess);
-
- needed = 0;
- ses = Sess_get_session_index ( new_mbox );
- if( Is_memb_session( Sessions[ ses ].status ) )
- Sess_write( ses, mess_link, &needed );
- if ( !needed ) Sess_dispose_message( mess_link );
- Message_Dec_Refcount(joiner_msg);
- }
- /* notify all local members a transitional membership */
- mess_link = G_build_trans_mess( grp );
- needed = 0;
- for( i=0; i < grp->num_local; i++ )
- {
- ses = Sess_get_session_index ( grp->mbox[i] );
- if( Is_memb_session( Sessions[ ses ].status ) )
- Sess_write( ses, mess_link, &needed );
- }
- if( !needed ) Sess_dispose_message( mess_link );
- }
- /* Mark new member as established */
- new_mbr->status = ESTABLISHED_MEMBER;
- }
- }else{
- /* coming from a partitioned daemon */
- /* mark new member as partitioned member */
- new_mbr->status = PARTITIONED_MEMBER;
- /*
- * (marking group as changed - it might be already )
- */
- if( !grp->changed ) grp->changed = 1;
- grp->grp_id.index++;
- if( grp->num_local > 0 )
+ if( grp->num_local > 0 ) {
+ if( grp->changed )
{
- /* build a membership with all members, and vs set with all non-partitioned members */
- msg = Message_new_message();
- num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK );
-
- /* notify all local members with that membership (caused by network) */
- mess_link = new( MESSAGE_LINK );
- Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
- mess_link->mess = msg;
- Obj_Inc_Refcount(mess_link->mess);
-
- needed = 0;
- for( i=0; i < grp->num_local; i++ )
- {
- ses = Sess_get_session_index ( grp->mbox[i] );
- if( Is_memb_session( Sessions[ ses ].status ) )
- Sess_write( ses, mess_link, &needed );
- }
- if ( !needed ) Sess_dispose_message( mess_link );
- Message_Dec_Refcount(msg);
-
- /* notify all local members a transitional membership */
- mess_link = G_build_trans_mess( grp );
-
- needed = 0;
- for( i=0; i < grp->num_local; i++ )
- {
- ses = Sess_get_session_index ( grp->mbox[i] );
- if( Is_memb_session( Sessions[ ses ].status ) )
- Sess_write( ses, mess_link, &needed );
- }
- if( !needed ) Sess_dispose_message( mess_link );
+ G_send_heavyweight_join( grp, new_mbr, new_mbox );
+ G_send_trans_memb( grp );
+ } else {
+ G_send_lightweight_memb( grp, CAUSED_BY_JOIN, new_mbr->name );
}
}
+
/* Compute the mask */
- for(i=0; i<4; i++)
- {
- grp->grp_mask[i] = 0;
- }
- {
- struct skiplistnode *iter;
- member *memp;
- /*
- for( mbr= &grp->members; mbr->next != NULL; mbr=mbr->next )
- {
- p_ind1 = Conf_proc_by_id( mbr->next->proc_id, &p1 ); */
- for( iter = sl_getlist( &grp->MembersList ),
- memp=(member *)iter->data;
- iter != NULL;
- memp = (member *)sl_next( &grp->MembersList, &iter )) {
- p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 );
- temp = 1;
- for(i=0; i<p1.seg_index%32; i++)
- {
- temp *= 2;
- }
- grp->grp_mask[p1.seg_index/32] |= temp;
- }
- }
- Alarm(GROUPS, "G_handle_join: Mask for group %s set to %x %x %x %x\n",
- grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);
+ G_compute_group_mask( grp, "G_handle_join" );
break;
- case GGATHER:
- Alarm( EXIT, "G_handle_join in GGATHER\n");
+ case GGATHER:
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_join in GGATHER\n");
break;
- case GGT:
- Alarm( EXIT, "G_handle_join in GGT\n");
+ case GGT:
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_join in GGT\n");
break;
}
@@ -1021,88 +876,72 @@
char proc_name[MAX_PROC_NAME];
char private_name[MAX_PRIVATE_NAME+1];
char departing_private_group_name[MAX_GROUP_NAME];
- int p_ind, p_ind1;
- proc p, p1;
+ int p_ind;
+ proc p;
+ int ses;
group *grp;
+ daemon_members *dmn;
member *mbr;
- char *num_vs_ptr; /* num members in vs set */
- char *vs_ptr; /* the virtual synchrony set */
- message_link *mess_link;
- message_header *head_ptr;
- message_obj *msg;
- int num_bytes;
- int needed;
- int ses;
- int i, j;
- int32u temp;
+ int i,j;
- Alarm( GROUPS, "G_handle_leave: %s leaves group %s\n", private_group_name, group_name );
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_leave: %s leaves group %s\n", private_group_name, group_name );
switch( Gstate )
{
case GOP:
case GTRANS:
- if (Gstate == GOP) Alarm( GROUPS, "G_handle_leave in GOP\n");
- if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_leave in GTRANS\n");
+ if (Gstate == GOP) Alarmp( SPLOG_INFO, GROUPS, "G_handle_leave in GOP\n");
+ if (Gstate == GTRANS) Alarmp( SPLOG_INFO, GROUPS, "G_handle_leave in GTRANS\n");
/*
- * if not already in group then ignore
- * if this member is local, notify it and extract its mbox
- * Extract this member from group
- * if the group is unchanged (in GOP all groups are unchanged) then:
- * Increment Grp_id
- * Notify all local members of a regular membership caused by leave
+ * If not already in group then ignore
+ * If this member is local, notify it [Self Leave] and extract its mbox
+ * Extract this member from group (and discard empty daemons/groups)
+ * Increment Grp_id
+ * If the group is changed, then:
+ * Notify all local members of a regular membership caused by network
+ * with all ESTABLISHED members in the vs_set
+ * Notify all local members of a transitional membership
+ * If the group is unchanged (in GOP all groups are unchanged) then:
+ * Notify all local members of a regular membership caused by leave
+ * Update the group mask
*/
G_private_to_names( private_group_name, private_name, proc_name );
p_ind = Conf_proc_by_name( proc_name, &p );
if( p_ind < 0 )
{
- Alarm( PRINT, "G_handle_leave: illegal proc_name %s in private_group %s \n",
+ Alarmp( SPLOG_ERROR, GROUPS, "G_handle_leave: illegal proc_name %s in private_group %s \n",
proc_name, private_group_name );
return;
}
grp = G_get_group( group_name );
if( grp == NULL )
{
- Alarm( PRINT, "G_handle_leave: group %s does not exist\n",
+ Alarmp( SPLOG_ERROR, GROUPS, "G_handle_leave: group %s does not exist\n",
group_name );
return;
}
- mbr = G_get_member( grp, private_group_name );
+ dmn = G_get_daemon( grp, p.id );
+ if( dmn == NULL )
+ {
+ Alarmp( SPLOG_ERROR, GROUPS, "G_handle_leave: daemon %s doesn't exist in group %s\n",
+ proc_name, group_name );
+ return;
+ }
+ mbr = G_get_member( dmn, private_group_name );
if( mbr == NULL )
{
- Alarm( PRINT, "G_handle_leave: member %s does not exist in group %s\n",
- private_group_name, group_name );
+ Alarmp( SPLOG_ERROR, GROUPS, "G_handle_leave: member %s does not exist in daemon/group %s/%s\n",
+ private_group_name, proc_name, group_name );
return;
}
if( p.id == My.id )
{
/* notify this local member and extract its mbox from group */
- msg = Message_new_message();
- head_ptr = Message_get_message_header(msg);
- head_ptr->type = CAUSED_BY_LEAVE;
- head_ptr->type = Set_endian( head_ptr->type );
- head_ptr->hint = Set_endian( 0 );
- memcpy( head_ptr->private_group_name, grp->name, MAX_GROUP_NAME );
- head_ptr->num_groups = 0;
- head_ptr->data_len = 0;
-
- /* create the mess_link */
- mess_link = new( MESSAGE_LINK );
- /* NOTE: Mess_buf contents are NOT used here. We only examine "0" bytes of it
- * We just need a valid pointer here to prevent faults */
- Message_Buffer_to_Message_Fragments( msg, Mess_buf, 0);
- mess_link->mess = msg;
- Obj_Inc_Refcount(mess_link->mess);
- /* notify member */
- needed = 0;
ses = Sess_get_session( private_name );
- if( Is_memb_session( Sessions[ ses ].status ) )
- Sess_write( ses, mess_link, &needed );
- if( !needed ) Sess_dispose_message( mess_link );
-
+ G_send_self_leave( grp, ses );
/* extract this mbox */
for( i=0, j=0; i < grp->num_local; i++,j++ )
{
@@ -1110,105 +949,53 @@
else grp->mbox[j] = grp->mbox[i];
}
grp->num_local--;
- Message_Dec_Refcount(msg);
}
/* extract this member from group */
- memcpy( departing_private_group_name, mbr->private_name, MAX_GROUP_NAME );
- sl_remove( &grp->MembersList, mbr->private_name, dispose );
+ memcpy( departing_private_group_name, mbr->name, MAX_GROUP_NAME );
+ sl_remove( &dmn->MembersList, mbr->name, dispose );
grp->num_members--;
+ if( dmn->MembersList.size == 0 )
+ {
+ G_remove_daemon( grp, dmn );
+ }
if( grp->num_members == 0 )
{
/* discard this empty group */
- sl_destruct ( &grp->MembersList, dispose);
- sl_remove( &GroupsList, grp->name, dispose );
- Num_groups--;
- GlobalStatus.num_groups = Num_groups;
- return;
+ G_remove_group( grp );
+ return;
}
- if( grp->changed )
- {
- if( Gstate != GTRANS ) Alarm( EXIT, "G_handle_leave: changed group in GOP\n");
- /*
- * If the group is changed (in GTRANS) then there is no need
- * to increment group id or to notify the local members.
- * They will get a group membership after the state transfer
- * terminates.
- */
- return;
- }
-
/* Increment group id */
- grp->grp_id.index++;
+ grp->grp_id.index++;
+
+ /* Note: Groups become changed because they include partitioned daemons.
+ * We never need to mark a group as changed here, or in G_handle_kill.
+ */
if( grp->num_local > 0 )
{
- /* notify all local members */
- msg = Message_new_message();
- num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
- head_ptr = Message_get_message_header(msg);
- head_ptr->type |= CAUSED_BY_LEAVE ;
-
- /* notify all local members */
- num_vs_ptr = &Mess_buf[ num_bytes ];
- num_bytes += sizeof( int32 );
- temp = 1;
- memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */
-
- vs_ptr = (char *)&Mess_buf[ num_bytes ];
- memcpy( vs_ptr, departing_private_group_name, MAX_GROUP_NAME );
- num_bytes += MAX_GROUP_NAME;
-
- head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );
-
- mess_link = new( MESSAGE_LINK );
- Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
- mess_link->mess = msg;
- Obj_Inc_Refcount(mess_link->mess);
- needed = 0;
- for( i=0; i < grp->num_local; i++ )
- {
- ses = Sess_get_session_index ( grp->mbox[i] );
- if( Is_memb_session( Sessions[ ses ].status ) )
- Sess_write( ses, mess_link, &needed );
- }
- if ( !needed ) Sess_dispose_message( mess_link );
- Message_Dec_Refcount(msg);
+ if( grp->changed )
+ {
+ G_send_heavyweight_memb( grp );
+ G_send_trans_memb( grp );
+ } else {
+ G_send_lightweight_memb( grp, CAUSED_BY_LEAVE, departing_private_group_name );
+ }
}
+
/* Compute the mask */
- for(i=0; i<4; i++)
- {
- grp->grp_mask[i] = 0;
- }
- {
- struct skiplistnode *iter;
- member *memp;
- for( iter = sl_getlist( &grp->MembersList ),
- memp=(member *)iter->data;
- iter != NULL;
- memp = (member *)sl_next( &grp->MembersList, &iter )) {
- p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 );
- temp = 1;
- for(i=0; i<p1.seg_index%32; i++)
- {
- temp *= 2;
- }
- grp->grp_mask[p1.seg_index/32] |= temp;
- }
- }
+ G_compute_group_mask( grp, "G_handle_leave" );
- Alarm(GROUPS, "G_handle_leave: Mask for group %s set to %x %x %x %x\n",
- grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);
break;
case GGATHER:
- Alarm( EXIT, "G_handle_leave in GGATHER\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_leave in GGATHER\n");
break;
case GGT:
- Alarm( EXIT, "G_handle_leave in GGT\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_leave in GGT\n");
break;
}
@@ -1219,44 +1006,42 @@
char proc_name[MAX_PROC_NAME];
char private_name[MAX_PRIVATE_NAME+1];
char departing_private_group_name[MAX_GROUP_NAME];
- int p_ind, p_ind1;
- proc p, p1;
+ int p_ind;
+ proc p;
group *grp, *nextgroup;
+ daemon_members *dmn;
member *mbr;
- char *num_vs_ptr; /* num members in vs set */
- char *vs_ptr; /* the virtual synchrony set */
- message_link *mess_link;
- message_header *head_ptr;
- message_obj *msg;
- int num_bytes;
- int needed;
int ses = -1; /* Fool compiler */
int i, j;
- int32u temp;
struct skiplistnode *giter;
- Alarm( GROUPS, "G_handle_kill: %s is killed\n", private_group_name );
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_kill: %s is killed\n", private_group_name );
switch( Gstate )
{
case GOP:
case GTRANS:
- if (Gstate == GOP) Alarm( GROUPS, "G_handle_kill in GOP\n");
- if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_kill in GTRANS\n");
+ if (Gstate == GOP) Alarmp( SPLOG_INFO, GROUPS, "G_handle_kill in GOP\n");
+ if (Gstate == GTRANS) Alarmp( SPLOG_INFO, GROUPS, "G_handle_kill in GTRANS\n");
/*
- * for every group this guy is a member of
- * Extract this member from group
- * if the group is unchanged (in GOP all groups are unchanged) then:
- * Increment Grp_id
- * Notify all local members of a regular membership caused by disconnet
+ * For every group this guy is a member of
+ * Extract this member from group (and discard empty daemons/groups)
+ * Increment Grp_id
+ * If the group is changed, then:
+ * Notify all local members of a regular membership caused by network
+ * with all ESTABLISHED members in the vs_set
+ * Notify all local members of a transitional membership
+ * If the group is unchanged (in GOP all groups are unchanged) then:
+ * Notify all local members of a regular membership caused by disconnect
+ * Update the group mask
*/
G_private_to_names( private_group_name, private_name, proc_name );
p_ind = Conf_proc_by_name( proc_name, &p );
if( p_ind < 0 )
{
- Alarm( PRINT, "G_handle_kill: illegal proc_name %s in private_group %s \n",
+ Alarmp( SPLOG_ERROR, GROUPS, "G_handle_kill: illegal proc_name %s in private_group %s \n",
proc_name, private_group_name );
return;
}
@@ -1271,7 +1056,9 @@
choose to remove it it doesn't screw up the iterator.
Then next time through use this "next" value */
nextgroup = sl_next( &GroupsList, &giter );
- mbr = G_get_member( grp, private_group_name );
+ dmn = G_get_daemon( grp, p.id );
+ if( dmn == NULL ) continue; /* member's daemon not in group */
+ mbr = G_get_member( dmn, private_group_name );
if( mbr == NULL ) continue; /* no such member in that group */
/* Extract this member from group */
@@ -1285,137 +1072,264 @@
}
grp->num_local--;
}
- memcpy( departing_private_group_name, mbr->private_name, MAX_GROUP_NAME );
- sl_remove( &grp->MembersList, mbr->private_name, dispose );
- grp->num_members--;
- if( grp->num_members == 0 )
- {
- sl_destruct ( &grp->MembersList, dispose);
- sl_remove( &GroupsList, grp->name, dispose );
- Num_groups--;
- GlobalStatus.num_groups = Num_groups;
- continue;
- }
-
- if( grp->changed )
- {
- if( Gstate != GTRANS ) Alarm( EXIT, "G_handle_kill: changed group in GOP\n");
- /*
- * If the group is changed (in GTRANS) then there is no need
- * to increment group id or to notify the local members.
- * They will get a group membership after the state transfer
- * terminates.
- */
- continue;
- }
+ memcpy( departing_private_group_name, mbr->name, MAX_GROUP_NAME );
+ sl_remove( &dmn->MembersList, mbr->name, dispose );
+ grp->num_members--;
+ if( dmn->MembersList.size == 0 )
+ {
+ G_remove_daemon( grp, dmn );
+ }
+ if( grp->num_members == 0 )
+ {
+ /* discard this empty group */
+ G_remove_group( grp );
+ continue;
+ }
/* Increment group id */
grp->grp_id.index++;
- /* Compute the mask */
- for(i=0; i<4; i++)
- {
- grp->grp_mask[i] = 0;
- }
- {
- struct skiplistnode *iter;
- member *memp;
- for( iter = sl_getlist( &grp->MembersList ),
- memp=(member *)iter->data;
- iter != NULL;
- memp = (member *)sl_next( &grp->MembersList, &iter )) {
- p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 );
- temp = 1;
- for(i=0; i<p1.seg_index%32; i++)
- {
- temp *= 2;
- }
- grp->grp_mask[p1.seg_index/32] |= temp;
- }
- }
-
- Alarm(GROUPS, "G_handle_kill: Mask for group %s set to %x %x %x %x\n",
- grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);
-
- if( grp->num_local > 0 )
- {
- /* notify all local members */
- msg = Message_new_message();
- num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
- head_ptr = Message_get_message_header(msg);
-
- head_ptr->type |= CAUSED_BY_DISCONNECT ;
-
- num_vs_ptr = &Mess_buf[ num_bytes ];
- num_bytes += sizeof( int32 );
- temp = 1;
- memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */
-
- vs_ptr = (char *)&Mess_buf[ num_bytes ];
- memcpy( vs_ptr, departing_private_group_name, MAX_GROUP_NAME );
- num_bytes += MAX_GROUP_NAME;
-
- head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );
-
- mess_link = new( MESSAGE_LINK );
- Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
- mess_link->mess = msg;
- Obj_Inc_Refcount(mess_link->mess);
- needed = 0;
- for( i=0; i < grp->num_local; i++ )
- {
- int temp_ses;
+ if( grp->num_local > 0 )
+ {
+ if( grp->changed )
+ {
+ G_send_heavyweight_memb( grp );
+ G_send_trans_memb( grp );
+ } else {
+ G_send_lightweight_memb( grp, CAUSED_BY_DISCONNECT,
+ departing_private_group_name );
+ }
+ }
- temp_ses = Sess_get_session_index ( grp->mbox[i] );
- if( Is_memb_session( Sessions[ temp_ses ].status ) )
- Sess_write( temp_ses, mess_link, &needed );
- }
- if ( !needed ) Sess_dispose_message( mess_link );
- Message_Dec_Refcount(msg);
- }
+ /* Compute the mask */
+ G_compute_group_mask( grp, "G_handle_kill" );
}
break;
case GGATHER:
- Alarm( EXIT, "G_handle_kill in GGATHER\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_kill in GGATHER\n");
break;
case GGT:
- Alarm( EXIT, "G_handle_kill in GGT\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_kill in GGT\n");
break;
}
}
-void G_handle_groups( message_link *mess_link )
+static void G_send_lightweight_memb( group *grp, int32 caused, char *private_group_name )
{
- char *memb_id_ptr;
- membership_id temp_memb_id;
+ int needed;
+ int num_bytes;
+ int ses;
+ message_link *mess_link;
+ message_header *head_ptr;
message_obj *msg;
+ int i;
+ int32u temp;
+ char *num_vs_sets_ptr; /* number of vs_sets */
+ char *vs_set_offset_ptr; /* Byte offset into the vs_set region of my vs_set */
+ char *num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
+ char *vs_ptr; /* the virtual synchrony set */
+
+ msg = Message_new_message();
+ num_bytes = G_build_memb_buf( grp, msg, Mess_buf, caused );
+
+ num_vs_sets_ptr = &Mess_buf[num_bytes];
+ num_bytes += sizeof( int32u );
+ temp = 1;
+ memcpy( num_vs_sets_ptr, &temp, sizeof(int32u) ); /* 1 vs_set */
+
+ vs_set_offset_ptr = &Mess_buf[num_bytes];
+ num_bytes += sizeof( int32u );
+
+ num_vs_ptr = &Mess_buf[ num_bytes ];
+ num_bytes += sizeof( int32u );
+ temp = 1;
+ memcpy( num_vs_ptr, &temp, sizeof( int32u ) ); /* with 1 member */
+
+ temp = 0;
+ memcpy( vs_set_offset_ptr, &temp, sizeof(int32u) ); /* offset is zero, always */
+
+ vs_ptr = (char *)&Mess_buf[ num_bytes ]; /* vs_set has joiner/leaver/disconnecter */
+ memcpy( vs_ptr, private_group_name, MAX_GROUP_NAME );
+ num_bytes += MAX_GROUP_NAME;
+
+ head_ptr = Message_get_message_header(msg);
+ head_ptr->data_len += ( 3*sizeof(int32) + MAX_GROUP_NAME );
+
+ mess_link = new( MESSAGE_LINK );
+ Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
+ mess_link->mess = msg;
+ Obj_Inc_Refcount(mess_link->mess);
+
+ needed = 0;
+ for( i=0; i < grp->num_local; i++ )
+ {
+ ses = Sess_get_session_index ( grp->mbox[i] );
+ if( Is_memb_session( Sessions[ ses ].status ) )
+ Sess_write( ses, mess_link, &needed );
+ }
+ if ( !needed ) Sess_dispose_message( mess_link );
+ Message_Dec_Refcount(msg);
+}
+
+static void G_send_self_leave( group *grp, int ses )
+{
+ message_link *mess_link;
message_header *head_ptr;
+ message_obj *msg;
+ int needed;
- Alarm( GROUPS, "G_handle_groups: \n" );
+ msg = Message_new_message();
+ head_ptr = Message_get_message_header(msg);
+ head_ptr->type = CAUSED_BY_LEAVE;
+ head_ptr->type = Set_endian( head_ptr->type );
+ head_ptr->hint = Set_endian( 0 );
+ memcpy( head_ptr->private_group_name, grp->name, MAX_GROUP_NAME );
+ head_ptr->num_groups = 0;
+ head_ptr->data_len = 0;
+
+ /* create the mess_link */
+ mess_link = new( MESSAGE_LINK );
+ /* NOTE: Mess_buf contents are NOT used here. We only examine "0" bytes of it
+ * We just need a valid pointer here to prevent faults */
+ Message_Buffer_to_Message_Fragments( msg, Mess_buf, 0);
+ mess_link->mess = msg;
+ Obj_Inc_Refcount(mess_link->mess);
+ /* notify member */
+ needed = 0;
+ if( Is_memb_session( Sessions[ ses ].status ) )
+ Sess_write( ses, mess_link, &needed );
+ if( !needed ) Sess_dispose_message( mess_link );
+ Message_Dec_Refcount(msg);
+}
+
+static void G_send_heavyweight_memb( group *grp )
+{
+ G_send_heavyweight_join( grp, NULL, -1 );
+}
+
+/* If there is a new member, then joiner will be non-null.
+ * new_mbox is -1 unless there is a new member, and it's local. */
+static void G_send_heavyweight_join( group *grp, member *joiner, mailbox new_mbox )
+{
+ int num_bytes;
+ message_link *mess_link, *joiner_mess_link;
+ message_header *head_ptr;
+ message_obj *msg, *joiner_msg;
+ int32u temp;
+ char *local_vs_set_offset_ptr;
+ int needed, joiner_needed;
+ int ses;
+ int i;
+
+ msg = Message_new_message();
+ num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK, joiner );
+
+ /* create the mess_link */
+ mess_link = new( MESSAGE_LINK );
+ Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
+ mess_link->mess = msg;
+ Obj_Inc_Refcount(mess_link->mess);
+
+ /* notify local members */
+ needed = 0;
+ for( i=0; i < grp->num_local; i++ )
+ {
+ /* if new member is local we do not notify it here. */
+ if( joiner != NULL && grp->mbox[i] == new_mbox ) continue;
- switch( Gstate )
+ ses = Sess_get_session_index ( grp->mbox[i] );
+ if( Is_memb_session( Sessions[ ses ].status ) )
+ Sess_write( ses, mess_link, &needed );
+ }
+
+ /* notify new member if local */
+ if( new_mbox != -1 )
+ {
+ /* Use (mostly) the same message as was sent to the other local members. */
+ joiner_msg = Message_new_message();
+ head_ptr = Message_get_message_header(joiner_msg);
+ memcpy( head_ptr, Message_get_message_header(msg), sizeof(message_header) );
+
+ /* Change the local vs_set offset to be the right one for the joiner. */
+ local_vs_set_offset_ptr = &Mess_buf[
+ head_ptr->num_groups * MAX_GROUP_NAME + sizeof(group_id) + sizeof(int32u)];
+ /* Offset starts from the first vs_set's size, and goes to the size of the last. */
+ temp = head_ptr->data_len
+ - (sizeof(int32u) + MAX_GROUP_NAME)
+ - (sizeof(group_id) + 2*sizeof(int32u));
+ memcpy( local_vs_set_offset_ptr, &temp, sizeof(int32u) );
+
+ joiner_mess_link = new( MESSAGE_LINK );
+ Message_Buffer_to_Message_Fragments( joiner_msg, Mess_buf, num_bytes );
+ joiner_mess_link->mess = joiner_msg;
+ Obj_Inc_Refcount(joiner_mess_link->mess);
+
+ joiner_needed = 0;
+ ses = Sess_get_session_index ( new_mbox );
+ if( Is_memb_session( Sessions[ ses ].status ) )
+ Sess_write( ses, joiner_mess_link, &joiner_needed );
+ if ( !joiner_needed ) Sess_dispose_message( joiner_mess_link );
+ Message_Dec_Refcount(joiner_msg);
+ }
+ if( !needed ) Sess_dispose_message( mess_link );
+ Message_Dec_Refcount(msg);
+}
+
+static void G_send_trans_memb( group *grp )
+{
+ message_link *mess_link;
+ int needed;
+ int ses;
+ int i;
+
+ /* send members transitional membership */
+ mess_link = G_build_trans_mess( grp );
+ needed = 0;
+ for( i=0; i < grp->num_local; i++ )
+ {
+ ses = Sess_get_session_index ( grp->mbox[i] );
+ if( Is_memb_session( Sessions[ ses ].status ) )
+ Sess_write( ses, mess_link, &needed );
+ }
+ if( !needed ) Sess_dispose_message( mess_link );
+}
+
+void G_handle_groups( message_link *mess_link )
+{
+ char *memb_id_ptr;
+ membership_id temp_memb_id;
+ message_obj *msg;
+ message_header *head_ptr;
+ proc p;
+ int32u num_daemons_represented;
+ int needed;
+ groups_message_link *grp_mlink = NULL;
+
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_groups: \n" );
+
+ switch( Gstate )
{
case GOP:
- Alarm( EXIT, "G_handle_groups in GOP\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_groups in GOP\n");
break;
case GTRANS:
- Alarm( EXIT, "G_handle_groups in GTRANS\n");
+ Alarmp( SPLOG_FATAL, GROUPS, "G_handle_groups in GTRANS\n");
break;
case GGATHER:
case GGT:
- if (Gstate == GGATHER) Alarm( GROUPS, "G_handle_groups in GGATHER\n");
- if (Gstate == GGT) Alarm( GROUPS, "G_handle_groups in GGT\n");
+ if (Gstate == GGATHER) Alarmp( SPLOG_INFO, GROUPS, "G_handle_groups in GGATHER\n");
+ if (Gstate == GGT) Alarmp( SPLOG_INFO, GROUPS, "G_handle_groups in GGT\n");
msg = mess_link->mess;
Obj_Inc_Refcount(msg);
@@ -1430,27 +1344,78 @@
}
if( ! Memb_is_equal( temp_memb_id, Reg_memb_id ) )
{
- Alarm( GROUPS,
+ Alarmp( SPLOG_INFO, GROUPS,
"G_handle_groups: GROUPS message received from bad memb id proc %d, time %d, daemon %s.\n",
temp_memb_id.proc_id, temp_memb_id.time, head_ptr->private_group_name );
Sess_dispose_message( mess_link );
Message_Dec_Refcount(msg);
return;
}
+ if (0 > Conf_proc_by_name( head_ptr->private_group_name , &p ) )
+ {
+ Alarmp( SPLOG_ERROR, GROUPS, "G_handle_groups: Groups message from someone (%s) not in conf\n",
+ head_ptr->private_group_name);
+ Sess_dispose_message( mess_link );
+ Message_Dec_Refcount(msg);
+ return;
+ }
- mess_link->next = Gathered.next;
- Gathered.next = mess_link;
- Num_mess_gathered++;
+ /* This is a message from my rep -- don't process it. */
+ if( Is_synced_set_leader(p.id) )
+ {
+ grp_mlink = &Gathered;
+ needed = 0;
+ /* else, find the appropriate rep's message list. */
+ } else {
+ needed = 1;
+ for( grp_mlink = Gathered.next; grp_mlink != NULL; grp_mlink = grp_mlink->next )
+ if( p.id == grp_mlink->rep_proc_id )
+ break;
+ if( grp_mlink == NULL )
+ {
+ grp_mlink = new( GROUPS_MESSAGE_LINK );
+ grp_mlink->rep_proc_id = p.id;
+ grp_mlink->complete = 0;
+ mess_link->next = NULL;
+ grp_mlink->first = mess_link;
+ grp_mlink->next = Gathered.next;
+ Gathered.next = grp_mlink;
+ } else {
+ mess_link->next = grp_mlink->first;
+ grp_mlink->first = mess_link;
+ }
+ }
+
+ Num_mess_gathered++;
/* The last Groups message a daemon sends is AGREED. */
- if( Is_agreed_mess( head_ptr->type ) ) Num_daemons_gathered++;
- Alarm( GROUPS, "G_handle_groups: GROUPS message received from %s - msgs %d, daemons %d\n",
+ if( Is_agreed_mess( head_ptr->type ) )
+ {
+ /* The last 4 bytes of the private group name field are overridden to hold
+ * the size of the synced set of this daemon. This way, G_handle_groups
+ * doesn't have to find the groups message containing the synced set. */
+ memcpy( &num_daemons_represented,
+ &(head_ptr->private_group_name[MAX_GROUP_NAME-sizeof(int32u)]),
+ sizeof(int32u) );
+ if( !Same_endian( head_ptr->type ) )
+ num_daemons_represented = Flip_int32( num_daemons_represented );
+ Num_daemons_gathered += num_daemons_represented;
+ grp_mlink->complete = 1;
+ }
+
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_groups: GROUPS message received from %s - msgs %d, daemons %d\n",
head_ptr->private_group_name, Num_mess_gathered, Num_daemons_gathered );
+
+ /* At this point, we no longer need to work with the message object in this function. */
+ if( !needed )
+ Sess_dispose_message( mess_link );
+ Message_Dec_Refcount(msg);
+
+
if( Num_daemons_gathered != Conf_num_procs( &Reg_memb ) )
{
- Message_Dec_Refcount(msg);
return;
}
- Alarm( GROUPS, "G_handle_groups: Last GROUPS message received - msgs %d, daemons %d\n",
+ Alarmp( SPLOG_INFO, GROUPS, "G_handle_groups: Last GROUPS message received - msgs %d, daemons %d\n",
Num_mess_gathered, Num_daemons_gathered );
/* Replace protocol queue */
Prot_set_down_queue( NORMAL_DOWNQUEUE );
@@ -1476,327 +1441,127 @@
* after our Reg_memb is delivered. */
G_handle_trans_memb( Trans_memb, Trans_memb_id );
}
-
- Message_Dec_Refcount(msg);
+
break;
}
}
static void G_compute_and_notify()
{
- group *grp, *new_grp, *orig_grp;
- member *mbr;
- int changed;
- int ret;
- int vs_bytes;
- char *num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
- int32 num_vs;
- int num_exist;
- struct worklist *indices[MAX_PROCS_RING];
- int num_bytes;
- message_link *mess_link;
- message_header *head_ptr;
- message_obj *msg;
- int needed;
- char proc_name[MAX_PROC_NAME];
- char private_name[MAX_PRIVATE_NAME+1];
- int ses;
- int i;
- Skiplist work;
+ group *grp;
+ struct skiplistnode *giter;
+ int ret;
+ groups_message_link *grp_mlink;
+ message_link *mess_link;
+ synced_set sset;
- Alarm( GROUPS, "G_compute_and_notify:\n");
- /* Compute groups structure in Work from gathered messages and clear messages */
+ Alarmp( SPLOG_INFO, GROUPS, "G_compute_and_notify:\n");
+ /* Add contents of groups messages from other synced sets to my GroupsList,
+ * from gathered messages. Then discard messages */
- sl_init(&work);
- sl_set_compare(&work, G_work_groups_comp, G_work_groups_keycomp);
-
- for( i=0; i < Num_mess_gathered; i++ )
+ for( grp_mlink = Gathered.next; grp_mlink != NULL; )
{
- struct worklist *tp;
- tp = (struct worklist *)Mem_alloc(sizeof(struct worklist));
- tp->groups=NULL;
- mess_link = Gathered.next;
- Gathered.next = mess_link->next;
- ret = G_mess_to_groups( mess_link, tp->name, tp );
- if( ret < 0 )
- Alarm( EXIT, "G_compute_and_notify: G_mess_to_groups errored %d\n",
- ret );
- Sess_dispose_message( mess_link );
- if ( !sl_insert(&work, tp) )
+ for( mess_link = grp_mlink->first; mess_link != NULL; )
{
- Alarm(EXIT, "G_compute_and_notify: Failed to insert worklist (%s) into work\n", tp->name);
+ ret = G_mess_to_groups( mess_link, &sset );
+ if( ret < 0 )
+ Alarmp( SPLOG_FATAL, GROUPS, "G_compute_and_notify:"
+ " G_mess_to_groups errored %d\n", ret );
+ grp_mlink->first = mess_link->next;
+ Sess_dispose_message( mess_link );
+ mess_link = grp_mlink->first;
}
+ G_add_to_synced_set( &sset );
+ Gathered.next = grp_mlink->next;
+ dispose( grp_mlink );
+ grp_mlink = Gathered.next;
}
- /*
- * for every sorted group name:
- * Join the member lists to one list in Groups with a vs set.
- * If the group has changed (*)
- * Set new gid
- * notify all local members: non-new get vs set, new get self.
- * cancel new mark.
- * dispose of this group is all of Work.
- *
- * Note: group has changed unless all of this hold:
- * - everyone has the same gid
- * - gid is not changed (-1)
- */
-
- for( num_exist = G_smallest_group_indices( &work, indices ) ;
- num_exist > 0 ;
- num_exist = G_smallest_group_indices( &work, indices ) )
- {
- struct skiplistnode *top_iter;
- group *this_group;
- /* prepare vs set */
- vs_bytes = 0;
- num_vs_ptr = &Temp_buf[0];
- vs_bytes+= sizeof( int32 );
- num_vs = 0;
-
- changed = 0;
- orig_grp = NULL;
- this_group = (group *)(sl_getlist(indices[0]->groups)->data);
- assert( NULL != this_group );
- orig_grp = sl_find( &GroupsList, this_group->name, &top_iter);
-
- if( orig_grp == NULL )
- {
- new_grp = new( GROUP );
- memset( new_grp->name, 0, MAX_GROUP_NAME );
- strcpy( new_grp->name, this_group->name );
-
- new_grp->grp_id = this_group->grp_id;
+ G_print_synced_set( SPLOG_INFO, &MySyncedSet, "G_compute_and_notify" );
- new_grp->num_members = 0;
- sl_init( &new_grp->MembersList );
- sl_set_compare( &new_grp->MembersList,
- G_member_recordcompare,
- G_member_keycompare);
- new_grp->num_local = 0;
+ /* At this point, our GroupsList is complete, as is our synced_set. */
- sl_insert( &GroupsList, new_grp );
- Num_groups++;
- GlobalStatus.num_groups = Num_groups;
- orig_grp = new_grp;
- }else{
- /* free members but keep local mbox */
- sl_remove_all( &orig_grp->MembersList, dispose );
- orig_grp->num_members = 0;
- }
-
- for( i=0 ; i < num_exist; i++ )
- {
- group *currentgroup;
- currentgroup =
- (group *)sl_getlist(indices[i]->groups)->data;
- if( G_id_is_equal( orig_grp->grp_id, currentgroup->grp_id ) )
- {
- struct skiplistnode *iter;
- Skiplist *currentmembers;
- currentmembers = ¤tgroup->MembersList;
- iter = sl_getlist(currentmembers);
- assert(iter != NULL); /* memberlist in Groups message should never be empty */
- for( mbr = iter->data;
- mbr != NULL;
- mbr = sl_next(currentmembers, &iter))
- {
- /* add this non-new member to vs */
- memcpy( &Temp_buf[vs_bytes], mbr->private_name, MAX_GROUP_NAME );
- vs_bytes += MAX_GROUP_NAME;
- num_vs++;
- }
- }else{
- /* not the same grp_id */
- changed = 1;
- }
- /* in any way, mbr points here to the last member */
- /* chain these members */
-
- sl_concat(&orig_grp->MembersList,
- ¤tgroup->MembersList);
- orig_grp->num_members = orig_grp->MembersList.size;
-
- /* free this Work group */
- sl_destruct(¤tgroup->MembersList, dispose);
- sl_remove(indices[i]->groups, currentgroup, dispose);
- }
-
- memcpy( num_vs_ptr, &num_vs, sizeof( int32 ) ); /* *num_vs_ptr = current count; */
-
- /* now our orig_grp is almost updated */
- grp = orig_grp;
-
- if( grp->changed ) changed = 1;
-
- if( !changed ) continue;
-
- /* the group has changed */
- Alarm( GROUPS, "G_compute_and_notify: completed group %s.\n", grp->name );
- Alarm( DEBUG, "G_compute_and_notify: changing group id from: " );
- G_print_group_id( grp->grp_id );
- grp->grp_id.memb_id = Reg_memb_id;
- grp->grp_id.index = 1;
- grp->changed = 0;
- Alarm( DEBUG, " to: " );
- G_print_group_id( grp->grp_id );
- Alarm( DEBUG, "\n" );
+ giter = sl_getlist( &GroupsList );
+ grp = (giter) ? (group *)giter->data : NULL;
+ for( ; grp != NULL ; grp = sl_next( &GroupsList, &giter ) )
+ {
+ /*
+ * for every group:
+ * If the group has changed (*)
+ * Set new gid
+ * notify all local members (who came with whom)
+ *
+ * Note: the group is changed if any of the following is true:
+ * (1) It had partitioned members during the transitional period
+ * [If it lost members to transitional, or gained some
+ * because of a join from a partitioned daemon.]
+ * (2) Daemons marked with different memb_ids were included
+ * [We found new daemons with info for this group,
+ * according to G_mess_to_groups.]
+ * (3) If we got a cascading transitional that affects this group.
+ * Note: I'm not sure that this behavior is necessary, but it's
+ * consistent with all versions of Spread that I know.
+ * See GGATHER case of G_handle_trans_memb for more info.
+ */
- if( grp->num_local > 0 )
- {
- struct skiplistnode *iter;
- msg = Message_new_message();
- num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
- head_ptr = Message_get_message_header(msg);
-
- head_ptr->type |= CAUSED_BY_NETWORK ;
-
- /* notify non-new local members */
- memcpy( &Mess_buf[num_bytes], Temp_buf, vs_bytes );
- head_ptr->data_len += vs_bytes;
-
- mess_link = new( MESSAGE_LINK );
- Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes + vs_bytes);
- mess_link->mess = msg;
- Obj_Inc_Refcount(mess_link->mess);
- needed = 0;
- iter = sl_getlist(&grp->MembersList);
- for( mbr = iter->data;
- mbr != NULL;
- mbr = sl_next(&grp->MembersList, &iter))
- {
- if( Is_new_member( mbr->status ) ) continue;
- if( mbr->proc_id != My.id ) continue;
-
- G_private_to_names( mbr->private_name, private_name, proc_name );
- ses = Sess_get_session( private_name );
- if( ses < 0 ) Alarm( EXIT, "G_compute_and_notify: no session for %s\n", private_name);
-
- if( Is_memb_session( Sessions[ ses ].status ) )
- Sess_write( ses, mess_link, &needed );
- }
- if( !needed ) Sess_dispose_message( mess_link );
- Message_Dec_Refcount(msg);
- }
+ if( !grp->changed )
+ continue;
+ /* the group has changed */
+ grp->grp_id.memb_id = Reg_memb_id;
+ grp->grp_id.index = 1;
+ grp->changed = 0;
+ if( grp->num_local > 0 )
+ G_send_heavyweight_memb( grp );
+ G_update_daemon_memb_ids( grp );
}
+ Gathered.complete = 0;
Num_mess_gathered = 0;
Num_daemons_gathered = 0;
/* We're going back to GOP... destroy our groups messages. */
- G_empty_groups_bufs();
+ G_discard_groups_bufs();
+ Groups_bufs_fresh = 0;
- /* Finish freeing the memory in our worklists */
- {
- struct worklist *worklist;
- struct skiplistnode *iter;
-
- iter = sl_getlist(&work);
- worklist = (iter)?iter->data:NULL;
-
- while( worklist != NULL ) {
- assert( worklist->groups->size == 0 );
- dispose( worklist->groups );
- worklist = sl_next(&work, &iter);
- }
- }
-
- sl_destruct( &work, dispose );
-
G_print();
}
-static int G_smallest_group_indices( Skiplist *work, struct worklist *indices[] )
-{
- /*
- * this function searches the Work structure for the smallest
- * alphabetically ordered group name. It stores
- * all of the occurences of that group in the indices array,
- * and returns the number of occurences.
- */
- int num_exist;
- int cmp;
- struct worklist *worklist;
- Skiplist *groups;
- struct skiplistnode *iter;
-
- iter = sl_getlist(work);
- worklist = (iter)?iter->data:NULL;
- num_exist = 0;
- if(!worklist) {
- return 0;
- }
- /* set indices[0] to first worklist with any groups */
- do {
- if ( worklist->groups->size == 0 )
- {
- worklist = sl_next(work, &iter);
- } else {
- indices[0] = worklist;
- num_exist = 1;
- break;
- }
- } while ( worklist != NULL );
-
- if(!worklist) {
- /* All worklist groups are empty (no daemons have any alive groups) */
- return 0;
- }
-
- worklist = sl_next( work, &iter );
- /* Check rest of worklists for any with earlier groups or the same first group as indices[0] */
- while ( worklist != NULL )
- {
- group *first, *current;
-
- groups = worklist->groups;
- if( groups->size == 0 )
- {
- worklist = sl_next(work, &iter);
- continue;
- }
- first = (group *)(sl_getlist(indices[0]->groups)->data);
- current = (group *)(sl_getlist(groups)->data);
- cmp = strcmp( first->name, current->name );
- if( cmp == 0 )
- {
- indices[num_exist] = worklist;
- num_exist++;
- }else if( cmp > 0 ){
- indices[0] = worklist;
- num_exist = 1;
- }
- worklist = sl_next(work, &iter);
- }
- return( num_exist );
-}
-
-static int G_id_is_equal( group_id g1, group_id g2 )
-{
- if( g1.index == g2.index && Memb_is_equal( g1.memb_id, g2.memb_id ) )
- return( 1 );
- else return( 0 );
-}
+/* Commented out -- not currently needed
+ *static int G_id_is_equal( group_id g1, group_id g2 )
+ *{
+ * if( g1.index == g2.index && Memb_is_equal( g1.memb_id, g2.memb_id ) )
+ * return( 1 );
+ * else return( 0 );
+ *}
+ */
static group *G_get_group( char *group_name )
{
struct skiplistnode *iter;
-
return sl_find( &GroupsList, group_name, &iter );
}
-static member *G_get_member( group *grp, char *private_group_name )
+static daemon_members *G_get_daemon( group *grp, int32u proc_id ) {
+ struct skiplistnode *iter;
+ assert(grp);
+ return sl_find( &grp->DaemonsList, &proc_id, &iter );
+}
+
+static member *G_get_member( daemon_members *dmn, char *private_group_name )
{
struct skiplistnode *iter;
-
- return sl_find( &grp->MembersList, private_group_name, &iter );
+ assert(dmn);
+ return sl_find( &dmn->MembersList, private_group_name, &iter );
}
-static message_link *G_build_trans_mess( group *grp )
+static message_link *G_build_trans_mess( group *grp )
{
/*
* This routine builds a ready-to-be-sent transitional message signal
* to the members of the process group grp
*/
+ /* FIXME: the documentation says the gid field isn't there. Should
+ * it be removed? */
+
message_link *mess_link;
scatter *scat;
message_header *head_ptr;
@@ -1817,34 +1582,41 @@
return( mess_link );
}
-static int G_build_memb_buf( group *grp, message_obj *msg, char buf[])
-{
- int num_bytes;
- message_header *head_ptr;
- char *gid_ptr;
- member *mbr;
- struct skiplistnode *iter;
- char *memb_ptr;
-
+/* The buffer built needs to be deterministic and ordered according first
+ * to daemon order in conf, second by member name. */
+static int G_build_memb_buf( group *grp, message_obj *msg, char buf[], int32 caused )
+{
+ int num_bytes;
+ message_header *head_ptr;
+ char *gid_ptr;
+ member *mbr;
+ daemon_members *dmn;
+ struct skiplistnode *diter, *iter;
+ char *memb_ptr;
head_ptr = Message_get_message_header(msg);
head_ptr->type = REG_MEMB_MESS;
head_ptr->type = Set_endian( head_ptr->type );
+ head_ptr->type |= caused;
head_ptr->hint = Set_endian( 0 );
memcpy( head_ptr->private_group_name, grp->name, MAX_GROUP_NAME );
head_ptr->num_groups = grp->num_members;
head_ptr->data_len = sizeof( group_id );
num_bytes = 0;
- iter = sl_getlist( &grp->MembersList );
- mbr = (iter)?(member *)iter->data:NULL;
- for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
- {
- memb_ptr = (char *)&buf[num_bytes];
- num_bytes += MAX_GROUP_NAME;
- memcpy( memb_ptr, mbr->private_name, MAX_GROUP_NAME );
- }
-
+ diter = sl_getlist( &grp->DaemonsList );
+ dmn = (diter) ? (daemon_members *)diter->data : NULL;
+ for( ; dmn != NULL ; dmn = sl_next( &grp->DaemonsList, &diter ) )
+ {
+ iter = sl_getlist( &dmn->MembersList );
+ mbr = (iter) ? (member *)iter->data : NULL;
+ for( ; mbr != NULL ; mbr = sl_next( &dmn->MembersList, &iter ) )
+ {
+ memb_ptr = &buf[num_bytes];
+ num_bytes += MAX_GROUP_NAME;
+ memcpy( memb_ptr, mbr->name, MAX_GROUP_NAME );
+ }
+ }
gid_ptr = &buf[num_bytes];
num_bytes += sizeof( group_id );
memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );
@@ -1853,62 +1625,162 @@
}
-static int G_build_memb_vs_buf( group *grp, message_obj *msg, char buf[], int32 caused )
+static int G_build_memb_vs_buf( group *grp, message_obj *msg, char buf[], int32 caused, member *joiner )
{
/*
* This routine builds the memb buffer message, including a virtual synchrony
- * (failure atomicity) part with a set which contains only the established members
- * in the group membership.
+ * (failure atomicity) part with a set of vs_sets (ordered deterministically by the
+ * daemon membership IDs). Each vs_set specifies a set of members (ordered deterministically
+ * by daemon order in conf, then by private group name) with the property that the members
+ * listed are either virtually synchronous with each other, or crashed.
+ *
+ * Partitioned daemons get singleton sets, as do new joiners in the case of
+ * a join delivered during a transitional period for a changed group. That is, we provide
+ * all the information we have, which is that the members at a given daemon are together.
*
- * Note that in leave and disconnect we provide the member that left or
- * got disconnected in the vs_set. Therefore, caused will always be CAUSED_BY_NETWORK.
+ * Note that in (non-GTRANS/changed) join, leave, and disconnect we provide the member
+ * that joined, left, or got disconnected in the vs_set. Therefore, caused will always be
+ * CAUSED_BY_NETWORK.
+ * The joiner should be NULL, except in the case of a join during GTRANS for a group
+ * that has some partitioned daemons.
*/
- int num_bytes;
- message_header *head_ptr;
- char *num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
- struct skiplistnode *iter;
- member *mbr;
- char *membs_ptr;
- int32 num_vs;
-
- num_bytes = G_build_memb_buf( grp, msg, buf);
- head_ptr = Message_get_message_header(msg);
-
- head_ptr->type = head_ptr->type | caused;
+/* The buffer constructed should have two regions, as exposed to the user:
+ * groups array (ordered by daemon order in conf, then by member name)
+ * data
+ *
+ * The data portion should look like the following:
+ * group id (group_id)
+ * number of vs sets (int32u)
+ * offset to the vs set for the member this is sent to (This is a byte offset into
+ * the vs_set region. Could do just vs_set number, but that would be slower in the
+ * [assumed to be] common case that people just want their set.) (int32u)
+ * vs sets (ordered by group id, with partitioned daemons singleton, and joiner last)
+ *
+ * Each vs set looks like:
+ * number of members (int32u)
+ * members (ordered by daemon order in conf, then by member name) (array of group names)
+ */
- num_vs_ptr = &buf[num_bytes];
- num_bytes += sizeof( int32 );
- head_ptr->data_len += sizeof( int32 );
- num_vs = 0;
-
- iter = sl_getlist( &grp->MembersList );
- mbr = (iter)?(member *)iter->data:NULL;
- for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
- {
- if( Is_established_member( mbr->status ) )
- {
- membs_ptr = (char *)&buf[num_bytes];
- memcpy( membs_ptr, mbr->private_name, MAX_GROUP_NAME );
- num_vs++ ;
- num_bytes += MAX_GROUP_NAME;
+ int num_bytes;
+ message_header *head_ptr;
+ char *vs_set_region_ptr; /* int32u */
+ char *num_vs_sets_ptr; /* int32u */
+ int32u num_vs_sets;
+ char *local_vs_set_offset_ptr; /* int32u */
+ int32u local_vs_set_offset;
+
+ char *curr_vs_set_size_ptr; /* int32u */
+ int32u curr_vs_set_size;
+ membership_id curr_vs_set_memb_id;
+
+ struct skiplistnode *diter, *iter;
+ daemon_members *dmn;
+ member *mbr;
+ char *membs_ptr;
+ Skiplist temp;
+ int needed;
+ int found_joiner = 0;
+
+ num_bytes = G_build_memb_buf( grp, msg, buf, caused );
+ head_ptr = Message_get_message_header(msg);
+
+ num_vs_sets_ptr = &buf[num_bytes];
+ num_bytes += sizeof( int32u );
+ head_ptr->data_len += sizeof( int32u );
+ num_vs_sets = 0;
+
+ local_vs_set_offset_ptr = &buf[num_bytes];
+ num_bytes += sizeof( int32u );
+ head_ptr->data_len += sizeof( int32u );
+ /* This function is only called if I have local members. So, if the offset
+ * isn't found, then the joiner is my member. */
+ local_vs_set_offset = 0;
+
+ /* Points to the front of the vs_sets */
+ vs_set_region_ptr = &buf[num_bytes];
+
+ /* Basically, use the skiplist to do an insertion sort. */
+ sl_init(&temp);
+ sl_set_compare( &temp, G_daemon_vs_set_recordcompare, G_daemon_keycompare );
+ /* FIXME: should I limit the height of the skiplist to 1, or not? */
+ /* temp.preheight = 1; */
+ diter = sl_getlist( &grp->DaemonsList );
+ dmn = (diter) ? (daemon_members *)diter->data : NULL;
+ for( ; dmn != NULL; dmn = sl_next( &grp->DaemonsList, &diter ) )
+ sl_insert( &temp, dmn );
+
+ curr_vs_set_memb_id = unknown_memb_id;
+ curr_vs_set_size_ptr = NULL;
+
+ diter = sl_getlist( &temp );
+ dmn = (diter) ? (daemon_members *)diter->data : NULL;
+ for( ; dmn != NULL; dmn = sl_next( &temp, &diter ) ) {
+ needed = 0;
+ if( Is_unknown_memb_id(&curr_vs_set_memb_id) ||
+ !Memb_is_equal( curr_vs_set_memb_id, dmn->memb_id ) )
+ needed = 1;
+ if( needed ) {
+ num_vs_sets++;
+ curr_vs_set_memb_id = dmn->memb_id;
+ curr_vs_set_size_ptr = &buf[num_bytes];
+ num_bytes += sizeof(int32u);
+ head_ptr->data_len += sizeof(int32u);
+ curr_vs_set_size = 0;
+ }
+ if( dmn->proc_id == My.id ) {
+ if( local_vs_set_offset != 0 )
+ Alarmp( SPLOG_FATAL, GROUPS, "G_build_memb_vs_buf: Found my vs set twice for group %s\n",
+ grp->name );
+ local_vs_set_offset = curr_vs_set_size_ptr - vs_set_region_ptr;
+ memcpy( local_vs_set_offset_ptr, &local_vs_set_offset, sizeof(int32u) );
+ }
+ iter = sl_getlist( &dmn->MembersList );
+ mbr = (iter) ? (member *)iter->data : NULL;
+ for( ; mbr != NULL ; mbr = sl_next( &dmn->MembersList, &iter ) )
+ {
+ /* Handle changed-group join during transitional. The joiner does not
+ * get to be listed with everyone else from his daemon, but rather at
+ * the end, in a self vs set. */
+ if( joiner != NULL && !found_joiner )
+ if( strcmp( joiner->name, mbr->name ) == 0 )
+ {
+ found_joiner = 1;
+ continue;
+ }
+ membs_ptr = &buf[num_bytes];
+ num_bytes += MAX_GROUP_NAME;
head_ptr->data_len += MAX_GROUP_NAME;
+ memcpy( membs_ptr, mbr->name, MAX_GROUP_NAME );
+ curr_vs_set_size++;
}
+ /* Every time we finish one daemon, update the size of the current vs_set */
+ memcpy( curr_vs_set_size_ptr, &curr_vs_set_size, sizeof(int32u) );
}
- memcpy( num_vs_ptr, &num_vs, sizeof( int32 ) ); /* *num_vs_ptr = total count; */
+ if( joiner != NULL ) {
+ if( !found_joiner )
+ Alarmp( SPLOG_FATAL, GROUPS, "G_build_memb_vs_buf: Expected to find joining member %s.\n",
+ joiner->name );
+ num_vs_sets++;
+ curr_vs_set_size_ptr = &buf[num_bytes];
+ num_bytes += sizeof(int32u);
+ head_ptr->data_len += sizeof(int32u);
+ curr_vs_set_size = 1;
+ memcpy( curr_vs_set_size_ptr, &curr_vs_set_size, sizeof(int32u) );
+ membs_ptr = &buf[num_bytes];
+ memcpy( membs_ptr, joiner->name, MAX_GROUP_NAME );
+ num_bytes += MAX_GROUP_NAME;
+ head_ptr->data_len += MAX_GROUP_NAME;
+ }
+ /* Make sure we don't leak memory before the stack gets freed and takes
+ * the skiplist with it. We don't actually want to free the daemons. */
+ sl_destruct( &temp, (FreeFunc)NULL );
+ memcpy( num_vs_sets_ptr, &num_vs_sets, sizeof(int32u) );
return( num_bytes );
}
-static void G_stamp_groups_buf( char buf[] )
-{
- char *memb_id_ptr;
- memb_id_ptr = buf;
- memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );
-}
-
-/* This function used to be called G_refresh_groups_msg. */
-static void G_build_groups_msg_hdr( message_obj *msg, int groups_bytes )
+static void G_build_groups_msg_hdr( message_obj *msg, int groups_bytes )
{
message_header *head_ptr;
@@ -1917,91 +1789,261 @@
head_ptr->type = Set_endian( head_ptr->type );
head_ptr->hint = Set_endian( 0 );
memset(head_ptr->private_group_name, 0, MAX_GROUP_NAME);
+ /* Note: this copy uses at most 20 bytes (including terminating NULL),
+ * because proc.name is limited to MAX_PROC_NAME. */
strcpy( head_ptr->private_group_name, My.name );
+ /* The last 4 bytes of the private group name field are overridden to hold
+ * the size of the synced set of this daemon. This way, G_handle_groups
+ * doesn't have to mess around with the data region of the groups messages. */
+ memcpy( &head_ptr->private_group_name[MAX_GROUP_NAME-sizeof(int32u)],
+ &(MySyncedSet.size), sizeof(int32u) );
head_ptr->num_groups = 0;
head_ptr->data_len = groups_bytes;
}
-/* This function guarantees that each group's data appears in only one buffer in
+/* This function guarantees that each daemon's data about a given group appears in only one buffer in
* a sequence, and that the sorted order is preserved from the GroupsList. */
+/* Writes one GROUPS message body into buf and returns its length in bytes.
+ * *giter_ptr and *diter_ptr are resume cursors: the caller passes both as
+ * NULL for the first buffer, and on return they hold the group node and the
+ * daemon node at which the next buffer has to continue. The caller
+ * (G_build_new_groups_bufs) keeps calling until *giter_ptr comes back NULL.
+ * The daemon cursor is only meaningful for the first group of a resumed
+ * call, so it is cleared as soon as it has been consumed. */
-static int G_build_groups_buf(char buf[], struct skiplistnode **iter_ptr)
+static int G_build_groups_buf( char buf[], struct skiplistnode **giter_ptr, struct skiplistnode **diter_ptr )
{
- int num_bytes;
+ int num_bytes;
+
char *memb_id_ptr;
+ char *flag_ptr; /* char, only need 1 bit, really */
+ char *synced_set_size_ptr; /* int32u */
+ char *synced_set_procs_ptr; /* int32 */
+
group *grp;
- char *gid_ptr;
+ daemon_members *dmn;
+ char *proc_id_ptr; /* int32 */
+ char *dmn_memb_id_ptr; /* membership_id */
member *mbr;
- struct skiplistnode *giter, *iter;
- char *num_memb_ptr;
- int16 num_memb;
+ struct skiplistnode *giter, *diter, *iter;
+ char *num_dmns_ptr; /* int16u */
+ int16u num_dmns;
+ char *num_memb_ptr; /* int16u */
+ int16u num_memb;
char *memb_ptr;
- int size_for_this_group;
- num_bytes = 0;
+ int32u size_needed;
+ int couldnt_fit_daemon;
+
+ /* A GROUPS message looks like this:
+ * (Representative's name is in header, so we can get his proc id)
+ * This is necessary, because we store received GROUPS messages by
+ * synced set, to more easily recognize which to keep.
+ * Membership id
+ * flag (1 if first message from set, 0 else) (char)
+ * if flag is 1
+ * size of synced set (int32u)
+ * proc ids of synced set represented by this daemon (int32*size)
+ * For each group:
+ * group name (repeated for each message it appears in) (MAX_GROUP_NAME)
+ * group_id at the representative
+ * [Either there is only one ID for the group (i.e. the group is not
+ * changed in any respect by this membership) or this ID can be
+ * discarded. This is here so that daemons that don't know about
+ * the group at all can get the correct ID in the unchanged case.]
+ * number of daemons for this group (in this message) (int16u)
+ * For each daemon:
+ * daemon proc id (int32)
+ * memb id at daemon (membership_id)
+ * number of local members at daemon (int16u)
+ * For each local member at daemon
+ * member's private group name (MAX_GROUP_NAME)
+ */
+
+ num_bytes = 0;
memb_id_ptr = &buf[num_bytes];
- num_bytes += sizeof( membership_id );
+ num_bytes += sizeof( membership_id );
memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );
- giter = (*iter_ptr) ? (*iter_ptr) : (sl_getlist( &GroupsList ));
+ flag_ptr = &buf[num_bytes];
+ num_bytes += sizeof(char);
+ if( *giter_ptr )
+ Set_later_message(flag_ptr);
+ else
+ Set_first_message(flag_ptr);
- grp = (giter)?(group *)giter->data:NULL;
+ if( Is_first_message(flag_ptr) )
+ {
+ synced_set_size_ptr = &buf[num_bytes];
+ num_bytes += sizeof(int32u);
+ memcpy( synced_set_size_ptr, &(MySyncedSet.size), sizeof(int32u) );
+ synced_set_procs_ptr = &buf[num_bytes];
+ num_bytes += MySyncedSet.size * sizeof(int32);
+ memcpy( synced_set_procs_ptr, &MySyncedSet.proc_ids, MySyncedSet.size*sizeof(int32) );
+ }
+
+ /* Resume where we left off in the GroupsList */
+ couldnt_fit_daemon = 0;
+ diter = NULL; /* gives *diter_ptr a defined value even if no group is visited */
+ giter = (*giter_ptr) ? (*giter_ptr) : (sl_getlist( &GroupsList ));
+ grp = (giter) ? (group *) giter->data : NULL;
for( ; grp != NULL ; grp = sl_next( &GroupsList, &giter ) )
{
- if( grp->num_local == 0 ) continue;
-
- size_for_this_group = MAX_GROUP_NAME + sizeof(group_id) + sizeof(int16) +
- (grp->num_local * MAX_GROUP_NAME);
- /* This requires that the number of local group members be limited. */
- if( size_for_this_group > GROUPS_BUF_SIZE - num_bytes ) break;
+ /* To have information about this group, we need to be able to fit
+ * its name, ID, and the number of daemons it has in this message. */
+ size_needed = MAX_GROUP_NAME + sizeof(group_id) + sizeof(int16u);
+ if( size_needed > GROUPS_BUF_SIZE - num_bytes ) break;
memcpy( &buf[num_bytes], grp->name, MAX_GROUP_NAME );
num_bytes += MAX_GROUP_NAME;
-
- gid_ptr = &buf[num_bytes];
- num_bytes += sizeof( group_id );
- memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );
-
- num_memb_ptr = &buf[num_bytes];
- num_bytes += sizeof( int16 );
- num_memb = 0;
-
- iter = sl_getlist( &grp->MembersList );
- mbr = (iter)?(member *)iter->data:NULL;
- for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
- {
- /* collect local members */
- if( mbr->proc_id != My.id ) continue;
- memb_ptr = (char *)&buf[num_bytes];
- num_bytes += MAX_GROUP_NAME;
- memcpy( memb_ptr, mbr->private_name, MAX_GROUP_NAME );
- num_memb++;
- }
- memcpy(num_memb_ptr, &num_memb, sizeof( int16 ) );
- if( num_memb != grp->num_local )
- Alarm( EXIT, "G_build_groups_buf: group %s has %d %d members\n",
- grp->name, num_memb, grp->num_local );
-
+
+ memcpy( &buf[num_bytes], &grp->grp_id, sizeof(group_id) );
+ num_bytes += sizeof(group_id);
+
+ num_dmns_ptr = &buf[num_bytes];
+ num_bytes += sizeof(int16u);
+ num_dmns = 0;
+
+ diter = (*diter_ptr) ? (*diter_ptr) : (sl_getlist( &grp->DaemonsList ));
+ /* The saved daemon cursor belongs to the group we were interrupted in.
+ * Clear it now so that every later group in this buffer starts from the
+ * head of its own DaemonsList instead of the stale node. */
+ *diter_ptr = NULL;
+ dmn = (diter) ? (daemon_members *) diter->data : NULL;
+ for( ; dmn != NULL; dmn = sl_next( &grp->DaemonsList, &diter ) )
+ {
+ /* To store this daemon's information about the current group,
+ * we need to be able to store its proc_id, memb_id, number of
+ * local members, and the private group names of its local members. */
+ size_needed = sizeof(int32) + sizeof(membership_id) + sizeof(int16u) +
+ (dmn->MembersList.size * MAX_GROUP_NAME);
+ /* This requires that the number of local group members be limited. */
+ if( size_needed > GROUPS_BUF_SIZE - num_bytes )
+ {
+ couldnt_fit_daemon = 1;
+ break;
+ }
+ proc_id_ptr = &buf[num_bytes];
+ num_bytes += sizeof(int32);
+ memcpy( proc_id_ptr, &dmn->proc_id, sizeof(int32) );
+
+ dmn_memb_id_ptr = &buf[num_bytes];
+ num_bytes += sizeof(membership_id);
+ /* NOTE(review): the format comment above calls this field "memb id at
+ * daemon" and G_mess_to_groups stores it in dmn->memb_id, yet the value
+ * written here is the group's memb_id -- confirm this is intended. */
+ memcpy( dmn_memb_id_ptr, &grp->grp_id.memb_id, sizeof(membership_id) );
+
+ num_memb_ptr = &buf[num_bytes];
+ num_bytes += sizeof(int16u);
+ num_memb = 0;
+
+ iter = sl_getlist( &dmn->MembersList );
+ mbr = (iter) ? (member *) iter->data : NULL;
+ for( ; mbr != NULL ; mbr = sl_next( &dmn->MembersList, &iter ) )
+ {
+ /* Add to the buffer all group members from this daemon. */
+ memb_ptr = &buf[num_bytes];
+ num_bytes += MAX_GROUP_NAME;
+ memcpy( memb_ptr, mbr->name, MAX_GROUP_NAME );
+ num_memb++;
+ }
+ memcpy( num_memb_ptr, &num_memb, sizeof(int16u) );
+
+ if( num_memb != dmn->MembersList.size )
+ Alarmp( SPLOG_FATAL, GROUPS, "G_build_groups_buf: group %s has %d %d members\n",
+ grp->name, num_memb, dmn->MembersList.size );
+ num_dmns++;
+ }
+ memcpy( num_dmns_ptr, &num_dmns, sizeof(int16u) );
+ if( couldnt_fit_daemon )
+ break;
}
- *iter_ptr = giter;
+ *giter_ptr = giter;
+ *diter_ptr = diter;
return( num_bytes );
}
-static int G_mess_to_groups( message_link *mess_link, char *name, struct worklist *work )
+static void G_build_new_groups_bufs() /* Serialize the GroupsList into as many buffers as needed, pushing each onto Groups_bufs. */
+{
+ struct skiplistnode *passed_giter, *passed_diter; /* resume cursors handed to G_build_groups_buf */
+ groups_buf_link *grps_buf_link;
+
+ passed_giter = NULL; /* NULL cursors: the first buffer starts at the beginning */
+ passed_diter = NULL;
+ do {
+ grps_buf_link = new( GROUPS_BUF_LINK );
+ grps_buf_link->next = Groups_bufs; /* push at the head, so the list ends up in reverse build order */
+ Groups_bufs = grps_buf_link;
+ grps_buf_link->bytes = G_build_groups_buf( grps_buf_link->buf,
+ &passed_giter, &passed_diter );
+ } while( passed_giter != NULL ); /* a non-NULL group cursor means another buffer is needed */
+}
+
+/* This function used to be called G_refresh_groups_msg.
+ * It overwrites the leading membership_id of every buffer on the Groups_bufs
+ * list with the current Reg_memb_id; the rest of each buffer is untouched. */
+static void G_stamp_groups_bufs()
+{
+ groups_buf_link *curr;
+ char *memb_id_ptr;
+ for( curr = Groups_bufs; curr; curr = curr->next )
+ {
+ memb_id_ptr = curr->buf; /* the membership id is the first field of the buffer */
+ memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );
+ }
+}
+
+static void G_discard_groups_bufs() /* Dispose every link on the Groups_bufs list, leaving Groups_bufs NULL. */
+{
+ groups_buf_link *next;
+
+ for( ; Groups_bufs; Groups_bufs = next )
+ {
+ next = Groups_bufs->next; /* read the successor before the current link is disposed */
+ dispose( Groups_bufs );
+ }
+ return;
+}
+
+static int G_send_groups_messages() /* Queue one message per Groups_bufs link on Groups_control_down_queue; returns how many were queued. */
+{
+ groups_buf_link *grps_buf_link;
+ down_link *down_ptr;
+ message_obj *msg;
+ message_header *head_ptr;
+ int i = 0; /* number of messages handed to Prot_new_message */
+
+ for( grps_buf_link = Groups_bufs; grps_buf_link != NULL; grps_buf_link = grps_buf_link->next ) {
+ msg = Message_new_message();
+ G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
+ head_ptr = Message_get_message_header(msg);
+ if( grps_buf_link->next ) /* every link except the last in the list goes out RELIABLE; the last goes out AGREED */
+ head_ptr->type |= RELIABLE_MESS;
+ else
+ head_ptr->type |= AGREED_MESS;
+ Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );
+
+ down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
+ down_ptr->mess = msg;
+ Obj_Inc_Refcount(down_ptr->mess); /* presumably the reference now held through down_ptr->mess */
+ /* Use control queue--not normal session queues */
+ Prot_new_message( down_ptr, Groups_control_down_queue );
+ Message_Dec_Refcount(msg); /* presumably drops this function's own reference to msg */
+ ++i;
+ }
+ return i;
+}
+
+/* This function fills the synced set from the synced set portion of a
+ * groups message if there is one, and adds all the group membership
+ * information to the GroupsList. */
+static int G_mess_to_groups( message_link *mess_link, synced_set *sset )
{
- /* the function returns 0 for success or -1 if an error occured */
+ /* The function returns 0 for success or -1 if an error occurred.
+ * Right now, there are no errors that can occur. However,
+ * if we add stricter checks on the daemons, that may change. */
message_obj *msg;
+ message_header *head_ptr;
scatter *scat;
- message_header *head_ptr;
- proc p;
int num_bytes, total_bytes;
+
+ char *flag_ptr; /* char, only need 1 bit, really */
+ char *synced_set_size_ptr; /* int32u */
+ char *synced_set_procs_ptr; /* int32 */
+
group *grp;
- char *gid_ptr;
+ char *group_name_ptr;
+ int16u num_dmns;
+ daemon_members *dmn;
+ int16u num_memb;
member *mbr;
- char *num_memb_ptr;
- int16 num_memb;
- int i;
+ int i,j;
+ char ip_string[16];
total_bytes = 0;
msg = mess_link->mess;
@@ -2012,76 +2054,122 @@
total_bytes += scat->elements[i].len;
}
- num_bytes = 0;
- head_ptr = Message_get_message_header(msg);
- num_bytes += Message_get_data_header_size();
- if (0 > Conf_proc_by_name( head_ptr->private_group_name , &p ) )
- {
- Alarm( PRINT, "G_mess_to_groups: Groups message from someone (%s) not in conf\n", head_ptr->private_group_name);
- return( -1 );
- }
- work->groups = (Skiplist *)Mem_alloc(sizeof(Skiplist));
- sl_init(work->groups);
- sl_set_compare(work->groups,
- G_compare,
- G_compare);
-
- memcpy( name, head_ptr->private_group_name, MAX_GROUP_NAME );
+ num_bytes = Message_get_data_header_size();
+ head_ptr = Message_get_message_header(msg);
+ Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: message from rep %s\n", head_ptr->private_group_name );
num_bytes += sizeof( membership_id );
+ flag_ptr = &Temp_buf[num_bytes];
+ num_bytes += sizeof(char);
+ if( Is_first_message(flag_ptr) )
+ {
+ /* Populate the synced_set from the first message */
+ synced_set_size_ptr = &Temp_buf[num_bytes];
+ num_bytes += sizeof(int32u);
+ memcpy( &sset->size, synced_set_size_ptr, sizeof(int32u) );
+ if( !Same_endian( head_ptr->type ) )
+ sset->size = Flip_int32( sset->size );
+ synced_set_procs_ptr = &Temp_buf[num_bytes];
+ num_bytes += sset->size * sizeof(int32) ;
+ memcpy( &sset->proc_ids, synced_set_procs_ptr, sset->size * sizeof(int32) );
+ if( !Same_endian( head_ptr->type ) )
+ for( i = 0; i < sset->size; ++i )
+ sset->proc_ids[i] = Flip_int32( sset->proc_ids[i] );
+ }
+
+ /* Read the groups data, and insert it into the GroupsList */
for( ; num_bytes < total_bytes; )
{
- /* creating a new group */
- grp = new( GROUP );
+ group_name_ptr = &Temp_buf[num_bytes];
+ num_bytes += MAX_GROUP_NAME;
- memcpy( grp->name, &Temp_buf[num_bytes], MAX_GROUP_NAME );
- num_bytes += MAX_GROUP_NAME;
-
- sl_append( work->groups, grp );
- sl_init( &grp->MembersList );
- sl_set_compare( &grp->MembersList,
- G_member_recordcompare,
- G_member_keycompare);
-
- gid_ptr = &Temp_buf[num_bytes];
- num_bytes += sizeof( group_id );
- memcpy( &grp->grp_id, gid_ptr, sizeof(group_id) );
-
- num_memb_ptr = &Temp_buf[num_bytes];
- num_bytes += sizeof( int16 );
- memcpy( &num_memb, num_memb_ptr, sizeof( int16 ) );
-
- if( !Same_endian( head_ptr->type ) )
- {
- /* Flip group id */
- grp->grp_id.memb_id.proc_id = Flip_int32( grp->grp_id.memb_id.proc_id );
- grp->grp_id.memb_id.time = Flip_int32( grp->grp_id.memb_id.time );
- grp->grp_id.index = Flip_int32( grp->grp_id.index );
-
- /* flip other parts of the message */
- num_memb = Flip_int16( num_memb );
- }
- /* creating members */
- for( i=0; i < num_memb; i++ )
- {
- mbr = new( MEMBER );
-
- mbr->proc_id = p.id;
- mbr->status = ESTABLISHED_MEMBER;
- memcpy( mbr->private_name, &Temp_buf[num_bytes], MAX_GROUP_NAME );
- num_bytes += MAX_GROUP_NAME;
-
- sl_append( &grp->MembersList, mbr );
- }
+ Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: group %s\n", group_name_ptr );
+ /* Create a group if necessary, and be careful to mark as changed if needed. */
+ grp = G_get_group( group_name_ptr );
+ if( grp == NULL )
+ {
+ grp = new( GROUP );
+ memset( grp->name, 0, MAX_GROUP_NAME );
+ strcpy( grp->name, group_name_ptr );
+ sl_init( &grp->DaemonsList );
+ sl_set_compare( &grp->DaemonsList,
+ G_daemon_recordcompare,
+ G_daemon_keycompare );
+ grp->changed = 0;
+ grp->num_members = 0;
+ grp->num_local = 0;
+ sl_insert( &GroupsList, grp );
+ Num_groups++;
+ GlobalStatus.num_groups = Num_groups;
+ /* Set a group id here, so that if the group isn't changed,
+ * everyone will have the right ID (because all must have same). */
+ memcpy( &grp->grp_id, &Temp_buf[num_bytes], sizeof(group_id) );
+ if( !Same_endian( head_ptr->type ) )
+ {
+ /* Flip group id */
+ grp->grp_id.memb_id.proc_id = Flip_int32( grp->grp_id.memb_id.proc_id );
+ grp->grp_id.memb_id.time = Flip_int32( grp->grp_id.memb_id.time );
+ grp->grp_id.index = Flip_int32( grp->grp_id.index );
+ }
+ }
+ num_bytes += sizeof(group_id);
- grp->num_members = num_memb;
- memcpy( num_memb_ptr, &num_memb, sizeof( int16 ) );
+ memcpy( &num_dmns, &Temp_buf[num_bytes], sizeof(int16u) );
+ Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: \twith %u daemons\n", num_dmns );
+ num_bytes += sizeof(int16u);
+ if( !Same_endian( head_ptr->type ) )
+ num_dmns = Flip_int16( num_dmns );
+
+ /* For each daemon in the message for this group.
+ * Create a daemon object, and add it to the DaemonsList.
+ * Add all members to the MembersList. */
+ for( i = 0; i < num_dmns; ++i )
+ {
+ /* FIXME: If I was paranoid, I could always check here that the daemon
+ * isn't already in my GroupsList, or that it is in my conf (from Reg_memb). */
+ dmn = new( DAEMON_MEMBERS );
+ memcpy( &dmn->proc_id, &Temp_buf[num_bytes], sizeof(int32) );
+ num_bytes += sizeof(int32);
+ memcpy( &dmn->memb_id, &Temp_buf[num_bytes], sizeof(membership_id) );
+ num_bytes += sizeof(membership_id);
+ memcpy( &num_memb, &Temp_buf[num_bytes], sizeof(int16u) );
+ num_bytes += sizeof(int16u);
+ if( !Same_endian( head_ptr->type ) )
+ {
+ dmn->proc_id = Flip_int32( dmn->proc_id );
+ dmn->memb_id.proc_id = Flip_int32( dmn->memb_id.proc_id );
+ dmn->memb_id.time = Flip_int32( dmn->memb_id.time );
+ num_memb = Flip_int16( num_memb );
+ }
+ IP_to_STR( dmn->proc_id, ip_string );
+ Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: \tdaemon with proc_id %s\n", ip_string );
+ IP_to_STR( dmn->memb_id.proc_id, ip_string );
+ Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: \t\twith memb_id (%s, %d)\n",
+ ip_string, dmn->memb_id.time );
+ Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: \t\twith %u members:\n", num_memb );
+ sl_init( &dmn->MembersList );
+ sl_set_compare( &dmn->MembersList, G_compare, G_compare );
+ sl_insert( &grp->DaemonsList, dmn );
+ if( !grp->changed &&
+ !Memb_is_equal( dmn->memb_id, grp->grp_id.memb_id ) )
+ grp->changed = 1;
+ /* creating members */
+ for( j = 0; j < num_memb; ++j )
+ {
+ mbr = new( MEMBER );
+ memcpy( mbr->name, &Temp_buf[num_bytes], MAX_GROUP_NAME );
+ Alarmp( SPLOG_DEBUG, GROUPS, "G_mess_to_groups: \t\t%s\n", mbr->name );
+ num_bytes += MAX_GROUP_NAME;
+ sl_append( &dmn->MembersList, mbr );
+ }
+ grp->num_members += num_memb;
+ }
}
return( 0 );
}
-int G_analize_groups( int num_groups, char target_groups[][MAX_GROUP_NAME], int target_sessions[] )
+int G_analize_groups( int num_groups, char target_groups[][MAX_GROUP_NAME], int target_sessions[] )
{
static mailbox mbox[MAX_SESSIONS];
static mailbox current[MAX_SESSIONS];
@@ -2090,8 +2178,6 @@
int num_mbox_pre;
int num_current;
group *grp;
- member *mbr;
- struct skiplistnode *iter;
char proc_name[MAX_PROC_NAME];
char private_name[MAX_PRIVATE_NAME+1];
int found;
@@ -2126,32 +2212,12 @@
/* regular group */
grp = G_get_group( target_groups[i] );
if( grp == NULL ) continue;
- if( Gstate == GOP )
- {
- current_ptr = grp->mbox;
- num_current = grp->num_local;
- }else if( Gstate == GTRANS ){
- current_ptr = current;
- num_current = 0;
- iter = sl_getlist( &grp->MembersList );
- mbr = (iter)?(member *)iter->data:NULL;
- for( ; mbr != NULL ;
- mbr = sl_next( &grp->MembersList, &iter ) )
- {
- if( mbr->proc_id == My.id && !Is_new_member( mbr->status ) )
- {
- G_private_to_names( mbr->private_name, private_name, proc_name );
- ses = Sess_get_session( private_name );
- if( ses < 0 ) Alarm( EXIT,
- "G_analize_groups: ses is %d private_name is %s\n",
- ses, private_name );
- current[ num_current ] = Sessions[ ses ].mbox;
- num_current++;
- }
- }
+ if( Gstate == GOP || Gstate == GTRANS ) {
+ current_ptr = grp->mbox;
+ num_current = grp->num_local;
}else {
num_current = 0; /* fool compiler warnings */
- Alarm( EXIT, "G_analize_groups: Gstate is %d\n", Gstate );
+ Alarmp( SPLOG_FATAL, GROUPS, "G_analize_groups: Gstate is %d\n", Gstate );
}
}
num_mbox_pre = num_mbox;
@@ -2178,9 +2244,38 @@
for( i=0; i < num_mbox; i++ ) target_sessions[i] = Sess_get_session_index ( mbox[ i ] );
return( num_mbox );
}
+
+/* Rebuild grp->grp_mask from scratch: clear all four mask words, then set
+ * one bit per daemon in grp->DaemonsList, selected by that daemon's
+ * seg_index as reported by Conf_proc_by_id (word seg_index/32, bit
+ * seg_index%32). func_name only labels the log lines. The body is compiled
+ * only when SPREAD_PROTOCOL == 4; otherwise the function does nothing. */
+static void G_compute_group_mask( group *grp, char *func_name )
+{
+#if (SPREAD_PROTOCOL == 4)
+ int i;
+ int32u temp; /* unsigned so that reaching bit 31 is well defined */
+ struct skiplistnode *iter;
+ daemon_members *dmn;
+ proc p;
+
+ for(i=0; i<4; i++)
+ {
+ grp->grp_mask[i] = 0;
+ }
+ /* Test iter before reading iter->data, exactly as the other list walks
+ * in this file do, so an empty DaemonsList is not dereferenced. */
+ iter = sl_getlist( &grp->DaemonsList );
+ dmn = (iter) ? (daemon_members *)iter->data : NULL;
+ for( ; dmn != NULL ; dmn = (daemon_members *)sl_next( &grp->DaemonsList, &iter ) )
+ {
+ /* p is only usable when the lookup succeeds; a negative result is
+ * treated as "not in conf", as in G_add_to_synced_set. */
+ if( Conf_proc_by_id( dmn->proc_id, &p ) < 0 )
+ {
+ Alarmp( SPLOG_FATAL, GROUPS, "%s: proc_id %u not in conf\n", func_name, dmn->proc_id );
+ continue;
+ }
+ temp = 1;
+ for( i = 0; i < p.seg_index%32; i++ )
+ {
+ temp *= 2;
+ }
+ grp->grp_mask[p.seg_index/32] |= temp;
+ }
+ Alarmp( SPLOG_INFO, GROUPS, "%s: Mask for group %s set to %x %x %x %x\n", func_name,
+ grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);
+#endif
+}
-
-void G_set_mask( int num_groups, char target_groups[][MAX_GROUP_NAME], int32u *grp_mask )
+void G_set_mask( int num_groups, char target_groups[][MAX_GROUP_NAME], int32u *grp_mask )
{
group *grp;
char proc_name[MAX_PROC_NAME];
@@ -2241,14 +2336,12 @@
}
grp_mask[p.seg_index/32] |= temp;
- }else Alarm( EXIT, "G_set_mask: Gstate is %d\n", Gstate );
+ }else Alarmp( SPLOG_FATAL, GROUPS, "G_set_mask: Gstate is %d\n", Gstate );
}
}
}
-
-
-int G_private_to_names( char *private_group_name, char *private_name, char *proc_name )
+int G_private_to_names( char *private_group_name, char *private_name, char *proc_name )
{
char name[MAX_GROUP_NAME];
char *pn, *prvn;
@@ -2271,7 +2364,8 @@
( proc_name_len >= MAX_PROC_NAME ) ||
( proc_name_len < 1 ) )
{
- Alarm( GROUPS, "G_private_to_names: Illegal private_group_name (priv, proc)\n");
+ Alarmp( SPLOG_ERROR, GROUPS, "G_private_to_names: Illegal private_group_name %s (priv, proc)\n",
+ private_group_name );
return( ILLEGAL_GROUP );
}
/* start strings at actual beginning */
@@ -2295,7 +2389,8 @@
}
if( !legal_private_name )
{
- Alarm( GROUPS, "G_private_to_names: Illegal private_group_name characters (%s) (%s)\n", prvn, pn );
+ Alarmp( SPLOG_ERROR, GROUPS, "G_private_to_names: Illegal private_group_name characters (%s) (%s)\n",
+ prvn, pn );
return( ILLEGAL_GROUP );
}
/* copy name components including null termination */
@@ -2306,50 +2401,201 @@
static void G_print()
{
- group *grp;
- member *mbr;
- struct skiplistnode *giter, *iter;
- int i, j;
+ group *grp;
+ daemon_members *dmn;
+ member *mbr;
+ struct skiplistnode *giter, *diter, *iter;
+ int i, j, k;
- printf("++++++++++++++++++++++\n");
- printf("Num of groups: %d\n", Num_groups );
+ Alarmp( SPLOG_PRINT, GROUPS, "++++++++++++++++++++++\n" );
+ Alarmp( SPLOG_PRINT, GROUPS, "Num of groups: %d\n", Num_groups );
giter = sl_getlist( &GroupsList );
- grp = (giter)?(group *)giter->data:NULL;
- for( i=0; grp != NULL ; i++, grp = sl_next( &GroupsList, &giter ) )
+ grp = (giter) ? (group *)giter->data : NULL;
+ for( i = 0; grp != NULL ; i++, grp = sl_next( &GroupsList, &giter ) )
{
- printf("[%d] group %s with %d members:\n", i+1, grp->name, grp->num_members );
- iter = sl_getlist( &grp->MembersList );
- mbr = (iter)?(member *)iter->data:NULL;
- for( j=0; mbr != NULL ;
- j++, mbr = sl_next( &grp->MembersList, &iter ) )
- {
- printf("\t[%d] %s\n", j+1, mbr->private_name );
- }
- printf("----------------------\n");
- }
+ Alarmp( SPLOG_PRINT, GROUPS, "[%d] group %s with %d members:\n", i+1, grp->name, grp->num_members );
+ diter = sl_getlist( &grp->DaemonsList );
+ dmn = (diter) ? (daemon_members *)diter->data : NULL;
+ for( j = 0; dmn != NULL ; j++, dmn = sl_next( &grp->DaemonsList, &diter ) )
+ {
+ iter = sl_getlist( &dmn->MembersList );
+ mbr = (iter) ? (member *)iter->data : NULL;
+ for( k = 0; mbr != NULL ; k++, mbr = sl_next( &dmn->MembersList, &iter ) )
+ {
+ Alarmp( SPLOG_PRINT, GROUPS, "\t[%d] %s\n", k+1, mbr->name );
+ }
+ }
+ Alarmp( SPLOG_PRINT, GROUPS, "----------------------\n" );
+ }
}
-static void G_empty_groups_bufs()
+int G_get_num_local( char *group_name )
{
- groups_buf_link *next;
+ group *grp = G_get_group( group_name );
+ if( grp == NULL ) return 0;
+ return grp->num_local;
+}
- for( ; Groups_bufs; Groups_bufs = next )
+/* Add new members to my synced set.
+ * Merges sset into MySyncedSet: at each step the entry whose
+ * Conf_proc_by_id() result is smaller is appended to a temporary set, so the
+ * result is ordered by that value provided both inputs already are.
+ * index_l / index_r cache the lookup result for the current left
+ * (MySyncedSet) and right (sset) entry; -1 means the entry still needs a
+ * lookup, or that side has been used up.
+ * A proc id for which the lookup yields -1, or an entry present in both
+ * sets, is reported at SPLOG_FATAL.
+ * On completion MySyncedSet is overwritten with the merged set.
+ * NOTE(review): nothing here bounds temp.size against the capacity of
+ * proc_ids -- confirm the combined sets always fit. */
+static void G_add_to_synced_set( synced_set *sset ) {
+ synced_set temp;
+ int32u i = 0, j = 0; /* read positions in MySyncedSet and sset */
+ int index_l = -1, index_r = -1;
+ proc dummy_proc; /* lookup output that is not used further */
+
+ temp.size = 0;
+ while( i < MySyncedSet.size || j < sset->size ) {
+ if( i < MySyncedSet.size && index_l == -1 ) {
+ index_l = Conf_proc_by_id( MySyncedSet.proc_ids[i], &dummy_proc );
+ if( index_l == -1 )
+ Alarmp( SPLOG_FATAL, GROUPS, "G_add_to_synced_set: proc_id %u not in conf\n",
+ MySyncedSet.proc_ids[i] );
+ }
+ if( j < sset->size && index_r == -1 ) {
+ index_r = Conf_proc_by_id( sset->proc_ids[j], &dummy_proc );
+ if( index_r == -1 )
+ Alarmp( SPLOG_FATAL, GROUPS, "G_add_to_synced_set: proc_id %u not in conf\n",
+ sset->proc_ids[j] );
+ }
+ if( ( index_l < index_r && index_l != -1 ) || index_r == -1 ) { /* left entry is smaller, or right side is exhausted */
+ temp.proc_ids[temp.size++] = MySyncedSet.proc_ids[i++];
+ index_l = -1; /* force a lookup for the next left entry */
+ } else if( ( index_r < index_l && index_r != -1 ) || index_l == -1 ) { /* right entry is smaller, or left side is exhausted */
+ temp.proc_ids[temp.size++] = sset->proc_ids[j++];
+ index_r = -1; /* force a lookup for the next right entry */
+ } else { /* both sides valid and equal */
+ Alarmp( SPLOG_FATAL, GROUPS, "G_add_to_synced_set: intersection isn't empty --"
+ " equal procs %u and %u\n", MySyncedSet.proc_ids[i], sset->proc_ids[j] );
+ }
+ }
+ memcpy( &MySyncedSet, &temp, sizeof(synced_set) );
+}
+
+/* Remove members who aren't in the membership.
+ * Compacts s->proc_ids in place, keeping (in their original order) only the
+ * ids for which Conf_id_in_conf( memb_p, id ) is >= 0, and shrinks s->size
+ * to match. Returns 1 if at least one id was dropped, 0 otherwise. */
+static int G_check_synced_set( synced_set *s, configuration *memb_p ) {
+ int i, j = 0, changed = 0; /* i reads, j writes; j never runs ahead of i */
+
+ for( i = 0; i < s->size; ++i )
+ if( Conf_id_in_conf( memb_p, s->proc_ids[i] ) >= 0 )
+ s->proc_ids[j++] = s->proc_ids[i];
+ /* If we lost members. */
+ if( j != s->size ) {
+ s->size = j;
+ changed = 1;
+ }
+ return changed;
+}
+
+/* Print the synced set. For debugging.
+ * Logs, at the given priority, a header line with the set size followed by
+ * one line per member. Members are shown by the name Conf_proc_by_id
+ * reports; when the lookup result is negative (handled as "not in conf"
+ * in G_add_to_synced_set) the raw proc id is shown instead, so p.name is
+ * never printed without a successful lookup. func_name prefixes each line. */
+static void G_print_synced_set( int priority, synced_set *s, char *func_name ) {
+ int i;
+ proc p;
+ Alarmp( priority, GROUPS, "%s: Synced Set (with %u members):\n", func_name, s->size );
+ for( i = 0; i < s->size; ++i ) {
+ if( Conf_proc_by_id( s->proc_ids[i], &p ) < 0 )
+ Alarmp( priority, GROUPS, "%s: \t(proc_id %u not in conf)\n", func_name, s->proc_ids[i] );
+ else
+ Alarmp( priority, GROUPS, "%s: \t%s\n", func_name, p.name );
+ }
+}
+
+/* Eliminate the partitioned daemons of a group. Return true if we changed the
+ * group. */
+static int G_eliminate_partitioned_daemons( group *grp ) {
+ struct skiplistnode *diter;
+ daemon_members *dmn, *nextdaemon;
+ int group_changed = 0;
+ int needed;
+
+ diter = sl_getlist( &grp->DaemonsList );
+ dmn = (diter) ? (daemon_members *)diter->data : NULL;
+ for( ; dmn != NULL ; dmn = nextdaemon )
{
- next = Groups_bufs->next;
- dispose( Groups_bufs );
+ nextdaemon = sl_next( &grp->DaemonsList, &diter );
+ needed = 0;
+ /* The first condition is sufficient, but we can optimize a bit this way. */
+ if( Gstate == GGT ) /* Called in G_handle_reg_memb after we got a cascading transitional */
+ {
+ if( Conf_id_in_conf( &Trans_memb, dmn->proc_id ) == -1 )
+ {
+ needed = 1;
+ } else {
+ needed = 0;
+ }
+ } else { /* Called because we got the non-cascading regular membership */
+ if( Is_partitioned_daemon( dmn ) )
+ {
+ needed = 1;
+ } else {
+ needed = 0;
+ }
+ }
+ if( needed )
+ {
+ /* discard this daemon and its members - proc no longer in membership */
+ G_remove_daemon( grp, dmn );
+ group_changed = 1;
+ }
}
- return;
+ return group_changed;
+}
+
+/* This function is only called when we handle a cascading transitional membership.
+ * Gstate should be GGATHER, about to change to GGT.
+ * Returns 1 as soon as one daemon in grp->DaemonsList is found for which
+ * Conf_id_in_conf( &Trans_memb, proc_id ) is -1, and 0 if there is none.
+ * Neither the group nor its lists are modified. */
+static int G_check_if_changed_by_cascade( group *grp ) {
+ struct skiplistnode *diter;
+ daemon_members *dmn, *nextdaemon;
+ int group_changed = 0;
+
+ diter = sl_getlist( &grp->DaemonsList );
+ dmn = (diter) ? (daemon_members *)diter->data : NULL;
+ for( ; dmn != NULL ; dmn = nextdaemon )
+ {
+ nextdaemon = sl_next( &grp->DaemonsList, &diter ); /* successor fetched up front, same loop shape as G_eliminate_partitioned_daemons */
+ if( Conf_id_in_conf( &Trans_memb, dmn->proc_id ) == -1 )
+ {
+ group_changed = 1;
+ break; /* one missing daemon is enough */
+ }
+ }
+ return group_changed;
}
-int G_get_num_local( char *group_name )
+static void G_remove_daemon( group *grp, daemon_members *dmn )
{
- group *grp = G_get_group( group_name );
- if( grp == NULL ) return 0;
- return grp->num_local;
+ grp->num_members -= dmn->MembersList.size;
+ sl_destruct( &dmn->MembersList, dispose );
+ sl_remove( &grp->DaemonsList, &dmn->proc_id, dispose );
+}
+
+/* Remove a group that is known to be empty.
+ * Asserts that grp has no daemons left, tears down its DaemonsList, removes
+ * the entry keyed by grp->name from GroupsList (dispose is passed as the
+ * free function, so grp should be treated as gone afterwards), and
+ * decrements Num_groups, mirroring it into GlobalStatus.num_groups. */
+static void G_remove_group( group *grp ) {
+ assert( grp->DaemonsList.size == 0 );
+ sl_destruct( &grp->DaemonsList, dispose );
+ sl_remove( &GroupsList, grp->name, dispose );
+ Num_groups--;
+ GlobalStatus.num_groups = Num_groups;
+}
+
+/* All non-partitioned daemons get the membership ID component of the
+ * group's current group ID.
+ * That is, every daemon in grp->DaemonsList for which
+ * Is_established_daemon() holds has its memb_id set to grp->grp_id.memb_id;
+ * all other daemons are left as they are. */
+static void G_update_daemon_memb_ids( group *grp ) {
+ struct skiplistnode *diter;
+ daemon_members *dmn, *nextdaemon;
+
+ diter = sl_getlist( &grp->DaemonsList );
+ dmn = (diter) ? (daemon_members *)diter->data : NULL;
+ for( ; dmn != NULL ; dmn = nextdaemon )
+ {
+ nextdaemon = sl_next( &grp->DaemonsList, &diter ); /* successor fetched before the current daemon is examined */
+ if( Is_established_daemon( dmn ) )
+ dmn->memb_id = grp->grp_id.memb_id;
+ }
}
-static void G_print_group_id( group_id g )
+static void G_print_group_id( int priority, group_id g, char *func_name )
{
- Alarm( DEBUG, "{Proc ID: %d, Time: %d, Index: %d}",
- g.memb_id.proc_id, g.memb_id.time, g.index );
+ char ip_string[16];
+ IP_to_STR( g.memb_id.proc_id, ip_string );
+ Alarmp( priority, GROUPS,
+ "%s: Group_id {Proc ID: %s, Time: %d, Index: %d}\n", func_name,
+ ip_string, g.memb_id.time, g.index );
}
1.6 +9 -4 spread/daemon/groups.h
Index: groups.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/groups.h,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- groups.h 5 Mar 2004 00:32:46 -0000 1.5
+++ groups.h 29 Oct 2004 21:13:07 -0000 1.6
@@ -43,10 +43,15 @@
#define GGT 4
#define GROUPS_BUF_SIZE 100000
-#define MAX_LOCAL_GROUP_MEMBERS ( GROUPS_BUF_SIZE - \
- ( sizeof( membership_id ) + MAX_GROUP_NAME + \
- sizeof( group_id ) + sizeof( int16 ) ) ) / \
- MAX_GROUP_NAME
+#define GROUPS_BUF_PREAMBLE_SIZE ( sizeof(membership_id) + sizeof(char) )
+#define GROUPS_BUF_GROUP_INFO_SIZE ( MAX_GROUP_NAME + sizeof(group_id) + \
+ sizeof(int16u) )
+#define GROUPS_BUF_DAEMON_INFO_SIZE ( sizeof(int32) + sizeof(membership_id) + \
+ sizeof(int16u) )
+#define MAX_LOCAL_GROUP_MEMBERS (( GROUPS_BUF_SIZE - GROUPS_BUF_PREAMBLE_SIZE \
+ - GROUPS_BUF_GROUP_INFO_SIZE \
+ - GROUPS_BUF_DAEMON_INFO_SIZE ) \
+ / MAX_GROUP_NAME )
void G_init(void);
1.7 +5 -3 spread/daemon/objects.h
Index: objects.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/objects.h,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- objects.h 5 Mar 2004 00:32:46 -0000 1.6
+++ objects.h 29 Oct 2004 21:13:07 -0000 1.7
@@ -37,7 +37,7 @@
* Copyright 1997 Jonathan Stanton <jonathan at cs.jhu.edu>
* Center for Networking and Distributed Systems
*
- * $Id: objects.h,v 1.6 2004/03/05 00:32:46 jonathan Exp $
+ * $Id: objects.h,v 1.7 2004/10/29 21:13:07 jonathan Exp $
*
*/
@@ -48,7 +48,7 @@
#include "spread_params.h" /* For SPREAD_PROTOCOL used in memory.c */
#define MAX_OBJECTS 200
-#define MAX_OBJ_USED 54
+#define MAX_OBJ_USED 56
/* Object types
*
@@ -108,9 +108,11 @@
#define SESSION_AUTH_INFO 51
#define GROUPS_BUF_LINK 52
+#define GROUPS_MESSAGE_LINK 53
+#define DAEMON_MEMBERS 54
/* Special objects */
-#define UNKNOWN_OBJ 53 /* This represents an object of undertermined or
+#define UNKNOWN_OBJ 55 /* This represents an object of undetermined or
* variable type. Can only be used when appropriate.
* i.e. when internal structure of object is not accessed.
* This is mainly used with queues
1.6 +20 -15 spread/daemon/sess_body.h
Index: sess_body.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/sess_body.h,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- sess_body.h 5 Mar 2004 00:32:46 -0000 1.5
+++ sess_body.h 29 Oct 2004 21:13:07 -0000 1.6
@@ -66,24 +66,29 @@
#define Is_preauth_session( status ) ( status & PRE_AUTH_SESSION )
#define Set_preauth_session( status ) ( status | PRE_AUTH_SESSION )
#define Clear_preauth_session( status ) ( status & ~PRE_AUTH_SESSION )
-
-typedef struct dummy_member {
- char private_name[MAX_GROUP_NAME];
- int32 status; /* new: joined in the last trans =>1, or not =>0 */
- int32 proc_id;
- int32 p_ind;
+
+/* All the information we need to maintain per group member is its private
+ * group name. */
+typedef struct dummy_member {
+ char name[MAX_GROUP_NAME];
} member;
+typedef struct dummy_daemon_members {
+ int32 proc_id;
+ membership_id memb_id; /* Used for vs_set sorting.
+ * == unknown_memb_id means partitioned. */
+ Skiplist MembersList;
+} daemon_members;
+
typedef struct dummy_group {
- char name[MAX_GROUP_NAME];
- group_id grp_id;
- int changed; /* No longer use negative grp_id.index to flag
- * groups that have changed. */
- int num_members;
- Skiplist MembersList;
- int num_local;
- mailbox mbox[MAX_SESSIONS];
- route_mask grp_mask;
+ char name[MAX_GROUP_NAME];
+ group_id grp_id;
+ int changed;
+ int num_members;
+ Skiplist DaemonsList;
+ int num_local;
+ mailbox mbox[MAX_SESSIONS];
+ route_mask grp_mask;
} group;
#undef ext
1.15 +132 -23 spread/daemon/sp.c
Index: sp.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/sp.c,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- sp.c 5 Oct 2004 14:08:10 -0000 1.14
+++ sp.c 29 Oct 2004 21:13:07 -0000 1.15
@@ -1406,38 +1406,120 @@
{
int flip_size;
group_id *gid_ptr;
- int32 *num_vs_ptr;
+ int32u *num_vs_sets_ptr;
+ int32u num_vs_sets;
+ int32u *local_vs_set_offset_ptr;
+ int32u *num_vs_ptr;
int bytes_to_copy, bytes_index;
char groups_buf[10240];
+ int num_bytes;
+ int j;
+ int total_index; /* Index into the scatters viewed as a contiguous byte array */
+ int target_index;/* Goal for total index */
+ int scat; /* The current scatter element */
+ int scat_index=0;/* Index into the current scatter element */
+ int first_scat; /* The first scatter element used for a given num_vs */
+ int first_scat_index; /* Index into the first scatter element for a given num_vs */
/*
* flip membership message:
- * group_id and number of member ins vs_set
- * so - acctually 4 int32.
+ * group_id, number of vs_sets, offset to local,
+ * and number of members in vs_sets (for each set)
+ * so - acctually 5+n int32.
*/
- flip_size = sizeof( group_id ) + sizeof( int32 );
+ flip_size = sizeof( group_id ) + 2*sizeof( int32u );
if( flip_size > max_mess_len ) flip_size = max_mess_len;
- for( bytes_index=0, i=0 ; bytes_index < flip_size ; i++, bytes_index += bytes_to_copy )
+ for( bytes_index = 0, scat = 0 ; bytes_index < flip_size ; bytes_index += bytes_to_copy )
{
bytes_to_copy = flip_size - bytes_index;
- if( bytes_to_copy > scat_mess->elements[i].len )
- bytes_to_copy = scat_mess->elements[i].len;
- memcpy( &groups_buf[bytes_index], scat_mess->elements[i].buf, bytes_to_copy );
+ if( bytes_to_copy > scat_mess->elements[scat].len )
+ bytes_to_copy = scat_mess->elements[scat].len;
+ memcpy( &groups_buf[bytes_index], scat_mess->elements[scat].buf, bytes_to_copy );
+ if( bytes_to_copy == scat_mess->elements[scat].len )
+ {
+ scat_index = 0;
+ ++scat;
+ } else {
+ scat_index = bytes_to_copy;
+ }
}
- gid_ptr = (group_id *)&groups_buf[0];
- num_vs_ptr = (int32 *)&groups_buf[sizeof(group_id)];
+ total_index = flip_size;
+ target_index = total_index;
+
+ num_bytes = 0;
+ gid_ptr = (group_id *)&groups_buf[num_bytes];
+ num_bytes += sizeof(group_id);
+ num_vs_sets_ptr = (int32u *)&groups_buf[num_bytes];
+ num_bytes += sizeof(int32u);
+ local_vs_set_offset_ptr = (int32u *)&groups_buf[num_bytes];
+ num_bytes += sizeof(int32u);
+
gid_ptr->memb_id.proc_id = Flip_int32( gid_ptr->memb_id.proc_id );
gid_ptr->memb_id.time = Flip_int32( gid_ptr->memb_id.time );
gid_ptr->index = Flip_int32( gid_ptr->index );
- *num_vs_ptr = Flip_int32( *num_vs_ptr );
- for( bytes_index=0, i=0 ; bytes_index < flip_size ; i++, bytes_index += bytes_to_copy )
+ *num_vs_sets_ptr = Flip_int32( *num_vs_sets_ptr );
+ num_vs_sets = *num_vs_sets_ptr;
+ *local_vs_set_offset_ptr = Flip_int32( *local_vs_set_offset_ptr );
+
+ for( bytes_index = 0, j = 0 ; bytes_index < flip_size ; j++, bytes_index += bytes_to_copy )
{
bytes_to_copy = flip_size - bytes_index;
- if( bytes_to_copy > scat_mess->elements[i].len )
- bytes_to_copy = scat_mess->elements[i].len;
- memcpy( scat_mess->elements[i].buf, &groups_buf[bytes_index], bytes_to_copy );
+ if( bytes_to_copy > scat_mess->elements[j].len )
+ bytes_to_copy = scat_mess->elements[j].len;
+ memcpy( scat_mess->elements[j].buf, &groups_buf[bytes_index], bytes_to_copy );
}
-
+ for( i = 0; i < num_vs_sets; ++i )
+ {
+ while( total_index < target_index )
+ {
+ if( target_index - total_index < scat_mess->elements[scat].len - scat_index )
+ {
+ scat_index += target_index - total_index;
+ total_index = target_index;
+ } else {
+ total_index += scat_mess->elements[scat].len - scat_index;
+ scat_index = 0;
+ ++scat;
+ }
+ }
+ first_scat_index = scat_index;
+ first_scat = scat;
+
+ flip_size = sizeof( int32u );
+ if( flip_size + total_index > max_mess_len ) flip_size = max_mess_len - total_index;
+ for( bytes_index = 0 ; bytes_index < flip_size ; bytes_index += bytes_to_copy )
+ {
+ bytes_to_copy = flip_size - bytes_index;
+ if( bytes_to_copy > scat_mess->elements[scat].len - scat_index )
+ bytes_to_copy = scat_mess->elements[scat].len - scat_index;
+ memcpy( &groups_buf[bytes_index], &(scat_mess->elements[scat].buf[scat_index]),
+ bytes_to_copy );
+ if( bytes_to_copy == scat_mess->elements[scat].len - scat_index )
+ {
+ scat_index = 0;
+ ++scat;
+ } else {
+ scat_index += bytes_to_copy;
+ }
+ }
+ total_index += flip_size;
+ target_index = total_index;
+
+ num_vs_ptr = (int32u *)&groups_buf[0];
+ *num_vs_ptr = Flip_int32( *num_vs_ptr );
+
+ for( bytes_index = 0, j = first_scat ; bytes_index < flip_size ;
+ j++, bytes_index += bytes_to_copy )
+ {
+ bytes_to_copy = flip_size - bytes_index;
+ if( bytes_to_copy > scat_mess->elements[j].len - first_scat_index )
+ bytes_to_copy = scat_mess->elements[j].len - first_scat_index;
+ memcpy( &(scat_mess->elements[j].buf[first_scat_index]),
+ &groups_buf[bytes_index], bytes_to_copy );
+ first_scat_index = 0;
+ }
+ target_index += *num_vs_ptr * MAX_GROUP_NAME;
+ }
}
if ( Is_reject_mess( head_ptr->type ) )
{
@@ -1507,23 +1589,50 @@
int SP_equal_group_ids( group_id g1, group_id g2 )
{
- if( g1.memb_id.proc_id == g2.memb_id.proc_id && g1.memb_id.time == g2.memb_id.time && g1.index == g2.index ) return( 1 );
+ if( g1.memb_id.proc_id == g2.memb_id.proc_id &&
+ g1.memb_id.time == g2.memb_id.time &&
+ g1.index == g2.index )
+ return( 1 );
else return( 0 );
}
-int SP_get_gid_offset_memb_mess()
+int SP_get_gid_offset_memb_mess()
+{
+ return 0;
+}
+
+int SP_get_num_vs_sets_offset_memb_mess()
+{
+ return sizeof(group_id);
+}
+
+static int SP_get_offset_to_local_vs_set_offset()
+{
+ return sizeof(group_id) + sizeof(int32u);
+}
+
+int SP_get_first_vs_set_offset_memb_mess()
+{
+ return sizeof(group_id) + 2*sizeof(int32u);
+}
+
+int SP_get_vs_set_size_offset_vs_set()
{
- return 0;
+ return 0;
}
-int SP_get_num_vs_offset_memb_mess()
+int SP_get_vs_set_members_offset_vs_set()
{
- return sizeof(group_id);
+ return sizeof(int32u);
}
-int SP_get_vs_set_offset_memb_mess()
+int SP_get_local_vs_set_offset_memb_mess( char *reg_memb_mess )
{
- return sizeof(group_id) + sizeof(int32);
+ int32u offset;
+ memcpy( &offset, &reg_memb_mess[SP_get_offset_to_local_vs_set_offset()],
+ sizeof(int32u) );
+ offset += SP_get_first_vs_set_offset_memb_mess();
+ return offset;
}
int SP_query_groups( mailbox mbox, int max_groups, char *groups[MAX_GROUP_NAME] )
1.6 +11 -3 spread/daemon/sp_func.h
Index: sp_func.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/sp_func.h,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- sp_func.h 5 Mar 2004 00:32:46 -0000 1.5
+++ sp_func.h 29 Oct 2004 21:13:07 -0000 1.6
@@ -89,10 +89,18 @@
int16 *mess_type, int *endian_mismatch,
scatter *scat_mess );
-/* returns offset in memb. message of gid (group id), num_vs and vs_set */
+/* FIXME: We could make something much friendlier here. */
+/* returns offset in memb. message of gid (group id), num_vs_sets,
+ * offset to local vs_set, and first vs_set */
int SP_get_gid_offset_memb_mess(void);
-int SP_get_num_vs_offset_memb_mess(void);
-int SP_get_vs_set_offset_memb_mess(void);
+int SP_get_num_vs_sets_offset_memb_mess(void);
+int SP_get_first_vs_set_offset_memb_mess(void);
+/* returns offset into the buffer for a vs_set of the number of
+ * members in the vs_set. */
+int SP_get_vs_set_size_offset_vs_set(void);
+int SP_get_vs_set_members_offset_vs_set(void);
+/* returns value from regular membership message */
+int SP_get_local_vs_set_offset_memb_mess( char *reg_memb_mess );
int SP_poll( mailbox mbox );
1.10 +51 -27 spread/daemon/user.c
Index: user.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/user.c,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- user.c 24 Sep 2004 00:30:08 -0000 1.9
+++ user.c 29 Oct 2004 21:13:07 -0000 1.10
@@ -39,6 +39,8 @@
#include <stdlib.h>
#include <string.h>
+#define int32u unsigned int
+
#ifdef _REENTRANT
#ifndef ARCH_PC_WIN95
@@ -374,22 +376,28 @@
fflush(stdout);
}
+/* FIXME: The user.c code does not use memcpy()s to avoid bus errors when
+ * dereferencing a pointer into a potentially misaligned buffer */
+
static void Read_message()
{
-static char mess[MAX_MESSLEN];
- char sender[MAX_GROUP_NAME];
- char target_groups[100][MAX_GROUP_NAME];
- group_id *grp_id;
- int32 *num_vs;
- char *vs_members;
- int num_groups;
- int num_bytes;
- int service_type;
- int16 mess_type;
- int endian_mismatch;
- int i;
- int ret;
+static char mess[MAX_MESSLEN];
+ char sender[MAX_GROUP_NAME];
+ char target_groups[100][MAX_GROUP_NAME];
+ group_id *grp_id_ptr;
+ int32u *num_vs_sets_ptr;
+ int32u local_vs_set_offset;
+ char *vs_set_size_ptr;
+ int32u vs_set_size;
+ char *vs_set_members;
+ int num_groups;
+ int num_bytes;
+ int service_type;
+ int16 mess_type;
+ int endian_mismatch;
+ int i,j;
+ int ret;
service_type = 0;
@@ -430,30 +438,46 @@
{
if ( Is_reg_memb_mess( service_type ) )
{
- num_bytes = SP_get_gid_offset_memb_mess();
- grp_id = (group_id *)&mess[num_bytes];
- num_bytes = SP_get_num_vs_offset_memb_mess();
- num_vs = (int32 *)&mess[num_bytes];
- num_bytes = SP_get_vs_set_offset_memb_mess();
- vs_members = &mess[num_bytes];
+ num_bytes = SP_get_gid_offset_memb_mess();
+ grp_id_ptr = (group_id *)&mess[num_bytes];
+ num_bytes = SP_get_num_vs_sets_offset_memb_mess();
+ num_vs_sets_ptr = (int32u *)&mess[num_bytes];
+ local_vs_set_offset = SP_get_local_vs_set_offset_memb_mess(mess);
+ num_bytes = SP_get_first_vs_set_offset_memb_mess();
+ num_bytes += SP_get_vs_set_size_offset_vs_set();
+ vs_set_size_ptr = &mess[num_bytes];
+ num_bytes += SP_get_vs_set_members_offset_vs_set();
+ vs_set_members = &mess[num_bytes];
+
printf("Received REGULAR membership for group %s with %d members, where I am member %d:\n",
sender, num_groups, mess_type );
for( i=0; i < num_groups; i++ )
printf("\t%s\n", &target_groups[i][0] );
- printf("grp id is %d %d %d\n",grp_id->id[0], grp_id->id[1], grp_id->id[2] );
+ printf("grp id is %d %d %d\n",grp_id_ptr->id[0], grp_id_ptr->id[1], grp_id_ptr->id[2] );
if( Is_caused_join_mess( service_type ) )
{
- printf("Due to the JOIN of %s\n", vs_members );
+ printf("Due to the JOIN of %s\n", vs_set_members );
}else if( Is_caused_leave_mess( service_type ) ){
- printf("Due to the LEAVE of %s\n", vs_members);
+ printf("Due to the LEAVE of %s\n", vs_set_members);
}else if( Is_caused_disconnect_mess( service_type ) ){
- printf("Due to the DISCONNECT of %s\n", vs_members );
+ printf("Due to the DISCONNECT of %s\n", vs_set_members );
}else if( Is_caused_network_mess( service_type ) ){
- printf("Due to NETWORK change. ");
- printf("VS set has %d members:\n", *num_vs );
- for( i=0; i < *num_vs; i++, vs_members+= MAX_GROUP_NAME )
- printf("\t%s\n", vs_members );
+ printf("Due to NETWORK change with %u VS sets\n", *num_vs_sets_ptr);
+ for( i = 0; i < *num_vs_sets_ptr; ++i )
+ {
+ if( 0 != i )
+ {
+ vs_set_size_ptr = vs_set_members;
+ vs_set_members += sizeof(int32u);
+ }
+ memcpy( &vs_set_size, vs_set_size_ptr, sizeof(int32u) );
+ printf("%s VS set %d has %d members:\n",
+ (vs_set_size_ptr - mess == local_vs_set_offset) ?
+ ("LOCAL") : ("OTHER"), i, vs_set_size );
+ for( j = 0; j < vs_set_size; j++, vs_set_members += MAX_GROUP_NAME )
+ printf("\t%s\n", vs_set_members );
+ }
}
}else if( Is_transition_mess( service_type ) ) {
printf("received TRANSITIONAL membership for group %s\n", sender );
More information about the Spread-cvs
mailing list