summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rw-r--r--examples/Reactor/ReactorEx/test_reactorEx.cpp80
-rw-r--r--examples/Reactor/WFMO_Reactor/test_reactorEx.cpp80
2 files changed, 60 insertions, 100 deletions
diff --git a/examples/Reactor/ReactorEx/test_reactorEx.cpp b/examples/Reactor/ReactorEx/test_reactorEx.cpp
index 4fc65c0b284..1a033eae434 100644
--- a/examples/Reactor/ReactorEx/test_reactorEx.cpp
+++ b/examples/Reactor/ReactorEx/test_reactorEx.cpp
@@ -45,6 +45,7 @@ class Peer_Handler : public MT_TASK
{
public:
Peer_Handler (int argc, char *argv[]);
+ ~Peer_Handler (void);
int open (void * =0);
// This method creates the network connection to the remote peer.
@@ -71,24 +72,19 @@ public:
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
// We've been removed from the ReactorEx.
- virtual int handle_signal (int index, siginfo_t *, ucontext_t *);
- // We've been signaled by the STDIN thread. Try to dequeue a
- // message.
-
- void try_send (void);
- // Try to dequeue a message. If successful, send it proactively to
- // the remote peer.
-
- virtual int put (ACE_Message_Block *mb,
- ACE_Time_Value *tv = 0);
- // Enqueue the new mb and signal the main thread.
+ virtual int handle_output (ACE_HANDLE fd);
+ // Called when output events should start
private:
+ int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
+ // Make Task happy.
+
ACE_SOCK_Stream stream_;
// Socket that we have connected to the server.
- ACE_HANDLE new_msg_event_;
- // Event that gets signaled when messages arrive.
+ ACE_ReactorEx_Notification_Strategy strategy_;
+ // strategy created such that the reactorEx notifies us when
+ // something is added to the queue.
// = Remote peer info.
char *host_;
@@ -138,8 +134,19 @@ private:
Peer_Handler::Peer_Handler (int argc, char *argv[])
: host_ (0),
- port_ (ACE_DEFAULT_SERVER_PORT)
+ port_ (ACE_DEFAULT_SERVER_PORT),
+ strategy_ (ACE_Service_Config::reactorEx (),
+ this,
+ ACE_Event_Handler::WRITE_MASK)
{
+ // This code sets up the message to notify us when a new message is
+ // added to the queue. Actually, the queue notifies ReactorEx which
+ // then notifies us.
+ ACE_Message_Queue<Peer_Handler::SYNCH>* mq;
+ ACE_NEW (mq, ACE_Message_Queue<Peer_Handler::SYNCH>);
+ mq->notification_strategy (&this->strategy_);
+ this->msg_queue (mq);
+
ACE_Get_Opt get_opt (argc, argv, "h:p:");
int c;
@@ -155,11 +162,12 @@ Peer_Handler::Peer_Handler (int argc, char *argv[])
break;
}
}
+}
- // Auto reset event.
- new_msg_event_ = ::CreateEvent (NULL, FALSE, FALSE, NULL);
- ACE_Service_Config::reactorEx ()->
- register_handler (this, new_msg_event_);
+Peer_Handler::~Peer_Handler (void)
+{
+ ACE_Message_Queue<Peer_Handler::SYNCH>* mq = this->msg_queue ();
+ delete mq;
}
// This method creates the network connection to the remote peer. It
@@ -266,21 +274,9 @@ Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
return 0;
}
-// We've been signaled by the STDIN thread. Try to dequeue a
-// message.
-
+// New stuff added to the message queue. Try to dequeue a message.
int
-Peer_Handler::handle_signal (int, siginfo_t *, ucontext_t *)
-{
- this->try_send ();
- return 0;
-}
-
-// Try to dequeue a message. If successful, send it proactively to
-// the remote peer.
-
-void
-Peer_Handler::try_send (void)
+Peer_Handler::handle_output (ACE_HANDLE fd)
{
ACE_Message_Block *mb;
@@ -292,23 +288,7 @@ Peer_Handler::try_send (void)
initiate (this, ACE_Event_Handler::WRITE_MASK, mb) == -1)
ACE_ERROR ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"));
}
-}
-
-// Enqueue the new mb and signal the main thread.
-
-int
-Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
-{
- // Enqueue the mb.
- int result = this->putq (mb, tv);
-
- // Signal the main thread. This will remain signaled until the main
- // thread wakes up.
-
- if (::SetEvent (new_msg_event_) == 0)
- ACE_ERROR ((LM_ERROR, "Pulse Failed!\n"));
-
- return result;
+ return 0;
}
void
@@ -368,7 +348,7 @@ STDIN_Handler::svc (void)
if (read_result > 0)
{
mb->wr_ptr (read_result);
- this->ph_.put (mb);
+ this->ph_.putq (mb);
}
else
break;
diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
index 4fc65c0b284..1a033eae434 100644
--- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
+++ b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
@@ -45,6 +45,7 @@ class Peer_Handler : public MT_TASK
{
public:
Peer_Handler (int argc, char *argv[]);
+ ~Peer_Handler (void);
int open (void * =0);
// This method creates the network connection to the remote peer.
@@ -71,24 +72,19 @@ public:
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
// We've been removed from the ReactorEx.
- virtual int handle_signal (int index, siginfo_t *, ucontext_t *);
- // We've been signaled by the STDIN thread. Try to dequeue a
- // message.
-
- void try_send (void);
- // Try to dequeue a message. If successful, send it proactively to
- // the remote peer.
-
- virtual int put (ACE_Message_Block *mb,
- ACE_Time_Value *tv = 0);
- // Enqueue the new mb and signal the main thread.
+ virtual int handle_output (ACE_HANDLE fd);
+ // Called when output events should start
private:
+ int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
+ // Make Task happy.
+
ACE_SOCK_Stream stream_;
// Socket that we have connected to the server.
- ACE_HANDLE new_msg_event_;
- // Event that gets signaled when messages arrive.
+ ACE_ReactorEx_Notification_Strategy strategy_;
+ // strategy created such that the reactorEx notifies us when
+ // something is added to the queue.
// = Remote peer info.
char *host_;
@@ -138,8 +134,19 @@ private:
Peer_Handler::Peer_Handler (int argc, char *argv[])
: host_ (0),
- port_ (ACE_DEFAULT_SERVER_PORT)
+ port_ (ACE_DEFAULT_SERVER_PORT),
+ strategy_ (ACE_Service_Config::reactorEx (),
+ this,
+ ACE_Event_Handler::WRITE_MASK)
{
+ // This code sets up the message to notify us when a new message is
+ // added to the queue. Actually, the queue notifies ReactorEx which
+ // then notifies us.
+ ACE_Message_Queue<Peer_Handler::SYNCH>* mq;
+ ACE_NEW (mq, ACE_Message_Queue<Peer_Handler::SYNCH>);
+ mq->notification_strategy (&this->strategy_);
+ this->msg_queue (mq);
+
ACE_Get_Opt get_opt (argc, argv, "h:p:");
int c;
@@ -155,11 +162,12 @@ Peer_Handler::Peer_Handler (int argc, char *argv[])
break;
}
}
+}
- // Auto reset event.
- new_msg_event_ = ::CreateEvent (NULL, FALSE, FALSE, NULL);
- ACE_Service_Config::reactorEx ()->
- register_handler (this, new_msg_event_);
+Peer_Handler::~Peer_Handler (void)
+{
+ ACE_Message_Queue<Peer_Handler::SYNCH>* mq = this->msg_queue ();
+ delete mq;
}
// This method creates the network connection to the remote peer. It
@@ -266,21 +274,9 @@ Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
return 0;
}
-// We've been signaled by the STDIN thread. Try to dequeue a
-// message.
-
+// New stuff added to the message queue. Try to dequeue a message.
int
-Peer_Handler::handle_signal (int, siginfo_t *, ucontext_t *)
-{
- this->try_send ();
- return 0;
-}
-
-// Try to dequeue a message. If successful, send it proactively to
-// the remote peer.
-
-void
-Peer_Handler::try_send (void)
+Peer_Handler::handle_output (ACE_HANDLE fd)
{
ACE_Message_Block *mb;
@@ -292,23 +288,7 @@ Peer_Handler::try_send (void)
initiate (this, ACE_Event_Handler::WRITE_MASK, mb) == -1)
ACE_ERROR ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"));
}
-}
-
-// Enqueue the new mb and signal the main thread.
-
-int
-Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
-{
- // Enqueue the mb.
- int result = this->putq (mb, tv);
-
- // Signal the main thread. This will remain signaled until the main
- // thread wakes up.
-
- if (::SetEvent (new_msg_event_) == 0)
- ACE_ERROR ((LM_ERROR, "Pulse Failed!\n"));
-
- return result;
+ return 0;
}
void
@@ -368,7 +348,7 @@ STDIN_Handler::svc (void)
if (read_result > 0)
{
mb->wr_ptr (read_result);
- this->ph_.put (mb);
+ this->ph_.putq (mb);
}
else
break;