summaryrefslogtreecommitdiff
path: root/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
blob: f76a18012e0feeee0f552051930a2e6cf57e34f0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// $Id$

#include "Consumer_Router.h"
#include "Options.h"

Consumer_Router::Consumer_Router (Peer_Router_Context *prc)
  : Peer_Router (prc)
{
  this->context ()->duplicate ();
}

// Initialize the Router. 

int
Consumer_Router::open (void *)
{
  if (this->is_writer ())
    {
      // Set the Peer_Router_Context to point back to us so that if
      // any Consumer's "accidentally" send us data we'll be able to
      // handle it.
      this->context ()->peer_router (this);

      // Make this an active object to handle the error cases in a
      // separate thread.
      this->context ()->duplicate ();
      return this->activate (Options::instance ()->t_flags ());
    }
  else // if (this->is_reader ())

    // Nothing to do since this side is primarily used to transmit to
    // Consumers, rather than receive.
    return 0; 
}

int
Consumer_Router::close (u_long)
{
  ACE_DEBUG ((LM_DEBUG, "(%t) closing Consumer_Router %s\n",
	      this->is_reader () ? "reader" : "writer"));

  if (this->is_writer ())
    // Inform the thread to shut down.
    this->msg_queue ()->deactivate ();

  // Both writer and reader call release(), so the context knows when
  // to clean itself up.
  this->context ()->release ();
  return 0;
}

// Handle incoming messages in a separate thread.

int 
Consumer_Router::svc (void)
{
  assert (this->is_writer ());

  ACE_DEBUG ((LM_DEBUG, 
	      "(%t) starting svc in Consumer_Router\n"));

  for (ACE_Message_Block *mb = 0;
       this->getq (mb) >= 0;
       )
    {
      ACE_DEBUG ((LM_DEBUG, 
		  "(%t) warning: Consumer_Router is forwarding a message to Supplier_Router\n"));

      // Pass this message down to the next Module's writer Task.
      if (this->put_next (mb) == -1)
	ACE_ERROR_RETURN ((LM_ERROR, 
			   "(%t) send_peers failed in Consumer_Router\n"),
			  -1);
    }

  ACE_DEBUG ((LM_DEBUG, 
	      "(%t) stopping svc in Consumer_Router\n"));
  return 0;
  // Note the implicit ACE_OS::thr_exit() via destructor.
}

// Send a <Message_Block> to the supplier(s). 

int 
Consumer_Router::put (ACE_Message_Block *mb, 
		      ACE_Time_Value *)
{
  // Perform the necessary control operations before passing
  // the message down the stream.

  if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
    {
      this->control (mb);
      return this->put_next (mb);
    }

  // If we're the reader side then we're responsible for broadcasting
  // messages to Consumers.

  else if (this->is_reader ())
    {
      if (this->context ()->send_peers (mb) == -1)
	ACE_ERROR_RETURN ((LM_ERROR, 
			   "(%t) send_peers failed in Consumer_Router\n"),
			  -1);
      else 
	return 0;
    }
  else // if (this->is_writer ())

    // Queue up the message to processed by Consumer_Router::svc().
    // Since we don't expect to be getting many of these messages, we
    // queue them up and run them in a separate thread to avoid taxing
    // the main thread.
    return this->putq (mb);
}
// Return information about the Client_Router ACE_Module.

int
Consumer_Router::info (char **strp, size_t length) const
{
  char buf[BUFSIZ];
  ACE_INET_Addr  addr;
  const char *mod_name = this->name ();
  
  if (this->context ()->acceptor ().get_local_addr (addr) == -1)
    return -1;
  
  ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n",
		   mod_name, addr.get_port_number (), "tcp",
		   "# consumer router", this->is_reader () ? "reader" : "writer");
  
  if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
    return -1;
  else
    ACE_OS::strncpy (*strp, mod_name, length);
  return ACE_OS::strlen (mod_name);
}