summaryrefslogtreecommitdiff
path: root/ACE/apps/JAWS2/JAWS/IO_Handler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/apps/JAWS2/JAWS/IO_Handler.cpp')
-rw-r--r--ACE/apps/JAWS2/JAWS/IO_Handler.cpp502
1 files changed, 502 insertions, 0 deletions
diff --git a/ACE/apps/JAWS2/JAWS/IO_Handler.cpp b/ACE/apps/JAWS2/JAWS/IO_Handler.cpp
new file mode 100644
index 00000000000..6de73e4fc63
--- /dev/null
+++ b/ACE/apps/JAWS2/JAWS/IO_Handler.cpp
@@ -0,0 +1,502 @@
+// $Id$
+
+#include "ace/Proactor.h"
+#include "ace/Filecache.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/OS_NS_sys_socket.h"
+
+#include "JAWS/Jaws_IO.h"
+#include "JAWS/IO_Handler.h"
+#include "JAWS/IO_Acceptor.h"
+#include "JAWS/Data_Block.h"
+#include "JAWS/Policy.h"
+#include "JAWS/Waiter.h"
+#include "JAWS/Filecache.h"
+
+
+
+JAWS_Abstract_IO_Handler::~JAWS_Abstract_IO_Handler (void)
+{
+}
+
+JAWS_IO_Handler_Factory::~JAWS_IO_Handler_Factory (void)
+{
+}
+
+JAWS_IO_Handler *
+JAWS_IO_Handler_Factory::create_io_handler (void)
+{
+ JAWS_TRACE ("JAWS_IO_Handler_Factory::create");
+
+ JAWS_IO_Handler *handler;
+ handler = new JAWS_IO_Handler (this);
+
+ return handler;
+}
+
+void
+JAWS_IO_Handler_Factory::destroy_io_handler (JAWS_IO_Handler *handler)
+{
+ JAWS_TRACE ("JAWS_IO_Handler_Factory::destroy");
+ if (handler != 0)
+ {
+ delete handler->message_block ();
+ delete handler;
+ }
+}
+
+JAWS_IO_Handler::JAWS_IO_Handler (JAWS_IO_Handler_Factory *factory)
+ : status_ (0),
+ mb_ (0),
+ handle_ (ACE_INVALID_HANDLE),
+ task_ (0),
+ factory_ (factory)
+{
+}
+
+JAWS_IO_Handler::~JAWS_IO_Handler (void)
+{
+ this->mb_ = 0;
+ this->status_ = 0;
+ this->task_ = 0;
+ this->factory_ = 0;
+
+ ACE_OS::closesocket (this->handle_);
+ this->handle_ = ACE_INVALID_HANDLE;
+}
+
+void
+JAWS_IO_Handler::accept_complete (ACE_HANDLE handle)
+{
+ // callback into pipeline task, notify that the accept has completed
+ this->handle_ = handle;
+ this->status_ |= ACCEPT_OK;
+ this->status_ &= (ACCEPT_OK+1);
+
+ JAWS_Dispatch_Policy *policy = this->mb_->policy ();
+
+ // Do this so that Thread Per Request can spawn a new thread
+ policy->concurrency ()->activate_hook ();
+}
+
+void
+JAWS_IO_Handler::accept_error (void)
+{
+ // callback into pipeline task, notify that the accept has failed
+ this->status_ |= ACCEPT_ERROR;
+ this->status_ &= (ACCEPT_ERROR+1);
+}
+
+void
+JAWS_IO_Handler::read_complete (ACE_Message_Block *data)
+{
+ ACE_UNUSED_ARG (data);
+ // We can call back into the pipeline task at this point
+ // this->pipeline_->read_complete (data);
+ this->status_ |= READ_OK;
+ this->status_ &= (READ_OK+1);
+}
+
+void
+JAWS_IO_Handler::read_error (void)
+{
+ // this->pipeline_->read_error ();
+ this->status_ |= READ_ERROR;
+ this->status_ &= (READ_ERROR+1);
+}
+
+void
+JAWS_IO_Handler::transmit_file_complete (void)
+{
+ JAWS_TRACE ("JAWS_IO_Handler::transmit_file_complete");
+ // this->pipeline_->transmit_file_complete ();
+ this->status_ |= TRANSMIT_OK;
+ this->status_ &= (TRANSMIT_OK+1);
+}
+
+void
+JAWS_IO_Handler::transmit_file_error (int result)
+{
+ JAWS_TRACE ("JAWS_IO_Handler::transmit_file_error");
+ ACE_UNUSED_ARG (result);
+ // this->pipeline_->transmit_file_complete (result);
+ this->status_ |= TRANSMIT_ERROR;
+ this->status_ &= (TRANSMIT_ERROR+1);
+}
+
+void
+JAWS_IO_Handler::receive_file_complete (void)
+{
+ this->status_ |= RECEIVE_OK;
+ this->status_ &= (RECEIVE_OK+1);
+}
+
+void
+JAWS_IO_Handler::receive_file_error (int result)
+{
+ ACE_UNUSED_ARG(result);
+ this->status_ |= RECEIVE_ERROR;
+ this->status_ &= (RECEIVE_ERROR+1);
+}
+
+void
+JAWS_IO_Handler::write_error (void)
+{
+ ACE_DEBUG ((LM_DEBUG, " (%t) error in writing response\n"));
+
+ this->status_ |= WRITE_ERROR;
+ this->status_ &= (WRITE_ERROR+1);
+ this->done ();
+}
+
+void
+JAWS_IO_Handler::confirmation_message_complete (void)
+{
+ this->status_ |= WRITE_OK;
+ this->status_ &= (WRITE_OK+1);
+}
+
+void
+JAWS_IO_Handler::error_message_complete (void)
+{
+ this->status_ |= WRITE_OK;
+ this->status_ &= (WRITE_OK+1);
+}
+
+JAWS_IO_Handler_Factory *
+JAWS_IO_Handler::factory (void)
+{
+ return this->factory_;
+}
+
+ACE_HANDLE
+JAWS_IO_Handler::handle (void) const
+{
+ return this->handle_;
+}
+
+void
+JAWS_IO_Handler::task (JAWS_Pipeline_Handler *ph)
+{
+ this->task_ = ph;
+}
+
+JAWS_Pipeline_Handler *
+JAWS_IO_Handler::task (void)
+{
+ return this->task_;
+}
+
+void
+JAWS_IO_Handler::message_block (JAWS_Data_Block *mb)
+{
+ this->mb_ = mb;
+}
+
+JAWS_Data_Block *
+JAWS_IO_Handler::message_block (void)
+{
+ return this->mb_;
+}
+
+void
+JAWS_IO_Handler::done (void)
+{
+ this->factory ()->destroy_io_handler (this);
+}
+
+int
+JAWS_IO_Handler::status (void)
+{
+ return this->status_;
+}
+
+void
+JAWS_IO_Handler::idle (void)
+{
+ this->status_ &= (IDLE+1);
+}
+
+void
+JAWS_IO_Handler::acquire (void)
+{
+}
+
+void
+JAWS_IO_Handler::lock (void)
+{
+}
+
+void
+JAWS_IO_Handler::release (void)
+{
+}
+
+#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
+
+JAWS_Asynch_IO_Handler_Factory::~JAWS_Asynch_IO_Handler_Factory (void)
+{
+}
+
+JAWS_IO_Handler *
+JAWS_Asynch_IO_Handler_Factory::create_io_handler (void)
+{
+ JAWS_TRACE ("JAWS_Asynch_IO_Handler_Factory::create");
+
+ JAWS_Asynch_IO_Handler *handler = 0;
+ handler = new JAWS_Asynch_IO_Handler (this);
+
+ return handler;
+}
+
+void
+JAWS_Asynch_IO_Handler_Factory::destroy_io_handler (JAWS_IO_Handler *handler)
+{
+ JAWS_TRACE ("JAWS_IO_Handler_Factory::destroy");
+
+ if (handler != 0)
+ {
+ //cerr << "(" << thr_self () << ") locking for destruction: " << handler << endl;
+ handler->lock ();
+ delete handler->message_block ();
+ handler->message_block (0);
+ delete handler;
+ }
+}
+
+
+JAWS_Asynch_IO_Handler::JAWS_Asynch_IO_Handler (JAWS_Asynch_IO_Handler_Factory *factory)
+ : JAWS_IO_Handler (factory),
+ handler_ (0)
+{
+ this->status_ = 1;
+}
+
+JAWS_Asynch_IO_Handler::~JAWS_Asynch_IO_Handler (void)
+{
+ delete this->handler_;
+ this->handler_ = 0;
+}
+
+ACE_Handler *
+JAWS_Asynch_IO_Handler::handler (void)
+{
+ return this->handler_;
+}
+
+void
+JAWS_Asynch_IO_Handler::acquire (void)
+{
+ //cerr << "(" << thr_self () << ") acquire handler: " << this << endl;
+ this->count_.acquire_read ();
+}
+
+void
+JAWS_Asynch_IO_Handler::lock (void)
+{
+ //cerr << "(" << thr_self () << ") locking handler: " << this << endl;
+ this->count_.acquire_write ();
+}
+
+void
+JAWS_Asynch_IO_Handler::release (void)
+{
+ //cerr << "(" << thr_self () << ") release handler: " << this << endl;
+ this->count_.release ();
+}
+
+JAWS_Asynch_Handler::JAWS_Asynch_Handler (void)
+ : ioh_ (0)
+{
+ this->proactor (ACE_Proactor::instance ());
+}
+
+JAWS_Asynch_Handler::~JAWS_Asynch_Handler (void)
+{
+}
+
+void
+JAWS_Asynch_Handler::open (ACE_HANDLE h,
+ ACE_Message_Block &mb)
+{
+ JAWS_TRACE ("JAWS_Asynch_Handler::open");
+
+ // This currently does nothing, but just in case.
+ ACE_Service_Handler::open (h, mb);
+
+ // ioh_ set from the ACT hopefully
+ //this->dispatch_handler ();
+
+#if !defined (ACE_WIN32)
+ // Assume at this point there is no data.
+ mb.rd_ptr (mb.wr_ptr ());
+ mb.crunch ();
+#else
+ // AcceptEx reads some initial data from the socket.
+ this->handler ()->message_block ()->copy (mb.rd_ptr (), mb.length ());
+#endif
+
+ ACE_Asynch_Accept_Result_Impl *fake_result
+ = ACE_Proactor::instance ()->create_asynch_accept_result
+ (this->proxy (), JAWS_IO_Asynch_Acceptor_Singleton::instance ()->get_handle (),
+ h, mb, JAWS_Data_Block::JAWS_DATA_BLOCK_SIZE,
+ this->ioh_, ACE_INVALID_HANDLE, 0);
+
+ this->handler ()->handler_ = this;
+
+ fake_result->complete (0, 1, 0);
+}
+
+void
+JAWS_Asynch_Handler::act (const void *act_ref)
+{
+ JAWS_TRACE ("JAWS_Asynch_Handler::act");
+
+ // Set the ioh from the act
+ this->ioh_ = (JAWS_Asynch_IO_Handler *) act_ref;
+}
+
+#if 0
+ACE_HANDLE
+JAWS_Asynch_Handler::handle (void) const
+{
+ return this->ioh_->handle ();
+}
+#endif
+
+void
+JAWS_Asynch_Handler::dispatch_handler (void)
+{
+#if 0
+ // A future version of ACE will support this.
+ ACE_Thread_ID tid = ACE_OS::thr_self ();
+#else
+ // Do it this way for now
+ ACE_thread_t thr_name;
+ thr_name = ACE_OS::thr_self ();
+
+ JAWS_Thread_ID tid (thr_name);
+#endif /* 0 */
+
+ JAWS_IO_Handler **iohref = JAWS_Waiter_Singleton::instance ()->find (tid);
+
+ *iohref = this->handler ();
+}
+
+void
+JAWS_Asynch_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result
+ &result)
+{
+ JAWS_TRACE ("JAWS_Asynch_Handler::handle_read_stream");
+
+ this->dispatch_handler ();
+
+ if (result.act () != 0)
+ {
+ // This callback is for io->receive_file()
+ JAWS_TRACE ("JAWS_Asynch_Handler::handle_read_stream (recv_file)");
+
+ int code = 0;
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ if (result.message_block ().length ()
+ == result.message_block ().size ())
+ code = ACE_Filecache_Handle::ACE_SUCCESS;
+ else
+ {
+ ACE_Asynch_Read_Stream ar;
+ if (ar.open (*this, this->handler ()->handle ()) == -1
+ || ar.read (result.message_block (),
+ result.message_block ().size ()
+ - result.message_block ().length (),
+ result.act ()) == -1)
+ code = -1;
+ else
+ return;
+ }
+ }
+ else
+ code = -1;
+
+ if (code == ACE_Filecache_Handle::ACE_SUCCESS)
+ this->handler ()->receive_file_complete ();
+ else
+ this->handler ()->receive_file_error (code);
+
+ result.message_block ().release ();
+ delete (ACE_Filecache_Handle *) result.act ();
+ }
+ else
+ {
+ // This callback is for this->read()
+ JAWS_TRACE ("JAWS_Asynch_Handler::handle_read_stream (read)");
+
+ if (result.success ()
+ && result.bytes_transferred () != 0)
+ this->handler ()->read_complete (&result.message_block ());
+ else
+ this->handler ()->read_error ();
+ }
+}
+
+void
+JAWS_Asynch_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result
+ &result)
+{
+ this->dispatch_handler ();
+
+ result.message_block ().release ();
+
+ if (result.act () == (void *) JAWS_Asynch_IO::CONFIRMATION)
+ this->handler ()->confirmation_message_complete ();
+ else
+ this->handler ()->error_message_complete ();
+}
+
+void
+JAWS_Asynch_Handler::handle_transmit_file (const
+ ACE_Asynch_Transmit_File::Result
+ &result)
+{
+ this->dispatch_handler ();
+
+ if (result.success ())
+ this->handler ()->transmit_file_complete ();
+ else
+ this->handler ()->transmit_file_error (-1);
+
+ result.header_and_trailer ()->header ()->release ();
+ result.header_and_trailer ()->trailer ()->release ();
+ delete result.header_and_trailer ();
+ delete (JAWS_Cached_FILE *) result.act ();
+}
+
+void
+JAWS_Asynch_Handler::handle_accept (const ACE_Asynch_Accept::Result &result)
+{
+ JAWS_TRACE ("JAWS_Asynch_Handler::handle_accept");
+ this->dispatch_handler ();
+
+ if (result.success ())
+ {
+ JAWS_TRACE ("JAWS_Asynch_Handler::handle_accept, success");
+ this->handler ()->accept_complete (result.accept_handle ());
+ }
+ else
+ this->handler ()->accept_error ();
+
+}
+
+void
+JAWS_Asynch_Handler::handler (JAWS_Asynch_IO_Handler *ioh)
+{
+ this->ioh_ = ioh;
+}
+
+JAWS_Asynch_IO_Handler *
+JAWS_Asynch_Handler::handler (void)
+{
+ return this->ioh_;
+}
+
+#endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */