[Spread-cvs] commit: r591 - branches/experimental-4.3-threaded/daemon
jschultz at spread.org
jschultz at spread.org
Fri Sep 20 15:20:44 EDT 2013
Author: jschultz
Date: 2013-09-20 15:20:44 -0400 (Fri, 20 Sep 2013)
New Revision: 591
Added:
branches/experimental-4.3-threaded/daemon/signal_queues.c
branches/experimental-4.3-threaded/daemon/signal_queues.h
Modified:
branches/experimental-4.3-threaded/daemon/Makefile.in
branches/experimental-4.3-threaded/daemon/protocol.c
branches/experimental-4.3-threaded/daemon/session.c
branches/experimental-4.3-threaded/daemon/session.h
Log:
Changes to add signal queues, new thread, etc.
Modified: branches/experimental-4.3-threaded/daemon/Makefile.in
===================================================================
--- branches/experimental-4.3-threaded/daemon/Makefile.in 2013-09-17 19:53:38 UTC (rev 590)
+++ branches/experimental-4.3-threaded/daemon/Makefile.in 2013-09-20 19:20:44 UTC (rev 591)
@@ -49,7 +49,7 @@
TARGETS=spread$(EXEEXT) spmonitor$(EXEEXT)
-SPREADOBJS= spread.o protocol.o session.o groups.o membership.o network.o status.o log.o flow_control.o message.o lex.yy.o y.tab.o configuration.o acm.o acp-permit.o auth-null.o auth-ip.o ip_enum.o
+SPREADOBJS= spread.o protocol.o session.o groups.o membership.o network.o status.o log.o flow_control.o message.o lex.yy.o y.tab.o configuration.o acm.o acp-permit.o auth-null.o auth-ip.o ip_enum.o signal_queues.o
MONITOR_OBJS= monitor.o lex.yy.o y.tab.o configuration.o ip_enum.o acm.o
Modified: branches/experimental-4.3-threaded/daemon/protocol.c
===================================================================
--- branches/experimental-4.3-threaded/daemon/protocol.c 2013-09-17 19:53:38 UTC (rev 590)
+++ branches/experimental-4.3-threaded/daemon/protocol.c 2013-09-20 19:20:44 UTC (rev 591)
@@ -51,6 +51,9 @@
#include "spu_alarm.h"
#include "sess_types.h" /* for message_header */
+/* ### Added for session-daemon queuing/signalling */
+#include "signal_queues.h"
+
typedef struct dummy_queue_link{
sys_scatter *packet;
struct dummy_queue_link *next;
@@ -95,6 +98,11 @@
static down_queue Protocol_down_queue[2]; /* only used in spread3 */
+/* ### Added for session-daemon queuing/signalling */
+static signal_q Sess_Daemon_Q;
+static void Prot_process_message( down_link *down_ptr );
+static void Prot_process_sess_q( int dmy_fd, int dmy_code, void *dmy_ptr );
+
static void Prot_handle_bcast();
static void Prot_handle_token();
static int Answer_retrans( int *ret_new_ptr, int32 *proc_id, int16 *seg_index );
@@ -207,6 +215,9 @@
Net_set_membership( Reg_membership );
+ Signal_Q_init(&Sess_Daemon_Q, sizeof(down_link*));
+
+ E_attach_fd( Sess_Daemon_Q.q_pipe[0], READ_FD, Prot_process_sess_q, 0, 0, HIGH_PRIORITY );
}
void Prot_init_down_queues(void)
@@ -875,9 +886,48 @@
E_queue( Memb_token_loss, 0, NULL, Zero_timeout );
}
+/* ### Changed to use queues and signals
+ *
+ * Prot_new_message: accept a down_link from the session layer.
+ *
+ * The function no longer processes the link itself.  It copies the
+ * down_link pointer onto Sess_Daemon_Q; Prot_process_sess_q later hands
+ * every queued pointer to Prot_process_message.
+ *
+ *   down_ptr              - link to be queued (only the pointer is stored)
+ *   not_used_in_spread3_p - not referenced by this function
+ */
void Prot_new_message( down_link *down_ptr, int not_used_in_spread3_p )
{
+ /* Session thread just adds down_ptr to queue; Daemon thread will
+ * get signal and process down_links from queue */
+ /* Pass the address of the pointer: the queue was initialised with an
+ * element size of sizeof(down_link*) */
+ Signal_Q_enqueue(&Sess_Daemon_Q, &down_ptr);
+}
+
+/* Adapter with exactly the void (*)(void*) type that Signal_Q_process
+ * calls.  Calling Prot_process_message through a cast function pointer
+ * would invoke it via an incompatible function type, which C leaves
+ * undefined; converting the argument here keeps the call well defined. */
+static void Prot_process_queued_link( void *down_ptr )
+{
+  Prot_process_message( (down_link *) down_ptr );
+}
+
+/* Prot_process_sess_q: event callback registered with E_attach_fd for the
+ * read end of Sess_Daemon_Q's pipe.
+ *
+ * Lets Signal_Q_process drain the queue, which passes every queued
+ * down_link pointer to Prot_process_message.
+ *
+ *   dmy_fd, dmy_code, dmy_ptr - callback arguments, not referenced
+ */
+static void Prot_process_sess_q( int dmy_fd, int dmy_code, void *dmy_ptr )
+{
+  Signal_Q_process(&Sess_Daemon_Q, Prot_process_queued_link);
+}
+
+void Prot_process_message( down_link *down_ptr )
+{
int32 leader_id;
if( Down_queue_ptr->num_mess > 0 )
Modified: branches/experimental-4.3-threaded/daemon/session.c
===================================================================
--- branches/experimental-4.3-threaded/daemon/session.c 2013-09-17 19:53:38 UTC (rev 590)
+++ branches/experimental-4.3-threaded/daemon/session.c 2013-09-20 19:20:44 UTC (rev 591)
@@ -87,6 +87,8 @@
#include "message.h"
#include "acm.h"
+#include "stdutil/stderror.h"
+
static sp_time Badger_timeout = { 0, 100000 };
static message_obj New_mess;
@@ -103,6 +105,16 @@
static session *Sessions_tail;
static session *Sessions_free;
+/* ### Added for daemon-session queues/signalling */
+#include "signal_queues.h"
+
+static signal_q Daemon_Sess_Q;
+static E_events Session_Events;
+static stdthread Session_Thread;
+static stdthread_id Session_Thread_ID;
+
+static void Sess_process_daemon_q(int dmy_fd, int dmy_code, void *dmy_ptr);
+static void Sess_process_message(message_link *mess_link);
static void Sess_attach_accept(void);
static void Sess_detach_accept(void);
static void Sess_recv_client_auth(mailbox mbox, int dummy, void *dummy_p);
@@ -120,6 +132,22 @@
static void Sess_create_reject_message ( message_obj *msg );
static int Sess_get_p2p_dests( int num_groups, char groups[][MAX_GROUP_NAME], char dests[][MAX_GROUP_NAME] );
+/* Sess_run: entry function handed to stdthread_spawn by Sess_spawn.
+ *
+ * Runs E_events_handle_events on Session_Events and returns NULL once that
+ * call returns.
+ *
+ *   arg - thread argument, not referenced
+ */
+static void * STDTHREAD_FCN Sess_run(void *arg)
+{
+  (void) arg;
+
+  E_events_handle_events(&Session_Events);
+
+  return NULL;
+}
+
+/* Sess_spawn: start the session thread.
+ *
+ * Spawns a thread running Sess_run, recording its handle and id in
+ * Session_Thread and Session_Thread_ID.  A non-zero result from
+ * stdthread_spawn is reported through Alarm(EXIT).
+ */
+void Sess_spawn(void)
+{
+  stdcode spawn_code = stdthread_spawn(&Session_Thread, &Session_Thread_ID, Sess_run, NULL);
+
+  if (spawn_code != 0) {
+    Alarm(EXIT, "Sess_spawn: spawning session thread failed; code = %d; error = '%s'\n", spawn_code, stderr_strerr(spawn_code));
+  }
+}
+
static void Sess_activate_port_reuse(mailbox mbox)
{
int on = 1;
@@ -335,9 +363,13 @@
ret = Mem_init_object( DOWN_LINK, "down_link", sizeof(down_link), 200, 0);
if (ret < 0)
{
- Alarm(EXIT, "Sess_Init: Failure to Initialize DOWN_LINK memory objects\n");
+ Alarm(EXIT, "Sess_init: Failure to Initialize DOWN_LINK memory objects\n");
}
+ if (E_events_init(&Session_Events)) {
+ Alarm(EXIT, "Sess_init: E_events_init failure!\n");
+ }
+
Sess_init_sessions ();
Num_sessions = 0;
@@ -435,6 +467,10 @@
G_init();
+ Signal_Q_init(&Daemon_Sess_Q, sizeof(message_link*)); /* ### Testing queues */
+
+ E_events_attach_fd( &Session_Events, Daemon_Sess_Q.q_pipe[0], READ_FD, Sess_process_daemon_q, 0, 0, HIGH_PRIORITY );
+
Alarm( SESSION, "Sess_init: ended ok\n" );
}
@@ -449,6 +485,8 @@
{
/* This function is used only by the session (and groups) layer */
+ /*TODO: FIX ME*/
+
if( Protocol_threshold > Session_threshold )
E_set_active_threshold( Protocol_threshold );
else E_set_active_threshold( Session_threshold );
@@ -465,6 +503,7 @@
Alarm(EXIT,"Sess_unblock_user: NOT IMPLEMENTED!\n");
}
+
void Sess_block_users_level()
{
/* This function is used only by lower layers (protocol) */
@@ -490,12 +529,12 @@
int i;
for ( i=0; i < Accept_inet_mbox_num; i++)
{
- E_attach_fd( Accept_inet_mbox[i], READ_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
- E_attach_fd( Accept_inet_mbox[i], EXCEPT_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
+ E_events_attach_fd( &Session_Events, Accept_inet_mbox[i], READ_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
+ E_events_attach_fd( &Session_Events, Accept_inet_mbox[i], EXCEPT_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
}
#ifndef ARCH_PC_WIN95
- E_attach_fd( Accept_unix_mbox, READ_FD, Sess_accept, AF_UNIX, NULL, LOW_PRIORITY );
+ E_events_attach_fd( &Session_Events, Accept_unix_mbox, READ_FD, Sess_accept, AF_UNIX, NULL, LOW_PRIORITY );
#endif /* ARCH_PC_WIN95 */
@@ -506,12 +545,12 @@
int i;
for (i=0; i < Accept_inet_mbox_num; i++)
{
- E_detach_fd( Accept_inet_mbox[i], READ_FD );
- E_detach_fd( Accept_inet_mbox[i], EXCEPT_FD );
+ E_events_detach_fd( &Session_Events, Accept_inet_mbox[i], READ_FD );
+ E_events_detach_fd( &Session_Events, Accept_inet_mbox[i], EXCEPT_FD );
}
#ifndef ARCH_PC_WIN95
- E_detach_fd( Accept_unix_mbox, READ_FD );
+ E_events_detach_fd( &Session_Events, Accept_unix_mbox, READ_FD );
#endif /* ARCH_PC_WIN95 */
@@ -614,11 +653,11 @@
}
/* delaying for the private name to be written */
Sess_detach_accept();
- E_attach_fd( Sessions[MAX_SESSIONS].mbox, READ_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );
- E_attach_fd( Sessions[MAX_SESSIONS].mbox, EXCEPT_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );
+ E_events_attach_fd( &Session_Events, Sessions[MAX_SESSIONS].mbox, READ_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );
+ E_events_attach_fd( &Session_Events, Sessions[MAX_SESSIONS].mbox, EXCEPT_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );
accept_delay.sec = 1;
accept_delay.usec= 0;
- E_queue( Sess_accept_continue2, 1, NULL, accept_delay );
+ E_events_queue( &Session_Events, Sess_accept_continue2, 1, NULL, accept_delay, LOW_PRIORITY);
}
void Sess_accept_continue(mailbox d1, int d2, void *d3)
@@ -636,9 +675,9 @@
unsigned char list_len;
session *tmp_ses;
- E_dequeue( Sess_accept_continue2, 1, NULL );
- E_detach_fd( Sessions[MAX_SESSIONS].mbox, READ_FD );
- E_detach_fd( Sessions[MAX_SESSIONS].mbox, EXCEPT_FD );
+ E_events_dequeue( &Session_Events, Sess_accept_continue2, 1, NULL );
+ E_events_detach_fd( &Session_Events, Sessions[MAX_SESSIONS].mbox, READ_FD );
+ E_events_detach_fd( &Session_Events, Sessions[MAX_SESSIONS].mbox, EXCEPT_FD );
Sess_attach_accept();
/* set file descriptor to non blocking */
@@ -862,8 +901,8 @@
}
/* Now wait for client reply */
- E_attach_fd( Sessions[sess_location].mbox, READ_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
- E_attach_fd( Sessions[sess_location].mbox, EXCEPT_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
+ E_events_attach_fd( &Session_Events, Sessions[sess_location].mbox, READ_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
+ E_events_attach_fd( &Session_Events, Sessions[sess_location].mbox, EXCEPT_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
}
static void Sess_recv_client_auth(mailbox mbox, int dummy, void *dummy_p)
@@ -883,8 +922,8 @@
Alarm( EXIT, "Sess_recv_client_auth: BUG! Session is already authorized (status 0x%x)\n", Sessions[ses].status);
}
- E_detach_fd(mbox, READ_FD);
- E_detach_fd(mbox, EXCEPT_FD);
+ E_events_detach_fd(&Session_Events, mbox, READ_FD);
+ E_events_detach_fd(&Session_Events, mbox, EXCEPT_FD);
/* set file descriptor to non blocking */
ioctl_cmd = 1;
@@ -1065,9 +1104,9 @@
/* sending the private group name */
send( Sessions[ses].mbox, private_group_name, name_len, 0 );
- E_attach_fd( Sessions[ses].mbox, READ_FD, Sess_read, Sessions[ses].type, NULL,
+ E_events_attach_fd( &Session_Events, Sessions[ses].mbox, READ_FD, Sess_read, Sessions[ses].type, NULL,
LOW_PRIORITY );
- E_attach_fd( Sessions[ses].mbox, EXCEPT_FD, Sess_read, Sessions[ses].type, NULL,
+ E_events_attach_fd( &Session_Events, Sessions[ses].mbox, EXCEPT_FD, Sess_read, Sessions[ses].type, NULL,
LOW_PRIORITY );
Sessions[ses].status = Set_op_session( Sessions[ses].status );
@@ -1459,10 +1498,10 @@
}
/* close the mailbox and mark it unoperational */
- E_dequeue( Sess_badger_TO, mbox, NULL );
- E_detach_fd( mbox, READ_FD );
- E_detach_fd( mbox, EXCEPT_FD );
- E_detach_fd( mbox, WRITE_FD );
+ E_events_dequeue( &Session_Events, Sess_badger_TO, mbox, NULL );
+ E_events_detach_fd( &Session_Events, mbox, READ_FD );
+ E_events_detach_fd( &Session_Events, mbox, EXCEPT_FD );
+ E_events_detach_fd( &Session_Events, mbox, WRITE_FD );
close( mbox );
/* the mailbox is closed but the entry still points to it */
Sessions[ses].status = Clear_op_session( Sessions[ses].status );
@@ -1660,8 +1699,8 @@
Message_calculate_current_location(tmp_link->mess, len_sent, &(Sessions[ses].write) );
/* We will need to badger this guy */
- E_queue( Sess_badger_TO, Sessions[ses].mbox, NULL, Badger_timeout );
- E_attach_fd( Sessions[ses].mbox, WRITE_FD, Sess_badger_FD, 0, NULL, LOW_PRIORITY );
+ E_events_queue( &Session_Events, Sess_badger_TO, Sessions[ses].mbox, NULL, Badger_timeout, LOW_PRIORITY );
+ E_events_attach_fd( &Session_Events, Sessions[ses].mbox, WRITE_FD, Sess_badger_FD, 0, NULL, LOW_PRIORITY );
}else{
/* This guy was already badgered */
Sessions[ses].last->next = tmp_link;
@@ -1754,13 +1793,13 @@
ret = ioctl( mbox, FIONBIO, &ioctl_cmd);
if( Sessions[ses].num_mess > 0 ) {
- E_queue( Sess_badger_TO, mbox, NULL, Badger_timeout );
- E_attach_fd( mbox, WRITE_FD, Sess_badger_FD, 0, NULL, LOW_PRIORITY );
+ E_events_queue( &Session_Events, Sess_badger_TO, mbox, NULL, Badger_timeout, LOW_PRIORITY);
+ E_events_attach_fd( &Session_Events, mbox, WRITE_FD, Sess_badger_FD, 0, NULL, LOW_PRIORITY );
}else{
NO_WORK:
- E_dequeue( Sess_badger_TO, mbox, NULL );
- E_detach_fd( mbox, WRITE_FD );
+ E_events_dequeue( &Session_Events, Sess_badger_TO, mbox, NULL );
+ E_events_detach_fd( &Session_Events, mbox, WRITE_FD );
}
}
@@ -1833,18 +1872,53 @@
Sessions[ses].read_mess = NULL;
/* close the mailbox and mark it unoperational */
- E_dequeue( Sess_badger_TO, mbox, NULL );
- E_detach_fd( mbox, READ_FD );
- E_detach_fd( mbox, EXCEPT_FD );
- E_detach_fd( mbox, WRITE_FD );
+ E_events_dequeue( &Session_Events, Sess_badger_TO, mbox, NULL );
+ E_events_detach_fd( &Session_Events, mbox, READ_FD );
+ E_events_detach_fd( &Session_Events, mbox, EXCEPT_FD );
+ E_events_detach_fd( &Session_Events, mbox, WRITE_FD );
close(mbox);
/* the mailbox is closed but the entry still points to it */
Sessions[ses].status = Clear_op_session( Sessions[ses].status );
Alarm( SESSION, "Sess_kill: killing session %s ( mailbox %d )\n",Sessions[ses].name, mbox );
}
-void Sess_deliver_message( message_link *mess_link )
+/* Sess_deliver_message: hand a message_link to the session layer.
+ *
+ * Only the message_link pointer is copied onto Daemon_Sess_Q here; the
+ * delivery work itself is done by Sess_process_message when
+ * Sess_process_daemon_q drains the queue.
+ *
+ *   mess_link - link to be queued (only the pointer is stored)
+ */
+void Sess_deliver_message( message_link *mess_link ) /* typically, but not always, called by protocol thread */
{
+ /* Pass the address of the pointer: the queue was initialised with an
+ * element size of sizeof(message_link*) */
+ Signal_Q_enqueue(&Daemon_Sess_Q, &mess_link);
+}
+
+/* Adapter with exactly the void (*)(void*) type that Signal_Q_process
+ * calls.  Calling Sess_process_message through a cast function pointer
+ * would invoke it via an incompatible function type, which C leaves
+ * undefined; converting the argument here keeps the call well defined. */
+static void Sess_process_queued_link( void *mess_link )
+{
+  Sess_process_message( (message_link *) mess_link );
+}
+
+/* Sess_process_daemon_q: event callback registered on Session_Events for
+ * the read end of Daemon_Sess_Q's pipe.
+ *
+ * Lets Signal_Q_process drain the queue, which passes every queued
+ * message_link pointer to Sess_process_message.
+ *
+ *   fd, dmy_code, dmy_ptr - callback arguments, not referenced
+ */
+static void Sess_process_daemon_q(int fd, int dmy_code, void *dmy_ptr)
+{
+  Signal_Q_process(&Daemon_Sess_Q, Sess_process_queued_link);
+}
+
+static void Sess_process_message( message_link *mess_link )
+{
static int target_sessions[MAX_SESSIONS];
int num_target_sessions;
int source_ses;
@@ -1930,6 +2004,11 @@
if( !needed ) Sess_dispose_message( mess_link );
}
+/* ### Sess_deliver_reg_memb and Sess_deliver_trans_memb may need to be changed to
+ * enqueue messages */
+
+/* TODO: FIX ME*/
+
void Sess_deliver_reg_memb( configuration reg_memb, membership_id reg_memb_id )
{
G_handle_reg_memb( reg_memb, reg_memb_id );
Modified: branches/experimental-4.3-threaded/daemon/session.h
===================================================================
--- branches/experimental-4.3-threaded/daemon/session.h 2013-09-17 19:53:38 UTC (rev 590)
+++ branches/experimental-4.3-threaded/daemon/session.h 2013-09-20 19:20:44 UTC (rev 591)
@@ -77,6 +77,7 @@
} session;
void Sess_init(void);
+void Sess_spawn(void);
void Sess_signal_conf_reload(void);
void Sess_block_users_level(void);
void Sess_unblock_users_level(void);
Added: branches/experimental-4.3-threaded/daemon/signal_queues.c
===================================================================
--- branches/experimental-4.3-threaded/daemon/signal_queues.c (rev 0)
+++ branches/experimental-4.3-threaded/daemon/signal_queues.c 2013-09-20 19:20:44 UTC (rev 591)
@@ -0,0 +1,91 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+
+#include <sys/ioctl.h>
+
+#include "spu_alarm.h"
+#include "signal_queues.h"
+#include "stdutil/stderror.h"
+
+/* Signal_Q_init: prepare a signal_q for use.
+ *
+ * Constructs both arrays with elements of vsize bytes, constructs the
+ * mutex, creates the pipe and switches both pipe ends to non-blocking mode.
+ * The steps run in that order and stop at the first one that fails, in
+ * which case the failure is reported through Alarm(EXIT).
+ *
+ *   q     - queue to initialise
+ *   vsize - size in bytes of one queue element
+ */
+void Signal_Q_init(signal_q *q, stdsize vsize)
+{
+  stdcode code;
+  int     on;
+  int     ok;
+
+  ok = ((code = stdarr_construct(&q->main_q, vsize, STDARR_OPTS_NO_AUTO_SHRINK)) == 0);
+
+  if (ok) {
+    ok = ((code = stdarr_construct(&q->process_q, vsize, STDARR_OPTS_NO_AUTO_SHRINK)) == 0);
+  }
+
+  if (ok) {
+    ok = ((code = stdmutex_construct(&q->q_mut, STDMUTEX_FAST)) == 0);
+  }
+
+  if (ok) {
+    ok = (pipe(q->q_pipe) == 0);
+  }
+
+  /* FIONBIO with a non-zero argument: make the read end non-blocking */
+  if (ok) {
+    on = 1;
+    ok = (ioctl(q->q_pipe[0], FIONBIO, &on) != -1);
+  }
+
+  /* ... and the write end as well */
+  if (ok) {
+    on = 1;
+    ok = (ioctl(q->q_pipe[1], FIONBIO, &on) != -1);
+  }
+
+  if (!ok) {
+    Alarm(EXIT, "Signal_Q_init: failed; code = %d; '%s' error '%s'\n", code, stderr_strerr(code), strerror(errno));
+  }
+}
+
+/* Signal_Q_destroy: release what Signal_Q_init set up.
+ *
+ * Destructs both arrays and the mutex, then closes the read end and the
+ * write end of the pipe.  Return values of close are not examined.
+ *
+ *   q - queue to tear down
+ */
+void Signal_Q_destroy(signal_q *q)
+{
+  int end;
+
+  stdarr_destruct(&q->main_q);
+  stdarr_destruct(&q->process_q);
+  stdmutex_destruct(&q->q_mut);
+
+  for (end = 0; end < 2; ++end) {
+    close(q->q_pipe[end]);
+  }
+}
+
+/* Signal_Q_enqueue: append one element to q->main_q and, when the queue was
+ * empty beforehand, write one byte to q->q_pipe[1] so that the reader of
+ * q->q_pipe[0] is woken up.
+ *
+ *   q   - queue set up by Signal_Q_init
+ *   obj - address of the element to copy into the queue (the callers added
+ *         in this commit pass the address of a pointer variable)
+ *
+ * Calls abort() if the mutex cannot be grabbed or dropped.  Exits through
+ * Alarm(EXIT) if the push fails, or if the pipe write fails for any reason
+ * other than the pipe being full.
+ */
+void Signal_Q_enqueue(signal_q *q, void *obj)
+{
+  char    buf = '1';
+  stdcode code;
+  int     prev_size;
+  int     ret;
+
+  /* Put message on queue */
+
+  if (stdmutex_grab(&q->q_mut)) {
+    abort();
+  }
+
+  prev_size = stdarr_size(&q->main_q);
+
+  if ((code = stdarr_push_back(&q->main_q, obj))) {
+    Alarm(EXIT, "Signal_Q_enqueue: pushing to queue failed; code = %d; error '%s'\n", code, stderr_strerr(code));
+  }
+
+  if (stdmutex_drop(&q->q_mut)) {
+    abort();
+  }
+
+  /* Signal only if necessary (i.e. if the queue was empty) */
+
+  if (prev_size != 0) {
+    return;
+  }
+
+  /* Retry when a signal interrupts the write.  errno is cleared first so
+   * that an unexpected 0-byte result is never judged (or reported) using a
+   * value left over from some earlier call. */
+
+  do {
+    errno = 0;
+    ret   = (int) write(q->q_pipe[1], &buf, 1);
+  } while (ret == -1 && errno == EINTR);
+
+  /* errno is only meaningful when write returned -1.  A full non-blocking
+   * pipe (EAGAIN / EWOULDBLOCK) is tolerated: unread bytes are already
+   * sitting in the pipe, so the read end is readable anyway. */
+
+  if (ret != 1 && !(ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))) {
+    Alarm(EXIT, "Signal_Q_enqueue: write to pipe failed, ret = %d, errno '%s'\n", ret, strerror(errno));
+  }
+}
+
+/* Signal_Q_process: drain the queue, calling func once per queued element.
+ *
+ *   q    - queue set up by Signal_Q_init
+ *   func - callback; receives the pointer value stored in each element
+ *          (every element is read as a void *, so the queue must have been
+ *          initialised with pointer-sized elements)
+ *
+ * Steps: empty the signalling pipe, swap main_q with process_q under the
+ * mutex, call func for each element of process_q in order, then clear
+ * process_q.
+ *
+ * Calls abort() if the mutex cannot be grabbed or dropped.  Exits through
+ * Alarm(EXIT) if reading the pipe fails for any reason other than the pipe
+ * being empty, including a 0-byte (end-of-file) result.
+ */
+void Signal_Q_process(signal_q *q, void (func)(void*))
+{
+  stdit   it;
+  char    buf[128];
+  void ** link_ptr;
+  int     ret;
+
+  /* Clear signaling pipe (should only ever have one byte though).  Retry
+   * when a signal interrupts the read; errno is cleared first so that a
+   * 0-byte result is never judged using a stale value. */
+
+  do {
+    errno = 0;
+    ret   = (int) read(q->q_pipe[0], buf, sizeof(buf));
+  } while (ret == -1 && errno == EINTR);
+
+  /* errno is only meaningful when read returned -1; an empty non-blocking
+   * pipe (EAGAIN / EWOULDBLOCK) is not an error here. */
+
+  if (ret <= 0 && !(ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))) {
+    Alarm(EXIT, "Signal_Q_process: reading from pipe failed, ret = %d, errno '%s'\n", ret, strerror(errno));
+  }
+
+  /* Swap main_q with (empty) process_q so we can process messages from
+   * process_q while new messages can be added to main_q */
+
+  if (stdmutex_grab(&q->q_mut)) {
+    abort();
+  }
+
+  stdarr_swap(&q->main_q, &q->process_q);
+
+  if (stdmutex_drop(&q->q_mut)) {
+    abort();
+  }
+
+  /* Process all messages in process_q; the mutex is not held here */
+
+  for (stdarr_begin(&q->process_q, &it); !stdarr_is_end(&q->process_q, &it); stdarr_it_next(&it)) {
+    link_ptr = stdarr_it_val(&it);
+    func(*link_ptr);
+  }
+
+  stdarr_clear(&q->process_q);
+}
Added: branches/experimental-4.3-threaded/daemon/signal_queues.h
===================================================================
--- branches/experimental-4.3-threaded/daemon/signal_queues.h (rev 0)
+++ branches/experimental-4.3-threaded/daemon/signal_queues.h 2013-09-20 19:20:44 UTC (rev 591)
@@ -0,0 +1,21 @@
+#ifndef INC_SIGNAL_Q
+#define INC_SIGNAL_Q
+
+#include "stdutil/stdarr.h"
+#include "stdutil/stdthread.h"
+
+/* signal_q: an array-backed queue paired with a pipe used for wake-ups.
+ *
+ * Signal_Q_enqueue appends to main_q and writes one byte to the pipe when
+ * main_q was empty; Signal_Q_process empties the pipe, swaps main_q with
+ * process_q and walks process_q. */
+typedef struct
+{
+ /* grabbed while pushing to main_q and while swapping main_q/process_q */
+ stdmutex q_mut;
+ /* [0] is read by Signal_Q_process, [1] is written by Signal_Q_enqueue;
+ * Signal_Q_init sets both ends non-blocking */
+ int q_pipe[2];
+ /* elements appended by Signal_Q_enqueue */
+ stdarr main_q;
+ /* elements being handed to the callback by Signal_Q_process */
+ stdarr process_q;
+
+} signal_q;
+
+/* Construct the arrays (element size vsize), the mutex and the pipe;
+ * reports failure through Alarm(EXIT). */
+void Signal_Q_init(signal_q *q, stdsize vsize);
+/* Destruct the arrays and the mutex and close both pipe ends. */
+void Signal_Q_destroy(signal_q *q);
+/* Append the element found at obj; signal the pipe if the queue was empty. */
+void Signal_Q_enqueue(signal_q *q, void *obj);
+/* Call func with the pointer stored in each queued element, then clear. */
+void Signal_Q_process(signal_q *q, void (func)(void*));
+
+#endif /* INC_SIGNAL_Q */
More information about the Spread-cvs
mailing list