diff options
Diffstat (limited to 'ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp')
-rw-r--r-- | ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp | 802 |
1 files changed, 802 insertions, 0 deletions
diff --git a/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp b/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp new file mode 100644 index 00000000000..e6ac36ba1fe --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp @@ -0,0 +1,802 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "ace/OS_NS_unistd.h" +#include "Event_Channel.h" +#include "Concrete_Connection_Handlers.h" + +ACE_RCSID(Gateway, Concrete_Connection_Handlers, "$Id$") + +Consumer_Handler::Consumer_Handler (const Connection_Config_Info &pci) + : Connection_Handler (pci) +{ + this->connection_role_ = 'C'; + this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ()); +} + +// This method should be called only when the Consumer shuts down +// unexpectedly. This method simply marks the Connection_Handler as +// having failed so that handle_close () can reconnect. + +// Do not close handler when received data successfully. +// Consumer_Handler should could process received data. +// For example, Consumer could send reply-event to Supplier. +int +Consumer_Handler::handle_input (ACE_HANDLE) +{ + // Do not set FAILED state at here, just at real failed place. + + char buf[BUFSIZ]; + ssize_t received = this->peer ().recv (buf, sizeof buf); + + switch (received) + { + case -1: + this->state (Connection_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has failed unexpectedly for Consumer_Handler %d\n", + this->connection_id ()), + -1); + /* NOTREACHED */ + case 0: + this->state (Connection_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has shutdown unexpectedly for Consumer_Handler %d\n", + this->connection_id ()), + -1); + /* NOTREACHED */ + default: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) IGNORED: Consumer is erroneously sending input to Consumer_Handler %d\n" + "data size = %d\n", + this->connection_id (), + received), + 0); // Return 0 to identify received data successfully. + /* NOTREACHED */ + } +} + +// Perform a non-blocking put() of event. If we are unable to send +// the entire event the remainder is re-queued at the *front* of the +// Event_List. + +int +Consumer_Handler::nonblk_put (ACE_Message_Block *event) +{ + // Try to send the event. If we don't send it all (e.g., due to + // flow control), then re-queue the remainder at the head of the + // Event_List and ask the ACE_Reactor to inform us (via + // handle_output()) when it is possible to try again. + + ssize_t n = this->send (event); + + if (n == -1) + { + // -1 is returned only when things have really gone wrong (i.e., + // not when flow control occurs). Thus, let's try to close down + // and set up a new reconnection by calling handle_close(). + this->state (Connection_Handler::FAILED); + this->handle_close (); + return -1; + } + else if (errno == EWOULDBLOCK) + { + // We didn't manage to send everything, so we need to queue + // things up. + + ACE_DEBUG ((LM_DEBUG, + "(%t) queueing activated on handle %d to routing id %d\n", + this->get_handle (), + this->connection_id ())); + + // ACE_Queue in *front* of the list to preserve order. + if (this->msg_queue ()->enqueue_head + (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "enqueue_head"), + -1); + + // Tell ACE_Reactor to call us back when we can send again. + else if (ACE_Reactor::instance ()->schedule_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "schedule_wakeup"), + -1); + return 0; + } + else + return n; +} + +ssize_t +Consumer_Handler::send (ACE_Message_Block *event) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) sending %d bytes to Consumer %d\n", + event->length (), + this->connection_id ())); + + ssize_t len = event->length (); + ssize_t n = this->peer ().send (event->rd_ptr (), len); + + if (n <= 0) + return errno == EWOULDBLOCK ? 0 : n; + else if (n < len) + { + // Re-adjust pointer to skip over the part we did send. + event->rd_ptr (n); + errno = EWOULDBLOCK; + } + else // if (n == length) + { + // The whole event is sent, we now decrement the reference count + // (which deletes itself with it reaches 0). + event->release (); + errno = 0; + } + this->total_bytes (n); + return n; +} + +// Finish sending an event when flow control conditions abate. +// This method is automatically called by the ACE_Reactor. + +int +Consumer_Handler::handle_output (ACE_HANDLE) +{ + ACE_Message_Block *event = 0; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("(%t) Receiver signalled 'resume transmission' %d\n"), + this->get_handle ())); + + // WIN32 Notes: When the receiver blocked, we started adding to the + // consumer handler's message Q. At this time, we registered a + // callback with the reactor to tell us when the TCP layer signalled + // that we could continue to send messages to the consumer. However, + // Winsock only sends this notification ONCE, so we have to assume + // at the application level, that we can continue to send until we + // get any subsequent blocking signals from the receiver's buffer. + +#if defined (ACE_WIN32) + // Win32 Winsock doesn't trigger multiple "You can write now" + // signals, so we have to assume that we can continue to write until + // we get another EWOULDBLOCK. + + // We cancel the wakeup callback we set earlier. + if (ACE_Reactor::instance ()->cancel_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Error in ACE_Reactor::cancel_wakeup()")), + -1); + + // The list had better not be empty, otherwise there's a bug! + while (this->msg_queue ()->dequeue_head + (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) + { + switch (this->nonblk_put (event)) + { + case -1: // Error sending message to consumer. + { + // We are responsible for releasing an ACE_Message_Block if + // failures occur. + event->release (); + + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("transmission failure"))); + break; + } + case 0: // Partial Send - we got flow controlled by the receiver + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%D Partial Send due to flow control") + ACE_TEXT ("- scheduling new wakeup with reactor\n"))); + + // Re-schedule a wakeup call from the reactor when the + // flow control conditions abate. + if (ACE_Reactor::instance ()->schedule_wakeup + (this, + ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Error in ACE_Reactor::schedule_wakeup()")), + -1); + + // Didn't write everything this time, come back later... + return 0; + } + default: // Sent the whole thing + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Sent message from message Q, Q size = %d\n"), + this->msg_queue()->message_count ())); + break; + } + } + } + + // If we drop out of the while loop, then the message Q should be + // empty...or there's a problem in the dequeue_head() call...but + // thats another story. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%D Sent all messages from consumers message Q\n"))); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"), + this->get_handle (), + this->connection_id ())); +#else /* !defined (ACE_WIN32) */ + // The list had better not be empty, otherwise there's a bug! + if (this->msg_queue ()->dequeue_head + (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) + { + switch (this->nonblk_put (event)) + { + case 0: // Partial send. + ACE_ASSERT (errno == EWOULDBLOCK); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%D Partial Send\n"))); + + // Didn't write everything this time, come back later... + break; + + case -1: + // We are responsible for releasing an ACE_Message_Block if + // failures occur. + event->release (); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("transmission failure"))); + + /* FALLTHROUGH */ + default: // Sent the whole thing. + + // If we succeed in writing the entire event (or we did not + // fail due to EWOULDBLOCK) then check if there are more + // events on the Message_Queue. If there aren't, tell the + // ACE_Reactor not to notify us anymore (at least until + // there are new events queued up). + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("QQQ::Sent Message from consumer's Q\n"))); + + if (this->msg_queue ()->is_empty ()) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"), + this->get_handle (), + this->connection_id ())); + + if (ACE_Reactor::instance ()->cancel_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("cancel_wakeup"))); + } + } + } + else + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("dequeue_head - handle_output called by reactor but nothing in Q"))); +#endif /* ACE_WIN32 */ + return 0; +} + +// Send an event to a Consumer (may queue if necessary). + +int +Consumer_Handler::put (ACE_Message_Block *event, + ACE_Time_Value *) +{ + if (this->msg_queue ()->is_empty ()) + // Try to send the event *without* blocking! + return this->nonblk_put (event); + else + // If we have queued up events due to flow control then just + // enqueue and return. + return this->msg_queue ()->enqueue_tail + (event, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +Supplier_Handler::Supplier_Handler (const Connection_Config_Info &pci) + : Connection_Handler (pci), + msg_frag_ (0) +{ + this->connection_role_ = 'S'; + this->msg_queue ()->high_water_mark (0); +} + +// Receive an Event from a Supplier. Handles fragmentation. +// +// The event returned from recv consists of two parts: +// +// 1. The Address part, contains the "virtual" routing id. +// +// 2. The Data part, which contains the actual data to be forwarded. +// +// The reason for having two parts is to shield the higher layers +// of software from knowledge of the event structure. + +int +Supplier_Handler::recv (ACE_Message_Block *&forward_addr) +{ + if (this->msg_frag_ == 0) + // No existing fragment... + ACE_NEW_RETURN (this->msg_frag_, + ACE_Message_Block (sizeof (Event), + ACE_Message_Block::MB_DATA, + 0, + 0, + 0, + Options::instance ()->locking_strategy ()), + -1); + + Event *event = (Event *) this->msg_frag_->rd_ptr (); + ssize_t header_received = 0; + + const size_t HEADER_SIZE = sizeof (Event_Header); + ssize_t header_bytes_left_to_read = + HEADER_SIZE - this->msg_frag_->length (); + + if (header_bytes_left_to_read > 0) + { + header_received = this->peer ().recv + (this->msg_frag_->wr_ptr (), header_bytes_left_to_read); + + if (header_received == -1 /* error */ + || header_received == 0 /* EOF */) + { + ACE_ERROR ((LM_ERROR, "%p\n", + "Recv error during header read ")); + ACE_DEBUG ((LM_DEBUG, + "attempted to read %d\n", + header_bytes_left_to_read)); + this->msg_frag_ = this->msg_frag_->release (); + return header_received; + } + + // Bump the write pointer by the amount read. + this->msg_frag_->wr_ptr (header_received); + + // At this point we may or may not have the ENTIRE header. + if (this->msg_frag_->length () < HEADER_SIZE) + { + ACE_DEBUG ((LM_DEBUG, + "Partial header received: only %d bytes\n", + this->msg_frag_->length ())); + // Notify the caller that we didn't get an entire event. + errno = EWOULDBLOCK; + return -1; + } + + // Convert the header into host byte order so that we can access + // it directly without having to repeatedly muck with it... + event->header_.decode (); + + if (event->header_.len_ > ACE_INT32 (sizeof event->data_)) + { + // This data_ payload is too big! + errno = EINVAL; + ACE_DEBUG ((LM_DEBUG, + "Data payload is too big (%d bytes)\n", + event->header_.len_)); + return -1; + } + + } + + // At this point there is a complete, valid header in Event. Now we + // need to get the event payload. Due to incomplete reads this may + // not be the first time we've read in a fragment for this message. + // We account for this here. Note that the first time in here + // msg_frag_->wr_ptr() will point to event->data_. Every time we do + // a successful fragment read, we advance wr_ptr(). Therefore, by + // subtracting how much we've already read from the + // event->header_.len_ we complete the data_bytes_left_to_read... + + ssize_t data_bytes_left_to_read = + ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_)); + + ssize_t data_received = + !data_bytes_left_to_read + ? 0 // peer().recv() should not be called when data_bytes_left_to_read is 0. + : this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); + + // Try to receive the remainder of the event. + + switch (data_received) + { + case -1: + if (errno == EWOULDBLOCK) + // This might happen if only the header came through. + return -1; + else + /* FALLTHROUGH */; + + case 0: // Premature EOF. + if (data_bytes_left_to_read) + { + this->msg_frag_ = this->msg_frag_->release (); + return 0; + } + /* FALLTHROUGH */; + + default: + // Set the write pointer at 1 past the end of the event. + this->msg_frag_->wr_ptr (data_received); + + if (data_received != data_bytes_left_to_read) + { + errno = EWOULDBLOCK; + // Inform caller that we didn't get the whole event. + return -1; + } + else + { + // Set the read pointer to the beginning of the event. + this->msg_frag_->rd_ptr (this->msg_frag_->base ()); + + // Allocate an event forwarding header and chain the data + // portion onto its continuation field. + forward_addr = new ACE_Message_Block (sizeof (Event_Key), + ACE_Message_Block::MB_PROTO, + this->msg_frag_, + 0, + 0, + Options::instance ()->locking_strategy ()); + if (forward_addr == 0) + { + this->msg_frag_ = this->msg_frag_->release (); + errno = ENOMEM; + return -1; + } + + Event_Key event_addr (this->connection_id (), + event->header_.type_); + // Copy the forwarding address from the Event_Key into + // forward_addr. + forward_addr->copy ((char *) &event_addr, sizeof (Event_Key)); + + // Reset the pointer to indicate we've got an entire event. + this->msg_frag_ = 0; + } + + this->total_bytes (data_received + header_received); + ACE_DEBUG ((LM_DEBUG, + "(%t) connection id = %d, cur len = %d, total bytes read = %d\n", + event->header_.connection_id_, + event->header_.len_, + data_received + header_received)); + if (Options::instance ()->enabled (Options::VERBOSE)) + ACE_DEBUG ((LM_DEBUG, + "data_ = %*s\n", + event->header_.len_ - 2, + event->data_)); + + // Encode before returning so that we can set things out in + // network byte order. + event->header_.encode (); + return data_received + header_received; + } +} + +// Receive various types of input (e.g., Peer event from the gatewayd, +// as well as stdio). + +int +Supplier_Handler::handle_input (ACE_HANDLE) +{ + ACE_Message_Block *event_key = 0; + + switch (this->recv (event_key)) + { + case 0: + // Note that a peer shouldn't initiate a shutdown by closing the + // connection. Therefore, the peer must have crashed, so we'll + // need to bail out here and let the higher layers reconnect. + this->state (Connection_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has closed down unexpectedly for Input Connection_Handler %d\n", + this->connection_id ()), + -1); + /* NOTREACHED */ + case -1: + if (errno == EWOULDBLOCK) + // A short-read, we'll come back and finish it up later on! + return 0; + else // A weird problem occurred, shut down and start again. + { + this->state (Connection_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Connection_Handler %d\n", + "Peer has failed unexpectedly", + this->connection_id ()), + -1); + } + /* NOTREACHED */ + default: + // Route messages to Consumers. + return this->process (event_key); + } +} + +// This delegates to the <Event_Channel> to do the actual processing. +// Typically, this forwards the event to its appropriate Consumer(s). + +int +Supplier_Handler::process (ACE_Message_Block *event_key) +{ + return this->event_channel_->put (event_key); +} + +Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci) + : Consumer_Handler (pci) +{ + // It is not in thread svc() now. + in_thread_ = 0; +} + +// Overriding handle_close() method. If in thread svc(), no need to +// process handle_close() when call peer().close(), because the +// connection is blocked now. + +int +Thr_Consumer_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m) +{ + if (in_thread_) + return 0; + else + return Consumer_Handler::handle_close (h, m); +} + +// This method should be called only when the Consumer shuts down +// unexpectedly. This method marks the Connection_Handler as having +// failed and deactivates the ACE_Message_Queue (to wake up the thread +// blocked on <dequeue_head> in svc()). +// Thr_Consumer_Handler::handle_close () will eventually try to +// reconnect... + +// Let Consumer_Handler receive normal data. +int +Thr_Consumer_Handler::handle_input (ACE_HANDLE h) +{ + // Call down to the <Consumer_Handler> to handle this first. + if (this->Consumer_Handler::handle_input (h) != 0) + { + // Only do such work when failed. + + ACE_Reactor::instance ()->remove_handler + (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); + + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + // Will call handle_close. + return -1; + } + return 0; +} + +// Initialize the threaded Consumer_Handler object and spawn a new +// thread. + +int +Thr_Consumer_Handler::open (void *) +{ + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "disable"), + -1); // Incorrect info fixed. + + // Call back to the <Event_Channel> to complete our initialization. + else if (this->event_channel_->complete_connection_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "complete_connection_connection"), + -1); + + // Register ourselves to receive input events (which indicate that + // the Consumer has shut down unexpectedly). + else if (ACE_Reactor::instance ()->register_handler + (this, ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "register_handler"), + -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // events to Consumers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, + "(%t) reusing existing thread\n")); + return 0; + } +} + +// Queue up an event for transmission (must not block since +// Supplier_Handlers may be single-threaded). + +int +Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + // Perform non-blocking enqueue, i.e., if <msg_queue> is full + // *don't* block! + return this->msg_queue ()->enqueue_tail + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Transmit events to the peer. Note the simplification resulting +// from the use of threads, compared with the Reactive solution. + +int +Thr_Consumer_Handler::svc (void) +{ + for (in_thread_ = 1;;) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) Thr_Consumer_Handler's handle = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread it is OK to block on + // output. + + for (ACE_Message_Block *mb = 0; + this->msg_queue ()->dequeue_head (mb) != -1; + ) + if (this->send (mb) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "send failed")); + + ACE_ASSERT (errno == ESHUTDOWN); + + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down threaded Consumer_Handler %d on handle %d\n", + this->connection_id (), + this->get_handle ())); + + this->peer ().close (); + + // Re-establish the connection, using exponential backoff. + for (this->timeout (1); + // Default is to reconnect synchronously. + this->event_channel_->initiate_connection_connection (this, 1) == -1; + // Second parameter '1' means using sync mode directly, + // don't care Options::blocking_semantics(). If don't do + // so, async mode will be used to connect which won't + // satisfy original design. + ) + { + ACE_Time_Value tv (this->timeout ()); + + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); + + ACE_OS::sleep (tv); + } + } + + ACE_NOTREACHED (return 0;) +} + +Thr_Supplier_Handler::Thr_Supplier_Handler (const Connection_Config_Info &pci) + : Supplier_Handler (pci) +{ + // It is not in thread svc() now. + in_thread_ = 0; +} + +// Overriding handle_close() method. If in thread svc(), no need to +// process handle_close() when call peer().close(), because the +// connection is blocked now. + +int +Thr_Supplier_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m) +{ + if (in_thread_) + return 0; + else + return Supplier_Handler::handle_close (h, m); +} + +int +Thr_Supplier_Handler::open (void *) +{ + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "disable"), + -1); // Incorrect info fixed. + + // Call back to the <Event_Channel> to complete our initialization. + else if (this->event_channel_->complete_connection_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "complete_connection_connection"), + -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // events to peers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); + return 0; + } +} + +// Receive events from a Peer in a separate thread (note reuse of +// existing code!). + +int +Thr_Supplier_Handler::svc (void) +{ + for (in_thread_ = 1;;) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) Thr_Supplier_Handler's handle = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread and processes events + // for one connection it is OK to call down to the + // <Supplier_Handler::handle_input> method, which blocks on + // input. + + while (this->Supplier_Handler::handle_input () != -1) + continue; + + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down threaded Supplier_Handler %d on handle %d\n", + this->connection_id (), + this->get_handle ())); + + this->peer ().close (); + + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + + // Re-establish the connection, using expoential backoff. + for (this->timeout (1); + // Default is to reconnect synchronously. + this->event_channel_->initiate_connection_connection (this, 1) == -1; + // Second parameter '1' means using sync mode directly, + // don't care Options::blocking_semantics(). If don't do + // so, async mode will be used to connect which won't + // satisfy original design. + ) + { + ACE_Time_Value tv (this->timeout ()); + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); + ACE_OS::sleep (tv); + } + } + ACE_NOTREACHED(return 0); +} |