diff options
-rw-r--r-- | ChangeLog | 42 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 42 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 42 | ||||
-rw-r--r-- | ace/MEM_Acceptor.cpp | 34 | ||||
-rw-r--r-- | ace/MEM_Acceptor.h | 7 | ||||
-rw-r--r-- | ace/MEM_Acceptor.i | 12 | ||||
-rw-r--r-- | ace/MEM_Connector.cpp | 42 | ||||
-rw-r--r-- | ace/MEM_Connector.h | 7 | ||||
-rw-r--r-- | ace/MEM_Connector.i | 12 | ||||
-rw-r--r-- | ace/MEM_IO.cpp | 372 | ||||
-rw-r--r-- | ace/MEM_IO.h | 185 | ||||
-rw-r--r-- | ace/MEM_IO.i | 156 | ||||
-rw-r--r-- | ace/MEM_SAP.cpp | 25 | ||||
-rw-r--r-- | ace/MEM_SAP.h | 96 | ||||
-rw-r--r-- | ace/MEM_SAP.i | 98 | ||||
-rw-r--r-- | ace/MEM_Stream.cpp | 2 | ||||
-rw-r--r-- | tests/MEM_Stream_Test.cpp | 166 | ||||
-rw-r--r-- | tests/MEM_Stream_Test.h | 8 |
18 files changed, 1119 insertions, 229 deletions
diff --git a/ChangeLog b/ChangeLog index 039e0c5ed13..1da7975a5d2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,41 @@ +Mon Apr 02 23:41:34 2001 Nanbor Wang <nanbor@cs.wustl.edu> + + * ace/MEM_SAP.cpp: + * ace/MEM_SAP.h: + * ace/MEM_SAP.i: + Generalized the ACE_MEM_SAP class to allocate memory in the new + memory wrapper class ACE_MEM_SAP_Node. + + * ace/MEM_IO.cpp: + * ace/MEM_IO.h: + * ace/MEM_IO.i: + Separated the different signaling mechanisms into a different + class in ACE_MEM_IO so it can determine the "right" signaling + strategy allowed. Currently, we implement the Reactive strategy + (ACE_Reactive_MEM_IO) which uses sockets for signaling and + multithreaded strategy (ACE_MT_MEM_IO) which uses semaphores for + signaling. + + * ace/MEM_Stream.cpp: + Sending an empty buffer over to wake up the "other" end when we + are closing down. + + * ace/MEM_Acceptor.cpp: + * ace/MEM_Acceptor.h: + * ace/MEM_Acceptor.i: + * ace/MEM_Connector.cpp: + * ace/MEM_Connector.h: + * ace/MEM_Connector.i: Added facility to specify the "preferred" + signaling strategy so the acceptor and connector can negociate + and agree on the best signaling mechanism to use. + + * ace/MEM_Stream_Test.h: + * ace/MEM_Stream_Test.cpp: + Added the test for the new MT signaling MEM_Stream transfer. + This part of the test seems to be failing on system that depends + on SysV semaphores (because we need more semaphore than the + system can provide.) + Mon Apr 2 15:17:13 2001 Chad Elliott <elliott_c@ociweb.com> * ace/config-chorus.h: @@ -15,14 +53,14 @@ Mon Apr 2 09:57:31 2001 Darrell Brunsch <brunsch@uci.edu> * bin/PerlACE/MSProject.pm: - Made a change to the tao_idl depencency checking. + Made a change to the tao_idl depencency checking. It was only checking for tao_idl and $(InputName) where some places we use $(InputPath) instead. * ace/config-win32-msvc-5.h: * ace/config-win32-msvc-6.h: - Disabled the Inheritance by Dominance informational + Disabled the Inheritance by Dominance informational warning that MSVC gives. We have cases of this all over the place, and normally we just disable the warning on a file by file basis, but now we just do a blanket disable. diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index 039e0c5ed13..1da7975a5d2 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,3 +1,41 @@ +Mon Apr 02 23:41:34 2001 Nanbor Wang <nanbor@cs.wustl.edu> + + * ace/MEM_SAP.cpp: + * ace/MEM_SAP.h: + * ace/MEM_SAP.i: + Generalized the ACE_MEM_SAP class to allocate memory in the new + memory wrapper class ACE_MEM_SAP_Node. + + * ace/MEM_IO.cpp: + * ace/MEM_IO.h: + * ace/MEM_IO.i: + Separated the different signaling mechanisms into a different + class in ACE_MEM_IO so it can determine the "right" signaling + strategy allowed. Currently, we implement the Reactive strategy + (ACE_Reactive_MEM_IO) which uses sockets for signaling and + multithreaded strategy (ACE_MT_MEM_IO) which uses semaphores for + signaling. + + * ace/MEM_Stream.cpp: + Sending an empty buffer over to wake up the "other" end when we + are closing down. + + * ace/MEM_Acceptor.cpp: + * ace/MEM_Acceptor.h: + * ace/MEM_Acceptor.i: + * ace/MEM_Connector.cpp: + * ace/MEM_Connector.h: + * ace/MEM_Connector.i: Added facility to specify the "preferred" + signaling strategy so the acceptor and connector can negociate + and agree on the best signaling mechanism to use. + + * ace/MEM_Stream_Test.h: + * ace/MEM_Stream_Test.cpp: + Added the test for the new MT signaling MEM_Stream transfer. + This part of the test seems to be failing on system that depends + on SysV semaphores (because we need more semaphore than the + system can provide.) + Mon Apr 2 15:17:13 2001 Chad Elliott <elliott_c@ociweb.com> * ace/config-chorus.h: @@ -15,14 +53,14 @@ Mon Apr 2 09:57:31 2001 Darrell Brunsch <brunsch@uci.edu> * bin/PerlACE/MSProject.pm: - Made a change to the tao_idl depencency checking. + Made a change to the tao_idl depencency checking. It was only checking for tao_idl and $(InputName) where some places we use $(InputPath) instead. * ace/config-win32-msvc-5.h: * ace/config-win32-msvc-6.h: - Disabled the Inheritance by Dominance informational + Disabled the Inheritance by Dominance informational warning that MSVC gives. We have cases of this all over the place, and normally we just disable the warning on a file by file basis, but now we just do a blanket disable. diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index 039e0c5ed13..1da7975a5d2 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,3 +1,41 @@ +Mon Apr 02 23:41:34 2001 Nanbor Wang <nanbor@cs.wustl.edu> + + * ace/MEM_SAP.cpp: + * ace/MEM_SAP.h: + * ace/MEM_SAP.i: + Generalized the ACE_MEM_SAP class to allocate memory in the new + memory wrapper class ACE_MEM_SAP_Node. + + * ace/MEM_IO.cpp: + * ace/MEM_IO.h: + * ace/MEM_IO.i: + Separated the different signaling mechanisms into a different + class in ACE_MEM_IO so it can determine the "right" signaling + strategy allowed. Currently, we implement the Reactive strategy + (ACE_Reactive_MEM_IO) which uses sockets for signaling and + multithreaded strategy (ACE_MT_MEM_IO) which uses semaphores for + signaling. + + * ace/MEM_Stream.cpp: + Sending an empty buffer over to wake up the "other" end when we + are closing down. + + * ace/MEM_Acceptor.cpp: + * ace/MEM_Acceptor.h: + * ace/MEM_Acceptor.i: + * ace/MEM_Connector.cpp: + * ace/MEM_Connector.h: + * ace/MEM_Connector.i: Added facility to specify the "preferred" + signaling strategy so the acceptor and connector can negociate + and agree on the best signaling mechanism to use. + + * ace/MEM_Stream_Test.h: + * ace/MEM_Stream_Test.cpp: + Added the test for the new MT signaling MEM_Stream transfer. + This part of the test seems to be failing on system that depends + on SysV semaphores (because we need more semaphore than the + system can provide.) + Mon Apr 2 15:17:13 2001 Chad Elliott <elliott_c@ociweb.com> * ace/config-chorus.h: @@ -15,14 +53,14 @@ Mon Apr 2 09:57:31 2001 Darrell Brunsch <brunsch@uci.edu> * bin/PerlACE/MSProject.pm: - Made a change to the tao_idl depencency checking. + Made a change to the tao_idl depencency checking. It was only checking for tao_idl and $(InputName) where some places we use $(InputPath) instead. * ace/config-win32-msvc-5.h: * ace/config-win32-msvc-6.h: - Disabled the Inheritance by Dominance informational + Disabled the Inheritance by Dominance informational warning that MSVC gives. We have cases of this all over the place, and normally we just disable the warning on a file by file basis, but now we just do a blanket disable. diff --git a/ace/MEM_Acceptor.cpp b/ace/MEM_Acceptor.cpp index 428ec6912a5..4d3b084954e 100644 --- a/ace/MEM_Acceptor.cpp +++ b/ace/MEM_Acceptor.cpp @@ -23,7 +23,8 @@ ACE_MEM_Acceptor::dump (void) const ACE_MEM_Acceptor::ACE_MEM_Acceptor (void) : mmap_prefix_ (0), - malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0) + malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0), + preferred_strategy_ (ACE_MEM_IO::Reactive) { ACE_TRACE ("ACE_MEM_Acceptor::ACE_MEM_Acceptor"); } @@ -41,7 +42,8 @@ ACE_MEM_Acceptor::ACE_MEM_Acceptor (const ACE_MEM_Addr &remote_sap, int backlog, int protocol) : mmap_prefix_ (0), - malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0) + malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0), + preferred_strategy_ (ACE_MEM_IO::Reactive) { ACE_TRACE ("ACE_MEM_Acceptor::ACE_MEM_Acceptor"); if (this->open (remote_sap, @@ -80,7 +82,7 @@ ACE_MEM_Acceptor::accept (ACE_MEM_Stream &new_stream, int *len_ptr = 0; sockaddr *addr = 0; - int in_blocking_mode = 0; + int in_blocking_mode = 1; if (this->shared_accept_start (timeout, restart, in_blocking_mode) == -1) @@ -149,13 +151,35 @@ ACE_MEM_Acceptor::accept (ACE_MEM_Stream &new_stream, // Make sure we have a fresh start. ACE_OS::unlink (buf); + new_stream.disable (ACE_NONBLOCK); + ACE_HANDLE new_handle = new_stream.get_handle (); + + // Protocol negociation: + // Tell the client side what level of signaling strategy + // we support. + ACE_INT16 client_signaling = this->preferred_strategy_; + if (ACE::send (new_handle, &client_signaling, + sizeof (ACE_INT16)) == -1) + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_LIB_TEXT ("ACE_MEM_Acceptor::accept error sending strategy\n")), + -1); + + // Now we get the signaling strategy the client support. + if (ACE::recv (new_handle, &client_signaling, + sizeof (ACE_INT16)) == -1) + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_LIB_TEXT ("ACE_MEM_Acceptor::%p error receiving strategy\n"), "accept"), + -1); + + // Client will decide what signaling strategy to use. + // Now set up the shared memory malloc pool. - if (new_stream.create_shm_malloc (buf, &this->malloc_options_) == -1) + if (new_stream.init (buf, ACE_static_cast (ACE_MEM_IO::Signal_Strategy, client_signaling), + &this->malloc_options_) == -1) return -1; // @@ Need to handle timeout here. ACE_UINT16 buf_len = (ACE_OS::strlen (buf) + 1) * sizeof (ACE_TCHAR); - ACE_HANDLE new_handle = new_stream.get_handle (); // No need to worry about byte-order because both parties should always // be on the same machine. diff --git a/ace/MEM_Acceptor.h b/ace/MEM_Acceptor.h index b8a719d7de8..a1e6ffddcfe 100644 --- a/ace/MEM_Acceptor.h +++ b/ace/MEM_Acceptor.h @@ -97,6 +97,10 @@ public: const ACE_TCHAR *mmap_prefix (void) const; void mmap_prefix (const ACE_TCHAR *prefix); + // Set/get the preferred signaling strategy. + ACE_MEM_IO::Signal_Strategy preferred_strategy (void) const; + void preferred_strategy (ACE_MEM_IO::Signal_Strategy strategy); + /// Return the local endpoint address in the referenced <ACE_Addr>. /// Returns 0 if successful, else -1. int get_local_addr (ACE_MEM_Addr &) const; @@ -153,6 +157,9 @@ private: /// A cached MALLOC_OPTIONS. MEM_Accaptor use it to create the shared /// mamory malloc upon every incoming connection. ACE_MEM_SAP::MALLOC_OPTIONS malloc_options_; + + // Preferred signaling strategy. + ACE_MEM_IO::Signal_Strategy preferred_strategy_; }; #if !defined (ACE_LACKS_INLINE_FUNCTIONS) diff --git a/ace/MEM_Acceptor.i b/ace/MEM_Acceptor.i index e3c681eb7bf..22ad9f53b1e 100644 --- a/ace/MEM_Acceptor.i +++ b/ace/MEM_Acceptor.i @@ -63,6 +63,18 @@ ACE_MEM_Acceptor::mmap_prefix (const ACE_TCHAR *prefix) this->mmap_prefix_ = ACE::strnew (prefix); } +ASYS_INLINE ACE_MEM_IO::Signal_Strategy +ACE_MEM_Acceptor::preferred_strategy (void) const +{ + return this->preferred_strategy_; +} + +ASYS_INLINE void +ACE_MEM_Acceptor::preferred_strategy (ACE_MEM_IO::Signal_Strategy strategy) +{ + this->preferred_strategy_ = strategy; +} + ASYS_INLINE ACE_MEM_SAP::MALLOC_OPTIONS & ACE_MEM_Acceptor::malloc_options (void) { diff --git a/ace/MEM_Connector.cpp b/ace/MEM_Connector.cpp index 3fabb12d5ae..b87302bfe2e 100644 --- a/ace/MEM_Connector.cpp +++ b/ace/MEM_Connector.cpp @@ -24,7 +24,8 @@ ACE_MEM_Connector::dump (void) const } ACE_MEM_Connector::ACE_MEM_Connector (void) - : malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0) + : malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0), + preferred_strategy_ (ACE_MEM_IO::Reactive) { ACE_TRACE ("ACE_MEM_Connector::ACE_MEM_Connector"); } @@ -38,7 +39,8 @@ ACE_MEM_Connector::ACE_MEM_Connector (ACE_MEM_Stream &new_stream, int flags, int perms, int protocol) - : malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0) + : malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0), + preferred_strategy_ (ACE_MEM_IO::Reactive) { ACE_TRACE ("ACE_MEM_Connector::ACE_MEM_Connector"); // This is necessary due to the weird inheritance relationships of @@ -83,9 +85,13 @@ ACE_MEM_Connector::connect (ACE_MEM_Stream &new_stream, timeout, local_sap, reuse_addr, flags, perms, PF_INET, protocol) == -1) - return -1; + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_LIB_TEXT ("ACE_MEM_Connector::connect error connecting to socket\n")), + -1); + ACE_HANDLE new_handle = temp_stream.get_handle (); + new_stream.disable (ACE_NONBLOCK); new_stream.set_handle (new_handle); // Do not close the handle. @@ -93,15 +99,39 @@ ACE_MEM_Connector::connect (ACE_MEM_Stream &new_stream, ACE_TCHAR buf[MAXPATHLEN]; // @@ Need to handle timeout here. + ACE_INT16 server_strategy = ACE_MEM_IO::Reactive; + // Receive the signaling strategy theserver support. + if (ACE::recv (new_handle, &server_strategy, + sizeof (ACE_INT16)) == -1) + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_LIB_TEXT ("ACE_MEM_Connector::connect error receiving strategy\n")), + -1); + + // If either side don't support MT, we will not use it. + if (! (this->preferred_strategy_ == ACE_MEM_IO::MT && + server_strategy == ACE_MEM_IO::MT)) + server_strategy = ACE_MEM_IO::Reactive; + + if (ACE::send (new_handle, &server_strategy, + sizeof (ACE_INT16)) == -1) + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_LIB_TEXT ("ACE_MEM_Connector::connect error sending strategy\n")), + -1); + ACE_INT16 buf_len; // Byte-order is not a problem for this read. if (ACE::recv (new_handle, &buf_len, sizeof (buf_len)) == -1) - return -1; + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_LIB_TEXT ("ACE_MEM_Connector::connect error receiving shm filename length\n")), + -1); if (ACE::recv (new_handle, buf, buf_len) == -1) - return -1; + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_LIB_TEXT ("ACE_MEM_Connector::connect error receiving shm filename.\n")), + -1); - if (new_stream.create_shm_malloc (buf, &this->malloc_options_) == -1) + if (new_stream.init (buf, ACE_static_cast (ACE_MEM_IO::Signal_Strategy, server_strategy), + &this->malloc_options_) == -1) return -1; return 0; diff --git a/ace/MEM_Connector.h b/ace/MEM_Connector.h index bf6ea813bec..900619bd539 100644 --- a/ace/MEM_Connector.h +++ b/ace/MEM_Connector.h @@ -86,6 +86,10 @@ public: int perms = 0, int protocol = 0); + // Set/get the preferred signaling strategy. + ACE_MEM_IO::Signal_Strategy preferred_strategy (void) const; + void preferred_strategy (ACE_MEM_IO::Signal_Strategy strategy); + /// Accessor to underlying malloc options. ACE_MEM_SAP::MALLOC_OPTIONS &malloc_options (void); @@ -106,6 +110,9 @@ private: /// A cached MALLOC_OPTIONS that the MEM_Connector used to initialize /// the shared memory malloc update connection establishment. ACE_MEM_SAP::MALLOC_OPTIONS malloc_options_; + + // Preferred signaling strategy. + ACE_MEM_IO::Signal_Strategy preferred_strategy_; }; #if !defined (ACE_LACKS_INLINE_FUNCTIONS) diff --git a/ace/MEM_Connector.i b/ace/MEM_Connector.i index 3fdf59e19e0..09e311d76a2 100644 --- a/ace/MEM_Connector.i +++ b/ace/MEM_Connector.i @@ -5,6 +5,18 @@ // Establish a connection. +ASYS_INLINE ACE_MEM_IO::Signal_Strategy +ACE_MEM_Connector::preferred_strategy (void) const +{ + return this->preferred_strategy_; +} + +ASYS_INLINE void +ACE_MEM_Connector::preferred_strategy (ACE_MEM_IO::Signal_Strategy strategy) +{ + this->preferred_strategy_ = strategy; +} + ASYS_INLINE ACE_MEM_SAP::MALLOC_OPTIONS & ACE_MEM_Connector::malloc_options (void) { diff --git a/ace/MEM_IO.cpp b/ace/MEM_IO.cpp index 2a9672c0f70..51ded63e4b8 100644 --- a/ace/MEM_IO.cpp +++ b/ace/MEM_IO.cpp @@ -14,12 +14,357 @@ ACE_RCSID(ace, MEM_IO, "$Id$") ACE_ALLOC_HOOK_DEFINE(ACE_MEM_IO) +ACE_Reactive_MEM_IO::~ACE_Reactive_MEM_IO () +{ +} + +int +ACE_Reactive_MEM_IO::init (ACE_HANDLE handle, + const ACE_TCHAR *name, + MALLOC_OPTIONS *options) +{ + ACE_TRACE ("ACE_Reactive_MEM_IO::init"); + this->handle_ = handle; + return this->create_shm_malloc (name, + options); +} + +int +ACE_Reactive_MEM_IO::fini (int remove) +{ + ACE_TRACE ("ACE_Reactive_MEM_IO::init"); + + return this->close_shm_malloc (remove); +} + +int +ACE_Reactive_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf, + int flags, + const ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Reactive_MEM_IO::recv_buf"); + + if (this->shm_malloc_ == 0) + return -1; + + off_t new_offset = 0; + int retv = ACE::recv (this->handle_, + (char *) &new_offset, + sizeof (off_t), + flags, + timeout); + + if (retv == 0) + return 0; + else if (retv != sizeof (off_t)) + { + // Nothing available or we are really screwed. + buf = 0; + return -1; + } + else + return this->get_buf_len (new_offset, buf); + + ACE_NOTREACHED (return 0;) +} + +int +ACE_Reactive_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf, + int flags, + const ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Reactive_MEM_IO::send_buf"); + + if (this->shm_malloc_ == 0) + return -1; + + off_t offset = ACE_reinterpret_cast (char *, buf) - + ACE_static_cast (char *, this->shm_malloc_->base_addr ()); + // the offset. + // Send the offset value over the socket. + if (ACE::send (this->handle_, + (const char *) &offset, + sizeof (offset), + flags, + timeout) != sizeof (offset)) + { + // unsucessful send, release the memory in the shared-memory. + this->release_buffer (buf); + + return -1; + } + return buf->size (); +} + +int +ACE_MT_MEM_IO::Simple_Queue::write (ACE_MEM_SAP_Node *new_node) +{ + if (this->mq_ == 0) + return -1; + + // Here, we assume we already have acquired the lock necessary. + // And we are allowed to write. + if (this->mq_->tail_ == (void *) 0) // nothing in the queue. + { + this->mq_->head_ = new_node; + this->mq_->tail_ = new_node; + new_node->next_ = 0; + } + else + { + this->mq_->tail_->next_ = new_node; + new_node->next_ = 0; + this->mq_->tail_ = new_node; + } + return 0; +} + +ACE_MEM_SAP_Node * +ACE_MT_MEM_IO::Simple_Queue::read () +{ + if (this->mq_ == 0) + return 0; + + ACE_MEM_SAP_Node *retv = 0; + + ACE_SEH_TRY + { + retv = this->mq_->head_; + // Here, we assume we already have acquired the lock necessary + // and there are soemthing in the queue. + if (this->mq_->head_ == this->mq_->tail_) + { + // Last message in the queue. + this->mq_->head_ = 0; + this->mq_->tail_ = 0; + } + else + this->mq_->head_ = retv->next_; + } + ACE_SEH_EXCEPT (this->malloc_->memory_pool ().seh_selector (GetExceptionInformation ())) + { + } + + return retv; +} + +ACE_MT_MEM_IO::~ACE_MT_MEM_IO () +{ + delete this->recv_channel_.sema_; + delete this->recv_channel_.lock_; + delete this->send_channel_.sema_; + delete this->send_channel_.lock_; +} + +int +ACE_MT_MEM_IO::init (ACE_HANDLE handle, + const ACE_TCHAR *name, + MALLOC_OPTIONS *options) +{ + ACE_TRACE ("ACE_MT_MEM_IO::init"); + ACE_UNUSED_ARG (handle); + + // @@ Give me a rule on naming and how the queue should + // be kept in the shared memory and we are done + // with this. + if (this->create_shm_malloc (name, options) == -1) + return -1; + + ACE_TCHAR server_sema [MAXPATHLEN]; + ACE_TCHAR client_sema [MAXPATHLEN]; + ACE_TCHAR server_lock [MAXPATHLEN]; + ACE_TCHAR client_lock [MAXPATHLEN]; + const ACE_TCHAR *basename = ACE::basename (name); + // size_t baselen = ACE_OS::strlen (basename); + + // Building names. @@ Check buffer overflow? + ACE_OS::strcpy (server_sema, basename); + ACE_OS::strcat (server_sema, ACE_TEXT ("_sema_to_server")); + ACE_OS::strcpy (client_sema, basename); + ACE_OS::strcat (client_sema, ACE_TEXT ("_sema_to_client")); + ACE_OS::strcpy (server_lock, basename); + ACE_OS::strcat (server_lock, ACE_TEXT ("_lock_to_server")); + ACE_OS::strcpy (client_lock, basename); + ACE_OS::strcat (client_lock, ACE_TEXT ("_lock_to_client")); + + void *to_server_ptr = 0; + // @@ Here, we assume the shared memory fill will never be resued. + // So we can determine whether we are server or client by examining + // if the simple message queues have already been set up in + // the Malloc object or not. + if (this->shm_malloc_->find ("to_server", to_server_ptr) == -1) + { + void *ptr = 0; + // We are server. + ACE_ALLOCATOR_RETURN (ptr, + this->shm_malloc_->malloc (2 * sizeof (MQ_Struct)), + -1); + + MQ_Struct *mymq = ACE_reinterpret_cast (MQ_Struct *, ptr); + mymq->tail_ = 0; + mymq->head_ = 0; + (mymq + 1)->tail_ = 0; + (mymq + 1)->head_ = 0; + if (this->shm_malloc_->bind ("to_server", mymq) == -1) + return -1; + + if (this->shm_malloc_->bind ("to_client", mymq + 1) == -1) + return -1; + + this->recv_channel_.queue_.init (mymq, this->shm_malloc_); + ACE_NEW_RETURN (this->recv_channel_.sema_, + ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema), + -1); + ACE_NEW_RETURN (this->recv_channel_.lock_, + ACE_SYNCH_PROCESS_MUTEX (server_lock), + -1); + + this->send_channel_.queue_.init (mymq + 1, this->shm_malloc_); + ACE_NEW_RETURN (this->send_channel_.sema_, + ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema), + -1); + ACE_NEW_RETURN (this->send_channel_.lock_, + ACE_SYNCH_PROCESS_MUTEX (client_lock), + -1); + } + else + { + // we are client. + MQ_Struct *mymq = ACE_reinterpret_cast (MQ_Struct *, to_server_ptr); + this->recv_channel_.queue_.init (mymq +1, this->shm_malloc_); + ACE_NEW_RETURN (this->recv_channel_.sema_, + ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema), + -1); + ACE_NEW_RETURN (this->recv_channel_.lock_, + ACE_SYNCH_PROCESS_MUTEX (client_lock), + -1); + + this->send_channel_.queue_.init (mymq, this->shm_malloc_); + ACE_NEW_RETURN (this->send_channel_.sema_, + ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema), + -1); + ACE_NEW_RETURN (this->send_channel_.lock_, + ACE_SYNCH_PROCESS_MUTEX (server_lock), + -1); + } + return 0; +} + +int +ACE_MT_MEM_IO::fini (int remove) +{ + ACE_TRACE ("ACE_MT_MEM_IO::init"); + + return this->close_shm_malloc (remove); +} + +int +ACE_MT_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf, + int flags, + const ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_MT_MEM_IO::recv_buf"); + + // @@ Don't know how to handle timeout yet. + ACE_UNUSED_ARG (timeout); + ACE_UNUSED_ARG (flags); + + if (this->shm_malloc_ == 0) + return -1; + + // Need to handle timeout here. + if (this->recv_channel_.sema_->acquire () == -1) + return -1; + + { + // @@ We can probably skip the lock in certain circumstance. + ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->recv_channel_.lock_, -1); + + buf = this->recv_channel_.queue_.read (); + if (buf != 0) + return buf->size (); + return -1; + } + + ACE_NOTREACHED (return 0;) +} + +int +ACE_MT_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf, + int flags, + const ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_MT_MEM_IO::send_buf"); + + // @@ Don't know how to handle timeout yet. + ACE_UNUSED_ARG (timeout); + ACE_UNUSED_ARG (flags); + + if (this->shm_malloc_ == 0) + return -1; + + { + // @@ We can probably skip the lock in certain curcumstances. + ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->send_channel_.lock_, -1); + + if (this->send_channel_.queue_.write (buf) == -1) + { + this->release_buffer (buf); + return -1; + } + } + + if (this->send_channel_.sema_->release () == -1) + return -1; + + return buf->size (); +} + void ACE_MEM_IO::dump (void) const { ACE_TRACE ("ACE_MEM_IO::dump"); } +int +ACE_MEM_IO::init (const ACE_TCHAR *name, + ACE_MEM_IO::Signal_Strategy type, + ACE_MEM_SAP::MALLOC_OPTIONS *options) +{ + ACE_UNUSED_ARG (type); + + delete this->deliver_strategy_; + this->deliver_strategy_ = 0; + switch (type) + { + case ACE_MEM_IO::Reactive: + ACE_NEW_RETURN (this->deliver_strategy_, + ACE_Reactive_MEM_IO (), + -1); + break; + case ACE_MEM_IO::MT: + ACE_NEW_RETURN (this->deliver_strategy_, + ACE_MT_MEM_IO (), + -1); + break; + default: + return -1; + } + + return this->deliver_strategy_->init (this->get_handle (), + name, + options); +} + +int +ACE_MEM_IO::fini (int remove) +{ + if (this->deliver_strategy_ != 0) + return this->deliver_strategy_->fini (remove); + else + return -1; +} + // Allows a client to read from a socket without having to provide // a buffer to read. This method determines how much data is in the // socket, allocates a buffer of this size, reads in the data, and @@ -31,15 +376,20 @@ ACE_MEM_IO::send (const ACE_Message_Block *message_block, { ACE_TRACE ("ACE_MEM_IO::send"); + if (this->deliver_strategy_ == 0) + return -1; // Something went seriously wrong. + ssize_t len = message_block->total_length (); if (len != 0) { - char *buf = ACE_static_cast (char *, this->acquire_buffer (len)); + ACE_MEM_SAP_Node *buf = + ACE_reinterpret_cast (ACE_MEM_SAP_Node *, + this->deliver_strategy_->acquire_buffer (len)); ssize_t n = 0; while (message_block != 0) { - ACE_OS::memcpy (buf + n, + ACE_OS::memcpy (ACE_static_cast (char *, buf->data ()) + n, message_block->rd_ptr (), message_block->length ()); n += message_block->length (); @@ -50,19 +400,11 @@ ACE_MEM_IO::send (const ACE_Message_Block *message_block, message_block = message_block->next (); } - off_t offset = this->set_buf_len (buf, len); - if (ACE::send (this->get_handle (), - (const char *) &offset, - sizeof (offset), - 0, - timeout) != sizeof (offset)) - { - // unsucessful send, release the memory in the shared-memory. - this->release_buffer (buf); + buf->size_ = len; - return -1; - } - return len; + return this->deliver_strategy_->send_buf (buf, + 0, + timeout); } return 0; } @@ -71,7 +413,7 @@ ACE_MEM_IO::send (const ACE_Message_Block *message_block, #if 0 ssize_t ACE_MEM_IO::recvv (iovec *io_vec, - const ACE_Time_Value *timeout) + const ACE_Time_Value *timeout) { ACE_TRACE ("ACE_MEM_IO::recvv"); #if defined (FIONREAD) diff --git a/ace/MEM_IO.h b/ace/MEM_IO.h index 6b52edcaa54..cfb4a0de4eb 100644 --- a/ace/MEM_IO.h +++ b/ace/MEM_IO.h @@ -19,6 +19,8 @@ #include "ace/MEM_SAP.h" #include "ace/Memory_Pool.h" #include "ace/Message_Block.h" +#include "ace/Process_Semaphore.h" +#include "ace/Process_Mutex.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -26,6 +28,120 @@ #if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1) +class ACE_Export ACE_Reactive_MEM_IO : public ACE_MEM_SAP +{ +public: + ACE_Reactive_MEM_IO (void); + + virtual ~ACE_Reactive_MEM_IO (void); + + /** + * Initialize the MEM_SAP object. + */ + virtual int init (ACE_HANDLE handle, + const ACE_TCHAR *name, + MALLOC_OPTIONS *options); + + /** + * Finalizing the MEM_SAP object. This method doesn't invoke + * the <remove> method. + */ + virtual int fini (int remove); + /** + * Fetch location of next available data into <recv_buffer_>. + * As this operation read the address of the data off the socket + * using ACE::recv, <timeout> only applies to ACE::recv. + */ + virtual int recv_buf (ACE_MEM_SAP_Node *&buf, + int flags, + const ACE_Time_Value *timeout); + + /** + * Wait to to <timeout> amount of time to send <buf>. If <send> + * times out a -1 is returned with <errno == ETIME>. If it succeeds + * the number of bytes sent is returned. */ + virtual int send_buf (ACE_MEM_SAP_Node *buf, + int flags, + const ACE_Time_Value *timeout); + + /** + * Convert the buffer offset <off> to absolute address to <buf>. + * Return the size of valid information containing in the <buf>, + * -1 if <shm_malloc_> is not initialized. + */ + ssize_t get_buf_len (const off_t off, ACE_MEM_SAP_Node *&buf); +}; + +class ACE_Export ACE_MT_MEM_IO : public ACE_MEM_SAP +{ +public: + typedef struct + { + ACE_MEM_SAP_Node::ACE_MEM_SAP_NODE_PTR head_; + ACE_MEM_SAP_Node::ACE_MEM_SAP_NODE_PTR tail_; + } MQ_Struct; // Structure for a simple queue + + class Simple_Queue + { + public: + Simple_Queue (void); + Simple_Queue (MQ_Struct *mq); + + int init (MQ_Struct *mq, ACE_MEM_SAP::MALLOC_TYPE *malloc); + + int write (ACE_MEM_SAP_Node *new_msg); + + ACE_MEM_SAP_Node *read (void); + private: + MQ_Struct *mq_; + ACE_MEM_SAP::MALLOC_TYPE *malloc_; + }; + + typedef struct + { + ACE_SYNCH_PROCESS_SEMAPHORE *sema_; + ACE_SYNCH_PROCESS_MUTEX *lock_; + Simple_Queue queue_; + } Channel; + + ACE_MT_MEM_IO (void); + + virtual ~ACE_MT_MEM_IO (void); + + /** + * Initialize the MEM_SAP object. + */ + virtual int init (ACE_HANDLE handle, + const ACE_TCHAR *name, + MALLOC_OPTIONS *options); + + /** + * Finalizing the MEM_SAP object. This method doesn't invoke + * the <remove> method. + */ + virtual int fini (int remove); + /** + * Fetch location of next available data into <recv_buffer_>. + * As this operation read the address of the data off the socket + * using ACE::recv, <timeout> only applies to ACE::recv. + */ + virtual int recv_buf (ACE_MEM_SAP_Node *&buf, + int flags, + const ACE_Time_Value *timeout); + + /** + * Wait to to <timeout> amount of time to send <buf>. If <send> + * times out a -1 is returned with <errno == ETIME>. If it succeeds + * the number of bytes sent is returned. */ + virtual int send_buf (ACE_MEM_SAP_Node *buf, + int flags, + const ACE_Time_Value *timeout); + +private: + Channel recv_channel_; + Channel send_channel_; +}; + /** * @class ACE_MEM_IO * @@ -52,7 +168,7 @@ * the other end. The receiving side then reverses the * procedures and copies the information into user buffer. */ -class ACE_Export ACE_MEM_IO : public ACE_SOCK, public ACE_MEM_SAP +class ACE_Export ACE_MEM_IO : public ACE_SOCK { public: // = Initialization and termination methods. @@ -62,6 +178,25 @@ public: /// Destructor. ~ACE_MEM_IO (void); + typedef enum + { + Reactive, + MT + } Signal_Strategy; + + /** + * Initialize the MEM_SAP object. + */ + int init (const ACE_TCHAR *name, + ACE_MEM_IO::Signal_Strategy type = ACE_MEM_IO::Reactive, + ACE_MEM_SAP::MALLOC_OPTIONS *options = 0); + + /** + * Finalizing the MEM_IO object. This method doesn't invoke + * the <remove> method. + */ + int fini (int remove = 0); + /// Send an <n> byte buffer to the other process using shm_malloc_ /// connected thru the socket. ssize_t send (const void *buf, @@ -90,18 +225,6 @@ public: */ ssize_t send (const void *buf, size_t n, - int flags, - const ACE_Time_Value *timeout); - - /** - * Wait up to <timeout> amount of time to receive up to <n> bytes - * into <buf> from <handle> (uses the <recv> call). If <recv> times - * out a -1 is returned with <errno == ETIME>. If it succeeds the - * number of bytes received is returned. - */ - ssize_t recv (void *buf, - size_t n, - int flags, const ACE_Time_Value *timeout); /** @@ -112,6 +235,15 @@ public: */ ssize_t send (const void *buf, size_t n, + int flags, + const ACE_Time_Value *timeout); + + /** + * Wait to to <timeout> amount of time to send the <message_block>. + * If <send> times out a -1 is returned with <errno == ETIME>. If + * it succeeds the number of bytes sent is returned. + */ + ssize_t send (const ACE_Message_Block *message_block, const ACE_Time_Value *timeout); /** @@ -125,13 +257,17 @@ public: const ACE_Time_Value *timeout); /** - * Wait to to <timeout> amount of time to send the <message_block>. - * If <send> times out a -1 is returned with <errno == ETIME>. If - * it succeeds the number of bytes sent is returned. + * Wait up to <timeout> amount of time to receive up to <n> bytes + * into <buf> from <handle> (uses the <recv> call). If <recv> times + * out a -1 is returned with <errno == ETIME>. If it succeeds the + * number of bytes received is returned. */ - ssize_t send (const ACE_Message_Block *message_block, + ssize_t recv (void *buf, + size_t n, + int flags, const ACE_Time_Value *timeout); + /// Dump the state of an object. void dump (void) const; @@ -147,17 +283,14 @@ public: int get_remote_port (u_short &) const; */ -protected: - /** - * Fetch location of next available data into <recv_buffer_>. - * As this operation read the address of the data off the socket - * using ACE::recv, <timeout> only applies to ACE::recv. - */ - ssize_t fetch_recv_buf (int flags, const ACE_Time_Value *timeout = 0); - private: + ssize_t fetch_recv_buf (int flag, const ACE_Time_Value *timeout); + + /// Actual deliverying mechanism. + ACE_MEM_SAP *deliver_strategy_; + /// Internal pointer for support recv/send. - void *recv_buffer_; + ACE_MEM_SAP_Node *recv_buffer_; /// Record the current total buffer size of <recv_buffer_>. ssize_t buf_size_; diff --git a/ace/MEM_IO.i b/ace/MEM_IO.i index 89fb83e23da..064dc94534b 100644 --- a/ace/MEM_IO.i +++ b/ace/MEM_IO.i @@ -3,10 +3,78 @@ // MEM_IO.i +ASYS_INLINE +ACE_Reactive_MEM_IO::ACE_Reactive_MEM_IO () +{ +} + +ASYS_INLINE +ACE_MT_MEM_IO::ACE_MT_MEM_IO () +{ + this->recv_channel_.sema_ = 0; + this->recv_channel_.lock_ = 0; + this->send_channel_.sema_ = 0; + this->send_channel_.lock_ = 0; +} + +ASYS_INLINE +ACE_MT_MEM_IO::Simple_Queue::Simple_Queue (void) + : mq_ (0), + malloc_ (0) +{ +} + +ASYS_INLINE +ACE_MT_MEM_IO::Simple_Queue::Simple_Queue (MQ_Struct *mq) + : mq_ (mq), + malloc_ (0) +{ +} + +ASYS_INLINE int +ACE_MT_MEM_IO::Simple_Queue::init (MQ_Struct *mq, + ACE_MEM_SAP::MALLOC_TYPE *malloc) +{ + if (this->mq_ != 0) + return -1; + + this->mq_ = mq; + this->malloc_ = malloc; + return 0; +} + +ASYS_INLINE ssize_t +ACE_Reactive_MEM_IO::get_buf_len (const off_t off, ACE_MEM_SAP_Node *&buf) +{ +#if !defined (ACE_HAS_WIN32_STRUCTURAL_EXCEPTIONS) + ACE_TRACE ("ACE_Reactive_MEM_IO::get_buf_len"); +#endif /* ACE_HAS_WIN32_STRUCTURAL_EXCEPTIONS */ + + if (this->shm_malloc_ == 0) + return -1; + + ssize_t retv = 0; + + ACE_SEH_TRY + { + buf = ACE_reinterpret_cast (ACE_MEM_SAP_Node *, + (ACE_static_cast(char *, + this->shm_malloc_->base_addr ()) + + off)); + retv = buf->size (); + } + ACE_SEH_EXCEPT (this->shm_malloc_->memory_pool ().seh_selector (GetExceptionInformation ())) + { + } + + return retv; +} + // Send an n byte message to the connected socket. ASYS_INLINE ACE_MEM_IO::ACE_MEM_IO (void) - : recv_buffer_ (0), + : deliver_strategy_ (0), + recv_buffer_ (0), buf_size_ (0), cur_offset_ (0) { @@ -18,76 +86,64 @@ ACE_MEM_IO::fetch_recv_buf (int flag, const ACE_Time_Value *timeout) { ACE_TRACE ("ACE_MEM_IO::fetch_recv_buf"); + if (this->deliver_strategy_ == 0) + return -1; + // This method can only be called when <buf_size_> == <cur_offset_>. ACE_ASSERT (this->buf_size_ == this->cur_offset_); // We have done using the previous buffer, return it to malloc. if (this->recv_buffer_ != 0) - this->release_buffer (this->recv_buffer_); + this->deliver_strategy_->release_buffer (this->recv_buffer_); this->cur_offset_ = 0; - off_t new_offset = 0; - int retv = ACE::recv (this->get_handle (), - (char *) &new_offset, - sizeof (off_t), - flag, - timeout); - - if (retv == 0) - return 0; - else if (retv != sizeof (off_t)) - { - // Nothing available or we are really screwed. - this->buf_size_ = 0; - this->recv_buffer_ = 0; - return -1; - } + int retv = 0; + + if ((retv = this->deliver_strategy_->recv_buf (this->recv_buffer_, + flag, + timeout)) > 0) + this->buf_size_ = retv; else - this->buf_size_ = this->get_buf_len (new_offset, - this->recv_buffer_); - return this->buf_size_; + this->buf_size_ = 0; + + return retv; } ASYS_INLINE ACE_MEM_IO::~ACE_MEM_IO (void) { - // ACE_TRACE ("ACE_MEM_IO::~ACE_MEM_IO"); + delete this->deliver_strategy_; } ASYS_INLINE ssize_t ACE_MEM_IO::send (const void *buf, - size_t len, - int flags, - const ACE_Time_Value *timeout) + size_t len, + int flags, + const ACE_Time_Value *timeout) { ACE_TRACE ("ACE_MEM_IO::send"); - void *sbuf = this->acquire_buffer (len); + if (this->deliver_strategy_ == 0) + return 0; + + ACE_MEM_SAP_Node *sbuf = this->deliver_strategy_->acquire_buffer (len); if (sbuf == 0) return -1; // Memory buffer not initialized. - ACE_OS::memcpy (sbuf, buf, len); - off_t offset = this->set_buf_len (sbuf, len); // <set_buf_len> also calculate - // the offset. - - // Send the offset value over the socket. - if (ACE::send (this->get_handle (), - (const char *) &offset, - sizeof (offset), - flags, - timeout) != sizeof (offset)) - { - // unsucessful send, release the memory in the shared-memory. - this->release_buffer (sbuf); + ACE_OS::memcpy (sbuf->data (), buf, len); - return -1; - } - return len; + /// + + sbuf->size_ = len; + + return this->deliver_strategy_->send_buf (sbuf, + flags, + timeout); } ASYS_INLINE ssize_t ACE_MEM_IO::recv (void *buf, - size_t len, - int flags, - const ACE_Time_Value *timeout) + size_t len, + int flags, + const ACE_Time_Value *timeout) { ACE_TRACE ("ACE_MEM_IO::recv"); @@ -108,7 +164,7 @@ ACE_MEM_IO::recv (void *buf, size_t length = (len > buf_len ? buf_len : len); ACE_OS::memcpy ((char *) buf + count, - (char *) this->recv_buffer_ + this->cur_offset_, + (char *) this->recv_buffer_->data () + this->cur_offset_, length); this->cur_offset_ += length; // len -= length; @@ -155,8 +211,8 @@ ACE_MEM_IO::recv (void *buf, size_t n) ASYS_INLINE ssize_t ACE_MEM_IO::recv (void *buf, - size_t len, - const ACE_Time_Value *timeout) + size_t len, + const ACE_Time_Value *timeout) { ACE_TRACE ("ACE_MEM_IO::recv"); return this->recv (buf, len, 0, timeout); @@ -164,8 +220,8 @@ ACE_MEM_IO::recv (void *buf, ASYS_INLINE ssize_t ACE_MEM_IO::send (const void *buf, - size_t len, - const ACE_Time_Value *timeout) + size_t len, + const ACE_Time_Value *timeout) { ACE_TRACE ("ACE_MEM_IO::send"); return this->send (buf, len, 0, timeout); diff --git a/ace/MEM_SAP.cpp b/ace/MEM_SAP.cpp index 88725dcad8b..ded84c604e6 100644 --- a/ace/MEM_SAP.cpp +++ b/ace/MEM_SAP.cpp @@ -26,7 +26,8 @@ ACE_MEM_SAP::dump (void) const } ACE_MEM_SAP::ACE_MEM_SAP (void) - : shm_malloc_ (0) + : handle_ (ACE_INVALID_HANDLE), + shm_malloc_ (0) { // ACE_TRACE ("ACE_MEM_SAP::ACE_MEM_SAP"); } @@ -52,32 +53,26 @@ ACE_MEM_SAP::create_shm_malloc (const ACE_TCHAR *name, int ACE_MEM_SAP::close_shm_malloc (const int remove) { + ACE_TRACE ("ACE_MEM_SAP::close_shm_malloc"); + if (this->shm_malloc_ != 0 && remove != 0) - { - this->shm_malloc_->remove (); - return 0; - } + return this->shm_malloc_->remove (); + return -1; } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Write_Guard<ACE_Process_Mutex>; template class ACE_Read_Guard<ACE_Process_Mutex>; -#if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1) template class ACE_Malloc_T<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex, ACE_PI_Control_Block>; -#else -template class ACE_Malloc<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex>; -template class ACE_Malloc_T<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex, ACE_Control_Block>; -#endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */ +template class ACE_Based_Pointer<ACE_MEM_SAP_Node>; +template class ACE_Based_Pointer_Basic<ACE_MEM_SAP_Node>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Write_Guard<ACE_Process_Mutex> #pragma instantiate ACE_Read_Guard<ACE_Process_Mutex> -#if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1) #pragma instantiate ACE_Malloc_T<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex, ACE_PI_Control_Block> -#else -#pragma instantiate ACE_Malloc<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex> -#pragma instantiate ACE_Malloc_T<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex, ACE_Control_Block> -#endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */ +#pragma instantiate ACE_Based_Pointer<ACE_MEM_SAP_Node> +#pragma instantiate ACE_Based_Pointer_Basic<ACE_MEM_SAP_Node> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ #endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */ diff --git a/ace/MEM_SAP.h b/ace/MEM_SAP.h index 1a480b81884..8f62dbb9580 100644 --- a/ace/MEM_SAP.h +++ b/ace/MEM_SAP.h @@ -26,8 +26,47 @@ #include "ace/Process_Mutex.h" +class ACE_MEM_SAP; +class ACE_Reactive_MEM_IO; +class ACE_MT_MEM_IO; +class ACE_MEM_IO; + +// Internal data structure +// MEM_SAP uses to queue up +// data. +class ACE_Export ACE_MEM_SAP_Node +{ +public: +// friend class ACE_MEM_SAP; +// friend class ACE_Reactive_MEM_IO; +// friend class ACE_MT_MEM_IO; +// friend class ACE_MEM_IO; + + typedef ACE_Based_Pointer<ACE_MEM_SAP_Node> ACE_MEM_SAP_NODE_PTR; + + // Initialize the node with its capacity. + ACE_MEM_SAP_Node (size_t cap); + + // Get the size of the data we hold. + size_t size (void) const; + + // Get the capacity of this block of data. + size_t capacity (void) const; + + // Get the pointer to the block of data we hold. + void *data (void); + + // The maximum size of this memory block. + size_t capacity_; + + // The actualy size used. + size_t size_; + + ACE_MEM_SAP_NODE_PTR next_; +}; + /** - * @class ACE_MEM_SAP + * @Class ACE_MEM_SAP * * @brief Defines the methods of shared memory management for * shared memory transport. @@ -41,34 +80,45 @@ public: typedef ACE_MMAP_Memory_Pool_Options MALLOC_OPTIONS; /// Destructor. - ~ACE_MEM_SAP (void); + virtual ~ACE_MEM_SAP (void); - /// request a buffer of size <size>. Return 0 if the <shm_malloc_> is - /// not initialized. - void *acquire_buffer (const ssize_t size); - - /// release a buffer pointed by <buf>. Return -1 if the <shm_malloc_> - /// is not initialized. - int release_buffer (void *buf); + /** + * Initialize the MEM_SAP object. + */ + virtual int init (ACE_HANDLE handle, + const ACE_TCHAR *name, + MALLOC_OPTIONS *options) = 0; /** - * Set the length of buf (containing information) to <n> bytes. - * Return the offset of the <buf> relative to the base address. - * <buf> must be acquired by <get_buffer> method. Return -1 if the - * <shm_malloc_> is not initialized. + * Finalizing the MEM_SAP object. This method doesn't invoke + * the <remove> method. */ - off_t set_buf_len (void *buf, - size_t n); + virtual int fini (int remove) = 0; /** - * Convert the buffer offset <off> to absolute address to <buf>. - * Return the size of valid information containing in the <buf>, - * -1 if <shm_malloc_> is not initialized. + * Fetch location of next available data into <recv_buffer_>. + * As this operation read the address of the data off the socket + * using ACE::recv, <timeout> only applies to ACE::recv. */ - ssize_t get_buf_len (const off_t off, void *&buf); + virtual int recv_buf (ACE_MEM_SAP_Node *&buf, + int flags, + const ACE_Time_Value *timeout) = 0; - /// Remove the shared resouce (mmap file) used by us. - int remove (void); + /** + * Wait to to <timeout> amount of time to send <buf>. If <send> + * times out a -1 is returned with <errno == ETIME>. If it succeeds + * the number of bytes sent is returned. */ + virtual int send_buf (ACE_MEM_SAP_Node *buf, + int flags, + const ACE_Time_Value *timeout) = 0; + + /// request a buffer of size <size>. Return 0 if the <shm_malloc_> is + /// not initialized. + ACE_MEM_SAP_Node *acquire_buffer (const ssize_t size); + + /// release a buffer pointed by <buf>. Return -1 if the <shm_malloc_> + /// is not initialized. + int release_buffer (ACE_MEM_SAP_Node *buf); /// Dump the state of an object. void dump (void) const; @@ -86,12 +136,14 @@ protected: * communication. */ int create_shm_malloc (const ACE_TCHAR *name, - MALLOC_OPTIONS *options = 0); + MALLOC_OPTIONS *options); /// Close down the share memory pool. If <remove> != 0, then the /// mmap file will also get removed. int close_shm_malloc (const int remove = 0); + ACE_HANDLE handle_; + /// Data exchange channel. MALLOC_TYPE *shm_malloc_; diff --git a/ace/MEM_SAP.i b/ace/MEM_SAP.i index fea55e230ea..d1acbf79ce4 100644 --- a/ace/MEM_SAP.i +++ b/ace/MEM_SAP.i @@ -3,6 +3,33 @@ // MEM_SAP.i + +ASYS_INLINE +ACE_MEM_SAP_Node::ACE_MEM_SAP_Node (size_t cap) + : capacity_ (cap), + size_ (0), + next_ (0) +{ +} + +ASYS_INLINE size_t +ACE_MEM_SAP_Node::size (void) const +{ + return this->size_; +} + +ASYS_INLINE size_t +ACE_MEM_SAP_Node::capacity (void) const +{ + return this->capacity_; +} + +ASYS_INLINE void * +ACE_MEM_SAP_Node::data (void) +{ + return this + 1; +} + ASYS_INLINE ACE_MEM_SAP::~ACE_MEM_SAP (void) { @@ -11,81 +38,30 @@ ACE_MEM_SAP::~ACE_MEM_SAP (void) } -ASYS_INLINE void * +ASYS_INLINE ACE_MEM_SAP_Node * ACE_MEM_SAP::acquire_buffer (const ssize_t size) { ACE_TRACE ("ACE_MEM_SAP::acquire_buffer"); if (this->shm_malloc_ == 0) return 0; // not initialized. - size_t *lptr = ACE_static_cast (size_t *, - this->shm_malloc_->malloc (sizeof (size_t) + size)); - - *lptr = size; - ++lptr; + ACE_MEM_SAP_Node *buf = + ACE_reinterpret_cast (ACE_MEM_SAP_Node *, + this->shm_malloc_->malloc (sizeof (ACE_MEM_SAP_Node) + + size)); + if (buf != 0) + return new (buf) ACE_MEM_SAP_Node (size); - return lptr; + return 0; } ASYS_INLINE int -ACE_MEM_SAP::release_buffer (void *buf) +ACE_MEM_SAP::release_buffer (ACE_MEM_SAP_Node *buf) { ACE_TRACE ("ACE_MEM_SAP::release_buffer"); if (this->shm_malloc_ == 0) return -1; // not initialized. - size_t *lptr = ACE_static_cast (size_t *, buf); - - --lptr; - this->shm_malloc_->free (lptr); + this->shm_malloc_->free (buf); return 0; } - -ASYS_INLINE off_t -ACE_MEM_SAP::set_buf_len (void *buf, size_t n) -{ - ACE_TRACE ("ACE_MEM_SAP::set_buf_len"); - if (this->shm_malloc_ == 0) - return -1; - - size_t *lptr = ACE_static_cast (size_t *, buf); - --lptr; - - if (*lptr >= n) - *lptr = n; - - return ((char *) lptr - (char *) this->shm_malloc_->base_addr ()); -} - -ASYS_INLINE ssize_t -ACE_MEM_SAP::get_buf_len (const off_t off, void *&buf) -{ -#if !defined (ACE_HAS_WIN32_STRUCTURAL_EXCEPTIONS) - ACE_TRACE ("ACE_MEM_SAP::get_buf_len"); -#endif /* ACE_HAS_WIN32_STRUCTURAL_EXCEPTIONS */ - - if (this->shm_malloc_ == 0) - return -1; - - ssize_t retv = 0; - - ACE_SEH_TRY - { - size_t *lptr = (size_t*) ((char *) this->shm_malloc_->base_addr () + off); - buf = lptr + 1; - retv = *lptr; - } - ACE_SEH_EXCEPT (this->shm_malloc_->memory_pool ().seh_selector (GetExceptionInformation ())) - { - } - - return retv; -} - -ASYS_INLINE int -ACE_MEM_SAP::remove (void) -{ - ACE_TRACE ("ACE_MEM_SAP::remove"); - - return close_shm_malloc (1); -} diff --git a/ace/MEM_Stream.cpp b/ace/MEM_Stream.cpp index 0b1f9ca3394..239262c8d5d 100644 --- a/ace/MEM_Stream.cpp +++ b/ace/MEM_Stream.cpp @@ -21,6 +21,8 @@ ACE_MEM_Stream::dump (void) const int ACE_MEM_Stream::close (void) { + this->send ((char *)0, 0); + #if defined (ACE_WIN32) // We need the following call to make things work correctly on // Win32, which requires use to do a <close_writer> before doing the diff --git a/tests/MEM_Stream_Test.cpp b/tests/MEM_Stream_Test.cpp index dfdc7d4e560..de0a0bb78be 100644 --- a/tests/MEM_Stream_Test.cpp +++ b/tests/MEM_Stream_Test.cpp @@ -38,26 +38,36 @@ ACE_RCSID(tests, MEM_Stream_Test, "$Id$") static int opt_wfmo_reactor = 1; static int opt_select_reactor = 1; +static ACE_MEM_IO::Signal_Strategy client_strategy = ACE_MEM_IO::Reactive; -u_short Echo_Handler::waiting_ = NO_OF_CONNECTION; +ACE_Atomic_Op <ACE_Thread_Mutex, u_short> Echo_Handler::waiting_ = NO_OF_CONNECTION; u_short Echo_Handler::connection_count_ = 0; typedef ACE_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR> ACCEPTOR; +typedef ACE_Strategy_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR> S_ACCEPTOR; -// int -// Echo_Handler::open (void *) -// { -// // @@ Nanbor, this method doesn't anything? -// return 0; -// } +int +Echo_Handler::open (void *) +{ + // @@ Nanbor, this method doesn't anything? + return 0; +} -Echo_Handler::Echo_Handler () - : connection_ (++Echo_Handler::connection_count_) +Echo_Handler::Echo_Handler (ACE_Thread_Manager *thr_mgr) + : ACE_Svc_Handler<ACE_MEM_STREAM, ACE_MT_SYNCH> (thr_mgr), + connection_ (++Echo_Handler::connection_count_) { ACE_OS::sprintf (this->name_, ACE_TEXT ("Connection %d --> "), this->connection_); } +void +Echo_Handler::reset_handler (void) +{ + Echo_Handler::waiting_ = NO_OF_CONNECTION; + Echo_Handler::connection_count_ = 0; +} + int Echo_Handler::handle_input (ACE_HANDLE) { @@ -88,11 +98,17 @@ Echo_Handler::handle_input (ACE_HANDLE) int Echo_Handler::handle_close (ACE_HANDLE, - ACE_Reactor_Mask) + ACE_Reactor_Mask mask) { // Reduce count. this->waiting_--; +#if 1 + if (client_strategy != ACE_MEM_IO::Reactive) + this->reactor ()->remove_handler (this, + mask | ACE_Event_Handler::DONT_CALL); +#endif /* tests */ + // If no connections are open. if (this->waiting_ == 0) ACE_Reactor::instance ()->end_event_loop (); @@ -102,19 +118,31 @@ Echo_Handler::handle_close (ACE_HANDLE, this->connection_)); // Shutdown - this->peer ().remove (); + this->peer ().close (); + this->peer ().fini (1); this->destroy (); return 0; } +int +Echo_Handler::svc (void) +{ + while (this->handle_input (this->get_handle ()) >= 0) + ; + return 0; +} + void * connect_client (void *arg) { u_short sport = (*ACE_reinterpret_cast (u_short *, arg)); ACE_MEM_Addr to_server (sport); ACE_MEM_Connector connector; + connector.preferred_strategy (client_strategy); ACE_MEM_Stream stream; + // connector.preferred_strategy (ACE_MEM_IO::MT); + if (connector.connect (stream, to_server.get_remote_addr ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("connect_client")), @@ -166,22 +194,84 @@ create_reactor (void) ACE_Reactor::instance (reactor); } -int -main (int, ACE_TCHAR *[]) +int test_reactive (ACE_MEM_Addr &server_addr) { - ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test")); + ACE_DEBUG ((LM_DEBUG, "Testing Reactive MEM_Stream\n\n")); + + ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR> accept_strategy; + ACE_Creation_Strategy<Echo_Handler> create_strategy; + ACE_Reactive_Strategy<Echo_Handler> reactive_strategy (ACE_Reactor::instance ()); + S_ACCEPTOR acceptor; + if (acceptor.open (server_addr, + ACE_Reactor::instance (), + &create_strategy, + &accept_strategy, + &reactive_strategy) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("MEM_Acceptor::accept\n")), 1); + acceptor.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_")); - create_reactor (); + ACE_MEM_Addr local_addr; + if (acceptor.acceptor ().get_local_addr (local_addr) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("MEM_Acceptor::get_local_addr\n")), + 1); + } - unsigned short port = 0; - ACE_MEM_Addr server_addr (port); + u_short sport = local_addr.get_port_number (); - ACCEPTOR acceptor; - acceptor.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_")); - if (acceptor.open (server_addr) == -1) + ACE_Thread_Manager::instance ()->spawn_n (NO_OF_CONNECTION, + connect_client, + &sport); + ACE_Time_Value tv(60, 0); + ACE_Reactor::instance ()->run_event_loop (tv); + + if (tv == ACE_Time_Value::zero) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("Reactor::run_event_loop timeout\n")), + 1); + } + + ACE_DEBUG ((LM_DEBUG, "Reactor::run_event_loop finished\n")); + + ACE_Thread_Manager::instance ()->wait (); + + if (acceptor.close () == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("MEM_Acceptor::close\n")), + 1); + } + return 0; +} + +int test_multithreaded (ACE_MEM_Addr &server_addr) +{ + ACE_DEBUG ((LM_DEBUG, "Testing Multithreaded MEM_Stream\n\n")); + + Echo_Handler::reset_handler (); + + client_strategy = ACE_MEM_IO::MT; + ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR> accept_strategy; + ACE_Creation_Strategy<Echo_Handler> create_strategy; + ACE_Thread_Strategy<Echo_Handler> thr_strategy; + S_ACCEPTOR acceptor; + + + if (acceptor.open (server_addr, + ACE_Reactor::instance (), + &create_strategy, + &accept_strategy, + &thr_strategy) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("MEM_Acceptor::accept\n")), 1); + acceptor.acceptor ().malloc_options ().minimum_bytes_ = 1024 * 1024 ; + acceptor.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_")); + acceptor.acceptor ().preferred_strategy (ACE_MEM_IO::MT); + ACE_MEM_Addr local_addr; if (acceptor.acceptor ().get_local_addr (local_addr) == -1) { @@ -215,6 +305,26 @@ main (int, ACE_TCHAR *[]) ACE_TEXT ("MEM_Acceptor::close\n")), 1); } + return 0; +} + +int +main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test")); + + create_reactor (); + + unsigned short port = 0; + ACE_MEM_Addr server_addr (port); + + test_reactive (server_addr); + + ACE_Reactor::instance ()->reset_event_loop (); + + test_multithreaded (server_addr); + + // Now testing ACE_END_TEST; return 0; @@ -223,9 +333,25 @@ main (int, ACE_TCHAR *[]) #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Svc_Handler <ACE_MEM_STREAM, ACE_MT_SYNCH>; template class ACE_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR>; +template class ACE_Atomic_Op<ACE_Thread_Mutex, u_short>; +template class ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR>; +template class ACE_Creation_Strategy<Echo_Handler>; +template class ACE_Reactive_Strategy<Echo_Handler>; +template class ACE_Strategy_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR>; +template class ACE_Concurrency_Strategy<Echo_Handler>; +template class ACE_Scheduling_Strategy<Echo_Handler>; +template class ACE_Thread_Strategy<Echo_Handler>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Svc_Handler <ACE_MEM_STREAM, ACE_MT_SYNCH> #pragma instantiate ACE_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR> +#pragma instantiate ACE_Atomic_Op<ACE_Thread_Mutex, u_short> +#pragma instantiate ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR> +#pragma instantiate ACE_Creation_Strategy<Echo_Handler> +#pragma instantiate ACE_Reactive_Strategy<Echo_Handler> +#pragma instantiate ACE_Strategy_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR> +#pragma instantiate ACE_Concurrency_Strategy<Echo_Handler> +#pragma instantiate ACE_Scheduling_Strategy<Echo_Handler> +#pragma instantiate ACE_Thread_Strategy<Echo_Handler> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ #else diff --git a/tests/MEM_Stream_Test.h b/tests/MEM_Stream_Test.h index 8f6ea264525..a681e9486dd 100644 --- a/tests/MEM_Stream_Test.h +++ b/tests/MEM_Stream_Test.h @@ -37,15 +37,17 @@ class Echo_Handler : public ACE_Svc_Handler<ACE_MEM_STREAM, ACE_MT_SYNCH> // = TITLE // Simple class for reading in the data and then sending it back public: - Echo_Handler (); - // virtual int open (void *); + Echo_Handler (ACE_Thread_Manager *thr_mgr = 0); + virtual int open (void *); + static void reset_handler (void); virtual int handle_input (ACE_HANDLE h); virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // The Svc_Handler callbacks. + virtual int svc (void); public: - static u_short waiting_; + static ACE_Atomic_Op <ACE_Thread_Mutex, u_short> waiting_; // How many connections are we waiting for. static u_short connection_count_; |