summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/examples/Notify/ThreadPool/Supplier.cpp
blob: d58430352d45d83494dc42591783882eeb20692f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#include "Supplier.h"



#include "tao/ORB_Core.h"

// Build a supplier bound to the given ORB objects.  The counters start from
// built-in defaults (2 expected consumers, 10 events, no proxy thread pool);
// init () overwrites them with the caller's values.
TAO_Notify_ThreadPool_Supplier::TAO_Notify_ThreadPool_Supplier (
    TAO_Notify_ORB_Objects& orb_objects)
  : orb_objects_ (orb_objects),
    proxy_consumer_id_ (0),
    expected_consumer_count_ (2),
    consumers_connected_ (lock_),   // condition variable tied to lock_
    consumer_count_ (0),
    max_events_ (10),
    proxy_consumer_thread_count_ (0)
{
  // Nothing else to do: all state is set up in the initializer list.
}

// Destructor: performs no explicit cleanup; members release themselves.
TAO_Notify_ThreadPool_Supplier::~TAO_Notify_ThreadPool_Supplier ()
{
  // Intentionally empty.
}

// Store the run parameters and connect this supplier to the channel.
//
// admin                       - supplier admin used to obtain the proxy consumer.
// expected_consumer_count     - number of subscription notifications run ()
//                               waits for before sending.
// max_events                  - number of send rounds performed by run ().
// proxy_consumer_thread_count - when non-zero, connect () requests a proxy
//                               with a thread-pool QoS of this many threads.
void
TAO_Notify_ThreadPool_Supplier::init (CosNotifyChannelAdmin::SupplierAdmin_var& admin,
                                      int expected_consumer_count,
                                      int max_events,
                                      int proxy_consumer_thread_count)
{
  // Cache the configuration before connecting; connect () reads
  // admin_ and proxy_consumer_thread_count_.
  this->proxy_consumer_thread_count_ = proxy_consumer_thread_count;
  this->max_events_ = max_events;
  this->expected_consumer_count_ = expected_consumer_count;
  this->admin_ = admin;

  // Everything is in place: hook up to the proxy consumer.
  this->connect ();
}

// Main body of the supplier: block until the expected number of consumers
// has been reported through subscription_change (), push max_events_ rounds
// of events (one event per expected consumer in each round), then
// disconnect, deactivate this servant and shut the ORB down.
void
TAO_Notify_ThreadPool_Supplier::run (void)
{
  {
    // lock_ protects consumer_count_ and is the mutex that
    // consumers_connected_ was constructed with.
    ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);

    ACE_DEBUG ((LM_DEBUG, "(%P, %t) Waiting for %d consumers to connect...\n", this->expected_consumer_count_-1));

    // Wait till the consumers are ready to go.  The predicate is '<' rather
    // than '!=' so that a count which has already passed the expected value
    // does not leave this thread waiting for a signal that never comes.
    while (this->consumer_count_ < this->expected_consumer_count_)
      this->consumers_connected_.wait ();
  }

  ACE_DEBUG ((LM_DEBUG,
              "(%P, %t) Supplier is sending an events...\n"));

  // Send events to each consumer: event_[j] carries the event type that
  // was recorded for the j-th subscription notification.
  for (int i = 0; i < this->max_events_; ++i)
    {
      for (int j = 0; j < this->expected_consumer_count_; ++j)
        {
          // send the event
          this->send_event (this->event_[j]);
        }
    }

  // Disconnect from the EC
  this->disconnect ();

  // Deactivate this object.
  this->deactivate ();

  // we're done. shutdown the ORB to exit the process.
  this->orb_objects_.orb_->shutdown (1);
}

// Activate this servant, obtain a structured proxy push consumer from the
// supplier admin and connect to it.
//
// When proxy_consumer_thread_count_ is non-zero the proxy is requested
// through the NotifyExt extended admin interface with a ThreadPool QoS
// property; otherwise the standard admin operation is used.  The id of the
// obtained proxy is stored in proxy_consumer_id_.
void
TAO_Notify_ThreadPool_Supplier::connect (void)
{
  // Activate the supplier object.
  CosNotifyComm::StructuredPushSupplier_var objref = this->_this ();

  CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer;

  if (this->proxy_consumer_thread_count_ != 0)
    {
      // Narrow to the extended interface.
      NotifyExt::SupplierAdmin_var admin_ext = NotifyExt::SupplierAdmin::_narrow (this->admin_.in ());

      // The narrow yields nil when the admin does not support the extended
      // interface; check it like every other reference in this function
      // instead of dereferencing a nil reference below.
      ACE_ASSERT (!CORBA::is_nil (admin_ext.in ()));

      // Only the thread count is configurable here; every other field is 0.
      // NOTE(review): the position of the thread count relies on the field
      // order of NotifyExt::ThreadPoolParams -- confirm against the IDL.
      NotifyExt::ThreadPoolParams tp_params = { NotifyExt::CLIENT_PROPAGATED, 0,
                                                0, static_cast<CORBA::ULong> (this->proxy_consumer_thread_count_),
                                                0, 0, 0, 0, 0 };

      // A single QoS property carrying the thread pool parameters.
      CosNotification::QoSProperties qos (1);
      qos.length (1);
      qos[0].name = CORBA::string_dup (NotifyExt::ThreadPool);
      qos[0].value <<= tp_params;

      // Obtain the proxy. The QoS is applied to the POA in which the Proxy is hosted.
      proxyconsumer = admin_ext->obtain_notification_push_consumer_with_qos (CosNotifyChannelAdmin::STRUCTURED_EVENT
                                                                                   , proxy_consumer_id_, qos);
    }
  else
    {
      // Obtain the proxy.
      proxyconsumer = this->admin_->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT
                                                                       , proxy_consumer_id_);
    }

  ACE_ASSERT (!CORBA::is_nil (proxyconsumer.in ()));

  // narrow
  this->proxy_consumer_ =
    CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer.in ());

  ACE_ASSERT (!CORBA::is_nil (proxy_consumer_.in ()));

  // connect to the proxyconsumer.
  proxy_consumer_->connect_structured_push_supplier (objref.in ());

  ACE_DEBUG ((LM_DEBUG, "(%P,%t) Created Supplier %d with %d threads at the ProxyConsumer\n", proxy_consumer_id_,
              this->proxy_consumer_thread_count_));
}

void
TAO_Notify_ThreadPool_Supplier::disconnect (void)
{
  ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));

  this->proxy_consumer_->disconnect_structured_push_consumer();
}

void
TAO_Notify_ThreadPool_Supplier::deactivate (void)
{
  PortableServer::POA_var poa (this->_default_POA ());

  PortableServer::ObjectId_var id (poa->servant_to_id (this));

  poa->deactivate_object (id.in());
}

// Callback invoked when the set of subscribed event types changes.
//
// Each notification that adds at least one type is counted as one connected
// consumer: the first added type is stored in the header of the next free
// event_ slot, and the thread blocked in run () is signalled once
// expected_consumer_count_ notifications have been recorded.  Removed types
// are ignored.
//
// added - event types that were newly subscribed to; only added[0] is used.
void
TAO_Notify_ThreadPool_Supplier::subscription_change (const CosNotification::EventTypeSeq & added,
                                      const CosNotification::EventTypeSeq & /*removed */)
{
  // lock_ serialises access to consumer_count_ and event_.
  ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);

  // Count the number of consumers connect and signal the supplier thread when the expected count have connected.
  // Only 1 consumer connects at a time.
  if (added.length () > 0)
    {
      // run () only ever sends event_[0 .. expected_consumer_count_ - 1].
      // Recording further notifications would index event_ past the slots
      // that are used and push the counter beyond the value the signal
      // below tests for, so surplus notifications are ignored.
      if (this->consumer_count_ >= this->expected_consumer_count_)
        {
          ACE_DEBUG ((LM_DEBUG, "(%P,%t) Ignoring subscription change, %d consumers already recorded\n",
                      this->consumer_count_));
          return;
        }

      // Set the domain and type names in the event's fixed header.
      this->event_[consumer_count_].header.fixed_header.event_type.domain_name = CORBA::string_dup(added[0].domain_name);
      this->event_[consumer_count_].header.fixed_header.event_type.type_name = CORBA::string_dup(added[0].type_name);

      ++this->consumer_count_;

      ACE_DEBUG ((LM_DEBUG, "(%P,%t) Received Type %d: (%s)\n", this->consumer_count_, added[0].type_name.in ()));

      // Wake the supplier thread waiting in run () once everyone is here.
      if (this->consumer_count_ == this->expected_consumer_count_)
        this->consumers_connected_.signal ();
    }
}

// Push one structured event to the connected proxy consumer.
//
// event - the structured event to deliver.
void
TAO_Notify_ThreadPool_Supplier::send_event (const CosNotification::StructuredEvent& event)
{
  // Sending is only valid after connect () has obtained the proxy.
  ACE_ASSERT (!CORBA::is_nil (proxy_consumer_.in ()));

  this->proxy_consumer_->push_structured_event (event);
}

// Disconnect callback of the structured push supplier interface:
// responds by deactivating this servant.
void
TAO_Notify_ThreadPool_Supplier::disconnect_structured_push_supplier (void)
{
  // Take this servant out of its POA.
  this->deactivate ();
}