summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>2001-03-21 11:59:40 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>2001-03-21 11:59:40 +0000
commitb7ff7d2f3f86fc125f741cfe56a721e86e516993 (patch)
tree623a559f568a8a9e904a4e8bdd63e2cf3916738e
parent5156e424454de9a2e949a25ad2ee39a951feb57a (diff)
downloadATCD-b7ff7d2f3f86fc125f741cfe56a721e86e516993.tar.gz
ChangeLogTag:Tue Mar 20 17:31:21 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--ChangeLog16
-rw-r--r--ChangeLogs/ChangeLog-02a16
-rw-r--r--ChangeLogs/ChangeLog-03a16
-rw-r--r--THANKS2
-rw-r--r--ace/OS.i6
-rw-r--r--ace/Select_Reactor_T.h2
-rw-r--r--apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp153
-rw-r--r--apps/Gateway/Gateway/Concrete_Connection_Handlers.h28
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp99
-rw-r--r--apps/Gateway/Gateway/Event_Channel.h9
-rw-r--r--apps/Gateway/Gateway/Options.cpp26
-rw-r--r--apps/Gateway/Gateway/Options.h4
-rw-r--r--apps/Gateway/Peer/Peer.cpp78
-rw-r--r--apps/Gateway/Peer/Peer.h4
14 files changed, 355 insertions, 104 deletions
diff --git a/ChangeLog b/ChangeLog
index 50a439b4d92..9e9168e1a0b 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -4,6 +4,13 @@ Tue Mar 20 17:31:21 2001 Carlos O'Ryan <coryan@uci.edu>
The Logging_Strategy_Test dynamically loads the Logger service
(from netsvcs), therefore it does not work on static builds.
+Tue Mar 20 18:17:24 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+
+ * ace/OS.i: Fixed the USYNC_PROCESS arm of CreateMutex() in
+ ACE_OS::mutex_init() so that it calls
+ ACE_OS::set_errno_to_last_error(). Thanks to Ram Ben-Yakir
+ <Ram@bandwiz.com> for reporting this.
+
Tue Mar 20 01:33:24 2001 Ossama Othman <ossama@uci.edu>
* ace/SSL/SSL_SOCK_Acceptor.cpp (ssl_accept):
@@ -71,8 +78,15 @@ Sun Mar 18 22:12:16 2001 Ossama Othman <ossama@uci.edu>
Sun Mar 18 09:46:47 2001 Balachandran Natarajan <bala@cs.wustl.edu>
- * tests/New_Fail_Test.cpp: Fixed warnings in g++.
+ * tests/New_Fail_Test.cpp: Fixed warnings in g++.
+
+Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+ * apps/Gateway/Gateway,
+ * apps/Gateway/Peer: Added a number of fixes to the Gateway and Peer
+ applications. Thanks to Lu Yunhai <luyunhai@huawei.com> for
+ contributing these.
+
Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
* examples/IPC_SAP/FILE_SAP/client.cpp (main): Added a couple of
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index 50a439b4d92..9e9168e1a0b 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -4,6 +4,13 @@ Tue Mar 20 17:31:21 2001 Carlos O'Ryan <coryan@uci.edu>
The Logging_Strategy_Test dynamically loads the Logger service
(from netsvcs), therefore it does not work on static builds.
+Tue Mar 20 18:17:24 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+
+ * ace/OS.i: Fixed the USYNC_PROCESS arm of CreateMutex() in
+ ACE_OS::mutex_init() so that it calls
+ ACE_OS::set_errno_to_last_error(). Thanks to Ram Ben-Yakir
+ <Ram@bandwiz.com> for reporting this.
+
Tue Mar 20 01:33:24 2001 Ossama Othman <ossama@uci.edu>
* ace/SSL/SSL_SOCK_Acceptor.cpp (ssl_accept):
@@ -71,8 +78,15 @@ Sun Mar 18 22:12:16 2001 Ossama Othman <ossama@uci.edu>
Sun Mar 18 09:46:47 2001 Balachandran Natarajan <bala@cs.wustl.edu>
- * tests/New_Fail_Test.cpp: Fixed warnings in g++.
+ * tests/New_Fail_Test.cpp: Fixed warnings in g++.
+
+Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+ * apps/Gateway/Gateway,
+ * apps/Gateway/Peer: Added a number of fixes to the Gateway and Peer
+ applications. Thanks to Lu Yunhai <luyunhai@huawei.com> for
+ contributing these.
+
Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
* examples/IPC_SAP/FILE_SAP/client.cpp (main): Added a couple of
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index 50a439b4d92..9e9168e1a0b 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -4,6 +4,13 @@ Tue Mar 20 17:31:21 2001 Carlos O'Ryan <coryan@uci.edu>
The Logging_Strategy_Test dynamically loads the Logger service
(from netsvcs), therefore it does not work on static builds.
+Tue Mar 20 18:17:24 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+
+ * ace/OS.i: Fixed the USYNC_PROCESS arm of CreateMutex() in
+ ACE_OS::mutex_init() so that it calls
+ ACE_OS::set_errno_to_last_error(). Thanks to Ram Ben-Yakir
+ <Ram@bandwiz.com> for reporting this.
+
Tue Mar 20 01:33:24 2001 Ossama Othman <ossama@uci.edu>
* ace/SSL/SSL_SOCK_Acceptor.cpp (ssl_accept):
@@ -71,8 +78,15 @@ Sun Mar 18 22:12:16 2001 Ossama Othman <ossama@uci.edu>
Sun Mar 18 09:46:47 2001 Balachandran Natarajan <bala@cs.wustl.edu>
- * tests/New_Fail_Test.cpp: Fixed warnings in g++.
+ * tests/New_Fail_Test.cpp: Fixed warnings in g++.
+
+Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+ * apps/Gateway/Gateway,
+ * apps/Gateway/Peer: Added a number of fixes to the Gateway and Peer
+ applications. Thanks to Lu Yunhai <luyunhai@huawei.com> for
+ contributing these.
+
Sun Mar 18 08:31:34 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
* examples/IPC_SAP/FILE_SAP/client.cpp (main): Added a couple of
diff --git a/THANKS b/THANKS
index 79b93f234b9..64c491f92b2 100644
--- a/THANKS
+++ b/THANKS
@@ -1179,6 +1179,8 @@ Rahul Shukla <rshukla@ggn.aithent.com>
Pierre Fayolle <fayolle@enseirb.fr>
Greg McCain <greg.mccain@veritas.com>
Matt Cheers <matt.cheers@boeing.com>
+Benjamin Fry <ben@thrownet.com>
+Ram Ben-Yakir <Ram@bandwiz.com>
I would particularly like to thank Paul Stephenson, who worked with me
at Ericsson in the early 1990's. Paul devised the recursive Makefile
diff --git a/ace/OS.i b/ace/OS.i
index 6ffc6f973ac..7684b82e0e0 100644
--- a/ace/OS.i
+++ b/ace/OS.i
@@ -1598,7 +1598,11 @@ ACE_OS::mutex_init (ACE_mutex_t *m,
if (m->proc_mutex_ == 0)
ACE_FAIL_RETURN (-1);
else
- return 0;
+ {
+ // Make sure to set errno to ERROR_ALREADY_EXISTS if necessary.
+ ACE_OS::set_errno_to_last_error ();
+ return 0;
+ }
case USYNC_THREAD:
return ACE_OS::thread_mutex_init (&m->thr_mutex_,
type,
diff --git a/ace/Select_Reactor_T.h b/ace/Select_Reactor_T.h
index 5cb075391ad..6537effca36 100644
--- a/ace/Select_Reactor_T.h
+++ b/ace/Select_Reactor_T.h
@@ -514,7 +514,7 @@ public:
/// Wake up all threads in waiting in the event loop
virtual void wakeup_all_threads (void);
- // = Only the owner thread that can perform a <handle_events>.
+ // = Only the owner thread can perform a <handle_events>.
/// Set the new owner of the thread and return the old owner.
virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id = 0);
diff --git a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
index b1acae66bde..95c82b8c159 100644
--- a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
+++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
@@ -18,22 +18,28 @@ Consumer_Handler::Consumer_Handler (const Connection_Config_Info &pci)
// unexpectedly. This method simply marks the Connection_Handler as
// having failed so that handle_close () can reconnect.
+// Do not close handler when received data successfully.
+// Consumer_Handler should could process received data.
+// For example, Consumer could send reply-event to Supplier.
int
Consumer_Handler::handle_input (ACE_HANDLE)
{
- char buf[1];
+ // Do not set FAILED state at here, just at real failed place.
- this->state (Connection_Handler::FAILED);
+ char buf[BUFSIZ];
+ ssize_t received = this->peer ().recv (buf, sizeof buf);
- switch (this->peer ().recv (buf, sizeof buf))
+ switch (received)
{
case -1:
+ this->state (Connection_Handler::FAILED);
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) Peer has failed unexpectedly for Consumer_Handler %d\n",
this->connection_id ()),
-1);
/* NOTREACHED */
case 0:
+ this->state (Connection_Handler::FAILED);
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) Peer has shutdown unexpectedly for Consumer_Handler %d\n",
this->connection_id ()),
@@ -41,9 +47,11 @@ Consumer_Handler::handle_input (ACE_HANDLE)
/* NOTREACHED */
default:
ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Consumer is erroneously sending input to Consumer_Handler %d\n",
- this->connection_id ()),
- -1);
+ "(%t) IGNORED: Consumer is erroneously sending input to Consumer_Handler %d\n"
+ "data size = %d\n",
+ this->connection_id (),
+ received),
+ 0); // Return 0 to identify received data successfully.
/* NOTREACHED */
}
}
@@ -392,7 +400,9 @@ Supplier_Handler::recv (ACE_Message_Block *&forward_addr)
ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
ssize_t data_received =
- this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);
+ !data_bytes_left_to_read
+ ? 0 // peer().recv() should not be called when data_bytes_left_to_read is 0.
+ : this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);
// Try to receive the remainder of the event.
@@ -406,8 +416,12 @@ Supplier_Handler::recv (ACE_Message_Block *&forward_addr)
/* FALLTHROUGH */;
case 0: // Premature EOF.
- this->msg_frag_ = this->msg_frag_->release ();
- return 0;
+ if (data_bytes_left_to_read)
+ {
+ this->msg_frag_ = this->msg_frag_->release ();
+ return 0;
+ }
+ /* FALLTHROUGH */;
default:
// Set the write pointer at 1 past the end of the event.
@@ -468,8 +482,8 @@ Supplier_Handler::recv (ACE_Message_Block *&forward_addr)
}
}
-// Receive various types of input (e.g., Peer event from the
-// gatewayd, as well as stdio).
+// Receive various types of input (e.g., Peer event from the gatewayd,
+// as well as stdio).
int
Supplier_Handler::handle_input (ACE_HANDLE)
@@ -519,26 +533,47 @@ Supplier_Handler::process (ACE_Message_Block *event_key)
Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci)
: Consumer_Handler (pci)
{
+ // It is not in thread svc() now.
+ in_thread_ = 0;
+}
+
+// Overriding handle_close() method. If in thread svc(), no need to
+// process handle_close() when call peer().close(), because the
+// connection is blocked now.
+
+int
+Thr_Consumer_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
+{
+ if (in_thread_)
+ return 0;
+ else
+ return Consumer_Handler::handle_close (h, m);
}
// This method should be called only when the Consumer shuts down
-// unexpectedly. This method marks the Connection_Handler as having failed
-// and deactivates the ACE_Message_Queue (to wake up the thread
+// unexpectedly. This method marks the Connection_Handler as having
+// failed and deactivates the ACE_Message_Queue (to wake up the thread
// blocked on <dequeue_head> in svc()).
-// Thr_Output_Handler::handle_close () will eventually try to
+// Thr_Consumer_Handler::handle_close () will eventually try to
// reconnect...
+// Let Consumer_Handler receive normal data.
int
Thr_Consumer_Handler::handle_input (ACE_HANDLE h)
{
// Call down to the <Consumer_Handler> to handle this first.
- this->Consumer_Handler::handle_input (h);
+ if (this->Consumer_Handler::handle_input (h) != 0)
+ {
+ // Only do such work when failed.
- ACE_Reactor::instance ()->remove_handler
- (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
+ ACE_Reactor::instance ()->remove_handler
+ (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
+ // Deactivate the queue while we try to get reconnected.
+ this->msg_queue ()->deactivate ();
+ // Will call handle_close.
+ return -1;
+ }
return 0;
}
@@ -550,31 +585,42 @@ Thr_Consumer_Handler::open (void *)
{
// Turn off non-blocking I/O.
if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "disable"),
+ -1); // Incorrect info fixed.
// Call back to the <Event_Channel> to complete our initialization.
else if (this->event_channel_->complete_connection_connection (this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "complete_connection_connection"),
+ -1);
// Register ourselves to receive input events (which indicate that
// the Consumer has shut down unexpectedly).
else if (ACE_Reactor::instance ()->register_handler
(this, ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "register_handler"),
+ -1);
// Reactivate message queue. If it was active then this is the
// first time in and we need to spawn a thread, otherwise the queue
// was inactive due to some problem and we've already got a thread.
else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) spawning new thread\n"));
// Become an active object by spawning a new thread to transmit
// events to Consumers.
return this->activate (THR_NEW_LWP | THR_DETACHED);
}
else
{
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) reusing existing thread\n"));
return 0;
}
}
@@ -597,8 +643,7 @@ Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
int
Thr_Consumer_Handler::svc (void)
{
-
- for (;;)
+ for (in_thread_ = 1;;)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) Thr_Consumer_Handler's handle = %d\n",
@@ -627,7 +672,12 @@ Thr_Consumer_Handler::svc (void)
// Re-establish the connection, using expoential backoff.
for (this->timeout (1);
// Default is to reconnect synchronously.
- this->event_channel_->initiate_connection_connection (this) == -1; )
+ this->event_channel_->initiate_connection_connection (this, 1) == -1;
+ // Second parameter '1' means using sync mode directly,
+ // don't care Options::blocking_semantics(). If don't do
+ // so, async mode will be used to connect which won't
+ // satisfy original design.
+ )
{
ACE_Time_Value tv (this->timeout ());
@@ -645,6 +695,21 @@ Thr_Consumer_Handler::svc (void)
Thr_Supplier_Handler::Thr_Supplier_Handler (const Connection_Config_Info &pci)
: Supplier_Handler (pci)
{
+ // It is not in thread svc() now.
+ in_thread_ = 0;
+}
+
+// Overriding handle_close() method. If in thread svc(), no need to
+// process handle_close() when call peer().close(), because the
+// connection is blocked now.
+
+int
+Thr_Supplier_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
+{
+ if (in_thread_)
+ return 0;
+ else
+ return Supplier_Handler::handle_close (h, m);
}
int
@@ -652,18 +717,25 @@ Thr_Supplier_Handler::open (void *)
{
// Turn off non-blocking I/O.
if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "disable"),
+ -1); // Incorrect info fixed.
// Call back to the <Event_Channel> to complete our initialization.
else if (this->event_channel_->complete_connection_connection (this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "complete_connection_connection"),
+ -1);
// Reactivate message queue. If it was active then this is the
// first time in and we need to spawn a thread, otherwise the queue
// was inactive due to some problem and we've already got a thread.
else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) spawning new thread\n"));
// Become an active object by spawning a new thread to transmit
// events to peers.
return this->activate (THR_NEW_LWP | THR_DETACHED);
@@ -681,7 +753,7 @@ Thr_Supplier_Handler::open (void *)
int
Thr_Supplier_Handler::svc (void)
{
- for (;;)
+ for (in_thread_ = 1;;)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) Thr_Supplier_Handler's handle = %d\n",
@@ -708,7 +780,12 @@ Thr_Supplier_Handler::svc (void)
// Re-establish the connection, using expoential backoff.
for (this->timeout (1);
// Default is to reconnect synchronously.
- this->event_channel_->initiate_connection_connection (this) == -1; )
+ this->event_channel_->initiate_connection_connection (this, 1) == -1;
+ // Second parameter '1' means using sync mode directly,
+ // don't care Options::blocking_semantics(). If don't do
+ // so, async mode will be used to connect which won't
+ // satisfy original design.
+ )
{
ACE_Time_Value tv (this->timeout ());
ACE_ERROR ((LM_ERROR,
@@ -719,15 +796,3 @@ Thr_Supplier_Handler::svc (void)
}
ACE_NOTREACHED(return 0);
}
-
-int
-Thr_Consumer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
-{
- return 0;
-}
-
-int
-Thr_Supplier_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
-{
- return 0;
-}
diff --git a/apps/Gateway/Gateway/Concrete_Connection_Handlers.h b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h
index 3f9203d037b..287a4c8ec34 100644
--- a/apps/Gateway/Gateway/Concrete_Connection_Handlers.h
+++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h
@@ -14,7 +14,7 @@
// appropriate threaded/reactive Consumer/Supplier behavior.
//
// = AUTHOR
-// Doug Schmidt
+// Doug Schmidt <schmidt@cs.wustl.edu>
//
// ============================================================================
@@ -108,8 +108,17 @@ protected:
virtual int svc (void);
// Transmit peer messages.
- virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
- // Called when peers shutdown.
+ virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
+ // When thread started, connection become blocked, so no need to use
+ // handle_close to reinitiate the connection_handler, so should
+ // override this function to justify if controlling is in thread or
+ // not. If yes, handle_close do nothing, otherwise, it call parent
+ // handle_close().
+
+private:
+ int in_thread_;
+ // If the controlling is in thread's svc() or not.
};
class Thr_Supplier_Handler : public Supplier_Handler
@@ -123,11 +132,20 @@ public:
// Initialize the object and spawn a new thread.
protected:
+ virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
+ // When thread started, connection become blocked, so no need to use
+ // handle_close to reinitiate the connection_handler, so should
+ // override this function to justify if controlling is in thread or
+ // not. If yes, handle_close do nothing, otherwise, it call parent
+ // handle_close().
+
virtual int svc (void);
// Transmit peer messages.
- virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
- // Called when peers shutdown.
+private:
+ int in_thread_;
+ // If the controlling is in thread's svc() or not.
};
#endif /* CONCRETE_CONNECTION_HANDLER */
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
index 7c236c862a9..350a72a4c16 100644
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ b/apps/Gateway/Gateway/Event_Channel.cpp
@@ -30,8 +30,12 @@ Event_Channel::compute_performance_statistics (void)
!= Options::REACTIVE)
{
if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads..."));
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "suspend_all"),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) suspending all threads..."));
}
size_t total_bytes_in = 0;
@@ -74,10 +78,15 @@ Event_Channel::compute_performance_statistics (void)
!= Options::REACTIVE)
{
if (ACE_Thread_Manager::instance ()->resume_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads..."));
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "resume_all"),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) resuming all threads..."));
}
+
return 0;
}
@@ -191,13 +200,15 @@ Event_Channel::routing_event (Event_Key *forwarding_address,
// counting.
ACE_Message_Block *dup_msg = data->duplicate ();
- ACE_DEBUG ((LM_DEBUG, "(%t) forwarding to Consumer %d\n",
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) forwarding to Consumer %d\n",
(*connection_handler)->connection_id ()));
if ((*connection_handler)->put (dup_msg) == -1)
{
if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR, "(%t) %p\n",
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
"gateway is flow controlled, so we're dropping events"));
else
ACE_ERROR ((LM_ERROR,
@@ -216,11 +227,16 @@ Event_Channel::routing_event (Event_Key *forwarding_address,
}
int
-Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler)
+Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler,
+ int sync_directly)
{
ACE_Synch_Options synch_options;
- if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK)
+ if (sync_directly)
+ // In separated connection handler thread, connection can be
+ // initiated by block mode (synch mode) directly.
+ synch_options = ACE_Synch_Options::synch;
+ else if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK)
synch_options = ACE_Synch_Options::asynch;
else
synch_options = ACE_Synch_Options::synch;
@@ -232,15 +248,20 @@ Event_Channel::initiate_connection_connection (Connection_Handler *connection_ha
int
Event_Channel::complete_connection_connection (Connection_Handler *connection_handler)
{
- int option = connection_handler->connection_role () == 'S' ? SO_RCVBUF : SO_SNDBUF;
- int socket_queue_size = Options::instance ()->socket_queue_size ();
+ int option = connection_handler->connection_role () == 'S'
+ ? SO_RCVBUF
+ : SO_SNDBUF;
+ int socket_queue_size =
+ Options::instance ()->socket_queue_size ();
if (socket_queue_size > 0)
if (connection_handler->peer ().set_option (SOL_SOCKET,
- option,
- &socket_queue_size,
- sizeof (int)) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
+ option,
+ &socket_queue_size,
+ sizeof (int)) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "set_option"));
connection_handler->thr_mgr (ACE_Thread_Manager::instance ());
@@ -254,10 +275,12 @@ Event_Channel::complete_connection_connection (Connection_Handler *connection_ha
// Send the connection id to the peerd.
- ssize_t n = connection_handler->peer ().send ((const void *) &id, sizeof id);
+ ssize_t n = connection_handler->peer ().send ((const void *) &id,
+ sizeof id);
if (n != sizeof id)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
n == 0 ? "peer has closed down unexpectedly" : "send"),
-1);
return 0;
@@ -269,10 +292,9 @@ Event_Channel::complete_connection_connection (Connection_Handler *connection_ha
int
Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler)
{
- // Skip over proxies with deactivated handles.
- if (connection_handler->get_handle () != ACE_INVALID_HANDLE)
- // Make sure to close down peer to reclaim descriptor.
- connection_handler->peer ().close ();
+ // Cancel asynchronous connecting before re-initializing. It will
+ // close the peer and cancel the asynchronous connecting.
+ this->cancel_connection_connection(connection_handler);
if (connection_handler->state () != Connection_Handler::DISCONNECTING)
{
@@ -282,9 +304,30 @@ Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_
// Reschedule ourselves to try and connect again.
if (ACE_Reactor::instance ()->schedule_timer
- (connection_handler, 0, connection_handler->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
+ (connection_handler,
+ 0,
+ connection_handler->timeout ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "schedule_timer"),
+ -1);
+ }
+ return 0;
+}
+
+// It is useful to provide a separate method to cancel the
+// asynchronous connecting.
+
+int
+Event_Channel::cancel_connection_connection (Connection_Handler *connection_handler)
+{
+ // Skip over proxies with deactivated handles.
+ if (connection_handler->get_handle () != ACE_INVALID_HANDLE)
+ {
+ // Make sure to close down peer to reclaim descriptor.
+ connection_handler->peer ().close ();
+ // Cancel asynchronous connecting before re-initializing.
+ return this->connector_.cancel(connection_handler);
}
return 0;
}
@@ -338,7 +381,7 @@ Event_Channel::initiate_acceptors (void)
}
if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR))
{
- if (this->supplier_acceptor_.open
+ if(this->supplier_acceptor_.open
(Options::instance ()->supplier_acceptor_port (),
ACE_Reactor::instance (),
Options::instance ()->blocking_semantics ()) == -1)
@@ -378,7 +421,7 @@ Event_Channel::close (u_long)
// Iterate over all the handlers and shut them down.
- for (CONNECTION_MAP_ENTRY *me;
+ for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0.
cmi.next (me) != 0;
cmi.advance ())
{
@@ -388,6 +431,10 @@ Event_Channel::close (u_long)
"(%t) closing down connection %d\n",
connection_handler->connection_id ()));
+ // If have no this statement, the gatewayd will abort when exiting
+ // with some Consumer/Supplier not connected.
+ if (connection_handler->state()==Connection_Handler::CONNECTING)
+ this->cancel_connection_connection(connection_handler);
// Mark Connection_Handler as DISCONNECTING so we don't try to
// reconnect...
connection_handler->state (Connection_Handler::DISCONNECTING);
@@ -407,7 +454,7 @@ Event_Channel::close (u_long)
{
CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
- for (CONNECTION_MAP_ENTRY *me;
+ for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0.
cmi.next (me) != 0;
cmi.advance ())
{
diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h
index 4dcb86d24d3..4f739d1cd8b 100644
--- a/apps/Gateway/Gateway/Event_Channel.h
+++ b/apps/Gateway/Gateway/Event_Channel.h
@@ -10,7 +10,7 @@
// Event_Channel.h
//
// = AUTHOR
-// Doug Schmidt
+// Doug Schmidt <schmidt@cs.wustl.edu>
//
// ============================================================================
@@ -47,8 +47,11 @@ public:
// Close down the Channel.
// = Proxy management methods.
- int initiate_connection_connection (Connection_Handler *);
+ int initiate_connection_connection (Connection_Handler *, int sync_directly = 0);
// Initiate the connection of the <Connection_Handler> to its peer.
+ // Second paratemer is used for thread connection-handler which will
+ // block the connecting procedure directly, need not care
+ // Options::blocking_semantics().
int complete_connection_connection (Connection_Handler *);
// Complete the initialization of the <Connection_Handler> once it's
@@ -56,6 +59,8 @@ public:
int reinitiate_connection_connection (Connection_Handler *);
// Reinitiate a connection asynchronously when the Peer fails.
+ int cancel_connection_connection (Connection_Handler *);
+ // Cancel a asynchronous connection.
int bind_proxy (Connection_Handler *);
// Bind the <Connection_Handler> to the <connection_map_>.
diff --git a/apps/Gateway/Gateway/Options.cpp b/apps/Gateway/Gateway/Options.cpp
index 0dde3ddd83b..8e8e6b22eac 100644
--- a/apps/Gateway/Gateway/Options.cpp
+++ b/apps/Gateway/Gateway/Options.cpp
@@ -11,6 +11,29 @@ ACE_RCSID(Gateway, Options, "$Id$")
// Static initialization.
Options *Options::instance_ = 0;
+// Let's have a usage prompt.
+void
+Options::print_usage (void)
+{
+ ACE_DEBUG ((LM_INFO,
+ "gatewayd [-a {C|S}:acceptor-port] [-c {C|S}:connector-port]"
+ " [-C consumer_config_file] [-P connection_config_filename]"
+ " [-q socket_queue_size] [-t OUTPUT_MT|INPUT_MT] [-w time_out]"
+ " [-b] [-d] [-v] [-T]\n"
+ ""
+ "\t-a Become an Acceptor\n"
+ "\t-b Use blocking connection establishment\n"
+ "\t-c Become a Connector\n"
+ "\t-d debugging\n"
+ "\t-q Use a different socket queue size\n"
+ "\t-t Use a different threading strategy\n"
+ "\t-v Verbose mode\n"
+ "\t-w Time performance for a designated amount of time\n"
+ "\t-C Use a different proxy config filename\n"
+ "\t-P Use a different consumer config filename\n"
+ "\t-T Tracing\n"
+ ));
+}
Options *
Options::instance (void)
{
@@ -220,7 +243,7 @@ Options::parse_args (int argc, char *argv[])
ACE_SET_BITS (this->options_,
Options::DEBUG);
break;
- case 'P': // Use a different consumer config filename.
+ case 'P': // Use a different connection config filename.
ACE_OS::strncpy (this->connection_config_file_,
get_opt.optarg,
sizeof this->connection_config_file_);
@@ -252,6 +275,7 @@ Options::parse_args (int argc, char *argv[])
this->blocking_semantics_ = 0;
break;
default:
+ this->print_usage(); // It's nice to have a usage prompt.
break;
}
}
diff --git a/apps/Gateway/Gateway/Options.h b/apps/Gateway/Gateway/Options.h
index b3da9174a9b..5c56816f39a 100644
--- a/apps/Gateway/Gateway/Options.h
+++ b/apps/Gateway/Gateway/Options.h
@@ -10,7 +10,7 @@
// Options.h
//
// = AUTHOR
-// Douglas C. Schmidt
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
//
// ============================================================================
@@ -54,6 +54,8 @@ public:
int parse_args (int argc, char *argv[]);
// Parse the arguments and set the options.
+ void print_usage(void);
+ // Print the gateway supported parameters.
// = Accessor methods.
int enabled (int option) const;
// Determine if an option is enabled.
diff --git a/apps/Gateway/Peer/Peer.cpp b/apps/Gateway/Peer/Peer.cpp
index e6cb4f97956..e8fd5adca98 100644
--- a/apps/Gateway/Peer/Peer.cpp
+++ b/apps/Gateway/Peer/Peer.cpp
@@ -7,13 +7,14 @@
ACE_RCSID(Peer, Peer, "$Id$")
Peer_Handler::Peer_Handler (void)
- : connection_id_ (0),
+ : connection_id_ (-1), // Maybe it's better than 0.
msg_frag_ (0),
total_bytes_ (0)
{
// Set the high water mark of the <ACE_Message_Queue>. This is used
// to exert flow control.
this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());
+ first_time_ = 1; // It will be first time to open Peer_Handler.
}
// Upcall from the <ACE_Acceptor::handle_input> that turns control
@@ -93,7 +94,7 @@ Peer_Handler::transmit (ACE_Message_Block *mb,
else
ACE_ERROR ((LM_ERROR,
"%p\n",
- "transmission failure in transmit_stdin"));
+ "transmission failure in transmit()")); // Function name fixed.
// Caller is responsible for freeing a ACE_Message_Block
// if failures occur.
mb->release ();
@@ -107,6 +108,8 @@ Peer_Handler::transmit (ACE_Message_Block *mb,
int
Peer_Handler::transmit_stdin (void)
{
+ // If return value is -1, then first_time_ must be reset to 1.
+ int result = 0;
if (this->connection_id_ != -1)
{
ACE_Message_Block *mb;
@@ -133,6 +136,7 @@ Peer_Handler::transmit_stdin (void)
(ACE_STDIN,
ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK);
mb->release ();
+ result = 0; //
break;
/* NOTREACHED */
case -1:
@@ -140,18 +144,30 @@ Peer_Handler::transmit_stdin (void)
ACE_ERROR ((LM_ERROR,
"%p\n",
"read"));
+ result = 0; //
break;
/* NOTREACHED */
default:
- return this->transmit (mb, n, ROUTING_EVENT);
+ // Do not return directly, save the return value.
+ result = this->transmit (mb, n, ROUTING_EVENT);
+ break;
/* NOTREACHED */
}
- return 0;
- }
+ // Do not return at here, but at exit of function.
+ /*return 0;*/
+ }
+ else
+ {
ACE_DEBUG ((LM_DEBUG,
"Must transmit over an opened channel.\n"));
- return -1;
+ result = -1; // Save return value at here, return at exit of function.
+ }
+ // If transmit error, the stdin-thread will be cancelled, so should
+ // reset first_time_ to 1, which will register_stdin_handler again.
+ if (result == -1)
+ first_time_ = 1;
+ return ret;
}
// Perform a non-blocking <put> of event MB. If we are unable to send
@@ -387,7 +403,8 @@ Peer_Handler::recv (ACE_Message_Block *&mb)
ssize_t data_bytes_left_to_read =
ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
- ssize_t data_received =
+ // peer().recv() should not be called when data_bytes_left_to_read is 0.
+ ssize_t data_received = !data_bytes_left_to_read ? 0 :
this->peer ().recv (this->msg_frag_->wr_ptr (),
data_bytes_left_to_read);
@@ -403,8 +420,12 @@ Peer_Handler::recv (ACE_Message_Block *&mb)
/* FALLTHROUGH */;
case 0: // Premature EOF.
+ if (data_bytes_left_to_read)
+ {
this->msg_frag_ = this->msg_frag_->release ();
return 0;
+ }
+ /* FALLTHROUGH */;
default:
// Set the write pointer at 1 past the end of the event.
@@ -492,21 +513,32 @@ Peer_Handler::await_connection_id (void)
if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR))
this->subscribe ();
+ // No need to disconnect by timeout.
+ ACE_Reactor::instance ()->cancel_timer(this);
// Transition to the action that waits for Peer events.
this->do_action_ = &Peer_Handler::await_events;
// Reset standard input.
ACE_OS::rewind (stdin);
- // Register this handler to receive test events on stdin.
- if (ACE_Event_Handler::register_stdin_handler
- (this,
- ACE_Reactor::instance (),
- ACE_Thread_Manager::instance ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) %p\n",
- "register_stdin_handler"),
- -1);
+ // Call register_stdin_handler only once, until the stdin-thread
+ // closed which caused by transmit_stdin error.
+ if (first_time_)
+ {
+ // Register this handler to receive test events on stdin.
+ if (ACE_Event_Handler::register_stdin_handler
+ (this,
+ ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "register_stdin_handler"),
+ -1);
+
+ // Next time in await_connection_id(), I'll don't call
+ // register_stdin_handler().
+ first_time_ = 0;
+ }
return 0;
}
@@ -519,8 +551,11 @@ Peer_Handler::subscribe (void)
ACE_Message_Block (sizeof (Event)),
-1);
- Subscription *subscription = (Subscription *) ((Event *) mb->rd_ptr ())->data_;
- subscription->connection_id_ = Options::instance ()->connection_id ();
+ Subscription *subscription =
+ (Subscription *) ((Event *) mb->rd_ptr ())->data_;
+ subscription->connection_id_ =
+ Options::instance ()->connection_id ();
+
return this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT);
}
@@ -738,8 +773,11 @@ int
Peer_Factory::handle_signal (int signum, siginfo_t *, ucontext_t *)
{
if (signum != SIGPIPE)
+ {
// Shut down the main event loop.
+ ACE_DEBUG((LM_NOTICE, "Exit case signal\n")); // Why do I exit?
ACE_Reactor::end_event_loop();
+ }
return 0;
}
@@ -817,14 +855,14 @@ Peer_Factory::init (int argc, char *argv[])
-1);
if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)
- && this->consumer_acceptor_.open
+ && this->supplier_acceptor_.open
(Options::instance ()->supplier_acceptor_port ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"Acceptor::open"),
-1);
else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)
- && this->supplier_acceptor_.open
+ && this->consumer_acceptor_.open
(Options::instance ()->consumer_acceptor_port ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
diff --git a/apps/Gateway/Peer/Peer.h b/apps/Gateway/Peer/Peer.h
index 8a48de1c9a7..78cac65fa4a 100644
--- a/apps/Gateway/Peer/Peer.h
+++ b/apps/Gateway/Peer/Peer.h
@@ -140,6 +140,10 @@ protected:
size_t total_bytes_;
// The total number of bytes sent/received to the gatewayd thus far.
+
+ int first_time_;
+ // Used to call register_stdin_handle only once. Otherwise, thread
+ // leak will occur on Win32.
};
class ACE_Svc_Export Peer_Acceptor : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>