summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/Buffer_Stream_Test.cpp2
-rw-r--r--tests/Future_Test.cpp16
-rw-r--r--tests/Priority_Buffer_Test.cpp2
-rw-r--r--tests/Reactors_Test.cpp2
-rw-r--r--tests/Thread_Pool_Test.cpp171
-rw-r--r--tests/UPIPE_SAP_Test.cpp2
6 files changed, 113 insertions, 82 deletions
diff --git a/tests/Buffer_Stream_Test.cpp b/tests/Buffer_Stream_Test.cpp
index aa5ec27414c..2ba15d15cc3 100644
--- a/tests/Buffer_Stream_Test.cpp
+++ b/tests/Buffer_Stream_Test.cpp
@@ -174,7 +174,7 @@ Consumer::svc (void)
ACE_ASSERT (c == output[0]);
c++;
}
- delete mb;
+ mb->release ();
if (length == 0)
{
diff --git a/tests/Future_Test.cpp b/tests/Future_Test.cpp
index a02542c9aff..5f01738de12 100644
--- a/tests/Future_Test.cpp
+++ b/tests/Future_Test.cpp
@@ -382,10 +382,10 @@ main (int, char *[])
ACE_DEBUG ((LM_DEBUG,
"(%t) task_count %d future_count %d capsule_count %d methodobject_count %d\n",
- (u_long) task_count,
- (u_long) future_count,
- (u_long) capsule_count,
- (u_long) methodobject_count));
+ (int) task_count,
+ (int) future_count,
+ (int) capsule_count,
+ (int) methodobject_count));
}
// Close things down.
@@ -398,10 +398,10 @@ main (int, char *[])
ACE_DEBUG ((LM_DEBUG,
"(%t) task_count %d future_count %d capsule_count %d methodobject_count %d\n",
- (u_long) task_count,
- (u_long) future_count,
- (u_long) capsule_count,
- (u_long) methodobject_count));
+ (int) task_count,
+ (int) future_count,
+ (int) capsule_count,
+ (int) methodobject_count));
ACE_DEBUG ((LM_DEBUG,"(%t) th' that's all folks!\n"));
diff --git a/tests/Priority_Buffer_Test.cpp b/tests/Priority_Buffer_Test.cpp
index 09f1b7635d9..3a1134c63a2 100644
--- a/tests/Priority_Buffer_Test.cpp
+++ b/tests/Priority_Buffer_Test.cpp
@@ -68,7 +68,7 @@ consumer (void *args)
// Free up the buffer memory and the Message_Block. Note that
// the destructor of Message Block will delete the the actual
// buffer.
- delete mb;
+ mb->release ();
if (length == 0)
break;
diff --git a/tests/Reactors_Test.cpp b/tests/Reactors_Test.cpp
index 661298efd2d..42de847e165 100644
--- a/tests/Reactors_Test.cpp
+++ b/tests/Reactors_Test.cpp
@@ -127,7 +127,7 @@ Test_Task::handle_input (ACE_HANDLE)
done_count--;
ACE_DEBUG ((LM_DEBUG,
"(%t) handle_input, handled_ = %d, done_count = %d\n",
- this->handled_, (u_long) done_count));
+ this->handled_, (int) done_count));
}
ACE_OS::thr_yield ();
diff --git a/tests/Thread_Pool_Test.cpp b/tests/Thread_Pool_Test.cpp
index 2363850e7f3..ee4ce7c1ffa 100644
--- a/tests/Thread_Pool_Test.cpp
+++ b/tests/Thread_Pool_Test.cpp
@@ -10,20 +10,21 @@
//
// = DESCRIPTION
// This test program illustrates how the ACE task synchronization
-// mechanisms work in conjunction with the ACE_Task and the
-// ACE_Thread_Manager. If the manual flag is not set input comes
-// from stdin until the user enters a return only. This stops
-// all workers via a message block of length 0. This is an
-// alternative shutdown of workers compared to queue deactivate.
+// mechanisms and ACE_Message_Block reference counting works in
+// conjunction with the ACE_Task and the ACE_Thread_Manager. If
+// the manual flag is not set input comes from stdin until the
+// user enters a return. This stops all workers via a message
+// block of length 0. This shows an alternative way to shutdown
+// worker tasks compared to queue deactivate.
//
// = AUTHOR
// Karlheinz Dorn, Doug Schmidt, and Prashant Jain
//
// ============================================================================
+#define protected public
#include "ace/Task.h"
#include "ace/Service_Config.h"
-
#include "ace/Task.h"
#include "test_config.h"
@@ -35,22 +36,35 @@ static size_t n_iterations = 100;
class Thread_Pool : public ACE_Task<ACE_MT_SYNCH>
{
public:
- Thread_Pool (ACE_Thread_Manager *thr_mgr, int n_threads);
+ Thread_Pool (int n_threads);
+ // Create the thread pool containing <n_threads>.
+ ~Thread_Pool (void);
+
+ virtual int open (void * = 0);
+ // Produce the messages that are consumed by the threads in the
+ // thread pool.
+
virtual int svc (void);
// Iterate <n_iterations> time printing off a message and "waiting"
// for all other threads to complete this iteration.
virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
- // This allows the producer to pass messages to the <Thread_Pool>.
+ // Allows the producer to pass messages to the <Thread_Pool>.
private:
virtual int close (u_long);
+ // Close hook.
- // = Not needed for this test.
- virtual int open (void *) { return 0; }
+ ACE_Lock_Adapter<ACE_Thread_Mutex> lock_adapter_;
+ // Serialize access to <ACE_Message_Block> reference count, which
+ // will be decremented from multiple threads.
};
+Thread_Pool::~Thread_Pool (void)
+{
+}
+
int
Thread_Pool::close (u_long)
{
@@ -58,9 +72,7 @@ Thread_Pool::close (u_long)
return 0;
}
-Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr,
- int n_threads)
- : ACE_Task<ACE_MT_SYNCH> (thr_mgr)
+Thread_Pool::Thread_Pool (int n_threads)
{
// Create worker threads.
if (this->activate (THR_NEW_LWP, n_threads) == -1)
@@ -75,42 +87,49 @@ Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
return this->putq (mb, tv);
}
-// Iterate <n_iterations> time printing off a message and "waiting"
-// for all other threads to complete this iteration.
+// Iterate <n_iterations> printing off a message and "waiting" for all
+// other threads to complete this iteration.
int
Thread_Pool::svc (void)
{
ACE_NEW_THREAD;
- // Note that the ACE_Task::svc_run () method automatically adds us to
- // the Thread_Manager when the thread begins.
-
- int count = 1;
+ // The <ACE_Task::svc_run()> method automatically adds us to the
+ // <ACE_Service_Config>'s <ACE_Thread_Manager> when the thread
+ // begins.
// Keep looping, reading a message out of the queue, until we get a
// message with a length == 0, which signals us to quit.
- for (;; count++)
+ for (int count = 1; ; count++)
{
ACE_Message_Block *mb;
+ ACE_DEBUG ((LM_DEBUG, "(%t) **** before head = %d, tail = %d\n",
+ this->msg_queue ()->head_,
+ this->msg_queue ()->tail_));
ACE_ASSERT (this->getq (mb) != -1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) ++++ after head = %d, tail = %d\n",
+ this->msg_queue ()->head_,
+ this->msg_queue ()->tail_));
+
int length = mb->length ();
if (length > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%t) in iteration %d, length = %d, text = \"%*s\"\n",
- count, length, length - 1, mb->rd_ptr ()));
+ "(%t) in iteration %d, queue len = %d, length = %d, text = \"%*s\"\n",
+ count, this->msg_queue ()->message_count (),
+ length, length - 1, mb->rd_ptr ()));
// We're responsible for deallocating this.
- delete mb;
+ mb->release ();
if (length == 0)
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) in iteration %d, got NULL message, exiting\n",
- count));
+ "(%t) in iteration %d, queue len = %d, got NULL message, exiting\n",
+ count, this->msg_queue ()->message_count ()));
break;
}
}
@@ -120,17 +139,22 @@ Thread_Pool::svc (void)
return 0;
}
-static void
-produce (Thread_Pool &thread_pool)
+int
+Thread_Pool::open (void *)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) producer start, dumping the Thread_Pool\n"));
- thread_pool.dump ();
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) producer start, dumping the Thread_Pool\n"));
+ this->dump ();
+
+ ACE_Message_Block *mb;
for (int n;;)
{
// Allocate a new message.
- ACE_Message_Block *mb;
- ACE_NEW (mb, ACE_Message_Block (BUFSIZ));
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ, ACE_Message_Block::MB_DATA,
+ 0, 0, 0, &this->lock_adapter_),
+ -1);
#if defined (manual)
ACE_DEBUG ((LM_DEBUG,
@@ -140,50 +164,53 @@ produce (Thread_Pool &thread_pool)
static size_t count = 0;
ACE_OS::sprintf (mb->rd_ptr (), "%d\n", count);
-
n = ACE_OS::strlen (mb->rd_ptr ());
- if (count == n_iterations)
- n = 1; // Indicate that we need to shut down.
+ if (count == n_iterations || n <= 1)
+ break;
else
count++;
if (count == 0 || (count % 20 == 0))
ACE_OS::sleep (1);
#endif /* manual */
- if (n > 1)
- {
- // Send a normal message to the waiting threads and continue
- // producing.
- mb->wr_ptr (n);
-
- // Pass the message to the Thread_Pool.
- if (thread_pool.put (mb) == -1)
- ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
- }
- else
- {
- // Send a shutdown message to the waiting threads and exit.
- ACE_DEBUG ((LM_DEBUG, "\n(%t) start loop, dump of task:\n"));
- thread_pool.dump ();
-
- for (int i = thread_pool.thr_count (); i > 0; i--)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) EOF, enqueueing NULL block for thread = %d\n",
- i));
-
- // Enqueue a NULL message to flag each consumer to
- // shutdown.
- if (thread_pool.put (new ACE_Message_Block) == -1)
- ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
- }
-
- ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n"));
- thread_pool.dump ();
- break;
- }
+ // Send a normal message to the waiting threads and continue
+ // producing.
+ mb->wr_ptr (n);
+
+ // Pass the message to the Thread_Pool.
+ if (this->put (mb) == -1)
+ ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
}
+
+ // Send a shutdown message to the waiting threads and exit.
+ ACE_DEBUG ((LM_DEBUG,
+ "\n(%t) sending shutdown message to %d threads, dump of task:\n",
+ this->thr_count ()));
+ this->dump ();
+
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (0, ACE_Message_Block::MB_DATA,
+ 0, 0, 0, &this->lock_adapter_),
+ -1);
+
+ for (int i = this->thr_count (); i > 0; i--)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) EOF, enqueueing NULL block for thread = %d\n",
+ i));
+
+ // Enqueue an empty message to flag each consumer to shutdown.
+ // Note that we use reference counting to avoid having to copy
+ // the message.
+ if (this->put (mb->duplicate ()) == -1)
+ ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
+ }
+
+ mb->release ();
+
+ ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n"));
+ this->dump ();
}
#endif /* ACE_HAS_THREADS */
@@ -198,17 +225,21 @@ main (int, char *[])
ACE_DEBUG ((LM_DEBUG, "(%t) threads = %d\n", n_threads));
// Create the worker tasks.
- Thread_Pool thread_pool (ACE_Service_Config::thr_mgr (),
- n_threads);
+ Thread_Pool thread_pool (n_threads);
// Create work for the worker tasks to process in their own threads.
- produce (thread_pool);
+ thread_pool.open ();
// Wait for all the threads to reach their exit point.
- ACE_DEBUG ((LM_DEBUG, "(%t) waiting with thread manager...\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) waiting for worker tasks to finish...\n"));
+
ACE_Service_Config::thr_mgr ()->wait ();
+ ACE_ASSERT (thread_pool.msg_queue ()->is_empty ());
+ ACE_DEBUG ((LM_DEBUG, "(%t) head = %d, tail = %d\n",
+ thread_pool.msg_queue ()->head_,
+ thread_pool.msg_queue ()->tail_));
ACE_DEBUG ((LM_DEBUG, "(%t) destroying worker tasks and exiting...\n"));
#else
ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
diff --git a/tests/UPIPE_SAP_Test.cpp b/tests/UPIPE_SAP_Test.cpp
index 64a851c28bf..1771575b3cb 100644
--- a/tests/UPIPE_SAP_Test.cpp
+++ b/tests/UPIPE_SAP_Test.cpp
@@ -59,7 +59,7 @@ connector (void *)
ACE_ASSERT (ACE_OS::strcmp (mb->rd_ptr (), "thanks") == 0);
// Free up the memory block.
- delete mb;
+ mb->release ();
// Now try the send()/recv() interface.
char mytext[] = "This string is sent by connector as a buffer";