diff options
author | jxh <jxh@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-08-05 14:55:57 +0000 |
---|---|---|
committer | jxh <jxh@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-08-05 14:55:57 +0000 |
commit | a7f4d3d641a459db1dac1a3bce213719cfba3972 (patch) | |
tree | 4cce468cdb12c30caf6b4b9fa4e342ce4c8edd81 /apps/JAWS | |
parent | d492f109fbc687dbf6c6ade43b44217771a1615a (diff) | |
download | ATCD-a7f4d3d641a459db1dac1a3bce213719cfba3972.tar.gz |
Substantial changes, attempts to reduce memory leaks.
Diffstat (limited to 'apps/JAWS')
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp | 81 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp | 19 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Data_Block.h | 1 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO.cpp | 43 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp | 138 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h | 154 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp | 79 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.h | 9 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Reaper.cpp | 3 |
9 files changed, 219 insertions, 308 deletions
diff --git a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp b/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp index e26432da815..3d27b0cf0ae 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp @@ -103,86 +103,61 @@ JAWS_Concurrency_Base::svc_hook (JAWS_Data_Block *db) JAWS_Dispatch_Policy *policy; // Contains task policies JAWS_IO_Handler *handler; // Keeps the state of the task JAWS_Pipeline_Handler *task; // The task itself + JAWS_Data_Block *mb; // The task message block // Thread specific message block and data block - JAWS_Data_Block *ts_db = new JAWS_Data_Block; + JAWS_Data_Block *ts_db = new JAWS_Data_Block (*db); if (ts_db == 0) { ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Concurrency_Base::svc_hook")); return -1; } - ts_db->task (db->task ()); - ts_db->policy (db->policy ()); - - // ACE_DEBUG ((LM_DEBUG, "yo")); - policy = db->policy (); - - // Each time we iterate, we create a handler to maintain - // our state for us. - JAWS_IO_Handler *ts_handler; - ts_handler = policy->ioh_factory ()->create_io_handler (); - if (ts_handler == 0) - { - ACE_DEBUG ((LM_DEBUG, "JAWS_Server::open, can't create handler\n")); - ts_db->release (); - return -1; - } - handler = ts_handler; - - // Set the initial task in the handler - handler->task (db->task ()); - handler->message_block (ts_db); - ts_db->io_handler (handler); + task = db->task (); // Get the waiter index JAWS_Waiter *waiter = JAWS_Waiter_Singleton::instance (); int waiter_index = waiter->index (); + mb = ts_db; do { JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, looping"); - // handler maintains the state of the protocol - task = handler->task (); - ts_db = handler->message_block (); - // Use a NULL task to make the thread recycle now if (task == 0) { JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, recycling"); - if (handler != ts_handler) - policy->ioh_factory ()->destroy_io_handler (handler); break; } // the task should set the handler to the appropriate next step - result = task->put (ts_db); + result = task->put (mb); - if (result == 1 || result == 2) + switch (result) { + case 0: + { + JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, synched"); + handler = mb->io_handler (); + } + break; + case 1: + case 2: { JAWS_TRACE ("JAWS_Concurrency_Base::svc_hook, waiting"); // need to wait for an asynchronous event - // In the case of asynchronous accepts, the handler we just - // created was useless. This is ok, because we know that it - // will not be used by another thread. Just save the - // reference so that it can be destroyed later. - - // This means we need a way to destroy all the handlers - // created by the Asynch_Acceptor. Figure this out later. - - JAWS_IO_Handler *h; - h = waiter->wait_for_completion (waiter_index); - if (h == 0) - result = -1; - else - { - handler = h; - result = 0; - } + // We need a way to destroy all the handlers created by the + // Asynch_Acceptor. Figure this out later. + + handler = waiter->wait_for_completion (waiter_index); + result = (handler == 0) ? -1 : 0; } + break; + default: + break; + } if (result == -1) { @@ -192,11 +167,17 @@ JAWS_Concurrency_Base::svc_hook (JAWS_Data_Block *db) break; } + mb = handler->message_block (); + task = handler->task (); } while (result == 0); - policy->ioh_factory ()->destroy_io_handler (ts_handler); - delete ts_db; + if (handler != 0) + { + handler->message_block ()->release (); + handler->factory ()->destroy_io_handler (handler); + } + ts_db->release (); return result; } diff --git a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp index 15d8008e73e..cf61497b3d4 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.cpp @@ -6,7 +6,11 @@ ACE_RCSID(JAWS, Data_Block, "$Id$") JAWS_Data_Block::JAWS_Data_Block (void) - : ACE_Message_Block (JAWS_DATA_BLOCK_SIZE), + : ACE_Message_Block (JAWS_DATA_BLOCK_SIZE +#if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) + + 2 * (sizeof (sockaddr_in) + sizeof (sockaddr)) +#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS */ + ), io_handler_ (0), policy_ (0), task_ (0), @@ -14,6 +18,19 @@ JAWS_Data_Block::JAWS_Data_Block (void) { } +JAWS_Data_Block::JAWS_Data_Block (JAWS_Data_Block &db) + : ACE_Message_Block (JAWS_DATA_BLOCK_SIZE +#if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) + + 2 * (sizeof (sockaddr_in) + sizeof (sockaddr)) +#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS */ + ), + io_handler_ (db.io_handler_), + policy_ (db.policy_), + task_ (db.task_), + payload_ (db.payload_) +{ +} + JAWS_Data_Block::~JAWS_Data_Block (void) { } diff --git a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h index 9bc693d9cd0..8d05ee8003e 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h +++ b/apps/JAWS/PROTOTYPE/JAWS/Data_Block.h @@ -20,6 +20,7 @@ class JAWS_Export JAWS_Data_Block : public ACE_Message_Block { public: JAWS_Data_Block (void); + JAWS_Data_Block (JAWS_Data_Block &db); ~JAWS_Data_Block (void); JAWS_Pipeline_Handler *task (void); diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO.cpp b/apps/JAWS/PROTOTYPE/JAWS/IO.cpp index c3295ddeae1..fd35069fab7 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/IO.cpp @@ -60,7 +60,8 @@ JAWS_Synch_IO::JAWS_Synch_IO (void) JAWS_Synch_IO::~JAWS_Synch_IO (void) { - ACE_OS::closesocket (this->handle_); + if (this->handle_ != ACE_INVALID_HANDLE) + ACE_OS::closesocket (this->handle_); } void @@ -237,49 +238,21 @@ JAWS_Asynch_IO::accept (JAWS_IO_Handler *ioh, ACE_Message_Block *, unsigned int) { - // Create our own handler and message block - JAWS_Data_Block *ndb = new JAWS_Data_Block; - if (ndb == 0) - { - ioh->accept_error (); - return; - } + JAWS_TRACE ("JAWS_Asynch_IO::accept"); JAWS_Data_Block *db = ioh->message_block (); - JAWS_IO_Handler *nioh = - ioh->factory ()->create_io_handler (); - if (nioh == 0) - { - delete ndb; - ioh->accept_error (); - return; - } - - ndb->task (db->task ()); - ndb->policy (db->policy ()); - ndb->io_handler (nioh); - nioh->task (db->task ()); - nioh->message_block (ndb); + ACE_HANDLE listen_handle = db->policy ()->acceptor ()->get_handle (); JAWS_Asynch_IO_Handler *aioh = - ACE_dynamic_cast (JAWS_Asynch_IO_Handler *, nioh); - - ACE_HANDLE listen_handle = db->policy ()->acceptor ()->get_handle (); + ACE_dynamic_cast (JAWS_Asynch_IO_Handler *, ioh); - aioh->accept_called_already (1); ACE_Asynch_Accept aa; - size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr); - size_t bytes_to_read = - JAWS_Data_Block::JAWS_DATA_BLOCK_SIZE - (2 * address_size); + size_t bytes_to_read = JAWS_Data_Block::JAWS_DATA_BLOCK_SIZE; if (aa.open (*(aioh->handler ()), listen_handle) == -1 - || aa.accept (*ndb, bytes_to_read) == -1) - { - ioh->factory ()->destroy_io_handler (nioh); - delete ndb; - ioh->accept_error (); - } + || aa.accept (*db, bytes_to_read) == -1) + ioh->accept_error (); } void diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp index 54df47dfbc2..af6d2390e53 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp @@ -21,14 +21,35 @@ JAWS_IO_Handler_Factory::~JAWS_IO_Handler_Factory (void) { } +JAWS_IO_Handler * +JAWS_IO_Handler_Factory::create_io_handler (void) +{ + JAWS_TRACE ("JAWS_IO_Handler_Factory::create"); + + JAWS_Asynch_IO_Handler *handler; + handler = new JAWS_Asynch_IO_Handler (this); + + return handler; +} + +void +JAWS_IO_Handler_Factory::destroy_io_handler (JAWS_IO_Handler *handler) +{ + JAWS_TRACE ("JAWS_IO_Handler_Factory::destroy"); + if (handler != 0) + delete handler; +} + JAWS_IO_Handler::JAWS_IO_Handler (JAWS_IO_Handler_Factory *factory) : status_ (0), mb_ (0), handle_ (ACE_INVALID_HANDLE), task_ (0), factory_ (factory) +#if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) + , handler_ (this) +#endif /* defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) */ { - // this->io_->handler (this); } JAWS_IO_Handler::~JAWS_IO_Handler (void) @@ -175,42 +196,14 @@ JAWS_IO_Handler::status (void) return this->status_; } -JAWS_Synch_IO_Handler::JAWS_Synch_IO_Handler (JAWS_IO_Handler_Factory *factory) - : JAWS_IO_Handler (factory) -{ - // this->io_->handler (this); -} - -JAWS_Synch_IO_Handler::~JAWS_Synch_IO_Handler (void) -{ -} - -JAWS_IO_Handler * -JAWS_Synch_IO_Handler_Factory::create_io_handler (void) -{ - JAWS_TRACE ("JAWS_Synch_IO_Handler_Factory::create"); - - JAWS_Synch_IO *io; - JAWS_Synch_IO_Handler *handler; +#if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) - io = new JAWS_Synch_IO; - if (io == 0) return 0; - - handler = new JAWS_Synch_IO_Handler (this); - if (handler == 0) delete io; - - return handler; -} - -void -JAWS_Synch_IO_Handler_Factory::destroy_io_handler (JAWS_IO_Handler *handler) +ACE_Handler * +JAWS_IO_Handler::handler (void) { - JAWS_TRACE ("JAWS_Synch_IO_Handler_Factory::destroy"); - delete handler; + return &this->handler_; } -#if defined (ACE_WIN32) - JAWS_Asynch_Handler::JAWS_Asynch_Handler (JAWS_IO_Handler *ioh) : ioh_ (ioh) { @@ -349,90 +342,13 @@ JAWS_Asynch_Handler::handler (void) return this->ioh_; } -JAWS_Asynch_IO_Handler::JAWS_Asynch_IO_Handler (JAWS_IO_Handler_Factory - *factory) - : JAWS_IO_Handler (factory), - handler_ (this) -{ - // this->io_->handler (this); -} - -JAWS_Asynch_IO_Handler::~JAWS_Asynch_IO_Handler (void) -{ -} - -void -JAWS_Asynch_IO_Handler::accept_complete (ACE_HANDLE handle) -{ - // callback into pipeline task, notify that the accept has completed - this->handle_ = handle; - this->status_ = ACCEPT_OK; - - JAWS_Dispatch_Policy *policy = this->mb_->policy (); - - // Irfan says at this point issue another accept - JAWS_Asynch_IO_Singleton::instance ()->accept (this); - - // Do this so that Thread Per Request can spawn a new thread - policy->concurrency ()->activate_hook (); -} - -ACE_Handler * -JAWS_Asynch_IO_Handler::handler (void) -{ - return &this->handler_; -} - -void -JAWS_Asynch_IO_Handler::accept_called_already (int called) -{ - this->accept_called_already_ = called; -} - -int -JAWS_Asynch_IO_Handler::accept_called_already (void) -{ - return this->accept_called_already_; -} - - -JAWS_IO_Handler * -JAWS_Asynch_IO_Handler_Factory::create_io_handler (void) -{ - JAWS_TRACE ("JAWS_Asynch_IO_Handler_Factory::create"); - - JAWS_Asynch_IO *io; - JAWS_Asynch_IO_Handler *handler; - - io = new JAWS_Asynch_IO; - if (io == 0) return 0; - - handler = new JAWS_Asynch_IO_Handler (this); - if (handler == 0) delete io; - - return handler; -} - -void -JAWS_Asynch_IO_Handler_Factory::destroy_io_handler (JAWS_IO_Handler *handler) -{ - JAWS_TRACE ("JAWS_Asynch_IO_Handler_Factory::destroy"); - delete handler; -} - #endif /* ACE_WIN32 */ #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Singleton<JAWS_Synch_IO_Handler_Factory, ACE_SYNCH_MUTEX>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Singleton<JAWS_Synch_IO_Handler_Factory, ACE_SYNCH_MUTEX> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - -#if defined (ACE_WIN32) -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Singleton<JAWS_Asynch_IO_Handler_Factory, ACE_SYNCH_MUTEX>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Singleton<JAWS_Synch_IO_Handler_Factory, ACE_SYNCH_MUTEX> #pragma instantiate ACE_Singleton<JAWS_Asynch_IO_Handler_Factory, ACE_SYNCH_MUTEX> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ -#endif /* ACE_WIN32 */ diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h index 6b349cf2771..35f68358dbc 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h +++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h @@ -28,11 +28,7 @@ class JAWS_IO; class JAWS_Synch_IO; class JAWS_Asynch_IO; class JAWS_IO_Handler; -class JAWS_Synch_IO_Handler; -class JAWS_Asynch_IO_Handler; class JAWS_IO_Handler_Factory; -class JAWS_Synch_IO_Handler_Factory; -class JAWS_Asynch_IO_Handler_Factory; class JAWS_Data_Block; class JAWS_Pipeline_Handler; @@ -121,8 +117,45 @@ public: }; +#if defined(ACE_WIN32) || defined(ACE_HAS_AIO_CALLS) +class JAWS_Export JAWS_Asynch_Handler : public ACE_Handler +{ +public: + JAWS_Asynch_Handler (JAWS_IO_Handler *); + virtual ~JAWS_Asynch_Handler (void); + + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result + &result); + // This method will be called when an asynchronous read completes on + // a stream. + + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result + &result); + // This method will be called when an asynchronous write completes + // on a stream. + + virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result + &result); + // This method will be called when an asynchronous transmit file + // completes. + + virtual void handle_accept (const ACE_Asynch_Accept::Result &result); + // This method will be called when an asynchronous accept completes. + + virtual void handler (JAWS_IO_Handler *ioh); + virtual JAWS_IO_Handler * handler (void); + + virtual void dispatch_handler (void); + +private: + JAWS_IO_Handler *ioh_; +}; +#endif /* defined(ACE_WIN32) || defined(ACE_HAS_AIO_CALLS) */ + + class JAWS_Export JAWS_IO_Handler : public JAWS_Abstract_IO_Handler { + // Provide implementations for the common functions. public: JAWS_IO_Handler (JAWS_IO_Handler_Factory *factory); virtual ~JAWS_IO_Handler (void); @@ -161,6 +194,10 @@ public: RECEIVE_OK, RECEIVE_ERROR }; // The different states of the handler +#if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) + virtual ACE_Handler *handler (void); +#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS */ + protected: int status_; // The state of the handler. @@ -177,6 +214,10 @@ protected: JAWS_IO_Handler_Factory *factory_; // The reference to the handler's factory. + +#if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) + JAWS_Asynch_Handler handler_; +#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS */ }; class JAWS_Export JAWS_IO_Handler_Factory @@ -188,112 +229,23 @@ public: virtual ~JAWS_IO_Handler_Factory (void); // Destructor - virtual JAWS_IO_Handler *create_io_handler (void) = 0; - // This creates a new HTTP_Handler - - virtual void destroy_io_handler (JAWS_IO_Handler *handler) = 0; - // The HTTP handler will call this method from HTTP_Handler::done to - // tell the factory to reap up the handler as it is now done with - // the protocol -}; - -class JAWS_Export JAWS_Synch_IO_Handler : protected JAWS_IO_Handler -{ -friend class JAWS_Synch_IO; -friend class JAWS_Synch_IO_Handler_Factory; - -public: - JAWS_Synch_IO_Handler (JAWS_IO_Handler_Factory *factory); - virtual ~JAWS_Synch_IO_Handler (void); + virtual JAWS_IO_Handler *create_io_handler (void); + // This creates a new JAWS_IO_Handler + virtual void destroy_io_handler (JAWS_IO_Handler *handler); + // This deletes a JAWS_IO_Handler }; -class JAWS_Export JAWS_Synch_IO_Handler_Factory : public JAWS_IO_Handler_Factory -{ -public: - JAWS_IO_Handler *create_io_handler (void); - // This creates a new HTTP_Handler - - void destroy_io_handler (JAWS_IO_Handler *handler); - // The HTTP handler will call this method from HTTP_Handler::done to - // tell the factory to reap up the handler as it is now done with - // the protocol -}; +typedef JAWS_IO_Handler JAWS_Synch_IO_Handler; +typedef JAWS_IO_Handler_Factory JAWS_Synch_IO_Handler_Factory; typedef ACE_Singleton<JAWS_Synch_IO_Handler_Factory, ACE_SYNCH_MUTEX> JAWS_Synch_IO_Handler_Factory_Singleton; -// This only works on Win32 -#if defined (ACE_WIN32) - -class JAWS_Export JAWS_Asynch_Handler : public ACE_Handler -{ -public: - JAWS_Asynch_Handler (JAWS_IO_Handler *); - virtual ~JAWS_Asynch_Handler (void); - - virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result - &result); - // This method will be called when an asynchronous read completes on - // a stream. - - virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result - &result); - // This method will be called when an asynchronous write completes - // on a stream. - - virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result - &result); - // This method will be called when an asynchronous transmit file - // completes. - - virtual void handle_accept (const ACE_Asynch_Accept::Result &result); - // This method will be called when an asynchronous accept completes. - - virtual void handler (JAWS_IO_Handler *ioh); - virtual JAWS_IO_Handler * handler (void); - - virtual void dispatch_handler (void); - -private: - JAWS_IO_Handler *ioh_; -}; - -class JAWS_Export JAWS_Asynch_IO_Handler : protected JAWS_IO_Handler -{ -friend class JAWS_Asynch_IO; -friend class JAWS_Asynch_IO_Handler_Factory; - -public: - JAWS_Asynch_IO_Handler (JAWS_IO_Handler_Factory *factory); - virtual ~JAWS_Asynch_IO_Handler (void); - - virtual void accept_complete (ACE_HANDLE handle); - - virtual ACE_Handler *handler (void); - - virtual void accept_called_already (int called); - virtual int accept_called_already (void); - -private: - JAWS_Asynch_Handler handler_; - int accept_called_already_; -}; - -class JAWS_Export JAWS_Asynch_IO_Handler_Factory : public JAWS_IO_Handler_Factory -{ -public: - JAWS_IO_Handler *create_io_handler (void); - // This creates a new HTTP_Handler - - void destroy_io_handler (JAWS_IO_Handler *handler); - // The HTTP handler will call this method from HTTP_Handler::done to - // tell the factory to reap up the handler as it is now done with - // the protocol -}; +typedef JAWS_IO_Handler JAWS_Asynch_IO_Handler; +typedef JAWS_IO_Handler_Factory JAWS_Asynch_IO_Handler_Factory; typedef ACE_Singleton<JAWS_Asynch_IO_Handler_Factory, ACE_SYNCH_MUTEX> JAWS_Asynch_IO_Handler_Factory_Singleton; -#endif /* ACE_WIN32 */ #endif /* JAWS_IO_HANDLER_H */ diff --git a/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp b/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp index ee8e4effc8c..b390d03ba29 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.cpp @@ -10,6 +10,11 @@ ACE_RCSID(JAWS, Pipeline_Tasks, "$Id$") +JAWS_Pipeline_Handler::JAWS_Pipeline_Handler (void) + : policy_ (0) +{ +} + JAWS_Pipeline_Handler::~JAWS_Pipeline_Handler (void) { } @@ -33,20 +38,38 @@ JAWS_Pipeline_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv) return status; } +JAWS_Dispatch_Policy * +JAWS_Pipeline_Handler::policy (void) +{ + return this->policy_; +} + +void +JAWS_Pipeline_Handler::policy (JAWS_Dispatch_Policy *policy) +{ + this->policy_ = policy; +} + int JAWS_Pipeline_Accept_Task::put (ACE_Message_Block *mb, ACE_Time_Value *tv) { JAWS_Data_Block *db = ACE_dynamic_cast (JAWS_Data_Block *, mb); - JAWS_Pipeline_Handler *task = db->io_handler ()->task (); + JAWS_Pipeline_Handler *task = db->task (); JAWS_Pipeline_Handler *next = ACE_dynamic_cast (JAWS_Pipeline_Handler *, task->next ()); - db->io_handler ()->task (next); + JAWS_IO_Handler *ioh = this->new_handler (db); + if (ioh == 0) + { + ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Pipeline_Accept_Task::put")); + return -1; + } - int status = this->handle_put (db, tv); + ioh->task (next); + db->io_handler (ioh); - return status; + return this->handle_put (ioh->message_block (), tv); } int @@ -57,7 +80,9 @@ JAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block *data, // JAWS_Data_Block should contain an INET_Addr and an IO JAWS_IO_Handler *handler = data->io_handler (); - JAWS_Dispatch_Policy *policy = data->policy (); + JAWS_Dispatch_Policy *policy = this->policy (); + + if (policy == 0) policy = data->policy (); // data->policy ()->update (handler); @@ -96,13 +121,19 @@ JAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block *data, { for (int i = 1; i < policy->ratio (); i++) { - io->accept (handler); - if (handler->status () == JAWS_IO_Handler::ACCEPT_ERROR) + JAWS_IO_Handler *ioh = this->new_handler (data); + if (handler == 0) break; + ioh->task (handler->task ()); + io->accept (ioh); + if (ioh->status () == JAWS_IO_Handler::ACCEPT_ERROR) + { + ioh->message_block ()->release (); + ioh->factory ()->destroy_io_handler (ioh); + break; + } } policy->ratio (1); - policy->io (JAWS_Asynch2_IO_Singleton::instance ()); - // This is the Asynch IO version with a do nothing accept(). } #endif /* defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) */ @@ -119,6 +150,36 @@ JAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block *data, return result; } +JAWS_IO_Handler * +JAWS_Pipeline_Accept_Task::new_handler (JAWS_Data_Block *data) +{ + // Create a new handler and message block + JAWS_Data_Block *ndb = new JAWS_Data_Block (*data); + if (ndb == 0) + { + JAWS_TRACE ("JAWS_Pipeline_Accept_Task::new_handler, failed DB"); + return 0; + } + + JAWS_Dispatch_Policy *policy = + (this->policy () == 0) ? data->policy () : this->policy (); + JAWS_IO_Handler_Factory *ioh_factory = policy->ioh_factory (); + + JAWS_IO_Handler *nioh = ioh_factory->create_io_handler (); + if (nioh == 0) + { + ndb->release (); + return 0; + } + + ndb->io_handler (nioh); + nioh->task (data->task ()); + nioh->message_block (ndb); + + return nioh; +} + + #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class JAWS_Pipeline_Abstract_Handler<JAWS_Data_Block>; template class ACE_Singleton<JAWS_Pipeline_Accept_Task, ACE_SYNCH_MUTEX>; diff --git a/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.h b/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.h index bea10546203..524eadcf6a4 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.h +++ b/apps/JAWS/PROTOTYPE/JAWS/Pipeline_Tasks.h @@ -14,9 +14,16 @@ class JAWS_Export JAWS_Pipeline_Handler : public JAWS_Pipeline_Abstract_Handler<JAWS_Data_Block> { public: + JAWS_Pipeline_Handler (void); virtual ~JAWS_Pipeline_Handler (void); virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); virtual int handle_put (JAWS_Data_Block *data, ACE_Time_Value *tv) = 0; + + virtual JAWS_Dispatch_Policy * policy (void); + virtual void policy (JAWS_Dispatch_Policy *); + +private: + JAWS_Dispatch_Policy *policy_; }; class JAWS_Pipeline_Accept_Task : public JAWS_Pipeline_Handler @@ -24,6 +31,8 @@ class JAWS_Pipeline_Accept_Task : public JAWS_Pipeline_Handler public: virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); virtual int handle_put (JAWS_Data_Block *data, ACE_Time_Value *tv); + + virtual JAWS_IO_Handler * new_handler (JAWS_Data_Block *data); }; typedef ACE_Singleton<JAWS_Pipeline_Accept_Task, ACE_SYNCH_MUTEX> diff --git a/apps/JAWS/PROTOTYPE/JAWS/Reaper.cpp b/apps/JAWS/PROTOTYPE/JAWS/Reaper.cpp index 7099692891a..a6afbf85695 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Reaper.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/Reaper.cpp @@ -6,7 +6,8 @@ ACE_RCSID(JAWS, Reaper, "$Id$") JAWS_Reaper::JAWS_Reaper (JAWS_Concurrency_Base *concurrency) - : concurrency_ (concurrency) + : concurrency_ (concurrency), + waiting_ (0) { } |