Index: spreadmap/spread_src/spread4_olf/daemon/data_link.c =================================================================== RCS file: /cvs/master/olf/ecom/spreadmap/spread_src/spread4_olf/daemon/data_link.c,v retrieving revision 1.6 diff -b -c -1 -1 -r1.6 data_link.c *** spreadmap/spread_src/spread4_olf/daemon/data_link.c 21 Jul 2009 20:40:23 -0000 1.6 --- spreadmap/spread_src/spread4_olf/daemon/data_link.c 24 Jan 2013 22:12:24 -0000 *************** *** 50,84 **** --- 50,115 ---- #include #endif /* ARCH_PC_WIN95 */ #include #include #include "data_link.h" #include "status.h" #include "alarm.h" #include "sp_events.h" /* for sp_time */ + static void set_large_socket_buffers(int s) + { + int i, on, ret; + sockopt_len_t onlen; + + for( i=10; i <= 200; i+=5 ) + { + on = 1024*i; + + ret = setsockopt( s, SOL_SOCKET, SO_SNDBUF, (void *)&on, 4); + if (ret < 0 ) break; + + ret = setsockopt( s, SOL_SOCKET, SO_RCVBUF, (void *)&on, 4); + if (ret < 0 ) break; + + onlen = sizeof(on); + ret= getsockopt( s, SOL_SOCKET, SO_SNDBUF, (void *)&on, &onlen ); + if( on < i*1024 ) break; + Alarmp( SPLOG_INFO, SESSION, "set_large_socket_buffers: set sndbuf %d, ret is %d\n", on, ret ); + + onlen = sizeof(on); + ret= getsockopt( s, SOL_SOCKET, SO_RCVBUF, (void *)&on, &onlen ); + if( on < i*1024 ) break; + Alarmp( SPLOG_INFO, SESSION, "set_large_socket_buffers: set rcvbuf %d, ret is %d\n", on, ret ); + } + Alarmp( SPLOG_INFO, SESSION, "set_large_socket_buffers: set sndbuf/rcvbuf to %d\n", 1024*(i-5) ); + } + + channel DL_init_channel( int32 channel_type, int16 port, int32 mcast_address, int32 interface_address ) { channel chan; struct sockaddr_in soc_addr; int on=1; int i1,i2,i3,i4; #ifdef IP_MULTICAST_TTL unsigned char ttl_val; #endif if((chan = socket(AF_INET, SOCK_DGRAM, 0)) == -1) Alarm( EXIT, "DL_init_channel: socket error for port %d\n", port ); + set_large_socket_buffers(chan); + if ( channel_type & SEND_CHANNEL ) { if (setsockopt(chan, SOL_SOCKET, SO_BROADCAST, (char *)&on, sizeof(on)) < 0) Alarm( EXIT, "DL_init_channel: setsockopt error for port %d\n",port); Alarm( DATA_LINK, "DL_init_channel: setsockopt for send and broadcast went ok\n"); #ifdef IP_MULTICAST_TTL /* ### Isn't this for sending??? */ ttl_val = 1; if (setsockopt(chan, IPPROTO_IP, IP_MULTICAST_TTL, (void *)&ttl_val, *************** *** 127,160 **** --- 158,193 ---- i4 = mcast_address & 0x000000ff; if( i1 >=224 && i1 < 240 ) { #ifdef IP_MULTICAST_TTL struct ip_mreq mreq; mreq.imr_multiaddr.s_addr = htonl( mcast_address ); /* the interface could be changed to a specific interface if needed */ mreq.imr_interface.s_addr = INADDR_ANY; + if (setsockopt(chan, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void *)&mreq, sizeof(mreq)) < 0) { Alarm( EXIT, "DL_init_channel: problem in setsockopt to multicast address %d errno[%d]::%s\n", mcast_address, sock_errno, sock_strerror(sock_errno) ); } Alarm( DATA_LINK, "DL_init_channel: Joining multicast address %d.%d.%d.%d went ok\n",i1,i2,i3,i4); #else /* no multicast support */ Alarm( EXIT, "DL_init_channel: Old SunOS architecture does not support IP multicast: %d.%d.%d.%d\n",i1,i2,i3,i4); #endif } else { + if (setsockopt(chan, SOL_SOCKET, SO_BROADCAST, (char *)&on, sizeof(on)) < 0) Alarm( EXIT, "DL_init_channel: setsockopt SO_BROADCAST error for port %d, socket %d\n",port,chan); Alarm( DATA_LINK, "DL_init_channel: setsockopt for recv and broadcast went ok\n"); } } Alarm( DATA_LINK, "DL_init_channel: went ok on channel %d\n",chan); return ( chan ); } Index: spreadmap/spread_src/spread4_olf/daemon/net_types.h =================================================================== RCS file: /cvs/master/olf/ecom/spreadmap/spread_src/spread4_olf/daemon/net_types.h,v retrieving revision 1.4 diff -b -c -1 -1 -r1.4 net_types.h *** spreadmap/spread_src/spread4_olf/daemon/net_types.h 5 Jan 2007 15:54:25 -0000 1.4 --- spreadmap/spread_src/spread4_olf/daemon/net_types.h 24 Jan 2013 22:12:25 -0000 *************** *** 123,144 **** --- 123,145 ---- typedef struct dummy_token_header { int32 type; int32 transmiter_id; int32 seq; int32 proc_id; int32 aru; int32 aru_last_id; int16 flow_control; int16 rtr_len; int32 conf_hash; + int16 reprocessing; } token_header; typedef char token_body[MAX_PACKET_SIZE-sizeof(token_header)]; typedef struct dummy_ring_rtr { membership_id memb_id; int32 proc_id; int16 seg_index; int16 num_seq; } ring_rtr; Index: spreadmap/spread_src/spread4_olf/daemon/network.c =================================================================== RCS file: /cvs/master/olf/ecom/spreadmap/spread_src/spread4_olf/daemon/network.c,v retrieving revision 1.5 diff -b -c -1 -1 -r1.5 network.c *** spreadmap/spread_src/spread4_olf/daemon/network.c 21 Jul 2009 20:40:23 -0000 1.5 --- spreadmap/spread_src/spread4_olf/daemon/network.c 24 Jan 2013 22:12:27 -0000 *************** *** 61,82 **** --- 61,84 ---- static int Num_send_needed; static int32 Send_address[MAX_SEGMENTS]; static int16 Send_ports[MAX_SEGMENTS]; /* ### Pack: 3 lines */ /* Global in function so both Net_queue_bcast and Net_flush_bcast can access them */ static sys_scatter Queue_scat; static int Queued_bytes = 0; static const char align_padding[4] = "padd"; /* address for token sending - which is always needed */ + + static int32 My_address; static int32 Token_address; static int16 Token_port; static configuration Net_membership; static int Segment_leader; static configuration *Cn; static proc My; static int16 Partition[MAX_PROCS_RING]; static sp_time Partition_timeout = { 60, 0}; *************** *** 202,223 **** --- 204,227 ---- Num_send_needed++; } } assert(my_next_index != -1); for( i=0; i < Num_send_needed; i++ ) Alarm( NETWORK, "Net_set_membership: Send_addr[%d] is (%u.%u.%u.%u:%d)\n", i, IP1(Send_address[i]), IP2(Send_address[i]), IP3(Send_address[i]), IP4(Send_address[i]), Send_ports[i] ); /* Calculate where to send the token */ Token_address = 0; + My_address =my_seg.procs[my_index_in_seg]->id; + if( my_index_in_seg < my_seg.num_procs-1 ) { Token_address = my_seg.procs[my_index_in_seg+1]->id; Token_port = my_seg.port+1; }else{ /* I am last in my segment */ if( Num_send_needed == 0 ) { /* * My segment is the only segment * sending token to the first in my segment *************** *** 634,667 **** --- 638,690 ---- } int Net_send_token( sys_scatter *scat ) { token_header *token_ptr; int ret; token_ptr = (token_header *)scat->elements[0].buf; token_ptr->type = Set_endian( token_ptr->type ); token_ptr->conf_hash = Cn->hash_code; token_ptr->transmiter_id = My.id; + token_ptr->reprocessing = 0; + if ( token_ptr->rtr_len > (MAX_PACKET_SIZE - sizeof(token_header) ) ) { if ( Is_form( token_ptr->type ) ) Memb_print_form_token( scat ); Alarmp( SPLOG_FATAL, PRINT, "Net_send_token: Token too long for packet!\n"); } ret = DL_send( Send_channel, Token_address, Token_port, scat ); return ( ret ); } + int Net_resend_myself_token( sys_scatter *scat ) + { + + int ret; + token_header *token_ptr; + size_t saved_len = scat->elements[1].len; + + token_ptr = (token_header *)scat->elements[0].buf; + token_ptr->type = Set_endian( token_ptr->type ); + token_ptr->reprocessing = 1; + + scat->elements[1].len =token_ptr->rtr_len; + + ret = DL_send( Send_channel, My_address, Token_port, scat ); + scat->elements[1].len =saved_len; + return ( ret ); + } int Net_recv_token( channel fd, sys_scatter *scat ) { token_header *token_ptr; int ret, i; bool ch_found; token_ptr = (token_header *)scat->elements[0].buf; ch_found = FALSE; for (i = 0 ; i < Num_token_channels; i++) { if ( fd == Token_channel[i]) { *************** *** 802,814 **** --- 825,838 ---- void Flip_token( token_header *token_ptr ) { token_ptr->type = Flip_int32( token_ptr->type ); token_ptr->transmiter_id = Flip_int32( token_ptr->transmiter_id ); token_ptr->seq = Flip_int32( token_ptr->seq ); token_ptr->proc_id = Flip_int32( token_ptr->proc_id ); token_ptr->aru = Flip_int32( token_ptr->aru ); token_ptr->aru_last_id = Flip_int32( token_ptr->aru_last_id ); token_ptr->flow_control = Flip_int16( token_ptr->flow_control ); token_ptr->rtr_len = Flip_int16( token_ptr->rtr_len ); token_ptr->conf_hash = Flip_int32( token_ptr->conf_hash ); + token_ptr->reprocessing = Flip_int16( token_ptr->reprocessing ); } Index: spreadmap/spread_src/spread4_olf/daemon/network.h =================================================================== RCS file: /cvs/master/olf/ecom/spreadmap/spread_src/spread4_olf/daemon/network.h,v retrieving revision 1.4 diff -b -c -1 -1 -r1.4 network.h *** spreadmap/spread_src/spread4_olf/daemon/network.h 5 Jan 2007 15:54:25 -0000 1.4 --- spreadmap/spread_src/spread4_olf/daemon/network.h 24 Jan 2013 22:12:28 -0000 *************** *** 43,64 **** --- 43,65 ---- void Net_init(); void Net_set_membership( configuration memb ); void Net_signal_conf_reload(void); int Net_bcast( sys_scatter *scat ); int Net_queue_bcast(sys_scatter *scat); int Net_flush_bcast(void); int Net_scast( int16 seg_index, sys_scatter *scat ); int Net_ucast( int32 proc_id, sys_scatter *scat ); int Net_recv ( channel fd, sys_scatter *scat ); int Net_send_token( sys_scatter *scat ); + int Net_resend_myself_token( sys_scatter *scat ); int Net_recv_token( channel fd, sys_scatter *scat ); int Net_ucast_token( int32 proc_id, sys_scatter *scat ); channel *Net_bcast_channel(void); channel *Net_token_channel(void); void Net_num_channels(int *num_bcast, int *num_token); void Net_set_partition(int16 *new_partition); void Net_clear_partition(void); #endif /* INC_NETWORK */ Index: spreadmap/spread_src/spread4_olf/daemon/protocol.c =================================================================== RCS file: /cvs/master/olf/ecom/spreadmap/spread_src/spread4_olf/daemon/protocol.c,v retrieving revision 1.5 diff -b -c -1 -1 -r1.5 protocol.c *** spreadmap/spread_src/spread4_olf/daemon/protocol.c 21 Jul 2009 20:40:23 -0000 1.5 --- spreadmap/spread_src/spread4_olf/daemon/protocol.c 24 Jan 2013 22:12:30 -0000 *************** *** 147,168 **** --- 147,169 ---- Send_pack.num_elements = 2; Send_pack.elements[0].len = sizeof(packet_header); Token = (token_header *)New_token.elements[0].buf; Last_token = new(TOKEN_HEAD_OBJ); Last_token->type = 0; Last_token->seq = 0; Last_token->aru = 0; Last_token->flow_control = 0; Last_token->conf_hash = 0; + Last_token->reprocessing = 0; Hurry_pack.num_elements = 1; Hurry_pack.elements[0].len = sizeof(packet_header); Hurry_pack.elements[0].buf = (char *) new(PACKET_BODY); Hurry_head = (packet_header *)Hurry_pack.elements[0].buf; Hurry_head->proc_id = My.id; Hurry_head->type = HURRY_TYPE; Net_init(); bcast_channels = Net_bcast_channel(); *************** *** 245,266 **** --- 246,270 ---- packet_header *pack_ptr; int pack_entry; proc p; int received_bytes; int total_bytes_processed; int num_buffered_packets; int i; int32 j; /* int r1,r2; */ received_bytes = Net_recv( fd, &New_pack ); + + Alarm( PROTOCOL, "BROADCAST: Received %d, total processed %d\n", received_bytes, total_bytes_processed); + /* My own packet or from another monitor component */ if( received_bytes == 0 ) return; /* problem in receiving */ if( received_bytes < 0 ) return; pack_ptr = (packet_header *)New_pack.elements[0].buf; /* ### Pack, this has to move down to network.c * if( pack_ptr->data_len +sizeof(packet_header) != received_bytes ) * { * Alarm( PRINT, "Prot_handle_bcast: received %d, should be %d\n", *************** *** 318,339 **** --- 322,345 ---- if( Conf_leader( Memb_active_ptr() ) == My.id ) { E_queue( Prot_token_hurry, 0, NULL, Hurry_timeout ); } } /* ### Pack: next 70 lines (almost till the end of the routine) have changed */ Buffered_packets[0].head = pack_ptr; Buffered_packets[0].body = (packet_body *)New_pack.elements[1].buf; received_bytes -= sizeof(packet_header); total_bytes_processed = pack_ptr->data_len; + + Alarm( PROTOCOL, "BROADCAST: Received %d, total processed %d\n", received_bytes, total_bytes_processed); /* ignore any alignment padding */ switch(total_bytes_processed % 4) { case 1: total_bytes_processed++; case 2: total_bytes_processed++; case 3: total_bytes_processed++; case 0: /* already aligned */ *************** *** 447,469 **** /* int r1,r2;*/ ret = Net_recv_token( fd, &New_token ); /* from another monitor component */ if( ret == 0 ) return; /* delete random r1 = ((-My.id)%17)+3; r2 = get_rand() % (r1+3 ); if ( r2 == 0 ) return; */ ! Alarm( DEBUG, "Received Token\n"); /* check if it is a regular token */ if( Is_form( Token->type ) ) { Alarm(PROTOCOL, "it is a Form Token.\n"); Memb_handle_token( &New_token ); return; } /* The Veto property for tokens - swallow this token */ if( ! Memb_token_alive() ) { Alarm(PROTOCOL, "Prot_handle_token: Veto Property. Memb not alive.\n"); --- 453,476 ---- /* int r1,r2;*/ ret = Net_recv_token( fd, &New_token ); /* from another monitor component */ if( ret == 0 ) return; /* delete random r1 = ((-My.id)%17)+3; r2 = get_rand() % (r1+3 ); if ( r2 == 0 ) return; */ ! Alarm( DEBUG, "Received Token ARQ :%d, Retrans %d, Last Token Arq: %d, Ret: %d Seq: %d, My Seq %d, reprocess %d\n", ! Get_arq(Token->type), Get_retrans(Token->type), Get_arq(Last_token->type), ret, Token->seq, Highest_seq, Token->reprocessing ); /* check if it is a regular token */ if( Is_form( Token->type ) ) { Alarm(PROTOCOL, "it is a Form Token.\n"); Memb_handle_token( &New_token ); return; } /* The Veto property for tokens - swallow this token */ if( ! Memb_token_alive() ) { Alarm(PROTOCOL, "Prot_handle_token: Veto Property. Memb not alive.\n"); *************** *** 473,541 **** if( ret != sizeof(token_header) + Token->rtr_len ) { Alarm( PRINT, "Prot_handle_token: recv token len is %d, should be %d\n", ret,sizeof(token_header) + Token->rtr_len ); return; } if( !Same_endian( Token->type ) ) Flip_token_body( New_token.elements[1].buf, Token ); /* Deal with wrapping seq values (2^32) by triggering a membership by dropping the token */ if( (Memb_state() != EVS ) && (Token->seq > MAX_WRAP_SEQUENCE_VALUE ) ) { Alarm( PROTOCOL, "Prot_handle_token: Token Sequence number (%ld) approaching 2^31 so trigger membership to reset it.\n", Token->seq); return; } if( Conf_leader( Memb_active_ptr() ) == My.id ) { if( Get_arq(Token->type) != Get_arq(Last_token->type) ) { Alarm( PROTOCOL, "Prot_handle_token: leader swallowing token %d %d %d\n", Get_arq(Token->type),Get_retrans(Token->type),Get_arq(Last_token->type) ); /* received double token - swallow it */ return; } ! }else{ if( Get_arq(Token->type) == Get_arq(Last_token->type) ) { if( Get_retrans(Token->type) > Get_retrans(Last_token->type) ) { val = Get_retrans(Token->type); Last_token->type = Set_retrans(Last_token->type,val); /* asked to send token again (almost lost) */ Alarm( PROTOCOL, "Prot_handle_token: not leader, asked to retrans %d %d\n", Get_arq(Token->type), val ); Prot_token_hurry(); ! }else{ Alarm( PROTOCOL, "Prot_handle_token: not leader, swallow same token %d %d\n", Get_arq(Token->type), Get_retrans(Token->type) ); } return; ! } else if ( Get_arq(Token->type) != ( ( Get_arq( Last_token->type ) + 1 ) % 0x10 ) ) { Alarm( PROTOCOL, "Prot_handle_token: not leader, swallowing very outdated token: ARQ(%d) RETRANS(%d) vs. Last ARQ(%d)\n", Get_arq(Token->type), Get_retrans(Token->type), Get_arq(Last_token->type) ); return; ! } else { if ( Get_retrans(Token->type) > 0 ) { GlobalStatus.token_hurry++; } } } if( Highest_seq < Token->seq ) Highest_seq = Token->seq; /* Handle retransmissions */ num_retrans = Answer_retrans( &new_ptr, &rtr_proc_id, &rtr_seg_index ); GlobalStatus.retrans += num_retrans; new_rtr = New_token.elements[1].buf; /* Handle new packets */ --- 480,572 ---- if( ret != sizeof(token_header) + Token->rtr_len ) { Alarm( PRINT, "Prot_handle_token: recv token len is %d, should be %d\n", ret,sizeof(token_header) + Token->rtr_len ); return; } if( !Same_endian( Token->type ) ) Flip_token_body( New_token.elements[1].buf, Token ); + if ( !Token->reprocessing) + { + if (Highest_seq < Token->seq ) + { + /*may be a race conditon to resend to yourself before asking for retransmission*/ + #ifdef _WIN32 + Sleep(1); + #endif + Net_resend_myself_token( &New_token ); + return; + } + } + + + + /* Deal with wrapping seq values (2^32) by triggering a membership by dropping the token */ if( (Memb_state() != EVS ) && (Token->seq > MAX_WRAP_SEQUENCE_VALUE ) ) { Alarm( PROTOCOL, "Prot_handle_token: Token Sequence number (%ld) approaching 2^31 so trigger membership to reset it.\n", Token->seq); return; } if( Conf_leader( Memb_active_ptr() ) == My.id ) { if( Get_arq(Token->type) != Get_arq(Last_token->type) ) { Alarm( PROTOCOL, "Prot_handle_token: leader swallowing token %d %d %d\n", Get_arq(Token->type),Get_retrans(Token->type),Get_arq(Last_token->type) ); /* received double token - swallow it */ return; } ! } ! else ! { if( Get_arq(Token->type) == Get_arq(Last_token->type) ) { if( Get_retrans(Token->type) > Get_retrans(Last_token->type) ) { val = Get_retrans(Token->type); Last_token->type = Set_retrans(Last_token->type,val); /* asked to send token again (almost lost) */ Alarm( PROTOCOL, "Prot_handle_token: not leader, asked to retrans %d %d\n", Get_arq(Token->type), val ); Prot_token_hurry(); ! } ! else{ Alarm( PROTOCOL, "Prot_handle_token: not leader, swallow same token %d %d\n", Get_arq(Token->type), Get_retrans(Token->type) ); } return; ! } ! else if ( Get_arq(Token->type) != ( ( Get_arq( Last_token->type ) + 1 ) % 0x10 ) ) ! { Alarm( PROTOCOL, "Prot_handle_token: not leader, swallowing very outdated token: ARQ(%d) RETRANS(%d) vs. Last ARQ(%d)\n", Get_arq(Token->type), Get_retrans(Token->type), Get_arq(Last_token->type) ); return; ! } ! else ! { if ( Get_retrans(Token->type) > 0 ) { GlobalStatus.token_hurry++; } + } } if( Highest_seq < Token->seq ) Highest_seq = Token->seq; /* Handle retransmissions */ num_retrans = Answer_retrans( &new_ptr, &rtr_proc_id, &rtr_seg_index ); GlobalStatus.retrans += num_retrans; new_rtr = New_token.elements[1].buf; /* Handle new packets */ *************** *** 547,575 **** /* Flow control calculations */ Token->flow_control = Token->flow_control - Last_num_retrans - Last_num_sent + num_retrans + num_sent; Last_num_retrans = num_retrans; Last_num_sent = num_sent; /* Prepare my retransmission requests */ for( i = My_aru+1; i <= Highest_seq; i++ ) { ! if( ! Packets[i & PACKET_MASK].exist ) break; My_aru++; } GlobalStatus.my_aru = My_aru; if( My_aru < Highest_seq ) { /* Compute how many of my retransmission requests are possible to fit */ retrans_allowed = ( sizeof( token_body ) - new_ptr - sizeof( ring_rtr ) ) / sizeof( int32 ); if( retrans_allowed > 1 ) { ring_rtr_ptr = (ring_rtr *)&new_rtr[new_ptr]; ring_rtr_ptr->memb_id = Memb_id(); ring_rtr_ptr->proc_id = rtr_proc_id; ring_rtr_ptr->seg_index = rtr_seg_index; ring_rtr_ptr->num_seq = 0; new_ptr += sizeof(ring_rtr); for( i=My_aru+1; i <= Highest_seq && retrans_allowed > 0; i++, retrans_allowed-- ) --- 578,611 ---- /* Flow control calculations */ Token->flow_control = Token->flow_control - Last_num_retrans - Last_num_sent + num_retrans + num_sent; Last_num_retrans = num_retrans; Last_num_sent = num_sent; /* Prepare my retransmission requests */ for( i = My_aru+1; i <= Highest_seq; i++ ) { ! if( ! Packets[i & PACKET_MASK].exist ) ! { ! break; ! } My_aru++; } GlobalStatus.my_aru = My_aru; if( My_aru < Highest_seq ) { + Alarm( PROTOCOL, "RETRANS NEEDED ARU(%d), Highest Seq (%d) \n",My_aru, Highest_seq); + /* Compute how many of my retransmission requests are possible to fit */ retrans_allowed = ( sizeof( token_body ) - new_ptr - sizeof( ring_rtr ) ) / sizeof( int32 ); if( retrans_allowed > 1 ) { ring_rtr_ptr = (ring_rtr *)&new_rtr[new_ptr]; ring_rtr_ptr->memb_id = Memb_id(); ring_rtr_ptr->proc_id = rtr_proc_id; ring_rtr_ptr->seg_index = rtr_seg_index; ring_rtr_ptr->num_seq = 0; new_ptr += sizeof(ring_rtr); for( i=My_aru+1; i <= Highest_seq && retrans_allowed > 0; i++, retrans_allowed-- ) *************** *** 620,641 **** --- 656,678 ---- { val = Get_arq( Token->type ); val = (val + 1)% 0x10; Token->type = Set_arq( Token->type, val ); Token->type = Set_retrans( Token->type, 0 ); } /* Send token */ if( ! ( Conf_leader( Memb_active_ptr() ) == My.id && To_hold_token() ) ) { + /* sending token */ Net_send_token( &New_token ); /* ### Bug fix for SGIs */ #ifdef ARCH_SGI_IRIX Net_send_token( &New_token ); #endif /* ARCH_SGI_IRIX */ if( Get_retrans( Token->type ) > 1 ) { /* problems */ Net_send_token( &New_token ); *************** *** 853,875 **** Send_pack.elements[1].buf = (char *)Packets[pack_entry].body; Send_pack.elements[1].len = pack_ptr->data_len; if( ring_rtr_ptr->proc_id != -1 ) { ret = Net_ucast ( ring_rtr_ptr->proc_id, &Send_pack ); GlobalStatus.u_retrans++; Alarm( PROTOCOL, ! "Answer_retrans: retransmit to proc %d\n", ring_rtr_ptr->proc_id ); }else if( ring_rtr_ptr->seg_index != -1 ) { ret = Net_scast ( ring_rtr_ptr->seg_index, &Send_pack ); GlobalStatus.s_retrans++; Alarm( PROTOCOL, "Answer_retrans: retransmit to seg %d\n", ring_rtr_ptr->seg_index ); }else{ #if 1 ret = Net_queue_bcast ( &Send_pack ); #else ret = Net_bcast ( &Send_pack ); --- 890,912 ---- Send_pack.elements[1].buf = (char *)Packets[pack_entry].body; Send_pack.elements[1].len = pack_ptr->data_len; if( ring_rtr_ptr->proc_id != -1 ) { ret = Net_ucast ( ring_rtr_ptr->proc_id, &Send_pack ); GlobalStatus.u_retrans++; Alarm( PROTOCOL, ! "Answer_retrans: retransmit to proc %d aru is %d\n", ring_rtr_ptr->proc_id,Aru ); }else if( ring_rtr_ptr->seg_index != -1 ) { ret = Net_scast ( ring_rtr_ptr->seg_index, &Send_pack ); GlobalStatus.s_retrans++; Alarm( PROTOCOL, "Answer_retrans: retransmit to seg %d\n", ring_rtr_ptr->seg_index ); }else{ #if 1 ret = Net_queue_bcast ( &Send_pack ); #else ret = Net_bcast ( &Send_pack ); *************** *** 1347,1368 **** --- 1384,1406 ---- else return( 0 ); } static void Handle_hurry( packet_header *pack_ptr ) { if( Conf_leader( Memb_active_ptr() ) == My.id && Is_token_hold() ) { if( Conf_id_in_conf( Memb_active_ptr(), pack_ptr->proc_id ) != -1 ) { Alarm( PROTOCOL, "Handle_hurry: sending token now\n"); + Token_counter = 0; Prot_token_hurry(); } } } void Prot_token_hurry() { /* asked to send token again (almost lost) */ sys_scatter retrans_token; int32 val; Index: spreadmap/spread_src/spread4_olf/daemon/scatter.h =================================================================== RCS file: /cvs/master/olf/ecom/spreadmap/spread_src/spread4_olf/daemon/scatter.h,v retrieving revision 1.5 diff -b -c -1 -1 -r1.5 scatter.h *** spreadmap/spread_src/spread4_olf/daemon/scatter.h 12 Dec 2007 14:37:13 -0000 1.5 --- spreadmap/spread_src/spread4_olf/daemon/scatter.h 24 Jan 2013 22:12:31 -0000 *************** *** 35,60 **** #ifndef INC_SCATTER #define INC_SCATTER #include "arch.h" #define MAX_SCATTER_ELEMENTS 130 /* scat_element is EXACTLY as defined in iovec */ typedef struct dummy_scat_element{ char *buf; ! int len; } scat_element; typedef struct dummy_scatter { ! int num_elements; scat_element elements[ARCH_SCATTER_SIZE]; } sys_scatter; typedef struct dummy_big_scatter { ! int num_elements; scat_element elements[MAX_SCATTER_ELEMENTS]; } scatter; #endif /* INC_SCATTER */ --- 35,60 ---- #ifndef INC_SCATTER #define INC_SCATTER #include "arch.h" #define MAX_SCATTER_ELEMENTS 130 /* scat_element is EXACTLY as defined in iovec */ typedef struct dummy_scat_element{ char *buf; ! size_t len; } scat_element; typedef struct dummy_scatter { ! size_t num_elements; scat_element elements[ARCH_SCATTER_SIZE]; } sys_scatter; typedef struct dummy_big_scatter { ! size_t num_elements; scat_element elements[MAX_SCATTER_ELEMENTS]; } scatter; #endif /* INC_SCATTER */ Index: spreadmap/spread_src/spread4_olf/daemon/spread.c =================================================================== RCS file: /cvs/master/olf/ecom/spreadmap/spread_src/spread4_olf/daemon/spread.c,v retrieving revision 1.11 diff -b -c -1 -1 -r1.11 spread.c *** spreadmap/spread_src/spread4_olf/daemon/spread.c 21 Jul 2009 20:40:23 -0000 1.11 --- spreadmap/spread_src/spread4_olf/daemon/spread.c 24 Jan 2013 22:12:33 -0000 *************** *** 148,170 **** int _run_main(int argc, char *argv[]) { #ifndef ARCH_PC_WIN95 struct group *grp; struct passwd *pwd; #endif #ifdef ARCH_PC_WIN95 int ret; ! ret = WSAStartup( MAKEWORD(1,1), &WSAData ); if( ret != 0 ) Alarmp( SPLOG_FATAL, NETWORK, "Spread: winsock initialization error %d\n", ret ); #endif /* ARCH_PC_WIN95 */ /* initialize each valid authentication protocol */ null_init(); ip_init(); #ifdef ENABLE_PASSWORD pword_init(); #endif --- 148,170 ---- int _run_main(int argc, char *argv[]) { #ifndef ARCH_PC_WIN95 struct group *grp; struct passwd *pwd; #endif #ifdef ARCH_PC_WIN95 int ret; ! ret = WSAStartup( MAKEWORD(2,2), &WSAData ); if( ret != 0 ) Alarmp( SPLOG_FATAL, NETWORK, "Spread: winsock initialization error %d\n", ret ); #endif /* ARCH_PC_WIN95 */ /* initialize each valid authentication protocol */ null_init(); ip_init(); #ifdef ENABLE_PASSWORD pword_init(); #endif Index: spreadmap/spread_src/spread4_olf/include/sp.h =================================================================== RCS file: /cvs/master/olf/ecom/spreadmap/spread_src/spread4_olf/include/sp.h,v retrieving revision 1.5 diff -b -c -1 -1 -r1.5 sp.h *** spreadmap/spread_src/spread4_olf/include/sp.h 5 Jan 2007 15:54:25 -0000 1.5 --- spreadmap/spread_src/spread4_olf/include/sp.h 24 Jan 2013 22:12:34 -0000 *************** *** 29,50 **** --- 29,53 ---- * Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security. * Theo Schlossnagle jesus@omniti.com - Perl, autoconf, old skiplist. * Dan Schoenblum dansch@cnds.jhu.edu - Java interface. * */ #ifndef INC_SP #define INC_SP + /* for size_t */ + #include + #ifdef __cplusplus extern "C" { #endif #define int16 short #define int32 int #define Flip_int16( type ) ( ( (type >> 8) & 0x00ff) | ( (type << 8) & 0xff00) ) #define Flip_int32( type ) ( ( (type >>24) & 0x000000ff) | ( (type >> 8) & 0x0000ff00) | ( (type << 8) & 0x00ff0000) | ( (type <<24) & 0xff000000) ) *************** *** 131,157 **** #define GROUPS_TOO_SHORT -16 #define MESSAGE_TOO_LONG -17 #define NET_ERROR_ON_SESSION -18 #define MAX_CLIENT_SCATTER_ELEMENTS 100 typedef int mailbox; typedef int service; typedef struct dummy_scat_element{ char *buf; ! int len; } scat_element; typedef struct dummy_scatter{ ! int num_elements; scat_element elements[MAX_CLIENT_SCATTER_ELEMENTS]; } scatter; typedef struct dummy_group_id { int32 id[3]; } group_id; typedef struct dummy_vs_set_info { unsigned int num_members; unsigned int members_offset; /* offset from beginning of msg body*/ } vs_set_info; --- 134,160 ---- #define GROUPS_TOO_SHORT -16 #define MESSAGE_TOO_LONG -17 #define NET_ERROR_ON_SESSION -18 #define MAX_CLIENT_SCATTER_ELEMENTS 100 typedef int mailbox; typedef int service; typedef struct dummy_scat_element{ char *buf; ! size_t len; } scat_element; typedef struct dummy_scatter{ ! size_t num_elements; scat_element elements[MAX_CLIENT_SCATTER_ELEMENTS]; } scatter; typedef struct dummy_group_id { int32 id[3]; } group_id; typedef struct dummy_vs_set_info { unsigned int num_members; unsigned int members_offset; /* offset from beginning of msg body*/ } vs_set_info;