/*
 * The Spread Toolkit.
 *
 * The contents of this file are subject to the Spread Open-Source
 * License, Version 1.0 (the ``License''); you may not use
 * this file except in compliance with the License. You may obtain a
 * copy of the License at:
 *
 * http://www.spread.org/license/
 *
 * or in the file ``license.txt'' found in this distribution.
 *
 * Software distributed under the License is distributed on an AS IS basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * The Creators of Spread are:
 *  Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
 *
 *  Copyright (C) 1993-2001 Spread Concepts LLC
 *
 *  All Rights Reserved.
 *
 * Major Contributor(s):
 * ---------------
 *    Dan Schoenblum   dansch@cnds.jhu.edu - Java Interface Developer.
 *    John Schultz     jschultz@cnds.jhu.edu - contribution to process group membership.
 *    Theo Schlossnagle theos@cnds.jhu.edu - Perl library and Skiplists.
 *
 */

/*
 * simple_user.c
 *
 * This simple program demonstrates how to use the Spread library.
 *
 * The main thread sends a burst of fixed-size messages to the group
 * "simple_group" while a second thread receives messages and periodically
 * prints the observed throughput.
 */

#include "sp.h"

#include <sys/types.h>
#include <sys/time.h>

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define DATA_SIZE          1500
#define THISCOUNT          100000
#define BURST              200000

/* Size of the receive buffer used by Read_message(). */
#define RECV_BUF_SIZE      139264
/* Maximum number of target groups Read_message() can receive. */
#define MAX_TARGET_GROUPS  100

int myRLSD_cnt;

static char    User[80];
static char    Spread_name[80];
static char    Private_group[MAX_GROUP_NAME];
static mailbox Mbox;

static int  Read_message(void);
static void Usage(int argc, char *argv[]);

int mainpid;
int g_data_size = DATA_SIZE;   /* -d: payload size of every sent message    */
int g_thiscount = THISCOUNT;   /* -c: progress/throughput reporting interval */
int g_burst     = BURST;       /* -b: upper bound of the send loop           */
int g_use_gcs   = 0;           /* -g: 1 selects the SP_connect/GCS_* path    */

/*
 * SIGINT handler: reports the signal and terminates the process with
 * exit status 1.
 *
 * NOTE(review): printf() and exit() are not async-signal-safe.  The original
 * behaviour is kept because this is a demo program; a hardened version
 * should use write() and _exit() instead.
 */
void signal_handler(int sigid)
{
    printf("Caught Signal %d. Process %d Exiting gracefully\n",
           sigid, getpid());
    exit(1);
}

/*
 * Receiver thread entry point.
 *
 * Calls Read_message() repeatedly for as long as it returns a membership or
 * regular service type.  Every g_thiscount messages it prints the time and
 * the message/byte rate measured over the last interval.  The first message
 * only establishes the time baseline, so no rate is printed for it.
 *
 * data: unused.
 *
 * Never returns: once the receive loop ends the thread parks in a sleep
 * loop.
 */
void *do_loop(void *data)
{
    struct timeval now;
    struct timeval last;
    long long      count = 0;
    int            ret;

    (void)data;
    last.tv_sec  = 0;
    last.tv_usec = 0;

    printf("In Receive Loop\n");

    do {
        ret = Read_message();

        if (count % g_thiscount == 0) {
            gettimeofday(&now, NULL);

            if (count > 0) {
                long diff_sec = (long)(now.tv_sec - last.tv_sec);

                if (diff_sec <= 0) {
                    printf("WARNNNNNNNING: You need to increase your burst and thiscount\n");
                } else {
                    /* 64-bit arithmetic: data_size * thiscount can exceed INT_MAX. */
                    long long msgs_per_sec  = (long long)g_thiscount / diff_sec;
                    long long bytes_per_sec =
                        ((long long)g_data_size * g_thiscount) / diff_sec;

                    printf("Read message type: %5d sec %12ld usec %8ld "
                           "msgs/sec: %6lld byte/sec: %8lld count %8lld\n",
                           ret, (long)now.tv_sec, (long)now.tv_usec,
                           msgs_per_sec, bytes_per_sec, count);
                }
            }
            last = now;
        }
        count++;
    } while (Is_membership_mess(ret) || Is_regular_mess(ret));

    for (;;) {
        printf("do_loop. Sleeping.....................\n");
        sleep(1000);
    }
}

/*
 * Program entry point.
 *
 * Parses the command line, optionally connects to the daemon and joins
 * "simple_group" (when g_use_gcs == 1), starts the receiver thread and then
 * sends g_burst + 1 messages of g_data_size bytes each.  Afterwards the main
 * thread parks in a sleep loop; the process ends via SIGINT or an error.
 */
int main(int argc, char *argv[])
{
    pthread_t  p_thread;
    char      *mess;
    long       failed_sends = 0;
    int        mess_len;
    int        ret;
    int        rc;
    int        i;

    signal(SIGINT, signal_handler);

    Usage(argc, argv);

    if (g_use_gcs == 1) {
        /* connecting to the daemon, requesting group information */
        ret = SP_connect(Spread_name, User, 0, 1, &Mbox, Private_group);
        if (ret < 0) {
            SP_error(ret);
            exit(1);
        }

        /* joining a group */
        ret = SP_join(Mbox, "simple_group");
        if (ret < 0) {
            SP_error(ret);
            exit(1);
        }
    }

    /* create a new thread that will execute 'do_loop()' */
    rc = pthread_create(&p_thread, NULL, do_loop, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed: %s\n", strerror(rc));
        exit(1);
    }

    /* Heap buffer: the size comes from the command line, so keep it off the stack. */
    mess = malloc((size_t)g_data_size);
    if (mess == NULL) {
        fprintf(stderr, "Cannot allocate %d bytes for the message buffer\n",
                g_data_size);
        exit(1);
    }
    for (i = 0; i < g_data_size; i++) {
        mess[i] = (char)(i % 10);
    }
    mess_len = g_data_size;

    printf("In Send Loop\n");
    for (i = 0; i <= g_burst; i++) {
        if (i % g_thiscount == 0) {
            printf("Sent %d message \n", i);
        }

        /* multicast a message to that group */
        if (g_use_gcs == 1) {
            /* SAFE_MESS can be used here instead of UNRELIABLE_MESS. */
            ret = GCS_Multicast(Mbox, UNRELIABLE_MESS, "simple_group", 1,
                                mess_len, mess);
        } else {
            ret = GCS_Multicast_ssodhi(Mbox, SAFE_MESS, "simple_group", 1,
                                       mess_len, mess);
        }
        if (ret < 0) {
            /* Count instead of printing: a failing burst would flood the output. */
            failed_sends++;
        }
    }
    free(mess);

    if (failed_sends > 0) {
        printf("%ld of %d sends failed\n", failed_sends, i);
    }

    for (;;) {
        printf("Main.Sleeping.....................\n");
        sleep(1000);
    }
}

/*
 * Receives one message and, for membership messages, prints a description.
 *
 * Uses GCS_Receive() when g_use_gcs == 1 and GCS_Receive_ssodhi() otherwise.
 * On a receive error the error is reported through SP_error() and the
 * process exits with status 1.
 *
 * Returns the service type reported by the receive call.
 *
 * The message buffer is static, so this function must only be called from
 * one thread at a time (do_loop() is its only caller in this file).
 */
static int Read_message(void)
{
    static char mess[RECV_BUF_SIZE];
    char        sender[MAX_GROUP_NAME];
    char        target_groups[MAX_TARGET_GROUPS][MAX_GROUP_NAME];
    int         num_groups = 0;
    /* Initialised because the receive call gets a pointer to it.
     * NOTE(review): some Spread versions read flags from it on entry - confirm. */
    int         service_type = 0;
    int16       mess_type;
    int         endian_mismatch;
    int         i;
    int         ret;

    /* One byte is held back so the payload can always be NUL-terminated below. */
    if (g_use_gcs == 1) {
        ret = GCS_Receive(Mbox, &service_type, sender, MAX_TARGET_GROUPS,
                          &num_groups, target_groups, &mess_type,
                          &endian_mismatch, (int)(sizeof(mess) - 1), mess);
    } else {
        ret = GCS_Receive_ssodhi(Mbox, &service_type, sender, MAX_TARGET_GROUPS,
                                 &num_groups, target_groups, &mess_type,
                                 &endian_mismatch, (int)(sizeof(mess) - 1), mess);
    }
    if (ret < 0) {
        printf("SP_receive returned error\n");
        SP_error(ret);
        exit(1);
    }

    if (Is_regular_mess(service_type)) {
        /* A regular message, sent by one of the processes */
        mess[ret] = 0;
#if 0
        if      ( Is_unreliable_mess( service_type ) ) printf("received UNRELIABLE ");
        else if ( Is_reliable_mess(   service_type ) ) printf("received RELIABLE ");
        else if ( Is_fifo_mess(       service_type ) ) printf("received FIFO ");
        else if ( Is_causal_mess(     service_type ) ) printf("received CAUSAL ");
        else if ( Is_agreed_mess(     service_type ) ) printf("received AGREED ");
        else if ( Is_safe_mess(       service_type ) ) printf("received SAFE ");
        printf("message from %s of type %d (endian %d), to %d groups \n(%d bytes): %s\n",
               sender, mess_type, endian_mismatch, num_groups, ret, mess );
#endif
    } else if (Is_membership_mess(service_type)) {
        /* A membership notification */
        if (Is_reg_memb_mess(service_type)) {
            printf("received REGULAR membership ");
            if (Is_caused_join_mess(service_type)) {
                printf("caused by JOIN ");
            }
            if (Is_caused_leave_mess(service_type)) {
                printf("caused by LEAVE ");
            }
            if (Is_caused_disconnect_mess(service_type)) {
                printf("caused by DISCONNECT ");
            }
            printf("for group %s with %d members:\n", sender, num_groups);
            for (i = 0; i < num_groups; i++) {
                printf("\t%s\n", &target_groups[i][0]);
            }

            /* The group id is at the start of the body; copy it out rather
             * than casting the char buffer, which may be misaligned. */
            if ((size_t)ret >= sizeof(group_id)) {
                group_id grp_id;

                memcpy(&grp_id, mess, sizeof(grp_id));
                printf("grp id is %d %d %d\n",
                       (int)grp_id.id[0], (int)grp_id.id[1], (int)grp_id.id[2]);
            }
        } else if (Is_transition_mess(service_type)) {
            printf("received TRANSITIONAL membership for group %s\n", sender);
        } else if (Is_caused_leave_mess(service_type)) {
            printf("received membership message that left group %s\n", sender);
        } else {
            printf("received incorrect membership message of type %d\n",
                   service_type);
        }
    } else {
        printf("received message of unknown message type %d with %d bytes\n",
               service_type, ret);
    }

    return service_type;
}

/* Prints the command line synopsis to stdout. */
static void Print_usage(void)
{
    printf("Usage: user\n"
           "\t[-u <user name>] : unique (in this machine) user name\n"
           "\t[-s <address>]   : either port or port@machine\n"
           "\t[-b <burst>]     : upper bound of the send loop (burst + 1 messages are sent)\n"
           "\t[-c <count>]     : report progress every <count> messages\n"
           "\t[-d <data size>] : size in bytes of every sent message\n"
           "\t[-g <0|1>]       : 1 = connect with SP_connect and use GCS_Multicast/GCS_Receive\n");
}

/* Returns the value following the current option, or exits if it is missing. */
static const char *Option_value(int remaining, char *argv[])
{
    if (remaining < 2 || argv[1] == NULL) {
        fprintf(stderr, "Option %s requires a value\n", argv[0]);
        Print_usage();
        exit(1);
    }
    return argv[1];
}

/* Parses text as an integer in [min, max] (base auto-detected); exits on bad input. */
static int Parse_int_arg(const char *option, const char *text, long min, long max)
{
    char *end = NULL;
    long  value;

    errno = 0;
    value = strtol(text, &end, 0);
    if (end == text || *end != '\0' || errno == ERANGE ||
        value < min || value > max) {
        fprintf(stderr,
                "Invalid value '%s' for %s (expected an integer in [%ld, %ld])\n",
                text, option, min, max);
        Print_usage();
        exit(1);
    }
    return (int)value;
}

/* Copies text into dst (dst_size bytes); exits if it does not fit. */
static void Copy_string_arg(char *dst, size_t dst_size,
                            const char *option, const char *text)
{
    int n = snprintf(dst, dst_size, "%s", text);

    if (n < 0 || (size_t)n >= dst_size) {
        fprintf(stderr, "Value for %s is too long (maximum %lu characters)\n",
                option, (unsigned long)(dst_size - 1));
        Print_usage();
        exit(1);
    }
}

/*
 * Parses the command line into User, Spread_name, g_burst, g_thiscount,
 * g_data_size and g_use_gcs, then prints the effective settings.
 *
 * Options are matched on their first two characters and each one takes a
 * value in the following argument.  A missing, over-long, malformed or
 * out-of-range value, or an unknown option, prints the usage text and exits
 * with status 1.
 *
 * Ranges enforced:
 *   -b : 0 .. INT_MAX - 1   (the send loop counts up to and including it)
 *   -c : 1 .. INT_MAX       (used as a modulus)
 *   -d : 1 .. RECV_BUF_SIZE - 1 (must fit the receive buffer)
 */
static void Usage(int argc, char *argv[])
{
    /* Setting defaults */
    snprintf(User, sizeof(User), "%s", "ssodhi");
    snprintf(Spread_name, sizeof(Spread_name), "%s", "7805");

    while (--argc > 0) {
        const char *option;
        const char *value;

        argv++;
        option = argv[0];

        if (!strncmp(option, "-u", 2)) {
            value = Option_value(argc, argv);
            Copy_string_arg(User, sizeof(User), option, value);
        } else if (!strncmp(option, "-s", 2)) {
            value = Option_value(argc, argv);
            Copy_string_arg(Spread_name, sizeof(Spread_name), option, value);
        } else if (!strncmp(option, "-b", 2)) {
            value = Option_value(argc, argv);
            g_burst = Parse_int_arg(option, value, 0, (long)INT_MAX - 1);
        } else if (!strncmp(option, "-c", 2)) {
            value = Option_value(argc, argv);
            g_thiscount = Parse_int_arg(option, value, 1, (long)INT_MAX);
        } else if (!strncmp(option, "-d", 2)) {
            value = Option_value(argc, argv);
            g_data_size = Parse_int_arg(option, value, 1, (long)RECV_BUF_SIZE - 1);
        } else if (!strncmp(option, "-g", 2)) {
            value = Option_value(argc, argv);
            g_use_gcs = Parse_int_arg(option, value, (long)INT_MIN, (long)INT_MAX);
        } else {
            Print_usage();
            exit(1);
        }

        /* Skip the value that was just consumed. */
        argc--;
        argv++;
    }

    printf("Using g_burst: %d\n", g_burst);
    printf("Using g_thiscount: %d\n", g_thiscount);
    printf("Using g_data_size: %d\n", g_data_size);
    printf("Using g_use_gcs: %d\n", g_use_gcs);
}