[Spread-cvs] commit: r473 - in trunk: daemon java/spread

jonathan at spread.org jonathan at spread.org
Tue Feb 21 21:54:04 EST 2012


Author: jonathan
Date: 2012-02-21 21:54:03 -0500 (Tue, 21 Feb 2012)
New Revision: 473

Modified:
   trunk/daemon/Changelog
   trunk/java/spread/SpreadConnection.java
Log:
Commit fix for several deadlock issues with Java Listeners, and a fix for the messages corrupted by the 100 ms receive timeout that were reported on the mailing list.

Modified: trunk/daemon/Changelog
===================================================================
--- trunk/daemon/Changelog	2012-02-08 04:35:18 UTC (rev 472)
+++ trunk/daemon/Changelog	2012-02-22 02:54:03 UTC (rev 473)
@@ -1,3 +1,12 @@
+Mon Feb 20 21:35:49 2012  Jonathan Stanton  <jonathan at spreadconcepts.com>
+
+	* SpreadConnection.java: Fix deadlock in stopListener where listener
+	thread can call stopListener and deadlock calling .join on itself. 
+	Also fix Listener.run bug in the way internal_receive is called with a
+	100 ms timeout. If the timeout triggers, loop back into receive as long
+	as the Listener thread was not signaled to exit, this prevents corrupted
+	messages when a receive is timed out. 
+
 Fri Jan 13 15:39:48 2012  John Schultz <jschultz at spreadconcepts.com>
 
 	* configure.in, events.c: Added a monotonic timer based on

Modified: trunk/java/spread/SpreadConnection.java
===================================================================
--- trunk/java/spread/SpreadConnection.java	2012-02-08 04:35:18 UTC (rev 472)
+++ trunk/java/spread/SpreadConnection.java	2012-02-22 02:54:03 UTC (rev 473)
@@ -318,6 +318,49 @@
 
     }
 
+    // Reads from inputsocket until all bytes read or an exception raised which indicates the read is
+    // failing and the socket will be closed
+    //////////////////////////////////////////////////////////////////
+    private void readBytesFromSocketListener(byte buffer[], String bufferTypeString) throws SpreadException, InterruptedIOException
+    {
+	int byteIndex;
+	int rcode;
+	boolean keep_going = true;
+
+	byteIndex = 0;
+	while( keep_going == true )
+	{
+	    keep_going = false;
+	    try {
+		while( byteIndex < buffer.length) 
+		{
+		    rcode = socketInput.read(buffer, byteIndex, buffer.length - byteIndex);
+		    if(rcode == -1)
+		    {
+			throw new SpreadException("Connection closed while reading " + bufferTypeString);
+		    }
+		    byteIndex += rcode;
+		}
+
+	    } 
+	    catch( InterruptedIOException e) {
+		if ( listener != null && listener.signal == true )
+		{
+		    throw( e );
+		}
+		if ( e.bytesTransferred >= 0 ) {
+		    byteIndex += e.bytesTransferred;
+		}
+		keep_going = true;
+	    }
+	    catch(IOException e)
+	    {
+		throw new SpreadException("readBytesFromSocketListener(): read() " + e);
+	    }
+	}
+
+    }
+
 	
 	// Gets a string from an array of bytes.
 	////////////////////////////////////////
@@ -1044,23 +1087,15 @@
 		///////////////////
 		byte header[] = new byte[MAX_GROUP_NAME+16];
 		int headerIndex;
-		int rcode;
-		try
-		{
-			for(headerIndex = 0 ; headerIndex < header.length ; headerIndex += rcode)
-			{
-				rcode = socketInput.read(header, headerIndex, header.length - headerIndex);
-				if(rcode == -1)
-				{
-					throw new SpreadException("Connection closed while reading header");
-				}
-			}
+
+		try 
+	        {
+		    readBytesFromSocketListener( header, "message header" );
 		}
 		catch(InterruptedIOException e) {
 		    throw e;
-		}
-		catch(IOException e)
-		{
+		} 
+		catch(IOException e) {
 			throw new SpreadException("read(): " + e);
 		}
 		
@@ -1168,25 +1203,17 @@
                 {
                         // Read in the old type and or with reject type field.
                         byte oldtypeBuffer[] = new byte[4];
-                        try
-                        {
-                                for(int oldtypeIndex = 0 ; oldtypeIndex < oldtypeBuffer.length ; )
-                                {
-                                        rcode = socketInput.read(oldtypeBuffer, oldtypeIndex, oldtypeBuffer.length - oldtypeIndex);
-                                        if(rcode == -1)
-                                        {
-                                                throw new SpreadException("Connection closed while reading groups");
-                                        }
-                                        oldtypeIndex += rcode;
-                                }
-                        }
-                        catch(InterruptedIOException e) {
-                                throw e;
-                        }
-                        catch(IOException e)
-                        {
-                                throw new SpreadException("read(): " + e);
-                        }
+			try 
+		        {
+			    readBytesFromSocketListener( oldtypeBuffer, "message reject type" );
+			}
+			catch(InterruptedIOException e) {
+			    throw e;
+			} 
+			catch(IOException e) {
+			    throw new SpreadException("read(): " + e);
+			}
+
                         int oldType = toInt(oldtypeBuffer, 0);
                         if ( sameEndian(serviceType) == false )
                             oldType = flip(oldType);
@@ -1197,26 +1224,17 @@
 		// Read in the group names.
 		///////////////////////////
 		byte buffer[] = new byte[numGroups * MAX_GROUP_NAME];
-		try
+		try 
 		{
-			for(int bufferIndex = 0 ; bufferIndex < buffer.length ; )
-			{
-				rcode = socketInput.read(buffer, bufferIndex, buffer.length - bufferIndex);
-				if(rcode == -1)
-				{
-					throw new SpreadException("Connection closed while reading groups");
-				}
-				bufferIndex += rcode;
-			}
+		    readBytesFromSocketListener( buffer, "message group name" );
 		}
 		catch(InterruptedIOException e) {
 		    throw e;
+		} 
+		catch(IOException e) {
+		    throw new SpreadException("read(): " + e);
 		}
-		catch(IOException e)
-		{
-			throw new SpreadException("read(): " + e);
-		}
-		
+
 		// Clear the endian type.
 		/////////////////////////
 		serviceType = clearEndian(serviceType);
@@ -1234,26 +1252,17 @@
 		// Read in the data.
 		////////////////////
 		byte data[] = new byte[dataLen];
-		try
+		try 
 		{
-			for(int dataIndex = 0 ; dataIndex < dataLen ; )
-			{
-				rcode = socketInput.read(data, dataIndex, dataLen - dataIndex);
-				if(rcode == -1)
-				{
-					throw new SpreadException("Connection close while reading data");
-				}
-				dataIndex += rcode;
-			}
+		    readBytesFromSocketListener( data, "message body" );
 		}
 		catch(InterruptedIOException e) {
 		    throw e;
+		} 
+		catch(IOException e) {
+		    throw new SpreadException("read(): " + e);
 		}
-		catch(IOException e)
-		{
-			throw new SpreadException("read():" + e);
-		}
-		
+
 		// Is it a membership message?
 		//////////////////////////////
 		MembershipInfo membershipInfo;
@@ -1481,11 +1490,14 @@
 		
 		// Wait for the thread to die, to avoid inconsistencies.
 		////////////////////////////////////////////////////////
-		try {
-		    listener.join();
-		} 
-		catch ( InterruptedException e ) {
-		    // Ignore
+		if ( listener != null && ! listener.equals(Thread.currentThread()) ) 
+		{
+		    try {
+			listener.join();
+		    } 
+		    catch ( InterruptedException e ) {
+			// Ignore
+		    }
 		}
 		// Clear the variable.
 		//////////////////////
@@ -1671,9 +1683,25 @@
 						// WE WILL BLOCK HERE UNTIL DATA IS AVAILABLE
 						// or 100 MS expires
 						/////////////////
+    					        message = null;
 						try {
+					
 						    synchronized(rsynchro) {
-							message = connection.internal_receive();
+							boolean keep_going = true;
+							while( keep_going == true )
+							{
+							    keep_going = false;
+							    try {
+								message = connection.internal_receive();
+							    } 
+							    catch( InterruptedIOException e) {
+								if ( signal  == true )
+								{
+								    throw( e );
+								}
+								keep_going = true;
+							    }
+							}
 						    }
 						    // Calling listeners.
 						    /////////////////////




More information about the Spread-cvs mailing list