[Spread-users] patches to libspread/sp.c

John Robinson jr at vertica.com
Wed Aug 8 15:09:51 EDT 2007


Folks,

We have patched libspread for two purposes:

1.  Fix a Valgrind complaint (send_group is now zero-filled before the private group name is copied into it).
2.  Switch to using poll() rather than select().

The latter is of interest to anyone who expects to have many open 
files in their Spread application, since select() cannot monitor file 
descriptors numbered FD_SETSIZE (typically 1024) or higher.  Perhaps it 
could become a configure option.

I hope readers (especially the maintainers) find these patches useful.

/jr
--------
diff -dur spread-src-4.0.0/libspread/sp.c vspread/libspread/sp.c
--- spread-src-4.0.0/libspread/sp.c	2006-11-30 11:22:44.000000000 -0500
+++ vspread/libspread/sp.c	2007-05-29 11:14:54.000000000 -0400
@@ -53,6 +53,7 @@
  #include <signal.h>
  #include <sys/ioctl.h>
  #include <unistd.h>
+#include <sys/poll.h>

  #else	/* ARCH_PC_WIN95 */

@@ -268,9 +269,10 @@
  static  int     recv_nointr_timeout(int s, void *buf, size_t len, int 
flags, sp_time *time_out)
  {
          int ret, num_ready;
-        fd_set rset,fixed_rset;
          sp_time start_time, temp_time, target_time, wait_time;
          struct timeval  sel_time;
+        struct pollfd pset[1];
+        int poll_time;

          if ( len == 0 )
                  return(0);
@@ -279,12 +281,15 @@
                  start_time = E_get_time();
                  target_time = E_add_time(start_time, *time_out);
                  wait_time = *time_out;
-                FD_ZERO(&fixed_rset);
-                FD_SET(s, &fixed_rset);
-                rset = fixed_rset;
                  sel_time.tv_sec = wait_time.sec;
                  sel_time.tv_usec = wait_time.usec;
-                while( ((num_ready = select(s+1, &rset, NULL, NULL, 
&sel_time)) == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) 
|| (sock_errno == EWOULDBLOCK)) )
+                pset[0].fd = s;
+                pset[0].events =
+                    POLLIN | POLLPRI | POLLOUT | POLLERR | POLLHUP | 
POLLNVAL;
+                pset[0].revents = 0;
+                poll_time = (sel_time.tv_sec * 1000) +
+                    ((sel_time.tv_usec + 500) / 1000); /* convert to 
msec */
+                while( ((num_ready = poll(pset, 1, poll_time)) == -1) 
&& ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == 
EWOULDBLOCK)) )
                  {
                          temp_time = E_get_time();
                          if (E_compare_time(temp_time, target_time) < 0 ) {
@@ -296,7 +301,6 @@
                                  sock_set_errno( ERR_TIMEDOUT );
                                  return(-1);
                          }
-                        rset = fixed_rset;
                  }
                  if ( !num_ready ) {
                          printf("recv_nointr_timeout: Timed out\n");
@@ -324,7 +328,6 @@
  static  int     connect_nointr_timeout(int s, struct sockaddr *sname, 
socklen_t slen, sp_time *time_out)
  {
      int         ret, num_ready;
-    fd_set      rset,fixed_rset,wset;
      sp_time     start_time, temp_time, target_time, wait_time;
      struct timeval sel_time;
      int         non_blocking = 0;
@@ -332,6 +335,8 @@
      int         on;
      int         ret_ioctl;
      sockopt_len_t   elen;
+    struct pollfd pset[1];
+    int poll_time;

      /* initialize time values even if blocking so later use is valid */
      start_time = E_get_time();
@@ -347,20 +352,22 @@
          on = 1;
          ret_ioctl = ioctl( s, FIONBIO, &on);
      }
-    /* Handle EINTR while connecting by waiting with select until the
+    /* Handle EINTR while connecting by waiting with poll until the
       * connect completes or fails.  This is a while loop but it is never
       * done more then once. The while is so we can use 'break'
       */
      while( ((ret = connect( s, sname, slen ) ) == -1)
             && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || 
(sock_errno == EWOULDBLOCK) || (sock_errno == EINPROGRESS)) )
      {
-        FD_ZERO(&fixed_rset);
-        FD_SET(s, &fixed_rset);
-        rset = fixed_rset;
-        wset = rset;
-        Alarmp( SPLOG_DEBUG, SESSION, "connect_nointr_timeout: connect 
in progress for socket %d, now wait in select\n", s);
-        /* wait for select to timeout (num_ready == 0), give a 
permanent error (num_ready < 0 && sock_errno != transient). If transient 
error, retry after checking to make sure timeout has not expired */
-        while( ((num_ready = select(s+1, &rset, &wset, NULL, 
&sel_time)) == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) 
|| (sock_errno == EWOULDBLOCK)) )
+        Alarmp( SPLOG_DEBUG, SESSION, "connect_nointr_timeout: connect 
in progress for socket %d, now wait in poll\n", s);
+        pset[0].fd = s;
+        pset[0].events =
+            POLLIN | POLLPRI | POLLOUT | POLLERR | POLLHUP | POLLNVAL;
+        pset[0].revents = 0;
+        poll_time = (sel_time.tv_sec * 1000) +
+            ((sel_time.tv_usec + 500) / 1000); /* convert to msec */
+        /* wait for poll to timeout (num_ready == 0), give a permanent 
error (num_ready < 0 && sock_errno != transient). If transient error, 
retry after checking to make sure timeout has not expired */
+        while( ((num_ready = poll(pset, 1, poll_time)) == -1) && 
((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == 
EWOULDBLOCK)) )
          {
              temp_time = E_get_time();
              if (E_compare_time(temp_time, target_time) < 0 ) {
@@ -368,14 +375,12 @@
                  sel_time.tv_sec = wait_time.sec;
                  sel_time.tv_usec = wait_time.usec;
              } else {
-                Alarmp( SPLOG_WARNING, SESSION, 
"connect_nointr_timeout: connect interrupted and select wait timesout 
during transient error: %s\n", sock_strerror(sock_errno));
+                Alarmp( SPLOG_WARNING, SESSION, 
"connect_nointr_timeout: connect interrupted and poll wait timesout 
during transient error: %s\n", sock_strerror(sock_errno));
                  close(s);
                  sock_set_errno( ERR_TIMEDOUT );
                  ret = -1;
                  goto done_connect_try;
              }
-            rset = fixed_rset;
-            wset = rset;
          }
          if ( num_ready == 0 ) {
              /* timeout */
@@ -385,11 +390,11 @@
              break;
          } else if ( num_ready < 0 )
          {
-            Alarmp( SPLOG_WARNING, SESSION, "connect_nointr_timeout: 
connect interrupted and error in select wait: %s\n", 
sock_strerror(sock_errno));
+            Alarmp( SPLOG_WARNING, SESSION, "connect_nointr_timeout: 
connect interrupted and error in poll wait: %s\n", 
sock_strerror(sock_errno));
              ret = -1;
              break;
          }
-        if (FD_ISSET(s, &rset) || FD_ISSET( s, &wset))
+        else if (pset[0].revents != 0)
          {
              err = 0;
              elen = sizeof(err);
@@ -407,7 +412,7 @@
              }
              break;
          } else {
-            Alarmp( SPLOG_FATAL, SESSION, "connect_nointr_timeout: 
connect interrupted--but select does not indicate either error or 
connecting socket ready. Impossible condition (i.e. bug).  ret= %d: 
%s\n", err, sock_strerror(sock_errno));
+            Alarmp( SPLOG_FATAL, SESSION, "connect_nointr_timeout: 
connect interrupted--but poll does not indicate either error or 
connecting socket ready. Impossible condition (i.e. bug).  ret= %d: 
%s\n", err, sock_strerror(sock_errno));
              ret = -1;
              break;
          }
@@ -937,6 +942,7 @@
  		Mutex_unlock( &Struct_mutex );
  		return( ILLEGAL_SESSION );
  	}
+	memset(send_group, 0, MAX_GROUP_NAME);
  	strcpy(send_group, Sessions[ses].private_group_name );

  	Mutex_unlock( &Struct_mutex );




More information about the Spread-users mailing list