diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 2001-03-21 11:59:40 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 2001-03-21 11:59:40 +0000 |
commit | b7ff7d2f3f86fc125f741cfe56a721e86e516993 (patch) | |
tree | 623a559f568a8a9e904a4e8bdd63e2cf3916738e | |
parent | 5156e424454de9a2e949a25ad2ee39a951feb57a (diff) | |
download | ATCD-b7ff7d2f3f86fc125f741cfe56a721e86e516993.tar.gz |
ChangeLogTag:Tue Mar 20 17:31:21 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r-- | ChangeLog | 16 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 16 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 16 | ||||
-rw-r--r-- | THANKS | 2 | ||||
-rw-r--r-- | ace/OS.i | 6 | ||||
-rw-r--r-- | ace/Select_Reactor_T.h | 2 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp | 153 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Concrete_Connection_Handlers.h | 28 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Event_Channel.cpp | 99 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Event_Channel.h | 9 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Options.cpp | 26 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Options.h | 4 | ||||
-rw-r--r-- | apps/Gateway/Peer/Peer.cpp | 78 | ||||
-rw-r--r-- | apps/Gateway/Peer/Peer.h | 4 |
14 files changed, 355 insertions, 104 deletions
diff --git a/ChangeLog b/ChangeLog index 50a439b4d92..9e9168e1a0b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -4,6 +4,13 @@ Tue Mar 20 17:31:21 2001 Carlos O'Ryan <coryan@uci.edu> The Logging_Strategy_Test dynamically loads the Logger service (from netsvcs), therefore it does not work on static builds. +Tue Mar 20 18:17:24 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + + * ace/OS.i: Fixed the USYNC_PROCESS arm of CreateMutex() in + ACE_OS::mutex_init() so that it calls + ACE_OS::set_errno_to_last_error(). Thanks to Ram Ben-Yakir + <Ram@bandwiz.com> for reporting this. + Tue Mar 20 01:33:24 2001 Ossama Othman <ossama@uci.edu> * ace/SSL/SSL_SOCK_Acceptor.cpp (ssl_accept): @@ -71,8 +78,15 @@ Sun Mar 18 22:12:16 2001 Ossama Othman <ossama@uci.edu> Sun Mar 18 09:46:47 2001 Balachandran Natarajan <bala@cs.wustl.edu> - * tests/New_Fail_Test.cpp: Fixed warnings in g++. + * tests/New_Fail_Test.cpp: Fixed warnings in g++. + +Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + * apps/Gateway/Gateway, + * apps/Gateway/Peer: Added a number of fixes to the Gateway and Peer + applications. Thanks to Lu Yunhai <luyunhai@huawei.com> for + contributing these. + Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> * examples/IPC_SAP/FILE_SAP/client.cpp (main): Added a couple of diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index 50a439b4d92..9e9168e1a0b 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -4,6 +4,13 @@ Tue Mar 20 17:31:21 2001 Carlos O'Ryan <coryan@uci.edu> The Logging_Strategy_Test dynamically loads the Logger service (from netsvcs), therefore it does not work on static builds. +Tue Mar 20 18:17:24 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + + * ace/OS.i: Fixed the USYNC_PROCESS arm of CreateMutex() in + ACE_OS::mutex_init() so that it calls + ACE_OS::set_errno_to_last_error(). Thanks to Ram Ben-Yakir + <Ram@bandwiz.com> for reporting this. + Tue Mar 20 01:33:24 2001 Ossama Othman <ossama@uci.edu> * ace/SSL/SSL_SOCK_Acceptor.cpp (ssl_accept): @@ -71,8 +78,15 @@ Sun Mar 18 22:12:16 2001 Ossama Othman <ossama@uci.edu> Sun Mar 18 09:46:47 2001 Balachandran Natarajan <bala@cs.wustl.edu> - * tests/New_Fail_Test.cpp: Fixed warnings in g++. + * tests/New_Fail_Test.cpp: Fixed warnings in g++. + +Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + * apps/Gateway/Gateway, + * apps/Gateway/Peer: Added a number of fixes to the Gateway and Peer + applications. Thanks to Lu Yunhai <luyunhai@huawei.com> for + contributing these. + Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> * examples/IPC_SAP/FILE_SAP/client.cpp (main): Added a couple of diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index 50a439b4d92..9e9168e1a0b 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -4,6 +4,13 @@ Tue Mar 20 17:31:21 2001 Carlos O'Ryan <coryan@uci.edu> The Logging_Strategy_Test dynamically loads the Logger service (from netsvcs), therefore it does not work on static builds. +Tue Mar 20 18:17:24 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + + * ace/OS.i: Fixed the USYNC_PROCESS arm of CreateMutex() in + ACE_OS::mutex_init() so that it calls + ACE_OS::set_errno_to_last_error(). Thanks to Ram Ben-Yakir + <Ram@bandwiz.com> for reporting this. + Tue Mar 20 01:33:24 2001 Ossama Othman <ossama@uci.edu> * ace/SSL/SSL_SOCK_Acceptor.cpp (ssl_accept): @@ -71,8 +78,15 @@ Sun Mar 18 22:12:16 2001 Ossama Othman <ossama@uci.edu> Sun Mar 18 09:46:47 2001 Balachandran Natarajan <bala@cs.wustl.edu> - * tests/New_Fail_Test.cpp: Fixed warnings in g++. + * tests/New_Fail_Test.cpp: Fixed warnings in g++. + +Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + * apps/Gateway/Gateway, + * apps/Gateway/Peer: Added a number of fixes to the Gateway and Peer + applications. Thanks to Lu Yunhai <luyunhai@huawei.com> for + contributing these. + Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> * examples/IPC_SAP/FILE_SAP/client.cpp (main): Added a couple of @@ -1179,6 +1179,8 @@ Rahul Shukla <rshukla@ggn.aithent.com> Pierre Fayolle <fayolle@enseirb.fr> Greg McCain <greg.mccain@veritas.com> Matt Cheers <matt.cheers@boeing.com> +Benjamin Fry <ben@thrownet.com> +Ram Ben-Yakir <Ram@bandwiz.com> I would particularly like to thank Paul Stephenson, who worked with me at Ericsson in the early 1990's. Paul devised the recursive Makefile @@ -1598,7 +1598,11 @@ ACE_OS::mutex_init (ACE_mutex_t *m, if (m->proc_mutex_ == 0) ACE_FAIL_RETURN (-1); else - return 0; + { + // Make sure to set errno to ERROR_ALREADY_EXISTS if necessary. + ACE_OS::set_errno_to_last_error (); + return 0; + } case USYNC_THREAD: return ACE_OS::thread_mutex_init (&m->thr_mutex_, type, diff --git a/ace/Select_Reactor_T.h b/ace/Select_Reactor_T.h index 5cb075391ad..6537effca36 100644 --- a/ace/Select_Reactor_T.h +++ b/ace/Select_Reactor_T.h @@ -514,7 +514,7 @@ public: /// Wake up all threads in waiting in the event loop virtual void wakeup_all_threads (void); - // = Only the owner thread that can perform a <handle_events>. + // = Only the owner thread can perform a <handle_events>. /// Set the new owner of the thread and return the old owner. virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id = 0); diff --git a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp index b1acae66bde..95c82b8c159 100644 --- a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp +++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp @@ -18,22 +18,28 @@ Consumer_Handler::Consumer_Handler (const Connection_Config_Info &pci) // unexpectedly. This method simply marks the Connection_Handler as // having failed so that handle_close () can reconnect. +// Do not close handler when received data successfully. +// Consumer_Handler should could process received data. +// For example, Consumer could send reply-event to Supplier. int Consumer_Handler::handle_input (ACE_HANDLE) { - char buf[1]; + // Do not set FAILED state at here, just at real failed place. - this->state (Connection_Handler::FAILED); + char buf[BUFSIZ]; + ssize_t received = this->peer ().recv (buf, sizeof buf); - switch (this->peer ().recv (buf, sizeof buf)) + switch (received) { case -1: + this->state (Connection_Handler::FAILED); ACE_ERROR_RETURN ((LM_ERROR, "(%t) Peer has failed unexpectedly for Consumer_Handler %d\n", this->connection_id ()), -1); /* NOTREACHED */ case 0: + this->state (Connection_Handler::FAILED); ACE_ERROR_RETURN ((LM_ERROR, "(%t) Peer has shutdown unexpectedly for Consumer_Handler %d\n", this->connection_id ()), @@ -41,9 +47,11 @@ Consumer_Handler::handle_input (ACE_HANDLE) /* NOTREACHED */ default: ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Consumer is erroneously sending input to Consumer_Handler %d\n", - this->connection_id ()), - -1); + "(%t) IGNORED: Consumer is erroneously sending input to Consumer_Handler %d\n" + "data size = %d\n", + this->connection_id (), + received), + 0); // Return 0 to identify received data successfully. /* NOTREACHED */ } } @@ -392,7 +400,9 @@ Supplier_Handler::recv (ACE_Message_Block *&forward_addr) ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_)); ssize_t data_received = - this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); + !data_bytes_left_to_read + ? 0 // peer().recv() should not be called when data_bytes_left_to_read is 0. + : this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); // Try to receive the remainder of the event. @@ -406,8 +416,12 @@ Supplier_Handler::recv (ACE_Message_Block *&forward_addr) /* FALLTHROUGH */; case 0: // Premature EOF. - this->msg_frag_ = this->msg_frag_->release (); - return 0; + if (data_bytes_left_to_read) + { + this->msg_frag_ = this->msg_frag_->release (); + return 0; + } + /* FALLTHROUGH */; default: // Set the write pointer at 1 past the end of the event. @@ -468,8 +482,8 @@ Supplier_Handler::recv (ACE_Message_Block *&forward_addr) } } -// Receive various types of input (e.g., Peer event from the -// gatewayd, as well as stdio). +// Receive various types of input (e.g., Peer event from the gatewayd, +// as well as stdio). int Supplier_Handler::handle_input (ACE_HANDLE) @@ -519,26 +533,47 @@ Supplier_Handler::process (ACE_Message_Block *event_key) Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci) : Consumer_Handler (pci) { + // It is not in thread svc() now. + in_thread_ = 0; +} + +// Overriding handle_close() method. If in thread svc(), no need to +// process handle_close() when call peer().close(), because the +// connection is blocked now. + +int +Thr_Consumer_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m) +{ + if (in_thread_) + return 0; + else + return Consumer_Handler::handle_close (h, m); } // This method should be called only when the Consumer shuts down -// unexpectedly. This method marks the Connection_Handler as having failed -// and deactivates the ACE_Message_Queue (to wake up the thread +// unexpectedly. This method marks the Connection_Handler as having +// failed and deactivates the ACE_Message_Queue (to wake up the thread // blocked on <dequeue_head> in svc()). -// Thr_Output_Handler::handle_close () will eventually try to +// Thr_Consumer_Handler::handle_close () will eventually try to // reconnect... +// Let Consumer_Handler receive normal data. int Thr_Consumer_Handler::handle_input (ACE_HANDLE h) { // Call down to the <Consumer_Handler> to handle this first. - this->Consumer_Handler::handle_input (h); + if (this->Consumer_Handler::handle_input (h) != 0) + { + // Only do such work when failed. - ACE_Reactor::instance ()->remove_handler - (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); + ACE_Reactor::instance ()->remove_handler + (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); - // Deactivate the queue while we try to get reconnected. - this->msg_queue ()->deactivate (); + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + // Will call handle_close. + return -1; + } return 0; } @@ -550,31 +585,42 @@ Thr_Consumer_Handler::open (void *) { // Turn off non-blocking I/O. if (this->peer ().disable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "disable"), + -1); // Incorrect info fixed. // Call back to the <Event_Channel> to complete our initialization. else if (this->event_channel_->complete_connection_connection (this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "complete_connection_connection"), + -1); // Register ourselves to receive input events (which indicate that // the Consumer has shut down unexpectedly). else if (ACE_Reactor::instance ()->register_handler (this, ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "register_handler"), + -1); // Reactivate message queue. If it was active then this is the // first time in and we need to spawn a thread, otherwise the queue // was inactive due to some problem and we've already got a thread. else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE) { - ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) spawning new thread\n")); // Become an active object by spawning a new thread to transmit // events to Consumers. return this->activate (THR_NEW_LWP | THR_DETACHED); } else { - ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) reusing existing thread\n")); return 0; } } @@ -597,8 +643,7 @@ Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) int Thr_Consumer_Handler::svc (void) { - - for (;;) + for (in_thread_ = 1;;) { ACE_DEBUG ((LM_DEBUG, "(%t) Thr_Consumer_Handler's handle = %d\n", @@ -627,7 +672,12 @@ Thr_Consumer_Handler::svc (void) // Re-establish the connection, using expoential backoff. for (this->timeout (1); // Default is to reconnect synchronously. - this->event_channel_->initiate_connection_connection (this) == -1; ) + this->event_channel_->initiate_connection_connection (this, 1) == -1; + // Second parameter '1' means using sync mode directly, + // don't care Options::blocking_semantics(). If don't do + // so, async mode will be used to connect which won't + // satisfy original design. + ) { ACE_Time_Value tv (this->timeout ()); @@ -645,6 +695,21 @@ Thr_Consumer_Handler::svc (void) Thr_Supplier_Handler::Thr_Supplier_Handler (const Connection_Config_Info &pci) : Supplier_Handler (pci) { + // It is not in thread svc() now. + in_thread_ = 0; +} + +// Overriding handle_close() method. If in thread svc(), no need to +// process handle_close() when call peer().close(), because the +// connection is blocked now. + +int +Thr_Supplier_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m) +{ + if (in_thread_) + return 0; + else + return Supplier_Handler::handle_close (h, m); } int @@ -652,18 +717,25 @@ Thr_Supplier_Handler::open (void *) { // Turn off non-blocking I/O. if (this->peer ().disable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "disable"), + -1); // Incorrect info fixed. // Call back to the <Event_Channel> to complete our initialization. else if (this->event_channel_->complete_connection_connection (this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "complete_connection_connection"), + -1); // Reactivate message queue. If it was active then this is the // first time in and we need to spawn a thread, otherwise the queue // was inactive due to some problem and we've already got a thread. else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE) { - ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) spawning new thread\n")); // Become an active object by spawning a new thread to transmit // events to peers. return this->activate (THR_NEW_LWP | THR_DETACHED); @@ -681,7 +753,7 @@ Thr_Supplier_Handler::open (void *) int Thr_Supplier_Handler::svc (void) { - for (;;) + for (in_thread_ = 1;;) { ACE_DEBUG ((LM_DEBUG, "(%t) Thr_Supplier_Handler's handle = %d\n", @@ -708,7 +780,12 @@ Thr_Supplier_Handler::svc (void) // Re-establish the connection, using expoential backoff. for (this->timeout (1); // Default is to reconnect synchronously. - this->event_channel_->initiate_connection_connection (this) == -1; ) + this->event_channel_->initiate_connection_connection (this, 1) == -1; + // Second parameter '1' means using sync mode directly, + // don't care Options::blocking_semantics(). If don't do + // so, async mode will be used to connect which won't + // satisfy original design. + ) { ACE_Time_Value tv (this->timeout ()); ACE_ERROR ((LM_ERROR, @@ -719,15 +796,3 @@ Thr_Supplier_Handler::svc (void) } ACE_NOTREACHED(return 0); } - -int -Thr_Consumer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) -{ - return 0; -} - -int -Thr_Supplier_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) -{ - return 0; -} diff --git a/apps/Gateway/Gateway/Concrete_Connection_Handlers.h b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h index 3f9203d037b..287a4c8ec34 100644 --- a/apps/Gateway/Gateway/Concrete_Connection_Handlers.h +++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h @@ -14,7 +14,7 @@ // appropriate threaded/reactive Consumer/Supplier behavior. // // = AUTHOR -// Doug Schmidt +// Doug Schmidt <schmidt@cs.wustl.edu> // // ============================================================================ @@ -108,8 +108,17 @@ protected: virtual int svc (void); // Transmit peer messages. - virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); - // Called when peers shutdown. + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); + // When thread started, connection become blocked, so no need to use + // handle_close to reinitiate the connection_handler, so should + // override this function to justify if controlling is in thread or + // not. If yes, handle_close do nothing, otherwise, it call parent + // handle_close(). + +private: + int in_thread_; + // If the controlling is in thread's svc() or not. }; class Thr_Supplier_Handler : public Supplier_Handler @@ -123,11 +132,20 @@ public: // Initialize the object and spawn a new thread. protected: + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); + // When thread started, connection become blocked, so no need to use + // handle_close to reinitiate the connection_handler, so should + // override this function to justify if controlling is in thread or + // not. If yes, handle_close do nothing, otherwise, it call parent + // handle_close(). + virtual int svc (void); // Transmit peer messages. - virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); - // Called when peers shutdown. +private: + int in_thread_; + // If the controlling is in thread's svc() or not. }; #endif /* CONCRETE_CONNECTION_HANDLER */ diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index 7c236c862a9..350a72a4c16 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -30,8 +30,12 @@ Event_Channel::compute_performance_statistics (void) != Options::REACTIVE) { if (ACE_Thread_Manager::instance ()->suspend_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads...")); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "suspend_all"), + -1); + ACE_DEBUG ((LM_DEBUG, + "(%t) suspending all threads...")); } size_t total_bytes_in = 0; @@ -74,10 +78,15 @@ Event_Channel::compute_performance_statistics (void) != Options::REACTIVE) { if (ACE_Thread_Manager::instance ()->resume_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads...")); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "resume_all"), + -1); + ACE_DEBUG ((LM_DEBUG, + "(%t) resuming all threads...")); } + return 0; } @@ -191,13 +200,15 @@ Event_Channel::routing_event (Event_Key *forwarding_address, // counting. ACE_Message_Block *dup_msg = data->duplicate (); - ACE_DEBUG ((LM_DEBUG, "(%t) forwarding to Consumer %d\n", + ACE_DEBUG ((LM_DEBUG, + "(%t) forwarding to Consumer %d\n", (*connection_handler)->connection_id ())); if ((*connection_handler)->put (dup_msg) == -1) { if (errno == EWOULDBLOCK) // The queue has filled up! - ACE_ERROR ((LM_ERROR, "(%t) %p\n", + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", "gateway is flow controlled, so we're dropping events")); else ACE_ERROR ((LM_ERROR, @@ -216,11 +227,16 @@ Event_Channel::routing_event (Event_Key *forwarding_address, } int -Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler) +Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler, + int sync_directly) { ACE_Synch_Options synch_options; - if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK) + if (sync_directly) + // In separated connection handler thread, connection can be + // initiated by block mode (synch mode) directly. + synch_options = ACE_Synch_Options::synch; + else if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK) synch_options = ACE_Synch_Options::asynch; else synch_options = ACE_Synch_Options::synch; @@ -232,15 +248,20 @@ Event_Channel::initiate_connection_connection (Connection_Handler *connection_ha int Event_Channel::complete_connection_connection (Connection_Handler *connection_handler) { - int option = connection_handler->connection_role () == 'S' ? SO_RCVBUF : SO_SNDBUF; - int socket_queue_size = Options::instance ()->socket_queue_size (); + int option = connection_handler->connection_role () == 'S' + ? SO_RCVBUF + : SO_SNDBUF; + int socket_queue_size = + Options::instance ()->socket_queue_size (); if (socket_queue_size > 0) if (connection_handler->peer ().set_option (SOL_SOCKET, - option, - &socket_queue_size, - sizeof (int)) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); + option, + &socket_queue_size, + sizeof (int)) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "set_option")); connection_handler->thr_mgr (ACE_Thread_Manager::instance ()); @@ -254,10 +275,12 @@ Event_Channel::complete_connection_connection (Connection_Handler *connection_ha // Send the connection id to the peerd. - ssize_t n = connection_handler->peer ().send ((const void *) &id, sizeof id); + ssize_t n = connection_handler->peer ().send ((const void *) &id, + sizeof id); if (n != sizeof id) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", n == 0 ? "peer has closed down unexpectedly" : "send"), -1); return 0; @@ -269,10 +292,9 @@ Event_Channel::complete_connection_connection (Connection_Handler *connection_ha int Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler) { - // Skip over proxies with deactivated handles. - if (connection_handler->get_handle () != ACE_INVALID_HANDLE) - // Make sure to close down peer to reclaim descriptor. - connection_handler->peer ().close (); + // Cancel asynchronous connecting before re-initializing. It will + // close the peer and cancel the asynchronous connecting. + this->cancel_connection_connection(connection_handler); if (connection_handler->state () != Connection_Handler::DISCONNECTING) { @@ -282,9 +304,30 @@ Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_ // Reschedule ourselves to try and connect again. if (ACE_Reactor::instance ()->schedule_timer - (connection_handler, 0, connection_handler->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); + (connection_handler, + 0, + connection_handler->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "schedule_timer"), + -1); + } + return 0; +} + +// It is useful to provide a separate method to cancel the +// asynchronous connecting. + +int +Event_Channel::cancel_connection_connection (Connection_Handler *connection_handler) +{ + // Skip over proxies with deactivated handles. + if (connection_handler->get_handle () != ACE_INVALID_HANDLE) + { + // Make sure to close down peer to reclaim descriptor. + connection_handler->peer ().close (); + // Cancel asynchronous connecting before re-initializing. + return this->connector_.cancel(connection_handler); } return 0; } @@ -338,7 +381,7 @@ Event_Channel::initiate_acceptors (void) } if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)) { - if (this->supplier_acceptor_.open + if(this->supplier_acceptor_.open (Options::instance ()->supplier_acceptor_port (), ACE_Reactor::instance (), Options::instance ()->blocking_semantics ()) == -1) @@ -378,7 +421,7 @@ Event_Channel::close (u_long) // Iterate over all the handlers and shut them down. - for (CONNECTION_MAP_ENTRY *me; + for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0. cmi.next (me) != 0; cmi.advance ()) { @@ -388,6 +431,10 @@ Event_Channel::close (u_long) "(%t) closing down connection %d\n", connection_handler->connection_id ())); + // If have no this statement, the gatewayd will abort when exiting + // with some Consumer/Supplier not connected. + if (connection_handler->state()==Connection_Handler::CONNECTING) + this->cancel_connection_connection(connection_handler); // Mark Connection_Handler as DISCONNECTING so we don't try to // reconnect... connection_handler->state (Connection_Handler::DISCONNECTING); @@ -407,7 +454,7 @@ Event_Channel::close (u_long) { CONNECTION_MAP_ITERATOR cmi (this->connection_map_); - for (CONNECTION_MAP_ENTRY *me; + for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0. cmi.next (me) != 0; cmi.advance ()) { diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h index 4dcb86d24d3..4f739d1cd8b 100644 --- a/apps/Gateway/Gateway/Event_Channel.h +++ b/apps/Gateway/Gateway/Event_Channel.h @@ -10,7 +10,7 @@ // Event_Channel.h // // = AUTHOR -// Doug Schmidt +// Doug Schmidt <schmidt@cs.wustl.edu> // // ============================================================================ @@ -47,8 +47,11 @@ public: // Close down the Channel. // = Proxy management methods. - int initiate_connection_connection (Connection_Handler *); + int initiate_connection_connection (Connection_Handler *, int sync_directly = 0); // Initiate the connection of the <Connection_Handler> to its peer. + // Second paratemer is used for thread connection-handler which will + // block the connecting procedure directly, need not care + // Options::blocking_semantics(). int complete_connection_connection (Connection_Handler *); // Complete the initialization of the <Connection_Handler> once it's @@ -56,6 +59,8 @@ public: int reinitiate_connection_connection (Connection_Handler *); // Reinitiate a connection asynchronously when the Peer fails. + int cancel_connection_connection (Connection_Handler *); + // Cancel a asynchronous connection. int bind_proxy (Connection_Handler *); // Bind the <Connection_Handler> to the <connection_map_>. diff --git a/apps/Gateway/Gateway/Options.cpp b/apps/Gateway/Gateway/Options.cpp index 0dde3ddd83b..8e8e6b22eac 100644 --- a/apps/Gateway/Gateway/Options.cpp +++ b/apps/Gateway/Gateway/Options.cpp @@ -11,6 +11,29 @@ ACE_RCSID(Gateway, Options, "$Id$") // Static initialization. Options *Options::instance_ = 0; +// Let's have a usage prompt. +void +Options::print_usage (void) +{ + ACE_DEBUG ((LM_INFO, + "gatewayd [-a {C|S}:acceptor-port] [-c {C|S}:connector-port]" + " [-C consumer_config_file] [-P connection_config_filename]" + " [-q socket_queue_size] [-t OUTPUT_MT|INPUT_MT] [-w time_out]" + " [-b] [-d] [-v] [-T]\n" + "" + "\t-a Become an Acceptor\n" + "\t-b Use blocking connection establishment\n" + "\t-c Become a Connector\n" + "\t-d debugging\n" + "\t-q Use a different socket queue size\n" + "\t-t Use a different threading strategy\n" + "\t-v Verbose mode\n" + "\t-w Time performance for a designated amount of time\n" + "\t-C Use a different proxy config filename\n" + "\t-P Use a different consumer config filename\n" + "\t-T Tracing\n" + )); +} Options * Options::instance (void) { @@ -220,7 +243,7 @@ Options::parse_args (int argc, char *argv[]) ACE_SET_BITS (this->options_, Options::DEBUG); break; - case 'P': // Use a different consumer config filename. + case 'P': // Use a different connection config filename. ACE_OS::strncpy (this->connection_config_file_, get_opt.optarg, sizeof this->connection_config_file_); @@ -252,6 +275,7 @@ Options::parse_args (int argc, char *argv[]) this->blocking_semantics_ = 0; break; default: + this->print_usage(); // It's nice to have a usage prompt. break; } } diff --git a/apps/Gateway/Gateway/Options.h b/apps/Gateway/Gateway/Options.h index b3da9174a9b..5c56816f39a 100644 --- a/apps/Gateway/Gateway/Options.h +++ b/apps/Gateway/Gateway/Options.h @@ -10,7 +10,7 @@ // Options.h // // = AUTHOR -// Douglas C. Schmidt +// Douglas C. Schmidt <schmidt@cs.wustl.edu> // // ============================================================================ @@ -54,6 +54,8 @@ public: int parse_args (int argc, char *argv[]); // Parse the arguments and set the options. + void print_usage(void); + // Print the gateway supported parameters. // = Accessor methods. int enabled (int option) const; // Determine if an option is enabled. diff --git a/apps/Gateway/Peer/Peer.cpp b/apps/Gateway/Peer/Peer.cpp index e6cb4f97956..e8fd5adca98 100644 --- a/apps/Gateway/Peer/Peer.cpp +++ b/apps/Gateway/Peer/Peer.cpp @@ -7,13 +7,14 @@ ACE_RCSID(Peer, Peer, "$Id$") Peer_Handler::Peer_Handler (void) - : connection_id_ (0), + : connection_id_ (-1), // Maybe it's better than 0. msg_frag_ (0), total_bytes_ (0) { // Set the high water mark of the <ACE_Message_Queue>. This is used // to exert flow control. this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ()); + first_time_ = 1; // It will be first time to open Peer_Handler. } // Upcall from the <ACE_Acceptor::handle_input> that turns control @@ -93,7 +94,7 @@ Peer_Handler::transmit (ACE_Message_Block *mb, else ACE_ERROR ((LM_ERROR, "%p\n", - "transmission failure in transmit_stdin")); + "transmission failure in transmit()")); // Function name fixed. // Caller is responsible for freeing a ACE_Message_Block // if failures occur. mb->release (); @@ -107,6 +108,8 @@ Peer_Handler::transmit (ACE_Message_Block *mb, int Peer_Handler::transmit_stdin (void) { + // If return value is -1, then first_time_ must be reset to 1. + int result = 0; if (this->connection_id_ != -1) { ACE_Message_Block *mb; @@ -133,6 +136,7 @@ Peer_Handler::transmit_stdin (void) (ACE_STDIN, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK); mb->release (); + result = 0; // break; /* NOTREACHED */ case -1: @@ -140,18 +144,30 @@ Peer_Handler::transmit_stdin (void) ACE_ERROR ((LM_ERROR, "%p\n", "read")); + result = 0; // break; /* NOTREACHED */ default: - return this->transmit (mb, n, ROUTING_EVENT); + // Do not return directly, save the return value. + result = this->transmit (mb, n, ROUTING_EVENT); + break; /* NOTREACHED */ } - return 0; - } + // Do not return at here, but at exit of function. + /*return 0;*/ + } + else + { ACE_DEBUG ((LM_DEBUG, "Must transmit over an opened channel.\n")); - return -1; + result = -1; // Save return value at here, return at exit of function. + } + // If transmit error, the stdin-thread will be cancelled, so should + // reset first_time_ to 1, which will register_stdin_handler again. + if (result == -1) + first_time_ = 1; + return ret; } // Perform a non-blocking <put> of event MB. If we are unable to send @@ -387,7 +403,8 @@ Peer_Handler::recv (ACE_Message_Block *&mb) ssize_t data_bytes_left_to_read = ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_)); - ssize_t data_received = + // peer().recv() should not be called when data_bytes_left_to_read is 0. + ssize_t data_received = !data_bytes_left_to_read ? 0 : this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); @@ -403,8 +420,12 @@ Peer_Handler::recv (ACE_Message_Block *&mb) /* FALLTHROUGH */; case 0: // Premature EOF. + if (data_bytes_left_to_read) + { this->msg_frag_ = this->msg_frag_->release (); return 0; + } + /* FALLTHROUGH */; default: // Set the write pointer at 1 past the end of the event. @@ -492,21 +513,32 @@ Peer_Handler::await_connection_id (void) if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR)) this->subscribe (); + // No need to disconnect by timeout. + ACE_Reactor::instance ()->cancel_timer(this); // Transition to the action that waits for Peer events. this->do_action_ = &Peer_Handler::await_events; // Reset standard input. ACE_OS::rewind (stdin); - // Register this handler to receive test events on stdin. - if (ACE_Event_Handler::register_stdin_handler - (this, - ACE_Reactor::instance (), - ACE_Thread_Manager::instance ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) %p\n", - "register_stdin_handler"), - -1); + // Call register_stdin_handler only once, until the stdin-thread + // closed which caused by transmit_stdin error. + if (first_time_) + { + // Register this handler to receive test events on stdin. + if (ACE_Event_Handler::register_stdin_handler + (this, + ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "register_stdin_handler"), + -1); + + // Next time in await_connection_id(), I'll don't call + // register_stdin_handler(). + first_time_ = 0; + } return 0; } @@ -519,8 +551,11 @@ Peer_Handler::subscribe (void) ACE_Message_Block (sizeof (Event)), -1); - Subscription *subscription = (Subscription *) ((Event *) mb->rd_ptr ())->data_; - subscription->connection_id_ = Options::instance ()->connection_id (); + Subscription *subscription = + (Subscription *) ((Event *) mb->rd_ptr ())->data_; + subscription->connection_id_ = + Options::instance ()->connection_id (); + return this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT); } @@ -738,8 +773,11 @@ int Peer_Factory::handle_signal (int signum, siginfo_t *, ucontext_t *) { if (signum != SIGPIPE) + { // Shut down the main event loop. + ACE_DEBUG((LM_NOTICE, "Exit case signal\n")); // Why do I exit? ACE_Reactor::end_event_loop(); + } return 0; } @@ -817,14 +855,14 @@ Peer_Factory::init (int argc, char *argv[]) -1); if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR) - && this->consumer_acceptor_.open + && this->supplier_acceptor_.open (Options::instance ()->supplier_acceptor_port ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Acceptor::open"), -1); else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR) - && this->supplier_acceptor_.open + && this->consumer_acceptor_.open (Options::instance ()->consumer_acceptor_port ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", diff --git a/apps/Gateway/Peer/Peer.h b/apps/Gateway/Peer/Peer.h index 8a48de1c9a7..78cac65fa4a 100644 --- a/apps/Gateway/Peer/Peer.h +++ b/apps/Gateway/Peer/Peer.h @@ -140,6 +140,10 @@ protected: size_t total_bytes_; // The total number of bytes sent/received to the gatewayd thus far. + + int first_time_; + // Used to call register_stdin_handle only once. Otherwise, thread + // leak will occur on Win32. }; class ACE_Svc_Export Peer_Acceptor : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> |