// -*- C++ -*-
/**
 *  @file   EC_ObserverStrategy.h
 *
 *  $Id$
 *
 *  @author Carlos O'Ryan (coryan@cs.wustl.edu)
 *  @author Johnny Willemsen (jwillemsen@remedy.nl)
 *  @author Kees van Marle (kvmarle@remedy.nl)
 *
 * Based on previous work by Tim Harrison (harrison@cs.wustl.edu) and
 * other members of the DOC group. More details can be found in:
 *
 * http://doc.ece.uci.edu/~coryan/EC/index.html
 */

#ifndef TAO_EC_OBSERVERSTRATEGY_H
#define TAO_EC_OBSERVERSTRATEGY_H
#include /**/ "ace/pre.h"

#include "orbsvcs/orbsvcs/ESF/ESF_Worker.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "ace/Null_Mutex.h"
#include "ace/RB_Tree.h"
#include "ace/Map_Manager.h"
#include "orbsvcs/orbsvcs/RtecEventChannelAdminC.h"
#include /**/ "event_export.h"

// Forward declarations: only pointers to these types appear in this header.
class ACE_Lock;
class TAO_EC_Event_Channel_Base;
class TAO_EC_ProxyPushConsumer;
class TAO_EC_ProxyPushSupplier;

/**
 * @class TAO_EC_ObserverStrategy
 *
 * @brief The strategy to handle observers for the Event Channel subscriptions
 * and publication.
 *
 * The Event Channel supports Observers for the set of subscriptions and
 * publications. This is used to implement federations of event channels,
 * either through UDP (multicast and unicast) and/or regular CORBA calls.
 * This behavior of the EC is strategized to avoid overhead when no gateways
 * are needed.
 *
 * This is an abstract interface: every operation below is pure virtual.
 */
class TAO_RTEvent_Export TAO_EC_ObserverStrategy
{
public:
  /// Destructor
  virtual ~TAO_EC_ObserverStrategy (void);

  /// The basic methods to support the EC strategies.
  /// append_observer() takes an observer reference and returns an
  /// Observer_Handle; remove_observer() takes such a handle.
  /// The exceptions each may raise are listed in its throw spec.
  virtual RtecEventChannelAdmin::Observer_Handle
      append_observer (RtecEventChannelAdmin::Observer_ptr
                       ACE_ENV_ARG_DECL)
      ACE_THROW_SPEC ((
          CORBA::SystemException,
          RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
          RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER)) = 0;
  virtual void
      remove_observer (
          RtecEventChannelAdmin::Observer_Handle
          ACE_ENV_ARG_DECL)
      ACE_THROW_SPEC ((
          CORBA::SystemException,
          RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
          RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER)) = 0;

  /// Used by the EC to inform the ObserverStrategy that a Consumer has
  /// connected or disconnected from it.
  virtual void connected (TAO_EC_ProxyPushConsumer*
                          ACE_ENV_ARG_DECL_NOT_USED) = 0;
  virtual void disconnected (TAO_EC_ProxyPushConsumer*
                             ACE_ENV_ARG_DECL_NOT_USED) = 0;

  /// Used by the EC to inform the ObserverStrategy that a Supplier has
  /// connected or disconnected from it.
  virtual void connected (TAO_EC_ProxyPushSupplier*
                          ACE_ENV_ARG_DECL_NOT_USED) = 0;
  virtual void disconnected (TAO_EC_ProxyPushSupplier*
                             ACE_ENV_ARG_DECL_NOT_USED) = 0;
};

// ****************************************************************

/**
 * @class TAO_EC_Null_ObserverStrategy
 *
 * @brief A null observer strategy.
 *
 * This class keeps no information and simply ignores the messages
 * from the EC.
 */
class TAO_RTEvent_Export TAO_EC_Null_ObserverStrategy : public TAO_EC_ObserverStrategy
{
public:
  /// Constructor
  TAO_EC_Null_ObserverStrategy (void);

  // = The TAO_EC_ObserverStrategy methods.
  // Signatures and throw specs mirror the base-class declarations.
  virtual RtecEventChannelAdmin::Observer_Handle
      append_observer (RtecEventChannelAdmin::Observer_ptr
                       ACE_ENV_ARG_DECL)
      ACE_THROW_SPEC ((
          CORBA::SystemException,
          RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
          RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER));
  virtual void
      remove_observer (
          RtecEventChannelAdmin::Observer_Handle
          ACE_ENV_ARG_DECL)
      ACE_THROW_SPEC ((
          CORBA::SystemException,
          RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
          RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER));
  virtual void connected (TAO_EC_ProxyPushConsumer*
                          ACE_ENV_ARG_DECL_NOT_USED);
  virtual void disconnected (TAO_EC_ProxyPushConsumer*
                             ACE_ENV_ARG_DECL_NOT_USED);
  virtual void connected (TAO_EC_ProxyPushSupplier*
                          ACE_ENV_ARG_DECL_NOT_USED);
  virtual void disconnected (TAO_EC_ProxyPushSupplier*
                             ACE_ENV_ARG_DECL_NOT_USED);
};

// ****************************************************************

/**
 * @class TAO_EC_Basic_ObserverStrategy
 *
 * @brief A simple observer strategy.
 *
 * This class simply keeps the information about the current list
 * of observers. Whenever the list of consumers and/or suppliers
 * changes, it queries the EC, computes the global subscription
 * and/or publication list, and sends the update message to all the
 * observers.
 *

* <H3>Memory Management</H3>

* It assumes ownership of the @a lock, but not of the * Event_Channel. */ class TAO_RTEvent_Export TAO_EC_Basic_ObserverStrategy : public TAO_EC_ObserverStrategy { public: /// Constructor TAO_EC_Basic_ObserverStrategy (TAO_EC_Event_Channel_Base* ec, ACE_Lock* lock); /// Destructor virtual ~TAO_EC_Basic_ObserverStrategy (void); // = The TAO_EC_ObserverStrategy methods. virtual RtecEventChannelAdmin::Observer_Handle append_observer (RtecEventChannelAdmin::Observer_ptr ACE_ENV_ARG_DECL) ACE_THROW_SPEC (( CORBA::SystemException, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR, RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER)); virtual void remove_observer ( RtecEventChannelAdmin::Observer_Handle ACE_ENV_ARG_DECL) ACE_THROW_SPEC (( CORBA::SystemException, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR, RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER)); virtual void connected (TAO_EC_ProxyPushConsumer* ACE_ENV_ARG_DECL_NOT_USED); virtual void disconnected (TAO_EC_ProxyPushConsumer* ACE_ENV_ARG_DECL_NOT_USED); virtual void connected (TAO_EC_ProxyPushSupplier* ACE_ENV_ARG_DECL_NOT_USED); virtual void disconnected (TAO_EC_ProxyPushSupplier* ACE_ENV_ARG_DECL_NOT_USED); /** * @struct Observer_Entry * * @brief The data kept for each observer. * * The observer and its handle are kept in a simple structure. * In the future this structure could contain QoS information, * such as: * + how often do we update the observer? * + When was the last update. * + Does it want to receive all changes? 
*/ struct Observer_Entry { Observer_Entry (void); Observer_Entry (RtecEventChannelAdmin::Observer_Handle h, RtecEventChannelAdmin::Observer_ptr o); /// The handle RtecEventChannelAdmin::Observer_Handle handle; /// The observer RtecEventChannelAdmin::Observer_var observer; }; struct Header_Compare { int operator () (const RtecEventComm::EventHeader& lhs, const RtecEventComm::EventHeader& rhs) const; }; typedef ACE_Map_Manager Observer_Map; typedef ACE_Map_Iterator Observer_Map_Iterator; typedef ACE_RB_Tree Headers; typedef ACE_RB_Tree_Iterator HeadersIterator; protected: /// Helpers. //@{ /// Recompute EC consumer subscriptions and send them out to all observers. virtual void consumer_qos_update (TAO_EC_ProxyPushSupplier *supplier ACE_ENV_ARG_DECL); /// Recompute EC supplier publications and send them out to all observers. virtual void supplier_qos_update (TAO_EC_ProxyPushConsumer *consumer ACE_ENV_ARG_DECL); /// Compute consumer QOS. void fill_qos (RtecEventChannelAdmin::ConsumerQOS &qos ACE_ENV_ARG_DECL); /// Compute supplier QOS. void fill_qos (RtecEventChannelAdmin::SupplierQOS &qos ACE_ENV_ARG_DECL); /// Copies all current observers into an array and passes it /// back to the caller through @a lst. Returns the size of the array. int create_observer_list (RtecEventChannelAdmin::Observer_var *&lst ACE_ENV_ARG_DECL); //@} protected: /// The event channel. TAO_EC_Event_Channel_Base* event_channel_; /// The lock ACE_Lock* lock_; /// The handles are generated in sequential order, but are opaque to /// the client. RtecEventChannelAdmin::Observer_Handle handle_generator_; /// Keep the set of Observers Observer_Map observers_; }; // **************************************************************** /** * @class TAO_EC_Reactive_ObserverStrategy * * @brief A reactive observer strategy. 
*
* This class simply keeps the information about the current list
* of observers. Whenever the list of consumers and/or suppliers
* changes, it queries the EC, computes the global subscription
* and/or publication list, and sends the update message to all the
* observers. When an observer isn't reachable it is removed from
* the observer list.
*

* <H3>Memory Management</H3>

* It assumes ownership of the , but not of the * Event_Channel. */ class TAO_RTEvent_Export TAO_EC_Reactive_ObserverStrategy : public TAO_EC_Basic_ObserverStrategy { public: /// Constructor TAO_EC_Reactive_ObserverStrategy (TAO_EC_Event_Channel_Base* ec, ACE_Lock* lock); /// Destructor virtual ~TAO_EC_Reactive_ObserverStrategy (void); protected: /// Helpers. //@{ /// Recompute EC consumer subscriptions and send them out to all observers. virtual void consumer_qos_update (TAO_EC_ProxyPushSupplier *supplier ACE_ENV_ARG_DECL); /// Recompute EC supplier publications and send them out to all observers. virtual void supplier_qos_update (TAO_EC_ProxyPushConsumer *consumer ACE_ENV_ARG_DECL); /** * Copies all current observers into a map and passes it * back to the caller through @a map. * @return Returns the size of the map. */ int create_observer_map (Observer_Map &map ACE_ENV_ARG_DECL); /// The observer doesn't exist anymore void observer_not_exists (Observer_Entry& observer ACE_ENV_ARG_DECL); //@} }; // **************************************************************** class TAO_EC_Accumulate_Supplier_Headers : public TAO_ESF_Worker { public: /// Constructor TAO_EC_Accumulate_Supplier_Headers (TAO_EC_Basic_ObserverStrategy::Headers &headers); virtual void work (TAO_EC_ProxyPushSupplier *supplier ACE_ENV_ARG_DECL); private: TAO_EC_Basic_ObserverStrategy::Headers &headers_; }; // **************************************************************** class TAO_EC_Accumulate_Consumer_Headers : public TAO_ESF_Worker { public: /// Constructor TAO_EC_Accumulate_Consumer_Headers (TAO_EC_Basic_ObserverStrategy::Headers &headers); virtual void work (TAO_EC_ProxyPushConsumer *consumer ACE_ENV_ARG_DECL); private: TAO_EC_Basic_ObserverStrategy::Headers &headers_; }; #if defined (__ACE_INLINE__) #include "EC_ObserverStrategy.i" #endif /* __ACE_INLINE__ */ #include /**/ "ace/post.h" #endif /* TAO_EC_OBSERVERSTRATEGY_H */