summaryrefslogtreecommitdiff
path: root/ACE/protocols/tests/HTBP/Reactor_Tests/server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/protocols/tests/HTBP/Reactor_Tests/server.cpp')
-rw-r--r--ACE/protocols/tests/HTBP/Reactor_Tests/server.cpp251
1 files changed, 251 insertions, 0 deletions
diff --git a/ACE/protocols/tests/HTBP/Reactor_Tests/server.cpp b/ACE/protocols/tests/HTBP/Reactor_Tests/server.cpp
new file mode 100644
index 00000000000..7bc3d8ac0ee
--- /dev/null
+++ b/ACE/protocols/tests/HTBP/Reactor_Tests/server.cpp
@@ -0,0 +1,251 @@
+/**
+ * server for a reactor based connection establishment test using HTBP
+ *
+ * $Id$
+ */
+
+#include "ace/Log_Msg.h"
+
+#include "ace/HTBP/HTBP_Session.h"
+#include "ace/HTBP/HTBP_Stream.h"
+#include "ace/HTBP/HTBP_Addr.h"
+
+#include "ace/SOCK_Acceptor.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Event_Handler.h"
+#include "ace/Reactor.h"
+#include "ace/Get_Opt.h"
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_unistd.h"
+
+unsigned port = 8088;
+const ACE_TCHAR *notifier_file = 0;
+
+int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("o:p:"));
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'o':
+ notifier_file = get_opts.opt_arg();
+ break;
+
+ case 'p':
+ port = static_cast<unsigned>(ACE_OS::atoi (get_opts.opt_arg()));
+ break;
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-p port "
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+class Accept_Handler : public ACE_Event_Handler
+{
+public:
+ Accept_Handler (ACE_SOCK_Acceptor& a);
+ virtual ~Accept_Handler (void);
+ virtual int handle_input (ACE_HANDLE );
+private:
+ ACE_SOCK_Acceptor& acceptor_;
+ ACE::HTBP::Channel *channels_[2];
+};
+
+class Stream_Handler : public ACE_Event_Handler
+{
+public:
+ Stream_Handler (ACE::HTBP::Stream &s);
+ virtual ~Stream_Handler ();
+ virtual int handle_input (ACE_HANDLE );
+private:
+ ACE::HTBP::Stream &stream_;
+};
+
+
+Accept_Handler::Accept_Handler(ACE_SOCK_Acceptor &a)
+ :ACE_Event_Handler(),
+ acceptor_(a)
+{
+ this->channels_[0] = this->channels_[1] = 0;
+ if (this->reactor() == 0)
+ this->reactor(ACE_Reactor::instance());
+ this->reactor()->register_handler (acceptor_.get_handle(),
+ this,
+ ACE_Event_Handler::ACCEPT_MASK);
+}
+
+Accept_Handler::~Accept_Handler()
+{
+ this->reactor()->remove_handler (acceptor_.get_handle(),
+ ACE_Event_Handler::ACCEPT_MASK|
+ ACE_Event_Handler::DONT_CALL);
+ acceptor_.close();
+}
+
+int
+Accept_Handler::handle_input (ACE_HANDLE h)
+{
+ ACE::HTBP::Channel **ch = 0;
+ if (h == acceptor_.get_handle())
+ {
+ ACE_SOCK_Stream *sock = new ACE_SOCK_Stream;
+ acceptor_.accept(*sock);
+ ch = channels_[0] == 0 ? &channels_[0] :& channels_[1];
+ *ch = new ACE::HTBP::Channel(*sock);
+ this->reactor()->register_handler (sock->get_handle(),
+ this,
+ ACE_Event_Handler::READ_MASK);
+ return 0;
+ }
+ for (int i = 0; i < 2; i++)
+ if (channels_[i] && channels_[i]->get_handle() == h)
+ {
+ ch = &channels_[i];
+ break;
+ }
+ if (ch == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) Server Accept_Handler::handle_input, ")
+ ACE_TEXT ("unknown handle %d\n") ,h),
+ -1);
+ int result = (*ch)->pre_recv();
+ if (result == 0)
+ {
+ this->reactor()->remove_handler (h,
+ ACE_Event_Handler::READ_MASK |
+ ACE_Event_Handler::DONT_CALL);
+
+ (*ch)->register_notifier(this->reactor());
+ ACE::HTBP::Session *session = (*ch)->session();
+
+ ACE::HTBP::Stream *stream = new ACE::HTBP::Stream(session);
+ ACE_Event_Handler *handler = session->handler();
+
+ if (handler == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Server Accept_Handler::handle_input ")
+ ACE_TEXT ("Creating new stream handler for %d\n"),
+ stream->get_handle()));
+ Stream_Handler *sh = new Stream_Handler(*stream);
+ session->handler (sh);
+ }
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Server Accept_Handler::handle_input ")
+ ACE_TEXT ("There is already a handler for %d\n"),
+ stream->get_handle()));
+
+ if ((*ch)->state() == ACE::HTBP::Channel::Data_Queued)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Server Accept_Handler::handle_input \n"),
+ ACE_TEXT ("Issuing notification on handler\n")));
+ this->reactor()->notify (session->handler(),
+ ACE_Event_Handler::READ_MASK);
+ }
+
+ *ch = 0;
+ }
+ return 0;
+}
+
+Stream_Handler::Stream_Handler (ACE::HTBP::Stream &s)
+ :stream_(s)
+{}
+Stream_Handler::~Stream_Handler ()
+{
+}
+
+int
+Stream_Handler::handle_input (ACE_HANDLE h)
+{
+ char buffer[1000];
+ ssize_t n = this->stream_.recv (buffer,1000);
+ if (n == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) Server Stream_Handler::handle_input %p\n"),
+ ACE_TEXT ("recv")),
+ 0);
+ buffer[n] = 0;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Server Stream_Handler::handle_input ")
+ ACE_TEXT (" (%d) read %b:\n%C\n"),
+ h, n, buffer));
+
+ const char *tok_loc = ACE_OS::strstr (buffer, "goodbye");
+ if (tok_loc != 0)
+ this->reactor()->end_event_loop();
+ else
+ {
+ ACE::HTBP::Channel *ch = stream_.session()->outbound();
+ if (ch != 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Server Stream_Handler::handle_input ")
+ ACE_TEXT ("Sending reply on %d\n"),
+ ch->ace_stream().get_handle()));
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Server Stream_Handler::handle_input ")
+ ACE_TEXT ("Can't send reply on nul channel\n")));
+ this->stream_.send ("Back atcha!",11);
+ }
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR * argv[])
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Server: ")
+ ACE_TEXT ("At start of main\n")));
+ ACE_OS::socket_init (ACE_WSOCK_VERSION);
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ ACE_TCHAR host[MAXHOSTNAMELEN+1];
+ if (ACE_OS::hostname (host, MAXHOSTNAMELEN) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) Server failure: %p\n"),
+ ACE_TEXT ("hostname")),
+ 1);
+
+ ACE_INET_Addr local (port, host);
+ local.addr_to_string (host, MAXHOSTNAMELEN);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Server: ")
+ ACE_TEXT ("listening at %s\n"),
+ host));
+
+ ACE_SOCK_Acceptor acc (local, 1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Server: ")
+ ACE_TEXT ("opened listener\n")));
+
+ Accept_Handler handler (acc);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Server: ")
+ ACE_TEXT ("server is ready\n")));
+
+ if (notifier_file != 0)
+ {
+ FILE *f = ACE_OS::fopen (notifier_file,ACE_TEXT("w+"));
+ const char *msg = "server ready";
+ ACE_OS::fwrite (msg,ACE_OS::strlen(msg),1,f);
+ ACE_OS::fclose (f);
+ }
+
+ ACE_Reactor::instance()->run_reactor_event_loop();
+ return 0;
+}