diff options
Diffstat (limited to 'examples')
-rw-r--r-- | examples/Reactor/ReactorEx/test_reactorEx.cpp | 80 | ||||
-rw-r--r-- | examples/Reactor/WFMO_Reactor/test_reactorEx.cpp | 80 |
2 files changed, 60 insertions, 100 deletions
diff --git a/examples/Reactor/ReactorEx/test_reactorEx.cpp b/examples/Reactor/ReactorEx/test_reactorEx.cpp index 4fc65c0b284..1a033eae434 100644 --- a/examples/Reactor/ReactorEx/test_reactorEx.cpp +++ b/examples/Reactor/ReactorEx/test_reactorEx.cpp @@ -45,6 +45,7 @@ class Peer_Handler : public MT_TASK { public: Peer_Handler (int argc, char *argv[]); + ~Peer_Handler (void); int open (void * =0); // This method creates the network connection to the remote peer. @@ -71,24 +72,19 @@ public: virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); // We've been removed from the ReactorEx. - virtual int handle_signal (int index, siginfo_t *, ucontext_t *); - // We've been signaled by the STDIN thread. Try to dequeue a - // message. - - void try_send (void); - // Try to dequeue a message. If successful, send it proactively to - // the remote peer. - - virtual int put (ACE_Message_Block *mb, - ACE_Time_Value *tv = 0); - // Enqueue the new mb and signal the main thread. + virtual int handle_output (ACE_HANDLE fd); + // Called when output events should start private: + int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } + // Make Task happy. + ACE_SOCK_Stream stream_; // Socket that we have connected to the server. - ACE_HANDLE new_msg_event_; - // Event that gets signaled when messages arrive. + ACE_ReactorEx_Notification_Strategy strategy_; + // strategy created such that the reactorEx notifies us when + // something is added to the queue. // = Remote peer info. char *host_; @@ -138,8 +134,19 @@ private: Peer_Handler::Peer_Handler (int argc, char *argv[]) : host_ (0), - port_ (ACE_DEFAULT_SERVER_PORT) + port_ (ACE_DEFAULT_SERVER_PORT), + strategy_ (ACE_Service_Config::reactorEx (), + this, + ACE_Event_Handler::WRITE_MASK) { + // This code sets up the message to notify us when a new message is + // added to the queue. Actually, the queue notifies ReactorEx which + // then notifies us. + ACE_Message_Queue<Peer_Handler::SYNCH>* mq; + ACE_NEW (mq, ACE_Message_Queue<Peer_Handler::SYNCH>); + mq->notification_strategy (&this->strategy_); + this->msg_queue (mq); + ACE_Get_Opt get_opt (argc, argv, "h:p:"); int c; @@ -155,11 +162,12 @@ Peer_Handler::Peer_Handler (int argc, char *argv[]) break; } } +} - // Auto reset event. - new_msg_event_ = ::CreateEvent (NULL, FALSE, FALSE, NULL); - ACE_Service_Config::reactorEx ()-> - register_handler (this, new_msg_event_); +Peer_Handler::~Peer_Handler (void) +{ + ACE_Message_Queue<Peer_Handler::SYNCH>* mq = this->msg_queue (); + delete mq; } // This method creates the network connection to the remote peer. It @@ -266,21 +274,9 @@ Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) return 0; } -// We've been signaled by the STDIN thread. Try to dequeue a -// message. - +// New stuff added to the message queue. Try to dequeue a message. int -Peer_Handler::handle_signal (int, siginfo_t *, ucontext_t *) -{ - this->try_send (); - return 0; -} - -// Try to dequeue a message. If successful, send it proactively to -// the remote peer. - -void -Peer_Handler::try_send (void) +Peer_Handler::handle_output (ACE_HANDLE fd) { ACE_Message_Block *mb; @@ -292,23 +288,7 @@ Peer_Handler::try_send (void) initiate (this, ACE_Event_Handler::WRITE_MASK, mb) == -1) ACE_ERROR ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler")); } -} - -// Enqueue the new mb and signal the main thread. - -int -Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv) -{ - // Enqueue the mb. - int result = this->putq (mb, tv); - - // Signal the main thread. This will remain signaled until the main - // thread wakes up. - - if (::SetEvent (new_msg_event_) == 0) - ACE_ERROR ((LM_ERROR, "Pulse Failed!\n")); - - return result; + return 0; } void @@ -368,7 +348,7 @@ STDIN_Handler::svc (void) if (read_result > 0) { mb->wr_ptr (read_result); - this->ph_.put (mb); + this->ph_.putq (mb); } else break; diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp index 4fc65c0b284..1a033eae434 100644 --- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp +++ b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp @@ -45,6 +45,7 @@ class Peer_Handler : public MT_TASK { public: Peer_Handler (int argc, char *argv[]); + ~Peer_Handler (void); int open (void * =0); // This method creates the network connection to the remote peer. @@ -71,24 +72,19 @@ public: virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); // We've been removed from the ReactorEx. - virtual int handle_signal (int index, siginfo_t *, ucontext_t *); - // We've been signaled by the STDIN thread. Try to dequeue a - // message. - - void try_send (void); - // Try to dequeue a message. If successful, send it proactively to - // the remote peer. - - virtual int put (ACE_Message_Block *mb, - ACE_Time_Value *tv = 0); - // Enqueue the new mb and signal the main thread. + virtual int handle_output (ACE_HANDLE fd); + // Called when output events should start private: + int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } + // Make Task happy. + ACE_SOCK_Stream stream_; // Socket that we have connected to the server. - ACE_HANDLE new_msg_event_; - // Event that gets signaled when messages arrive. + ACE_ReactorEx_Notification_Strategy strategy_; + // strategy created such that the reactorEx notifies us when + // something is added to the queue. // = Remote peer info. char *host_; @@ -138,8 +134,19 @@ private: Peer_Handler::Peer_Handler (int argc, char *argv[]) : host_ (0), - port_ (ACE_DEFAULT_SERVER_PORT) + port_ (ACE_DEFAULT_SERVER_PORT), + strategy_ (ACE_Service_Config::reactorEx (), + this, + ACE_Event_Handler::WRITE_MASK) { + // This code sets up the message to notify us when a new message is + // added to the queue. Actually, the queue notifies ReactorEx which + // then notifies us. + ACE_Message_Queue<Peer_Handler::SYNCH>* mq; + ACE_NEW (mq, ACE_Message_Queue<Peer_Handler::SYNCH>); + mq->notification_strategy (&this->strategy_); + this->msg_queue (mq); + ACE_Get_Opt get_opt (argc, argv, "h:p:"); int c; @@ -155,11 +162,12 @@ Peer_Handler::Peer_Handler (int argc, char *argv[]) break; } } +} - // Auto reset event. - new_msg_event_ = ::CreateEvent (NULL, FALSE, FALSE, NULL); - ACE_Service_Config::reactorEx ()-> - register_handler (this, new_msg_event_); +Peer_Handler::~Peer_Handler (void) +{ + ACE_Message_Queue<Peer_Handler::SYNCH>* mq = this->msg_queue (); + delete mq; } // This method creates the network connection to the remote peer. It @@ -266,21 +274,9 @@ Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) return 0; } -// We've been signaled by the STDIN thread. Try to dequeue a -// message. - +// New stuff added to the message queue. Try to dequeue a message. int -Peer_Handler::handle_signal (int, siginfo_t *, ucontext_t *) -{ - this->try_send (); - return 0; -} - -// Try to dequeue a message. If successful, send it proactively to -// the remote peer. - -void -Peer_Handler::try_send (void) +Peer_Handler::handle_output (ACE_HANDLE fd) { ACE_Message_Block *mb; @@ -292,23 +288,7 @@ Peer_Handler::try_send (void) initiate (this, ACE_Event_Handler::WRITE_MASK, mb) == -1) ACE_ERROR ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler")); } -} - -// Enqueue the new mb and signal the main thread. - -int -Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv) -{ - // Enqueue the mb. - int result = this->putq (mb, tv); - - // Signal the main thread. This will remain signaled until the main - // thread wakes up. - - if (::SetEvent (new_msg_event_) == 0) - ACE_ERROR ((LM_ERROR, "Pulse Failed!\n")); - - return result; + return 0; } void @@ -368,7 +348,7 @@ STDIN_Handler::svc (void) if (read_result > 0) { mb->wr_ptr (read_result); - this->ph_.put (mb); + this->ph_.putq (mb); } else break; |