summaryrefslogtreecommitdiff
path: root/ACE/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
blob: 3cafe8d1149bf777b2d10febd50cebbe9159b163 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include "ace/os_include/os_assert.h"
#include "ace/OS_NS_stdio.h"
#include "ace/OS_NS_string.h"
#include "ace/Truncate.h"
#include "Consumer_Router.h"
#include "Options.h"



Consumer_Router::Consumer_Router (Peer_Router_Context *prc)
  : Peer_Router (prc)
{
  this->context ()->duplicate ();
}

// Initialize the Router.

int
Consumer_Router::open (void *)
{
  if (this->is_writer ())
    {
      // Set the <Peer_Router_Context> to point back to us so that if
      // any Consumer's "accidentally" send us data we'll be able to
      // handle it by passing it down the stream.
      this->context ()->peer_router (this);

      // Increment the reference count.
      this->context ()->duplicate ();

      // Make this an active object to handle the error cases in a
      // separate thread.  This is mostly just for illustration, i.e.,
      // it's probably overkill to use a thread for this!
      return this->activate (Options::instance ()->t_flags ());
    }
  else // if (this->is_reader ())

    // Nothing to do since this side is primarily used to transmit to
    // Consumers, rather than receive.
    return 0;
}

int
Consumer_Router::close (u_long)
{
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) closing Consumer_Router %s\n"),
              this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer")));

  if (this->is_writer ())
    // Inform the thread to shut down.
    this->msg_queue ()->deactivate ();

  // Both writer and reader call <release>, so the context knows when
  // to clean itself up.
  this->context ()->release ();
  return 0;
}

// Handle incoming messages in a separate thread.

int
Consumer_Router::svc (void)
{
  assert (this->is_writer ());

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) starting svc in Consumer_Router\n")));

  for (ACE_Message_Block *mb = 0;
       this->getq (mb) >= 0;
       )
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%t) warning: Consumer_Router is ")
                  ACE_TEXT ("forwarding a message to Supplier_Router\n")));

      // Pass this message down to the next Module's writer Task.
      if (this->put_next (mb) == -1)
        ACE_ERROR_RETURN
          ((LM_ERROR,
            ACE_TEXT ("(%t) send_peers failed in Consumer_Router\n")),
           -1);
    }

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) stopping svc in Consumer_Router\n")));
  return 0;
  // Note the implicit ACE_OS::thr_exit() via destructor.
}

// Send a <Message_Block> to the supplier(s).

int
Consumer_Router::put (ACE_Message_Block *mb,
                      ACE_Time_Value *)
{
  // Perform the necessary control operations before passing the
  // message down the stream.

  if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
    {
      this->control (mb);
      return this->put_next (mb);
    }

  // If we're the reader then we're responsible for broadcasting
  // messages to Consumers.

  else if (this->is_reader ())
    {
      if (this->context ()->send_peers (mb) == -1)
        ACE_ERROR_RETURN
          ((LM_ERROR,
            ACE_TEXT ("(%t) send_peers failed in Consumer_Router\n")),
           -1);
      else
        return 0;
    }
  else // if (this->is_writer ())
    // Queue up the message to processed by <Consumer_Router::svc>
    // Since we don't expect to be getting many of these messages, we
    // queue them up and run them in a separate thread to avoid taxing
    // the main thread.
    return this->putq (mb);
}

// Return information about the <Consumer_Router>.

int
Consumer_Router::info (ACE_TCHAR **strp, size_t length) const
{
  ACE_TCHAR buf[BUFSIZ];
  ACE_INET_Addr  addr;
  const ACE_TCHAR *module_name = this->name ();

  if (this->context ()->acceptor ().get_local_addr (addr) == -1)
    return -1;

  ACE_OS::sprintf (buf,
                   ACE_TEXT ("%") ACE_TEXT_PRIs
                   ACE_TEXT ("\t %d/%") ACE_TEXT_PRIs
                   ACE_TEXT (" %") ACE_TEXT_PRIs
                   ACE_TEXT (" (%") ACE_TEXT_PRIs
                   ACE_TEXT (")\n"),
                   module_name,
                   addr.get_port_number (),
                   ACE_TEXT ("tcp"),
                   ACE_TEXT ("# consumer router"),
                   this->is_reader () ? ACE_TEXT ("reader") : ACE_TEXT ("writer"));
  if (*strp == 0 && (*strp = ACE_OS::strdup (module_name)) == 0)
    return -1;
  else
    ACE_OS::strncpy (*strp, module_name, length);

  return ACE_Utils::truncate_cast<int> (ACE_OS::strlen (module_name));
}