[Spread-users] Spread and select

DaiLin(戴霖) dailin at founder.com
Sun Apr 9 11:55:50 EDT 2006


> Hua Zhong wrote:

> Good, but there are a couple of issues:
> 1. it's not documented, which makes me wonder whether it's an officially supported feature. In fact, there is not even an SP_mailbox_to_fd
> kind of helper. How could one assume mailbox == fd? It's an implementation detail that could change at any time, and nothing to depend on.
> 2. there is still no non-blocking version of SP_receive. As I explained earlier, that's not very nice to programmers.
>
> I just checked the spread code (v3.17.3) and find the following in sp.c:
>
>               while(((ret = recv( mbox, &scat_mess->elements[scat_index].buf[byte_index], to_read, 0 )) == -1 )
>                                       && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
>
> This is very weird. If it's a blocking socket, it should never return EAGAIN. If it's a non-blocking socket, the above effectively will
> spin forever until there is data.
>
> Anyway, I think it would be a great improvement if non-blocking API could be provided.
>
> Hua
>

I have made some changes to the Spread code to add a non-blocking receive method, based on the source code of version 3.17.2.

The following is my code:

/*
 * SP_receive: receive one message into a single flat buffer.
 *
 * Forwards every argument unchanged to SP_receive_timeout() and passes 0 as
 * the time_out, which SP_scat_receive_timeout() treats as "no timeout"
 * (its select() is then called with a NULL timeval).
 * Returns whatever SP_receive_timeout() returns.
 */
int     SP_receive( mailbox mbox, service *service_type, char sender[MAX_GROUP_NAME],
                           int max_groups, int *num_groups, char groups[][MAX_GROUP_NAME],
                           int16 *mess_type, int *endian_mismatch,
                           int max_mess_len, char *mess )
{
        int     status;

        status = SP_receive_timeout( mbox, service_type, sender,
                                     max_groups, num_groups, groups,
                                     mess_type, endian_mismatch,
                                     max_mess_len, mess,
                                     0 );
        return( status );
}

/*
 * SP_scat_receive: receive one message into a caller-supplied scatter.
 *
 * Forwards every argument unchanged to SP_scat_receive_timeout() and passes
 * 0 as the time_out, which that function treats as "no timeout" (its
 * select() is then called with a NULL timeval).
 * Returns whatever SP_scat_receive_timeout() returns.
 */
int     SP_scat_receive( mailbox mbox, service *service_type,
                                        char sender[MAX_GROUP_NAME], int max_groups,
                                        int *num_groups, char groups[][MAX_GROUP_NAME],
                                        int16 *mess_type, int *endian_mismatch,
                                        scatter *scat_mess )
{
        int     status;

        status = SP_scat_receive_timeout( mbox, service_type, sender,
                                          max_groups, num_groups, groups,
                                          mess_type, endian_mismatch,
                                          scat_mess,
                                          0 );
        return( status );
}

/*
 * SP_receive_timeout: flat-buffer front end for SP_scat_receive_timeout().
 *
 * Describes the caller's buffer 'mess' (max_mess_len bytes) as a scatter
 * with exactly one element and hands it, together with all other arguments
 * and 'time_out', to SP_scat_receive_timeout().
 * Returns whatever SP_scat_receive_timeout() returns.
 */
int     SP_receive_timeout( mailbox mbox, service *service_type, char sender[MAX_GROUP_NAME],
                                           int max_groups, int *num_groups, char groups[][MAX_GROUP_NAME],
                                           int16 *mess_type, int *endian_mismatch,
                                           int max_mess_len, char *mess, long time_out )
{
        scatter         single_buf;

        /* one element that covers the whole user buffer */
        single_buf.elements[0].buf = mess;
        single_buf.elements[0].len = max_mess_len;
        single_buf.num_elements = 1;

        return SP_scat_receive_timeout( mbox, service_type, sender,
                                        max_groups, num_groups, groups,
                                        mess_type, endian_mismatch,
                                        &single_buf, time_out );
}

/*
 * SP_scat_receive_timeout: receive one message from 'mbox' into the caller's
 * scatter, optionally giving up if no message starts to arrive in time.
 *
 * Parameters:
 *   mbox            - connection to read from; also selects the receive mutex.
 *   service_type    - in: consulted through Is_drop_recv() to decide whether an
 *                     oversized message is truncated instead of rejected.
 *                     out: service type of the received message.
 *   sender          - out: private group name taken from the message header.
 *   max_groups      - capacity of 'groups'.
 *   num_groups      - out: number of destination groups; set to the negated
 *                     header count when 'groups' is too small.
 *   groups          - out: destination group names.
 *   mess_type       - out: 16-bit type extracted from the header hint for
 *                     regular/reject messages; otherwise -1, or this session's
 *                     index in 'groups' for a regular membership message when
 *                     its private group name is found there.
 *   endian_mismatch - out: 1 if the hint had to be byte-flipped, else 0. On
 *                     BUFFER_TOO_SHORT / GROUPS_TOO_SHORT without drop semantics
 *                     it instead carries the negated required data length (or 0).
 *   scat_mess       - buffers that receive the message body.
 *   time_out        - milliseconds to wait for the first byte of a new message
 *                     header; values <= 0 mean wait indefinitely. Once any
 *                     header byte has been read the call blocks until the
 *                     whole message has arrived, because abandoning a partly
 *                     read header would leave the stream out of sync.
 *
 * Returns the message data length on success, or one of ILLEGAL_SESSION,
 * RECEIVE_TIMEOUT, CONNECTION_CLOSED, ILLEGAL_MESSAGE, GROUPS_TOO_SHORT,
 * BUFFER_TOO_SHORT.
 *
 * The per-mbox receive mutex is held for the whole call and is released on
 * every return path.
 */
int     SP_scat_receive_timeout( mailbox mbox, service *service_type, char sender[MAX_GROUP_NAME],
                                                        int max_groups, int *num_groups, char groups[][MAX_GROUP_NAME],
                                                        int16 *mess_type, int *endian_mismatch,
                                                        scatter *scat_mess, long time_out )
{

        static  char            dummy_buf[10240];
        int          This_session_message_saved;
        int          drop_semantics;
        message_header  mess_head;
        message_header  *head_ptr;
        char            *buf_ptr;
        int32           temp_mess_type; /* must hold the full 32-bit hint before it is shifted */
        int             len, remain, ret;
        int             max_mess_len;
        int             short_buffer;
        int             short_groups;
        int             to_read;
        int             scat_index, byte_index;
        int             ses;
        char            This_session_private_group[MAX_GROUP_NAME];
        int             i;
        int32      old_type;

        fd_set read_fds;
        struct timeval tv;
        struct timeval *ptv = NULL;

        /* I must acquire the lock for this mbox before the Struct_mutex lock because
        * I must be sure ONLY one thread is in recv for this mbox, EVEN for
        * this initial 'get the session and session state' operation.
        * Otherwise one thread enters this and gets the state and sees no saved message
        * then grabs the mbox mutex and discoveres buffer too short and so regrabs the
        * Struct_mutex and adds the saved header, but during this time another thread
        * has entered recv for the same mbox and already grabbed the struct_mutex and also
        * read that no saved mesage exists and is now waiting for the mbox mutex.
        * When it the first thread returns and releases the mbox mutex, the second thread will
        * grab it and enter--but it will think there is NO saved messaage when in reality
        * there IS one. This will cause MANY PROBLEMS :-)
        *
        * NOTE: locking and unlocking the Struct_mutex multiple times during this is OK
        * BECAUSE struct_Mutex only locks non-blocking operations that are guaranteed to complete
        * quickly and never take additional locks.
        */
        Mutex_lock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );

        Mutex_lock( &Struct_mutex );
        /* verify mbox */
        ses = SP_get_session( mbox );
        if( ses < 0 ){
                Mutex_unlock( &Struct_mutex );
                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                return( ILLEGAL_SESSION );
        }
        strcpy( This_session_private_group, Sessions[ses].private_group_name );

        /* A header saved by an earlier too-short call is reused instead of reading a new one */
        if (Sessions[ses].recv_message_saved) {
                memcpy(&mess_head, &(Sessions[ses].recv_saved_head), sizeof(message_header) );
                This_session_message_saved = 1;
        } else {
                This_session_message_saved = 0;
        }

        Mutex_unlock( &Struct_mutex );

        head_ptr = (message_header *)&mess_head;
        buf_ptr = (char *)&mess_head;

        drop_semantics = Is_drop_recv(*service_type);

        if (time_out < 0)
                time_out = 0;

        if (!This_session_message_saved) {
                /* read up to size of message_header */
                for( len=0, remain = sizeof(message_header); remain > 0;  len += ret, remain -= ret )
                {
                        do
                        {
                                /* select() may modify both the descriptor set and the
                                 * timeval, so both are rebuilt before every attempt.
                                 */
                                FD_ZERO( &read_fds );
                                FD_SET( mbox, &read_fds );

                                /* The timeout only guards the wait for the FIRST header byte.
                                 * Timing out after part of the header was consumed would throw
                                 * those bytes away and desynchronise the stream.
                                 */
                                if (time_out > 0 && len == 0)
                                {
                                        tv.tv_sec = time_out / 1000;
                                        tv.tv_usec = (time_out % 1000) * 1000;
                                        ptv = &tv;
                                }
                                else
                                {
                                        ptv = NULL;
                                }
                        }
                        while( ((ret = select( mbox + 1, &read_fds, NULL, NULL, ptv )) < 0) && (sock_errno == EINTR) );

                        if (ret < 0)
                        {
                                /* any error other than an interrupted call is treated like a failed recv */
                                Alarm( SESSION, "SP_scat_receive: select failed waiting for header on session %d (ret: %d len: %d): %s\n", mbox, ret, len, sock_strerror(sock_errno) );
                                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );

                                SP_kill( mbox );
                                return( CONNECTION_CLOSED );
                        }

                        if (ret == 0)
                        {
                                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                                return( RECEIVE_TIMEOUT );
                        }

                        while(((ret = recv( mbox, &buf_ptr[len], remain, 0 )) == -1 )
                                && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                                ;
                        if( ret <=0 )
                        {
                                Alarm( SESSION, "SP_scat_receive: failed receiving header on session %d (ret: %d len: %d): %s\n", mbox, ret, len, sock_strerror(sock_errno) );
                                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );

                                SP_kill( mbox );
                                return( CONNECTION_CLOSED );
                        }
                }

                /* Fliping message header to my form if needed */
                if( !Same_endian( head_ptr->type ) )
                {
                        Flip_mess( head_ptr );
                }
        }
        /* Validate user's scatter */
        for( max_mess_len = 0, i=0; i < scat_mess->num_elements; i++ ) {
                if ( scat_mess->elements[i].len < 0 )   {
                        if ( !drop_semantics && !This_session_message_saved) {
                                /* keep the already-read header so a later call can retry */
                                Mutex_lock( &Struct_mutex );
                                ses = SP_get_session( mbox );
                                if( ses < 0 ){
                                        Mutex_unlock( &Struct_mutex );
                                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                                        return( ILLEGAL_SESSION );
                                }
                                memcpy(&(Sessions[ses].recv_saved_head), &mess_head, sizeof(message_header) );
                                Sessions[ses].recv_message_saved = 1;
                                Mutex_unlock( &Struct_mutex );
                        }
                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                        return( ILLEGAL_MESSAGE );
                }
                max_mess_len += scat_mess->elements[i].len;
        }
        /* Validate num_groups and data_len */
        if (head_ptr->num_groups < 0) {
                /* reject this message since it has an impossible (negative) num_groups
                * This is likely to be caused by a malicious attack or memory corruption
                */
                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                return( ILLEGAL_MESSAGE );
        }
        if (head_ptr->data_len < 0) {
                /* reject this message since it has an impossible (negative) data_len
                * This is likely to be caused by a malicious attack or memory corruption
                */
                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                return( ILLEGAL_MESSAGE );
        }

        /* Check if sufficient buffer space for groups and data */
        if (!drop_semantics) {
                if ( (head_ptr->num_groups > max_groups) || (head_ptr->data_len > max_mess_len) ) {
                        if (!This_session_message_saved) {
                                Mutex_lock( &Struct_mutex );
                                ses = SP_get_session( mbox );
                                if( ses < 0 ){
                                        Mutex_unlock( &Struct_mutex );
                                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                                        return( ILLEGAL_SESSION );
                                }
                                memcpy(&(Sessions[ses].recv_saved_head), &mess_head, sizeof(message_header) );
                                Sessions[ses].recv_message_saved = 1;
                                Mutex_unlock( &Struct_mutex );
                        }
                        if ( Is_regular_mess( head_ptr->type ) || Is_reject_mess( head_ptr->type ) )
                        {
                                /* work on a copy: the saved header must keep its original hint */
                                temp_mess_type = head_ptr->hint;
                                if ( !Same_endian( head_ptr->hint ) ) {
                                        temp_mess_type = Flip_int32( temp_mess_type );
                                }
                                *mess_type = ( temp_mess_type >> 8 ) & 0x0000ffff;
                        }
                        else
                                *mess_type = 0;
                        *service_type = Clear_endian( head_ptr->type );
                        /* negative values tell the caller how much space is required */
                        if (head_ptr->num_groups > max_groups)
                                *num_groups = -(head_ptr->num_groups);
                        else 
                                *num_groups = 0;
                        if (head_ptr->data_len > max_mess_len)
                                *endian_mismatch = -(head_ptr->data_len);
                        else
                                *endian_mismatch = 0;
                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                        if (*num_groups)
                                return( GROUPS_TOO_SHORT );
                        else
                                return( BUFFER_TOO_SHORT );
                }
        }
        /* Compute mess_type and endian_mismatch from hint */
        if( Is_regular_mess( head_ptr->type ) || Is_reject_mess( head_ptr->type)  )
        {
                if( !Same_endian( head_ptr->hint ) )
                {
                        head_ptr->hint = Flip_int32( head_ptr->hint );
                        head_ptr->hint = Clear_endian( head_ptr->hint );
                        head_ptr->hint = ( head_ptr->hint >> 8 ) & 0x0000ffff;
                        *mess_type = head_ptr->hint;
                        *endian_mismatch = 1;
                }else{
                        head_ptr->hint = Clear_endian( head_ptr->hint );
                        head_ptr->hint = ( head_ptr->hint >> 8 ) & 0x0000ffff;
                        *mess_type = head_ptr->hint;
                        *endian_mismatch = 0;
                }
        }else{
                *mess_type = -1; /* marks the index (0..n-1) of the member in the group */
                *endian_mismatch = 0;
        }

        strcpy( sender, head_ptr->private_group_name );

        /* if a reject message read the extra old_type field first, and merge with head_ptr->type */
        if ( Is_reject_mess( head_ptr->type ) )
        {
                remain = 4;
                buf_ptr = (char *)&old_type;
                for( len=0; remain > 0; len += ret, remain -= ret )
                {
                        while(((ret = recv( mbox, &buf_ptr[len], remain, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                                ;
                        if( ret <=0 )
                        {
                                Alarm( SESSION, "SP_scat_receive: failed receiving old_type for reject on session %d, ret is %d: %s\n", mbox, ret, sock_strerror(sock_errno));
                                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                                SP_kill( mbox );
                                return( CONNECTION_CLOSED );
                        }
                }
                /* endian flip it */
                if ( !Same_endian( head_ptr->type ) )
                        old_type = Flip_int32(old_type);
        }

        /* read the destination groups */
        buf_ptr = (char *)groups;

        remain = head_ptr->num_groups * MAX_GROUP_NAME;
        short_groups=0;
        if( head_ptr->num_groups > max_groups )
        {
                /* groups too short */
                remain = max_groups * MAX_GROUP_NAME;
                short_groups = 1;
        }

        for( len=0; remain > 0; len += ret, remain -= ret )
        {
                while(((ret = recv( mbox, &buf_ptr[len], remain, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                        ;
                if( ret <=0 )
                {
                        Alarm( SESSION, "SP_scat_receive: failed receiving groups on session %d, ret is %d: %s\n", mbox, ret, sock_strerror(sock_errno));
                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );

                        SP_kill( mbox );
                        return( CONNECTION_CLOSED );
                }
        }

        if( short_groups )
        {
                /* drain the group names that did not fit so the stream stays aligned */
                for( remain = (head_ptr->num_groups - max_groups) * MAX_GROUP_NAME;
                        remain > 0; remain -= ret )
                {
                        to_read = remain;
                        if( to_read > sizeof( dummy_buf ) ) to_read = sizeof( dummy_buf );
                        while(((ret = recv( mbox, dummy_buf, to_read, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                                ;
                        if( ret <=0 )
                        {
                                Alarm( SESSION, "SP_scat_receive: failed receiving groups overflow on session %d, ret is %d: %s\n",
                                        mbox, ret, sock_strerror(sock_errno) );
                                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                                SP_kill( mbox );
                                return( CONNECTION_CLOSED );
                        }
                }
                *num_groups = -head_ptr->num_groups; /* !!!! */
        }else   *num_groups = head_ptr->num_groups;

        /* read the rest of the message */
        remain = head_ptr->data_len;
        short_buffer=0;
        if( head_ptr->data_len > max_mess_len )
        {
                /* buffer too short */
                remain = max_mess_len;
                short_buffer = 1;
        }

        ret = 0;
        /*
        * pay attention that if head_ptr->data_len is smaller than max_mess_len we need to
        * change scat, do recvmsg, and restore scat, and then check ret.
        * ret = recvmsg( mbox, &msg, 0 );
        * if( ret <=0 )
        * {
        *      Alarm( SESSION, "SP_scat_receive: failed receiving message on session %d\n", mbox );
        *
        *       Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
        *
        *      SP_kill( mbox );
        *      return;
        * }
        */

        /* calculate scat_index and byte_index based on ret and scat_mess */
        for( byte_index=ret, scat_index=0; scat_index < scat_mess->num_elements; scat_index++ )
        {
                if( scat_mess->elements[scat_index].len > byte_index ) break;
                byte_index -= scat_mess->elements[scat_index].len;
        }

        remain -= ret;
        for( len=ret; remain > 0; len += ret, remain -= ret )
        {
                to_read = scat_mess->elements[scat_index].len - byte_index;
                if( to_read > remain ) to_read = remain;
                while(((ret = recv( mbox, &scat_mess->elements[scat_index].buf[byte_index], to_read, 0 )) == -1 )
                        && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                        ;
                if( ret <=0 )
                {
                        Alarm( SESSION, "SP_scat_receive: failed receiving message on session %d, ret is %d: %s\n",
                                mbox, ret, sock_strerror(sock_errno) );
                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                        SP_kill( mbox );
                        return( CONNECTION_CLOSED );
                }else if( ret == to_read ){
                        /* current element is full (or message done): move to the next one */
                        byte_index = 0;
                        scat_index++;
                }else{
                        byte_index += ret;
                }
        }

        if( Is_reg_memb_mess( head_ptr->type ) && !short_groups )
        {
                /* calculate my index in group */
                for( i=0; i < head_ptr->num_groups; i++ )
                {
                        if( !strcmp( groups[i], This_session_private_group ) )
                        {
                                *mess_type = i;
                                break;
                        }
                }
        }

        if( Is_reg_memb_mess( head_ptr->type ) && !Same_endian( head_ptr->type ) )
        {
                int             flip_size;
                group_id        *gid_ptr;
                int32           *num_vs_ptr;
                int             bytes_to_copy, bytes_index;
                char            groups_buf[10240];

                /*
                * flip membership message:
                * group_id and number of member ins vs_set
                * so - acctually 4 int32.
                */
                flip_size = sizeof( group_id ) + sizeof( int32 );
                if( flip_size > max_mess_len ) flip_size = max_mess_len;
                /* gather the leading bytes from the scatter into one contiguous buffer */
                for( bytes_index=0, i=0 ; bytes_index < flip_size ; i++, bytes_index += bytes_to_copy )
                {
                        bytes_to_copy = flip_size - bytes_index;
                        if( bytes_to_copy > scat_mess->elements[i].len )
                                bytes_to_copy = scat_mess->elements[i].len;
                        memcpy( &groups_buf[bytes_index], scat_mess->elements[i].buf, bytes_to_copy );
                }
                gid_ptr    = (group_id *)&groups_buf[0];
                num_vs_ptr = (int32 *)&groups_buf[sizeof(group_id)];
                gid_ptr->memb_id.proc_id = Flip_int32( gid_ptr->memb_id.proc_id );
                gid_ptr->memb_id.time    = Flip_int32( gid_ptr->memb_id.time );
                gid_ptr->index     = Flip_int32( gid_ptr->index );
                *num_vs_ptr              = Flip_int32( *num_vs_ptr );
                /* scatter the flipped bytes back to where they came from */
                for( bytes_index=0, i=0 ; bytes_index < flip_size ; i++, bytes_index += bytes_to_copy )
                {
                        bytes_to_copy = flip_size - bytes_index;
                        if( bytes_to_copy > scat_mess->elements[i].len )
                                bytes_to_copy = scat_mess->elements[i].len;
                        memcpy( scat_mess->elements[i].buf, &groups_buf[bytes_index], bytes_to_copy );
                }

        }
        if ( Is_reject_mess( head_ptr->type ) )
        {
                /* set type to be old type + reject */
                head_ptr->type = old_type | REJECT_MESS;
        }
        *service_type = Clear_endian( head_ptr->type );

        if( short_buffer )
        {
                /* drain the message bytes that did not fit so the stream stays aligned */
                for( remain = head_ptr->data_len - max_mess_len; remain > 0; remain -= ret )
                {
                        to_read = remain;
                        if( to_read > sizeof( dummy_buf ) ) to_read = sizeof( dummy_buf );
                        while(((ret = recv( mbox, dummy_buf, to_read, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                                ;
                        if( ret <=0 )
                        {
                                Alarm( SESSION, "SP_scat_receive: failed receiving overflow on session %d, ret is %d: %s\n",
                                        mbox, ret, sock_strerror(sock_errno) );
                                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                                SP_kill( mbox );
                                return( CONNECTION_CLOSED );
                        }
                }
                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                return( BUFFER_TOO_SHORT );
        }
        /* Successful receive so clear saved_message info if any */
        if (This_session_message_saved) {
                Mutex_lock( &Struct_mutex );
                ses = SP_get_session( mbox );
                if( ses < 0 ){
                        Mutex_unlock( &Struct_mutex );
                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                        return( ILLEGAL_SESSION );
                }
                memset(&(Sessions[ses].recv_saved_head), 0, sizeof(message_header) );
                Sessions[ses].recv_message_saved = 0;
                Mutex_unlock( &Struct_mutex );
        }

        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
        return( head_ptr->data_len );
}










More information about the Spread-users mailing list