summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-02-06 23:41:28 +0000
committernobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-02-06 23:41:28 +0000
commitaeb6a3d807bb99f4fd16833d9b1999fe62195466 (patch)
treecdcc50345f1515cbbd2b21004b66873dafbcc7f5
parentfb0732aab8a524962bbcd58351c020169eb57f41 (diff)
downloadATCD-aeb6a3d807bb99f4fd16833d9b1999fe62195466.tar.gz
This commit was manufactured by cvs2svn to create branch 'RT_Notify'.
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/AdminProperties.cpp70
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/AdminProperties.h115
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/AdminProperties.inl55
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp87
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h80
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp98
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h88
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.cpp74
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h87
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp110
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h94
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.cpp84
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h93
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp68
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h66
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.cpp42
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h60
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp271
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h137
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.inl19
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.cpp13
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.h38
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp55
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h64
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Property.h39
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/PropertySeq.cpp53
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/PropertySeq.h62
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/PropertySeq.inl7
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.cpp43
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.h81
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.inl49
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Property_T.cpp119
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Property_T.h161
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Property_T.inl78
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp80
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h84
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.inl1
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp222
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h162
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.inl1
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Proxy_T.cpp135
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Proxy_T.h158
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Proxy_T.inl1
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/QoSProperties.cpp126
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/QoSProperties.h99
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/QoSProperties.inl25
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp92
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h60
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp107
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h98
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp84
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h94
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp181
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h114
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp33
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h59
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Timer.h49
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.cpp49
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.h68
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.cpp49
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.h64
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.inl3
76 files changed, 5000 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.cpp b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.cpp
new file mode 100644
index 00000000000..46b3d3c0ec2
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.cpp
@@ -0,0 +1,70 @@
+// $Id$
+
+#include "AdminProperties.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "AdminProperties.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_AdminProperties, "$id$")
+
+#include "orbsvcs/CosNotificationC.h"
+
+TAO_NS_AdminProperties::TAO_NS_AdminProperties (void)
+ : max_global_queue_length_ (CosNotification::MaxQueueLength, 0),
+ max_consumers_ (CosNotification::MaxConsumers, 0),
+ max_suppliers_ (CosNotification::MaxSuppliers, 0),
+ reject_new_events_ (CosNotification::RejectNewEvents, 0),
+ global_queue_length_ (0),
+ global_queue_not_full_condition_ (global_queue_lock_)
+{
+}
+
+TAO_NS_AdminProperties::~TAO_NS_AdminProperties ()
+{
+}
+
+int
+TAO_NS_AdminProperties::init (const CosNotification::PropertySeq& prop_seq)
+{
+ if (TAO_NS_PropertySeq::init (prop_seq) != 0)
+ return -1;
+
+ this->max_global_queue_length_.set (*this);
+ this->max_consumers_.set (*this);
+ this->max_suppliers_.set (*this);
+ this->reject_new_events_.set (*this);
+
+ //@@ check if unsupported property was set.
+ // This will happen when number of successfull inits != numbers of items bound in map_.
+
+ return 0;
+}
+
+CORBA::Boolean
+TAO_NS_AdminProperties::queue_full (void)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, 1);
+
+ if (this->max_global_queue_length () == 0)
+ return 0;
+ else
+ if (this->global_queue_length_ > this->max_global_queue_length ().value ())
+ return 1;
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class ACE_Atomic_Op<TAO_SYNCH_MUTEX,int>;
+template class ACE_Atomic_Op_Ex<TAO_SYNCH_MUTEX,int>;
+template class ACE_Refcounted_Auto_Ptr<TAO_NS_AdminProperties, TAO_SYNCH_MUTEX>;
+
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate ACE_Atomic_Op<TAO_SYNCH_MUTEX,int>
+#pragma instantiate ACE_Atomic_Op_Ex<TAO_SYNCH_MUTEX,int>
+#pragma ACE_Refcounted_Auto_Ptr<TAO_NS_AdminProperties, TAO_SYNCH_MUTEX>
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.h b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.h
new file mode 100644
index 00000000000..fe14835ab24
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.h
@@ -0,0 +1,115 @@
+/* -*- C++ -*- */
+/**
+ * @file AdminProperties.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_ADMINPROPERTIES_H
+#define TAO_NS_ADMINPROPERTIES_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Atomic_Op.h"
+#include "ace/Refcounted_Auto_Ptr.h"
+#include "tao/orbconf.h"
+#include "Types.h"
+#include "PropertySeq.h"
+#include "Property_T.h"
+#include "Property.h"
+#include "Property_Boolean.h"
+
+/**
+ * @class TAO_NS_AdminProperties
+ *
+ * @brief The AdminProperties per EventChannel.
+ *
+ */
+class TAO_Notify_Export TAO_NS_AdminProperties : public TAO_NS_PropertySeq
+{
+public:
+ /// Constuctor
+ TAO_NS_AdminProperties (void);
+
+ /// Destructor
+ ~TAO_NS_AdminProperties ();
+
+ // Init
+ int init (const CosNotification::PropertySeq& prop_seq);
+
+ // = Accessors
+ const TAO_NS_Property_Long& max_global_queue_length (void) const;
+ const TAO_NS_Property_Long& max_consumers (void) const;
+ const TAO_NS_Property_Long& max_suppliers (void) const;
+ const TAO_NS_Property_Boolean& reject_new_events (void) const;
+
+ CORBA::Long& global_queue_length (void);
+ TAO_SYNCH_MUTEX& global_queue_lock (void);
+ TAO_SYNCH_CONDITION& global_queue_not_full_condition (void);
+
+ TAO_NS_Atomic_Property_Long& consumers (void);
+ TAO_NS_Atomic_Property_Long& suppliers (void);
+
+ // = Helper method
+ /// Returns true if Queue is full
+ CORBA::Boolean queue_full (void);
+
+protected:
+ // @@ Pradeep can you explain why there is any maximum for these
+ // values? Should they be configurable by the user so the resource
+ // requirements can be bounded?
+
+ // = Admin. properties
+ // for all these properties the default O implies no limit
+ /**
+ * The maximum number of events that will be queued by the channel before
+ * the channel begins discarding events or rejecting new events upon
+ * receipt of each new event.
+ */
+ TAO_NS_Property_Long max_global_queue_length_;
+
+ /// The maximum number of consumers that can be connected to the channel at
+ /// any given time.
+ TAO_NS_Property_Long max_consumers_;
+
+ /// The maximum number of suppliers that can be connected to the channel at
+ /// any given time.
+ TAO_NS_Property_Long max_suppliers_;
+
+ /// Reject any new event.
+ TAO_NS_Property_Boolean reject_new_events_;
+
+ //= Variables
+ /// This is used to count the queue length across all buffers in the Notify Service
+ /// to enforce the "MaxQueueLength" property.
+ CORBA::Long global_queue_length_;
+
+ /// Global queue lock used to serialize access to all queues.
+ TAO_SYNCH_MUTEX global_queue_lock_;
+
+ /// The condition that the queue_length_ is not at max.
+ TAO_SYNCH_CONDITION global_queue_not_full_condition_;
+
+ /// These are used to count the number of consumers and suppliers connected to
+ /// the system.
+ TAO_NS_Atomic_Property_Long consumers_;
+ TAO_NS_Atomic_Property_Long suppliers_;
+};
+
+typedef ACE_Refcounted_Auto_Ptr<TAO_NS_AdminProperties, TAO_SYNCH_MUTEX> TAO_NS_AdminProperties_var;
+
+#if defined (__ACE_INLINE__)
+#include "AdminProperties.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_ADMINPROPERTIES_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.inl b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.inl
new file mode 100644
index 00000000000..4abdd9b071a
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/AdminProperties.inl
@@ -0,0 +1,55 @@
+// $Id$
+
+ACE_INLINE const TAO_NS_Property_Long&
+TAO_NS_AdminProperties::max_global_queue_length (void) const
+{
+ return this->max_global_queue_length_;
+}
+
+ACE_INLINE const TAO_NS_Property_Long&
+TAO_NS_AdminProperties::max_consumers (void) const
+{
+ return this->max_consumers_;
+}
+
+ACE_INLINE const TAO_NS_Property_Long&
+TAO_NS_AdminProperties::max_suppliers (void) const
+{
+ return this->max_suppliers_;
+}
+
+ACE_INLINE const TAO_NS_Property_Boolean&
+TAO_NS_AdminProperties::reject_new_events (void) const
+{
+ return this->reject_new_events_;
+}
+
+ACE_INLINE CORBA::Long&
+TAO_NS_AdminProperties::global_queue_length (void)
+{
+ return this->global_queue_length_;
+}
+
+ACE_INLINE TAO_SYNCH_MUTEX&
+TAO_NS_AdminProperties::global_queue_lock (void)
+{
+ return this->global_queue_lock_;
+}
+
+ACE_INLINE TAO_SYNCH_CONDITION&
+TAO_NS_AdminProperties::global_queue_not_full_condition (void)
+{
+ return this->global_queue_not_full_condition_;
+}
+
+ACE_INLINE TAO_NS_Atomic_Property_Long&
+TAO_NS_AdminProperties::consumers (void)
+{
+ return this->consumers_;
+}
+
+ACE_INLINE TAO_NS_Atomic_Property_Long&
+TAO_NS_AdminProperties::suppliers (void)
+{
+ return this->suppliers_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp
new file mode 100644
index 00000000000..4eaaa2555b9
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.cpp
@@ -0,0 +1,87 @@
+// $Id$
+
+#include "AnyEvent.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "AnyEvent.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_AnyEvent, "$id$")
+
+#include "../Consumer.h"
+#include "tao/debug.h"
+
+TAO_NS_EventType TAO_NS_AnyEvent::event_type_;
+
+TAO_NS_AnyEvent::TAO_NS_AnyEvent (const CORBA::Any &event)
+ : event_ (event)
+{
+}
+
+TAO_NS_AnyEvent::~TAO_NS_AnyEvent ()
+{
+}
+
+const TAO_NS_EventType&
+TAO_NS_AnyEvent::type (void) const
+{
+ return this->event_type_;
+}
+
+void
+TAO_NS_AnyEvent::convert (CosNotification::StructuredEvent& notification)
+{
+ TAO_NS_Event::translate (this->event_, notification);
+}
+
+CORBA::Boolean
+TAO_NS_AnyEvent::do_match (CosNotifyFilter::Filter_ptr filter ACE_ENV_ARG_DECL)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "TAO_Notify_AnyEvent::do_match ()\n"));
+
+ return filter->match(this->event_ ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_AnyEvent::push (TAO_NS_Consumer* consumer ACE_ENV_ARG_DECL) const
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "TAO_Notify_AnyEvent::push \n"));
+
+ consumer->push (this->event_ ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_AnyEvent::push (Event_Forwarder::StructuredProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL)
+{
+ CosNotification::StructuredEvent notification;
+
+ TAO_NS_Event::translate (this->event_, notification);
+
+ forwarder->forward_structured (notification ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_AnyEvent::push_no_filtering (Event_Forwarder::StructuredProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL)
+{
+ CosNotification::StructuredEvent notification;
+
+ TAO_NS_Event::translate (this->event_, notification);
+
+ forwarder->forward_structured_no_filtering (notification ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_AnyEvent::push (Event_Forwarder::ProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL)
+{
+ forwarder->forward_any (this->event_ ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_AnyEvent::push_no_filtering (Event_Forwarder::ProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL)
+{
+ forwarder->forward_any_no_filtering (this->event_ ACE_ENV_ARG_PARAMETER);
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h
new file mode 100644
index 00000000000..d77191e05b1
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.h
@@ -0,0 +1,80 @@
+/* -*- C++ -*- */
+/**
+ * @file AnyEvent.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_ANYEVENT_H
+#define TAO_NS_ANYEVENT_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "../Event.h"
+#include "../EventType.h"
+#include "orbsvcs/CosNotificationC.h"
+
+class TAO_NS_Consumer;
+
+/**
+ * @class TAO_NS_AnyEvent
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_AnyEvent : public TAO_NS_Event
+{
+ friend class TAO_NS_Builder;
+public:
+ /// Constuctor
+ TAO_NS_AnyEvent (const CORBA::Any &event);
+
+ /// Destructor
+ ~TAO_NS_AnyEvent ();
+
+ /// Get the event type.
+ virtual const TAO_NS_EventType& type (void) const;
+
+ CORBA::Boolean do_match (CosNotifyFilter::Filter_ptr filter ACE_ENV_ARG_DECL);
+
+ /// Convert to CosNotification::Structured type
+ virtual void convert (CosNotification::StructuredEvent& notification);
+
+ /// Push event to consumer
+ virtual void push (TAO_NS_Consumer* consumer ACE_ENV_ARG_DECL) const;
+
+ /// Push event to the Event_Forwarder interface
+ virtual void push (Event_Forwarder::StructuredProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL);
+
+ /// Push event to the Event_Forwarder interface
+ virtual void push_no_filtering (Event_Forwarder::StructuredProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL);
+
+ /// Push event to the Event_Forwarder interface
+ virtual void push (Event_Forwarder::ProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL);
+
+ /// Push event to the Event_Forwarder interface
+ virtual void push_no_filtering (Event_Forwarder::ProxyPushSupplier_ptr forwarder ACE_ENV_ARG_DECL);
+
+protected:
+ /// Any Event
+ CORBA::Any event_;
+
+ /// Our event type.
+ static TAO_NS_EventType event_type_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "AnyEvent.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_ANYEVENT_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.inl
new file mode 100644
index 00000000000..4920e03d422
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/AnyEvent.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "AnyEvent.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp
new file mode 100644
index 00000000000..a8f0fa35236
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.cpp
@@ -0,0 +1,98 @@
+// $Id$
+
+#include "CosEC_ProxyPushConsumer.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "CosEC_ProxyPushConsumer.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_CosEC_ProxyPushConsumer, "$id$")
+
+#include "ace/Refcounted_Auto_Ptr.h"
+#include "tao/debug.h"
+#include "../Admin.h"
+#include "../AdminProperties.h"
+#include "AnyEvent.h"
+#include "PushSupplier.h"
+
+TAO_NS_CosEC_ProxyPushConsumer::TAO_NS_CosEC_ProxyPushConsumer (void)
+{
+}
+
+TAO_NS_CosEC_ProxyPushConsumer::~TAO_NS_CosEC_ProxyPushConsumer ()
+{
+}
+
+void
+TAO_NS_CosEC_ProxyPushConsumer::release (void)
+{
+ delete this;
+ //@@ inform factory
+}
+
+void
+TAO_NS_CosEC_ProxyPushConsumer::destroy (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "In TAO_NS_CosEC_ProxyPushConsumer::destroy \n"));
+
+ this->inherited::destroy (this ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_CosEC_ProxyPushConsumer::push (TAO_NS_Event_var &/*event*/)
+{
+ // This should never be called.
+ ACE_ASSERT (1);
+}
+
+void
+TAO_NS_CosEC_ProxyPushConsumer::push (const CORBA::Any& data ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ , CosEventComm::Disconnected
+ ))
+{
+ // Check if we should proceed at all.
+ if (this->admin_properties_->reject_new_events () == 1
+ && this->admin_properties_->queue_full ())
+ ACE_THROW (CORBA::IMP_LIMIT ());
+
+ if (this->is_connected () == 0)
+ {
+ ACE_THROW (CosEventComm::Disconnected ());
+ }
+
+ // Convert
+ TAO_NS_Event_var event (new TAO_NS_AnyEvent (data));
+
+ // Continue processing.
+ this->TAO_NS_ProxyConsumer::push (event);
+}
+
+void
+TAO_NS_CosEC_ProxyPushConsumer::connect_push_supplier (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ , CosEventChannelAdmin::AlreadyConnected
+ ))
+{
+ // Convert Supplier to Base Type
+ TAO_NS_PushSupplier *supplier;
+ ACE_NEW_THROW_EX (supplier,
+ TAO_NS_PushSupplier (this),
+ CORBA::NO_MEMORY ());
+
+ supplier->init (push_supplier ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->connect (supplier ACE_ENV_ARG_PARAMETER);
+}
+
+void TAO_NS_CosEC_ProxyPushConsumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h
new file mode 100644
index 00000000000..780eb08aaa2
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h
@@ -0,0 +1,88 @@
+/* -*- C++ -*- */
+/**
+ * @file CosEC_ProxyPushConsumer.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_COSEC_PROXYPUSHCONSUMER_H
+#define TAO_NS_COSEC_PROXYPUSHCONSUMER_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "orbsvcs/CosEventChannelAdminS.h"
+#include "../ProxyConsumer_T.h"
+#include "../Destroy_Callback.h"
+
+#if defined(_MSC_VER)
+#if (_MSC_VER >= 1200)
+#pragma warning(push)
+#endif /* _MSC_VER >= 1200 */
+#pragma warning(disable:4250)
+#endif /* _MSC_VER */
+
+/**
+ * @class TAO_NS_CosEC_ProxyPushConsumer
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_CosEC_ProxyPushConsumer : public virtual TAO_NS_ProxyConsumer_T <POA_CosEventChannelAdmin::ProxyPushConsumer>, public TAO_NS_Destroy_Callback
+{
+ friend class TAO_NS_Builder;
+public:
+ /// Constuctor
+ TAO_NS_CosEC_ProxyPushConsumer (void);
+
+ /// Destructor
+ ~TAO_NS_CosEC_ProxyPushConsumer ();
+
+ /// TAO_NS_Destroy_Callback methods
+ virtual void release (void);
+
+ /// Destroy this object.
+ virtual void destroy (ACE_ENV_SINGLE_ARG_DECL);
+
+protected:
+ ///= CosNotifyChannelAdmin::ProxyPushConsumer methods
+
+ virtual void push (const CORBA::Any & data ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosEventComm::Disconnected
+ ));
+
+ virtual void connect_push_supplier (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosEventChannelAdmin::AlreadyConnected
+ ));
+
+ virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+private:
+ // Overloaded TAO_NS_ProxyConsumer::push to get around Borland compiler warnings.
+ virtual void push (TAO_NS_Event_var &event);
+};
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+#pragma warning(pop)
+#endif /* _MSC_VER */
+
+#if defined (__ACE_INLINE__)
+#include "CosEC_ProxyPushConsumer.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_COSEC_PROXYPUSHCONSUMER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.inl
new file mode 100644
index 00000000000..e787ced2266
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "CosEC_ProxyPushConsumer.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.cpp
new file mode 100644
index 00000000000..401d9e8d604
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.cpp
@@ -0,0 +1,74 @@
+// $Id$
+
+#include "CosEC_ProxyPushSupplier.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "CosEC_ProxyPushSupplier.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_CosEC_ProxyPushSupplier, "$id$")
+
+#include "tao/debug.h"
+#include "PushConsumer.h"
+#include "../Proxy.h"
+#include "../Admin.h"
+#include "../EventChannel.h"
+#include "../EventChannelFactory.h"
+#include "../Notify_Service.h"
+
+
+TAO_NS_CosEC_ProxyPushSupplier::TAO_NS_CosEC_ProxyPushSupplier (void)
+{
+}
+
+TAO_NS_CosEC_ProxyPushSupplier::~TAO_NS_CosEC_ProxyPushSupplier ()
+{
+}
+
+void
+TAO_NS_CosEC_ProxyPushSupplier::destroy (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "In TAO_NS_ProxyPushConsumer::destroy \n"));
+
+ this->inherited::destroy (this ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_CosEC_ProxyPushSupplier::release (void)
+{
+ this->consumer_->release ();
+
+ delete this;
+ //@@ inform factory
+}
+
+void
+TAO_NS_CosEC_ProxyPushSupplier::connect_push_consumer (CosEventComm::PushConsumer_ptr push_consumer
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosEventChannelAdmin::AlreadyConnected,
+ CosEventChannelAdmin::TypeError
+ ))
+{
+ // Convert Consumer to Base Type
+ TAO_NS_PushConsumer* consumer;
+ ACE_NEW_THROW_EX (consumer,
+ TAO_NS_PushConsumer (this),
+ CORBA::NO_MEMORY ());
+
+ consumer->init (push_consumer ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->connect (consumer ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_CosEC_ProxyPushSupplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h
new file mode 100644
index 00000000000..a7a27dc3221
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.h
@@ -0,0 +1,87 @@
+/* -*- C++ -*- */
+/**
+ * @file CosEC_ProxyPushSupplier.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_COSEC_PROXYPUSHSUPPLIER_H
+#define TAO_NS_COSEC_PROXYPUSHSUPPLIER_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "orbsvcs/CosEventChannelAdminS.h"
+#include "../ProxySupplier_T.h"
+
+#if defined(_MSC_VER)
+#if (_MSC_VER >= 1200)
+#pragma warning(push)
+#endif /* _MSC_VER >= 1200 */
+#pragma warning(disable:4250)
+#endif /* _MSC_VER */
+
+#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT
+template class TAO_Notify_Export
+TAO_NS_ProxySupplier_T<POA_CosEventChannelAdmin::ProxyPushSupplier>;
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */
+
+/**
+ * @class TAO_NS_CosEC_ProxyPushSupplier
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_CosEC_ProxyPushSupplier : public virtual TAO_NS_ProxySupplier_T <POA_CosEventChannelAdmin::ProxyPushSupplier>, public TAO_NS_Destroy_Callback
+{
+ friend class TAO_NS_Builder;
+public:
+ /// Constuctor
+ TAO_NS_CosEC_ProxyPushSupplier (void);
+
+ /// Destructor
+ ~TAO_NS_CosEC_ProxyPushSupplier ();
+
+ /// Destroy this object.
+ virtual void destroy (ACE_ENV_SINGLE_ARG_DECL);
+
+ /// TAO_NS_Destroy_Callback methods
+ virtual void release (void);
+
+ // = Interface methods
+ virtual void connect_push_consumer (
+ CosEventComm::PushConsumer_ptr push_consumer
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosEventChannelAdmin::AlreadyConnected,
+ CosEventChannelAdmin::TypeError
+ ));
+
+ virtual void disconnect_push_supplier (
+ ACE_ENV_SINGLE_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+};
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+#pragma warning(pop)
+#endif /* _MSC_VER */
+
+#if defined (__ACE_INLINE__)
+#include "CosEC_ProxyPushSupplier.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_COSEC_PROXYPUSHSUPPLIER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.inl
new file mode 100644
index 00000000000..c6551c656c4
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushSupplier.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "CosEC_ProxyPushSupplier.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp
new file mode 100644
index 00000000000..8b2ec3bb101
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.cpp
@@ -0,0 +1,110 @@
+// $Id$
+
+#include "ProxyPushConsumer.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "ProxyPushConsumer.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_ProxyPushConsumer, "$id$")
+
+#include "ace/Refcounted_Auto_Ptr.h"
+#include "tao/debug.h"
+#include "../Admin.h"
+#include "../AdminProperties.h"
+#include "AnyEvent.h"
+#include "PushSupplier.h"
+
+TAO_NS_ProxyPushConsumer::TAO_NS_ProxyPushConsumer (void)
+{
+}
+
+TAO_NS_ProxyPushConsumer::~TAO_NS_ProxyPushConsumer ()
+{
+}
+
+void
+TAO_NS_ProxyPushConsumer::release (void)
+{
+ if (this->supplier_)
+ this->supplier_->release ();
+
+ delete this;
+ //@@ inform factory
+}
+
+void
+TAO_NS_ProxyPushConsumer::destroy (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "In TAO_NS_ProxyPushConsumer::destroy \n"));
+
+ this->inherited::destroy (this ACE_ENV_ARG_PARAMETER);
+}
+
+CosNotifyChannelAdmin::ProxyType
+TAO_NS_ProxyPushConsumer::MyType (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ return CosNotifyChannelAdmin::PUSH_ANY;
+}
+
+void
+TAO_NS_ProxyPushConsumer::push (TAO_NS_Event_var &/*event*/)
+{
+ // This should never be called.
+ ACE_ASSERT (1);
+}
+
+void
+TAO_NS_ProxyPushConsumer::push (const CORBA::Any& data ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ , CosEventComm::Disconnected
+ ))
+{
+ // Check if we should proceed at all.
+ if (this->admin_properties_->reject_new_events () == 1
+ && this->admin_properties_->queue_full ())
+ ACE_THROW (CORBA::IMP_LIMIT ());
+
+ if (this->is_connected () == 0)
+ {
+ ACE_THROW (CosEventComm::Disconnected ());
+ }
+
+ // Convert
+ TAO_NS_Event_var event (new TAO_NS_AnyEvent (data));
+
+ // Continue processing.
+ this->TAO_NS_ProxyConsumer::push (event);
+}
+
+void
+TAO_NS_ProxyPushConsumer::connect_any_push_supplier (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ , CosEventChannelAdmin::AlreadyConnected
+ ))
+{
+ // Convert Supplier to Base Type
+ TAO_NS_PushSupplier *supplier;
+ ACE_NEW_THROW_EX (supplier,
+ TAO_NS_PushSupplier (this),
+ CORBA::NO_MEMORY ());
+
+ supplier->init (push_supplier ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->connect (supplier ACE_ENV_ARG_PARAMETER);
+}
+
+void TAO_NS_ProxyPushConsumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h
new file mode 100644
index 00000000000..83eb801af1f
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h
@@ -0,0 +1,94 @@
+/* -*- C++ -*- */
+/**
+ * @file ProxyPushConsumer.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_PROXYPUSHCONSUMER_H
+#define TAO_NS_PROXYPUSHCONSUMER_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "orbsvcs/CosNotifyChannelAdminS.h"
+#include "../ProxyConsumer_T.h"
+#include "../Destroy_Callback.h"
+
+#if defined(_MSC_VER)
+#if (_MSC_VER >= 1200)
+#pragma warning(push)
+#endif /* _MSC_VER >= 1200 */
+#pragma warning(disable:4250)
+#endif /* _MSC_VER */
+
+/**
+ * @class TAO_NS_ProxyPushConsumer
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_ProxyPushConsumer : public virtual TAO_NS_ProxyConsumer_T <POA_CosNotifyChannelAdmin::ProxyPushConsumer>, public TAO_NS_Destroy_Callback
+{
+ friend class TAO_NS_Builder;
+public:
+ /// Constuctor
+ TAO_NS_ProxyPushConsumer (void);
+
+ /// Destructor
+ ~TAO_NS_ProxyPushConsumer ();
+
+ /// TAO_NS_Destroy_Callback methods
+ virtual void release (void);
+
+ /// Destroy this object.
+ virtual void destroy (ACE_ENV_SINGLE_ARG_DECL);
+
+protected:
+ ///= CosNotifyChannelAdmin::ProxyPushConsumer methods
+
+ virtual CosNotifyChannelAdmin::ProxyType MyType (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void push (const CORBA::Any & data ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosEventComm::Disconnected
+ ));
+
+ virtual void connect_any_push_supplier (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosEventChannelAdmin::AlreadyConnected
+ ));
+
+ virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+private:
+ // Overloaded TAO_NS_ProxyConsumer::push to get around Borland compiler warnings.
+ virtual void push (TAO_NS_Event_var &event);
+};
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+#pragma warning(pop)
+#endif /* _MSC_VER */
+
+#if defined (__ACE_INLINE__)
+#include "ProxyPushConsumer.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_PROXYPUSHCONSUMER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.inl
new file mode 100644
index 00000000000..32c8ecf4355
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "ProxyPushConsumer.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.cpp
new file mode 100644
index 00000000000..a430c6f0a1c
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.cpp
@@ -0,0 +1,84 @@
+// $Id$
+
+#include "ProxyPushSupplier.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "ProxyPushSupplier.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_ProxyPushSupplier, "$id$")
+
+#include "tao/debug.h"
+#include "PushConsumer.h"
+#include "../Proxy.h"
+#include "../Admin.h"
+#include "../EventChannel.h"
+#include "../EventChannelFactory.h"
+#include "../Notify_Service.h"
+
+
+TAO_NS_ProxyPushSupplier::TAO_NS_ProxyPushSupplier (void)
+{
+}
+
+TAO_NS_ProxyPushSupplier::~TAO_NS_ProxyPushSupplier ()
+{
+}
+
+void
+TAO_NS_ProxyPushSupplier::destroy (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "In TAO_NS_ProxyPushConsumer::destroy \n"));
+
+ this->inherited::destroy (this ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_ProxyPushSupplier::release (void)
+{
+ if (this->consumer_)
+ this->consumer_->release ();
+
+ delete this;
+ //@@ inform factory
+}
+
+void
+TAO_NS_ProxyPushSupplier::connect_any_push_consumer (CosEventComm::PushConsumer_ptr push_consumer
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosEventChannelAdmin::AlreadyConnected,
+ CosEventChannelAdmin::TypeError
+ ))
+{
+ // Convert Consumer to Base Type
+ TAO_NS_PushConsumer* consumer;
+ ACE_NEW_THROW_EX (consumer,
+ TAO_NS_PushConsumer (this),
+ CORBA::NO_MEMORY ());
+
+ consumer->init (push_consumer ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->connect (consumer ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_ProxyPushSupplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+CosNotifyChannelAdmin::ProxyType
+TAO_NS_ProxyPushSupplier::MyType (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ return CosNotifyChannelAdmin::PUSH_ANY;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h
new file mode 100644
index 00000000000..66a6f0de2d8
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.h
@@ -0,0 +1,93 @@
+/* -*- C++ -*- */
+/**
+ * @file ProxyPushSupplier.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_PROXYPUSHSUPPLIER_H
+#define TAO_NS_PROXYPUSHSUPPLIER_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "orbsvcs/CosNotifyChannelAdminS.h"
+#include "orbsvcs/Event_ForwarderS.h"
+#include "../ProxySupplier_T.h"
+
+#if defined(_MSC_VER)
+#if (_MSC_VER >= 1200)
+#pragma warning(push)
+#endif /* _MSC_VER >= 1200 */
+#pragma warning(disable:4250)
+#endif /* _MSC_VER */
+
+#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT
+template class TAO_Notify_Export
+TAO_NS_ProxySupplier_T<POA_Event_Forwarder::ProxyPushSupplier>;
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */
+
+/**
+ * @class TAO_NS_ProxyPushSupplier
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_ProxyPushSupplier : public virtual TAO_NS_ProxySupplier_T <POA_Event_Forwarder::ProxyPushSupplier>, public TAO_NS_Destroy_Callback
+{
+ friend class TAO_NS_Builder;
+public:
+ /// Constuctor
+ TAO_NS_ProxyPushSupplier (void);
+
+ /// Destructor
+ ~TAO_NS_ProxyPushSupplier ();
+
+ /// Destroy this object.
+ virtual void destroy (ACE_ENV_SINGLE_ARG_DECL);
+
+ /// TAO_NS_Destroy_Callback methods
+ virtual void release (void);
+
+ // = Interface methods
+ virtual CosNotifyChannelAdmin::ProxyType MyType (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void connect_any_push_consumer (
+ CosEventComm::PushConsumer_ptr push_consumer
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosEventChannelAdmin::AlreadyConnected,
+ CosEventChannelAdmin::TypeError
+ ));
+
+ virtual void disconnect_push_supplier (
+ ACE_ENV_SINGLE_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+};
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+#pragma warning(pop)
+#endif /* _MSC_VER */
+
+#if defined (__ACE_INLINE__)
+#include "ProxyPushSupplier.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_PROXYPUSHSUPPLIER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.inl
new file mode 100644
index 00000000000..dda279acbfc
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushSupplier.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "ProxyPushSupplier.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
new file mode 100644
index 00000000000..ec6b2298fac
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
@@ -0,0 +1,68 @@
+// $Id$
+
+#include "PushConsumer.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "PushConsumer.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_PushConsumer, "$id$")
+
+#include "ace/Refcounted_Auto_Ptr.h"
+#include "orbsvcs/CosEventCommC.h"
+#include "../Event.h"
+
+TAO_NS_PushConsumer::TAO_NS_PushConsumer (TAO_NS_ProxySupplier* proxy)
+ :TAO_NS_Consumer (proxy)
+{
+}
+
+TAO_NS_PushConsumer::~TAO_NS_PushConsumer ()
+{
+}
+
+void
+TAO_NS_PushConsumer::init (CosEventComm::PushConsumer_ptr push_consumer ACE_ENV_ARG_DECL)
+{
+ this->push_consumer_ = CosEventComm::PushConsumer::_duplicate (push_consumer);
+
+ ACE_TRY
+ {
+ this->publish_ = CosNotifyComm::NotifyPublish::_narrow (push_consumer ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ // _narrow failed which probably means the interface is CosEventComm type.
+ }
+ ACE_ENDTRY;
+}
+
+void
+TAO_NS_PushConsumer::release (void)
+{
+ delete this;
+ //@@ inform factory
+}
+
+void
+TAO_NS_PushConsumer::push_i (const TAO_NS_Event_var& event ACE_ENV_ARG_DECL)
+{
+ event->push (this ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_PushConsumer::push (const CORBA::Any& payload ACE_ENV_ARG_DECL)
+{
+ this->push_consumer_->push (payload ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_PushConsumer::push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL)
+{
+ CORBA::Any any;
+
+ TAO_NS_Event::translate (event, any);
+
+ this->push_consumer_->push (any ACE_ENV_ARG_PARAMETER);
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h
new file mode 100644
index 00000000000..5660ddeee77
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.h
@@ -0,0 +1,66 @@
+/* -*- C++ -*- */
+/**
+ * @file PushConsumer.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_PUSHCONSUMER_H
+#define TAO_NS_PUSHCONSUMER_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "orbsvcs/CosNotifyCommC.h"
+#include "../Consumer.h"
+#include "../Destroy_Callback.h"
+
+/**
+ * @class TAO_NS_PushConsumer
+ *
+ * @brief Wrapper for the PushConsumer that connect to the EventChannel.
+ *
+ */
+class TAO_Notify_Export TAO_NS_PushConsumer : public TAO_NS_Consumer
+{
+public:
+ /// Constuctor
+ TAO_NS_PushConsumer (TAO_NS_ProxySupplier* proxy);
+
+ /// Destructor
+ ~TAO_NS_PushConsumer ();
+
+ /// Init
+ void init (CosEventComm::PushConsumer_ptr push_consumer ACE_ENV_ARG_DECL);
+
+ /// TAO_NS_Destroy_Callback methods.
+ virtual void release (void);
+
+ /// Push <event> to this consumer.
+ void push_i (const TAO_NS_Event_var& event ACE_ENV_ARG_DECL);
+
+ /// Push <event> to this consumer.
+ virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL);
+
+ /// Push <event> to this consumer.
+ virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL);
+
+protected:
+ /// The Consumer
+ CosEventComm::PushConsumer_var push_consumer_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "PushConsumer.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_PUSHCONSUMER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.inl
new file mode 100644
index 00000000000..e557d6a7eda
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "PushConsumer.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.cpp
new file mode 100644
index 00000000000..fb1642a7759
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.cpp
@@ -0,0 +1,42 @@
+// $Id$
+
+#include "PushSupplier.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "PushSupplier.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_PushSupplier, "$id$")
+
+TAO_NS_PushSupplier::TAO_NS_PushSupplier (TAO_NS_ProxyConsumer* proxy)
+ :TAO_NS_Supplier (proxy)
+{
+}
+
+TAO_NS_PushSupplier::~TAO_NS_PushSupplier ()
+{
+}
+
+void
+TAO_NS_PushSupplier::init (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL)
+{
+ this->push_supplier_ = CosEventComm::PushSupplier::_duplicate (push_supplier);
+
+ ACE_TRY
+ {
+ this->subscribe_ = CosNotifyComm::NotifySubscribe::_narrow (push_supplier ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ // _narrow failed which probably means the interface is CosEventComm type.
+ }
+ ACE_ENDTRY;
+}
+
+void
+TAO_NS_PushSupplier::release (void)
+{
+ delete this;
+ //@@ inform factory
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h
new file mode 100644
index 00000000000..3ba1c4fb2c0
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.h
@@ -0,0 +1,60 @@
+/* -*- C++ -*- */
+/**
+ * @file PushSupplier.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_PUSHSUPPLIER_H
+#define TAO_NS_PUSHSUPPLIER_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+
+#include "orbsvcs/CosNotifyCommC.h"
+#include "../Supplier.h"
+#include "../Destroy_Callback.h"
+
+class TAO_NS_ProxyConsumer;
+
+/**
+ * @class TAO_NS_StructuredPushSupplier
+ *
+ * @brief Wrapper for the PushSupplier that connect to the EventChannel.
+ *
+ */
+class TAO_Notify_Export TAO_NS_PushSupplier : public TAO_NS_Supplier
+{
+public:
+ /// Constuctor
+ TAO_NS_PushSupplier (TAO_NS_ProxyConsumer* proxy);
+
+ /// Destructor
+ ~TAO_NS_PushSupplier ();
+
+ /// Init
+ void init (CosEventComm::PushSupplier_ptr push_supplier ACE_ENV_ARG_DECL);
+
+ /// TAO_NS_Destroy_Callback methods
+ virtual void release (void);
+
+protected:
+ /// The Supplier
+ CosEventComm::PushSupplier_var push_supplier_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "PushSupplier.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_PUSHSUPPLIER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.inl b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.inl
new file mode 100644
index 00000000000..9f9bf75ff2f
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushSupplier.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "PushSupplier.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
new file mode 100644
index 00000000000..c030592d339
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp
@@ -0,0 +1,271 @@
+// $Id$
+
+#include "Buffering_Strategy.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "Buffering_Strategy.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_Buffering_Strategy, "$id$")
+
+#include "ace/Message_Queue.h"
+#include "orbsvcs/CosNotificationC.h"
+#include "Method_Request.h"
+#include "Notify_Extensions.h"
+#include "QoSProperties.h"
+#include "tao/debug.h"
+
+TAO_NS_Buffering_Strategy::TAO_NS_Buffering_Strategy (TAO_NS_Message_Queue& msg_queue, TAO_NS_AdminProperties_var& admin_properties, CORBA::Long batch_size)
+ : msg_queue_ (msg_queue),
+ admin_properties_ (admin_properties),
+ global_queue_lock_ (admin_properties->global_queue_lock ()),
+ global_queue_not_full_condition_ (admin_properties->global_queue_not_full_condition ()),
+ global_queue_length_ (admin_properties->global_queue_length ()),
+ max_global_queue_length_ (admin_properties->max_global_queue_length ()),
+ max_local_queue_length_ (0),
+ order_policy_ (CosNotification::OrderPolicy, CosNotification::AnyOrder),
+ discard_policy_ (CosNotification::DiscardPolicy, CosNotification::AnyOrder),
+ use_discarding_ (1),
+ local_queue_not_full_condition_ (global_queue_lock_),
+ batch_size_ (batch_size),
+ batch_size_reached_condition_ (global_queue_lock_),
+ shutdown_ (0)
+{
+}
+
+TAO_NS_Buffering_Strategy::~TAO_NS_Buffering_Strategy ()
+{
+}
+
+void
+TAO_NS_Buffering_Strategy::update_qos_properties (const TAO_NS_QoSProperties& qos_properties)
+{
+ this->order_policy_.set (qos_properties);
+
+ if (this->discard_policy_.set (qos_properties) != -1)
+ {
+ this->use_discarding_ = 1;
+ }
+
+ TAO_NS_Property_Time blocking_timeout (TAO_Notify_Extensions::BlockingPolicy);
+
+ if (blocking_timeout.set (qos_properties) != -1) // if set to a valid time, init the blocking_time_
+ {
+ this->use_discarding_ = 0;
+
+ this->blocking_time_ =
+# if defined (ACE_CONFIG_WIN32_H)
+ ACE_Time_Value (ACE_static_cast (long, blocking_timeout.value ()));
+# else
+ ACE_Time_Value (blocking_timeout.value () / 1);
+# endif /* ACE_CONFIG_WIN32_H */
+ }
+}
+
+void
+TAO_NS_Buffering_Strategy::shutdown (void)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_);
+
+ this->shutdown_ = 1;
+
+ this->global_queue_not_full_condition_.broadcast ();
+ this->local_queue_not_full_condition_.broadcast ();
+ this->batch_size_reached_condition_.broadcast ();
+}
+
+int
+TAO_NS_Buffering_Strategy::enqueue (TAO_NS_Method_Request& method_request)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
+
+ // while either local or global max reached
+ while ((this->max_local_queue_length_ != 0 &&
+ this->msg_queue_.message_count () == this->max_local_queue_length_)
+ ||
+ (this->max_global_queue_length_.value () != 0 &&
+ this->global_queue_length_ == this->max_global_queue_length_.value ()))
+ {
+ if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game.
+ return -1;
+
+ if (this->use_discarding_ == 1)
+ {
+ if (this->global_queue_length_ == this->max_global_queue_length_.value ()
+ && this->msg_queue_.message_count () == 0) // global max. reached but can't discard
+ {
+ // block. this is a hack because the real solution is to locate the appropriate queue and dequeue from it.
+ this->global_queue_not_full_condition_.wait ();
+ }
+ else // local max reached or, at global max but non-zero local count.
+ {
+ if (this->discard () == -1)
+ return -1;
+
+ --this->global_queue_length_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Discarded from %x, global_queue_length = %d\n", this, this->global_queue_length_));
+
+ this->global_queue_not_full_condition_.signal ();
+ this->local_queue_not_full_condition_.signal ();
+ }
+ }
+ else // block
+ {
+ if (this->msg_queue_.message_count () == this->max_local_queue_length_) // local maximum reached
+ {
+ if (this->blocking_time_ == ACE_Time_Value::zero) // wait forever if need be.
+ {
+ this->local_queue_not_full_condition_.wait ();
+ }
+ else // finite blocking time.
+ {
+ ACE_Time_Value absolute = ACE_OS::gettimeofday () + this->blocking_time_;
+
+ if (this->local_queue_not_full_condition_.wait (&absolute) == -1) // returns -1 on timeout
+ return -1; // Note message is discarded if it could not be enqueued in the given time.
+ }
+ }
+ else // global max reached
+ {
+ if (this->blocking_time_ == ACE_Time_Value::zero) // wait forever if need be.
+ {
+ this->global_queue_not_full_condition_.wait ();
+ }
+ else // finite blocking time.
+ {
+ ACE_Time_Value absolute = ACE_OS::gettimeofday () + blocking_time_;
+
+ if (this->global_queue_not_full_condition_.wait (&absolute) == -1) // returns -1 on timeout
+ return -1;
+ }
+ }
+ } // block
+ } // while
+
+ if (this->queue (method_request) == -1)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "Panic! failed to enqueue event"));
+ return -1;
+ }
+
+ ++this->global_queue_length_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Inserted to %x, global_queue_length = %d\n", this, this->global_queue_length_));
+
+ if (this->msg_queue_.message_count () == this->batch_size_)
+ batch_size_reached_condition_.signal ();
+
+ return this->msg_queue_.message_count ();
+}
+
+int
+TAO_NS_Buffering_Strategy::dequeue (TAO_NS_Method_Request* &method_request, const ACE_Time_Value *abstime)
+{
+ ACE_Message_Block *mb;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
+
+ while (this->msg_queue_.message_count () < this->batch_size_) // block
+ {
+ this->batch_size_reached_condition_.wait (abstime);
+
+ if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game.
+ return -1;
+
+ if (errno == ETIME)
+ return 0;
+ }
+
+ if (this->msg_queue_.dequeue (mb) == -1)
+ return -1;
+
+ method_request = ACE_dynamic_cast (TAO_NS_Method_Request*, mb);
+
+ if (method_request == 0)
+ return -1;
+
+ --this->global_queue_length_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Dequeued from %x, global_queue_length = %d\n", this, this->global_queue_length_));
+
+ this->global_queue_not_full_condition_.signal ();
+ this->local_queue_not_full_condition_.signal ();
+
+ return 1;
+}
+
+int
+TAO_NS_Buffering_Strategy::queue (TAO_NS_Method_Request& method_request)
+{
+ int result;
+
+ // Queue according to order policy
+ if (this->order_policy_ == CosNotification::AnyOrder ||
+ this->order_policy_ == CosNotification::FifoOrder)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "enqueue in fifo order\n"));
+ // Insert at the end of the queue.
+ result = this->msg_queue_.enqueue_tail (&method_request);
+ }
+ else if (this->order_policy_ == CosNotification::PriorityOrder)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "enqueue in priority order\n"));
+ result = this->msg_queue_.enqueue_prio (&method_request);
+ }
+ else if (this->order_policy_ == CosNotification::DeadlineOrder)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "enqueue in deadline order\n"));
+ result = this->msg_queue_.enqueue_deadline (&method_request);
+ }
+ else
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - Invalid order policy\n"));
+
+ result = -1;
+ }
+
+ return result;
+}
+
+int
+TAO_NS_Buffering_Strategy::discard (void)
+{
+ ACE_Message_Block *mb;
+ int result;
+
+ if (this->discard_policy_ == CosNotification::AnyOrder ||
+ this->discard_policy_ == CosNotification::FifoOrder)
+ {
+ result = this->msg_queue_.dequeue_head (mb);
+ }
+ else if (this->discard_policy_ == CosNotification::LifoOrder)
+ {
+ result = this->msg_queue_.dequeue_tail (mb);
+ }
+ else if (this->discard_policy_ == CosNotification::DeadlineOrder)
+ {
+ result = this->msg_queue_.dequeue_deadline (mb);
+ }
+ else if (this->discard_policy_ == CosNotification::PriorityOrder)
+ {
+ result = this->msg_queue_.dequeue_prio (mb);
+ }
+ else
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Notify (%P|%t) - "
+ "Invalid discard policy\n"));
+ result = -1;
+ }
+
+ return result;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h
new file mode 100644
index 00000000000..7b854a47508
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h
@@ -0,0 +1,137 @@
+/* -*- C++ -*- */
+/**
+ * @file Buffering_Strategy.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_BUFFERING_STRATEGY_H
+#define TAO_NS_BUFFERING_STRATEGY_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+
+#include "ace/Message_Queue.h"
+#include "orbsvcs/TimeBaseC.h"
+#include "Property.h"
+#include "Property_T.h"
+#include "AdminProperties.h"
+
+class TAO_NS_Method_Request;
+class TAO_NS_QoSProperties;
+
+typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_NS_Message_Queue;
+
+/**
+ * @class TAO_NS_Buffering_Strategy
+ *
+ * @brief Base Strategy to enqueue and dequeue items from a Message Queue.
+ *
+ */
+class TAO_Notify_Export TAO_NS_Buffering_Strategy
+{
+public:
+ /// Constuctor
+ TAO_NS_Buffering_Strategy (TAO_NS_Message_Queue& msg_queue, TAO_NS_AdminProperties_var& admin_properties, CORBA::Long batch_size);
+
+ /// Destructor
+ ~TAO_NS_Buffering_Strategy ();
+
+ /// Update state with the following QoS Properties:
+ /// Order Policy
+ /// Discard Policy
+ /// MaxEventsPerConsumer
+ /// TAO_Notify_Extensions::BlockingPolicy
+ void update_qos_properties (const TAO_NS_QoSProperties& qos_properties);
+
+ /// Enqueue according the enqueing strategy.
+ /// Return -1 on error else the number of items in the queue.
+ int enqueue (TAO_NS_Method_Request& method_request);
+
+ /// Dequeue batch. This method will block for <abstime> if non-zero or else blocks till an item is available.
+ /// Return -1 on error or if nothing is available, else the number of items actually dequeued (1).
+ int dequeue (TAO_NS_Method_Request* &method_request, const ACE_Time_Value *abstime);
+
+ /// Shutdown
+ void shutdown (void);
+
+ /// Set the new batch size.
+ void batch_size (CORBA::Long batch_size);
+
+ /// Obtain our batch size
+ CORBA::Long batch_size (void);
+
+ /// Set the max local queue length.
+ void max_local_queue_length (CORBA::Long length);
+
+protected:
+ /// Apply the Order Policy and queue. return -1 on error.
+ int queue (TAO_NS_Method_Request& method_request);
+
+ /// Discard as per the Discard Policy.
+ int discard (void);
+
+ ///= Data Members
+
+ /// The local Message Queue
+ TAO_NS_Message_Queue& msg_queue_;
+
+ /// Reference to the properties per event channel.
+ TAO_NS_AdminProperties_var admin_properties_;
+
+ /// The shared global lock used by all the queues.
+ ACE_SYNCH_MUTEX& global_queue_lock_;
+
+ /// The shared Condition for global queue not full.
+ ACE_SYNCH_CONDITION& global_queue_not_full_condition_;
+
+ /// The global queue length - queue length accross all the queues.
+ CORBA::Long& global_queue_length_;
+
+ /// The maximum events that can be queued overall.
+ const TAO_NS_Property_Long& max_global_queue_length_;
+
+ /// The maximum queue length for the local queue.
+ CORBA::Long max_local_queue_length_;
+
+ /// Order of events in internal buffers.
+ TAO_NS_Property_Short order_policy_;
+
+ /// Policy to discard when buffers are full.
+ TAO_NS_Property_Short discard_policy_;
+
+ /// Flag that we should use discarding(1) or blocking (0).
+ int use_discarding_;
+
+ /// The blocking timeout will be used in place of discarding
+ /// This is a TAO specific extension.
+ ACE_Time_Value blocking_time_; // 0 means wait forever.
+
+ /// Condition that the local queue is not full.
+ ACE_SYNCH_CONDITION local_queue_not_full_condition_;
+
+ /// The batch size that we want to monitor for dequeuing.
+ CORBA::Long batch_size_;
+
+ /// Condition that batch size reached.
+ ACE_SYNCH_CONDITION batch_size_reached_condition_;
+
+ /// Flag to shutdown.
+ int shutdown_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "Buffering_Strategy.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_BUFFERING_STRATEGY_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.inl b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.inl
new file mode 100644
index 00000000000..3dbfa925c5b
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.inl
@@ -0,0 +1,19 @@
+// $Id$
+
+ACE_INLINE void
+TAO_NS_Buffering_Strategy::batch_size (CORBA::Long batch_size)
+{
+ this->batch_size_ = batch_size;
+}
+
+ACE_INLINE CORBA::Long
+TAO_NS_Buffering_Strategy::batch_size (void)
+{
+ return this->batch_size_;
+}
+
+ACE_INLINE void
+TAO_NS_Buffering_Strategy::max_local_queue_length (CORBA::Long length)
+{
+ this->max_local_queue_length_ = length;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.cpp b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.cpp
new file mode 100644
index 00000000000..7794ee10956
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.cpp
@@ -0,0 +1,13 @@
+// $Id$
+
+#include "CosNotify_Initializer.h"
+
+ACE_RCSID(Notify, TAO_NS_CosNotify_Initializer, "$id$")
+
+#include "Notify_Service.h"
+
+TAO_NS_CosNotify_Initializer::TAO_NS_CosNotify_Initializer (void)
+{
+ ACE_Service_Config::static_svcs ()->insert (&ace_svc_desc_TAO_NS_Notify_Service);
+ ACE_Service_Config::static_svcs ()->insert (&ace_svc_desc_TAO_Notify_Default_EMO_Factory_OLD);
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.h b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.h
new file mode 100644
index 00000000000..a2e73f4898a
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Initializer.h
@@ -0,0 +1,38 @@
+/* -*- C++ -*- */
+/**
+ * @file CosNotify_Initializer.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_COSNOTIFY_INITIALIZER_H
+#define TAO_NS_COSNOTIFY_INITIALIZER_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class TAO_NS_CosNotify_Initializer
+ *
+ * @brief Helper to load the Cos Notification service into the service conf. for static links.
+ *
+ */
+class TAO_Notify_Export TAO_NS_CosNotify_Initializer
+{
+public:
+ /// Constuctor
+ TAO_NS_CosNotify_Initializer (void);
+};
+
+static TAO_NS_CosNotify_Initializer TAO_NS_CosNotify_initializer;
+
+#include "ace/post.h"
+#endif /* TAO_NS_COSNOTIFY_INITIALIZER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp
new file mode 100644
index 00000000000..60049cf0bef
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp
@@ -0,0 +1,55 @@
+// $Id$
+
+#include "Method_Request_Updates.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "Method_Request_Updates.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_Method_Request_Updates, "$id$")
+
+#include "tao/debug.h"
+#include "Proxy.h"
+#include "Peer.h"
+
+TAO_NS_Method_Request_Updates::TAO_NS_Method_Request_Updates (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed, TAO_NS_Proxy* proxy)
+ :added_ (added), removed_ (removed), proxy_ (proxy), refcountable_guard_ (*proxy)
+{
+}
+
+TAO_NS_Method_Request_Updates::~TAO_NS_Method_Request_Updates ()
+{
+}
+
+TAO_NS_Method_Request*
+TAO_NS_Method_Request_Updates::copy (void)
+{
+ /// @@use factory
+ return new TAO_NS_Method_Request_Updates (this->added_, this->removed_, this->proxy_);
+}
+
+int
+TAO_NS_Method_Request_Updates::execute (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (this->proxy_->has_shutdown ())
+ return 0; // If we were shutdown while waiting in the queue, return with no action.
+
+ ACE_TRY
+ {
+ TAO_NS_Peer* peer = this->proxy_->peer();
+
+ if (peer != 0)
+ {
+ peer->dispatch_updates (this->added_, this->removed_ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ }
+ ACE_CATCHANY
+ {
+ if (TAO_debug_level > 0)
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_NS_Method_Request_Updates::execute error sending updates\n ");
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h
new file mode 100644
index 00000000000..00e68a21f18
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h
@@ -0,0 +1,64 @@
+/* -*- C++ -*- */
+/**
+ * @file Method_Request_Updates.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_METHOD_REQUEST_UPDATES_H
+#define TAO_NS_METHOD_REQUEST_UPDATES_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "Method_Request.h"
+#include "Types.h"
+#include "EventTypeSeq.h"
+
+/**
+ * @class TAO_NS_Method_Request_Updates
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_Method_Request_Updates : public TAO_NS_Method_Request
+{
+public:
+ /// Constuctor
+ TAO_NS_Method_Request_Updates (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed, TAO_NS_Proxy* proxy);
+
+ /// Destructor
+ ~TAO_NS_Method_Request_Updates ();
+
+ /// Create a copy of this object.
+ TAO_NS_Method_Request* copy (void);
+
+ /// Execute the Request
+ virtual int execute (ACE_ENV_SINGLE_ARG_DECL);
+
+private:
+ /// The Updates
+ const TAO_NS_EventTypeSeq added_;
+ const TAO_NS_EventTypeSeq removed_;
+
+ /// The proxy that will receive the updates.
+ TAO_NS_Proxy* proxy_;
+
+ /// Guard to automatically inc/decr ref count on the proxy.
+ TAO_NS_Refcountable_Guard refcountable_guard_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "Method_Request_Updates.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_METHOD_REQUEST_UPDATES_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl
new file mode 100644
index 00000000000..bf5cc3848c2
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "Method_Request_Updates.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property.h b/TAO/orbsvcs/orbsvcs/Notify/Property.h
new file mode 100644
index 00000000000..ae91ce42aff
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Property.h
@@ -0,0 +1,39 @@
+/* -*- C++ -*- */
+/**
+ * @file Property.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_PROPERTY_H
+#define TAO_NS_PROPERTY_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "tao/corba.h"
+#include "tao/orbconf.h"
+#include "tao/TimeBaseC.h"
+#include "orbsvcs/NotifyExtC.h"
+
+template <class LOCK, class TYPE> class ACE_Atomic_Op;
+template <class TYPE> class TAO_NS_Property_T;
+template <class TYPE> class TAO_NS_StructProperty_T;
+
+typedef ACE_Atomic_Op <TAO_SYNCH_MUTEX,CORBA::Long> TAO_NS_Atomic_Property_Long;
+typedef TAO_NS_Property_T<CORBA::Long> TAO_NS_Property_Long;
+typedef TAO_NS_Property_T<CORBA::Short> TAO_NS_Property_Short;
+typedef TAO_NS_Property_T<TimeBase::TimeT> TAO_NS_Property_Time;
+typedef TAO_NS_StructProperty_T<NotifyExt::ThreadPoolParams> TAO_NS_Property_ThreadPool;
+typedef TAO_NS_StructProperty_T<NotifyExt::ThreadPoolLanesParams> TAO_NS_Property_ThreadPoolLanes;
+
+#include "ace/post.h"
+#endif /* TAO_NS_PROPERTY_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.cpp b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.cpp
new file mode 100644
index 00000000000..ad5042b9e87
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.cpp
@@ -0,0 +1,53 @@
+// $Id$
+
+#include "PropertySeq.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "PropertySeq.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_PropertySeq, "$id$")
+
+TAO_NS_PropertySeq::TAO_NS_PropertySeq (void)
+{
+}
+
+TAO_NS_PropertySeq::~TAO_NS_PropertySeq ()
+{
+}
+
+int
+TAO_NS_PropertySeq::init (const CosNotification::PropertySeq& prop_seq)
+{
+ ACE_CString name;
+
+ for (CORBA::ULong i = 0; i < prop_seq.length (); ++i)
+ {
+ name = prop_seq[i].name.in ();
+
+ if (this->property_map_.rebind (name, prop_seq[i].value) == -1)
+ return -1;
+ }
+ // Note call to rebind. This allows to call <init> to set updates.
+
+ return 0;
+}
+
+int
+TAO_NS_PropertySeq::populate (CosNotification::PropertySeq_var& prop_seq)
+{
+ PROPERTY_MAP::ITERATOR iterator (this->property_map_);
+
+ int index = prop_seq->length ();
+ prop_seq->length (index + this->property_map_.current_size ());
+
+ for (PROPERTY_MAP::ENTRY *entry = 0;
+ iterator.next (entry) != 0;
+ iterator.advance (), ++index)
+ {
+ (*prop_seq)[index].name = CORBA::string_dup (entry->ext_id_.c_str ());
+ (*prop_seq)[index].value = entry->int_id_;
+ }
+
+ return 0;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.h b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.h
new file mode 100644
index 00000000000..4371aea34f9
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.h
@@ -0,0 +1,62 @@
+/* -*- C++ -*- */
+/**
+ * @file PropertySeq.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_PROPERTYSEQ_H
+#define TAO_NS_PROPERTYSEQ_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "orbsvcs/CosNotificationC.h"
+#include "ace/Hash_Map_Manager.h"
+#include "ace/SString.h"
+
+/**
+ * @class TAO_NS_PropertySeq
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_PropertySeq
+{
+public:
+ /// Constuctor
+ TAO_NS_PropertySeq (void);
+
+ /// Destructor
+ ~TAO_NS_PropertySeq ();
+
+ /// Return 0 on success, -1 on error.
+ int init (const CosNotification::PropertySeq& prop_seq);
+
+ /// Find the <value> for property <name>. Returns 0 on success.
+ int find (const ACE_CString& name, CosNotification::PropertyValue& value) const;
+
+ /// Return -1 on error.
+ int populate (CosNotification::PropertySeq_var& prop_seq);
+
+protected:
+ /// Property Map.
+ typedef ACE_Hash_Map_Manager <ACE_CString, CosNotification::PropertyValue, ACE_SYNCH_NULL_MUTEX> PROPERTY_MAP;
+
+ PROPERTY_MAP property_map_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "PropertySeq.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_PROPERTYSEQ_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.inl b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.inl
new file mode 100644
index 00000000000..271543a2f82
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/PropertySeq.inl
@@ -0,0 +1,7 @@
+// $Id$
+
+ACE_INLINE int
+TAO_NS_PropertySeq::find (const ACE_CString& name, CosNotification::PropertyValue& value) const
+{
+ return this->property_map_.find (name, value);
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.cpp b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.cpp
new file mode 100644
index 00000000000..4c240b12bd9
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.cpp
@@ -0,0 +1,43 @@
+// $Id$
+
+#include "Property_Boolean.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "Property_Boolean.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_Property_Boolean, "$id$")
+
+#include "PropertySeq.h"
+
+TAO_NS_Property_Boolean::TAO_NS_Property_Boolean (const ACE_CString& name)
+ :name_ (name), valid_(0)
+{
+}
+
+TAO_NS_Property_Boolean::TAO_NS_Property_Boolean (const ACE_CString& name, CORBA::Boolean initial)
+ :name_ (name), value_ (initial), valid_ (1)
+{
+}
+
+int
+TAO_NS_Property_Boolean::set (const TAO_NS_PropertySeq& property_seq)
+{
+ CosNotification::PropertyValue value;
+
+ if (property_seq.find (this->name_, value) == -1)
+ return -1;
+
+ value >>= CORBA::Any::to_boolean (this->value_);
+
+ return 0;
+}
+
+void
+TAO_NS_Property_Boolean::get (CosNotification::PropertySeq& prop_seq)
+{
+ /// Make space
+ prop_seq.length (prop_seq.length () + 1);
+
+ prop_seq[prop_seq.length () - 1].value <<= CORBA::Any::from_boolean (this->value_);
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.h b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.h
new file mode 100644
index 00000000000..1ddb447f418
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.h
@@ -0,0 +1,81 @@
+/* -*- C++ -*- */
+/**
+ * @file Property_Boolean.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_PROPERTY_BOOLEAN_H
+#define TAO_NS_PROPERTY_BOOLEAN_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "orbsvcs/CosNotificationC.h"
+
+class TAO_NS_PropertySeq;
+
+/**
+ * @class TAO_NS_Property_Boolean
+ *
+ * @brief Boolean Property.
+ *
+ */
+/*******************************************************************************/
+
+class TAO_Notify_Export TAO_NS_Property_Boolean
+{
+public:
+ /// Constuctor
+ TAO_NS_Property_Boolean (const ACE_CString& name, CORBA::Boolean initial);
+
+ /// Constuctor
+ TAO_NS_Property_Boolean (const ACE_CString& name);
+
+ /// Assignment from TAO_NS_Property_Boolean
+ TAO_NS_Property_Boolean& operator= (const TAO_NS_Property_Boolean& rhs);
+
+ /// Assignment from CORBA::Boolean
+ TAO_NS_Property_Boolean& operator= (const CORBA::Boolean& rhs);
+
+ /// Equality comparison operator.
+ int operator== (const CORBA::Boolean &rhs) const;
+
+ /// Inequality comparison operator.
+ int operator!= (const CORBA::Boolean &rhs) const;
+
+ int set (const TAO_NS_PropertySeq& property_seq);
+
+ void get (CosNotification::PropertySeq& prop_seq);
+
+ /// Return the value.
+ CORBA::Boolean value (void) const;
+
+ /// Is the current value valid
+ CORBA::Boolean is_valid (void) const;
+
+protected:
+ /// The Property name.
+ ACE_CString name_;
+
+ /// The value
+ CORBA::Boolean value_;
+
+ /// Is the value valid
+ CORBA::Boolean valid_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "Property_Boolean.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_PROPERTY_BOOLEAN_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.inl b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.inl
new file mode 100644
index 00000000000..d30cabb4e39
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Property_Boolean.inl
@@ -0,0 +1,49 @@
+// $Id$
+
+ACE_INLINE TAO_NS_Property_Boolean&
+TAO_NS_Property_Boolean::operator= (const TAO_NS_Property_Boolean& rhs)
+{
+ if (this == &rhs)
+ return *this;
+
+ if (rhs.is_valid ())
+ {
+ this->name_ = rhs.name_;
+ this->value_ = rhs.value_;
+ this->valid_ = rhs.valid_;
+ }
+
+ return *this;
+}
+
+ACE_INLINE TAO_NS_Property_Boolean&
+TAO_NS_Property_Boolean::operator= (const CORBA::Boolean& value)
+{
+ this->value_ = value;
+
+ return *this;
+}
+
+ACE_INLINE int
+TAO_NS_Property_Boolean::operator== (const CORBA::Boolean &rhs) const
+{
+ return (this->value_ == rhs);
+}
+
+ACE_INLINE int
+TAO_NS_Property_Boolean::operator!= (const CORBA::Boolean &rhs) const
+{
+ return (this->value_ != rhs);
+}
+
+ACE_INLINE CORBA::Boolean
+TAO_NS_Property_Boolean::value (void) const
+{
+ return this->value_;
+}
+
+ACE_INLINE CORBA::Boolean
+TAO_NS_Property_Boolean::is_valid (void) const
+{
+ return this->valid_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/Property_T.cpp
new file mode 100644
index 00000000000..051345844c1
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Property_T.cpp
@@ -0,0 +1,119 @@
+// $Id$
+
+#ifndef TAO_NS_PROPERTY_T_CPP
+#define TAO_NS_PROPERTY_T_CPP
+
+#include "Property_T.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "Property_T.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_Property_T, "$id$")
+
+#include "PropertySeq.h"
+
+/*******************************************************************************/
+
+template <class TYPE>
+TAO_NS_PropertyBase_T<TYPE>::TAO_NS_PropertyBase_T (const ACE_CString& name)
+ :name_ (name), valid_(0)
+{
+}
+
+template <class TYPE>
+TAO_NS_PropertyBase_T<TYPE>::TAO_NS_PropertyBase_T (const ACE_CString& name, const TYPE& initial)
+ :name_ (name), value_ (initial), valid_ (1)
+{
+}
+
+template <class TYPE>
+TAO_NS_PropertyBase_T<TYPE>::TAO_NS_PropertyBase_T (const TAO_NS_PropertyBase_T &rhs)
+{
+ this->name_ = rhs.name_;
+ this->value_ = rhs.value_;
+ this->valid_ = rhs.valid_;
+}
+
+template <class TYPE>
+TAO_NS_PropertyBase_T<TYPE>::~TAO_NS_PropertyBase_T ()
+{
+}
+
+template <class TYPE> void
+TAO_NS_PropertyBase_T<TYPE>::get (CosNotification::PropertySeq& prop_seq)
+{
+ /// Make space
+ prop_seq.length (prop_seq.length () + 1);
+
+ prop_seq[prop_seq.length () - 1].value <<= this->value_;
+}
+
+/*******************************************************************************/
+
+template <class TYPE>
+TAO_NS_Property_T<TYPE>::TAO_NS_Property_T (const ACE_CString& name)
+ :TAO_NS_PropertyBase_T <TYPE> (name)
+{
+}
+
+template <class TYPE>
+TAO_NS_Property_T<TYPE>::TAO_NS_Property_T (const ACE_CString& name, const TYPE& initial)
+ :TAO_NS_PropertyBase_T <TYPE> (name, initial)
+{
+}
+
+template <class TYPE> int
+TAO_NS_Property_T<TYPE>::set (const TAO_NS_PropertySeq& property_seq)
+{
+ CosNotification::PropertyValue value;
+
+ if (property_seq.find (this->name_, value) == -1)
+ {
+ this->valid_ = 0;
+ return -1;
+ }
+
+ value >>= this->value_;
+
+ this->valid_ = 1;
+
+ return 0;
+}
+
+/*******************************************************************************/
+
+template <class TYPE>
+TAO_NS_StructProperty_T<TYPE>::TAO_NS_StructProperty_T (const ACE_CString& name)
+ :name_ (name), valid_(0)
+{
+}
+
+template <class TYPE>
+TAO_NS_StructProperty_T<TYPE>::TAO_NS_StructProperty_T (const ACE_CString& name, const TYPE& initial)
+ :name_ (name), value_ (initial), valid_ (1)
+{
+}
+
+template <class TYPE> int
+TAO_NS_StructProperty_T<TYPE>::set (const TAO_NS_PropertySeq& property_seq)
+{
+ CosNotification::PropertyValue value;
+
+ if (property_seq.find (this->name_, value) == -1)
+ {
+ this->valid_ = 0;
+ return -1;
+ }
+
+ TYPE* extract_type;
+ value >>= extract_type;
+
+ this->value_ = *extract_type; // copy
+
+ this->valid_ = 1;
+
+ return 0;
+}
+
+#endif /* TAO_NS_PROPERTY_T_CPP */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_T.h b/TAO/orbsvcs/orbsvcs/Notify/Property_T.h
new file mode 100644
index 00000000000..5b05d472b9f
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Property_T.h
@@ -0,0 +1,161 @@
+/* -*- C++ -*- */
+/**
+ * @file Property_T.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_PROPERTY_T_H
+#define TAO_NS_PROPERTY_T_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/SString.h"
+#include "orbsvcs/CosNotificationC.h"
+
+class TAO_NS_PropertySeq;
+
+/**
+ * @class TAO_NS_PropertyBase_T
+ *
+ * @brief
+ *
+ */
+template <class TYPE>
+class TAO_NS_PropertyBase_T
+{
+public:
+ /// Constuctor
+ TAO_NS_PropertyBase_T (const ACE_CString& name, const TYPE& initial);
+
+ /// Constuctor
+ TAO_NS_PropertyBase_T (const ACE_CString& name);
+
+ /// Copy Constuctor
+ TAO_NS_PropertyBase_T (const TAO_NS_PropertyBase_T &rhs);
+
+ /// Destructor
+ ~TAO_NS_PropertyBase_T ();
+
+ /// Assignment from TAO_NS_PropertyBase_T
+ TAO_NS_PropertyBase_T& operator= (const TAO_NS_PropertyBase_T& rhs);
+
+ /// Assignment from TYPE
+ TAO_NS_PropertyBase_T& operator= (const TYPE& rhs);
+
+ /// Equality comparison operator.
+ int operator== (const TYPE &rhs) const;
+
+ /// Inequality comparison operator.
+ int operator!= (const TYPE &rhs) const;
+
+ /// Populate the Property Sequence with this valid value.
+ void get (CosNotification::PropertySeq& prop_seq);
+
+ /// Return the value.
+ const TYPE& value (void) const;
+
+ /// Is the current value valid
+ CORBA::Boolean is_valid (void) const;
+
+ /// Invalidate this property's value.
+ void invalidate (void);
+
+protected:
+ /// The Property name.
+ ACE_CString name_;
+
+ /// The value
+ TYPE value_;
+
+ /// Is the value valid
+ CORBA::Boolean valid_;
+};
+
+
+/*******************************************************************************/
+/**
+ * @class TAO_NS_Property_T
+ *
+ * @brief
+ *
+ */
+template <class TYPE>
+class TAO_NS_Property_T : public TAO_NS_PropertyBase_T<TYPE>
+{
+public:
+ /// Constuctor
+ TAO_NS_Property_T (const ACE_CString& name, const TYPE& initial);
+
+ /// Constuctor
+ TAO_NS_Property_T (const ACE_CString& name);
+
+ /// Assignment from TYPE
+ TAO_NS_Property_T& operator= (const TYPE& rhs);
+
+ /// Init this Property from the sequence.
+ /// Returns 0 on success, -1 on error
+ int set (const TAO_NS_PropertySeq& property_seq);
+};
+
+/*******************************************************************************/
+/**
+ * @class TAO_NS_StructProperty_T
+ *
+ * @brief
+ *
+ */
+template <class TYPE>
+class TAO_NS_StructProperty_T
+{
+public:
+ /// Constuctor
+ TAO_NS_StructProperty_T (const ACE_CString& name, const TYPE& initial);
+
+ /// Constuctor
+ TAO_NS_StructProperty_T (const ACE_CString& name);
+
+ /// Init this Property from the sequence.
+ /// Returns 0 on success, -1 on error
+ int set (const TAO_NS_PropertySeq& property_seq);
+
+ /// Return the value.
+ const TYPE& value (void) const;
+
+ /// Is the current value valid
+ CORBA::Boolean is_valid (void) const;
+
+protected:
+ /// The Property name.
+ ACE_CString name_;
+
+ /// The value
+ TYPE value_;
+
+ /// Is the value valid
+ CORBA::Boolean valid_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "Property_T.inl"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "Property_T.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("Property_T.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include "ace/post.h"
+#endif /* TAO_NS_PROPERTY_T_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Property_T.inl b/TAO/orbsvcs/orbsvcs/Notify/Property_T.inl
new file mode 100644
index 00000000000..3442b0fe7c4
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Property_T.inl
@@ -0,0 +1,78 @@
+// $Id$
+
+template <class TYPE> ACE_INLINE const TYPE&
+TAO_NS_PropertyBase_T<TYPE>::value (void) const
+{
+ return this->value_;
+}
+
+template <class TYPE> ACE_INLINE CORBA::Boolean
+TAO_NS_PropertyBase_T<TYPE>::is_valid (void) const
+{
+ return this->valid_;
+}
+
+template <class TYPE> ACE_INLINE int
+TAO_NS_PropertyBase_T<TYPE>::operator== (const TYPE &rhs) const
+{
+ return (this->value_ == rhs);
+}
+
+template <class TYPE> ACE_INLINE int
+TAO_NS_PropertyBase_T<TYPE>::operator!= (const TYPE &rhs) const
+{
+ return (this->value_ != rhs);
+}
+
+template <class TYPE> ACE_INLINE TAO_NS_PropertyBase_T<TYPE>&
+TAO_NS_PropertyBase_T<TYPE>::operator= (const TAO_NS_PropertyBase_T<TYPE>& rhs)
+{
+ if (this == &rhs)
+ return *this;
+
+ if (rhs.is_valid ())
+ {
+ this->name_ = rhs.name_;
+ this->value_ = rhs.value_;
+ this->valid_ = rhs.valid_;
+ }
+
+ return *this;
+}
+
+template <class TYPE> ACE_INLINE TAO_NS_PropertyBase_T<TYPE>&
+TAO_NS_PropertyBase_T<TYPE>::operator=(const TYPE& value)
+{
+ this->value_ = value;
+
+ return *this;
+}
+
+template <class TYPE> ACE_INLINE void
+TAO_NS_PropertyBase_T<TYPE>:: invalidate (void)
+{
+ this->valid_ = 0;
+}
+
+/******************************************************************************/
+
+template <class TYPE> ACE_INLINE TAO_NS_Property_T<TYPE>&
+TAO_NS_Property_T<TYPE>::operator=(const TYPE& value)
+{
+ this->TAO_NS_PropertyBase_T<TYPE>::operator= (value);
+ return *this;
+}
+
+/******************************************************************************/
+
+template <class TYPE> ACE_INLINE const TYPE&
+TAO_NS_StructProperty_T<TYPE>::value (void) const
+{
+ return this->value_;
+}
+
+template <class TYPE> ACE_INLINE CORBA::Boolean
+TAO_NS_StructProperty_T<TYPE>::is_valid (void) const
+{
+ return this->valid_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp
new file mode 100644
index 00000000000..90a166ec142
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp
@@ -0,0 +1,80 @@
+// $Id$
+
+#ifndef TAO_NS_PROXYCONSUMER_T_CPP
+#define TAO_NS_PROXYCONSUMER_T_CPP
+
+#include "ProxyConsumer_T.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "ProxyConsumer_T.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_ProxyConsumer_T, "$id$")
+
+#include "Event_Manager.h"
+
+template <class SERVANT_TYPE>
+TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::TAO_NS_ProxyConsumer_T (void)
+{
+}
+
+template <class SERVANT_TYPE>
+TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::~TAO_NS_ProxyConsumer_T ()
+{
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::admin_types_changed (const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL)
+{
+ this->offer_change (added, removed ACE_ENV_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> CosNotifyChannelAdmin::SupplierAdmin_ptr
+TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::MyAdmin (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ CosNotifyChannelAdmin::SupplierAdmin_var ret;
+
+ CORBA::Object_var object = this->parent_->ref (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (ret._retn ());
+
+ ret = CosNotifyChannelAdmin::SupplierAdmin::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
+
+ return ret._retn ();
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::offer_change (const CosNotification::EventTypeSeq & added, const CosNotification::EventTypeSeq & removed ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyComm::InvalidEventType
+ ))
+{
+ TAO_NS_EventTypeSeq seq_added (added);
+ TAO_NS_EventTypeSeq seq_removed (removed);
+
+ {
+ ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
+ CORBA::INTERNAL ());
+ ACE_CHECK;
+
+ this->subscribed_types_.init (seq_added, seq_removed);
+ }
+
+ this->event_manager_->offer_change (this, seq_added, seq_removed ACE_ENV_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> CosNotification::EventTypeSeq*
+TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::obtain_subscription_types (CosNotifyChannelAdmin::ObtainInfoMode mode ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ return this->obtain_types (mode, this->event_manager_->subscription_types () ACE_ENV_ARG_PARAMETER);
+}
+
+#endif /* TAO_NS_PROXYCONSUMER_T_CPP */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h
new file mode 100644
index 00000000000..6ccc7dcec1d
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h
@@ -0,0 +1,84 @@
+/* -*- C++ -*- */
+/**
+ * @file ProxyConsumer_T.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_PROXYCONSUMER_T_H
+#define TAO_NS_PROXYCONSUMER_T_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "Proxy_T.h"
+#include "ProxyConsumer.h"
+
+/**
+ * @class TAO_NS_ProxyConsumer_T
+ *
+ * @brief
+ *
+ */
+template <class SERVANT_TYPE>
+class TAO_Notify_Export TAO_NS_ProxyConsumer_T : public virtual TAO_NS_Proxy_T <SERVANT_TYPE>, public virtual TAO_NS_ProxyConsumer
+{
+public:
+ /// Constuctor
+ TAO_NS_ProxyConsumer_T (void);
+
+ /// Destructor
+ ~TAO_NS_ProxyConsumer_T ();
+
+ /// Notification of subscriptions set at the admin.
+ virtual void admin_types_changed (const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL);
+
+ virtual CosNotifyChannelAdmin::SupplierAdmin_ptr MyAdmin (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual CosNotification::EventTypeSeq * obtain_subscription_types (
+ CosNotifyChannelAdmin::ObtainInfoMode mode
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void offer_change (
+ const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyComm::InvalidEventType
+ ));
+
+};
+
+#if defined (__ACE_INLINE__)
+#include "ProxyConsumer_T.inl"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "ProxyConsumer_T.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("ProxyConsumer_T.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include "ace/post.h"
+#endif /* TAO_NS_PROXYCONSUMER_T_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.inl b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.inl
new file mode 100644
index 00000000000..cfa1da318d3
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.inl
@@ -0,0 +1 @@
+// $Id$
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp
new file mode 100644
index 00000000000..7db52521e8b
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp
@@ -0,0 +1,222 @@
+// $Id$
+
+#ifndef TAO_NS_PROXYSUPPLIER_T_C
+#define TAO_NS_PROXYSUPPLIER_T_C
+
+#include "ProxySupplier_T.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "ProxySupplier_T.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_ProxySupplier_T, "$id$")
+
+#include "Consumer.h"
+#include "Structured/StructuredEvent.h"
+#include "Any/AnyEvent.h"
+
+#include "Method_Request_Dispatch.h"
+#include "Method_Request_Dispatch_No_Filtering.h"
+#include "Worker_Task.h"
+#include "Event_Manager.h"
+
+template <class SERVANT_TYPE>
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::TAO_NS_ProxySupplier_T (void)
+ :is_suspended_ (0)
+{
+}
+
+template <class SERVANT_TYPE>
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::~TAO_NS_ProxySupplier_T ()
+{
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::admin_types_changed (const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL)
+{
+ this->subscription_change (added, removed ACE_ENV_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::forward_structured (const CosNotification::StructuredEvent& notification ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ TAO_NS_Event_var event (new TAO_NS_StructuredEvent (notification));
+
+ TAO_NS_Method_Request_Dispatch request (event, this);
+
+ this->worker_task ()->exec (request);
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::forward_structured_no_filtering (const CosNotification::StructuredEvent& notification ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ TAO_NS_Event_var event (new TAO_NS_StructuredEvent (notification));
+
+ TAO_NS_Method_Request_Dispatch_No_Filtering request (event, this);
+
+ this->worker_task ()->exec (request);
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::forward_any (const CORBA::Any & data ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ TAO_NS_Event_var event (new TAO_NS_AnyEvent (data));
+
+ TAO_NS_Method_Request_Dispatch request (event, this);
+
+ this->worker_task ()->exec (request);
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::forward_any_no_filtering (const CORBA::Any& data ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ TAO_NS_Event_var event (new TAO_NS_AnyEvent (data));
+
+ TAO_NS_Method_Request_Dispatch_No_Filtering request (event, this);
+
+ this->worker_task ()->exec (request);
+}
+
+template <class SERVANT_TYPE> CosNotification::EventTypeSeq*
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::obtain_offered_types (CosNotifyChannelAdmin::ObtainInfoMode mode ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ return this->obtain_types (mode, this->event_manager_->offered_types () ACE_ENV_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::subscription_change (const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException,
+ CosNotifyComm::InvalidEventType))
+{
+ TAO_NS_EventTypeSeq seq_added (added);
+ TAO_NS_EventTypeSeq seq_removed (removed);
+
+ {
+ ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
+ CORBA::INTERNAL ());
+ ACE_CHECK;
+
+ this->subscribed_types_.init (seq_added, seq_removed);
+ }
+
+ this->event_manager_->subscription_change (this, seq_added, seq_removed ACE_ENV_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::suspend_connection (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyChannelAdmin::ConnectionAlreadyInactive,
+ CosNotifyChannelAdmin::NotConnected
+ ))
+{
+ {
+ ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, CORBA::INTERNAL ());
+
+ if (this->is_connected () == 0)
+ ACE_THROW (CosNotifyChannelAdmin::NotConnected ());
+
+ if (this->consumer_->is_suspended () == 1)
+ ACE_THROW (CosNotifyChannelAdmin::ConnectionAlreadyInactive ());
+ }
+
+ this->consumer_->suspend (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::resume_connection (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyChannelAdmin::ConnectionAlreadyActive,
+ CosNotifyChannelAdmin::NotConnected
+ ))
+{
+ {
+ ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, CORBA::INTERNAL ());
+
+ if (this->is_connected () == 0)
+ ACE_THROW (CosNotifyChannelAdmin::NotConnected ());
+
+ if (this->consumer_->is_suspended () == 0)
+ ACE_THROW (CosNotifyChannelAdmin::ConnectionAlreadyActive ());
+ }
+
+ this->consumer_->resume (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> CosNotifyChannelAdmin::ConsumerAdmin_ptr
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::MyAdmin (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ CosNotifyChannelAdmin::ConsumerAdmin_var ret;
+
+ CORBA::Object_var object = this->parent_->ref (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (ret._retn ());
+
+ ret = CosNotifyChannelAdmin::ConsumerAdmin::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
+
+ return ret._retn ();
+}
+
+/***************************** UNIMPLEMENTED METHODS***************************************/
+
+template <class SERVANT_TYPE> CosNotifyFilter::MappingFilter_ptr
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::priority_filter (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ ACE_THROW_RETURN (CORBA::NO_IMPLEMENT (),
+ CosNotifyFilter::MappingFilter::_nil ());
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::priority_filter (CosNotifyFilter::MappingFilter_ptr /*priority_filter*/ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ ACE_THROW (CORBA::NO_IMPLEMENT ());
+}
+
+template <class SERVANT_TYPE> CosNotifyFilter::MappingFilter_ptr
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::lifetime_filter (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ ACE_THROW_RETURN (CORBA::NO_IMPLEMENT (),
+ CosNotifyFilter::MappingFilter::_nil ());
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::lifetime_filter (CosNotifyFilter::MappingFilter_ptr /*lifetime_filter*/ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ ACE_THROW (CORBA::NO_IMPLEMENT ());
+}
+
+#endif /* #define TAO_NS_PROXYSUPPLIER_T_C */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h
new file mode 100644
index 00000000000..c0e8d2054fb
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h
@@ -0,0 +1,162 @@
+/* -*- C++ -*- */
+/**
+ * @file ProxySupplier_T.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_PROXYSUPPLIER_T_H
+#define TAO_NS_PROXYSUPPLIER_T_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "Proxy_T.h"
+#include "ProxySupplier.h"
+
+/**
+ * @class TAO_NS_ProxySupplier_T
+ *
+ * @brief
+ *
+ */
+template <class SERVANT_TYPE>
+class TAO_NS_ProxySupplier_T : public virtual TAO_NS_Proxy_T <SERVANT_TYPE>, public virtual TAO_NS_ProxySupplier
+{
+public:
+ /// Constuctor
+ TAO_NS_ProxySupplier_T (void);
+
+ /// Destructor
+ ~TAO_NS_ProxySupplier_T ();
+
+ /// Notification of subscriptions set at the admin.
+ virtual void admin_types_changed (const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL);
+
+ ///= POA_Notify_Internal methods
+ /// POA_Notify_Internal::Event_Forwarder method
+ virtual void forward_structured (const CosNotification::StructuredEvent & event ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ /// POA_Notify_Internal::Event_Forwarder method
+ virtual void forward_structured_no_filtering (const CosNotification::StructuredEvent & event ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ /// POA_Notify_Internal::Event_Forwarder method
+ virtual void forward_any (const CORBA::Any & event ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ /// POA_Notify_Internal::Event_Forwarder method
+ virtual void forward_any_no_filtering (const CORBA::Any & event ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+protected:
+ //= Data Members
+ CORBA::Boolean is_suspended_;
+
+ // = Interface methods
+ virtual CosNotifyChannelAdmin::ConsumerAdmin_ptr MyAdmin (
+ ACE_ENV_SINGLE_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void suspend_connection (
+ ACE_ENV_SINGLE_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyChannelAdmin::ConnectionAlreadyInactive,
+ CosNotifyChannelAdmin::NotConnected
+ ));
+
+ virtual void resume_connection (
+ ACE_ENV_SINGLE_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyChannelAdmin::ConnectionAlreadyActive,
+ CosNotifyChannelAdmin::NotConnected
+ ));
+
+ virtual CosNotifyFilter::MappingFilter_ptr priority_filter (
+ ACE_ENV_SINGLE_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void priority_filter (
+ CosNotifyFilter::MappingFilter_ptr priority_filter
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual CosNotifyFilter::MappingFilter_ptr lifetime_filter (
+ ACE_ENV_SINGLE_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void lifetime_filter (
+ CosNotifyFilter::MappingFilter_ptr lifetime_filter
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual CosNotification::EventTypeSeq * obtain_offered_types (
+ CosNotifyChannelAdmin::ObtainInfoMode mode
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void subscription_change (
+ const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyComm::InvalidEventType
+ ));
+};
+
+#if defined (__ACE_INLINE__)
+#include "ProxySupplier_T.inl"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "ProxySupplier_T.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("ProxySupplier_T.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include "ace/post.h"
+#endif /* TAO_NS_PROXYSUPPLIER_T_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.inl b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.inl
new file mode 100644
index 00000000000..cfa1da318d3
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.inl
@@ -0,0 +1 @@
+// $Id$
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.cpp
new file mode 100644
index 00000000000..06291c3cf14
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.cpp
@@ -0,0 +1,135 @@
+// $Id$
+
+#include "Proxy_T.h"
+
+#ifndef TAO_NS_PROXY_T_CPP
+#define TAO_NS_PROXY_T_CPP
+
+#if ! defined (__ACE_INLINE__)
+#include "Proxy_T.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_Proxy_T, "$id$")
+
+template <class SERVANT_TYPE>
+TAO_NS_Proxy_T<SERVANT_TYPE>::TAO_NS_Proxy_T (void)
+{
+}
+
+template <class SERVANT_TYPE>
+TAO_NS_Proxy_T<SERVANT_TYPE>::~TAO_NS_Proxy_T ()
+{
+}
+
+template <class SERVANT_TYPE> PortableServer::Servant
+TAO_NS_Proxy_T<SERVANT_TYPE>::servant (void)
+{
+ return this;
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_Proxy_T<SERVANT_TYPE>::_add_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+{
+ this->_incr_refcnt ();
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_Proxy_T<SERVANT_TYPE>::_remove_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+{
+ this->_decr_refcnt ();
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_Proxy_T<SERVANT_TYPE>::validate_event_qos (const CosNotification::QoSProperties & /*required_qos*/, CosNotification::NamedPropertyRangeSeq_out /*available_qos*/ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotification::UnsupportedQoS
+ ))
+{
+ ACE_THROW (CORBA::NO_IMPLEMENT ());
+}
+
+template <class SERVANT_TYPE> CosNotification::QoSProperties*
+TAO_NS_Proxy_T<SERVANT_TYPE>::get_qos (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ return this->TAO_NS_Object::get_qos (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_Proxy_T<SERVANT_TYPE>::set_qos (const CosNotification::QoSProperties & qos ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotification::UnsupportedQoS
+ ))
+{
+ this->TAO_NS_Object::set_qos (qos ACE_ENV_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_Proxy_T<SERVANT_TYPE>::validate_qos (
+ const CosNotification::QoSProperties & /*required_qos*/,
+ CosNotification::NamedPropertyRangeSeq_out /*available_qos*/
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotification::UnsupportedQoS
+ ))
+{
+ ACE_THROW (CORBA::NO_IMPLEMENT ());
+}
+
+template <class SERVANT_TYPE> CosNotifyFilter::FilterID
+TAO_NS_Proxy_T<SERVANT_TYPE>::add_filter (CosNotifyFilter::Filter_ptr new_filter ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ return this->filter_admin_.add_filter (new_filter ACE_ENV_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_Proxy_T<SERVANT_TYPE>::remove_filter (
+ CosNotifyFilter::FilterID filter
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyFilter::FilterNotFound
+ ))
+{
+ this->filter_admin_.remove_filter (filter ACE_ENV_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> CosNotifyFilter::Filter_ptr
+TAO_NS_Proxy_T<SERVANT_TYPE>::get_filter (CosNotifyFilter::FilterID filter ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyFilter::FilterNotFound
+ ))
+{
+ return this->filter_admin_.get_filter (filter ACE_ENV_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> CosNotifyFilter::FilterIDSeq*
+TAO_NS_Proxy_T<SERVANT_TYPE>::get_all_filters (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ return this->filter_admin_.get_all_filters (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+template <class SERVANT_TYPE> void
+TAO_NS_Proxy_T<SERVANT_TYPE>::remove_all_filters (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ this->filter_admin_.remove_all_filters (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+#endif /* TAO_NS_PROXY_T_CPP */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.h b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.h
new file mode 100644
index 00000000000..090d4edd49a
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.h
@@ -0,0 +1,158 @@
+/* -*- C++ -*- */
+/**
+ * @file Proxy_T.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_PROXY_T_H
+#define TAO_NS_PROXY_T_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "Proxy.h"
+
+/**
+ * @class TAO_NS_Proxy_T
+ *
+ * @brief The is a base class for all proxys , templatized by the servant
+ * type. All the Filter Admin and QoS Admin interface methods are
+ * implemented here by delegating to the admin implementations.
+ *
+ */
+template <class SERVANT_TYPE>
+class TAO_NS_Proxy_T : public SERVANT_TYPE, public virtual TAO_NS_Proxy
+{
+public:
+ /// Constuctor
+ TAO_NS_Proxy_T (void);
+
+ /// Destructor
+ ~TAO_NS_Proxy_T ();
+
+ /// Implements TAO_NS_Object::servant method.
+ virtual PortableServer::Servant servant (void);
+
+ /// ServantBase refcount methods.
+ virtual void _add_ref (ACE_ENV_SINGLE_ARG_DECL);
+ virtual void _remove_ref (ACE_ENV_SINGLE_ARG_DECL);
+
+ virtual void validate_event_qos (
+ const CosNotification::QoSProperties & required_qos,
+ CosNotification::NamedPropertyRangeSeq_out available_qos
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotification::UnsupportedQoS
+ ));
+
+ virtual CosNotification::QoSProperties * get_qos (
+ ACE_ENV_SINGLE_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void set_qos (
+ const CosNotification::QoSProperties & qos
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotification::UnsupportedQoS
+ ));
+
+ virtual void validate_qos (
+ const CosNotification::QoSProperties & required_qos,
+ CosNotification::NamedPropertyRangeSeq_out available_qos
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotification::UnsupportedQoS
+ ));
+
+ virtual CosNotifyFilter::FilterID add_filter (
+ CosNotifyFilter::Filter_ptr new_filter
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void remove_filter (
+ CosNotifyFilter::FilterID filter
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyFilter::FilterNotFound
+ ));
+
+ virtual CosNotifyFilter::Filter_ptr get_filter (
+ CosNotifyFilter::FilterID filter
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyFilter::FilterNotFound
+ ));
+
+ virtual CosNotifyFilter::FilterIDSeq * get_all_filters (
+ ACE_ENV_SINGLE_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void remove_all_filters (
+ ACE_ENV_SINGLE_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+};
+
+#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT
+template class TAO_Notify_Export
+TAO_NS_Proxy_T<POA_Event_Forwarder::StructuredProxyPushSupplier>;
+template class TAO_Notify_Export
+TAO_NS_Proxy_T<POA_CosNotifyChannelAdmin::SequenceProxyPushSupplier>;
+template class TAO_Notify_Export
+TAO_NS_Proxy_T<POA_Event_Forwarder::ProxyPushSupplier>;
+template class TAO_Notify_Export
+TAO_NS_Proxy_T<POA_CosEventChannelAdmin::ProxyPushSupplier>;
+template class TAO_Notify_Export
+TAO_NS_Proxy_T<POA_CosNotifyChannelAdmin::StructuredProxyPushConsumer>;
+template class TAO_Notify_Export
+TAO_NS_Proxy_T<POA_CosNotifyChannelAdmin::SequenceProxyPushConsumer>;
+template class TAO_Notify_Export
+TAO_NS_Proxy_T<POA_CosNotifyChannelAdmin::ProxyPushConsumer>;
+template class TAO_Notify_Export
+TAO_NS_Proxy_T<POA_CosEventChannelAdmin::ProxyPushConsumer>;
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */
+
+#if defined (__ACE_INLINE__)
+#include "Proxy_T.inl"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "Proxy_T.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("Proxy_T.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include "ace/post.h"
+#endif /* TAO_NS_PROXY_T_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.inl b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.inl
new file mode 100644
index 00000000000..cfa1da318d3
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Proxy_T.inl
@@ -0,0 +1 @@
+// $Id$
diff --git a/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.cpp b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.cpp
new file mode 100644
index 00000000000..2a8672aa1f6
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.cpp
@@ -0,0 +1,126 @@
+// $Id$
+
+#include "QoSProperties.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "QoSProperties.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_QoSProperties, "$id$")
+
+#include "Property.h"
+
+TAO_NS_QoSProperties::TAO_NS_QoSProperties (void)
+ :priority_ (CosNotification::Priority),
+ timeout_ (CosNotification::Timeout),
+ stop_time_supported_ (CosNotification::StopTimeSupported),
+ maximum_batch_size_ (CosNotification::MaximumBatchSize),
+ pacing_interval_ (CosNotification::PacingInterval),
+ thread_pool_ (NotifyExt::ThreadPool),
+ thread_pool_lane_ (NotifyExt::ThreadPoolLanes)
+{
+ unsupported_[0] = CosNotification::EventReliability;
+ unsupported_[1] = CosNotification::ConnectionReliability;
+ unsupported_[2] = CosNotification::StartTimeSupported;
+}
+
+TAO_NS_QoSProperties::~TAO_NS_QoSProperties ()
+{
+}
+
+int
+TAO_NS_QoSProperties::unsupported (ACE_CString& name)
+{
+ for (int i = 0; i < UNSUPPORTED_PROPERTY_COUNT; ++i)
+ {
+ if (this->unsupported_[i] == name)
+ return 1;
+ }
+
+ return 0;
+}
+
+int
+TAO_NS_QoSProperties::init (const CosNotification::PropertySeq& prop_seq, CosNotification::PropertyErrorSeq& err_seq)
+{
+ int err_index = -1;
+
+ ACE_CString name;
+ for (CORBA::ULong i = 0; i < prop_seq.length (); ++i)
+ {
+ name = prop_seq[i].name.in();
+
+ if (this->unsupported (name))
+ {
+ err_index = err_seq.length ();
+ err_seq.length (err_seq.length () + 1);
+
+ err_seq[err_index].code = CosNotification::UNSUPPORTED_PROPERTY;
+ err_seq[err_index].name = CORBA::string_dup (prop_seq[i].name);
+ }
+ else if (this->property_map_.rebind (prop_seq[i].name.in (), prop_seq[i].value) == -1)
+ return -1;
+ // Note call to rebind. This allows to call <init> to set updates.
+ }
+
+ // Now, init the supported properties
+ this->priority_.set (*this);
+ this->timeout_.set (*this);
+ this->stop_time_supported_.set (*this);
+ this->maximum_batch_size_.set (*this);
+ this->pacing_interval_.set (*this);
+ this->thread_pool_.set (*this);
+ this->thread_pool_lane_.set (*this);
+
+ return err_index == -1 ? 0 : 1;
+}
+
+void
+TAO_NS_QoSProperties::transfer (TAO_NS_QoSProperties& qos_properties)
+{
+ qos_properties.priority_ = this->priority_;
+ qos_properties.timeout_ = this->timeout_;
+ qos_properties.stop_time_supported_ = this->stop_time_supported_;
+ qos_properties.maximum_batch_size_ = this->maximum_batch_size_;
+ qos_properties.pacing_interval_ = this->pacing_interval_;
+
+ PROPERTY_MAP::ITERATOR iter (this->property_map_);
+ PROPERTY_MAP::ENTRY *entry;
+
+ for (; iter.next (entry); iter.advance ())
+ {
+ qos_properties.property_map_.bind (entry->ext_id_, entry->int_id_);
+ }
+
+ // unbind the properties that we don't want to transfer.
+ qos_properties.property_map_.unbind (NotifyExt::ThreadPool);
+ qos_properties.property_map_.unbind (NotifyExt::ThreadPoolLanes);
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class TAO_NS_PropertyBase_T<CORBA::Long>;
+template class TAO_NS_PropertyBase_T<CORBA::Short>;
+template class TAO_NS_PropertyBase_T<TimeBase::TimeT>;
+
+template class TAO_NS_Property_T<CORBA::Long>;
+template class TAO_NS_Property_T<CORBA::Short>;
+template class TAO_NS_Property_T<TimeBase::TimeT>;
+
+template class TAO_NS_StructProperty_T<NotifyExt::ThreadPoolParams>;
+template class TAO_NS_StructProperty_T<NotifyExt::ThreadPoolLanesParams>;
+
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate TAO_NS_PropertyBase_T<CORBA::Long>
+#pragma instantiate TAO_NS_PropertyBase_T<CORBA::Short>
+#pragma instantiate TAO_NS_PropertyBase_T<TimeBase::TimeT>
+
+#pragma instantiate TAO_NS_Property_T<CORBA::Long>
+#pragma instantiate TAO_NS_Property_T<CORBA::Short>
+#pragma instantiate TAO_NS_Property_T<TimeBase::TimeT>
+
+#pragma instantiate TAO_NS_StructProperty_T<NotifyExt::ThreadPoolParams>
+#pragma instantiate TAO_NS_StructProperty_T<NotifyExt::ThreadPoolLanesParams>
+
+#endif /*ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.h b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.h
new file mode 100644
index 00000000000..08911f86ef4
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.h
@@ -0,0 +1,99 @@
+/* -*- C++ -*- */
+/**
+ * @file QoSProperties.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_QOSPROPERTIES_H
+#define TAO_NS_QOSPROPERTIES_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "PropertySeq.h"
+#include "Property_T.h"
+#include "Property_Boolean.h"
+#include "Property.h"
+
+/**
+ * @class TAO_NS_QoSProperties
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_QoSProperties : public TAO_NS_PropertySeq
+{
+public:
+ /// Constuctor
+ TAO_NS_QoSProperties (void);
+
+ /// Destructor
+ ~TAO_NS_QoSProperties ();
+
+ /// Return 0 on success, 1 if unsupported properties were detected and -1 on error.
+ int init (const CosNotification::PropertySeq& prop_seq, CosNotification::PropertyErrorSeq& err_seq);
+
+ /// Populate <qos_properties> with properties that can be transfered.
+ void transfer (TAO_NS_QoSProperties& qos_properties);
+
+ ///= Accessors
+ /// ThreadPool
+ const TAO_NS_Property_ThreadPool& thread_pool (void) const;
+
+ /// ThreadPoolLane
+ const TAO_NS_Property_ThreadPoolLanes& thread_pool_lane (void) const;
+
+ /// Maximum Batch Size
+ const TAO_NS_Property_Long& maximum_batch_size (void) const;
+
+ /// Pacing Interval
+ const TAO_NS_Property_Time& pacing_interval (void) const;
+
+protected:
+ /// Return 1 if <value> is unsupported.
+ int unsupported (ACE_CString& name);
+
+ enum {UNSUPPORTED_PROPERTY_COUNT = 3};
+
+ ///= Unsupported Properties.
+ ACE_CString unsupported_[UNSUPPORTED_PROPERTY_COUNT];
+
+ ///= Supported properties
+
+ /// Priority
+ TAO_NS_Property_Short priority_;
+
+ /// Timeout
+ TAO_NS_Property_Time timeout_;
+
+ /// Stop Time Supported
+ TAO_NS_Property_Boolean stop_time_supported_;
+
+ /// Maximum Batch Size
+ TAO_NS_Property_Long maximum_batch_size_;
+
+ /// Pacing Interval
+ TAO_NS_Property_Time pacing_interval_;
+
+ /// ThreadPool Params.
+ TAO_NS_Property_ThreadPool thread_pool_;
+
+ /// ThreadPoolLane Params.
+ TAO_NS_Property_ThreadPoolLanes thread_pool_lane_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "QoSProperties.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_QOSPROPERTIES_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.inl b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.inl
new file mode 100644
index 00000000000..e331e8b1046
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/QoSProperties.inl
@@ -0,0 +1,25 @@
+// $Id$
+
+ACE_INLINE const TAO_NS_Property_ThreadPool&
+TAO_NS_QoSProperties::thread_pool (void) const
+{
+ return this->thread_pool_;
+}
+
+ACE_INLINE const TAO_NS_Property_ThreadPoolLanes&
+TAO_NS_QoSProperties::thread_pool_lane (void) const
+{
+ return this->thread_pool_lane_;
+}
+
+ACE_INLINE const TAO_NS_Property_Long&
+TAO_NS_QoSProperties::maximum_batch_size (void) const
+{
+ return this->maximum_batch_size_;
+}
+
+ACE_INLINE const TAO_NS_Property_Time&
+TAO_NS_QoSProperties::pacing_interval (void) const
+{
+ return this->pacing_interval_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp
new file mode 100644
index 00000000000..1c819aa54ba
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp
@@ -0,0 +1,92 @@
+// $Id$
+
+#include "Batch_Buffering_Strategy.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "Batch_Buffering_Strategy.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_Batch_Buffering_Strategy, "$id$")
+
+TAO_NS_Batch_Buffering_Strategy::TAO_NS_Batch_Buffering_Strategy (TAO_NS_Message_Queue& msg_queue, TAO_NS_AdminProperties_var& admin_properties, CORBA::Long batch_size)
+ :TAO_NS_Buffering_Strategy (msg_queue, admin_properties, batch_size)
+{
+}
+
+TAO_NS_Batch_Buffering_Strategy::~TAO_NS_Batch_Buffering_Strategy ()
+{
+}
+
+int
+TAO_NS_Batch_Buffering_Strategy::dequeue_batch (CosNotification::EventBatch& event_batch)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
+
+ // if batch_size is infinite, simply dequeue everything available.
+
+ int pending = 0; // not used.
+
+ if (this->batch_size_ == 0)
+ {
+ return this->dequeue_available (event_batch, pending);
+ }
+ else
+ {
+ // block till batch size of events are available.
+ while (this->msg_queue_.message_count () < this->batch_size_)
+ {
+ if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game.
+ return -1;
+
+ this->batch_size_reached_condition_.wait ();
+ }
+
+ return this->dequeue_i (this->batch_size_, event_batch);
+ }
+}
+
+int
+TAO_NS_Batch_Buffering_Strategy::dequeue_available (CosNotification::EventBatch& event_batch, int &pending)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
+
+ int deq_count = this->msg_queue_.message_count ();
+
+ if (this->batch_size_ != 0 && deq_count > this->batch_size_) // Restrict upto batch size.
+ deq_count = this->batch_size_;
+
+ pending = this->msg_queue_.message_count () - deq_count;
+
+ return this->dequeue_i (deq_count, event_batch);
+}
+
+int
+TAO_NS_Batch_Buffering_Strategy::dequeue_i (int max_deq_count, CosNotification::EventBatch& event_batch)
+{
+ ACE_Message_Block *mb;
+
+ int deq_count = 0;
+
+ event_batch.length (max_deq_count);
+
+ for (; deq_count < max_deq_count; ++deq_count)
+ {
+ if (this->msg_queue_.dequeue (mb) == -1)
+ break; // error, simply return what we could extract so far.
+
+ --this->global_queue_length_;
+
+ TAO_NS_Method_Request_Event* mre = ACE_dynamic_cast (TAO_NS_Method_Request_Event*, mb);
+
+ mre->event ()->convert (event_batch[deq_count]);
+
+ ACE_Message_Block::release (mb);
+ }
+
+ event_batch.length (deq_count);
+
+ this->global_queue_not_full_condition_.signal ();
+ this->local_queue_not_full_condition_.signal ();
+
+ return deq_count;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h
new file mode 100644
index 00000000000..196f4a67242
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h
@@ -0,0 +1,60 @@
+/* -*- C++ -*- */
+/**
+ * @file Batch_Buffering_Strategy.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_BATCH_BUFFERING_STRATEGY_H
+#define TAO_NS_BATCH_BUFFERING_STRATEGY_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "../Method_Request.h"
+#include "../Buffering_Strategy.h"
+
+/**
+ * @class TAO_NS_Batch_Buffering_Strategy
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_Batch_Buffering_Strategy : public TAO_NS_Buffering_Strategy
+{
+public:
+ /// Constuctor
+ TAO_NS_Batch_Buffering_Strategy (TAO_NS_Message_Queue& msg_queue, TAO_NS_AdminProperties_var& admin_properties, CORBA::Long batch_size);
+
+ /// Destructor
+ ~TAO_NS_Batch_Buffering_Strategy ();
+
+ /// Dequeue batch. This method will block till <batch_size> is available..
+ /// Return -1 on error else the number of items actually dequeued.
+ int dequeue_batch (CosNotification::EventBatch& event_batch);
+
+ /// Dequeue upto batch. This method will not block.
+ /// Return -1 on error else the number of items dequeued (<batch_size>).
+ /// <pending> is set to the number of events remaining in the queue.
+ int dequeue_available (CosNotification::EventBatch& event_batch, int &pending);
+
+protected:
+
+ /// Extract upto <max_deq_count> number of items.
+ int dequeue_i (int max_deq_count, CosNotification::EventBatch& event_batch);
+};
+
+#if defined (__ACE_INLINE__)
+#include "Batch_Buffering_Strategy.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_BATCH_BUFFERING_STRATEGY_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.inl b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.inl
new file mode 100644
index 00000000000..72a7bb34341
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "Batch_Buffering_Strategy.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp
new file mode 100644
index 00000000000..9101583601c
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp
@@ -0,0 +1,107 @@
+// $Id$
+
+#include "SequenceProxyPushConsumer.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "SequenceProxyPushConsumer.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_SequenceProxyPushConsumer, "$id$")
+
+#include "ace/Refcounted_Auto_Ptr.h"
+#include "tao/debug.h"
+#include "SequencePushSupplier.h"
+#include "../Admin.h"
+#include "../AdminProperties.h"
+#include "../Structured/StructuredEvent.h"
+
+TAO_NS_SequenceProxyPushConsumer::TAO_NS_SequenceProxyPushConsumer (void)
+:pacing_interval_ (CosNotification::PacingInterval)
+{
+}
+
+TAO_NS_SequenceProxyPushConsumer::~TAO_NS_SequenceProxyPushConsumer ()
+{
+}
+
+void
+TAO_NS_SequenceProxyPushConsumer::release (void)
+{
+ if (this->supplier_)
+ this->supplier_->release ();
+
+ delete this;
+ //@@ inform factory
+}
+
+void
+TAO_NS_SequenceProxyPushConsumer::destroy (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "In TAO_NS_SequenceProxyPushConsumer::destroy \n"));
+
+ this->inherited::destroy (this ACE_ENV_ARG_PARAMETER);
+}
+
+CosNotifyChannelAdmin::ProxyType
+TAO_NS_SequenceProxyPushConsumer::MyType (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ return CosNotifyChannelAdmin::PUSH_SEQUENCE;
+}
+
+void
+TAO_NS_SequenceProxyPushConsumer::connect_sequence_push_supplier (CosNotifyComm::SequencePushSupplier_ptr push_supplier ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ , CosEventChannelAdmin::AlreadyConnected
+ ))
+{
+ // Convert Supplier to Base Type
+ TAO_NS_SequencePushSupplier *supplier;
+ ACE_NEW_THROW_EX (supplier,
+ TAO_NS_SequencePushSupplier (this),
+ CORBA::NO_MEMORY ());
+
+ supplier->init (push_supplier ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->connect (supplier ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_SequenceProxyPushConsumer::push_structured_events (const CosNotification::EventBatch& event_batch ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ , CosEventComm::Disconnected
+ ))
+{
+ // Check if we should proceed at all.
+ if (this->admin_properties_->reject_new_events () == 1 && this->admin_properties_->queue_full ())
+ ACE_THROW (CORBA::IMP_LIMIT ());
+
+ if (this->is_connected () == 0)
+ {
+ ACE_THROW (CosEventComm::Disconnected ());
+ }
+
+ for (CORBA::ULong i = 0; i < event_batch.length (); ++i)
+ {
+ const CosNotification::StructuredEvent& notification = event_batch[i];
+
+ TAO_NS_Event_var event (new TAO_NS_StructuredEvent (notification));
+
+ this->push (event);
+ }
+}
+
+void
+TAO_NS_SequenceProxyPushConsumer::disconnect_sequence_push_consumer (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h
new file mode 100644
index 00000000000..647ce01b185
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h
@@ -0,0 +1,98 @@
+/* -*- C++ -*- */
+/**
+ * @file SequenceProxyPushConsumer.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_SEQUENCEPROXYPUSHCONSUMER_H
+#define TAO_NS_SEQUENCEPROXYPUSHCONSUMER_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "orbsvcs/CosNotifyChannelAdminS.h"
+#include "../ProxyConsumer_T.h"
+#include "../Destroy_Callback.h"
+
+#if defined(_MSC_VER)
+#if (_MSC_VER >= 1200)
+#pragma warning(push)
+#endif /* _MSC_VER >= 1200 */
+#pragma warning(disable:4250)
+#endif /* _MSC_VER */
+
+/**
+ * @class TAO_NS_SequenceProxyPushConsumer
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_SequenceProxyPushConsumer : public virtual TAO_NS_ProxyConsumer_T <POA_CosNotifyChannelAdmin::SequenceProxyPushConsumer>, public TAO_NS_Destroy_Callback
+{
+ friend class TAO_NS_Builder;
+public:
+ /// Constuctor
+ TAO_NS_SequenceProxyPushConsumer (void);
+
+ /// Destructor
+ ~TAO_NS_SequenceProxyPushConsumer ();
+
+ /// TAO_NS_Destroy_Callback methods
+ virtual void release (void);
+
+ /// Destroy this object.
+ virtual void destroy (ACE_ENV_SINGLE_ARG_DECL);
+
+protected:
+ ///= Data Members
+ TAO_NS_Property_Time pacing_interval_;
+
+ ///= Protected Methods
+
+ //= interface methods
+ virtual CosNotifyChannelAdmin::ProxyType MyType (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void connect_sequence_push_supplier (CosNotifyComm::SequencePushSupplier_ptr push_supplier
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosEventChannelAdmin::AlreadyConnected
+ ));
+
+ virtual void push_structured_events (const CosNotification::EventBatch & notifications
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosEventComm::Disconnected
+ ));
+
+ virtual void disconnect_sequence_push_consumer (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+};
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+#pragma warning(pop)
+#endif /* _MSC_VER */
+
+#if defined (__ACE_INLINE__)
+#include "SequenceProxyPushConsumer.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_SEQUENCEPROXYPUSHCONSUMER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.inl
new file mode 100644
index 00000000000..5470ff89e39
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "SequenceProxyPushConsumer.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp
new file mode 100644
index 00000000000..f92380f1260
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp
@@ -0,0 +1,84 @@
+// $Id$
+
+#include "SequenceProxyPushSupplier.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "SequenceProxyPushSupplier.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_SequenceProxyPushSupplier, "$id$")
+
+#include "tao/debug.h"
+#include "SequencePushConsumer.h"
+#include "../Proxy.h"
+#include "../Admin.h"
+#include "../EventChannel.h"
+#include "../EventChannelFactory.h"
+#include "../Notify_Service.h"
+
+
+TAO_NS_SequenceProxyPushSupplier::TAO_NS_SequenceProxyPushSupplier (void)
+{
+}
+
+TAO_NS_SequenceProxyPushSupplier::~TAO_NS_SequenceProxyPushSupplier ()
+{
+}
+
+void
+TAO_NS_SequenceProxyPushSupplier::destroy (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "In TAO_NS_SequenceProxyPushConsumer::destroy \n"));
+
+ this->inherited::destroy (this ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_SequenceProxyPushSupplier::release (void)
+{
+ if (this->consumer_)
+ this->consumer_->release ();
+
+ delete this;
+ //@@ inform factory
+}
+
+void
+TAO_NS_SequenceProxyPushSupplier::connect_sequence_push_consumer (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ , CosEventChannelAdmin::AlreadyConnected
+ , CosEventChannelAdmin::TypeError
+ ))
+{
+ // Convert Consumer to Base Type
+ TAO_NS_SequencePushConsumer* consumer;
+ ACE_NEW_THROW_EX (consumer,
+ TAO_NS_SequencePushConsumer (this),
+ CORBA::NO_MEMORY ());
+
+ consumer->init (push_consumer, this->admin_properties_ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->connect (consumer ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_NS_SequenceProxyPushSupplier::disconnect_sequence_push_supplier (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+
+{
+ this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+CosNotifyChannelAdmin::ProxyType
+TAO_NS_SequenceProxyPushSupplier::MyType (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ return CosNotifyChannelAdmin::PUSH_SEQUENCE;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h
new file mode 100644
index 00000000000..722aecf4b8f
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h
@@ -0,0 +1,94 @@
+/* -*- C++ -*- */
+/**
+ * @file SequenceProxyPushSupplier.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_SEQUENCEPROXYPUSHSUPPLIER_H
+#define TAO_NS_SEQUENCEPROXYPUSHSUPPLIER_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "orbsvcs/CosNotifyChannelAdminS.h"
+#include "../ProxySupplier_T.h"
+
+#if defined(_MSC_VER)
+#if (_MSC_VER >= 1200)
+#pragma warning(push)
+#endif /* _MSC_VER >= 1200 */
+#pragma warning(disable:4250)
+#endif /* _MSC_VER */
+
+#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT
+template class TAO_Notify_Export
+TAO_NS_ProxySupplier_T<POA_CosNotifyChannelAdmin::SequenceProxyPushSupplier>;
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */
+
+/**
+ * @class TAO_NS_SequenceProxyPushSupplier
+ *
+ * @brief Implements the CosNotifyChannelAdmin::SequenceProxyPushSupplier methods.
+ *
+ *
+ */
+class TAO_Notify_Export TAO_NS_SequenceProxyPushSupplier : public virtual TAO_NS_ProxySupplier_T <POA_CosNotifyChannelAdmin::SequenceProxyPushSupplier>, public TAO_NS_Destroy_Callback
+{
+ friend class TAO_NS_Builder;
+
+public:
+ /// Constuctor
+ TAO_NS_SequenceProxyPushSupplier (void);
+
+ /// Destructor
+ ~TAO_NS_SequenceProxyPushSupplier ();
+
+ /// Destroy this object.
+ virtual void destroy (ACE_ENV_SINGLE_ARG_DECL);
+
+ /// TAO_NS_Destroy_Callback methods
+ virtual void release (void);
+
+ /// = Servant methods
+ virtual CosNotifyChannelAdmin::ProxyType MyType (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void connect_sequence_push_consumer (
+ CosNotifyComm::SequencePushConsumer_ptr push_consumer
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosEventChannelAdmin::AlreadyConnected,
+ CosEventChannelAdmin::TypeError
+ ));
+
+ virtual void disconnect_sequence_push_supplier (
+ ACE_ENV_SINGLE_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+};
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+#pragma warning(pop)
+#endif /* _MSC_VER */
+
+#if defined (__ACE_INLINE__)
+#include "SequenceProxyPushSupplier.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_SEQUENCEPROXYPUSHSUPPLIER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.inl b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.inl
new file mode 100644
index 00000000000..5f4ceeb6181
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "SequenceProxyPushSupplier.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
new file mode 100644
index 00000000000..36abbc2ba37
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
@@ -0,0 +1,181 @@
+// $Id$
+
+#include "SequencePushConsumer.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "SequencePushConsumer.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_SequencePushConsumer, "$id$")
+
+#include "ace/Reactor.h"
+#include "tao/debug.h"
+#include "../QoSProperties.h"
+#include "../ProxySupplier.h"
+#include "../Worker_Task.h"
+#include "../Consumer.h"
+#include "../Method_Request.h"
+#include "../Timer.h"
+
+TAO_NS_SequencePushConsumer::TAO_NS_SequencePushConsumer (TAO_NS_ProxySupplier* proxy)
+ : TAO_NS_Consumer (proxy), pacing_interval_ (ACE_Time_Value::zero), timer_id_ (-1), buffering_strategy_ (0),
+ max_batch_size_ (CosNotification::MaximumBatchSize, 0), timer_ (0)
+{
+}
+
+TAO_NS_SequencePushConsumer::~TAO_NS_SequencePushConsumer ()
+{
+ delete this->buffering_strategy_;
+}
+
+void
+TAO_NS_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_NS_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL)
+{
+ this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
+
+ this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
+
+ ACE_NEW_THROW_EX (this->buffering_strategy_,
+ TAO_NS_Batch_Buffering_Strategy (this->msg_queue_, admin_properties,
+ this->max_batch_size_.value ()),
+ CORBA::NO_MEMORY ());
+
+ this->timer_ = this->proxy ()->timer ();
+}
+
+void
+TAO_NS_SequencePushConsumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+{
+ this->cancel_timer ();
+ this->timer_->_decr_refcnt ();
+}
+
+void
+TAO_NS_SequencePushConsumer::release (void)
+{
+ delete this;
+ //@@ inform factory
+}
+
+void
+TAO_NS_SequencePushConsumer::qos_changed (const TAO_NS_QoSProperties& qos_properties)
+{
+ this->max_batch_size_ = qos_properties.maximum_batch_size ();
+
+ if (this->max_batch_size_.is_valid ())
+ {// set the max batch size.
+ this->buffering_strategy_->batch_size (this->max_batch_size_.value ());
+ }
+
+ const TAO_NS_Property_Time &pacing_interval = qos_properties.pacing_interval ();
+
+ if (pacing_interval.is_valid ())
+ {
+ this->pacing_interval_ =
+# if defined (ACE_CONFIG_WIN32_H)
+ ACE_Time_Value (ACE_static_cast (long, pacing_interval.value ()));
+# else
+ ACE_Time_Value (pacing_interval.value () / 1);
+# endif /* ACE_CONFIG_WIN32_H */
+ }
+
+ // Inform the buffering strategy of qos change.
+ this->buffering_strategy_->update_qos_properties (qos_properties);
+}
+
+void
+TAO_NS_SequencePushConsumer::schedule_timer (void)
+{
+ // Schedule the timer.
+ if (this->pacing_interval_ != ACE_Time_Value::zero)
+ {
+ this->timer_id_ = this->timer_->schedule_timer (this, this->pacing_interval_, 0);
+
+ if (this->timer_id_ == -1)
+ this->pacing_interval_ = ACE_Time_Value::zero; // some error, revert to no pacing.
+ }
+}
+
+void
+TAO_NS_SequencePushConsumer::cancel_timer (void)
+{
+ timer_->cancel_timer (this->timer_id_);
+}
+
+void
+TAO_NS_SequencePushConsumer::push_i (const TAO_NS_Event_var& event ACE_ENV_ARG_DECL_NOT_USED)
+{
+ TAO_NS_Method_Request_Event* method_request = new TAO_NS_Method_Request_Event (event);
+
+ int msg_count = this->buffering_strategy_->enqueue (*method_request);
+
+ if (msg_count == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "NS_Seq_Reactive_Task (%P|%t) - "
+ "failed to enqueue\n"));
+ return;
+ }
+
+ if (this->pacing_interval_ == ACE_Time_Value::zero)
+ {
+ // If pacing is zero, there is no timer, hence dispatch immediately
+ this->handle_timeout (ACE_Time_Value::zero, 0);
+ }
+ else if (msg_count == 1)
+ this->schedule_timer ();
+}
+
+void
+TAO_NS_SequencePushConsumer::push (const CORBA::Any& /*event*/ ACE_ENV_ARG_DECL_NOT_USED)
+{
+ //NOP
+}
+
+void
+TAO_NS_SequencePushConsumer::push (const CosNotification::StructuredEvent& /*notification*/ ACE_ENV_ARG_DECL_NOT_USED)
+{
+ //NOP
+}
+
+int
+TAO_NS_SequencePushConsumer::handle_timeout (const ACE_Time_Value& /*current_time*/,
+ const void* /*act*/)
+{
+ CosNotification::EventBatch event_batch;
+
+ int pending = 0;
+
+ int deq_count = this->buffering_strategy_->dequeue_available (event_batch, pending);
+
+ if (deq_count > 0)
+ {
+ TAO_NS_Refcountable_Guard ref_guard(*this->proxy ()); // Protect this object from being destroyed in this scope.
+
+ this->push (event_batch);
+
+ if (pending)
+ this->schedule_timer ();
+ }
+
+ return 0;
+}
+
+void
+TAO_NS_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch)
+{
+ ACE_TRY_NEW_ENV
+ {
+ this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // we're scheduled to be destroyed. don't set the timer.
+ this->pacing_interval_ = ACE_Time_Value::zero;
+ }
+ ACE_ENDTRY;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h
new file mode 100644
index 00000000000..cc7f689b6ac
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h
@@ -0,0 +1,114 @@
+/* -*- C++ -*- */
+/**
+ * @file SequencePushConsumer.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_SEQUENCEPUSHCONSUMER_H
+#define TAO_NS_SEQUENCEPUSHCONSUMER_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Event_Handler.h"
+#include "../Event.h"
+#include "../Property.h"
+#include "../Property_T.h"
+#include "../Consumer.h"
+#include "../AdminProperties.h"
+#include "Batch_Buffering_Strategy.h"
+
+class TAO_NS_ProxySupplier;
+class TAO_NS_QoSProperties;
+class TAO_NS_Timer;
+
+/**
+ * @class TAO_NS_SequencePushConsumer
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_SequencePushConsumer : public ACE_Event_Handler, public TAO_NS_Consumer
+{
+public:
+ /// Constuctor
+ TAO_NS_SequencePushConsumer (TAO_NS_ProxySupplier* proxy);
+
+ /// Destructor
+ ~TAO_NS_SequencePushConsumer ();
+
+ /// Init the Consumer
+ void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_NS_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL);
+
+ /// Shutdown the consumer
+ virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL);
+
+ /// TAO_NS_Destroy_Callback methods.
+ virtual void release (void);
+
+ /// Push <event> to this consumer.
+ void push_i (const TAO_NS_Event_var& event ACE_ENV_ARG_DECL);
+
+ /// Push <event> to this consumer.
+ virtual void push (const CORBA::Any& event ACE_ENV_ARG_DECL);
+
+ // Push event.
+ virtual void push (const CosNotification::StructuredEvent & event ACE_ENV_ARG_DECL);
+
+ /// Push <event> to this consumer.
+ virtual void push (const CosNotification::EventBatch& event);
+
+ /// Override, Peer::qos_changed
+ virtual void qos_changed (const TAO_NS_QoSProperties& qos_properties);
+
+protected:
+ /// When the pacing interval is used, handle_timeout () is called by
+ /// the reactor.
+ virtual int handle_timeout (const ACE_Time_Value& current_time,
+ const void* act = 0);
+
+ /// Schedule timer
+ void schedule_timer (void);
+
+ /// Cancel timer
+ void cancel_timer (void);
+
+ ///= Protected Data Members
+
+ /// The Pacing Interval
+ ACE_Time_Value pacing_interval_;
+
+ /// Timer Id.
+ long timer_id_;
+
+ /// The Consumer
+ CosNotifyComm::SequencePushConsumer_var push_consumer_;
+
+ /// The Message queue.
+ TAO_NS_Message_Queue msg_queue_;
+
+ /// The Buffering Strategy
+ TAO_NS_Batch_Buffering_Strategy* buffering_strategy_;
+
+ /// Max. batch size.
+ TAO_NS_Property_Long max_batch_size_;
+
+ /// The Timer Manager that we use.
+ TAO_NS_Timer* timer_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "SequencePushConsumer.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_SEQUENCEPUSHCONSUMER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.inl
new file mode 100644
index 00000000000..d050292323e
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "SequencePushConsumer.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp
new file mode 100644
index 00000000000..94447663e55
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp
@@ -0,0 +1,33 @@
+// $Id$
+
+#include "SequencePushSupplier.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "SequencePushSupplier.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_SequencePushSupplier, "$id$")
+
+TAO_NS_SequencePushSupplier::TAO_NS_SequencePushSupplier (TAO_NS_ProxyConsumer* proxy)
+ :TAO_NS_Supplier (proxy)
+{
+}
+
+TAO_NS_SequencePushSupplier::~TAO_NS_SequencePushSupplier ()
+{
+}
+
+void
+TAO_NS_SequencePushSupplier::init (CosNotifyComm::SequencePushSupplier_ptr push_supplier ACE_ENV_ARG_DECL_NOT_USED)
+{
+ this->push_supplier_ = CosNotifyComm::SequencePushSupplier::_duplicate (push_supplier);
+
+ this->subscribe_ = CosNotifyComm::NotifySubscribe::_duplicate (push_supplier);
+}
+
+void
+TAO_NS_SequencePushSupplier::release (void)
+{
+ delete this;
+ //@@ inform factory
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h
new file mode 100644
index 00000000000..b83256362da
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h
@@ -0,0 +1,59 @@
+/* -*- C++ -*- */
+/**
+ * @file SequencePushSupplier.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_SEQUENCEPUSHSUPPLIER_H
+#define TAO_NS_SEQUENCEPUSHSUPPLIER_H
+#include "ace/pre.h"
+
+#include "../notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "orbsvcs/CosNotifyCommC.h"
+#include "../Supplier.h"
+#include "../Destroy_Callback.h"
+
+class TAO_NS_ProxyConsumer;
+
+/**
+ * @class TAO_NS_SequencePushSupplier
+ *
+ * @brief Wrapper for the SequencePushSupplier that connect to the EventChannel.
+ *
+ */
+class TAO_Notify_Export TAO_NS_SequencePushSupplier : public TAO_NS_Supplier
+{
+public:
+ /// Constuctor
+ TAO_NS_SequencePushSupplier (TAO_NS_ProxyConsumer* proxy);
+
+ /// Destructor
+ ~TAO_NS_SequencePushSupplier ();
+
+ /// Init
+ void init (CosNotifyComm::SequencePushSupplier_ptr push_supplier ACE_ENV_ARG_DECL);
+
+ /// TAO_NS_Destroy_Callback methods
+ virtual void release (void);
+
+protected:
+ /// The Supplier
+ CosNotifyComm::SequencePushSupplier_var push_supplier_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "SequencePushSupplier.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_SEQUENCEPUSHSUPPLIER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.inl b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.inl
new file mode 100644
index 00000000000..507cd0b85c6
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "SequencePushSupplier.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer.h b/TAO/orbsvcs/orbsvcs/Notify/Timer.h
new file mode 100644
index 00000000000..84423f943d6
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Timer.h
@@ -0,0 +1,49 @@
+/* -*- C++ -*- */
+/**
+ * @file Timer.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_TIMER_H
+#define TAO_NS_TIMER_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Time_Value.h"
+#include "Refcountable.h"
+
+class ACE_Event_Handler;
+
+/**
+ * @class TAO_NS_Timer
+ *
+ * @brief Interface for scheduling timers.
+ *
+ */
+class TAO_Notify_Export TAO_NS_Timer : public TAO_NS_Refcountable
+{
+public:
+ /// Destructor
+ virtual ~TAO_NS_Timer (){};
+
+ /// Schedule a timer
+ virtual long schedule_timer (ACE_Event_Handler *handler,
+ const ACE_Time_Value &delay_time,
+ const ACE_Time_Value &interval) = 0;
+
+ /// Cancel Timer
+ virtual int cancel_timer (long timer_id) = 0;
+};
+
+#include "ace/post.h"
+#endif /* TAO_NS_TIMER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.cpp b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.cpp
new file mode 100644
index 00000000000..b7d4bbcd16e
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.cpp
@@ -0,0 +1,49 @@
+// $Id$
+
+#include "Timer_Queue.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "Timer_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_Timer_Queue, "$id$")
+
+
+TAO_NS_Timer_Queue::TAO_NS_Timer_Queue (void)
+{
+ this->destroy_callback (this);
+}
+
+TAO_NS_Timer_Queue::~TAO_NS_Timer_Queue ()
+{
+}
+
+void
+TAO_NS_Timer_Queue::release (void)
+{
+ delete this;
+ //@@ inform factory
+}
+
+long
+TAO_NS_Timer_Queue::schedule_timer (ACE_Event_Handler *handler,
+ const ACE_Time_Value &delay_time,
+ const ACE_Time_Value &interval)
+{
+ return this->timer_queue_.schedule (handler,
+ 0,
+ timer_queue_.gettimeofday () + delay_time,
+ interval);
+}
+
+int
+TAO_NS_Timer_Queue::cancel_timer (long timer_id)
+{
+ return this->timer_queue_.cancel (timer_id);
+}
+
+ACE_Timer_Queue&
+TAO_NS_Timer_Queue::impl (void)
+{
+ return this->timer_queue_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.h b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.h
new file mode 100644
index 00000000000..3e8bbdb30f3
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.h
@@ -0,0 +1,68 @@
+/* -*- C++ -*- */
+/**
+ * @file Timer_Queue.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_TIMER_QUEUE_H
+#define TAO_NS_TIMER_QUEUE_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "Timer.h"
+#include "Destroy_Callback.h"
+
+#include "ace/Timer_Queue.h"
+#include "ace/Timer_Heap.h"
+
+/**
+ * @class TAO_NS_Timer_Queue
+ *
+ * @brief ACE_Timer_Queue based timer.
+ *
+ */
+class TAO_Notify_Export TAO_NS_Timer_Queue : public TAO_NS_Timer
+ , public TAO_NS_Destroy_Callback
+{
+public:
+ /// Constuctor
+ TAO_NS_Timer_Queue (void);
+
+ /// Destructor
+ virtual ~TAO_NS_Timer_Queue ();
+
+ /// TAO_NS_Destroy_Callback methods
+ virtual void release (void);
+
+ /// Schedule a timer
+ virtual long schedule_timer (ACE_Event_Handler *handler,
+ const ACE_Time_Value &delay_time,
+ const ACE_Time_Value &interval);
+
+ /// Cancel Timer
+ virtual int cancel_timer (long timer_id);
+
+ /// Get the native impl.
+ ACE_Timer_Queue& impl (void);
+
+protected:
+ /// The Timer Queue
+ ACE_Timer_Heap timer_queue_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "Timer_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_TIMER_QUEUE_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.inl b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.inl
new file mode 100644
index 00000000000..eeaf18f3e5f
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Queue.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "Timer_Queue.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.cpp b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.cpp
new file mode 100644
index 00000000000..6e57a1cdd62
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.cpp
@@ -0,0 +1,49 @@
+// $Id$
+
+#include "Timer_Reactor.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "Timer_Reactor.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_Timer_Reactor, "$id$")
+
+#include "ace/Reactor.h"
+#include "tao/ORB_Core.h"
+#include "Properties.h"
+
+TAO_NS_Timer_Reactor::TAO_NS_Timer_Reactor (void)
+ :reactor_ (0)
+{
+ this->destroy_callback (this);
+
+ // Get the ORB
+ CORBA::ORB_var orb = TAO_NS_PROPERTIES::instance()->orb ();
+
+ this->reactor_ = orb->orb_core ()->reactor ();
+}
+
+TAO_NS_Timer_Reactor::~TAO_NS_Timer_Reactor ()
+{
+}
+
+void
+TAO_NS_Timer_Reactor::release (void)
+{
+ delete this;
+ //@@ inform factory
+}
+
+long
+TAO_NS_Timer_Reactor::schedule_timer (ACE_Event_Handler *handler,
+ const ACE_Time_Value &delay_time,
+ const ACE_Time_Value &interval)
+{
+ return this->reactor_->schedule_timer (handler, 0, delay_time, interval);
+}
+
+int
+TAO_NS_Timer_Reactor::cancel_timer (long timer_id)
+{
+ return this->reactor_->cancel_timer (timer_id);
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.h b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.h
new file mode 100644
index 00000000000..eb0cb6e40d2
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.h
@@ -0,0 +1,64 @@
+/* -*- C++ -*- */
+/**
+ * @file Timer_Reactor.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_TIMER_REACTOR_H
+#define TAO_NS_TIMER_REACTOR_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "Timer.h"
+#include "Destroy_Callback.h"
+
+class ACE_Reactor;
+
+/**
+ * @class TAO_NS_Timer_Reactor
+ *
+ * @brief Reactor::instance based timer. The timers are dispatched by the main thread.
+ *
+ */
+class TAO_Notify_Export TAO_NS_Timer_Reactor : public TAO_NS_Timer
+ , public TAO_NS_Destroy_Callback
+{
+public:
+ /// Constuctor
+ TAO_NS_Timer_Reactor (void);
+
+ /// Destructor
+ virtual ~TAO_NS_Timer_Reactor ();
+
+ /// TAO_NS_Destroy_Callback methods
+ virtual void release (void);
+
+ /// Schedule a timer
+ virtual long schedule_timer (ACE_Event_Handler *handler,
+ const ACE_Time_Value &delay_time,
+ const ACE_Time_Value &interval);
+
+ /// Cancel Timer
+ virtual int cancel_timer (long timer_id);
+
+protected:
+ /// The instance reactor that we use.
+ ACE_Reactor* reactor_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "Timer_Reactor.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_TIMER_REACTOR_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.inl b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.inl
new file mode 100644
index 00000000000..ba9c8b4cae1
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Timer_Reactor.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "Timer_Reactor.h"