[Spread-cvs] cvs commit: spread/daemon groups.c groups.h memory.c objects.h session.c Readme.txt
jonathan at spread.org
jonathan at spread.org
Thu Aug 29 11:43:03 EDT 2002
jonathan 02/08/29 15:43:03
Modified: daemon groups.c groups.h memory.c objects.h session.c
Readme.txt
Log:
Apply Ryan Caudy's work on supporting large groups and large numbers of groups
during membership changes.
Fix GGT bug.
Fix Memory bug where Mem_alloc allocated memory could not be freed without
triggering an assertion failure.
Revision Changes Path
1.7 +158 -69 spread/daemon/groups.c
Index: groups.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/groups.c,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- groups.c 3 Feb 2002 19:28:10 -0000 1.6
+++ groups.c 29 Aug 2002 15:43:01 -0000 1.7
@@ -66,6 +66,12 @@
#define Is_established_member( status ) ( status == ESTABLISHED_MEMBER )
#define Is_new_member( status ) ( status == NEW_MEMBER )
#define Is_partitioned_member( status ) ( status == PARTITIONED_MEMBER )
+
+typedef struct dummy_groups_buf_link {
+ char buf[GROUPS_BUF_SIZE];
+ int bytes;
+ struct dummy_groups_buf_link *next;
+} groups_buf_link;
struct worklist {
char name[MAX_GROUP_NAME];
@@ -82,10 +88,12 @@
static configuration Reg_memb;
static membership_id Reg_memb_id;
static char Mess_buf[10000];
-static char Groups_buf[100000];
-static int Groups_bytes;
-static message_link Gathered; /* Groups messages */
-static int Num_gathered;
+
+static groups_buf_link *Groups_bufs;
+static int Num_mess_gathered;
+static int Num_daemons_gathered;
+static message_link Gathered; /* Groups messages */
+
#if 0
static group Work[MAX_PROCS_RING+1];
#endif
@@ -101,13 +109,14 @@
static message_link *G_build_trans_mess( group *grp );
static void G_stamp_groups_buf( char buf[] );
-static void G_refresh_groups_msg( message_obj *msg, int groups_bytes );
-static int G_build_groups_buf( message_obj *msg, char buf[] );
+static void G_build_groups_msg_hdr( message_obj *msg, int groups_bytes );
+static int G_build_groups_buf( char buf[], struct skiplistnode **iter_ptr );
static int G_mess_to_groups( message_link *mess_link, char *name,
struct worklist *work );
static int G_smallest_group_indices( Skiplist *work, struct worklist *indices[] );
static void G_compute_and_notify(void);
static void G_print(void);
+static void G_empty_groups_bufs(void);
static Skiplist GroupsList;
@@ -233,12 +242,20 @@
{
Alarm(EXIT, "G_init: Failure to Initialize MEMBER memory objects\n");
}
+ ret = Mem_init_object(GROUPS_BUF_LINK, sizeof(groups_buf_link), 1, 1);
+ if( ret < 0 )
+ {
+ Alarm(EXIT, "G_init: Failure to Initialize GROUPS_BUF_LINK memory objects\n");
+ }
#if ( SPREAD_PROTOCOL == 3 )
Groups_control_down_queue = 0;
#else
Groups_control_down_queue = init_queuesess(Groups_down_qs);
#endif
- Num_gathered = 0;
+
+ Groups_bufs = NULL;
+ Num_mess_gathered = 0;
+ Num_daemons_gathered = 0;
Gathered.next = NULL;
Gstate = GOP;
@@ -249,7 +266,8 @@
{
group *grp, *nextgroup;
member *mbr, *nextmember;
- struct skiplistnode *giter, *iter;
+ struct skiplistnode *giter, *iter, *passed_iter;
+ groups_buf_link *grps_buf_link;
message_link *mess_link;
down_link *down_ptr;
message_obj *msg;
@@ -407,18 +425,34 @@
Prot_set_down_queue( GROUPS_DOWNQUEUE );
/* build and send Groups message */
- msg = Message_new_message();
- Groups_bytes = G_build_groups_buf( msg, Groups_buf );
- head_ptr = Message_get_message_header(msg);
- head_ptr->type |= SAFE_MESS;
- Message_Buffer_to_Message_Fragments( msg, Groups_buf, Groups_bytes );
-
- down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
- down_ptr->mess = msg;
- Obj_Inc_Refcount(down_ptr->mess);
- /* Use control queue--not normal session queues */
- Prot_new_message( down_ptr, Groups_control_down_queue );
- Message_Dec_Refcount(msg);
+ /* Nowadays, we can send multiple groups messages. No group has
+ * data in more than one. As an optimization, only the last message is
+ * SAFE, and all previous ones are RELIABLE. G_handle_groups uses this
+ * knowledge when parsing Groups messages. */
+ passed_iter = NULL;
+ do {
+ msg = Message_new_message();
+ grps_buf_link = new( GROUPS_BUF_LINK );
+ grps_buf_link->next = Groups_bufs;
+ Groups_bufs = grps_buf_link;
+ grps_buf_link->bytes = G_build_groups_buf(grps_buf_link->buf, &passed_iter);
+ G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
+ head_ptr = Message_get_message_header(msg);
+ if( passed_iter )
+ head_ptr->type |= RELIABLE_MESS;
+ else
+ head_ptr->type |= SAFE_MESS;
+ Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );
+
+ down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
+ down_ptr->mess = msg;
+ Obj_Inc_Refcount(down_ptr->mess);
+ /* Use control queue--not normal session queues */
+ Prot_new_message( down_ptr, Groups_control_down_queue );
+ Message_Dec_Refcount(msg);
+ Alarm( GROUPS, "G_handle_reg_memb: (%8s) GROUPS message sent in GTRANS with %d bytes\n",
+ (passed_iter) ? "RELIABLE" : "SAFE", grps_buf_link->bytes );
+ } while( passed_iter != NULL );
Gstate = GGATHER;
GlobalStatus.gstate = Gstate;
@@ -444,31 +478,40 @@
Reg_memb_id = reg_memb_id;
/* Clear retained Groups messages in Gathered */
- for( i=0; i < Num_gathered; i++ )
+ for( i=0; i < Num_mess_gathered; i++ )
{
mess_link = Gathered.next;
Gathered.next = mess_link->next;
Sess_dispose_message( mess_link );
}
- Num_gathered = 0;
+ Num_mess_gathered = 0;
+ Num_daemons_gathered = 0;
- /* Stamp own Groups message in buffer with current membership id */
- G_stamp_groups_buf( Groups_buf );
+ for( grps_buf_link = Groups_bufs; grps_buf_link; grps_buf_link = grps_buf_link->next )
+ {
+ /* Stamp own Groups message in buffer with current membership id */
+ G_stamp_groups_buf( grps_buf_link->buf );
- /* send Groups message */
- msg = Message_new_message();
- G_refresh_groups_msg( msg, Groups_bytes );
- head_ptr = Message_get_message_header(msg);
- head_ptr->type |= SAFE_MESS;
- Message_Buffer_to_Message_Fragments( msg, Groups_buf, Groups_bytes );
-
- down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
- down_ptr->mess = msg;
- Obj_Inc_Refcount(down_ptr->mess);
- /* Use control queue--not normal session queues */
- Prot_new_message( down_ptr, Groups_control_down_queue );
- Message_Dec_Refcount(msg);
-
+ /* send Groups message */
+ msg = Message_new_message();
+ G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
+ head_ptr = Message_get_message_header(msg);
+ if( grps_buf_link->next )
+ head_ptr->type |= RELIABLE_MESS;
+ else
+ head_ptr->type |= SAFE_MESS;
+ Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );
+
+ down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
+ down_ptr->mess = msg;
+ Obj_Inc_Refcount(down_ptr->mess);
+ /* Use control queue--not normal session queues */
+ Prot_new_message( down_ptr, Groups_control_down_queue );
+ Message_Dec_Refcount(msg);
+ Alarm( GROUPS, "G_handle_reg_memb: (%8s) GROUPS message sent in GGT with %d bytes\n",
+ (grps_buf_link->next) ? "RELIABLE" : "SAFE", grps_buf_link->bytes );
+ }
+
Gstate = GGATHER;
GlobalStatus.gstate = Gstate;
@@ -701,7 +744,7 @@
Num_groups++; /*sl need this?*/
GlobalStatus.num_groups = Num_groups;
grp = new_grp;
- }
+ }
mbr = G_get_member( grp, private_group_name );
if( mbr != NULL )
{
@@ -1348,18 +1391,28 @@
}
if( ! Memb_is_equal( temp_memb_id, Reg_memb_id ) )
{
+ Alarm( GROUPS,
+ "G_handle_groups: GROUPS message received from bad memb id proc %d, time %d, daemon %s.\n",
+ temp_memb_id.proc_id, temp_memb_id.time, head_ptr->private_group_name );
Sess_dispose_message( mess_link );
Message_Dec_Refcount(msg);
return;
}
+
mess_link->next = Gathered.next;
Gathered.next = mess_link;
- Num_gathered++;
- if( Num_gathered != Conf_num_procs( &Reg_memb ) )
+ Num_mess_gathered++;
+ /* The last Groups message a daemon sends is SAFE. */
+ if( Is_safe_mess( head_ptr->type ) ) Num_daemons_gathered++;
+ Alarm( GROUPS, "G_handle_groups: GROUPS message received from %s - msgs %d, daemons %d\n",
+ head_ptr->private_group_name, Num_mess_gathered, Num_daemons_gathered );
+ if( Num_daemons_gathered != Conf_num_procs( &Reg_memb ) )
{
Message_Dec_Refcount(msg);
return;
}
+ Alarm( GROUPS, "G_handle_groups: Last GROUPS message received - msgs %d, daemons %d\n",
+ Num_mess_gathered, Num_daemons_gathered );
/* Replace protocol queue */
Prot_set_down_queue( NORMAL_DOWNQUEUE );
@@ -1392,7 +1445,6 @@
group *grp, *new_grp, *orig_grp;
member *mbr;
int changed;
- int orig_total_members;
int ret;
int vs_bytes;
int32 *num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
@@ -1415,7 +1467,7 @@
sl_init(&work);
sl_set_compare(&work, G_work_groups_comp, G_work_groups_keycomp);
- for( i=0; i < Num_gathered; i++ )
+ for( i=0; i < Num_mess_gathered; i++ )
{
struct worklist *tp;
tp = (struct worklist *)Mem_alloc(sizeof(struct worklist));
@@ -1459,13 +1511,10 @@
*num_vs_ptr = 0;
changed = 0;
- orig_total_members = 0;
orig_grp = NULL;
- if(NULL != (this_group = (group *)(sl_getlist(indices[0]->groups)->data)) ) {
- orig_total_members = this_group->num_local;
- orig_grp = sl_find( &GroupsList, this_group->name, &top_iter);
- }
-
+ assert( NULL != (this_group = (group *)(sl_getlist(indices[0]->groups)->data)) );
+ orig_grp = sl_find( &GroupsList, this_group->name, &top_iter);
+
if( orig_grp == NULL )
{
new_grp = new( GROUP );
@@ -1575,7 +1624,29 @@
Message_Dec_Refcount(msg);
}
}
- Num_gathered = 0;
+ Num_mess_gathered = 0;
+ Num_daemons_gathered = 0;
+
+ /* We're going back to GOP... destroy our groups messages. */
+ G_empty_groups_bufs();
+
+ /* Finish freeing the memory in our worklists */
+ {
+ struct worklist *worklist;
+ struct skiplistnode *iter;
+
+ iter = sl_getlist(&work);
+ worklist = (iter)?iter->data:NULL;
+
+ while( worklist != NULL ) {
+ assert( worklist->groups->size == 0 );
+ dispose( worklist->groups );
+ worklist = sl_next(&work, &iter);
+ }
+ }
+
+ sl_remove_all( &work, dispose );
+
G_print();
}
@@ -1776,11 +1847,12 @@
static void G_stamp_groups_buf( char buf[] )
{
char *memb_id_ptr;
-
- memb_id_ptr = &buf[ Message_get_data_header_size() ];
+ memb_id_ptr = buf;
memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );
}
-static void G_refresh_groups_msg( message_obj *msg, int groups_bytes )
+
+/* This function used to be called G_refresh_groups_msg. */
+static void G_build_groups_msg_hdr( message_obj *msg, int groups_bytes )
{
message_header *head_ptr;
@@ -1793,10 +1865,12 @@
head_ptr->num_groups = 0;
head_ptr->data_len = groups_bytes;
}
-static int G_build_groups_buf( message_obj *msg, char buf[])
+
+/* This function guarantees that each group's data appears in only one buffer in
+ * a sequence, and that the sorted order is preserved from the GroupsList. */
+static int G_build_groups_buf(char buf[], struct skiplistnode **iter_ptr)
{
int num_bytes;
- message_header *head_ptr;
char *memb_id_ptr;
group *grp;
char *gid_ptr;
@@ -1804,28 +1878,26 @@
struct skiplistnode *giter, *iter;
int16 *num_memb_ptr;
char *memb_ptr;
+ int size_for_this_group;
num_bytes = 0;
- head_ptr = Message_get_message_header(msg);
- head_ptr->type = GROUPS_MESS;
- head_ptr->type = Set_endian( head_ptr->type );
- head_ptr->hint = Set_endian( 0 );
-
- memset(head_ptr->private_group_name, 0, MAX_GROUP_NAME);
-
- strcpy( head_ptr->private_group_name, My.name );
- head_ptr->num_groups = 0;
memb_id_ptr = &buf[num_bytes];
num_bytes += sizeof( membership_id );
memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );
- giter = sl_getlist( &GroupsList );
+ giter = (*iter_ptr) ? (*iter_ptr) : (sl_getlist( &GroupsList ));
+
grp = (giter)?(group *)giter->data:NULL;
for( ; grp != NULL ; grp = sl_next( &GroupsList, &giter ) )
{
if( grp->num_local == 0 ) continue;
+ size_for_this_group = MAX_GROUP_NAME + sizeof(group_id) + sizeof(int16) +
+ (grp->num_local * MAX_GROUP_NAME);
+ /* This requires that the number of local group members be limited. */
+ if( size_for_this_group > GROUPS_BUF_SIZE - num_bytes ) break;
+
memcpy( &buf[num_bytes], grp->name, MAX_GROUP_NAME );
num_bytes += MAX_GROUP_NAME;
@@ -1839,8 +1911,7 @@
iter = sl_getlist( &grp->MembersList );
mbr = (iter)?(member *)iter->data:NULL;
- for( ; mbr != NULL ;
- mbr = sl_next( &grp->MembersList, &iter ) )
+ for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
{
/* collect local members */
if( mbr->proc_id != My.id ) continue;
@@ -1854,8 +1925,7 @@
grp->name, (*num_memb_ptr), grp->num_local );
}
- head_ptr->data_len = num_bytes;
-
+ *iter_ptr = giter;
return( num_bytes );
}
@@ -2196,4 +2266,23 @@
}
printf("----------------------\n");
}
+}
+
+static void G_empty_groups_bufs()
+{
+ groups_buf_link *next;
+
+ for( ; Groups_bufs; Groups_bufs = next )
+ {
+ next = Groups_bufs->next;
+ dispose( Groups_bufs );
+ }
+ return;
+}
+
+int G_get_num_local( char *group_name )
+{
+ group *grp = G_get_group( group_name );
+ if( grp == NULL ) return 0;
+ return grp->num_local;
}
1.2 +7 -0 spread/daemon/groups.h
Index: groups.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/groups.h,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- groups.h 21 Aug 2001 14:28:21 -0000 1.1
+++ groups.h 29 Aug 2002 15:43:01 -0000 1.2
@@ -41,6 +41,12 @@
#define GGATHER 3
#define GGT 4
+#define GROUPS_BUF_SIZE 100000
+#define MAX_LOCAL_GROUP_MEMBERS ( GROUPS_BUF_SIZE - \
+ ( sizeof( membership_id ) + MAX_GROUP_NAME + \
+ sizeof( group_id ) + sizeof( int16 ) ) ) / \
+ MAX_GROUP_NAME
+
void G_init(void);
void G_handle_reg_memb( configuration reg_memb, membership_id reg_memb_id );
@@ -54,5 +60,6 @@
void G_set_mask( int num_groups, char target_groups[][MAX_GROUP_NAME], int32u *grp_mask );
int G_private_to_names( char *private_group_name, char *private_name, char *proc_name );
+int G_get_num_local( char *group_name );
#endif /* INC_GROUPS */
1.4 +6 -0 spread/daemon/memory.c
Index: memory.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/memory.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- memory.c 29 Oct 2001 21:21:47 -0000 1.3
+++ memory.c 29 Aug 2002 15:43:01 -0000 1.4
@@ -524,6 +524,12 @@
mem_header * head_ptr;
if (length == 0) { return(NULL); }
+ if( !Mem[BLOCK_OBJECT].exist )
+ {
+ Mem[BLOCK_OBJECT].exist = TRUE;
+ Mem[BLOCK_OBJECT].size = 0;
+ Mem[BLOCK_OBJECT].threshold = 0;
+ }
head_ptr = (mem_header *) calloc(1, length + sizeof(mem_header));
1.4 +6 -3 spread/daemon/objects.h
Index: objects.h
===================================================================
RCS file: /storage/cvsroot/spread/daemon/objects.h,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- objects.h 31 Aug 2001 16:13:35 -0000 1.3
+++ objects.h 29 Aug 2002 15:43:01 -0000 1.4
@@ -36,7 +36,7 @@
* Copyright 1997 Jonathan Stanton <jonathan at cs.jhu.edu>
* Center for Networking and Distributed Systems
*
- * $Id: objects.h,v 1.3 2001/08/31 16:13:35 jonathan Exp $
+ * $Id: objects.h,v 1.4 2002/08/29 15:43:01 jonathan Exp $
*
*/
@@ -47,7 +47,7 @@
#include "spread_params.h" /* For SPREAD_PROTOCOL used in memory.c */
#define MAX_OBJECTS 200
-#define MAX_OBJ_USED 53
+#define MAX_OBJ_USED 54
/* Object types
*
@@ -105,8 +105,11 @@
#define PACKET_BODY 50
#define SESSION_AUTH_INFO 51
+
+#define GROUPS_BUF_LINK 52
+
/* Special objects */
-#define UNKNOWN_OBJ 52 /* This represents an object of undertermined or
+#define UNKNOWN_OBJ 53 /* This represents an object of undertermined or
* variable type. Can only be used when appropriate.
* i.e. when internal structure of object is not accessed.
* This is mainly used with queues
1.9 +12 -0 spread/daemon/session.c
Index: session.c
===================================================================
RCS file: /storage/cvsroot/spread/daemon/session.c,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- session.c 27 Aug 2002 01:10:52 -0000 1.8
+++ session.c 29 Aug 2002 15:43:01 -0000 1.9
@@ -1366,6 +1366,18 @@
char *groups_ptr;
int decision;
groups_ptr = Message_get_first_group( msg );
+
+ /* Make sure we don't let a join happen if the limit has been reached. */
+ if( G_get_num_local( groups_ptr ) == MAX_LOCAL_GROUP_MEMBERS ) {
+ Alarm( PRINT, "Sess_read: Attempt by session %s to join group %s "
+ "failed: too many local members.\n", head_ptr->private_group_name, groups_ptr );
+ head_ptr->type = (head_ptr->type & ~JOIN_MESS);
+ head_ptr->type |= CAUSED_BY_JOIN;
+ Sess_create_reject_message( msg );
+ Sess_deliver_reject( msg );
+ return;
+ }
+
decision = Sessions[ses].acp_ops.join_group( head_ptr->private_group_name, groups_ptr, NULL);
if (decision != ACM_ACCESS_ALLOWED)
{
1.20 +7 -1 spread/daemon/Readme.txt
Index: Readme.txt
===================================================================
RCS file: /storage/cvsroot/spread/daemon/Readme.txt,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -r1.19 -r1.20
--- Readme.txt 27 Aug 2002 01:10:52 -0000 1.19
+++ Readme.txt 29 Aug 2002 15:43:01 -0000 1.20
@@ -50,12 +50,18 @@
does not use it if the default of INADDR_ANY is used. If you force
this on, be aware that it can open up a security risk where other
processes can steal Spread's traffic.
+*) Enhance the group membership algorithm so very large groups and very
+ large numbers of groups will be merged correctly during a membership change.
+ With this change Spread should correctly handle groups with thousands
+ of members and thousands of groups. Thanks to Ryan Caudy from JHU for
+ doing this work.
Bugfixes:
*) Make sure service_type is set to 0 before using it in SP_receive calls
in sample programs (spuser, spflooder, simple_user).
-
*) Updates to man pages. Clarify service_type handling.
+*) Fix GGT bug where Spread gets stuck or crashes if you reach GGT state.
+*) Fix Mem_alloc blocks so they can be freed without assertion failure.
April 2, 2002 Ver 3.16.2 (The Almost April Fools Release)
---------------------------
More information about the Spread-cvs
mailing list