[Spread-cvs] commit: r591 - branches/experimental-4.3-threaded/daemon

jschultz at spread.org jschultz at spread.org
Fri Sep 20 15:20:44 EDT 2013


Author: jschultz
Date: 2013-09-20 15:20:44 -0400 (Fri, 20 Sep 2013)
New Revision: 591

Added:
   branches/experimental-4.3-threaded/daemon/signal_queues.c
   branches/experimental-4.3-threaded/daemon/signal_queues.h
Modified:
   branches/experimental-4.3-threaded/daemon/Makefile.in
   branches/experimental-4.3-threaded/daemon/protocol.c
   branches/experimental-4.3-threaded/daemon/session.c
   branches/experimental-4.3-threaded/daemon/session.h
Log:
Changes to add signal queues, new thread, etc.


Modified: branches/experimental-4.3-threaded/daemon/Makefile.in
===================================================================
--- branches/experimental-4.3-threaded/daemon/Makefile.in	2013-09-17 19:53:38 UTC (rev 590)
+++ branches/experimental-4.3-threaded/daemon/Makefile.in	2013-09-20 19:20:44 UTC (rev 591)
@@ -49,7 +49,7 @@
 
 TARGETS=spread$(EXEEXT) spmonitor$(EXEEXT)
 
-SPREADOBJS= spread.o protocol.o session.o groups.o membership.o network.o status.o log.o flow_control.o message.o lex.yy.o y.tab.o configuration.o acm.o acp-permit.o auth-null.o auth-ip.o ip_enum.o
+SPREADOBJS= spread.o protocol.o session.o groups.o membership.o network.o status.o log.o flow_control.o message.o lex.yy.o y.tab.o configuration.o acm.o acp-permit.o auth-null.o auth-ip.o ip_enum.o signal_queues.o
 
 MONITOR_OBJS= monitor.o lex.yy.o y.tab.o configuration.o ip_enum.o acm.o
 

Modified: branches/experimental-4.3-threaded/daemon/protocol.c
===================================================================
--- branches/experimental-4.3-threaded/daemon/protocol.c	2013-09-17 19:53:38 UTC (rev 590)
+++ branches/experimental-4.3-threaded/daemon/protocol.c	2013-09-20 19:20:44 UTC (rev 591)
@@ -51,6 +51,9 @@
 #include "spu_alarm.h"
 #include "sess_types.h" /* for message_header */
 
+/* ### Added for session-daemon queuing/signalling */
+#include "signal_queues.h"
+
 typedef struct dummy_queue_link{
         sys_scatter *packet;
         struct dummy_queue_link *next;
@@ -95,6 +98,11 @@
 
 static	down_queue	Protocol_down_queue[2]; /* only used in spread3 */
 
+/* ### Added for session-daemon queuing/signalling */
+static signal_q Sess_Daemon_Q;
+static void    Prot_process_message( down_link *down_ptr );
+static void    Prot_process_sess_q( int dmy_fd, int dmy_code, void *dmy_ptr );
+
 static	void	Prot_handle_bcast();
 static	void	Prot_handle_token();
 static	int	Answer_retrans( int *ret_new_ptr, int32 *proc_id, int16 *seg_index );
@@ -207,6 +215,9 @@
 
 	Net_set_membership( Reg_membership );
 
+        Signal_Q_init(&Sess_Daemon_Q, sizeof(down_link*));
+
+	E_attach_fd( Sess_Daemon_Q.q_pipe[0], READ_FD, Prot_process_sess_q, 0, 0, HIGH_PRIORITY );
 }
 
 void    Prot_init_down_queues(void)
@@ -875,9 +886,48 @@
         E_queue( Memb_token_loss, 0, NULL, Zero_timeout );
 }
 
+/* ### Changed to use queues and signals */
 
 void	Prot_new_message( down_link *down_ptr, int not_used_in_spread3_p )
 {
+        /* Session thread just adds down_ptr to the queue; the daemon thread gets
+         * the pipe signal and runs Prot_process_sess_q to process the down_links */
+        Signal_Q_enqueue(&Sess_Daemon_Q, &down_ptr);
+}
+
+static void    Prot_process_sess_q( int dmy_fd, int dmy_code, void *dmy_ptr )
+{
+        /* E_attach_fd READ_FD callback for Sess_Daemon_Q's pipe (args unused).
+         * NOTE(review): Prot_process_message takes down_link*, cast to void(*)(void*). */
+        Signal_Q_process(&Sess_Daemon_Q, (void(*)(void*)) Prot_process_message);
+#if 0   /* disabled inline variant; calls stdcarr_* while signal_q holds stdarr members */
+        stdit it;
+        char buf;
+        down_link **link_ptr;
+
+        /* Clear signaling pipe (should we read as much as is there instead of
+         * only one byte? should only ever have one byte though...) */
+        read(Sess_Daemon_Q.q_pipe[0], &buf, 1);
+        /* Swap main_q with (empty) process_q so we can process messages from
+         * process_q while new messages can be added to main_q */
+        stdmutex_grab(&Sess_Daemon_Q.q_mut);
+        stdcarr_swap(&Sess_Daemon_Q.main_q, &Sess_Daemon_Q.process_q);
+        stdmutex_drop(&Sess_Daemon_Q.q_mut);
+
+        /* Process all messages in process_q */
+        stdcarr_begin(&Sess_Daemon_Q.process_q, &it);
+        while (!stdcarr_is_end(&Sess_Daemon_Q.process_q, &it))
+        {
+                link_ptr = stdcarr_it_val(&it);
+                Prot_process_message(*link_ptr);
+                stdcarr_it_next(&it);
+        }
+        stdcarr_clear(&Sess_Daemon_Q.process_q);
+#endif
+}
+
+void    Prot_process_message( down_link *down_ptr )
+{
 	int32	leader_id;
 
 	if( Down_queue_ptr->num_mess > 0 )

Modified: branches/experimental-4.3-threaded/daemon/session.c
===================================================================
--- branches/experimental-4.3-threaded/daemon/session.c	2013-09-17 19:53:38 UTC (rev 590)
+++ branches/experimental-4.3-threaded/daemon/session.c	2013-09-20 19:20:44 UTC (rev 591)
@@ -87,6 +87,8 @@
 #include "message.h"
 #include "acm.h"
 
+#include "stdutil/stderror.h"
+
 static	sp_time		Badger_timeout = { 0, 100000 };
 
 static	message_obj	New_mess;
@@ -103,6 +105,16 @@
 static	session		*Sessions_tail;
 static	session		*Sessions_free;
 
+/* ### Added for daemon-session queues/signalling */
+#include "signal_queues.h"
+
+static signal_q     Daemon_Sess_Q;
+static E_events     Session_Events;
+static stdthread    Session_Thread;
+static stdthread_id Session_Thread_ID;
+
+static  void    Sess_process_daemon_q(int dmy_fd, int dmy_code, void *dmy_ptr);
+static  void    Sess_process_message(message_link *mess_link);
 static	void	Sess_attach_accept(void);
 static	void	Sess_detach_accept(void);
 static  void    Sess_recv_client_auth(mailbox mbox, int dummy, void *dummy_p);
@@ -120,6 +132,22 @@
 static  void    Sess_create_reject_message ( message_obj *msg );
 static  int     Sess_get_p2p_dests( int num_groups, char groups[][MAX_GROUP_NAME], char dests[][MAX_GROUP_NAME] );
 
+static void * STDTHREAD_FCN Sess_run(void *arg)
+{
+  /* Entry point handed to stdthread_spawn: runs E_events_handle_events on Session_Events; arg is unused. */
+  E_events_handle_events(&Session_Events);
+  return NULL;
+}
+
+void Sess_spawn(void)
+{
+  stdcode code;
+  /* Start Session_Thread running Sess_run; a non-zero spawn code is reported via Alarm(EXIT). */
+  if ((code = stdthread_spawn(&Session_Thread, &Session_Thread_ID, Sess_run, NULL))) {
+    Alarm(EXIT, "Sess_spawn: spawning session thread failed; code = %d; error = '%s'\n", code, stderr_strerr(code));
+  }
+}
+
 static  void    Sess_activate_port_reuse(mailbox mbox)
 {
     int on = 1; 
@@ -335,9 +363,13 @@
         ret = Mem_init_object( DOWN_LINK, "down_link", sizeof(down_link), 200, 0);
         if (ret < 0)
         {
-                Alarm(EXIT, "Sess_Init: Failure to Initialize DOWN_LINK memory objects\n");
+                Alarm(EXIT, "Sess_init: Failure to Initialize DOWN_LINK memory objects\n");
         }
 
+	if (E_events_init(&Session_Events)) {
+	        Alarm(EXIT, "Sess_init: E_events_init failure!\n");
+	}
+
 	Sess_init_sessions ();
 	
 	Num_sessions = 0;
@@ -435,6 +467,10 @@
 
 	G_init();
 
+        Signal_Q_init(&Daemon_Sess_Q, sizeof(message_link*)); /* ### Testing queues */
+
+	E_events_attach_fd( &Session_Events, Daemon_Sess_Q.q_pipe[0], READ_FD, Sess_process_daemon_q, 0, 0, HIGH_PRIORITY );
+
         Alarm( SESSION, "Sess_init: ended ok\n" );
 }
 
@@ -449,6 +485,8 @@
 {
 	/* This function is used only by the session (and groups) layer */
 	
+  /*TODO: FIX ME*/
+
 	if( Protocol_threshold > Session_threshold ) 
 		E_set_active_threshold( Protocol_threshold );
 	else 	E_set_active_threshold( Session_threshold );
@@ -465,6 +503,7 @@
 
         Alarm(EXIT,"Sess_unblock_user: NOT IMPLEMENTED!\n");
 }
+
 void	Sess_block_users_level()
 {
 	/* This function is used only by lower layers (protocol) */
@@ -490,12 +529,12 @@
         int i;
         for ( i=0; i < Accept_inet_mbox_num; i++)
         {
-                E_attach_fd( Accept_inet_mbox[i], READ_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
-                E_attach_fd( Accept_inet_mbox[i], EXCEPT_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
+	        E_events_attach_fd( &Session_Events, Accept_inet_mbox[i], READ_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
+                E_events_attach_fd( &Session_Events, Accept_inet_mbox[i], EXCEPT_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
         }
 #ifndef ARCH_PC_WIN95
 
-	E_attach_fd( Accept_unix_mbox, READ_FD, Sess_accept, AF_UNIX, NULL, LOW_PRIORITY );
+	E_events_attach_fd( &Session_Events, Accept_unix_mbox, READ_FD, Sess_accept, AF_UNIX, NULL, LOW_PRIORITY );
 
 #endif	/* ARCH_PC_WIN95 */
 
@@ -506,12 +545,12 @@
         int i;
         for (i=0; i < Accept_inet_mbox_num; i++)
         {
-                E_detach_fd( Accept_inet_mbox[i], READ_FD );
-                E_detach_fd( Accept_inet_mbox[i], EXCEPT_FD );
+	        E_events_detach_fd( &Session_Events, Accept_inet_mbox[i], READ_FD );
+                E_events_detach_fd( &Session_Events, Accept_inet_mbox[i], EXCEPT_FD );
         }
 #ifndef ARCH_PC_WIN95
 
-	E_detach_fd( Accept_unix_mbox, READ_FD );
+	E_events_detach_fd( &Session_Events, Accept_unix_mbox, READ_FD );
 
 #endif	/* ARCH_PC_WIN95 */
 
@@ -614,11 +653,11 @@
         }
 	/* delaying for the private name to be written */
 	Sess_detach_accept();
-	E_attach_fd( Sessions[MAX_SESSIONS].mbox, READ_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );	
-	E_attach_fd( Sessions[MAX_SESSIONS].mbox, EXCEPT_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );	
+	E_events_attach_fd( &Session_Events, Sessions[MAX_SESSIONS].mbox, READ_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );	
+	E_events_attach_fd( &Session_Events, Sessions[MAX_SESSIONS].mbox, EXCEPT_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );	
 	accept_delay.sec = 1;
 	accept_delay.usec= 0;
-	E_queue( Sess_accept_continue2, 1, NULL, accept_delay );
+	E_events_queue( &Session_Events, Sess_accept_continue2, 1, NULL, accept_delay, LOW_PRIORITY);
 }
 
 void	Sess_accept_continue(mailbox d1, int d2, void *d3)
@@ -636,9 +675,9 @@
         unsigned char           list_len;
 	session			*tmp_ses;
  
-	E_dequeue( Sess_accept_continue2, 1, NULL );
-	E_detach_fd( Sessions[MAX_SESSIONS].mbox, READ_FD );
-	E_detach_fd( Sessions[MAX_SESSIONS].mbox, EXCEPT_FD );
+	E_events_dequeue( &Session_Events, Sess_accept_continue2, 1, NULL );
+	E_events_detach_fd( &Session_Events, Sessions[MAX_SESSIONS].mbox, READ_FD );
+	E_events_detach_fd( &Session_Events, Sessions[MAX_SESSIONS].mbox, EXCEPT_FD );
 	Sess_attach_accept();
 
 	/* set file descriptor to non blocking */
@@ -862,8 +901,8 @@
         }
 
         /* Now wait for client reply */
-	E_attach_fd( Sessions[sess_location].mbox, READ_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
-	E_attach_fd( Sessions[sess_location].mbox, EXCEPT_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
+	E_events_attach_fd( &Session_Events, Sessions[sess_location].mbox, READ_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
+	E_events_attach_fd( &Session_Events, Sessions[sess_location].mbox, EXCEPT_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
 }
 
 static void    Sess_recv_client_auth(mailbox mbox, int dummy, void *dummy_p)
@@ -883,8 +922,8 @@
                 Alarm( EXIT, "Sess_recv_client_auth: BUG! Session is already authorized (status 0x%x)\n", Sessions[ses].status);
         }
     
-        E_detach_fd(mbox, READ_FD);
-        E_detach_fd(mbox, EXCEPT_FD);
+        E_events_detach_fd(&Session_Events, mbox, READ_FD);
+        E_events_detach_fd(&Session_Events, mbox, EXCEPT_FD);
 
 	/* set file descriptor to non blocking */
 	ioctl_cmd = 1;
@@ -1065,9 +1104,9 @@
         /* sending the private group name */
         send( Sessions[ses].mbox, private_group_name, name_len, 0 );
 
-        E_attach_fd( Sessions[ses].mbox, READ_FD, Sess_read, Sessions[ses].type, NULL, 
+        E_events_attach_fd( &Session_Events, Sessions[ses].mbox, READ_FD, Sess_read, Sessions[ses].type, NULL, 
                      LOW_PRIORITY );
-        E_attach_fd( Sessions[ses].mbox, EXCEPT_FD, Sess_read, Sessions[ses].type, NULL, 
+        E_events_attach_fd( &Session_Events, Sessions[ses].mbox, EXCEPT_FD, Sess_read, Sessions[ses].type, NULL, 
                      LOW_PRIORITY );
 
         Sessions[ses].status = Set_op_session( Sessions[ses].status );
@@ -1459,10 +1498,10 @@
                 }
 
                 /* close the mailbox and mark it unoperational */
-                E_dequeue( Sess_badger_TO, mbox, NULL );
-                E_detach_fd( mbox, READ_FD );
-                E_detach_fd( mbox, EXCEPT_FD );
-		E_detach_fd( mbox, WRITE_FD );
+                E_events_dequeue( &Session_Events, Sess_badger_TO, mbox, NULL );
+                E_events_detach_fd( &Session_Events, mbox, READ_FD );
+                E_events_detach_fd( &Session_Events, mbox, EXCEPT_FD );
+		E_events_detach_fd( &Session_Events, mbox, WRITE_FD );
                 close( mbox );
                 /* the mailbox is closed but the entry still points to it */
                 Sessions[ses].status = Clear_op_session( Sessions[ses].status );
@@ -1660,8 +1699,8 @@
                         Message_calculate_current_location(tmp_link->mess, len_sent, &(Sessions[ses].write) );
 
 			/* We will need to badger this guy */
-			E_queue( Sess_badger_TO, Sessions[ses].mbox, NULL, Badger_timeout );
-			E_attach_fd( Sessions[ses].mbox, WRITE_FD, Sess_badger_FD, 0, NULL, LOW_PRIORITY );
+			E_events_queue( &Session_Events, Sess_badger_TO, Sessions[ses].mbox, NULL, Badger_timeout, LOW_PRIORITY );
+			E_events_attach_fd( &Session_Events, Sessions[ses].mbox, WRITE_FD, Sess_badger_FD, 0, NULL, LOW_PRIORITY );
 		}else{
 			/* This guy was already badgered */
 			Sessions[ses].last->next = tmp_link;
@@ -1754,13 +1793,13 @@
 	ret = ioctl( mbox, FIONBIO, &ioctl_cmd);
 
 	if( Sessions[ses].num_mess > 0 ) {
-	  E_queue( Sess_badger_TO, mbox, NULL, Badger_timeout );
-	  E_attach_fd( mbox, WRITE_FD, Sess_badger_FD, 0, NULL, LOW_PRIORITY );
+	  E_events_queue( &Session_Events, Sess_badger_TO, mbox, NULL, Badger_timeout, LOW_PRIORITY);
+	  E_events_attach_fd( &Session_Events, mbox, WRITE_FD, Sess_badger_FD, 0, NULL, LOW_PRIORITY );
 
 	}else{
 	NO_WORK:
-	  E_dequeue( Sess_badger_TO, mbox, NULL );
-	  E_detach_fd( mbox, WRITE_FD );
+	  E_events_dequeue( &Session_Events, Sess_badger_TO, mbox, NULL );
+	  E_events_detach_fd( &Session_Events, mbox, WRITE_FD );
 	}
 }
 
@@ -1833,18 +1872,53 @@
         Sessions[ses].read_mess = NULL;
 
 	/* close the mailbox and mark it unoperational */
-	E_dequeue( Sess_badger_TO, mbox, NULL );
-	E_detach_fd( mbox, READ_FD );
-	E_detach_fd( mbox, EXCEPT_FD );
-	E_detach_fd( mbox, WRITE_FD );
+	E_events_dequeue( &Session_Events, Sess_badger_TO, mbox, NULL );
+	E_events_detach_fd( &Session_Events, mbox, READ_FD );
+	E_events_detach_fd( &Session_Events, mbox, EXCEPT_FD );
+	E_events_detach_fd( &Session_Events, mbox, WRITE_FD );
         close(mbox);
 	/* the mailbox is closed but the entry still points to it */
 	Sessions[ses].status = Clear_op_session( Sessions[ses].status );
 	Alarm( SESSION, "Sess_kill: killing session %s ( mailbox %d )\n",Sessions[ses].name, mbox );
 }
 
-void	Sess_deliver_message( message_link *mess_link )
+void	Sess_deliver_message( message_link *mess_link )  /* typically, but not always, called by protocol thread */
 {
+        Signal_Q_enqueue(&Daemon_Sess_Q, &mess_link);  /* queued for Sess_process_daemon_q, which passes it to Sess_process_message */
+}
+
+static void Sess_process_daemon_q(int fd, int dmy_code, void *dmy_ptr)
+{
+  /* E_events_attach_fd READ_FD callback for Daemon_Sess_Q's pipe (args unused).
+   * NOTE(review): Sess_process_message takes message_link*, cast to void(*)(void*). */
+  Signal_Q_process(&Daemon_Sess_Q, (void(*)(void*)) Sess_process_message);
+#if 0   /* disabled inline variant; calls stdcarr_* while signal_q holds stdarr members */
+        stdit it;
+        char buf;
+        message_link **link_ptr;
+
+        /* Clear signaling pipe (should we read as much as is there instead of
+         * only one byte? should only ever have one byte though...) */
+        read(Daemon_Sess_Q.q_pipe[0], &buf, 1);
+        /* Swap main_q with (empty) process_q so we can process messages from
+         * process_q while new messages can be added to main_q */
+        stdmutex_grab(&Daemon_Sess_Q.q_mut);
+        stdcarr_swap(&Daemon_Sess_Q.main_q, &Daemon_Sess_Q.process_q);
+        stdmutex_drop(&Daemon_Sess_Q.q_mut);
+        /* Process all messages in process_q */
+        stdcarr_begin(&Daemon_Sess_Q.process_q, &it);
+        while (!stdcarr_is_end(&Daemon_Sess_Q.process_q, &it))
+        {
+                link_ptr = stdcarr_it_val(&it);
+                Sess_process_message(*link_ptr);
+                stdcarr_it_next(&it);
+        }
+        stdcarr_clear(&Daemon_Sess_Q.process_q);
+#endif
+}
+
+static void Sess_process_message( message_link *mess_link )
+{
 static	int		target_sessions[MAX_SESSIONS];
 	int		num_target_sessions;
 	int		source_ses;
@@ -1930,6 +2004,11 @@
 	if( !needed ) Sess_dispose_message( mess_link );
 }
 
+/* ### Sess_deliver_reg_memb and Sess_deliver_trans_memb may need to be changed to
+ * enqueue messages */
+
+/* TODO: FIX ME*/
+
 void	Sess_deliver_reg_memb( configuration reg_memb, membership_id reg_memb_id )
 {
 	G_handle_reg_memb( reg_memb, reg_memb_id );

Modified: branches/experimental-4.3-threaded/daemon/session.h
===================================================================
--- branches/experimental-4.3-threaded/daemon/session.h	2013-09-17 19:53:38 UTC (rev 590)
+++ branches/experimental-4.3-threaded/daemon/session.h	2013-09-20 19:20:44 UTC (rev 591)
@@ -77,6 +77,7 @@
 } session;
 
 void	Sess_init(void);
+void    Sess_spawn(void);
 void    Sess_signal_conf_reload(void);
 void	Sess_block_users_level(void);
 void	Sess_unblock_users_level(void);

Added: branches/experimental-4.3-threaded/daemon/signal_queues.c
===================================================================
--- branches/experimental-4.3-threaded/daemon/signal_queues.c	                        (rev 0)
+++ branches/experimental-4.3-threaded/daemon/signal_queues.c	2013-09-20 19:20:44 UTC (rev 591)
@@ -0,0 +1,91 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+
+#include <sys/ioctl.h>
+
+#include "spu_alarm.h"
+#include "signal_queues.h"
+#include "stdutil/stderror.h"
+
+void Signal_Q_init(signal_q *q, stdsize vsize)
+{
+  stdcode code;
+  int     on;
+  /* Construct both arrays (vsize is forwarded to stdarr_construct), the mutex and a pipe with FIONBIO set on both ends; the || chain stops at the first failing step. */
+  if ((code = stdarr_construct(&q->main_q, vsize, STDARR_OPTS_NO_AUTO_SHRINK)) ||
+      (code = stdarr_construct(&q->process_q, vsize, STDARR_OPTS_NO_AUTO_SHRINK)) ||
+      (code = stdmutex_construct(&q->q_mut, STDMUTEX_FAST)) ||
+      pipe(q->q_pipe) ||
+      ioctl(q->q_pipe[0], FIONBIO, (on = 1, &on)) == -1 ||
+      ioctl(q->q_pipe[1], FIONBIO, (on = 1, &on)) == -1) {
+    Alarm(EXIT, "Signal_Q_init: failed; code = %d; '%s' error '%s'\n", code, stderr_strerr(code), strerror(errno));
+  }
+}
+
+void Signal_Q_destroy(signal_q *q)  /* destructs both arrays and the mutex, then closes both pipe ends */
+{
+  stdarr_destruct(&q->main_q);
+  stdarr_destruct(&q->process_q);
+  stdmutex_destruct(&q->q_mut);
+  close(q->q_pipe[0]);
+  close(q->q_pipe[1]);
+}
+
+void Signal_Q_enqueue(signal_q *q, void *obj)
+{
+  /* Push obj onto main_q while holding q_mut, then write one byte to the
+   * pipe, but only when main_q was empty before the push. */
+  char    buf = '1';
+  stdcode code;
+  int     prev_size;
+  int     ret;
+
+  /* Put message on queue */
+  stdmutex_grab(&q->q_mut) && (abort(), 1);
+  {
+    prev_size = stdarr_size(&q->main_q);
+    if ((code = stdarr_push_back(&q->main_q, obj))) {
+      Alarm(EXIT, "Signal_Q_enqueue: pushing to queue failed; code = %d; error '%s'\n", code, stderr_strerr(code));
+    }
+  }
+  stdmutex_drop(&q->q_mut) && (abort(), 1);
+
+  /* Signal if necessary (i.e. if the queue was empty); a failed write is
+   * tolerated only when errno is EAGAIN or EWOULDBLOCK */
+  if (prev_size == 0 && (ret = write(q->q_pipe[1], &buf, 1)) != 1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+    Alarm(EXIT, "Signal_Q_enqueue: write to pipe failed, ret = %d, errno '%s'\n", ret, strerror(errno));
+  }
+}
+
+void Signal_Q_process(signal_q *q, void (func)(void*))
+{
+  /* Drain the signaling pipe, swap main_q into process_q under q_mut, then call
+   * func on each entry.  Entries are read back as void*, i.e. pointer-sized. */
+  stdit  it;
+  char   buf[128];
+  void **link_ptr;
+  int    ret;
+
+  /* Clear signaling pipe (should only ever have one byte though).  errno is
+   * consulted only when read() fails; 0 means EOF and is always fatal. */
+  if ((ret = read(q->q_pipe[0], buf, sizeof(buf))) == 0 || (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK)) {
+    Alarm(EXIT, "Signal_Q_process: reading from pipe failed, ret = %d, error '%s'\n", ret, ret < 0 ? strerror(errno) : "EOF");
+  }
+
+  /* Swap main_q with (empty) process_q so we can process messages from
+   * process_q while new messages can be added to main_q */
+  stdmutex_grab(&q->q_mut) && (abort(), 1);
+  {
+    stdarr_swap(&q->main_q, &q->process_q);
+  }
+  stdmutex_drop(&q->q_mut) && (abort(), 1);
+
+  /* Process all messages in process_q */
+  for (stdarr_begin(&q->process_q, &it); !stdarr_is_end(&q->process_q, &it); stdarr_it_next(&it)) {
+    link_ptr = stdarr_it_val(&it);
+    func(*link_ptr);
+  }
+  stdarr_clear(&q->process_q);
+}

Added: branches/experimental-4.3-threaded/daemon/signal_queues.h
===================================================================
--- branches/experimental-4.3-threaded/daemon/signal_queues.h	                        (rev 0)
+++ branches/experimental-4.3-threaded/daemon/signal_queues.h	2013-09-20 19:20:44 UTC (rev 591)
@@ -0,0 +1,21 @@
+#ifndef INC_SIGNAL_Q
+#define INC_SIGNAL_Q
+
+#include "stdutil/stdarr.h"
+#include "stdutil/stdthread.h"
+
+typedef struct 
+{
+  /* Queue state shared by Signal_Q_enqueue (producer side) and Signal_Q_process (consumer side). */
+  stdmutex q_mut;      /* held around main_q access: the push in enqueue, the swap in process */
+  int      q_pipe[2];  /* [1] gets one byte from enqueue when main_q was empty; [0] is drained by process */
+  stdarr   main_q;     /* entries appended by Signal_Q_enqueue */
+  stdarr   process_q;  /* swapped with main_q, then iterated and cleared by Signal_Q_process */
+} signal_q;
+
+void Signal_Q_init(signal_q *q, stdsize vsize);
+void Signal_Q_destroy(signal_q *q);
+void Signal_Q_enqueue(signal_q *q, void *obj);
+void Signal_Q_process(signal_q *q, void (func)(void*));
+
+#endif	/* INC_SIGNAL_Q */




More information about the Spread-cvs mailing list