summaryrefslogtreecommitdiff
path: root/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'examples/ASX/Event_Server/Event_Server/Peer_Router.cpp')
-rw-r--r--examples/ASX/Event_Server/Event_Server/Peer_Router.cpp451
1 files changed, 0 insertions, 451 deletions
diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
deleted file mode 100644
index b01611b14d6..00000000000
--- a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
+++ /dev/null
@@ -1,451 +0,0 @@
-// $Id$
-
-#if !defined (_PEER_ROUTER_C)
-#define _PEER_ROUTER_C
-
-#include "ace/Service_Config.h"
-#include "ace/Get_Opt.h"
-#include "Options.h"
-#include "Peer_Router.h"
-
-ACE_RCSID(Event_Server, Peer_Router, "$Id$")
-
-// Send the <ACE_Message_Block> to all the peers. Note that in a
-// "real" application this logic would most likely be more selective,
-// i.e., it would actually do "routing" based on addressing
-// information passed in the <ACE_Message_Block>.
-
-int
-Peer_Router_Context::send_peers (ACE_Message_Block *mb)
-{
- PEER_ITERATOR map_iter = this->peer_map_;
- int bytes = 0;
- int iterations = 0;
-
- // Skip past the header and get the message to send.
- ACE_Message_Block *data_block = mb->cont ();
-
- // Use an iterator to "multicast" the data to *all* the registered
- // peers. Note that this doesn't really multicast, it just makes a
- // "logical" copy of the <ACE_Message_Block> and enqueues it in the
- // appropriate <Peer_Handler> corresponding to each peer. Note that
- // a "real" application would probably "route" the data to a subset
- // of connected peers here, rather than send it to all the peers.
-
- for (PEER_ENTRY *ss = 0;
- map_iter.next (ss) != 0;
- map_iter.advance ())
- {
- if (Options::instance ()->debug ())
- ACE_DEBUG ((LM_DEBUG,
- "(%t) sending to peer via handle %d\n",
- ss->ext_id_));
-
- iterations++;
-
- // Increment reference count before sending since the
- // <Peer_Handler> might be running in its own thread of control.
- bytes += ss->int_id_->put (data_block->duplicate ());
- }
-
- mb->release ();
- return bytes == 0 ? 0 : bytes / iterations;
-}
-
-// Remove the <Peer_Handler> from the peer connection map.
-
-int
-Peer_Router_Context::unbind_peer (ROUTING_KEY key)
-{
- return this->peer_map_.unbind (key);
-}
-
-// Add the <Peer_Handler> to the peer connection map.
-
-int
-Peer_Router_Context::bind_peer (ROUTING_KEY key,
- Peer_Handler *peer_handler)
-{
- return this->peer_map_.bind (key, peer_handler);
-}
-
-void
-Peer_Router_Context::duplicate (void)
-{
- this->reference_count_++;
-}
-
-void
-Peer_Router_Context::release (void)
-{
- ACE_ASSERT (this->reference_count_ > 0);
- this->reference_count_--;
-
- if (this->reference_count_ == 0)
- delete this;
-}
-
-Peer_Router_Context::Peer_Router_Context (u_short port)
- : reference_count_ (0)
-{
- // Initialize the Acceptor's "listen-mode" socket.
- ACE_INET_Addr endpoint (port);
- if (this->open (endpoint) == -1)
- ACE_ERROR ((LM_ERROR,
- "%p\n",
- "Acceptor::open"));
-
- // Initialize the connection map.
- else if (this->peer_map_.open () == -1)
- ACE_ERROR ((LM_ERROR,
- "%p\n",
- "Map_Manager::open"));
- else
- {
- ACE_INET_Addr addr;
-
- if (this->acceptor ().get_local_addr (addr) != -1)
- ACE_DEBUG ((LM_DEBUG,
- "(%t) initializing %s on port = %d, handle = %d, this = %u\n",
- addr.get_port_number () == Options::instance ()->supplier_port ()
- ? "Supplier_Handler" : "Consumer_Handler",
- addr.get_port_number (),
- this->acceptor().get_handle (),
- this));
- else
- ACE_ERROR ((LM_ERROR,
- "%p\n",
- "get_local_addr"));
- }
-}
-
-Peer_Router_Context::~Peer_Router_Context (void)
-{
- // Free up the handle and close down the listening socket.
- ACE_DEBUG ((LM_DEBUG,
- "(%t) closing down Peer_Router_Context\n"));
-
- // Close down the Acceptor and take ourselves out of the Reactor.
- this->handle_close ();
-
- PEER_ITERATOR map_iter = this->peer_map_;
-
- // Make sure to take all the handles out of the map to avoid
- // "resource leaks."
-
- for (PEER_ENTRY *ss = 0;
- map_iter.next (ss) != 0;
- map_iter.advance ())
- {
- if (Options::instance ()->debug ())
- ACE_DEBUG ((LM_DEBUG,
- "(%t) closing down peer on handle %d\n",
- ss->ext_id_));
-
- if (ACE_Reactor::instance ()->remove_handler
- (ss->ext_id_,
- ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR ((LM_ERROR,
- "(%t) p\n",
- "remove_handle"));
- }
-
- // Close down the map.
- this->peer_map_.close ();
-}
-
-Peer_Router *
-Peer_Router_Context::peer_router (void)
-{
- return this->peer_router_;
-}
-
-void
-Peer_Router_Context::peer_router (Peer_Router *pr)
-{
- this->peer_router_ = pr;
-}
-
-// Factory Method that creates a new <Peer_Handler> for each
-// connection.
-
-int
-Peer_Router_Context::make_svc_handler (Peer_Handler *&sh)
-{
- ACE_NEW_RETURN (sh,
- Peer_Handler (this),
- -1);
- return 0;
-}
-
-Peer_Handler::Peer_Handler (Peer_Router_Context *prc)
- : peer_router_context_ (prc)
-{
-}
-
-// Send output to a peer. Note that this implementation "blocks" if
-// flow control occurs. This is undesirable for "real" applications.
-// The best way around this is to make the <Peer_Handler> an Active
-// Object, e.g., as done in the $ACE_ROOT/apps/Gateway/Gateway
-// application.
-
-int
-Peer_Handler::put (ACE_Message_Block *mb,
- ACE_Time_Value *tv)
-{
-#if 0
- // If we're running as Active Objects just enqueue the message here.
- return this->putq (mb, tv);
-#else
- ACE_UNUSED_ARG (tv);
-
- int result = this->peer ().send_n (mb->rd_ptr (),
- mb->length ());
- // Release the memory.
- mb->release ();
-
- return result;
-#endif /* 0 */
-}
-
-// Initialize a newly connected handler.
-
-int
-Peer_Handler::open (void *)
-{
- char buf[BUFSIZ], *p = buf;
-
- if (this->peer_router_context_->peer_router ()->info (&p,
- sizeof buf) != -1)
- ACE_DEBUG ((LM_DEBUG,
- "(%t) creating handler for %s, handle = %d\n",
- buf,
- this->get_handle ()));
- else
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "info"),
- -1);
-#if 0
- // If we're running as an Active Object activate the Peer_Handler
- // here.
- if (this->activate (Options::instance ()->t_flags ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "activation of thread failed"),
- -1);
- ACE_DEBUG ((LM_DEBUG,
- "(%t) Peer_Handler::open registering with Reactor for handle_input\n"));
-#else
-
- // Register with the Reactor to receive messages from our Peer.
- if (ACE_Reactor::instance ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "register_handler"),
- -1);
-#endif /* 0 */
-
- // Insert outselves into the routing map.
- else if (this->peer_router_context_->bind_peer (this->get_handle (),
- this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "bind_peer"),
- -1);
- else
- return 0;
-}
-
-// Receive a message from a Peer.
-
-int
-Peer_Handler::handle_input (ACE_HANDLE h)
-{
- ACE_DEBUG ((LM_DEBUG,
- "(%t) input arrived on handle %d\n",
- h));
-
- ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ);
- ACE_Message_Block *hb = new ACE_Message_Block (sizeof (ROUTING_KEY),
- ACE_Message_Block::MB_PROTO, db);
-
- // Check for memory failures.
- if (db == 0 || hb == 0)
- {
- hb->release ();
- db->release ();
- errno = ENOMEM;
- return -1;
- }
-
- ssize_t n = this->peer ().recv (db->rd_ptr (),
- db->size ());
-
- if (n == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p",
- "recv failed"),
- -1);
- else if (n == 0) // Client has closed down the connection.
- {
- if (this->peer_router_context_->unbind_peer (this->get_handle ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p",
- "unbind failed"),
- -1);
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down handle %d\n", h));
- // Instruct the <ACE_Reactor> to deregister us by returning -1.
- return -1;
- }
- else
- {
- // Transform incoming buffer into an <ACE_Message_Block>.
-
- // First, increment the write pointer to the end of the newly
- // read data block.
- db->wr_ptr (n);
-
- // Second, copy the "address" into the header block. Note that
- // for this implementation the HANDLE we receive the message on
- // is considered the "address." A "real" application would want
- // to do something more sophisticated.
- *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle ();
-
- // Third, update the write pointer in the header block.
- hb->wr_ptr (sizeof (ACE_HANDLE));
-
- // Finally, pass the message through the stream. Note that we
- // use <Task::put> here because this gives the method at *our*
- // level in the stream a chance to do something with the message
- // before it is sent up the other side. For instance, if we
- // receive messages in the <Supplier_Router>, it will just call
- // <put_next> and send them up the stream to the
- // <Consumer_Router> (which broadcasts them to consumers).
- // However, if we receive messages in the <Consumer_Router>, it
- // could reply to the Consumer with an error since it's not
- // correct for Consumers to send messages (we don't do this in
- // the current implementation, but it could be done in a "real"
- // application).
-
- if (this->peer_router_context_->peer_router ()->put (hb) == -1)
- return -1;
- else
- return 0;
- }
-}
-
-Peer_Router::Peer_Router (Peer_Router_Context *prc)
- : peer_router_context_ (prc)
-{
-}
-
-Peer_Router_Context *
-Peer_Router::context (void) const
-{
- return this->peer_router_context_;
-}
-
-int
-Peer_Router::control (ACE_Message_Block *mb)
-{
- ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
- ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command;
-
- switch (command = ioc->cmd ())
- {
- case ACE_IO_Cntl_Msg::SET_LWM:
- case ACE_IO_Cntl_Msg::SET_HWM:
- this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ());
- break;
- default:
- return -1;
- }
- return 0;
-}
-
-#if 0
-
-// Right now, Peer_Handlers are purely Reactive, i.e., they all run in
-// a single thread of control. It would be easy to make them Active
-// Objects by calling activate() in Peer_Handler::open(), making
-// Peer_Handler::put() enqueue each message on the message queue, and
-// (3) then running the following svc() routine to route each message
-// to its final destination within a separate thread. Note that we'd
-// want to move the svc() call up to the Consumer_Router and
-// Supplier_Router level in order to get the right level of control
-// for input and output.
-
-Peer_Handler::svc (void)
-{
- ACE_Message_Block *db, *hb;
-
- // Do an endless loop
- for (;;)
- {
- db = new Message_Block (BUFSIZ);
- hb = new Message_Block (sizeof (ROUTING_KEY),
- Message_Block::MB_PROTO,
- db);
-
- ssize_t n = this->peer_.recv (db->rd_ptr (), db->size ());
-
- if (n == -1)
- LM_ERROR_RETURN ((LOG_ERROR,
- "%p",
- "recv failed"),
- -1);
- else if (n == 0) // Client has closed down the connection.
- {
- if (this->peer_router_context_->peer_router ()->unbind_peer (this->get_handle ()) == -1)
- LM_ERROR_RETURN ((LOG_ERROR,
- "%p",
- "unbind failed"),
- -1);
- LM_DEBUG ((LOG_DEBUG,
- "(%t) shutting down \n"));
-
- // We do not need to be deregistered by reactor
- // as we were not registered at all.
- return -1;
- }
- else
- {
- // Transform incoming buffer into a Message.
- db->wr_ptr (n);
- *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment.
- hb->wr_ptr (sizeof (long));
-
- // Pass the message to the stream.
- if (this->peer_router_context_->peer_router ()->reply (hb) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) %p\n",
- "Peer_Handler.svc : peer_router->reply failed"),
- -1);
- }
- }
- return 0;
-}
-#endif /* 0 */
-#endif /* _PEER_ROUTER_C */
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>;
-template class ACE_Map_Entry<ROUTING_KEY, Peer_Handler *>;
-template class ACE_Map_Iterator_Base<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>;
-template class ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>;
-template class ACE_Map_Reverse_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>;
-template class ACE_Map_Manager<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>;
-template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>
-#pragma instantiate ACE_Map_Entry<ROUTING_KEY, Peer_Handler *>
-#pragma instantiate ACE_Map_Iterator_Base<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
-#pragma instantiate ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
-#pragma instantiate ACE_Map_Reverse_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
-#pragma instantiate ACE_Map_Manager<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
-#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */