summaryrefslogtreecommitdiff
path: root/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
blob: 7e2018a3d5638ef198c7fe62a403b772f8151789 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// $Id$

#include "Supplier_Router.h"
#include "Options.h"

ACE_RCSID(Event_Server, Supplier_Router, "$Id$")

// Service loop of the writer-side Task: runs in its own thread,
// dequeues every message handed to <put> and forwards it to the
// peers through the <Peer_Router_Context>.
//
// Returns 0 once <getq> reports failure (<close> deactivates the
// queue to make that happen) and -1 if forwarding a message fails.

int
Supplier_Router::svc (void)
{
  // Only the writer side is ever activated (see <open>).
  assert (this->is_writer ());

  ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n"));

  ACE_Message_Block *mb = 0;

  // Keep draining the queue until <getq> returns a negative value.
  while (this->getq (mb) >= 0)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) warning: Supplier_Router is "
                  "forwarding a message via send_peers\n"));

      // Events are supposed to flow one way, from Suppliers to
      // Consumers, so sending something back to the Suppliers is
      // strictly speaking "incorrect" -- we broadcast it anyway.
      if (this->context ()->send_peers (mb) == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "(%t) send_peers failed in Supplier_Router\n"),
                            -1);
        }
    }

  ACE_DEBUG ((LM_DEBUG, "(%t) stopping svc in Supplier_Router\n"));
  return 0;
}

Supplier_Router::Supplier_Router (Peer_Router_Context *prc) :
  Peer_Router (prc)
{
  // Register ourselves as a user of the shared context by bumping its
  // reference count; <close> performs the matching release ().
  this->context ()->duplicate ();
}

// Initialize the Supplier Router.  The behavior depends on which side
// of the Module this Task sits on:
//
//  - writer: takes another reference on the context and becomes an
//    active object; the result of <activate> is returned.
//  - reader: registers itself with the context and returns 0.

int
Supplier_Router::open (void *)
{
  if (!this->is_reader ())
    {
      // Writer side.  Bump the context's reference count...
      this->context ()->duplicate ();

      // ...and spawn a thread so that the "error case" messages are
      // dealt with by <svc> rather than by the caller's thread.
      return this->activate (Options::instance ()->t_flags ());
    }

  // Reader side.  Point the <Peer_Router_Context> back at us so that
  // every Peer_Handler <put>s its incoming <Message_Blocks> to our
  // reader Task.
  this->context ()->peer_router (this);
  return 0;
}

// Shut the router down.  Always returns 0.

int
Supplier_Router::close (u_long)
{
  ACE_DEBUG ((LM_DEBUG,
              "(%t) closing Supplier_Router %s\n",
              this->is_reader () ? "reader" : "writer"));

  if (this->is_writer ())
    {
      // Deactivating the queue tells the <svc> thread to shut down.
      this->msg_queue ()->deactivate ();
    }

  // Reader and writer each drop a reference here, which lets the
  // context work out when it should clean itself up.
  this->context ()->release ();
  return 0;
}

// Accept an <ACE_Message_Block> travelling through the stream.
//
//  - MB_IOCTL messages are handed to <control> and then passed on.
//  - On the reader side every other message is passed on to the next
//    Module's reader Task.
//  - On the writer side the message is queued for <svc>.
//
// Returns whatever <put_next> or <putq> returns.

int
Supplier_Router::put (ACE_Message_Block *mb,
                      ACE_Time_Value *)
{
  // Control messages: perform the control operation first, then let
  // the message continue along the stream.
  if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
    {
      this->control (mb);
      return this->put_next (mb);
    }

  // Reader side: simply pass the message up.  A "real" application
  // would probably inspect the message here, e.g., to work out which
  // operation it represents and what its "parameters" are.
  if (this->is_reader ())
    return this->put_next (mb);

  // Writer side: somebody is writing *to* the Supplier.  This
  // implementation regards that as an "error", but forwards the
  // message regardless and hopes the Supplier can cope with it.
  ACE_DEBUG ((LM_WARNING,
              "(%t) warning: sending to a Supplier\n"));

  // Such messages should be rare, so they are queued and processed by
  // <Supplier_Router::svc> in a separate thread instead of taxing the
  // main thread.
  return this->putq (mb);
}

// Return information about the <Supplier_Router>.
//
// Builds a one-line description of the form
// "<name>\t <port>/tcp # supplier router (reader|writer)\n" and hands
// it back through <strp>:
//
//  - If <*strp> is 0, a copy allocated with <ACE_OS::strdup> is stored
//    in <*strp> (presumably the caller is expected to free it --
//    confirm against the callers of <info>).
//  - Otherwise at most <length> bytes are copied into the caller's
//    buffer, which is always NUL-terminated when <length> is non-zero.
//
// Returns the length of the complete description, or -1 if the local
// address of the acceptor cannot be obtained or the allocation fails.

int 
Supplier_Router::info (char **strp, size_t length) const
{
  char buf[BUFSIZ];
  ACE_INET_Addr addr;
  const char *mod_name = this->name ();

  if (this->context ()->acceptor ().get_local_addr (addr) == -1)
    return -1;
  
  ACE_OS::sprintf (buf,
                   "%s\t %d/%s %s (%s)\n",
		   mod_name,
                   addr.get_port_number (),
                   "tcp",
		   "# supplier router",
                   this->is_reader () ? "reader" : "writer");

  if (*strp == 0)
    {
      // No buffer supplied: hand back a heap copy of the description.
      // Nothing must be copied into it afterwards -- strncpy pads the
      // destination up to <length> bytes and would run past the end of
      // the exactly-sized allocation.
      if ((*strp = ACE_OS::strdup (buf)) == 0)
        return -1;
    }
  else if (length > 0)
    {
      // Caller-supplied buffer of <length> bytes.  strncpy does not
      // terminate the destination on truncation, so do it explicitly.
      ACE_OS::strncpy (*strp, buf, length);
      (*strp)[length - 1] = '\0';
    }

  return ACE_OS::strlen (buf);
}