summaryrefslogtreecommitdiff
path: root/ACE/TAO/tao/RTCORBA/RT_Invocation_Endpoint_Selectors.cpp
blob: 3b556b2816af6ad9e4ba2304f0313113c5820510 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
#include "tao/RTCORBA/RT_Invocation_Endpoint_Selectors.h"

#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0

#include "tao/RTCORBA/RT_Policy_i.h"
#include "tao/RTCORBA/RT_Stub.h"
#include "tao/RTCORBA/RT_Transport_Descriptor.h"
#include "tao/RTCORBA/RT_Transport_Descriptor_Property.h"
#include "tao/RTCORBA/RT_Endpoint_Utils.h"
#include "tao/RTCORBA/RT_Protocols_Hooks.h"
#include "tao/Stub.h"
#include "tao/ORB_Core.h"
#include "tao/Profile.h"
#include "tao/Endpoint.h"
#include "tao/debug.h"
#include "tao/Profile.h"
#include "tao/Endpoint.h"
#include "tao/Profile_Transport_Resolver.h"
#include "tao/ORB_Core.h"
#include "tao/SystemException.h"

// Revision-control identification for this translation unit: the
// "$Id$" keyword string is handed to the ACE_RCSID macro together with
// the library (RTCORBA) and file (RT_Invocation_Endpoint_Selectors)
// names.
// NOTE(review): what the macro expands to is defined outside this
// file -- presumably a static id string or nothing, depending on the
// build configuration; confirm before relying on it.
ACE_RCSID (RTCORBA,
           RT_Invocation_Endpoint_Selectors,
           "$Id$")

TAO_BEGIN_VERSIONED_NAMESPACE_DECL

// Select an endpoint for the invocation described by <r> and try to
// connect to it.
//
// When no RTCORBA client protocol policy is in effect, the profiles of
// the stub are tried one after another, starting with the profile
// currently in use.  Otherwise the selection is delegated to
// select_endpoint_based_on_client_protocol_policy ().
//
// Parameters:
//   r    Resolver for the current invocation.  Must not be null.
//   val  Time value passed on unchanged to the connection attempt.
//
// Throws:
//   CORBA::INTERNAL  if <r> is null, or if the policy found for
//                    TAO_CACHED_POLICY_RT_CLIENT_PROTOCOL cannot be
//                    narrowed to RTCORBA::ClientProtocolPolicy.
//   Exceptions raised by the helpers called from here propagate
//   unchanged.
void
TAO_RT_Invocation_Endpoint_Selector::select_endpoint (
    TAO::Profile_Transport_Resolver *r,
    ACE_Time_Value *val)
{
  if (r == 0)
    throw ::CORBA::INTERNAL ();

  CORBA::Policy_var client_protocol_policy_base =
    TAO_RT_Endpoint_Utils::policy (TAO_CACHED_POLICY_RT_CLIENT_PROTOCOL, *r);

  if (CORBA::is_nil (client_protocol_policy_base.in ()))
    {
      // No client protocol policy: walk the stub's profiles until one
      // of them yields a connected endpoint or there is nothing left
      // to retry.
      do
        {
          r->profile (r->stub ()->profile_in_use ());

          if (this->endpoint_from_profile (*r, val) == 1)
            return;
        }
      while (r->stub ()->next_profile_retry () != 0);

      // If we get here, no profile produced a usable endpoint.  No
      // exception is raised at this point; the function just returns.
      // NOTE(review): presumably the caller detects the missing
      // transport and reports the failure -- confirm against the
      // resolver code.
      return;
    }

  RTCORBA::ClientProtocolPolicy_var client_protocol_policy =
    RTCORBA::ClientProtocolPolicy::_narrow (
      client_protocol_policy_base.in ());

  // The narrow yields nil when the cached policy is not a
  // ClientProtocolPolicy.  Dereferencing the downcast pointer below
  // would then be undefined behavior, so report the inconsistency
  // instead.
  if (CORBA::is_nil (client_protocol_policy.in ()))
    throw ::CORBA::INTERNAL ();

  // Downcast to the TAO implementation class, which gives access to
  // the underlying protocol list.
  TAO_ClientProtocolPolicy *tao_client_protocol_policy =
    static_cast<TAO_ClientProtocolPolicy *> (client_protocol_policy.in ());

  // Get the ProtocolList.
  RTCORBA::ProtocolList &client_protocols =
    tao_client_protocol_policy->protocols_rep ();

  this->select_endpoint_based_on_client_protocol_policy (
    *r,
    client_protocol_policy.in (),
    client_protocols,
    val);
}

// Walk the protocols listed in <client_protocols>, in list order, and
// for each of them try every base profile of the stub whose tag equals
// that protocol's type.  The function returns as soon as
// endpoint_from_profile () reports success (1) for a profile.
//
// Parameters:
//   r                       Resolver for the current invocation.
//   client_protocol_policy  Policy the protocol list was taken from;
//                           reported back as the inconsistent policy
//                           on failure.
//   client_protocols        Protocols to try, in order.
//   val                     Time value passed on to
//                           endpoint_from_profile ().
//
// Throws:
//   CORBA::INV_POLICY  if none of the stub's base profiles matches any
//                      protocol in the list.
//
// When at least one profile matched but none of them produced a usable
// endpoint, the function returns normally without throwing.
void
TAO_RT_Invocation_Endpoint_Selector::select_endpoint_based_on_client_protocol_policy (
    TAO::Profile_Transport_Resolver &r,
    RTCORBA::ClientProtocolPolicy_ptr client_protocol_policy,
    RTCORBA::ProtocolList &client_protocols,
    ACE_Time_Value *val)
{
  // Becomes true once any profile tag equals a listed protocol type.
  CORBA::Boolean matched_any_profile = false;

  // Cycling through all the protocols is the correct thing to do to
  // find a match, but starting from the beginning of the profile list
  // is not.  This code also ignores the forwarded reference (if one
  // exists).  Both limitations stem from problems with the profile
  // management in TAO that are documented in bugzilla bugs 1237, 1238,
  // and 1239.  Once those are fixed, this loop should be changed to do
  // the right thing.
  for (CORBA::ULong proto = 0;
       proto < client_protocols.length ();
       ++proto)
    {
      TAO_MProfile &candidates = r.stub ()->base_profiles ();

      for (TAO_PHandle slot = 0;
           slot < candidates.profile_count ();
           ++slot)
        {
          TAO_Profile *candidate = candidates.get_profile (slot);

          // Profiles of a different protocol are of no interest here.
          if (candidate->tag () != client_protocols[proto].protocol_type)
            continue;

          matched_any_profile = true;
          r.profile (candidate);

          if (this->endpoint_from_profile (r, val) == 1)
            return;
          // @@ Else we should check for potential forwarding here.
        }
    }

  if (matched_any_profile)
    {
      // At least one pertinent profile was found, but none of its
      // endpoints was usable.
      return;
    }

  // None of the profiles matches the client protocol policy: record
  // the offending policy (if the resolver collects them) and raise the
  // exception.
  CORBA::PolicyList *inconsistent = r.inconsistent_policies ();
  if (inconsistent != 0)
    {
      inconsistent->length (1);
      (*inconsistent)[0u] = CORBA::Policy::_duplicate (client_protocol_policy);
    }

  throw ::CORBA::INV_POLICY ();
}

// Try to connect to one of the endpoints of the profile currently set
// on <r>, honoring the priority model and priority banded connection
// policies.
//
// Parameters:
//   r    Resolver for the current invocation; its stub must be a
//        TAO_RT_Stub.
//   val  Time value passed on to try_connect ().
//
// Returns:
//   1 as soon as try_connect () succeeds for an endpoint, 0 when no
//   endpoint of the profile could be used.
//
// Throws:
//   CORBA::INTERNAL         if the stub is not a TAO_RT_Stub.
//   CORBA::INV_POLICY       if a bands policy is present without a
//                           priority model policy, or if the client
//                           thread priority is reported as outside
//                           every band.
//   CORBA::DATA_CONVERSION  if the priority model is client propagated
//                           and no CORBA priority can be obtained for
//                           the client thread.
int
TAO_RT_Invocation_Endpoint_Selector::endpoint_from_profile (
    TAO::Profile_Transport_Resolver &r,
    ACE_Time_Value *val)
{
  // The RT specific policies live on the RT stub.
  TAO_RT_Stub *rt_stub = dynamic_cast <TAO_RT_Stub *> (r.stub ());

  if (rt_stub == 0)
    throw CORBA::INTERNAL ();

  // Priority model policy, as cached on the stub.
  CORBA::Policy_var priority_model_policy =
    rt_stub->get_cached_policy (TAO_CACHED_POLICY_PRIORITY_MODEL);

  // Priority banded connection policy.
  CORBA::Policy_var bands_policy =
    TAO_RT_Endpoint_Utils::policy (
      TAO_CACHED_POLICY_RT_PRIORITY_BANDED_CONNECTION, r);

  // Selection criteria derived from the policies; at most one of the
  // three flags ends up set.
  bool accept_any_endpoint = false;
  bool require_thread_priority = false;
  bool require_band_range = false;
  CORBA::Short thread_priority = 0;
  CORBA::Short band_low = 0;
  CORBA::Short band_high = 0;

  if (!CORBA::is_nil (priority_model_policy.in ()))
    {
      // A priority model policy is set: consult the protocol hooks.
      TAO_Protocols_Hooks *hooks =
        r.stub ()->orb_core ()->get_protocols_hooks ();

      if (hooks != 0)
        {
          CORBA::Short server_priority = 0;
          CORBA::Boolean is_client_propagated = false;

          // Ask whether the priority model is client propagated.
          hooks->get_selector_hook (priority_model_policy.in (),
                                    is_client_propagated,
                                    server_priority);

          if (is_client_propagated)
            {
              // Take the client thread priority from 'Current' or, if
              // that is not set, imply one from the native thread
              // priority via the mapping.  Failing both is an error.
              if (hooks->get_thread_CORBA_priority (thread_priority) == -1 &&
                  hooks->get_thread_implicit_CORBA_priority (thread_priority) == -1)
                {
                  if (TAO_debug_level > 0)
                    ACE_DEBUG ((LM_DEBUG, "ERROR: TAO_RT_Invocation_Endpoint_Selector::endpoint_from_profile. "
                                "Unable to access RT CORBA Priority in client thread "
                                "accessing object with CLIENT_PROPAGATED priority model.\n"));
                  throw CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
                }

              if (bands_policy.ptr () != 0)
                {
                  // Bands are in use: find the band the client thread
                  // priority falls into.
                  bool in_range = false;
                  hooks->get_selector_bands_policy_hook (
                    bands_policy.in (),
                    thread_priority,
                    band_low,
                    band_high,
                    in_range);

                  if (!in_range)
                    {
                      // The priority is outside all bands: report both
                      // policies as inconsistent and signal the error.
                      CORBA::PolicyList *inconsistent =
                        r.inconsistent_policies ();
                      if (inconsistent != 0)
                        {
                          inconsistent->length (2);
                          (*inconsistent)[0u] =
                            CORBA::Policy::_duplicate (bands_policy.in ());
                          (*inconsistent)[1u] =
                            CORBA::Policy::_duplicate (
                              priority_model_policy.in ());
                        }

                      throw ::CORBA::INV_POLICY ();
                    }

                  // Endpoints must lie within the selected band.
                  require_band_range = true;
                }
              else
                {
                  // No bands: the endpoint priority must equal the
                  // client thread priority.
                  require_thread_priority = true;
                }
            }
          else
            {
              // Server declared: all endpoints are fair game.
              accept_any_endpoint = true;
            }
        }
    }
  else if (CORBA::is_nil (bands_policy.in ()))
    {
      // Neither a priority model policy nor a bands policy: all
      // endpoints are fair game.
      accept_any_endpoint = true;
    }
  else
    {
      // Bands without a priority model do not make sense.
      CORBA::PolicyList *inconsistent = r.inconsistent_policies ();
      if (inconsistent != 0)
        {
          inconsistent->length (1);
          (*inconsistent)[0u] = CORBA::Policy::_duplicate (bands_policy.in ());
        }

      throw ::CORBA::INV_POLICY ();
    }

  // Walk the endpoints of the profile and try each acceptable one.
  for (TAO_Endpoint *candidate = r.profile ()->endpoint ();
       candidate != 0;
       candidate = candidate->next ())
    {
      CORBA::Short const candidate_priority = candidate->priority ();

      // An endpoint is acceptable when any endpoint will do, when it
      // matches the client thread priority, when it lies inside the
      // selected band, or when the profile contains just one endpoint
      // without a valid priority.  The last case happens when:
      //    a) we are talking to a nonTAO server (which doesn't have
      //       the concept of multiple endpoints per profile), or
      //    b) we have a TAO server with a non-lane threadpool, in
      //       which case there is only one acceptor.
      // In both cases the endpoint is used regardless of its priority.
      bool usable = accept_any_endpoint;

      if (!usable && require_thread_priority)
        usable = (thread_priority == candidate_priority);

      if (!usable && require_band_range)
        usable = (band_low <= candidate_priority &&
                  candidate_priority <= band_high);

      if (!usable)
        usable = (r.profile ()->endpoint_count () == 1 &&
                  candidate_priority == TAO_INVALID_PRIORITY);

      if (!usable)
        continue;

      // The properties are declared ahead of the descriptor that
      // refers to them.
      TAO_RT_Transport_Descriptor_Private_Connection_Property
        private_connection_descriptor_property;

      TAO_RT_Transport_Descriptor_Banded_Connection_Property
        banded_connection_descriptor_property;

      TAO_RT_Transport_Descriptor rt_transport_descriptor (candidate);

      CORBA::Policy_var private_connection_policy =
        rt_stub->get_cached_policy (TAO_CACHED_POLICY_RT_PRIVATE_CONNECTION);

      if (!CORBA::is_nil (private_connection_policy.in ()))
        {
          // The private connection property is initialized from the
          // stub's address.
          private_connection_descriptor_property.init
            (static_cast<long> (reinterpret_cast<ptrdiff_t> (r.stub ())));
          rt_transport_descriptor.insert
            (&private_connection_descriptor_property);
        }

      if (require_band_range)
        {
          banded_connection_descriptor_property.init
            (band_low, band_high);

          rt_transport_descriptor.insert
            (&banded_connection_descriptor_property);
        }

      // Attempt the connection; success ends the search.
      if (r.try_connect (&rt_transport_descriptor, val))
        return 1;
    }

  return 0;
}

TAO_END_VERSIONED_NAMESPACE_DECL

#endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */