summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormitza <mitza@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2006-07-14 15:21:34 +0000
committermitza <mitza@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2006-07-14 15:21:34 +0000
commit704b44ec5425b71899e08c4bb61aed7de1dd5d77 (patch)
tree8c4ced1812cdf19f9c9131cbf12bdca4997fdd17
parent4000705a20fe9ddd75ba849d592fd18a18dcdb9b (diff)
downloadATCD-704b44ec5425b71899e08c4bb61aed7de1dd5d77.tar.gz
Fri Jul 14 15:11:11 UTC 2006 Adam Mitz <mitza@ociweb.com>
-rw-r--r--TAO/ChangeLog45
-rw-r--r--TAO/NEWS14
-rw-r--r--TAO/docs/cec_options.html30
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.cpp81
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.h10
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.cpp7
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.h4
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_Event_Loader.cpp14
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_Factory.h8
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp37
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.h16
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp42
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.h16
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp39
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h16
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp89
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.h34
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp31
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.h3
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.cpp38
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h17
-rw-r--r--TAO/orbsvcs/tests/CosEvent/Timeout/README40
-rw-r--r--TAO/orbsvcs/tests/CosEvent/Timeout/TestEventConsumer_i.cpp78
-rw-r--r--TAO/orbsvcs/tests/CosEvent/Timeout/TestEventConsumer_i.h46
-rw-r--r--TAO/orbsvcs/tests/CosEvent/Timeout/Timeout.mpc4
-rw-r--r--TAO/orbsvcs/tests/CosEvent/Timeout/TimeoutTestMain.cpp177
-rw-r--r--TAO/orbsvcs/tests/CosEvent/Timeout/cosevent.conf3
-rwxr-xr-xTAO/orbsvcs/tests/CosEvent/Timeout/run_test.pl66
28 files changed, 946 insertions, 59 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index 3e2ef4a57c6..98cdbb6731a 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,48 @@
+Fri Jul 14 15:11:11 UTC 2006 Adam Mitz <mitza@ociweb.com>
+
+ * orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.h:
+ * orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.h:
+ * orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_Event_Loader.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_Factory.h:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.h:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.h:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.h:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.h:
+ * orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h:
+ * orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.cpp:
+
+ Added two new options to the CEC_Default_Factory,
+ -CECConsumerOperationTimeout and -CECSupplierOperationTimeout.
+ These options apply relative roundtrip timeout policies for consumer
+ and supplier objects. This allows users to minimize the ill effects of
+ unresponsive consumer and supplier operations (i.e. push() or pull()).
+ See bugzilla #2594 and docs/cec_options.html.
+ Also, exposed the "disconnect_callbacks" event channel attribute
+ as the "-b" option to the CosEvent_Service executable.
+
+ * NEWS
+ * docs/cec_options.html:
+
+ Documented the Timeout feature described above.
+
+ * orbsvcs/tests/CosEvent/Timeout/README:
+ * orbsvcs/tests/CosEvent/Timeout/TestEventConsumer_i.h:
+ * orbsvcs/tests/CosEvent/Timeout/TestEventConsumer_i.cpp:
+ * orbsvcs/tests/CosEvent/Timeout/Timeout.mpc:
+ * orbsvcs/tests/CosEvent/Timeout/TimeoutTestMain.cpp:
+ * orbsvcs/tests/CosEvent/Timeout/cosevent.conf:
+ * orbsvcs/tests/CosEvent/Timeout/run_test.pl:
+
+ Functional test for the Timeout feature described above.
+
Fri Jul 14 02:02:55 UTC 2006 Phil Mesnier <mesnier_p@ociweb.com>
* tao/IIOP_Acceptor.cpp:
diff --git a/TAO/NEWS b/TAO/NEWS
index 1a71a2c2aa1..8f17aa8a053 100644
--- a/TAO/NEWS
+++ b/TAO/NEWS
@@ -8,6 +8,20 @@ PLANNED MAJOR CHANGES "SOMETIME IN THE FUTURE" (i.e., exact beta not known)
. (Remedy) Add support for CORBA/e compact and micro
+
+USER VISIBLE CHANGES BETWEEN TAO-1.5.2 and TAO-1.5.3
+====================================================
+
+. Added new options, -CECConsumerOperationTimeout and
+ -CECSupplierOperationTimeout, to the CosEvent service. See Bugzilla #2594
+ and $TAO_ROOT/docs/cec_options.html for details. The purpose of these
+ options is to use the relative round-trip timeout feature from the TAO
+ Messaging library to detect clients that are "hung" in push() or pull()
+ operations but would otherwise not be detetcted as "bad" by the
+ -CECReactive*Control options (since they have a thread available in
+ orb->run()).
+
+
USER VISIBLE CHANGES BETWEEN TAO-1.5.1 and TAO-1.5.2
====================================================
diff --git a/TAO/docs/cec_options.html b/TAO/docs/cec_options.html
index b00b58ee590..a1109db123d 100644
--- a/TAO/docs/cec_options.html
+++ b/TAO/docs/cec_options.html
@@ -202,6 +202,36 @@ static CEC_Factory "-CECDispatching reactive ....."
</TD>
</TR>
+ <!-- <TR NAME="CECConsumerOperationTimeout"> -->
+ <TR>
+ <TD><CODE>-CECConsumerOperationTimeout</CODE>
+ <EM>timeout</EM>
+ </TD>
+ <TD>Set the relative roundtrip timeout for all operations (except
+ &quot;pings&quot; due to reactive consumer control) on consumers.
+ Reactive consumer control must be enabled for this option to take
+ effect. The units for <EM>timeout</EM> are microseconds.
+ If a remote operation invoked on the consumer exceeds the timeout,
+ then that consumer will be considered &quot;bad&quot; and will be
+ disconnected from the event channel.
+ </TD>
+ </TR>
+
+ <!-- <TR NAME="CECSupplierOperationTimeout"> -->
+ <TR>
+ <TD><CODE>-CECSupplierOperationTimeout</CODE>
+ <EM>timeout</EM>
+ </TD>
+ <TD>Set the relative roundtrip timeout for all operations (except
+ &quot;pings&quot; due to reactive supplier control) on suppliers.
+ Reactive supplier control must be enabled for this option to take
+ effect. The units for <EM>timeout</EM> are microseconds.
+ If a remote operation invoked on the supplier exceeds the timeout
+ then that supplier will be considered &quot;bad&quot; and will be
+ disconnected from the event channel.
+ </TD>
+ </TR>
+
<!-- <TR NAME="CECProxyConsumerCollection"> -->
<TR>
<TD><CODE>-CECProxyConsumerCollection</CODE>
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.cpp
index dfd6db30524..89a9f8b028a 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.cpp
@@ -19,6 +19,9 @@
#include "orbsvcs/ESF/ESF_Copy_On_Read.h"
#include "orbsvcs/ESF/ESF_Proxy_List.h"
#include "orbsvcs/ESF/ESF_Proxy_RB_Tree.h"
+#include "orbsvcs/Time_Utilities.h"
+
+#include "tao/Messaging/Messaging_RT_PolicyC.h"
#include "ace/Arg_Shifter.h"
#include "ace/Sched_Params.h"
@@ -368,6 +371,32 @@ TAO_CEC_Default_Factory::init (int argc, ACE_TCHAR* argv[])
}
}
+ else if (ACE_OS::strcasecmp (arg, ACE_TEXT("-CECConsumerOperationTimeout")) == 0)
+ {
+ arg_shifter.consume_arg ();
+
+ if (arg_shifter.is_parameter_next ())
+ {
+ const ACE_TCHAR* opt = arg_shifter.get_current ();
+ unsigned long timeout = ACE_OS::strtoul (opt, 0, 10);
+ this->consumer_timeout_.usec (timeout);
+ arg_shifter.consume_arg ();
+ }
+ }
+
+ else if (ACE_OS::strcasecmp (arg, ACE_TEXT("-CECSupplierOperationTimeout")) == 0)
+ {
+ arg_shifter.consume_arg ();
+
+ if (arg_shifter.is_parameter_next ())
+ {
+ const ACE_TCHAR* opt = arg_shifter.get_current ();
+ unsigned long timeout = ACE_OS::strtoul (opt, 0, 10);
+ this->supplier_timeout_.usec (timeout);
+ arg_shifter.consume_arg ();
+ }
+ }
+
else if (ACE_OS::strcasecmp (arg, ACE_TEXT("-CECProxyDisconnectRetries")) == 0)
{
arg_shifter.consume_arg ();
@@ -525,14 +554,22 @@ TAO_CEC_Default_Factory::destroy_supplier_admin (TAO_CEC_TypedSupplierAdmin *x)
TAO_CEC_ProxyPushSupplier*
TAO_CEC_Default_Factory::create_proxy_push_supplier (TAO_CEC_EventChannel *ec)
{
- return new TAO_CEC_ProxyPushSupplier (ec);
+ TAO_CEC_ProxyPushSupplier *created;
+ ACE_Time_Value timeout = this->consumer_control_ ? this->consumer_timeout_
+ : ACE_Time_Value::zero;
+ ACE_NEW_RETURN (created, TAO_CEC_ProxyPushSupplier (ec, timeout), 0);
+ return created;
}
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
TAO_CEC_ProxyPushSupplier*
TAO_CEC_Default_Factory::create_proxy_push_supplier (TAO_CEC_TypedEventChannel *ec)
{
- return new TAO_CEC_ProxyPushSupplier (ec);
+ TAO_CEC_ProxyPushSupplier *created;
+ ACE_Time_Value timeout = this->consumer_control_ ? this->consumer_timeout_
+ : ACE_Time_Value::zero;
+ ACE_NEW_RETURN (created, TAO_CEC_ProxyPushSupplier (ec, timeout), 0);
+ return created;
}
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
@@ -545,7 +582,11 @@ TAO_CEC_Default_Factory::destroy_proxy_push_supplier (TAO_CEC_ProxyPushSupplier
TAO_CEC_ProxyPullSupplier*
TAO_CEC_Default_Factory::create_proxy_pull_supplier (TAO_CEC_EventChannel *ec)
{
- return new TAO_CEC_ProxyPullSupplier (ec);
+ TAO_CEC_ProxyPullSupplier *created;
+ ACE_Time_Value timeout = this->consumer_control_ ? this->consumer_timeout_
+ : ACE_Time_Value::zero;
+ ACE_NEW_RETURN (created, TAO_CEC_ProxyPullSupplier (ec, timeout), 0);
+ return created;
}
void
@@ -557,14 +598,22 @@ TAO_CEC_Default_Factory::destroy_proxy_pull_supplier (TAO_CEC_ProxyPullSupplier
TAO_CEC_ProxyPushConsumer*
TAO_CEC_Default_Factory::create_proxy_push_consumer (TAO_CEC_EventChannel *ec)
{
- return new TAO_CEC_ProxyPushConsumer (ec);
+ TAO_CEC_ProxyPushConsumer *created;
+ ACE_Time_Value timeout = this->supplier_control_ ? this->supplier_timeout_
+ : ACE_Time_Value::zero;
+ ACE_NEW_RETURN (created, TAO_CEC_ProxyPushConsumer (ec, timeout), 0);
+ return created;
}
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
TAO_CEC_TypedProxyPushConsumer*
TAO_CEC_Default_Factory::create_proxy_push_consumer (TAO_CEC_TypedEventChannel *ec)
{
- return new TAO_CEC_TypedProxyPushConsumer (ec);
+ TAO_CEC_TypedProxyPushConsumer *created;
+ ACE_Time_Value timeout = this->supplier_control_ ? this->supplier_timeout_
+ : ACE_Time_Value::zero;
+ ACE_NEW_RETURN (created, TAO_CEC_TypedProxyPushConsumer (ec, timeout), 0);
+ return created;
}
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
@@ -585,7 +634,11 @@ TAO_CEC_Default_Factory::destroy_proxy_push_consumer (TAO_CEC_TypedProxyPushCons
TAO_CEC_ProxyPullConsumer*
TAO_CEC_Default_Factory::create_proxy_pull_consumer (TAO_CEC_EventChannel *ec)
{
- return new TAO_CEC_ProxyPullConsumer (ec);
+ TAO_CEC_ProxyPullConsumer *created;
+ ACE_Time_Value timeout = this->supplier_control_ ? this->supplier_timeout_
+ : ACE_Time_Value::zero;
+ ACE_NEW_RETURN (created, TAO_CEC_ProxyPullConsumer (ec, timeout), 0);
+ return created;
}
void
@@ -1322,6 +1375,22 @@ TAO_CEC_Default_Factory::destroy_supplier_control (TAO_CEC_SupplierControl* x)
delete x;
}
+CORBA::Policy_ptr
+TAO_CEC_Default_Factory::create_roundtrip_timeout_policy
+ (ACE_Time_Value timeout)
+{
+ //get the existing orb
+ int fake_argc = 0;
+ CORBA::ORB_var orb = CORBA::ORB_init (fake_argc, 0, this->orbid_);
+
+ CORBA::Any value;
+ TimeBase::TimeT timet;
+ ORBSVCS_Time::Time_Value_to_TimeT (timet, timeout);
+ value <<= timet;
+ return orb->create_policy (Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
+ value);
+}
+
TAO_END_VERSIONED_NAMESPACE_DECL
// ****************************************************************
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.h
index 2704a7ca92c..b31ce703650 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.h
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.h
@@ -161,6 +161,9 @@ public:
create_supplier_control (TAO_CEC_TypedEventChannel*);
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
+ virtual CORBA::Policy_ptr create_roundtrip_timeout_policy
+ (ACE_Time_Value timeout);
+
private:
/// Parse an argument to set the type of collections used.
int parse_collection_arg (ACE_TCHAR* opt);
@@ -203,6 +206,13 @@ private:
ACE_Time_Value consumer_control_timeout_;
ACE_Time_Value supplier_control_timeout_;
+ /// The consumer and supplier operation timeouts.
+ /// Only in effect if the corresponding "reactive control" is enabled.
+ /// Applies the given timeout as the round-trip time policy on the
+ /// object reference.
+ ACE_Time_Value consumer_timeout_;
+ ACE_Time_Value supplier_timeout_;
+
/// The number of retries before disconnecting a proxy
unsigned int proxy_disconnect_retries_;
};
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.cpp
index c24469dae73..72b3352cb66 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.cpp
@@ -232,4 +232,11 @@ TAO_CEC_EventChannel::destroy (ACE_ENV_SINGLE_ARG_DECL)
this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
}
+CORBA::Policy_ptr
+TAO_CEC_EventChannel::create_roundtrip_timeout_policy
+ (ACE_Time_Value timeout)
+{
+ return this->factory_->create_roundtrip_timeout_policy (timeout);
+}
+
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.h
index 5be1829ce43..7471e653f51 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.h
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_EventChannel.h
@@ -258,6 +258,10 @@ public:
ServantRetryMap& get_servant_retry_map (void);
+ /// Forwarded to the factory
+ CORBA::Policy_ptr create_roundtrip_timeout_policy
+ (ACE_Time_Value timeout);
+
private:
/// The POAs used to activate "supplier-side" and "consumer-side"
/// objects.
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Event_Loader.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Event_Loader.cpp
index a627d07245e..e9a9e726717 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Event_Loader.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Event_Loader.cpp
@@ -103,13 +103,14 @@ TAO_CEC_Event_Loader::create_object (CORBA::ORB_ptr orb,
// Parse the options, check if we should bind with the naming
// service and under what name...
- ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("n:o:p:xrtd"));
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("n:o:p:xrtdb"));
int opt;
const ACE_TCHAR *service_name = ACE_TEXT("CosEventService");
const ACE_TCHAR *ior_file = 0;
const ACE_TCHAR *pid_file = 0;
this->bind_to_naming_service_ = 1;
int use_rebind = 0;
+ int disconnect_callbacks = 0;
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
// Flag to create a typed event channel
@@ -142,6 +143,10 @@ TAO_CEC_Event_Loader::create_object (CORBA::ORB_ptr orb,
use_rebind = 1;
break;
+ case 'b':
+ disconnect_callbacks = 1;
+ break;
+
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
case 't':
typed_ec = 1;
@@ -162,6 +167,7 @@ TAO_CEC_Event_Loader::create_object (CORBA::ORB_ptr orb,
ACE_TEXT ("-p pid_file_name ")
ACE_TEXT ("-x [disable naming service bind]")
ACE_TEXT ("-r [rebind, no AlreadyBound failures] ")
+ ACE_TEXT ("-b [send callBacks on disconnect] ")
ACE_TEXT ("-t [enable typed event channel]")
ACE_TEXT ("-d [destroy typed event channelon shutdown] ")
ACE_TEXT ("\n"),
@@ -174,6 +180,7 @@ TAO_CEC_Event_Loader::create_object (CORBA::ORB_ptr orb,
ACE_TEXT ("-p pid_file_name ")
ACE_TEXT ("-x [disable naming service bind] ")
ACE_TEXT ("-r [rebind, no AlreadyBound failures] ")
+ ACE_TEXT ("-b [send callBacks on disconnect] ")
ACE_TEXT ("\n"),
argv[0]));
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
@@ -210,6 +217,9 @@ TAO_CEC_Event_Loader::create_object (CORBA::ORB_ptr orb,
// Create and activate the event service
this->attributes_ = new TAO_CEC_EventChannel_Attributes(poa.in (),
poa.in ());
+
+ this->attributes_->disconnect_callbacks = disconnect_callbacks;
+
this->factory_ = 0;
this->ec_impl_ = new TAO_CEC_EventChannel (*this->attributes_,
@@ -349,6 +359,8 @@ TAO_CEC_Event_Loader::create_object (CORBA::ORB_ptr orb,
this->typed_attributes_->destroy_on_shutdown = 1;
}
+ this->typed_attributes_->disconnect_callbacks = disconnect_callbacks;
+
this->factory_ = 0;
this->typed_ec_impl_ = new TAO_CEC_TypedEventChannel (*this->typed_attributes_,
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Factory.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Factory.h
index f7c6950815f..56ebfb288c5 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Factory.h
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Factory.h
@@ -24,6 +24,11 @@
#include "ace/Service_Object.h"
+#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
+#include "tao/AnyTypeCode/AnyTypeCode_methods.h"
+#endif
+
+#include "tao/Policy_ForwardC.h"
#include "tao/Versioned_Namespace.h"
ACE_BEGIN_VERSIONED_NAMESPACE_DECL
@@ -219,6 +224,9 @@ public:
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
virtual void
destroy_supplier_control (TAO_CEC_SupplierControl*) = 0;
+
+ virtual CORBA::Policy_ptr create_roundtrip_timeout_policy
+ (ACE_Time_Value timeout) = 0;
};
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp
index 039ccb5f5f8..f512d50eb9d 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp
@@ -21,8 +21,9 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
TAO_CEC_ProxyPullConsumer::
- TAO_CEC_ProxyPullConsumer (TAO_CEC_EventChannel* ec)
+ TAO_CEC_ProxyPullConsumer (TAO_CEC_EventChannel* ec, ACE_Time_Value timeout)
: event_channel_ (ec),
+ timeout_ (timeout),
refcount_ (1)
{
this->lock_ =
@@ -201,11 +202,11 @@ TAO_CEC_ProxyPullConsumer::supplier_non_existent (
disconnected = 1;
return 0;
}
- if (CORBA::is_nil (this->supplier_.in ()))
+ if (CORBA::is_nil (this->nopolicy_supplier_.in ()))
{
return 0;
}
- supplier = CORBA::Object::_duplicate (this->supplier_.in ());
+ supplier = CORBA::Object::_duplicate (this->nopolicy_supplier_.in ());
}
#if (TAO_HAS_MINIMUM_CORBA == 0)
@@ -324,14 +325,40 @@ TAO_CEC_ProxyPullConsumer::connect_pull_supplier (
if (this->is_connected_i ())
return;
}
- this->supplier_ =
- CosEventComm::PullSupplier::_duplicate (pull_supplier);
+ this->supplier_ = apply_policy (pull_supplier);
}
// Notify the event channel...
this->event_channel_->connected (this ACE_ENV_ARG_PARAMETER);
}
+CosEventComm::PullSupplier_ptr
+TAO_CEC_ProxyPullConsumer::apply_policy (CosEventComm::PullSupplier_ptr pre)
+{
+ this->nopolicy_supplier_ = CosEventComm::PullSupplier::_duplicate (pre);
+#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
+ CosEventComm::PullSupplier_var post =
+ CosEventComm::PullSupplier::_duplicate (pre);
+ if (this->timeout_ > ACE_Time_Value::zero)
+ {
+ CORBA::PolicyList policy_list;
+ policy_list.length (1);
+ policy_list[0] = this->event_channel_->
+ create_roundtrip_timeout_policy (this->timeout_);
+
+ CORBA::Object_var post_obj = pre->_set_policy_overrides
+ (policy_list, CORBA::ADD_OVERRIDE);
+ post = CosEventComm::PullSupplier::_narrow (post_obj.in ());
+
+ policy_list[0]->destroy ();
+ policy_list.length (0);
+ }
+ return post._retn ();
+#else
+ return CosEventComm::PullSupplier::_duplicate (pre);
+#endif /* TAO_HAS_CORBA_MESSAGING */
+}
+
void
TAO_CEC_ProxyPullConsumer::disconnect_pull_consumer (
ACE_ENV_SINGLE_ARG_DECL)
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.h
index f47959a2933..7e3e48e7aad 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.h
@@ -50,7 +50,8 @@ public:
typedef CosEventChannelAdmin::ProxyPullConsumer_var _var_type;
/// constructor...
- TAO_CEC_ProxyPullConsumer (TAO_CEC_EventChannel* event_channel);
+ TAO_CEC_ProxyPullConsumer (TAO_CEC_EventChannel* event_channel,
+ ACE_Time_Value timeout);
/// destructor...
virtual ~TAO_CEC_ProxyPullConsumer (void);
@@ -118,19 +119,30 @@ protected:
/// Release the supplier
void cleanup_i (void);
+ /// Assigns the parameter to both supplier_ and nopolicy_supplier_, and
+ /// applies policies (when appropriate) to supplier_.
+ CosEventComm::PullSupplier_ptr apply_policy
+ (CosEventComm::PullSupplier_ptr s);
+
private:
/// The supplier admin, used for activation and memory managment.
TAO_CEC_EventChannel* event_channel_;
+ ACE_Time_Value timeout_;
+
/// The locking strategy.
ACE_Lock* lock_;
/// The reference count.
CORBA::ULong refcount_;
- /// The supplier....
+ /// The supplier -- use apply_policy() instead of assigning directly to
+ /// supplier_. This will keep supplier_ and nopolicy_supplier_ in sync.
CosEventComm::PullSupplier_var supplier_;
+ /// The supplier without any policies applied
+ CosEventComm::PullSupplier_var nopolicy_supplier_;
+
/// Store the default POA.
PortableServer::POA_var default_POA_;
};
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp
index fb3f757e79c..4bef1b19c1f 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp
@@ -18,8 +18,10 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
-TAO_CEC_ProxyPullSupplier::TAO_CEC_ProxyPullSupplier (TAO_CEC_EventChannel* ec)
+TAO_CEC_ProxyPullSupplier::TAO_CEC_ProxyPullSupplier
+ (TAO_CEC_EventChannel* ec, ACE_Time_Value timeout)
: event_channel_ (ec),
+ timeout_ (timeout),
refcount_ (1),
connected_ (0),
wait_not_empty_ (queue_lock_)
@@ -138,11 +140,11 @@ TAO_CEC_ProxyPullSupplier::consumer_non_existent (
disconnected = 1;
return 0;
}
- if (CORBA::is_nil (this->consumer_.in ()))
+ if (CORBA::is_nil (this->nopolicy_consumer_.in ()))
{
return 0;
}
- consumer = CORBA::Object::_duplicate (this->consumer_.in ());
+ consumer = CORBA::Object::_duplicate (this->nopolicy_consumer_.in ());
}
#if (TAO_HAS_MINIMUM_CORBA == 0)
@@ -265,8 +267,7 @@ TAO_CEC_ProxyPullSupplier::connect_pull_consumer (
// Re-connections are allowed....
this->cleanup_i ();
- this->consumer_ =
- CosEventComm::PullConsumer::_duplicate (pull_consumer);
+ this->consumer_ = apply_policy (pull_consumer);
this->connected_ = 1;
TAO_CEC_Unlock reverse_lock (*this->lock_);
@@ -284,8 +285,7 @@ TAO_CEC_ProxyPullSupplier::connect_pull_consumer (
return;
}
- this->consumer_ =
- CosEventComm::PullConsumer::_duplicate (pull_consumer);
+ this->consumer_ = apply_policy (pull_consumer);
this->connected_ = 1;
}
@@ -293,6 +293,34 @@ TAO_CEC_ProxyPullSupplier::connect_pull_consumer (
this->event_channel_->connected (this ACE_ENV_ARG_PARAMETER);
}
+CosEventComm::PullConsumer_ptr
+TAO_CEC_ProxyPullSupplier::apply_policy (CosEventComm::PullConsumer_ptr pre)
+{
+ if (CORBA::is_nil (pre)) return pre;
+ this->nopolicy_consumer_ = CosEventComm::PullConsumer::_duplicate (pre);
+#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
+ CosEventComm::PullConsumer_var post =
+ CosEventComm::PullConsumer::_duplicate (pre);
+ if (this->timeout_ > ACE_Time_Value::zero)
+ {
+ CORBA::PolicyList policy_list;
+ policy_list.length (1);
+ policy_list[0] = this->event_channel_->
+ create_roundtrip_timeout_policy (this->timeout_);
+
+ CORBA::Object_var post_obj = pre->_set_policy_overrides
+ (policy_list, CORBA::ADD_OVERRIDE);
+ post = CosEventComm::PullConsumer::_narrow(post_obj.in ());
+
+ policy_list[0]->destroy ();
+ policy_list.length (0);
+ }
+ return post._retn ();
+#else
+ return CosEventComm::PullConsumer::_duplicate (pre);
+#endif /* TAO_HAS_CORBA_MESSAGING */
+}
+
void
TAO_CEC_ProxyPullSupplier::disconnect_pull_supplier (
ACE_ENV_SINGLE_ARG_DECL)
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.h
index 8fe65754426..3e6e2fdea08 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.h
@@ -57,7 +57,8 @@ public:
typedef CosEventChannelAdmin::ProxyPullSupplier_var _var_type;
/// constructor...
- TAO_CEC_ProxyPullSupplier (TAO_CEC_EventChannel* event_channel);
+ TAO_CEC_ProxyPullSupplier (TAO_CEC_EventChannel* event_channel,
+ ACE_Time_Value timeout);
/// destructor...
virtual ~TAO_CEC_ProxyPullSupplier (void);
@@ -133,19 +134,30 @@ protected:
/// Release the child and the consumer
void cleanup_i (void);
+ /// Assigns the parameter to both consumer_ and nopolicy_consumer_, and
+ /// applies policies (when appropriate) to consumer_.
+ CosEventComm::PullConsumer_ptr apply_policy
+ (CosEventComm::PullConsumer_ptr c);
+
private:
/// The Event Channel that owns this object.
TAO_CEC_EventChannel* event_channel_;
+ ACE_Time_Value timeout_;
+
/// The locking strategy.
ACE_Lock* lock_;
/// The reference count.
CORBA::ULong refcount_;
- /// The consumer....
+ /// The consumer -- use apply_policy() instead of assigning directly to
+ /// consumer_. This will keep consumer_ and nopolicy_consumer_ in sync.
CosEventComm::PullConsumer_var consumer_;
+ /// The consumer without any policies applied
+ CosEventComm::PullConsumer_var nopolicy_consumer_;
+
/// If the flag is not zero then we are connected, notice that the
/// consumer can be nil.
int connected_;
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp
index 76d85c4b0c0..c0d85903564 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp
@@ -20,8 +20,10 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
TAO_CEC_ProxyPushConsumer::
- TAO_CEC_ProxyPushConsumer (TAO_CEC_EventChannel* ec)
+ TAO_CEC_ProxyPushConsumer (TAO_CEC_EventChannel* ec,
+ ACE_Time_Value timeout)
: event_channel_ (ec),
+ timeout_(timeout),
refcount_ (1),
connected_ (false)
{
@@ -103,11 +105,11 @@ TAO_CEC_ProxyPushConsumer::supplier_non_existent (
disconnected = true;
return false;
}
- if (CORBA::is_nil (this->supplier_.in ()))
+ if (CORBA::is_nil (this->nopolicy_supplier_.in ()))
{
return false;
}
- supplier = CORBA::Object::_duplicate (this->supplier_.in ());
+ supplier = CORBA::Object::_duplicate (this->nopolicy_supplier_.in ());
}
#if (TAO_HAS_MINIMUM_CORBA == 0)
@@ -224,8 +226,7 @@ TAO_CEC_ProxyPushConsumer::connect_push_supplier (
if (this->is_connected_i ())
return;
}
- this->supplier_ =
- CosEventComm::PushSupplier::_duplicate (push_supplier);
+ this->supplier_ = apply_policy (push_supplier);
this->connected_ = true;
}
@@ -233,6 +234,34 @@ TAO_CEC_ProxyPushConsumer::connect_push_supplier (
this->event_channel_->connected (this ACE_ENV_ARG_PARAMETER);
}
+CosEventComm::PushSupplier_ptr
+TAO_CEC_ProxyPushConsumer::apply_policy (CosEventComm::PushSupplier_ptr pre)
+{
+ if (CORBA::is_nil (pre)) return pre;
+ this->nopolicy_supplier_ = CosEventComm::PushSupplier::_duplicate (pre);
+#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
+ CosEventComm::PushSupplier_var post =
+ CosEventComm::PushSupplier::_duplicate (pre);
+ if (this->timeout_ > ACE_Time_Value::zero)
+ {
+ CORBA::PolicyList policy_list;
+ policy_list.length (1);
+ policy_list[0] = this->event_channel_->
+ create_roundtrip_timeout_policy (this->timeout_);
+
+ CORBA::Object_var post_obj = pre->_set_policy_overrides
+ (policy_list, CORBA::ADD_OVERRIDE);
+ post = CosEventComm::PushSupplier::_narrow(post_obj.in ());
+
+ policy_list[0]->destroy ();
+ policy_list.length (0);
+ }
+ return post._retn ();
+#else
+ return CosEventComm::PushSupplier::_duplicate (pre);
+#endif /* TAO_HAS_CORBA_MESSAGING */
+}
+
void
TAO_CEC_ProxyPushConsumer::push (const CORBA::Any& event
ACE_ENV_ARG_DECL)
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h
index 63ba4bddad2..7e141637291 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h
@@ -52,7 +52,8 @@ public:
typedef CosEventChannelAdmin::ProxyPushConsumer_var _var_type;
/// constructor...
- TAO_CEC_ProxyPushConsumer (TAO_CEC_EventChannel* event_channel);
+ TAO_CEC_ProxyPushConsumer (TAO_CEC_EventChannel* event_channel,
+ ACE_Time_Value timeout);
/// destructor...
virtual ~TAO_CEC_ProxyPushConsumer (void);
@@ -121,19 +122,30 @@ protected:
/// Release the supplier
void cleanup_i (void);
+ /// Assigns the parameter to both supplier_ and nopolicy_supplier_, and
+ /// applies policies (when appropriate) to supplier_.
+ CosEventComm::PushSupplier_ptr apply_policy
+ (CosEventComm::PushSupplier_ptr s);
+
private:
/// The supplier admin, used for activation and memory managment.
TAO_CEC_EventChannel* event_channel_;
+ ACE_Time_Value timeout_;
+
/// The locking strategy.
ACE_Lock* lock_;
/// The reference count.
CORBA::ULong refcount_;
- /// The supplier....
+ /// The supplier -- use apply_policy() instead of assigning directly to
+ /// supplier_. This will keep supplier_ and nopolicy_supplier_ in sync.
CosEventComm::PushSupplier_var supplier_;
+ /// The supplier without any policies applied
+ CosEventComm::PushSupplier_var nopolicy_supplier_;
+
/// If the flag is true then we are connected, notice that the
/// supplier can be nil.
bool connected_;
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp
index 028bf6e4b28..03d7cf7b57d 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp
@@ -12,7 +12,7 @@
#include "orbsvcs/CosEvent/CEC_ConsumerControl.h"
#include "orbsvcs/ESF/ESF_RefCount_Guard.h"
#include "orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h"
-#include "tao/debug.h"
+
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
#include "orbsvcs/CosEvent/CEC_TypedEvent.h"
#include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
@@ -35,8 +35,10 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
// TAO_CEC_ProxyPushSupplier Constructure (Un-typed EC)
-TAO_CEC_ProxyPushSupplier::TAO_CEC_ProxyPushSupplier (TAO_CEC_EventChannel* ec)
+TAO_CEC_ProxyPushSupplier::TAO_CEC_ProxyPushSupplier (TAO_CEC_EventChannel* ec,
+ ACE_Time_Value timeout)
: event_channel_ (ec),
+ timeout_ (timeout),
refcount_ (1)
{
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
@@ -54,8 +56,10 @@ TAO_CEC_ProxyPushSupplier::TAO_CEC_ProxyPushSupplier (TAO_CEC_EventChannel* ec)
// TAO_CEC_ProxyPushSupplier Constructure (Typed EC)
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
-TAO_CEC_ProxyPushSupplier::TAO_CEC_ProxyPushSupplier (TAO_CEC_TypedEventChannel* ec)
- : typed_event_channel_ (ec),
+TAO_CEC_ProxyPushSupplier::TAO_CEC_ProxyPushSupplier (TAO_CEC_TypedEventChannel* ec,
+ ACE_Time_Value timeout)
+ : timeout_ (timeout),
+ typed_event_channel_ (ec),
refcount_ (1)
{
event_channel_ = 0;
@@ -400,8 +404,7 @@ TAO_CEC_ProxyPushSupplier::connect_push_consumer (
// Re-connections are allowed....
this->cleanup_i ();
- this->typed_consumer_ =
- CosTypedEventComm::TypedPushConsumer::_duplicate (local_typed_consumer.in () );
+ this->typed_consumer_ = apply_policy (local_typed_consumer.in () );
ACE_CHECK;
TAO_CEC_Unlock reverse_lock (*this->lock_);
@@ -420,13 +423,12 @@ TAO_CEC_ProxyPushSupplier::connect_push_consumer (
}
- this->typed_consumer_ =
- CosTypedEventComm::TypedPushConsumer::_duplicate (local_typed_consumer.in () );
+ this->typed_consumer_ = apply_policy (local_typed_consumer.in () );
ACE_CHECK;
// Store the typed object interface from the consumer
this->typed_consumer_obj_ =
- CORBA::Object::_duplicate (local_typed_consumer_obj.in () );
+ apply_policy_obj (local_typed_consumer_obj.in () );
ACE_CHECK;
}
@@ -462,8 +464,7 @@ TAO_CEC_ProxyPushSupplier::connect_push_consumer (
// Re-connections are allowed....
this->cleanup_i ();
- this->consumer_ =
- CosEventComm::PushConsumer::_duplicate (push_consumer);
+ this->consumer_ = apply_policy (push_consumer);
TAO_CEC_Unlock reverse_lock (*this->lock_);
@@ -480,8 +481,7 @@ TAO_CEC_ProxyPushSupplier::connect_push_consumer (
return;
}
- this->consumer_ =
- CosEventComm::PushConsumer::_duplicate (push_consumer);
+ this->consumer_ = apply_policy (push_consumer);
}
// Notify the event channel...
@@ -494,6 +494,60 @@ TAO_CEC_ProxyPushSupplier::connect_push_consumer (
}
}
+CORBA::Object_ptr
+TAO_CEC_ProxyPushSupplier::apply_policy_obj (CORBA::Object_ptr pre)
+{
+#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
+ CORBA::Object_var post = CORBA::Object::_duplicate (pre);
+ if (this->timeout_ > ACE_Time_Value::zero)
+ {
+ CORBA::PolicyList policy_list;
+ policy_list.length (1);
+ if (this->typed_event_channel_)
+ {
+ policy_list[0] = this->typed_event_channel_->
+ create_roundtrip_timeout_policy (this->timeout_);
+ }
+ else
+ {
+ policy_list[0] = this->event_channel_->
+ create_roundtrip_timeout_policy (this->timeout_);
+ }
+ post = pre->_set_policy_overrides (policy_list, CORBA::ADD_OVERRIDE);
+
+ policy_list[0]->destroy ();
+ policy_list.length (0);
+ }
+ return post._retn ();
+#else
+ return CORBA::Object::_duplicate (pre);
+#endif /* TAO_HAS_CORBA_MESSAGING */
+}
+
+CosEventComm::PushConsumer_ptr
+TAO_CEC_ProxyPushSupplier::apply_policy (CosEventComm::PushConsumer_ptr pre)
+{
+ this->nopolicy_consumer_ = CosEventComm::PushConsumer::_duplicate (pre);
+ CORBA::Object_var post_obj = apply_policy_obj (pre);
+ CosEventComm::PushConsumer_var post =
+ CosEventComm::PushConsumer::_narrow (post_obj.in ());
+ return post._retn ();
+}
+
+#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
+CosTypedEventComm::TypedPushConsumer_ptr
+TAO_CEC_ProxyPushSupplier::apply_policy
+ (CosTypedEventComm::TypedPushConsumer_ptr pre)
+{
+ this->nopolicy_typed_consumer_ =
+ CosTypedEventComm::TypedPushConsumer::_duplicate (pre);
+ CORBA::Object_var post_obj = apply_policy_obj (pre);
+ CosTypedEventComm::TypedPushConsumer_var post =
+ CosTypedEventComm::TypedPushConsumer::_narrow (post_obj.in ());
+ return post._retn ();
+}
+#endif
+
void
TAO_CEC_ProxyPushSupplier::disconnect_push_supplier (
ACE_ENV_SINGLE_ARG_DECL)
@@ -879,20 +933,21 @@ TAO_CEC_ProxyPushSupplier::consumer_non_existent (
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
if (this->is_typed_ec () )
{
- if (CORBA::is_nil (this->typed_consumer_.in ()))
+ if (CORBA::is_nil (this->nopolicy_typed_consumer_.in ()))
{
return 0;
}
- consumer = CORBA::Object::_duplicate (this->typed_consumer_.in ());
+ consumer = CORBA::Object::_duplicate
+ (this->nopolicy_typed_consumer_.in ());
}
else
{
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
- if (CORBA::is_nil (this->consumer_.in ()))
+ if (CORBA::is_nil (this->nopolicy_consumer_.in ()))
{
return 0;
}
- consumer = CORBA::Object::_duplicate (this->consumer_.in ());
+ consumer = CORBA::Object::_duplicate (this->nopolicy_consumer_.in ());
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
} /* ! this->is_typed_ec */
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.h
index 5ebed2713c7..93edaeab43c 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.h
@@ -62,11 +62,13 @@ public:
typedef CosEventChannelAdmin::ProxyPushSupplier_var _var_type;
/// constructor...
- TAO_CEC_ProxyPushSupplier (TAO_CEC_EventChannel* event_channel);
+ TAO_CEC_ProxyPushSupplier (TAO_CEC_EventChannel* event_channel,
+ ACE_Time_Value timeout);
/// typed ec constructor
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
- TAO_CEC_ProxyPushSupplier (TAO_CEC_TypedEventChannel* typed_event_channel);
+ TAO_CEC_ProxyPushSupplier (TAO_CEC_TypedEventChannel* typed_event_channel,
+ ACE_Time_Value timeout);
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
/// destructor...
@@ -166,10 +168,24 @@ protected:
CORBA::Boolean is_typed_ec (void) const;
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
+ /// Assigns the parameter to both consumer_ and nopolicy_consumer_, and
+ /// applies policies (when appropriate) to consumer_.
+ CosEventComm::PushConsumer_ptr apply_policy
+ (CosEventComm::PushConsumer_ptr c);
+
+#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
+ CosTypedEventComm::TypedPushConsumer_ptr apply_policy
+ (CosTypedEventComm::TypedPushConsumer_ptr c);
+#endif
+
+ CORBA::Object_ptr apply_policy_obj (CORBA::Object_ptr c);
+
private:
/// The Event Channel that owns this object.
TAO_CEC_EventChannel* event_channel_;
+ ACE_Time_Value timeout_;
+
/// The Typed Event Channel that owns this object.
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
TAO_CEC_TypedEventChannel *typed_event_channel_;
@@ -181,15 +197,25 @@ private:
/// The reference count.
CORBA::ULong refcount_;
- /// The consumer....
+ /// The consumer -- use apply_policy() instead of assigning directly to
+ /// consumer_. This will keep consumer_ and nopolicy_consumer_ in sync.
CosEventComm::PushConsumer_var consumer_;
+ /// The consumer without any policies applied
+ CosEventComm::PushConsumer_var nopolicy_consumer_;
+
#if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
- /// The typed consumer....
+ /// The typed consumer -- use apply_policy() instead of assigning directly to
+ /// typed_consumer_. This will keep typed_consumer_ and
+ /// nopolicy_typed_consumer_ in sync.
CosTypedEventComm::TypedPushConsumer_var typed_consumer_;
/// The consumer object returned from get_typed_consumer()
CORBA::Object_var typed_consumer_obj_;
+
+ /// The typed consumer without any policies applied
+ CosTypedEventComm::TypedPushConsumer_var nopolicy_typed_consumer_;
+
#endif /* TAO_HAS_TYPED_EVENT_CHANNEL */
/// Store the default POA.
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp
index ffb80d8158c..ab0c397f78e 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp
@@ -9,7 +9,9 @@
#include "orbsvcs/CosEvent/CEC_ConsumerControl.h"
#include "orbsvcs/CosEvent/CEC_SupplierControl.h"
#include "tao/debug.h"
+#include "tao/ORB_Core.h"
#include "ace/Dynamic_Service.h"
+#include "ace/Reactor.h"
#if ! defined (__ACE_INLINE__)
#include "orbsvcs/CosEvent/CEC_TypedEventChannel.i"
@@ -81,6 +83,23 @@ TAO_CEC_TypedEventChannel::activate (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
this->supplier_control_->activate ();
}
+namespace
+{
+ struct ShutdownHandler : ACE_Event_Handler
+ {
+ ShutdownHandler (CORBA::ORB_ptr orb)
+ : orb_ (CORBA::ORB::_duplicate (orb)) {}
+ CORBA::ORB_var orb_;
+
+ virtual int handle_timeout (const ACE_Time_Value&, const void*)
+ {
+ orb_->shutdown (1);
+ return 0;
+ }
+
+ };
+}
+
void
TAO_CEC_TypedEventChannel::shutdown (ACE_ENV_SINGLE_ARG_DECL)
{
@@ -126,7 +145,10 @@ TAO_CEC_TypedEventChannel::shutdown (ACE_ENV_SINGLE_ARG_DECL)
t_poa->deactivate_object (t_id.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- this->orb_->shutdown(0);
+ ACE_Event_Handler *timer;
+ ACE_NEW (timer, ShutdownHandler (this->orb_.in ()));
+ ACE_Reactor *reactor = this->orb_->orb_core ()->reactor ();
+ reactor->schedule_timer (timer, 0, ACE_Time_Value (1));
}
}
@@ -565,4 +587,11 @@ TAO_CEC_TypedEventChannel::destroy (ACE_ENV_SINGLE_ARG_DECL)
}
}
+CORBA::Policy_ptr
+TAO_CEC_TypedEventChannel::create_roundtrip_timeout_policy
+ (ACE_Time_Value timeout)
+{
+ return this->factory_->create_roundtrip_timeout_policy (timeout);
+}
+
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.h
index e98f9b28375..e8c76b57d0b 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.h
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.h
@@ -267,6 +267,9 @@ public:
ServantRetryMap& get_servant_retry_map (void);
+ /// Forwarded to the factory
+ CORBA::Policy_ptr create_roundtrip_timeout_policy (ACE_Time_Value timeout);
+
protected:
/// Function caches the full interface description from the IFR
int cache_interface_description (const char *interface ACE_ENV_ARG_DECL);
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.cpp
index f745c5a942c..903163c1d49 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.cpp
@@ -19,8 +19,10 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
// Implementation skeleton constructor
-TAO_CEC_TypedProxyPushConsumer::TAO_CEC_TypedProxyPushConsumer (TAO_CEC_TypedEventChannel* ec)
+TAO_CEC_TypedProxyPushConsumer::TAO_CEC_TypedProxyPushConsumer
+ (TAO_CEC_TypedEventChannel* ec, ACE_Time_Value timeout)
: typed_event_channel_ (ec),
+ timeout_ (timeout),
refcount_ (1),
connected_ (0)
{
@@ -138,11 +140,12 @@ TAO_CEC_TypedProxyPushConsumer::supplier_non_existent (
disconnected = 1;
return 0;
}
- if (CORBA::is_nil (this->typed_supplier_.in ()))
+ if (CORBA::is_nil (this->nopolicy_typed_supplier_.in ()))
{
return 0;
}
- supplier = CORBA::Object::_duplicate (this->typed_supplier_.in ());
+ supplier = CORBA::Object::_duplicate
+ (this->nopolicy_typed_supplier_.in ());
}
#if (TAO_HAS_MINIMUM_CORBA == 0)
@@ -259,8 +262,7 @@ TAO_CEC_TypedProxyPushConsumer::connect_push_supplier (
if (this->is_connected_i ())
return;
}
- this->typed_supplier_ =
- CosEventComm::PushSupplier::_duplicate (push_supplier);
+ this->typed_supplier_ = apply_policy (push_supplier);
this->connected_ = 1;
}
@@ -268,6 +270,32 @@ TAO_CEC_TypedProxyPushConsumer::connect_push_supplier (
this->typed_event_channel_->connected (this ACE_ENV_ARG_PARAMETER);
}
+CosEventComm::PushSupplier_ptr
+TAO_CEC_TypedProxyPushConsumer::apply_policy
+ (CosEventComm::PushSupplier_ptr pre)
+{
+ if (CORBA::is_nil(pre)) return pre;
+ this->nopolicy_typed_supplier_ =
+ CosEventComm::PushSupplier::_duplicate (pre);
+ CosEventComm::PushSupplier_var post =
+ CosEventComm::PushSupplier::_duplicate (pre);
+ if (this->timeout_ > ACE_Time_Value::zero)
+ {
+ CORBA::PolicyList policy_list;
+ policy_list.length (1);
+ policy_list[0] = this->typed_event_channel_->
+ create_roundtrip_timeout_policy (this->timeout_);
+
+ CORBA::Object_var post_obj = pre->_set_policy_overrides
+ (policy_list, CORBA::ADD_OVERRIDE);
+ post = CosEventComm::PushSupplier::_narrow(post_obj.in ());
+
+ policy_list[0]->destroy ();
+ policy_list.length (0);
+ }
+ return post._retn ();
+}
+
void
TAO_CEC_TypedProxyPushConsumer::push (const CORBA::Any& /* event */
ACE_ENV_ARG_DECL)
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h
index fa9e936dd78..32dcb2ee99a 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h
@@ -42,7 +42,8 @@ public:
//Constructor
TAO_CEC_TypedProxyPushConsumer (
- TAO_CEC_TypedEventChannel* typed_event_channel
+ TAO_CEC_TypedEventChannel* typed_event_channel,
+ ACE_Time_Value timeout
);
//Destructor
@@ -112,19 +113,31 @@ protected:
/// Release the supplier
void cleanup_i (void);
+ /// Assigns the parameter to both supplier_ and nopolicy_supplier_, and
+ /// applies policies (when appropriate) to supplier_.
+ CosEventComm::PushSupplier_ptr apply_policy
+ (CosEventComm::PushSupplier_ptr pre);
+
private:
/// The typed supplier admin, used for activation and memory managment.
TAO_CEC_TypedEventChannel* typed_event_channel_;
+ ACE_Time_Value timeout_;
+
/// The locking strategy.
ACE_Lock* lock_;
/// The reference count.
CORBA::ULong refcount_;
- /// The typed supplier....
+ /// The typed supplier -- use apply_policy() instead of assigning directly to
+ /// typed_supplier_. This will keep typed_supplier_ and
+ /// nopolicy_typed_supplier_ in sync.
CosEventComm::PushSupplier_var typed_supplier_;
+ /// The typed supplier without any policies applied
+ CosEventComm::PushSupplier_var nopolicy_typed_supplier_;
+
/// The DSI impl
TAO_CEC_DynamicImplementationServer* dsi_impl_;
diff --git a/TAO/orbsvcs/tests/CosEvent/Timeout/README b/TAO/orbsvcs/tests/CosEvent/Timeout/README
new file mode 100644
index 00000000000..6b415c9ba00
--- /dev/null
+++ b/TAO/orbsvcs/tests/CosEvent/Timeout/README
@@ -0,0 +1,40 @@
+# $Id$
+
+CosEvent timeout feature test
+
+*Purpose:
+
+Demonstrates the timeout feature (see cosevent.conf in this directory) of the
+CosEvent service. This feature was created because the reactive consumer/
+supplier control is lacking in the following way. A consumer may become
+unresponsive during a push() upcall. If the consumer is at all multithreaded,
+then the reactive control's ping operation will still succeed. Timeouts
+(via the Messaging library's relative round-trip timeout policy) allow the
+event service to disconnect an unresponsive client (consumer or supplier).
+Notice in run_test.pl that the CosEvent_Service executable's new -b option
+is used to enable callbacks on disconnect. This was previously only availble
+through code.
+
+*Running:
+
+> run_test.pl
+
+*Sample output:
+
+Found the EchoEventChannel.
+Consumer connected
+Ready to receive events...
+Supplier starting...
+TestEventConsumer_i::push(): Received event containing 40 bytes.
+TestEventConsumer_i::push(): Received event containing 40 bytes.
+TestEventConsumer_i::push(): Received event containing 40 bytes.
+TestEventConsumer_i::push(): Received event containing 40 bytes.
+TestEventConsumer_i::push(): Received event containing 40 bytes.
+TestEventConsumer_i::push(): Received event containing 40 bytes.
+TestEventConsumer_i::push(): Received event containing 40 bytes.
+TestEventConsumer_i::push(): Received event containing 40 bytes.
+TestEventConsumer_i::push(): Received event containing 40 bytes.
+TestEventConsumer_i::push(): Received event containing 40 bytes.
+TestEventConsumer_i::push(): Simulating hung consumer
+TestEventConsumer_i::disconnect_push_consumer()
+TestEventConsumer_i::push(): Done
diff --git a/TAO/orbsvcs/tests/CosEvent/Timeout/TestEventConsumer_i.cpp b/TAO/orbsvcs/tests/CosEvent/Timeout/TestEventConsumer_i.cpp
new file mode 100644
index 00000000000..d143251cf90
--- /dev/null
+++ b/TAO/orbsvcs/tests/CosEvent/Timeout/TestEventConsumer_i.cpp
@@ -0,0 +1,78 @@
+// $Id$
+
+#include "TestEventConsumer_i.h"
+
+#include "ace/Log_Msg.h"
+
+
+TestEventConsumer_i::TestEventConsumer_i (CORBA::ORB_ptr orb, bool hang)
+ : orb_ (CORBA::ORB::_duplicate (orb)),
+ hang_ (hang),
+ count_ (0),
+ orbtask_ (orb),
+ cond_ (this->mtx_),
+ shutdown_ (false)
+{
+}
+
+void
+TestEventConsumer_i::activate ()
+{
+ this->orbtask_.activate ();
+}
+
+int
+TestEventConsumer_i::ORB_task::svc ()
+{
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TestEventConsumer_i ORB_task starting\n")));
+ try
+ {
+ this->orb_->run ();
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TestEventConsumer_i ORB_task "
+ "exiting\n")));
+ }
+ catch (CORBA::SystemException &e)
+ {
+ ACE_PRINT_EXCEPTION (e, ACE_TEXT ("TestEventConsumer_i ORB_task: "
+ "Caught CORBA::Exception:"));
+ }
+ return 0;
+}
+
+
+void
+TestEventConsumer_i::push (const CORBA::Any & data)
+ throw (CORBA::SystemException, CosEventComm::Disconnected)
+{
+ // Extract event data from the any.
+ const char *eventData;
+ if (data >>= eventData)
+ {
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TestEventConsumer_i::push(): Received "
+ "event containing %d bytes.\n"),
+ ACE_OS::strlen (eventData)));
+ if (this->hang_ && ++this->count_ == 10)
+ {
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TestEventConsumer_i::push(): "
+ "Simulating hung consumer\n")));
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->mtx_);
+ while (!this->shutdown_)
+ this->cond_.wait ();
+ }
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TestEventConsumer_i::push Done\n")
+ ));
+ }
+ }
+}
+
+void
+TestEventConsumer_i::disconnect_push_consumer ()
+ throw (CORBA::SystemException)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TestEventConsumer_i::disconnect_push_consumer()\n")));
+ this->orb_->shutdown ();
+ this->shutdown_ = true;
+ this->cond_.signal ();
+}
diff --git a/TAO/orbsvcs/tests/CosEvent/Timeout/TestEventConsumer_i.h b/TAO/orbsvcs/tests/CosEvent/Timeout/TestEventConsumer_i.h
new file mode 100644
index 00000000000..2f97247ece0
--- /dev/null
+++ b/TAO/orbsvcs/tests/CosEvent/Timeout/TestEventConsumer_i.h
@@ -0,0 +1,46 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef TestEventConsumer_i_h_
+#define TestEventConsumer_i_h_
+
+#include "orbsvcs/CosEventCommS.h"
+
+#include "ace/Task.h"
+#include "ace/Condition_T.h"
+
+class TestEventConsumer_i : public virtual POA_CosEventComm::PushConsumer
+{
+public:
+ TestEventConsumer_i (CORBA::ORB_ptr orb, bool hang);
+
+ // Override operations from PushConsumer interface.
+ virtual void push (const CORBA::Any & data)
+ throw (CORBA::SystemException, CosEventComm::Disconnected);
+
+ virtual void disconnect_push_consumer () throw (CORBA::SystemException);
+
+ void activate ();
+
+ struct ORB_task : ACE_Task_Base
+ {
+ ORB_task (CORBA::ORB_ptr orb) : orb_ (CORBA::ORB::_duplicate (orb))
+ {}
+
+ CORBA::ORB_var orb_;
+
+ int svc ();
+ };
+
+private:
+ CORBA::ORB_var orb_;
+ bool hang_;
+ unsigned count_;
+ ORB_task orbtask_;
+
+ TAO_SYNCH_MUTEX mtx_;
+ ACE_Condition<TAO_SYNCH_MUTEX> cond_;
+ bool shutdown_;
+};
+
+#endif
diff --git a/TAO/orbsvcs/tests/CosEvent/Timeout/Timeout.mpc b/TAO/orbsvcs/tests/CosEvent/Timeout/Timeout.mpc
new file mode 100644
index 00000000000..8fca7257ef7
--- /dev/null
+++ b/TAO/orbsvcs/tests/CosEvent/Timeout/Timeout.mpc
@@ -0,0 +1,4 @@
+//$Id$
+project(*Timeout): orbsvcsexe, naming, event, event_skel, messaging {
+ exename = TimeoutTest
+}
diff --git a/TAO/orbsvcs/tests/CosEvent/Timeout/TimeoutTestMain.cpp b/TAO/orbsvcs/tests/CosEvent/Timeout/TimeoutTestMain.cpp
new file mode 100644
index 00000000000..d5a4986b562
--- /dev/null
+++ b/TAO/orbsvcs/tests/CosEvent/Timeout/TimeoutTestMain.cpp
@@ -0,0 +1,177 @@
+// $Id$
+
+#include "TestEventConsumer_i.h"
+
+#include "orbsvcs/CosEventCommC.h"
+#include "orbsvcs/CosEventChannelAdminC.h"
+#include "orbsvcs/CosNamingC.h"
+
+#include "ace/Task.h"
+#include "ace/Log_Msg.h"
+#include "ace/OS_NS_strings.h"
+#include "ace/OS_NS_unistd.h"
+
+namespace
+{
+ const char *forty_bytes = "1234567890123456789012345678901234567890";
+}
+
+struct SupplierTask : ACE_Task_Base
+{
+ static const int N_ITERATIONS = 15;
+
+ SupplierTask (CosEventChannelAdmin::ProxyPushConsumer_ptr ppc,
+ CORBA::ORB_ptr orb)
+ : consumer_ (CosEventChannelAdmin::ProxyPushConsumer::_duplicate (ppc)),
+ orb_ (CORBA::ORB::_duplicate (orb))
+ {
+ }
+
+ CosEventChannelAdmin::ProxyPushConsumer_var consumer_;
+ CORBA::ORB_var orb_;
+
+ int svc()
+ {
+ // Create an event (just a string in this case).
+ CORBA::String_var eventData = forty_bytes;
+ int delay_ms = 500;
+
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Supplier starting...\n")));
+
+ for (int i = 0; i < N_ITERATIONS; ++i)
+ {
+ // Insert the event data into an any.
+ CORBA::Any any;
+ any <<= eventData;
+
+ // Now push the event to the consumer
+ this->consumer_->push (any);
+
+ ACE_Time_Value event_delay (0, 1000 * delay_ms);
+ ACE_OS::sleep (event_delay);
+ }
+ this->orb_->destroy ();
+ return 0;
+ }
+};
+
+int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ try
+ {
+ // Initialize the ORB.
+ CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
+
+ // Find the Naming Service.
+ CORBA::Object_var obj = orb->resolve_initial_references ("NameService");
+ CosNaming::NamingContextExt_var root_context =
+ CosNaming::NamingContextExt::_narrow (obj.in ());
+
+ obj = root_context->resolve_str ("CosEventService");
+
+ // Downcast the object reference to an EventChannel reference.
+ CosEventChannelAdmin::EventChannel_var ec =
+ CosEventChannelAdmin::EventChannel::_narrow (obj.in ());
+ if (CORBA::is_nil (ec.in ()))
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Could not narrow the EventChannel.\n")));
+ return 1;
+ }
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Found the EventChannel.\n")));
+
+ bool consumer = false;
+ bool supplier = false;
+ bool hang = false;
+ for (int i=1; i < argc; ++i)
+ {
+ if (0 == ACE_OS::strcasecmp (argv[i], ACE_TEXT ("-consumer")))
+ consumer = true;
+ else if (0 == ACE_OS::strcasecmp (argv[i], ACE_TEXT ("-supplier")))
+ supplier = true;
+ else if (0 == ACE_OS::strcasecmp (argv[i], ACE_TEXT ("-hang")))
+ hang = true;
+ }
+
+ TestEventConsumer_i servant (orb.in (), hang);
+
+ if (consumer)
+ {
+ // Register it with the RootPOA.
+ obj = orb->resolve_initial_references ("RootPOA");
+ PortableServer::POA_var poa =
+ PortableServer::POA::_narrow (obj.in ());
+ PortableServer::ObjectId_var oid = poa->activate_object (&servant);
+ CORBA::Object_var consumer_obj = poa->id_to_reference (oid.in ());
+ CosEventComm::PushConsumer_var consumer =
+ CosEventComm::PushConsumer::_narrow (consumer_obj.in ());
+
+ // Get a ConsumerAdmin object from the EventChannel.
+ CosEventChannelAdmin::ConsumerAdmin_var consumerAdmin =
+ ec->for_consumers ();
+
+ // Get a ProxyPushSupplier from the ConsumerAdmin.
+ CosEventChannelAdmin::ProxyPushSupplier_var supplier =
+ consumerAdmin->obtain_push_supplier ();
+
+ // Connect to the ProxyPushSupplier, passing our PushConsumer object
+ // reference to it.
+ supplier->connect_push_consumer (consumer.in ());
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Consumer connected\n")));
+
+ // Activate the POA via its POAManager.
+ PortableServer::POAManager_var poa_manager = poa->the_POAManager ();
+ poa_manager->activate ();
+ servant.activate ();
+ }
+
+ SupplierTask *pST = 0;
+ if (supplier)
+ {
+ // The supplier will use its own ORB.
+ CORBA::String_var ec_str = orb->object_to_string (ec.in ());
+
+ int no_args = 0;
+ CORBA::ORB_var s_orb = CORBA::ORB_init (no_args, 0,
+ "Supplier_pure_client_ORB");
+
+ CORBA::Object_var s_ec_obj = s_orb->string_to_object (ec_str.in ());
+
+ CosEventChannelAdmin::EventChannel_var s_ec =
+ CosEventChannelAdmin::EventChannel::_narrow (s_ec_obj.in ());
+
+ // Get a SupplierAdmin object from the EventChannel.
+ CosEventChannelAdmin::SupplierAdmin_var supplierAdmin =
+ s_ec->for_suppliers ();
+
+ // Get a ProxyPushConsumer from the SupplierAdmin.
+ CosEventChannelAdmin::ProxyPushConsumer_var consumer =
+ supplierAdmin->obtain_push_consumer ();
+
+ // Connect to the ProxyPushConsumer as a PushSupplier
+ // (passing a nil PushSupplier object reference to it because
+ // we don't care to be notified about disconnects).
+ consumer->connect_push_supplier
+ (CosEventComm::PushSupplier::_nil ());
+
+ ACE_NEW_RETURN (pST, SupplierTask (consumer.in (), s_orb.in ()), -1);
+ pST->activate ();
+ }
+
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Ready to receive events...\n")));
+
+ // Enter the ORB event loop.
+ orb->run ();
+
+ ACE_Thread_Manager::instance ()->wait ();
+ orb->destroy ();
+ return 0;
+ }
+ catch (CORBA::Exception &ex)
+ {
+ ACE_PRINT_EXCEPTION (ex,
+ ACE_TEXT ("TimeoutTest: Caught CORBA::Exception:"));
+ }
+
+ return 1;
+}
diff --git a/TAO/orbsvcs/tests/CosEvent/Timeout/cosevent.conf b/TAO/orbsvcs/tests/CosEvent/Timeout/cosevent.conf
new file mode 100644
index 00000000000..1fa9bf39317
--- /dev/null
+++ b/TAO/orbsvcs/tests/CosEvent/Timeout/cosevent.conf
@@ -0,0 +1,3 @@
+# $Id$
+static CEC_Factory "-CECSupplierControl reactive -CECConsumerControl reactive -CECDispatching mt -CECConsumerOperationTimeout 3000000 -CECSupplierOperationTimeout 3000000"
+static Server_Strategy_Factory "-ORBConcurrency thread-per-connection"
diff --git a/TAO/orbsvcs/tests/CosEvent/Timeout/run_test.pl b/TAO/orbsvcs/tests/CosEvent/Timeout/run_test.pl
new file mode 100755
index 00000000000..191350e5691
--- /dev/null
+++ b/TAO/orbsvcs/tests/CosEvent/Timeout/run_test.pl
@@ -0,0 +1,66 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+# $Id$
+
+use Env (ACE_ROOT);
+use lib "$ACE_ROOT/bin";
+use PerlACE::Run_Test;
+
+$nsiorfile = PerlACE::LocalFile ("ns.ior");
+$esiorfile = PerlACE::LocalFile ("es.ior");
+$arg_ns_ref = "-ORBInitRef NameService=file://$nsiorfile";
+
+$status = 0;
+
+unlink $nsiorfile;
+unlink $esiorfile;
+
+# start Naming Service
+
+$NameService = "$ENV{ACE_ROOT}/TAO/orbsvcs/Naming_Service/Naming_Service";
+$NS = new PerlACE::Process($NameService, "-o $nsiorfile");
+$NS->Spawn();
+if (PerlACE::waitforfile_timed ($nsiorfile, 5) == -1) {
+ print STDERR "ERROR: cannot find file <$nsiorfile>\n";
+ $NS->Kill();
+ exit 1;
+}
+
+# start Event Service
+$EventService = "$ENV{ACE_ROOT}/TAO/orbsvcs/CosEvent_Service/CosEvent_Service";
+$ES = new PerlACE::Process($EventService, "-ORBSvcConf cosevent.conf "
+ ."-b -o $esiorfile $arg_ns_ref");
+$ES->Spawn();
+if (PerlACE::waitforfile_timed ($esiorfile, 5) == -1) {
+ print STDERR "ERROR: cannot find file <$esiorfile>\n";
+ $ES->Kill();
+ unlink $nsiorfile;
+ exit 1;
+}
+
+
+$S = new PerlACE::Process("TimeoutTest", "-supplier -consumer -hang "
+ . $arg_ns_ref);
+$ret = $S->SpawnWaitKill(30);
+if ($ret != 0) {
+ print STDERR "ERROR: application returned $ret\n";
+ $status = 1;
+}
+
+$ret = $ES->Kill();
+if ($ret != 0) {
+ print STDERR "ERROR: event channel returned $ret\n";
+ $status = 1;
+}
+
+$ret = $NS->Kill();
+if ($ret != 0) {
+ print STDERR "ERROR: name service returned $ret\n";
+ $status = 1;
+}
+
+unlink $nsiorfile;
+unlink $esiorfile;
+
+exit $status;