diff options
author | jxh <jxh@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-07-29 22:05:29 +0000 |
---|---|---|
committer | jxh <jxh@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-07-29 22:05:29 +0000 |
commit | c87a026faac44804463d24f21079231d6bffa4b4 (patch) | |
tree | 0c8f76d04778a4c4c4d7279b9686eeb5452edaea | |
parent | f85d95f9e8fc7b0b90e939cfb86d5089eeb27d89 (diff) | |
download | ATCD-c87a026faac44804463d24f21079231d6bffa4b4.tar.gz |
Added a reaper class which waits for threads to die. Changes to
other files to enable Asynchronous Accept calls.
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp | 13 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Concurrency.h | 2 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.cpp | 131 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h | 47 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp | 31 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h | 10 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Makefile | 224 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Reaper.cpp | 36 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Reaper.h | 43 | ||||
-rw-r--r-- | apps/JAWS/PROTOTYPE/JAWS/Server.cpp | 13 |
10 files changed, 466 insertions, 84 deletions
diff --git a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp b/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp index 6bece81aa7f..7bcdb170ec3 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/Concurrency.cpp @@ -9,11 +9,13 @@ #include "JAWS/Policy.h" #include "JAWS/Data_Block.h" #include "JAWS/Waiter.h" +#include "JAWS/Reaper.h" JAWS_Concurrency_Base::JAWS_Concurrency_Base (void) : ACE_Task<ACE_MT_SYNCH> (new ACE_Thread_Manager), mb_acquired_ (0), - mb_ (0) + mb_ (0), + reaper_ (new JAWS_Reaper (this)) { } @@ -142,6 +144,7 @@ JAWS_Concurrency_Base::svc_hook (JAWS_Data_Block *db) // handler maintains the state of the protocol task = handler->task (); ts_db = handler->message_block (); + handler->waiter_index (waiter_index); // Use a NULL task to make the thread recycle now if (task == 0) @@ -250,6 +253,10 @@ JAWS_Thread_Pool_Task::open (long flags, int nthreads, int maxthreads) this->thr_mgr_->resume_all (); + if (this->reaper_->open () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Reaper::open"), + -1); + return 0; } @@ -305,6 +312,10 @@ JAWS_Thread_Per_Task::activate_hook (void) this->thr_mgr_->resume (thr_name); + if (this->reaper_->open () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Reaper::open"), + -1); + return 0; } diff --git a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.h b/apps/JAWS/PROTOTYPE/JAWS/Concurrency.h index 953543e889e..c9386f99994 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Concurrency.h +++ b/apps/JAWS/PROTOTYPE/JAWS/Concurrency.h @@ -13,6 +13,7 @@ class JAWS_Data_Block; class JAWS_Dispatch_Policy; +class JAWS_Reaper; class JAWS_Export JAWS_Concurrency_Base : public ACE_Task<ACE_MT_SYNCH> // = TITLE @@ -46,6 +47,7 @@ public: protected: int mb_acquired_; ACE_Message_Block *mb_; + JAWS_Reaper *reaper_; ACE_SYNCH_MUTEX lock_; }; diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.cpp b/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.cpp index 57c84cce954..4c304c0f274 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.cpp @@ -18,6 +18,12 @@ JAWS_IO_Acceptor::open (const ACE_INET_Addr &) } int +JAWS_IO_Acceptor::open (const ACE_HANDLE &) +{ + return -1; +} + +int JAWS_IO_Acceptor::accept (ACE_SOCK_Stream &, ACE_Addr *, ACE_Time_Value *, int, int) const { @@ -30,6 +36,12 @@ JAWS_IO_Acceptor::accept (size_t) return -1; } +ACE_HANDLE +JAWS_IO_Acceptor::get_handle (size_t) +{ + return ACE_INVALID_HANDLE; +} + int JAWS_IO_Synch_Acceptor::open (const ACE_INET_Addr &local_sap) { @@ -37,6 +49,20 @@ JAWS_IO_Synch_Acceptor::open (const ACE_INET_Addr &local_sap) } int +JAWS_IO_Synch_Acceptor::open (const ACE_HANDLE &socket) +{ + ACE_HANDLE handle = this->acceptor_.get_handle (); + if (handle == socket) + return 0; + + if (handle != ACE_INVALID_HANDLE) + ACE_OS::closesocket (this->acceptor_.get_handle ()); + this->acceptor_.set_handle (socket); + + return 0; +} + +int JAWS_IO_Synch_Acceptor::accept (ACE_SOCK_Stream &new_stream, ACE_Addr *remote_addr, ACE_Time_Value *timeout, @@ -47,6 +73,93 @@ JAWS_IO_Synch_Acceptor::accept (ACE_SOCK_Stream &new_stream, restart, reset_new_handle); } +ACE_HANDLE +JAWS_IO_Synch_Acceptor::get_handle (void) +{ + return this->acceptor_.get_handle (); +} + +int +JAWS_Asynch_Acceptor::open (const ACE_INET_Addr &address, + size_t bytes_to_read, + int pass_addresses, + int backlog, + int reuse_addr, + ACE_Proactor *proactor, + int validate_new_connection, + int reissue_accept) +{ + // Borrowed straight out of ACE_Asynch_Acceptor, except don't initiate + // accepts. + + this->proactor (proactor); + this->pass_addresses_ = pass_addresses; + this->bytes_to_read_ = bytes_to_read; + this->validate_new_connection_ = validate_new_connection; + this->reissue_accept_ = reissue_accept; + + // Create the listener socket + this->listen_handle_ = ACE_OS::socket (PF_INET, SOCK_STREAM, 0); + if (this->listen_handle_ == ACE_INVALID_HANDLE) + ACE_ERROR_RETURN ((LM_ERROR, + ASYS_TEXT ("%p\n"), ASYS_TEXT ("ACE_OS::socket")), -1); + + // Initialize the ACE_Asynch_Accept + if (this->asynch_accept_.open (*this, + this->listen_handle_, + 0, + this->proactor ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ASYS_TEXT ("%p\n"), + ASYS_TEXT ("ACE_Asynch_Accept::open")), -1); + + if (reuse_addr) + { + // Reuse the address + int one = 1; + if (ACE_OS::setsockopt (this->listen_handle_, + SOL_SOCKET, + SO_REUSEADDR, + (const char*) &one, + sizeof one) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ASYS_TEXT ("%p\n"), + ASYS_TEXT ("ACE_OS::setsockopt")), -1); + } + + // If port is not specified, bind to any port. + static ACE_INET_Addr sa ((const ACE_INET_Addr &) ACE_Addr::sap_any); + + if (address == sa && ACE::bind_port (this->listen_handle_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ASYS_TEXT ("%p\n"), + ASYS_TEXT ("ACE::bind_port")), -1); + + // Bind to the specified port. + if (ACE_OS::bind (this->listen_handle_, + (sockaddr *) address.get_addr (), + address.get_size ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_OS::bind"), + -1); + + // Start listening + if (ACE_OS::listen (this->listen_handle_, backlog) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_OS::listen"), -1); + + return 0; +} + + +ACE_HANDLE +JAWS_Asynch_Acceptor::get_handle (void) +{ + return this->handle (); +} + int JAWS_IO_Asynch_Acceptor::open (const ACE_INET_Addr &address) @@ -62,11 +175,25 @@ JAWS_IO_Asynch_Acceptor::open (const ACE_INET_Addr &address) } int +JAWS_IO_Asynch_Acceptor::open (const ACE_HANDLE &socket) +{ + ACE_HANDLE handle = this->acceptor_.get_handle (); + if (handle == socket) + return 0; + + if (handle != ACE_INVALID_HANDLE) + ACE_OS::closesocket (this->acceptor_.get_handle ()); + this->acceptor_.set_handle (socket); + + return 0; +} + +int JAWS_IO_Asynch_Acceptor::accept (size_t bytes_to_read) { #if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) - // This only works on Win32 platforms - return this->acceptor_.accept (bytes_to_read); + // Do nothing, since asynchronous accepts have already been created. + return 0; #else ACE_UNUSED_ARG (bytes_to_read); return -1; diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h b/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h index 61aab8ec912..f08245cf01b 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h +++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Acceptor.h @@ -35,6 +35,9 @@ public: virtual int open (const ACE_INET_Addr &address); // Initiate a passive mode socket. + virtual int open (const ACE_HANDLE &socket); + // Initiate a passive mode socket. + virtual int accept (ACE_SOCK_Stream &new_stream, ACE_Addr *remote_addr = 0, ACE_Time_Value *timeout = 0, @@ -45,12 +48,13 @@ public: virtual int accept (size_t bytes_to_read = 0); // This initiates a new asynchronous accept through the AcceptEx call. + virtual ACE_HANDLE get_handle (void); + // Get the listener's handle + enum { ASYNC = 0, SYNCH = 1 }; // identify if this is being used for aynchronous or synchronous // accept calls -private: - }; class JAWS_Export JAWS_IO_Synch_Acceptor : public JAWS_IO_Acceptor @@ -60,6 +64,9 @@ public: virtual int open (const ACE_INET_Addr &local_sap); // Initiate a passive mode socket. + virtual int open (const ACE_HANDLE &socket); + // Initiate a passive mode socket. + virtual int accept (ACE_SOCK_Stream &new_stream, ACE_Addr *remote_addr = 0, ACE_Time_Value *timeout = 0, @@ -67,10 +74,35 @@ public: int reset_new_handle = 0) const; // Accept the connection + virtual ACE_HANDLE get_handle (void); + // Get the listener's handle + private: JAWS_IO_SOCK_Acceptor acceptor_; }; + JAWS_Asynch_Acceptor_Base; + +class JAWS_Export JAWS_Asynch_Acceptor +#if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) +// This only works on Win32 platforms + : public ACE_Asynch_Acceptor<JAWS_Asynch_IO_Handler_Factory> +#endif /* defined (ACE_WIN32) */ +{ +public: + virtual int open (const ACE_INET_Addr &address, + size_t bytes_to_read = 0, + int pass_addresses = 0, + int backlog = 5, + int reuse_addr = 1, + ACE_Proactor *proactor = 0, + int validate_new_connection = 0, + int reissue_accept = 1); + + virtual ACE_HANDLE get_handle (void); + // get the ACE_HANDLE, so I can close it! +}; + class JAWS_Export JAWS_IO_Asynch_Acceptor : public JAWS_IO_Acceptor { @@ -79,15 +111,18 @@ public: virtual int open (const ACE_INET_Addr &address); // Initiate an asynchronous passive connection + virtual int open (const ACE_HANDLE &socket); + // Initiate an asynchronous passive connection + virtual int accept (size_t bytes_to_read = 0); // This initiates a new asynchronous accept through the AcceptEx call. + virtual ACE_HANDLE get_handle (void); + // Get the listener's handle + private: -#if defined (ACE_WIN32) -// This only works on Win32 platforms - ACE_Asynch_Acceptor<JAWS_Asynch_IO_Handler_Factory> acceptor_; -#endif /* defined (ACE_WIN32) */ + JAWS_Asynch_Acceptor acceptor_; }; typedef ACE_Singleton<JAWS_IO_Synch_Acceptor, ACE_SYNCH_MUTEX> diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp index 7e988a8385d..7abd15ee6a0 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.cpp @@ -24,6 +24,7 @@ JAWS_IO_Handler::JAWS_IO_Handler (JAWS_IO_Handler_Factory *factory) mb_ (0), handle_ (ACE_INVALID_HANDLE), task_ (0), + waiter_index_ (-1), factory_ (factory) { // this->io_->handler (this); @@ -150,6 +151,18 @@ JAWS_IO_Handler::task (void) } void +JAWS_IO_Handler::waiter_index (int index) +{ + this->waiter_index_ = index; +} + +int +JAWS_IO_Handler::waiter_index (void) +{ + return this->waiter_index_; +} + +void JAWS_IO_Handler::message_block (JAWS_Data_Block *mb) { this->mb_ = mb; @@ -350,12 +363,30 @@ JAWS_Asynch_IO_Handler::~JAWS_Asynch_IO_Handler (void) { } +void +JAWS_Asynch_IO_Handler::accept_complete (ACE_HANDLE handle) +{ + // callback into pipeline task, notify that the accept has completed + this->handle_ = handle; + this->status_ = ACCEPT_OK; + + JAWS_Dispatch_Policy *policy = this->mb_->policy (); + + // Irfan says at this point issue another accept + if (policy->acceptor () == JAWS_IO_Asynch_Acceptor_Singleton::instance ()) + policy->acceptor ()->accept (JAWS_Data_Block::JAWS_Data_Block_Size); + + // Do this so that Thread Per Request can spawn a new thread + policy->concurrency ()->activate_hook (); +} + ACE_Handler * JAWS_Asynch_IO_Handler::handler (void) { return &this->handler_; } + JAWS_IO_Handler * JAWS_Asynch_IO_Handler_Factory::create_io_handler (void) { diff --git a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h index 93d6feee14c..2eada789c0d 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h +++ b/apps/JAWS/PROTOTYPE/JAWS/IO_Handler.h @@ -140,15 +140,19 @@ public: virtual void write_error (void); virtual void confirmation_message_complete (void); virtual void error_message_complete (void); - virtual void task (JAWS_Pipeline_Handler *ph); virtual JAWS_IO_Handler_Factory *factory (void); virtual ACE_HANDLE handle (void); virtual void done (void); virtual int status (void); + + virtual void task (JAWS_Pipeline_Handler *ph); virtual JAWS_Pipeline_Handler *task (void); + virtual void waiter_index (int index); + virtual int waiter_index (void); + virtual void message_block (JAWS_Data_Block *mb); virtual JAWS_Data_Block *message_block (void); @@ -174,6 +178,8 @@ protected: // This is a reference to the next stage of the pipeline when the IO // request completes. + int waiter_index_; + JAWS_IO_Handler_Factory *factory_; // The reference to the handler's factory. }; @@ -267,6 +273,8 @@ public: JAWS_Asynch_IO_Handler (JAWS_IO_Handler_Factory *factory); virtual ~JAWS_Asynch_IO_Handler (void); + virtual void accept_complete (ACE_HANDLE handle); + virtual ACE_Handler *handler (void); private: diff --git a/apps/JAWS/PROTOTYPE/JAWS/Makefile b/apps/JAWS/PROTOTYPE/JAWS/Makefile index c27298ec0b2..30125c60226 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Makefile +++ b/apps/JAWS/PROTOTYPE/JAWS/Makefile @@ -15,6 +15,7 @@ SHLIB = libJAWS.$(SOEXT) MYFILES = \ Waiter \ + Reaper \ Pipeline \ Pipeline_Tasks \ Data_Block \ @@ -67,7 +68,6 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU # DO NOT DELETE THIS LINE -- g++dep uses it. # DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. -.obj/Assoc_Array.o .obj/Assoc_Array.so .shobj/Assoc_Array.o .shobj/Assoc_Array.so: Assoc_Array.cpp ../JAWS/Assoc_Array.h .obj/Waiter.o .obj/Waiter.so .shobj/Waiter.o .shobj/Waiter.so: Waiter.cpp $(ACE_ROOT)/ace/Proactor.h \ $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/inc_user_config.h \ @@ -143,7 +143,7 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Singleton.h \ $(ACE_ROOT)/ace/Singleton.i \ $(ACE_ROOT)/ace/Singleton.cpp \ - ../JAWS/Assoc_Array.h ../JAWS/Assoc_Array.cpp + ../JAWS/Assoc_Array.h ../JAWS/Assoc_Array.cpp ../JAWS/Export.h .obj/Pipeline.o .obj/Pipeline.so .shobj/Pipeline.o .shobj/Pipeline.so: Pipeline.cpp ../JAWS/Pipeline.h \ $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/ACE.h \ @@ -783,7 +783,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Stream.cpp \ ../JAWS/Pipeline_Handler.h ../JAWS/Pipeline_Handler.cpp \ ../JAWS/Pipeline_Tasks.h ../JAWS/Data_Block.h ../JAWS/Policy.h \ - ../JAWS/Waiter.h ../JAWS/Assoc_Array.h ../JAWS/Assoc_Array.cpp + ../JAWS/Waiter.h ../JAWS/Assoc_Array.h ../JAWS/Assoc_Array.cpp \ + ../JAWS/Reaper.h .obj/Server.o .obj/Server.so .shobj/Server.o .shobj/Server.so: Server.cpp $(ACE_ROOT)/ace/Get_Opt.h \ $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/OS.h \ @@ -923,8 +924,9 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/SOCK_Acceptor.i \ $(ACE_ROOT)/ace/LOCK_SOCK_Acceptor.cpp \ ../JAWS/Pipeline_Tasks.h -.obj/IO_Acceptor.o .obj/IO_Acceptor.so .shobj/IO_Acceptor.o .shobj/IO_Acceptor.so: IO_Acceptor.cpp ../JAWS/IO_Acceptor.h \ - $(ACE_ROOT)/ace/Asynch_Acceptor.h \ +.obj/IO_Acceptor.o .obj/IO_Acceptor.so .shobj/IO_Acceptor.o .shobj/IO_Acceptor.so: IO_Acceptor.cpp ../JAWS/Data_Block.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/inc_user_config.h \ $(ACE_ROOT)/ace/config.h \ @@ -937,27 +939,13 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/Log_Msg.h \ $(ACE_ROOT)/ace/Log_Record.h \ - $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/ACE.i \ $(ACE_ROOT)/ace/Log_Priority.h \ $(ACE_ROOT)/ace/Log_Record.i \ - $(ACE_ROOT)/ace/LOCK_SOCK_Acceptor.h \ - $(ACE_ROOT)/ace/SOCK_Acceptor.h \ - $(ACE_ROOT)/ace/SOCK_Stream.h \ - $(ACE_ROOT)/ace/SOCK_IO.h \ - $(ACE_ROOT)/ace/SOCK.h \ - $(ACE_ROOT)/ace/Addr.h \ - $(ACE_ROOT)/ace/Addr.i \ - $(ACE_ROOT)/ace/IPC_SAP.h \ - $(ACE_ROOT)/ace/IPC_SAP.i \ - $(ACE_ROOT)/ace/SOCK.i \ - $(ACE_ROOT)/ace/SOCK_IO.i \ - $(ACE_ROOT)/ace/INET_Addr.h \ - $(ACE_ROOT)/ace/INET_Addr.i \ - $(ACE_ROOT)/ace/SOCK_Stream.i \ - $(ACE_ROOT)/ace/Time_Value.h \ - $(ACE_ROOT)/ace/SOCK_Acceptor.i \ - $(ACE_ROOT)/ace/LOCK_SOCK_Acceptor.cpp \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Malloc.i \ + $(ACE_ROOT)/ace/Malloc_T.h \ $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ @@ -972,17 +960,109 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Thread.i \ $(ACE_ROOT)/ace/Atomic_Op.i \ $(ACE_ROOT)/ace/Synch_T.cpp \ - $(ACE_ROOT)/ace/Singleton.h \ - $(ACE_ROOT)/ace/Singleton.i \ - $(ACE_ROOT)/ace/Singleton.cpp \ + $(ACE_ROOT)/ace/Free_List.h \ + $(ACE_ROOT)/ace/Free_List.i \ + $(ACE_ROOT)/ace/Free_List.cpp \ + $(ACE_ROOT)/ace/Malloc_T.i \ + $(ACE_ROOT)/ace/Malloc_T.cpp \ + $(ACE_ROOT)/ace/Memory_Pool.h \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Containers.cpp \ + $(ACE_ROOT)/ace/Signal.i \ $(ACE_ROOT)/ace/Object_Manager.h \ $(ACE_ROOT)/ace/Object_Manager.i \ $(ACE_ROOT)/ace/Managed_Object.h \ $(ACE_ROOT)/ace/Managed_Object.i \ $(ACE_ROOT)/ace/Managed_Object.cpp \ - ../JAWS/Export.h ../JAWS/IO.h \ - $(ACE_ROOT)/ace/Asynch_IO.h -.obj/IO_Handler.o .obj/IO_Handler.so .shobj/IO_Handler.o .shobj/IO_Handler.so: IO_Handler.cpp ../JAWS/JAWS.h ../JAWS/IO.h \ + $(ACE_ROOT)/ace/Mem_Map.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/Memory_Pool.i \ + $(ACE_ROOT)/ace/Message_Block.i ../JAWS/Export.h \ + ../JAWS/Pipeline.h \ + $(ACE_ROOT)/ace/Service_Config.h \ + $(ACE_ROOT)/ace/Service_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.i \ + $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/SString.h \ + $(ACE_ROOT)/ace/SString.i \ + $(ACE_ROOT)/ace/Service_Config.i \ + $(ACE_ROOT)/ace/Reactor.h \ + $(ACE_ROOT)/ace/Handle_Set.h \ + $(ACE_ROOT)/ace/Handle_Set.i \ + $(ACE_ROOT)/ace/Timer_Queue.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.i \ + $(ACE_ROOT)/ace/Timer_Queue_T.cpp \ + $(ACE_ROOT)/ace/Reactor.i \ + $(ACE_ROOT)/ace/Reactor_Impl.h \ + $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ + $(ACE_ROOT)/ace/Stream.h \ + $(ACE_ROOT)/ace/IO_Cntl_Msg.h \ + $(ACE_ROOT)/ace/Time_Value.h \ + $(ACE_ROOT)/ace/Module.h \ + $(ACE_ROOT)/ace/Task_T.h \ + $(ACE_ROOT)/ace/Message_Queue.h \ + $(ACE_ROOT)/ace/Message_Queue_T.h \ + $(ACE_ROOT)/ace/Message_Queue_T.i \ + $(ACE_ROOT)/ace/Message_Queue_T.cpp \ + $(ACE_ROOT)/ace/Strategies.h \ + $(ACE_ROOT)/ace/Strategies_T.h \ + $(ACE_ROOT)/ace/Synch_Options.h \ + $(ACE_ROOT)/ace/Synch_Options.i \ + $(ACE_ROOT)/ace/Hash_Map_Manager.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager.cpp \ + $(ACE_ROOT)/ace/Strategies_T.i \ + $(ACE_ROOT)/ace/Strategies_T.cpp \ + $(ACE_ROOT)/ace/Service_Repository.h \ + $(ACE_ROOT)/ace/Service_Types.h \ + $(ACE_ROOT)/ace/Service_Types.i \ + $(ACE_ROOT)/ace/Service_Repository.i \ + $(ACE_ROOT)/ace/Thread_Manager.h \ + $(ACE_ROOT)/ace/Thread_Manager.i \ + $(ACE_ROOT)/ace/WFMO_Reactor.h \ + $(ACE_ROOT)/ace/WFMO_Reactor.i \ + $(ACE_ROOT)/ace/Strategies.i \ + $(ACE_ROOT)/ace/Message_Queue.i \ + $(ACE_ROOT)/ace/Task.h \ + $(ACE_ROOT)/ace/Task.i \ + $(ACE_ROOT)/ace/Task_T.i \ + $(ACE_ROOT)/ace/Task_T.cpp \ + $(ACE_ROOT)/ace/Module.i \ + $(ACE_ROOT)/ace/Module.cpp \ + $(ACE_ROOT)/ace/Stream_Modules.h \ + $(ACE_ROOT)/ace/Stream_Modules.i \ + $(ACE_ROOT)/ace/Stream_Modules.cpp \ + $(ACE_ROOT)/ace/Stream.i \ + $(ACE_ROOT)/ace/Stream.cpp \ + ../JAWS/Pipeline_Handler.h ../JAWS/Pipeline_Handler.cpp \ + ../JAWS/IO_Acceptor.h \ + $(ACE_ROOT)/ace/Asynch_Acceptor.h \ + $(ACE_ROOT)/ace/LOCK_SOCK_Acceptor.h \ + $(ACE_ROOT)/ace/SOCK_Acceptor.h \ + $(ACE_ROOT)/ace/SOCK_Stream.h \ + $(ACE_ROOT)/ace/SOCK_IO.h \ + $(ACE_ROOT)/ace/SOCK.h \ + $(ACE_ROOT)/ace/Addr.h \ + $(ACE_ROOT)/ace/Addr.i \ + $(ACE_ROOT)/ace/IPC_SAP.h \ + $(ACE_ROOT)/ace/IPC_SAP.i \ + $(ACE_ROOT)/ace/SOCK.i \ + $(ACE_ROOT)/ace/SOCK_IO.i \ + $(ACE_ROOT)/ace/INET_Addr.h \ + $(ACE_ROOT)/ace/INET_Addr.i \ + $(ACE_ROOT)/ace/SOCK_Stream.i \ + $(ACE_ROOT)/ace/SOCK_Acceptor.i \ + $(ACE_ROOT)/ace/LOCK_SOCK_Acceptor.cpp \ + $(ACE_ROOT)/ace/Singleton.h \ + $(ACE_ROOT)/ace/Singleton.i \ + $(ACE_ROOT)/ace/Singleton.cpp ../JAWS/IO.h \ + $(ACE_ROOT)/ace/Asynch_IO.h ../JAWS/IO_Handler.h +.obj/IO_Handler.o .obj/IO_Handler.so .shobj/IO_Handler.o .shobj/IO_Handler.so: IO_Handler.cpp ../JAWS/JAWS.h \ + $(ACE_ROOT)/ace/Filecache.h \ + $(ACE_ROOT)/ace/Mem_Map.h \ $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/inc_user_config.h \ @@ -999,43 +1079,32 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/ACE.i \ $(ACE_ROOT)/ace/Log_Priority.h \ $(ACE_ROOT)/ace/Log_Record.i \ - $(ACE_ROOT)/ace/Asynch_IO.h \ - $(ACE_ROOT)/ace/SOCK_Stream.h \ - $(ACE_ROOT)/ace/SOCK_IO.h \ - $(ACE_ROOT)/ace/SOCK.h \ - $(ACE_ROOT)/ace/Addr.h \ - $(ACE_ROOT)/ace/Addr.i \ - $(ACE_ROOT)/ace/IPC_SAP.h \ - $(ACE_ROOT)/ace/IPC_SAP.i \ - $(ACE_ROOT)/ace/SOCK.i \ - $(ACE_ROOT)/ace/SOCK_IO.i \ - $(ACE_ROOT)/ace/INET_Addr.h \ - $(ACE_ROOT)/ace/INET_Addr.i \ - $(ACE_ROOT)/ace/SOCK_Stream.i \ - $(ACE_ROOT)/ace/Singleton.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Event_Handler.h \ + $(ACE_ROOT)/ace/Event_Handler.i \ $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ $(ACE_ROOT)/ace/Synch.i \ - $(ACE_ROOT)/ace/Synch_T.h \ - $(ACE_ROOT)/ace/Event_Handler.h \ - $(ACE_ROOT)/ace/Event_Handler.i \ $(ACE_ROOT)/ace/Synch_T.i \ $(ACE_ROOT)/ace/Thread.h \ $(ACE_ROOT)/ace/Thread.i \ $(ACE_ROOT)/ace/Atomic_Op.i \ $(ACE_ROOT)/ace/Synch_T.cpp \ - $(ACE_ROOT)/ace/Singleton.i \ - $(ACE_ROOT)/ace/Singleton.cpp \ - $(ACE_ROOT)/ace/Object_Manager.h \ - $(ACE_ROOT)/ace/Object_Manager.i \ - $(ACE_ROOT)/ace/Managed_Object.h \ - $(ACE_ROOT)/ace/Managed_Object.i \ - $(ACE_ROOT)/ace/Managed_Object.cpp \ - ../JAWS/Export.h ../JAWS/IO_Handler.h ../JAWS/Data_Block.h \ - $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager.cpp \ + $(ACE_ROOT)/ace/Service_Config.h \ + $(ACE_ROOT)/ace/Service_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.i \ + $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Containers.cpp \ $(ACE_ROOT)/ace/Malloc.h \ $(ACE_ROOT)/ace/Malloc_Base.h \ $(ACE_ROOT)/ace/Malloc.i \ @@ -1046,20 +1115,13 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Malloc_T.i \ $(ACE_ROOT)/ace/Malloc_T.cpp \ $(ACE_ROOT)/ace/Memory_Pool.h \ - $(ACE_ROOT)/ace/Signal.h \ - $(ACE_ROOT)/ace/Containers.h \ - $(ACE_ROOT)/ace/Containers.i \ - $(ACE_ROOT)/ace/Containers.cpp \ - $(ACE_ROOT)/ace/Signal.i \ - $(ACE_ROOT)/ace/Mem_Map.h \ - $(ACE_ROOT)/ace/Mem_Map.i \ $(ACE_ROOT)/ace/Memory_Pool.i \ - $(ACE_ROOT)/ace/Message_Block.i ../JAWS/Pipeline.h \ - $(ACE_ROOT)/ace/Service_Config.h \ - $(ACE_ROOT)/ace/Service_Object.h \ - $(ACE_ROOT)/ace/Shared_Object.h \ - $(ACE_ROOT)/ace/Shared_Object.i \ - $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/Signal.i \ + $(ACE_ROOT)/ace/Object_Manager.h \ + $(ACE_ROOT)/ace/Object_Manager.i \ + $(ACE_ROOT)/ace/Managed_Object.h \ + $(ACE_ROOT)/ace/Managed_Object.i \ + $(ACE_ROOT)/ace/Managed_Object.cpp \ $(ACE_ROOT)/ace/SString.h \ $(ACE_ROOT)/ace/SString.i \ $(ACE_ROOT)/ace/Service_Config.i \ @@ -1072,7 +1134,26 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Timer_Queue_T.cpp \ $(ACE_ROOT)/ace/Reactor.i \ $(ACE_ROOT)/ace/Reactor_Impl.h \ - $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ + $(ACE_ROOT)/ace/Svc_Conf_Tokens.h ../JAWS/IO.h \ + $(ACE_ROOT)/ace/Asynch_IO.h \ + $(ACE_ROOT)/ace/SOCK_Stream.h \ + $(ACE_ROOT)/ace/SOCK_IO.h \ + $(ACE_ROOT)/ace/SOCK.h \ + $(ACE_ROOT)/ace/Addr.h \ + $(ACE_ROOT)/ace/Addr.i \ + $(ACE_ROOT)/ace/IPC_SAP.h \ + $(ACE_ROOT)/ace/IPC_SAP.i \ + $(ACE_ROOT)/ace/SOCK.i \ + $(ACE_ROOT)/ace/SOCK_IO.i \ + $(ACE_ROOT)/ace/INET_Addr.h \ + $(ACE_ROOT)/ace/INET_Addr.i \ + $(ACE_ROOT)/ace/SOCK_Stream.i \ + $(ACE_ROOT)/ace/Singleton.h \ + $(ACE_ROOT)/ace/Singleton.i \ + $(ACE_ROOT)/ace/Singleton.cpp ../JAWS/Export.h \ + ../JAWS/IO_Handler.h ../JAWS/Data_Block.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Message_Block.i ../JAWS/Pipeline.h \ $(ACE_ROOT)/ace/Stream.h \ $(ACE_ROOT)/ace/IO_Cntl_Msg.h \ $(ACE_ROOT)/ace/Time_Value.h \ @@ -1086,8 +1167,6 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Strategies_T.h \ $(ACE_ROOT)/ace/Synch_Options.h \ $(ACE_ROOT)/ace/Synch_Options.i \ - $(ACE_ROOT)/ace/Hash_Map_Manager.h \ - $(ACE_ROOT)/ace/Hash_Map_Manager.cpp \ $(ACE_ROOT)/ace/Strategies_T.i \ $(ACE_ROOT)/ace/Strategies_T.cpp \ $(ACE_ROOT)/ace/Service_Repository.h \ @@ -1112,7 +1191,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Stream.i \ $(ACE_ROOT)/ace/Stream.cpp \ ../JAWS/Pipeline_Handler.h ../JAWS/Pipeline_Handler.cpp \ - ../JAWS/Policy.h ../JAWS/Concurrency.h + ../JAWS/Policy.h ../JAWS/Concurrency.h ../JAWS/Waiter.h \ + ../JAWS/Assoc_Array.h ../JAWS/Assoc_Array.cpp .obj/IO.o .obj/IO.so .shobj/IO.o .shobj/IO.so: IO.cpp ../JAWS/JAWS.h ../JAWS/IO.h \ $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/OS.h \ diff --git a/apps/JAWS/PROTOTYPE/JAWS/Reaper.cpp b/apps/JAWS/PROTOTYPE/JAWS/Reaper.cpp new file mode 100644 index 00000000000..21538bef87f --- /dev/null +++ b/apps/JAWS/PROTOTYPE/JAWS/Reaper.cpp @@ -0,0 +1,36 @@ +// $Id$ + +#include "JAWS/Reaper.h" +#include "JAWS/Concurrency.h" + +JAWS_Reaper::JAWS_Reaper (JAWS_Concurrency_Base *concurrency) + : concurrency_ (concurrency) +{ +} + +JAWS_Reaper::~JAWS_Reaper (void) +{ +} + +int +JAWS_Reaper::open (void *) +{ + if (this->waiting_ == 0) + { + ACE_Guard<ACE_SYNCH_MUTEX> g (this->lock_); + if (this->waiting_ == 0) + { + if (this->activate () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "JAWS_Reaper::activate"), + -1); + this->waiting_ = 1; + } + } + return 0; +} + +int +JAWS_Reaper::svc (void) +{ + return this->concurrency_->wait (); +} diff --git a/apps/JAWS/PROTOTYPE/JAWS/Reaper.h b/apps/JAWS/PROTOTYPE/JAWS/Reaper.h new file mode 100644 index 00000000000..444083498ce --- /dev/null +++ b/apps/JAWS/PROTOTYPE/JAWS/Reaper.h @@ -0,0 +1,43 @@ +/* -*- c++ -*- */ +// $Id$ + +#if !defined (JAWS_REAPER_H) +#define JAWS_REAPER_H + +#include "ace/Singleton.h" +#include "ace/Synch.h" +#include "ace/Task.h" + +#include "JAWS/Export.h" + +// A reaper class to reap the threads. + +class JAWS_Concurrency_Base; + +class JAWS_Export JAWS_Reaper : public ACE_Task<ACE_MT_SYNCH> +{ + // = TITLE + // Reap threads for the concurrency strategies + // + // = DESCRIPTION + // The JAWS_Reaper uses the default Thread Manager (while each + // concurrency strategy uses their own). The idea is that the + // reaper will spawn a thread to reap the threads of a concurrency + // strategy. This allows the main thread to reap the threads of + // the reaper before exiting. + +public: + JAWS_Reaper (JAWS_Concurrency_Base *concurrency); + virtual ~JAWS_Reaper (void); + + virtual int open (void * = 0); + virtual int svc (void); + +private: + JAWS_Concurrency_Base *concurrency_; + int waiting_; + ACE_SYNCH_MUTEX lock_; + +}; + +#endif /* JAWS_REAPER_H */ diff --git a/apps/JAWS/PROTOTYPE/JAWS/Server.cpp b/apps/JAWS/PROTOTYPE/JAWS/Server.cpp index c7de595c18c..f1b565fd809 100644 --- a/apps/JAWS/PROTOTYPE/JAWS/Server.cpp +++ b/apps/JAWS/PROTOTYPE/JAWS/Server.cpp @@ -94,7 +94,9 @@ JAWS_Server::open (JAWS_Pipeline_Handler *protocol, ACE_INET_Addr inet_addr (this->port_); JAWS_IO_Synch_Acceptor_Singleton::instance ()->open (inet_addr); - JAWS_IO_Asynch_Acceptor_Singleton::instance ()->open (inet_addr); + ACE_HANDLE socket + = JAWS_IO_Synch_Acceptor_Singleton::instance ()->get_handle (); + JAWS_IO_Asynch_Acceptor_Singleton::instance ()->open (socket); // initialize data block @@ -108,7 +110,14 @@ JAWS_Server::open (JAWS_Pipeline_Handler *protocol, policy->concurrency ()->put (db); - policy->concurrency ()->thr_mgr ()->wait (); + char buf[265]; + do + ACE_OS::fprintf (stderr, "%s", "hello: "); + while (ACE_OS::fgets (buf, sizeof (buf), stdin) != NULL); + + ACE_OS::closesocket (policy->acceptor ()->get_handle ()); + + ACE_Thread_Manager::instance ()->wait (); db->release (); |