[Spread-users] Invalid message in high load group
Scott Barvick
sbarvick at revasystems.com
Thu Sep 1 09:20:24 EDT 2005
There has been discussion before that might help on this.
http://commedia.cnds.jhu.edu/pipermail/spread-users/2005-April/002496.html
Scott
On Thu, 2005-09-01 at 05:56, Pradyumna Sampath wrote:
> Hi Marcin,
>
> We had a similar problem with spread when we loaded it
> quite a bit. First of all I suggest that you use
> message type "UNRELIABLE_MESS".
>
> When we did this the frequency of the problem did
> decrease but continued to happen.
>
> In this case there a workaround has been implemented
> in the file SpreadConnection.java which I have
> attached with this file.
>
> Note: There exists a high risk of losing messages and
> is *NOT* a very safe workaround.
>
> Hope this helps,
> Regards
> Prady
>
> --- Marcin Kuthan <mkuthan at pit.edu.pl> wrote:
>
> > Hi All
> >
> > I have a problem with Java clients on high load
> > group. Header of spread
> > message is not decoded correctly, and
> > java.lang.OutOfMemoryError is
> > thrown. Java spread client expect following header
> > structure:
> >
> > 1. type (4 bytes)
> > 2. sender (MAX_GROUP_NAME bytes) - typically 32
> > 3. numGroups (4 bytes)
> > 4. hint (4 bytes)
> > 5. dataLen (4 bytes)
> >
> > But on high load group I sometimes receive another
> > header:
> > 1. type is missing, header start with group name
> > (NOT sender, length
> > looks like MAX_GROUP_NAME)
> > 2. next four bytes contains 0,0,0,28 (decimal)
> > 3. next four bytes contains 2,0,0,0 (decimal)
> > 4. next four bytes contains 10,0,0,0 (decimal)
> > 5. next four bytes contains -123,0,0,0 (decimal)
> >
> > numGroups field is decoded to very large number and
> > allocation of byte
> > array throws an OutOfMemoryError. Even if I check
> > numGroups size and
> > error is not thrown my clients will not receive any
> > new messages.
> >
> > When I turn on SESSION debug flag in spread.conf I
> > get following logs:
> > Sess_badger: for mbox 9
> >
> >
> > High load environment is mean about 10.000 msg/sec.
> > My software:
> >
> > OS: Linux woody (gcc 2.95, libc 2.2.5)
> > spread: 3.17.3 (with MAX_SESSION_MESSAGES set to
> > 10000)
> > java: 1.4.2_08
> >
> > Any help would be appreciated.
> >
> > --
> > Telecommunication Research Institute
> > Poligonowa 30, 04-051 Warszawa, Poland
> >
> > Marcin Kuthan
> > mail: mkuthan at pit.edu.pl
> > phone: +48 (22) 486-53-46
> >
> >
> > _______________________________________________
> > Spread-users mailing list
> > Spread-users at lists.spread.org
> >
> http://lists.spread.org/mailman/listinfo/spread-users
> >
>
>
> Visit me at http://rtns.org/prady/
> Mail : pradysam (at) yahoo (dot) co.in
>
>
>
> ____________________________________________________
> Start your day with Yahoo! - make it your home page
> http://www.yahoo.com/r/hs
>
>
> ______________________________________________________________________
> /*
> * The Spread Toolkit.
> *
> * The contents of this file are subject to the Spread Open-Source
> * License, Version 1.0 (the ``License''); you may not use
> * this file except in compliance with the License. You may obtain a
> * copy of the License at:
> *
> * http://www.spread.org/license/
> *
> * or in the file ``license.txt'' found in this distribution.
> *
> * Software distributed under the License is distributed on an AS IS basis,
> * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
> * for the specific language governing rights and limitations under the
> * License.
> *
> * The Creators of Spread are:
> * Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
> *
> * Copyright (C) 1993-2004 Spread Concepts LLC <spread at spreadconcepts.com>
> *
> * All Rights Reserved.
> *
> * Major Contributor(s):
> * ---------------
> * Cristina Nita-Rotaru crisn at cs.purdue.edu - group communication security.
> * Theo Schlossnagle jesus at omniti.com - Perl, skiplists, autoconf.
> * Dan Schoenblum dansch at cnds.jhu.edu - Java interface.
> * John Schultz jschultz at cnds.jhu.edu - contribution to process group membership.
> *
> */
>
> package spread;
>
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.io.UnsupportedEncodingException;
> import java.net.InetAddress;
> import java.net.Socket;
> import java.net.SocketException;
> import java.net.UnknownHostException;
> import java.util.Vector;
>
> /**
> * A SpreadConnection object is used to establish a connection to a spread
> * daemon. To connect to a spread daemon, first create a new SpreadConnection
> * object, and then call
> * {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)}:
> * <p>
> * <blockquote>
> *
> * <pre>
> * SpreadConnection connection = new SpreadConnection();
> * connection.connect(null, 0, "name", false, false);
> * </pre>
> *
> * </blockquote>
> * <p>
> * The only methods that can be called before
> * {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)}
> * are the add ({@link SpreadConnection#add(BasicMessageListener)},
> * {@link SpreadConnection#add(AdvancedMessageListener)}) and remove (
> * {@link SpreadConnection#remove(BasicMessageListener)},
> * {@link SpreadConnection#remove(AdvancedMessageListener)}) methods. If any
> * other methods are called, a SpreadException is thrown, except for
> * {@link SpreadConnection#getPrivateGroup()}, which returns null.
> * <p>
> * To disconnect from the daemon, call {@link SpreadConnection#disconnect()}:
> * <p>
> * <blockquote>
> *
> * <pre>
> * connection.disconnect();
> * </pre>
> *
> * </blockquote>
> * <p>
> * To send a message on this connection, call
> * {@link SpreadConnection#multicast(SpreadMessage)}:
> * <p>
> * <blockquote>
> *
> * <pre>
> * connection.multicast(message);
> * </pre>
> *
> * </blockquote>
> * <p>
> * To receive a message sent to this connection, call
> * {@link SpreadConnection#receive()}:
> * <p>
> * <blockquote>
> *
> * <pre>
> * SpreadMessage message = connection.receive();
> * </pre>
> *
> * </blockquote>
> * <p>
> */
> public class SpreadConnection {
> // The default Spread port.
> // /////////////////////////
> private static final int DEFAULT_SPREAD_PORT = 4803;
>
> // The maximum length of the private name.
> // ////////////////////////////////////////
> private static final int MAX_PRIVATE_NAME = 10;
>
> // The maximum length of a message + group names.
> // ////////////////////////////////////////
> private static final int MAX_MESSAGE_LENGTH = 140000;
>
> // The maximum length of the group name.
> // //////////////////////////////////////
> protected static final int MAX_GROUP_NAME = 32;
>
> // The Spread version.
> // ////////////////////
> private static final int SP_MAJOR_VERSION = 3;
>
> private static final int SP_MINOR_VERSION = 17;
>
> private static final int SP_PATCH_VERSION = 3;
>
> // The default authentication method
> // //////////////////////////////////
> private static final String DEFAULT_AUTH_NAME = "NULL";
>
> // The class name of the default authentication method
> // ////////////////////////////////////////////////////
> private static final String DEFAULT_AUTHCLASS_NAME = "spread.NULLAuth";
>
> // The maximum length of a authentication method name
> // ///////////////////////////////////////////////////
> private static final int MAX_AUTH_NAME = 30;
>
> // The maximum number of authentication methods
> // /////////////////////////////////////////////
> private static final int MAX_AUTH_METHODS = 3;
>
> // Received if a connection attempt was successful.
> // /////////////////////////////////////////////////
> private static final int ACCEPT_SESSION = 1;
>
> // Used to determine endianness.
> // //////////////////////////////
> private static final int ENDIAN_TYPE = 0x80000080;
>
> // Only true if this connection is connected.
> // ///////////////////////////////////////////
> private boolean connected;
>
> // Reading synchro.
> // /////////////////
> private Boolean rsynchro;
>
> // Writing synchro.
> // /////////////////
> private Boolean wsynchro;
>
> // Listener list synchro.
> // /////////////////
> private Boolean listenersynchro;
>
> // True if calling listeners.
> // ///////////////////////////
> private boolean callingListeners;
>
> // The thread feeding the listeners.
> // //////////////////////////////////
> private Listener listener;
>
> // Basic listeners.
> // /////////////////
> protected Vector basicListeners;
>
> // Advanced listeners.
> // ////////////////////
> protected Vector advancedListeners;
>
> // The daemon's address.
> // //////////////////////
> private InetAddress address;
>
> // The daemon's port.
> // ///////////////////
> private int port;
>
> // Is this a priority connection?
> // ///////////////////////////////
> private boolean priority;
>
> // Getting group membership messages?
> // ///////////////////////////////////
> private boolean groupMembership;
>
> // Name of active choosen Authentication method
> // /////////////////////////////////////////////
> private String authName;
>
> // Name of class for active choosen Authentication method
> // ///////////////////////////////////////////////////////
> private String authClassName;
>
> // Object reference to current authentication class
> // /////////////////////////////////////////////////
> private Object authObj;
>
> // Method reference to authenticate method
> // ////////////////////////////////////////
> private java.lang.reflect.Method authMethodAuthenticate;
>
> // The socket this connection is using.
> // /////////////////////////////////////
> private Socket socket;
>
> // The socket's input stream.
> // ///////////////////////////
> private InputStream socketInput;
>
> // The socket's output stream.
> // ////////////////////////////
> private OutputStream socketOutput;
>
> // The private group.
> // ///////////////////
> private SpreadGroup group;
>
> // Commands buffered during listener callbacks.
> // The buffer is a list of BUFFER_* constants.
> // For commands with an argument (all except
> // for BUFFER_DISCONNECT), the argument follows in the Vector.
> // ////////////////////////////////////////////////////////////
> private Vector listenerBuffer;
>
> // Listener buffer commands.
> // These are Object's because they need to be added to a Vector.
> // //////////////////////////////////////////////////////////////
> private static final Object BUFFER_DISCONNECT = new Object();
>
> private static final Object BUFFER_ADD_BASIC = new Object();
>
> private static final Object BUFFER_ADD_ADVANCED = new Object();
>
> private static final Object BUFFER_REMOVE_BASIC = new Object();
>
> private static final Object BUFFER_REMOVE_ADVANCED = new Object();
>
> // Checks if the int is the same-endian type as the local machine.
> // ////////////////////////////////////////////////////////////////
> private static boolean sameEndian(int i) {
> return ((i & ENDIAN_TYPE) == 0);
> }
>
> // Clears the int's endian type.
> // //////////////////////////////
> private static int clearEndian(int i) {
> return (i & ~ENDIAN_TYPE);
> }
>
> // Endian-flips the int.
> // //////////////////////
> protected static int flip(int i) {
> return (((i >> 24) & 0x000000ff) | ((i >> 8) & 0x0000ff00)
> | ((i << 8) & 0x00ff0000) | ((i << 24) & 0xff000000));
> }
>
> // Endian-flips the short.
> // ////////////////////////
> private static short flip(short s) {
> return ((short) (((s >> 8) & 0x00ff) | ((s << 8) & 0xff00)));
> }
>
> // Puts a group name into an array of bytes.
> // //////////////////////////////////////////
> private static void toBytes(SpreadGroup group, byte buffer[],
> int bufferIndex) {
> // Get the group's name.
> // //////////////////////
> byte name[];
> try {
> name = group.toString().getBytes("ISO8859_1");
> } catch (UnsupportedEncodingException e) {
> // Already checked for this exception in connect.
> // ///////////////////////////////////////////////
> name = new byte[0];
> }
>
> // Put a cap on the length.
> // /////////////////////////
> int len = name.length;
> if (len > MAX_GROUP_NAME)
> len = MAX_GROUP_NAME;
>
> // Copy the name into the buffer.
> // ///////////////////////////////
> System.arraycopy(name, 0, buffer, bufferIndex, len);
> for (; len < MAX_GROUP_NAME; len++)
> buffer[bufferIndex + len] = 0;
> }
>
> // Puts an int into an array of bytes.
> // ////////////////////////////////////
> private static void toBytes(int i, byte buffer[], int bufferIndex) {
> buffer[bufferIndex++] = (byte) ((i >> 24) & 0xFF);
> buffer[bufferIndex++] = (byte) ((i >> 16) & 0xFF);
> buffer[bufferIndex++] = (byte) ((i >> 8) & 0xFF);
> buffer[bufferIndex++] = (byte) ((i) & 0xFF);
> }
>
> // Gets an int from an array of bytes.
> // ////////////////////////////////////
> protected static int toInt(byte buffer[], int bufferIndex) {
> int i0 = (buffer[bufferIndex++] & 0xFF);
> int i1 = (buffer[bufferIndex++] & 0xFF);
> int i2 = (buffer[bufferIndex++] & 0xFF);
> int i3 = (buffer[bufferIndex++] & 0xFF);
>
> return ((i0 << 24) | (i1 << 16) | (i2 << 8) | (i3));
> }
>
> // Reads from inputsocket until all bytes read or exception raised
> // ////////////////////////////////////////////////////////////////
> private void readBytesFromSocket(byte buffer[], String bufferTypeString)
> throws SpreadException {
> int byteIndex;
> int rcode;
> try {
> for (byteIndex = 0; byteIndex < buffer.length; byteIndex += rcode) {
> rcode = socketInput.read(buffer, byteIndex, buffer.length
> - byteIndex);
> if (rcode == -1) {
> throw new SpreadException(
> "Connection closed while reading "
> + bufferTypeString);
> }
> }
> } catch (InterruptedIOException e) {
> throw new SpreadException(
> "readBytesFromSocket(): InterruptedIOException " + e);
> } catch (IOException e) {
> throw new SpreadException("readBytesFromSocket(): read() " + e);
> }
>
> }
>
> // Gets a string from an array of bytes.
> // //////////////////////////////////////
> protected SpreadGroup toGroup(byte buffer[], int bufferIndex) {
> try {
> for (int end = bufferIndex; end < buffer.length; end++) {
> if (buffer[end] == 0) {
> // Get the group name.
> // ////////////////////
> String name = new String(buffer, bufferIndex, end
> - bufferIndex, "ISO8859_1");
>
> // Return the group.
> // //////////////////
> return new SpreadGroup(this, name);
> }
> }
> } catch (UnsupportedEncodingException e) {
> // Already checked for this exception in connect.
> // ///////////////////////////////////////////////
> }
>
> return null;
> }
>
> // Set the send and receive buffer sizes.
> // ///////////////////////////////////////
> private void setBufferSizes() throws SpreadException {
>
> try {
> for (int i = 10; i <= 200; i += 5) { //
> int size = (1024 * i);
> // Set the buffer sizes. ////////////////////////
> socket.setSendBufferSize(size);
> socket.setReceiveBufferSize(size);
> // Check the actual sizes. If smaller, then the max was hit.
> // ///////////////////////////////////////////////////////////
> if ((socket.getSendBufferSize() < size)
> || (socket.getReceiveBufferSize() < size)) {
> break;
> }
> //System.out.println("Max recieve and send buffer size " + size);
> }
> } catch (SocketException e) {
> throw new SpreadException("set/getSend/ReceiveBufferSize(): " + e);
> }
>
> }
>
> // Sends the initial connect message.
> // ///////////////////////////////////
> private void sendConnect(String privateName) throws SpreadException {
> // Check the private name for validity.
> // /////////////////////////////////////
> int len = (privateName == null ? 0 : privateName.length());
> if (len > MAX_PRIVATE_NAME) {
> privateName = privateName.substring(0, MAX_PRIVATE_NAME);
> len = MAX_PRIVATE_NAME;
> }
>
> // Allocate the buffer.
> // /////////////////////
> byte buffer[] = new byte[len + 5];
>
> // Set the version.
> // /////////////////
> buffer[0] = (byte) SP_MAJOR_VERSION;
> buffer[1] = (byte) SP_MINOR_VERSION;
> buffer[2] = (byte) SP_PATCH_VERSION;
>
> // Byte used for group membership and priority.
> // /////////////////////////////////////////////
> buffer[3] = 0;
>
> // Group membership.
> // //////////////////
> if (groupMembership) {
> buffer[3] |= 0x01;
> }
>
> // Priority.
> // //////////
> if (priority) {
> buffer[3] |= 0x10;
> }
>
> // Write the length.
> // //////////////////
> buffer[4] = (byte) len;
>
> if (len > 0) {
> // Write the private name.
> // ////////////////////////
> byte nameBytes[] = privateName.getBytes();
> for (int src = 0, dest = 5; src < len; src++, dest++) {
> buffer[dest] = nameBytes[src];
> }
> }
>
> // Send the connection message.
> // /////////////////////////////
> try {
> socketOutput.write(buffer);
> } catch (IOException e) {
> throw new SpreadException("write(): " + e);
> }
> }
>
> // read the Auth List
> // ///////////////////
> private void readAuthMethods() throws SpreadException {
> // Read the length.
> // /////////////////
> int len;
> try {
> len = socketInput.read();
> } catch (IOException e) {
> throw new SpreadException("read(): " + e);
> }
>
> // System.out.println("readAuthMethods: len is " + len);
> // Check for no more data.
> // ////////////////////////
> if (len == -1) {
> throw new SpreadException(
> "Connection closed during connect attempt to read authlen");
> }
> // Check if it was a response code
> // ////////////////////////////////
> if (len < -1) {
> throw new SpreadException("Connection attempt rejected="
> + (byte) len);
> }
>
> // Read the name.
> // ///////////////
> byte buffer[] = new byte[len];
> readBytesFromSocket(buffer, "authname");
> // System.out.println("readAuthMethods: string is " + new
> // String(buffer));
>
> // if(numRead != len)
> // {
> // throw new SpreadException("Connection closed during connect attempt
> // to read authnames");
> // }
>
> // for now we ignore the list.
> // ////////////////////////////
> }
>
> // Sends the choice of auth methods message.
> // ///////////////////////////////////
> private void sendAuthMethod() throws SpreadException {
> int len = authName.length();
> // Allocate the buffer.
> // /////////////////////
> byte buffer[] = new byte[MAX_AUTH_NAME * MAX_AUTH_METHODS];
>
> try {
> System.arraycopy(authName.getBytes("ISO8859_1"), 0, buffer, 0, len);
> } catch (UnsupportedEncodingException e) {
> // Already checked for this exception in connect.
> // ///////////////////////////////////////////////
> }
> for (; len < (MAX_AUTH_NAME * MAX_AUTH_METHODS); len++)
> buffer[len] = 0;
>
> // Send the connection message.
> // /////////////////////////////
> try {
> socketOutput.write(buffer);
> } catch (IOException e) {
> throw new SpreadException("write(): " + e);
> }
> }
>
> //
> // ///////////////////////////////////
> private void instantiateAuthMethod() throws SpreadException {
> Class authclass;
>
> // System.out.println("Authname is " + authName);
> // System.out.println("class name is " + authClassName);
> try {
> authclass = Class.forName(authClassName);
> } catch (ClassNotFoundException e) {
> throw new SpreadException("class " + authClassName
> + " not found.\n");
> }
>
> try {
> authObj = authclass.newInstance();
> } catch (Exception e) {
> throw new SpreadException("class " + authClassName
> + " error getting instance.\n" + e);
> }
> try {
> authMethodAuthenticate = authclass.getMethod("authenticate",
> new Class[] {});
> } catch (NoSuchMethodException e) {
> System.out.println("Failed to find auth method authenticate()");
> System.exit(1);
> } catch (SecurityException e) {
> System.out.println("security exception for method authenticate()");
> System.exit(1);
> }
>
> }
>
> // Checks for an accept message.
> // //////////////////////////////
> private void checkAccept() throws SpreadException {
> // Read the connection response.
> // //////////////////////////////
> int accepted;
> try {
> accepted = socketInput.read();
> } catch (IOException e) {
> throw new SpreadException("read(): " + e);
> }
>
> // Check for no more data.
> // ////////////////////////
> if (accepted == -1) {
> throw new SpreadException(
> "Connection closed during connect attempt");
> }
>
> // Was it accepted?
> // /////////////////
> if (accepted != ACCEPT_SESSION) {
> throw new SpreadException("Connection attempt rejected="
> + (byte) accepted);
> }
> }
>
> // Checks the daemon version.
> // ///////////////////////////
> private void checkVersion() throws SpreadException {
> // Read the version.
> // //////////////////
> int majorVersion;
> try {
> majorVersion = socketInput.read();
> } catch (IOException e) {
> throw new SpreadException("read(): " + e);
> }
>
> // Read the sub-version.
> // //////////////////////
> int minorVersion;
> try {
> minorVersion = socketInput.read();
> } catch (IOException e) {
> throw new SpreadException("read(): " + e);
> }
>
> // Read the patch-version.
> // //////////////////////
> int patchVersion;
> try {
> patchVersion = socketInput.read();
> } catch (IOException e) {
> throw new SpreadException("read(): " + e);
> }
>
> // Check for no more data.
> // ////////////////////////
> if ((majorVersion == -1) || (minorVersion == -1)
> || (patchVersion == -1)) {
> throw new SpreadException(
> "Connection closed during connect attempt");
> }
>
> // Check the version.
> // ///////////////////
> int version = ((majorVersion * 10000) + (minorVersion * 100) + patchVersion);
> if (version < 30100) {
> throw new SpreadException("Old version " + majorVersion + "."
> + minorVersion + "." + patchVersion + " not supported");
> }
> if ((version < 30800) && (priority)) {
> throw new SpreadException("Old version " + majorVersion + "."
> + minorVersion + "." + patchVersion
> + " does not support priority");
> }
> }
>
> // Get the private group name.
> // ////////////////////////////
> private void readGroup() throws SpreadException {
> // Read the length.
> // /////////////////
> int len;
> try {
> len = socketInput.read();
> } catch (IOException e) {
> throw new SpreadException("read(): " + e);
> }
>
> // Check for no more data.
> // ////////////////////////
> if (len == -1) {
> throw new SpreadException(
> "Connection closed during connect attempt");
> }
>
> // Read the name.
> // ///////////////
> byte buffer[] = new byte[len];
> readBytesFromSocket(buffer, "group name");
>
> // Store the group.
> // /////////////////
> group = new SpreadGroup(this, new String(buffer));
> }
>
> // Constructor.
> // /////////////
> /**
> * Initializes a new SpreadConnection object. To connect to a daemon with
> * this object, use
> * {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)}.
> *
> * @see SpreadConnection#connect(InetAddress, int, String, boolean, boolean)
> */
>
> String privateName;
> public SpreadConnection() {
> // We're not connected.
> // /////////////////////
> connected = false;
>
> // Init synchros.
> // ///////////////
> rsynchro = new Boolean(false);
> wsynchro = new Boolean(false);
> listenersynchro = new Boolean(false);
> // Init listeners.
> // ////////////////
> basicListeners = new Vector();
> advancedListeners = new Vector();
>
> // Init listener command buffer.
> // //////////////////////////////
> listenerBuffer = new Vector();
>
> // Init default authentication
> // ////////////////////////////
> authName = DEFAULT_AUTH_NAME;
> authClassName = DEFAULT_AUTHCLASS_NAME;
>
> }
>
> /**
> * Sets the authentication name and class string for the client side
> * authentication method. An authentication method can only be registered
> * before connect is called. The authentication method registered will then
> * be used whenever
> * {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)}
> * is called.
> *
> * @param authName
> * the short official "name" of the method begin registered.
> * @param authClassName
> * the complete class name for the method (including package)
> * @throws SpreadException
> * if the connection is already established
> */
> synchronized public void registerAuthentication(String authName,
> String authClassName) throws SpreadException {
> // Check if we're connected.
> // //////////////////////////
> if (connected == true) {
> throw new SpreadException("Already connected.");
> }
>
> this.authClassName = authClassName;
>
> try {
> this.authName = authName.substring(0, MAX_AUTH_NAME);
> } catch (IndexOutOfBoundsException e) {
> // Nothing to shorten.
> // ////////////////////
> this.authName = authName;
> }
> }
>
> // Establishes a connection with the spread daemon.
> // /////////////////////////////////////////////////
> /**
> * Establishes a connection to a spread daemon. Groups can be joined, and
> * messages can be sent or received once the connection has been
> * established.
> *
> * @param address
> * the daemon's address, or null to connect to the localhost
> * @param port
> * the daemon's port, or 0 for the default port (4803)
> * @param privateName
> * the private name to use for this connection
> * @param priority
> * if true, this is a priority connection
> * @param groupMembership
> * if true, membership messages will be received on this
> * connection
> * @throws SpreadException
> * if the connection cannot be established
> * @see SpreadConnection#disconnect()
> */
> synchronized public void connect(InetAddress address, int port,
> String privateName, boolean priority, boolean groupMembership)
> throws SpreadException {
> // Check if we're connected.
> // //////////////////////////
> if (connected == true) {
> throw new SpreadException("Already connected.");
> }
>
> // Is ISO8859_1 encoding supported?
> // /////////////////////////////
> try {
> new String("ASCII/ISO8859_1 encoding test").getBytes("ISO8859_1");
> } catch (UnsupportedEncodingException e) {
> throw new SpreadException("ISO8859_1 encoding is not supported.");
> }
>
> // Store member variables.
> // ////////////////////////
> this.address = address;
> this.port = port;
> this.priority = priority;
> this.groupMembership = groupMembership;
> this.privateName = privateName;
>
> // Check if no address was specified.
> // ///////////////////////////////////
> if (address == null) {
> // Use the local host.
> // ////////////////////
> try {
> address = InetAddress.getLocalHost();
> } catch (UnknownHostException e) {
> throw new SpreadException("Error getting local host");
> }
> }
>
> // Check if no port was specified.
> // ////////////////////////////////
> if (port == 0) {
> // Use the default port.
> // //////////////////////
> port = DEFAULT_SPREAD_PORT;
> }
>
> // Check if the port is out of range.
> // ///////////////////////////////////
> if ((port < 0) || (port > (32 * 1024))) {
> throw new SpreadException("Bad port (" + port + ").");
> }
>
> // Create the socket.
> // ///////////////////
> try {
> socket = new Socket(address, port);
> } catch (IOException e) {
> throw new SpreadException("Socket(): " + e);
> }
>
> // Set the socket's buffer sizes.
> // ///////////////////////////////
> setBufferSizes();
>
> // Get the socket's streams.
> // //////////////////////////
> try {
> socketInput = socket.getInputStream();
> socketOutput = socket.getOutputStream();
> } catch (IOException e) {
> throw new SpreadException("getInput/OutputStream(): " + e);
> }
>
> // Send the connect message.
> // //////////////////////////
> sendConnect(privateName);
>
> // Recv the authentication method list
> // ////////////////////////////////////
> readAuthMethods();
>
> // Send auth method choice
> // ////////////////////////
> sendAuthMethod();
>
> // turn string name of auth method into class and instance
> // ////////////////////////////////////////////////////////
> try {
> instantiateAuthMethod();
> } catch (SpreadException e) {
> System.out.println("Failed to create authMethod instance"
> + e.toString());
> System.exit(1);
> }
> // Call authenticate module. This will only return when connection is
> // authenticated
> // //////////////////////////
> try {
> authMethodAuthenticate.invoke(authObj, new Object[] {});
> } catch (IllegalAccessException e) {
> System.out.println("error calling authenticate" + e.toString());
> System.exit(1);
> } catch (IllegalArgumentException e) {
> System.out.println("error calling authenticate" + e.toString());
> System.exit(1);
> } catch (java.lang.reflect.InvocationTargetException e) {
> Throwable thr = e.getTargetException();
> if (thr.getClass().equals(SpreadException.class)) {
> throw new SpreadException(
> "Connection Rejected: Authentication failed");
> }
> }
> // Check for acceptance.
> // //////////////////////
> checkAccept();
>
> // Check the version.
> // ///////////////////
> checkVersion();
>
> // Get the private group name.
> // ////////////////////////////
> readGroup();
>
> // Connection complete.
> // /////////////////////
> connected = true;
>
> // Are there any listeners.
> // /////////////////////////
> if ((basicListeners.size() != 0) || (advancedListeners.size() != 0)) {
> // Start the listener thread.
> // ///////////////////////////
> startListener();
> }
> }
>
> // Disconnects from the spread daemon.
> // ////////////////////////////////////
> /**
> * Disconnects the connection to the daemon. Nothing else should be done
> * with this connection after disconnecting it.
> *
> * @throws SpreadException
> * if there is no connection or there is an error disconnecting
> * @see SpreadConnection#connect(InetAddress, int, String, boolean, boolean)
> */
> synchronized public void disconnect() throws SpreadException {
> // Check if we're connected.
> // //////////////////////////
> if (connected == false) {
> throw new SpreadException("Not connected.");
> }
>
> // Are we in a listener callback?
> // ///////////////////////////////
> if (callingListeners) {
> // Add it to the command buffer.
> // //////////////////////////////
> listenerBuffer.addElement(BUFFER_DISCONNECT);
>
> // Don't need to do anything else.
> // ////////////////////////////////
> return;
> }
>
> // Get a new message.
> // ///////////////////
> SpreadMessage killMessage = new SpreadMessage();
>
> // Send it to our private group.
> // //////////////////////////////
> killMessage.addGroup(group);
>
> // Set the service type.
> // //////////////////////
> killMessage.setServiceType(SpreadMessage.KILL_MESS);
>
> // Send the message.
> // //////////////////
> multicast(killMessage);
>
> // Check for a listener thread.
> // /////////////////////////////
> if (listener != null) {
> // Stop it.
> // /////////
> stopListener();
> }
>
> // Close the socket.
> // //////////////////
> try {
> socket.close();
> } catch (IOException e) {
> throw new SpreadException("close(): " + e);
> }
>
> connected = false;
> }
>
> // Gets the user's private group.
> // ///////////////////////////////
> /**
> * Gets the private group for this connection.
> *
> * @return the SpreadGroup representing this connection's private group, or
> * null if there is no connection
> */
> public SpreadGroup getPrivateGroup() {
> // Check if we're connected.
> // //////////////////////////
> if (connected == false) {
> return null;
> }
>
> return group;
> }
>
> // Receives a new message.
> // ////////////////////////
> /**
> * Receives the next message waiting on this connection. If there are no
> * messages waiting, the call will block until a message is ready to be
> * received.
> *
> * @return the message that has just been received
> * @throws SpreadException
> * if there is no connection or there is any error reading a new
> * message
> */
> public SpreadMessage receive() throws SpreadException,
> InterruptedIOException {
>
> synchronized (rsynchro) {
> // Check if there are any listeners.
> // //////////////////////////////////
> if ((basicListeners.isEmpty() == false)
> || (advancedListeners.isEmpty() == false)) {
> // Get out of here.
> // /////////////////
> throw new SpreadException(
> "Tried to receive while there are listeners");
> }
>
> return internal_receive();
> }
> }
>
>
>
> // Actually receives a new message
> // /////////////////////////////////
> private SpreadMessage internal_receive() throws SpreadException,
> InterruptedIOException {
> // Check if we're connected.
> // //////////////////////////
> if (connected == false) {
> throw new SpreadException("Not connected.");
> }
>
> // Read the header.
> // /////////////////
> byte header[] = new byte[MAX_GROUP_NAME + 16];
> int headerIndex;
> int rcode;
> try {
> for (headerIndex = 0; headerIndex < header.length; headerIndex += rcode) {
>
> rcode = socketInput.read(header, headerIndex, header.length
> - headerIndex);
> if (rcode == -1) {
> /*System.out.println("Connection closed while reading header. Caller will try to reconnect..");
> return null;*/
> throw new SpreadException( "Connection closed while reading header");
> }
> }
> } catch (InterruptedIOException e) {
> throw e;
> } catch (IOException e) {
> throw new SpreadException("read(): " + e);
> }
>
> // Reset header index.
> // ////////////////////
> headerIndex = 0;
>
> // Get service type.
> // //////////////////
> int serviceType = toInt(header, headerIndex);
> headerIndex += 4;
>
> // Get the sender.
> // ////////////////
> SpreadGroup sender = toGroup(header, headerIndex);
> headerIndex += MAX_GROUP_NAME;
>
> // Get the number of groups.
> // //////////////////////////
> int numGroups = toInt(header, headerIndex);
> headerIndex += 4;
>
> // Get the hint/type.
> // ///////////////////
> int hint = toInt(header, headerIndex);
> headerIndex += 4;
>
> // Get the data length.
> // /////////////////////
> int dataLen = toInt(header, headerIndex);
> headerIndex += 4;
>
> // Does the header need to be flipped?
> // (Checking for a daemon server endian-mismatch)
> // ///////////////////////////////////////////////
> boolean daemonEndianMismatch;
> if (sameEndian(serviceType) == false) {
> // Flip them.
> // ///////////
> serviceType = flip(serviceType);
> numGroups = flip(numGroups);
> dataLen = flip(dataLen);
>
> // The daemon endian-mismatch.
> // ////////////////////////////
> daemonEndianMismatch = true;
> } else {
> // The daemon endian-mismatch.
> // ////////////////////////////
> daemonEndianMismatch = false;
> }
>
> // Validate numGroups and dataLen
>
> //WORKAROUND FOR NEGATIVEARRAYEXCEPTION AND OUTOFMEMORYERROR - START
> if (((numGroups * MAX_GROUP_NAME) < 0)) {// NEGATIVE SIZE RECIEVED
> System.out.println("NEGATIVE SIZE RECIEVED "
> + (numGroups * MAX_GROUP_NAME));
> // read whatever is available in the stream and ignore
> int len = -1;
> try {
> while ((len = socketInput.available()) > 0) {
> byte[] temp = new byte[len];
> socketInput.read(temp, 0, len);
> System.out.println("Read dummy data " + (new String(temp)));
> }
> } catch (IOException e) {
>
> }
> //return null;
> throw new SpreadException( "Recieved corrupted header from Spread daemon");
> }else if(numGroups > 30000) {
> //some large number which we hope will not occur in normal usage
> //if allowed to continue, this might cause OutOfMemory error.
> ////////////////////////////////////////////////////////////////
> System.out.println("Corrupted header recieved from spread. numGroups = " + numGroups);
> throw new SpreadException( "Recieved corrupted header from Spread daemon");
> }
> //WORKAROUND FOR NEGATIVEARRAYEXCEPTION AND OUTOFMEMORYERROR - END
>
>
> if ((numGroups < 0) || (dataLen < 0)) {
> // drop message
> throw new SpreadException("Illegal Message: Message Dropped");
> }
>
> // An endian mismatch.
> // ////////////////////
> boolean endianMismatch;
>
> // The type.
> // //////////
> short type;
>
> // Is this a regular message?
> // ///////////////////////////
> if (SpreadMessage.isRegular(serviceType)
> || SpreadMessage.isReject(serviceType)) {
> // Does the hint need to be flipped?
> // (Checking for a sender endian-mismatch)
> // ////////////////////////////////////////
> if (sameEndian(hint) == false) {
> hint = flip(hint);
> endianMismatch = true;
> } else {
> endianMismatch = false;
> }
>
> // Get the type from the hint.
> // ////////////////////////////
> hint = clearEndian(hint);
> hint >>= 8;
> hint &= 0x0000FFFF;
> type = (short) hint;
> } else {
> // This is not a regular message.
> // ///////////////////////////////
> type = -1;
> endianMismatch = false;
> }
>
> if (SpreadMessage.isReject(serviceType)) {
> // Read in the old type and or with reject type field.
> byte oldtypeBuffer[] = new byte[4];
> try {
> for (int oldtypeIndex = 0; oldtypeIndex < oldtypeBuffer.length;) {
> rcode = socketInput.read(oldtypeBuffer, oldtypeIndex,
> oldtypeBuffer.length - oldtypeIndex);
> if (rcode == -1) {
> throw new SpreadException(
> "Connection closed while reading groups");
> }
> oldtypeIndex += rcode;
> }
> } catch (InterruptedIOException e) {
> throw e;
> } catch (IOException e) {
> throw new SpreadException("read(): " + e);
> }
> int oldType = toInt(oldtypeBuffer, 0);
> if (sameEndian(serviceType) == false)
> oldType = flip(oldType);
>
> serviceType = (SpreadMessage.REJECT_MESS | oldType);
> }
>
> // Read in the group names.
> // /////////////////////////
> byte buffer[] = new byte[numGroups * MAX_GROUP_NAME];
> try {
> for (int bufferIndex = 0; bufferIndex < buffer.length;) {
> rcode = socketInput.read(buffer, bufferIndex, buffer.length
> - bufferIndex);
> if (rcode == -1) {
> throw new SpreadException(
> "Connection closed while reading groups");
> }
> bufferIndex += rcode;
> }
> } catch (InterruptedIOException e) {
> throw e;
> } catch (IOException e) {
> throw new SpreadException("read(): " + e);
> }
>
> // Clear the endian type.
> // ///////////////////////
> serviceType = clearEndian(serviceType);
>
> // Get the groups from the buffer.
> // ////////////////////////////////
> Vector groups = new Vector(numGroups);
> for (int bufferIndex = 0; bufferIndex < buffer.length; bufferIndex += MAX_GROUP_NAME) {
> // Translate the name into a group and add it to the vector.
> // //////////////////////////////////////////////////////////
> groups.addElement(toGroup(buffer, bufferIndex));
> }
>
> // Read in the data.
> // //////////////////
> byte data[] = new byte[dataLen];
> try {
> for (int dataIndex = 0; dataIndex < dataLen;) {
> rcode = socketInput.read(data, dataIndex, dataLen - dataIndex);
> if (rcode == -1) {
> /*System.out.println("Connection close while reading data. Caller will try to reconnect..");
> return null;*/
> throw new SpreadException(
> "Connection close while reading data");
> }
> dataIndex += rcode;
> }
> } catch (InterruptedIOException e) {
> throw e;
> } catch (IOException e) {
> throw new SpreadException("read():" + e);
> }
>
> // Is it a membership message?
> // ////////////////////////////
> MembershipInfo membershipInfo;
> if (SpreadMessage.isMembership(serviceType)) {
> // Create a membership info object.
> // /////////////////////////////////
> membershipInfo = new MembershipInfo(this, serviceType, groups,
> sender, data, daemonEndianMismatch);
>
> // Is it a regular membership message?
> // ////////////////////////////////////
> if (membershipInfo.isRegularMembership()) {
> // Find which of these groups is the local connection.
> // ////////////////////////////////////////////////////
> type = (short) groups.indexOf(group);
> }
> } else {
> // There's no membership info.
> // ////////////////////////////
> membershipInfo = null;
> }
>
> // Create the message.
> // ////////////////////
> return new SpreadMessage(serviceType, groups, sender, data, type,
> endianMismatch, membershipInfo);
> }
>
> // Receives numMessages new messages.
> // ///////////////////////////////////
> /**
> * Receives <code>numMessages</code> messages on the connection and
> * returns them in an array. If there are not <code>numMessages</code>
> * messages waiting, the call will block until there are enough messages
> * available.
> *
> * @param numMessages
> * the number of messages to receive
> * @return an array of messages
> * @throws SpreadException
> * if there is no connection or if there is any error reading
> * the messages
> */
> public SpreadMessage[] receive(int numMessages) throws SpreadException,
> InterruptedIOException {
> // Allocate the messages array.
> // /////////////////////////////
> SpreadMessage[] messages = new SpreadMessage[numMessages];
> synchronized (rsynchro) {
> // Check if there are any listeners.
> // //////////////////////////////////
> if ((basicListeners.isEmpty() == false)
> || (advancedListeners.isEmpty() == false)) {
> // Get out of here.
> // /////////////////
> throw new SpreadException(
> "Tried to receive while there are listeners");
> }
>
> // Receive the messages.
> // //////////////////////
> for (int i = 0; i < numMessages; i++) {
> messages[i] = internal_receive();
> }
>
> }
> // Return the array.
> // //////////////////
> return messages;
> }
>
> // Returns true if there are messages waiting.
> // ////////////////////////////////////////////
> /**
> * Returns true if there are any messages waiting on this connection.
> *
> * @return true if there is at least one message that can be received
> * @throws SpreadException
> * if there is no connection or if there is an error checking
> * for messages
> */
> public boolean poll() throws SpreadException {
> // Check if we're connected.
> // //////////////////////////
> if (connected == false) {
> throw new SpreadException("Not connected.");
> }
>
> // Check if there is anything waiting.
> // ////////////////////////////////////
> try {
> if (socketInput.available() == 0) {
> // There's nothing to read.
> // /////////////////////////
> return false;
> }
> } catch (IOException e) {
> throw new SpreadException("available(): " + e);
> }
>
> // There's something to read.
> // ///////////////////////////
> return true;
> }
>
> // Private function for starting a listener thread.
> // /////////////////////////////////////////////////
> private void startListener() {
> // Get a new thread.
> // //////////////////
> listener = new Listener(this);
>
> // Start it.
> // //////////
> listener.start();
> }
>
> // Adds a new basic message listener.
> // ///////////////////////////////////
> /**
> * Adds the BasicMessageListener to this connection. If there are no other
> * listeners, this call will start a thread to listen for new messages. From
> * the time this function is called until this listener is removed,
> * {@link BasicMessageListener#messageReceived(SpreadMessage)}will be
> * called every time a message is received.
> *
> * @param listener
> * a BasicMessageListener to add to this connection
> * @see SpreadConnection#remove(BasicMessageListener)
> */
> public void add(BasicMessageListener listener) {
>
> synchronized (listenersynchro) {
> // Are we in a listener callback?
> // ///////////////////////////////
>
> if (callingListeners) {
> // Add it to the command buffer.
> // //////////////////////////////
> listenerBuffer.addElement(BUFFER_ADD_BASIC);
> listenerBuffer.addElement(listener);
> // Don't need to do anything else.
> // ////////////////////////////////
> return;
> }
> // Add the listener.
> // //////////////////
> basicListeners.addElement(listener);
>
> // Check if we're connected.
> // //////////////////////////
> if (connected == true) {
> // Check if the thread is running.
> // ////////////////////////////////
> if (this.listener == null) {
> // Start the thread.
> // //////////////////
>
> startListener();
> }
> }
> }
> }
>
> // Adds a new advanced message listener.
> // //////////////////////////////////////
> /**
> * Adds the AdvancedMessageListener to this connection. If there are no
> * other listeners, this call will start a thread to listen for new
> * messages. From the time this function is called until this listener is
> * removed,
> * {@link AdvancedMessageListener#regularMessageReceived(SpreadMessage)}
> * will be called every time a regular message is received, and
> * {@link AdvancedMessageListener#membershipMessageReceived(SpreadMessage)}
> * will be called every time a membership message is received.
> *
> * @param listener
> * an AdvancedMessageListener to add to this connection
> * @see SpreadConnection#remove(AdvancedMessageListener)
> */
> public void add(AdvancedMessageListener listener) {
> synchronized (listenersynchro) {
> // Are we in a listener callback?
> // ///////////////////////////////
> if (callingListeners) {
> // Add it to the command buffer.
> // //////////////////////////////
> listenerBuffer.addElement(BUFFER_ADD_ADVANCED);
> listenerBuffer.addElement(listener);
>
> // Don't need to do anything else.
> // ////////////////////////////////
> return;
> }
>
> // Add the listener.
> // //////////////////
> advancedListeners.addElement(listener);
>
> // Check if we're connected.
> // //////////////////////////
> if (connected == true) {
> // Check if the thread is running.
> // ////////////////////////////////
> if (this.listener == null) {
> // Start the thread.
> // //////////////////
> startListener();
> }
> }
> }
> }
>
> // Stops the listener thread.
> // ///////////////////////////
> private void stopListener() {
> // Set the signal.
> // ////////////////
> listener.signal = true;
>
> // Clear the variable.
> // ////////////////////
> listener = null;
> }
>
> // Removes a basic message listener.
> // //////////////////////////////////
> /**
> * Removes the BasicMessageListener from this connection. If this is the
> * only listener on this connection, the listener thread will be stopped.
> *
> * @param listener
> * the listener to remove
> * @see SpreadConnection#add(BasicMessageListener)
> */
> public void remove(BasicMessageListener listener) {
> synchronized (listenersynchro) {
> // Are we in a listener callback?
> // ///////////////////////////////
> if (callingListeners) {
> // Add it to the command buffer.
> // //////////////////////////////
> listenerBuffer.addElement(BUFFER_REMOVE_BASIC);
> listenerBuffer.addElement(listener);
>
> // Don't need to do anything else.
> // ////////////////////////////////
> return;
> }
>
> // Remove the listener.
> // /////////////////////
> basicListeners.removeElement(listener);
>
> // Check if we're connected.
> // //////////////////////////
> if (connected == true) {
> // Check if there are any more listeners.
> // ///////////////////////////////////////
> if ((basicListeners.size() == 0)
> && (advancedListeners.size() == 0)) {
> // Stop the listener thread.
> // //////////////////////////
> stopListener();
> }
> }
> }
> }
>
> // Removes an advanced message listener.
> // //////////////////////////////////////
> /**
> * Removes the AdvancedMessageListener from this connection. If this is the
> * only listener on this connection, the listener thread will be stopped.
> *
> * @param listener
> * the listener to remove
> * @see SpreadConnection#add(AdvancedMessageListener)
> */
> public void remove(AdvancedMessageListener listener) {
> synchronized (listenersynchro) {
> // Are we in a listener callback?
> // ///////////////////////////////
> if (callingListeners) {
> // Add it to the command buffer.
> // //////////////////////////////
> listenerBuffer.addElement(BUFFER_REMOVE_ADVANCED);
> listenerBuffer.addElement(listener);
>
> // Don't need to do anything else.
> // ////////////////////////////////
> return;
> }
>
> // Remove the listener.
> // /////////////////////
> advancedListeners.removeElement(listener);
>
> // Check if we're connected.
> // //////////////////////////
> if (connected == true) {
> // Check if there are any more listeners.
> // ///////////////////////////////////////
> if ((basicListeners.size() == 0)
> && (advancedListeners.size() == 0)) {
> // Stop the listener thread.
> // //////////////////////////
> stopListener();
> }
> }
> }
> }
>
> // This is the thread used to handle listener interfaces.
> // ///////////////////////////////////////////////////////
> private class Listener extends Thread {
> // The connection this thread is listening to.
> // ////////////////////////////////////////////
> private SpreadConnection connection;
>
> // If true, the connection wants the thread to stop.
> // //////////////////////////////////////////////////
> protected boolean signal;
>
> // The constructor.
> // /////////////////
> public Listener(SpreadConnection connection) {
> // Store local variables.
> // ///////////////////////
> this.connection = connection;
> this.signal = false;
>
> // Be a daemon.
> // /////////////
> this.setDaemon(true);
> }
>
> // The thread's entry point.
> // //////////////////////////
> public void run() {
> // An incoming message.
> // /////////////////////
> SpreadMessage message = null;
>
> // A basic listener.
> // //////////////////
> BasicMessageListener basicListener;
>
> // An advanced listener.
> // //////////////////////
> AdvancedMessageListener advancedListener;
>
> // A buffered command.
> // ////////////////////
> Object command;
>
> int previous_socket_timeout = 100;
>
> try {
> try {
> previous_socket_timeout = connection.socket.getSoTimeout();
> System.out.println("using timeout 1000");
>
> connection.socket.setSoTimeout(1000);
> } catch (SocketException e) {
> // just ignore for now
> System.out.println("socket error setting timeout"
> + e.toString());
> }
>
> while (true) {
> // Get a lock on the connection.
>
> // //////////////////////////////
> synchronized (connection) {
> // Do they want us to stop?
> // /////////////////////////
> if (signal == true) {
> // We're done.
> // ////////////
> System.out
> .println("LISTENER: told to exit so returning");
> try {
> connection.socket
> .setSoTimeout(previous_socket_timeout);
> } catch (SocketException e) {
> // just ignore for now
> System.out
> .println("socket error setting timeout"
> + e.toString());
> }
>
> return;
> }
>
> // Get a message.
> // WE WILL BLOCK HERE UNTIL DATA IS AVAILABLE
> // or 100 MS expires
> // ///////////////
> try {
> synchronized (rsynchro) {
> message = connection.internal_receive();
>
> }
> // Calling listeners.
> // ///////////////////
> callingListeners = true;
>
> // Tell all the basic listeners.
> // //////////////////////////////
> for (int i = 0; i < basicListeners.size(); i++) {
> // Get the listener.
> // //////////////////
> basicListener = (BasicMessageListener) basicListeners
> .elementAt(i);
>
> // Tell it.
> // /////////
> basicListener.messageReceived(message);
> }
>
> // Tell all the advanced listeners.
> // /////////////////////////////////
> for (int i = 0; i < advancedListeners.size(); i++) {
> // Get the listener.
> // //////////////////
> advancedListener = (AdvancedMessageListener) advancedListeners
> .elementAt(i);
>
> // What type of message is it?
> // ////////////////////////////
>
> if(message == null) {
> advancedListener
> .regularMessageReceived(message);
> }else
> if (message.isRegular()) {
> // Tell it.
> // /////////
> advancedListener
> .regularMessageReceived(message);
> } else {
> // Tell it.
> // /////////
> advancedListener
> .membershipMessageReceived(message);
> }
> }
>
> // Done calling listeners.
> // ////////////////////////
> callingListeners = false;
>
> } catch (InterruptedIOException e) {
>
> }
> // Execute buffered commands.
> // ///////////////////////////
> while (listenerBuffer.isEmpty() == false) {
> // Get the first command.
> // ///////////////////////
> command = listenerBuffer.firstElement();
>
> // Remove it from the list.
> // /////////////////////////
> listenerBuffer.removeElementAt(0);
>
> // Check what type of command it is.
> // //////////////////////////////////
> if (command == BUFFER_DISCONNECT) {
> // Disconnect.
> // ////////////
> connection.disconnect();
>
> // Don't execute any more commands.
> // /////////////////////////////////
> listenerBuffer.removeAllElements();
> } else if (command == BUFFER_ADD_BASIC) {
> // Get the listener.
> // //////////////////
> basicListener = (BasicMessageListener) listenerBuffer
> .firstElement();
>
> // Add it.
> // ////////
> connection.add(basicListener);
> // Remove the listener from the Vector.
> // /////////////////////////////////////
> listenerBuffer.removeElementAt(0);
>
> } else if (command == BUFFER_ADD_ADVANCED) {
> // Get the listener.
> // //////////////////
> advancedListener = (AdvancedMessageListener) listenerBuffer
> .firstElement();
>
> // Add it.
> // ////////
> connection.add(advancedListener);
> // Remove the listener from the Vector.
> // /////////////////////////////////////
> listenerBuffer.removeElementAt(0);
>
> } else if (command == BUFFER_REMOVE_BASIC) {
> // Get the listener.
> // //////////////////
> basicListener = (BasicMessageListener) listenerBuffer
> .firstElement();
>
> // Remove it.
> // ///////////
> connection.remove(basicListener);
>
> // Remove the listener from the Vector.
> // /////////////////////////////////////
> listenerBuffer.removeElementAt(0);
> } else if (command == BUFFER_REMOVE_ADVANCED) {
> // Get the listener.
> // //////////////////
> advancedListener = (AdvancedMessageListener) listenerBuffer
> .firstElement();
>
> // Remove it.
> // ///////////
> connection.remove(advancedListener);
> // Remove the listener from the Vector.
> // /////////////////////////////////////
> listenerBuffer.removeElementAt(0);
> }
> }
> }
>
> // There are no messages waiting, take a break.
> // /////////////////////////////////////////////
> yield();
> }
> } catch (SpreadException e) {
>
> System.out.println("Connection closed by spread. Callers will try to reconnect.");
> System.out.println("basicListeners " + basicListeners.size());
> System.out.println("advancedListeners " + advancedListeners.size());
> System.out.println("message " + message);
> // Tell all the basic listeners to reconnect.
> // //////////////////////////////
> for (int i = 0; i < basicListeners.size(); i++) {
> // Get the listener.
> // //////////////////
> basicListener = (BasicMessageListener) basicListeners
> .elementAt(i);
>
> // Tell it.
> // /////////
> basicListener.messageReceived(null);
> }
>
> // Tell all the advanced listeners to reconnect.
> // /////////////////////////////////
> for (int i = 0; i < advancedListeners.size(); i++) {
> // Get the listener.
> // //////////////////
> advancedListener = (AdvancedMessageListener) advancedListeners
> .elementAt(i);
>
> advancedListener
> .regularMessageReceived(null);
> }
> System.out.println("SpreadException: " + e.toString());
> }
> }
> }
>
> // Sends the message.
> // ///////////////////
> /**
> * Multicasts a message. The message will be sent to all the groups
> * specified in the message.
> *
> * @param message
> * the message to multicast
> * @throws SpreadException
> * if there is no connection or if there is any error sending
> * the message
> */
> public void multicast(SpreadMessage message) throws SpreadException {
> // Check if we're connected.
> // //////////////////////////
> if (connected == false) {
> throw new SpreadException("Not connected.");
> }
>
> // The groups this message is going to.
> // /////////////////////////////////////
> SpreadGroup groups[] = message.getGroups();
>
> // The message data.
> // //////////////////
> byte data[] = message.getData();
>
> // Calculate the total number of bytes.
> // /////////////////////////////////////
> int numBytes = 16; // serviceType, numGroups, type/hint, dataLen
> numBytes += MAX_GROUP_NAME; // private group
> numBytes += (MAX_GROUP_NAME * groups.length); // groups
>
> if (numBytes + data.length > MAX_MESSAGE_LENGTH) {
> throw new SpreadException(
> "Message is too long for a Spread Message");
> }
> // Allocate the send buffer.
> // //////////////////////////
> byte buffer[] = new byte[numBytes];
> int bufferIndex = 0;
>
> // The service type.
> // //////////////////
> toBytes(message.getServiceType(), buffer, bufferIndex);
> bufferIndex += 4;
>
> // The private group.
> // ///////////////////
> toBytes(group, buffer, bufferIndex);
> bufferIndex += MAX_GROUP_NAME;
>
> // The number of groups.
> // //////////////////////
> toBytes(groups.length, buffer, bufferIndex);
> bufferIndex += 4;
>
> // The service type and hint.
> // ///////////////////////////
> toBytes(((int) message.getType() << 8) & 0x00FFFF00, buffer,
> bufferIndex);
> bufferIndex += 4;
>
> // The data length.
> // /////////////////
> toBytes(data.length, buffer, bufferIndex);
> bufferIndex += 4;
>
> // The group names.
> // /////////////////
> for (int i = 0; i < groups.length; i++) {
> toBytes(groups[i], buffer, bufferIndex);
> bufferIndex += MAX_GROUP_NAME;
> }
>
> // Send it.
> // /////////
> synchronized (wsynchro) {
> try {
> socketOutput.write(buffer);
> socketOutput.write(data);
> } catch (IOException e) {
> throw new SpreadException("write(): " + e.toString());
> }
> }
> }
>
> // Sends the array of messages.
> // /////////////////////////////
> /**
> * Multicasts an array of messages. Each message will be sent to all the
> * groups specified in the message.
> *
> * @param messages
> * the messages to multicast
> * @throws SpreadException
> * if there is no connection or if there is any error sending
> * the messages
> */
> public void multicast(SpreadMessage messages[]) throws SpreadException {
> // Go through the array.
> // //////////////////////
> for (int i = 0; i < messages.length; i++) {
> // Send this message.
> // ///////////////////
> multicast(messages[i]);
> }
> }
>
> public boolean isConnected() {
> return connected;
> }
> }
>
> ______________________________________________________________________
> _______________________________________________
> Spread-users mailing list
> Spread-users at lists.spread.org
> http://lists.spread.org/mailman/listinfo/spread-users
More information about the Spread-users
mailing list