summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify
diff options
context:
space:
mode:
authorpradeep <pradeep@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2002-11-12 01:17:57 +0000
committerpradeep <pradeep@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2002-11-12 01:17:57 +0000
commit38b9c5540092225386e44e1e6c6323565cbbed3b (patch)
tree2efb9aecae08be85c871afc3cdc5a1448fdea2cf /TAO/orbsvcs/orbsvcs/Notify
parent6e88876b221cc28f52e7a00d48c6d0f7b3537851 (diff)
downloadATCD-38b9c5540092225386e44e1e6c6323565cbbed3b.tar.gz
Mon Nov 11 20:11:56 2002 Pradeep Gore <pradeep@oomworks.com>
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Builder.cpp6
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp56
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Consumer.h18
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Consumer.inl6
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/EventType.cpp7
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/EventType.h3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp143
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.h10
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Event_Manager.cpp181
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Event_Manager.h95
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Event_Manager.inl48
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.cpp93
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.h40
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.inl16
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp22
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp55
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h64
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.h1
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Notify_Signal_Property_T.h8
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Peer.cpp100
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Peer.h37
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Peer.inl6
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Proxy.cpp22
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Proxy.h23
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp16
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.h3
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp24
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h6
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp25
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp40
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h6
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Refcountable.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp36
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Subscription_Change_Worker.inl2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Supplier.cpp10
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Supplier.h4
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp13
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Types.h12
41 files changed, 763 insertions, 505 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h
index 0a54fe04ee6..9beb79a6c68 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h
@@ -55,7 +55,7 @@ public:
protected:
///= CosNotifyChannelAdmin::ProxyPushConsumer methods
- void push (const CORBA::Any & data ACE_ENV_ARG_DECL)
+ virtual void push (const CORBA::Any & data ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((
CORBA::SystemException,
CosEventComm::Disconnected
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h
index c63e4dd2a61..41b2e895840 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h
@@ -60,7 +60,7 @@ protected:
CORBA::SystemException
));
- void push (const CORBA::Any & data ACE_ENV_ARG_DECL)
+ virtual void push (const CORBA::Any & data ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((
CORBA::SystemException,
CosEventComm::Disconnected
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp b/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp
index 54d3f2e4aab..b2dcaa416c4 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp
@@ -36,7 +36,6 @@ ACE_RCSID(RT_Notify, TAO_NS_Builder, "$Id$")
#include "Structured/StructuredProxyPushSupplier.h"
#include "Sequence/SequenceProxyPushConsumer.h"
#include "Sequence/SequenceProxyPushSupplier.h"
-#include "Dispatch_Observer_T.h"
#include "ETCL_FilterFactory.h"
TAO_NS_Builder::TAO_NS_Builder (void)
@@ -166,11 +165,6 @@ TAO_NS_Builder::build_event_channel (TAO_NS_EventChannelFactory* ecf, const CosN
ec->event_manager_->init (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (ec_ret._retn ());
- /* Disable Retry Attempts */
- ec->event_manager_->event_dispatch_observer ()->max_retry_attempts (0);
- ec->event_manager_->updates_dispatch_observer ()->max_retry_attempts (0);
- /* Disable Retry Attempts */
-
ec->set_qos (initial_qos ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (ec_ret._retn ());
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
index d2199303ca2..9924c38d391 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
@@ -12,7 +12,6 @@ ACE_RCSID(RT_Notify, TAO_NS_Consumer, "$Id$")
#include "ace/Unbounded_Queue.h"
#include "tao/debug.h"
#include "ProxySupplier.h"
-#include "Dispatch_Observer_T.h"
#include "Proxy.h"
#include "Admin.h"
#include "EventChannel.h"
@@ -20,7 +19,7 @@ ACE_RCSID(RT_Notify, TAO_NS_Consumer, "$Id$")
#include "Notify_Service.h"
TAO_NS_Consumer::TAO_NS_Consumer (TAO_NS_ProxySupplier* proxy)
- :proxy_ (proxy), event_dispatch_observer_ (0), event_collection_ (0), is_suspended_ (0)
+ :proxy_ (proxy), event_collection_ (0), is_suspended_ (0)
{
this->event_collection_ = new TAO_NS_Event_Collection ();
}
@@ -41,44 +40,29 @@ TAO_NS_Consumer::push (const TAO_NS_Event_var &event ACE_ENV_ARG_DECL)
{
if (this->is_suspended_ == 1) // If we're suspended, queue for later delivery.
{
- {
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
- this->event_collection_->enqueue_head (event);
- }
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
+ this->event_collection_->enqueue_head (event);
+
+ return;
}
ACE_TRY
{
this->push_i (event ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
-
- if (this->event_dispatch_observer_ != 0)
- {
- this->event_dispatch_observer_->dispatch_success (this ACE_ENV_ARG_PARAMETER);
-
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
- this->retry_count_ = 0;
- }
+ }
+ ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist)
+ {
+ this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCH (CORBA::SystemException, sysex)
+ {
+ this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
}
ACE_CATCHANY
{
- if (TAO_debug_level > 0)
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_NS_Consumer::push: error sending event. informing dispatch observer\n ");
- }
- //ACE_RE_THROW;
-
- if (this->event_dispatch_observer_ != 0)
- {
- {
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
-
- ++this->retry_count_;
- this->event_collection_->enqueue_head (event);
- }
-
- this->event_dispatch_observer_->dispatch_failure (this, this->retry_count_ ACE_ENV_ARG_PARAMETER);
- }
}
ACE_ENDTRY;
}
@@ -119,15 +103,9 @@ TAO_NS_Consumer::resume (ACE_ENV_SINGLE_ARG_DECL)
}
void
-TAO_NS_Consumer::dispatch_updates_i (const TAO_NS_EventTypeSeq & added, const TAO_NS_EventTypeSeq & removed
+TAO_NS_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed
ACE_ENV_ARG_DECL)
{
- CosNotification::EventTypeSeq cos_added;
- CosNotification::EventTypeSeq cos_removed;
-
- added.populate (cos_added);
- removed.populate (cos_removed);
-
if (!CORBA::is_nil (this->publish_.in ()))
- this->publish_->offer_change (cos_added, cos_removed ACE_ENV_ARG_PARAMETER);
+ this->publish_->offer_change (added, removed ACE_ENV_ARG_PARAMETER);
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h
index c70f93b7a3d..0a72eaadce0 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h
@@ -58,9 +58,6 @@ public:
/// Push <event> to this consumer.
virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL) = 0;
- /// Set Observer.
- void event_dispatch_observer (TAO_NS_Event_Dispatch_Observer* event_dispatch_observer);
-
/// Dispatch the pending events
void dispatch_pending (ACE_ENV_SINGLE_ARG_DECL);
@@ -74,23 +71,20 @@ public:
void resume (ACE_ENV_SINGLE_ARG_DECL);
protected:
- /// Get the shared Proxy Lock
- TAO_SYNCH_MUTEX* proxy_lock (void);
+ // Dispatch updates
+ virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added,
+ const CosNotification::EventTypeSeq& removed
+ ACE_ENV_ARG_DECL);
/// Push Implementation.
virtual void push_i (const TAO_NS_Event_var& event ACE_ENV_ARG_DECL) = 0;
- // Dispatch updates implementation.
- virtual void dispatch_updates_i (const TAO_NS_EventTypeSeq & added,
- const TAO_NS_EventTypeSeq & removed
- ACE_ENV_ARG_DECL);
+ /// Get the shared Proxy Lock
+ TAO_SYNCH_MUTEX* proxy_lock (void);
/// The Proxy that we associate with.
TAO_NS_ProxySupplier* proxy_;
- /// Observer
- TAO_NS_Event_Dispatch_Observer* event_dispatch_observer_;
-
/// Events pending to be delivered.
TAO_NS_Event_Collection* event_collection_;
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl
index aa56593f7ab..d2a5a104c85 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl
+++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl
@@ -14,12 +14,6 @@ TAO_NS_Consumer::proxy_supplier (void)
return this->proxy_;
}
-ACE_INLINE void
-TAO_NS_Consumer::event_dispatch_observer (TAO_NS_Event_Dispatch_Observer* event_dispatch_observer)
-{
- this->event_dispatch_observer_ = event_dispatch_observer;
-}
-
ACE_INLINE CORBA::Boolean
TAO_NS_Consumer::is_suspended (void)
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventType.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventType.cpp
index ccee7778d4f..13d1e1f5168 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/EventType.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/EventType.cpp
@@ -121,3 +121,10 @@ TAO_NS_EventType::is_special (void) const
else
return 0;
}
+
+void
+TAO_NS_EventType::dump (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%s,%s)", this->event_type_.domain_name.in (), this->event_type_.type_name.in ()));
+}
+
diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventType.h b/TAO/orbsvcs/orbsvcs/Notify/EventType.h
index 5bce8b9f532..231ec34effa 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/EventType.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/EventType.h
@@ -66,6 +66,9 @@ public:
const CosNotification::EventType& native (void) const;
// Get the type underneath us.
+ /// Helper to print contents.
+ void dump (void);
+
protected:
/// Init this object.
void init_i (const char* domain_name, const char* type_name);
diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp
index 750253c0982..30be26405fc 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp
@@ -32,6 +32,31 @@ TAO_NS_EventTypeSeq::populate (CosNotification::EventTypeSeq& event_type_seq) co
}
void
+TAO_NS_EventTypeSeq::populate_no_special (CosNotification::EventTypeSeq& event_type_seq) const
+{
+ // If the special exists in us, don't include it.
+ const TAO_NS_EventType& special = TAO_NS_EventType::special ();
+
+ if (this->find (special) == 0)
+ {
+ event_type_seq.length (this->size () - 1);
+ }
+ else
+ event_type_seq.length (this->size ());
+
+ inherited::CONST_ITERATOR iter (*this);
+
+ TAO_NS_EventType* event_type;
+
+ CORBA::ULong i = 0;
+ for (iter.first (); iter.next (event_type); iter.advance (), ++i)
+ {
+ if (event_type->is_special () == 0) // if its not the special event type.
+ event_type_seq[i] = event_type->native ();
+ }
+}
+
+void
TAO_NS_EventTypeSeq::insert_seq (const CosNotification::EventTypeSeq& event_type_seq)
{
TAO_NS_EventType event_type;
@@ -78,63 +103,121 @@ TAO_NS_EventTypeSeq::remove_seq (const TAO_NS_EventTypeSeq& event_type_seq)
}
void
-TAO_NS_EventTypeSeq::init (TAO_NS_EventTypeSeq& seq_added, TAO_NS_EventTypeSeq& seq_remove_seq)
+TAO_NS_EventTypeSeq::init (TAO_NS_EventTypeSeq& seq_added, TAO_NS_EventTypeSeq& seq_remove)
{
const TAO_NS_EventType& special = TAO_NS_EventType::special ();
- if (this->find (special) == 0)
+ if (this->find (special) == 0) // If this object has the special type.
{
- if (seq_added.find (special) == 0)
+ if (seq_added.find (special) == 0) // if the seq. being added has the special type, you cannot be adding or removing anythings. * overrides.
{
- seq_added.reset ();
- seq_remove_seq.reset ();
+ seq_added.reset (); // remove everything from the sequence bening added.
+ seq_remove.reset (); // remove everything form whats being removed.
}
- else
+ else // sequence being added does not have *
{
- this->reset ();
- this->insert_seq (seq_added);
+ this->reset (); // take away the * from this object.
+ this->insert_seq (seq_added); // insert the sequence being added as the new list of types.
- seq_remove_seq.reset ();
- seq_remove_seq.insert (special);
+ seq_remove.reset (); // reset all that is being removed.
+ seq_remove.insert (special); // remove *
}
}
- else
+ else // if this object does not have the special type.
{
- if (seq_added.find (special) == 0)
+ if (seq_added.find (special) == 0) // but the seq. being added has the special type,
{
- if (seq_remove_seq.find (special) == 0)
+ if (seq_remove.find (special) == 0) // and you're removing * as well
{
- seq_added.reset ();
- seq_remove_seq.reset ();
+ seq_added.reset (); // ignore the request
+ seq_remove.reset (); // ignore the request
}
- else
+ else // seq being removed does not have the special type
{
- seq_remove_seq.reset ();
- seq_remove_seq.insert_seq (*this);
+ seq_remove.reset (); // everything that we're subscribed for is being removed.
+ seq_remove.insert_seq (*this);
- this->reset ();
+ this->reset (); // insert the special in this object.
this->insert (special);
- seq_added.reset ();
+ seq_added.reset (); // also clear our set and add only *
seq_added.insert (special);
}
}
- else
+ else // seq being added does not have special.
{
- if (seq_remove_seq.find (special) == 0)
+ if (seq_remove.find (special) == 0) // but we're removing everything.
{
-
- seq_remove_seq.reset ();
- seq_remove_seq.insert_seq (*this);
-
- this->reset ();
- this->insert_seq (seq_added);
+ seq_remove.reset (); // move all that we have currently to removed.
+ seq_remove.insert_seq (*this);
}
- else
+
+ // so now there are no specials anywhere..
{
+ //= First remove the duplicates in the added and removes lists.
+ // compute the intersection.
+
+ TAO_NS_EventTypeSeq common;
+ common.intersection (seq_added, seq_remove);
+
+ // remove the common elements from both the lists so Added {BCDK} and Removed {CDEA} will yield Added {BK} and Removed {EA}
+ seq_added.remove_seq (common);
+ seq_remove.remove_seq (common);
+
+ // If we're already subscribed for an element we should not subscribe again (duplicate events).
+ // so if we currently subscribe for ABC and we Added {BK} we should now get ABCK as current subscription and Added {K}
+ common.reset ();
+ common.intersection (*this, seq_added);
+ // remove the common elements from the added list. i,e. doent ask to add what we're already added for.
+ seq_added.remove_seq (common);
+ // update the current subscription.
this->insert_seq (seq_added);
- this->remove_seq (seq_remove_seq);
+
+
+ // Similarly for removed.. if we're removing EA and now our current list looks like ABC we should emd up with
+ // current subscription BC and Removed {A}
+ common.reset ();
+ common.intersection (*this, seq_remove);
+
+ seq_remove.reset ();
+ seq_remove.insert_seq (common); // only remove what we currently have.
+
+ this->remove_seq (seq_remove);
}
}
}
}
+
+void
+TAO_NS_EventTypeSeq::intersection (const TAO_NS_EventTypeSeq& rhs, const TAO_NS_EventTypeSeq& lhs)
+{
+ // linear search.
+ TAO_NS_EventTypeSeq::CONST_ITERATOR rhs_iter (rhs);
+ TAO_NS_EventType* rhs_event_type;
+
+ TAO_NS_EventTypeSeq::CONST_ITERATOR lhs_iter (lhs);
+ TAO_NS_EventType* lhs_event_type;
+
+ for (rhs_iter.first (); rhs_iter.next (rhs_event_type); rhs_iter.advance ())
+ {
+ for (lhs_iter.first (); lhs_iter.next (lhs_event_type); lhs_iter.advance ())
+ {
+ if (*rhs_event_type == *lhs_event_type) // if both are same add to this object.
+ this->insert (*rhs_event_type);
+ }
+ }
+}
+
+void
+TAO_NS_EventTypeSeq::dump (void)
+{
+ TAO_NS_EventTypeSeq::CONST_ITERATOR iter (*this);
+
+ TAO_NS_EventType* event_type;
+
+ for (iter.first (); iter.next (event_type); iter.advance ())
+ {
+ event_type->dump ();
+ ACE_DEBUG ((LM_DEBUG, ", "));
+ }
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.h b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.h
index e4676c70a57..5104184f5f6 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.h
@@ -41,6 +41,9 @@ public:
/// Preprocess the types added and removed.
void init (TAO_NS_EventTypeSeq& added, TAO_NS_EventTypeSeq& removed);
+ /// Populate this sequence with the intersection of rhs and lhs.
+ void intersection (const TAO_NS_EventTypeSeq& rhs, const TAO_NS_EventTypeSeq& lhs);
+
/// insert_seq the contents of <event_type_seq> into this object.
void insert_seq (const CosNotification::EventTypeSeq& event_type_seq);
@@ -55,6 +58,13 @@ public:
/// Populate <event_type_seq> with the contents of this object.
void populate (CosNotification::EventTypeSeq& event_type) const;
+
+ /// Populate <event_type_seq> with the contents of this object.
+ // Excludes the special event type. This is used to avoid sending * type updates to proxys.
+ void populate_no_special (CosNotification::EventTypeSeq& event_type) const;
+
+ /// Print the contents.
+ void dump (void);
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.cpp b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.cpp
index ea3736b1074..b73557e9d57 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.cpp
@@ -23,16 +23,10 @@ ACE_RCSID(RT_Notify, TAO_NS_Event_Manager, "$Id$")
#include "EventChannel.h"
#include "EventChannelFactory.h"
#include "Notify_Service.h"
-#include "Pending_Worker_T.h"
-#include "Event_Map_Observer_T.h"
-#include "Dispatch_Observer_T.h"
#include "Event_Map_T.h"
TAO_NS_Event_Manager::TAO_NS_Event_Manager (void)
- :consumer_map_ (0), supplier_map_ (0),
- consumer_map_observer_ (0), supplier_map_observer_ (0),
- event_dispatch_observer_(0), updates_dispatch_observer_ (0),
- event_pending_worker_ (0), updates_pending_worker_ (0)
+ :consumer_map_ (0), supplier_map_ (0)
{
}
@@ -41,17 +35,11 @@ TAO_NS_Event_Manager::~TAO_NS_Event_Manager ()
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG, "destroying consumer/supplier map count = %d/%d, \n",
- this->consumer_map_->event_type_count (), this->supplier_map_->event_type_count ()));
+ this->consumer_map_->proxy_count (), this->supplier_map_->proxy_count ()));
}
delete this->consumer_map_;
delete this->supplier_map_;
- delete this->consumer_map_observer_;
- delete this->supplier_map_observer_;
- delete this->event_dispatch_observer_;
- delete this->updates_dispatch_observer_;
- delete this->event_pending_worker_;
- delete this->updates_pending_worker_;
}
void
@@ -67,77 +55,87 @@ TAO_NS_Event_Manager::init (ACE_ENV_SINGLE_ARG_DECL)
CORBA::NO_MEMORY ());
ACE_CHECK;
- ACE_NEW_THROW_EX (this->consumer_map_observer_,
- TAO_NS_Consumer_Map_Observer (),
- CORBA::NO_MEMORY ());
+ this->consumer_map_->init (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
-
- ACE_NEW_THROW_EX (this->supplier_map_observer_,
- TAO_NS_Supplier_Map_Observer (),
- CORBA::NO_MEMORY ());
+ this->supplier_map_->init (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
+}
- ACE_NEW_THROW_EX (this->event_dispatch_observer_,
- TAO_NS_Event_Dispatch_Observer (),
- CORBA::NO_MEMORY ());
- ACE_CHECK;
+void
+TAO_NS_Event_Manager::shutdown (void)
+{
+}
- ACE_NEW_THROW_EX (this->updates_dispatch_observer_,
- TAO_NS_Updates_Dispatch_Observer (),
- CORBA::NO_MEMORY ());
- ACE_CHECK;
+void
+TAO_NS_Event_Manager::connect (TAO_NS_ProxySupplier* proxy_supplier ACE_ENV_ARG_DECL)
+{
+ this->consumer_map_->connect (proxy_supplier ACE_ENV_ARG_PARAMETER);
- ACE_NEW_THROW_EX (this->event_pending_worker_,
- TAO_NS_Event_Pending_Worker (),
- CORBA::NO_MEMORY ());
- ACE_CHECK;
+ // Inform about offered types.
+ TAO_NS_EventTypeSeq removed;
+ proxy_supplier->types_changed (this->offered_types (), removed ACE_ENV_ARG_PARAMETER);
+}
- ACE_NEW_THROW_EX (this->updates_pending_worker_,
- TAO_NS_Updates_Pending_Worker (),
- CORBA::NO_MEMORY ());
- ACE_CHECK;
+void
+TAO_NS_Event_Manager::disconnect (TAO_NS_ProxySupplier* proxy_supplier ACE_ENV_ARG_DECL)
+{
+ this->consumer_map_->disconnect (proxy_supplier ACE_ENV_ARG_PARAMETER);
+}
- this->consumer_map_->init (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
+void
+TAO_NS_Event_Manager::connect (TAO_NS_ProxyConsumer* proxy_consumer ACE_ENV_ARG_DECL)
+{
+ this->supplier_map_->connect (proxy_consumer ACE_ENV_ARG_PARAMETER);
- this->supplier_map_->init (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
+ // Inform about subscription types.
+ TAO_NS_EventTypeSeq removed;
+ proxy_consumer->types_changed (this->subscription_types (), removed ACE_ENV_ARG_PARAMETER);
+}
- this->event_dispatch_observer_->init (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
+void
+TAO_NS_Event_Manager::disconnect (TAO_NS_ProxyConsumer* proxy_consumer ACE_ENV_ARG_DECL)
+{
+ this->supplier_map_->disconnect (proxy_consumer ACE_ENV_ARG_PARAMETER);
+}
- this->updates_dispatch_observer_->init (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
+void
+TAO_NS_Event_Manager::offer_change (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL)
+{
+ TAO_NS_EventTypeSeq new_added, last_removed;
- this->consumer_map_observer_->init (this->supplier_map_, this->updates_dispatch_observer_);
+ this->publish (proxy_consumer, added, new_added ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- this->supplier_map_observer_->init (this->consumer_map_, this->updates_dispatch_observer_);
+ this->un_publish (proxy_consumer, removed, last_removed ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- this->consumer_map_->attach_observer (this->consumer_map_observer_);
- this->supplier_map_->attach_observer (this->supplier_map_observer_);
-
- this->event_pending_worker_->init (this->event_dispatch_observer_, TAO_NS_PROPERTIES::instance()->update_period () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ TAO_NS_Consumer_Map::ENTRY::COLLECTION* updates_collection = this->consumer_map_->updates_collection ();
- if (TAO_NS_PROPERTIES::instance()->updates () == 0)
- this->updates_pending_worker_->worker_suspend ();
+ TAO_NS_ProxySupplier_Update_Worker worker (new_added, last_removed);
- this->updates_pending_worker_->init (this->updates_dispatch_observer_, TAO_NS_PROPERTIES::instance()->update_period () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ if (updates_collection != 0)
+ updates_collection->for_each (&worker ACE_ENV_ARG_PARAMETER);
}
void
-TAO_NS_Event_Manager::shutdown (void)
+TAO_NS_Event_Manager::subscription_change (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL)
{
- this->event_pending_worker_->shutdown ();
- this->updates_pending_worker_->shutdown ();
+ TAO_NS_EventTypeSeq new_added, last_removed;
+
+ this->subscribe (proxy_supplier, added, new_added ACE_ENV_ARG_PARAMETER);
+ this->un_subscribe (proxy_supplier, removed, last_removed ACE_ENV_ARG_PARAMETER);
+
+ TAO_NS_Supplier_Map::ENTRY::COLLECTION* updates_collection = this->supplier_map_->updates_collection ();
+
+ TAO_NS_ProxyConsumer_Update_Worker worker (new_added, last_removed);
+
+ if (updates_collection != 0)
+ updates_collection->for_each (&worker ACE_ENV_ARG_PARAMETER);
}
void
-TAO_NS_Event_Manager::subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL)
+TAO_NS_Event_Manager::subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& new_seq ACE_ENV_ARG_DECL)
{
TAO_NS_EventTypeSeq::CONST_ITERATOR iter (seq);
@@ -145,13 +143,16 @@ TAO_NS_Event_Manager::subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO
for (iter.first (); iter.next (event_type) != 0; iter.advance ())
{
- consumer_map_->insert (proxy_supplier, *event_type ACE_ENV_ARG_PARAMETER);
+ int result = consumer_map_->insert (proxy_supplier, *event_type ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
+
+ if (result == 1)
+ new_seq.insert (*event_type);
}
}
void
-TAO_NS_Event_Manager::un_subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL)
+TAO_NS_Event_Manager::un_subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& last_seq ACE_ENV_ARG_DECL)
{
TAO_NS_EventTypeSeq::CONST_ITERATOR iter (seq);
@@ -159,13 +160,16 @@ TAO_NS_Event_Manager::un_subscribe (TAO_NS_ProxySupplier* proxy_supplier, const
for (iter.first (); iter.next (event_type) != 0; iter.advance ())
{
- consumer_map_->remove (proxy_supplier, *event_type ACE_ENV_ARG_PARAMETER);
+ int result = consumer_map_->remove (proxy_supplier, *event_type ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
+
+ if (result == 1)
+ last_seq.insert (*event_type);
}
}
void
-TAO_NS_Event_Manager::publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL)
+TAO_NS_Event_Manager::publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& new_seq ACE_ENV_ARG_DECL)
{
TAO_NS_EventTypeSeq::CONST_ITERATOR iter (seq);
@@ -173,14 +177,16 @@ TAO_NS_Event_Manager::publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_N
for (iter.first (); iter.next (event_type) != 0; iter.advance ())
{
- supplier_map_->insert (proxy_consumer, *event_type ACE_ENV_ARG_PARAMETER);
+ int result = supplier_map_->insert (proxy_consumer, *event_type ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- }
+ if (result == 1)
+ new_seq.insert (*event_type);
+ }
}
void
-TAO_NS_Event_Manager::un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL)
+TAO_NS_Event_Manager::un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& last_seq ACE_ENV_ARG_DECL)
{
TAO_NS_EventTypeSeq::CONST_ITERATOR iter (seq);
@@ -188,8 +194,11 @@ TAO_NS_Event_Manager::un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TA
for (iter.first (); iter.next (event_type) != 0; iter.advance ())
{
- supplier_map_->remove (proxy_consumer, *event_type ACE_ENV_ARG_PARAMETER);
+ int result = supplier_map_->remove (proxy_consumer, *event_type ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
+
+ if (result == 1)
+ last_seq.insert (*event_type);
}
}
@@ -198,22 +207,6 @@ TAO_NS_Event_Manager::un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TA
template class TAO_NS_Event_Map_T<TAO_NS_ProxySupplier, TAO_SYNCH_RW_MUTEX>;
template class TAO_NS_Event_Map_T<TAO_NS_ProxyConsumer, TAO_SYNCH_RW_MUTEX>;
-template class TAO_NS_Event_Map_Observer_T<TAO_NS_ProxyConsumer>;
-template class TAO_NS_Event_Map_Observer_T<TAO_NS_ProxySupplier>;
-
-template class TAO_NS_Dispatch_Observer_T<TAO_NS_Peer>;
-template class TAO_NS_Dispatch_Observer_T<TAO_NS_Consumer>;
-
-template class TAO_NS_Pending_Worker_T<TAO_NS_Peer>;
-template class TAO_NS_Pending_Worker_T<TAO_NS_Consumer>;
-
-template class TAO_NS_Dispatch_Pending_Worker_T<TAO_NS_Consumer>;
-template class TAO_NS_Dispatch_Pending_Worker_T<TAO_NS_Peer>;
-
-template class TAO_NS_Update_Removed_Worker<TAO_NS_ProxyConsumer>;
-template class TAO_NS_Update_Removed_Worker<TAO_NS_ProxySupplier>;
-template class TAO_NS_Update_Added_Worker<TAO_NS_ProxyConsumer>;
-
template class ACE_Hash<TAO_NS_EventType>;
template class ACE_Equal_To<TAO_NS_EventType>;
@@ -230,8 +223,6 @@ template class TAO_NS_Object_Find_Worker_T<TAO_NS_Proxy>;
template class TAO_NS_Object_Find_Worker_T<TAO_NS_Admin>;
template class TAO_NS_Object_Find_Worker_T<TAO_NS_EventChannel>;
-template class TAO_NS_Update_Added_Worker<TAO_NS_ProxySupplier>;
-
template class ACE_Unbounded_Set<TAO_NS_EventType>;
template class ACE_Unbounded_Set_Const_Iterator<TAO_NS_EventType>;
template class ACE_Unbounded_Queue<ACE_Refcounted_Auto_Ptr<TAO_NS_Event, TAO_SYNCH_MUTEX> >;
@@ -287,22 +278,6 @@ template class TAO_ESF_Shutdown_Proxy<TAO_NS_Proxy>;
#pragma instantiate TAO_NS_Event_Map_T<TAO_NS_ProxySupplier, TAO_SYNCH_RW_MUTEX>
#pragma instantiate TAO_NS_Event_Map_T<TAO_NS_ProxyConsumer, TAO_SYNCH_RW_MUTEX>
-#pragma instantiate TAO_NS_Event_Map_Observer_T<TAO_NS_ProxyConsumer>
-#pragma instantiate TAO_NS_Event_Map_Observer_T<TAO_NS_ProxySupplier>
-
-#pragma instantiate TAO_NS_Dispatch_Observer_T<TAO_NS_Peer>
-#pragma instantiate TAO_NS_Dispatch_Observer_T<TAO_NS_Consumer>
-
-#pragma instantiate TAO_NS_Pending_Worker_T<TAO_NS_Peer>
-#pragma instantiate TAO_NS_Pending_Worker_T<TAO_NS_Consumer>
-
-#pragma instantiate TAO_NS_Dispatch_Pending_Worker_T<TAO_NS_Consumer>
-#pragma instantiate TAO_NS_Dispatch_Pending_Worker_T<TAO_NS_Peer>
-
-#pragma instantiate TAO_NS_Update_Removed_Worker<TAO_NS_ProxyConsumer>
-#pragma instantiate TAO_NS_Update_Removed_Worker<TAO_NS_ProxySupplier>
-#pragma instantiate TAO_NS_Update_Added_Worker<TAO_NS_ProxyConsumer>
-
#pragma instantiate ACE_Hash<TAO_NS_EventType>
#pragma instantiate ACE_Equal_To<TAO_NS_EventType>
@@ -319,8 +294,6 @@ template class TAO_ESF_Shutdown_Proxy<TAO_NS_Proxy>;
#pragma instantiate TAO_NS_Object_Find_Worker_T<TAO_NS_Admin>
#pragma instantiate TAO_NS_Object_Find_Worker_T<TAO_NS_EventChannel>
-#pragma instantiate TAO_NS_Update_Added_Worker<TAO_NS_ProxySupplier>
-
#pragma instantiate ACE_Unbounded_Set<TAO_NS_EventType>
#pragma instantiate ACE_Unbounded_Set_Const_Iterator<TAO_NS_EventType>
#pragma instantiate ACE_Unbounded_Queue<ACE_Refcounted_Auto_Ptr<TAO_NS_Event, TAO_SYNCH_MUTEX> >
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.h b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.h
index eddef6ed5b6..5da8ed87089 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.h
@@ -20,7 +20,8 @@
#endif /* ACE_LACKS_PRAGMA_ONCE */
#include "Types.h"
-
+#include "EventTypeSeq.h"
+#include "orbsvcs/ESF/ESF_Worker.h"
/**
* @class TAO_NS_Event_Manager
@@ -43,54 +44,98 @@ public:
/// Init
void shutdown (void);
- /// Subscribe <proxy_supplier> to the event type sequence list <seq>.
- void subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL);
+ /// Connect ProxySupplier
+ void connect (TAO_NS_ProxySupplier* proxy_supplier ACE_ENV_ARG_DECL);
- /// Unsubscribe <proxy_supplier> to the event type sequence list <seq>.
- void un_subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL);
+ /// Disconnect ProxySupplier
+ void disconnect (TAO_NS_ProxySupplier* proxy_supplier ACE_ENV_ARG_DECL);
- /// Subscribe <proxy_consumer> to the event type sequence list <seq>.
- void publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL);
+ /// Connect ProxyConsumer
+ void connect (TAO_NS_ProxyConsumer* proxy_consumer ACE_ENV_ARG_DECL);
- /// Subscribe <proxy_consumer> to the event type sequence list <seq>.
- void un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL);
+ /// Disconnect ProxyConsumer
+ void disconnect (TAO_NS_ProxyConsumer* proxy_consumer ACE_ENV_ARG_DECL);
/// Map accessors.
TAO_NS_Consumer_Map* consumer_map (void);
TAO_NS_Supplier_Map* supplier_map (void);
- /// Event Dispatch Observer
- TAO_NS_Event_Dispatch_Observer* event_dispatch_observer (void);
+ /// Offer change received on <proxy_consumer>.
+ void offer_change (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL);
- /// Update dispatch observer.
- TAO_NS_Updates_Dispatch_Observer* updates_dispatch_observer (void);
+ /// Subscription change received on <proxy_supplier>.
+ void subscription_change (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL);
+
+ /// What are the types being offered.
+ const TAO_NS_EventTypeSeq& offered_types (void);
+
+ /// What are the types being subscribed.
+ const TAO_NS_EventTypeSeq& subscription_types (void);
protected:
+ /// Subscribe <proxy_supplier> to the event type sequence list <seq>.
+ void subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& new_seq ACE_ENV_ARG_DECL);
+
+ /// Unsubscribe <proxy_supplier> to the event type sequence list <seq>.
+ void un_subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& last_seq ACE_ENV_ARG_DECL);
+
+ /// Subscribe <proxy_consumer> to the event type sequence list <seq>.
+ void publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& new_seq ACE_ENV_ARG_DECL);
+
+ /// Subscribe <proxy_consumer> to the event type sequence list <seq>.
+ void un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& last_seq ACE_ENV_ARG_DECL);
+
/// Consumer Map
TAO_NS_Consumer_Map* consumer_map_;
/// Supplier Map
TAO_NS_Supplier_Map* supplier_map_;
+};
+
+/********************************************************************************/
- /// Consumer Map Observer
- TAO_NS_Consumer_Map_Observer* consumer_map_observer_;
+/**
+ * @class TAO_NS_ProxyConsumer_Update_Worker
+ *
+ * @brief Inform ProxyConsumer of updates.
+ *
+ */
+class TAO_Notify_Export TAO_NS_ProxyConsumer_Update_Worker : public TAO_ESF_Worker<TAO_NS_ProxyConsumer>
+{
+public:
+ TAO_NS_ProxyConsumer_Update_Worker (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed);
- /// Supplier Map Observer
- TAO_NS_Supplier_Map_Observer* supplier_map_observer_;
+protected:
+ ///= TAO_ESF_Worker method
+ void work (TAO_NS_ProxyConsumer* proxy ACE_ENV_ARG_DECL);
- /// Event Dispatch Observer.
- TAO_NS_Event_Dispatch_Observer* event_dispatch_observer_;
+ const TAO_NS_EventTypeSeq& added_;
+ const TAO_NS_EventTypeSeq& removed_;
+};
- /// Update dispatch observer.
- TAO_NS_Updates_Dispatch_Observer* updates_dispatch_observer_;
+/********************************************************************************/
- /// Worker task that dispatches pending events.
- TAO_NS_Event_Pending_Worker* event_pending_worker_;
+/**
+ * @class TAO_NS_ProxySupplier_Update_Worker
+ *
+ * @brief Inform ProxySupplier of updates.
+ *
+ */
+class TAO_Notify_Export TAO_NS_ProxySupplier_Update_Worker : public TAO_ESF_Worker<TAO_NS_ProxySupplier>
+{
+public:
+ TAO_NS_ProxySupplier_Update_Worker (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed);
+
+protected:
+ ///= TAO_ESF_Worker method
+ void work (TAO_NS_ProxySupplier* proxy ACE_ENV_ARG_DECL);
- /// Worker task that dispatches pending update messges.
- TAO_NS_Updates_Pending_Worker* updates_pending_worker_;
+ const TAO_NS_EventTypeSeq& added_;
+ const TAO_NS_EventTypeSeq& removed_;
};
+/********************************************************************************/
+
#if defined (__ACE_INLINE__)
#include "Event_Manager.inl"
#endif /* __ACE_INLINE__ */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.inl b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.inl
index f988e9d2914..3ad395f331a 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.inl
+++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.inl
@@ -1,25 +1,57 @@
// $Id$
+#include "ProxyConsumer.h"
+#include "ProxySupplier.h"
+#include "Event_Map_T.h"
+
ACE_INLINE TAO_NS_Consumer_Map*
TAO_NS_Event_Manager::consumer_map (void)
{
- return consumer_map_;
+ return this->consumer_map_;
}
ACE_INLINE TAO_NS_Supplier_Map*
TAO_NS_Event_Manager::supplier_map (void)
{
- return supplier_map_;
+ return this->supplier_map_;
+}
+
+ACE_INLINE const TAO_NS_EventTypeSeq&
+TAO_NS_Event_Manager::offered_types (void)
+{
+ return this->supplier_map_->event_types ();
+}
+
+ACE_INLINE const TAO_NS_EventTypeSeq&
+TAO_NS_Event_Manager::subscription_types (void)
+{
+ return this->consumer_map_->event_types ();
+}
+
+/********************************************************************************/
+
+ACE_INLINE TAO_NS_ProxyConsumer_Update_Worker::TAO_NS_ProxyConsumer_Update_Worker (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed)
+ :added_ (added), removed_ (removed)
+{
+}
+
+ACE_INLINE void
+TAO_NS_ProxyConsumer_Update_Worker::work (TAO_NS_ProxyConsumer* proxy ACE_ENV_ARG_DECL)
+{
+ proxy->types_changed (added_, removed_ ACE_ENV_ARG_PARAMETER);
}
-ACE_INLINE TAO_NS_Event_Dispatch_Observer*
-TAO_NS_Event_Manager::event_dispatch_observer (void)
+/********************************************************************************/
+
+ACE_INLINE TAO_NS_ProxySupplier_Update_Worker::TAO_NS_ProxySupplier_Update_Worker (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed)
+ :added_ (added), removed_ (removed)
{
- return this->event_dispatch_observer_;
}
-ACE_INLINE TAO_NS_Updates_Dispatch_Observer*
-TAO_NS_Event_Manager::updates_dispatch_observer (void)
+ACE_INLINE void
+TAO_NS_ProxySupplier_Update_Worker::work (TAO_NS_ProxySupplier* proxy ACE_ENV_ARG_DECL)
{
- return this->updates_dispatch_observer_;
+ proxy->types_changed (added_, removed_ ACE_ENV_ARG_PARAMETER);
}
+
+/********************************************************************************/
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.cpp
index 546486af2c6..c97cba0453c 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.cpp
@@ -8,7 +8,6 @@
#include "Event_Map_Entry_T.h"
#include "Properties.h"
#include "Factory.h"
-#include "Event_Map_Observer.h"
#if ! defined (__ACE_INLINE__)
#include "Event_Map_T.inl"
@@ -18,7 +17,7 @@ ACE_RCSID(RT_Notify, TAO_NS_Event_Map_T, "$Id$")
template <class PROXY, class ACE_LOCK>
TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::TAO_NS_Event_Map_T (void)
- :event_type_count_ (0), observer_ (0)
+ :proxy_count_ (0)
{
}
@@ -32,8 +31,30 @@ template <class PROXY, class ACE_LOCK> void
TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::init (ACE_ENV_SINGLE_ARG_DECL)
{
this->broadcast_entry_.init (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->updates_entry_.init (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+template <class PROXY, class ACE_LOCK> void
+TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::connect (PROXY* proxy ACE_ENV_ARG_DECL)
+{
+ this->updates_entry_.connected (proxy ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_WRITE_GUARD (ACE_LOCK, ace_mon, this->lock_);
+ ++this->proxy_count_;
}
+template <class PROXY, class ACE_LOCK> void
+TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::disconnect (PROXY* proxy ACE_ENV_ARG_DECL)
+{
+ this->updates_entry_.disconnected (proxy ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_WRITE_GUARD (ACE_LOCK, ace_mon, this->lock_);
+ --this->proxy_count_;
+}
template <class PROXY, class ACE_LOCK> int
TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::insert (PROXY* proxy, const TAO_NS_EventType& event_type ACE_ENV_ARG_DECL)
@@ -45,6 +66,7 @@ TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::insert (PROXY* proxy, const TAO_NS_EventTyp
if (event_type.is_special () == 1)
{
entry = &this->broadcast_entry_;
+
result = 0;
}
else
@@ -54,7 +76,7 @@ TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::insert (PROXY* proxy, const TAO_NS_EventTyp
result = this->map_.find (event_type, entry);
}
- if (result == -1)
+ if (result == -1) // This type is being seen for the first time.
{
ACE_NEW_THROW_EX (entry,
ENTRY (),
@@ -72,19 +94,18 @@ TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::insert (PROXY* proxy, const TAO_NS_EventTyp
if (map_.bind (event_type, entry) == -1)
ACE_THROW_RETURN (CORBA::NO_MEMORY (), -1);
- if (this->observer_ != 0)
- this->observer_->type_added (event_type ACE_ENV_ARG_PARAMETER);
+ if (this->event_types_.insert (event_type) == -1)
+ return -1;
- return ++event_type_count_;
+ return 1;
}
- else
+ else // Add to existing entry or the broadcast entry.
{
entry->connected (proxy ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
-
- ACE_WRITE_GUARD_RETURN (ACE_LOCK, ace_mon, this->lock_, -1);
- return ++event_type_count_;
}
+
+ return 0;
}
template <class PROXY, class ACE_LOCK> int
@@ -92,42 +113,50 @@ TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::remove (PROXY* proxy, const TAO_NS_EventTyp
{
ENTRY* entry;
- int result = -1;
-
if (event_type.is_special () == 1)
{
entry = &this->broadcast_entry_;
- result = 0;
+
+ entry->disconnected (proxy ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
}
else
{
- ACE_READ_GUARD_RETURN (ACE_LOCK, ace_mon, this->lock_, -1);
-
- result = this->map_.find (event_type, entry);
- }
+ int result = -1;
- if (result == 0)
- {
- entry->disconnected (proxy ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
+ {
+ ACE_READ_GUARD_RETURN (ACE_LOCK, ace_mon, this->lock_, -1);
- ACE_WRITE_GUARD_RETURN (ACE_LOCK, ace_mon, this->lock_, -1);
+ result = this->map_.find (event_type, entry);
+ }
- if (entry->count () == 0)
+ if (result == 0)
{
- if (this->observer_ != 0)
- this->observer_->type_removed (event_type ACE_ENV_ARG_PARAMETER);
+ entry->disconnected (proxy ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
- /// @@TODO: Exec a strategy for removing entries.
- /// Strategy 1: remove_immediately
- /// Strategy 2: remove_bunch_after_threshold
- /// Strategy 3: use cached allocator and 1
- }
+ if (entry->count () == 0)
+ {
+ /// Exec a strategy for removing entries.
+ /// Strategy 1: remove_immediately
+ /// Strategy 2: remove a bunch_after crossing a threshold
+ /// Strategy 3: use cached allocator and 1
- return --event_type_count_;
+ // Strategy 1:
+ ACE_WRITE_GUARD_RETURN (ACE_LOCK, ace_mon, this->lock_, -1);
+
+ this->map_.unbind (event_type);
+ delete entry;
+
+ if (this->event_types_.remove (event_type) == -1)
+ return -1;
+
+ return 1;
+ }
+ }
}
- return -1;
+ return 0;
}
#endif /* TAO_NS_EVENT_MAP_T_C */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.h b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.h
index 8a2fb014707..b32f7f5861e 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.h
@@ -24,8 +24,6 @@
#include "EventType.h"
#include "Event_Map_Entry_T.h"
-class TAO_NS_Event_Map_Observer;
-
/**
* @class TAO_NS_Event_Map_T
*
@@ -38,7 +36,7 @@ class TAO_NS_Event_Map_T
public:
typedef TAO_NS_Event_Map_Entry_T<PROXY> ENTRY;
-
+
/// Constuctor
TAO_NS_Event_Map_T (void);
@@ -48,13 +46,20 @@ public:
/// Init
void init (ACE_ENV_SINGLE_ARG_DECL);
- /// Attach an Observer.
- void attach_observer (TAO_NS_Event_Map_Observer* observer);
+ /// Connect a PROXY
+ void connect (PROXY* proxy ACE_ENV_ARG_DECL);
+
+ /// Disconnect a PROXY
+ void disconnect (PROXY* proxy ACE_ENV_ARG_DECL);
- /// Associate PROXY and event_type. returns count of PROXYs.
+ /// Associate PROXY and event_type.
+ /// Returns 1 if <event_type> is being seem for the 1st time otherwise returns 0.
+ /// Returns -1 on error.
int insert (PROXY* proxy, const TAO_NS_EventType& event_type ACE_ENV_ARG_DECL);
- /// Remove association of PROXY and event_type. returns count of PROXYs.
+ /// Remove association of PROXY and event_type.
+ /// Returns 1 if <event_type> is being seem for the last time otherwise returns 0.
+ /// Returns -1 on error.
int remove (PROXY* proxy, const TAO_NS_EventType& event_type ACE_ENV_ARG_DECL);
/// Find the collection mapped to the <event_type>
@@ -63,8 +68,14 @@ public:
/// Find the default broadcast list.
ACE_TYPENAME ENTRY::COLLECTION* broadcast_collection (void);
- /// Access count, number of different event types in the map.
- int event_type_count (void);
+ /// Find the update list. This is all the PROXYS connected to this Map.
+ ACE_TYPENAME ENTRY::COLLECTION* updates_collection (void);
+
+ /// Access all the event types available
+ const TAO_NS_EventTypeSeq& event_types (void);
+
+ /// Access number of proxys connected in all.
+ int proxy_count (void);
protected:
/// The Map that stores eventtype to entry mapping.
@@ -73,14 +84,17 @@ protected:
/// The lock to use.
ACE_LOCK lock_;
- /// Count of items entered in the map.
- int event_type_count_;
+ /// Count of proxys connected.
+ int proxy_count_;
/// The default broadcast list for EventType::special.
ENTRY broadcast_entry_;
- /// Observer attached to us.
- TAO_NS_Event_Map_Observer* observer_;
+ /// Update Entry - Keeps a list of all PROXY's connected to this Map. Updates are send to this list.
+ ENTRY updates_entry_;
+
+ /// The event types that are available in this map.
+ TAO_NS_EventTypeSeq event_types_;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.inl b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.inl
index d9f1ed75d56..7ea2cedce8c 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.inl
+++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.inl
@@ -19,14 +19,20 @@ TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::broadcast_collection (void)
return this->broadcast_entry_.collection ();
}
-template <class PROXY, class ACE_LOCK> ACE_INLINE void
-TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::attach_observer (TAO_NS_Event_Map_Observer* observer)
+template <class PROXY, class ACE_LOCK> ACE_INLINE ACE_TYPENAME TAO_NS_Event_Map_Entry_T<PROXY>::COLLECTION*
+TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::updates_collection (void)
{
- this->observer_ = observer;
+ return this->updates_entry_.collection ();
}
template <class PROXY, class ACE_LOCK> ACE_INLINE int
-TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::event_type_count (void)
+TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::proxy_count (void)
+{
+ return this->proxy_count_;
+}
+
+template <class PROXY, class ACE_LOCK> ACE_INLINE const TAO_NS_EventTypeSeq&
+TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::event_types (void)
{
- return this->event_type_count_;
+ return this->event_types_;
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp
index b1178ba0030..e573e691f2a 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp
@@ -50,20 +50,18 @@ TAO_NS_Method_Request_Dispatch::execute (ACE_ENV_SINGLE_ARG_DECL)
ACE_TRY
{
- this->proxy_supplier_->consumer ()->push (this->event_ ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ TAO_NS_Consumer* consumer = this->proxy_supplier_->consumer ();
+
+ if (consumer != 0)
+ {
+ consumer->push (this->event_ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
}
- ACE_CATCH (CORBA::UserException, ue)
- {
- ACE_PRINT_EXCEPTION (ue,
- "TAO_NS_Method_Request_Dispatch::: error sending event. ");
- //ACE_RE_THROW;
- }
- ACE_CATCH (CORBA::SystemException, se)
+ ACE_CATCHANY
{
- ACE_PRINT_EXCEPTION (se,
- "TAO_NS_Method_Request_Dispatch::: error sending event. ");
- //ACE_RE_THROW;
+ if (TAO_debug_level > 0)
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_NS_Method_Request_Dispatch::: error sending event. \n ");
}
ACE_ENDTRY;
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp
new file mode 100644
index 00000000000..60049cf0bef
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp
@@ -0,0 +1,55 @@
+// $Id$
+
+#include "Method_Request_Updates.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "Method_Request_Updates.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Notify, TAO_NS_Method_Request_Updates, "$id$")
+
+#include "tao/debug.h"
+#include "Proxy.h"
+#include "Peer.h"
+
+TAO_NS_Method_Request_Updates::TAO_NS_Method_Request_Updates (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed, TAO_NS_Proxy* proxy)
+ :added_ (added), removed_ (removed), proxy_ (proxy), refcountable_guard_ (*proxy)
+{
+}
+
+TAO_NS_Method_Request_Updates::~TAO_NS_Method_Request_Updates ()
+{
+}
+
+TAO_NS_Method_Request*
+TAO_NS_Method_Request_Updates::copy (void)
+{
+ /// @@use factory
+ return new TAO_NS_Method_Request_Updates (this->added_, this->removed_, this->proxy_);
+}
+
+int
+TAO_NS_Method_Request_Updates::execute (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (this->proxy_->has_shutdown ())
+ return 0; // If we were shutdown while waiting in the queue, return with no action.
+
+ ACE_TRY
+ {
+ TAO_NS_Peer* peer = this->proxy_->peer();
+
+ if (peer != 0)
+ {
+ peer->dispatch_updates (this->added_, this->removed_ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ }
+ ACE_CATCHANY
+ {
+ if (TAO_debug_level > 0)
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_NS_Method_Request_Updates::execute error sending updates\n ");
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h
new file mode 100644
index 00000000000..00e68a21f18
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h
@@ -0,0 +1,64 @@
+/* -*- C++ -*- */
+/**
+ * @file Method_Request_Updates.h
+ *
+ * $Id$
+ *
+ * @author Pradeep Gore <pradeep@oomworks.com>
+ *
+ *
+ */
+
+#ifndef TAO_NS_METHOD_REQUEST_UPDATES_H
+#define TAO_NS_METHOD_REQUEST_UPDATES_H
+#include "ace/pre.h"
+
+#include "notify_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "Method_Request.h"
+#include "Types.h"
+#include "EventTypeSeq.h"
+
+/**
+ * @class TAO_NS_Method_Request_Updates
+ *
+ * @brief
+ *
+ */
+class TAO_Notify_Export TAO_NS_Method_Request_Updates : public TAO_NS_Method_Request
+{
+public:
+ /// Constuctor
+ TAO_NS_Method_Request_Updates (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed, TAO_NS_Proxy* proxy);
+
+ /// Destructor
+ ~TAO_NS_Method_Request_Updates ();
+
+ /// Create a copy of this object.
+ TAO_NS_Method_Request* copy (void);
+
+ /// Execute the Request
+ virtual int execute (ACE_ENV_SINGLE_ARG_DECL);
+
+private:
+ /// The Updates
+ const TAO_NS_EventTypeSeq added_;
+ const TAO_NS_EventTypeSeq removed_;
+
+ /// The proxy that will receive the updates.
+ TAO_NS_Proxy* proxy_;
+
+ /// Guard to automatically inc/decr ref count on the proxy.
+ TAO_NS_Refcountable_Guard refcountable_guard_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "Method_Request_Updates.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_NS_METHOD_REQUEST_UPDATES_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl
new file mode 100644
index 00000000000..bf5cc3848c2
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl
@@ -0,0 +1,3 @@
+// $Id$
+
+#include "Method_Request_Updates.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.h
index e5bef05033d..89bf1bf98c9 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.h
@@ -14,7 +14,6 @@
#define NOTIFY_EVENTCHANNELFACTORY_I_H
#include "ace/pre.h"
-#include "Notify_ID_Pool_T.h"
#include "orbsvcs/CosNotifyChannelAdminS.h"
#include "notify_export.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Signal_Property_T.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Signal_Property_T.h
index 925ad33185d..cdc3e80d223 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Signal_Property_T.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Signal_Property_T.h
@@ -56,12 +56,8 @@ public:
virtual int wait_for_change (const ACE_Time_Value* abstime);
private:
- ACE_UNIMPLEMENTED_FUNC (
- TAO_Notify_Signal_Property (
- const TAO_Notify_Signal_Property<ACE_LOCK, TYPE> &rhs))
- ACE_UNIMPLEMENTED_FUNC (
- TAO_Notify_Signal_Property& operator= (
- const TAO_Notify_Signal_Property<ACE_LOCK, TYPE> &rhs))
+ ACE_UNIMPLEMENTED_FUNC (TAO_Notify_Signal_Property (const TAO_Notify_Signal_Property<ACE_LOCK, TYPE> &rhs))
+ ACE_UNIMPLEMENTED_FUNC (TAO_Notify_Signal_Property& operator= (const TAO_Notify_Signal_Property<ACE_LOCK, TYPE> &rhs))
ACE_Atomic_Op <ACE_LOCK, TYPE> value_;
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp
index 57a8ac2cbaa..388aec1a7ca 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp
@@ -9,7 +9,6 @@
ACE_RCSID(Notify, TAO_NS_Peer, "$id$")
#include "tao/debug.h"
-#include "Dispatch_Observer_T.h"
#include "Proxy.h"
#include "Proxy.h"
#include "Admin.h"
@@ -18,7 +17,6 @@ ACE_RCSID(Notify, TAO_NS_Peer, "$id$")
#include "Notify_Service.h"
TAO_NS_Peer::TAO_NS_Peer (void)
- :updates_dispatch_observer_ (0), retry_count_ (1)
{
}
@@ -39,61 +37,75 @@ TAO_NS_Peer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
}
void
-TAO_NS_Peer::dispatch_pending (ACE_ENV_SINGLE_ARG_DECL)
+TAO_NS_Peer::handle_dispatch_exception (ACE_ENV_SINGLE_ARG_DECL)
{
- TAO_NS_Proxy* proxy = this->proxy ();
- TAO_NS_EventTypeSeq& added = proxy->added_;
- TAO_NS_EventTypeSeq& removed = proxy->removed_;
-
- if (added.size () == 0 && removed.size () == 0)
- return; // Return if nothing to send.
-
- TAO_NS_EventTypeSeq added_copy;
- TAO_NS_EventTypeSeq removed_copy;
- TAO_NS_Reverse_Lock unlock (proxy->lock_);
-
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, proxy->lock_);
-
- added_copy = added;
- removed_copy = removed;
- added.reset ();
- removed.reset ();
+ // Sever all association when a remote client misbehaves. Other strategies like reties are possible but not implemented.
+ this->proxy ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+void
+TAO_NS_Peer::dispatch_updates (const TAO_NS_EventTypeSeq & added, const TAO_NS_EventTypeSeq & removed ACE_ENV_ARG_DECL)
+{
ACE_TRY
{
- {
- ACE_GUARD (TAO_NS_Reverse_Lock, ace_mon, unlock);
+ CosNotification::EventTypeSeq cos_added;
+ CosNotification::EventTypeSeq cos_removed;
- this->dispatch_updates_i (added_copy, removed_copy ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ const TAO_NS_EventTypeSeq& subscribed_types = this->proxy ()->subscribed_types ();
+ const TAO_NS_EventType& special = TAO_NS_EventType::special ();
- if (this->updates_dispatch_observer_ != 0)
- this->updates_dispatch_observer_->dispatch_success (this ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
+ // Don;t inform of types that we already know about.
+ // E.g. if we're subscribed for {A,B,C,F}
+ // and we receive an update with added list {A,B,G}
+ // then, we should only send {G} because peer already knows about {A, B}
+ // However if we're subscribed for everything, send all kinds of adds.
- this->retry_count_ = 0;
- }
- ACE_CATCHANY
- {
- if (TAO_debug_level > 0)
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Peer:dispatch_pending serror sending updates\n ");
- }
- //ACE_RE_THROW;
+ // Don;t inform of removed types that we don;t care about.
+ // e.g. if we're currently subscribed for {A,B,C,F}
+ // and we receive an update with removed list {A, B, D}
+ // then, we should only send {A,B} because the peer is not interested in D.
+ // However if we're subscribed for everything, send all kinds of removes.
- ++this->retry_count_;
+ TAO_NS_EventTypeSeq added_result = added;
+ TAO_NS_EventTypeSeq removed_result;
- if (this->updates_dispatch_observer_ != 0)
+ if (subscribed_types.find (special) != 0)
{
- /// Restore the lists.
- added.insert_seq (added_copy);
- removed.insert_seq (removed_copy);
+ added_result.remove_seq (subscribed_types);
+ removed_result.intersection (subscribed_types, removed);
+ }
+ else
+ {
+ removed_result = removed;
+ }
- ACE_GUARD (TAO_NS_Reverse_Lock, ace_mon, unlock);
+ added_result.populate_no_special (cos_added);
+ removed_result.populate_no_special (cos_removed);
- this->updates_dispatch_observer_->dispatch_failure (this, this->retry_count_ ACE_ENV_ARG_PARAMETER);
+ if (cos_added.length () != 0 || cos_removed.length () != 0)
+ {
+ this->dispatch_updates_i (cos_added, cos_removed ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
}
}
+ ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist)
+ {
+ this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCH (CORBA::NO_IMPLEMENT, no_impl)
+ {
+ // The peer does not implement the offer/subscription_change method
+ // Do nothing. Later, perhaps set a flag that helps us decide if we should dispatch_updates_i.
+ }
+ ACE_CATCH (CORBA::SystemException, sysex)
+ {
+ this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ // Do nothing
+ }
ACE_ENDTRY;
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Peer.h b/TAO/orbsvcs/orbsvcs/Notify/Peer.h
index 2d827073fda..658d9728711 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Peer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Peer.h
@@ -22,14 +22,11 @@
#include "orbsvcs/CosNotificationC.h"
#include "Destroy_Callback.h"
#include "EventTypeSeq.h"
-//#include "Types.h"
class TAO_NS_Proxy;
class TAO_NS_QoSProperties;
class TAO_NS_Peer;
-template <class PEER> class TAO_NS_Dispatch_Observer_T;
-typedef TAO_NS_Dispatch_Observer_T<TAO_NS_Peer> TAO_NS_Updates_Dispatch_Observer;
/**
* @class TAO_NS_Peer
@@ -47,38 +44,32 @@ public:
/// Destructor
virtual ~TAO_NS_Peer ();
+ /// This method sigantures deliberately match the RefCounting methods required for ESF Proxy
+ CORBA::ULong _incr_refcnt (void);
+ CORBA::ULong _decr_refcnt (void);
+
/// Shutdown the peer.
virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL);
- /// Install the updates observer.
- void updates_dispatch_observer (TAO_NS_Updates_Dispatch_Observer* updates_dispatch_observer);
-
/// Access Proxy.
virtual TAO_NS_Proxy* proxy (void) = 0;
- /// This method sigantures deliberately match the RefCounting methods required for ESF Proxy
- CORBA::ULong _incr_refcnt (void);
- CORBA::ULong _decr_refcnt (void);
-
- /// Dispatch Pending.
- void dispatch_pending (ACE_ENV_SINGLE_ARG_DECL);
+ // Dispatch updates
+ virtual void dispatch_updates (const TAO_NS_EventTypeSeq & added,
+ const TAO_NS_EventTypeSeq & removed
+ ACE_ENV_ARG_DECL);
/// QoS changed notification from the Peer.
virtual void qos_changed (TAO_NS_QoSProperties& qos_properties);
+ /// Handle dispatch exceptions.
+ void handle_dispatch_exception (ACE_ENV_SINGLE_ARG_DECL);
+
protected:
- // Dispatch updates implementation.
- virtual void dispatch_updates_i (const TAO_NS_EventTypeSeq & added,
- const TAO_NS_EventTypeSeq & removed
+ /// Implementation of Peer specific dispatch_updates
+ virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added,
+ const CosNotification::EventTypeSeq& removed
ACE_ENV_ARG_DECL) = 0;
-
- ///= Data Members
-
- // Updates Dispatch Observer
- TAO_NS_Updates_Dispatch_Observer* updates_dispatch_observer_;
-
- /// Retry count. How many times have we failed to contact the remote peer?
- int retry_count_;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Peer.inl b/TAO/orbsvcs/orbsvcs/Notify/Peer.inl
index e466a588442..9fc05856b37 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Peer.inl
+++ b/TAO/orbsvcs/orbsvcs/Notify/Peer.inl
@@ -13,9 +13,3 @@ TAO_NS_Peer::_decr_refcnt (void)
{
return this->proxy ()->_decr_refcnt ();
}
-
-ACE_INLINE void
-TAO_NS_Peer::updates_dispatch_observer (TAO_NS_Updates_Dispatch_Observer* updates_dispatch_observer)
-{
- this->updates_dispatch_observer_ = updates_dispatch_observer;
-}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Proxy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Proxy.cpp
index 66cd7650ccc..78c619faa23 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Proxy.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Proxy.cpp
@@ -14,12 +14,12 @@ ACE_RCSID(RT_Notify, TAO_NS_Proxy, "$Id$")
#include "EventChannel.h"
#include "EventChannelFactory.h"
#include "Notify_Service.h"
+#include "Method_Request_Updates.h"
+#include "Worker_Task.h"
TAO_NS_Proxy::TAO_NS_Proxy (void)
:updates_off_ (0)
{
- // Set initial proxy mode to broadcast.
- this->subscribed_types_.insert (TAO_NS_EventType::special ());
}
TAO_NS_Proxy::~TAO_NS_Proxy ()
@@ -27,19 +27,11 @@ TAO_NS_Proxy::~TAO_NS_Proxy ()
}
void
-TAO_NS_Proxy::type_added (const TAO_NS_EventType& added)
+TAO_NS_Proxy::types_changed (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL_NOT_USED)
{
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
- this->added_.insert (added);
- this->removed_.remove (added);
-}
+ TAO_NS_Method_Request_Updates request (added, removed, this);
-void
-TAO_NS_Proxy::type_removed (const TAO_NS_EventType& removed)
-{
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
- this->removed_.insert (removed);
- this->added_.remove (removed);
+ this->worker_task ()->exec (request);
}
CORBA::Boolean
@@ -67,7 +59,7 @@ TAO_NS_Proxy::check_filters (TAO_NS_Event_var &event ACE_ENV_ARG_DECL)
}
CosNotification::EventTypeSeq*
-TAO_NS_Proxy::obtain_types (CosNotifyChannelAdmin::ObtainInfoMode mode ACE_ENV_ARG_DECL)
+TAO_NS_Proxy::obtain_types (CosNotifyChannelAdmin::ObtainInfoMode mode, const TAO_NS_EventTypeSeq& types ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((
CORBA::SystemException
))
@@ -84,7 +76,7 @@ TAO_NS_Proxy::obtain_types (CosNotifyChannelAdmin::ObtainInfoMode mode ACE_ENV_A
if (mode == CosNotifyChannelAdmin::ALL_NOW_UPDATES_OFF ||
mode == CosNotifyChannelAdmin::ALL_NOW_UPDATES_ON)
{
- this->added_.populate (event_type_seq);
+ types.populate (event_type_seq);
}
if (mode == CosNotifyChannelAdmin::NONE_NOW_UPDATES_ON ||
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Proxy.h b/TAO/orbsvcs/orbsvcs/Notify/Proxy.h
index 0bb33c1a91d..5f49f2321e5 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Proxy.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Proxy.h
@@ -49,11 +49,8 @@ public:
/// Check if this event passes the admin and proxy filters.
CORBA::Boolean check_filters (TAO_NS_Event_var &event ACE_ENV_ARG_DECL);
- /// Subscription type added
- void type_added (const TAO_NS_EventType& added);
-
- /// Subscription type removed
- void type_removed (const TAO_NS_EventType& removed);
+ /// Inform this proxy that the following types are being advertised.
+ void types_changed (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL);
/// Have updates been turned off.
CORBA::Boolean updates_off (void);
@@ -64,16 +61,16 @@ public:
/// Access our Peer.
virtual TAO_NS_Peer* peer (void) = 0;
- /// Obtain Types.
- virtual CosNotification::EventTypeSeq* obtain_types (CosNotifyChannelAdmin::ObtainInfoMode mode ACE_ENV_ARG_DECL)
+ /// Implement the Obtain Types.
+ virtual CosNotification::EventTypeSeq* obtain_types (CosNotifyChannelAdmin::ObtainInfoMode mode, const TAO_NS_EventTypeSeq& types ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((
CORBA::SystemException
));
/// Notification of subscriptions/offers set at the admin.
- virtual void admin_subscription (const CosNotification::EventTypeSeq & added,
- const CosNotification::EventTypeSeq & removed
- ACE_ENV_ARG_DECL) = 0;
+ virtual void admin_types_changed (const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL) = 0;
/// Override, TAO_NS_Object::qos_changed
@@ -85,12 +82,6 @@ protected:
/// Filter Administration
TAO_NS_FilterAdmin filter_admin_;
- /// Types added.
- TAO_NS_EventTypeSeq added_;
-
- /// Types removed.
- TAO_NS_EventTypeSeq removed_;
-
/// The types that we're subscribed with the event manager.
TAO_NS_EventTypeSeq subscribed_types_;
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp
index da2911b74c2..aec4fa2a758 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp
@@ -64,15 +64,18 @@ TAO_NS_ProxyConsumer::connect (TAO_NS_Supplier *supplier ACE_ENV_ARG_DECL)
{
supplier_ = supplier;
- supplier->updates_dispatch_observer (this->event_manager_->updates_dispatch_observer ());
-
this->parent_->subscribed_types (this->subscribed_types_ ACE_ENV_ARG_PARAMETER); // get the parents subscribed types.
ACE_CHECK;
// Inform QoS values.
supplier_->qos_changed (this->qos_properties_);
- event_manager_->publish (this, this->subscribed_types_ ACE_ENV_ARG_PARAMETER);
+ TAO_NS_EventTypeSeq removed;
+
+ this->event_manager_->offer_change (this, this->subscribed_types_, removed ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->event_manager_->connect (this ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
// Increment the global supplier count
@@ -83,7 +86,12 @@ TAO_NS_ProxyConsumer::connect (TAO_NS_Supplier *supplier ACE_ENV_ARG_DECL)
void
TAO_NS_ProxyConsumer::disconnect (ACE_ENV_SINGLE_ARG_DECL)
{
- event_manager_->un_publish (this, this->subscribed_types_ ACE_ENV_ARG_PARAMETER);
+ TAO_NS_EventTypeSeq added;
+
+ event_manager_->offer_change (this, added, this->subscribed_types_ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->event_manager_->disconnect (this ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
// Decrement the global supplier count
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.h
index 781a03b6a0e..30852afa1f4 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.h
@@ -53,6 +53,7 @@ public:
/// Shutdown (TAO_NS_Container_T method)
virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL);
+ /// Start event propagation.
virtual void push (TAO_NS_Event_var &event);
/// Access our Peer.
@@ -61,10 +62,10 @@ public:
/// Access the Supplier
TAO_NS_Supplier* supplier (void);
-protected:
/// Return 1 if connected
int is_connected (void);
+protected:
/// The Supplier that we're connect to.
TAO_NS_Supplier* supplier_;
};
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp
index 23bee77b8fc..90a166ec142 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp
@@ -24,9 +24,9 @@ TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::~TAO_NS_ProxyConsumer_T ()
}
template <class SERVANT_TYPE> void
-TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::admin_subscription (const CosNotification::EventTypeSeq & added,
- const CosNotification::EventTypeSeq & removed
- ACE_ENV_ARG_DECL)
+TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::admin_types_changed (const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL)
{
this->offer_change (added, removed ACE_ENV_ARG_PARAMETER);
}
@@ -57,17 +57,15 @@ TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::offer_change (const CosNotification::Event
TAO_NS_EventTypeSeq seq_added (added);
TAO_NS_EventTypeSeq seq_removed (removed);
- ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
- CORBA::INTERNAL ());
- ACE_CHECK;
+ {
+ ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
+ CORBA::INTERNAL ());
+ ACE_CHECK;
- this->subscribed_types_.init (seq_added, seq_removed);
+ this->subscribed_types_.init (seq_added, seq_removed);
+ }
- if (this->is_connected () == 1)
- {
- event_manager_->publish (this, seq_added ACE_ENV_ARG_PARAMETER);
- event_manager_->un_publish (this, seq_removed ACE_ENV_ARG_PARAMETER);
- }
+ this->event_manager_->offer_change (this, seq_added, seq_removed ACE_ENV_ARG_PARAMETER);
}
template <class SERVANT_TYPE> CosNotification::EventTypeSeq*
@@ -76,7 +74,7 @@ TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::obtain_subscription_types (CosNotifyChanne
CORBA::SystemException
))
{
- return this->obtain_types (mode ACE_ENV_ARG_PARAMETER);
+ return this->obtain_types (mode, this->event_manager_->subscription_types () ACE_ENV_ARG_PARAMETER);
}
#endif /* TAO_NS_PROXYCONSUMER_T_CPP */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h
index 475559088ab..6ccc7dcec1d 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h
@@ -39,9 +39,9 @@ public:
~TAO_NS_ProxyConsumer_T ();
/// Notification of subscriptions set at the admin.
- virtual void admin_subscription (const CosNotification::EventTypeSeq & added,
- const CosNotification::EventTypeSeq & removed
- ACE_ENV_ARG_DECL);
+ virtual void admin_types_changed (const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL);
virtual CosNotifyChannelAdmin::SupplierAdmin_ptr MyAdmin (ACE_ENV_SINGLE_ARG_DECL)
ACE_THROW_SPEC ((
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp
index dc9270191de..8b358a98682 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp
@@ -47,7 +47,7 @@ TAO_NS_ProxySupplier::connect (TAO_NS_Consumer *consumer ACE_ENV_ARG_DECL)
, CosEventChannelAdmin::AlreadyConnected
))
{
- const TAO_NS_Atomic_Property_Long& consumer_count = this->admin_properties_->consumers ();
+ TAO_NS_Atomic_Property_Long& consumer_count = this->admin_properties_->consumers ();
const TAO_NS_Property_Long& max_consumers = this->admin_properties_->max_consumers ();
if (max_consumers != 0 &&
@@ -67,24 +67,37 @@ TAO_NS_ProxySupplier::connect (TAO_NS_Consumer *consumer ACE_ENV_ARG_DECL)
{
consumer_ = consumer;
- consumer->event_dispatch_observer (this->event_manager_->event_dispatch_observer ());
- consumer->updates_dispatch_observer (this->event_manager_->updates_dispatch_observer ());
-
// Inform QoS values.
consumer_->qos_changed (this->qos_properties_);
this->parent_->subscribed_types (this->subscribed_types_ ACE_ENV_ARG_PARAMETER); // get the parents subscribed types.
ACE_CHECK;
- event_manager_->subscribe (this, this->subscribed_types_ ACE_ENV_ARG_PARAMETER);
+ TAO_NS_EventTypeSeq removed;
+
+ this->event_manager_->subscription_change (this, this->subscribed_types_, removed ACE_ENV_ARG_PARAMETER);
+
+ this->event_manager_->connect (this ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // Increment the global consumer count
+ ++consumer_count;
}
}
void
TAO_NS_ProxySupplier::disconnect (ACE_ENV_SINGLE_ARG_DECL)
{
- event_manager_->un_subscribe (this, this->subscribed_types_ ACE_ENV_ARG_PARAMETER);
+ TAO_NS_EventTypeSeq added;
+
+ this->event_manager_->subscription_change (this, added, this->subscribed_types_ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->event_manager_->disconnect (this ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
+
+ // Decrement the global consumer count
+ this->admin_properties_->consumers ()--;
}
void
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp
index 38196db6784..7db52521e8b 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp
@@ -32,9 +32,9 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::~TAO_NS_ProxySupplier_T ()
}
template <class SERVANT_TYPE> void
-TAO_NS_ProxySupplier_T<SERVANT_TYPE>::admin_subscription (const CosNotification::EventTypeSeq & added,
- const CosNotification::EventTypeSeq & removed
- ACE_ENV_ARG_DECL)
+TAO_NS_ProxySupplier_T<SERVANT_TYPE>::admin_types_changed (const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL)
{
this->subscription_change (added, removed ACE_ENV_ARG_PARAMETER);
}
@@ -97,7 +97,7 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::obtain_offered_types (CosNotifyChannelAdmi
CORBA::SystemException
))
{
- return this->obtain_types (mode ACE_ENV_ARG_PARAMETER);
+ return this->obtain_types (mode, this->event_manager_->offered_types () ACE_ENV_ARG_PARAMETER);
}
template <class SERVANT_TYPE> void
@@ -110,17 +110,15 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::subscription_change (const CosNotification
TAO_NS_EventTypeSeq seq_added (added);
TAO_NS_EventTypeSeq seq_removed (removed);
- ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
- CORBA::INTERNAL ());
- ACE_CHECK;
+ {
+ ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
+ CORBA::INTERNAL ());
+ ACE_CHECK;
- this->subscribed_types_.init (seq_added, seq_removed);
+ this->subscribed_types_.init (seq_added, seq_removed);
+ }
- if (this->is_connected () == 1)
- {
- event_manager_->subscribe (this, seq_added ACE_ENV_ARG_PARAMETER);
- event_manager_->un_subscribe (this, seq_removed ACE_ENV_ARG_PARAMETER);
- }
+ this->event_manager_->subscription_change (this, seq_added, seq_removed ACE_ENV_ARG_PARAMETER);
}
template <class SERVANT_TYPE> void
@@ -131,13 +129,15 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::suspend_connection (ACE_ENV_SINGLE_ARG_DEC
CosNotifyChannelAdmin::NotConnected
))
{
- ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, CORBA::INTERNAL ());
+ {
+ ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, CORBA::INTERNAL ());
- if (this->is_connected () == 0)
- ACE_THROW (CosNotifyChannelAdmin::NotConnected ());
+ if (this->is_connected () == 0)
+ ACE_THROW (CosNotifyChannelAdmin::NotConnected ());
- if (this->consumer_->is_suspended () == 1)
- ACE_THROW (CosNotifyChannelAdmin::ConnectionAlreadyInactive ());
+ if (this->consumer_->is_suspended () == 1)
+ ACE_THROW (CosNotifyChannelAdmin::ConnectionAlreadyInactive ());
+ }
this->consumer_->suspend (ACE_ENV_SINGLE_ARG_PARAMETER);
}
@@ -160,7 +160,7 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::resume_connection (ACE_ENV_SINGLE_ARG_DECL
ACE_THROW (CosNotifyChannelAdmin::ConnectionAlreadyActive ());
}
- this->consumer_->suspend (ACE_ENV_SINGLE_ARG_PARAMETER);
+ this->consumer_->resume (ACE_ENV_SINGLE_ARG_PARAMETER);
}
template <class SERVANT_TYPE> CosNotifyChannelAdmin::ConsumerAdmin_ptr
@@ -179,6 +179,8 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::MyAdmin (ACE_ENV_SINGLE_ARG_DECL)
return ret._retn ();
}
+/***************************** UNIMPLEMENTED METHODS***************************************/
+
template <class SERVANT_TYPE> CosNotifyFilter::MappingFilter_ptr
TAO_NS_ProxySupplier_T<SERVANT_TYPE>::priority_filter (ACE_ENV_SINGLE_ARG_DECL)
ACE_THROW_SPEC ((
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h
index 1364569e244..c0e8d2054fb 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h
@@ -39,9 +39,9 @@ public:
~TAO_NS_ProxySupplier_T ();
/// Notification of subscriptions set at the admin.
- virtual void admin_subscription (const CosNotification::EventTypeSeq & added,
- const CosNotification::EventTypeSeq & removed
- ACE_ENV_ARG_DECL);
+ virtual void admin_types_changed (const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ ACE_ENV_ARG_DECL);
///= POA_Notify_Internal methods
/// POA_Notify_Internal::Event_Forwarder method
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Refcountable.cpp b/TAO/orbsvcs/orbsvcs/Notify/Refcountable.cpp
index 46178ac6b46..c8d1073a3f5 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Refcountable.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Refcountable.cpp
@@ -25,7 +25,7 @@ TAO_NS_Refcountable::_incr_refcnt (void)
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0);
- if (TAO_debug_level > 0 )
+ if (TAO_debug_level > 1 )
ACE_DEBUG ((LM_DEBUG,"object:%x incr refcount = %d\n", this, refcount_+1 ));
@@ -38,7 +38,7 @@ TAO_NS_Refcountable::_decr_refcnt (void)
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0);
- if (TAO_debug_level > 0 )
+ if (TAO_debug_level > 1 )
ACE_DEBUG ((LM_DEBUG,"object:%x decr refcount = %d\n", this, refcount_-1 ));
this->refcount_--;
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
index 161905dbfa9..363ebde793b 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
@@ -15,7 +15,6 @@ ACE_RCSID(Notify, TAO_NS_SequencePushConsumer, "$id$")
#include "../ProxySupplier.h"
#include "../Worker_Task.h"
#include "../Consumer.h"
-#include "../Dispatch_Observer_T.h"
#include "Method_Request_Dispatch_EventBatch.h"
#include "EventBatch.h"
@@ -186,34 +185,19 @@ TAO_NS_SequencePushConsumer::push (const TAO_NS_Event_Collection event_collectio
this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
-
- if (this->event_dispatch_observer_ != 0)
- {
- this->event_dispatch_observer_->dispatch_success (this ACE_ENV_ARG_PARAMETER);
-
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
- this->retry_count_ = 0;
- }
+ }
+ ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist)
+ {
+ this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCH (CORBA::SystemException, sysex)
+ {
+ this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
}
ACE_CATCHANY
{
- if (TAO_debug_level > 0)
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_NS_SequenceConsumer::push: error sending event. informing dispatch observer\n ");
- }
- //ACE_RE_THROW;
-
- if (this->event_dispatch_observer_ != 0)
- {
- {
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
-
- ++this->retry_count_;
- this->event_batch_.insert (event_collection);
- }
-
- this->event_dispatch_observer_->dispatch_failure (this, this->retry_count_ ACE_ENV_ARG_PARAMETER);
- }
}
ACE_ENDTRY;
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Subscription_Change_Worker.inl b/TAO/orbsvcs/orbsvcs/Notify/Subscription_Change_Worker.inl
index 82ebfbe8c69..cb6180d5d7a 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Subscription_Change_Worker.inl
+++ b/TAO/orbsvcs/orbsvcs/Notify/Subscription_Change_Worker.inl
@@ -5,5 +5,5 @@
ACE_INLINE void
TAO_NS_Subscription_Change_Worker::work (TAO_NS_Proxy* proxy ACE_ENV_ARG_DECL)
{
- proxy->admin_subscription (this->added_, this->removed_ ACE_ENV_ARG_PARAMETER);
+ proxy->admin_types_changed (this->added_, this->removed_ ACE_ENV_ARG_PARAMETER);
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Supplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Supplier.cpp
index f1867c3e2fc..28d6883d524 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Supplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Supplier.cpp
@@ -31,15 +31,9 @@ TAO_NS_Supplier::proxy (void)
}
void
-TAO_NS_Supplier::dispatch_updates_i (const TAO_NS_EventTypeSeq & added, const TAO_NS_EventTypeSeq & removed
+TAO_NS_Supplier::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed
ACE_ENV_ARG_DECL)
{
- CosNotification::EventTypeSeq cos_added;
- CosNotification::EventTypeSeq cos_removed;
-
- added.populate (cos_added);
- removed.populate (cos_removed);
-
if (!CORBA::is_nil (this->subscribe_.in ()))
- this->subscribe_->subscription_change (cos_added, cos_removed ACE_ENV_ARG_PARAMETER);
+ this->subscribe_->subscription_change (added, removed ACE_ENV_ARG_PARAMETER);
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Supplier.h b/TAO/orbsvcs/orbsvcs/Notify/Supplier.h
index 813560ae893..f24bddae07c 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Supplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Supplier.h
@@ -47,8 +47,8 @@ public:
protected:
/// Dispatch updates implementation.
- virtual void dispatch_updates_i (const TAO_NS_EventTypeSeq & added,
- const TAO_NS_EventTypeSeq & removed
+ virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added,
+ const CosNotification::EventTypeSeq& removed
ACE_ENV_ARG_DECL);
/// The proxy that we associate with.
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp
index 9b605767204..51f1b14f39e 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp
@@ -132,6 +132,19 @@ void
TAO_NS_ThreadPool_Task::shutdown (void)
{
this->msg_queue_.enqueue (new TAO_NS_Method_Request_Shutdown (this));
+
+ // We can not wait for ourselves to quit
+ if (this->thr_mgr ())
+ {
+ // call this->thr_mgr ()->task () in the main thread will assert ()
+ // fail in ACE_Thread_Manager::thread_desc_self (void) so I get
+ // task this way.
+ ACE_Thread_Descriptor *mydesc = this->thr_mgr ()->thread_descriptor (ACE_OS::thr_self ());
+
+ if (mydesc && mydesc->task () == this)
+ return;
+ }
+
this->wait ();
return;
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Types.h b/TAO/orbsvcs/orbsvcs/Notify/Types.h
index 210cdce699c..bc17a2bd0ec 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Types.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Types.h
@@ -29,9 +29,6 @@
template <class PROXY> class TAO_ESF_Proxy_Collection;
template <class TYPE> class TAO_ESF_RefCount_Guard;
template <class PROXY, class ACE_LOCK> class TAO_NS_Event_Map_T;
-template <class PEER> class TAO_NS_Dispatch_Observer_T;
-template <class PROXY> class TAO_NS_Event_Map_Observer_T;
-template <class PEER> class TAO_NS_Pending_Worker_T;
/**
* Forward declare classes
@@ -72,14 +69,5 @@ typedef TAO_ESF_RefCount_Guard<CORBA::ULong> TAO_NS_Object_RefCount_Guard;
typedef TAO_NS_Event_Map_T<TAO_NS_ProxySupplier, TAO_SYNCH_RW_MUTEX> TAO_NS_Consumer_Map;
typedef TAO_NS_Event_Map_T<TAO_NS_ProxyConsumer, TAO_SYNCH_RW_MUTEX> TAO_NS_Supplier_Map;
-typedef TAO_NS_Event_Map_Observer_T<TAO_NS_ProxyConsumer> TAO_NS_Consumer_Map_Observer;
-typedef TAO_NS_Event_Map_Observer_T<TAO_NS_ProxySupplier> TAO_NS_Supplier_Map_Observer;
-
-typedef TAO_NS_Dispatch_Observer_T<TAO_NS_Peer> TAO_NS_Updates_Dispatch_Observer;
-typedef TAO_NS_Dispatch_Observer_T<TAO_NS_Consumer> TAO_NS_Event_Dispatch_Observer;
-
-typedef TAO_NS_Pending_Worker_T<TAO_NS_Peer> TAO_NS_Updates_Pending_Worker;
-typedef TAO_NS_Pending_Worker_T<TAO_NS_Consumer> TAO_NS_Event_Pending_Worker;
-
#include "ace/post.h"
#endif /* TAO_NS_TYPES_H */