summaryrefslogtreecommitdiff
path: root/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
blob: 0ab012eaa4a32caf12c8264cb1258b8269a2220e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// $Id$

#include "Supplier_Router.h"
#include "Options.h"

// Handle outgoing messages in a separate thread.

int 
Supplier_Router::svc (void)
{
  assert (this->is_writer ());

  ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n"));

  for (ACE_Message_Block *mb = 0;
       this->getq (mb) >= 0;
       )
    {
      ACE_DEBUG ((LM_DEBUG, 
		  "(%t) warning: Supplier_Router is forwarding a message via send_peers\n"));

      // Broadcast the message to the Suppliers.

      if (this->context ()->send_peers (mb) == -1)
	ACE_ERROR_RETURN ((LM_ERROR, 
			   "(%t) send_peers failed in Supplier_Router\n"),
			   -1);
    }

  ACE_DEBUG ((LM_DEBUG, "(%t) stopping svc in Supplier_Router\n"));
  return 0;
}

Supplier_Router::Supplier_Router (Peer_Router_Context *prc)
  : Peer_Router (prc)
{
  this->context ()->duplicate ();
}

// Initialize the Supplier Router. 

int
Supplier_Router::open (void *)
{
  if (this->is_reader ())
    {
      // Set the Peer_Router_Context to point back to us so that all the
      // Peer_Handler's <put> their incoming <Message_Blocks> to our
      // reader Task.
      this->context ()->peer_router (this);
      return 0;
    }

  else // if (this->is_writer ()
    {
      // Make this an active object to handle the error cases in a
      // separate thread.
      this->context ()->duplicate ();      
      return this->activate (Options::instance ()->t_flags ());
    }
}

// Close down the router. 

int
Supplier_Router::close (u_long)
{
  ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router %s\n",
  	      this->is_reader () ? "reader" : "writer"));

  if (this->is_writer ())
    // Inform the thread to shut down.
    this->msg_queue ()->deactivate ();

  // Both writer and reader call release(), so the context knows when
  // to clean itself up.
  this->context ()->release ();
  return 0;
}

// Send an <ACE_Message_Block> to the supplier(s).

int 
Supplier_Router::put (ACE_Message_Block *mb, 
		      ACE_Time_Value *)
{
  // Perform the necessary control operations before passing
  // the message up the stream.

  if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
    {
      this->control (mb);
      return this->put_next (mb);
    }

  // If we're the reader then we are responsible for pass messages up
  // to the next Module's writer Task.

  else if (this->is_reader ())
    return this->put_next (mb);
  else // if (this->is_writer ())
    {
      // Someone is trying to write to the Supplier.  In this
      // implementation this is considered an error.  However, we'll
      // just go ahead and forward the message to the Supplier (who
      // hopefully is prepared to receive it).
      ACE_DEBUG ((LM_WARNING, "(%t) warning: sending to a Supplier\n"));

      // Queue up the message to processed by Supplier_Router::svc().
      // Since we don't expect to be getting many of these messages,
      // we queue them up and run them in a separate thread to avoid
      // taxing the main thread.
      return this->putq (mb);
    }
}

// Return information about the Supplier_Router ACE_Module. 

int 
Supplier_Router::info (char **strp, size_t length) const
{
  char buf[BUFSIZ];
  ACE_INET_Addr addr;
  const char *mod_name = this->name ();

  if (this->context ()->acceptor ().get_local_addr (addr) == -1)
    return -1;
  
  ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n",
		   mod_name, addr.get_port_number (), "tcp",
		   "# supplier router", this->is_reader () ? "reader" : "writer");
  
  if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
    return -1;
  else
    ACE_OS::strncpy (*strp, mod_name, length);
  return ACE_OS::strlen (mod_name);
}