[Spread-users] Spread for large message sizes

John Schultz jschultz at commedia.cnds.jhu.edu
Wed Mar 5 01:00:06 EST 2003


In response to your message I re-examined our current code for
MMSpreadConnection. I don't know if the version in the list archive was
the most up-to-date, so I've reattached our most recent version to this
email. I also fixed what I think was a bug: partial multi-msgs from members
who left were not removed from the main message queue.

On Wed, 5 Mar 2003 Doug.Palmer at csiro.au wrote:

> A couple of questions that I can't figure out from examining the code and
> mailing list:
>
> * If I have two producers (A and B) multicasting to the same group using
> large messages, is it possible for the fragmented messages to arrive in
> mixed order? Say A_1, B_1, B_2, A_2, B_3. Is this handled? I think
> enqueueMsg does this, but I've got a bit lost.

Yes, messages can arrive in a mixed order. However, the delivery order is
determined by the first fragment of each msg. So, in your example A's msg
would be delivered before B's at the receiving process.
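A minimal model of that rule, written as standalone Java rather than against the Spread API. The Fragment class and deliver() method are hypothetical, and fragments are matched to messages by sender name (the way MMSpreadConnection keys m_Multi_Mess_Map). Each message gets its queue slot when its first fragment arrives, so the arrival sequence A_1, B_1, B_2, A_2, B_3 yields A's message before B's:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FirstFragmentOrder {
    // Hypothetical fragment type, not a Spread class. 'last' marks a message's final fragment.
    static final class Fragment {
        final String sender;
        final String payload;
        final boolean last;
        Fragment(String sender, String payload, boolean last) {
            this.sender = sender;
            this.payload = payload;
            this.last = last;
        }
    }

    // Returns the reassembled messages that can be delivered, in first-fragment order.
    static List<String> deliver(List<Fragment> arrivals) {
        List<StringBuilder> slots = new ArrayList<>();   // one slot per message, created at its first fragment
        List<Boolean> complete = new ArrayList<>();
        Map<String, Integer> open = new HashMap<>();      // sender -> slot of its incomplete message
        for (Fragment f : arrivals) {
            Integer slot = open.get(f.sender);
            if (slot == null) {                           // first fragment: this fixes the delivery position
                slot = slots.size();
                slots.add(new StringBuilder());
                complete.add(false);
                open.put(f.sender, slot);
            }
            slots.get(slot).append(f.payload);
            if (f.last) {
                complete.set(slot, true);
                open.remove(f.sender);
            }
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < slots.size() && complete.get(i); i++) {   // an incomplete head blocks the rest
            out.add(slots.get(i).toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Fragment> arrivals = Arrays.asList(
            new Fragment("A", "A1", false), new Fragment("B", "B1", false),
            new Fragment("B", "B2", false), new Fragment("A", "A2", true),
            new Fragment("B", "B3", true));
        System.out.println(deliver(arrivals));   // prints [A1A2, B1B2B3]
    }
}
```

In this model, even if B's last fragment had arrived before A_2, B's message would still wait behind A's, because A claimed the earlier slot.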

> * What are the effects on the message safety semantics? If the messages are
> queued, as I think they are, they should appear at the client in the same
> order that the initial headers arrived; this should take care of FIFO,
> Causal and Total Order semantics. I don't suppose that Safe can be
> guaranteed?

The _ordering_ safety properties should be maintained (FIFO, causal,
agreed); however, holes can be introduced in the order. For example,
suppose process P receives msg fragment A1 and then msg B.
MMSpreadConnection won't deliver B until it has collected all of the
fragments of A and delivered A, or is forced to give up on A. If the
sender of A1 leaves before we can collect all the fragments of A, A is
silently dropped, then B is delivered, and so on. This breaks several of
the no-holes safety properties of EVS/Spread.

It might be possible to generate a transitional signal and mess with the
VS sets here to handle this situation while maintaining all of the safety
properties. I remember considering this and discussing it with Yair. I
don't remember if we decided whether or not you could meet the safety
requirements, but I know we decided it wasn't worth our effort (we didn't
need the stronger semantics).
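The hole described above (fragment A1, then msg B, then A's sender leaves) can be reproduced with a small standalone model. The event strings and the HoleDemo class are hypothetical and only mimic MMSpreadConnection's queueing rules, not Spread's API: a partial message at the head of the queue blocks everything behind it, and a leave by its sender drops it, which lets B through without A ever being delivered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class HoleDemo {
    // A message being reassembled; hypothetical, not a Spread type.
    static final class Pending {
        final String sender;
        final StringBuilder data = new StringBuilder();
        boolean complete;
        Pending(String sender) { this.sender = sender; }
    }

    // Events: "S:payload" (more fragments follow), "S:payload:END" (final or only fragment),
    // or "LEAVE:S" (a membership change in which S left).
    static List<String> run(String... events) {
        LinkedList<Pending> queue = new LinkedList<>();
        Map<String, Pending> open = new HashMap<>();         // sender -> its incomplete message
        List<String> delivered = new ArrayList<>();
        for (String e : events) {
            String[] p = e.split(":");
            if (p[0].equals("LEAVE")) {
                Pending dropped = open.remove(p[1]);
                if (dropped != null) {
                    queue.remove(dropped);                   // silently dropped: this is the hole
                }
            } else {
                Pending m = open.get(p[0]);
                if (m == null) {
                    m = new Pending(p[0]);
                    queue.add(m);
                    open.put(p[0], m);
                }
                m.data.append(p[1]);
                if (p.length == 3) {                         // ":END" marks the last fragment
                    m.complete = true;
                    open.remove(p[0]);
                }
            }
            while (!queue.isEmpty() && queue.getFirst().complete) {   // deliver only from the head
                delivered.add(queue.removeFirst().data.toString());
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run("A:A1", "B:B:END", "LEAVE:A"));   // prints [B]
    }
}
```

Without the leave event nothing is delivered at all, since B stays queued behind the incomplete A.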

> This pretty much deals with my original motivation. So thank you. I would
> like to reiterate my original questions, however:
>
> > >>Is the message size a hard limit?

The max message size in Spread is a compile-time limit.

> > >>Can I change it by settings in spread.conf or somewhere?
>
> > >>Is it tied to some fundamental design limit, or could I recompile
> > spread
> > >>with different settings? (And if so, how?).

It is determined by the #define MAX_MESSAGE_BODY in sess_types.h, which in
turn is determined by the #define MAX_SCATTER_ELEMENTS in scatter.h and
MAX_PACKET_SIZE in data_link.h. I think you can ratchet MAX_PACKET_SIZE up
to the maximum UDP data size, which I think is 64KB. If this works, it
would raise the max msg size to ~6.4MB.
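The ~6.4MB figure is roughly the number of scatter elements times the packet size. A minimal sketch of that arithmetic, assuming MAX_SCATTER_ELEMENTS is 100 (the value the estimate implies; not checked against scatter.h) and ignoring Spread's own per-packet and per-message headers, so the real limit would be somewhat lower:

```java
public class MaxMessageEstimate {
    // Upper-bound estimate: one full packet per scatter element.
    // Ignores Spread's header overhead, so the usable body is somewhat smaller.
    static long approxMaxBody(int maxScatterElements, int maxPacketSize) {
        return (long) maxScatterElements * maxPacketSize;   // widen first to avoid int overflow
    }

    public static void main(String[] args) {
        int scatterElements = 100;               // ASSUMPTION: implied by the ~6.4MB figure
        int maxUdpPayload   = 65535 - 20 - 8;    // IPv4 max datagram - IPv4 header - UDP header = 65507
        System.out.println(approxMaxBody(scatterElements, maxUdpPayload));   // prints 6550700
    }
}
```

That is about 6.5 million bytes (6.25MiB); using 64,000 bytes per packet instead gives the 6.4MB quoted above.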

I'm not sure if you can freely move MAX_SCATTER_ELEMENTS up and down -- it
probably depends on your platform and on whether Spread is using scatter
sends/receives or not.

Jonathan or Yair can give you a definitive answer to this question. Doing
this will increase the memory footprint of Spread -- by how much I'm not
sure, but it could be considerable.

I don't know if this #define limit is a FUNDAMENTAL design limit of the
GCS, or just a limit of the way it is currently engineered.

John Schultz
Co-Founder, Lead Engineer
D-Fusion, Inc. (http://www.d-fusion.net)
Phn: 443 838 2200
-------------- next part --------------
/*
Copyright (C) 2002 D-Fusion, Inc.

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
    
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package net.DFusion.protocols.spread;

import java.net.*;
import java.io.*;
import java.util.*;

public class MMSpreadConnection extends SpreadConnection {
  // Spread message types reserved for use by MMSpreadConnection
  private static final short MULTI_MSG_TYPE_BEGIN = -32768;   // the first msg of a chunked MMsg
  private static final short MULTI_MSG_TYPE       = -32767;   // one of the non-first + non-last msgs of a chunked MMsg
  private static final short MULTI_MSG_TYPE_END   = -32766;   // the last msg of a chunked MMsg
  private static final short MULTI_MSG_COVER_UP   = -32765;   // a non-MMsg that uses one of these reserved msg types

  private static final int   MAX_MSG_CHUNK_SIZE   = 80000;    // max size of a chunk in a MMsg

  private List   m_Mess_Queue;              // List<List<SpreadMessage> > Queue of msgs: each recvd msg is represented by a List<SpreadMessage>
  private Map    m_Multi_Mess_Map;          // Map<String, List<SpreadMessage> > A sender's spread name -> incomplete MMsg they are sending
  private Object m_SendLock;                // Lock to synchronize connects and multicasts
  private Object m_RecvLock;                // Lock to synchronize receive and poll calls

  private boolean m_WantsMembership;        // Does the user want to receive membership msgs?

  private MembershipInfo m_LastMembership;  // Most recent membership
  private SortedSet      m_LastLeavers;     // <String> members who left due to most recent membership (m_LastMembership)

  public MMSpreadConnection(){
    super();
    m_Mess_Queue      = new LinkedList();
    m_Multi_Mess_Map  = new HashMap();
    m_SendLock        = new Object();
    m_RecvLock        = new Object();

    m_WantsMembership = true;

    m_LastMembership  = null;
    m_LastLeavers     = null;
  }

  public void connect(InetAddress address, int port, String privateName, 
		      boolean priority, boolean groupMembership) throws SpreadException {
    synchronized (m_SendLock) {
      super.connect(address, port, privateName, priority, true);
      m_WantsMembership = groupMembership;
    }
  }

  // Sends a msg.  If the msg is larger than MAX_MSG_CHUNK_SIZE, then the msg is broken
  // up into chunks of at most MAX_MSG_CHUNK_SIZE bytes.  Each chunk is sent out.  The receive() call
  // will correctly reassemble the msgs on the other side.  If the user tries to use a reserved
  // msg type then a MULTI_MSG_COVER_UP msg will be sent + the user's type will be encoded in
  // the body.  The user's msg type will be restored by the receivers.

  public void multicast(SpreadMessage msg) throws SpreadException {
    synchronized (m_SendLock) {
      // if not a MMsg + not using a reserved type, then send regularly
      if (msg.getData().length <= MAX_MSG_CHUNK_SIZE && msg.getType() > MULTI_MSG_COVER_UP) {  
	super.multicast(msg);

      } else {
	byte[] msgData = msg.getData();                           // user's data; restored in the finally block below
	try {
	  ByteArrayOutputStream baos = new ByteArrayOutputStream();
	  DataOutputStream dout      = new DataOutputStream(baos);
	  int count                  = 0;                         // number of bytes written from msgData so far
	  int size;
	  
	  dout.writeShort(msg.getType());                         // put the user type at front of first chunked msg
	  msg.setData(new byte[0]);                               // temporarily empty the data so clone() doesn't copy it every time

	  do {
	    SpreadMessage send = (SpreadMessage) msg.clone();     // copy destination, other requested parameters from user's msg

	    if (count != 0) {                                     // not the first time through the loop
	      send.setFifo();                                     // msgs other than the first only need to be FIFO

	      if (msgData.length - count > MAX_MSG_CHUNK_SIZE) {  // if rest of msg is still too big, send another chunk
		send.setType(MULTI_MSG_TYPE);

	      } else {                                            // rest of msg will fit in one chunk, send final chunk
		send.setType(MULTI_MSG_TYPE_END);
	      }
	    } else {                                              // else first time through loop
	      if (msgData.length > MAX_MSG_CHUNK_SIZE) {          // we're sending a multi-msg
		send.setType(MULTI_MSG_TYPE_BEGIN);

	      } else {                                            // user is using a reserved msg type on a non-MMsg msg
		send.setType(MULTI_MSG_COVER_UP);
	      }
	    }
	    size = Math.min(MAX_MSG_CHUNK_SIZE, msgData.length - count);  // compute size of send for this msg
	    dout.write(msgData, count, size);
	    count += size;

	    send.setData(baos.toByteArray());                     // set the data of this msg
	    baos.reset();                                         // reset output buffer for next iteration

	    super.multicast(send);                                // multicast this chunk

	    // NOTE: there is no flow control here, so very large msgs can flood Spread, possibly
	    // causing Spread to disconnect receivers that are too slow

	  } while (count < msgData.length);

	  if (count != msgData.length) {                          // integrity check
	    throw new Error("Bug with our arithmetic!");
	  }
	} catch (IOException e) { throw new Error("Unexpected exception!", e);  // can't happen: we only write to an in-memory ByteArrayOutputStream
	} catch (SpreadException spe) { 
	  try { disconnect(); } catch (Throwable t) {}                          // can't allow sending of partial MMsgs, make sure we leave the group
	  throw spe;                                                            // rethrow the SpreadException
	} finally {
	  msg.setData(msgData);                                   // restore the caller's data
	}
      }
    }
  }

  // Receives the next msg.  If we have msgs in m_Mess_Queue to be delivered, it returns the first one,
  // else it receives the next msg from Spread.  If a MMsg is at the head of the queue, this fcn ensures that we
  // get all the data for a complete MMsg before it returns.  All other msgs (including other MMsgs from
  // other senders) are buffered until the first MMsg is complete.  The synchronized multicast() call
  // guarantees that only one incomplete MMsg is in the system per sender at any one time.

  public SpreadMessage receive() throws SpreadException, InterruptedIOException {
    synchronized (m_RecvLock) {
      while (true) {
	if (m_Mess_Queue.isEmpty()) {                                     // queue is empty, receive into queue and loop around again
	  enqueueMsg(super.receive());                                    // may or may not actually put a message in the queue (see enqueueMsg())

	} else {                                                          // else there is a message in the queue
	  List msgList      = (List) m_Mess_Queue.get(0);                 // get the msgList of the msg
	  SpreadMessage msg = (SpreadMessage) msgList.get(0);             // get the first chunk of the msg
	  short msgType     = msg.getType();                              // get the first chunk's type

	  if (msgType > MULTI_MSG_COVER_UP) {                             // not a special msg, so just return it (includes membership msgs)
	    m_Mess_Queue.remove(0);                                       // dequeue msgList from m_Mess_Queue
	    return msg;                                                   

	  } else {
	    if (msgType == MULTI_MSG_COVER_UP) {                          // user's msg type was reserved -> encoded in body of msg
	      fixSpecialMsg(msgList);                                     // restore user's encoded msg type
	      m_Mess_Queue.remove(0);                                     // dequeue msgList from m_Mess_Queue
	      return msg;
	    }
	    if (msgType != MULTI_MSG_TYPE_BEGIN) {                        // integrity check
	      throw new Error("Unexpected special message type at head of a msgList: " + msgType + "!");
	    }
	    if (recvMultiMsg(msgList, msg.getSender().toString())) {      // go and receive rest of this MMsg
	      fixSpecialMsg(msgList);                                     // combine msg into one big msg + restore user's encoded msg type
	      m_Mess_Queue.remove(0);                                     // dequeue msgList from m_Mess_Queue
	      return msg;
	    }
	    // else sender left while sending the MMsg, so loop around and try for next msg
	  }
	}
      }
    }
  }

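  // NOTE: true means a msg is queued or pending at Spread, not that receive() won't block:
  // if the head of the queue is an incomplete MMsg, receive() blocks until the rest arrives.
  public boolean poll() throws SpreadException {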
    synchronized (m_RecvLock) {
      return !m_Mess_Queue.isEmpty() || super.poll();
    }
  }

  // Next msg to be returned is a MMsg.  Ensure we have the whole msg + then return. If
  // the sender leaves before we receive the entire MMsg, then return false.

  private boolean recvMultiMsg(List msgList, String sender) throws SpreadException, InterruptedIOException {
    SpreadMessage msg = (SpreadMessage) msgList.get(msgList.size() - 1);  // get last msg chunk to check its type

    if (msg.getType() != MULTI_MSG_TYPE_END) {              // this MMsg hasn't been completely received yet
      while (true) {                                        // receive in a loop until we get the end of the MMsg from sender or he leaves
	msg = super.receive();
	enqueueMsg(msg);

	if (msg.isMembership()) {
	  if (m_LastLeavers.contains(sender)) {             // enqueueMsg updates m_LastLeavers for each membership msg
	    return false;
	  }
	} else if (msg.getType() == MULTI_MSG_TYPE_END && msg.getSender().toString().equals(sender)) {  // got end of MMsg we were waiting for
	  break;
	}
      }
    } 
    return true;                                            // current MMsg is completed
  }

  // This fcn takes appropriate actions depending on what type of message was just received.

  private void enqueueMsg(SpreadMessage msg) {
    List msgList = null;

    if (!msg.isMembership()) {
      short msgType = msg.getType();

      switch (msgType) {
	default:
        case MULTI_MSG_COVER_UP: {                                             // create a new queue entry for this msg
	  msgList = new LinkedList();
	  m_Mess_Queue.add(msgList);
	  msgList.add(msg);
	}
	break;

	case MULTI_MSG_TYPE_BEGIN:
	case MULTI_MSG_TYPE:
        case MULTI_MSG_TYPE_END: {
	  msgList = (List) m_Multi_Mess_Map.get(msg.getSender().toString());   // see if an incomplete MMsg already exists for this sender

	  if (msgList != null) {
	    if (msgType == MULTI_MSG_TYPE_BEGIN) {
	      throw new Error("Got a MULTI_MSG_TYPE_BEGIN from a member that is already sending a MMsg!");
	    }
	  } else {
	    if (msgType == MULTI_MSG_TYPE || msgType == MULTI_MSG_TYPE_END) {  // we don't have the start of this MMsg -> drop msg
	      return;
	    }
	    // else create a new queue entry for this MULTI_MSG_TYPE_BEGIN msg
	    msgList = new LinkedList();
	    m_Mess_Queue.add(msgList);
	    m_Multi_Mess_Map.put(msg.getSender().toString(), msgList);         // associate this sender with this incomplete MMsg
	  }
	  msgList.add(msg);                                                    // add the just received portion of the MMsg to the appropriate msgList

	  if (msgType == MULTI_MSG_TYPE_END) {                                 // if this msg completed a MMsg then
	    m_Multi_Mess_Map.remove(msg.getSender().toString());               // disassociate the sender with this now completed MMsg
	  }
	}
	break;  
      }
    } else {
      MembershipInfo info = msg.getMembershipInfo();

      m_LastLeavers = new TreeSet();                                           // construct a new set of leavers caused by this membership
      
      if (m_LastMembership != null) {                                          // if this isn't the first membership for this connection
	if (info.isCausedByLeave()) {
	  m_LastLeavers.add(info.getLeft().toString());
	
	} else if (info.isCausedByDisconnect()) {
	  m_LastLeavers.add(info.getDisconnected().toString());

	} else if (info.isCausedByNetwork()) {
	  SpreadGroup[] prevMembs = m_LastMembership.getMembers();
	  SpreadGroup[] survivors = info.getStayed();
	  
	  for (int i = 0; i < prevMembs.length; ++i) {
	    m_LastLeavers.add(prevMembs[i].toString());
	  }
	  for (int i = 0; i < survivors.length; ++i) {
	    m_LastLeavers.remove(survivors[i].toString());
	  }
	}
      }
      m_LastMembership = info;

      // remove incomplete MMsgs sent by any of the leavers
      Iterator it = m_LastLeavers.iterator();

      while (it.hasNext()) {
	String leaver = (String) it.next();
	List mmsg     = (List) m_Multi_Mess_Map.remove(leaver);

	if (mmsg != null) {
	  m_Mess_Queue.remove(mmsg);  // linear search
	}
      }

      // if the user requested memberships, create a new entry on the queue for this membership msg
      if (m_WantsMembership) {
	msgList = new LinkedList();  
	m_Mess_Queue.add(msgList);
	msgList.add(msg);
      }
    }
    //System.err.println("m_Mess_Queue size is now: " + m_Mess_Queue.size());
  }

  // Called when we are ready to return a special msg (a MMsg or a
  // COVER_UP msg) to the user.  We set the user type back to the
  // original. If it is a MMsg, we also combine the chunks into one big msg.

  private void fixSpecialMsg(List msgList) {
    try {
      SpreadMessage msg   = (SpreadMessage) msgList.get(0);
      DataInputStream din = new DataInputStream(new ByteArrayInputStream(msg.getData()));
	
      msg.setType(din.readShort());           // get + set the user type

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      Iterator it                = msgList.iterator();

      // take care of the first msg separately, because it has the user type in it
      baos.write(msg.getData(), 2, msg.getData().length - 2);
      it.next();
	
      while (it.hasNext()) {                  // handle MMsgs
	baos.write(((SpreadMessage) it.next()).getData());   
      }
      msg.setData(baos.toByteArray());        // set the data of the msg

      msgList.clear();                        // clear out msgList
      msgList.add(msg);                       // put back the fixed up msg

    } catch (IOException e) { throw new Error("Unexpected exception!", e); } 
  }

  public static void main(String[] args) throws Throwable {
    MMSpreadConnection sendConn = new MMSpreadConnection();
    SpreadGroup sendGrp = new SpreadGroup();

    MMSpreadConnection recvConn = new MMSpreadConnection();
    SpreadGroup recvGrp = new SpreadGroup();

    sendConn.connect(InetAddress.getByName("localhost"), 4803, "sendConn", true, false);
    recvConn.connect(InetAddress.getByName("localhost"), 4803, "recvConn", true, false);

    sendGrp.join(sendConn, "testGrp");
    recvGrp.join(recvConn, "testGrp");

    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    DataOutputStream dout = new DataOutputStream(bout);

    for (int i = 0; i < 250000; ++i) {
      dout.writeInt(i);
    }
    SpreadMessage msg = new SpreadMessage();

    msg.addGroup("testGrp");
    msg.setReliable();
    msg.setType(MULTI_MSG_TYPE);
    msg.setSelfDiscard(true);
    msg.setData(bout.toByteArray());

    sendConn.multicast(msg);

    msg = recvConn.receive();

    ByteArrayInputStream bin = new ByteArrayInputStream(msg.getData());
    DataInputStream din = new DataInputStream(bin);

    for (int i = 0; i < 250000; ++i) {
      int x = din.readInt();

      if (x != i) {
	throw new Error("Mismatch at index i = " + i + " val(" + x + ") != i!");
      }
    }
    if (msg.getData().length != 1000000) {
      throw new Error("Msg was incorrect size: " + msg.getData().length);
    }
    if (msg.getType() != MULTI_MSG_TYPE) {
      throw new Error("Msg type mismatches: " + msg.getType() + " != MULTI_MSG_TYPE!");
    }

    msg.setData(new byte[0]);

    sendConn.multicast(msg);

    msg = recvConn.receive();

    if (msg.getData().length != 0) {
      throw new Error("Msg was incorrect size: " + msg.getData().length);
    }
    if (msg.getType() != MULTI_MSG_TYPE) {
      throw new Error("Msg type mismatches: " + msg.getType() + " != MULTI_MSG_TYPE!");
    }

    msg.setType((short) 0);

    sendConn.multicast(msg);

    msg = recvConn.receive();

    if (msg.getData().length != 0) {
      throw new Error("Msg was incorrect size: " + msg.getData().length);
    }
    if (msg.getType() != 0) {
      throw new Error("Msg type mismatches: " + msg.getType() + " != 0!");
    }

    System.out.println("Tests successful!");

    sendConn.disconnect();
    recvConn.disconnect();
  }
}
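For readers without a Spread daemon at hand, the framing that multicast() and fixSpecialMsg() agree on can be exercised on its own. This is a standalone sketch, not part of the attachment: it reimplements only the byte layout (the user type as a big-endian short at the front of the first chunk, as DataOutputStream.writeShort writes it, then at most maxChunk bytes of user data per chunk) and skips Spread's message types and service levels entirely. ChunkFramingSketch, split(), typeOf() and join() are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class ChunkFramingSketch {
    // Splits data into chunks of at most maxChunk user bytes; the first chunk is
    // prefixed with the user's type as a big-endian short.
    static List<byte[]> split(short userType, byte[] data, int maxChunk) {
        List<byte[]> chunks = new ArrayList<>();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write((userType >> 8) & 0xFF);               // high byte of the type
        baos.write(userType & 0xFF);                      // low byte of the type
        int count = 0;
        do {                                              // runs once even for empty data
            int size = Math.min(maxChunk, data.length - count);
            baos.write(data, count, size);
            count += size;
            chunks.add(baos.toByteArray());
            baos.reset();
        } while (count < data.length);
        return chunks;
    }

    // Reads the user type back from the first two bytes of the first chunk.
    static short typeOf(List<byte[]> chunks) {
        byte[] first = chunks.get(0);
        return (short) (((first[0] & 0xFF) << 8) | (first[1] & 0xFF));
    }

    // Concatenates the chunks, skipping the 2-byte type prefix.
    static byte[] join(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] first = chunks.get(0);
        out.write(first, 2, first.length - 2);
        for (int i = 1; i < chunks.size(); i++) {
            out.write(chunks.get(i), 0, chunks.get(i).length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = new byte[1000000];                  // same size as the test in main() above
        new Random(1).nextBytes(data);
        List<byte[]> chunks = split((short) -32767, data, 80000);
        System.out.println(chunks.size() + " chunks, first " + chunks.get(0).length
            + " bytes, last " + chunks.get(chunks.size() - 1).length + " bytes");
        System.out.println(typeOf(chunks) + " " + Arrays.equals(data, join(chunks)));
    }
}
```

With the 1,000,000-byte payload and the 80000-byte MAX_MSG_CHUNK_SIZE this prints "13 chunks, first 80002 bytes, last 40000 bytes" and then "-32767 true": 12 full chunks plus a 40000-byte tail, with the first chunk 2 bytes over the limit because it also carries the type.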

