summaryrefslogtreecommitdiff
path: root/tests/Message_Queue_Notifications_Test.cpp
blob: 40b823afe9eee52c2227ba1c1f4b1adf35cadd3a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// $Id$

// ============================================================================
//
// = LIBRARY
//    tests
// 
// = FILENAME
//    Message_Queue_Notification_Test.cpp
//
// = DESCRIPTION
//      This is a test to illustrate the notification mechanisms in
//      Message_Queue and its integration with Reactor.
//
//      Note the following things about this example:
//
//      1. Multiple threads are not required.
//      2. You do not have to explicitly notify the Reactor
//      3. This code will work the same with any Reactor Implementation
//      4. handle_input, handle_exception, handle_output are the only
//         callbacks supported by this mechanism
//      5. The notification mechanism need not notify the Reactor. You can
//         write your own strategy classes that can do whatever application
//         specific behavior you want.
//
// = AUTHOR
//    Irfan Pyarali
// 
// ============================================================================

#include "test_config.h"
#include "ace/Reactor.h"
#include "ace/Strategies.h"
#include "ace/Task.h"

static int iterations = 10;

class Message_Handler : public ACE_Task<ACE_NULL_SYNCH>
{
public:
  Message_Handler (ACE_Reactor &reactor);
  
  virtual int handle_input (ACE_HANDLE);
  virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);
  virtual int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE);

private:
  int process_message (void);
  void make_message (void);

  ACE_Reactor_Notification_Strategy notification_strategy_;  
};

Message_Handler::Message_Handler (ACE_Reactor &reactor)
  // First time handle_input will be called 
  : notification_strategy_ (&reactor,
			    this,
			    ACE_Event_Handler::READ_MASK)
{
  this->msg_queue ()->notification_strategy (&this->notification_strategy_);
  this->make_message ();
}

int
Message_Handler::handle_input (ACE_HANDLE)
{
  ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Message_Handler::handle_input\n")));

  // Next time handle_output will be called 
  this->notification_strategy_.mask (ACE_Event_Handler::WRITE_MASK);

  return process_message ();
}

int 
Message_Handler::handle_output (ACE_HANDLE fd)
{
  ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Message_Handler::handle_output\n")));
  ACE_UNUSED_ARG (fd);  

  // Next time handle_exception will be called 
  this->notification_strategy_.mask (ACE_Event_Handler::EXCEPT_MASK);

  return process_message ();  
}

int 
Message_Handler::handle_exception (ACE_HANDLE fd)
{
  ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Message_Handler::handle_exception\n")));
  ACE_UNUSED_ARG (fd);  

  // Next time handle_input will be called 
  this->notification_strategy_.mask (ACE_Event_Handler::READ_MASK);

  return process_message ();
}

int
Message_Handler::process_message (void)
{
  ACE_Message_Block *mb;

  if (this->getq (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("dequeue_head")), -1);
  else
    {
      ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("message received = %s\n"), mb->rd_ptr ()));
      delete mb;
    }

  this->make_message ();
  return 0;
}  

void
Message_Handler::make_message (void)
{
  if (--iterations > 0)
    {
      ACE_Message_Block *mb = new ACE_Message_Block ((char *) ASYS_TEXT ("hello"));
      
      ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("sending message\n")));
      this->putq (mb);
    }
}
  

int
main (int, ASYS_TCHAR *[])
{
  ACE_START_TEST (ASYS_TEXT ("Message_Queue_Notifications_Test"));

#if defined (ACE_HAS_THREADS)
  ACE_Reactor reactor; 
  Message_Handler mh (reactor);

  while (iterations > 0)
    reactor.handle_events ();

#else
  ACE_ERROR ((LM_ERROR, ASYS_TEXT ("threads not supported on this platform\n")));
#endif /* ACE_HAS_THREADS */
  ACE_END_TEST;
  return 0;
}