#if !defined (_PEER_ROUTER_C) // $Id$ #define _PEER_ROUTER_C #include "ace/Get_Opt.h" #include "ace/Service_Config.h" #include "Peer_Router.h" #include "Options.h" #if defined (ACE_HAS_THREADS) // Define some short-hand macros to deal with long templates // names... #define PH PEER_HANDLER #define PA PEER_ACCEPTOR #define PAD PEER_ADDR #define PK PEER_KEY #define PM PEER_MAP template int Acceptor_Factory::init (int argc, char *argv[]) { ACE_Get_Opt get_opt (argc, argv, "df:", 0); ACE_UPIPE_Addr addr; for (int c; (c = get_opt ()) != -1; ) switch (c) { case 'f': addr.set (get_opt.optarg); break; case 'd': break; default: break; } if (this->open (addr, ACE_Service_Config::reactor ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); return 0; } template Acceptor_Factory::Acceptor_Factory (Peer_Router *pr) : pr_ (pr) { } template Peer_Router * Acceptor_Factory::router (void) { return this->pr_; } template Peer_Handler::Peer_Handler (ACE_Thread_Manager *tm) : ACE_Svc_Handler (tm) { } template int Peer_Handler::svc (void) { ACE_Thread_Control thread_control (tm); // just a try !! // we're just reading from our ACE_Message_Queue ACE_Message_Block *db, *hb; int n; // do an endless loop for (;;) { db = new ACE_Message_Block (BUFSIZ); hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db); if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1); else if (n == 0) // Client has closed down the connection. { if (this->router_task_->unbind_peer (this->get_handle ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) shutting down \n")); return -1; // We do not need to be deregistered by reactor // as we were not registered at all } else // Transform incoming buffer into a Message and pass downstream. { db->wr_ptr (n); *(long *) hb->rd_ptr () = this->get_handle (); // structure assignment. hb->wr_ptr (sizeof (long)); if (this->router_task_->reply (hb) == -1) { cout << "Peer_Handler.svc : router_task->reply failed" << endl ; return -1; } // return this->router_task_->reply (hb) == -1 ? -1 : 0; } } return 0; } template int Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) { return this->peer ().send_n (mb->rd_ptr (), mb->length ()); } // Create a new handler and point its ROUTER_TASK_ data member to the // corresponding router. Note that this router is extracted out of // the Acceptor_Factory * that is passed in via the // ACE_Acceptor::handle_input() method. template int Peer_Handler::open (void *a) { char buf[BUFSIZ], *p = buf; if (this->router_task_->info (&p, sizeof buf) != -1) ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, fd = %d, this = %d\n", buf, this->get_handle (), a)); else ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1); if ( this->activate (options.t_flags ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1); else if (this->router_task_->bind_peer (this->get_handle (), this) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1); return 0; } // Receive a message from a supplier.. template int Peer_Handler::handle_input (ACE_HANDLE h) { ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h)); // ACE_Service_Config::reactor ()->remove_handler(h, // ACE_Event_Handler::ALL_EVENTS_MASK // |ACE_Event_Handler::DONT_CALL); // this method should be called only if the peer shuts down // so we deactivate our ACE_Message_Queue to awake our svc thread return 0; #if 0 ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ); ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db); int n; if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1); else if (n == 0) // Client has closed down the connection. { if (this->router_task_->unbind_peer (this->get_handle ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h)); return -1; // Instruct the ACE_Reactor to deregister us by returning -1. } else // Transform incoming buffer into a Message and pass downstream. { db->wr_ptr (n); *(long *) hb->rd_ptr () = this->get_handle (); // structure assignment. hb->wr_ptr (sizeof (long)); return this->router_task_->reply (hb) == -1 ? -1 : 0; } #endif } template Peer_Router::Peer_Router (ACE_Thread_Manager *tm) : ACE_Task (tm) { } template int Peer_Router::send_peers (ACE_Message_Block *mb) { ACE_Map_Iterator map_iter = this->peer_map_; int bytes = 0; int iterations = 0; ACE_Message_Block *data_block = mb->cont (); for (ACE_Map_Entry *ss = 0; map_iter.next (ss) != 0; map_iter.advance ()) { if (options.debug ()) ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer via sd %d\n", ss->ext_id_)); iterations++; bytes += ss->int_id_->put (data_block); } delete mb; return bytes == 0 ? 0 : bytes / iterations; } template Peer_Router::~Peer_Router (void) { } template int Peer_Router::fini (void) { delete this->acceptor_; return 0; } template int Peer_Router::control (ACE_Message_Block *mb) { ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command; switch (command = ioc->cmd ()) { case ACE_IO_Cntl_Msg::SET_LWM: case ACE_IO_Cntl_Msg::SET_HWM: this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ()); break; default: return -1; } return 0; } template int Peer_Router::unbind_peer (PK key) { return this->peer_map_.unbind (key); } template int Peer_Router::bind_peer (PK key, Peer_Handler, PK> *ph) { PH *peer_handler = (PH *) ph; return this->peer_map_.bind (key, peer_handler); } template int Peer_Router::init (int argc, char *argv[]) { this->acceptor_ = new Acceptor_Factory (this); if (this->acceptor_->init (argc, argv) == -1 || this->peer_map_.open () == -1) return -1; else { ACE_UPIPE_Addr addr; ACE_UPIPE_Acceptor &pa = this->acceptor_->acceptor (); if (pa.get_local_addr (addr) != -1) ACE_DEBUG ((LM_DEBUG, "(%t) initializing %s, file = %s, fd = %d, this = %u\n", this->name (), addr.get_path_name (), pa.get_handle (), this)); else ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), -1); } return 0; } #undef PH #undef PA #undef PAD #undef PK #undef PM #endif /* ACE_HAS_THREADS */ #endif /* _PEER_ROUTER_C */