[Spread-users] Spread and select
DaiLin(戴霖)
dailin at founder.com
Sun Apr 9 11:55:50 EDT 2006
> Hua Zhong wrote:
> Good, but there are a couple of issues:
> 1. it's not documented, thus making me wonder if it's officially supported feature. In fact, there is not even a SP_mailbox_to_fd
> kind of helper. How could one assume mailbox == fd? It's implementation details that could change anytime, and nothing to depend on.
> 2. there is still not a non-blocking version SP_receive. As I explained earlier, it's not very nice to programers.
>
> I just checked the spread code (v3.17.3) and find the following in sp.c:
>
> while(((ret = recv( mbox, &scat_mess->elements[scat_index].buf[byte_index], to_read, 0 )) == -1 )
> && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
>
> This is very weird. If it's a blocking socket, it should never return EGAIN. If it's a non-blocking socket, the above effectily will
> spin forever until there is data.
>
> Anyway, I think it would be a great improvement if non-blocking API could be provided.
>
> Hua
>
I have made some change to spread code to add a non-blocking receive method based on source code of version 3.17.2.
The following is my code:
int SP_receive( mailbox mbox, service *service_type, char sender[MAX_GROUP_NAME],
int max_groups, int *num_groups, char groups[][MAX_GROUP_NAME],
int16 *mess_type, int *endian_mismatch,
int max_mess_len, char *mess )
{
return SP_receive_timeout(mbox, service_type, sender, max_groups, num_groups, groups, mess_type,
endian_mismatch, max_mess_len, mess, 0 );
}
int SP_scat_receive( mailbox mbox, service *service_type,
char sender[MAX_GROUP_NAME], int max_groups,
int *num_groups, char groups[][MAX_GROUP_NAME],
int16 *mess_type, int *endian_mismatch,
scatter *scat_mess )
{
return SP_scat_receive_timeout( mbox, service_type, sender, max_groups, num_groups, groups,
mess_type, endian_mismatch, scat_mess, 0 );
}
int SP_receive_timeout( mailbox mbox, service *service_type, char sender[MAX_GROUP_NAME],
int max_groups, int *num_groups, char groups[][MAX_GROUP_NAME],
int16 *mess_type, int *endian_mismatch,
int max_mess_len, char *mess, long time_out )
{
int ret;
scatter recv_scat;
recv_scat.num_elements = 1;
recv_scat.elements[0].len = max_mess_len;
recv_scat.elements[0].buf = mess;
ret = SP_scat_receive_timeout( mbox, service_type, sender, max_groups, num_groups, groups,
mess_type, endian_mismatch, &recv_scat, time_out );
return( ret );
}
int SP_scat_receive_timeout( mailbox mbox, service *service_type, char sender[MAX_GROUP_NAME],
int max_groups, int *num_groups, char groups[][MAX_GROUP_NAME],
int16 *mess_type, int *endian_mismatch,
scatter *scat_mess, long time_out )
{
static char dummy_buf[10240];
int This_session_message_saved;
int drop_semantics;
message_header mess_head;
message_header *head_ptr;
char *buf_ptr;
int16 temp_mess_type;
int len, remain, ret;
int max_mess_len;
int short_buffer;
int short_groups;
int to_read;
int scat_index, byte_index;
int ses;
char This_session_private_group[MAX_GROUP_NAME];
int i;
int32 old_type;
fd_set fd;
struct timeval tv;
struct timeval *ptv = NULL;
/* I must acquire the lock for this mbox before the Struct_mutex lock because
* I must be sure ONLY one thread is in recv for this mbox, EVEN for
* this initial 'get the session and session state' operation.
* Otherwise one thread enters this and gets the state and sees no saved message
* then grabs the mbox mutex and discoveres buffer too short and so regrabs the
* Struct_mutex and adds the saved header, but during this time another thread
* has entered recv for the same mbox and already grabbed the struct_mutex and also
* read that no saved mesage exists and is now waiting for the mbox mutex.
* When it the first thread returns and releases the mbox mutex, the second thread will
* grab it and enter--but it will think there is NO saved messaage when in reality
* there IS one. This will cause MANY PROBLEMS :-)
*
* NOTE: locking and unlocking the Struct_mutex multiple times during this is OK
* BECAUSE struct_Mutex only locks non-blocking operations that are guaranteed to complete
* quickly and never take additional locks.
*/
Mutex_lock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
Mutex_lock( &Struct_mutex );
/* verify mbox */
ses = SP_get_session( mbox );
if( ses < 0 ){
Mutex_unlock( &Struct_mutex );
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
return( ILLEGAL_SESSION );
}
strcpy( This_session_private_group, Sessions[ses].private_group_name );
if (Sessions[ses].recv_message_saved) {
memcpy(&mess_head, &(Sessions[ses].recv_saved_head), sizeof(message_header) );
This_session_message_saved = 1;
} else {
This_session_message_saved = 0;
}
Mutex_unlock( &Struct_mutex );
head_ptr = (message_header *)&mess_head;
buf_ptr = (char *)&mess_head;
drop_semantics = Is_drop_recv(*service_type);
if (time_out < 0)
time_out = 0;
if (!This_session_message_saved) {
/* read up to size of message_header */
for( len=0, remain = sizeof(message_header); remain > 0; len += ret, remain -= ret )
{
if (time_out > 0)
{
tv.tv_sec = time_out / 1000;
tv.tv_usec = (time_out - ((long)(time_out / 1000)) * 1000) * 1000;
ptv = &tv;
}
do
{
fd.fd_count = 1;
fd.fd_array[0] = mbox;
}
while((ret = select(0, &fd, NULL, NULL, ptv)) < 0);
if (ret == 0)
{
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
return( RECEIVE_TIMEOUT );
}
ret = recv( mbox, &buf_ptr[len], remain, 0 );
//while(((ret = recv( mbox, &buf_ptr[len], remain, 0 )) == -1 )
// && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
// ;
if( ret <=0 )
{
Alarm( SESSION, "SP_scat_receive: failed receiving header on session %d (ret: %d len: %d): %s\n", mbox, ret, len, sock_strerror(sock_errno) );
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
SP_kill( mbox );
return( CONNECTION_CLOSED );
}
}
/* Fliping message header to my form if needed */
if( !Same_endian( head_ptr->type ) )
{
Flip_mess( head_ptr );
}
}
/* Validate user's scatter */
for( max_mess_len = 0, i=0; i < scat_mess->num_elements; i++ ) {
if ( scat_mess->elements[i].len < 0 ) {
if ( !drop_semantics && !This_session_message_saved) {
Mutex_lock( &Struct_mutex );
ses = SP_get_session( mbox );
if( ses < 0 ){
Mutex_unlock( &Struct_mutex );
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
return( ILLEGAL_SESSION );
}
memcpy(&(Sessions[ses].recv_saved_head), &mess_head, sizeof(message_header) );
Sessions[ses].recv_message_saved = 1;
Mutex_unlock( &Struct_mutex );
}
return( ILLEGAL_MESSAGE );
}
max_mess_len += scat_mess->elements[i].len;
}
/* Validate num_groups and data_len */
if (head_ptr->num_groups < 0) {
/* reject this message since it has an impossible (negative) num_groups
* This is likely to be caused by a malicious attack or memory corruption
*/
return( ILLEGAL_MESSAGE );
}
if (head_ptr->data_len < 0) {
/* reject this message since it has an impossible (negative) data_len
* This is likely to be caused by a malicious attack or memory corruption
*/
return( ILLEGAL_MESSAGE );
}
/* Check if sufficient buffer space for groups and data */
if (!drop_semantics) {
if ( (head_ptr->num_groups > max_groups) || (head_ptr->data_len > max_mess_len) ) {
if (!This_session_message_saved) {
Mutex_lock( &Struct_mutex );
ses = SP_get_session( mbox );
if( ses < 0 ){
Mutex_unlock( &Struct_mutex );
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
return( ILLEGAL_SESSION );
}
memcpy(&(Sessions[ses].recv_saved_head), &mess_head, sizeof(message_header) );
Sessions[ses].recv_message_saved = 1;
Mutex_unlock( &Struct_mutex );
}
if ( Is_regular_mess( head_ptr->type ) || Is_reject_mess( head_ptr->type ) )
{
temp_mess_type = head_ptr->hint;
if ( !Same_endian( head_ptr->hint ) ) {
temp_mess_type = Flip_int32( temp_mess_type );
}
*mess_type = ( temp_mess_type >> 8 ) & 0x0000ffff;
}
else
*mess_type = 0;
*service_type = Clear_endian( head_ptr->type );
if (head_ptr->num_groups > max_groups)
*num_groups = -(head_ptr->num_groups);
else
*num_groups = 0;
if (head_ptr->data_len > max_mess_len)
*endian_mismatch = -(head_ptr->data_len);
else
*endian_mismatch = 0;
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
if (*num_groups)
return( GROUPS_TOO_SHORT );
else
return( BUFFER_TOO_SHORT );
}
}
/* Compute mess_type and endian_mismatch from hint */
if( Is_regular_mess( head_ptr->type ) || Is_reject_mess( head_ptr->type) )
{
if( !Same_endian( head_ptr->hint ) )
{
head_ptr->hint = Flip_int32( head_ptr->hint );
head_ptr->hint = Clear_endian( head_ptr->hint );
head_ptr->hint = ( head_ptr->hint >> 8 ) & 0x0000ffff;
*mess_type = head_ptr->hint;
*endian_mismatch = 1;
}else{
head_ptr->hint = Clear_endian( head_ptr->hint );
head_ptr->hint = ( head_ptr->hint >> 8 ) & 0x0000ffff;
*mess_type = head_ptr->hint;
*endian_mismatch = 0;
}
}else{
*mess_type = -1; /* marks the index (0..n-1) of the member in the group */
*endian_mismatch = 0;
}
strcpy( sender, head_ptr->private_group_name );
/* if a reject message read the extra old_type field first, and merge with head_ptr->type */
if ( Is_reject_mess( head_ptr->type ) )
{
remain = 4;
buf_ptr = (char *)&old_type;
for( len=0; remain > 0; len += ret, remain -= ret )
{
while(((ret = recv( mbox, &buf_ptr[len], remain, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
;
if( ret <=0 )
{
Alarm( SESSION, "SP_scat_receive: failed receiving old_type for reject on session %d, ret is %d: %s\n", mbox, ret, sock_strerror(sock_errno));
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
SP_kill( mbox );
return( CONNECTION_CLOSED );
}
}
/* endian flip it */
if ( !Same_endian( head_ptr->type ) )
old_type = Flip_int32(old_type);
}
/* read the destination groups */
buf_ptr = (char *)groups;
remain = head_ptr->num_groups * MAX_GROUP_NAME;
short_groups=0;
if( head_ptr->num_groups > max_groups )
{
/* groups too short */
remain = max_groups * MAX_GROUP_NAME;
short_groups = 1;
}
for( len=0; remain > 0; len += ret, remain -= ret )
{
while(((ret = recv( mbox, &buf_ptr[len], remain, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
;
if( ret <=0 )
{
Alarm( SESSION, "SP_scat_receive: failed receiving groups on session %d, ret is %d: %s\n", mbox, ret, sock_strerror(sock_errno));
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
SP_kill( mbox );
return( CONNECTION_CLOSED );
}
}
if( short_groups )
{
for( remain = (head_ptr->num_groups - max_groups) * MAX_GROUP_NAME;
remain > 0; remain -= ret )
{
to_read = remain;
if( to_read > sizeof( dummy_buf ) ) to_read = sizeof( dummy_buf );
while(((ret = recv( mbox, dummy_buf, to_read, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
;
if( ret <=0 )
{
Alarm( SESSION, "SP_scat_receive: failed receiving groups overflow on session %d, ret is %d: %s\n",
mbox, ret, sock_strerror(sock_errno) );
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
SP_kill( mbox );
return( CONNECTION_CLOSED );
}
}
*num_groups = -head_ptr->num_groups; /* !!!! */
}else *num_groups = head_ptr->num_groups;
/* read the rest of the message */
remain = head_ptr->data_len;
short_buffer=0;
if( head_ptr->data_len > max_mess_len )
{
/* buffer too short */
remain = max_mess_len;
short_buffer = 1;
}
ret = 0;
/*
* pay attention that if head_ptr->data_len is smaller than max_mess_len we need to
* change scat, do recvmsg, and restore scat, and then check ret.
* ret = recvmsg( mbox, &msg, 0 );
* if( ret <=0 )
* {
* Alarm( SESSION, "SP_scat_receive: failed receiving message on session %d\n", mbox );
*
* Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
*
* SP_kill( mbox );
* return;
* }
*/
/* calculate scat_index and byte_index based on ret and scat_mess */
for( byte_index=ret, scat_index=0; scat_index < scat_mess->num_elements; scat_index++ )
{
if( scat_mess->elements[scat_index].len > byte_index ) break;
byte_index -= scat_mess->elements[scat_index].len;
}
remain -= ret;
for( len=ret; remain > 0; len += ret, remain -= ret )
{
to_read = scat_mess->elements[scat_index].len - byte_index;
if( to_read > remain ) to_read = remain;
while(((ret = recv( mbox, &scat_mess->elements[scat_index].buf[byte_index], to_read, 0 )) == -1 )
&& ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
;
if( ret <=0 )
{
Alarm( SESSION, "SP_scat_receive: failed receiving message on session %d, ret is %d: %s\n",
mbox, ret, sock_strerror(sock_errno) );
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
SP_kill( mbox );
return( CONNECTION_CLOSED );
}else if( ret == to_read ){
byte_index = 0;
scat_index++;
}else{
byte_index += ret;
}
}
if( Is_reg_memb_mess( head_ptr->type ) && !short_groups )
{
/* calculate my index in group */
for( i=0; i < head_ptr->num_groups; i++ )
{
if( !strcmp( groups[i], This_session_private_group ) )
{
*mess_type = i;
break;
}
}
}
if( Is_reg_memb_mess( head_ptr->type ) && !Same_endian( head_ptr->type ) )
{
int flip_size;
group_id *gid_ptr;
int32 *num_vs_ptr;
int bytes_to_copy, bytes_index;
char groups_buf[10240];
/*
* flip membership message:
* group_id and number of member ins vs_set
* so - acctually 4 int32.
*/
flip_size = sizeof( group_id ) + sizeof( int32 );
if( flip_size > max_mess_len ) flip_size = max_mess_len;
for( bytes_index=0, i=0 ; bytes_index < flip_size ; i++, bytes_index += bytes_to_copy )
{
bytes_to_copy = flip_size - bytes_index;
if( bytes_to_copy > scat_mess->elements[i].len )
bytes_to_copy = scat_mess->elements[i].len;
memcpy( &groups_buf[bytes_index], scat_mess->elements[i].buf, bytes_to_copy );
}
gid_ptr = (group_id *)&groups_buf[0];
num_vs_ptr = (int32 *)&groups_buf[sizeof(group_id)];
gid_ptr->memb_id.proc_id = Flip_int32( gid_ptr->memb_id.proc_id );
gid_ptr->memb_id.time = Flip_int32( gid_ptr->memb_id.time );
gid_ptr->index = Flip_int32( gid_ptr->index );
*num_vs_ptr = Flip_int32( *num_vs_ptr );
for( bytes_index=0, i=0 ; bytes_index < flip_size ; i++, bytes_index += bytes_to_copy )
{
bytes_to_copy = flip_size - bytes_index;
if( bytes_to_copy > scat_mess->elements[i].len )
bytes_to_copy = scat_mess->elements[i].len;
memcpy( scat_mess->elements[i].buf, &groups_buf[bytes_index], bytes_to_copy );
}
}
if ( Is_reject_mess( head_ptr->type ) )
{
/* set type to be old type + reject */
head_ptr->type = old_type | REJECT_MESS;
}
*service_type = Clear_endian( head_ptr->type );
if( short_buffer )
{
for( remain = head_ptr->data_len - max_mess_len; remain > 0; remain -= ret )
{
to_read = remain;
if( to_read > sizeof( dummy_buf ) ) to_read = sizeof( dummy_buf );
while(((ret = recv( mbox, dummy_buf, to_read, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
;
if( ret <=0 )
{
Alarm( SESSION, "SP_scat_receive: failed receiving overflow on session %d, ret is %d: %s\n",
mbox, ret, sock_strerror(sock_errno) );
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
SP_kill( mbox );
return( CONNECTION_CLOSED );
}
}
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
return( BUFFER_TOO_SHORT );
}
/* Successful receive so clear saved_message info if any */
if (This_session_message_saved) {
Mutex_lock( &Struct_mutex );
ses = SP_get_session( mbox );
if( ses < 0 ){
Mutex_unlock( &Struct_mutex );
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
return( ILLEGAL_SESSION );
}
memset(&(Sessions[ses].recv_saved_head), 0, sizeof(message_header) );
Sessions[ses].recv_message_saved = 0;
Mutex_unlock( &Struct_mutex );
}
Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
return( head_ptr->data_len );
}
More information about the Spread-users
mailing list