// $Id$ #include "RT_Class.h" #include "ace/Task.h" #include "ace/SOCK_Acceptor.h" #include "ace/SOCK_Stream.h" #include "ace/Acceptor.h" #include "ace/Svc_Handler.h" #include "ace/Reactor.h" #include "ace/TP_Reactor.h" #include "ace/Get_Opt.h" ACE_RCSID (TAO_PERF_RTEC_TCP_Baseline, server, "$Id$") int use_rt = 0; int nthreads = 1; char const * hi_endpoint = "localhost:12345"; char const * lo_endpoint = "localhost:23456"; int parse_args (int argc, char *argv[]); class Task : public ACE_Task_Base { public: Task(char const * endpoint); virtual int svc(); private: char const * endpoint_; ACE_Reactor reactor_; }; int main (int argc, char *argv[]) { /// Move the test to the real-time class if it is possible. RT_Class rt_class; if (parse_args (argc, argv) != 0) return 1; Task hi_task(hi_endpoint); Task lo_task(lo_endpoint); hi_task.activate(rt_class.thr_sched_class() | THR_NEW_LWP | THR_JOINABLE, 1, 1, rt_class.priority_high()); lo_task.activate(rt_class.thr_sched_class() | THR_NEW_LWP | THR_JOINABLE, nthreads, 1, rt_class.priority_low()); lo_task.wait(); hi_task.wait(); return 0; } // **************************************************************** class Svc_Handler : public ACE_Svc_Handler { public: Svc_Handler(ACE_Reactor * reactor = 0); virtual int handle_input(ACE_HANDLE); virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); }; typedef ACE_Acceptor Acceptor; Svc_Handler::Svc_Handler (ACE_Reactor * reactor) : ACE_Svc_Handler (0, 0, reactor) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Accepted connection\n")); } int Svc_Handler::handle_input(ACE_HANDLE h) { const size_t bufsize = BUFSIZ; char buf[bufsize]; while(1) { ssize_t n = this->peer().recv(buf, bufsize); if (n == 0) { ACE_ERROR((LM_ERROR, "Connection %d closed while reading data\n", h, "")); break; } else if (n == -1) { if(errno == EWOULDBLOCK) return 0; ACE_ERROR((LM_ERROR, "Error on <%d> while reading %p\n", h, "")); break; } ssize_t k = this->peer().send(buf, n); if(k == 0) { ACE_ERROR((LM_ERROR, "Connection <%d> closed while writing\n", h, "")); break; } else if (k == -1) { if(errno == EWOULDBLOCK) return 0; ACE_ERROR((LM_ERROR, "Error on <%d> while writing %p\n", h, "")); break; } else if (k != n) { ACE_ERROR((LM_ERROR, "Short write on <%d>\n", h, "")); break; } } return -1; } int Svc_Handler::handle_close(ACE_HANDLE h, ACE_Reactor_Mask m) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Closed connection\n")); this->peer().close(); int r = this->ACE_Svc_Handler::handle_close(h,m); return r; } // **************************************************************** Task::Task(char const * endpoint) : endpoint_(endpoint) , reactor_(new ACE_TP_Reactor) { ACE_INET_Addr local_sap(endpoint_); Acceptor * acceptor = new Acceptor; if(acceptor->open(local_sap, &reactor_, ACE_NONBLOCK) == -1) { ACE_ERROR((LM_ERROR, "Cannot open endpoint <%s>\n", endpoint_)); } } int Task::svc() { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Starting thread for %s\n", endpoint_)); (void) reactor_.run_reactor_event_loop(); return 0; } // **************************************************************** int parse_args (int argc, char *argv[]) { ACE_Get_Opt get_opts (argc, argv, "h:l:n:r"); int c; while ((c = get_opts ()) != -1) switch (c) { case 'h': hi_endpoint = get_opts.opt_arg (); break; case 'l': lo_endpoint = get_opts.opt_arg (); break; case 'n': nthreads = ACE_OS::atoi (get_opts.opt_arg ()); break; case 'r': use_rt = 1; break; case '?': default: ACE_ERROR_RETURN ((LM_ERROR, "usage: %s " "-o " "-r (use RT-CORBA) " "-n nthreads " "\n", argv [0]), -1); } // Indicates sucessful parsing of the command line return 0; } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) #elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */