1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
//$Id$
#include "tao/Messaging/Asynch_Invocation.h"
#include "tao/Messaging/Asynch_Reply_Dispatcher.h"
#include "tao/Profile_Transport_Resolver.h"
#include "tao/Invocation_Utils.h"
#include "tao/operation_details.h"
#include "tao/Bind_Dispatcher_Guard.h"
#include "tao/Transport.h"
#include "tao/Muxed_TMS.h"
#include "tao/Pluggable_Messaging.h"
#include "tao/ORB_Constants.h"
#if TAO_HAS_INTERCEPTORS == 1
# include "tao/PortableInterceptorC.h"
#endif /*TAO_HAS_INTERCEPTORS */
ACE_RCSID (Messaging,
Asynch_Invocation,
"$Id$")
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
namespace TAO
{
// Build an asynchronous remote invocation.
//
// @a otarget, @a resolver, @a detail and @a response_expected are
// handed unchanged to the Synch_Twoway_Invocation base class.  The
// reply dispatcher @a rd is stored in safe_rd_, which holds on to it
// until remote_invocation () calls release () on that member.
Asynch_Remote_Invocation::Asynch_Remote_Invocation (
    CORBA::Object_ptr otarget,
    Profile_Transport_Resolver &resolver,
    TAO_Operation_Details &detail,
    TAO_Asynch_Reply_Dispatcher_Base *rd,
    bool response_expected)
  : Synch_Twoway_Invocation (otarget, resolver, detail, response_expected),
    safe_rd_ (rd)
{
  // Nothing else to do: all state is set up by the initializers above.
}
// Marshal the request, register the preallocated reply dispatcher
// with the transport's TMS and push the request out using the oneway
// send path.
//
// @param max_wait_time Forwarded as-is to cdr.message_attributes ()
//        and to send_message (); presumably the upper bound on how
//        long the send may block -- TODO confirm against
//        send_message ().
//
// @return TAO_INVOKE_FAILURE if a pending exception is detected by one
//         of the ACE_CHECK_RETURN checks; otherwise the status
//         produced by the interception points and send_message ()
//         (TAO_INVOKE_RESTART when an exception handler reports
//         LOCATION_FORWARD or TRANSPORT_RETRY).
//
// Declared as possibly raising CORBA::Exception; in particular
// CORBA::INTERNAL is raised when binding the reply dispatcher fails.
Invocation_Status
Asynch_Remote_Invocation::remote_invocation (ACE_Time_Value *max_wait_time
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::Exception))
{
// Work out how the target is to be addressed in the request header.
TAO_Target_Specification tspec;
this->init_target_spec (tspec ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (TAO_INVOKE_FAILURE);
// The request is marshaled into the output stream owned by the
// transport's messaging object.
TAO_OutputCDR &cdr =
this->resolver_.transport ()->messaging_object ()->out_stream ();
// Pessimistic default; overwritten by the interception points and by
// send_message () below.
Invocation_Status s = TAO_INVOKE_FAILURE;
#if TAO_HAS_INTERCEPTORS == 1
// Starting interception point.  Anything other than success is handed
// straight back to the caller without touching the transport.
s =
this->send_request_interception (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (TAO_INVOKE_FAILURE);
if (s != TAO_INVOKE_SUCCESS)
return s;
#endif /*TAO_HAS_INTERCEPTORS */
// We have started the interception flow. We need to call the
// ending interception flow if things go wrong. The purpose of the
// try block is to take care of the cases when things go wrong.
ACE_TRY
{
// Oneway semantics. See the comments on the send_message () call
// below.
cdr.message_attributes (this->details_.request_id (),
this->resolver_.stub (),
TAO_Transport::TAO_ONEWAY_REQUEST,
max_wait_time);
// Request header first, then the operation arguments.
this->write_header (tspec,
cdr
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
this->marshal_data (cdr
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// Register a reply dispatcher for this invocation. Use the
// preallocated reply dispatcher.
TAO_Bind_Dispatcher_Guard dispatch_guard (
this->details_.request_id (),
this->safe_rd_.get (),
this->resolver_.transport ()->tms ());
// Now that we have bound the reply dispatcher to the map, just
// lose ownership of the reply dispatcher.
// NOTE(review): release () runs before the status check below, so
// after a failed bind nothing in this function owns the dispatcher
// any more -- confirm that it is not leaked in that case.
this->safe_rd_.release ();
if (dispatch_guard.status () != 0)
{
// @@ What is the right way to handle this error? Do we need
// to call the interceptors in this case?
ACE_THROW_RETURN (CORBA::INTERNAL (TAO::VMCID,
CORBA::COMPLETED_NO),
TAO_INVOKE_FAILURE);
}
// Do not unbind during destruction. We need the entry to be
// there in the map since the reply dispatcher depends on
// that. This is also a trigger to lose the ownership of the
// reply dispatcher.
dispatch_guard.status (TAO_Bind_Dispatcher_Guard::NO_UNBIND);
// Send it as a oneway request. It will make all the required
// paraphernalia within the ORB to fire, like buffering if
// send blocks etc.
s =
this->send_message (cdr,
TAO_Transport::TAO_ONEWAY_REQUEST,
max_wait_time
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
#if TAO_HAS_INTERCEPTORS == 1
// NOTE: We don't need to do the auto_ptr <> trick. We got here
// in the first place since the message was sent properly,
// which implies a reply would be available. Therefore the
// reply dispatcher should be available for another thread to
// collect and dispatch the reply. In MT cases, things are
// more hairy. Just imagine what happens when another thread
// is collecting the reply when we are happily invoking
// interceptors?
// Nothing special here. If we get a restart during send or a
// proper send, we are supposed to call the receive_other ()
// interception point. So we do that here.
Invocation_Status tmp =
this->receive_other_interception (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
// We got an error during the interception.  It only replaces the
// send status when the send itself succeeded, so a non-success
// status from send_message () is never masked.
if (s == TAO_INVOKE_SUCCESS && tmp != TAO_INVOKE_SUCCESS)
s = tmp;
#endif /*TAO_HAS_INTERCEPTORS */
// If an error occurred just return. At this point all the
// endpoint interception would have been invoked. The callee
// would take care of the rest.
if (s != TAO_INVOKE_SUCCESS)
return s;
// NOTE: Not sure how things are handled with exclusive muxed
// strategy.
// The transport reports that it is idle once the send is done, so
// tell the resolver that it has been released.  The result of
// transport_released () is deliberately ignored.
if (this->resolver_.transport ()->idle_after_send ())
(void) this->resolver_.transport_released ();
}
ACE_CATCHANY
{
#if TAO_HAS_INTERCEPTORS == 1
// Let the ending interception points see the exception and map the
// resulting reply status onto an invocation status.
PortableInterceptor::ReplyStatus const status =
this->handle_any_exception (&ACE_ANY_EXCEPTION
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// A forward or a transport retry turns into a restart of the
// invocation; system and user exceptions are propagated.  Any other
// reply status falls through with s left as it was.
if (status == PortableInterceptor::LOCATION_FORWARD ||
status == PortableInterceptor::TRANSPORT_RETRY)
s = TAO_INVOKE_RESTART;
else if (status == PortableInterceptor::SYSTEM_EXCEPTION
|| status == PortableInterceptor::USER_EXCEPTION)
#endif /*TAO_HAS_INTERCEPTORS*/
// Without interceptor support this re-throw is unconditional.
ACE_RE_THROW;
}
# if defined (ACE_HAS_EXCEPTIONS) \
&& defined (ACE_HAS_BROKEN_UNEXPECTED_EXCEPTIONS)
ACE_CATCHALL
{
#if TAO_HAS_INTERCEPTORS == 1
// Same mapping as above for exceptions that were not caught by the
// ACE_CATCHANY handler, except that every status other than a
// forward or a transport retry is re-thrown.
PortableInterceptor::ReplyStatus st =
this->handle_all_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
if (st == PortableInterceptor::LOCATION_FORWARD ||
st == PortableInterceptor::TRANSPORT_RETRY)
s = TAO_INVOKE_RESTART;
else
#endif /*TAO_HAS_INTERCEPTORS == 1*/
ACE_RE_THROW;
}
# endif /* ACE_HAS_EXCEPTIONS &&
ACE_HAS_BROKEN_UNEXPECTED_EXCEPTIONS */
ACE_ENDTRY;
// Report failure if an exception is still pending after the handlers
// have run.
ACE_CHECK_RETURN (TAO_INVOKE_FAILURE);
return s;
}
}
TAO_END_VERSIONED_NAMESPACE_DECL
|