[Spread-cvs] commit: r261 - in trunk: daemon include

jonathan at spread.org jonathan at spread.org
Mon Aug 8 00:13:54 EDT 2005


Author: jonathan
Date: 2005-08-08 00:13:53 -0400 (Mon, 08 Aug 2005)
New Revision: 261

Modified:
   trunk/daemon/Changelog
   trunk/daemon/sess_types.h
   trunk/daemon/sp.c
   trunk/include/sp.h
   trunk/include/sp_func.h
Log:
Make SP_kill() a public function so apps
can close unneeded FDs/mboxes without disconnecting. Also
change socket error handling so sockets are not closed when
errors occur; instead the app is notified and must call
SP_disconnect() to clean up the mbox before reusing it. This helps
threaded apps correctly handle closed fds without races.
                                                      


Modified: trunk/daemon/Changelog
===================================================================
--- trunk/daemon/Changelog	2005-08-07 23:38:32 UTC (rev 260)
+++ trunk/daemon/Changelog	2005-08-08 04:13:53 UTC (rev 261)
@@ -1,3 +1,12 @@
+Mon Aug  8 00:11:24 2005  Jonathan Stanton  <jonathan at cnds.jhu.edu>
+
+	* sp.c (SP_kill): Make SP_kill() a public function so apps
+	can close unneeded FD/mboxes without disconnecting. Also
+	change socket error handling so sockets are not closed when
+	errors occur, but app is notified and must call 
+	SP_disconnect() to clean up mbox before reusing. This helps
+	threaded apps correctly handle closed fd's without races.
+
 Sun Aug  7 19:36:54 2005  Jonathan Stanton  <jonathan at cnds.jhu.edu>
 
 	* sp.c (SP_get_local_vs_set_offset_memb_scat): Add function

Modified: trunk/daemon/sess_types.h
===================================================================
--- trunk/daemon/sess_types.h	2005-08-07 23:38:32 UTC (rev 260)
+++ trunk/daemon/sess_types.h	2005-08-08 04:13:53 UTC (rev 261)
@@ -138,6 +138,7 @@
 #define		BUFFER_TOO_SHORT	-15
 #define         GROUPS_TOO_SHORT        -16
 #define         MESSAGE_TOO_LONG        -17
+#define         NET_ERROR_ON_SESSION    -18
 
 typedef	struct	dummy_message_header {
 	int32u	type;

Modified: trunk/daemon/sp.c
===================================================================
--- trunk/daemon/sp.c	2005-08-07 23:38:32 UTC (rev 260)
+++ trunk/daemon/sp.c	2005-08-08 04:13:53 UTC (rev 261)
@@ -68,8 +68,15 @@
 #include "sp_func.h"
 #include "acm.h"
 
+enum sp_sess_state {
+    UNUSED,
+    ACTIVE,
+    ERROR,
+};
+
 typedef	struct	dummy_sp_session {
 	mailbox	mbox;
+        enum sp_sess_state state;
 	char	private_group_name[MAX_GROUP_NAME];
         message_header  recv_saved_head;
         int     recv_message_saved;
@@ -106,7 +113,6 @@
 static  sp_time         Zero_timeout = { 0, 0 };
 
 static	void    Flip_mess( message_header *head_ptr );
-static  void    SP_kill( mailbox mbox );
 static	int	SP_get_session( mailbox mbox );
 static	int	SP_internal_multicast( mailbox mbox, service service_type, 
 				       int num_groups,
@@ -814,6 +820,7 @@
 	Sessions[Num_sessions].mbox = s;
 	strcpy( Sessions[Num_sessions].private_group_name, private_group );
         Sessions[Num_sessions].recv_message_saved = 0;
+        Sessions[Num_sessions].state = ACTIVE;
 	Num_sessions++;
 
 	Mutex_unlock( &Struct_mutex );
@@ -989,6 +996,11 @@
 		return( ILLEGAL_SESSION );
 	}
 
+        if( Sessions[ses].state != ACTIVE ) {
+		Mutex_unlock( &Struct_mutex );
+		return( NET_ERROR_ON_SESSION );
+	}
+
 	head_ptr = (message_header *)head_buf;
 	group_ptr = &head_buf[ sizeof(message_header) ];
 
@@ -1028,8 +1040,17 @@
             if( ret <=0 )
             {
 		Alarm( SESSION, "SP_internal_multicast: error %d sending header and groups on mailbox %d: %s \n", ret, mbox, sock_strerror(sock_errno));
+                Mutex_lock( &Struct_mutex );
+                ses = SP_get_session( mbox );
+                if( ses < 0 ){
+                    Alarmp( SPLOG_INFO, SESSION, "SP_internal_multicast: Session disappeared on us, possible in threaded apps\n");
+                    Mutex_unlock( &Struct_mutex );
+                    return( CONNECTION_CLOSED );
+                }
+                Sessions[ses].state = ERROR;
+                Mutex_unlock( &Struct_mutex );
+
 		Mutex_unlock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
-		SP_kill( mbox );
 		return( CONNECTION_CLOSED );
             }
         }
@@ -1043,8 +1064,17 @@
 		if( ret < 0 )
 		{
 			Alarm( SESSION, "SP_internal_multicast: error %d sending message data on mailbox %d: %s \n", ret, mbox, sock_strerror(sock_errno));
+                        Mutex_lock( &Struct_mutex );
+                        ses = SP_get_session( mbox );
+                        if( ses < 0 ){
+                            Alarmp( SPLOG_INFO, SESSION, "SP_internal_multicast: Session disappeared on us, possible in threaded apps\n");
+                            Mutex_unlock( &Struct_mutex );
+                            return( CONNECTION_CLOSED );
+                        }
+                        Sessions[ses].state = ERROR;
+                        Mutex_unlock( &Struct_mutex );
+
 			Mutex_unlock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
-			SP_kill( mbox );
 			return( CONNECTION_CLOSED );
 		}
             }
@@ -1120,6 +1150,13 @@
                 Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
 		return( ILLEGAL_SESSION );
 	}
+
+        if( Sessions[ses].state != ACTIVE ) {
+		Mutex_unlock( &Struct_mutex );
+                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
+		return( NET_ERROR_ON_SESSION );
+	}
+
 	strcpy( This_session_private_group, Sessions[ses].private_group_name );
 
         if (Sessions[ses].recv_message_saved) {
@@ -1146,9 +1183,17 @@
                         if( ret <=0 )
                         {
                                 Alarm( SESSION, "SP_scat_receive: failed receiving header on session %d (ret: %d len: %d): %s\n", mbox, ret, len, sock_strerror(sock_errno) );
+                                Mutex_lock( &Struct_mutex );
+                                ses = SP_get_session( mbox );
+                                if( ses < 0 ){
+                                    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                                    Mutex_unlock( &Struct_mutex );
+                                    return( CONNECTION_CLOSED );
+                                }
+                                Sessions[ses].state = ERROR;
+                                Mutex_unlock( &Struct_mutex );
+
                                 Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
-
-                                SP_kill( mbox );
                                 return( CONNECTION_CLOSED );
                         }
                 }
@@ -1283,8 +1328,18 @@
                         if( ret <=0 )
                         {
                                 Alarm( SESSION, "SP_scat_receive: failed receiving old_type for reject on session %d, ret is %d: %s\n", mbox, ret, sock_strerror(sock_errno));
+
+                                Mutex_lock( &Struct_mutex );
+                                ses = SP_get_session( mbox );
+                                if( ses < 0 ){
+                                    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                                    Mutex_unlock( &Struct_mutex );
+                                    return( CONNECTION_CLOSED );
+                                }
+                                Sessions[ses].state = ERROR;
+                                Mutex_unlock( &Struct_mutex );
+
                                 Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
-                                SP_kill( mbox );
                                 return( CONNECTION_CLOSED );
                         }
                 }
@@ -1312,9 +1367,18 @@
 		if( ret <=0 )
 		{
 			Alarm( SESSION, "SP_scat_receive: failed receiving groups on session %d, ret is %d: %s\n", mbox, ret, sock_strerror(sock_errno));
+
+                        Mutex_lock( &Struct_mutex );
+                        ses = SP_get_session( mbox );
+                        if( ses < 0 ){
+                            Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                            Mutex_unlock( &Struct_mutex );
+                            return( CONNECTION_CLOSED );
+                        }
+                        Sessions[ses].state = ERROR;
+                        Mutex_unlock( &Struct_mutex );
+
 			Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
-
-			SP_kill( mbox );
 			return( CONNECTION_CLOSED );
 		}
 	}
@@ -1332,8 +1396,18 @@
 			{
 				Alarm( SESSION, "SP_scat_receive: failed receiving groups overflow on session %d, ret is %d: %s\n", 
                                        mbox, ret, sock_strerror(sock_errno) );
+
+                                Mutex_lock( &Struct_mutex );
+                                ses = SP_get_session( mbox );
+                                if( ses < 0 ){
+                                    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                                    Mutex_unlock( &Struct_mutex );
+                                    return( CONNECTION_CLOSED );
+                                }
+                                Sessions[ses].state = ERROR;
+                                Mutex_unlock( &Struct_mutex );
+
 				Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
-				SP_kill( mbox );
 				return( CONNECTION_CLOSED );
 			}
 		}
@@ -1385,8 +1459,18 @@
 		{
 			Alarm( SESSION, "SP_scat_receive: failed receiving message on session %d, ret is %d: %s\n", 
                                mbox, ret, sock_strerror(sock_errno) );
+
+                        Mutex_lock( &Struct_mutex );
+                        ses = SP_get_session( mbox );
+                        if( ses < 0 ){
+                            Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                            Mutex_unlock( &Struct_mutex );
+                            return( CONNECTION_CLOSED );
+                        }
+                        Sessions[ses].state = ERROR;
+                        Mutex_unlock( &Struct_mutex );
+
 			Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
-			SP_kill( mbox );
 			return( CONNECTION_CLOSED );
 		}else if( ret == to_read ){
 			byte_index = 0;
@@ -1547,8 +1631,18 @@
 			{
 				Alarm( SESSION, "SP_scat_receive: failed receiving overflow on session %d, ret is %d: %s\n", 
                                        mbox, ret, sock_strerror(sock_errno) );
+
+                                Mutex_lock( &Struct_mutex );
+                                ses = SP_get_session( mbox );
+                                if( ses < 0 ){
+                                    Alarmp( SPLOG_INFO, SESSION, "SP_scat_receive: Session disappeared on us, possible in threaded apps\n");
+                                    Mutex_unlock( &Struct_mutex );
+                                    return( CONNECTION_CLOSED );
+                                }
+                                Sessions[ses].state = ERROR;
+                                Mutex_unlock( &Struct_mutex );
+
 				Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
-				SP_kill( mbox );
 				return( CONNECTION_CLOSED );
 			}
 		}
@@ -1674,7 +1768,7 @@
 	return( -1 );
 }
 
-static  void	SP_kill( mailbox mbox )
+void	SP_kill( mailbox mbox )
 {
 	int	ses;
 	int	i;
@@ -1759,6 +1853,9 @@
 		case MESSAGE_TOO_LONG:
 			Alarm( PRINT, "SP_error: (%d) The message body + group names was too large to fit in a message\n", error );
 			break;
+		case NET_ERROR_ON_SESSION:
+			Alarm( PRINT, "SP_error: (%d) The network socket experienced an error. This Spread mailbox will no longer work until the connection is disconnected and then reconnected\n", error );
+			break;
 		default:
 			Alarm( PRINT, "SP_error: (%d) unrecognized error\n", error );
 	}

Modified: trunk/include/sp.h
===================================================================
--- trunk/include/sp.h	2005-08-07 23:38:32 UTC (rev 260)
+++ trunk/include/sp.h	2005-08-08 04:13:53 UTC (rev 261)
@@ -125,6 +125,7 @@
 #define		BUFFER_TOO_SHORT	-15
 #define         GROUPS_TOO_SHORT        -16
 #define         MESSAGE_TOO_LONG        -17
+#define         NET_ERROR_ON_SESSION    -18
 
 #define		MAX_CLIENT_SCATTER_ELEMENTS	100
 

Modified: trunk/include/sp_func.h
===================================================================
--- trunk/include/sp_func.h	2005-08-07 23:38:32 UTC (rev 260)
+++ trunk/include/sp_func.h	2005-08-08 04:13:53 UTC (rev 261)
@@ -53,6 +53,8 @@
 
 int	SP_disconnect( mailbox mbox );
 
+void    SP_kill( mailbox mbox );
+
 int	SP_join( mailbox mbox, const char *group );
 
 int	SP_leave( mailbox mbox, const char *group );




More information about the Spread-cvs mailing list