diff options
Diffstat (limited to 'examples/APG/ThreadSafety/Semaphore.cpp')
-rw-r--r-- | examples/APG/ThreadSafety/Semaphore.cpp | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/examples/APG/ThreadSafety/Semaphore.cpp b/examples/APG/ThreadSafety/Semaphore.cpp new file mode 100644 index 00000000000..2ab1bb633cd --- /dev/null +++ b/examples/APG/ThreadSafety/Semaphore.cpp @@ -0,0 +1,133 @@ +// $Id$ + +#include "ace/OS_NS_string.h" +#include "ace/Task.h" +#include "ace/Synch.h" + +// Listing 2 code/ch14 +class Consumer : public ACE_Task<ACE_MT_SYNCH> +{ +public: + enum { N_THREADS = 5 }; + + Consumer (ACE_Semaphore& psema, ACE_Semaphore& csema) + : psema_(psema), csema_(csema), exit_condition_(0) + { } + + int svc (void) + { + while (!is_closed ()) + consume_item (); + return 0; + } + + void consume_item () + { + csema_.acquire (); + if (!is_closed ()) + { + ACE_Message_Block *mb; + this->getq (mb); + if (mb->msg_type () == ACE_Message_Block::MB_HANGUP) + { + shutdown (); + mb->release (); + return; + } + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Consumed %d\n"), + *((int*)mb->rd_ptr ()))); + mb->release(); + } + psema_.release (); + } + } + + void shutdown (void) + { + exit_condition_ = 1; + this->msg_queue ()->deactivate (); + csema_.release (N_THREADS); + } + + int is_closed (void) + { + return exit_condition_; + } + +private: + ACE_Semaphore& psema_; + ACE_Semaphore& csema_; + int exit_condition_; +}; +// Listing 2 +// Listing 1 code/ch14 +class Producer : public ACE_Task_Base +{ +public: + enum { MAX_PROD = 128 }; + + Producer (ACE_Semaphore& psema, ACE_Semaphore& csema, + Consumer &consumer) + : psema_(psema), csema_(csema), consumer_(consumer) + { } + + int svc (void) + { + for (int i = 0; i <= MAX_PROD; i++) + produce_item (i); + hang_up (); + return 0; + } + + void produce_item (int item) + { + psema_.acquire (); + ACE_Message_Block *mb + = new ACE_Message_Block (sizeof (int), + ACE_Message_Block::MB_DATA); + ACE_OS::memcpy (mb->wr_ptr (), &item, sizeof item); + mb->wr_ptr (sizeof (int)); + this->consumer_.putq (mb); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Produced %d\n"), item)); + csema_.release(); + } + + void hang_up () + { + psema_.acquire (); + ACE_Message_Block *mb = + new ACE_Message_Block (0, ACE_Message_Block::MB_HANGUP); + this->consumer_.putq (mb); + csema_.release (); + } + +private: + ACE_Semaphore& psema_; + ACE_Semaphore& csema_; + Consumer& consumer_; +}; +// Listing 1 +// Listing 3 code/ch14 +int ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_Semaphore psem (5); + ACE_Semaphore csem (0); + + Consumer consumer (psem, csem); + Producer producer (psem, csem, consumer); + + producer.activate (); + consumer.activate (THR_NEW_LWP | THR_JOINABLE, + Consumer::N_THREADS); + + producer.wait (); + consumer.wait (); + + return 0; +} +// Listing 3 + |