[Spread-cvs] commit: r596 - in branches/experimental-4.3-threaded: daemon libspread-util/src
jschultz at spread.org
jschultz at spread.org
Thu Oct 17 14:48:53 EDT 2013
Author: jschultz
Date: 2013-10-17 14:48:53 -0400 (Thu, 17 Oct 2013)
New Revision: 596
Modified:
branches/experimental-4.3-threaded/daemon/groups.c
branches/experimental-4.3-threaded/daemon/protocol.c
branches/experimental-4.3-threaded/daemon/protocol.h
branches/experimental-4.3-threaded/daemon/session.c
branches/experimental-4.3-threaded/daemon/session.h
branches/experimental-4.3-threaded/libspread-util/src/memory.c
Log:
Updated the memory system to use only calloc/free.
Changed protocol, session, and groups to coordinate delivery of reg_membs and the blocking/unblocking of clients.
Modified: branches/experimental-4.3-threaded/daemon/groups.c
===================================================================
--- branches/experimental-4.3-threaded/daemon/groups.c 2013-10-17 15:15:44 UTC (rev 595)
+++ branches/experimental-4.3-threaded/daemon/groups.c 2013-10-17 18:48:53 UTC (rev 596)
@@ -527,12 +527,23 @@
int ret;
char ip_string[16];
bool group_changed, synced_set_changed;
+ obj_link *obj;
stdit it;
IP_to_STR( reg_memb_id.proc_id, ip_string );
Alarmp( SPLOG_INFO, GROUPS, "G_handle_reg_memb: with (%s, %d) id\n",
ip_string, reg_memb_id.time );
-
+
+ /* prepare ACK for protocol layer */
+
+ if ((obj = new(OBJ_LINK)) == NULL) {
+ Alarm(EXIT, "G_handle_reg_memb in GTRANS; can't create obj\n");
+ }
+
+ obj->type = GROUPS_DOWNQUEUE_SIG;
+ obj->mess = NULL;
+ obj->next = NULL;
+
switch( Gstate )
{
case GOP:
@@ -562,13 +573,6 @@
*/
Alarmp( SPLOG_INFO, GROUPS, "G_handle_reg_memb in GTRANS\n");
- if ((obj = new( OBJ_LINK)) == NULL) {
- Alarm(EXIT, "G_handle_reg_memb in GTRANS; can't create obj\n");
- }
-
- obj->mess = NULL;
- obj->next = NULL;
-
Reg_memb = reg_memb;
Reg_memb_id = reg_memb_id;
@@ -648,7 +652,6 @@
/* Replace down queue */
/*Prot_set_down_queue( GROUPS_DOWNQUEUE );*/
- obj->type = GROUPS_DOWNQUEUE_SIG;
/* If I'm the head of my synced set, I send one or more GROUPS messages.
* No daemon's data about members for a given group is ever split across
@@ -667,7 +670,6 @@
GlobalStatus.gstate = Gstate;
}
- Prot_new_message(obj, 0);
break;
case GGATHER:
@@ -777,6 +779,8 @@
break;
}
+
+ Prot_new_message(obj, 0);
}
void G_handle_trans_memb( configuration trans_memb, membership_id trans_memb_id )
@@ -1667,8 +1671,21 @@
Alarmp( SPLOG_INFO, GROUPS, "G_handle_groups: Last GROUPS message received - msgs %d, daemons %d\n",
Num_mess_gathered, Num_daemons_gathered );
/* Replace protocol queue */
- Prot_set_down_queue( NORMAL_DOWNQUEUE );
+ /*Prot_set_down_queue( NORMAL_DOWNQUEUE );*/
+ {
+ obj_link *obj = new(OBJ_LINK);
+ if (obj == NULL) {
+ Alarm(EXIT, "G_handle_groups: couldn't allocate queue signal obj!\n");
+ }
+
+ obj->type = NORMAL_DOWNQUEUE_SIG;
+ obj->mess = NULL;
+ obj->next = NULL;
+
+ Prot_new_message(obj, 0);
+ }
+
/* lower events threshold */
Session_threshold = LOW_PRIORITY;
Sess_set_active_threshold();
Modified: branches/experimental-4.3-threaded/daemon/protocol.c
===================================================================
--- branches/experimental-4.3-threaded/daemon/protocol.c 2013-10-17 15:15:44 UTC (rev 595)
+++ branches/experimental-4.3-threaded/daemon/protocol.c 2013-10-17 18:48:53 UTC (rev 596)
@@ -944,9 +944,23 @@
Down_queue_ptr->num_mess );
}
Down_queue_ptr->num_mess++;
- if( Down_queue_ptr->num_mess >= WATER_MARK )
- Sess_block_users_level();
+ if( Down_queue_ptr->num_mess >= WATER_MARK ) {
+ obj_link *obj = new(OBJ_LINK);
+
+ if (obj == NULL) {
+ Alarm(EXIT, "Couldn't alloc block users toggle on!\n");
+ }
+
+ obj->type = BLOCK_USERS_TOGGLE_SIG;
+ obj->mess = NULL;
+ obj->next = NULL;
+
+ Signal_Q_enqueue(&Daemon_Sess_Q, &obj);
+
+ /*Sess_block_users_level();*/
+ }
+
if( Down_queue_ptr->num_mess == 1 && Is_token_hold() )
{
leader_id = Conf_leader( Memb_active_ptr() );
@@ -1143,8 +1157,22 @@
Down_queue_ptr->num_mess--;
dispose( tmp_down->mess );
dispose( tmp_down );
- if( Down_queue_ptr->num_mess < WATER_MARK )
- Sess_unblock_users_level();
+
+ if( Down_queue_ptr->num_mess < WATER_MARK ) {
+ obj_link *obj = new(OBJ_LINK);
+
+ if (obj == NULL) {
+ Alarm(EXIT, "Couldn't alloc block users toggle off!\n");
+ }
+
+ obj->type = BLOCK_USERS_TOGGLE_SIG;
+ obj->mess = NULL;
+ obj->next = NULL;
+
+ Signal_Q_enqueue(&Daemon_Sess_Q, &obj);
+
+ /*Sess_unblock_users_level();*/
+ }
}else{
Alarm( EXIT,
"Send_new_packets: error in packet index: %d %d\n",
Modified: branches/experimental-4.3-threaded/daemon/protocol.h
===================================================================
--- branches/experimental-4.3-threaded/daemon/protocol.h 2013-10-17 15:15:44 UTC (rev 595)
+++ branches/experimental-4.3-threaded/daemon/protocol.h 2013-10-17 18:48:53 UTC (rev 596)
@@ -61,8 +61,9 @@
#define NORMAL_DOWNQUEUE 0
#define GROUPS_DOWNQUEUE 1
-#define NORMAL_DOWNQUEUE_SIG 0x10000000
-#define GROUPS_DOWNQUEUE_SIG 0x20000000
+#define NORMAL_DOWNQUEUE_SIG 0x10000000
+#define GROUPS_DOWNQUEUE_SIG 0x20000000
+#define BLOCK_USERS_TOGGLE_SIG 0x40000000
extern int Protocol_Actively_Waiting;
Modified: branches/experimental-4.3-threaded/daemon/session.c
===================================================================
--- branches/experimental-4.3-threaded/daemon/session.c 2013-10-17 15:15:44 UTC (rev 595)
+++ branches/experimental-4.3-threaded/daemon/session.c 2013-10-17 18:48:53 UTC (rev 596)
@@ -71,6 +71,7 @@
#include "spu_objects.h"
#include "spu_memory.h"
#include "session.h"
+#include "network.h"
#include "sess_types.h"
#define ext_sess_body
@@ -115,7 +116,7 @@
/* ### Added for daemon-session queues/signalling */
#include "signal_queues.h"
-static signal_q Daemon_Sess_Q;
+signal_q Daemon_Sess_Q;
static E_events *Session_Events = NULL;
#ifdef _REENTRANT
@@ -513,22 +514,18 @@
{
/* This function is used only by the session (and groups) layer */
- TODO: FIX ME
-
if( Protocol_threshold > Session_threshold )
- E_set_active_threshold( Protocol_threshold );
- else E_set_active_threshold( Session_threshold );
+ E_events_set_active_threshold( Session_Events, Protocol_threshold );
+ else E_events_set_active_threshold( Session_Events, Session_threshold );
}
void Sess_block_user(int xxx)
{
-
Alarm(EXIT,"Sess_block_user: NOT IMPLEMENTED!\n");
}
void Sess_unblock_user(int xxx)
{
-
Alarm(EXIT,"Sess_unblock_user: NOT IMPLEMENTED!\n");
}
@@ -1987,6 +1984,18 @@
goto END;
}
+ if (obj_link->type == BLOCK_USERS_TOGGLE_SIG) {
+
+ if (Protocol_threshold == LOW_PRIORITY) {
+ Sess_block_users_level();
+
+ } else {
+ Sess_unblock_users_level();
+ }
+
+ goto END;
+ }
+
assert(obj_link->type == REGULAR_MESS);
msg = mess_link->mess;
@@ -2072,6 +2081,7 @@
int num_bcast, num_token;
channel *bcast_channels;
channel *token_channels;
+ int ret;
int i;
if (obj == NULL || m == NULL) {
Modified: branches/experimental-4.3-threaded/daemon/session.h
===================================================================
--- branches/experimental-4.3-threaded/daemon/session.h 2013-10-17 15:15:44 UTC (rev 595)
+++ branches/experimental-4.3-threaded/daemon/session.h 2013-10-17 18:48:53 UTC (rev 596)
@@ -43,6 +43,7 @@
#include "sess_types.h"
#include "acm.h"
#include "protocol.h"
+#include "signal_queues.h"
typedef struct dummy_message_link {
message_obj *mess;
@@ -77,6 +78,8 @@
struct session *hash_next;
} session;
+extern signal_q Daemon_Sess_Q;
+
void Sess_init(void);
void Sess_spawn(void);
void Sess_signal_conf_reload(void);
Modified: branches/experimental-4.3-threaded/libspread-util/src/memory.c
===================================================================
--- branches/experimental-4.3-threaded/libspread-util/src/memory.c 2013-10-17 15:15:44 UTC (rev 595)
+++ branches/experimental-4.3-threaded/libspread-util/src/memory.c 2013-10-17 18:48:53 UTC (rev 596)
@@ -345,6 +345,8 @@
assert(Mem_valid_objtype(obj_type));
+ return calloc(1, sizeobj(obj_type));
+
if (Mem[obj_type].list_head == NULL)
{
mem_header * head_ptr;
@@ -464,6 +466,9 @@
mem_header * head_ptr;
if (length == 0) { return(NULL); }
+
+ return calloc(1, length);
+
if( !Mem[BLOCK_OBJECT].exist )
{
Mem[BLOCK_OBJECT].exist = TRUE;
@@ -544,6 +549,9 @@
if (object == NULL) { return; }
+ free(object);
+ return;
+
obj_type = mem_header_ptr(object)->obj_type;
ref_cnt = mem_header_ptr(object)->ref_cnt;
More information about the Spread-cvs mailing list