diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-08-22 23:04:53 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-08-22 23:04:53 +0000 |
commit | 9b0fd2e0d56403aa3a105e33ae474543d7e8e87a (patch) | |
tree | 23362b65b7355f747b816795dc870a0d2afa0615 /tests/Message_Queue_Notifications_Test.cpp | |
parent | d751b392181b3a593ee13f997fdee36491a6ae26 (diff) | |
download | ATCD-9b0fd2e0d56403aa3a105e33ae474543d7e8e87a.tar.gz |
*** empty log message ***
Diffstat (limited to 'tests/Message_Queue_Notifications_Test.cpp')
-rw-r--r-- | tests/Message_Queue_Notifications_Test.cpp | 319 |
1 files changed, 319 insertions, 0 deletions
diff --git a/tests/Message_Queue_Notifications_Test.cpp b/tests/Message_Queue_Notifications_Test.cpp new file mode 100644 index 00000000000..360d62ae9e5 --- /dev/null +++ b/tests/Message_Queue_Notifications_Test.cpp @@ -0,0 +1,319 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// MsgQueue_Notification_Test.cpp +// +// = DESCRIPTION +// There are two tests that test 2 different notification mechanisms +// in Message Queue. +// +// The first test illustrates the notification mechanisms in +// Message_Queue and its integration with Reactor. +// +// Note the following things about this part of the test: +// +// 1. Multiple threads are not required. +// 2. You do not have to explicitly notify the Reactor +// 3. This code will work the same with any Reactor Implementation +// 4. handle_input, handle_exception, handle_output are the only +// callbacks supported by this mechanism +// 5. The notification mechanism need not notify the Reactor. You can +// write your own strategy classes that can do whatever application +// specific behavior you want. +// +// The second test also makes sure the high/low water mark +// signaling mechanism works flawlessly. +// +// = AUTHOR +// Irfan Pyarali and Nanbor Wang. +// +// ============================================================================ + +#include "test_config.h" +#include "ace/Reactor.h" +#include "ace/Strategies.h" +#include "ace/Task.h" + +ACE_RCSID(tests, Message_Queue_Notifications_Test.cpp, "$Id$") + +#if defined(__BORLANDC__) && __BORLANDC__ >= 0x0530 +USELIB("..\ace\aced.lib"); +//--------------------------------------------------------------------------- +#endif /* defined(__BORLANDC__) && __BORLANDC__ >= 0x0530 */ + +static int iterations = 10; + +static const size_t worker_threads = 2; +static const char * default_message = "ACE RULES"; +static const size_t default_high_water_mark = 20; +static const size_t default_low_water_mark = 10; +static const int watermark_iterations = 2 * default_high_water_mark; + +class Message_Handler : public ACE_Task<ACE_NULL_SYNCH> +{ +public: + Message_Handler (ACE_Reactor &reactor); + + virtual int handle_input (ACE_HANDLE); + virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE); + virtual int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE); + +private: + int process_message (void); + void make_message (void); + + ACE_Reactor_Notification_Strategy notification_strategy_; +}; + +class Watermark_Test : public ACE_Task<ACE_SYNCH> +{ +public: + Watermark_Test (); + + virtual int svc (); + + int consumer (); + int producer (); + int put_message (ACE_Time_Value* timeout = 0); + int get_message (void); + void print_producer_debug_message (void); + +private: + const size_t len_, hwm_, lwm_; + ACE_Atomic_Op <ACE_SYNCH_MUTEX, int> role_; + ACE_Barrier mq_full_, mq_low_water_mark_hit_; +}; + +Message_Handler::Message_Handler (ACE_Reactor &reactor) + // First time handle_input will be called + : notification_strategy_ (&reactor, + this, + ACE_Event_Handler::READ_MASK) +{ + this->msg_queue ()->notification_strategy (&this->notification_strategy_); + this->make_message (); +} + +int +Message_Handler::handle_input (ACE_HANDLE) +{ + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Message_Handler::handle_input\n"))); + + // Next time handle_output will be called + this->notification_strategy_.mask (ACE_Event_Handler::WRITE_MASK); + + return process_message (); +} + +int +Message_Handler::handle_output (ACE_HANDLE fd) +{ + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Message_Handler::handle_output\n"))); + ACE_UNUSED_ARG (fd); + + // Next time handle_exception will be called + this->notification_strategy_.mask (ACE_Event_Handler::EXCEPT_MASK); + + return process_message (); +} + +int +Message_Handler::handle_exception (ACE_HANDLE fd) +{ + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Message_Handler::handle_exception\n"))); + ACE_UNUSED_ARG (fd); + + // Next time handle_input will be called + this->notification_strategy_.mask (ACE_Event_Handler::READ_MASK); + + return process_message (); +} + +int +Message_Handler::process_message (void) +{ + ACE_Message_Block *mb; + + if (this->getq (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("dequeue_head")), -1); + else + { + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("message received = %s\n"), mb->rd_ptr ())); + delete mb; + } + + this->make_message (); + return 0; +} + +void +Message_Handler::make_message (void) +{ + if (--iterations > 0) + { + ACE_Message_Block *mb = new ACE_Message_Block ((char *) ASYS_TEXT ("hello")); + + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("sending message\n"))); + this->putq (mb); + } +} + +Watermark_Test::Watermark_Test () + : len_ (ACE_OS::strlen (default_message) + 1), + hwm_ (this->len_ * default_high_water_mark), + lwm_ (this->len_ * default_low_water_mark), + role_ (0), + mq_full_ (worker_threads), + mq_low_water_mark_hit_ (worker_threads) +{ + this->water_marks (ACE_IO_Cntl_Msg::SET_LWM, + this->lwm_); + this->water_marks (ACE_IO_Cntl_Msg::SET_HWM, + this->hwm_); +} + +int +Watermark_Test::producer (void) +{ + int i, hwm; + + for (hwm = this->hwm_, i = watermark_iterations; + hwm >= 0 ; + hwm -= this->len_, i--) + { + this->put_message (); + this->print_producer_debug_message (); + } + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Producer: High water mark hit ---- \n")); + this->mq_full_.wait (); + + // The following put_message should block until the message queue + // has dropped under the lwm. + this->put_message (); + ACE_ASSERT (this->msg_queue ()-> message_bytes () <= this->lwm_ + this->len_); + this->print_producer_debug_message (); + + for (i--; i >= 0 ; i--) + { + this->put_message (); + this->print_producer_debug_message (); + } + return 0; +} + +int +Watermark_Test::consumer (void) +{ + this->mq_full_.wait (); + ACE_OS::sleep (1); + // Let producer proceed and block in putq. + + for (int i = watermark_iterations; i >= 0; i--) + { + this->get_message (); + ACE_OS::sleep (0); + } + return 0; +} + +int +Watermark_Test::get_message (void) +{ + ACE_Message_Block *mb; + + if (this->getq (mb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("dequeue_head")), -1); + else + { + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%P|%t) Consumer: message size = %3d, ") + ASYS_TEXT ("message count = %3d\n"), + this->msg_queue ()-> message_bytes (), + this->msg_queue ()-> message_count ())); + delete mb; + } + return 0; +} + +int +Watermark_Test::put_message (ACE_Time_Value *timeout) +{ + ACE_Message_Block *mb; + ACE_NEW_RETURN (mb, + ACE_Message_Block (default_message, this->len_), + -1); + + return this->putq (mb, timeout); +} + +void +Watermark_Test::print_producer_debug_message (void) +{ + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%P|%t) Producer: message size = %3d, ") + ASYS_TEXT ("message count = %3d\n"), + this->msg_queue ()-> message_bytes (), + this->msg_queue ()-> message_count ())); +} + +int +Watermark_Test::svc (void) +{ + int role = this->role_.value (); + this->role_++; + + switch (role) + { + case 0: + this->producer (); + break; + case 1: + this->consumer (); + break; + default: + break; + } + return 0; +} + +int +main (int, ASYS_TCHAR *[]) +{ + ACE_START_TEST (ASYS_TEXT ("Message_Queue_Notifications_Test.cpp")); + + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Starting message queue reactive notification test...\n"))); + ACE_Reactor reactor; + Message_Handler mh (reactor); + + while (iterations > 0) + reactor.handle_events (); + +#if defined (ACE_HAS_THREADS) + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Starting message queue watermark test...\n"))); + Watermark_Test watermark_test; + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("High water mark is %d\n") + ASYS_TEXT ("Low water mark is %d\n"), + default_high_water_mark, + default_low_water_mark)); + + watermark_test.activate (THR_NEW_LWP, worker_threads); + + ACE_Thread_Manager::instance ()->wait (); +#else + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Message queue watermark test not performed because threads are not supported\n"))); +#endif /* ACE_HAS_THREADS */ + + ACE_END_TEST; + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Atomic_Op<ACE_SYNCH_MUTEX, int>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |