[Spread-cvs] commit: r535 - trunk/libspread

jschultz at spread.org jschultz at spread.org
Thu Jul 12 16:00:59 EDT 2012


Author: jschultz
Date: 2012-07-12 16:00:59 -0400 (Thu, 12 Jul 2012)
New Revision: 535

Modified:
   trunk/libspread/sp.c
Log:
Changes to client library that move send/recv mutexes into session structure.  Should eliminate blocking (deadlocking?) bug on Windows where mailbox collisions on mutexes was too common. Also, session lookups on *nix systems should be slightly faster now.


Modified: trunk/libspread/sp.c
===================================================================
--- trunk/libspread/sp.c	2012-07-10 16:38:15 UTC (rev 534)
+++ trunk/libspread/sp.c	2012-07-12 20:00:59 UTC (rev 535)
@@ -92,6 +92,8 @@
 };
 
 typedef	struct	dummy_sp_session {
+        mutex_type recv_mutex;
+        mutex_type send_mutex;
 	mailbox	mbox;
         enum sp_sess_state state;
 	char	private_group_name[MAX_GROUP_NAME];
@@ -119,19 +121,18 @@
 static  struct auth_method_info Auth_Methods[MAX_AUTH_METHODS] = { {"NULL", sp_null_authenticate, NULL} };
 static  int     Num_Reg_Auth_Methods = 1;
 
-#define	MAX_MUTEX	256
-#define	MAX_MUTEX_MASK	0x000000ff
-
 static once_type Init_once = ONCE_STATIC_INIT;
 
 #ifdef _REENTRANT
 static	mutex_type	Struct_mutex;
-static	mutex_type	Mbox_mutex[MAX_MUTEX][2];
 
 #endif /* def _REENTRANT */
 
+#define MAX_LIB_SESSIONS       (0x1 << 10)                        /* must remain a power of 2 */
+#define MBOX_TO_BASE_SES(mbox) ((mbox) & (MAX_LIB_SESSIONS - 1))  /* so this bit masking works */
+
 static	int		Num_sessions = 0;
-static	sp_session	Sessions[MAX_SESSIONS];
+static	sp_session	Sessions[MAX_LIB_SESSIONS];
 
 static  sp_time         Zero_timeout = { 0, 0 };
 
@@ -193,6 +194,7 @@
 /* We take the Struct_mutex in prepare to prevent inconsistent state 
  * in Session[] structs in child
  */
+/* NOTE: this needs work to be 100% correct (need to grab all locks or use semaphores or something) */
 #ifdef _REENTRANT
 
 #ifdef  HAVE_PTHREAD_ATFORK
@@ -207,17 +209,17 @@
 {
         Mutex_unlock( &Struct_mutex );
 }
-/* Child unlocks Struct_mutex, and also unlocks all Mbox_mutexes so
+/* Child unlocks Struct_mutex, and also unlocks all session locks so
  * it can acces all of the connections.
  */
 static  void    sp_atfork_child(void)
 {
-        int i;
+        int ses;
         Mutex_unlock( &Struct_mutex );
-        for( i=0; i < MAX_MUTEX; i++ )
+        for( ses=0; ses < MAX_LIB_SESSIONS; ++ses )
         {
-            Mutex_unlock( &Mbox_mutex[i][0] );
-            Mutex_unlock( &Mbox_mutex[i][1] );
+            Mutex_unlock( &Sessions[ses].recv_mutex );
+	    Mutex_unlock( &Sessions[ses].send_mutex );
         }
 
 }
@@ -227,13 +229,15 @@
 
 static  void    sp_initialize(void)
 {
-        int i;
+        int ses;
 
 	Mutex_init( &Struct_mutex );
-	for( i=0; i < MAX_MUTEX; i++ )
+	for( ses=0; ses < MAX_LIB_SESSIONS; ++ses )
 	{
-	        Mutex_init( &Mbox_mutex[i][0] );
-		Mutex_init( &Mbox_mutex[i][1] );
+	        Mutex_init( &Sessions[ses].recv_mutex );
+		Mutex_init( &Sessions[ses].send_mutex );
+		Sessions[ses].mbox  = -1;
+		Sessions[ses].state = SESS_UNUSED;
 	}
 
 	Mutex_atfork( sp_atfork_prepare, sp_atfork_parent, sp_atfork_child );
@@ -563,6 +567,7 @@
         struct auth_method_info auth_methods[MAX_AUTH_METHODS];
 	int			p;
 	int			s;
+	int                     ses;
 	int			ret, i;
         unsigned int            len;
 	int			sp_v1, sp_v2, sp_v3;
@@ -906,12 +911,24 @@
 
 	Mutex_lock( &Struct_mutex );
 
-	Sessions[Num_sessions].mbox = s;
-	strcpy( Sessions[Num_sessions].private_group_name, private_group );
-        Sessions[Num_sessions].recv_message_saved = 0;
-        Sessions[Num_sessions].state = SESS_ACTIVE;
-	Num_sessions++;
+	if (Num_sessions >= MAX_LIB_SESSIONS) {
+	        Alarm( SESSION, "SP_connect: too many sessions in local process!\n");
+		close( s );
+		Mutex_unlock( &Struct_mutex );
+		return( REJECT_QUOTA );
+	}
 
+	++Num_sessions;
+
+	/* find an unused session structure */
+
+	for ( ses = MBOX_TO_BASE_SES(s); Sessions[ses].state != SESS_UNUSED; ses = (ses != MAX_LIB_SESSIONS - 1 ? ses + 1 : 0)) {}
+
+	Sessions[ses].mbox = s;
+        Sessions[ses].state = SESS_ACTIVE;
+	strcpy( Sessions[ses].private_group_name, private_group );
+        Sessions[ses].recv_message_saved = 0;
+
 	Mutex_unlock( &Struct_mutex );
 
 	return( ACCEPT_SESSION );
@@ -1073,6 +1090,7 @@
 	int		i;
         int             buf_len;
 	int		ret;
+	int             tmp;
 
         /* zero head_buf to avoid information leakage */
         memset( head_buf, 0, sizeof(message_header) + MAX_GROUP_NAME*num_groups );
@@ -1120,7 +1138,7 @@
 	head_ptr->data_len = mess_len;
 	memcpy( group_ptr, groups, MAX_GROUP_NAME * num_groups );
 
-	Mutex_lock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
+	Mutex_lock( &Sessions[ses].send_mutex );
         for ( buf_len = 0; buf_len < sizeof(message_header)+MAX_GROUP_NAME*num_groups; buf_len += ret) 
         {
             while(((ret=send( mbox, &head_buf[buf_len], sizeof(message_header)+MAX_GROUP_NAME*num_groups - buf_len, 0 )) == -1) 
@@ -1130,17 +1148,18 @@
             {
 		Alarm( SESSION, "SP_internal_multicast: error %d sending header and groups on mailbox %d: %s \n", ret, mbox, sock_strerror(sock_errno));
                 Mutex_lock( &Struct_mutex );
-                ses = SP_get_session( mbox );
-                if( ses < 0 ){
-                    Alarmp( SPLOG_INFO, SESSION, "SP_internal_multicast: Session disappeared on us, possible in threaded apps\n");
+                tmp = SP_get_session( mbox );
+                if( tmp != ses ){
+                    Alarmp( SPLOG_INFO, SESSION, "SP_internal_multicast: Session disappeared on us, possible in threaded apps that "
+			    "don't externally synchronize use and destruction of mailboxes properly (user race condition)!!!\n");
                     Mutex_unlock( &Struct_mutex );
-                    Mutex_unlock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
+                    Mutex_unlock( &Sessions[ses].send_mutex );
                     return( CONNECTION_CLOSED );
                 }
                 Sessions[ses].state = SESS_ERROR;
                 Mutex_unlock( &Struct_mutex );
 
-		Mutex_unlock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
+		Mutex_unlock( &Sessions[ses].send_mutex );
 		return( CONNECTION_CLOSED );
             }
         }
@@ -1155,22 +1174,23 @@
 		{
 			Alarm( SESSION, "SP_internal_multicast: error %d sending message data on mailbox %d: %s \n", ret, mbox, sock_strerror(sock_errno));
                         Mutex_lock( &Struct_mutex );
-                        ses = SP_get_session( mbox );
-                        if( ses < 0 ){
-                            Alarmp( SPLOG_INFO, SESSION, "SP_internal_multicast: Session disappeared on us, possible in threaded apps\n");
+                        tmp = SP_get_session( mbox );
+                        if( tmp != ses ){
+			    Alarmp( SPLOG_INFO, SESSION, "SP_internal_multicast: Session disappeared on us, possible in threaded apps that "
+				    "don't externally synchronize use and destruction of mailboxes properly (user race condition)\n");
                             Mutex_unlock( &Struct_mutex );
-                            Mutex_unlock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
+                            Mutex_unlock( &Sessions[ses].send_mutex );
                             return( CONNECTION_CLOSED );
                         }
                         Sessions[ses].state = SESS_ERROR;
                         Mutex_unlock( &Struct_mutex );
 
-			Mutex_unlock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
+			Mutex_unlock( &Sessions[ses].send_mutex );
 			return( CONNECTION_CLOSED );
 		}
             }
 	}
-	Mutex_unlock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
+	Mutex_unlock( &Sessions[ses].send_mutex );
 	return( len );
 }
 
@@ -1214,37 +1234,60 @@
 	char		This_session_private_group[MAX_GROUP_NAME];
 	int		i;
         int32           old_type;
+	int             tmp;
 
-        /* I must acquire the lock for this mbox before the Struct_mutex lock because
-         * I must be sure ONLY one thread is in recv for this mbox, EVEN for 
-         * this initial 'get the session and session state' operation.
-         * Otherwise one thread enters this and gets the state and sees no saved message
-         * then grabs the mbox mutex and discoveres buffer too short and so regrabs the
-         * Struct_mutex and adds the saved header, but during this time another thread
-         * has entered recv for the same mbox and already grabbed the struct_mutex and also
-         * read that no saved mesage exists and is now waiting for the mbox mutex.
-         * When it the first thread returns and releases the mbox mutex, the second thread will
-         * grab it and enter--but it will think there is NO saved messaage when in reality
+	/* lookup and validate the session */
+
+	Mutex_lock( &Struct_mutex );
+
+	ses = SP_get_session( mbox );
+
+	if( ses < 0 ) {
+	  Mutex_unlock( &Struct_mutex );
+	  return( ILLEGAL_SESSION );
+	}
+
+	if( Sessions[ses].state != SESS_ACTIVE ) {
+	  Mutex_unlock( &Struct_mutex );
+	  return( NET_ERROR_ON_SESSION );
+	}
+
+	Mutex_unlock( &Struct_mutex );
+
+        /* I must acquire the recv lock for this mbox before the
+         * Struct_mutex lock because I must be sure ONLY one thread is
+         * truly in recv for this mbox. Otherwise, one thread enters
+         * this and gets the state and sees no saved message then
+         * grabs the recv lock and discovers buffer too short and
+         * so regrabs the Struct_mutex and adds the saved header, but
+         * during this time another thread has entered recv for the
+         * same mbox and already grabbed the Struct_mutex and also
+         * seen that no saved mesage exists and is now waiting for the
+         * recv mutex.  When the first thread returns and releases the
+         * recv mutex, the second thread will grab it and enter--but
+         * it will think there is NO saved messaage when in reality
          * there IS one. This will cause MANY PROBLEMS :-)
          *
-         * NOTE: locking and unlocking the Struct_mutex multiple times during this is OK
-         * BECAUSE struct_Mutex only locks non-blocking operations that are guaranteed to complete
+         * NOTE: locking and unlocking the Struct_mutex multiple times
+         * during this is OK BECAUSE Struct_mutex only locks
+         * non-blocking operations that are guaranteed to complete
          * quickly and never take additional locks.
          */
-	Mutex_lock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
 
+	Mutex_lock( &Sessions[ses].recv_mutex );
+
 	Mutex_lock( &Struct_mutex );
-	/* verify mbox */
-	ses = SP_get_session( mbox );
-	if( ses < 0 ){
+
+	tmp = SP_get_session( mbox );
+	if( tmp != ses ){
 		Mutex_unlock( &Struct_mutex );
-                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+                Mutex_unlock( &Sessions[ses].recv_mutex );
 		return( ILLEGAL_SESSION );
 	}
 
         if( Sessions[ses].state != SESS_ACTIVE ) {
 		Mutex_unlock( &Struct_mutex );
-                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+                Mutex_unlock( &Sessions[ses].recv_mutex );
 		return( NET_ERROR_ON_SESSION );
 	}
 
@@ -1275,17 +1318,18 @@
                         {
                                 Alarm( SESSION, "SP_scat_receive: failed receiving header on session %d (ret: %d len: %d): %s\n", mbox, ret, len, sock_strerror(sock_errno) );
                                 Mutex_lock( &Struct_mutex );
-                                ses = SP_get_session( mbox );
-                                if( ses < 0 ){
-                                    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                                tmp = SP_get_session( mbox );
+                                if( tmp != ses ){
+				    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps that "
+					    "don't externally synchronize use and destruction of mailboxes properly (user race condition)!!!\n");
                                     Mutex_unlock( &Struct_mutex );
-                                    Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+				    Mutex_unlock( &Sessions[ses].recv_mutex );
                                     return( CONNECTION_CLOSED );
                                 }
                                 Sessions[ses].state = SESS_ERROR;
                                 Mutex_unlock( &Struct_mutex );
 
-                                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+				Mutex_unlock( &Sessions[ses].recv_mutex );
                                 return( CONNECTION_CLOSED );
                         }
                 }
@@ -1301,17 +1345,17 @@
                 if ( scat_mess->elements[i].len < 0 )   {
                         if ( !drop_semantics && !This_session_message_saved) {
                                 Mutex_lock( &Struct_mutex );
-                                ses = SP_get_session( mbox );
-                                if( ses < 0 ){
+                                tmp = SP_get_session( mbox );
+                                if( tmp != ses ){
                                         Mutex_unlock( &Struct_mutex );
-                                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+					Mutex_unlock( &Sessions[ses].recv_mutex );
                                         return( ILLEGAL_SESSION );
                                 }
                                 memcpy(&(Sessions[ses].recv_saved_head), &mess_head, sizeof(message_header) );
                                 Sessions[ses].recv_message_saved = 1;
                                 Mutex_unlock( &Struct_mutex );
                         }
-                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+			Mutex_unlock( &Sessions[ses].recv_mutex );
                         return( ILLEGAL_MESSAGE );
                 }
 		max_mess_len += scat_mess->elements[i].len;
@@ -1321,14 +1365,14 @@
             /* reject this message since it has an impossible (negative) num_groups
              * This is likely to be caused by a malicious attack or memory corruption
              */
-            Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+	    Mutex_unlock( &Sessions[ses].recv_mutex );
             return( ILLEGAL_MESSAGE );
         }
         if (head_ptr->data_len < 0) {
             /* reject this message since it has an impossible (negative) data_len
              * This is likely to be caused by a malicious attack or memory corruption
              */
-            Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+	    Mutex_unlock( &Sessions[ses].recv_mutex );
             return( ILLEGAL_MESSAGE );
         }
 
@@ -1337,10 +1381,10 @@
                 if ( (head_ptr->num_groups > max_groups) || (head_ptr->data_len > max_mess_len) ) {
                         if (!This_session_message_saved) {
                                 Mutex_lock( &Struct_mutex );
-                                ses = SP_get_session( mbox );
-                                if( ses < 0 ){
+                                tmp = SP_get_session( mbox );
+                                if( tmp != ses ){
                                         Mutex_unlock( &Struct_mutex );
-                                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+					Mutex_unlock( &Sessions[ses].recv_mutex );
                                         return( ILLEGAL_SESSION );
                                 }
                                 memcpy(&(Sessions[ses].recv_saved_head), &mess_head, sizeof(message_header) );
@@ -1384,7 +1428,7 @@
                         /* Return sender field to caller */
                         strncpy( sender, head_ptr->private_group_name, MAX_GROUP_NAME );
 
-                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+			Mutex_unlock( &Sessions[ses].recv_mutex );
                         if (*num_groups)
                                 return( GROUPS_TOO_SHORT );
                         else
@@ -1425,17 +1469,18 @@
                                 Alarm( SESSION, "SP_scat_receive: failed receiving old_type for reject on session %d, ret is %d: %s\n", mbox, ret, sock_strerror(sock_errno));
 
                                 Mutex_lock( &Struct_mutex );
-                                ses = SP_get_session( mbox );
-                                if( ses < 0 ){
-                                    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                                tmp = SP_get_session( mbox );
+                                if( tmp != ses ){
+				    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps that "
+					    "don't externally synchronize use and destruction of mailboxes properly (user race condition)!!!\n");
                                     Mutex_unlock( &Struct_mutex );
-                                    Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+				    Mutex_unlock( &Sessions[ses].recv_mutex );
                                     return( CONNECTION_CLOSED );
                                 }
                                 Sessions[ses].state = SESS_ERROR;
                                 Mutex_unlock( &Struct_mutex );
 
-                                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+				Mutex_unlock( &Sessions[ses].recv_mutex );
                                 return( CONNECTION_CLOSED );
                         }
                 }
@@ -1465,17 +1510,18 @@
 			Alarm( SESSION, "SP_scat_receive: failed receiving groups on session %d, ret is %d: %s\n", mbox, ret, sock_strerror(sock_errno));
 
                         Mutex_lock( &Struct_mutex );
-                        ses = SP_get_session( mbox );
-                        if( ses < 0 ){
-                            Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                        tmp = SP_get_session( mbox );
+                        if( tmp != ses ){
+				    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps that "
+					    "don't externally synchronize use and destruction of mailboxes properly (user race condition)!!!\n");
                             Mutex_unlock( &Struct_mutex );
-                            Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+			    Mutex_unlock( &Sessions[ses].recv_mutex );
                             return( CONNECTION_CLOSED );
                         }
                         Sessions[ses].state = SESS_ERROR;
                         Mutex_unlock( &Struct_mutex );
 
-			Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+			Mutex_unlock( &Sessions[ses].recv_mutex );
 			return( CONNECTION_CLOSED );
 		}
 	}
@@ -1495,17 +1541,18 @@
                                        mbox, ret, sock_strerror(sock_errno) );
 
                                 Mutex_lock( &Struct_mutex );
-                                ses = SP_get_session( mbox );
-                                if( ses < 0 ){
-                                    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                                tmp = SP_get_session( mbox );
+                                if( tmp != ses ){
+				    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps that "
+					    "don't externally synchronize use and destruction of mailboxes properly (user race condition)!!!\n");
                                     Mutex_unlock( &Struct_mutex );
-                                    Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+				    Mutex_unlock( &Sessions[ses].recv_mutex );
                                     return( CONNECTION_CLOSED );
                                 }
                                 Sessions[ses].state = SESS_ERROR;
                                 Mutex_unlock( &Struct_mutex );
 
-				Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+				Mutex_unlock( &Sessions[ses].recv_mutex );
 				return( CONNECTION_CLOSED );
 			}
 		}
@@ -1531,7 +1578,7 @@
          * {
          *      Alarm( SESSION, "SP_scat_receive: failed receiving message on session %d\n", mbox );
 	 *
-	 *	Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+	 *      Mutex_unlock( &Sessions[ses].recv_mutex );
 	 *
          *      SP_kill( mbox );
          *      return;
@@ -1559,17 +1606,18 @@
                                mbox, ret, sock_strerror(sock_errno) );
 
                         Mutex_lock( &Struct_mutex );
-                        ses = SP_get_session( mbox );
-                        if( ses < 0 ){
-                            Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                        tmp = SP_get_session( mbox );
+                        if( tmp != ses ){
+				    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps that "
+					    "don't externally synchronize use and destruction of mailboxes properly (user race condition)!!!\n");
                             Mutex_unlock( &Struct_mutex );
-                            Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+			    Mutex_unlock( &Sessions[ses].recv_mutex );
                             return( CONNECTION_CLOSED );
                         }
                         Sessions[ses].state = SESS_ERROR;
                         Mutex_unlock( &Struct_mutex );
 
-			Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+			Mutex_unlock( &Sessions[ses].recv_mutex );
 			return( CONNECTION_CLOSED );
 		}else if( ret == to_read ){
 			byte_index = 0;
@@ -1732,30 +1780,31 @@
                                        mbox, ret, sock_strerror(sock_errno) );
 
                                 Mutex_lock( &Struct_mutex );
-                                ses = SP_get_session( mbox );
-                                if( ses < 0 ){
-                                    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                                tmp = SP_get_session( mbox );
+                                if( tmp != ses ){
+				    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps that "
+					    "don't externally synchronize use and destruction of mailboxes properly (user race condition)!!!\n");
                                     Mutex_unlock( &Struct_mutex );
-                                    Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+				    Mutex_unlock( &Sessions[ses].recv_mutex );
                                     return( CONNECTION_CLOSED );
                                 }
                                 Sessions[ses].state = SESS_ERROR;
                                 Mutex_unlock( &Struct_mutex );
 
-				Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+				Mutex_unlock( &Sessions[ses].recv_mutex );
 				return( CONNECTION_CLOSED );
 			}
 		}
-		Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+		Mutex_unlock( &Sessions[ses].recv_mutex );
 		return( BUFFER_TOO_SHORT );
 	}
         /* Successful receive so clear saved_message info if any */
         if (This_session_message_saved) {
                 Mutex_lock( &Struct_mutex );
-                ses = SP_get_session( mbox );
-                if( ses < 0 ){
+                tmp = SP_get_session( mbox );
+                if( tmp != ses ){
                         Mutex_unlock( &Struct_mutex );
-                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+			Mutex_unlock( &Sessions[ses].recv_mutex );
                         return( ILLEGAL_SESSION );
                 }
                 memset(&(Sessions[ses].recv_saved_head), 0, sizeof(message_header) );
@@ -1763,7 +1812,7 @@
                 Mutex_unlock( &Struct_mutex );
         }
 
-	Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+	Mutex_unlock( &Sessions[ses].recv_mutex );
 	return( head_ptr->data_len );
 }
 
@@ -2036,21 +2085,23 @@
 void	SP_kill( mailbox mbox )
 {
 	int	ses;
-	int	i;
 
 	Mutex_lock( &Struct_mutex );
 
 	/* get mbox out of the data structures */
+
 	ses = SP_get_session( mbox );
+
 	if( ses < 0 ){ 
-		Alarm( SESSION, "SP_kill: killing non existent session for mailbox %d (might be ok in a threaded case)\n",mbox );
+		Alarm( SESSION, "SP_kill: killing a non existent session for mailbox %d (likely race condition)!!!\n", mbox );
 		Mutex_unlock( &Struct_mutex );
 		return;
 	}
 
+	Sessions[ses].mbox  = -1;
+	Sessions[ses].state = SESS_UNUSED;
 	close(mbox);
-	for( i=ses+1; i < Num_sessions; i++ )
-		memcpy( &Sessions[i-1], &Sessions[i], sizeof(sp_session) );
+
 	Num_sessions--;
 
 	Mutex_unlock( &Struct_mutex );
@@ -2058,14 +2109,24 @@
 
 static	int	SP_get_session( mailbox mbox )
 {
-	int ses;
+        int ses      = MBOX_TO_BASE_SES(mbox);
+        int base_ses = ses;
 
-	for( ses=0; ses < Num_sessions; ses++ )
-	{
-		if( Sessions[ses].mbox == mbox ) return( ses );
+	while( Sessions[ses].mbox != mbox ) {
+
+	        if( ++ses == MAX_LIB_SESSIONS ) {
+		        ses = 0;         /* wrap around to beginning of array */
+		}
+
+		if( ses == base_ses ) {  /* searched entire array */
+		        ses = -1;        /* not found */
+			break;
+		}
 	}
-	return( -1 );
+
+	return ses;
 }
+
 void	SP_error( int error )
 {
 	switch( error )
@@ -2077,7 +2138,7 @@
 			Alarm( PRINT, "SP_error: (%d) Could not connect. Is Spread running?\n", error );
 			break;
 		case REJECT_QUOTA:
-			Alarm( PRINT, "SP_error: (%d) Connection rejected, to many users\n", error );
+			Alarm( PRINT, "SP_error: (%d) Connection rejected, too many users\n", error );
 			break;
 		case REJECT_NO_NAME:
 			Alarm( PRINT, "SP_error: (%d) Connection rejected, no name was supplied\n", error );




More information about the Spread-cvs mailing list