summaryrefslogtreecommitdiff
path: root/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp
blob: f17560ad0e62e1e0900fa6b26b0199da4534d36d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
#if !defined (_PEER_ROUTER_C)
// $Id$

#define _PEER_ROUTER_C

#include "ace/Get_Opt.h"
#include "ace/Service_Config.h"

#include "Peer_Router.h"
#include "Options.h"

#if defined (ACE_HAS_THREADS)

// Define some short-hand macros to deal with long template parameter
// names.  PH and PK are used as the template parameter names of the
// Acceptor_Factory and Peer_Router definitions below; all five macros
// are #undef'd again at the end of this file.

#define PH  PEER_HANDLER
#define PA  PEER_ACCEPTOR
#define PAD PEER_ADDR
#define PK  PEER_KEY
#define PM  PEER_MAP

// Parses the option vector and then opens this acceptor on the
// resulting address, using ACE_Service_Config::reactor ().
//
// Options (option string "df:"):
//   -f <arg>  ARG is passed to ACE_UPIPE_Addr::set ().
//   -d        accepted, but has no effect here.
// Every other option character is ignored as well.
//
// Returns 0 on success; returns -1 via ACE_ERROR_RETURN if open ()
// fails.

template <class PH, class PK> int
Acceptor_Factory<PH, PK>::init (int argc, char *argv[])
{
  ACE_Get_Opt get_opt (argc, argv, "df:", 0);
  ACE_UPIPE_Addr addr;

  int c;
  while ((c = get_opt ()) != -1)
    {
      // Only 'f' carries an argument we act on; 'd' and anything
      // unrecognized are deliberate no-ops.
      if (c == 'f')
        addr.set (get_opt.optarg);
    }

  const int status = this->open (addr, ACE_Service_Config::reactor ());
  if (status == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);

  return 0;
}

// Stores the router pointer PR as-is in pr_; it is handed back
// unchanged by router ().

template <class PH, class PK>
Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr)
  : pr_ (pr)
{
  // Nothing else to set up.
}

// Accessor: returns the router pointer that was passed to the
// constructor.

template <class PH, class PK> Peer_Router<PH, PK> *
Acceptor_Factory<PH, PK>::router (void)
{
  Peer_Router<PH, PK> *const owner = this->pr_;
  return owner;
}

// Forwards the thread manager TM to the ACE_Svc_Handler base class.
// No other member is initialized here.

template <class ROUTER, class KEY>
Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm)
  : ACE_Svc_Handler<ACE_UPIPE_Stream, ACE_UPIPE_Addr, ACE_MT_SYNCH> (tm)
{
  // Nothing else to set up.
}

// Service loop of this handler: repeatedly reads raw data from the
// peer, wraps each chunk in a header block + data block message and
// hands the message to the router via reply ().
//
// The header block holds the handle of this handler, stored as a long;
// the data block holds the bytes that were received.
//
// Every exit from the loop returns -1:
//   - recv () failed,
//   - the peer closed the connection (this handler is unbound from the
//     router first),
//   - reply () failed.

template <class ROUTER, class KEY> int
Peer_Handler<ROUTER, KEY>::svc (void)
{
  // NOTE(review): unless an object named `tm' is visible at this
  // point, `tm' names the C library's struct tm and this line only
  // declares a function rather than creating an ACE_Thread_Control
  // object -- confirm what was intended.
  ACE_Thread_Control thread_control (tm);

  // The handle is written into the header as a long, so the header
  // must be able to hold a long even if KEY is a narrower type.
  const size_t header_size =
    sizeof (KEY) < sizeof (long) ? sizeof (long) : sizeof (KEY);

  for (;;)
    {
      ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ);
      int n = this->peer ().recv (db->rd_ptr (), db->size ());

      if (n == -1)
        {
          // Log first so that errno still describes the recv ()
          // failure, then free the buffer instead of leaking it.
          ACE_ERROR ((LM_ERROR, "%p", "recv failed"));
          delete db;
          return -1;
        }
      else if (n == 0) // Client has closed down the connection.
        {
          // Nothing was received, so the buffer is no longer needed.
          delete db;

          if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
            ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1);
          ACE_DEBUG ((LM_DEBUG, "(%t) shutting down \n"));

          // We do not need to be deregistered by the reactor as we
          // were not registered at all.
          return -1;
        }

      // Transform the incoming buffer into a message and pass it
      // downstream.  The header is only allocated once there is data
      // to send, so the early exits above have nothing else to free.
      db->wr_ptr (n);

      ACE_Message_Block *hb =
        new ACE_Message_Block (header_size, ACE_Message_Block::MB_PROTO, db);
      *(long *) hb->rd_ptr () = this->get_handle ();
      hb->wr_ptr (sizeof (long));

      if (this->router_task_->reply (hb) == -1)
        {
          // NOTE(review): hb is not freed here because it is not
          // visible from this file whether reply () has already
          // disposed of the message when it fails -- confirm.
          cout << "Peer_Handler.svc : router_task->reply failed" << endl ;
          return -1;
        }
    }

  // Not reached: the loop above only exits through a return.
  return 0;
}

// Sends the readable contents of MB (from rd_ptr (), length () bytes)
// to the peer with send_n () and returns send_n ()'s result.  The
// timeout argument is ignored.

template <class ROUTER, class KEY> int
Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
  char *const payload = mb->rd_ptr ();

  return this->peer ().send_n (payload, mb->length ());
}

// Start up this handler: log a trace line describing the router,
// activate the handler with options.t_flags () and finally bind it to
// the router under its own handle.
//
// This function only reads the ROUTER_TASK_ data member; it never
// assigns it, so ROUTER_TASK_ has to point to the router before
// open () is called.  The argument A is used for the trace output
// only (it is the value printed after "this =").
//
// Returns 0 on success and -1 if info (), activate () or bind_peer ()
// fails.

template <class ROUTER, class KEY> int
Peer_Handler<ROUTER, KEY>::open (void *a)
{
  char buf[BUFSIZ];
  char *p = buf;

  if (this->router_task_->info (&p, sizeof buf) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1);

  ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, fd = %d, this = %d\n",
              buf, this->get_handle (), a));

  if (this->activate (options.t_flags ()) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1);

  // Binding only happens after the activation succeeded.
  if (this->router_task_->bind_peer (this->get_handle (), this) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1);

  return 0;
}

// Input notification hook.  Data from the peer is read and forwarded
// by svc (), so this function only traces the event for handle H and
// returns 0; it neither reads from the peer nor touches the router.

template <class ROUTER, class KEY> int
Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h)
{
  ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h));
  return 0;
}

// Forwards the thread manager TM to the ACE_Task base class.  The
// acceptor and the peer map are set up later by init ().

template <class PH, class PK>
Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm)
  : ACE_Task<ACE_MT_SYNCH> (tm)
{
  // Nothing else to set up.
}

// Passes the continuation block of MB (mb->cont ()) to the put ()
// method of every handler currently stored in the peer map, then
// deletes MB.
//
// Returns the sum of all put () results divided by the number of
// peers visited, or 0 if that sum is 0 (which includes the case of an
// empty map).  Note that a put () result of -1 is added to the sum
// like any other value.

template <class PH, class PK> int
Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb)
{
  ACE_Map_Iterator<PK, PH *, ACE_RW_Mutex> map_iter = this->peer_map_;
  ACE_Message_Block *payload = mb->cont ();
  int total = 0;
  int peers = 0;
  ACE_Map_Entry<PK, PH *> *entry = 0;

  while (map_iter.next (entry) != 0)
    {
      if (options.debug ())
        ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer via sd %d\n", entry->ext_id_));

      ++peers;
      total += entry->int_id_->put (payload);

      map_iter.advance ();
    }

  delete mb;

  // A zero total also covers the "no peers" case, so the division
  // below never sees a zero divisor.
  if (total == 0)
    return 0;

  return total / peers;
}

// Intentionally empty: the acceptor is released by fini (), not by
// the destructor.

template <class PH, class PK>
Peer_Router<PH, PK>::~Peer_Router (void)
{
  // Nothing to do here.
}

// Releases the acceptor that init () allocated.  The pointer is reset
// afterwards so that it does not dangle and a repeated call to fini ()
// is harmless.  Always returns 0.

template <class PH, class PK> int
Peer_Router<PH, PK>::fini (void)
{
  delete this->acceptor_;
  this->acceptor_ = 0;
  return 0;
}

// Handles a control message.  MB's read pointer is interpreted as an
// ACE_IO_Cntl_Msg; for the commands SET_LWM and SET_HWM the size_t
// value found at the read pointer of MB's continuation block is passed
// to water_marks ().
//
// Returns 0 for those two commands and -1 for every other command.

template <class PH, class PK> int
Peer_Router<PH, PK>::control (ACE_Message_Block *mb)
{
  ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
  const ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command = ioc->cmd ();

  // Only the two water mark commands are understood.
  if (command != ACE_IO_Cntl_Msg::SET_LWM
      && command != ACE_IO_Cntl_Msg::SET_HWM)
    return -1;

  this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ());
  return 0;
}

// Removes the entry stored under KEY from the peer map and returns the
// result of the map's unbind () call.

template <class PH, class PK> int
Peer_Router<PH, PK>::unbind_peer (PK key)
{
  const int result = this->peer_map_.unbind (key);
  return result;
}

// Stores the handler PH under KEY in the peer map and returns the
// result of the map's bind () call.  The handler pointer is cast to
// the map's value type (PH *) before it is stored.

template <class PH, class PK> int
Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph)
{
  return this->peer_map_.bind (key, (PH *) ph);
}

// Creates the acceptor factory for this router, initializes it with
// the given option vector, opens the peer map and logs the address
// the acceptor ended up on.
//
// Returns 0 on success.  Returns -1 if the acceptor's init () fails,
// if the peer map cannot be opened (only attempted when the acceptor's
// init () succeeded), or if the local address cannot be obtained.

template <class PH, class PK> int
Peer_Router<PH, PK>::init (int argc, char *argv[])
{
  this->acceptor_ = new Acceptor_Factory <PH, PK> (this);

  if (this->acceptor_->init (argc, argv) == -1)
    return -1;

  if (this->peer_map_.open () == -1)
    return -1;

  ACE_UPIPE_Addr addr;
  ACE_UPIPE_Acceptor &pa = this->acceptor_->acceptor ();

  if (pa.get_local_addr (addr) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), -1);

  ACE_DEBUG ((LM_DEBUG, "(%t) initializing %s, file = %s, fd = %d, this = %u\n",
              this->name (), addr.get_path_name (), pa.get_handle (), this));
  return 0;
}

#undef PH
#undef PA
#undef PAD
#undef PK
#undef PM
#endif /* ACE_HAS_THREADS */
#endif /* _PEER_ROUTER_C */