[rds-devel] [PATCH RDS/netperf] Add RDS support for netperf-2.4.2

Vladimir Sokolovsky vlad at dev.mellanox.co.il
Mon Jan 8 06:35:22 PST 2007


Hi,
The following patch adds an RDS_STREAM test to netperf-2.4.2.

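Add an RDS_STREAM test to netperf-2.4.2, plus a -r option for netperf and netserver that makes the data socket an RDS (PF_RDS, SOCK_SEQPACKET) socket.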

Signed-off-by: Vladimir Sokolovsky <vlad at mellanox.co.il>

Index: netperf-2.4.2/src/netlib.c
===================================================================
--- netperf-2.4.2.orig/src/netlib.c
+++ netperf-2.4.2/src/netlib.c
@@ -151,7 +151,6 @@ char    netlib_id[]="\
 # include "missing/getaddrinfo.h"
 #endif
 
-
 #ifdef WANT_HISTOGRAM
 #include "hist.h"
 #endif /* WANT_HISTOGRAM */
@@ -436,6 +435,9 @@ inet_ttos(int type) 
   case SOCK_DGRAM:
     return("SOCK_DGRAM");
     break;
+  case SOCK_SEQPACKET:
+    return("SOCK_SEQPACKET");
+    break;
   case SOCK_STREAM:
     return("SOCK_STREAM");
     break;
@@ -480,6 +482,9 @@ inet_ftos(int family) 
     return("AF_INET6");
     break;
 #endif
+  case AF_RDS:
+        return("AF_RDS");
+        break;
   default:
     return("AF_UNSPEC");
   }
@@ -492,6 +497,7 @@ inet_nton(int af, const void *src, char 
 
   switch (af) {
   case AF_INET:
+  case AF_RDS:
     /* magic constants again... :) */
     if (cnt >= 4) {
       memcpy(dst,src,4);
@@ -2498,6 +2504,7 @@ establish_control_internal(char *hostnam
 
   /* first, we do the remote */
   memset(&hints, 0, sizeof(hints));
+
   hints.ai_family = remfam;
   hints.ai_socktype = SOCK_STREAM;
   hints.ai_protocol = IPPROTO_TCP;
Index: netperf-2.4.2/src/netlib.h
===================================================================
--- netperf-2.4.2.orig/src/netlib.h
+++ netperf-2.4.2/src/netlib.h
@@ -198,6 +198,10 @@
 #define         SCTP_RR_MANY_RESPONSE      531
 #define         SCTP_RR_MANY_RESULT        532
 
+#define         DO_RDS_STREAM           610
+#define         RDS_STREAM_RESPONSE     611
+#define         RDS_STREAM_RESULTS      612
+
 #if HAVE_INTTYPES_H
 # include <inttypes.h>
 #else
Index: netperf-2.4.2/src/netperf.c
===================================================================
--- netperf-2.4.2.orig/src/netperf.c
+++ netperf-2.4.2/src/netperf.c
@@ -163,6 +163,9 @@ main(int argc, char *argv[])
   else if (strcasecmp(test_name,"UDP_STREAM") == 0) {
     send_udp_stream(host_name);
   }
+  else if (strcasecmp(test_name,"RDS_STREAM") == 0) {
+    send_rds_stream(host_name);
+  }
   else if (strcasecmp(test_name,"UDP_RR") == 0) {
     send_udp_rr(host_name);
   }
Index: netperf-2.4.2/src/netserver.c
===================================================================
--- netperf-2.4.2.orig/src/netserver.c
+++ netperf-2.4.2/src/netserver.c
@@ -151,9 +151,9 @@ extern	char	*optarg;
 extern	int	optind, opterr;
 
 #ifndef WIN32
-#define SERVER_ARGS "dL:n:p:v:46"
+#define SERVER_ARGS "dL:n:p:rv:46"
 #else
-#define SERVER_ARGS "dL:n:p:v:46I:i:"
+#define SERVER_ARGS "dL:n:p:rv:46I:i:"
 #endif
 
  /* This routine implements the "main event loop" of the netperf	*/
@@ -256,6 +256,10 @@ process_requests()
       recv_udp_rr();
       break;
       
+    case DO_RDS_STREAM:
+      recv_rds_stream();
+      break;
+      
 #ifdef WANT_DLPI
 
     case DO_DLPI_CO_RR:
@@ -748,6 +752,11 @@ main(int argc, char *argv[])
       strncpy(listen_port,optarg,sizeof(listen_port));
       not_inetd = 1;
       break;
+    case 'r':
+      rds_enable = 1;
+      not_inetd = 1;
+      local_address_family = AF_INET;
+      break;
     case '4':
       local_address_family = AF_INET;
       break;
Index: netperf-2.4.2/src/netsh.c
===================================================================
--- netperf-2.4.2.orig/src/netsh.c
+++ netperf-2.4.2/src/netsh.c
@@ -85,6 +85,10 @@ double atof(const char *);
 #include "nettest_dns.h"
 #endif /* DO_DNS */
 
+#ifdef WANT_RDS
+int     rds_enable;                     /* enable RDS testing */
+#endif
+
 /************************************************************************/
 /*									*/
 /*	Global constants  and macros					*/
@@ -94,7 +98,7 @@ double atof(const char *);
  /* Some of the args take optional parameters. Since we are using */
  /* getopt to parse the command line, we will tell getopt that they do */
  /* not take parms, and then look for them ourselves */
-#define GLOBAL_CMD_LINE_ARGS "A:a:b:B:CcdDf:F:H:hi:I:k:K:l:L:n:O:o:P:p:t:T:v:W:w:46"
+#define GLOBAL_CMD_LINE_ARGS "A:a:b:B:CcdDf:F:H:hi:I:k:K:l:L:n:O:o:P:p:rt:T:v:W:w:46"
 
 /************************************************************************/
 /*									*/
@@ -228,6 +232,7 @@ Options:\n\
     -d                Increase debugging output\n\
     -L name,family    Use name to pick listen address and family for family\n\
     -p portnum        Listen for connect requests on portnum.\n\
+    -r                Enable RDS usage for UDP tests\n\
     -4                Do IPv4\n\
     -6                Do IPv6\n\
     -v verbosity      Specify the verbosity level\n\
@@ -256,6 +261,7 @@ Global options:\n\
     -i max,min        Specify the max and min number of iterations (15,1)\n\
     -I lvl[,intvl]    Specify confidence level (95 or 99) (99) \n\
                       and confidence interval in percentage (10)\n\
+    -r                Enable RDS usage for UDP tests\n\
     -l testlen        Specify test duration (>0 secs) (<0 bytes|trans)\n\
     -L name|ip,fam *  Specify the local ip|name and address family\n\
     -o send,recv      Set the local send,recv buffer offsets\n\
@@ -360,6 +366,10 @@ parse_address_family(char family_string[
       strstr(temp,"4")) {
     return(AF_INET);
   }
+  if (strstr(temp,"af_rds") ||
+      strstr(temp,"32")) {
+    return(AF_RDS);
+  }
   if (strstr(temp,"unspec") ||
       strstr(temp,"0")) {
     return(AF_UNSPEC);
@@ -732,6 +742,11 @@ scan_cmd_line(int argc, char *argv[])
 		"Unable to malloc space for result brand\n");
       }
       break;
+    case 'r':
+      rds_enable = 1;
+      address_family = AF_INET;
+      local_address_family = AF_INET;
+      break;
     case '4':
       address_family = AF_INET;
       local_address_family = AF_INET;
@@ -755,6 +770,9 @@ scan_cmd_line(int argc, char *argv[])
     case AF_INET:
       strcpy(host_name,"localhost");
       break;
+    case AF_RDS:
+      strcpy(host_name,"localhost");
+      break;
     case AF_UNSPEC:
       /* what to do here? case it off the local_address_family I
 	 suppose */
@@ -793,8 +811,12 @@ scan_cmd_line(int argc, char *argv[])
     case AF_INET:
       strcpy(local_host_name,"0.0.0.0");
       break;
+    case AF_RDS:
+      strcpy(local_host_name,"0.0.0.0");
+      break;
     case AF_UNSPEC:
       switch (address_family) {
+      case AF_RDS:
       case AF_INET:
       case AF_UNSPEC:
 	strcpy(local_host_name,"0.0.0.0");
@@ -849,6 +871,12 @@ scan_cmd_line(int argc, char *argv[])
 	scan_sockets_args(argc, argv);
       }
 
+    else if (strcasecmp(test_name,"RDS_STREAM") == 0)
+      {
+        rds_enable = 1;
+	scan_sockets_args(argc, argv);
+      }
+
 #ifdef WANT_DLPI
     else if ((strcasecmp(test_name,"DLCO_RR") == 0) ||
 	     (strcasecmp(test_name,"DLCL_RR") == 0) ||
Index: netperf-2.4.2/src/netsh.h
===================================================================
--- netperf-2.4.2.orig/src/netsh.h
+++ netperf-2.4.2/src/netsh.h
@@ -141,5 +141,23 @@ extern int
   dlpi_sap;
 
 #endif /* WANT_DLPI */
+#endif
 
+#ifdef WANT_RDS
+# include "rds/pfhack.h"
+extern int rds_enable;
+#ifndef AF_RDS
+# define AF_RDS OFFICIAL_PF_RDS
+#endif
+#ifndef PF_RDS
+# define PF_RDS AF_RDS
+#endif
+#ifndef SOL_RDS
+# define SOL_RDS OFFICIAL_SOL_RDS
+#endif
+#ifndef NF_RDS
+# define NF_RDS AF_RDS
 #endif
+#endif
+
+
Index: netperf-2.4.2/src/nettest_bsd.c
===================================================================
--- netperf-2.4.2.orig/src/nettest_bsd.c
+++ netperf-2.4.2/src/nettest_bsd.c
@@ -33,6 +33,8 @@ char	nettest_id[]="\
 /*	recv_udp_stream()					*/
 /*	send_udp_rr()		perform a udp request/response	*/
 /*	recv_udp_rr()						*/
+/*	send_rds_stream()	perform a rds stream test	*/
+/*	recv_rds_stream()					*/
 /*	loc_cpu_rate()		determine the local cpu maxrate */
 /*	rem_cpu_rate()		find the remote cpu maxrate	*/
 /*								*/
@@ -430,6 +432,9 @@ nf_to_af(int nf) {
   case NF_UNSPEC:
     return AF_UNSPEC;
     break;
+  case NF_RDS:
+    return AF_RDS;
+    break;
   case NF_INET6:
 #if defined(AF_INET6)
     return AF_INET6;
@@ -453,6 +458,9 @@ af_to_nf(int af) {
   case AF_UNSPEC:
     return NF_UNSPEC;
     break;
+  case AF_RDS:
+    return NF_RDS;
+    break;
 #if defined(AF_INET6)
   case AF_INET6:
     return NF_INET6;
@@ -698,6 +706,7 @@ static unsigned short
 get_port_number(struct addrinfo *res) 
 {
  switch(res->ai_family) {
+  case AF_RDS:
   case AF_INET: {
     struct sockaddr_in *foo = (struct sockaddr_in *)res->ai_addr;
     return(ntohs(foo->sin_port));
@@ -724,6 +733,7 @@ void
 set_port_number(struct addrinfo *res, unsigned short port)
 {
   switch(res->ai_family) {
+  case AF_RDS:
   case AF_INET: {
     struct sockaddr_in *foo = (struct sockaddr_in *)res->ai_addr;
     foo->sin_port = htons(port);
@@ -765,12 +775,16 @@ create_data_socket(struct addrinfo *res)
   int one;
   int    on  = 1;
   
-
   /*set up the data socket                        */
-  temp_socket = socket(res->ai_family,
-		       res->ai_socktype,
-		       res->ai_protocol);
-  
+  if (rds_enable)
+          temp_socket = socket(PF_RDS,
+        		       SOCK_SEQPACKET,
+        		       0);
+  else
+          temp_socket = socket(res->ai_family,
+        		       res->ai_socktype,
+        		       res->ai_protocol);
+ 
   if (temp_socket == INVALID_SOCKET){
     fprintf(where,
 	    "netperf: create_data_socket: socket: errno %d fam %s type %s prot %s errmsg %s\n",
@@ -5965,210 +5979,740 @@ bytes   bytes    secs            #      
 
 }
 
-
- /* this routine implements the receive side (netserver) of the */
- /* UDP_STREAM performance test. */
-
 void
-recv_udp_stream()
+send_rds_stream(char remote_host[])
 {
-  struct ring_elt *recv_ring;
+  /**********************************************************************/
+  /*									*/
+  /*               	RDS Unidirectional Send Test                    */
+  /*									*/
+  /**********************************************************************/
 
-  struct addrinfo *local_res;
-  char local_name[BUFSIZ];
-  char port_buffer[PORTBUFSIZE];
+#define RDS_LENGTH_MAX 0XFFFF - 28
 
-  struct sockaddr_in myaddr_in;
-  SOCKET	s_data;
-  netperf_socklen_t 	addrlen;
-  int	len = 0;
-  unsigned int	bytes_received = 0;
-  float	elapsed_time;
+  char *tput_title = "\
+Socket  Message  Elapsed      Messages                \n\
+Size    Size     Time         Okay Errors   Throughput\n\
+bytes   bytes    secs            #      #   %s/sec\n\n";
   
-  int	message_size;
-  unsigned int	messages_recvd = 0;
+  char *tput_fmt_0 =
+    "%7.2f\n";
   
-  struct	udp_stream_request_struct	*udp_stream_request;
-  struct	udp_stream_response_struct	*udp_stream_response;
-  struct	udp_stream_results_struct	*udp_stream_results;
+  char *tput_fmt_1 = "\
+%6d  %6d   %-7.2f   %7d %6d    %7.2f\n\
+%6d           %-7.2f   %7d           %7.2f\n\n";
   
-  udp_stream_request  = 
-    (struct udp_stream_request_struct *)netperf_request.content.test_specific_data;
-  udp_stream_response = 
-    (struct udp_stream_response_struct *)netperf_response.content.test_specific_data;
-  udp_stream_results  = 
-    (struct udp_stream_results_struct *)netperf_response.content.test_specific_data;
   
-  if (debug) {
-    fprintf(where,"netserver: recv_udp_stream: entered...\n");
-    fflush(where);
-  }
+  char *cpu_title = "\
+Socket  Message  Elapsed      Messages                   CPU      Service\n\
+Size    Size     Time         Okay Errors   Throughput   Util     Demand\n\
+bytes   bytes    secs            #      #   %s/sec %% %c%c     us/KB\n\n";
   
-  /* We want to set-up the listen socket with all the desired */
-  /* parameters and then let the initiator know that all is ready. If */
-  /* socket size defaults are to be used, then the initiator will have */
-  /* sent us 0's. If the socket sizes cannot be changed, then we will */
-  /* send-back what they are. If that information cannot be determined, */
-  /* then we send-back -1's for the sizes. If things go wrong for any */
-  /* reason, we will drop back ten yards and punt. */
+  char *cpu_fmt_0 =
+    "%6.2f %c\n";
   
-  /* If anything goes wrong, we want the remote to know about it. It */
-  /* would be best if the error that the remote reports to the user is */
-  /* the actual error we encountered, rather than some bogus unexpected */
-  /* response type message. */
+  char *cpu_fmt_1 = "\
+%6d  %6d   %-7.2f   %7d %6d    %7.1f     %-6.2f   %-6.3f\n\
+%6d           %-7.2f   %7d           %7.1f     %-6.2f   %-6.3f\n\n";
   
-  if (debug > 1) {
-    fprintf(where,"recv_udp_stream: setting the response type...\n");
-    fflush(where);
-  }
+  unsigned int	messages_recvd;
+  unsigned int 	messages_sent;
+  unsigned int	failed_sends;
+
+  float	elapsed_time,  
+        local_cpu_utilization,
+        remote_cpu_utilization;
   
-  netperf_response.content.response_type = UDP_STREAM_RESPONSE;
+  float	 local_service_demand, remote_service_demand;
+  double local_thruput, remote_thruput;
+  double bytes_sent;
+  double bytes_recvd;
   
-  if (debug > 2) {
-    fprintf(where,"recv_udp_stream: the response type is set...\n");
-    fflush(where);
-  }
   
-  /* We now alter the message_ptr variable to be at the desired */
-  /* alignment with the desired offset. */
+  int	len;
+  struct ring_elt *send_ring;
+  SOCKET 	data_socket;
   
-  if (debug > 1) {
-    fprintf(where,"recv_udp_stream: requested alignment of %d\n",
-	    udp_stream_request->recv_alignment);
-    fflush(where);
-  }
-
-  if (recv_width == 0) recv_width = 1;
-
-  recv_ring = allocate_buffer_ring(recv_width,
-				   udp_stream_request->message_size,
-				   udp_stream_request->recv_alignment,
-				   udp_stream_request->recv_offset);
+  unsigned int sum_messages_sent;
+  unsigned int sum_messages_recvd;
+  unsigned int sum_failed_sends;
+  double sum_local_thruput;
 
-  if (debug > 1) {
-    fprintf(where,"recv_udp_stream: receive alignment and offset set...\n");
-    fflush(where);
-  }
+  struct addrinfo *local_res;
+  struct addrinfo *remote_res;
   
-  /* Grab a socket to listen on, and then listen on it. */
+  struct	rds_stream_request_struct	*rds_stream_request;
+  struct	rds_stream_response_struct	*rds_stream_response;
+  struct	rds_stream_results_struct	*rds_stream_results;
+  
+  rds_stream_request	= 
+    (struct rds_stream_request_struct *)netperf_request.content.test_specific_data;
+  rds_stream_response	= 
+    (struct rds_stream_response_struct *)netperf_response.content.test_specific_data;
+  rds_stream_results	= 
+    (struct rds_stream_results_struct *)netperf_response.content.test_specific_data;
   
-  if (debug > 1) {
-    fprintf(where,"recv_udp_stream: grabbing a socket...\n");
-    fflush(where);
-  }
-
-  /* create_data_socket expects to find some things in the global */
-  /* variables, so set the globals based on the values in the request. */
-  /* once the socket has been created, we will set the response values */
-  /* based on the updated value of those globals. raj 7/94 */
-  lsr_size_req = udp_stream_request->recv_buf_size;
-  loc_rcvavoid = udp_stream_request->so_rcvavoid;
-  loc_sndavoid = udp_stream_request->so_sndavoid;
-
-  set_hostname_and_port(local_name,
-			port_buffer,
-			nf_to_af(udp_stream_request->ipfamily),
-			udp_stream_request->port);
-
-  local_res = complete_addrinfo(local_name,
-				local_name,
-				port_buffer,
-				nf_to_af(udp_stream_request->ipfamily),
-				SOCK_DGRAM,
-				IPPROTO_UDP,
-				0);
+#ifdef WANT_HISTOGRAM
+  time_hist = HIST_new();
+#endif /* WANT_HISTOGRAM */
 
-  s_data = create_data_socket(local_res);
-  
-  if (s_data == INVALID_SOCKET) {
-    netperf_response.content.serv_errno = errno;
-    send_response();
-    exit(1);
-  }
-  
-  udp_stream_response->test_length = udp_stream_request->test_length;
-  
-  /* now get the port number assigned by the system  */
-  addrlen = sizeof(myaddr_in);
-  if (getsockname(s_data, 
-		  (struct sockaddr *)&myaddr_in,
-		  &addrlen) == SOCKET_ERROR){
-    netperf_response.content.serv_errno = errno;
-    close(s_data);
-    send_response();
-    
-    exit(1);
-  }
-  
-  /* Now myaddr_in contains the port and the internet address this is */
-  /* returned to the sender also implicitly telling the sender that the */
-  /* socket buffer sizing has been done. */
-  
-  udp_stream_response->data_port_number = (int) ntohs(myaddr_in.sin_port);
-  netperf_response.content.serv_errno   = 0;
-  
-  /* But wait, there's more. If the initiator wanted cpu measurements, */
-  /* then we must call the calibrate routine, which will return the max */
-  /* rate back to the initiator. If the CPU was not to be measured, or */
-  /* something went wrong with the calibration, we will return a -1 to */
-  /* the initiator. */
+  /* since we are now disconnected from the code that established the */
+  /* control socket, and since we want to be able to use different */
+  /* protocols and such, we are passed the name of the remote host and */
+  /* must turn that into the test specific addressing information. */
   
-  udp_stream_response->cpu_rate    = (float)0.0; /* assume no cpu */
-  udp_stream_response->measure_cpu = 0;
-  if (udp_stream_request->measure_cpu) {
-    /* We will pass the rate into the calibration routine. If the */
-    /* user did not specify one, it will be 0.0, and we will do a */
-    /* "real" calibration. Otherwise, all it will really do is */
-    /* store it away... */
-    udp_stream_response->measure_cpu = 1;
-    udp_stream_response->cpu_rate = 
-      calibrate_local_cpu(udp_stream_request->cpu_rate);
+  complete_addrinfos(&remote_res,
+		     &local_res,
+		     remote_host,
+		     SOCK_DGRAM,
+		     IPPROTO_UDP,
+		     0);
+
+  if ( print_headers ) {
+    print_top_test_header("RDS UNIDIRECTIONAL SEND TEST",local_res,remote_res);
   }
-  
-  message_size	= udp_stream_request->message_size;
-  test_time	= udp_stream_request->test_length;
-  
-  /* before we send the response back to the initiator, pull some of */
-  /* the socket parms from the globals */
-  udp_stream_response->send_buf_size = lss_size;
-  udp_stream_response->recv_buf_size = lsr_size;
-  udp_stream_response->so_rcvavoid = loc_rcvavoid;
-  udp_stream_response->so_sndavoid = loc_sndavoid;
 
-  send_response();
-  
-  /* Now it's time to start receiving data on the connection. We will */
-  /* first grab the apropriate counters and then start grabbing. */
-  
-  cpu_start(udp_stream_request->measure_cpu);
-  
-#ifdef WIN32
-  /* this is used so the timer thread can close the socket out from */
-  /* under us, which to date is the easiest/cleanest/least */
-  /* Windows-specific way I can find to force the winsock calls to */
-  /* return WSAEINTR with the test is over. anything that will run on */
-  /* 95 and NT and is closer to what netperf expects from Unix signals */
-  /* and such would be appreciated raj 1/96 */
-  win_kludge_socket = s_data;
-#endif /* WIN32 */
-  
-  /* The loop will exit when the timer pops, or if we happen to recv a */
-  /* message of less than send_size bytes... */
-  
-  times_up = 0;
+  send_ring            = NULL;
+  confidence_iteration = 1;
+  init_stat();
+  sum_messages_sent    = 0;
+  sum_messages_recvd   = 0;
+  sum_failed_sends     = 0;
+  sum_local_thruput    = 0.0;
 
-  start_timer(test_time + PAD_TIME);
+  /* we have a great-big while loop which controls the number of times */
+  /* we run a particular test. this is for the calculation of a */
+  /* confidence interval (I really should have stayed awake during */
+  /* probstats :). If the user did not request confidence measurement */
+  /* (no confidence is the default) then we will only go though the */
+  /* loop once. the confidence stuff originates from the folks at IBM */
 
-  if (debug) {
-    fprintf(where,"recv_udp_stream: about to enter inner sanctum.\n");
-    fflush(where);
-  }
-  
-  while (!times_up) {
-#ifdef WIN32
-    /* for some reason, winsock does not like calling recv() on a UDP */
-    /* socket, unlike BSD sockets. NIH strikes again :( raj 1/96 */
-    if((len = recvfrom(s_data,
+  while (((confidence < 0) && (confidence_iteration < iteration_max)) ||
+	 (confidence_iteration <= iteration_min)) {
+    
+    /* initialize a few counters. we have to remember that we might be */
+    /* going through the loop more than once. */
+    messages_sent  = 0;
+    messages_recvd = 0;
+    failed_sends   = 0;
+    times_up       = 0;
+    
+    /*set up the data socket			*/
+    data_socket = create_data_socket(local_res);
+    
+    if (data_socket == INVALID_SOCKET){
+      perror("rds_send: data socket");
+      exit(1);
+    }
+    
+    /* now, we want to see if we need to set the send_size */
+    if (send_size == 0) {
+      if (lss_size > 0) {
+	send_size = (lss_size < RDS_LENGTH_MAX ? lss_size : RDS_LENGTH_MAX);
+      }
+      else {
+	send_size = 4096;
+      }
+    }
+    
+    
+    /* set-up the data buffer with the requested alignment and offset, */
+    /* most of the numbers here are just a hack to pick something nice */
+    /* and big in an attempt to never try to send a buffer a second time */
+    /* before it leaves the node...unless the user set the width */
+    /* explicitly. */
+    if (send_width == 0) send_width = 32;
+    
+    if (send_ring == NULL ) {
+      send_ring = allocate_buffer_ring(send_width,
+				       send_size,
+				       local_send_align,
+				       local_send_offset);
+    }
+    
+    
+    /* if the user supplied a cpu rate, this call will complete rather */
+    /* quickly, otherwise, the cpu rate will be retured to us for */
+    /* possible display. The Library will keep it's own copy of this data */
+    /* for use elsewhere. We will only display it. (Does that make it */
+    /* "opaque" to us?) */
+    
+    if (local_cpu_usage)
+      local_cpu_rate = calibrate_local_cpu(local_cpu_rate);
+    
+    /* Tell the remote end to set up the data connection. The server */
+    /* sends back the port number and alters the socket parameters there. */
+    /* Of course this is a datagram service so no connection is actually */
+    /* set up, the server just sets up the socket and binds it. */
+    
+    netperf_request.content.request_type      = DO_RDS_STREAM;
+    rds_stream_request->recv_buf_size  = rsr_size_req;
+    rds_stream_request->message_size   = send_size;
+    rds_stream_request->recv_alignment = remote_recv_align;
+    rds_stream_request->recv_offset    = remote_recv_offset;
+    rds_stream_request->measure_cpu    = remote_cpu_usage;
+    rds_stream_request->cpu_rate       = remote_cpu_rate;
+    rds_stream_request->test_length    = test_time;
+    rds_stream_request->so_rcvavoid    = rem_rcvavoid;
+    rds_stream_request->so_sndavoid    = rem_sndavoid;
+    rds_stream_request->port           = atoi(remote_data_port);
+    rds_stream_request->ipfamily = af_to_nf(remote_res->ai_family);
+
+    send_request();
+    
+    recv_response();
+    
+    if (!netperf_response.content.serv_errno) {
+      if (debug)
+	fprintf(where,"send_rds_stream: remote data connection done.\n");
+    }
+    else {
+      Set_errno(netperf_response.content.serv_errno);
+      perror("send_rds_stream: error on remote");
+      exit(1);
+    }
+    
+    /* Place the port number returned by the remote into the sockaddr */
+    /* structure so our sends can be sent to the correct place. Also get */
+    /* some of the returned socket buffer information for user display. */
+    
+    /* make sure that port numbers are in the proper order */
+    set_port_number(remote_res,(short)rds_stream_response->data_port_number);
+
+    rsr_size        = rds_stream_response->recv_buf_size;
+    rss_size        = rds_stream_response->send_buf_size;
+    remote_cpu_rate = rds_stream_response->cpu_rate;
+
+#ifndef RDS_ALWAYS_SENDTO
+    /* We "connect" up to the remote post to allow is to use the send */
+    /* call instead of the sendto call. Presumeably, this is a little */
+    /* simpler, and a little more efficient. I think that it also means */
+    /* that we can be informed of certain things, but am not sure */
+    /* yet...also, this is the way I would expect a client to behave */
+    /* when talking to a server */
+    
+    if (connect(data_socket,
+		remote_res->ai_addr,
+		remote_res->ai_addrlen) == INVALID_SOCKET){
+      perror("send_rds_stream: data socket connect failed");
+      exit(1);
+    }
+
+#endif
+    
+    /* set up the timer to call us after test_time. one of these days, */
+    /* it might be nice to figure-out a nice reliable way to have the */
+    /* test controlled by a byte count as well, but since RDS is not */
+    /* reliable, that could prove difficult. so, in the meantime, we */
+    /* only allow a RDS_STREAM test to be a timed test. */
+    
+    if (test_time) {
+      times_up = 0;
+      start_timer(test_time);
+    }
+    else {
+      fprintf(where,"Sorry, RDS_STREAM tests must be timed.\n");
+      fflush(where);
+    }
+    
+    /* Get the start count for the idle counter and the start time */
+    
+    cpu_start(local_cpu_usage);
+    
+#ifdef WANT_INTERVALS
+    INTERVALS_INIT();
+#endif /* WANT_INTERVALS */
+    
+    /* Send datagrams like there was no tomorrow. at somepoint it might */
+    /* be nice to set this up so that a quantity of bytes could be sent, */
+    /* but we still need some sort of end of test trigger on the receive */
+    /* side. that could be a select with a one second timeout, but then */
+    /* if there is a test where none of the data arrives for awile and */
+    /* then starts again, we would end the test too soon. something to */
+    /* think about... */
+    while (!times_up) {
+      
+#ifdef DIRTY
+      /* we want to dirty some number of consecutive integers in the buffer */
+      /* we are about to send. we may also want to bring some number of */
+      /* them cleanly into the cache. The clean ones will follow any dirty */
+      /* ones into the cache. */
+
+      access_buffer(send_ring->buffer_ptr,
+		    send_size,
+		    loc_dirty_count,
+		    loc_clean_count);
+#endif /* DIRTY */
+      
+#ifdef WANT_HISTOGRAM
+      HIST_timestamp(&time_one);
+#endif /* WANT_HISTOGRAM */
+      
+      len = send(data_socket,
+		 send_ring->buffer_ptr,
+		 send_size,
+		 0);
+      if (len != send_size) {
+	if ((len >= 0) || 
+	    SOCKET_EINTR(len))
+	  break;
+	if (errno == ENOBUFS) {
+	  failed_sends++;
+	  continue;
+	}
+	perror("rds_send: data send error");
+	exit(1);
+      }
+      messages_sent++;          
+      
+      /* now we want to move our pointer to the next position in the */
+      /* data buffer... */
+      
+      send_ring = send_ring->next;
+      
+      
+#ifdef WANT_HISTOGRAM
+      /* get the second timestamp */
+      HIST_timestamp(&time_two);
+      HIST_add(time_hist,delta_micro(&time_one,&time_two));
+#endif /* WANT_HISTOGRAM */
+
+#ifdef WANT_INTERVALS      
+      INTERVALS_WAIT();
+#endif /* WANT_INTERVALS */
+      
+    }
+    
+    /* This is a timed test, so the remote will be returning to us after */
+    /* a time. We should not need to send any "strange" messages to tell */
+    /* the remote that the test is completed, unless we decide to add a */
+    /* number of messages to the test. */
+    
+    /* the test is over, so get stats and stuff */
+    cpu_stop(local_cpu_usage,	
+	     &elapsed_time);
+    
+    /* Get the statistics from the remote end	*/
+    recv_response();
+    if (!netperf_response.content.serv_errno) {
+      if (debug)
+	fprintf(where,"send_rds_stream: remote results obtained\n");
+    }
+    else {
+      Set_errno(netperf_response.content.serv_errno);
+      perror("send_rds_stream: error on remote");
+      exit(1);
+    }
+    
+    bytes_sent    = (double) send_size * (double) messages_sent;
+    local_thruput = calc_thruput(bytes_sent);
+    
+    messages_recvd = rds_stream_results->messages_recvd;
+    bytes_recvd    = (double) send_size * (double) messages_recvd;
+    
+    /* we asume that the remote ran for as long as we did */
+    
+    remote_thruput = calc_thruput(bytes_recvd);
+    
+    /* print the results for this socket and message size */
+    
+    if (local_cpu_usage || remote_cpu_usage) {
+      /* We must now do a little math for service demand and cpu */
+      /* utilization for the system(s) We pass zeros for the local */
+      /* cpu utilization and elapsed time to tell the routine to use */
+      /* the libraries own values for those. */
+      if (local_cpu_usage) {
+	local_cpu_utilization	= calc_cpu_util(0.0);
+	/* shouldn't this really be based on bytes_recvd, since that is */
+	/* the effective throughput of the test? I think that it should, */
+	/* so will make the change raj 11/94 */
+	local_service_demand	= calc_service_demand(bytes_recvd,
+						      0.0,
+						      0.0,
+						      0);
+      }
+      else {
+	local_cpu_utilization	= (float) -1.0;
+	local_service_demand	= (float) -1.0;
+      }
+      
+      /* The local calculations could use variables being kept by */
+      /* the local netlib routines. The remote calcuations need to */
+      /* have a few things passed to them. */
+      if (remote_cpu_usage) {
+	remote_cpu_utilization	= rds_stream_results->cpu_util;
+	remote_service_demand	= calc_service_demand(bytes_recvd,
+						      0.0,
+						      remote_cpu_utilization,
+						      rds_stream_results->num_cpus);
+      }
+      else {
+	remote_cpu_utilization	= (float) -1.0;
+	remote_service_demand	= (float) -1.0;
+      }
+    }
+    else {
+      /* we were not measuring cpu, for the confidence stuff, we */
+      /* should make it -1.0 */
+      local_cpu_utilization  = (float) -1.0;
+      local_service_demand   = (float) -1.0;
+      remote_cpu_utilization = (float) -1.0;
+      remote_service_demand  = (float) -1.0;
+    }
+    
+    /* at this point, we want to calculate the confidence information. */
+    /* if debugging is on, calculate_confidence will print-out the */
+    /* parameters we pass it */
+    
+    calculate_confidence(confidence_iteration,
+			 elapsed_time,
+			 remote_thruput,
+			 local_cpu_utilization,
+			 remote_cpu_utilization,
+			 local_service_demand,
+			 remote_service_demand);
+    
+    /* since the routine calculate_confidence is rather generic, and */
+    /* we have a few other parms of interest, we will do a little work */
+    /* here to caclulate their average. */
+    sum_messages_sent  += messages_sent;
+    sum_messages_recvd += messages_recvd;
+    sum_failed_sends   += failed_sends;
+    sum_local_thruput  += local_thruput;
+    
+    confidence_iteration++;
+
+    /* this datapoint is done, so we don't need the socket any longer */
+    close(data_socket);
+
+  }
+
+  /* we should reach this point once the test is finished */
+
+  retrieve_confident_values(&elapsed_time,
+			    &remote_thruput,
+			    &local_cpu_utilization,
+			    &remote_cpu_utilization,
+			    &local_service_demand,
+			    &remote_service_demand);
+
+  /* some of the interesting values aren't covered by the generic */
+  /* confidence routine */
+  messages_sent    = sum_messages_sent / (confidence_iteration -1);
+  messages_recvd   = sum_messages_recvd / (confidence_iteration -1);
+  failed_sends     = sum_failed_sends / (confidence_iteration -1);
+  local_thruput    = sum_local_thruput / (confidence_iteration -1);
+
+  /* We are now ready to print all the information. If the user */
+  /* has specified zero-level verbosity, we will just print the */
+  /* local service demand, or the remote service demand. If the */
+  /* user has requested verbosity level 1, he will get the basic */
+  /* "streamperf" numbers. If the user has specified a verbosity */
+  /* of greater than 1, we will display a veritable plethora of */
+  /* background information from outside of this block as it it */
+  /* not cpu_measurement specific...  */
+    
+  
+  if (confidence < 0) {
+    /* we did not hit confidence, but were we asked to look for it? */
+    if (iteration_max > 1) {
+      display_confidence();
+    }
+  }
+
+  if (local_cpu_usage || remote_cpu_usage) {
+    local_cpu_method = format_cpu_method(cpu_method);
+    remote_cpu_method = format_cpu_method(rds_stream_results->cpu_method);
+    
+    switch (verbosity) {
+    case 0:
+      if (local_cpu_usage) {
+	fprintf(where,
+		cpu_fmt_0,
+		local_service_demand,
+		local_cpu_method);
+      }
+      else {
+	fprintf(where,
+		cpu_fmt_0,
+		remote_service_demand,
+		local_cpu_method);
+      }
+      break;
+    case 1:
+    case 2:
+      if (print_headers) {
+	fprintf(where,
+		cpu_title,
+		format_units(),
+		local_cpu_method,
+		remote_cpu_method);
+      }
+
+      fprintf(where,
+	      cpu_fmt_1,		/* the format string */
+	      lss_size,		        /* local sendbuf size */
+	      send_size,		/* how large were the sends */
+	      elapsed_time,		/* how long was the test */
+	      messages_sent,
+	      failed_sends,
+	      local_thruput, 		/* what was the xfer rate */
+	      local_cpu_utilization,	/* local cpu */
+	      local_service_demand,	/* local service demand */
+	      rsr_size,
+	      elapsed_time,
+	      messages_recvd,
+	      remote_thruput,
+	      remote_cpu_utilization,	/* remote cpu */
+	      remote_service_demand);	/* remote service demand */
+      break;
+    }
+  }
+  else {
+    /* The tester did not wish to measure service demand. */
+    switch (verbosity) {
+    case 0:
+      fprintf(where,
+	      tput_fmt_0,
+	      local_thruput);
+      break;
+    case 1:
+    case 2:
+      if (print_headers) {
+	fprintf(where,tput_title,format_units());
+      }
+      fprintf(where,
+	      tput_fmt_1,		/* the format string */
+	      lss_size, 		/* local sendbuf size */
+	      send_size,		/* how large were the sends */
+	      elapsed_time, 		/* how long did it take */
+	      messages_sent,
+	      failed_sends,
+	      local_thruput,
+	      rsr_size, 		/* remote recvbuf size */
+	      elapsed_time,
+	      messages_recvd,
+	      remote_thruput);
+      break;
+    }
+  }
+
+  fflush(where);
+#ifdef WANT_HISTOGRAM
+  if (verbosity > 1) {
+    fprintf(where,"\nHistogram of time spent in send() call\n");
+    fflush(where);
+    HIST_report(time_hist);
+  }
+#endif /* WANT_HISTOGRAM */
+
+}
+
+
+
+ /* this routine implements the receive side (netserver) of the */
+ /* UDP_STREAM performance test. */
+
+void
+recv_udp_stream()
+{
+  struct ring_elt *recv_ring;
+
+  struct addrinfo *local_res;
+  char local_name[BUFSIZ];
+  char port_buffer[PORTBUFSIZE];
+
+  struct sockaddr_in myaddr_in;
+  SOCKET	s_data;
+  netperf_socklen_t 	addrlen;
+  int	len = 0;
+  unsigned int	bytes_received = 0;
+  float	elapsed_time;
+  
+  int	message_size;
+  unsigned int	messages_recvd = 0;
+  
+  struct	udp_stream_request_struct	*udp_stream_request;
+  struct	udp_stream_response_struct	*udp_stream_response;
+  struct	udp_stream_results_struct	*udp_stream_results;
+  
+  udp_stream_request  = 
+    (struct udp_stream_request_struct *)netperf_request.content.test_specific_data;
+  udp_stream_response = 
+    (struct udp_stream_response_struct *)netperf_response.content.test_specific_data;
+  udp_stream_results  = 
+    (struct udp_stream_results_struct *)netperf_response.content.test_specific_data;
+  
+  if (debug) {
+    fprintf(where,"netserver: recv_udp_stream: entered...\n");
+    fflush(where);
+  }
+  
+  /* We want to set-up the listen socket with all the desired */
+  /* parameters and then let the initiator know that all is ready. If */
+  /* socket size defaults are to be used, then the initiator will have */
+  /* sent us 0's. If the socket sizes cannot be changed, then we will */
+  /* send-back what they are. If that information cannot be determined, */
+  /* then we send-back -1's for the sizes. If things go wrong for any */
+  /* reason, we will drop back ten yards and punt. */
+  
+  /* If anything goes wrong, we want the remote to know about it. It */
+  /* would be best if the error that the remote reports to the user is */
+  /* the actual error we encountered, rather than some bogus unexpected */
+  /* response type message. */
+  
+  if (debug > 1) {
+    fprintf(where,"recv_udp_stream: setting the response type...\n");
+    fflush(where);
+  }
+  
+  netperf_response.content.response_type = UDP_STREAM_RESPONSE;
+  
+  if (debug > 2) {
+    fprintf(where,"recv_udp_stream: the response type is set...\n");
+    fflush(where);
+  }
+  
+  /* We now alter the message_ptr variable to be at the desired */
+  /* alignment with the desired offset. */
+  
+  if (debug > 1) {
+    fprintf(where,"recv_udp_stream: requested alignment of %d\n",
+	    udp_stream_request->recv_alignment);
+    fflush(where);
+  }
+
+  if (recv_width == 0) recv_width = 1;
+
+  recv_ring = allocate_buffer_ring(recv_width,
+				   udp_stream_request->message_size,
+				   udp_stream_request->recv_alignment,
+				   udp_stream_request->recv_offset);
+
+  if (debug > 1) {
+    fprintf(where,"recv_udp_stream: receive alignment and offset set...\n");
+    fflush(where);
+  }
+  
+  /* Grab a socket to listen on, and then listen on it. */
+  
+  if (debug > 1) {
+    fprintf(where,"recv_udp_stream: grabbing a socket...\n");
+    fflush(where);
+  }
+
+  /* create_data_socket expects to find some things in the global */
+  /* variables, so set the globals based on the values in the request. */
+  /* once the socket has been created, we will set the response values */
+  /* based on the updated value of those globals. raj 7/94 */
+  lsr_size_req = udp_stream_request->recv_buf_size;
+  loc_rcvavoid = udp_stream_request->so_rcvavoid;
+  loc_sndavoid = udp_stream_request->so_sndavoid;
+
+  set_hostname_and_port(local_name,
+			port_buffer,
+			nf_to_af(udp_stream_request->ipfamily),
+			udp_stream_request->port);
+
+  local_res = complete_addrinfo(local_name,
+				local_name,
+				port_buffer,
+				nf_to_af(udp_stream_request->ipfamily),
+				SOCK_DGRAM,
+				IPPROTO_UDP,
+				0);
+
+  s_data = create_data_socket(local_res);
+  
+  if (s_data == INVALID_SOCKET) {
+    netperf_response.content.serv_errno = errno;
+    send_response();
+    exit(1);
+  }
+  
+  udp_stream_response->test_length = udp_stream_request->test_length;
+  
+  /* now get the port number assigned by the system  */
+  addrlen = sizeof(myaddr_in);
+  if (getsockname(s_data, 
+		  (struct sockaddr *)&myaddr_in,
+		  &addrlen) == SOCKET_ERROR){
+    netperf_response.content.serv_errno = errno;
+    close(s_data);
+    send_response();
+    
+    exit(1);
+  }
+  
+  /* Now myaddr_in contains the port and the internet address this is */
+  /* returned to the sender also implicitly telling the sender that the */
+  /* socket buffer sizing has been done. */
+  
+  udp_stream_response->data_port_number = (int) ntohs(myaddr_in.sin_port);
+  netperf_response.content.serv_errno   = 0;
+  
+  /* But wait, there's more. If the initiator wanted cpu measurements, */
+  /* then we must call the calibrate routine, which will return the max */
+  /* rate back to the initiator. If the CPU was not to be measured, or */
+  /* something went wrong with the calibration, we will return a -1 to */
+  /* the initiator. */
+  
+  udp_stream_response->cpu_rate    = (float)0.0; /* assume no cpu */
+  udp_stream_response->measure_cpu = 0;
+  if (udp_stream_request->measure_cpu) {
+    /* We will pass the rate into the calibration routine. If the */
+    /* user did not specify one, it will be 0.0, and we will do a */
+    /* "real" calibration. Otherwise, all it will really do is */
+    /* store it away... */
+    udp_stream_response->measure_cpu = 1;
+    udp_stream_response->cpu_rate = 
+      calibrate_local_cpu(udp_stream_request->cpu_rate);
+  }
+  
+  message_size	= udp_stream_request->message_size;
+  test_time	= udp_stream_request->test_length;
+  
+  /* before we send the response back to the initiator, pull some of */
+  /* the socket parms from the globals */
+  udp_stream_response->send_buf_size = lss_size;
+  udp_stream_response->recv_buf_size = lsr_size;
+  udp_stream_response->so_rcvavoid = loc_rcvavoid;
+  udp_stream_response->so_sndavoid = loc_sndavoid;
+
+  send_response();
+  
+  /* Now it's time to start receiving data on the connection. We will */
+  /* first grab the apropriate counters and then start grabbing. */
+  
+  cpu_start(udp_stream_request->measure_cpu);
+  
+#ifdef WIN32
+  /* this is used so the timer thread can close the socket out from */
+  /* under us, which to date is the easiest/cleanest/least */
+  /* Windows-specific way I can find to force the winsock calls to */
+  /* return WSAEINTR with the test is over. anything that will run on */
+  /* 95 and NT and is closer to what netperf expects from Unix signals */
+  /* and such would be appreciated raj 1/96 */
+  win_kludge_socket = s_data;
+#endif /* WIN32 */
+  
+  /* The loop will exit when the timer pops, or if we happen to recv a */
+  /* message of less than send_size bytes... */
+  
+  times_up = 0;
+
+  start_timer(test_time + PAD_TIME);
+
+  if (debug) {
+    fprintf(where,"recv_udp_stream: about to enter inner sanctum.\n");
+    fflush(where);
+  }
+  
+  while (!times_up) {
+#ifdef WIN32
+    /* for some reason, winsock does not like calling recv() on a UDP */
+    /* socket, unlike BSD sockets. NIH strikes again :( raj 1/96 */
+    if((len = recvfrom(s_data,
 		       recv_ring->buffer_ptr,
 		       message_size, 
 		       0,0,0)        
@@ -6862,7 +7406,298 @@ bytes  bytes  bytes   bytes  secs.   per
 #endif /* WANT_HISTOGRAM */
   }
 }
+
 
+/* recv_rds_stream: receive side (netserver) of the RDS_STREAM test.  */
+/* Creates the data socket, answers with an RDS_STREAM_RESPONSE, counts */
+/* messages until the timer pops, then sends RDS_STREAM_RESULTS.       */
+
+void
+recv_rds_stream()
+{
+  struct ring_elt *recv_ring;
+
+  struct addrinfo *local_res;
+  char local_name[BUFSIZ];
+  char port_buffer[PORTBUFSIZE];
+
+  struct sockaddr_in myaddr_in;
+  SOCKET	s_data;
+  netperf_socklen_t 	addrlen;
+  int	len = 0;
+  unsigned int	bytes_received = 0;
+  float	elapsed_time;
+  
+  int	message_size;
+  unsigned int	messages_recvd = 0;
+  
+  struct	rds_stream_request_struct	*rds_stream_request;
+  struct	rds_stream_response_struct	*rds_stream_response;
+  struct	rds_stream_results_struct	*rds_stream_results;
+  
+  rds_stream_request  = 
+    (struct rds_stream_request_struct *)netperf_request.content.test_specific_data;
+  rds_stream_response = 
+    (struct rds_stream_response_struct *)netperf_response.content.test_specific_data;
+  rds_stream_results  = 
+    (struct rds_stream_results_struct *)netperf_response.content.test_specific_data;
+  
+  if (debug) {
+    fprintf(where,"netserver: recv_rds_stream: entered...\n");
+    fflush(where);
+  }
+  
+  /* We want to set-up the listen socket with all the desired */
+  /* parameters and then let the initiator know that all is ready. If */
+  /* socket size defaults are to be used, then the initiator will have */
+  /* sent us 0's. If the socket sizes cannot be changed, then we will */
+  /* send-back what they are. If that information cannot be determined, */
+  /* then we send-back -1's for the sizes. If things go wrong for any */
+  /* reason, we will drop back ten yards and punt. */
+  
+  /* If anything goes wrong, we want the remote to know about it. It */
+  /* would be best if the error that the remote reports to the user is */
+  /* the actual error we encountered, rather than some bogus unexpected */
+  /* response type message. */
+  
+  if (debug > 1) {
+    fprintf(where,"recv_rds_stream: setting the response type...\n");
+    fflush(where);
+  }
+  
+  netperf_response.content.response_type = RDS_STREAM_RESPONSE;
+  
+  if (debug > 2) {
+    fprintf(where,"recv_rds_stream: the response type is set...\n");
+    fflush(where);
+  }
+  
+  /* We now alter the message_ptr variable to be at the desired */
+  /* alignment with the desired offset. */
+  
+  if (debug > 1) {
+    fprintf(where,"recv_rds_stream: requested alignment of %d\n",
+	    rds_stream_request->recv_alignment);
+    fflush(where);
+  }
+
+  if (recv_width == 0) recv_width = 1;
+
+  recv_ring = allocate_buffer_ring(recv_width,
+				   rds_stream_request->message_size,
+				   rds_stream_request->recv_alignment,
+				   rds_stream_request->recv_offset);
+
+  if (debug > 1) {
+    fprintf(where,"recv_rds_stream: receive alignment and offset set...\n");
+    fflush(where);
+  }
+  
+  /* Create the data socket the test messages will arrive on (no listen()). */
+  
+  if (debug > 1) {
+    fprintf(where,"recv_rds_stream: grabbing a socket...\n");
+    fflush(where);
+  }
+
+  /* create_data_socket expects to find some things in the global */
+  /* variables, so set the globals based on the values in the request. */
+  /* once the socket has been created, we will set the response values */
+  /* based on the updated value of those globals. raj 7/94 */
+  lsr_size_req = rds_stream_request->recv_buf_size;
+  loc_rcvavoid = rds_stream_request->so_rcvavoid;
+  loc_sndavoid = rds_stream_request->so_sndavoid;
+
+  set_hostname_and_port(local_name,
+			port_buffer,
+			nf_to_af(rds_stream_request->ipfamily),
+			rds_stream_request->port);
+  /* NOTE(review): SOCK_DGRAM/IPPROTO_UDP are the UDP_STREAM values - confirm this yields an RDS socket */
+  local_res = complete_addrinfo(local_name,
+				local_name,
+				port_buffer,
+				nf_to_af(rds_stream_request->ipfamily),
+				SOCK_DGRAM,
+				IPPROTO_UDP,
+				0);
+
+  s_data = create_data_socket(local_res);
+  
+  if (s_data == INVALID_SOCKET) {
+    netperf_response.content.serv_errno = errno;
+    send_response();
+    exit(1);
+  }
+
+  rds_stream_response->test_length = rds_stream_request->test_length;
+  
+  /* now get the port number assigned by the system  */
+  addrlen = sizeof(myaddr_in);
+  if (getsockname(s_data, 
+		  (struct sockaddr *)&myaddr_in,
+		  &addrlen) == SOCKET_ERROR){
+    netperf_response.content.serv_errno = errno;
+    close(s_data);
+    send_response();
+    
+    exit(1);
+  }
+  
+  /* Now myaddr_in contains the port and the internet address this is */
+  /* returned to the sender also implicitly telling the sender that the */
+  /* socket buffer sizing has been done. */
+  
+  rds_stream_response->data_port_number = (int) ntohs(myaddr_in.sin_port);
+  netperf_response.content.serv_errno   = 0;
+  
+  /* But wait, there's more. If the initiator wanted cpu measurements, */
+  /* then we must call the calibrate routine, which will return the max */
+  /* rate back to the initiator. If the CPU was not to be measured, or */
+  /* something went wrong with the calibration, we will return a -1 to */
+  /* the initiator. */
+  
+  rds_stream_response->cpu_rate    = (float)0.0; /* assume no cpu */
+  rds_stream_response->measure_cpu = 0;
+  if (rds_stream_request->measure_cpu) {
+    /* We will pass the rate into the calibration routine. If the */
+    /* user did not specify one, it will be 0.0, and we will do a */
+    /* "real" calibration. Otherwise, all it will really do is */
+    /* store it away... */
+    rds_stream_response->measure_cpu = 1;
+    rds_stream_response->cpu_rate = 
+      calibrate_local_cpu(rds_stream_request->cpu_rate);
+  }
+  
+  message_size	= rds_stream_request->message_size;
+  test_time	= rds_stream_request->test_length;
+  
+  /* before we send the response back to the initiator, pull some of */
+  /* the socket parms from the globals */
+  rds_stream_response->send_buf_size = lss_size;
+  rds_stream_response->recv_buf_size = lsr_size;
+  rds_stream_response->so_rcvavoid = loc_rcvavoid;
+  rds_stream_response->so_sndavoid = loc_sndavoid;
+
+  send_response();
+  
+  /* Now it's time to start receiving data on the connection. We will */
+  /* first grab the apropriate counters and then start grabbing. */
+  
+  cpu_start(rds_stream_request->measure_cpu);
+  
+#ifdef WIN32
+  /* this is used so the timer thread can close the socket out from */
+  /* under us, which to date is the easiest/cleanest/least */
+  /* Windows-specific way I can find to force the winsock calls to */
+  /* return WSAEINTR with the test is over. anything that will run on */
+  /* 95 and NT and is closer to what netperf expects from Unix signals */
+  /* and such would be appreciated raj 1/96 */
+  win_kludge_socket = s_data;
+#endif /* WIN32 */
+  
+  /* The loop exits when the timer pops or when a recv does not return */
+  /* exactly message_size bytes; errors other than EINTR end the process. */
+  
+  times_up = 0;
+
+  start_timer(test_time + PAD_TIME);
+
+  if (debug) {
+    fprintf(where,"recv_rds_stream: about to enter inner sanctum.\n");
+    fflush(where);
+  }
+  
+  while (!times_up) {
+#ifdef WIN32
+    /* as in recv_udp_stream, winsock does not like calling recv() on a */
+    /* datagram socket, unlike BSD sockets, so use recvfrom(). */
+    if((len = recvfrom(s_data,
+		       recv_ring->buffer_ptr,
+		       message_size, 
+		       0,0,0)        
+	) != message_size)     
+#else
+    if ((len = recv(s_data, 
+		    recv_ring->buffer_ptr,
+		    message_size, 
+		    0)) != message_size) 
+#endif
+	{
+      if ((len == SOCKET_ERROR) && !SOCKET_EINTR(len)) {
+        netperf_response.content.serv_errno = errno;
+	    send_response();
+	    exit(1);
+      }
+      break;
+    }
+    messages_recvd++;
+    recv_ring = recv_ring->next;
+  }
+  
+  if (debug) {
+    fprintf(where,"recv_rds_stream: got %u messages.\n",messages_recvd);
+    fflush(where);
+  }
+  
+  
+  /* The loop now exits due timer or < message_size bytes received. in */
+  /* reality, we only really support a timed RDS_STREAM test. raj */
+  /* 12/95 */
+  
+  cpu_stop(rds_stream_request->measure_cpu,&elapsed_time);
+  
+  if (times_up) {
+    /* we ended on a timer, subtract the PAD_TIME */
+    elapsed_time -= (float)PAD_TIME;
+  }
+  else {
+    stop_timer();
+  }
+  
+  if (debug) {
+    fprintf(where,"recv_rds_stream: test ended in %f seconds.\n",elapsed_time);
+    fflush(where);
+  }
+  
+  
+  /* add only a short final message: a full one is already counted and -1 means EINTR */
+  bytes_received = (messages_recvd * message_size) + (((len > 0) && (len != message_size)) ? len : 0);
+  
+  /* send the results to the sender			*/
+  
+  if (debug) {
+    fprintf(where,
+	    "recv_rds_stream: got %u bytes\n",
+	    bytes_received);
+    fflush(where);
+  }
+  
+  netperf_response.content.response_type	= RDS_STREAM_RESULTS;
+  rds_stream_results->bytes_received	= htonl(bytes_received);
+  rds_stream_results->messages_recvd	= messages_recvd;
+  rds_stream_results->elapsed_time	= elapsed_time;
+  rds_stream_results->cpu_method        = cpu_method;
+  rds_stream_results->num_cpus          = lib_num_loc_cpus;
+  if (rds_stream_request->measure_cpu) {
+    rds_stream_results->cpu_util	= calc_cpu_util(elapsed_time);
+  }
+  else {
+    rds_stream_results->cpu_util	= (float) -1.0;
+  }
+  
+  if (debug > 1) {
+    fprintf(where,
+	    "recv_rds_stream: test complete, sending results.\n");
+    fflush(where);
+  }
+  
+  send_response();
+
+  close(s_data);
+
+}
+
+
  /* this routine implements the receive side (netserver) of a UDP_RR */
  /* test. */
 void
@@ -7172,6 +8007,7 @@ recv_udp_rr()
 
       }
 
+
 
  /* this routine implements the receive (netserver) side of a TCP_RR */
  /* test */
@@ -9411,9 +10247,9 @@ recv_tcp_tran_rr()
   /* multi-connection situation, but for now, we'll ignore the issue */
   /* and concentrate on single connection testing. */
   
-  if (bind(s_listen,
-	   (struct sockaddr *)&myaddr_in,
-	   sizeof(myaddr_in)) == SOCKET_ERROR) {
+  if (bind(send_socket,
+        (struct sockaddr *)myaddr,
+	        sizeof(struct sockaddr_in)) == SOCKET_ERROR) {
     netperf_response.content.serv_errno = errno;
     close(s_listen);
     send_response();
Index: netperf-2.4.2/src/nettest_bsd.h
===================================================================
--- netperf-2.4.2.orig/src/nettest_bsd.h
+++ netperf-2.4.2/src/nettest_bsd.h
@@ -284,7 +284,7 @@ struct	udp_stream_response_struct {
   int	so_sndavoid;	/* could the remote avoid send copies? */
 };
 
-struct	udp_stream_results_struct {
+struct	rds_stream_results_struct {
   unsigned int	messages_recvd;
   unsigned int	bytes_received;
   float	        elapsed_time;
@@ -293,6 +293,34 @@ struct	udp_stream_results_struct {
   int           num_cpus;      /* how many CPUs had the remote? */
 };
 
+struct	rds_stream_request_struct {	/* RDS_STREAM setup request, read by recv_rds_stream */
+  int	recv_buf_size;	/* requested receive buffer size, copied to lsr_size_req */
+  int	message_size;	/* bytes per message; size of each receive ring buffer */
+  int	recv_alignment;	/* alignment passed to allocate_buffer_ring */
+  int	recv_offset;	/* offset passed to allocate_buffer_ring */
+  int	checksum_off;	/* NOTE(review): not read by recv_rds_stream */
+  int	measure_cpu;	/* non-zero: the remote should measure CPU utilization */
+  float	cpu_rate;	/* passed to calibrate_local_cpu; 0.0 requests a real calibration */
+  int	test_length;	/* test duration, becomes the remote's test_time */
+  int	so_rcvavoid;    /* do we want the remote to avoid receive */
+			/* copies? */ 
+  int	so_sndavoid;    /* do we want the remote to avoid send copies? */
+  int   port;           /* the port to which the recv side should bind
+			   to allow netperf to run through those evil
+			   firewall things */
+  int   ipfamily;	/* address family, converted with nf_to_af() before use */
+};
+
+struct	rds_stream_response_struct {	/* recv_rds_stream's reply to the setup request */
+  int	recv_buf_size;	/* receive buffer size taken from lsr_size after socket creation */
+  int	send_buf_size;	/* send buffer size taken from lss_size after socket creation */
+  int	measure_cpu;	/* 1 if the remote will measure CPU, otherwise 0 */
+  int	test_length;	/* echo of the requested test_length */
+  int	data_port_number;	/* port of the remote data socket, in host byte order */
+  float	cpu_rate;	/* calibrated CPU rate; 0.0 when CPU is not measured */
+  int	so_rcvavoid;	/* could the remote avoid receive copies? */
+  int	so_sndavoid;	/* could the remote avoid send copies? */
+};
 
 struct	udp_rr_request_struct {
   int	recv_buf_size;	/* how big does the client want it	*/
@@ -339,6 +367,16 @@ struct udp_rr_results_struct {
   int           num_cpus;      /* how many CPUs had the remote? */
 };
 
+
+struct	udp_stream_results_struct {	/* re-added: the original definition was renamed rds_stream_results_struct */
+  unsigned int	messages_recvd;	/* messages counted by the receiving side */
+  unsigned int	bytes_received;
+  float	        elapsed_time;
+  float	        cpu_util;
+  int           cpu_method;    /* how was cpu util measured? */
+  int           num_cpus;      /* how many CPUs had the remote? */
+};
+
 struct	tcp_cc_request_struct {
   int	recv_buf_size;	/* how big does the client want it	*/
   int	send_buf_size;
Index: netperf-2.4.2/src/rds/pfhack.c
===================================================================
--- /dev/null
+++ netperf-2.4.2/src/rds/pfhack.c
@@ -0,0 +1,78 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * pfhack.c - discover the RDS constants 
+ *
+ * PF_RDS and SOL_RDS should be assigned constants.  However, we don't have
+ * official values yet.  There is a hack to overload an existing PF_ value
+ * (21).  This dynamic code detects what the running kernel is using.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <limits.h>
+
+#include "kernel-list.h"
+#include "pfhack.h"
+#include "rdstool.h"
+
+#define PF_RDS_PATH	"/proc/sys/net/rds/pf_rds"
+#define SOL_RDS_PATH	"/proc/sys/net/rds/sol_rds"
+
+/* Report why an RDS constant could not be discovered, then exit(1). */
+/* Systems that cannot read pf_rds are deliberately unsupported. */
+static void explode(const char *reason)
+{
+	(void)fprintf(stderr, "%s: Unable to determine RDS constant: %s\n",
+		      progname, reason);
+
+	exit(1);
+}
+
+static int discover_constant(const char *path, int official)
+{
+	int fd;
+	ssize_t ret, total = 0;
+	char buf[PATH_MAX];
+	char *ptr;
+	long val;
+
+	fd = open(path, O_RDONLY);
+	if (fd < 0)
+		explode("Can't open address constant");
+
+	while (total < sizeof(buf)) {
+		ret = read(fd, buf + total, sizeof(buf) - total);
+		if (ret > 0)
+			total += ret;
+		else
+			break;
+	}
+
+	close(fd);
+
+	if (ret < 0)
+		explode("Error reading address constant");
+
+	val = strtoul(buf, &ptr, 0);
+	if ((val > INT_MAX) || !ptr || (*ptr && (*ptr != '\n')))
+		explode("Invalid address constant");
+
+	return (int)val;
+}
+
+int discover_pf_rds(void)	/* PF_RDS value published under /proc/sys/net/rds */
+{
+	return discover_constant(PF_RDS_PATH, OFFICIAL_PF_RDS);
+}
+
+int discover_sol_rds(void)	/* SOL_RDS value published under /proc/sys/net/rds */
+{
+	return discover_constant(SOL_RDS_PATH, OFFICIAL_SOL_RDS);
+}
Index: netperf-2.4.2/src/rds/pfhack.h
===================================================================
--- /dev/null
+++ netperf-2.4.2/src/rds/pfhack.h
@@ -0,0 +1,26 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * pfhack.h - discover the RDS constants 
+ *
+ * PF_RDS and SOL_RDS should be assigned constants.  However, we don't have
+ * official values yet.  There is a hack to overload an existing PF_ value
+ * (21).  This dynamic code detects what the running kernel is using.
+ */
+
+#ifndef PF_HACK_H
+#define PF_HACK_H
+
+#define OFFICIAL_PF_RDS		32
+#define OFFICIAL_SOL_RDS	272
+
+#ifdef DYNAMIC_PF_RDS
+/* Read the running kernel's values from /proc; the program exits on failure. */
+extern int discover_pf_rds(void);
+extern int discover_sol_rds(void);
+
+/* Function calls, not constants: unusable where a constant expression is required (e.g. case labels). */
+#define AF_RDS discover_pf_rds()
+#define SOL_RDS discover_sol_rds()
+#endif  /* DYNAMIC_PF_RDS */
+#endif  /* PF_HACK_H */
Index: netperf-2.4.2/configure.ac
===================================================================
--- netperf-2.4.2.orig/configure.ac
+++ netperf-2.4.2/configure.ac
@@ -259,6 +259,40 @@ then
 	AC_DEFINE([WANT_DLPI],,[Define to one to include DLPI tests.])
 fi
 
+# see if we should be including the RDS tests (--enable-rds; disabled by default)
+
+AC_MSG_CHECKING(whether to include RDS tests)
+
+AC_ARG_ENABLE(rds,
+	[AS_HELP_STRING([--enable-rds],[include RDS tests])])
+# map the option onto use_rds (true/false); an omitted option means "no"
+case "$enable_rds" in
+     yes)
+		use_rds=true
+		;;
+     no)	
+		use_rds=false
+		;;
+     '')
+		use_rds=false
+		;;
+     *)
+		AC_MSG_ERROR([--enable-rds takes yes or no])
+		;;
+esac
+# finish the "whether to include RDS tests" check started above
+if $use_rds
+then
+	AC_MSG_RESULT(yes)
+else
+	AC_MSG_RESULT(no)
+fi
+# WANT_RDS is the preprocessor switch that includes the RDS tests
+if $use_rds
+then
+	AC_DEFINE([WANT_RDS],,[Define to one to include RDS tests.])
+fi
+
 
 # see if we should be including the XTI tests
 

-- 
Vladimir Sokolovsky <vlad at dev.mellanox.co.il>
Mellanox Technologies Ltd.



More information about the rds-devel mailing list