summaryrefslogtreecommitdiff
path: root/examples/QOS/Change_Receiver_FlowSpec/receiver.cpp
blob: abeda6cd58e955f578f16c8795da18280011dfb5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
/* -*- C++ -*- */
// $Id$

// ============================================================================
//
// = LIBRARY
//    ACE_wrappers/examples/QOS
//
// = FILENAME
//    receiver.cpp
//
// = AUTHOR
//    Vishal Kachroo <vishal@cs.wustl.edu>
//
// ============================================================================

#define QOSEVENT_MAIN

#include "ace/QoS/QoS_Session.h"
#include "ace/QoS/QoS_Session_Factory.h"
#include "ace/QoS/QoS_Decorator.h"
#include "ace/QoS/SOCK_Dgram_Mcast_QoS.h"

#include "QoS_Util.h"
#include "Fill_ACE_QoS.h"
#include "QoS_Signal_Handler.h"
#include "Receiver_QoS_Event_Handler.h"
    
// To open QOS sockets administrative access is required on the
// machine.  Fill in default values for QoS structure.  The default
// values were simply chosen from existing QOS templates available
// via WSAGetQosByName.  Notice that ProviderSpecific settings are
// being allowed when picking the "default" template but not for
// "well-known" QOS templates.  Also notice that since data is only
// flowing from sender to receiver, different flowspecs are filled in
// depending upon whether this application is acting as a sender or
// receiver.


// Populate <qos_params> for a join: <iov> is installed as the callee
// data and <qos> as the socket QoS.  Caller data and group socket QoS
// are set to null and the flags to ACE_JL_BOTH.  Always returns 0.

int
FillQoSParams (ACE_QoS_Params &qos_params,
               iovec* iov, 
               ACE_QoS* qos)
{
  // Neither caller data nor a group socket QoS is supplied here.
  iovec *const no_caller_data = 0;
  ACE_QoS *const no_group_qos = 0;

  qos_params.callee_data (iov);
  qos_params.caller_data (no_caller_data);

  qos_params.socket_qos (qos);
  qos_params.group_socket_qos (no_group_qos);

  qos_params.flags (ACE_JL_BOTH);

  return 0;
}

int
main (int argc, char * argv[])
{  

  QoS_Util qos_util(argc, argv);

  if (qos_util.parse_args () == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "Error in parsing args\n"),
                      -1);

  // This is a multicast application.
  if (qos_util.multicast_flag ())
    {
       Fill_ACE_QoS  fill_ace_qos;

       // The application adds the flow specs that it wants into the
       // Fill_ACE_QoS. The Fill_ACE_QoS indexes the flow specs by the
       // flow spec names. Here the new flowspec being added is g_711.
       ACE_CString g_711 ("g_711");

       switch (fill_ace_qos.map ().bind (g_711,
                                         new ACE_Flow_Spec (9200,
                                                            708,
                                                            18400,
                                                            0,
                                                            0,
                                                            ACE_SERVICETYPE_CONTROLLEDLOAD,
                                                            368,
                                                            368,
                                                            25,
                                                            1)))
         {
         case 1 : 
           ACE_ERROR_RETURN ((LM_ERROR,
                              "Unable to bind the new flow spec\n"
                              "The Flow Spec name already exists\n"),
                             -1);
           break;
         case -1 :
           ACE_ERROR_RETURN ((LM_ERROR,
                              "Unable to bind the new flow spec\n"),
                             -1);
           break;
         }
    
       ACE_DEBUG ((LM_DEBUG,
                   "g_711 Flow Spec bound successfully\n"));

       // This is a receiver. So we fill in the receiving QoS parameters.
       ACE_QoS ace_qos_receiver;
       if (fill_ace_qos.fill_simplex_receiver_qos (ace_qos_receiver,
                                                   g_711) !=0)
         ACE_ERROR_RETURN ((LM_ERROR,
                            "Unable to fill simplex receiver qos\n"),
                           -1);
       else
         ACE_DEBUG ((LM_DEBUG,
                     "Filled up the Receiver QoS parameters\n"));

      // Opening a new Multicast Datagram.
      ACE_SOCK_Dgram_Mcast_QoS dgram_mcast_qos;
      
      // Multicast Session Address specified by user at command line.
      // If this address is not specified, 
      // <localhost:ACE_DEFAULT_MULTICAST_PORT> is assumed. 
      ACE_INET_Addr mult_addr (*(qos_util.mult_session_addr ()));
      
      // Fill the ACE_QoS_Params to be passed to the <ACE_OS::join_leaf>
      // through subscribe.

      ACE_QoS_Params qos_params;
      FillQoSParams (qos_params, 0, &ace_qos_receiver);

      // Create a QoS Session Factory.
      ACE_QoS_Session_Factory session_factory;

      // Ask the factory to create a QoS session.
      ACE_QoS_Session *qos_session = 
        session_factory.create_session ();

      // Create a destination address for the QoS session. The same
      // address should be used for the subscribe call later. A copy
      // is made below only to distinguish the two usages of the dest
      // address.

      ACE_INET_Addr dest_addr (mult_addr);

      // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
      // Protocol]. Initialize the QoS session.
      if (qos_session->open (mult_addr,
                             IPPROTO_UDP) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "Error in opening the QoS session\n"),
                          -1);
      else
        ACE_DEBUG ((LM_DEBUG,
                    "QoS session opened successfully\n"));

      // The following call opens the Dgram_Mcast and calls the
      // <ACE_OS::join_leaf> with the qos_params supplied here. Note
      // the QoS session object is passed into this call. This
      // subscribes the underlying socket to the passed in QoS
      // session. For joining multiple multicast sessions, the
      // following subscribe call should be made with different
      // multicast addresses and a new QoS session object should be
      // passed in for each such call. The QoS session objects can be
      // created only through the session factory. Care should be
      // taken that the mult_addr for the subscribe() call matches the
      // dest_addr of the QoS session object. If this is not done, the
      // subscribe call will fail. A more abstract version of
      // subscribe will be added that constrains the various features
      // of GQoS like different flags etc.
      
      if (dgram_mcast_qos.subscribe (mult_addr,
                                     qos_params,
                                     1,
                                     0,
                                     AF_INET,
                                     // ACE_FROM_PROTOCOL_INFO,
                                     0,
                                     0, // ACE_Protocol_Info,
                                     0,
                                     ACE_OVERLAPPED_SOCKET_FLAG 
                                     | ACE_FLAG_MULTIPOINT_C_LEAF 
                                     | ACE_FLAG_MULTIPOINT_D_LEAF,
                                     qos_session) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "Error in subscribe\n"),
                          -1);
      else
        ACE_DEBUG ((LM_DEBUG,
                    "Dgram_Mcast subscribe succeeds \n"));

      int nIP_TTL = 25;
      char achInBuf [BUFSIZ];
      u_long dwBytes;
  
      // Should this be abstracted into QoS objects ?? Doesnt seem to have
      // to do anything directly with QoS.
      if (ACE_OS::ioctl (dgram_mcast_qos.get_handle (), // Socket.
                         ACE_SIO_MULTICAST_SCOPE, // IO control code.
                         &nIP_TTL, // In buffer.
                         sizeof (nIP_TTL), // Length of in buffer.
                         achInBuf, // Out buffer.
                         BUFSIZ, // Length of Out buffer.
                         &dwBytes, // bytes returned.
                         0, // Overlapped.
                         0) == -1) // Func.
        ACE_ERROR ((LM_ERROR,
                    "Error in Multicast scope ACE_OS::ioctl() \n"));
      else
        ACE_DEBUG ((LM_DEBUG,
                    "Setting TTL with Multicast scope ACE_OS::ioctl call succeeds \n"));
      
      int bFlag = 0;
      
      // Should this be abstracted into QoS objects ?? Doesnt seem to have
      // to do anything directly with QoS.
      if (ACE_OS::ioctl (dgram_mcast_qos.get_handle (), // Socket.
                         ACE_SIO_MULTIPOINT_LOOPBACK, // IO control code.
                         &bFlag, // In buffer.
                         sizeof (bFlag), // Length of in buffer.
                         achInBuf, // Out buffer.
                         BUFSIZ, // Length of Out buffer.
                         &dwBytes, // bytes returned.
                         0, // Overlapped.
                         0) == -1) // Func.
        ACE_ERROR ((LM_ERROR,
                    "Error in Loopback ACE_OS::ioctl() \n"));
      else
        ACE_DEBUG ((LM_DEBUG,
                    "Disable Loopback with ACE_OS::ioctl call succeeds \n"));
      
      // This is a receiver. 
      qos_session->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
      
      ACE_QoS_Manager qos_manager = dgram_mcast_qos.qos_manager ();
      
      // Set the QoS for the session. Replaces the ioctl () call that
      // was being made previously.
      if (qos_session->qos (&dgram_mcast_qos,
                            &qos_manager,
                            ace_qos_receiver) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "Unable to set QoS\n"),
                          -1);
      else
        ACE_DEBUG ((LM_DEBUG,
                    "Setting QOS succeeds.\n"));

      // Register a signal handler that helps to gracefully close the
      // open QoS sessions.
      QoS_Signal_Handler qos_signal_handler (qos_session);
      
      // Register the usual SIGINT signal handler with the Reactor for
      // the application to gracefully release the QoS session and
      // shutdown.
      if (ACE_Reactor::instance ()->register_handler 
          (SIGINT, &qos_signal_handler) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "Error in registering the Signal Handler.\n"),
                          -1);

      // Handler to process QoS and Data events for the reciever.
      Receiver_QoS_Event_Handler qos_event_handler (dgram_mcast_qos,
                                                    qos_session);

      // Decorate the above handler with QoS functionality. 
      ACE_QoS_Decorator qos_decorator (&qos_event_handler,
                                       qos_session);

      // Initialize the Decorator.
      if (qos_decorator.init () != 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "QoS Decorator init () failed.\n"),
                          -1);

      // Register the decorated Event Handler with the Reactor.
      if (ACE_Reactor::instance ()->register_handler (&qos_decorator,
                                                      ACE_Event_Handler::QOS_MASK |
                                                      ACE_Event_Handler::READ_MASK) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "Error in registering the Decorator with the Reactor\n"),
                          -1);

      
      // Start the event loop.
      ACE_DEBUG ((LM_DEBUG,
                  "Running the Event Loop ... \n"));
      
      ACE_Reactor::instance ()->run_event_loop ();
      
      ACE_DEBUG ((LM_DEBUG,
                  "(%P|%t) shutting down server logging daemon\n"));
    }
  else
    ACE_DEBUG ((LM_DEBUG,
                "Specify a -m option for multicast application\n"));
  return 0;
}