[Spread-users] Large Objects

Jonathan Stanton jonathan at cnds.jhu.edu
Wed Dec 8 09:56:48 EST 2004


Hi, 

The subclassed version was created by John Shultz and Jacob Green. They 
originally posted it in this message:

http://lists.spread.org/pipermail/spread-users/2002-May/000784.html

but it's a bit hard to decode MIME in a webbrowser, so I've attached it 
to this email as well.

Cheers,
Jonathan


On Mon, Dec 06, 2004 at 04:17:56PM -0500, Richard Boehme wrote:
> Hi there. I was searching the list archives for a way to send a file 
> when I bumped into a message from September 2001 that said:
> 
> Subject: [Spread-users] Large Objects and TCP vs. Spread (was Public CVS?)
> 
> ...snip...
> 
> Since Spread has a max message size(about 100 KB) suporting really large
> objects requires some application level handling, which can make a tcp
> stream sound appealing. (For Java we have a subclassed version of
> SpreadConnection that supports arbitrarily large objects that is
> available)
> 
> Does anynone know where that subclassed version of SpreadConnection is? 
> I can't seem to find it in the javadocs.
> 
> We have a computer on the network downloading a large update (let's say 
> about 100 or 200 MB in total) that then needs to be multicast to all of 
> the clients on the network, so this would help.
> 
> Thanks for the help.
> 
> Richard Boehme
> 
> 
> _______________________________________________
> Spread-users mailing list
> Spread-users at lists.spread.org
> http://lists.spread.org/mailman/listinfo/spread-users

-- 
-------------------------------------------------------
Jonathan R. Stanton         jonathan at cs.jhu.edu
Dept. of Computer Science   
Johns Hopkins University    
-------------------------------------------------------
-------------- next part --------------
/*
Copyright (C) 2002 D-Fusion, Inc.

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
    
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package spread;

import java.net.*;
import java.io.*;
import java.util.*;

public class MMSpreadConnection extends SpreadConnection {
  // Spread message types reserved for use by MMSpreadConnection
  private static final short MULTI_MSG_TYPE_BEGIN = -32768;   // the first msg of a chunked MMsg
  private static final short MULTI_MSG_TYPE       = -32767;   // one of the non-first + non-last msgs of a chunked MMsg
  private static final short MULTI_MSG_TYPE_END   = -32766;   // the last msg of a chunked MMsg
  private static final short MULTI_MSG_COVER_UP   = -32765;   // a non-MMsg that uses one of these reserved msg types

  private static final int   MAX_MSG_CHUNK_SIZE   = 80000;    // max size of a chunk in a MMsg

  private List   m_Mess_Queue;              // List<List<SpreadMessage> > Queue of msgs: each recvd msg is represented by a List<SpreadMessage>
  private Map    m_Multi_Mess_Map;          // Map<String, List<SpreadMessages> > A sender's spread name -> incomplete MMsg they are sending
  private Object m_SendLock;                // Lock to synchronize connects and multicasts
  private Object m_RecvLock;                // Lock to synchronize receive and poll calls

  private boolean m_WantsMembership;        // Does the user want to receive membership msgs?

  private MembershipInfo m_LastMembership;  // Most recent membership
  private SortedSet      m_LastLeavers;     // <String> members who left due to most recent membership (m_LastMembership)

  public MMSpreadConnection(){
    super();
    m_Mess_Queue      = new LinkedList();
    m_Multi_Mess_Map  = new HashMap();
    m_SendLock        = new Object();
    m_RecvLock        = new Object();

    m_WantsMembership = true;

    m_LastMembership  = null;
    m_LastLeavers     = null;
  }

  public void connect(InetAddress address, int port, String privateName, 
		      boolean priority, boolean groupMembership) throws SpreadException {
    synchronized (m_SendLock) {
      super.connect(address, port, privateName, priority, true);
      m_WantsMembership = groupMembership;
    }
  }

  // Sends a msg.  If the msg is larger then MAX_MSG_CHUNK_SIZE, then the msg is broken
  // up into msgs less then MAX_MSG_CHUNK_SIZE.  Each sub msg is sent out.  The recieve() call
  // will correctly reassemble the msgs on the other side.  If the user tries to use a reserved
  // msg type then a MULTI_MSG_COVER_UP msg will be sent + the user's type will be encoded in
  // the body.  The user's msg type will be restored by the receivers.

  public void multicast(SpreadMessage msg) throws SpreadException {
    synchronized (m_SendLock) {
      // if not a MMsg + not using a reserved type, then send regularly
      if (msg.getData().length <= MAX_MSG_CHUNK_SIZE && msg.getType() > MULTI_MSG_COVER_UP) {  
	super.multicast(msg);

      } else {
	try {
	  ByteArrayOutputStream baos = new ByteArrayOutputStream();
	  DataOutputStream dout      = new DataOutputStream(baos);
	  byte[] msgData             = msg.getData();
	  int count                  = 0;                         // number of bytes written from msgData so far
	  int size;
	  
	  dout.writeShort(msg.getType());                         // put the user type at front of first chunked msg	  
	  msg.setData(new byte[0]);                               // set data to empty array, so the clone() doesn't copy data everytime

	  do {
	    SpreadMessage send = (SpreadMessage) msg.clone();     // copy destination, other requested parameters from user's msg

	    if (count != 0) {                                     // not the first time through the loop
	      send.setFifo();                                     // msgs other than the first only need to be FIFO

	      if (msgData.length - count > MAX_MSG_CHUNK_SIZE) {  // if rest of msg is still too big, send another chunk
		send.setType(MULTI_MSG_TYPE);

	      } else {                                            // rest of msg will fit in one chunk, send final chunk
		send.setType(MULTI_MSG_TYPE_END);
	      }
	    } else {                                              // else first time through loop
	      if (msgData.length > MAX_MSG_CHUNK_SIZE) {          // we're sending a multi-msg
		send.setType(MULTI_MSG_TYPE_BEGIN);

	      } else {                                            // user is using a reserved msg type on a non-MMsg msg
		send.setType(MULTI_MSG_COVER_UP);
	      }
	    }
	    size = Math.min(MAX_MSG_CHUNK_SIZE, msgData.length - count);  // compute size of send for this msg
	    dout.write(msgData, count, size);
	    count += size;

	    send.setData(baos.toByteArray());                     // set the data of this msg
	    baos.reset();                                         // reset output buffer for next iteration
		
	    super.multicast(send);                                // multicast this chunk

	    // NOTE: there is no flow control here, so very large msgs can flood Spread possibly 
	    // causing Spread to disconnect too slow receivers

	  } while (count < msgData.length);

	  if (count != msgData.length) {                          // integrity check
	    throw new Error("Bug with our arithmetic!");
	  }
	} catch (IOException e) { throw new Error("Unexpected exception!", e);  // shouldn't happen because any IOExceptions are wrapped to SpreadExceptions
	} catch (SpreadException spe) { 
	  try { disconnect(); } catch (Throwable t) {}                          // can't allow sending of partial MMsg's, make sure we leave the group
	  throw spe;                                                            // rethrow the SpreadException
	}              
      }
    }
  }

  // Receives the next msg.  If we have msgs in m_Mess_Queue to be delivered, it returns the first one,
  // else it tries to recvs the next msg.  If a MMsg is at the head of the queue, this fcn ensures that we
  // get all the data for a complete MMsg before it returns.  All other msgs (including other MMsgs from
  // other senders) are buffered until the first MMsg is complete.  The synchronized Multicast call 
  // guarantees that only one incomplete MMsg is in the system per sender at any one time.

  public SpreadMessage receive() throws SpreadException, InterruptedIOException {
    synchronized (m_RecvLock) {
      while (true) {
	if (m_Mess_Queue.isEmpty()) {                                     // queue is empty, receive into queue and loop around again
	  enqueueMsg(super.receive());                                    // may or may not actually put a message in the queue (see enqueueMsg())

	} else {                                                          // else there is a message in the queue
	  List msgList      = (List) m_Mess_Queue.get(0);                 // get the msgList of the msg
	  SpreadMessage msg = (SpreadMessage) msgList.get(0);             // get the first chunk of the msg
	  short msgType     = msg.getType();                              // get the first chunk's type

	  if (msgType > MULTI_MSG_COVER_UP) {                             // not a special msg, so just return it (includes membership msgs)
	    m_Mess_Queue.remove(0);                                       // dequeue msgList from m_Mess_Queue
	    return msg;                                                   

	  } else {
	    if (msgType == MULTI_MSG_COVER_UP) {                          // user's msg type was reserved -> encoded in body of msg
	      fixSpecialMsg(msgList);                                     // restore user's encoded msg type
	      m_Mess_Queue.remove(0);                                     // dequeue msgList from m_Mess_Queue
	      return msg;
	    }
	    if (msgType != MULTI_MSG_TYPE_BEGIN) {                        // integrity check
	      throw new Error("Unexpected special message type at head of a msgList: " + msgType + "!");
	    }
	    if (recvMultiMsg(msgList, msg.getSender().toString())) {      // go and receive rest of this MMsg
	      fixSpecialMsg(msgList);                                     // combine msg into one big msg + restore user's encoded msg type
	      m_Mess_Queue.remove(0);                                     // dequeue msgList from m_Mess_Queue
	      return msg;
	    }
	    // else sender left while sending the MMsg, so loop around and try for next msg
	  }
	}
      }
    }
  }

  public boolean poll() throws SpreadException {
    synchronized (m_RecvLock) {
      return !m_Mess_Queue.isEmpty() || super.poll();
    }
  }

  // Next msg to be returned is a MMsg.  Ensure we have the whole msg + then return. If
  // the sender leaves before we receive the entire MMsg, then return false.

  private boolean recvMultiMsg(List msgList, String sender) throws SpreadException, InterruptedIOException {
    SpreadMessage msg = (SpreadMessage) msgList.get(msgList.size() - 1);  // get last msg chunk to check its type

    if (msg.getType() != MULTI_MSG_TYPE_END) {              // this MMsg hasn't been completely received yet
      while (true) {                                        // receive in a loop until we get the end of the MMsg from sender or he leaves
	msg = super.receive();
	enqueueMsg(msg);

	if (msg.isMembership()) {
	  if (m_LastLeavers.contains(sender)) {             // enqueueMsg updates m_LastLeavers for each membership msg
	    return false;
	  }
	} else if (msg.getType() == MULTI_MSG_TYPE_END && msg.getSender().toString().equals(sender)) {  // got end of MMsg we were waiting for
	  break;
	}
      }
    } 
    return true;                                            // current MMsg is completed
  }

  // This fcn takes appropriate actions depending on what type of message was just received.

  private void enqueueMsg(SpreadMessage msg) {
    List msgList = null;

    if (!msg.isMembership()) {
      short msgType = msg.getType();

      switch (msgType) {
	default:
        case MULTI_MSG_COVER_UP: {                                             // create a new queue entry for this msg
	  msgList = new LinkedList();
	  m_Mess_Queue.add(msgList);
	  msgList.add(msg);
	}
	break;

	case MULTI_MSG_TYPE_BEGIN:
	case MULTI_MSG_TYPE:
        case MULTI_MSG_TYPE_END: {
	  msgList = (List) m_Multi_Mess_Map.get(msg.getSender().toString());   // see if an incomplete MMsg already exists for this sender

	  if (msgList != null) {
	    if (msgType == MULTI_MSG_TYPE_BEGIN) {
	      throw new Error("Got a MULTI_MSG_TYPE_BEGIN from a member that is already sending a MMsg!");
	    }
	  } else {
	    if (msgType == MULTI_MSG_TYPE || msgType == MULTI_MSG_TYPE_END) {  // we don't have the start of this MMsg -> drop msg
	      return;
	    }
	    // else create a new queue entry for this MULTI_MSG_TYPE_BEGIN msg
	    msgList = new LinkedList();
	    m_Mess_Queue.add(msgList);
	    m_Multi_Mess_Map.put(msg.getSender().toString(), msgList);         // associate this sender with this incomplete MMsg
	  }
	  msgList.add(msg);                                                    // add the just received portion of the MMsg to the appropriate msgList

	  if (msgType == MULTI_MSG_TYPE_END) {                                 // if this msg completed a MMsg then
	    m_Multi_Mess_Map.remove(msg.getSender().toString());               // disassociate the sender with this now completed MMsg
	  }
	}
	break;  
      }
    } else {
      MembershipInfo info = msg.getMembershipInfo();

      m_LastLeavers = new TreeSet();                                           // construct a new set of leavers caused by this membership
      
      if (m_LastMembership != null) {                                          // if this isn't the first membership for this connection
	if (info.isCausedByLeave()) {
	  m_LastLeavers.add(info.getLeft().toString());
	
	} else if (info.isCausedByDisconnect()) {
	  m_LastLeavers.add(info.getDisconnected().toString());

	} else if (info.isCausedByNetwork()) {
	  SpreadGroup[] prevMembs = m_LastMembership.getMembers();
	  SpreadGroup[] survivors = info.getStayed();
	  
	  for (int i = 0; i < prevMembs.length; ++i) {
	    m_LastLeavers.add(prevMembs[i].toString());
	  }
	  for (int i = 0; i < survivors.length; ++i) {
	    m_LastLeavers.remove(survivors[i].toString());
	  }
	}
      }
      m_LastMembership = info;

      // remove incomplete MMsgs sent by any of the leavers
      Iterator it = m_LastLeavers.iterator();

      while (it.hasNext()) {
	String leaver = (String) it.next();

	m_Multi_Mess_Map.remove(leaver);
      }

      // if the user requested memberships, create a new entry on the queue for this membership msg
      if (m_WantsMembership) {
	msgList = new LinkedList();  
	m_Mess_Queue.add(msgList);
	msgList.add(msg);
      }
    }
    System.err.println("m_Mess_Queue size is now: " + m_Mess_Queue.size());
  }

  // Called when we are ready to return a special msg (a MMsg or a
  // COVER_UP msg) to the user.  We set the user type back to the
  // original. If it is a MMsg, we also combine the chunks into one big msg.

  private void fixSpecialMsg(List msgList) {
    try {
      SpreadMessage msg   = (SpreadMessage) msgList.get(0);
      DataInputStream din = new DataInputStream(new ByteArrayInputStream(msg.getData()));
	
      msg.setType(din.readShort());           // get + set the user type

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      Iterator it                = msgList.iterator();

      // take care of the first msg seperately, becasue it has the user type in it
      baos.write(msg.getData(), 2, msg.getData().length - 2);
      it.next();
	
      while (it.hasNext()) {                  // handle MMsgs
	baos.write(((SpreadMessage) it.next()).getData());   
      }
      msg.setData(baos.toByteArray());        // set the data of the msg

      msgList.clear();                        // clear out msgList
      msgList.add(msg);                       // put back the fixed up msg

    } catch (IOException e) { throw new Error("Unexpected exception!", e); } 
  }

  public static void main(String[] args) throws Throwable {
    MMSpreadConnection sendConn = new MMSpreadConnection();
    SpreadGroup sendGrp = new SpreadGroup();

    MMSpreadConnection recvConn = new MMSpreadConnection();
    SpreadGroup recvGrp = new SpreadGroup();

    sendConn.connect(InetAddress.getByName("localhost"), 4803, "sendConn", true, false);
    recvConn.connect(InetAddress.getByName("localhost"), 4803, "recvConn", true, false);

    sendGrp.join(sendConn, "testGrp");
    recvGrp.join(recvConn, "testGrp");

    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    DataOutputStream dout = new DataOutputStream(bout);

    for (int i = 0; i < 250000; ++i) {
      dout.writeInt(i);
    }
    SpreadMessage msg = new SpreadMessage();

    msg.addGroup("testGrp");
    msg.setReliable();
    msg.setType(MULTI_MSG_TYPE);
    msg.setSelfDiscard(true);
    msg.setData(bout.toByteArray());

    sendConn.multicast(msg);

    msg = recvConn.receive();

    ByteArrayInputStream bin = new ByteArrayInputStream(msg.getData());
    DataInputStream din = new DataInputStream(bin);

    for (int i = 0; i < 250000; ++i) {
      int x = din.readInt();

      if (x != i) {
	throw new Error("Mismatch at index i = " + i + " val(" + x + ") != i!");
      }
    }
    if (msg.getData().length != 1000000) {
      throw new Error("Msg was incorrect size: " + msg.getData().length);
    }
    if (msg.getType() != MULTI_MSG_TYPE) {
      throw new Error("Msg type mismatches: " + msg.getType() + " != MULTI_MSG_TYPE!");
    }

    msg.setData(new byte[0]);

    sendConn.multicast(msg);

    msg = recvConn.receive();

    if (msg.getData().length != 0) {
      throw new Error("Msg was incorrect size: " + msg.getData().length);
    }
    if (msg.getType() != MULTI_MSG_TYPE) {
      throw new Error("Msg type mismatches: " + msg.getType() + " != MULTI_MSG_TYPE!");
    }

    msg.setType((short) 0);

    sendConn.multicast(msg);

    msg = recvConn.receive();

    if (msg.getData().length != 0) {
      throw new Error("Msg was incorrect size: " + msg.getData().length);
    }
    if (msg.getType() != 0) {
      throw new Error("Msg type mismatches: " + msg.getType() + " != MULTI_MSG_TYPE!");
    }

    System.out.println("Tests successful!");

    sendConn.disconnect();
    recvConn.disconnect();
  }
}


More information about the Spread-users mailing list