diff options
Diffstat (limited to 'examples/ASX/UPIPE_Event_Server/Peer_Router.cpp')
-rw-r--r-- | examples/ASX/UPIPE_Event_Server/Peer_Router.cpp | 273 |
1 files changed, 0 insertions, 273 deletions
diff --git a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp deleted file mode 100644 index 23d9f6c7a35..00000000000 --- a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp +++ /dev/null @@ -1,273 +0,0 @@ -#if !defined (_PEER_ROUTER_C) -// $Id$ - -#define _PEER_ROUTER_C - -#include "ace/Get_Opt.h" -#include "ace/Service_Config.h" - -#include "Peer_Router.h" -#include "Options.h" - -#if defined (ACE_HAS_THREADS) - -// Define some short-hand macros to deal with long templates -// names... - -#define PH PEER_HANDLER -#define PA PEER_ACCEPTOR -#define PAD PEER_ADDR -#define PK PEER_KEY -#define PM PEER_MAP - -template <class PH, class PK> int -Acceptor_Factory<PH, PK>::init (int argc, char *argv[]) -{ - ACE_Get_Opt get_opt (argc, argv, "df:", 0); - ACE_UPIPE_Addr addr; - - for (int c; (c = get_opt ()) != -1; ) - switch (c) - { - case 'f': - addr.set (get_opt.optarg); - break; - case 'd': - break; - default: - break; - } - - if (this->open (addr, ACE_Service_Config::reactor ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); - return 0; -} - -template <class PH, class PK> -Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr) - : pr_ (pr) -{ -} - -template <class PH, class PK> Peer_Router<PH, PK> * -Acceptor_Factory<PH, PK>::router (void) -{ - return this->pr_; -} - -template <class ROUTER, class KEY> -Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm) - : ACE_Svc_Handler<ACE_UPIPE_Stream, ACE_UPIPE_Addr, ACE_MT_SYNCH> (tm) -{ -} - -template <class ROUTER, class KEY> int -Peer_Handler<ROUTER, KEY>::svc (void) -{ - ACE_Thread_Control thread_control (tm); - // just a try !! - // we're just reading from our ACE_Message_Queue - ACE_Message_Block *db, *hb; - int n; - // do an endless loop - for (;;) - { - db = new ACE_Message_Block (BUFSIZ); - hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db); - - if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1); - else if (n == 0) // Client has closed down the connection. - { - - if (this->router_task_->unbind_peer (this->get_handle ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) shutting down \n")); - return -1; // We do not need to be deregistered by reactor - // as we were not registered at all - } - else // Transform incoming buffer into a Message and pass downstream. - { - db->wr_ptr (n); - *(long *) hb->rd_ptr () = this->get_handle (); // structure assignment. - hb->wr_ptr (sizeof (long)); - if (this->router_task_->reply (hb) == -1) - { - cout << "Peer_Handler.svc : router_task->reply failed" << endl ; - return -1; - } - - // return this->router_task_->reply (hb) == -1 ? -1 : 0; - } - } - return 0; -} - -template <class ROUTER, class KEY> int -Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *) -{ - return this->peer ().send_n (mb->rd_ptr (), mb->length ()); -} - -// Create a new handler and point its ROUTER_TASK_ data member to the -// corresponding router. Note that this router is extracted out of -// the Acceptor_Factory * that is passed in via the -// ACE_Acceptor::handle_input() method. - -template <class ROUTER, class KEY> int -Peer_Handler<ROUTER, KEY>::open (void *a) -{ - char buf[BUFSIZ], *p = buf; - - if (this->router_task_->info (&p, sizeof buf) != -1) - ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, fd = %d, this = %d\n", - buf, this->get_handle (), a)); - else - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1); - - if ( this->activate (options.t_flags ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1); - else if (this->router_task_->bind_peer (this->get_handle (), this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1); - return 0; -} - -// Receive a message from a supplier.. - -template <class ROUTER, class KEY> int -Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h) -{ - - ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h)); -// ACE_Service_Config::reactor ()->remove_handler(h, -// ACE_Event_Handler::RWE_MASK -// |ACE_Event_Handler::DONT_CALL); -// this method should be called only if the peer shuts down -// so we deactivate our ACE_Message_Queue to awake our svc thread - - return 0; - -#if 0 - ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ); - ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db); - int n; - - if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1); - else if (n == 0) // Client has closed down the connection. - { - if (this->router_task_->unbind_peer (this->get_handle ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h)); - return -1; // Instruct the ACE_Reactor to deregister us by returning -1. - } - else // Transform incoming buffer into a Message and pass downstream. - { - db->wr_ptr (n); - *(long *) hb->rd_ptr () = this->get_handle (); // structure assignment. - hb->wr_ptr (sizeof (long)); - return this->router_task_->reply (hb) == -1 ? -1 : 0; - } -#endif -} - -template <class PH, class PK> -Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm) - : ACE_Task<ACE_MT_SYNCH> (tm) -{ -} - -template <class PH, class PK> int -Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb) -{ - ACE_Map_Iterator<PK, PH *, ACE_RW_Mutex> map_iter = this->peer_map_; - int bytes = 0; - int iterations = 0; - ACE_Message_Block *data_block = mb->cont (); - for (ACE_Map_Entry<PK, PH *> *ss = 0; - map_iter.next (ss) != 0; - map_iter.advance ()) - { - if (options.debug ()) - ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer via sd %d\n", ss->ext_id_)); - - iterations++; - bytes += ss->int_id_->put (data_block); - } - - delete mb; - return bytes == 0 ? 0 : bytes / iterations; -} - -template <class PH, class PK> -Peer_Router<PH, PK>::~Peer_Router (void) -{ -} - -template <class PH, class PK> int -Peer_Router<PH, PK>::fini (void) -{ - delete this->acceptor_; - return 0; -} - -template <class PH, class PK> int -Peer_Router<PH, PK>::control (ACE_Message_Block *mb) -{ - ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); - ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command; - - switch (command = ioc->cmd ()) - { - case ACE_IO_Cntl_Msg::SET_LWM: - case ACE_IO_Cntl_Msg::SET_HWM: - this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ()); - break; - default: - return -1; - } - return 0; -} - -template <class PH, class PK> int -Peer_Router<PH, PK>::unbind_peer (PK key) -{ - return this->peer_map_.unbind (key); -} - -template <class PH, class PK> int -Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph) -{ - PH *peer_handler = (PH *) ph; - return this->peer_map_.bind (key, peer_handler); -} - -template <class PH, class PK> int -Peer_Router<PH, PK>::init (int argc, char *argv[]) -{ - this->acceptor_ = new Acceptor_Factory <PH, PK> (this); - - if (this->acceptor_->init (argc, argv) == -1 - || this->peer_map_.open () == -1) - return -1; - else - { - ACE_UPIPE_Addr addr; - ACE_UPIPE_Acceptor &pa = this->acceptor_->acceptor (); - - if (pa.get_local_addr (addr) != -1) - ACE_DEBUG ((LM_DEBUG, "(%t) initializing %s, file = %s, fd = %d, this = %u\n", - this->name (), addr.get_path_name (), pa.get_handle (), this)); - else - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), -1); - } - return 0; -} - -#undef PH -#undef PA -#undef PAD -#undef PK -#undef PM -#endif /* ACE_HAS_THREADS */ -#endif /* _PEER_ROUTER_C */ |