summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog42
-rw-r--r--ChangeLogs/ChangeLog-02a42
-rw-r--r--ChangeLogs/ChangeLog-03a42
-rw-r--r--ace/MEM_Acceptor.cpp34
-rw-r--r--ace/MEM_Acceptor.h7
-rw-r--r--ace/MEM_Acceptor.i12
-rw-r--r--ace/MEM_Connector.cpp42
-rw-r--r--ace/MEM_Connector.h7
-rw-r--r--ace/MEM_Connector.i12
-rw-r--r--ace/MEM_IO.cpp372
-rw-r--r--ace/MEM_IO.h185
-rw-r--r--ace/MEM_IO.i156
-rw-r--r--ace/MEM_SAP.cpp25
-rw-r--r--ace/MEM_SAP.h96
-rw-r--r--ace/MEM_SAP.i98
-rw-r--r--ace/MEM_Stream.cpp2
-rw-r--r--tests/MEM_Stream_Test.cpp166
-rw-r--r--tests/MEM_Stream_Test.h8
18 files changed, 1119 insertions, 229 deletions
diff --git a/ChangeLog b/ChangeLog
index 039e0c5ed13..1da7975a5d2 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,41 @@
+Mon Apr 02 23:41:34 2001 Nanbor Wang <nanbor@cs.wustl.edu>
+
+ * ace/MEM_SAP.cpp:
+ * ace/MEM_SAP.h:
+ * ace/MEM_SAP.i:
+ Generalized the ACE_MEM_SAP class to allocate memory in the new
+ memory wrapper class ACE_MEM_SAP_Node.
+
+ * ace/MEM_IO.cpp:
+ * ace/MEM_IO.h:
+ * ace/MEM_IO.i:
+ Separated the different signaling mechanisms into a different
+ class in ACE_MEM_IO so it can determine the "right" signaling
+ strategy allowed. Currently, we implement the Reactive strategy
+ (ACE_Reactive_MEM_IO) which uses sockets for signaling and
+ multithreaded strategy (ACE_MT_MEM_IO) which uses semaphores for
+ signaling.
+
+ * ace/MEM_Stream.cpp:
+ Sending an empty buffer over to wake up the "other" end when we
+ are closing down.
+
+ * ace/MEM_Acceptor.cpp:
+ * ace/MEM_Acceptor.h:
+ * ace/MEM_Acceptor.i:
+ * ace/MEM_Connector.cpp:
+ * ace/MEM_Connector.h:
+ * ace/MEM_Connector.i: Added facility to specify the "preferred"
+ signaling strategy so the acceptor and connector can negociate
+ and agree on the best signaling mechanism to use.
+
+ * ace/MEM_Stream_Test.h:
+ * ace/MEM_Stream_Test.cpp:
+ Added the test for the new MT signaling MEM_Stream transfer.
+ This part of the test seems to be failing on system that depends
+ on SysV semaphores (because we need more semaphore than the
+ system can provide.)
+
Mon Apr 2 15:17:13 2001 Chad Elliott <elliott_c@ociweb.com>
* ace/config-chorus.h:
@@ -15,14 +53,14 @@ Mon Apr 2 09:57:31 2001 Darrell Brunsch <brunsch@uci.edu>
* bin/PerlACE/MSProject.pm:
- Made a change to the tao_idl depencency checking.
+ Made a change to the tao_idl depencency checking.
It was only checking for tao_idl and $(InputName) where
some places we use $(InputPath) instead.
* ace/config-win32-msvc-5.h:
* ace/config-win32-msvc-6.h:
- Disabled the Inheritance by Dominance informational
+ Disabled the Inheritance by Dominance informational
warning that MSVC gives. We have cases of this all over
the place, and normally we just disable the warning on
a file by file basis, but now we just do a blanket disable.
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index 039e0c5ed13..1da7975a5d2 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,41 @@
+Mon Apr 02 23:41:34 2001 Nanbor Wang <nanbor@cs.wustl.edu>
+
+ * ace/MEM_SAP.cpp:
+ * ace/MEM_SAP.h:
+ * ace/MEM_SAP.i:
+ Generalized the ACE_MEM_SAP class to allocate memory in the new
+ memory wrapper class ACE_MEM_SAP_Node.
+
+ * ace/MEM_IO.cpp:
+ * ace/MEM_IO.h:
+ * ace/MEM_IO.i:
+ Separated the different signaling mechanisms into a different
+ class in ACE_MEM_IO so it can determine the "right" signaling
+ strategy allowed. Currently, we implement the Reactive strategy
+ (ACE_Reactive_MEM_IO) which uses sockets for signaling and
+ multithreaded strategy (ACE_MT_MEM_IO) which uses semaphores for
+ signaling.
+
+ * ace/MEM_Stream.cpp:
+ Sending an empty buffer over to wake up the "other" end when we
+ are closing down.
+
+ * ace/MEM_Acceptor.cpp:
+ * ace/MEM_Acceptor.h:
+ * ace/MEM_Acceptor.i:
+ * ace/MEM_Connector.cpp:
+ * ace/MEM_Connector.h:
+ * ace/MEM_Connector.i: Added facility to specify the "preferred"
+ signaling strategy so the acceptor and connector can negociate
+ and agree on the best signaling mechanism to use.
+
+ * ace/MEM_Stream_Test.h:
+ * ace/MEM_Stream_Test.cpp:
+ Added the test for the new MT signaling MEM_Stream transfer.
+ This part of the test seems to be failing on system that depends
+ on SysV semaphores (because we need more semaphore than the
+ system can provide.)
+
Mon Apr 2 15:17:13 2001 Chad Elliott <elliott_c@ociweb.com>
* ace/config-chorus.h:
@@ -15,14 +53,14 @@ Mon Apr 2 09:57:31 2001 Darrell Brunsch <brunsch@uci.edu>
* bin/PerlACE/MSProject.pm:
- Made a change to the tao_idl depencency checking.
+ Made a change to the tao_idl depencency checking.
It was only checking for tao_idl and $(InputName) where
some places we use $(InputPath) instead.
* ace/config-win32-msvc-5.h:
* ace/config-win32-msvc-6.h:
- Disabled the Inheritance by Dominance informational
+ Disabled the Inheritance by Dominance informational
warning that MSVC gives. We have cases of this all over
the place, and normally we just disable the warning on
a file by file basis, but now we just do a blanket disable.
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index 039e0c5ed13..1da7975a5d2 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,41 @@
+Mon Apr 02 23:41:34 2001 Nanbor Wang <nanbor@cs.wustl.edu>
+
+ * ace/MEM_SAP.cpp:
+ * ace/MEM_SAP.h:
+ * ace/MEM_SAP.i:
+ Generalized the ACE_MEM_SAP class to allocate memory in the new
+ memory wrapper class ACE_MEM_SAP_Node.
+
+ * ace/MEM_IO.cpp:
+ * ace/MEM_IO.h:
+ * ace/MEM_IO.i:
+ Separated the different signaling mechanisms into a different
+ class in ACE_MEM_IO so it can determine the "right" signaling
+ strategy allowed. Currently, we implement the Reactive strategy
+ (ACE_Reactive_MEM_IO) which uses sockets for signaling and
+ multithreaded strategy (ACE_MT_MEM_IO) which uses semaphores for
+ signaling.
+
+ * ace/MEM_Stream.cpp:
+ Sending an empty buffer over to wake up the "other" end when we
+ are closing down.
+
+ * ace/MEM_Acceptor.cpp:
+ * ace/MEM_Acceptor.h:
+ * ace/MEM_Acceptor.i:
+ * ace/MEM_Connector.cpp:
+ * ace/MEM_Connector.h:
+ * ace/MEM_Connector.i: Added facility to specify the "preferred"
+ signaling strategy so the acceptor and connector can negociate
+ and agree on the best signaling mechanism to use.
+
+ * ace/MEM_Stream_Test.h:
+ * ace/MEM_Stream_Test.cpp:
+ Added the test for the new MT signaling MEM_Stream transfer.
+ This part of the test seems to be failing on system that depends
+ on SysV semaphores (because we need more semaphore than the
+ system can provide.)
+
Mon Apr 2 15:17:13 2001 Chad Elliott <elliott_c@ociweb.com>
* ace/config-chorus.h:
@@ -15,14 +53,14 @@ Mon Apr 2 09:57:31 2001 Darrell Brunsch <brunsch@uci.edu>
* bin/PerlACE/MSProject.pm:
- Made a change to the tao_idl depencency checking.
+ Made a change to the tao_idl depencency checking.
It was only checking for tao_idl and $(InputName) where
some places we use $(InputPath) instead.
* ace/config-win32-msvc-5.h:
* ace/config-win32-msvc-6.h:
- Disabled the Inheritance by Dominance informational
+ Disabled the Inheritance by Dominance informational
warning that MSVC gives. We have cases of this all over
the place, and normally we just disable the warning on
a file by file basis, but now we just do a blanket disable.
diff --git a/ace/MEM_Acceptor.cpp b/ace/MEM_Acceptor.cpp
index 428ec6912a5..4d3b084954e 100644
--- a/ace/MEM_Acceptor.cpp
+++ b/ace/MEM_Acceptor.cpp
@@ -23,7 +23,8 @@ ACE_MEM_Acceptor::dump (void) const
ACE_MEM_Acceptor::ACE_MEM_Acceptor (void)
: mmap_prefix_ (0),
- malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0)
+ malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0),
+ preferred_strategy_ (ACE_MEM_IO::Reactive)
{
ACE_TRACE ("ACE_MEM_Acceptor::ACE_MEM_Acceptor");
}
@@ -41,7 +42,8 @@ ACE_MEM_Acceptor::ACE_MEM_Acceptor (const ACE_MEM_Addr &remote_sap,
int backlog,
int protocol)
: mmap_prefix_ (0),
- malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0)
+ malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0),
+ preferred_strategy_ (ACE_MEM_IO::Reactive)
{
ACE_TRACE ("ACE_MEM_Acceptor::ACE_MEM_Acceptor");
if (this->open (remote_sap,
@@ -80,7 +82,7 @@ ACE_MEM_Acceptor::accept (ACE_MEM_Stream &new_stream,
int *len_ptr = 0;
sockaddr *addr = 0;
- int in_blocking_mode = 0;
+ int in_blocking_mode = 1;
if (this->shared_accept_start (timeout,
restart,
in_blocking_mode) == -1)
@@ -149,13 +151,35 @@ ACE_MEM_Acceptor::accept (ACE_MEM_Stream &new_stream,
// Make sure we have a fresh start.
ACE_OS::unlink (buf);
+ new_stream.disable (ACE_NONBLOCK);
+ ACE_HANDLE new_handle = new_stream.get_handle ();
+
+ // Protocol negociation:
+ // Tell the client side what level of signaling strategy
+ // we support.
+ ACE_INT16 client_signaling = this->preferred_strategy_;
+ if (ACE::send (new_handle, &client_signaling,
+ sizeof (ACE_INT16)) == -1)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ ACE_LIB_TEXT ("ACE_MEM_Acceptor::accept error sending strategy\n")),
+ -1);
+
+ // Now we get the signaling strategy the client support.
+ if (ACE::recv (new_handle, &client_signaling,
+ sizeof (ACE_INT16)) == -1)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ ACE_LIB_TEXT ("ACE_MEM_Acceptor::%p error receiving strategy\n"), "accept"),
+ -1);
+
+ // Client will decide what signaling strategy to use.
+
// Now set up the shared memory malloc pool.
- if (new_stream.create_shm_malloc (buf, &this->malloc_options_) == -1)
+ if (new_stream.init (buf, ACE_static_cast (ACE_MEM_IO::Signal_Strategy, client_signaling),
+ &this->malloc_options_) == -1)
return -1;
// @@ Need to handle timeout here.
ACE_UINT16 buf_len = (ACE_OS::strlen (buf) + 1) * sizeof (ACE_TCHAR);
- ACE_HANDLE new_handle = new_stream.get_handle ();
// No need to worry about byte-order because both parties should always
// be on the same machine.
diff --git a/ace/MEM_Acceptor.h b/ace/MEM_Acceptor.h
index b8a719d7de8..a1e6ffddcfe 100644
--- a/ace/MEM_Acceptor.h
+++ b/ace/MEM_Acceptor.h
@@ -97,6 +97,10 @@ public:
const ACE_TCHAR *mmap_prefix (void) const;
void mmap_prefix (const ACE_TCHAR *prefix);
+ // Set/get the preferred signaling strategy.
+ ACE_MEM_IO::Signal_Strategy preferred_strategy (void) const;
+ void preferred_strategy (ACE_MEM_IO::Signal_Strategy strategy);
+
/// Return the local endpoint address in the referenced <ACE_Addr>.
/// Returns 0 if successful, else -1.
int get_local_addr (ACE_MEM_Addr &) const;
@@ -153,6 +157,9 @@ private:
/// A cached MALLOC_OPTIONS. MEM_Accaptor use it to create the shared
/// mamory malloc upon every incoming connection.
ACE_MEM_SAP::MALLOC_OPTIONS malloc_options_;
+
+ // Preferred signaling strategy.
+ ACE_MEM_IO::Signal_Strategy preferred_strategy_;
};
#if !defined (ACE_LACKS_INLINE_FUNCTIONS)
diff --git a/ace/MEM_Acceptor.i b/ace/MEM_Acceptor.i
index e3c681eb7bf..22ad9f53b1e 100644
--- a/ace/MEM_Acceptor.i
+++ b/ace/MEM_Acceptor.i
@@ -63,6 +63,18 @@ ACE_MEM_Acceptor::mmap_prefix (const ACE_TCHAR *prefix)
this->mmap_prefix_ = ACE::strnew (prefix);
}
+ASYS_INLINE ACE_MEM_IO::Signal_Strategy
+ACE_MEM_Acceptor::preferred_strategy (void) const
+{
+ return this->preferred_strategy_;
+}
+
+ASYS_INLINE void
+ACE_MEM_Acceptor::preferred_strategy (ACE_MEM_IO::Signal_Strategy strategy)
+{
+ this->preferred_strategy_ = strategy;
+}
+
ASYS_INLINE ACE_MEM_SAP::MALLOC_OPTIONS &
ACE_MEM_Acceptor::malloc_options (void)
{
diff --git a/ace/MEM_Connector.cpp b/ace/MEM_Connector.cpp
index 3fabb12d5ae..b87302bfe2e 100644
--- a/ace/MEM_Connector.cpp
+++ b/ace/MEM_Connector.cpp
@@ -24,7 +24,8 @@ ACE_MEM_Connector::dump (void) const
}
ACE_MEM_Connector::ACE_MEM_Connector (void)
- : malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0)
+ : malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0),
+ preferred_strategy_ (ACE_MEM_IO::Reactive)
{
ACE_TRACE ("ACE_MEM_Connector::ACE_MEM_Connector");
}
@@ -38,7 +39,8 @@ ACE_MEM_Connector::ACE_MEM_Connector (ACE_MEM_Stream &new_stream,
int flags,
int perms,
int protocol)
- : malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0)
+ : malloc_options_ (ACE_DEFAULT_BASE_ADDR, 0),
+ preferred_strategy_ (ACE_MEM_IO::Reactive)
{
ACE_TRACE ("ACE_MEM_Connector::ACE_MEM_Connector");
// This is necessary due to the weird inheritance relationships of
@@ -83,9 +85,13 @@ ACE_MEM_Connector::connect (ACE_MEM_Stream &new_stream,
timeout, local_sap,
reuse_addr, flags, perms,
PF_INET, protocol) == -1)
- return -1;
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ ACE_LIB_TEXT ("ACE_MEM_Connector::connect error connecting to socket\n")),
+ -1);
+
ACE_HANDLE new_handle = temp_stream.get_handle ();
+ new_stream.disable (ACE_NONBLOCK);
new_stream.set_handle (new_handle);
// Do not close the handle.
@@ -93,15 +99,39 @@ ACE_MEM_Connector::connect (ACE_MEM_Stream &new_stream,
ACE_TCHAR buf[MAXPATHLEN];
// @@ Need to handle timeout here.
+ ACE_INT16 server_strategy = ACE_MEM_IO::Reactive;
+ // Receive the signaling strategy theserver support.
+ if (ACE::recv (new_handle, &server_strategy,
+ sizeof (ACE_INT16)) == -1)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ ACE_LIB_TEXT ("ACE_MEM_Connector::connect error receiving strategy\n")),
+ -1);
+
+ // If either side don't support MT, we will not use it.
+ if (! (this->preferred_strategy_ == ACE_MEM_IO::MT &&
+ server_strategy == ACE_MEM_IO::MT))
+ server_strategy = ACE_MEM_IO::Reactive;
+
+ if (ACE::send (new_handle, &server_strategy,
+ sizeof (ACE_INT16)) == -1)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ ACE_LIB_TEXT ("ACE_MEM_Connector::connect error sending strategy\n")),
+ -1);
+
ACE_INT16 buf_len;
// Byte-order is not a problem for this read.
if (ACE::recv (new_handle, &buf_len, sizeof (buf_len)) == -1)
- return -1;
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ ACE_LIB_TEXT ("ACE_MEM_Connector::connect error receiving shm filename length\n")),
+ -1);
if (ACE::recv (new_handle, buf, buf_len) == -1)
- return -1;
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ ACE_LIB_TEXT ("ACE_MEM_Connector::connect error receiving shm filename.\n")),
+ -1);
- if (new_stream.create_shm_malloc (buf, &this->malloc_options_) == -1)
+ if (new_stream.init (buf, ACE_static_cast (ACE_MEM_IO::Signal_Strategy, server_strategy),
+ &this->malloc_options_) == -1)
return -1;
return 0;
diff --git a/ace/MEM_Connector.h b/ace/MEM_Connector.h
index bf6ea813bec..900619bd539 100644
--- a/ace/MEM_Connector.h
+++ b/ace/MEM_Connector.h
@@ -86,6 +86,10 @@ public:
int perms = 0,
int protocol = 0);
+ // Set/get the preferred signaling strategy.
+ ACE_MEM_IO::Signal_Strategy preferred_strategy (void) const;
+ void preferred_strategy (ACE_MEM_IO::Signal_Strategy strategy);
+
/// Accessor to underlying malloc options.
ACE_MEM_SAP::MALLOC_OPTIONS &malloc_options (void);
@@ -106,6 +110,9 @@ private:
/// A cached MALLOC_OPTIONS that the MEM_Connector used to initialize
/// the shared memory malloc update connection establishment.
ACE_MEM_SAP::MALLOC_OPTIONS malloc_options_;
+
+ // Preferred signaling strategy.
+ ACE_MEM_IO::Signal_Strategy preferred_strategy_;
};
#if !defined (ACE_LACKS_INLINE_FUNCTIONS)
diff --git a/ace/MEM_Connector.i b/ace/MEM_Connector.i
index 3fdf59e19e0..09e311d76a2 100644
--- a/ace/MEM_Connector.i
+++ b/ace/MEM_Connector.i
@@ -5,6 +5,18 @@
// Establish a connection.
+ASYS_INLINE ACE_MEM_IO::Signal_Strategy
+ACE_MEM_Connector::preferred_strategy (void) const
+{
+ return this->preferred_strategy_;
+}
+
+ASYS_INLINE void
+ACE_MEM_Connector::preferred_strategy (ACE_MEM_IO::Signal_Strategy strategy)
+{
+ this->preferred_strategy_ = strategy;
+}
+
ASYS_INLINE ACE_MEM_SAP::MALLOC_OPTIONS &
ACE_MEM_Connector::malloc_options (void)
{
diff --git a/ace/MEM_IO.cpp b/ace/MEM_IO.cpp
index 2a9672c0f70..51ded63e4b8 100644
--- a/ace/MEM_IO.cpp
+++ b/ace/MEM_IO.cpp
@@ -14,12 +14,357 @@ ACE_RCSID(ace, MEM_IO, "$Id$")
ACE_ALLOC_HOOK_DEFINE(ACE_MEM_IO)
+ACE_Reactive_MEM_IO::~ACE_Reactive_MEM_IO ()
+{
+}
+
+int
+ACE_Reactive_MEM_IO::init (ACE_HANDLE handle,
+ const ACE_TCHAR *name,
+ MALLOC_OPTIONS *options)
+{
+ ACE_TRACE ("ACE_Reactive_MEM_IO::init");
+ this->handle_ = handle;
+ return this->create_shm_malloc (name,
+ options);
+}
+
+int
+ACE_Reactive_MEM_IO::fini (int remove)
+{
+ ACE_TRACE ("ACE_Reactive_MEM_IO::init");
+
+ return this->close_shm_malloc (remove);
+}
+
+int
+ACE_Reactive_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf,
+ int flags,
+ const ACE_Time_Value *timeout)
+{
+ ACE_TRACE ("ACE_Reactive_MEM_IO::recv_buf");
+
+ if (this->shm_malloc_ == 0)
+ return -1;
+
+ off_t new_offset = 0;
+ int retv = ACE::recv (this->handle_,
+ (char *) &new_offset,
+ sizeof (off_t),
+ flags,
+ timeout);
+
+ if (retv == 0)
+ return 0;
+ else if (retv != sizeof (off_t))
+ {
+ // Nothing available or we are really screwed.
+ buf = 0;
+ return -1;
+ }
+ else
+ return this->get_buf_len (new_offset, buf);
+
+ ACE_NOTREACHED (return 0;)
+}
+
+int
+ACE_Reactive_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf,
+ int flags,
+ const ACE_Time_Value *timeout)
+{
+ ACE_TRACE ("ACE_Reactive_MEM_IO::send_buf");
+
+ if (this->shm_malloc_ == 0)
+ return -1;
+
+ off_t offset = ACE_reinterpret_cast (char *, buf) -
+ ACE_static_cast (char *, this->shm_malloc_->base_addr ());
+ // the offset.
+ // Send the offset value over the socket.
+ if (ACE::send (this->handle_,
+ (const char *) &offset,
+ sizeof (offset),
+ flags,
+ timeout) != sizeof (offset))
+ {
+ // unsucessful send, release the memory in the shared-memory.
+ this->release_buffer (buf);
+
+ return -1;
+ }
+ return buf->size ();
+}
+
+int
+ACE_MT_MEM_IO::Simple_Queue::write (ACE_MEM_SAP_Node *new_node)
+{
+ if (this->mq_ == 0)
+ return -1;
+
+ // Here, we assume we already have acquired the lock necessary.
+ // And we are allowed to write.
+ if (this->mq_->tail_ == (void *) 0) // nothing in the queue.
+ {
+ this->mq_->head_ = new_node;
+ this->mq_->tail_ = new_node;
+ new_node->next_ = 0;
+ }
+ else
+ {
+ this->mq_->tail_->next_ = new_node;
+ new_node->next_ = 0;
+ this->mq_->tail_ = new_node;
+ }
+ return 0;
+}
+
+ACE_MEM_SAP_Node *
+ACE_MT_MEM_IO::Simple_Queue::read ()
+{
+ if (this->mq_ == 0)
+ return 0;
+
+ ACE_MEM_SAP_Node *retv = 0;
+
+ ACE_SEH_TRY
+ {
+ retv = this->mq_->head_;
+ // Here, we assume we already have acquired the lock necessary
+ // and there are soemthing in the queue.
+ if (this->mq_->head_ == this->mq_->tail_)
+ {
+ // Last message in the queue.
+ this->mq_->head_ = 0;
+ this->mq_->tail_ = 0;
+ }
+ else
+ this->mq_->head_ = retv->next_;
+ }
+ ACE_SEH_EXCEPT (this->malloc_->memory_pool ().seh_selector (GetExceptionInformation ()))
+ {
+ }
+
+ return retv;
+}
+
+ACE_MT_MEM_IO::~ACE_MT_MEM_IO ()
+{
+ delete this->recv_channel_.sema_;
+ delete this->recv_channel_.lock_;
+ delete this->send_channel_.sema_;
+ delete this->send_channel_.lock_;
+}
+
+int
+ACE_MT_MEM_IO::init (ACE_HANDLE handle,
+ const ACE_TCHAR *name,
+ MALLOC_OPTIONS *options)
+{
+ ACE_TRACE ("ACE_MT_MEM_IO::init");
+ ACE_UNUSED_ARG (handle);
+
+ // @@ Give me a rule on naming and how the queue should
+ // be kept in the shared memory and we are done
+ // with this.
+ if (this->create_shm_malloc (name, options) == -1)
+ return -1;
+
+ ACE_TCHAR server_sema [MAXPATHLEN];
+ ACE_TCHAR client_sema [MAXPATHLEN];
+ ACE_TCHAR server_lock [MAXPATHLEN];
+ ACE_TCHAR client_lock [MAXPATHLEN];
+ const ACE_TCHAR *basename = ACE::basename (name);
+ // size_t baselen = ACE_OS::strlen (basename);
+
+ // Building names. @@ Check buffer overflow?
+ ACE_OS::strcpy (server_sema, basename);
+ ACE_OS::strcat (server_sema, ACE_TEXT ("_sema_to_server"));
+ ACE_OS::strcpy (client_sema, basename);
+ ACE_OS::strcat (client_sema, ACE_TEXT ("_sema_to_client"));
+ ACE_OS::strcpy (server_lock, basename);
+ ACE_OS::strcat (server_lock, ACE_TEXT ("_lock_to_server"));
+ ACE_OS::strcpy (client_lock, basename);
+ ACE_OS::strcat (client_lock, ACE_TEXT ("_lock_to_client"));
+
+ void *to_server_ptr = 0;
+ // @@ Here, we assume the shared memory fill will never be resued.
+ // So we can determine whether we are server or client by examining
+ // if the simple message queues have already been set up in
+ // the Malloc object or not.
+ if (this->shm_malloc_->find ("to_server", to_server_ptr) == -1)
+ {
+ void *ptr = 0;
+ // We are server.
+ ACE_ALLOCATOR_RETURN (ptr,
+ this->shm_malloc_->malloc (2 * sizeof (MQ_Struct)),
+ -1);
+
+ MQ_Struct *mymq = ACE_reinterpret_cast (MQ_Struct *, ptr);
+ mymq->tail_ = 0;
+ mymq->head_ = 0;
+ (mymq + 1)->tail_ = 0;
+ (mymq + 1)->head_ = 0;
+ if (this->shm_malloc_->bind ("to_server", mymq) == -1)
+ return -1;
+
+ if (this->shm_malloc_->bind ("to_client", mymq + 1) == -1)
+ return -1;
+
+ this->recv_channel_.queue_.init (mymq, this->shm_malloc_);
+ ACE_NEW_RETURN (this->recv_channel_.sema_,
+ ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema),
+ -1);
+ ACE_NEW_RETURN (this->recv_channel_.lock_,
+ ACE_SYNCH_PROCESS_MUTEX (server_lock),
+ -1);
+
+ this->send_channel_.queue_.init (mymq + 1, this->shm_malloc_);
+ ACE_NEW_RETURN (this->send_channel_.sema_,
+ ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema),
+ -1);
+ ACE_NEW_RETURN (this->send_channel_.lock_,
+ ACE_SYNCH_PROCESS_MUTEX (client_lock),
+ -1);
+ }
+ else
+ {
+ // we are client.
+ MQ_Struct *mymq = ACE_reinterpret_cast (MQ_Struct *, to_server_ptr);
+ this->recv_channel_.queue_.init (mymq +1, this->shm_malloc_);
+ ACE_NEW_RETURN (this->recv_channel_.sema_,
+ ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema),
+ -1);
+ ACE_NEW_RETURN (this->recv_channel_.lock_,
+ ACE_SYNCH_PROCESS_MUTEX (client_lock),
+ -1);
+
+ this->send_channel_.queue_.init (mymq, this->shm_malloc_);
+ ACE_NEW_RETURN (this->send_channel_.sema_,
+ ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema),
+ -1);
+ ACE_NEW_RETURN (this->send_channel_.lock_,
+ ACE_SYNCH_PROCESS_MUTEX (server_lock),
+ -1);
+ }
+ return 0;
+}
+
+int
+ACE_MT_MEM_IO::fini (int remove)
+{
+ ACE_TRACE ("ACE_MT_MEM_IO::init");
+
+ return this->close_shm_malloc (remove);
+}
+
+int
+ACE_MT_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf,
+ int flags,
+ const ACE_Time_Value *timeout)
+{
+ ACE_TRACE ("ACE_MT_MEM_IO::recv_buf");
+
+ // @@ Don't know how to handle timeout yet.
+ ACE_UNUSED_ARG (timeout);
+ ACE_UNUSED_ARG (flags);
+
+ if (this->shm_malloc_ == 0)
+ return -1;
+
+ // Need to handle timeout here.
+ if (this->recv_channel_.sema_->acquire () == -1)
+ return -1;
+
+ {
+ // @@ We can probably skip the lock in certain circumstance.
+ ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->recv_channel_.lock_, -1);
+
+ buf = this->recv_channel_.queue_.read ();
+ if (buf != 0)
+ return buf->size ();
+ return -1;
+ }
+
+ ACE_NOTREACHED (return 0;)
+}
+
+int
+ACE_MT_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf,
+ int flags,
+ const ACE_Time_Value *timeout)
+{
+ ACE_TRACE ("ACE_MT_MEM_IO::send_buf");
+
+ // @@ Don't know how to handle timeout yet.
+ ACE_UNUSED_ARG (timeout);
+ ACE_UNUSED_ARG (flags);
+
+ if (this->shm_malloc_ == 0)
+ return -1;
+
+ {
+ // @@ We can probably skip the lock in certain curcumstances.
+ ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->send_channel_.lock_, -1);
+
+ if (this->send_channel_.queue_.write (buf) == -1)
+ {
+ this->release_buffer (buf);
+ return -1;
+ }
+ }
+
+ if (this->send_channel_.sema_->release () == -1)
+ return -1;
+
+ return buf->size ();
+}
+
void
ACE_MEM_IO::dump (void) const
{
ACE_TRACE ("ACE_MEM_IO::dump");
}
+int
+ACE_MEM_IO::init (const ACE_TCHAR *name,
+ ACE_MEM_IO::Signal_Strategy type,
+ ACE_MEM_SAP::MALLOC_OPTIONS *options)
+{
+ ACE_UNUSED_ARG (type);
+
+ delete this->deliver_strategy_;
+ this->deliver_strategy_ = 0;
+ switch (type)
+ {
+ case ACE_MEM_IO::Reactive:
+ ACE_NEW_RETURN (this->deliver_strategy_,
+ ACE_Reactive_MEM_IO (),
+ -1);
+ break;
+ case ACE_MEM_IO::MT:
+ ACE_NEW_RETURN (this->deliver_strategy_,
+ ACE_MT_MEM_IO (),
+ -1);
+ break;
+ default:
+ return -1;
+ }
+
+ return this->deliver_strategy_->init (this->get_handle (),
+ name,
+ options);
+}
+
+int
+ACE_MEM_IO::fini (int remove)
+{
+ if (this->deliver_strategy_ != 0)
+ return this->deliver_strategy_->fini (remove);
+ else
+ return -1;
+}
+
// Allows a client to read from a socket without having to provide
// a buffer to read. This method determines how much data is in the
// socket, allocates a buffer of this size, reads in the data, and
@@ -31,15 +376,20 @@ ACE_MEM_IO::send (const ACE_Message_Block *message_block,
{
ACE_TRACE ("ACE_MEM_IO::send");
+ if (this->deliver_strategy_ == 0)
+ return -1; // Something went seriously wrong.
+
ssize_t len = message_block->total_length ();
if (len != 0)
{
- char *buf = ACE_static_cast (char *, this->acquire_buffer (len));
+ ACE_MEM_SAP_Node *buf =
+ ACE_reinterpret_cast (ACE_MEM_SAP_Node *,
+ this->deliver_strategy_->acquire_buffer (len));
ssize_t n = 0;
while (message_block != 0)
{
- ACE_OS::memcpy (buf + n,
+ ACE_OS::memcpy (ACE_static_cast (char *, buf->data ()) + n,
message_block->rd_ptr (),
message_block->length ());
n += message_block->length ();
@@ -50,19 +400,11 @@ ACE_MEM_IO::send (const ACE_Message_Block *message_block,
message_block = message_block->next ();
}
- off_t offset = this->set_buf_len (buf, len);
- if (ACE::send (this->get_handle (),
- (const char *) &offset,
- sizeof (offset),
- 0,
- timeout) != sizeof (offset))
- {
- // unsucessful send, release the memory in the shared-memory.
- this->release_buffer (buf);
+ buf->size_ = len;
- return -1;
- }
- return len;
+ return this->deliver_strategy_->send_buf (buf,
+ 0,
+ timeout);
}
return 0;
}
@@ -71,7 +413,7 @@ ACE_MEM_IO::send (const ACE_Message_Block *message_block,
#if 0
ssize_t
ACE_MEM_IO::recvv (iovec *io_vec,
- const ACE_Time_Value *timeout)
+ const ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_MEM_IO::recvv");
#if defined (FIONREAD)
diff --git a/ace/MEM_IO.h b/ace/MEM_IO.h
index 6b52edcaa54..cfb4a0de4eb 100644
--- a/ace/MEM_IO.h
+++ b/ace/MEM_IO.h
@@ -19,6 +19,8 @@
#include "ace/MEM_SAP.h"
#include "ace/Memory_Pool.h"
#include "ace/Message_Block.h"
+#include "ace/Process_Semaphore.h"
+#include "ace/Process_Mutex.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -26,6 +28,120 @@
#if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
+class ACE_Export ACE_Reactive_MEM_IO : public ACE_MEM_SAP
+{
+public:
+ ACE_Reactive_MEM_IO (void);
+
+ virtual ~ACE_Reactive_MEM_IO (void);
+
+ /**
+ * Initialize the MEM_SAP object.
+ */
+ virtual int init (ACE_HANDLE handle,
+ const ACE_TCHAR *name,
+ MALLOC_OPTIONS *options);
+
+ /**
+ * Finalizing the MEM_SAP object. This method doesn't invoke
+ * the <remove> method.
+ */
+ virtual int fini (int remove);
+ /**
+ * Fetch location of next available data into <recv_buffer_>.
+ * As this operation read the address of the data off the socket
+ * using ACE::recv, <timeout> only applies to ACE::recv.
+ */
+ virtual int recv_buf (ACE_MEM_SAP_Node *&buf,
+ int flags,
+ const ACE_Time_Value *timeout);
+
+ /**
+ * Wait to to <timeout> amount of time to send <buf>. If <send>
+ * times out a -1 is returned with <errno == ETIME>. If it succeeds
+ * the number of bytes sent is returned. */
+ virtual int send_buf (ACE_MEM_SAP_Node *buf,
+ int flags,
+ const ACE_Time_Value *timeout);
+
+ /**
+ * Convert the buffer offset <off> to absolute address to <buf>.
+ * Return the size of valid information containing in the <buf>,
+ * -1 if <shm_malloc_> is not initialized.
+ */
+ ssize_t get_buf_len (const off_t off, ACE_MEM_SAP_Node *&buf);
+};
+
+class ACE_Export ACE_MT_MEM_IO : public ACE_MEM_SAP
+{
+public:
+ typedef struct
+ {
+ ACE_MEM_SAP_Node::ACE_MEM_SAP_NODE_PTR head_;
+ ACE_MEM_SAP_Node::ACE_MEM_SAP_NODE_PTR tail_;
+ } MQ_Struct; // Structure for a simple queue
+
+ class Simple_Queue
+ {
+ public:
+ Simple_Queue (void);
+ Simple_Queue (MQ_Struct *mq);
+
+ int init (MQ_Struct *mq, ACE_MEM_SAP::MALLOC_TYPE *malloc);
+
+ int write (ACE_MEM_SAP_Node *new_msg);
+
+ ACE_MEM_SAP_Node *read (void);
+ private:
+ MQ_Struct *mq_;
+ ACE_MEM_SAP::MALLOC_TYPE *malloc_;
+ };
+
+ typedef struct
+ {
+ ACE_SYNCH_PROCESS_SEMAPHORE *sema_;
+ ACE_SYNCH_PROCESS_MUTEX *lock_;
+ Simple_Queue queue_;
+ } Channel;
+
+ ACE_MT_MEM_IO (void);
+
+ virtual ~ACE_MT_MEM_IO (void);
+
+ /**
+ * Initialize the MEM_SAP object.
+ */
+ virtual int init (ACE_HANDLE handle,
+ const ACE_TCHAR *name,
+ MALLOC_OPTIONS *options);
+
+ /**
+ * Finalizing the MEM_SAP object. This method doesn't invoke
+ * the <remove> method.
+ */
+ virtual int fini (int remove);
+ /**
+ * Fetch location of next available data into <recv_buffer_>.
+ * As this operation read the address of the data off the socket
+ * using ACE::recv, <timeout> only applies to ACE::recv.
+ */
+ virtual int recv_buf (ACE_MEM_SAP_Node *&buf,
+ int flags,
+ const ACE_Time_Value *timeout);
+
+ /**
+ * Wait to to <timeout> amount of time to send <buf>. If <send>
+ * times out a -1 is returned with <errno == ETIME>. If it succeeds
+ * the number of bytes sent is returned. */
+ virtual int send_buf (ACE_MEM_SAP_Node *buf,
+ int flags,
+ const ACE_Time_Value *timeout);
+
+private:
+ Channel recv_channel_;
+ Channel send_channel_;
+};
+
/**
* @class ACE_MEM_IO
*
@@ -52,7 +168,7 @@
* the other end. The receiving side then reverses the
* procedures and copies the information into user buffer.
*/
-class ACE_Export ACE_MEM_IO : public ACE_SOCK, public ACE_MEM_SAP
+class ACE_Export ACE_MEM_IO : public ACE_SOCK
{
public:
// = Initialization and termination methods.
@@ -62,6 +178,25 @@ public:
/// Destructor.
~ACE_MEM_IO (void);
+ typedef enum
+ {
+ Reactive,
+ MT
+ } Signal_Strategy;
+
+ /**
+ * Initialize the MEM_SAP object.
+ */
+ int init (const ACE_TCHAR *name,
+ ACE_MEM_IO::Signal_Strategy type = ACE_MEM_IO::Reactive,
+ ACE_MEM_SAP::MALLOC_OPTIONS *options = 0);
+
+ /**
+ * Finalizing the MEM_IO object. This method doesn't invoke
+ * the <remove> method.
+ */
+ int fini (int remove = 0);
+
/// Send an <n> byte buffer to the other process using shm_malloc_
/// connected thru the socket.
ssize_t send (const void *buf,
@@ -90,18 +225,6 @@ public:
*/
ssize_t send (const void *buf,
size_t n,
- int flags,
- const ACE_Time_Value *timeout);
-
- /**
- * Wait up to <timeout> amount of time to receive up to <n> bytes
- * into <buf> from <handle> (uses the <recv> call). If <recv> times
- * out a -1 is returned with <errno == ETIME>. If it succeeds the
- * number of bytes received is returned.
- */
- ssize_t recv (void *buf,
- size_t n,
- int flags,
const ACE_Time_Value *timeout);
/**
@@ -112,6 +235,15 @@ public:
*/
ssize_t send (const void *buf,
size_t n,
+ int flags,
+ const ACE_Time_Value *timeout);
+
+ /**
+ * Wait to to <timeout> amount of time to send the <message_block>.
+ * If <send> times out a -1 is returned with <errno == ETIME>. If
+ * it succeeds the number of bytes sent is returned.
+ */
+ ssize_t send (const ACE_Message_Block *message_block,
const ACE_Time_Value *timeout);
/**
@@ -125,13 +257,17 @@ public:
const ACE_Time_Value *timeout);
/**
- * Wait to to <timeout> amount of time to send the <message_block>.
- * If <send> times out a -1 is returned with <errno == ETIME>. If
- * it succeeds the number of bytes sent is returned.
+ * Wait up to <timeout> amount of time to receive up to <n> bytes
+ * into <buf> from <handle> (uses the <recv> call). If <recv> times
+ * out a -1 is returned with <errno == ETIME>. If it succeeds the
+ * number of bytes received is returned.
*/
- ssize_t send (const ACE_Message_Block *message_block,
+ ssize_t recv (void *buf,
+ size_t n,
+ int flags,
const ACE_Time_Value *timeout);
+
/// Dump the state of an object.
void dump (void) const;
@@ -147,17 +283,14 @@ public:
int get_remote_port (u_short &) const;
*/
-protected:
- /**
- * Fetch location of next available data into <recv_buffer_>.
- * As this operation read the address of the data off the socket
- * using ACE::recv, <timeout> only applies to ACE::recv.
- */
- ssize_t fetch_recv_buf (int flags, const ACE_Time_Value *timeout = 0);
-
private:
+ ssize_t fetch_recv_buf (int flag, const ACE_Time_Value *timeout);
+
+ /// Actual deliverying mechanism.
+ ACE_MEM_SAP *deliver_strategy_;
+
/// Internal pointer for support recv/send.
- void *recv_buffer_;
+ ACE_MEM_SAP_Node *recv_buffer_;
/// Record the current total buffer size of <recv_buffer_>.
ssize_t buf_size_;
diff --git a/ace/MEM_IO.i b/ace/MEM_IO.i
index 89fb83e23da..064dc94534b 100644
--- a/ace/MEM_IO.i
+++ b/ace/MEM_IO.i
@@ -3,10 +3,78 @@
// MEM_IO.i
+ASYS_INLINE
+ACE_Reactive_MEM_IO::ACE_Reactive_MEM_IO ()
+{
+}
+
+ASYS_INLINE
+ACE_MT_MEM_IO::ACE_MT_MEM_IO ()
+{
+ this->recv_channel_.sema_ = 0;
+ this->recv_channel_.lock_ = 0;
+ this->send_channel_.sema_ = 0;
+ this->send_channel_.lock_ = 0;
+}
+
+ASYS_INLINE
+ACE_MT_MEM_IO::Simple_Queue::Simple_Queue (void)
+ : mq_ (0),
+ malloc_ (0)
+{
+}
+
+ASYS_INLINE
+ACE_MT_MEM_IO::Simple_Queue::Simple_Queue (MQ_Struct *mq)
+ : mq_ (mq),
+ malloc_ (0)
+{
+}
+
+ASYS_INLINE int
+ACE_MT_MEM_IO::Simple_Queue::init (MQ_Struct *mq,
+ ACE_MEM_SAP::MALLOC_TYPE *malloc)
+{
+ if (this->mq_ != 0)
+ return -1;
+
+ this->mq_ = mq;
+ this->malloc_ = malloc;
+ return 0;
+}
+
+ASYS_INLINE ssize_t
+ACE_Reactive_MEM_IO::get_buf_len (const off_t off, ACE_MEM_SAP_Node *&buf)
+{
+#if !defined (ACE_HAS_WIN32_STRUCTURAL_EXCEPTIONS)
+ ACE_TRACE ("ACE_Reactive_MEM_IO::get_buf_len");
+#endif /* ACE_HAS_WIN32_STRUCTURAL_EXCEPTIONS */
+
+ if (this->shm_malloc_ == 0)
+ return -1;
+
+ ssize_t retv = 0;
+
+ ACE_SEH_TRY
+ {
+ buf = ACE_reinterpret_cast (ACE_MEM_SAP_Node *,
+ (ACE_static_cast(char *,
+ this->shm_malloc_->base_addr ())
+ + off));
+ retv = buf->size ();
+ }
+ ACE_SEH_EXCEPT (this->shm_malloc_->memory_pool ().seh_selector (GetExceptionInformation ()))
+ {
+ }
+
+ return retv;
+}
+
// Send an n byte message to the connected socket.
ASYS_INLINE
ACE_MEM_IO::ACE_MEM_IO (void)
- : recv_buffer_ (0),
+ : deliver_strategy_ (0),
+ recv_buffer_ (0),
buf_size_ (0),
cur_offset_ (0)
{
@@ -18,76 +86,64 @@ ACE_MEM_IO::fetch_recv_buf (int flag, const ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_MEM_IO::fetch_recv_buf");
+ if (this->deliver_strategy_ == 0)
+ return -1;
+
// This method can only be called when <buf_size_> == <cur_offset_>.
ACE_ASSERT (this->buf_size_ == this->cur_offset_);
// We have done using the previous buffer, return it to malloc.
if (this->recv_buffer_ != 0)
- this->release_buffer (this->recv_buffer_);
+ this->deliver_strategy_->release_buffer (this->recv_buffer_);
this->cur_offset_ = 0;
- off_t new_offset = 0;
- int retv = ACE::recv (this->get_handle (),
- (char *) &new_offset,
- sizeof (off_t),
- flag,
- timeout);
-
- if (retv == 0)
- return 0;
- else if (retv != sizeof (off_t))
- {
- // Nothing available or we are really screwed.
- this->buf_size_ = 0;
- this->recv_buffer_ = 0;
- return -1;
- }
+ int retv = 0;
+
+ if ((retv = this->deliver_strategy_->recv_buf (this->recv_buffer_,
+ flag,
+ timeout)) > 0)
+ this->buf_size_ = retv;
else
- this->buf_size_ = this->get_buf_len (new_offset,
- this->recv_buffer_);
- return this->buf_size_;
+ this->buf_size_ = 0;
+
+ return retv;
}
ASYS_INLINE
ACE_MEM_IO::~ACE_MEM_IO (void)
{
- // ACE_TRACE ("ACE_MEM_IO::~ACE_MEM_IO");
+ delete this->deliver_strategy_;
}
ASYS_INLINE ssize_t
ACE_MEM_IO::send (const void *buf,
- size_t len,
- int flags,
- const ACE_Time_Value *timeout)
+ size_t len,
+ int flags,
+ const ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_MEM_IO::send");
- void *sbuf = this->acquire_buffer (len);
+ if (this->deliver_strategy_ == 0)
+ return 0;
+
+ ACE_MEM_SAP_Node *sbuf = this->deliver_strategy_->acquire_buffer (len);
if (sbuf == 0)
return -1; // Memory buffer not initialized.
- ACE_OS::memcpy (sbuf, buf, len);
- off_t offset = this->set_buf_len (sbuf, len); // <set_buf_len> also calculate
- // the offset.
-
- // Send the offset value over the socket.
- if (ACE::send (this->get_handle (),
- (const char *) &offset,
- sizeof (offset),
- flags,
- timeout) != sizeof (offset))
- {
- // unsucessful send, release the memory in the shared-memory.
- this->release_buffer (sbuf);
+ ACE_OS::memcpy (sbuf->data (), buf, len);
- return -1;
- }
- return len;
+ ///
+
+ sbuf->size_ = len;
+
+ return this->deliver_strategy_->send_buf (sbuf,
+ flags,
+ timeout);
}
ASYS_INLINE ssize_t
ACE_MEM_IO::recv (void *buf,
- size_t len,
- int flags,
- const ACE_Time_Value *timeout)
+ size_t len,
+ int flags,
+ const ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_MEM_IO::recv");
@@ -108,7 +164,7 @@ ACE_MEM_IO::recv (void *buf,
size_t length = (len > buf_len ? buf_len : len);
ACE_OS::memcpy ((char *) buf + count,
- (char *) this->recv_buffer_ + this->cur_offset_,
+ (char *) this->recv_buffer_->data () + this->cur_offset_,
length);
this->cur_offset_ += length;
// len -= length;
@@ -155,8 +211,8 @@ ACE_MEM_IO::recv (void *buf, size_t n)
ASYS_INLINE ssize_t
ACE_MEM_IO::recv (void *buf,
- size_t len,
- const ACE_Time_Value *timeout)
+ size_t len,
+ const ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_MEM_IO::recv");
return this->recv (buf, len, 0, timeout);
@@ -164,8 +220,8 @@ ACE_MEM_IO::recv (void *buf,
ASYS_INLINE ssize_t
ACE_MEM_IO::send (const void *buf,
- size_t len,
- const ACE_Time_Value *timeout)
+ size_t len,
+ const ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_MEM_IO::send");
return this->send (buf, len, 0, timeout);
diff --git a/ace/MEM_SAP.cpp b/ace/MEM_SAP.cpp
index 88725dcad8b..ded84c604e6 100644
--- a/ace/MEM_SAP.cpp
+++ b/ace/MEM_SAP.cpp
@@ -26,7 +26,8 @@ ACE_MEM_SAP::dump (void) const
}
ACE_MEM_SAP::ACE_MEM_SAP (void)
- : shm_malloc_ (0)
+ : handle_ (ACE_INVALID_HANDLE),
+ shm_malloc_ (0)
{
// ACE_TRACE ("ACE_MEM_SAP::ACE_MEM_SAP");
}
@@ -52,32 +53,26 @@ ACE_MEM_SAP::create_shm_malloc (const ACE_TCHAR *name,
int
ACE_MEM_SAP::close_shm_malloc (const int remove)
{
+ ACE_TRACE ("ACE_MEM_SAP::close_shm_malloc");
+
if (this->shm_malloc_ != 0 && remove != 0)
- {
- this->shm_malloc_->remove ();
- return 0;
- }
+ return this->shm_malloc_->remove ();
+
return -1;
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Write_Guard<ACE_Process_Mutex>;
template class ACE_Read_Guard<ACE_Process_Mutex>;
-#if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
template class ACE_Malloc_T<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex, ACE_PI_Control_Block>;
-#else
-template class ACE_Malloc<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex>;
-template class ACE_Malloc_T<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex, ACE_Control_Block>;
-#endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */
+template class ACE_Based_Pointer<ACE_MEM_SAP_Node>;
+template class ACE_Based_Pointer_Basic<ACE_MEM_SAP_Node>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Write_Guard<ACE_Process_Mutex>
#pragma instantiate ACE_Read_Guard<ACE_Process_Mutex>
-#if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
#pragma instantiate ACE_Malloc_T<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex, ACE_PI_Control_Block>
-#else
-#pragma instantiate ACE_Malloc<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex>
-#pragma instantiate ACE_Malloc_T<ACE_MMAP_MEMORY_POOL, ACE_Process_Mutex, ACE_Control_Block>
-#endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */
+#pragma instantiate ACE_Based_Pointer<ACE_MEM_SAP_Node>
+#pragma instantiate ACE_Based_Pointer_Basic<ACE_MEM_SAP_Node>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
#endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */
diff --git a/ace/MEM_SAP.h b/ace/MEM_SAP.h
index 1a480b81884..8f62dbb9580 100644
--- a/ace/MEM_SAP.h
+++ b/ace/MEM_SAP.h
@@ -26,8 +26,47 @@
#include "ace/Process_Mutex.h"
+class ACE_MEM_SAP;
+class ACE_Reactive_MEM_IO;
+class ACE_MT_MEM_IO;
+class ACE_MEM_IO;
+
+// Internal data structure
+// MEM_SAP uses to queue up
+// data.
+class ACE_Export ACE_MEM_SAP_Node
+{
+public:
+// friend class ACE_MEM_SAP;
+// friend class ACE_Reactive_MEM_IO;
+// friend class ACE_MT_MEM_IO;
+// friend class ACE_MEM_IO;
+
+ typedef ACE_Based_Pointer<ACE_MEM_SAP_Node> ACE_MEM_SAP_NODE_PTR;
+
+ // Initialize the node with its capacity.
+ ACE_MEM_SAP_Node (size_t cap);
+
+ // Get the size of the data we hold.
+ size_t size (void) const;
+
+ // Get the capacity of this block of data.
+ size_t capacity (void) const;
+
+ // Get the pointer to the block of data we hold.
+ void *data (void);
+
+ // The maximum size of this memory block.
+ size_t capacity_;
+
+ // The actualy size used.
+ size_t size_;
+
+ ACE_MEM_SAP_NODE_PTR next_;
+};
+
/**
- * @class ACE_MEM_SAP
+ * @Class ACE_MEM_SAP
*
* @brief Defines the methods of shared memory management for
* shared memory transport.
@@ -41,34 +80,45 @@ public:
typedef ACE_MMAP_Memory_Pool_Options MALLOC_OPTIONS;
/// Destructor.
- ~ACE_MEM_SAP (void);
+ virtual ~ACE_MEM_SAP (void);
- /// request a buffer of size <size>. Return 0 if the <shm_malloc_> is
- /// not initialized.
- void *acquire_buffer (const ssize_t size);
-
- /// release a buffer pointed by <buf>. Return -1 if the <shm_malloc_>
- /// is not initialized.
- int release_buffer (void *buf);
+ /**
+ * Initialize the MEM_SAP object.
+ */
+ virtual int init (ACE_HANDLE handle,
+ const ACE_TCHAR *name,
+ MALLOC_OPTIONS *options) = 0;
/**
- * Set the length of buf (containing information) to <n> bytes.
- * Return the offset of the <buf> relative to the base address.
- * <buf> must be acquired by <get_buffer> method. Return -1 if the
- * <shm_malloc_> is not initialized.
+ * Finalizing the MEM_SAP object. This method doesn't invoke
+ * the <remove> method.
*/
- off_t set_buf_len (void *buf,
- size_t n);
+ virtual int fini (int remove) = 0;
/**
- * Convert the buffer offset <off> to absolute address to <buf>.
- * Return the size of valid information containing in the <buf>,
- * -1 if <shm_malloc_> is not initialized.
+ * Fetch location of next available data into <recv_buffer_>.
+ * As this operation read the address of the data off the socket
+ * using ACE::recv, <timeout> only applies to ACE::recv.
*/
- ssize_t get_buf_len (const off_t off, void *&buf);
+ virtual int recv_buf (ACE_MEM_SAP_Node *&buf,
+ int flags,
+ const ACE_Time_Value *timeout) = 0;
- /// Remove the shared resouce (mmap file) used by us.
- int remove (void);
+ /**
+ * Wait to to <timeout> amount of time to send <buf>. If <send>
+ * times out a -1 is returned with <errno == ETIME>. If it succeeds
+ * the number of bytes sent is returned. */
+ virtual int send_buf (ACE_MEM_SAP_Node *buf,
+ int flags,
+ const ACE_Time_Value *timeout) = 0;
+
+ /// request a buffer of size <size>. Return 0 if the <shm_malloc_> is
+ /// not initialized.
+ ACE_MEM_SAP_Node *acquire_buffer (const ssize_t size);
+
+ /// release a buffer pointed by <buf>. Return -1 if the <shm_malloc_>
+ /// is not initialized.
+ int release_buffer (ACE_MEM_SAP_Node *buf);
/// Dump the state of an object.
void dump (void) const;
@@ -86,12 +136,14 @@ protected:
* communication.
*/
int create_shm_malloc (const ACE_TCHAR *name,
- MALLOC_OPTIONS *options = 0);
+ MALLOC_OPTIONS *options);
/// Close down the share memory pool. If <remove> != 0, then the
/// mmap file will also get removed.
int close_shm_malloc (const int remove = 0);
+ ACE_HANDLE handle_;
+
/// Data exchange channel.
MALLOC_TYPE *shm_malloc_;
diff --git a/ace/MEM_SAP.i b/ace/MEM_SAP.i
index fea55e230ea..d1acbf79ce4 100644
--- a/ace/MEM_SAP.i
+++ b/ace/MEM_SAP.i
@@ -3,6 +3,33 @@
// MEM_SAP.i
+
+ASYS_INLINE
+ACE_MEM_SAP_Node::ACE_MEM_SAP_Node (size_t cap)
+ : capacity_ (cap),
+ size_ (0),
+ next_ (0)
+{
+}
+
+ASYS_INLINE size_t
+ACE_MEM_SAP_Node::size (void) const
+{
+ return this->size_;
+}
+
+ASYS_INLINE size_t
+ACE_MEM_SAP_Node::capacity (void) const
+{
+ return this->capacity_;
+}
+
+ASYS_INLINE void *
+ACE_MEM_SAP_Node::data (void)
+{
+ return this + 1;
+}
+
ASYS_INLINE
ACE_MEM_SAP::~ACE_MEM_SAP (void)
{
@@ -11,81 +38,30 @@ ACE_MEM_SAP::~ACE_MEM_SAP (void)
}
-ASYS_INLINE void *
+ASYS_INLINE ACE_MEM_SAP_Node *
ACE_MEM_SAP::acquire_buffer (const ssize_t size)
{
ACE_TRACE ("ACE_MEM_SAP::acquire_buffer");
if (this->shm_malloc_ == 0)
return 0; // not initialized.
- size_t *lptr = ACE_static_cast (size_t *,
- this->shm_malloc_->malloc (sizeof (size_t) + size));
-
- *lptr = size;
- ++lptr;
+ ACE_MEM_SAP_Node *buf =
+ ACE_reinterpret_cast (ACE_MEM_SAP_Node *,
+ this->shm_malloc_->malloc (sizeof (ACE_MEM_SAP_Node)
+ + size));
+ if (buf != 0)
+ return new (buf) ACE_MEM_SAP_Node (size);
- return lptr;
+ return 0;
}
ASYS_INLINE int
-ACE_MEM_SAP::release_buffer (void *buf)
+ACE_MEM_SAP::release_buffer (ACE_MEM_SAP_Node *buf)
{
ACE_TRACE ("ACE_MEM_SAP::release_buffer");
if (this->shm_malloc_ == 0)
return -1; // not initialized.
- size_t *lptr = ACE_static_cast (size_t *, buf);
-
- --lptr;
- this->shm_malloc_->free (lptr);
+ this->shm_malloc_->free (buf);
return 0;
}
-
-ASYS_INLINE off_t
-ACE_MEM_SAP::set_buf_len (void *buf, size_t n)
-{
- ACE_TRACE ("ACE_MEM_SAP::set_buf_len");
- if (this->shm_malloc_ == 0)
- return -1;
-
- size_t *lptr = ACE_static_cast (size_t *, buf);
- --lptr;
-
- if (*lptr >= n)
- *lptr = n;
-
- return ((char *) lptr - (char *) this->shm_malloc_->base_addr ());
-}
-
-ASYS_INLINE ssize_t
-ACE_MEM_SAP::get_buf_len (const off_t off, void *&buf)
-{
-#if !defined (ACE_HAS_WIN32_STRUCTURAL_EXCEPTIONS)
- ACE_TRACE ("ACE_MEM_SAP::get_buf_len");
-#endif /* ACE_HAS_WIN32_STRUCTURAL_EXCEPTIONS */
-
- if (this->shm_malloc_ == 0)
- return -1;
-
- ssize_t retv = 0;
-
- ACE_SEH_TRY
- {
- size_t *lptr = (size_t*) ((char *) this->shm_malloc_->base_addr () + off);
- buf = lptr + 1;
- retv = *lptr;
- }
- ACE_SEH_EXCEPT (this->shm_malloc_->memory_pool ().seh_selector (GetExceptionInformation ()))
- {
- }
-
- return retv;
-}
-
-ASYS_INLINE int
-ACE_MEM_SAP::remove (void)
-{
- ACE_TRACE ("ACE_MEM_SAP::remove");
-
- return close_shm_malloc (1);
-}
diff --git a/ace/MEM_Stream.cpp b/ace/MEM_Stream.cpp
index 0b1f9ca3394..239262c8d5d 100644
--- a/ace/MEM_Stream.cpp
+++ b/ace/MEM_Stream.cpp
@@ -21,6 +21,8 @@ ACE_MEM_Stream::dump (void) const
int
ACE_MEM_Stream::close (void)
{
+ this->send ((char *)0, 0);
+
#if defined (ACE_WIN32)
// We need the following call to make things work correctly on
// Win32, which requires use to do a <close_writer> before doing the
diff --git a/tests/MEM_Stream_Test.cpp b/tests/MEM_Stream_Test.cpp
index dfdc7d4e560..de0a0bb78be 100644
--- a/tests/MEM_Stream_Test.cpp
+++ b/tests/MEM_Stream_Test.cpp
@@ -38,26 +38,36 @@ ACE_RCSID(tests, MEM_Stream_Test, "$Id$")
static int opt_wfmo_reactor = 1;
static int opt_select_reactor = 1;
+static ACE_MEM_IO::Signal_Strategy client_strategy = ACE_MEM_IO::Reactive;
-u_short Echo_Handler::waiting_ = NO_OF_CONNECTION;
+ACE_Atomic_Op <ACE_Thread_Mutex, u_short> Echo_Handler::waiting_ = NO_OF_CONNECTION;
u_short Echo_Handler::connection_count_ = 0;
typedef ACE_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR> ACCEPTOR;
+typedef ACE_Strategy_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR> S_ACCEPTOR;
-// int
-// Echo_Handler::open (void *)
-// {
-// // @@ Nanbor, this method doesn't anything?
-// return 0;
-// }
+int
+Echo_Handler::open (void *)
+{
+ // @@ Nanbor, this method doesn't anything?
+ return 0;
+}
-Echo_Handler::Echo_Handler ()
- : connection_ (++Echo_Handler::connection_count_)
+Echo_Handler::Echo_Handler (ACE_Thread_Manager *thr_mgr)
+ : ACE_Svc_Handler<ACE_MEM_STREAM, ACE_MT_SYNCH> (thr_mgr),
+ connection_ (++Echo_Handler::connection_count_)
{
ACE_OS::sprintf (this->name_, ACE_TEXT ("Connection %d --> "),
this->connection_);
}
+void
+Echo_Handler::reset_handler (void)
+{
+ Echo_Handler::waiting_ = NO_OF_CONNECTION;
+ Echo_Handler::connection_count_ = 0;
+}
+
int
Echo_Handler::handle_input (ACE_HANDLE)
{
@@ -88,11 +98,17 @@ Echo_Handler::handle_input (ACE_HANDLE)
int
Echo_Handler::handle_close (ACE_HANDLE,
- ACE_Reactor_Mask)
+ ACE_Reactor_Mask mask)
{
// Reduce count.
this->waiting_--;
+#if 1
+ if (client_strategy != ACE_MEM_IO::Reactive)
+ this->reactor ()->remove_handler (this,
+ mask | ACE_Event_Handler::DONT_CALL);
+#endif /* tests */
+
// If no connections are open.
if (this->waiting_ == 0)
ACE_Reactor::instance ()->end_event_loop ();
@@ -102,19 +118,31 @@ Echo_Handler::handle_close (ACE_HANDLE,
this->connection_));
// Shutdown
- this->peer ().remove ();
+ this->peer ().close ();
+ this->peer ().fini (1);
this->destroy ();
return 0;
}
+int
+Echo_Handler::svc (void)
+{
+ while (this->handle_input (this->get_handle ()) >= 0)
+ ;
+ return 0;
+}
+
void *
connect_client (void *arg)
{
u_short sport = (*ACE_reinterpret_cast (u_short *, arg));
ACE_MEM_Addr to_server (sport);
ACE_MEM_Connector connector;
+ connector.preferred_strategy (client_strategy);
ACE_MEM_Stream stream;
+ // connector.preferred_strategy (ACE_MEM_IO::MT);
+
if (connector.connect (stream, to_server.get_remote_addr ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p\n"), ACE_TEXT ("connect_client")),
@@ -166,22 +194,84 @@ create_reactor (void)
ACE_Reactor::instance (reactor);
}
-int
-main (int, ACE_TCHAR *[])
+int test_reactive (ACE_MEM_Addr &server_addr)
{
- ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test"));
+ ACE_DEBUG ((LM_DEBUG, "Testing Reactive MEM_Stream\n\n"));
+
+ ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR> accept_strategy;
+ ACE_Creation_Strategy<Echo_Handler> create_strategy;
+ ACE_Reactive_Strategy<Echo_Handler> reactive_strategy (ACE_Reactor::instance ());
+ S_ACCEPTOR acceptor;
+ if (acceptor.open (server_addr,
+ ACE_Reactor::instance (),
+ &create_strategy,
+ &accept_strategy,
+ &reactive_strategy) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("MEM_Acceptor::accept\n")), 1);
+ acceptor.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_"));
- create_reactor ();
+ ACE_MEM_Addr local_addr;
+ if (acceptor.acceptor ().get_local_addr (local_addr) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("MEM_Acceptor::get_local_addr\n")),
+ 1);
+ }
- unsigned short port = 0;
- ACE_MEM_Addr server_addr (port);
+ u_short sport = local_addr.get_port_number ();
- ACCEPTOR acceptor;
- acceptor.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_"));
- if (acceptor.open (server_addr) == -1)
+ ACE_Thread_Manager::instance ()->spawn_n (NO_OF_CONNECTION,
+ connect_client,
+ &sport);
+ ACE_Time_Value tv(60, 0);
+ ACE_Reactor::instance ()->run_event_loop (tv);
+
+ if (tv == ACE_Time_Value::zero)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT("Reactor::run_event_loop timeout\n")),
+ 1);
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "Reactor::run_event_loop finished\n"));
+
+ ACE_Thread_Manager::instance ()->wait ();
+
+ if (acceptor.close () == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("MEM_Acceptor::close\n")),
+ 1);
+ }
+ return 0;
+}
+
+int test_multithreaded (ACE_MEM_Addr &server_addr)
+{
+ ACE_DEBUG ((LM_DEBUG, "Testing Multithreaded MEM_Stream\n\n"));
+
+ Echo_Handler::reset_handler ();
+
+ client_strategy = ACE_MEM_IO::MT;
+ ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR> accept_strategy;
+ ACE_Creation_Strategy<Echo_Handler> create_strategy;
+ ACE_Thread_Strategy<Echo_Handler> thr_strategy;
+ S_ACCEPTOR acceptor;
+
+
+ if (acceptor.open (server_addr,
+ ACE_Reactor::instance (),
+ &create_strategy,
+ &accept_strategy,
+ &thr_strategy) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("MEM_Acceptor::accept\n")), 1);
+ acceptor.acceptor ().malloc_options ().minimum_bytes_ = 1024 * 1024 ;
+ acceptor.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_"));
+ acceptor.acceptor ().preferred_strategy (ACE_MEM_IO::MT);
+
ACE_MEM_Addr local_addr;
if (acceptor.acceptor ().get_local_addr (local_addr) == -1)
{
@@ -215,6 +305,26 @@ main (int, ACE_TCHAR *[])
ACE_TEXT ("MEM_Acceptor::close\n")),
1);
}
+ return 0;
+}
+
+int
+main (int, ACE_TCHAR *[])
+{
+ ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test"));
+
+ create_reactor ();
+
+ unsigned short port = 0;
+ ACE_MEM_Addr server_addr (port);
+
+ test_reactive (server_addr);
+
+ ACE_Reactor::instance ()->reset_event_loop ();
+
+ test_multithreaded (server_addr);
+
+ // Now testing
ACE_END_TEST;
return 0;
@@ -223,9 +333,25 @@ main (int, ACE_TCHAR *[])
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Svc_Handler <ACE_MEM_STREAM, ACE_MT_SYNCH>;
template class ACE_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR>;
+template class ACE_Atomic_Op<ACE_Thread_Mutex, u_short>;
+template class ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR>;
+template class ACE_Creation_Strategy<Echo_Handler>;
+template class ACE_Reactive_Strategy<Echo_Handler>;
+template class ACE_Strategy_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR>;
+template class ACE_Concurrency_Strategy<Echo_Handler>;
+template class ACE_Scheduling_Strategy<Echo_Handler>;
+template class ACE_Thread_Strategy<Echo_Handler>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Svc_Handler <ACE_MEM_STREAM, ACE_MT_SYNCH>
#pragma instantiate ACE_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR>
+#pragma instantiate ACE_Atomic_Op<ACE_Thread_Mutex, u_short>
+#pragma instantiate ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR>
+#pragma instantiate ACE_Creation_Strategy<Echo_Handler>
+#pragma instantiate ACE_Reactive_Strategy<Echo_Handler>
+#pragma instantiate ACE_Strategy_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR>
+#pragma instantiate ACE_Concurrency_Strategy<Echo_Handler>
+#pragma instantiate ACE_Scheduling_Strategy<Echo_Handler>
+#pragma instantiate ACE_Thread_Strategy<Echo_Handler>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
#else
diff --git a/tests/MEM_Stream_Test.h b/tests/MEM_Stream_Test.h
index 8f6ea264525..a681e9486dd 100644
--- a/tests/MEM_Stream_Test.h
+++ b/tests/MEM_Stream_Test.h
@@ -37,15 +37,17 @@ class Echo_Handler : public ACE_Svc_Handler<ACE_MEM_STREAM, ACE_MT_SYNCH>
// = TITLE
// Simple class for reading in the data and then sending it back
public:
- Echo_Handler ();
- // virtual int open (void *);
+ Echo_Handler (ACE_Thread_Manager *thr_mgr = 0);
+ virtual int open (void *);
+ static void reset_handler (void);
virtual int handle_input (ACE_HANDLE h);
virtual int handle_close (ACE_HANDLE handle,
ACE_Reactor_Mask close_mask);
// The Svc_Handler callbacks.
+ virtual int svc (void);
public:
- static u_short waiting_;
+ static ACE_Atomic_Op <ACE_Thread_Mutex, u_short> waiting_;
// How many connections are we waiting for.
static u_short connection_count_;