[Spread-cvs] commit: r473 - in trunk: daemon java/spread
jonathan at spread.org
jonathan at spread.org
Tue Feb 21 21:54:04 EST 2012
Author: jonathan
Date: 2012-02-21 21:54:03 -0500 (Tue, 21 Feb 2012)
New Revision: 473
Modified:
trunk/daemon/Changelog
trunk/java/spread/SpreadConnection.java
Log:
Commit fix for several deadlock issues with Java Listeners, and fix for the messages corrupted by the 100 ms receive timeout, as reported on the mailing list.
Modified: trunk/daemon/Changelog
===================================================================
--- trunk/daemon/Changelog 2012-02-08 04:35:18 UTC (rev 472)
+++ trunk/daemon/Changelog 2012-02-22 02:54:03 UTC (rev 473)
@@ -1,3 +1,12 @@
+Mon Feb 20 21:35:49 2012 Jonathan Stanton <jonathan at spreadconcepts.com>
+
+ * SpreadConnection.java: Fix deadlock in stopListener where listener
+ thread can call stopListener and deadlock calling .join on itself.
+ Also fix Listener.run bug in the way internal_receive is called with a
+ 100 ms timeout. If the timeout triggers, loop back into receive as long
+ as the Listener thread was not signaled to exit; this prevents corrupted
+ messages when a receive is timed out.
+
Fri Jan 13 15:39:48 2012 John Schultz <jschultz at spreadconcepts.com>
* configure.in, events.c: Added a monotonic timer based on
Modified: trunk/java/spread/SpreadConnection.java
===================================================================
--- trunk/java/spread/SpreadConnection.java 2012-02-08 04:35:18 UTC (rev 472)
+++ trunk/java/spread/SpreadConnection.java 2012-02-22 02:54:03 UTC (rev 473)
@@ -318,6 +318,49 @@
}
+ // Reads from inputsocket until all bytes read or an exception raised which indicates the read is
+ // failing and the socket will be closed
+ //////////////////////////////////////////////////////////////////
+ private void readBytesFromSocketListener(byte buffer[], String bufferTypeString) throws SpreadException, InterruptedIOException
+ {
+ int byteIndex;
+ int rcode;
+ boolean keep_going = true;
+
+ byteIndex = 0;
+ while( keep_going == true )
+ {
+ keep_going = false;
+ try {
+ while( byteIndex < buffer.length)
+ {
+ rcode = socketInput.read(buffer, byteIndex, buffer.length - byteIndex);
+ if(rcode == -1)
+ {
+ throw new SpreadException("Connection closed while reading " + bufferTypeString);
+ }
+ byteIndex += rcode;
+ }
+
+ }
+ catch( InterruptedIOException e) {
+ if ( listener != null && listener.signal == true )
+ {
+ throw( e );
+ }
+ if ( e.bytesTransferred >= 0 ) {
+ byteIndex += e.bytesTransferred;
+ }
+ keep_going = true;
+ }
+ catch(IOException e)
+ {
+ throw new SpreadException("readBytesFromSocketListener(): read() " + e);
+ }
+ }
+
+ }
+
// Gets a string from an array of bytes.
////////////////////////////////////////
@@ -1044,23 +1087,15 @@
///////////////////
byte header[] = new byte[MAX_GROUP_NAME+16];
int headerIndex;
- int rcode;
- try
- {
- for(headerIndex = 0 ; headerIndex < header.length ; headerIndex += rcode)
- {
- rcode = socketInput.read(header, headerIndex, header.length - headerIndex);
- if(rcode == -1)
- {
- throw new SpreadException("Connection closed while reading header");
- }
- }
+
+ try
+ {
+ readBytesFromSocketListener( header, "message header" );
}
catch(InterruptedIOException e) {
throw e;
- }
- catch(IOException e)
- {
+ }
+ catch(IOException e) {
throw new SpreadException("read(): " + e);
}
@@ -1168,25 +1203,17 @@
{
// Read in the old type and or with reject type field.
byte oldtypeBuffer[] = new byte[4];
- try
- {
- for(int oldtypeIndex = 0 ; oldtypeIndex < oldtypeBuffer.length ; )
- {
- rcode = socketInput.read(oldtypeBuffer, oldtypeIndex, oldtypeBuffer.length - oldtypeIndex);
- if(rcode == -1)
- {
- throw new SpreadException("Connection closed while reading groups");
- }
- oldtypeIndex += rcode;
- }
- }
- catch(InterruptedIOException e) {
- throw e;
- }
- catch(IOException e)
- {
- throw new SpreadException("read(): " + e);
- }
+ try
+ {
+ readBytesFromSocketListener( oldtypeBuffer, "message reject type" );
+ }
+ catch(InterruptedIOException e) {
+ throw e;
+ }
+ catch(IOException e) {
+ throw new SpreadException("read(): " + e);
+ }
+
int oldType = toInt(oldtypeBuffer, 0);
if ( sameEndian(serviceType) == false )
oldType = flip(oldType);
@@ -1197,26 +1224,17 @@
// Read in the group names.
///////////////////////////
byte buffer[] = new byte[numGroups * MAX_GROUP_NAME];
- try
+ try
{
- for(int bufferIndex = 0 ; bufferIndex < buffer.length ; )
- {
- rcode = socketInput.read(buffer, bufferIndex, buffer.length - bufferIndex);
- if(rcode == -1)
- {
- throw new SpreadException("Connection closed while reading groups");
- }
- bufferIndex += rcode;
- }
+ readBytesFromSocketListener( buffer, "message group name" );
}
catch(InterruptedIOException e) {
throw e;
+ }
+ catch(IOException e) {
+ throw new SpreadException("read(): " + e);
}
- catch(IOException e)
- {
- throw new SpreadException("read(): " + e);
- }
-
+
// Clear the endian type.
/////////////////////////
serviceType = clearEndian(serviceType);
@@ -1234,26 +1252,17 @@
// Read in the data.
////////////////////
byte data[] = new byte[dataLen];
- try
+ try
{
- for(int dataIndex = 0 ; dataIndex < dataLen ; )
- {
- rcode = socketInput.read(data, dataIndex, dataLen - dataIndex);
- if(rcode == -1)
- {
- throw new SpreadException("Connection close while reading data");
- }
- dataIndex += rcode;
- }
+ readBytesFromSocketListener( data, "message body" );
}
catch(InterruptedIOException e) {
throw e;
+ }
+ catch(IOException e) {
+ throw new SpreadException("read(): " + e);
}
- catch(IOException e)
- {
- throw new SpreadException("read():" + e);
- }
-
+
// Is it a membership message?
//////////////////////////////
MembershipInfo membershipInfo;
@@ -1481,11 +1490,14 @@
// Wait for the thread to die, to avoid inconsistencies.
////////////////////////////////////////////////////////
- try {
- listener.join();
- }
- catch ( InterruptedException e ) {
- // Ignore
+ if ( listener != null && ! listener.equals(Thread.currentThread()) )
+ {
+ try {
+ listener.join();
+ }
+ catch ( InterruptedException e ) {
+ // Ignore
+ }
}
// Clear the variable.
//////////////////////
@@ -1671,9 +1683,25 @@
// WE WILL BLOCK HERE UNTIL DATA IS AVAILABLE
// or 100 MS expires
/////////////////
+ message = null;
try {
+
synchronized(rsynchro) {
- message = connection.internal_receive();
+ boolean keep_going = true;
+ while( keep_going == true )
+ {
+ keep_going = false;
+ try {
+ message = connection.internal_receive();
+ }
+ catch( InterruptedIOException e) {
+ if ( signal == true )
+ {
+ throw( e );
+ }
+ keep_going = true;
+ }
+ }
}
// Calling listeners.
/////////////////////
More information about the Spread-cvs
mailing list