summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/Event/Mcast/Two_Way/application.cpp
blob: ce7c85cdd95f451ca48a0fd52360eaae6fa20694 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
// $Id$

#include "Constants.h"

#include "orbsvcs/Event_Utilities.h"
#include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
#include "orbsvcs/Event/ECG_UDP_Sender.h"
#include "orbsvcs/Event/ECG_UDP_Receiver.h"

#include "orbsvcs/RtecEventChannelAdminC.h"
#include "orbsvcs/RtecEventCommS.h"

#include "tao/ORB_Core.h"

#include "ace/Array_Base.h"
#include "ace/Get_Opt.h"
#include "ace/Reactor.h"
#include "ace/OS_NS_unistd.h"
#include "ace/os_include/os_netdb.h"

// Indicates whether this application is responsible for destroying
// the Event Channel it's using upon exit.
int destroy_ec_flag = 0;

/**
 * @class Heartbeat_Application
 *
 * @brief A simple application for testing federation of Event
 *        Channels via multicast.
 *
 *        NOTE: Contains platform-specific code (event data), i.e.,
 *              might not work cross-platform.
 *
 * This class acts both as a receiver and a supplier of HEARTBEAT events
 * to a multicast-federated Event Channel.  After sending a prespecified
 * number of heartbeat events, it prints out a summary about received
 * heartbeats and shuts down.
 */
class Heartbeat_Application :
  public POA_RtecEventComm::PushConsumer,
  public TAO_EC_Deactivated_Object
{
public:

  /// Constructor.
  Heartbeat_Application (void);

  /// Destructor.
  ~Heartbeat_Application (void);

  // Initializes the object: connects with EC as a supplier and a
  // consumer and registers with reactor for timeouts.  If init ()
  // completes successfully, shutdown () must be called when this
  // object is no longer needed, for proper resource cleanup.  (This
  // is normally done by handle_timeout() method, but if handle_timeout()
  // will not have a chance to execute, it is the responsibility of
  // the user.)
  void init (CORBA::ORB_var orb,
             RtecEventChannelAdmin::EventChannel_var ec
             ACE_ENV_ARG_DECL);

  // No-op if the object hasn't been fully initialized.  Otherwise,
  // deregister from reactor and poa, destroy ec or just disconnect from it
  // (based on <destroy_ec> flag), and shut down the orb.
  void shutdown (void);

  /// Send another heartbeat or, if we already sent/attempted the required
  /// number of heartbeats, perform shutdown().
  int handle_timeout (const ACE_Time_Value& tv,
                      const void* act);

  /// PushConsumer methods.
  //@{
  /// Update our <heartbeats_> database to reflect newly received heartbeats.
  virtual void push (const RtecEventComm::EventSet &events
                     ACE_ENV_ARG_DECL)
    ACE_THROW_SPEC((CORBA::SystemException));

  /// Initiate shutdown().
  virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL)
    ACE_THROW_SPEC((CORBA::SystemException));
  //@}

private:

  /**
   * @class Timeout_Handler
   *
   * @brief Helper class for receiving timeouts from Reactor.
   */
  class Timeout_Handler : public ACE_Event_Handler
  {
  public:
    /// Constructor.
    Timeout_Handler (Heartbeat_Application *recv);
    /// Reactor callback.
    virtual int handle_timeout (const ACE_Time_Value& tv,
                                const void* act);
  private:
    /// We callback to this object when a message arrives.
    Heartbeat_Application* receiver_;
  };

  /// Helpers.
  //@{
  /// Verify that arguments are not nil and store their values.
  int check_args (CORBA::ORB_var orb,
                  RtecEventChannelAdmin::EventChannel_var ec);
  /// Connects to EC as a supplier.
  void connect_as_supplier (ACE_ENV_SINGLE_ARG_DECL);
  /// Connects to EC as a consumer.  Activate with default POA.
  void connect_as_consumer (ACE_ENV_SINGLE_ARG_DECL);
  /// Call destroy() on the EC.  Does not propagate exceptions.
  void destroy_ec (void);
  /// Registers with orb's reactor for timeouts ocurring every 0.5
  /// seconds. Returns 0 on success, -1 on error.
  int register_for_timeouts (void);
  /// Deregister from reactor.
  void stop_timeouts (void);
  //@}

  /// Flag indicating whether this object has been fully initialized.
  int initialized_;

  /// Helper object for receiving timeouts from Reactor.
  Timeout_Handler timeout_handler_;

  /// Number of heartbeats we sent so far.
  size_t n_timeouts_;

  /// Info we keep on each HEARTBEAT source.
  typedef struct {
    pid_t pid;
    char hostname [MAXHOSTNAMELEN];
    int total;
  } HEARTBEAT_SOURCE_ENTRY;

  /// Stores info on all heartbeats we received so far.
  ACE_Array_Base<HEARTBEAT_SOURCE_ENTRY> heartbeats_;

  /// Our identity: pid followed by hostname.  We include this info into each
  /// heartbeat we send.
  char hostname_and_pid_ [MAXHOSTNAMELEN+11];

  /// ORB and EC pointers - to allow cleanup down the road.
  CORBA::ORB_var orb_;
  RtecEventChannelAdmin::EventChannel_var ec_;

  /// Consumer proxy which represents us in EC as a supplier.
  RtecEventChannelAdmin::ProxyPushConsumer_var consumer_;

  typedef TAO_EC_Auto_Command<TAO_ECG_UDP_Sender_Disconnect_Command>
  Supplier_Proxy_Disconnect;
  typedef TAO_EC_Auto_Command<TAO_ECG_UDP_Receiver_Disconnect_Command>
  Consumer_Proxy_Disconnect;

  /// Manages our connection to Supplier Proxy.
  Supplier_Proxy_Disconnect supplier_proxy_disconnect_;
  /// Manages our connection to Consumer Proxy.
  Consumer_Proxy_Disconnect consumer_proxy_disconnect_;
};
// **************************************************************************

Heartbeat_Application::Timeout_Handler::
Timeout_Handler (Heartbeat_Application* r)
  :  receiver_ (r)
{
}

int
Heartbeat_Application::Timeout_Handler::
handle_timeout (const ACE_Time_Value& tv,
                const void* act)
{
  return this->receiver_->handle_timeout (tv, act);
}

// **************************************************************************

Heartbeat_Application::Heartbeat_Application (void)
  : initialized_ (0)
  , timeout_handler_ (this)
  , n_timeouts_ (0)
  , orb_ ()
  , ec_ ()
  , consumer_ ()
  , supplier_proxy_disconnect_ ()
  , consumer_proxy_disconnect_ ()
{
}

Heartbeat_Application::~Heartbeat_Application (void)
{
}

int
Heartbeat_Application::check_args (CORBA::ORB_var orb,
                                   RtecEventChannelAdmin::EventChannel_var ec)
{
  if (CORBA::is_nil (ec.in ()))
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "%N (%l): Nil ec argument to "
                         "Heartbeat_Application::init\n"),
                        -1);
    }

  if (CORBA::is_nil (orb.in ()))
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "%N (%l): Nil orb argument to "
                         "Heartbeat_Application::init\n"),
                        -1);
    }

  this->ec_ = ec;
  this->orb_ = orb;

  return 0;
}

void
Heartbeat_Application::init (CORBA::ORB_var orb,
                             RtecEventChannelAdmin::EventChannel_var ec
                             ACE_ENV_ARG_DECL)
{
  // Verify arguments.
  if (this->check_args (orb, ec) == -1)
    {
      ACE_THROW (CORBA::INTERNAL ());
    }

  // Get hostname & process id, i.e., identity of this application.
  pid_t pid = ACE_OS::getpid ();
  ACE_OS::memcpy (this->hostname_and_pid_,
                  &pid,
                  sizeof (pid));

  if (gethostname (this->hostname_and_pid_ + sizeof (pid),
                   MAXHOSTNAMELEN)
      != 0)
    {
      ACE_ERROR ((LM_ERROR,
                  "Heartbeat_Application::init - "
                  "cannot get hostname\n"));
      ACE_THROW (CORBA::INTERNAL ());
    }

  // Connect to EC as a supplier.
  this->connect_as_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Connect to EC as a consumer.
  ACE_TRY
    {
      this->connect_as_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      this->consumer_proxy_disconnect_.execute ();
      ACE_RE_THROW;
    }
  ACE_ENDTRY;
  ACE_CHECK;

  // Register for reactor timeouts.
  if (this->register_for_timeouts () == -1)
    {
      this->consumer_proxy_disconnect_.execute ();
      this->supplier_proxy_disconnect_.execute ();
      this->deactivator_.deactivate ();
      ACE_THROW (CORBA::INTERNAL ());
    }

  this->initialized_ = 1;
}

int
Heartbeat_Application::register_for_timeouts (void)
{
  // Schedule timeout every 0.5 seconds, for sending heartbeat events.
  ACE_Time_Value timeout_interval (0, 500000);
  ACE_Reactor *reactor = this->orb_->orb_core ()->reactor ();
  if (!reactor
      || reactor->schedule_timer (&this->timeout_handler_, 0,
                                  timeout_interval,
                                  timeout_interval) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Heartbeat_Application::register_for_timeouts - "
                         "cannot schedule timer\n"),
                        -1);
    }

  return 0;
}

void
Heartbeat_Application::stop_timeouts (void)
{
  ACE_Reactor *reactor = this->orb_->orb_core ()->reactor ();
  if (!reactor
      || reactor->cancel_timer (&this->timeout_handler_) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "Heartbeat_Application::stop_timeouts - "
                  "cannot deregister from reactor.\n"));
    }
}

void
Heartbeat_Application::connect_as_supplier (ACE_ENV_SINGLE_ARG_DECL)
{
  // Obtain reference to SupplierAdmin.
  RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
    this->ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Obtain ProxyPushConsumer and connect this supplier.
  RtecEventChannelAdmin::ProxyPushConsumer_var proxy =
    supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
  Consumer_Proxy_Disconnect new_proxy_disconnect (proxy.in ());

  ACE_SupplierQOS_Factory qos;
  qos.insert (SOURCE_ID, HEARTBEAT, 0, 1);

  proxy->connect_push_supplier (RtecEventComm::PushSupplier::_nil (),
                                qos.get_SupplierQOS ()
                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Update resource managers.
  this->consumer_ = proxy._retn ();
  this->consumer_proxy_disconnect_.set_command (new_proxy_disconnect);
}

void
Heartbeat_Application::connect_as_consumer (ACE_ENV_SINGLE_ARG_DECL)
{
  // Activate with poa.
  RtecEventComm::PushConsumer_var consumer_ref;
  PortableServer::POA_var poa = this->_default_POA ();

  TAO_EC_Object_Deactivator deactivator;
  activate (consumer_ref,
            poa.in (),
            this,
            deactivator
            ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Obtain reference to ConsumerAdmin.
  RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
    this->ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Obtain ProxyPushSupplier..
  RtecEventChannelAdmin::ProxyPushSupplier_var proxy =
    consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
  Supplier_Proxy_Disconnect new_proxy_disconnect (proxy.in ());

  // Connect this consumer.
  ACE_ConsumerQOS_Factory qos;
  qos.start_disjunction_group (1);
  qos.insert_type (ACE_ES_EVENT_ANY, 0);
  proxy->connect_push_consumer (consumer_ref.in (),
                                qos.get_ConsumerQOS ()
                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Update resource managers.
  this->supplier_proxy_disconnect_.set_command (new_proxy_disconnect);
  this->set_deactivator (deactivator);
}

int
Heartbeat_Application::handle_timeout (const ACE_Time_Value&,
                                       const void*)
{
  ACE_TRY_NEW_ENV
    {
      if (this->n_timeouts_++ < HEARTBEATS_TO_SEND)
        {
          RtecEventComm::EventSet events (1);
          events.length (1);
          // Events travelling through gateways must have a ttl count of at
          // least 1!
          events[0].header.ttl = 1;
          events[0].header.type = HEARTBEAT;
          events[0].header.source = SOURCE_ID;

          // Store our hostname and process id in the data portion of
          // the event.
          events[0].data.payload.replace (MAXHOSTNAMELEN+11,
                                          MAXHOSTNAMELEN+11,
                                          (u_char *)this->hostname_and_pid_,
                                          0);

          this->consumer_->push (events ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      else
        // We already sent the required number of heartbeats.  Time to
        // shutdown this app.
        {
          this->shutdown ();
        }
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Suppressed the following exception in "
                           "Heartbeat_Application::handle_timeout:\n");
    }
  ACE_ENDTRY;
  return 0;
}

void
Heartbeat_Application::push (const RtecEventComm::EventSet &events
                             ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC((CORBA::SystemException))
{
  for (CORBA::ULong i = 0; i < events.length (); ++i)
    {
      // Figure out heartbeat source.
      const u_char * buffer = events[i].data.payload.get_buffer ();
      pid_t pid = *((pid_t*) buffer);
      char * host = (char*) buffer + sizeof (pid);

      // Update heartbeat database.
      int found = 0;
      size_t size = this->heartbeats_.size ();
      for (size_t j = 0; j < size; ++j)
        {
          if (this->heartbeats_[j].pid == pid
              && ACE_OS::strcmp (this->heartbeats_[j].hostname, host)
              == 0)
            {
              this->heartbeats_[j].total++;
              found = 1;
              break;
            }
        }
      // Make new entry in the database.
      if (!found)
        {
          if (this->heartbeats_.size (size + 1)
              == -1)
            {
              ACE_ERROR ((LM_ERROR,
                          "Unable to add new entry "
                          "to heartbeat database \n"));
              break;
            }

          this->heartbeats_[size].pid = pid;
          this->heartbeats_[size].total = 1;
          ACE_OS::memcpy (this->heartbeats_[size].hostname,
                          host,
                          ACE_OS::strlen (host) + 1);
        }
    }
}

void
Heartbeat_Application::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC((CORBA::SystemException))
{
  this->shutdown ();
}

void
Heartbeat_Application::destroy_ec (void)
{
  if (!CORBA::is_nil (this->ec_.in ()))
    {
      ACE_TRY_NEW_ENV
        {
          this->ec_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      ACE_CATCHANY
        {
          ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                               "Suppressed the following exception in "
                               "Application_Heartbeat::destroy_ec\n");
        }
      ACE_ENDTRY;

      this->ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
    }
}
void
Heartbeat_Application::shutdown (void)
{
  if (!this->initialized_)
    return;

  this->initialized_ = 0;

  // Deregister from Reactor.
  this->stop_timeouts ();

  // Disconnect from ECs as a consumer.
  this->supplier_proxy_disconnect_.execute ();
  // Disconnect from EC as a supplier.
  this->consumer_proxy_disconnect_.execute ();

  if (destroy_ec_flag)
    {
      this->destroy_ec ();
    }

  // Deregister from POA.
  this->deactivator_.deactivate ();

  // Print out heartbeats report.
  pid_t pid = ACE_OS::getpid ();
  char hostname[MAXHOSTNAMELEN + 1];
  if (gethostname (hostname, MAXHOSTNAMELEN) != 0)
    {
      ACE_ERROR ((LM_ERROR,
                  "Heartbeat_Application::shutdown - "
                  "cannot get hostname\n"));
      hostname[0] = '\0';
    }
  ACE_DEBUG ((LM_DEBUG,
              "%d@%s Received following heartbeats:\n",
              pid, hostname));
  for (size_t i = 0; i < this->heartbeats_.size (); ++i)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "Host %s, pid %d - total of %u\n",
                  this->heartbeats_[i].hostname,
                  this->heartbeats_[i].pid,
                  this->heartbeats_[i].total));
    }

  // Shutdown the ORB.
  ACE_TRY_NEW_ENV
    {
      this->orb_->shutdown (0 ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "The following exception occured in "
                           "Heartbeat_Application::shutdown:\n");
    }
  ACE_ENDTRY;
}

////////////////////////////////////////////////////////////
int
check_for_nil (CORBA::Object_ptr obj, const char *message)
{
  if (CORBA::is_nil (obj))
    ACE_ERROR_RETURN ((LM_ERROR,
                       "ERROR: Object reference <%s> is nil\n",
                       message),
                      -1);
  else
    return 0;
}

int
parse_args (int argc, char ** argv)
{
  ACE_Get_Opt get_opt (argc, argv, "d");
  int opt;

  while ((opt = get_opt ()) != EOF)
    {
      switch (opt)
        {
        case 'd':
          destroy_ec_flag = 1;
          break;

        case '?':
        default:
          ACE_DEBUG ((LM_DEBUG,
                      "Usage: %s "
                      "-d"
                      "\n",
                      argv[0]));
          return -1;
        }
    }

  return 0;
}

int
main (int argc, char *argv[])
{
  // We may want this to be alive beyond the next block.
  TAO_EC_Servant_Var<Heartbeat_Application> app;

  ACE_TRY_NEW_ENV
    {
      // Initialize ORB and POA, POA Manager, parse args.
      CORBA::ORB_var orb =
        CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (parse_args (argc, argv) == -1)
        return 1;

      CORBA::Object_var obj =
        orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      PortableServer::POA_var poa =
        PortableServer::POA::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      if (check_for_nil (poa.in (), "POA") == -1)
        return 1;

      PortableServer::POAManager_var manager =
        poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Obtain reference to EC.
      obj = orb->resolve_initial_references ("Event_Service" ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      RtecEventChannelAdmin::EventChannel_var ec =
        RtecEventChannelAdmin::EventChannel::_narrow (obj.in ()
                                                      ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      if (check_for_nil (ec.in (), "EC") == -1)
        return 1;

      // Init our application.
      app = new Heartbeat_Application;
      if (!app.in ())
        return 1;

      app->init (orb, ec ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Allow processing of CORBA requests.
      manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Receive events from EC.
      orb->run (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Exception in Heartbeat Application:");
      // Since there was an exception, application might not have had
      // a chance to shutdown.
      app->shutdown ();
      return 1;
    }
  ACE_ENDTRY;

  return 0;
}