From aeb6a3d807bb99f4fd16833d9b1999fe62195466 Mon Sep 17 00:00:00 2001 From: nobody Date: Thu, 6 Feb 2003 23:41:28 +0000 Subject: This commit was manufactured by cvs2svn to create branch 'RT_Notify'. --- TAO/orbsvcs/orbsvcs/Notify/AdminProperties.cpp | 70 ++++++ TAO/orbsvcs/orbsvcs/Notify/AdminProperties.h | 115 +++++++++ TAO/orbsvcs/orbsvcs/Notify/AdminProperties.inl | 55 +++++ TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp | 87 +++++++ TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h | 80 ++++++ TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.inl | 3 + .../orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp | 98 ++++++++ .../orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h | 88 +++++++ .../orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.inl | 3 + .../orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.cpp | 74 ++++++ .../orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h | 87 +++++++ .../orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.inl | 3 + .../orbsvcs/Notify/Any/ProxyPushConsumer.cpp | 110 +++++++++ TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h | 94 +++++++ .../orbsvcs/Notify/Any/ProxyPushConsumer.inl | 3 + .../orbsvcs/Notify/Any/ProxyPushSupplier.cpp | 84 +++++++ TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h | 93 +++++++ .../orbsvcs/Notify/Any/ProxyPushSupplier.inl | 3 + TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp | 68 ++++++ TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h | 66 +++++ TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.inl | 3 + TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.cpp | 42 ++++ TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h | 60 +++++ TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.inl | 3 + TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp | 271 +++++++++++++++++++++ TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h | 137 +++++++++++ TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.inl | 19 ++ .../orbsvcs/Notify/CosNotify_Initializer.cpp | 13 + TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.h | 38 +++ .../orbsvcs/Notify/Method_Request_Updates.cpp | 55 +++++ .../orbsvcs/Notify/Method_Request_Updates.h | 64 +++++ .../orbsvcs/Notify/Method_Request_Updates.inl | 3 + TAO/orbsvcs/orbsvcs/Notify/Property.h | 39 +++ TAO/orbsvcs/orbsvcs/Notify/PropertySeq.cpp | 53 ++++ TAO/orbsvcs/orbsvcs/Notify/PropertySeq.h | 62 +++++ TAO/orbsvcs/orbsvcs/Notify/PropertySeq.inl | 7 + TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.cpp | 43 ++++ TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.h | 81 ++++++ TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.inl | 49 ++++ TAO/orbsvcs/orbsvcs/Notify/Property_T.cpp | 119 +++++++++ TAO/orbsvcs/orbsvcs/Notify/Property_T.h | 161 ++++++++++++ TAO/orbsvcs/orbsvcs/Notify/Property_T.inl | 78 ++++++ TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp | 80 ++++++ TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h | 84 +++++++ TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.inl | 1 + TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp | 222 +++++++++++++++++ TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h | 162 ++++++++++++ TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.inl | 1 + TAO/orbsvcs/orbsvcs/Notify/Proxy_T.cpp | 135 ++++++++++ TAO/orbsvcs/orbsvcs/Notify/Proxy_T.h | 158 ++++++++++++ TAO/orbsvcs/orbsvcs/Notify/Proxy_T.inl | 1 + TAO/orbsvcs/orbsvcs/Notify/QoSProperties.cpp | 126 ++++++++++ TAO/orbsvcs/orbsvcs/Notify/QoSProperties.h | 99 ++++++++ TAO/orbsvcs/orbsvcs/Notify/QoSProperties.inl | 25 ++ .../Notify/Sequence/Batch_Buffering_Strategy.cpp | 92 +++++++ .../Notify/Sequence/Batch_Buffering_Strategy.h | 60 +++++ .../Notify/Sequence/Batch_Buffering_Strategy.inl | 3 + .../Notify/Sequence/SequenceProxyPushConsumer.cpp | 107 ++++++++ .../Notify/Sequence/SequenceProxyPushConsumer.h | 98 ++++++++ .../Notify/Sequence/SequenceProxyPushConsumer.inl | 3 + .../Notify/Sequence/SequenceProxyPushSupplier.cpp | 84 +++++++ .../Notify/Sequence/SequenceProxyPushSupplier.h | 94 +++++++ .../Notify/Sequence/SequenceProxyPushSupplier.inl | 3 + .../Notify/Sequence/SequencePushConsumer.cpp | 181 ++++++++++++++ .../orbsvcs/Notify/Sequence/SequencePushConsumer.h | 114 +++++++++ .../Notify/Sequence/SequencePushConsumer.inl | 3 + .../Notify/Sequence/SequencePushSupplier.cpp | 33 +++ .../orbsvcs/Notify/Sequence/SequencePushSupplier.h | 59 +++++ .../Notify/Sequence/SequencePushSupplier.inl | 3 + TAO/orbsvcs/orbsvcs/Notify/Timer.h | 49 ++++ TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.cpp | 49 ++++ TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.h | 68 ++++++ TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.inl | 3 + TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.cpp | 49 ++++ TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.h | 64 +++++ TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.inl | 3 + 76 files changed, 5000 insertions(+) create mode 100644 TAO/orbsvcs/orbsvcs/Notify/AdminProperties.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/AdminProperties.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/AdminProperties.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Property.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/PropertySeq.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/PropertySeq.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/PropertySeq.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Property_T.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Property_T.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Property_T.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Proxy_T.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Proxy_T.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Proxy_T.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/QoSProperties.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/QoSProperties.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/QoSProperties.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Timer.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.inl create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.h create mode 100644 TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.inl diff --git a/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.cpp b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.cpp new file mode 100644 index 00000000000..46b3d3c0ec2 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.cpp @@ -0,0 +1,70 @@ +// $Id$ + +#include "AdminProperties.h" + +#if ! defined (__ACE_INLINE__) +#include "AdminProperties.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_AdminProperties, "$id$") + +#include "orbsvcs/CosNotificationC.h" + +TAO_NS_AdminProperties::TAO_NS_AdminProperties (void) + : max_global_queue_length_ (CosNotification::MaxQueueLength, 0), + max_consumers_ (CosNotification::MaxConsumers, 0), + max_suppliers_ (CosNotification::MaxSuppliers, 0), + reject_new_events_ (CosNotification::RejectNewEvents, 0), + global_queue_length_ (0), + global_queue_not_full_condition_ (global_queue_lock_) +{ +} + +TAO_NS_AdminProperties::~TAO_NS_AdminProperties () +{ +} + +int +TAO_NS_AdminProperties::init (const CosNotification::PropertySeq& prop_seq) +{ + if (TAO_NS_PropertySeq::init (prop_seq) != 0) + return -1; + + this->max_global_queue_length_.set (*this); + this->max_consumers_.set (*this); + this->max_suppliers_.set (*this); + this->reject_new_events_.set (*this); + + //@@ check if unsupported property was set. + // This will happen when number of successfull inits != numbers of items bound in map_. + + return 0; +} + +CORBA::Boolean +TAO_NS_AdminProperties::queue_full (void) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, 1); + + if (this->max_global_queue_length () == 0) + return 0; + else + if (this->global_queue_length_ > this->max_global_queue_length ().value ()) + return 1; + + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_Atomic_Op; +template class ACE_Atomic_Op_Ex; +template class ACE_Refcounted_Auto_Ptr; + +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate ACE_Atomic_Op +#pragma instantiate ACE_Atomic_Op_Ex +#pragma ACE_Refcounted_Auto_Ptr + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.h b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.h new file mode 100644 index 00000000000..fe14835ab24 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.h @@ -0,0 +1,115 @@ +/* -*- C++ -*- */ +/** + * @file AdminProperties.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_ADMINPROPERTIES_H +#define TAO_NS_ADMINPROPERTIES_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Atomic_Op.h" +#include "ace/Refcounted_Auto_Ptr.h" +#include "tao/orbconf.h" +#include "Types.h" +#include "PropertySeq.h" +#include "Property_T.h" +#include "Property.h" +#include "Property_Boolean.h" + +/** + * @class TAO_NS_AdminProperties + * + * @brief The AdminProperties per EventChannel. + * + */ +class TAO_Notify_Export TAO_NS_AdminProperties : public TAO_NS_PropertySeq +{ +public: + /// Constuctor + TAO_NS_AdminProperties (void); + + /// Destructor + ~TAO_NS_AdminProperties (); + + // Init + int init (const CosNotification::PropertySeq& prop_seq); + + // = Accessors + const TAO_NS_Property_Long& max_global_queue_length (void) const; + const TAO_NS_Property_Long& max_consumers (void) const; + const TAO_NS_Property_Long& max_suppliers (void) const; + const TAO_NS_Property_Boolean& reject_new_events (void) const; + + CORBA::Long& global_queue_length (void); + TAO_SYNCH_MUTEX& global_queue_lock (void); + TAO_SYNCH_CONDITION& global_queue_not_full_condition (void); + + TAO_NS_Atomic_Property_Long& consumers (void); + TAO_NS_Atomic_Property_Long& suppliers (void); + + // = Helper method + /// Returns true if Queue is full + CORBA::Boolean queue_full (void); + +protected: + // @@ Pradeep can you explain why there is any maximum for these + // values? Should they be configurable by the user so the resource + // requirements can be bounded? + + // = Admin. properties + // for all these properties the default O implies no limit + /** + * The maximum number of events that will be queued by the channel before + * the channel begins discarding events or rejecting new events upon + * receipt of each new event. + */ + TAO_NS_Property_Long max_global_queue_length_; + + /// The maximum number of consumers that can be connected to the channel at + /// any given time. + TAO_NS_Property_Long max_consumers_; + + /// The maximum number of suppliers that can be connected to the channel at + /// any given time. + TAO_NS_Property_Long max_suppliers_; + + /// Reject any new event. + TAO_NS_Property_Boolean reject_new_events_; + + //= Variables + /// This is used to count the queue length across all buffers in the Notify Service + /// to enforce the "MaxQueueLength" property. + CORBA::Long global_queue_length_; + + /// Global queue lock used to serialize access to all queues. + TAO_SYNCH_MUTEX global_queue_lock_; + + /// The condition that the queue_length_ is not at max. + TAO_SYNCH_CONDITION global_queue_not_full_condition_; + + /// These are used to count the number of consumers and suppliers connected to + /// the system. + TAO_NS_Atomic_Property_Long consumers_; + TAO_NS_Atomic_Property_Long suppliers_; +}; + +typedef ACE_Refcounted_Auto_Ptr TAO_NS_AdminProperties_var; + +#if defined (__ACE_INLINE__) +#include "AdminProperties.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_ADMINPROPERTIES_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.inl b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.inl new file mode 100644 index 00000000000..4abdd9b071a --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.inl @@ -0,0 +1,55 @@ +// $Id$ + +ACE_INLINE const TAO_NS_Property_Long& +TAO_NS_AdminProperties::max_global_queue_length (void) const +{ + return this->max_global_queue_length_; +} + +ACE_INLINE const TAO_NS_Property_Long& +TAO_NS_AdminProperties::max_consumers (void) const +{ + return this->max_consumers_; +} + +ACE_INLINE const TAO_NS_Property_Long& +TAO_NS_AdminProperties::max_suppliers (void) const +{ + return this->max_suppliers_; +} + +ACE_INLINE const TAO_NS_Property_Boolean& +TAO_NS_AdminProperties::reject_new_events (void) const +{ + return this->reject_new_events_; +} + +ACE_INLINE CORBA::Long& +TAO_NS_AdminProperties::global_queue_length (void) +{ + return this->global_queue_length_; +} + +ACE_INLINE TAO_SYNCH_MUTEX& +TAO_NS_AdminProperties::global_queue_lock (void) +{ + return this->global_queue_lock_; +} + +ACE_INLINE TAO_SYNCH_CONDITION& +TAO_NS_AdminProperties::global_queue_not_full_condition (void) +{ + return this->global_queue_not_full_condition_; +} + +ACE_INLINE TAO_NS_Atomic_Property_Long& +TAO_NS_AdminProperties::consumers (void) +{ + return this->consumers_; +} + +ACE_INLINE TAO_NS_Atomic_Property_Long& +TAO_NS_AdminProperties::suppliers (void) +{ + return this->suppliers_; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp new file mode 100644 index 00000000000..4eaaa2555b9 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp @@ -0,0 +1,87 @@ +// $Id$ + +#include "AnyEvent.h" + +#if ! defined (__ACE_INLINE__) +#include "AnyEvent.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_AnyEvent, "$id$") + +#include "../Consumer.h" +#include "tao/debug.h" + +TAO_NS_EventType TAO_NS_AnyEvent::event_type_; + +TAO_NS_AnyEvent::TAO_NS_AnyEvent (const CORBA::Any &event) + : event_ (event) +{ +} + +TAO_NS_AnyEvent::~TAO_NS_AnyEvent () +{ +} + +const TAO_NS_EventType& +TAO_NS_AnyEvent::type (void) const +{ + return this->event_type_; +} + +void +TAO_NS_AnyEvent::convert (CosNotification::StructuredEvent& notification) +{ + TAO_NS_Event::translate (this->event_, notification); +} + +CORBA::Boolean +TAO_NS_AnyEvent::do_match (CosNotifyFilter::Filter_ptr filter ACE_ENV_ARG_DECL) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - " + "TAO_Notify_AnyEvent::do_match ()\n")); + + return filter->match(this->event_ ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_AnyEvent::push (TAO_NS_Consumer* consumer ACE_ENV_ARG_DECL) const +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - " + "TAO_Notify_AnyEvent::push \n")); + + consumer->push (this->event_ ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_AnyEvent::push (Event_Forwarder::StructuredProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL) +{ + CosNotification::StructuredEvent notification; + + TAO_NS_Event::translate (this->event_, notification); + + forwarder->forward_structured (notification ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_AnyEvent::push_no_filtering (Event_Forwarder::StructuredProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL) +{ + CosNotification::StructuredEvent notification; + + TAO_NS_Event::translate (this->event_, notification); + + forwarder->forward_structured_no_filtering (notification ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_AnyEvent::push (Event_Forwarder::ProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL) +{ + forwarder->forward_any (this->event_ ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_AnyEvent::push_no_filtering (Event_Forwarder::ProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL) +{ + forwarder->forward_any_no_filtering (this->event_ ACE_ENV_ARG_PARAMETER); +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h new file mode 100644 index 00000000000..d77191e05b1 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h @@ -0,0 +1,80 @@ +/* -*- C++ -*- */ +/** + * @file AnyEvent.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_ANYEVENT_H +#define TAO_NS_ANYEVENT_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "../Event.h" +#include "../EventType.h" +#include "orbsvcs/CosNotificationC.h" + +class TAO_NS_Consumer; + +/** + * @class TAO_NS_AnyEvent + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_AnyEvent : public TAO_NS_Event +{ + friend class TAO_NS_Builder; +public: + /// Constuctor + TAO_NS_AnyEvent (const CORBA::Any &event); + + /// Destructor + ~TAO_NS_AnyEvent (); + + /// Get the event type. + virtual const TAO_NS_EventType& type (void) const; + + CORBA::Boolean do_match (CosNotifyFilter::Filter_ptr filter ACE_ENV_ARG_DECL); + + /// Convert to CosNotification::Structured type + virtual void convert (CosNotification::StructuredEvent& notification); + + /// Push event to consumer + virtual void push (TAO_NS_Consumer* consumer ACE_ENV_ARG_DECL) const; + + /// Push event to the Event_Forwarder interface + virtual void push (Event_Forwarder::StructuredProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL); + + /// Push event to the Event_Forwarder interface + virtual void push_no_filtering (Event_Forwarder::StructuredProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL); + + /// Push event to the Event_Forwarder interface + virtual void push (Event_Forwarder::ProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL); + + /// Push event to the Event_Forwarder interface + virtual void push_no_filtering (Event_Forwarder::ProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL); + +protected: + /// Any Event + CORBA::Any event_; + + /// Our event type. + static TAO_NS_EventType event_type_; +}; + +#if defined (__ACE_INLINE__) +#include "AnyEvent.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_ANYEVENT_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.inl new file mode 100644 index 00000000000..4920e03d422 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "AnyEvent.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp new file mode 100644 index 00000000000..a8f0fa35236 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp @@ -0,0 +1,98 @@ +// $Id$ + +#include "CosEC_ProxyPushConsumer.h" + +#if ! defined (__ACE_INLINE__) +#include "CosEC_ProxyPushConsumer.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_CosEC_ProxyPushConsumer, "$id$") + +#include "ace/Refcounted_Auto_Ptr.h" +#include "tao/debug.h" +#include "../Admin.h" +#include "../AdminProperties.h" +#include "AnyEvent.h" +#include "PushSupplier.h" + +TAO_NS_CosEC_ProxyPushConsumer::TAO_NS_CosEC_ProxyPushConsumer (void) +{ +} + +TAO_NS_CosEC_ProxyPushConsumer::~TAO_NS_CosEC_ProxyPushConsumer () +{ +} + +void +TAO_NS_CosEC_ProxyPushConsumer::release (void) +{ + delete this; + //@@ inform factory +} + +void +TAO_NS_CosEC_ProxyPushConsumer::destroy (ACE_ENV_SINGLE_ARG_DECL) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "In TAO_NS_CosEC_ProxyPushConsumer::destroy \n")); + + this->inherited::destroy (this ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_CosEC_ProxyPushConsumer::push (TAO_NS_Event_var &/*event*/) +{ + // This should never be called. + ACE_ASSERT (1); +} + +void +TAO_NS_CosEC_ProxyPushConsumer::push (const CORBA::Any& data ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + , CosEventComm::Disconnected + )) +{ + // Check if we should proceed at all. + if (this->admin_properties_->reject_new_events () == 1 + && this->admin_properties_->queue_full ()) + ACE_THROW (CORBA::IMP_LIMIT ()); + + if (this->is_connected () == 0) + { + ACE_THROW (CosEventComm::Disconnected ()); + } + + // Convert + TAO_NS_Event_var event (new TAO_NS_AnyEvent (data)); + + // Continue processing. + this->TAO_NS_ProxyConsumer::push (event); +} + +void +TAO_NS_CosEC_ProxyPushConsumer::connect_push_supplier (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + , CosEventChannelAdmin::AlreadyConnected + )) +{ + // Convert Supplier to Base Type + TAO_NS_PushSupplier *supplier; + ACE_NEW_THROW_EX (supplier, + TAO_NS_PushSupplier (this), + CORBA::NO_MEMORY ()); + + supplier->init (push_supplier ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->connect (supplier ACE_ENV_ARG_PARAMETER); +} + +void TAO_NS_CosEC_ProxyPushConsumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h new file mode 100644 index 00000000000..780eb08aaa2 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h @@ -0,0 +1,88 @@ +/* -*- C++ -*- */ +/** + * @file CosEC_ProxyPushConsumer.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_COSEC_PROXYPUSHCONSUMER_H +#define TAO_NS_COSEC_PROXYPUSHCONSUMER_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "orbsvcs/CosEventChannelAdminS.h" +#include "../ProxyConsumer_T.h" +#include "../Destroy_Callback.h" + +#if defined(_MSC_VER) +#if (_MSC_VER >= 1200) +#pragma warning(push) +#endif /* _MSC_VER >= 1200 */ +#pragma warning(disable:4250) +#endif /* _MSC_VER */ + +/** + * @class TAO_NS_CosEC_ProxyPushConsumer + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_CosEC_ProxyPushConsumer : public virtual TAO_NS_ProxyConsumer_T , public TAO_NS_Destroy_Callback +{ + friend class TAO_NS_Builder; +public: + /// Constuctor + TAO_NS_CosEC_ProxyPushConsumer (void); + + /// Destructor + ~TAO_NS_CosEC_ProxyPushConsumer (); + + /// TAO_NS_Destroy_Callback methods + virtual void release (void); + + /// Destroy this object. + virtual void destroy (ACE_ENV_SINGLE_ARG_DECL); + +protected: + ///= CosNotifyChannelAdmin::ProxyPushConsumer methods + + virtual void push (const CORBA::Any & data ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventComm::Disconnected + )); + + virtual void connect_push_supplier (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventChannelAdmin::AlreadyConnected + )); + + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); +private: + // Overloaded TAO_NS_ProxyConsumer::push to get around Borland compiler warnings. + virtual void push (TAO_NS_Event_var &event); +}; + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +#pragma warning(pop) +#endif /* _MSC_VER */ + +#if defined (__ACE_INLINE__) +#include "CosEC_ProxyPushConsumer.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_COSEC_PROXYPUSHCONSUMER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.inl new file mode 100644 index 00000000000..e787ced2266 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "CosEC_ProxyPushConsumer.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.cpp new file mode 100644 index 00000000000..401d9e8d604 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.cpp @@ -0,0 +1,74 @@ +// $Id$ + +#include "CosEC_ProxyPushSupplier.h" + +#if ! defined (__ACE_INLINE__) +#include "CosEC_ProxyPushSupplier.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_CosEC_ProxyPushSupplier, "$id$") + +#include "tao/debug.h" +#include "PushConsumer.h" +#include "../Proxy.h" +#include "../Admin.h" +#include "../EventChannel.h" +#include "../EventChannelFactory.h" +#include "../Notify_Service.h" + + +TAO_NS_CosEC_ProxyPushSupplier::TAO_NS_CosEC_ProxyPushSupplier (void) +{ +} + +TAO_NS_CosEC_ProxyPushSupplier::~TAO_NS_CosEC_ProxyPushSupplier () +{ +} + +void +TAO_NS_CosEC_ProxyPushSupplier::destroy (ACE_ENV_SINGLE_ARG_DECL) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "In TAO_NS_ProxyPushConsumer::destroy \n")); + + this->inherited::destroy (this ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_CosEC_ProxyPushSupplier::release (void) +{ + this->consumer_->release (); + + delete this; + //@@ inform factory +} + +void +TAO_NS_CosEC_ProxyPushSupplier::connect_push_consumer (CosEventComm::PushConsumer_ptr push_consumer + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventChannelAdmin::AlreadyConnected, + CosEventChannelAdmin::TypeError + )) +{ + // Convert Consumer to Base Type + TAO_NS_PushConsumer* consumer; + ACE_NEW_THROW_EX (consumer, + TAO_NS_PushConsumer (this), + CORBA::NO_MEMORY ()); + + consumer->init (push_consumer ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->connect (consumer ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_CosEC_ProxyPushSupplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h new file mode 100644 index 00000000000..a7a27dc3221 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h @@ -0,0 +1,87 @@ +/* -*- C++ -*- */ +/** + * @file CosEC_ProxyPushSupplier.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_COSEC_PROXYPUSHSUPPLIER_H +#define TAO_NS_COSEC_PROXYPUSHSUPPLIER_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "orbsvcs/CosEventChannelAdminS.h" +#include "../ProxySupplier_T.h" + +#if defined(_MSC_VER) +#if (_MSC_VER >= 1200) +#pragma warning(push) +#endif /* _MSC_VER >= 1200 */ +#pragma warning(disable:4250) +#endif /* _MSC_VER */ + +#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT +template class TAO_Notify_Export +TAO_NS_ProxySupplier_T; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ + +/** + * @class TAO_NS_CosEC_ProxyPushSupplier + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_CosEC_ProxyPushSupplier : public virtual TAO_NS_ProxySupplier_T , public TAO_NS_Destroy_Callback +{ + friend class TAO_NS_Builder; +public: + /// Constuctor + TAO_NS_CosEC_ProxyPushSupplier (void); + + /// Destructor + ~TAO_NS_CosEC_ProxyPushSupplier (); + + /// Destroy this object. + virtual void destroy (ACE_ENV_SINGLE_ARG_DECL); + + /// TAO_NS_Destroy_Callback methods + virtual void release (void); + + // = Interface methods + virtual void connect_push_consumer ( + CosEventComm::PushConsumer_ptr push_consumer + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventChannelAdmin::AlreadyConnected, + CosEventChannelAdmin::TypeError + )); + + virtual void disconnect_push_supplier ( + ACE_ENV_SINGLE_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); +}; + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +#pragma warning(pop) +#endif /* _MSC_VER */ + +#if defined (__ACE_INLINE__) +#include "CosEC_ProxyPushSupplier.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_COSEC_PROXYPUSHSUPPLIER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.inl new file mode 100644 index 00000000000..c6551c656c4 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "CosEC_ProxyPushSupplier.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp new file mode 100644 index 00000000000..8b2ec3bb101 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp @@ -0,0 +1,110 @@ +// $Id$ + +#include "ProxyPushConsumer.h" + +#if ! defined (__ACE_INLINE__) +#include "ProxyPushConsumer.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_ProxyPushConsumer, "$id$") + +#include "ace/Refcounted_Auto_Ptr.h" +#include "tao/debug.h" +#include "../Admin.h" +#include "../AdminProperties.h" +#include "AnyEvent.h" +#include "PushSupplier.h" + +TAO_NS_ProxyPushConsumer::TAO_NS_ProxyPushConsumer (void) +{ +} + +TAO_NS_ProxyPushConsumer::~TAO_NS_ProxyPushConsumer () +{ +} + +void +TAO_NS_ProxyPushConsumer::release (void) +{ + if (this->supplier_) + this->supplier_->release (); + + delete this; + //@@ inform factory +} + +void +TAO_NS_ProxyPushConsumer::destroy (ACE_ENV_SINGLE_ARG_DECL) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "In TAO_NS_ProxyPushConsumer::destroy \n")); + + this->inherited::destroy (this ACE_ENV_ARG_PARAMETER); +} + +CosNotifyChannelAdmin::ProxyType +TAO_NS_ProxyPushConsumer::MyType (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + return CosNotifyChannelAdmin::PUSH_ANY; +} + +void +TAO_NS_ProxyPushConsumer::push (TAO_NS_Event_var &/*event*/) +{ + // This should never be called. + ACE_ASSERT (1); +} + +void +TAO_NS_ProxyPushConsumer::push (const CORBA::Any& data ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + , CosEventComm::Disconnected + )) +{ + // Check if we should proceed at all. + if (this->admin_properties_->reject_new_events () == 1 + && this->admin_properties_->queue_full ()) + ACE_THROW (CORBA::IMP_LIMIT ()); + + if (this->is_connected () == 0) + { + ACE_THROW (CosEventComm::Disconnected ()); + } + + // Convert + TAO_NS_Event_var event (new TAO_NS_AnyEvent (data)); + + // Continue processing. + this->TAO_NS_ProxyConsumer::push (event); +} + +void +TAO_NS_ProxyPushConsumer::connect_any_push_supplier (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + , CosEventChannelAdmin::AlreadyConnected + )) +{ + // Convert Supplier to Base Type + TAO_NS_PushSupplier *supplier; + ACE_NEW_THROW_EX (supplier, + TAO_NS_PushSupplier (this), + CORBA::NO_MEMORY ()); + + supplier->init (push_supplier ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->connect (supplier ACE_ENV_ARG_PARAMETER); +} + +void TAO_NS_ProxyPushConsumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h new file mode 100644 index 00000000000..83eb801af1f --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h @@ -0,0 +1,94 @@ +/* -*- C++ -*- */ +/** + * @file ProxyPushConsumer.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_PROXYPUSHCONSUMER_H +#define TAO_NS_PROXYPUSHCONSUMER_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "orbsvcs/CosNotifyChannelAdminS.h" +#include "../ProxyConsumer_T.h" +#include "../Destroy_Callback.h" + +#if defined(_MSC_VER) +#if (_MSC_VER >= 1200) +#pragma warning(push) +#endif /* _MSC_VER >= 1200 */ +#pragma warning(disable:4250) +#endif /* _MSC_VER */ + +/** + * @class TAO_NS_ProxyPushConsumer + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_ProxyPushConsumer : public virtual TAO_NS_ProxyConsumer_T , public TAO_NS_Destroy_Callback +{ + friend class TAO_NS_Builder; +public: + /// Constuctor + TAO_NS_ProxyPushConsumer (void); + + /// Destructor + ~TAO_NS_ProxyPushConsumer (); + + /// TAO_NS_Destroy_Callback methods + virtual void release (void); + + /// Destroy this object. + virtual void destroy (ACE_ENV_SINGLE_ARG_DECL); + +protected: + ///= CosNotifyChannelAdmin::ProxyPushConsumer methods + + virtual CosNotifyChannelAdmin::ProxyType MyType (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void push (const CORBA::Any & data ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventComm::Disconnected + )); + + virtual void connect_any_push_supplier (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventChannelAdmin::AlreadyConnected + )); + + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + +private: + // Overloaded TAO_NS_ProxyConsumer::push to get around Borland compiler warnings. + virtual void push (TAO_NS_Event_var &event); +}; + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +#pragma warning(pop) +#endif /* _MSC_VER */ + +#if defined (__ACE_INLINE__) +#include "ProxyPushConsumer.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_PROXYPUSHCONSUMER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.inl new file mode 100644 index 00000000000..32c8ecf4355 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "ProxyPushConsumer.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.cpp new file mode 100644 index 00000000000..a430c6f0a1c --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.cpp @@ -0,0 +1,84 @@ +// $Id$ + +#include "ProxyPushSupplier.h" + +#if ! defined (__ACE_INLINE__) +#include "ProxyPushSupplier.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_ProxyPushSupplier, "$id$") + +#include "tao/debug.h" +#include "PushConsumer.h" +#include "../Proxy.h" +#include "../Admin.h" +#include "../EventChannel.h" +#include "../EventChannelFactory.h" +#include "../Notify_Service.h" + + +TAO_NS_ProxyPushSupplier::TAO_NS_ProxyPushSupplier (void) +{ +} + +TAO_NS_ProxyPushSupplier::~TAO_NS_ProxyPushSupplier () +{ +} + +void +TAO_NS_ProxyPushSupplier::destroy (ACE_ENV_SINGLE_ARG_DECL) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "In TAO_NS_ProxyPushConsumer::destroy \n")); + + this->inherited::destroy (this ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_ProxyPushSupplier::release (void) +{ + if (this->consumer_) + this->consumer_->release (); + + delete this; + //@@ inform factory +} + +void +TAO_NS_ProxyPushSupplier::connect_any_push_consumer (CosEventComm::PushConsumer_ptr push_consumer + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventChannelAdmin::AlreadyConnected, + CosEventChannelAdmin::TypeError + )) +{ + // Convert Consumer to Base Type + TAO_NS_PushConsumer* consumer; + ACE_NEW_THROW_EX (consumer, + TAO_NS_PushConsumer (this), + CORBA::NO_MEMORY ()); + + consumer->init (push_consumer ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->connect (consumer ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_ProxyPushSupplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +CosNotifyChannelAdmin::ProxyType +TAO_NS_ProxyPushSupplier::MyType (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + return CosNotifyChannelAdmin::PUSH_ANY; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h new file mode 100644 index 00000000000..66a6f0de2d8 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h @@ -0,0 +1,93 @@ +/* -*- C++ -*- */ +/** + * @file ProxyPushSupplier.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_PROXYPUSHSUPPLIER_H +#define TAO_NS_PROXYPUSHSUPPLIER_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "orbsvcs/CosNotifyChannelAdminS.h" +#include "orbsvcs/Event_ForwarderS.h" +#include "../ProxySupplier_T.h" + +#if defined(_MSC_VER) +#if (_MSC_VER >= 1200) +#pragma warning(push) +#endif /* _MSC_VER >= 1200 */ +#pragma warning(disable:4250) +#endif /* _MSC_VER */ + +#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT +template class TAO_Notify_Export +TAO_NS_ProxySupplier_T; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ + +/** + * @class TAO_NS_ProxyPushSupplier + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_ProxyPushSupplier : public virtual TAO_NS_ProxySupplier_T , public TAO_NS_Destroy_Callback +{ + friend class TAO_NS_Builder; +public: + /// Constuctor + TAO_NS_ProxyPushSupplier (void); + + /// Destructor + ~TAO_NS_ProxyPushSupplier (); + + /// Destroy this object. + virtual void destroy (ACE_ENV_SINGLE_ARG_DECL); + + /// TAO_NS_Destroy_Callback methods + virtual void release (void); + + // = Interface methods + virtual CosNotifyChannelAdmin::ProxyType MyType (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void connect_any_push_consumer ( + CosEventComm::PushConsumer_ptr push_consumer + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventChannelAdmin::AlreadyConnected, + CosEventChannelAdmin::TypeError + )); + + virtual void disconnect_push_supplier ( + ACE_ENV_SINGLE_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); +}; + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +#pragma warning(pop) +#endif /* _MSC_VER */ + +#if defined (__ACE_INLINE__) +#include "ProxyPushSupplier.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_PROXYPUSHSUPPLIER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.inl new file mode 100644 index 00000000000..dda279acbfc --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "ProxyPushSupplier.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp new file mode 100644 index 00000000000..ec6b2298fac --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp @@ -0,0 +1,68 @@ +// $Id$ + +#include "PushConsumer.h" + +#if ! defined (__ACE_INLINE__) +#include "PushConsumer.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_PushConsumer, "$id$") + +#include "ace/Refcounted_Auto_Ptr.h" +#include "orbsvcs/CosEventCommC.h" +#include "../Event.h" + +TAO_NS_PushConsumer::TAO_NS_PushConsumer (TAO_NS_ProxySupplier* proxy) + :TAO_NS_Consumer (proxy) +{ +} + +TAO_NS_PushConsumer::~TAO_NS_PushConsumer () +{ +} + +void +TAO_NS_PushConsumer::init (CosEventComm::PushConsumer_ptr push_consumer ACE_ENV_ARG_DECL) +{ + this->push_consumer_ = CosEventComm::PushConsumer::_duplicate (push_consumer); + + ACE_TRY + { + this->publish_ = CosNotifyComm::NotifyPublish::_narrow (push_consumer ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // _narrow failed which probably means the interface is CosEventComm type. + } + ACE_ENDTRY; +} + +void +TAO_NS_PushConsumer::release (void) +{ + delete this; + //@@ inform factory +} + +void +TAO_NS_PushConsumer::push_i (const TAO_NS_Event_var& event ACE_ENV_ARG_DECL) +{ + event->push (this ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_PushConsumer::push (const CORBA::Any& payload ACE_ENV_ARG_DECL) +{ + this->push_consumer_->push (payload ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_PushConsumer::push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL) +{ + CORBA::Any any; + + TAO_NS_Event::translate (event, any); + + this->push_consumer_->push (any ACE_ENV_ARG_PARAMETER); +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h new file mode 100644 index 00000000000..5660ddeee77 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h @@ -0,0 +1,66 @@ +/* -*- C++ -*- */ +/** + * @file PushConsumer.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_PUSHCONSUMER_H +#define TAO_NS_PUSHCONSUMER_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "orbsvcs/CosNotifyCommC.h" +#include "../Consumer.h" +#include "../Destroy_Callback.h" + +/** + * @class TAO_NS_PushConsumer + * + * @brief Wrapper for the PushConsumer that connect to the EventChannel. + * + */ +class TAO_Notify_Export TAO_NS_PushConsumer : public TAO_NS_Consumer +{ +public: + /// Constuctor + TAO_NS_PushConsumer (TAO_NS_ProxySupplier* proxy); + + /// Destructor + ~TAO_NS_PushConsumer (); + + /// Init + void init (CosEventComm::PushConsumer_ptr push_consumer ACE_ENV_ARG_DECL); + + /// TAO_NS_Destroy_Callback methods. + virtual void release (void); + + /// Push to this consumer. + void push_i (const TAO_NS_Event_var& event ACE_ENV_ARG_DECL); + + /// Push to this consumer. + virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL); + + /// Push to this consumer. + virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL); + +protected: + /// The Consumer + CosEventComm::PushConsumer_var push_consumer_; +}; + +#if defined (__ACE_INLINE__) +#include "PushConsumer.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_PUSHCONSUMER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.inl new file mode 100644 index 00000000000..e557d6a7eda --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "PushConsumer.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.cpp new file mode 100644 index 00000000000..fb1642a7759 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.cpp @@ -0,0 +1,42 @@ +// $Id$ + +#include "PushSupplier.h" + +#if ! defined (__ACE_INLINE__) +#include "PushSupplier.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_PushSupplier, "$id$") + +TAO_NS_PushSupplier::TAO_NS_PushSupplier (TAO_NS_ProxyConsumer* proxy) + :TAO_NS_Supplier (proxy) +{ +} + +TAO_NS_PushSupplier::~TAO_NS_PushSupplier () +{ +} + +void +TAO_NS_PushSupplier::init (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL) +{ + this->push_supplier_ = CosEventComm::PushSupplier::_duplicate (push_supplier); + + ACE_TRY + { + this->subscribe_ = CosNotifyComm::NotifySubscribe::_narrow (push_supplier ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // _narrow failed which probably means the interface is CosEventComm type. + } + ACE_ENDTRY; +} + +void +TAO_NS_PushSupplier::release (void) +{ + delete this; + //@@ inform factory +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h new file mode 100644 index 00000000000..3ba1c4fb2c0 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h @@ -0,0 +1,60 @@ +/* -*- C++ -*- */ +/** + * @file PushSupplier.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_PUSHSUPPLIER_H +#define TAO_NS_PUSHSUPPLIER_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + + +#include "orbsvcs/CosNotifyCommC.h" +#include "../Supplier.h" +#include "../Destroy_Callback.h" + +class TAO_NS_ProxyConsumer; + +/** + * @class TAO_NS_StructuredPushSupplier + * + * @brief Wrapper for the PushSupplier that connect to the EventChannel. + * + */ +class TAO_Notify_Export TAO_NS_PushSupplier : public TAO_NS_Supplier +{ +public: + /// Constuctor + TAO_NS_PushSupplier (TAO_NS_ProxyConsumer* proxy); + + /// Destructor + ~TAO_NS_PushSupplier (); + + /// Init + void init (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL); + + /// TAO_NS_Destroy_Callback methods + virtual void release (void); + +protected: + /// The Supplier + CosEventComm::PushSupplier_var push_supplier_; +}; + +#if defined (__ACE_INLINE__) +#include "PushSupplier.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_PUSHSUPPLIER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.inl new file mode 100644 index 00000000000..9f9bf75ff2f --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "PushSupplier.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp new file mode 100644 index 00000000000..c030592d339 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp @@ -0,0 +1,271 @@ +// $Id$ + +#include "Buffering_Strategy.h" + +#if ! defined (__ACE_INLINE__) +#include "Buffering_Strategy.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_Buffering_Strategy, "$id$") + +#include "ace/Message_Queue.h" +#include "orbsvcs/CosNotificationC.h" +#include "Method_Request.h" +#include "Notify_Extensions.h" +#include "QoSProperties.h" +#include "tao/debug.h" + +TAO_NS_Buffering_Strategy::TAO_NS_Buffering_Strategy (TAO_NS_Message_Queue& msg_queue, TAO_NS_AdminProperties_var& admin_properties, CORBA::Long batch_size) + : msg_queue_ (msg_queue), + admin_properties_ (admin_properties), + global_queue_lock_ (admin_properties->global_queue_lock ()), + global_queue_not_full_condition_ (admin_properties->global_queue_not_full_condition ()), + global_queue_length_ (admin_properties->global_queue_length ()), + max_global_queue_length_ (admin_properties->max_global_queue_length ()), + max_local_queue_length_ (0), + order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder), + discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder), + use_discarding_ (1), + local_queue_not_full_condition_ (global_queue_lock_), + batch_size_ (batch_size), + batch_size_reached_condition_ (global_queue_lock_), + shutdown_ (0) +{ +} + +TAO_NS_Buffering_Strategy::~TAO_NS_Buffering_Strategy () +{ +} + +void +TAO_NS_Buffering_Strategy::update_qos_properties (const TAO_NS_QoSProperties& qos_properties) +{ + this->order_policy_.set (qos_properties); + + if (this->discard_policy_.set (qos_properties) != -1) + { + this->use_discarding_ = 1; + } + + TAO_NS_Property_Time blocking_timeout (TAO_Notify_Extensions::BlockingPolicy); + + if (blocking_timeout.set (qos_properties) != -1) // if set to a valid time, init the blocking_time_ + { + this->use_discarding_ = 0; + + this->blocking_time_ = +# if defined (ACE_CONFIG_WIN32_H) + ACE_Time_Value (ACE_static_cast (long, blocking_timeout.value ())); +# else + ACE_Time_Value (blocking_timeout.value () / 1); +# endif /* ACE_CONFIG_WIN32_H */ + } +} + +void +TAO_NS_Buffering_Strategy::shutdown (void) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_); + + this->shutdown_ = 1; + + this->global_queue_not_full_condition_.broadcast (); + this->local_queue_not_full_condition_.broadcast (); + this->batch_size_reached_condition_.broadcast (); +} + +int +TAO_NS_Buffering_Strategy::enqueue (TAO_NS_Method_Request& method_request) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); + + // while either local or global max reached + while ((this->max_local_queue_length_ != 0 && + this->msg_queue_.message_count () == this->max_local_queue_length_) + || + (this->max_global_queue_length_.value () != 0 && + this->global_queue_length_ == this->max_global_queue_length_.value ())) + { + if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game. + return -1; + + if (this->use_discarding_ == 1) + { + if (this->global_queue_length_ == this->max_global_queue_length_.value () + && this->msg_queue_.message_count () == 0) // global max. reached but can't discard + { + // block. this is a hack because the real solution is to locate the appropriate queue and dequeue from it. + this->global_queue_not_full_condition_.wait (); + } + else // local max reached or, at global max but non-zero local count. + { + if (this->discard () == -1) + return -1; + + --this->global_queue_length_; + + // ACE_DEBUG ((LM_DEBUG, "Discarded from %x, global_queue_length = %d\n", this, this->global_queue_length_)); + + this->global_queue_not_full_condition_.signal (); + this->local_queue_not_full_condition_.signal (); + } + } + else // block + { + if (this->msg_queue_.message_count () == this->max_local_queue_length_) // local maximum reached + { + if (this->blocking_time_ == ACE_Time_Value::zero) // wait forever if need be. + { + this->local_queue_not_full_condition_.wait (); + } + else // finite blocking time. + { + ACE_Time_Value absolute = ACE_OS::gettimeofday () + this->blocking_time_; + + if (this->local_queue_not_full_condition_.wait (&absolute) == -1) // returns -1 on timeout + return -1; // Note message is discarded if it could not be enqueued in the given time. + } + } + else // global max reached + { + if (this->blocking_time_ == ACE_Time_Value::zero) // wait forever if need be. + { + this->global_queue_not_full_condition_.wait (); + } + else // finite blocking time. + { + ACE_Time_Value absolute = ACE_OS::gettimeofday () + blocking_time_; + + if (this->global_queue_not_full_condition_.wait (&absolute) == -1) // returns -1 on timeout + return -1; + } + } + } // block + } // while + + if (this->queue (method_request) == -1) + { + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - " + "Panic! failed to enqueue event")); + return -1; + } + + ++this->global_queue_length_; + + // ACE_DEBUG ((LM_DEBUG, "Inserted to %x, global_queue_length = %d\n", this, this->global_queue_length_)); + + if (this->msg_queue_.message_count () == this->batch_size_) + batch_size_reached_condition_.signal (); + + return this->msg_queue_.message_count (); +} + +int +TAO_NS_Buffering_Strategy::dequeue (TAO_NS_Method_Request* &method_request, const ACE_Time_Value *abstime) +{ + ACE_Message_Block *mb; + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); + + while (this->msg_queue_.message_count () < this->batch_size_) // block + { + this->batch_size_reached_condition_.wait (abstime); + + if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game. + return -1; + + if (errno == ETIME) + return 0; + } + + if (this->msg_queue_.dequeue (mb) == -1) + return -1; + + method_request = ACE_dynamic_cast (TAO_NS_Method_Request*, mb); + + if (method_request == 0) + return -1; + + --this->global_queue_length_; + + // ACE_DEBUG ((LM_DEBUG, "Dequeued from %x, global_queue_length = %d\n", this, this->global_queue_length_)); + + this->global_queue_not_full_condition_.signal (); + this->local_queue_not_full_condition_.signal (); + + return 1; +} + +int +TAO_NS_Buffering_Strategy::queue (TAO_NS_Method_Request& method_request) +{ + int result; + + // Queue according to order policy + if (this->order_policy_ == CosNotification::AnyOrder || + this->order_policy_ == CosNotification::FifoOrder) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - " + "enqueue in fifo order\n")); + // Insert at the end of the queue. + result = this->msg_queue_.enqueue_tail (&method_request); + } + else if (this->order_policy_ == CosNotification::PriorityOrder) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - " + "enqueue in priority order\n")); + result = this->msg_queue_.enqueue_prio (&method_request); + } + else if (this->order_policy_ == CosNotification::DeadlineOrder) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - " + "enqueue in deadline order\n")); + result = this->msg_queue_.enqueue_deadline (&method_request); + } + else + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n")); + + result = -1; + } + + return result; +} + +int +TAO_NS_Buffering_Strategy::discard (void) +{ + ACE_Message_Block *mb; + int result; + + if (this->discard_policy_ == CosNotification::AnyOrder || + this->discard_policy_ == CosNotification::FifoOrder) + { + result = this->msg_queue_.dequeue_head (mb); + } + else if (this->discard_policy_ == CosNotification::LifoOrder) + { + result = this->msg_queue_.dequeue_tail (mb); + } + else if (this->discard_policy_ == CosNotification::DeadlineOrder) + { + result = this->msg_queue_.dequeue_deadline (mb); + } + else if (this->discard_policy_ == CosNotification::PriorityOrder) + { + result = this->msg_queue_.dequeue_prio (mb); + } + else + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - " + "Invalid discard policy\n")); + result = -1; + } + + return result; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h new file mode 100644 index 00000000000..7b854a47508 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h @@ -0,0 +1,137 @@ +/* -*- C++ -*- */ +/** + * @file Buffering_Strategy.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_BUFFERING_STRATEGY_H +#define TAO_NS_BUFFERING_STRATEGY_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + + +#include "ace/Message_Queue.h" +#include "orbsvcs/TimeBaseC.h" +#include "Property.h" +#include "Property_T.h" +#include "AdminProperties.h" + +class TAO_NS_Method_Request; +class TAO_NS_QoSProperties; + +typedef ACE_Message_Queue TAO_NS_Message_Queue; + +/** + * @class TAO_NS_Buffering_Strategy + * + * @brief Base Strategy to enqueue and dequeue items from a Message Queue. + * + */ +class TAO_Notify_Export TAO_NS_Buffering_Strategy +{ +public: + /// Constuctor + TAO_NS_Buffering_Strategy (TAO_NS_Message_Queue& msg_queue, TAO_NS_AdminProperties_var& admin_properties, CORBA::Long batch_size); + + /// Destructor + ~TAO_NS_Buffering_Strategy (); + + /// Update state with the following QoS Properties: + /// Order Policy + /// Discard Policy + /// MaxEventsPerConsumer + /// TAO_Notify_Extensions::BlockingPolicy + void update_qos_properties (const TAO_NS_QoSProperties& qos_properties); + + /// Enqueue according the enqueing strategy. + /// Return -1 on error else the number of items in the queue. + int enqueue (TAO_NS_Method_Request& method_request); + + /// Dequeue batch. This method will block for if non-zero or else blocks till an item is available. + /// Return -1 on error or if nothing is available, else the number of items actually dequeued (1). + int dequeue (TAO_NS_Method_Request* &method_request, const ACE_Time_Value *abstime); + + /// Shutdown + void shutdown (void); + + /// Set the new batch size. + void batch_size (CORBA::Long batch_size); + + /// Obtain our batch size + CORBA::Long batch_size (void); + + /// Set the max local queue length. + void max_local_queue_length (CORBA::Long length); + +protected: + /// Apply the Order Policy and queue. return -1 on error. + int queue (TAO_NS_Method_Request& method_request); + + /// Discard as per the Discard Policy. + int discard (void); + + ///= Data Members + + /// The local Message Queue + TAO_NS_Message_Queue& msg_queue_; + + /// Reference to the properties per event channel. + TAO_NS_AdminProperties_var admin_properties_; + + /// The shared global lock used by all the queues. + ACE_SYNCH_MUTEX& global_queue_lock_; + + /// The shared Condition for global queue not full. + ACE_SYNCH_CONDITION& global_queue_not_full_condition_; + + /// The global queue length - queue length accross all the queues. + CORBA::Long& global_queue_length_; + + /// The maximum events that can be queued overall. + const TAO_NS_Property_Long& max_global_queue_length_; + + /// The maximum queue length for the local queue. + CORBA::Long max_local_queue_length_; + + /// Order of events in internal buffers. + TAO_NS_Property_Short order_policy_; + + /// Policy to discard when buffers are full. + TAO_NS_Property_Short discard_policy_; + + /// Flag that we should use discarding(1) or blocking (0). + int use_discarding_; + + /// The blocking timeout will be used in place of discarding + /// This is a TAO specific extension. + ACE_Time_Value blocking_time_; // 0 means wait forever. + + /// Condition that the local queue is not full. + ACE_SYNCH_CONDITION local_queue_not_full_condition_; + + /// The batch size that we want to monitor for dequeuing. + CORBA::Long batch_size_; + + /// Condition that batch size reached. + ACE_SYNCH_CONDITION batch_size_reached_condition_; + + /// Flag to shutdown. + int shutdown_; +}; + +#if defined (__ACE_INLINE__) +#include "Buffering_Strategy.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_BUFFERING_STRATEGY_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.inl b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.inl new file mode 100644 index 00000000000..3dbfa925c5b --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.inl @@ -0,0 +1,19 @@ +// $Id$ + +ACE_INLINE void +TAO_NS_Buffering_Strategy::batch_size (CORBA::Long batch_size) +{ + this->batch_size_ = batch_size; +} + +ACE_INLINE CORBA::Long +TAO_NS_Buffering_Strategy::batch_size (void) +{ + return this->batch_size_; +} + +ACE_INLINE void +TAO_NS_Buffering_Strategy::max_local_queue_length (CORBA::Long length) +{ + this->max_local_queue_length_ = length; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.cpp b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.cpp new file mode 100644 index 00000000000..7794ee10956 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.cpp @@ -0,0 +1,13 @@ +// $Id$ + +#include "CosNotify_Initializer.h" + +ACE_RCSID(Notify, TAO_NS_CosNotify_Initializer, "$id$") + +#include "Notify_Service.h" + +TAO_NS_CosNotify_Initializer::TAO_NS_CosNotify_Initializer (void) +{ + ACE_Service_Config::static_svcs ()->insert (&ace_svc_desc_TAO_NS_Notify_Service); + ACE_Service_Config::static_svcs ()->insert (&ace_svc_desc_TAO_Notify_Default_EMO_Factory_OLD); +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.h b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.h new file mode 100644 index 00000000000..a2e73f4898a --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.h @@ -0,0 +1,38 @@ +/* -*- C++ -*- */ +/** + * @file CosNotify_Initializer.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_COSNOTIFY_INITIALIZER_H +#define TAO_NS_COSNOTIFY_INITIALIZER_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class TAO_NS_CosNotify_Initializer + * + * @brief Helper to load the Cos Notification service into the service conf. for static links. + * + */ +class TAO_Notify_Export TAO_NS_CosNotify_Initializer +{ +public: + /// Constuctor + TAO_NS_CosNotify_Initializer (void); +}; + +static TAO_NS_CosNotify_Initializer TAO_NS_CosNotify_initializer; + +#include "ace/post.h" +#endif /* TAO_NS_COSNOTIFY_INITIALIZER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp new file mode 100644 index 00000000000..60049cf0bef --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp @@ -0,0 +1,55 @@ +// $Id$ + +#include "Method_Request_Updates.h" + +#if ! defined (__ACE_INLINE__) +#include "Method_Request_Updates.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_Method_Request_Updates, "$id$") + +#include "tao/debug.h" +#include "Proxy.h" +#include "Peer.h" + +TAO_NS_Method_Request_Updates::TAO_NS_Method_Request_Updates (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed, TAO_NS_Proxy* proxy) + :added_ (added), removed_ (removed), proxy_ (proxy), refcountable_guard_ (*proxy) +{ +} + +TAO_NS_Method_Request_Updates::~TAO_NS_Method_Request_Updates () +{ +} + +TAO_NS_Method_Request* +TAO_NS_Method_Request_Updates::copy (void) +{ + /// @@use factory + return new TAO_NS_Method_Request_Updates (this->added_, this->removed_, this->proxy_); +} + +int +TAO_NS_Method_Request_Updates::execute (ACE_ENV_SINGLE_ARG_DECL) +{ + if (this->proxy_->has_shutdown ()) + return 0; // If we were shutdown while waiting in the queue, return with no action. + + ACE_TRY + { + TAO_NS_Peer* peer = this->proxy_->peer(); + + if (peer != 0) + { + peer->dispatch_updates (this->added_, this->removed_ ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + } + ACE_CATCHANY + { + if (TAO_debug_level > 0) + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_NS_Method_Request_Updates::execute error sending updates\n "); + } + ACE_ENDTRY; + + return 0; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h new file mode 100644 index 00000000000..00e68a21f18 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h @@ -0,0 +1,64 @@ +/* -*- C++ -*- */ +/** + * @file Method_Request_Updates.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_METHOD_REQUEST_UPDATES_H +#define TAO_NS_METHOD_REQUEST_UPDATES_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "Method_Request.h" +#include "Types.h" +#include "EventTypeSeq.h" + +/** + * @class TAO_NS_Method_Request_Updates + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_Method_Request_Updates : public TAO_NS_Method_Request +{ +public: + /// Constuctor + TAO_NS_Method_Request_Updates (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed, TAO_NS_Proxy* proxy); + + /// Destructor + ~TAO_NS_Method_Request_Updates (); + + /// Create a copy of this object. + TAO_NS_Method_Request* copy (void); + + /// Execute the Request + virtual int execute (ACE_ENV_SINGLE_ARG_DECL); + +private: + /// The Updates + const TAO_NS_EventTypeSeq added_; + const TAO_NS_EventTypeSeq removed_; + + /// The proxy that will receive the updates. + TAO_NS_Proxy* proxy_; + + /// Guard to automatically inc/decr ref count on the proxy. + TAO_NS_Refcountable_Guard refcountable_guard_; +}; + +#if defined (__ACE_INLINE__) +#include "Method_Request_Updates.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_METHOD_REQUEST_UPDATES_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl new file mode 100644 index 00000000000..bf5cc3848c2 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "Method_Request_Updates.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property.h b/TAO/orbsvcs/orbsvcs/Notify/Property.h new file mode 100644 index 00000000000..ae91ce42aff --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Property.h @@ -0,0 +1,39 @@ +/* -*- C++ -*- */ +/** + * @file Property.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_PROPERTY_H +#define TAO_NS_PROPERTY_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/corba.h" +#include "tao/orbconf.h" +#include "tao/TimeBaseC.h" +#include "orbsvcs/NotifyExtC.h" + +template class ACE_Atomic_Op; +template class TAO_NS_Property_T; +template class TAO_NS_StructProperty_T; + +typedef ACE_Atomic_Op TAO_NS_Atomic_Property_Long; +typedef TAO_NS_Property_T TAO_NS_Property_Long; +typedef TAO_NS_Property_T TAO_NS_Property_Short; +typedef TAO_NS_Property_T TAO_NS_Property_Time; +typedef TAO_NS_StructProperty_T TAO_NS_Property_ThreadPool; +typedef TAO_NS_StructProperty_T TAO_NS_Property_ThreadPoolLanes; + +#include "ace/post.h" +#endif /* TAO_NS_PROPERTY_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.cpp b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.cpp new file mode 100644 index 00000000000..ad5042b9e87 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.cpp @@ -0,0 +1,53 @@ +// $Id$ + +#include "PropertySeq.h" + +#if ! defined (__ACE_INLINE__) +#include "PropertySeq.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_PropertySeq, "$id$") + +TAO_NS_PropertySeq::TAO_NS_PropertySeq (void) +{ +} + +TAO_NS_PropertySeq::~TAO_NS_PropertySeq () +{ +} + +int +TAO_NS_PropertySeq::init (const CosNotification::PropertySeq& prop_seq) +{ + ACE_CString name; + + for (CORBA::ULong i = 0; i < prop_seq.length (); ++i) + { + name = prop_seq[i].name.in (); + + if (this->property_map_.rebind (name, prop_seq[i].value) == -1) + return -1; + } + // Note call to rebind. This allows to call to set updates. + + return 0; +} + +int +TAO_NS_PropertySeq::populate (CosNotification::PropertySeq_var& prop_seq) +{ + PROPERTY_MAP::ITERATOR iterator (this->property_map_); + + int index = prop_seq->length (); + prop_seq->length (index + this->property_map_.current_size ()); + + for (PROPERTY_MAP::ENTRY *entry = 0; + iterator.next (entry) != 0; + iterator.advance (), ++index) + { + (*prop_seq)[index].name = CORBA::string_dup (entry->ext_id_.c_str ()); + (*prop_seq)[index].value = entry->int_id_; + } + + return 0; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.h b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.h new file mode 100644 index 00000000000..4371aea34f9 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.h @@ -0,0 +1,62 @@ +/* -*- C++ -*- */ +/** + * @file PropertySeq.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_PROPERTYSEQ_H +#define TAO_NS_PROPERTYSEQ_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "orbsvcs/CosNotificationC.h" +#include "ace/Hash_Map_Manager.h" +#include "ace/SString.h" + +/** + * @class TAO_NS_PropertySeq + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_PropertySeq +{ +public: + /// Constuctor + TAO_NS_PropertySeq (void); + + /// Destructor + ~TAO_NS_PropertySeq (); + + /// Return 0 on success, -1 on error. + int init (const CosNotification::PropertySeq& prop_seq); + + /// Find the for property . Returns 0 on success. + int find (const ACE_CString& name, CosNotification::PropertyValue& value) const; + + /// Return -1 on error. + int populate (CosNotification::PropertySeq_var& prop_seq); + +protected: + /// Property Map. + typedef ACE_Hash_Map_Manager PROPERTY_MAP; + + PROPERTY_MAP property_map_; +}; + +#if defined (__ACE_INLINE__) +#include "PropertySeq.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_PROPERTYSEQ_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.inl b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.inl new file mode 100644 index 00000000000..271543a2f82 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.inl @@ -0,0 +1,7 @@ +// $Id$ + +ACE_INLINE int +TAO_NS_PropertySeq::find (const ACE_CString& name, CosNotification::PropertyValue& value) const +{ + return this->property_map_.find (name, value); +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.cpp b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.cpp new file mode 100644 index 00000000000..4c240b12bd9 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.cpp @@ -0,0 +1,43 @@ +// $Id$ + +#include "Property_Boolean.h" + +#if ! defined (__ACE_INLINE__) +#include "Property_Boolean.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_Property_Boolean, "$id$") + +#include "PropertySeq.h" + +TAO_NS_Property_Boolean::TAO_NS_Property_Boolean (const ACE_CString& name) + :name_ (name), valid_(0) +{ +} + +TAO_NS_Property_Boolean::TAO_NS_Property_Boolean (const ACE_CString& name, CORBA::Boolean initial) + :name_ (name), value_ (initial), valid_ (1) +{ +} + +int +TAO_NS_Property_Boolean::set (const TAO_NS_PropertySeq& property_seq) +{ + CosNotification::PropertyValue value; + + if (property_seq.find (this->name_, value) == -1) + return -1; + + value >>= CORBA::Any::to_boolean (this->value_); + + return 0; +} + +void +TAO_NS_Property_Boolean::get (CosNotification::PropertySeq& prop_seq) +{ + /// Make space + prop_seq.length (prop_seq.length () + 1); + + prop_seq[prop_seq.length () - 1].value <<= CORBA::Any::from_boolean (this->value_); +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.h b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.h new file mode 100644 index 00000000000..1ddb447f418 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.h @@ -0,0 +1,81 @@ +/* -*- C++ -*- */ +/** + * @file Property_Boolean.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_PROPERTY_BOOLEAN_H +#define TAO_NS_PROPERTY_BOOLEAN_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "orbsvcs/CosNotificationC.h" + +class TAO_NS_PropertySeq; + +/** + * @class TAO_NS_Property_Boolean + * + * @brief Boolean Property. + * + */ +/*******************************************************************************/ + +class TAO_Notify_Export TAO_NS_Property_Boolean +{ +public: + /// Constuctor + TAO_NS_Property_Boolean (const ACE_CString& name, CORBA::Boolean initial); + + /// Constuctor + TAO_NS_Property_Boolean (const ACE_CString& name); + + /// Assignment from TAO_NS_Property_Boolean + TAO_NS_Property_Boolean& operator= (const TAO_NS_Property_Boolean& rhs); + + /// Assignment from CORBA::Boolean + TAO_NS_Property_Boolean& operator= (const CORBA::Boolean& rhs); + + /// Equality comparison operator. + int operator== (const CORBA::Boolean &rhs) const; + + /// Inequality comparison operator. + int operator!= (const CORBA::Boolean &rhs) const; + + int set (const TAO_NS_PropertySeq& property_seq); + + void get (CosNotification::PropertySeq& prop_seq); + + /// Return the value. + CORBA::Boolean value (void) const; + + /// Is the current value valid + CORBA::Boolean is_valid (void) const; + +protected: + /// The Property name. + ACE_CString name_; + + /// The value + CORBA::Boolean value_; + + /// Is the value valid + CORBA::Boolean valid_; +}; + +#if defined (__ACE_INLINE__) +#include "Property_Boolean.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_PROPERTY_BOOLEAN_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.inl b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.inl new file mode 100644 index 00000000000..d30cabb4e39 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.inl @@ -0,0 +1,49 @@ +// $Id$ + +ACE_INLINE TAO_NS_Property_Boolean& +TAO_NS_Property_Boolean::operator= (const TAO_NS_Property_Boolean& rhs) +{ + if (this == &rhs) + return *this; + + if (rhs.is_valid ()) + { + this->name_ = rhs.name_; + this->value_ = rhs.value_; + this->valid_ = rhs.valid_; + } + + return *this; +} + +ACE_INLINE TAO_NS_Property_Boolean& +TAO_NS_Property_Boolean::operator= (const CORBA::Boolean& value) +{ + this->value_ = value; + + return *this; +} + +ACE_INLINE int +TAO_NS_Property_Boolean::operator== (const CORBA::Boolean &rhs) const +{ + return (this->value_ == rhs); +} + +ACE_INLINE int +TAO_NS_Property_Boolean::operator!= (const CORBA::Boolean &rhs) const +{ + return (this->value_ != rhs); +} + +ACE_INLINE CORBA::Boolean +TAO_NS_Property_Boolean::value (void) const +{ + return this->value_; +} + +ACE_INLINE CORBA::Boolean +TAO_NS_Property_Boolean::is_valid (void) const +{ + return this->valid_; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/Property_T.cpp new file mode 100644 index 00000000000..051345844c1 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Property_T.cpp @@ -0,0 +1,119 @@ +// $Id$ + +#ifndef TAO_NS_PROPERTY_T_CPP +#define TAO_NS_PROPERTY_T_CPP + +#include "Property_T.h" + +#if ! defined (__ACE_INLINE__) +#include "Property_T.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_Property_T, "$id$") + +#include "PropertySeq.h" + +/*******************************************************************************/ + +template +TAO_NS_PropertyBase_T::TAO_NS_PropertyBase_T (const ACE_CString& name) + :name_ (name), valid_(0) +{ +} + +template +TAO_NS_PropertyBase_T::TAO_NS_PropertyBase_T (const ACE_CString& name, const TYPE& initial) + :name_ (name), value_ (initial), valid_ (1) +{ +} + +template +TAO_NS_PropertyBase_T::TAO_NS_PropertyBase_T (const TAO_NS_PropertyBase_T &rhs) +{ + this->name_ = rhs.name_; + this->value_ = rhs.value_; + this->valid_ = rhs.valid_; +} + +template +TAO_NS_PropertyBase_T::~TAO_NS_PropertyBase_T () +{ +} + +template void +TAO_NS_PropertyBase_T::get (CosNotification::PropertySeq& prop_seq) +{ + /// Make space + prop_seq.length (prop_seq.length () + 1); + + prop_seq[prop_seq.length () - 1].value <<= this->value_; +} + +/*******************************************************************************/ + +template +TAO_NS_Property_T::TAO_NS_Property_T (const ACE_CString& name) + :TAO_NS_PropertyBase_T (name) +{ +} + +template +TAO_NS_Property_T::TAO_NS_Property_T (const ACE_CString& name, const TYPE& initial) + :TAO_NS_PropertyBase_T (name, initial) +{ +} + +template int +TAO_NS_Property_T::set (const TAO_NS_PropertySeq& property_seq) +{ + CosNotification::PropertyValue value; + + if (property_seq.find (this->name_, value) == -1) + { + this->valid_ = 0; + return -1; + } + + value >>= this->value_; + + this->valid_ = 1; + + return 0; +} + +/*******************************************************************************/ + +template +TAO_NS_StructProperty_T::TAO_NS_StructProperty_T (const ACE_CString& name) + :name_ (name), valid_(0) +{ +} + +template +TAO_NS_StructProperty_T::TAO_NS_StructProperty_T (const ACE_CString& name, const TYPE& initial) + :name_ (name), value_ (initial), valid_ (1) +{ +} + +template int +TAO_NS_StructProperty_T::set (const TAO_NS_PropertySeq& property_seq) +{ + CosNotification::PropertyValue value; + + if (property_seq.find (this->name_, value) == -1) + { + this->valid_ = 0; + return -1; + } + + TYPE* extract_type; + value >>= extract_type; + + this->value_ = *extract_type; // copy + + this->valid_ = 1; + + return 0; +} + +#endif /* TAO_NS_PROPERTY_T_CPP */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_T.h b/TAO/orbsvcs/orbsvcs/Notify/Property_T.h new file mode 100644 index 00000000000..5b05d472b9f --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Property_T.h @@ -0,0 +1,161 @@ +/* -*- C++ -*- */ +/** + * @file Property_T.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_PROPERTY_T_H +#define TAO_NS_PROPERTY_T_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/SString.h" +#include "orbsvcs/CosNotificationC.h" + +class TAO_NS_PropertySeq; + +/** + * @class TAO_NS_PropertyBase_T + * + * @brief + * + */ +template +class TAO_NS_PropertyBase_T +{ +public: + /// Constuctor + TAO_NS_PropertyBase_T (const ACE_CString& name, const TYPE& initial); + + /// Constuctor + TAO_NS_PropertyBase_T (const ACE_CString& name); + + /// Copy Constuctor + TAO_NS_PropertyBase_T (const TAO_NS_PropertyBase_T &rhs); + + /// Destructor + ~TAO_NS_PropertyBase_T (); + + /// Assignment from TAO_NS_PropertyBase_T + TAO_NS_PropertyBase_T& operator= (const TAO_NS_PropertyBase_T& rhs); + + /// Assignment from TYPE + TAO_NS_PropertyBase_T& operator= (const TYPE& rhs); + + /// Equality comparison operator. + int operator== (const TYPE &rhs) const; + + /// Inequality comparison operator. + int operator!= (const TYPE &rhs) const; + + /// Populate the Property Sequence with this valid value. + void get (CosNotification::PropertySeq& prop_seq); + + /// Return the value. + const TYPE& value (void) const; + + /// Is the current value valid + CORBA::Boolean is_valid (void) const; + + /// Invalidate this property's value. + void invalidate (void); + +protected: + /// The Property name. + ACE_CString name_; + + /// The value + TYPE value_; + + /// Is the value valid + CORBA::Boolean valid_; +}; + + +/*******************************************************************************/ +/** + * @class TAO_NS_Property_T + * + * @brief + * + */ +template +class TAO_NS_Property_T : public TAO_NS_PropertyBase_T +{ +public: + /// Constuctor + TAO_NS_Property_T (const ACE_CString& name, const TYPE& initial); + + /// Constuctor + TAO_NS_Property_T (const ACE_CString& name); + + /// Assignment from TYPE + TAO_NS_Property_T& operator= (const TYPE& rhs); + + /// Init this Property from the sequence. + /// Returns 0 on success, -1 on error + int set (const TAO_NS_PropertySeq& property_seq); +}; + +/*******************************************************************************/ +/** + * @class TAO_NS_StructProperty_T + * + * @brief + * + */ +template +class TAO_NS_StructProperty_T +{ +public: + /// Constuctor + TAO_NS_StructProperty_T (const ACE_CString& name, const TYPE& initial); + + /// Constuctor + TAO_NS_StructProperty_T (const ACE_CString& name); + + /// Init this Property from the sequence. + /// Returns 0 on success, -1 on error + int set (const TAO_NS_PropertySeq& property_seq); + + /// Return the value. + const TYPE& value (void) const; + + /// Is the current value valid + CORBA::Boolean is_valid (void) const; + +protected: + /// The Property name. + ACE_CString name_; + + /// The value + TYPE value_; + + /// Is the value valid + CORBA::Boolean valid_; +}; + +#if defined (__ACE_INLINE__) +#include "Property_T.inl" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "Property_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Property_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include "ace/post.h" +#endif /* TAO_NS_PROPERTY_T_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_T.inl b/TAO/orbsvcs/orbsvcs/Notify/Property_T.inl new file mode 100644 index 00000000000..3442b0fe7c4 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Property_T.inl @@ -0,0 +1,78 @@ +// $Id$ + +template ACE_INLINE const TYPE& +TAO_NS_PropertyBase_T::value (void) const +{ + return this->value_; +} + +template ACE_INLINE CORBA::Boolean +TAO_NS_PropertyBase_T::is_valid (void) const +{ + return this->valid_; +} + +template ACE_INLINE int +TAO_NS_PropertyBase_T::operator== (const TYPE &rhs) const +{ + return (this->value_ == rhs); +} + +template ACE_INLINE int +TAO_NS_PropertyBase_T::operator!= (const TYPE &rhs) const +{ + return (this->value_ != rhs); +} + +template ACE_INLINE TAO_NS_PropertyBase_T& +TAO_NS_PropertyBase_T::operator= (const TAO_NS_PropertyBase_T& rhs) +{ + if (this == &rhs) + return *this; + + if (rhs.is_valid ()) + { + this->name_ = rhs.name_; + this->value_ = rhs.value_; + this->valid_ = rhs.valid_; + } + + return *this; +} + +template ACE_INLINE TAO_NS_PropertyBase_T& +TAO_NS_PropertyBase_T::operator=(const TYPE& value) +{ + this->value_ = value; + + return *this; +} + +template ACE_INLINE void +TAO_NS_PropertyBase_T:: invalidate (void) +{ + this->valid_ = 0; +} + +/******************************************************************************/ + +template ACE_INLINE TAO_NS_Property_T& +TAO_NS_Property_T::operator=(const TYPE& value) +{ + this->TAO_NS_PropertyBase_T::operator= (value); + return *this; +} + +/******************************************************************************/ + +template ACE_INLINE const TYPE& +TAO_NS_StructProperty_T::value (void) const +{ + return this->value_; +} + +template ACE_INLINE CORBA::Boolean +TAO_NS_StructProperty_T::is_valid (void) const +{ + return this->valid_; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp new file mode 100644 index 00000000000..90a166ec142 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp @@ -0,0 +1,80 @@ +// $Id$ + +#ifndef TAO_NS_PROXYCONSUMER_T_CPP +#define TAO_NS_PROXYCONSUMER_T_CPP + +#include "ProxyConsumer_T.h" + +#if ! defined (__ACE_INLINE__) +#include "ProxyConsumer_T.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_ProxyConsumer_T, "$id$") + +#include "Event_Manager.h" + +template +TAO_NS_ProxyConsumer_T::TAO_NS_ProxyConsumer_T (void) +{ +} + +template +TAO_NS_ProxyConsumer_T::~TAO_NS_ProxyConsumer_T () +{ +} + +template void +TAO_NS_ProxyConsumer_T::admin_types_changed (const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL) +{ + this->offer_change (added, removed ACE_ENV_ARG_PARAMETER); +} + +template CosNotifyChannelAdmin::SupplierAdmin_ptr +TAO_NS_ProxyConsumer_T::MyAdmin (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + CosNotifyChannelAdmin::SupplierAdmin_var ret; + + CORBA::Object_var object = this->parent_->ref (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (ret._retn ()); + + ret = CosNotifyChannelAdmin::SupplierAdmin::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + + return ret._retn (); +} + +template void +TAO_NS_ProxyConsumer_T::offer_change (const CosNotification::EventTypeSeq & added, const CosNotification::EventTypeSeq & removed ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyComm::InvalidEventType + )) +{ + TAO_NS_EventTypeSeq seq_added (added); + TAO_NS_EventTypeSeq seq_removed (removed); + + { + ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, + CORBA::INTERNAL ()); + ACE_CHECK; + + this->subscribed_types_.init (seq_added, seq_removed); + } + + this->event_manager_->offer_change (this, seq_added, seq_removed ACE_ENV_ARG_PARAMETER); +} + +template CosNotification::EventTypeSeq* +TAO_NS_ProxyConsumer_T::obtain_subscription_types (CosNotifyChannelAdmin::ObtainInfoMode mode ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + return this->obtain_types (mode, this->event_manager_->subscription_types () ACE_ENV_ARG_PARAMETER); +} + +#endif /* TAO_NS_PROXYCONSUMER_T_CPP */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h new file mode 100644 index 00000000000..6ccc7dcec1d --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h @@ -0,0 +1,84 @@ +/* -*- C++ -*- */ +/** + * @file ProxyConsumer_T.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_PROXYCONSUMER_T_H +#define TAO_NS_PROXYCONSUMER_T_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "Proxy_T.h" +#include "ProxyConsumer.h" + +/** + * @class TAO_NS_ProxyConsumer_T + * + * @brief + * + */ +template +class TAO_Notify_Export TAO_NS_ProxyConsumer_T : public virtual TAO_NS_Proxy_T , public virtual TAO_NS_ProxyConsumer +{ +public: + /// Constuctor + TAO_NS_ProxyConsumer_T (void); + + /// Destructor + ~TAO_NS_ProxyConsumer_T (); + + /// Notification of subscriptions set at the admin. + virtual void admin_types_changed (const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL); + + virtual CosNotifyChannelAdmin::SupplierAdmin_ptr MyAdmin (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual CosNotification::EventTypeSeq * obtain_subscription_types ( + CosNotifyChannelAdmin::ObtainInfoMode mode + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void offer_change ( + const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyComm::InvalidEventType + )); + +}; + +#if defined (__ACE_INLINE__) +#include "ProxyConsumer_T.inl" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ProxyConsumer_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("ProxyConsumer_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include "ace/post.h" +#endif /* TAO_NS_PROXYCONSUMER_T_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.inl b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.inl new file mode 100644 index 00000000000..cfa1da318d3 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.inl @@ -0,0 +1 @@ +// $Id$ diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp new file mode 100644 index 00000000000..7db52521e8b --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp @@ -0,0 +1,222 @@ +// $Id$ + +#ifndef TAO_NS_PROXYSUPPLIER_T_C +#define TAO_NS_PROXYSUPPLIER_T_C + +#include "ProxySupplier_T.h" + +#if ! defined (__ACE_INLINE__) +#include "ProxySupplier_T.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_ProxySupplier_T, "$id$") + +#include "Consumer.h" +#include "Structured/StructuredEvent.h" +#include "Any/AnyEvent.h" + +#include "Method_Request_Dispatch.h" +#include "Method_Request_Dispatch_No_Filtering.h" +#include "Worker_Task.h" +#include "Event_Manager.h" + +template +TAO_NS_ProxySupplier_T::TAO_NS_ProxySupplier_T (void) + :is_suspended_ (0) +{ +} + +template +TAO_NS_ProxySupplier_T::~TAO_NS_ProxySupplier_T () +{ +} + +template void +TAO_NS_ProxySupplier_T::admin_types_changed (const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL) +{ + this->subscription_change (added, removed ACE_ENV_ARG_PARAMETER); +} + +template void +TAO_NS_ProxySupplier_T::forward_structured (const CosNotification::StructuredEvent& notification ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + TAO_NS_Event_var event (new TAO_NS_StructuredEvent (notification)); + + TAO_NS_Method_Request_Dispatch request (event, this); + + this->worker_task ()->exec (request); +} + +template void +TAO_NS_ProxySupplier_T::forward_structured_no_filtering (const CosNotification::StructuredEvent& notification ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + TAO_NS_Event_var event (new TAO_NS_StructuredEvent (notification)); + + TAO_NS_Method_Request_Dispatch_No_Filtering request (event, this); + + this->worker_task ()->exec (request); +} + +template void +TAO_NS_ProxySupplier_T::forward_any (const CORBA::Any & data ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + TAO_NS_Event_var event (new TAO_NS_AnyEvent (data)); + + TAO_NS_Method_Request_Dispatch request (event, this); + + this->worker_task ()->exec (request); +} + +template void +TAO_NS_ProxySupplier_T::forward_any_no_filtering (const CORBA::Any& data ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + TAO_NS_Event_var event (new TAO_NS_AnyEvent (data)); + + TAO_NS_Method_Request_Dispatch_No_Filtering request (event, this); + + this->worker_task ()->exec (request); +} + +template CosNotification::EventTypeSeq* +TAO_NS_ProxySupplier_T::obtain_offered_types (CosNotifyChannelAdmin::ObtainInfoMode mode ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + return this->obtain_types (mode, this->event_manager_->offered_types () ACE_ENV_ARG_PARAMETER); +} + +template void +TAO_NS_ProxySupplier_T::subscription_change (const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + CosNotifyComm::InvalidEventType)) +{ + TAO_NS_EventTypeSeq seq_added (added); + TAO_NS_EventTypeSeq seq_removed (removed); + + { + ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, + CORBA::INTERNAL ()); + ACE_CHECK; + + this->subscribed_types_.init (seq_added, seq_removed); + } + + this->event_manager_->subscription_change (this, seq_added, seq_removed ACE_ENV_ARG_PARAMETER); +} + +template void +TAO_NS_ProxySupplier_T::suspend_connection (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyChannelAdmin::ConnectionAlreadyInactive, + CosNotifyChannelAdmin::NotConnected + )) +{ + { + ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, CORBA::INTERNAL ()); + + if (this->is_connected () == 0) + ACE_THROW (CosNotifyChannelAdmin::NotConnected ()); + + if (this->consumer_->is_suspended () == 1) + ACE_THROW (CosNotifyChannelAdmin::ConnectionAlreadyInactive ()); + } + + this->consumer_->suspend (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +template void +TAO_NS_ProxySupplier_T::resume_connection (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyChannelAdmin::ConnectionAlreadyActive, + CosNotifyChannelAdmin::NotConnected + )) +{ + { + ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, CORBA::INTERNAL ()); + + if (this->is_connected () == 0) + ACE_THROW (CosNotifyChannelAdmin::NotConnected ()); + + if (this->consumer_->is_suspended () == 0) + ACE_THROW (CosNotifyChannelAdmin::ConnectionAlreadyActive ()); + } + + this->consumer_->resume (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +template CosNotifyChannelAdmin::ConsumerAdmin_ptr +TAO_NS_ProxySupplier_T::MyAdmin (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + CosNotifyChannelAdmin::ConsumerAdmin_var ret; + + CORBA::Object_var object = this->parent_->ref (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (ret._retn ()); + + ret = CosNotifyChannelAdmin::ConsumerAdmin::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + + return ret._retn (); +} + +/***************************** UNIMPLEMENTED METHODS***************************************/ + +template CosNotifyFilter::MappingFilter_ptr +TAO_NS_ProxySupplier_T::priority_filter (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + ACE_THROW_RETURN (CORBA::NO_IMPLEMENT (), + CosNotifyFilter::MappingFilter::_nil ()); +} + +template void +TAO_NS_ProxySupplier_T::priority_filter (CosNotifyFilter::MappingFilter_ptr /*priority_filter*/ ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + ACE_THROW (CORBA::NO_IMPLEMENT ()); +} + +template CosNotifyFilter::MappingFilter_ptr +TAO_NS_ProxySupplier_T::lifetime_filter (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + ACE_THROW_RETURN (CORBA::NO_IMPLEMENT (), + CosNotifyFilter::MappingFilter::_nil ()); +} + +template void +TAO_NS_ProxySupplier_T::lifetime_filter (CosNotifyFilter::MappingFilter_ptr /*lifetime_filter*/ ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + ACE_THROW (CORBA::NO_IMPLEMENT ()); +} + +#endif /* #define TAO_NS_PROXYSUPPLIER_T_C */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h new file mode 100644 index 00000000000..c0e8d2054fb --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h @@ -0,0 +1,162 @@ +/* -*- C++ -*- */ +/** + * @file ProxySupplier_T.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_PROXYSUPPLIER_T_H +#define TAO_NS_PROXYSUPPLIER_T_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "Proxy_T.h" +#include "ProxySupplier.h" + +/** + * @class TAO_NS_ProxySupplier_T + * + * @brief + * + */ +template +class TAO_NS_ProxySupplier_T : public virtual TAO_NS_Proxy_T , public virtual TAO_NS_ProxySupplier +{ +public: + /// Constuctor + TAO_NS_ProxySupplier_T (void); + + /// Destructor + ~TAO_NS_ProxySupplier_T (); + + /// Notification of subscriptions set at the admin. + virtual void admin_types_changed (const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL); + + ///= POA_Notify_Internal methods + /// POA_Notify_Internal::Event_Forwarder method + virtual void forward_structured (const CosNotification::StructuredEvent & event ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + /// POA_Notify_Internal::Event_Forwarder method + virtual void forward_structured_no_filtering (const CosNotification::StructuredEvent & event ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + /// POA_Notify_Internal::Event_Forwarder method + virtual void forward_any (const CORBA::Any & event ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + /// POA_Notify_Internal::Event_Forwarder method + virtual void forward_any_no_filtering (const CORBA::Any & event ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); +protected: + //= Data Members + CORBA::Boolean is_suspended_; + + // = Interface methods + virtual CosNotifyChannelAdmin::ConsumerAdmin_ptr MyAdmin ( + ACE_ENV_SINGLE_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void suspend_connection ( + ACE_ENV_SINGLE_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyChannelAdmin::ConnectionAlreadyInactive, + CosNotifyChannelAdmin::NotConnected + )); + + virtual void resume_connection ( + ACE_ENV_SINGLE_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyChannelAdmin::ConnectionAlreadyActive, + CosNotifyChannelAdmin::NotConnected + )); + + virtual CosNotifyFilter::MappingFilter_ptr priority_filter ( + ACE_ENV_SINGLE_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void priority_filter ( + CosNotifyFilter::MappingFilter_ptr priority_filter + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual CosNotifyFilter::MappingFilter_ptr lifetime_filter ( + ACE_ENV_SINGLE_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void lifetime_filter ( + CosNotifyFilter::MappingFilter_ptr lifetime_filter + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual CosNotification::EventTypeSeq * obtain_offered_types ( + CosNotifyChannelAdmin::ObtainInfoMode mode + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void subscription_change ( + const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyComm::InvalidEventType + )); +}; + +#if defined (__ACE_INLINE__) +#include "ProxySupplier_T.inl" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ProxySupplier_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("ProxySupplier_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include "ace/post.h" +#endif /* TAO_NS_PROXYSUPPLIER_T_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.inl b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.inl new file mode 100644 index 00000000000..cfa1da318d3 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.inl @@ -0,0 +1 @@ +// $Id$ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.cpp new file mode 100644 index 00000000000..06291c3cf14 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.cpp @@ -0,0 +1,135 @@ +// $Id$ + +#include "Proxy_T.h" + +#ifndef TAO_NS_PROXY_T_CPP +#define TAO_NS_PROXY_T_CPP + +#if ! defined (__ACE_INLINE__) +#include "Proxy_T.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_Proxy_T, "$id$") + +template +TAO_NS_Proxy_T::TAO_NS_Proxy_T (void) +{ +} + +template +TAO_NS_Proxy_T::~TAO_NS_Proxy_T () +{ +} + +template PortableServer::Servant +TAO_NS_Proxy_T::servant (void) +{ + return this; +} + +template void +TAO_NS_Proxy_T::_add_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) +{ + this->_incr_refcnt (); +} + +template void +TAO_NS_Proxy_T::_remove_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) +{ + this->_decr_refcnt (); +} + +template void +TAO_NS_Proxy_T::validate_event_qos (const CosNotification::QoSProperties & /*required_qos*/, CosNotification::NamedPropertyRangeSeq_out /*available_qos*/ ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotification::UnsupportedQoS + )) +{ + ACE_THROW (CORBA::NO_IMPLEMENT ()); +} + +template CosNotification::QoSProperties* +TAO_NS_Proxy_T::get_qos (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + return this->TAO_NS_Object::get_qos (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +template void +TAO_NS_Proxy_T::set_qos (const CosNotification::QoSProperties & qos ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotification::UnsupportedQoS + )) +{ + this->TAO_NS_Object::set_qos (qos ACE_ENV_ARG_PARAMETER); +} + +template void +TAO_NS_Proxy_T::validate_qos ( + const CosNotification::QoSProperties & /*required_qos*/, + CosNotification::NamedPropertyRangeSeq_out /*available_qos*/ + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotification::UnsupportedQoS + )) +{ + ACE_THROW (CORBA::NO_IMPLEMENT ()); +} + +template CosNotifyFilter::FilterID +TAO_NS_Proxy_T::add_filter (CosNotifyFilter::Filter_ptr new_filter ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + return this->filter_admin_.add_filter (new_filter ACE_ENV_ARG_PARAMETER); +} + +template void +TAO_NS_Proxy_T::remove_filter ( + CosNotifyFilter::FilterID filter + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyFilter::FilterNotFound + )) +{ + this->filter_admin_.remove_filter (filter ACE_ENV_ARG_PARAMETER); +} + +template CosNotifyFilter::Filter_ptr +TAO_NS_Proxy_T::get_filter (CosNotifyFilter::FilterID filter ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyFilter::FilterNotFound + )) +{ + return this->filter_admin_.get_filter (filter ACE_ENV_ARG_PARAMETER); +} + +template CosNotifyFilter::FilterIDSeq* +TAO_NS_Proxy_T::get_all_filters (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + return this->filter_admin_.get_all_filters (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +template void +TAO_NS_Proxy_T::remove_all_filters (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + this->filter_admin_.remove_all_filters (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +#endif /* TAO_NS_PROXY_T_CPP */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.h b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.h new file mode 100644 index 00000000000..090d4edd49a --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.h @@ -0,0 +1,158 @@ +/* -*- C++ -*- */ +/** + * @file Proxy_T.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_PROXY_T_H +#define TAO_NS_PROXY_T_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "Proxy.h" + +/** + * @class TAO_NS_Proxy_T + * + * @brief The is a base class for all proxys , templatized by the servant + * type. All the Filter Admin and QoS Admin interface methods are + * implemented here by delegating to the admin implementations. + * + */ +template +class TAO_NS_Proxy_T : public SERVANT_TYPE, public virtual TAO_NS_Proxy +{ +public: + /// Constuctor + TAO_NS_Proxy_T (void); + + /// Destructor + ~TAO_NS_Proxy_T (); + + /// Implements TAO_NS_Object::servant method. + virtual PortableServer::Servant servant (void); + + /// ServantBase refcount methods. + virtual void _add_ref (ACE_ENV_SINGLE_ARG_DECL); + virtual void _remove_ref (ACE_ENV_SINGLE_ARG_DECL); + + virtual void validate_event_qos ( + const CosNotification::QoSProperties & required_qos, + CosNotification::NamedPropertyRangeSeq_out available_qos + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotification::UnsupportedQoS + )); + + virtual CosNotification::QoSProperties * get_qos ( + ACE_ENV_SINGLE_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void set_qos ( + const CosNotification::QoSProperties & qos + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotification::UnsupportedQoS + )); + + virtual void validate_qos ( + const CosNotification::QoSProperties & required_qos, + CosNotification::NamedPropertyRangeSeq_out available_qos + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotification::UnsupportedQoS + )); + + virtual CosNotifyFilter::FilterID add_filter ( + CosNotifyFilter::Filter_ptr new_filter + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void remove_filter ( + CosNotifyFilter::FilterID filter + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyFilter::FilterNotFound + )); + + virtual CosNotifyFilter::Filter_ptr get_filter ( + CosNotifyFilter::FilterID filter + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyFilter::FilterNotFound + )); + + virtual CosNotifyFilter::FilterIDSeq * get_all_filters ( + ACE_ENV_SINGLE_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void remove_all_filters ( + ACE_ENV_SINGLE_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); +}; + +#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT +template class TAO_Notify_Export +TAO_NS_Proxy_T; +template class TAO_Notify_Export +TAO_NS_Proxy_T; +template class TAO_Notify_Export +TAO_NS_Proxy_T; +template class TAO_Notify_Export +TAO_NS_Proxy_T; +template class TAO_Notify_Export +TAO_NS_Proxy_T; +template class TAO_Notify_Export +TAO_NS_Proxy_T; +template class TAO_Notify_Export +TAO_NS_Proxy_T; +template class TAO_Notify_Export +TAO_NS_Proxy_T; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ + +#if defined (__ACE_INLINE__) +#include "Proxy_T.inl" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "Proxy_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Proxy_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include "ace/post.h" +#endif /* TAO_NS_PROXY_T_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.inl b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.inl new file mode 100644 index 00000000000..cfa1da318d3 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.inl @@ -0,0 +1 @@ +// $Id$ diff --git a/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.cpp b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.cpp new file mode 100644 index 00000000000..2a8672aa1f6 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.cpp @@ -0,0 +1,126 @@ +// $Id$ + +#include "QoSProperties.h" + +#if ! defined (__ACE_INLINE__) +#include "QoSProperties.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_QoSProperties, "$id$") + +#include "Property.h" + +TAO_NS_QoSProperties::TAO_NS_QoSProperties (void) + :priority_ (CosNotification::Priority), + timeout_ (CosNotification::Timeout), + stop_time_supported_ (CosNotification::StopTimeSupported), + maximum_batch_size_ (CosNotification::MaximumBatchSize), + pacing_interval_ (CosNotification::PacingInterval), + thread_pool_ (NotifyExt::ThreadPool), + thread_pool_lane_ (NotifyExt::ThreadPoolLanes) +{ + unsupported_[0] = CosNotification::EventReliability; + unsupported_[1] = CosNotification::ConnectionReliability; + unsupported_[2] = CosNotification::StartTimeSupported; +} + +TAO_NS_QoSProperties::~TAO_NS_QoSProperties () +{ +} + +int +TAO_NS_QoSProperties::unsupported (ACE_CString& name) +{ + for (int i = 0; i < UNSUPPORTED_PROPERTY_COUNT; ++i) + { + if (this->unsupported_[i] == name) + return 1; + } + + return 0; +} + +int +TAO_NS_QoSProperties::init (const CosNotification::PropertySeq& prop_seq, CosNotification::PropertyErrorSeq& err_seq) +{ + int err_index = -1; + + ACE_CString name; + for (CORBA::ULong i = 0; i < prop_seq.length (); ++i) + { + name = prop_seq[i].name.in(); + + if (this->unsupported (name)) + { + err_index = err_seq.length (); + err_seq.length (err_seq.length () + 1); + + err_seq[err_index].code = CosNotification::UNSUPPORTED_PROPERTY; + err_seq[err_index].name = CORBA::string_dup (prop_seq[i].name); + } + else if (this->property_map_.rebind (prop_seq[i].name.in (), prop_seq[i].value) == -1) + return -1; + // Note call to rebind. This allows to call to set updates. + } + + // Now, init the supported properties + this->priority_.set (*this); + this->timeout_.set (*this); + this->stop_time_supported_.set (*this); + this->maximum_batch_size_.set (*this); + this->pacing_interval_.set (*this); + this->thread_pool_.set (*this); + this->thread_pool_lane_.set (*this); + + return err_index == -1 ? 0 : 1; +} + +void +TAO_NS_QoSProperties::transfer (TAO_NS_QoSProperties& qos_properties) +{ + qos_properties.priority_ = this->priority_; + qos_properties.timeout_ = this->timeout_; + qos_properties.stop_time_supported_ = this->stop_time_supported_; + qos_properties.maximum_batch_size_ = this->maximum_batch_size_; + qos_properties.pacing_interval_ = this->pacing_interval_; + + PROPERTY_MAP::ITERATOR iter (this->property_map_); + PROPERTY_MAP::ENTRY *entry; + + for (; iter.next (entry); iter.advance ()) + { + qos_properties.property_map_.bind (entry->ext_id_, entry->int_id_); + } + + // unbind the properties that we don't want to transfer. + qos_properties.property_map_.unbind (NotifyExt::ThreadPool); + qos_properties.property_map_.unbind (NotifyExt::ThreadPoolLanes); +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class TAO_NS_PropertyBase_T; +template class TAO_NS_PropertyBase_T; +template class TAO_NS_PropertyBase_T; + +template class TAO_NS_Property_T; +template class TAO_NS_Property_T; +template class TAO_NS_Property_T; + +template class TAO_NS_StructProperty_T; +template class TAO_NS_StructProperty_T; + +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate TAO_NS_PropertyBase_T +#pragma instantiate TAO_NS_PropertyBase_T +#pragma instantiate TAO_NS_PropertyBase_T + +#pragma instantiate TAO_NS_Property_T +#pragma instantiate TAO_NS_Property_T +#pragma instantiate TAO_NS_Property_T + +#pragma instantiate TAO_NS_StructProperty_T +#pragma instantiate TAO_NS_StructProperty_T + +#endif /*ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.h b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.h new file mode 100644 index 00000000000..08911f86ef4 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.h @@ -0,0 +1,99 @@ +/* -*- C++ -*- */ +/** + * @file QoSProperties.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_QOSPROPERTIES_H +#define TAO_NS_QOSPROPERTIES_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "PropertySeq.h" +#include "Property_T.h" +#include "Property_Boolean.h" +#include "Property.h" + +/** + * @class TAO_NS_QoSProperties + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_QoSProperties : public TAO_NS_PropertySeq +{ +public: + /// Constuctor + TAO_NS_QoSProperties (void); + + /// Destructor + ~TAO_NS_QoSProperties (); + + /// Return 0 on success, 1 if unsupported properties were detected and -1 on error. + int init (const CosNotification::PropertySeq& prop_seq, CosNotification::PropertyErrorSeq& err_seq); + + /// Populate with properties that can be transfered. + void transfer (TAO_NS_QoSProperties& qos_properties); + + ///= Accessors + /// ThreadPool + const TAO_NS_Property_ThreadPool& thread_pool (void) const; + + /// ThreadPoolLane + const TAO_NS_Property_ThreadPoolLanes& thread_pool_lane (void) const; + + /// Maximum Batch Size + const TAO_NS_Property_Long& maximum_batch_size (void) const; + + /// Pacing Interval + const TAO_NS_Property_Time& pacing_interval (void) const; + +protected: + /// Return 1 if is unsupported. + int unsupported (ACE_CString& name); + + enum {UNSUPPORTED_PROPERTY_COUNT = 3}; + + ///= Unsupported Properties. + ACE_CString unsupported_[UNSUPPORTED_PROPERTY_COUNT]; + + ///= Supported properties + + /// Priority + TAO_NS_Property_Short priority_; + + /// Timeout + TAO_NS_Property_Time timeout_; + + /// Stop Time Supported + TAO_NS_Property_Boolean stop_time_supported_; + + /// Maximum Batch Size + TAO_NS_Property_Long maximum_batch_size_; + + /// Pacing Interval + TAO_NS_Property_Time pacing_interval_; + + /// ThreadPool Params. + TAO_NS_Property_ThreadPool thread_pool_; + + /// ThreadPoolLane Params. + TAO_NS_Property_ThreadPoolLanes thread_pool_lane_; +}; + +#if defined (__ACE_INLINE__) +#include "QoSProperties.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_QOSPROPERTIES_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.inl b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.inl new file mode 100644 index 00000000000..e331e8b1046 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.inl @@ -0,0 +1,25 @@ +// $Id$ + +ACE_INLINE const TAO_NS_Property_ThreadPool& +TAO_NS_QoSProperties::thread_pool (void) const +{ + return this->thread_pool_; +} + +ACE_INLINE const TAO_NS_Property_ThreadPoolLanes& +TAO_NS_QoSProperties::thread_pool_lane (void) const +{ + return this->thread_pool_lane_; +} + +ACE_INLINE const TAO_NS_Property_Long& +TAO_NS_QoSProperties::maximum_batch_size (void) const +{ + return this->maximum_batch_size_; +} + +ACE_INLINE const TAO_NS_Property_Time& +TAO_NS_QoSProperties::pacing_interval (void) const +{ + return this->pacing_interval_; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp new file mode 100644 index 00000000000..1c819aa54ba --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp @@ -0,0 +1,92 @@ +// $Id$ + +#include "Batch_Buffering_Strategy.h" + +#if ! defined (__ACE_INLINE__) +#include "Batch_Buffering_Strategy.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_Batch_Buffering_Strategy, "$id$") + +TAO_NS_Batch_Buffering_Strategy::TAO_NS_Batch_Buffering_Strategy (TAO_NS_Message_Queue& msg_queue, TAO_NS_AdminProperties_var& admin_properties, CORBA::Long batch_size) + :TAO_NS_Buffering_Strategy (msg_queue, admin_properties, batch_size) +{ +} + +TAO_NS_Batch_Buffering_Strategy::~TAO_NS_Batch_Buffering_Strategy () +{ +} + +int +TAO_NS_Batch_Buffering_Strategy::dequeue_batch (CosNotification::EventBatch& event_batch) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); + + // if batch_size is infinite, simply dequeue everything available. + + int pending = 0; // not used. + + if (this->batch_size_ == 0) + { + return this->dequeue_available (event_batch, pending); + } + else + { + // block till batch size of events are available. + while (this->msg_queue_.message_count () < this->batch_size_) + { + if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game. + return -1; + + this->batch_size_reached_condition_.wait (); + } + + return this->dequeue_i (this->batch_size_, event_batch); + } +} + +int +TAO_NS_Batch_Buffering_Strategy::dequeue_available (CosNotification::EventBatch& event_batch, int &pending) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); + + int deq_count = this->msg_queue_.message_count (); + + if (this->batch_size_ != 0 && deq_count > this->batch_size_) // Restrict upto batch size. + deq_count = this->batch_size_; + + pending = this->msg_queue_.message_count () - deq_count; + + return this->dequeue_i (deq_count, event_batch); +} + +int +TAO_NS_Batch_Buffering_Strategy::dequeue_i (int max_deq_count, CosNotification::EventBatch& event_batch) +{ + ACE_Message_Block *mb; + + int deq_count = 0; + + event_batch.length (max_deq_count); + + for (; deq_count < max_deq_count; ++deq_count) + { + if (this->msg_queue_.dequeue (mb) == -1) + break; // error, simply return what we could extract so far. + + --this->global_queue_length_; + + TAO_NS_Method_Request_Event* mre = ACE_dynamic_cast (TAO_NS_Method_Request_Event*, mb); + + mre->event ()->convert (event_batch[deq_count]); + + ACE_Message_Block::release (mb); + } + + event_batch.length (deq_count); + + this->global_queue_not_full_condition_.signal (); + this->local_queue_not_full_condition_.signal (); + + return deq_count; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h new file mode 100644 index 00000000000..196f4a67242 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h @@ -0,0 +1,60 @@ +/* -*- C++ -*- */ +/** + * @file Batch_Buffering_Strategy.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_BATCH_BUFFERING_STRATEGY_H +#define TAO_NS_BATCH_BUFFERING_STRATEGY_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "../Method_Request.h" +#include "../Buffering_Strategy.h" + +/** + * @class TAO_NS_Batch_Buffering_Strategy + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_Batch_Buffering_Strategy : public TAO_NS_Buffering_Strategy +{ +public: + /// Constuctor + TAO_NS_Batch_Buffering_Strategy (TAO_NS_Message_Queue& msg_queue, TAO_NS_AdminProperties_var& admin_properties, CORBA::Long batch_size); + + /// Destructor + ~TAO_NS_Batch_Buffering_Strategy (); + + /// Dequeue batch. This method will block till is available.. + /// Return -1 on error else the number of items actually dequeued. + int dequeue_batch (CosNotification::EventBatch& event_batch); + + /// Dequeue upto batch. This method will not block. + /// Return -1 on error else the number of items dequeued (). + /// is set to the number of events remaining in the queue. + int dequeue_available (CosNotification::EventBatch& event_batch, int &pending); + +protected: + + /// Extract upto number of items. + int dequeue_i (int max_deq_count, CosNotification::EventBatch& event_batch); +}; + +#if defined (__ACE_INLINE__) +#include "Batch_Buffering_Strategy.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_BATCH_BUFFERING_STRATEGY_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.inl b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.inl new file mode 100644 index 00000000000..72a7bb34341 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "Batch_Buffering_Strategy.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp new file mode 100644 index 00000000000..9101583601c --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp @@ -0,0 +1,107 @@ +// $Id$ + +#include "SequenceProxyPushConsumer.h" + +#if ! defined (__ACE_INLINE__) +#include "SequenceProxyPushConsumer.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_SequenceProxyPushConsumer, "$id$") + +#include "ace/Refcounted_Auto_Ptr.h" +#include "tao/debug.h" +#include "SequencePushSupplier.h" +#include "../Admin.h" +#include "../AdminProperties.h" +#include "../Structured/StructuredEvent.h" + +TAO_NS_SequenceProxyPushConsumer::TAO_NS_SequenceProxyPushConsumer (void) +:pacing_interval_ (CosNotification::PacingInterval) +{ +} + +TAO_NS_SequenceProxyPushConsumer::~TAO_NS_SequenceProxyPushConsumer () +{ +} + +void +TAO_NS_SequenceProxyPushConsumer::release (void) +{ + if (this->supplier_) + this->supplier_->release (); + + delete this; + //@@ inform factory +} + +void +TAO_NS_SequenceProxyPushConsumer::destroy (ACE_ENV_SINGLE_ARG_DECL) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "In TAO_NS_SequenceProxyPushConsumer::destroy \n")); + + this->inherited::destroy (this ACE_ENV_ARG_PARAMETER); +} + +CosNotifyChannelAdmin::ProxyType +TAO_NS_SequenceProxyPushConsumer::MyType (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + return CosNotifyChannelAdmin::PUSH_SEQUENCE; +} + +void +TAO_NS_SequenceProxyPushConsumer::connect_sequence_push_supplier (CosNotifyComm::SequencePushSupplier_ptr push_supplier ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + , CosEventChannelAdmin::AlreadyConnected + )) +{ + // Convert Supplier to Base Type + TAO_NS_SequencePushSupplier *supplier; + ACE_NEW_THROW_EX (supplier, + TAO_NS_SequencePushSupplier (this), + CORBA::NO_MEMORY ()); + + supplier->init (push_supplier ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->connect (supplier ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_SequenceProxyPushConsumer::push_structured_events (const CosNotification::EventBatch& event_batch ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + , CosEventComm::Disconnected + )) +{ + // Check if we should proceed at all. + if (this->admin_properties_->reject_new_events () == 1 && this->admin_properties_->queue_full ()) + ACE_THROW (CORBA::IMP_LIMIT ()); + + if (this->is_connected () == 0) + { + ACE_THROW (CosEventComm::Disconnected ()); + } + + for (CORBA::ULong i = 0; i < event_batch.length (); ++i) + { + const CosNotification::StructuredEvent& notification = event_batch[i]; + + TAO_NS_Event_var event (new TAO_NS_StructuredEvent (notification)); + + this->push (event); + } +} + +void +TAO_NS_SequenceProxyPushConsumer::disconnect_sequence_push_consumer (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h new file mode 100644 index 00000000000..647ce01b185 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h @@ -0,0 +1,98 @@ +/* -*- C++ -*- */ +/** + * @file SequenceProxyPushConsumer.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_SEQUENCEPROXYPUSHCONSUMER_H +#define TAO_NS_SEQUENCEPROXYPUSHCONSUMER_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "orbsvcs/CosNotifyChannelAdminS.h" +#include "../ProxyConsumer_T.h" +#include "../Destroy_Callback.h" + +#if defined(_MSC_VER) +#if (_MSC_VER >= 1200) +#pragma warning(push) +#endif /* _MSC_VER >= 1200 */ +#pragma warning(disable:4250) +#endif /* _MSC_VER */ + +/** + * @class TAO_NS_SequenceProxyPushConsumer + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_SequenceProxyPushConsumer : public virtual TAO_NS_ProxyConsumer_T , public TAO_NS_Destroy_Callback +{ + friend class TAO_NS_Builder; +public: + /// Constuctor + TAO_NS_SequenceProxyPushConsumer (void); + + /// Destructor + ~TAO_NS_SequenceProxyPushConsumer (); + + /// TAO_NS_Destroy_Callback methods + virtual void release (void); + + /// Destroy this object. + virtual void destroy (ACE_ENV_SINGLE_ARG_DECL); + +protected: + ///= Data Members + TAO_NS_Property_Time pacing_interval_; + + ///= Protected Methods + + //= interface methods + virtual CosNotifyChannelAdmin::ProxyType MyType (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void connect_sequence_push_supplier (CosNotifyComm::SequencePushSupplier_ptr push_supplier + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventChannelAdmin::AlreadyConnected + )); + + virtual void push_structured_events (const CosNotification::EventBatch & notifications + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventComm::Disconnected + )); + + virtual void disconnect_sequence_push_consumer (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); +}; + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +#pragma warning(pop) +#endif /* _MSC_VER */ + +#if defined (__ACE_INLINE__) +#include "SequenceProxyPushConsumer.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_SEQUENCEPROXYPUSHCONSUMER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.inl new file mode 100644 index 00000000000..5470ff89e39 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "SequenceProxyPushConsumer.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp new file mode 100644 index 00000000000..f92380f1260 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp @@ -0,0 +1,84 @@ +// $Id$ + +#include "SequenceProxyPushSupplier.h" + +#if ! defined (__ACE_INLINE__) +#include "SequenceProxyPushSupplier.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_SequenceProxyPushSupplier, "$id$") + +#include "tao/debug.h" +#include "SequencePushConsumer.h" +#include "../Proxy.h" +#include "../Admin.h" +#include "../EventChannel.h" +#include "../EventChannelFactory.h" +#include "../Notify_Service.h" + + +TAO_NS_SequenceProxyPushSupplier::TAO_NS_SequenceProxyPushSupplier (void) +{ +} + +TAO_NS_SequenceProxyPushSupplier::~TAO_NS_SequenceProxyPushSupplier () +{ +} + +void +TAO_NS_SequenceProxyPushSupplier::destroy (ACE_ENV_SINGLE_ARG_DECL) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "In TAO_NS_SequenceProxyPushConsumer::destroy \n")); + + this->inherited::destroy (this ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_SequenceProxyPushSupplier::release (void) +{ + if (this->consumer_) + this->consumer_->release (); + + delete this; + //@@ inform factory +} + +void +TAO_NS_SequenceProxyPushSupplier::connect_sequence_push_consumer (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + , CosEventChannelAdmin::AlreadyConnected + , CosEventChannelAdmin::TypeError + )) +{ + // Convert Consumer to Base Type + TAO_NS_SequencePushConsumer* consumer; + ACE_NEW_THROW_EX (consumer, + TAO_NS_SequencePushConsumer (this), + CORBA::NO_MEMORY ()); + + consumer->init (push_consumer, this->admin_properties_ ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->connect (consumer ACE_ENV_ARG_PARAMETER); +} + +void +TAO_NS_SequenceProxyPushSupplier::disconnect_sequence_push_supplier (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )) + +{ + this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +CosNotifyChannelAdmin::ProxyType +TAO_NS_SequenceProxyPushSupplier::MyType (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + return CosNotifyChannelAdmin::PUSH_SEQUENCE; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h new file mode 100644 index 00000000000..722aecf4b8f --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h @@ -0,0 +1,94 @@ +/* -*- C++ -*- */ +/** + * @file SequenceProxyPushSupplier.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_SEQUENCEPROXYPUSHSUPPLIER_H +#define TAO_NS_SEQUENCEPROXYPUSHSUPPLIER_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "orbsvcs/CosNotifyChannelAdminS.h" +#include "../ProxySupplier_T.h" + +#if defined(_MSC_VER) +#if (_MSC_VER >= 1200) +#pragma warning(push) +#endif /* _MSC_VER >= 1200 */ +#pragma warning(disable:4250) +#endif /* _MSC_VER */ + +#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT +template class TAO_Notify_Export +TAO_NS_ProxySupplier_T; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ + +/** + * @class TAO_NS_SequenceProxyPushSupplier + * + * @brief Implements the CosNotifyChannelAdmin::SequenceProxyPushSupplier methods. + * + * + */ +class TAO_Notify_Export TAO_NS_SequenceProxyPushSupplier : public virtual TAO_NS_ProxySupplier_T , public TAO_NS_Destroy_Callback +{ + friend class TAO_NS_Builder; + +public: + /// Constuctor + TAO_NS_SequenceProxyPushSupplier (void); + + /// Destructor + ~TAO_NS_SequenceProxyPushSupplier (); + + /// Destroy this object. + virtual void destroy (ACE_ENV_SINGLE_ARG_DECL); + + /// TAO_NS_Destroy_Callback methods + virtual void release (void); + + /// = Servant methods + virtual CosNotifyChannelAdmin::ProxyType MyType (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void connect_sequence_push_consumer ( + CosNotifyComm::SequencePushConsumer_ptr push_consumer + ACE_ENV_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventChannelAdmin::AlreadyConnected, + CosEventChannelAdmin::TypeError + )); + + virtual void disconnect_sequence_push_supplier ( + ACE_ENV_SINGLE_ARG_DECL + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); +}; + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +#pragma warning(pop) +#endif /* _MSC_VER */ + +#if defined (__ACE_INLINE__) +#include "SequenceProxyPushSupplier.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_SEQUENCEPROXYPUSHSUPPLIER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.inl b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.inl new file mode 100644 index 00000000000..5f4ceeb6181 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "SequenceProxyPushSupplier.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp new file mode 100644 index 00000000000..36abbc2ba37 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp @@ -0,0 +1,181 @@ +// $Id$ + +#include "SequencePushConsumer.h" + +#if ! defined (__ACE_INLINE__) +#include "SequencePushConsumer.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_SequencePushConsumer, "$id$") + +#include "ace/Reactor.h" +#include "tao/debug.h" +#include "../QoSProperties.h" +#include "../ProxySupplier.h" +#include "../Worker_Task.h" +#include "../Consumer.h" +#include "../Method_Request.h" +#include "../Timer.h" + +TAO_NS_SequencePushConsumer::TAO_NS_SequencePushConsumer (TAO_NS_ProxySupplier* proxy) + : TAO_NS_Consumer (proxy), pacing_interval_ (ACE_Time_Value::zero), timer_id_ (-1), buffering_strategy_ (0), + max_batch_size_ (CosNotification::MaximumBatchSize, 0), timer_ (0) +{ +} + +TAO_NS_SequencePushConsumer::~TAO_NS_SequencePushConsumer () +{ + delete this->buffering_strategy_; +} + +void +TAO_NS_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_NS_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL) +{ + this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer); + + this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); + + ACE_NEW_THROW_EX (this->buffering_strategy_, + TAO_NS_Batch_Buffering_Strategy (this->msg_queue_, admin_properties, + this->max_batch_size_.value ()), + CORBA::NO_MEMORY ()); + + this->timer_ = this->proxy ()->timer (); +} + +void +TAO_NS_SequencePushConsumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) +{ + this->cancel_timer (); + this->timer_->_decr_refcnt (); +} + +void +TAO_NS_SequencePushConsumer::release (void) +{ + delete this; + //@@ inform factory +} + +void +TAO_NS_SequencePushConsumer::qos_changed (const TAO_NS_QoSProperties& qos_properties) +{ + this->max_batch_size_ = qos_properties.maximum_batch_size (); + + if (this->max_batch_size_.is_valid ()) + {// set the max batch size. + this->buffering_strategy_->batch_size (this->max_batch_size_.value ()); + } + + const TAO_NS_Property_Time &pacing_interval = qos_properties.pacing_interval (); + + if (pacing_interval.is_valid ()) + { + this->pacing_interval_ = +# if defined (ACE_CONFIG_WIN32_H) + ACE_Time_Value (ACE_static_cast (long, pacing_interval.value ())); +# else + ACE_Time_Value (pacing_interval.value () / 1); +# endif /* ACE_CONFIG_WIN32_H */ + } + + // Inform the buffering strategy of qos change. + this->buffering_strategy_->update_qos_properties (qos_properties); +} + +void +TAO_NS_SequencePushConsumer::schedule_timer (void) +{ + // Schedule the timer. + if (this->pacing_interval_ != ACE_Time_Value::zero) + { + this->timer_id_ = this->timer_->schedule_timer (this, this->pacing_interval_, 0); + + if (this->timer_id_ == -1) + this->pacing_interval_ = ACE_Time_Value::zero; // some error, revert to no pacing. + } +} + +void +TAO_NS_SequencePushConsumer::cancel_timer (void) +{ + timer_->cancel_timer (this->timer_id_); +} + +void +TAO_NS_SequencePushConsumer::push_i (const TAO_NS_Event_var& event ACE_ENV_ARG_DECL_NOT_USED) +{ + TAO_NS_Method_Request_Event* method_request = new TAO_NS_Method_Request_Event (event); + + int msg_count = this->buffering_strategy_->enqueue (*method_request); + + if (msg_count == -1) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "NS_Seq_Reactive_Task (%P|%t) - " + "failed to enqueue\n")); + return; + } + + if (this->pacing_interval_ == ACE_Time_Value::zero) + { + // If pacing is zero, there is no timer, hence dispatch immediately + this->handle_timeout (ACE_Time_Value::zero, 0); + } + else if (msg_count == 1) + this->schedule_timer (); +} + +void +TAO_NS_SequencePushConsumer::push (const CORBA::Any& /*event*/ ACE_ENV_ARG_DECL_NOT_USED) +{ + //NOP +} + +void +TAO_NS_SequencePushConsumer::push (const CosNotification::StructuredEvent& /*notification*/ ACE_ENV_ARG_DECL_NOT_USED) +{ + //NOP +} + +int +TAO_NS_SequencePushConsumer::handle_timeout (const ACE_Time_Value& /*current_time*/, + const void* /*act*/) +{ + CosNotification::EventBatch event_batch; + + int pending = 0; + + int deq_count = this->buffering_strategy_->dequeue_available (event_batch, pending); + + if (deq_count > 0) + { + TAO_NS_Refcountable_Guard ref_guard(*this->proxy ()); // Protect this object from being destroyed in this scope. + + this->push (event_batch); + + if (pending) + this->schedule_timer (); + } + + return 0; +} + +void +TAO_NS_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch) +{ + ACE_TRY_NEW_ENV + { + this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // we're scheduled to be destroyed. don't set the timer. + this->pacing_interval_ = ACE_Time_Value::zero; + } + ACE_ENDTRY; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h new file mode 100644 index 00000000000..cc7f689b6ac --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h @@ -0,0 +1,114 @@ +/* -*- C++ -*- */ +/** + * @file SequencePushConsumer.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_SEQUENCEPUSHCONSUMER_H +#define TAO_NS_SEQUENCEPUSHCONSUMER_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Event_Handler.h" +#include "../Event.h" +#include "../Property.h" +#include "../Property_T.h" +#include "../Consumer.h" +#include "../AdminProperties.h" +#include "Batch_Buffering_Strategy.h" + +class TAO_NS_ProxySupplier; +class TAO_NS_QoSProperties; +class TAO_NS_Timer; + +/** + * @class TAO_NS_SequencePushConsumer + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_SequencePushConsumer : public ACE_Event_Handler, public TAO_NS_Consumer +{ +public: + /// Constuctor + TAO_NS_SequencePushConsumer (TAO_NS_ProxySupplier* proxy); + + /// Destructor + ~TAO_NS_SequencePushConsumer (); + + /// Init the Consumer + void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_NS_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL); + + /// Shutdown the consumer + virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL); + + /// TAO_NS_Destroy_Callback methods. + virtual void release (void); + + /// Push to this consumer. + void push_i (const TAO_NS_Event_var& event ACE_ENV_ARG_DECL); + + /// Push to this consumer. + virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL); + + // Push event. + virtual void push (const CosNotification::StructuredEvent & event ACE_ENV_ARG_DECL); + + /// Push to this consumer. + virtual void push (const CosNotification::EventBatch& event); + + /// Override, Peer::qos_changed + virtual void qos_changed (const TAO_NS_QoSProperties& qos_properties); + +protected: + /// When the pacing interval is used, handle_timeout () is called by + /// the reactor. + virtual int handle_timeout (const ACE_Time_Value& current_time, + const void* act = 0); + + /// Schedule timer + void schedule_timer (void); + + /// Cancel timer + void cancel_timer (void); + + ///= Protected Data Members + + /// The Pacing Interval + ACE_Time_Value pacing_interval_; + + /// Timer Id. + long timer_id_; + + /// The Consumer + CosNotifyComm::SequencePushConsumer_var push_consumer_; + + /// The Message queue. + TAO_NS_Message_Queue msg_queue_; + + /// The Buffering Strategy + TAO_NS_Batch_Buffering_Strategy* buffering_strategy_; + + /// Max. batch size. + TAO_NS_Property_Long max_batch_size_; + + /// The Timer Manager that we use. + TAO_NS_Timer* timer_; +}; + +#if defined (__ACE_INLINE__) +#include "SequencePushConsumer.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_SEQUENCEPUSHCONSUMER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.inl new file mode 100644 index 00000000000..d050292323e --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "SequencePushConsumer.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp new file mode 100644 index 00000000000..94447663e55 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp @@ -0,0 +1,33 @@ +// $Id$ + +#include "SequencePushSupplier.h" + +#if ! defined (__ACE_INLINE__) +#include "SequencePushSupplier.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_SequencePushSupplier, "$id$") + +TAO_NS_SequencePushSupplier::TAO_NS_SequencePushSupplier (TAO_NS_ProxyConsumer* proxy) + :TAO_NS_Supplier (proxy) +{ +} + +TAO_NS_SequencePushSupplier::~TAO_NS_SequencePushSupplier () +{ +} + +void +TAO_NS_SequencePushSupplier::init (CosNotifyComm::SequencePushSupplier_ptr push_supplier ACE_ENV_ARG_DECL_NOT_USED) +{ + this->push_supplier_ = CosNotifyComm::SequencePushSupplier::_duplicate (push_supplier); + + this->subscribe_ = CosNotifyComm::NotifySubscribe::_duplicate (push_supplier); +} + +void +TAO_NS_SequencePushSupplier::release (void) +{ + delete this; + //@@ inform factory +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h new file mode 100644 index 00000000000..b83256362da --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h @@ -0,0 +1,59 @@ +/* -*- C++ -*- */ +/** + * @file SequencePushSupplier.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_SEQUENCEPUSHSUPPLIER_H +#define TAO_NS_SEQUENCEPUSHSUPPLIER_H +#include "ace/pre.h" + +#include "../notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "orbsvcs/CosNotifyCommC.h" +#include "../Supplier.h" +#include "../Destroy_Callback.h" + +class TAO_NS_ProxyConsumer; + +/** + * @class TAO_NS_SequencePushSupplier + * + * @brief Wrapper for the SequencePushSupplier that connect to the EventChannel. + * + */ +class TAO_Notify_Export TAO_NS_SequencePushSupplier : public TAO_NS_Supplier +{ +public: + /// Constuctor + TAO_NS_SequencePushSupplier (TAO_NS_ProxyConsumer* proxy); + + /// Destructor + ~TAO_NS_SequencePushSupplier (); + + /// Init + void init (CosNotifyComm::SequencePushSupplier_ptr push_supplier ACE_ENV_ARG_DECL); + + /// TAO_NS_Destroy_Callback methods + virtual void release (void); + +protected: + /// The Supplier + CosNotifyComm::SequencePushSupplier_var push_supplier_; +}; + +#if defined (__ACE_INLINE__) +#include "SequencePushSupplier.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_SEQUENCEPUSHSUPPLIER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.inl b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.inl new file mode 100644 index 00000000000..507cd0b85c6 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "SequencePushSupplier.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer.h b/TAO/orbsvcs/orbsvcs/Notify/Timer.h new file mode 100644 index 00000000000..84423f943d6 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Timer.h @@ -0,0 +1,49 @@ +/* -*- C++ -*- */ +/** + * @file Timer.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_TIMER_H +#define TAO_NS_TIMER_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Time_Value.h" +#include "Refcountable.h" + +class ACE_Event_Handler; + +/** + * @class TAO_NS_Timer + * + * @brief Interface for scheduling timers. + * + */ +class TAO_Notify_Export TAO_NS_Timer : public TAO_NS_Refcountable +{ +public: + /// Destructor + virtual ~TAO_NS_Timer (){}; + + /// Schedule a timer + virtual long schedule_timer (ACE_Event_Handler *handler, + const ACE_Time_Value &delay_time, + const ACE_Time_Value &interval) = 0; + + /// Cancel Timer + virtual int cancel_timer (long timer_id) = 0; +}; + +#include "ace/post.h" +#endif /* TAO_NS_TIMER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.cpp b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.cpp new file mode 100644 index 00000000000..b7d4bbcd16e --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.cpp @@ -0,0 +1,49 @@ +// $Id$ + +#include "Timer_Queue.h" + +#if ! defined (__ACE_INLINE__) +#include "Timer_Queue.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_Timer_Queue, "$id$") + + +TAO_NS_Timer_Queue::TAO_NS_Timer_Queue (void) +{ + this->destroy_callback (this); +} + +TAO_NS_Timer_Queue::~TAO_NS_Timer_Queue () +{ +} + +void +TAO_NS_Timer_Queue::release (void) +{ + delete this; + //@@ inform factory +} + +long +TAO_NS_Timer_Queue::schedule_timer (ACE_Event_Handler *handler, + const ACE_Time_Value &delay_time, + const ACE_Time_Value &interval) +{ + return this->timer_queue_.schedule (handler, + 0, + timer_queue_.gettimeofday () + delay_time, + interval); +} + +int +TAO_NS_Timer_Queue::cancel_timer (long timer_id) +{ + return this->timer_queue_.cancel (timer_id); +} + +ACE_Timer_Queue& +TAO_NS_Timer_Queue::impl (void) +{ + return this->timer_queue_; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.h b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.h new file mode 100644 index 00000000000..3e8bbdb30f3 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.h @@ -0,0 +1,68 @@ +/* -*- C++ -*- */ +/** + * @file Timer_Queue.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_TIMER_QUEUE_H +#define TAO_NS_TIMER_QUEUE_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "Timer.h" +#include "Destroy_Callback.h" + +#include "ace/Timer_Queue.h" +#include "ace/Timer_Heap.h" + +/** + * @class TAO_NS_Timer_Queue + * + * @brief ACE_Timer_Queue based timer. + * + */ +class TAO_Notify_Export TAO_NS_Timer_Queue : public TAO_NS_Timer + , public TAO_NS_Destroy_Callback +{ +public: + /// Constuctor + TAO_NS_Timer_Queue (void); + + /// Destructor + virtual ~TAO_NS_Timer_Queue (); + + /// TAO_NS_Destroy_Callback methods + virtual void release (void); + + /// Schedule a timer + virtual long schedule_timer (ACE_Event_Handler *handler, + const ACE_Time_Value &delay_time, + const ACE_Time_Value &interval); + + /// Cancel Timer + virtual int cancel_timer (long timer_id); + + /// Get the native impl. + ACE_Timer_Queue& impl (void); + +protected: + /// The Timer Queue + ACE_Timer_Heap timer_queue_; +}; + +#if defined (__ACE_INLINE__) +#include "Timer_Queue.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_TIMER_QUEUE_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.inl b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.inl new file mode 100644 index 00000000000..eeaf18f3e5f --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "Timer_Queue.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.cpp b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.cpp new file mode 100644 index 00000000000..6e57a1cdd62 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.cpp @@ -0,0 +1,49 @@ +// $Id$ + +#include "Timer_Reactor.h" + +#if ! defined (__ACE_INLINE__) +#include "Timer_Reactor.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_Timer_Reactor, "$id$") + +#include "ace/Reactor.h" +#include "tao/ORB_Core.h" +#include "Properties.h" + +TAO_NS_Timer_Reactor::TAO_NS_Timer_Reactor (void) + :reactor_ (0) +{ + this->destroy_callback (this); + + // Get the ORB + CORBA::ORB_var orb = TAO_NS_PROPERTIES::instance()->orb (); + + this->reactor_ = orb->orb_core ()->reactor (); +} + +TAO_NS_Timer_Reactor::~TAO_NS_Timer_Reactor () +{ +} + +void +TAO_NS_Timer_Reactor::release (void) +{ + delete this; + //@@ inform factory +} + +long +TAO_NS_Timer_Reactor::schedule_timer (ACE_Event_Handler *handler, + const ACE_Time_Value &delay_time, + const ACE_Time_Value &interval) +{ + return this->reactor_->schedule_timer (handler, 0, delay_time, interval); +} + +int +TAO_NS_Timer_Reactor::cancel_timer (long timer_id) +{ + return this->reactor_->cancel_timer (timer_id); +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.h b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.h new file mode 100644 index 00000000000..eb0cb6e40d2 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.h @@ -0,0 +1,64 @@ +/* -*- C++ -*- */ +/** + * @file Timer_Reactor.h + * + * $Id$ + * + * @author Pradeep Gore + * + * + */ + +#ifndef TAO_NS_TIMER_REACTOR_H +#define TAO_NS_TIMER_REACTOR_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "Timer.h" +#include "Destroy_Callback.h" + +class ACE_Reactor; + +/** + * @class TAO_NS_Timer_Reactor + * + * @brief Reactor::instance based timer. The timers are dispatched by the main thread. + * + */ +class TAO_Notify_Export TAO_NS_Timer_Reactor : public TAO_NS_Timer + , public TAO_NS_Destroy_Callback +{ +public: + /// Constuctor + TAO_NS_Timer_Reactor (void); + + /// Destructor + virtual ~TAO_NS_Timer_Reactor (); + + /// TAO_NS_Destroy_Callback methods + virtual void release (void); + + /// Schedule a timer + virtual long schedule_timer (ACE_Event_Handler *handler, + const ACE_Time_Value &delay_time, + const ACE_Time_Value &interval); + + /// Cancel Timer + virtual int cancel_timer (long timer_id); + +protected: + /// The instance reactor that we use. + ACE_Reactor* reactor_; +}; + +#if defined (__ACE_INLINE__) +#include "Timer_Reactor.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_TIMER_REACTOR_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.inl b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.inl new file mode 100644 index 00000000000..ba9c8b4cae1 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "Timer_Reactor.h" -- cgit v1.2.1