summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/Gateway/Gateway/Thr_Proxy_Handler.cpp')
-rw-r--r--apps/Gateway/Gateway/Thr_Proxy_Handler.cpp211
1 files changed, 0 insertions, 211 deletions
diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
deleted file mode 100644
index f316e4e82bf..00000000000
--- a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
+++ /dev/null
@@ -1,211 +0,0 @@
-// $Id$
-
-#include "Event_Channel.h"
-#include "Thr_Proxy_Handler.h"
-
-#if defined (ACE_HAS_THREADS)
-Thr_Consumer_Proxy::Thr_Consumer_Proxy (ACE_Event_Channel &ec,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id)
- : Consumer_Proxy (ec, remote_addr, local_addr, conn_id)
-{
-}
-
-// This method should be called only when the Consumer shuts down
-// unexpectedly. This method marks the Proxy_Handler as having failed
-// and deactivates the ACE_Message_Queue (to wake up the thread
-// blocked on <dequeue_head> in svc()).
-// Thr_Output_Handler::handle_close () will eventually try to
-// reconnect...
-
-int
-Thr_Consumer_Proxy::handle_input (ACE_HANDLE h)
-{
- // Call down to the <Consumer_Proxy> to handle this first.
- this->Consumer_Proxy::handle_input (h);
-
- ACE_Service_Config::reactor ()->remove_handler
- (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
-
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
- return 0;
-}
-
-// Initialize the threaded Consumer_Proxy object and spawn a new
-// thread.
-
-int
-Thr_Consumer_Proxy::open (void *)
-{
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Call back to the <Event_Channel> to complete our initialization.
- else if (this->event_channel_.complete_proxy_connection (this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
-
- // Register ourselves to receive input events (which indicate that
- // the Consumer has shut down unexpectedly).
- else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // events to Consumers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// Queue up an event for transmission (must not block since
-// Supplier_Proxys may be single-threaded).
-
-int
-Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- // Perform non-blocking enqueue.
- return this->msg_queue ()->enqueue_tail
- (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-// Transmit events to the peer (note simplification resulting from
-// threads...)
-
-int
-Thr_Consumer_Proxy::svc (void)
-{
-
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) connected! Thr_Consumer_Proxy's handle = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread it is OK to block on
- // output.
-
- for (ACE_Message_Block *mb = 0;
- this->msg_queue ()->dequeue_head (mb) != -1;
- )
- {
- if (this->send (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
- }
-
- ACE_ASSERT (errno == ESHUTDOWN);
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- this->peer ().close ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->event_channel_.initiate_proxy_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
-
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n",
- tv.sec ()));
-
- ACE_OS::sleep (tv);
- }
- }
-
- return 0;
-}
-
-Thr_Supplier_Proxy::Thr_Supplier_Proxy (ACE_Event_Channel &ec,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id)
- : Supplier_Proxy (ec, remote_addr, local_addr, conn_id)
-{
-}
-
-int
-Thr_Supplier_Proxy::open (void *)
-{
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Call back to the <Event_Channel> to complete our initialization.
- else if (this->event_channel_.complete_proxy_connection (this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // events to peers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// Receive events from a Peer in a separate thread (note reuse of
-// existing code!).
-
-int
-Thr_Supplier_Proxy::svc (void)
-{
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) connected! Thr_Supplier_Proxy's handle = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread and processes events
- // for one connection it is OK to call down to the
- // <Supplier_Proxy::handle_input> method, which blocks on input.
-
- while (this->handle_input () != -1)
- continue;
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down threaded Supplier_Proxy %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- this->peer ().close ();
-
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->event_channel_.initiate_proxy_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n",
- tv.sec ()));
- ACE_OS::sleep (tv);
- }
- }
- return 0;
-}
-
-#endif /* ACE_HAS_THREADS */