summaryrefslogtreecommitdiff
path: root/tests/Message_Queue_Notifications_Test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'tests/Message_Queue_Notifications_Test.cpp')
-rw-r--r--tests/Message_Queue_Notifications_Test.cpp362
1 files changed, 0 insertions, 362 deletions
diff --git a/tests/Message_Queue_Notifications_Test.cpp b/tests/Message_Queue_Notifications_Test.cpp
deleted file mode 100644
index ac28b577e04..00000000000
--- a/tests/Message_Queue_Notifications_Test.cpp
+++ /dev/null
@@ -1,362 +0,0 @@
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// tests
-//
-// = FILENAME
-// Message_Queue_Notification_Test.cpp
-//
-// = DESCRIPTION
-// There are two tests that test 2 different notification
-// mechanisms in Message Queue.
-//
-// The first test illustrates the notification mechanisms in
-// Message_Queue and its integration with Reactor.
-//
-// Note the following things about this part of the test:
-//
-// 1. Multiple threads are not required.
-// 2. You do not have to explicitly notify the Reactor
-// 3. This code will work the same with any Reactor Implementation
-// 4. handle_input, handle_exception, handle_output are the only
-// callbacks supported by this mechanism
-// 5. The notification mechanism need not notify the Reactor. You can
-// write your own strategy classes that can do whatever application
-// specific behavior you want.
-//
-// The second test also makes sure the high/low water mark
-// signaling mechanism works flawlessly.
-//
-// = AUTHOR
-// Irfan Pyarali <irfan@cs.wustl.edu> and Nanbor Wang <nanbor@cs.wustl.edu>
-//
-// ============================================================================
-
-#include "test_config.h"
-#include "ace/Reactor.h"
-#include "ace/Strategies.h"
-#include "ace/Task.h"
-
-ACE_RCSID(tests, Message_Queue_Notifications_Test, "$Id$")
-
-static int iterations = 10;
-
-static const size_t worker_threads = 2;
-static const char * default_message = "ACE RULES";
-static const size_t default_high_water_mark = 20;
-static const size_t default_low_water_mark = 10;
-static const int watermark_iterations = 2 * default_high_water_mark;
-
-class Message_Handler : public ACE_Task<ACE_NULL_SYNCH>
-{
- // = TITLE
- // This class implements a notification strategy for the Reactor.
-public:
- // = Initialization and termination.
- Message_Handler (ACE_Reactor &reactor);
- // Constructor.
-
- // = Demuxing hooks.
- virtual int handle_input (ACE_HANDLE);
- virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);
- virtual int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE);
-
-private:
- int process_message (void);
- void make_message (void);
-
- ACE_Reactor_Notification_Strategy notification_strategy_;
-};
-
-class Watermark_Test : public ACE_Task<ACE_SYNCH>
-{
- // = TITLE
- // This class test the correct functioning of build-in flow
- // control machanism in ACE_Task.
-public:
- Watermark_Test (void);
-
- virtual int svc (void);
-
- int consumer (void);
- int producer (void);
- int put_message (ACE_Time_Value* timeout = 0);
- int get_message (void);
- void print_producer_debug_message (void);
-
-private:
- const size_t len_;
- const size_t hwm_;
- const size_t lwm_;
- ACE_Atomic_Op <ACE_SYNCH_MUTEX, int> role_;
-#if defined (ACE_HAS_THREADS)
- ACE_Barrier mq_full_;
- ACE_Barrier mq_low_water_mark_hit_;
-#endif /* ACE_HAS_THREADS */
-};
-
-Message_Handler::Message_Handler (ACE_Reactor &reactor)
- // First time handle_input will be called
- : notification_strategy_ (&reactor,
- this,
- ACE_Event_Handler::READ_MASK)
-{
- this->msg_queue ()->notification_strategy (&this->notification_strategy_);
- this->make_message ();
-}
-
-int
-Message_Handler::handle_input (ACE_HANDLE)
-{
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("Message_Handler::handle_input\n")));
-
- // Next time handle_output will be called.
- this->notification_strategy_.mask (ACE_Event_Handler::WRITE_MASK);
-
- return process_message ();
-}
-
-int
-Message_Handler::handle_output (ACE_HANDLE fd)
-{
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("Message_Handler::handle_output\n")));
- ACE_UNUSED_ARG (fd);
-
- // Next time handle_exception will be called.
- this->notification_strategy_.mask (ACE_Event_Handler::EXCEPT_MASK);
-
- return process_message ();
-}
-
-int
-Message_Handler::handle_exception (ACE_HANDLE fd)
-{
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("Message_Handler::handle_exception\n")));
- ACE_UNUSED_ARG (fd);
-
- // Next time handle_input will be called.
- this->notification_strategy_.mask (ACE_Event_Handler::READ_MASK);
-
- return this->process_message ();
-}
-
-int
-Message_Handler::process_message (void)
-{
- ACE_Message_Block *mb;
-
- if (this->getq (mb,
- (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ASYS_TEXT ("%p\n"),
- ASYS_TEXT ("dequeue_head")),
- -1);
- else
- {
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("message received = %s\n"),
- mb->rd_ptr ()));
- mb->release ();
- }
-
- this->make_message ();
- return 0;
-}
-
-void
-Message_Handler::make_message (void)
-{
- if (--iterations > 0)
- {
- ACE_Message_Block *mb;
- ACE_NEW (mb,
- ACE_Message_Block ((char *) ASYS_TEXT ("hello")));
-
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("sending message\n")));
- this->putq (mb);
- }
-}
-
-Watermark_Test::Watermark_Test (void)
- : len_ (ACE_OS::strlen (default_message) + 1),
- hwm_ (this->len_ * default_high_water_mark),
- lwm_ (this->len_ * default_low_water_mark),
- role_ (0)
-#if defined (ACE_HAS_THREADS)
- , mq_full_ (worker_threads),
- mq_low_water_mark_hit_ (worker_threads)
-#endif /* ACE_HAS_THREADS */
-{
- this->water_marks (ACE_IO_Cntl_Msg::SET_LWM,
- this->lwm_);
- this->water_marks (ACE_IO_Cntl_Msg::SET_HWM,
- this->hwm_);
-}
-
-int
-Watermark_Test::producer (void)
-{
- int i = watermark_iterations;
-
- for (int hwm = this->hwm_;
- hwm >= 0 ;
- hwm -= this->len_)
- {
- this->put_message ();
- this->print_producer_debug_message ();
- i--;
- }
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%P|%t) Producer: High water mark hit ---- \n")));
-
- ACE_MT (this->mq_full_.wait ());
-
- // The following put_message should block until the message queue
- // has dropped under the lwm.
- this->put_message ();
-
- ACE_ASSERT (this->msg_queue ()-> message_bytes () <= this->lwm_ + this->len_);
-
- this->print_producer_debug_message ();
-
- for (i--; i >= 0 ; i--)
- {
- this->put_message ();
- this->print_producer_debug_message ();
- }
-
- return 0;
-}
-
-int
-Watermark_Test::consumer (void)
-{
- ACE_MT (this->mq_full_.wait ());
-
- ACE_OS::sleep (1);
-
- // Let producer proceed and block in putq.
-
- for (int i = watermark_iterations; i >= 0; i--)
- {
- this->get_message ();
- ACE_OS::sleep (0);
- }
-
- return 0;
-}
-
-int
-Watermark_Test::get_message (void)
-{
- ACE_Message_Block *mb;
-
- if (this->getq (mb) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ASYS_TEXT ("%p\n"),
- ASYS_TEXT ("dequeue_head")),
- -1);
- else
- {
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%P|%t) Consumer: message size = %3d, ")
- ASYS_TEXT ("message count = %3d\n"),
- this->msg_queue ()-> message_bytes (),
- this->msg_queue ()-> message_count ()));
- mb->release ();
- }
-
- return 0;
-}
-
-int
-Watermark_Test::put_message (ACE_Time_Value *timeout)
-{
- ACE_Message_Block *mb;
-
- ACE_NEW_RETURN (mb,
- ACE_Message_Block (default_message,
- this->len_),
- -1);
-
- return this->putq (mb, timeout);
-}
-
-void
-Watermark_Test::print_producer_debug_message (void)
-{
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%P|%t) Producer: message size = %3d, ")
- ASYS_TEXT ("message count = %3d\n"),
- this->msg_queue ()-> message_bytes (),
- this->msg_queue ()-> message_count ()));
-}
-
-int
-Watermark_Test::svc (void)
-{
- // this->role_ is an Atomic_Op object.
- int role = this->role_++;
-
- switch (role)
- {
- case 0:
- this->producer ();
- break;
- case 1:
- this->consumer ();
- break;
- default:
- break;
- }
- return 0;
-}
-
-int
-main (int, ASYS_TCHAR *[])
-{
- ACE_START_TEST (ASYS_TEXT ("Message_Queue_Notifications_Test"));
-
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("Starting message queue reactive notification test...\n")));
-
- ACE_Reactor reactor;
- Message_Handler mh (reactor);
-
- while (iterations > 0)
- reactor.handle_events ();
-
-#if defined (ACE_HAS_THREADS)
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("Starting message queue watermark test...\n")));
- Watermark_Test watermark_test;
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("High water mark is %d\n")
- ASYS_TEXT ("Low water mark is %d\n"),
- default_high_water_mark,
- default_low_water_mark));
-
- watermark_test.activate (THR_NEW_LWP,
- worker_threads);
-
- ACE_Thread_Manager::instance ()->wait ();
-#else
- ACE_DEBUG ((LM_INFO,
- ASYS_TEXT ("Message queue watermark test not performed because threads are not supported\n")));
-#endif /* ACE_HAS_THREADS */
-
- ACE_END_TEST;
- return 0;
-}
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Atomic_Op<ACE_SYNCH_MUTEX, int>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Atomic_Op<ACE_SYNCH_MUTEX, int>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */