summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Peer.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Peer.cpp100
1 files changed, 56 insertions, 44 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp
index 57a8ac2cbaa..388aec1a7ca 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp
@@ -9,7 +9,6 @@
ACE_RCSID(Notify, TAO_NS_Peer, "$id$")
#include "tao/debug.h"
-#include "Dispatch_Observer_T.h"
#include "Proxy.h"
#include "Proxy.h"
#include "Admin.h"
@@ -18,7 +17,6 @@ ACE_RCSID(Notify, TAO_NS_Peer, "$id$")
#include "Notify_Service.h"
TAO_NS_Peer::TAO_NS_Peer (void)
- :updates_dispatch_observer_ (0), retry_count_ (1)
{
}
@@ -39,61 +37,75 @@ TAO_NS_Peer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
}
void
-TAO_NS_Peer::dispatch_pending (ACE_ENV_SINGLE_ARG_DECL)
+TAO_NS_Peer::handle_dispatch_exception (ACE_ENV_SINGLE_ARG_DECL)
{
- TAO_NS_Proxy* proxy = this->proxy ();
- TAO_NS_EventTypeSeq& added = proxy->added_;
- TAO_NS_EventTypeSeq& removed = proxy->removed_;
-
- if (added.size () == 0 && removed.size () == 0)
- return; // Return if nothing to send.
-
- TAO_NS_EventTypeSeq added_copy;
- TAO_NS_EventTypeSeq removed_copy;
- TAO_NS_Reverse_Lock unlock (proxy->lock_);
-
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, proxy->lock_);
-
- added_copy = added;
- removed_copy = removed;
- added.reset ();
- removed.reset ();
+ // Sever all association when a remote client misbehaves. Other strategies like reties are possible but not implemented.
+ this->proxy ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+void
+TAO_NS_Peer::dispatch_updates (const TAO_NS_EventTypeSeq & added, const TAO_NS_EventTypeSeq & removed ACE_ENV_ARG_DECL)
+{
ACE_TRY
{
- {
- ACE_GUARD (TAO_NS_Reverse_Lock, ace_mon, unlock);
+ CosNotification::EventTypeSeq cos_added;
+ CosNotification::EventTypeSeq cos_removed;
- this->dispatch_updates_i (added_copy, removed_copy ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ const TAO_NS_EventTypeSeq& subscribed_types = this->proxy ()->subscribed_types ();
+ const TAO_NS_EventType& special = TAO_NS_EventType::special ();
- if (this->updates_dispatch_observer_ != 0)
- this->updates_dispatch_observer_->dispatch_success (this ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
+ // Don;t inform of types that we already know about.
+ // E.g. if we're subscribed for {A,B,C,F}
+ // and we receive an update with added list {A,B,G}
+ // then, we should only send {G} because peer already knows about {A, B}
+ // However if we're subscribed for everything, send all kinds of adds.
- this->retry_count_ = 0;
- }
- ACE_CATCHANY
- {
- if (TAO_debug_level > 0)
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Peer:dispatch_pending serror sending updates\n ");
- }
- //ACE_RE_THROW;
+ // Don;t inform of removed types that we don;t care about.
+ // e.g. if we're currently subscribed for {A,B,C,F}
+ // and we receive an update with removed list {A, B, D}
+ // then, we should only send {A,B} because the peer is not interested in D.
+ // However if we're subscribed for everything, send all kinds of removes.
- ++this->retry_count_;
+ TAO_NS_EventTypeSeq added_result = added;
+ TAO_NS_EventTypeSeq removed_result;
- if (this->updates_dispatch_observer_ != 0)
+ if (subscribed_types.find (special) != 0)
{
- /// Restore the lists.
- added.insert_seq (added_copy);
- removed.insert_seq (removed_copy);
+ added_result.remove_seq (subscribed_types);
+ removed_result.intersection (subscribed_types, removed);
+ }
+ else
+ {
+ removed_result = removed;
+ }
- ACE_GUARD (TAO_NS_Reverse_Lock, ace_mon, unlock);
+ added_result.populate_no_special (cos_added);
+ removed_result.populate_no_special (cos_removed);
- this->updates_dispatch_observer_->dispatch_failure (this, this->retry_count_ ACE_ENV_ARG_PARAMETER);
+ if (cos_added.length () != 0 || cos_removed.length () != 0)
+ {
+ this->dispatch_updates_i (cos_added, cos_removed ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
}
}
+ ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist)
+ {
+ this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCH (CORBA::NO_IMPLEMENT, no_impl)
+ {
+ // The peer does not implement the offer/subscription_change method
+ // Do nothing. Later, perhaps set a flag that helps us decide if we should dispatch_updates_i.
+ }
+ ACE_CATCH (CORBA::SystemException, sysex)
+ {
+ this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ // Do nothing
+ }
ACE_ENDTRY;
}