path: root/TAO/tests/Bug_3531b_Regression/server.cpp
#include "ace/Get_Opt.h"
#include "ace/Global_Macros.h"
#include "ace/Task.h"
#include "tao/ORB_Core.h"
#include "tao/default_resource.h"
#include "tao/Leader_Follower.h"
#include "tao/LF_Event_Loop_Thread_Helper.h"
#include "tao/LF_Event.h"
#include "tao/Transport.h"
#include "ace/Task_T.h"
#include "tao/TAO_Export.h"
#include "ace/TP_Reactor.h"
#include <memory>

#if defined (ACE_HAS_THREADS)

int nthreads = 4;
bool debug = false;

// TEST_ASSERT is exactly the same as ACE_ASSERT except it is active
// for both debug and *release* builds.
#define TEST_ASSERT(X)                        \
  do { if(!(X)) { \
      ACE_ERROR ((LM_ERROR, ACE_TEXT ("TEST_ASSERT: (%P|%t) file %N, line %l assertion failed for '%C'.%a\n"), \
                  #X, -1));                       \
    } } while (0)

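// TSS_ASSERT checks the Leader/Follower bookkeeping for the calling
// thread: ELT is the expected event_loop_thread_ count and CLT the
// expected client_leader_thread_ count in the ORB core's TSS
// resources, while LA is whether the Leader_Follower should
// currently report a leader as available.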
#define TSS_ASSERT(TSS, LF, ELT, CLT, LA) \
  do { \
    TEST_ASSERT ((TSS->event_loop_thread_ == ELT)); \
    TEST_ASSERT ((TSS->client_leader_thread_ == CLT)); \
    TEST_ASSERT ((LF.leader_available () == LA)); \
  } while (0)

class Worker;

int
parse_args (int argc, ACE_TCHAR *argv[])
{
  ACE_Get_Opt get_opts (argc, argv, ACE_TEXT ("d"));
  int c;

  while ((c = get_opts ()) != -1)
    switch (c)
      {
      case 'd':
        debug = true;
        break;

      case '?':
      default:
        ACE_ERROR_RETURN ((LM_ERROR,
                           "usage:  %s "
                           "-d"
                           "\n",
                           argv [0]),
                          -1);
      }
  // Indicates successful parsing of the command line
  return 0;
}

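// Base class for the units of work sent to a Worker.  A Command is
// an ACE_Message_Block so it can be queued directly onto a Worker's
// message queue; execute() runs in the worker thread that dequeues it.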
class Command: public ACE_Message_Block
{
public:
  virtual int execute (Worker*) = 0;
};


//////////////////////////////////////////////////////////////////////
// NOTE: Do *NOT* put the same msg into the msg queue more than once.
// This will confuse the msg queue and result in it dropping messages
//////////////////////////////////////////////////////////////////////
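// Active object whose svc() loop dequeues Command blocks and executes
// them until a Shutdown command (or a queue error) stops it.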
class Worker: public ACE_Task<ACE_SYNCH>
{
public:
  Worker ()
    : shutdown_ (false)
  {}

  virtual int svc ();
  virtual int close (u_long = 0);
  virtual int put (ACE_Message_Block * mblk, ACE_Time_Value * tv = 0);
  int process_cmd ();
  void shutdown (bool do_shutdown);
  bool shutdown ();

private:
  bool shutdown_;
};

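// Thread-specific storage holding the Worker running in the current
// thread, so code executing inside a worker (e.g. Test_Reactor) can
// reach "its" Worker through the workers macro.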
ACE_TSS<Worker> *workers_p = 0;
#define workers (*workers_p)

int Worker::svc ()
{
  if (debug)
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Worker thread starting up.\n")));
  // Register this worker
  workers.ts_object (const_cast<Worker*> (this));
  int retval = 0;
  while (!shutdown_ && retval != -1)
    {
      retval = this->process_cmd ();
    }
  if (debug)
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Worker thread shutting down.\n")));
  return retval;
}

int Worker::close (u_long)
{
  // de-register this worker, otherwise the ACE_TSS will try to destroy it
  workers.ts_object (0);
  return 0;
}

int Worker::put (ACE_Message_Block * mblk, ACE_Time_Value * tv)
{
  return this->putq (mblk, tv);
}

int Worker::process_cmd ()
{
  ACE_Message_Block *mb = 0;
  if (this->getq (mb, 0) == -1)
    {
      ACE_ERROR ((LM_ERROR, ACE_TEXT ("Error calling getq: $!\n")));
      // Strangely, message queues return this instead of ETIME
      if (errno == EWOULDBLOCK || errno == ESHUTDOWN)
        return 0;
      return -1;
    }
  Command* cmd = dynamic_cast <Command*> (mb);
  ACE_ASSERT (cmd != 0);
  cmd->execute (this);
  cmd->release ();
  return 0;
}

void Worker::shutdown (bool do_shutdown)
{
  shutdown_ = do_shutdown;
}

bool Worker::shutdown ()
{
  return shutdown_;
}

class Test_Reactor: public ACE_TP_Reactor
{
public:
  Test_Reactor (size_t max_number_of_handles,
                  bool restart = false,
                  ACE_Sig_Handler *sh = 0,
                  ACE_Timer_Queue *tq = 0,
                  bool mask_signals = true,
                  int s_queue = ACE_Select_Reactor_Token::FIFO)
  : ACE_TP_Reactor(max_number_of_handles, restart, sh, tq, mask_signals, s_queue) {}


  // This is the method that the Leader_Follower object calls.
  virtual int handle_events (ACE_Time_Value * = 0)
  {
    if (TAO_debug_level > 10)
      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing Test_Reactor::handle_events\n")));
    // This is called by client leader threads.  Note, the loop here
    // glosses over the fact that the Leader_Follower code does not
    // work quite the way we want it to.  Namely, this logic:
    //  1 - client leader thread detects when there are event loop
    //  threads waiting
    //  2 - client leader wakes up event loop threads via broadcast
    //  3 - client leader temporarily gives up lock to allow event loop
    //  threads to take over leadership
    //  4 - client leader thread takes lock again and loops around to
    //  become follower
    // The problem is that the gap between 3 & 4 is often not long
    // enough for the event loop threads to get switched in and take
    // ownership of the lock, even though the client leader thread
    // does a thr_yield!
    // Thus this code, once shut down, will continually return and
    // give the leader follower multiple chances to hand off to an
    // event loop thread.  This is not ideal but it will have to do
    // until the leader follower code is fixed (if possible).
    while (!workers->shutdown())
      // call this thread's (worker's) process_cmd method
      workers->process_cmd ();
    return 0;
  }

  virtual int handle_events (ACE_Time_Value &)
  {
    return this->handle_events ();
  }
};

// Our own Resource_Factory for testing purposes.  This just returns
// our Test_Reactor to the Leader_Follower object via the ORB_Core.
class Test_Resource_Factory: public TAO_Default_Resource_Factory
{
public:
  Test_Resource_Factory ()
  {}

  virtual ACE_Reactor_Impl* allocate_reactor_impl () const
  {
    ACE_Reactor_Impl *impl = 0;
    ACE_NEW_RETURN (impl,
                    Test_Reactor (ACE::max_handles (),
                                    1,
                                    (ACE_Sig_Handler*)0,
                                    (ACE_Timer_Queue*)0,
                                    this->reactor_mask_signals_,
                                    ACE_Select_Reactor_Token::LIFO),
                    0);
    return impl;
  }

private:
};

// Force the export flag, otherwise Windows will complain
#define TAO_Test_Export ACE_Proper_Export_Flag

ACE_FACTORY_DEFINE (TAO_Test, Test_Resource_Factory)
ACE_STATIC_SVC_DEFINE (Test_Resource_Factory,
                       ACE_TEXT ("Resource_Factory"),
                       ACE_SVC_OBJ_T,
                       &ACE_SVC_NAME (Test_Resource_Factory),
                       ACE_Service_Type::DELETE_THIS
                       | ACE_Service_Type::DELETE_OBJ,
                       0)
ACE_STATIC_SVC_REQUIRE (Test_Resource_Factory);

int load_test_resources =
ACE_Service_Config::process_directive (ace_svc_desc_Test_Resource_Factory);

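// Minimal TAO_LF_Event used to drive the Leader/Follower wait loop.
// complete_event() moves the event to LFS_SUCCESS, which lets the
// thread blocked in wait_for_event() return.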
class Test_LF_Event: public TAO_LF_Event
{
public:
  Test_LF_Event()
  {}

  void complete_event (TAO_Leader_Follower &lf)
  {
    if (debug)
      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Completing event\n")));
    this->state_changed (TAO_LF_Event::LFS_SUCCESS, lf);
  }

protected:
  virtual bool successful_i () const
  {
    return this->state_ == TAO_LF_Event::LFS_SUCCESS;
  }

  virtual bool error_detected_i () const
  {
    return (this->state_ == TAO_LF_Event::LFS_FAILURE
            || this->state_ == TAO_LF_Event::LFS_TIMEOUT
            || this->state_ == TAO_LF_Event::LFS_CONNECTION_CLOSED);
  }
  virtual void state_changed_i (LFS_STATE new_state)
  {
    this->state_ = new_state;
  }

  virtual bool is_state_final () const
  {
    if (this->state_ == TAO_LF_Event::LFS_TIMEOUT ||
        this->state_ == TAO_LF_Event::LFS_FAILURE)
      return true;
    return false;
  }
};

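// Do-nothing transport.  wait_for_event() requires a TAO_Transport,
// but none of its I/O methods are ever exercised by this test.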
class Test_Transport : public TAO_Transport
{
public:
  Test_Transport (CORBA::ULong tag,
                  TAO_ORB_Core *orb_core)
    : TAO_Transport (tag, orb_core)
  {}

  virtual int send_message (TAO_OutputCDR &,
                            TAO_Stub * = 0,
                            TAO_ServerRequest * = 0,
                            TAO_Message_Semantics  = TAO_Message_Semantics (),
                            ACE_Time_Value * = 0)
  {
    return 0;
  }

  virtual ssize_t send (iovec *, int ,
                        size_t &,
                        const ACE_Time_Value * = 0)
  {
    return 0;
  }

  virtual ssize_t recv (char *,
                        size_t,
                        const ACE_Time_Value * = 0)
  {
    return 0;
  }

  virtual int messaging_init (CORBA::Octet,
                              CORBA::Octet)
  {
    return 0;
  }

  virtual ACE_Event_Handler * event_handler_i ()
  {
    return 0;
  }

protected:
  virtual TAO_Connection_Handler * connection_handler_i ()
  {
    return 0;
  }

  virtual int send_request (TAO_Stub *,
                            TAO_ORB_Core *,
                            TAO_OutputCDR &,
                            TAO_Message_Semantics,
                            ACE_Time_Value *)
  {
    return 0;
  }
};

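// Command that tells the worker which dequeues it to leave its
// current command-processing loop (svc(), Event_Loop_Thread or
// Test_Reactor::handle_events).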
class Shutdown: public Command
{
public:
  virtual int execute (Worker* worker)
  {
    if (debug)
      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Shutdown cmd\n")));
    worker->shutdown (true);
    return 0;
  }
};

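// Command that runs TSS_ASSERT in the worker thread's context with
// the expected event-loop-thread count, client-leader count and
// leader-available flag.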
class TSS_Assert: public Command
{
public:
  TSS_Assert (TAO_ORB_Core* orb_core,
              int elt_count,
              int clt_count,
              bool leader_available)
    : orb_core_ (orb_core),
      elt_count_ (elt_count),
      clt_count_ (clt_count),
      leader_available_ (leader_available)
  {}

  virtual int execute (Worker*)
  {
    if (debug)
      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing TSS_Assert(%d,%d,%d) cmd\n"),
                  elt_count_, clt_count_, leader_available_));
    TAO_Leader_Follower &leader_follower = orb_core_->leader_follower ();
    TAO_ORB_Core_TSS_Resources* tss = orb_core_->get_tss_resources ();
    TSS_ASSERT (tss, leader_follower,
                elt_count_, clt_count_, leader_available_);
    return 0;
  }
private:
  TAO_ORB_Core*        orb_core_;
  const int            elt_count_;
  const int            clt_count_;
  const bool           leader_available_;
};

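// Command that makes the worker call wait_for_event() on the
// Leader_Follower, i.e. block as a client (leader or follower)
// until the given event completes.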
class Wait_For_Event: public Command
{
public:
  Wait_For_Event (Test_LF_Event& event,
                  Test_Transport& transport,
                  TAO_Leader_Follower& lf)
    : event_ (event),
      transport_ (transport),
      lf_ (lf)
  {}
  virtual int execute (Worker*)
  {
    if (debug)
      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing Wait_For_Event cmd\n")));
    int retval = lf_.wait_for_event (&event_, &transport_, 0);
    // The worker has probably been shut down in order for the client
    // leader event loop to exit - reactivate the worker so it can
    // process msgs once we return
    workers->shutdown (false);
    return retval;
  }
private:
  Test_LF_Event&       event_;
  Test_Transport&      transport_;
  TAO_Leader_Follower& lf_;
};

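// Command used by synch_with_worker(): the main thread queues one of
// these and waits on its condition variable, which the worker signals
// once it has drained everything queued ahead of it.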
class Cond_Signal: public Command
{
public:
  Cond_Signal ()
    : lock_ (),
      cond_ (lock_),
      ref_count_ (2)
  {}

  virtual int execute (Worker*)
  {
    if (debug)
      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing Cond_Signal cmd\n")));
    ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->cond_.mutex (), 0);
    return this->cond_.signal ();
  }
  TAO_SYNCH_MUTEX& lock ()
  {
    return lock_;
  }
  ACE_Condition_Thread_Mutex& cond ()
  {
    return cond_;
  }
  virtual ACE_Message_Block *release ()
  {
    // We only release once both the main and worker threads are done
    // with this object - each signals this by calling this method.
    --this->ref_count_;
    if (this->ref_count_ == 0)
      return ACE_Message_Block::release ();
    return this;
  }

private:
  TAO_SYNCH_MUTEX lock_;
  ACE_Condition_Thread_Mutex cond_;
  int ref_count_;
};

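// Command that turns the worker into an event loop thread via
// TAO_LF_Event_Loop_Thread_Helper and keeps processing commands
// until a Shutdown arrives.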
class Event_Loop_Thread: public Command
{
public:
  Event_Loop_Thread(TAO_Leader_Follower& lf,
                    TAO_LF_Strategy& lf_strategy)
    : lf_ (lf), lf_strategy_ (lf_strategy)
  {}

  virtual int execute (Worker* worker)
  {
    if (debug)
      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing Event_Loop_Thread cmd\n")));
    TAO_LF_Event_Loop_Thread_Helper elt (lf_, lf_strategy_, 0);
    while (!worker->shutdown())
      worker->process_cmd ();
    // The worker has been shut down in order for this event loop
    // thread to exit - reactivate the worker so it can process msgs
    // once we return
    worker->shutdown (false);
    return 0;
  }
private:
  TAO_Leader_Follower& lf_;
  TAO_LF_Strategy&     lf_strategy_;
};

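// Command that makes the worker call set_upcall_thread() on the
// Leader_Follower, simulating the thread being dispatched for an
// upcall.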
class Set_Upcall_Thread: public Command
{
public:
  Set_Upcall_Thread (TAO_Leader_Follower& lf)
    : lf_ (lf)
  {}

  virtual int execute (Worker*)
  {
    if (debug)
      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing Set_Upcall_Thread cmd\n")));
    lf_.set_upcall_thread ();
    return 0;
  }

private:
  TAO_Leader_Follower& lf_;
};

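// Block the calling (main) thread until the worker has executed every
// command queued so far: queue a Cond_Signal and wait (up to one
// second) for the worker to signal it.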
void synch_with_worker (Worker& worker)
{
  // This object is released by the worker thread after it has
  // executed the cmd
  Cond_Signal* cond = new Cond_Signal;
  {
    ACE_GUARD (TAO_SYNCH_MUTEX, guard, cond->lock ());
    worker.put (cond);
    ACE_Time_Value tv (1, 0);
    tv += ACE_OS::gettimeofday ();
    TEST_ASSERT ((cond->cond ().wait (&tv) == 0));
  }
}

// 1 - Simple event loop thread test
void Test_1 (TAO_ORB_Core* orb_core)
{
  TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
  TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
  TAO_ORB_Core_TSS_Resources* tss = orb_core->get_tss_resources ();

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #1 - Simple Event Loop call\n")));

  TSS_ASSERT (tss, leader_follower, 0, 0, false);

  std::unique_ptr<TAO_LF_Event_Loop_Thread_Helper>
    elt (new TAO_LF_Event_Loop_Thread_Helper(leader_follower,
                                             lf_strategy,
                                             0));
  TSS_ASSERT (tss, leader_follower, 1, 0, true);

  elt.reset (0);
  TSS_ASSERT (tss, leader_follower, 0, 0, false);
}

// 2 - Nested event loop threads - no set_upcall_thread call
void Test_2 (TAO_ORB_Core* orb_core)
{
  TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
  TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
  TAO_ORB_Core_TSS_Resources* tss = orb_core->get_tss_resources ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #2 - 2 nested Event Loop calls\n")));

  TSS_ASSERT (tss, leader_follower, 0, 0, false);

  std::unique_ptr<TAO_LF_Event_Loop_Thread_Helper>
    elt1 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower,
                                              lf_strategy, 0));
  TSS_ASSERT (tss, leader_follower, 1, 0, true);

  std::unique_ptr<TAO_LF_Event_Loop_Thread_Helper>
    elt2 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower,
                                              lf_strategy, 0));
  TSS_ASSERT (tss, leader_follower, 2, 0, true);

  elt2.reset (0);
  TSS_ASSERT (tss, leader_follower, 1, 0, true);

  elt1.reset (0);
  TSS_ASSERT (tss, leader_follower, 0, 0, false);
}

// 3 - Nested event loop threads - with set_upcall_thread call
void Test_3 (TAO_ORB_Core* orb_core)
{
  TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
  TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
  TAO_ORB_Core_TSS_Resources* tss = orb_core->get_tss_resources ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #3 - 2 nested Event Loop calls with set_upcall_thread\n")));

  TSS_ASSERT (tss, leader_follower, 0, 0, false);

  std::unique_ptr<TAO_LF_Event_Loop_Thread_Helper>
    elt1 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower,
                                              lf_strategy, 0));
  TSS_ASSERT (tss, leader_follower, 1, 0, true);

  leader_follower.set_upcall_thread ();
  TSS_ASSERT (tss, leader_follower, 0, 0, false);

  std::unique_ptr<TAO_LF_Event_Loop_Thread_Helper>
    elt2 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower,
                                              lf_strategy, 0));
  TSS_ASSERT (tss, leader_follower, 1, 0, true);

  elt2.reset (0);
  TSS_ASSERT (tss, leader_follower, 0, 0, false);

  elt1.reset (0);
  TSS_ASSERT (tss, leader_follower, 0, 0, false);
}

// 4 - client leader thread
void Test_4 (TAO_ORB_Core* orb_core)
{
  TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #4 - Simple Client Leader thread\n")));

  // Activate a thread
  Worker wrk1;
  wrk1.activate ();

  // Test initial conditions
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // Have the thread wait on an event
  Test_LF_Event event;
  Test_Transport transport (0, orb_core);
  wrk1.put (new Wait_For_Event(event, transport, leader_follower));

  // The thread is still waiting on the event and thus should
  // now be a client-leader thread
  wrk1.put (new TSS_Assert(orb_core, 0, 1, true));

  // Synchronise with the thread before we complete its event
  synch_with_worker (wrk1);
  // Complete the event
  event.complete_event (leader_follower);

  // The thread is still inside handle_events - shutdown the
  // event processing
  wrk1.put (new Shutdown);

  // The thread should now return from being a client leader
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  wrk1.put (new Shutdown);
  wrk1.wait ();
}

// 5 - nested client leader thread
void Test_5 (TAO_ORB_Core* orb_core)
{
  TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #5 - 2 nested Client Leader calls\n")));

  // Activate a thread
  Worker wrk1;
  wrk1.activate ();

  // Test initial conditions
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // Have the thread wait on an event
  Test_LF_Event event;
  Test_Transport transport (0, orb_core);
  wrk1.put (new Wait_For_Event(event, transport, leader_follower));

  // The thread is still waiting on the event and thus should
  // now be a client-leader thread
  wrk1.put (new TSS_Assert(orb_core, 0, 1, true));

  // Wait for another event
  Test_LF_Event event2;
  wrk1.put (new Wait_For_Event(event2, transport, leader_follower));

  // The thread is still waiting on the event and thus should
  // now be a client-leader thread
  wrk1.put (new TSS_Assert(orb_core, 0, 2, true));

  // Synchronise with the thread before we complete its event
  synch_with_worker (wrk1);

  // Complete the first event - nothing should happen
  event.complete_event (leader_follower);

  wrk1.put (new TSS_Assert(orb_core, 0, 2, true));

  // Complete the second event - everything should unwind
  synch_with_worker (wrk1);
  event2.complete_event (leader_follower);

  // The thread is still inside handle_events - shutdown the
  // event processing for the inner client leader
  wrk1.put (new Shutdown);

  wrk1.put (new TSS_Assert(orb_core, 0, 1, true));

  // The thread is now in the handle_events for the outer
  // client-leader - the event is already complete so just
  // shutdown the cmd processing.
  wrk1.put (new Shutdown);

  // We should now be back at our initial state.
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  wrk1.put (new Shutdown);
  wrk1.wait ();
}

// 6 - nested client leader thread with set_upcall_thread
void Test_6 (TAO_ORB_Core* orb_core)
{
  TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #6 - 2 nested Client Leader calls with set_upcall_thread\n")));

  // Activate a thread
  Worker wrk1;
  wrk1.activate ();

  // Test initial conditions
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // Have the thread wait on an event
  Test_LF_Event event;
  Test_Transport transport (0, orb_core);
  wrk1.put (new Wait_For_Event(event, transport, leader_follower));

  // The thread is still waiting on the event and thus should
  // now be a client-leader thread
  wrk1.put (new TSS_Assert(orb_core, 0, 1, true));

  // Call set_upcall_thread
  wrk1.put (new Set_Upcall_Thread (leader_follower));
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // Wait for another event
  Test_LF_Event event2;
  wrk1.put (new Wait_For_Event(event2, transport, leader_follower));

  // The thread is still waiting on the event and thus should
  // now be a client-leader thread
  wrk1.put (new TSS_Assert(orb_core, 0, 1, true));

  // Synchronise with the thread before we complete its event
  synch_with_worker (wrk1);

  // Complete the first event - nothing should happen
  event.complete_event (leader_follower);

  wrk1.put (new TSS_Assert(orb_core, 0, 1, true));

  // Complete the second event - everything should unwind
  synch_with_worker (wrk1);
  event2.complete_event (leader_follower);

  // The thread is still inside handle_events - shutdown the
  // event processing for the inner client leader
  wrk1.put (new Shutdown);

  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // The thread is now in the handle_events for the outer
  // client-leader - the event is already complete so just
  // shutdown the cmd processing.
  wrk1.put (new Shutdown);

  // We should now be back at our initial state.
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  wrk1.put (new Shutdown);
  wrk1.wait ();
}

// 7 - 2 client leader threads with set_upcall_thread
void Test_7 (TAO_ORB_Core* orb_core)
{
  TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #7 - Client Leader yields to another client thread\n")));

  // Activate a thread
  Worker wrk1;
  wrk1.activate ();

  // Test initial conditions
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // Have the thread wait on an event
  Test_LF_Event event;
  Test_Transport transport (0, orb_core);
  wrk1.put (new Wait_For_Event(event, transport, leader_follower));

  // The thread is still waiting on the event and thus should
  // now be a client-leader thread
  wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
  // Wait for the first thread to be the client leader before we start
  // the second thread
  synch_with_worker (wrk1);

  // Create another worker and have it do the same
  Worker wrk2;
  wrk2.activate ();
  wrk2.put (new TSS_Assert(orb_core, 0, 0, true));
  // Make sure this test is complete before the Set_Upcall_Thread below
  synch_with_worker (wrk2);
  Test_LF_Event event2;
  wrk2.put (new Wait_For_Event(event2, transport, leader_follower));
  // Note, we can't test the new thread here - it is blocked waiting
  // on the follower cond var
  // wrk2.put (new TSS_Assert(orb_core, 0, 1, true));

  // Call set_upcall_thread on the first thread
  wrk1.put (new Set_Upcall_Thread (leader_follower));
  // Our second thread should now be the client leader and the first
  // thread should not.  Note, we need to first synchronise with
  // thread 2 (to make sure it is in handle_events) to avoid race
  // conditions.
  synch_with_worker (wrk2);
  wrk2.put (new TSS_Assert(orb_core, 0, 1, true));
  wrk1.put (new TSS_Assert(orb_core, 0, 0, true));

  // We should now be able to shutdown the first thread - have it
  // return from handle_events and complete its event.  If it has to
  // wait it will just go back to being a follower
  wrk1.put (new Shutdown);
  event.complete_event (leader_follower);
  synch_with_worker (wrk1);
  wrk1.put (new TSS_Assert(orb_core, 0, 0, true));
  // We can now shut down the thread
  wrk1.put (new Shutdown);
  wrk1.wait ();

  // Now shut down the second thread
  event2.complete_event (leader_follower);
  wrk2.put (new Shutdown);
  synch_with_worker (wrk2);
  wrk2.put (new TSS_Assert(orb_core, 0, 0, false));
  wrk2.put (new Shutdown);
  wrk2.wait ();
}

// 8 - client becomes leader when event loop thread dispatched
void Test_8 (TAO_ORB_Core* orb_core)
{
  TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
  TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #8 - client becomes leader when event thread dispatched\n")));

  // Activate a thread
  Worker wrk1;
  wrk1.activate ();

  // Test initial conditions
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // Have the thread become an event loop thread
  wrk1.put (new Event_Loop_Thread (leader_follower,
                                   lf_strategy));
  wrk1.put (new TSS_Assert(orb_core, 1, 0, true));

  // Before we start the next thread synchronise with the first
  synch_with_worker (wrk1);

  // Start another thread and have it wait on an event
  Worker wrk2;
  wrk2.activate ();
  wrk2.put (new TSS_Assert(orb_core, 0, 0, true));
  Test_LF_Event event;
  Test_Transport transport (0, orb_core);
  synch_with_worker (wrk2);
  wrk2.put (new Wait_For_Event(event, transport, leader_follower));

  // The new thread is a follower and thus is waiting on the follower
  // cond var - we can't test this other than to check if the leader
  // follower has clients, however, because we can't synchronise with
  // that thread such a test would contain a race condition.

  // Now dispatch the event loop thread by having it call set_upcall_thread
  wrk1.put (new Set_Upcall_Thread (leader_follower));

  // the first worker should have given up leadership and the second
  // thread should have assumed leadership.  We have to synchronise
  // with both threads before we can test anything, otherwise we could
  // catch the window where there is no leader.
  synch_with_worker (wrk1);
  synch_with_worker (wrk2);
  wrk1.put (new TSS_Assert (orb_core, 0, 0, true));
  wrk2.put (new TSS_Assert (orb_core, 0, 1, true));

  // OK, now shut everything down - first the event loop thread
  wrk1.put (new Shutdown);
  wrk1.put (new TSS_Assert (orb_core, 0, 0, true));
  wrk1.put (new Shutdown);
  wrk1.wait ();

  // Now the client thread
  wrk2.put (new TSS_Assert (orb_core, 0, 1, true));
  synch_with_worker (wrk2);
  event.complete_event (leader_follower);
  wrk2.put (new Shutdown);
  wrk2.put (new TSS_Assert (orb_core, 0, 0, false));
  wrk2.put (new Shutdown);
  wrk2.wait ();
}

// 9 - client leader thread then event loop thread
void Test_9 (TAO_ORB_Core* orb_core)
{
  TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
  TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #9 - Client Leader thread yields to Event Loop thread\n")));

  // Activate a thread
  Worker wrk1;
  wrk1.activate ();

  // Test initial conditions
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // Have the thread wait on an event
  Test_LF_Event event;
  Test_Transport transport (0, orb_core);
  wrk1.put (new Wait_For_Event(event, transport, leader_follower));

  // The thread is still waiting on the event and thus should
  // now be a client-leader thread
  wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
  // We now need to synchronise with the worker, to make sure it
  // has processed all the cmds we have sent it
  synch_with_worker (wrk1);

  // create an event loop thread - this means a new worker
  Worker wrk2;
  wrk2.activate ();
  wrk2.put (new Event_Loop_Thread (leader_follower,
                                   lf_strategy));

  // Unfortunately there is no way to test if the event loop thread is
  // where we expect it to be (the
  // wait_for_client_leader_to_complete() method).  The only thing we
  // could check is the event_loop_threads_waiting_ count, however,
  // that is private to the TAO_Leader_Follower class.

  // We need to get the client leader thread to return from
  // process_cmd() and allow it to surrender leadership to the waiting
  // event loop thread - send it a shutdown.  The TAO_Leader_Follower
  // code may call handle_events a few more times, however, since the
  // cmd processing is shutdown (and won't be reactivated until the
  // event is complete) handle_events will just return.
  wrk1.put (new Shutdown);

  // Now test the new event loop thread
  wrk2.put (new TSS_Assert(orb_core, 1, 0, true));
  // Wait until the event loop thread is running before we test
  // the client thread
  synch_with_worker (wrk2);

  // We can't test the client thread either - it is blocked in a call
  // to the event's cond var's wait() method.  All we can do is
  // complete the event, which will signal the cond var
  event.complete_event (leader_follower);

  // The client thread should return from wait_for_event
  wrk1.put (new TSS_Assert(orb_core, 0, 0, true));
  // And the event loop thread should still be 'running'
  wrk2.put (new TSS_Assert(orb_core, 1, 0, true));

  // Some other misc checks
  synch_with_worker (wrk1);
  TEST_ASSERT ((leader_follower.has_clients () == 0));

  // OK, let's shut everything down now - the event loop thread
  // requires two shutdown cmds, one to exit the event loop thread cmd
  // and the second to exit the main cmd processor
  wrk2.put (new Shutdown);
  // Incidentally, there is now no leader
  wrk2.put (new TSS_Assert(orb_core, 0, 0, false));
  wrk2.put (new Shutdown);
  wrk2.wait ();

  // Shutdown the other worker
  wrk1.put (new Shutdown);
  wrk1.wait ();
}

// 10 - ET1437460 (second problem)
void Test_10 (TAO_ORB_Core* orb_core )
{
  TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
  TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #10 - ET1437460\n")));

  // This scenario involves:
  //  - an event loop thread
  //  - which calls set_upcall_thread
  //  - then becomes a client leader
  //  - is dispatched and then becomes a client leader again
  //  (without calling set_upcall_thread)
  //  - calls set_upcall_thread
  //  - unwinds

  // Originally this caused the leaders_ member to get set to -1
  // (the inner client leader still decremented leaders_ even
  // though set_upcall_thread was called)

  // Activate a thread
  Worker wrk1;
  wrk1.activate ();

  // Test initial conditions
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // Have the thread become an event loop thread
  wrk1.put (new Event_Loop_Thread (leader_follower,
                                   lf_strategy));

  // The thread should be an event loop thread
  wrk1.put (new TSS_Assert(orb_core, 1, 0, true));

  // call set_upcall_thread
  wrk1.put (new Set_Upcall_Thread (leader_follower));

  // The thread should no longer be an event loop thread
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // Have the thread wait on an event
  Test_LF_Event event;
  Test_Transport transport (0, orb_core);
  wrk1.put (new Wait_For_Event(event, transport, leader_follower));

  // The thread is still waiting on the event and thus should
  // now be a client-leader thread
  wrk1.put (new TSS_Assert(orb_core, 0, 1, true));

  // Have the thread wait on another event
  Test_LF_Event event2;
  wrk1.put (new Wait_For_Event(event2, transport, leader_follower));

  // The thread is still waiting on the event and thus should now be a
  // client-leader thread (again)
  wrk1.put (new TSS_Assert(orb_core, 0, 2, true));

  // Call set_upcall_thread
  wrk1.put (new Set_Upcall_Thread(leader_follower));

  // We now need to synchronise with the worker, to make sure it
  // has processed all the cmds we have sent it
  synch_with_worker (wrk1);

  // Now, complete the events, and then shutdown the cmd event loop
  event.complete_event (leader_follower);
  event2.complete_event (leader_follower);
  wrk1.put (new Shutdown);

  // The inner client has returned
  wrk1.put (new TSS_Assert(orb_core, 0, 1, true));

  // Shut down the outer client thread
  wrk1.put (new Shutdown);

  // We should be back to the initial state
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // Now shutdown the event loop thread
  wrk1.put (new Shutdown);
  wrk1.put (new TSS_Assert(orb_core, 0, 0, false));

  // Shutdown the other worker
  wrk1.put (new Shutdown);
  wrk1.wait ();
}


int
ACE_TMAIN(int argc, ACE_TCHAR *argv[])
{
  // Scope the TSS holder within main so we're certain it gets
  // destroyed before the ACE Object Manager.
  ACE_TSS<Worker> workers_;
  // provide global access
  workers_p = &workers_;

  try
    {
      CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
      if (parse_args (argc, argv) != 0)
        return 1;

      // Make sure the reactor is initialised in the leader_follower
      ACE_Reactor* reactor = orb->orb_core ()->leader_follower ().reactor ();
      TEST_ASSERT ((reactor != 0));

      // Ready to go
      Test_1 (orb->orb_core ());
      Test_2 (orb->orb_core ());
      Test_3 (orb->orb_core ());
      Test_4 (orb->orb_core ());
      Test_5 (orb->orb_core ());
      Test_6 (orb->orb_core ());
      Test_7 (orb->orb_core ());
      Test_8 (orb->orb_core ());
      Test_9 (orb->orb_core ());
      Test_10 (orb->orb_core ());

      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Testing complete.\n")));

      // cleanup
      orb->destroy ();
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("Exception caught:");
      return 1;
    }

  return 0;
}

#else

int
ACE_TMAIN(int /*argc*/, ACE_TCHAR * /*argv*/ [])
{
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("This test only makes sense in an MT build.\n")));

  return 0;
}

#endif // ACE_HAS_THREADS

// ****************************************************************