summaryrefslogtreecommitdiff
path: root/trunk/TAO/docs/tutorials/Quoter/RTCORBA/docs/distributor.html
blob: 8738fbe78a02f33d752455490cedae44129667e0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
<!-- $Id$ -->

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
  <title>Stock Distributor implementation for the Stock Quoter Publisher/Subscriber Real-time CORBA Service</title>
</head>

<body
 text = "#000000"
 link = "#000fff"
 vLink= "#ff0f0f"
 aLink = "#0000ff"
 bgColor = "#ffffff">

<hr><h2>Stock Distributor implementation for the Stock Quoter Publisher/Subscriber Real-time CORBA Service</h2><hr>

<h3>Implementation of StockQuoter interface</h3>

This interface is used by brokers to get detailed stock information. <P>

The Stock_StockQuoter_i class is generated automatically by the IDL compiler (using the -GI flag),
which is a subclass of POA_Stock::StockQuoter class. <P>

<h4>Implementing the get_stock_info () member function</h4>

<li>Call the get_stock_info () function of Stock_Database instance.</li>

<PRE>
    Stock::StockInfo *stock = STOCK_DATABASE->get_stock_info (stock_name);
</PRE>

<hr><h3>Implementation of StockDistributorHome interface</h3>

This interface is used to register the necessary factories and mappings with the specified orb and 
create StockDistributor objects. <P>

The Stock_StockDistributorHome_i class is generated automatically by the IDL compiler (using the -GI flag),
which is a subclass of POA_Stock::StockDistributorHome class. <P>

This class is also a subclass of ACE_Event_Handler, which can be used as an event handler. <P>

<h4>Implementing the Constructor</h4>

The main steps of this function are described as follows: <P>

<li>Register this class as an event handler with the ORB to catch ctrl-c from the command line. </li>

<PRE>
    if (orb_->orb_core ()->reactor ()->register_handler (SIGINT, this) == -1)
      ACE_DEBUG ((LM_DEBUG, "ERROR: Failed to register as a signal handler: %p\n", 
                  "register_handler\n"));
</PRE>

<li>Register the necessary factories and mappings with the specified orb.</li>

<PRE>
    Stock::StockName_init *stockname_factory = new Stock::StockName_init;
    orb->register_value_factory (stockname_factory->tao_repository_id (),
                                 stockname_factory
                                 ACE_ENV_ARG_PARAMETER);

    Stock::Cookie_init *cookie_factory = new Stock::Cookie_init;
    orb->register_value_factory (cookie_factory->tao_repository_id (),
                                 cookie_factory
                                 ACE_ENV_ARG_PARAMETER);

    Stock_PriorityMapping::register_mapping (orb);
</PRE>

<li>Initialize the Stock database.</li>

<PRE>
    STOCK_DATABASE->activate (THR_NEW_LWP | THR_JOINABLE, 1);
</PRE>

<li>Create a CORBA::PolicyList for the child POA.</li>

This step include several sub-stpes:
<OL>
<li>    Get a reference to the RTORB.</li>
<li>    Initialize a CORBA::PolicyList object.</li>
<li>    Create a SERVER_DECLARED priority model policy and add it into the former CORBA::PolicyList object.</li>
<li>    Create a threadpool with lanes for the distributor. Since the brokers have various priorities,
create a lane for each priority.</li>
<li>    Create a thread pool policy using the former threadpool and add it into the former CORBA::PolicyList object.</li>
<li>    Create a child POA using the former CORBA::PolicyList and narrow it to RTPOA.</li>
</OL>

<PRE>
    CORBA::Object_var obj = orb_->resolve_initial_references ("RTORB");
    RTCORBA::RTORB_var rt_orb = RTCORBA::RTORB::_narrow (obj.in ());
  
    TAO::Utils::PolicyList_Destroyer policies (2);
    policies.length (2);
  
    policies[0] =
      rt_orb->create_priority_model_policy (RTCORBA::SERVER_DECLARED,
                                            Stock::Priority_Mapping::VERY_LOW);
  
    RTCORBA::ThreadpoolLanes lanes (5); lanes.length (5);
  
    for (CORBA::ULong i = 0; i < lanes.length (); ++i) 
      {
        lanes[i].lane_priority = static_cast<RTCORBA::Priority> (i);
        lanes[i].static_threads = 5;
        lanes[i].dynamic_threads = 0;
      }
  
    RTCORBA::ThreadpoolId threadpool_id = 
      rt_orb->create_threadpool_with_lanes (1024*1024,
                                            lanes,
                                            false, false, 0, 0);
  
    policies[1] = rt_orb->create_threadpool_policy (threadpool_id);

    PortableServer::POA_var poa = this->_default_POA ();
    PortableServer::POAManager_var poa_mgr = poa->the_POAManager ();
  
    PortableServer::POA_var child_poa =
      poa->create_POA ("StockDistributor_POA", 
                       poa_mgr.in (),
                       policies);
  
    this->rt_poa_ = RTPortableServer::POA::_narrow (child_poa.in ());
</PRE>

<li>Create the initial distributor reference. </li>

<PRE>
    this->create_distributor ();
</PRE>

<h4>Implementing the create_distributor () member function</h4>

<li>Create a new instance of the StockDistributor_i. Then activate it under the located POA. 
This will cause the object to have the CLIENT_PROPAGATED policies. </li>

<PRE>
    StockDistributor_i *servant = new StockDistributor_i (this->rt_poa_.in ());
    PortableServer::ServantBase_var distributor_owner_transfer = servant;
    this->dist_id_ = this->rt_poa_->activate_object (servant);
</PRE>

<h4>Implementing the create () member function</h4>

<li>Get and narrow the StockDistributor object reference and return it. </li>

<PRE>
    obj = this->rt_poa_->id_to_reference (this->dist_id_.in ());
    return Stock::StockDistributor::_narrow (obj.in ());
</PRE>

<h4>Implementing the handle_signal () member function</h4>

<li>Get and shutdown the distributor object and the orb used by it. </li>

<PRE>
    CORBA::Object_var obj (this->rt_poa_->id_to_reference (this->dist_id_.in ()));
    Stock::StockDistributor_var dist (Stock::StockDistributor::_narrow (obj.in ()));
    dist->shutdown ();

    this->orb_->shutdown (true);
</PRE>

<hr><h3>Implementation of StockDistributor interface</h3>

This interface is used for Stock Distributor server. <P>
The Stock_StockDistributor_i class is generated automatically by the IDL compiler (using the -GI flag),
which is a subclass of POA_Stock::StockDistributor class. <P>

<h4>Implementing the Constructor</h4>

<PRE>
    StockDistributor_i::StockDistributor_i (RTPortableServer::POA_ptr poa)
      : rate_ (3), // Default is 3 seconds (3000 milliseconds).
        active_ (false),
        rt_poa_ (RTPortableServer::POA::_duplicate (poa)),
        orb_ (CORBA::ORB::_duplicate (poa->_get_orb ()))
</PRE>

The "rate_" and "active_" are two private members of the Stock_StockDistributor_i class. They stand 
for the notification rate and the active state of the StockDistributor object. <P>

<h4>Implementing the subscribe_notifier () member function</h4>

The main steps of this function are described as follows: <P>

<li>Get the write thread mutex of the subscriber map.</li> <P>

<li>Generate an unique id for the cookie and use it to create a new Cookie object.</li> <P>

<li>Insert the StockNameConsumer object and its priority into the StockDistributor object's subscriber map, 
using the cookie id as the key.</li> <P>

<li>Return the Cookie object.</li> <P>

<h4>Implementing the unsubscribe_notifier () member function</h4>

The main steps of this function are described as follows: <P>

<li>Check the value of the Cookie object.</li> <P>

<li>Get the write thread mutex of the subscriber map.</li> <P>
  
<li>Search for the Cookie object in the argument in the StockDistributor object's subscriber map.</li> <P>

<li>Erase the Cookie object from the StockDistributor object's subscribers' map.</li> <P>

<li>Return the StockNameConsumer object which is related to the erased Cookie object.</li> <P>

<h4>Implementing the provide_quoter_info () member function</h4>

<li>Get the read thread mutex of the subscriber map.</li> <P>
  
<li>Search for the Cookie object in the argument in the StockDistributor object's subscriber map.</li> <P>

<PRE>
    CookieMap::const_iterator iter = this->subscribers_list_.find (ck->cookie_id ());
</PRE>

<li>Create a new instance of the StockQuoter object and activate it under the former RTPOA with the given priority.</li> <P>

<PRE>
    Stock_StockQuoter_i *quoter = new Stock_StockQuoter_i;
    PortableServer::ServantBase_var owner_transfer (quoter);
  
    PortableServer::ObjectId *oid = 
      this->rt_poa_->activate_object_with_priority (quoter,
                                                    iter->second.second);
  
    CORBA::Object_var obj = rt_poa_->id_to_reference (*oid);
    Stock::StockQuoter_var quoter_var = ::Stock::StockQuoter::_narrow (obj.in ());
</PRE>

<li>Return the StockQuoter object reference.</li> <P>

<h4>Implementing the start () member function</h4>

Set the active state of the StockDistributor object to true and create a thread to publish the stock information
to the Stock Broker clients. <P>

The main steps of this thread function are described as follows:  <P>
While the state of the StockDistributor object is active, continue the following looping:

<li>Get the read thread mutex of the subscriber map.</li>

<li>Publish the stock information to all the StockNameConsumer objects that have subscribed.</li>
<PRE>
    for (Stock_StockDistributor_i::CookieMap::iterator iter = this->subscribers_list_.begin ();
	       iter != this->subscribers_list_.end ();
	       ++iter)
      {
        try
          {
            // Set the designated priority for current request.
            CORBA::Object_var obj = orb_->resolve_initial_references ("RTCurrent");
            RTCORBA::Current_var rt_current =
              RTCORBA::Current::_narrow (obj);
              
            rt_current->the_priority (iter->second.second);

            // Tell the database to push its information to the
            // <consumer>, which passes along the CORBA priority
            // in the service_context list of the GIOP message. 
            STOCK_DATABASE->publish_stock_info (iter->second.first);
          }
        catch (CORBA::Exception &ex)
          {
            ACE_PRINT_EXCEPTION (ex, "Stock_StockDistributor_i::svc: ");
          }
      }
</PRE>

<li>Sleep for the specified amount of seconds given in the notification rate.</li>

<h4>Implementing the stop () member function</h4>

<li>Set the active state of StockDistributor object to false.</li>

<h4>Implementing the shutdown () member function</h4>

<li>Stop publishing events.</li>

<PRE>
    this->stop ();
</PRE>

<li>Deactivate the StockDistributor object.</li>

<PRE>  
    ::Stock::StockDistributor_var dist = this->_this ();
    PortableServer::ObjectId_var oid = this->rt_poa_->reference_to_id (dist.in ());
  
    this->rt_poa_->deactivate_object (oid.in ());
</PRE>

<hr><b>Email: </b><a href="mailto:"</a<ADDRESS>shanshan.jiang@vanderbilt.edu</ADDRESS>

</body>

</html>